1# -*- coding: utf-8 -*- 2# vim:ts=4:sw=4:expandtab 3 4"""Mime message parsing and manipulation functions for Autocrypt usage. """ 5 6from __future__ import unicode_literals, print_function 7import logging 8import copy 9import email.parser 10import base64 11import quopri 12import time 13from .myattr import attrs, attrib, attrib_bytes_or_none, attrib_text_or_none 14from email.mime.text import MIMEText 15from email.mime.multipart import MIMEMultipart 16from email.utils import formatdate, make_msgid 17from email.utils import formataddr # noqa 18from email.generator import _make_boundary 19import six 20 21if six.PY3: 22 from email.generator import BytesGenerator 23 from email import message_from_bytes, message_from_binary_file 24else: 25 from email.generator import Generator as BytesGenerator 26 from email import message_from_string as message_from_bytes # noqa 27 from email import message_from_file as message_from_binary_file # noqa 28 29 30def decode_keydata(ascii_keydata): 31 return base64.b64decode(ascii_keydata) 32 33 34# slighly hacky way to get a byte string out of a message 35 36class MyBytesIO(six.BytesIO): 37 def write(self, s): 38 if isinstance(s, six.text_type): 39 s = s.encode("ascii") 40 return six.BytesIO.write(self, s) 41 42 43def msg2bytes(msg): 44 # f = six.BytesIO() 45 f = MyBytesIO() 46 BytesGenerator(f).flatten(msg) 47 return f.getvalue() 48 49 50# main functions 51 52def make_ac_header_value(addr, keydata, prefer_encrypt="nopreference"): 53 addr = parse_email_addr(addr) 54 assert keydata 55 key = base64.b64encode(keydata) if isinstance(keydata, bytes) else keydata 56 if isinstance(key, bytes): 57 key = key.decode("ascii") 58 l = ["addr=" + addr] 59 if prefer_encrypt != "nopreference": 60 l.append("prefer-encrypt=" + prefer_encrypt) 61 l.append("keydata=\n" + indented_split(key)) 62 return "; ".join(l) 63 64 65def indented_split(value, maxlen=78, indent=" "): 66 assert "\n" not in value 67 l = [] 68 for i in range(0, len(value), maxlen): 69 l.append(indent + value[i:i + maxlen] + "\n") 70 return "".join(l).rstrip() 71 72 73def get_target_emailadr(msg): 74 return [x[1] for x in get_target_fulladr(msg)] 75 76 77def get_target_fulladr(msg): 78 tos = (msg.get_all("to") or []) + (msg.get_all("cc") or []) 79 return email.utils.getaddresses(tos) 80 81 82def parse_email_addr(string): 83 """ return the routable email address part from a email-field string. 84 85 If the address is of type bytes and not ascii, it is returned in 86 quoted printable encoding. 87 """ 88 prefix, emailadr = email.utils.parseaddr(string) 89 if isinstance(emailadr, bytes): 90 emailadr = six.text_type(quopri.encodestring(emailadr)) 91 return emailadr.lower() 92 93 94def parse_message_from_file(fp): 95 return email.parser.Parser().parse(fp) 96 97 98def parse_message_from_string(string): 99 if isinstance(string, bytes): 100 stream = six.BytesIO(string) 101 else: 102 stream = six.StringIO(string) 103 return parse_message_from_file(stream) 104 105 106def is_encrypted(msg): 107 if msg.get_content_type() == "multipart/encrypted": 108 parts = msg.get_payload() 109 return (len(parts) == 2 110 and parts[0].get_content_type() == 'application/pgp-encrypted' 111 and parts[1].get_content_type() == 'application/octet-stream') 112 113 114def parse_one_ac_header_from_string(string): 115 msg = parse_message_from_string(string) 116 return parse_one_ac_header_from_msg(msg) 117 118 119def parse_one_ac_header_from_msg(msg, FromList=None): 120 if msg.get_content_type() == 'multipart/report': 121 return ACParseResult(error="Ignoring 'multipart/report' message.") 122 froms = msg.get_all("From") or [] 123 if FromList is not None: 124 FromList = [parse_email_addr(x) for x in FromList] 125 126 if len(email.utils.getaddresses(froms)) > 1: 127 return ACParseResult(error="Ignoring message with more than one address in From header.") 128 results = [] 129 err_results = [] 130 for ac_header_value in msg.get_all("Autocrypt") or []: 131 r = parse_ac_headervalue(ac_header_value) 132 if r.error: 133 err_results.append(r) 134 elif FromList and r.addr not in FromList: 135 e = ACParseResult(error="addr %r does not match %r" % (r.addr, FromList)) 136 err_results.append(e) 137 else: 138 results.append(r) 139 140 if len(results) == 1: 141 return results[0] 142 if len(results) > 1: 143 return ACParseResult(error="more than one valid Autocrypt header found") 144 if err_results: 145 return err_results[0] 146 return ACParseResult(error="no valid Autocrypt header found") 147 148 149def get_gossip_headers_from_msg(msg): 150 results = {} 151 for ac_header_value in msg.get_all("Autocrypt-Gossip") or []: 152 r = parse_ac_headervalue(ac_header_value) 153 if not r.error: 154 results[r.addr] = r 155 else: 156 logging.error(r.error) 157 158 return results 159 160 161def parse_ac_headervalue(value): 162 """ return a Result object with keydata/addr/prefer_encrypt/extra_attr/error 163 attributes. 164 165 If the error attribute is set on the result object then all 166 other attribute values are undefined. 167 """ 168 parts = filter(None, [x.strip() for x in value.split(";")]) 169 if not parts: 170 return ACParseResult(error="empty header") 171 172 result_dict = {"prefer_encrypt": "nopreference"} 173 extra_attr = {} 174 for x in parts: 175 kv = x.split("=", 1) 176 if not len(kv) == 2: 177 return ACParseResult(error="malformed setting") 178 name, value = [x.strip() for x in kv] 179 if name == "keydata": 180 try: 181 value = decode_keydata("".join(value.split())) 182 except Exception: 183 return ACParseResult(error="failed to decode keydata") 184 elif name == "prefer-encrypt": 185 name = "prefer_encrypt" 186 if value not in ("nopreference", "mutual"): 187 return ACParseResult(error="unknown prefer-encrypt setting '%s'" % value) 188 elif name == "addr": 189 value = parse_email_addr(value) 190 elif name[0] != "_": 191 return ACParseResult(error="unknown critical attr '%s'" % name) 192 else: 193 extra_attr[name] = value 194 continue 195 result_dict[name] = value 196 for attr in ("keydata", "addr"): 197 if attr not in result_dict: 198 return ACParseResult(error="critical attr '%s' missing" % attr) 199 return ACParseResult(extra_attr=extra_attr, **result_dict) 200 201 202@attrs 203class ACParseResult(object): 204 keydata = attrib_bytes_or_none() 205 addr = attrib_text_or_none() 206 prefer_encrypt = attrib_text_or_none() 207 extra_attr = attrib(default=None) 208 error = attrib_text_or_none() 209 210 211def gen_mail_msg(From, To, Cc=None, _extra=None, Autocrypt=None, 212 Subject="testmail", Date=None, _dto=False, 213 MessageID=None, payload='Autoresponse\n', 214 ENCRYPT=None, 215 charset=None): 216 if Cc is None: 217 Cc = [] 218 assert isinstance(To, (list, tuple)) 219 assert isinstance(Cc, (list, tuple)) 220 if MessageID is None: 221 MessageID = make_msgid() 222 223 if not isinstance(payload, list): 224 msg = MIMEText(payload or '', _charset=charset) 225 else: 226 msg = MIMEMultipart() 227 assert not payload 228 229 msg['From'] = From 230 msg['To'] = ",".join(To) 231 if Cc: 232 msg['Cc'] = ",".join(Cc) 233 msg['Message-ID'] = MessageID 234 if Subject is not None: 235 msg['Subject'] = Subject 236 if ENCRYPT is not None: 237 msg['ENCRYPT'] = ENCRYPT 238 Date = 0 if not Date else Date 239 if isinstance(Date, int): 240 Date = formatdate(time.time() + Date) 241 msg['Date'] = Date 242 if _extra: 243 for name, value in _extra.items(): 244 msg.add_header(name, value) 245 if _dto is True: 246 msg["Delivered-To"] = To[0] 247 elif isinstance(_dto, six.text_type): 248 msg["Delivered-To"] = _dto 249 if Autocrypt: 250 msg["Autocrypt"] = Autocrypt 251 return msg 252 253 254def gen_boundary(): 255 return _make_boundary() 256 257 258def make_message(content_type, payload=None): 259 msg = email.message.Message() 260 del msg["MIME-Version"] 261 msg["Content-Type"] = content_type 262 if payload is not None: 263 msg.set_payload(payload) 264 return msg 265 266 267def make_content_message_from_email(msg): 268 newmsg = copy.deepcopy(msg) 269 for key in newmsg.keys(): 270 if key.lower() not in ("content-transfer-encoding", 271 "content-type"): 272 del newmsg[key] 273 return newmsg 274 275 276def transfer_non_content_headers(msg, newmsg): 277 _ignore_headers = ["content-type", "mime-version", "content-transfer-encoding"] 278 for header, value in msg.items(): 279 if header.lower() not in _ignore_headers: 280 newmsg[header] = value 281 282 283def get_delivered_to(msg, fallback_delivto=None): 284 delivto = parse_email_addr(msg.get("Delivered-To")) 285 if not delivto and fallback_delivto: 286 delivto = parse_email_addr(fallback_delivto) 287 if not delivto: 288 raise ValueError("could not determine my own delivered-to address") 289 return delivto 290 291 292def make_displayable(string): 293 if string is None: 294 return '' 295 if isinstance(string, six.text_type): 296 return string 297 assert isinstance(string, bytes) 298 for enc in ["utf-8", "latin1"]: 299 try: 300 return string.decode(enc) 301 except Exception: 302 pass 303 return six.text_type(quopri.encodestring(enc)) 304 305 306# adapted from ModernPGP:memoryhole/generators/generator.py which 307# was adapted from notmuch:devel/printmimestructure 308def render_mime_structure(msg, prefix='└'): 309 '''msg should be an email.message.Message object''' 310 stream = six.StringIO() 311 mcset = msg.get_charset() 312 fn = make_displayable(msg.get_filename()) 313 fname = ' [' + fn + ']' 314 cset = '' if mcset is None else ' ({})'.format(mcset) 315 disp = msg.get_params(None, header='Content-Disposition') 316 if (disp is None): 317 disposition = '' 318 else: 319 disposition = '' 320 for d in disp: 321 if d[0] in ['attachment', 'inline']: 322 disposition = ' ' + d[0] 323 324 if 'subject' in msg: 325 subject = ' (Subject: %s)' % msg['subject'] 326 else: 327 subject = '' 328 if (msg.is_multipart()): 329 print(prefix + '┬╴' + msg.get_content_type() + cset 330 + disposition + fname, str(len(msg.as_string())) 331 + ' bytes' + subject, file=stream) 332 if prefix.endswith('└'): 333 prefix = prefix.rpartition('└')[0] + ' ' 334 if prefix.endswith('├'): 335 prefix = prefix.rpartition('├')[0] + '│' 336 parts = msg.get_payload() 337 i = 0 338 while (i < len(parts) - 1): 339 print(render_mime_structure(parts[i], prefix + '├'), file=stream) 340 i += 1 341 print(render_mime_structure(parts[i], prefix + '└'), file=stream) 342 # FIXME: show epilogue? 343 else: 344 print(prefix + '─╴' + msg.get_content_type() + cset + disposition + 345 fname, msg.get_payload().__len__().__str__(), 346 'bytes' + subject, file=stream) 347 return stream.getvalue().rstrip() 348