# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:expandtab
"""Mime message parsing and manipulation functions for Autocrypt usage. """
from __future__ import unicode_literals, print_function
import logging
import copy
import email.parser
import base64
import quopri
import time
from .myattr import attrs, attrib, attrib_bytes_or_none, attrib_text_or_none
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formatdate, make_msgid
from email.utils import formataddr # noqa
from email.generator import _make_boundary
import six
if six.PY3:
from email.generator import BytesGenerator
from email import message_from_bytes, message_from_binary_file
else:
from email.generator import Generator as BytesGenerator
from email import message_from_string as message_from_bytes # noqa
from email import message_from_file as message_from_binary_file # noqa
def decode_keydata(ascii_keydata):
return base64.b64decode(ascii_keydata)
# slighly hacky way to get a byte string out of a message
class MyBytesIO(six.BytesIO):
def write(self, s):
if isinstance(s, six.text_type):
s = s.encode("ascii")
return six.BytesIO.write(self, s)
def msg2bytes(msg):
# f = six.BytesIO()
f = MyBytesIO()
BytesGenerator(f).flatten(msg)
return f.getvalue()
# main functions
def make_ac_header_value(addr, keydata, prefer_encrypt="nopreference"):
addr = parse_email_addr(addr)
assert keydata
key = base64.b64encode(keydata) if isinstance(keydata, bytes) else keydata
if isinstance(key, bytes):
key = key.decode("ascii")
l = ["addr=" + addr]
if prefer_encrypt != "nopreference":
l.append("prefer-encrypt=" + prefer_encrypt)
l.append("keydata=\n" + indented_split(key))
return "; ".join(l)
def indented_split(value, maxlen=78, indent=" "):
assert "\n" not in value
l = []
for i in range(0, len(value), maxlen):
l.append(indent + value[i:i + maxlen] + "\n")
return "".join(l).rstrip()
def get_target_emailadr(msg):
return [x[1] for x in get_target_fulladr(msg)]
def get_target_fulladr(msg):
tos = (msg.get_all("to") or []) + (msg.get_all("cc") or [])
return email.utils.getaddresses(tos)
[docs]def parse_email_addr(string):
""" return the routable email address part from a email-field string.
If the address is of type bytes and not ascii, it is returned in
quoted printable encoding.
"""
prefix, emailadr = email.utils.parseaddr(string)
if isinstance(emailadr, bytes):
emailadr = six.text_type(quopri.encodestring(emailadr))
return emailadr.lower()
def parse_message_from_file(fp):
return email.parser.Parser().parse(fp)
def parse_message_from_string(string):
if isinstance(string, bytes):
stream = six.BytesIO(string)
else:
stream = six.StringIO(string)
return parse_message_from_file(stream)
def is_encrypted(msg):
if msg.get_content_type() == "multipart/encrypted":
parts = msg.get_payload()
return (len(parts) == 2
and parts[0].get_content_type() == 'application/pgp-encrypted'
and parts[1].get_content_type() == 'application/octet-stream')
def parse_one_ac_header_from_string(string):
msg = parse_message_from_string(string)
return parse_one_ac_header_from_msg(msg)
def parse_one_ac_header_from_msg(msg, FromList=None):
if msg.get_content_type() == 'multipart/report':
return ACParseResult(error="Ignoring 'multipart/report' message.")
froms = msg.get_all("From") or []
if FromList is not None:
FromList = [parse_email_addr(x) for x in FromList]
if len(email.utils.getaddresses(froms)) > 1:
return ACParseResult(error="Ignoring message with more than one address in From header.")
results = []
err_results = []
for ac_header_value in msg.get_all("Autocrypt") or []:
r = parse_ac_headervalue(ac_header_value)
if r.error:
err_results.append(r)
elif FromList and r.addr not in FromList:
e = ACParseResult(error="addr %r does not match %r" % (r.addr, FromList))
err_results.append(e)
else:
results.append(r)
if len(results) == 1:
return results[0]
if len(results) > 1:
return ACParseResult(error="more than one valid Autocrypt header found")
if err_results:
return err_results[0]
return ACParseResult(error="no valid Autocrypt header found")
def get_gossip_headers_from_msg(msg):
results = {}
for ac_header_value in msg.get_all("Autocrypt-Gossip") or []:
r = parse_ac_headervalue(ac_header_value)
if not r.error:
results[r.addr] = r
else:
logging.error(r.error)
return results
@attrs
class ACParseResult(object):
keydata = attrib_bytes_or_none()
addr = attrib_text_or_none()
prefer_encrypt = attrib_text_or_none()
extra_attr = attrib(default=None)
error = attrib_text_or_none()
def gen_mail_msg(From, To, Cc=None, _extra=None, Autocrypt=None,
Subject="testmail", Date=None, _dto=False,
MessageID=None, payload='Autoresponse\n',
ENCRYPT=None,
charset=None):
if Cc is None:
Cc = []
assert isinstance(To, (list, tuple))
assert isinstance(Cc, (list, tuple))
if MessageID is None:
MessageID = make_msgid()
if not isinstance(payload, list):
msg = MIMEText(payload or '', _charset=charset)
else:
msg = MIMEMultipart()
assert not payload
msg['From'] = From
msg['To'] = ",".join(To)
if Cc:
msg['Cc'] = ",".join(Cc)
msg['Message-ID'] = MessageID
if Subject is not None:
msg['Subject'] = Subject
if ENCRYPT is not None:
msg['ENCRYPT'] = ENCRYPT
Date = 0 if not Date else Date
if isinstance(Date, int):
Date = formatdate(time.time() + Date)
msg['Date'] = Date
if _extra:
for name, value in _extra.items():
msg.add_header(name, value)
if _dto is True:
msg["Delivered-To"] = To[0]
elif isinstance(_dto, six.text_type):
msg["Delivered-To"] = _dto
if Autocrypt:
msg["Autocrypt"] = Autocrypt
return msg
def gen_boundary():
return _make_boundary()
def make_message(content_type, payload=None):
msg = email.message.Message()
del msg["MIME-Version"]
msg["Content-Type"] = content_type
if payload is not None:
msg.set_payload(payload)
return msg
def make_content_message_from_email(msg):
newmsg = copy.deepcopy(msg)
for key in newmsg.keys():
if key.lower() not in ("content-transfer-encoding",
"content-type"):
del newmsg[key]
return newmsg
def transfer_non_content_headers(msg, newmsg):
_ignore_headers = ["content-type", "mime-version", "content-transfer-encoding"]
for header, value in msg.items():
if header.lower() not in _ignore_headers:
newmsg[header] = value
def get_delivered_to(msg, fallback_delivto=None):
delivto = parse_email_addr(msg.get("Delivered-To"))
if not delivto and fallback_delivto:
delivto = parse_email_addr(fallback_delivto)
if not delivto:
raise ValueError("could not determine my own delivered-to address")
return delivto
def make_displayable(string):
if string is None:
return ''
if isinstance(string, six.text_type):
return string
assert isinstance(string, bytes)
for enc in ["utf-8", "latin1"]:
try:
return string.decode(enc)
except Exception:
pass
return six.text_type(quopri.encodestring(enc))
# adapted from ModernPGP:memoryhole/generators/generator.py which
# was adapted from notmuch:devel/printmimestructure
[docs]def render_mime_structure(msg, prefix='└'):
'''msg should be an email.message.Message object'''
stream = six.StringIO()
mcset = msg.get_charset()
fn = make_displayable(msg.get_filename())
fname = ' [' + fn + ']'
cset = '' if mcset is None else ' ({})'.format(mcset)
disp = msg.get_params(None, header='Content-Disposition')
if (disp is None):
disposition = ''
else:
disposition = ''
for d in disp:
if d[0] in ['attachment', 'inline']:
disposition = ' ' + d[0]
if 'subject' in msg:
subject = ' (Subject: %s)' % msg['subject']
else:
subject = ''
if (msg.is_multipart()):
print(prefix + '┬╴' + msg.get_content_type() + cset
+ disposition + fname, str(len(msg.as_string()))
+ ' bytes' + subject, file=stream)
if prefix.endswith('└'):
prefix = prefix.rpartition('└')[0] + ' '
if prefix.endswith('├'):
prefix = prefix.rpartition('├')[0] + '│'
parts = msg.get_payload()
i = 0
while (i < len(parts) - 1):
print(render_mime_structure(parts[i], prefix + '├'), file=stream)
i += 1
print(render_mime_structure(parts[i], prefix + '└'), file=stream)
# FIXME: show epilogue?
else:
print(prefix + '─╴' + msg.get_content_type() + cset + disposition +
fname, msg.get_payload().__len__().__str__(),
'bytes' + subject, file=stream)
return stream.getvalue().rstrip()