# encoding=utf-8
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
# Copyright © 2017 Dylan Baker <dylan@pnwbakers.com>
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import os
import email
import email.charset as charset
import email.header
import email.message
import email.policy
import email.utils
import tempfile
import re
import logging
import mailcap
import io
import base64
import quopri

from .. import crypto
from .. import helper
from ..errors import GPGProblem
from ..settings.const import settings
from ..helper import string_sanitize
from ..helper import string_decode
from ..helper import parse_mailcap_nametemplate
from ..helper import split_commandstring

charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')

X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid'
X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message'

_APP_PGP_SIG = 'application/pgp-signature'
_APP_PGP_ENC = 'application/pgp-encrypted'


def add_signature_headers(mail, sigs, error_msg):
    '''Add pseudo headers to the mail indicating whether the signature
    verification was successful.

    :param mail: :class:`email.message.Message` the message to entitle
    :param sigs: list of :class:`gpg.results.Signature`
    :param error_msg: An error message if there is one, or None
    :type error_msg: :class:`str` or `None`
    '''
    sig_from = ''
    sig_known = True
    uid_trusted = False

    assert error_msg is None or isinstance(error_msg, str)

    if not sigs:
        error_msg = error_msg or 'no signature found'
    elif not error_msg:
        try:
            key = crypto.get_key(sigs[0].fpr)
            for uid in key.uids:
                if crypto.check_uid_validity(key, uid.email):
                    sig_from = uid.uid
                    uid_trusted = True
                    break
            else:
                # No trusted uid found, since we did not break from the loop.
                sig_from = key.uids[0].uid
        except GPGProblem:
            # the key could not be looked up: fall back to the fingerprint
            sig_from = sigs[0].fpr
            sig_known = False

    if error_msg:
        msg = 'Invalid: {}'.format(error_msg)
    elif uid_trusted:
        msg = 'Valid: {}'.format(sig_from)
    else:
        msg = 'Untrusted: {}'.format(sig_from)

    mail.add_header(X_SIGNATURE_VALID_HEADER,
                    'False' if (error_msg or not sig_known) else 'True')
    mail.add_header(X_SIGNATURE_MESSAGE_HEADER, msg)


def get_params(mail, failobj=None, header='content-type', unquote=True):
    '''Get Content-Type parameters as dict.

    RFC 2045 specifies that parameter names are case-insensitive, so
    we normalize them here.

    :param mail: :class:`email.message.Message`
    :param failobj: iterable of (key, value) pairs to use if no such header
                    is found; a falsy value is treated as an empty list
    :param header: the header to search for parameters, default
    :param unquote: unquote the values
    :returns: a `dict` containing the parameters
    '''
    failobj = failobj or []
    return {k.lower(): v for k, v in mail.get_params(failobj, header, unquote)}


def _handle_signatures(original, message, params):
    """Shared code for handling message signatures.

    RFC 3156 is quite strict:
    * exactly two messages
    * the second is of type 'application/pgp-signature'
    * the second contains the detached signature

    :param original: The original top-level mail. This is required to attach
                     special headers to
    :type original: :class:`email.message.Message`
    :param message: The multipart/signed payload to verify
    :type message: :class:`email.message.Message`
    :param params: the message parameters as returned by :func:`get_params`
    :type params: dict[str, str]
    """
    malformed = None
    if len(message.get_payload()) != 2:
        malformed = 'expected exactly two messages, got {0}'.format(
            len(message.get_payload()))
    else:
        ct = message.get_payload(1).get_content_type()
        if ct != _APP_PGP_SIG:
            malformed = 'expected Content-Type: {0}, got: {1}'.format(
                _APP_PGP_SIG, ct)

    # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message
    # with 'PGP-'. maybe we should be more permissive here, or maybe not, this
    # is crypto stuff...
    if not params.get('micalg', 'nothing').startswith('pgp-'):
        malformed = 'expected micalg=pgp-..., got: {0}'.format(
            params.get('micalg', 'nothing'))

    sigs = []
    if not malformed:
        try:
            sigs = crypto.verify_detached(
                message.get_payload(0).as_bytes(policy=email.policy.SMTP),
                message.get_payload(1).get_payload(decode=True))
        except GPGProblem as e:
            malformed = str(e)

    add_signature_headers(original, sigs, malformed)


def _handle_encrypted(original, message, session_keys=None):
    """Handle encrypted messages helper.

    RFC 3156 is quite strict:
    * exactly two messages
    * the first is of type 'application/pgp-encrypted'
    * the first contains 'Version: 1'
    * the second is of type 'application/octet-stream'
    * the second contains the encrypted and possibly signed data

    Any violation that is detected here (wrong number of parts, wrong
    content types, a non US-ASCII payload, or a decryption failure) does not
    raise; instead a text part describing the problem is attached to
    `original`.

    :param original: The original top-level mail. This is required to attach
                     special headers to
    :type original: :class:`email.message.Message`
    :param message: The multipart/encrypted payload to decrypt
    :type message: :class:`email.message.Message`
    :param session_keys: a list OpenPGP session keys
    :type session_keys: [str]
    """
    malformed = None

    # Only a multipart message has a list payload; treat everything else as
    # "no parts" so that the length check below reports it as malformed.
    parts = message.get_payload() if message.is_multipart() else []

    if len(parts) != 2:
        # without this check the index accesses below raise IndexError
        malformed = 'expected exactly two messages, got {0}'.format(
            len(parts))
    else:
        ct = parts[0].get_content_type()
        if ct != _APP_PGP_ENC:
            malformed = 'expected Content-Type: {0}, got: {1}'.format(
                _APP_PGP_ENC, ct)

        want = 'application/octet-stream'
        ct = parts[1].get_content_type()
        if ct != want:
            malformed = 'expected Content-Type: {0}, got: {1}'.format(want, ct)

    payload = None
    if not malformed:
        try:
            # PGP uses US-ASCII characters only, so anything that cannot be
            # encoded as ASCII cannot be a valid encrypted payload.
            payload = parts[1].get_payload().encode('ascii')
        except UnicodeEncodeError as e:
            malformed = 'encrypted payload is not US-ASCII: {0}'.format(e)

    if not malformed:
        try:
            sigs, d = crypto.decrypt_verify(payload, session_keys)
        except GPGProblem as e:
            # signature verification failures end up here too if the combined
            # method is used, currently this prevents the interpretation of the
            # recovered plain text mail. maybe that's a feature.
            malformed = str(e)
        else:
            n = decrypted_message_from_bytes(d, session_keys)

            # add the decrypted message to message. note that n contains all
            # the attachments, no need to walk over n here.
            original.attach(n)

            original.defects.extend(n.defects)

            # there are two methods for both signed and encrypted data, one is
            # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the
            # 'Combined method'.
            if not sigs:
                # 'RFC 1847 Encapsulation', the signature is a detached
                # signature found in the recovered mime message of type
                # multipart/signed.
                if X_SIGNATURE_VALID_HEADER in n:
                    for k in (X_SIGNATURE_VALID_HEADER,
                              X_SIGNATURE_MESSAGE_HEADER):
                        original[k] = n[k]
            else:
                # 'Combined method', the signatures are returned by the
                # decrypt_verify function.

                # note that if we reached this point, we know the signatures
                # are valid. if they were not valid, the else block of the
                # current try would not have been executed
                add_signature_headers(original, sigs, '')

    if malformed:
        msg = 'Malformed OpenPGP message: {0}'.format(malformed)
        content = email.message_from_string(msg,
                                            _class=email.message.EmailMessage,
                                            policy=email.policy.SMTP)
        content.set_charset('utf-8')
        original.attach(content)


def decrypted_message_from_file(handle, session_keys=None):
    '''Reads a mail from the given file-like object and returns an email
    object, very much like email.message_from_file. In addition to
    that OpenPGP encrypted data is detected and decrypted. If this
    succeeds, any mime messages found in the recovered plaintext
    message are added to the returned message object.

    :param handle: a file-like object
    :param session_keys: a list OpenPGP session keys
    :returns: :class:`email.message.Message` possibly augmented with
              decrypted data
    '''
    return decrypted_message_from_message(
        email.message_from_file(handle, _class=email.message.EmailMessage),
        session_keys)


def decrypted_message_from_message(m, session_keys=None):
    '''Detect and decrypt OpenPGP encrypted data in an email object. If this
    succeeds, any mime messages found in the recovered plaintext
    message are added to the returned message object.

    :param m: an email object
    :param session_keys: a list OpenPGP session keys
    :returns: :class:`email.message.Message` possibly augmented with
              decrypted data
    '''
    # make sure no one smuggles a token in (data from m is untrusted)
    del m[X_SIGNATURE_VALID_HEADER]
    del m[X_SIGNATURE_MESSAGE_HEADER]

    if m.is_multipart():
        p = get_params(m)
        subtype = m.get_content_subtype()
        # is_multipart() guarantees a list here, but it may be empty; every
        # access to the first part below is guarded by a truthiness check.
        parts = m.get_payload()

        # handle OpenPGP signed data
        if subtype == 'signed' and p.get('protocol') == _APP_PGP_SIG:
            _handle_signatures(m, m, p)

        # handle OpenPGP encrypted data
        elif (subtype == 'encrypted' and
              p.get('protocol') == _APP_PGP_ENC and
              parts and 'Version: 1' in parts[0].get_payload()):
            _handle_encrypted(m, m, session_keys)

        # It is also possible to put either of the above into a
        # multipart/mixed segment
        elif subtype == 'mixed' and parts:
            sub = parts[0]

            if sub.is_multipart():
                p = get_params(sub)

                if (sub.get_content_subtype() == 'signed' and
                        p.get('protocol') == _APP_PGP_SIG):
                    _handle_signatures(m, sub, p)
                elif (sub.get_content_subtype() == 'encrypted' and
                      p.get('protocol') == _APP_PGP_ENC):
                    _handle_encrypted(m, sub, session_keys)

    return m


def decrypted_message_from_string(s, session_keys=None):
    '''Reads a mail from the given string. This is the equivalent of
    :func:`email.message_from_string` which does nothing but to wrap
    the given string in a StringIO object and to call
    :func:`email.message_from_file`.

    Please refer to the documentation of :func:`message_from_file` for
    details.

    '''
    return decrypted_message_from_file(io.StringIO(s), session_keys)


def decrypted_message_from_bytes(bytestring, session_keys=None):
    """Create a Message from bytes.

    :param bytes bytestring: an email message as raw bytes
    :param session_keys: a list OpenPGP session keys
    """
    return decrypted_message_from_message(
        email.message_from_bytes(bytestring,
                                 _class=email.message.EmailMessage,
                                 policy=email.policy.SMTP),
        session_keys)


def extract_headers(mail, headers=None):
    """
    returns subset of this messages headers as human-readable format:
    all header values are decoded, the resulting string has
    one line "KEY: VALUE" for each requested header. Requested headers
    that are not present in the mail are listed with an empty value.

    :param mail: the mail to use
    :type mail: :class:`email.message.EmailMessage`
    :param headers: headers to extract; all headers of the mail if None
    :type headers: list of str
    """
    if headers is None:
        headers = mail.keys()

    lines = []
    for key in headers:
        value = ''
        if key in mail:
            value = decode_header(mail.get(key, ''))
        lines.append('%s: %s\n' % (key, value))
    return ''.join(lines)


def render_part(part, field_key='copiousoutput'):
    """
    renders a non-multipart email part into displayable plaintext by piping its
    payload through an external script. The handler itself is determined by
    the mailcap entry for this part's ctype.

    :param part: the part to render
    :param field_key: mailcap field that a matching entry has to provide
    :returns: the handler's output, or None if there is no matching mailcap
              entry or the handler produced no output
    """
    ctype = part.get_content_type()
    raw_payload = remove_cte(part)
    rendered_payload = None
    # get mime handler
    _, entry = settings.mailcap_find_match(ctype, key=field_key)
    if entry is None:
        return rendered_payload

    tempfile_name = None
    stdin = None
    handler_raw_commandstring = entry['view']
    try:
        # in case the mailcap defined command contains no '%s',
        # we pipe the files content to the handling command via stdin
        if '%s' in handler_raw_commandstring:
            # open tempfile, respect mailcaps nametemplate
            nametemplate = entry.get('nametemplate', '%s')
            prefix, suffix = parse_mailcap_nametemplate(nametemplate)
            with tempfile.NamedTemporaryFile(
                    delete=False, prefix=prefix, suffix=suffix) \
                    as tmpfile:
                # remember the name first so that the file is removed in the
                # finally clause even if writing fails
                tempfile_name = tmpfile.name
                tmpfile.write(raw_payload)
        else:
            stdin = raw_payload

        # read parameter, create handler command.
        # get_params() returns its failobj when there is no Content-Type
        # header, and RFC 2231 encoded values come back as 3-tuples, which
        # have to be collapsed into a plain string first.
        parms = tuple(
            '='.join((k, v if isinstance(v, str)
                      else email.utils.collapse_rfc2231_value(v)))
            for k, v in part.get_params(failobj=[]))

        # create and call external command
        cmd = mailcap.subst(entry['view'], ctype,
                            filename=tempfile_name, plist=parms)
        logging.debug('command: %s', cmd)
        logging.debug('parms: %s', str(parms))
        cmdlist = split_commandstring(cmd)
        # call handler
        stdout, _, _ = helper.call_cmd(cmdlist, stdin=stdin)
        if stdout:
            rendered_payload = stdout
    finally:
        # remove tempfile, also when the handler could not be run
        if tempfile_name:
            os.unlink(tempfile_name)

    return rendered_payload


def remove_cte(part, as_string=False):
    """Interpret MIME-part according to its Content-Transfer-Encodings.

    This returns the payload of `part` as string or bytestring for display, or
    to be passed to an external program. In the raw file the payload may be
    encoded, e.g. in base64, quoted-printable, 7bit, or 8bit. This method will
    look for one of the above Content-Transfer-Encoding header and interpret
    the payload accordingly.

    Incorrect header values (common in spam messages) will be interpreted as
    lenient as possible and will result in INFO-level debug messages.

    ..Note:: All this may be deprecated in favour of
             `email.contentmanager.raw_data_manager` (v3.6+)

    :param email.message.EmailMessage part: The part to decode
    :param bool as_string: If true return a str, otherwise return bytes
    :returns: The mail with any Content-Transfer-Encoding removed
    :rtype: Union[str, bytes]
    """
    enc = part.get_content_charset() or 'ascii'
    cte = str(part.get('content-transfer-encoding', '7bit')).lower().strip()
    payload = part.get_payload()
    sp = ''  # string variant of return value
    bp = b''  # bytestring variant

    logging.debug('Content-Transfer-Encoding: "{}"'.format(cte))
    if cte not in ['quoted-printable', 'base64', '7bit', '8bit', 'binary']:
        logging.info('Unknown Content-Transfer-Encoding: "{}"'.format(cte))

    # switch through all sensible cases
    # starting with those where payload is already a str
    if '7bit' in cte or 'binary' in cte:
        logging.debug('assuming Content-Transfer-Encoding: 7bit')
        sp = payload
        if as_string:
            return sp
        bp = payload.encode('utf-8')
        return bp

    # the remaining cases need decoding and define only bp;
    # decoding into a str is done at the end if requested
    elif '8bit' in cte:
        logging.debug('assuming Content-Transfer-Encoding: 8bit')
        # Python's mail library may decode 8bit as raw-unicode-escape, so
        # we need to encode that back to bytes so we can decode it using
        # the correct encoding, or it might not, in which case assume that
        # the str representation we got is correct.
        bp = payload.encode('raw-unicode-escape')

    elif 'quoted-printable' in cte:
        logging.debug('assuming Content-Transfer-Encoding: quoted-printable')
        bp = quopri.decodestring(payload.encode('ascii'))

    elif 'base64' in cte:
        logging.debug('assuming Content-Transfer-Encoding: base64')
        bp = base64.b64decode(payload)

    else:
        # bp keeps its initial value, an empty bytestring
        logging.debug('failed to interpret Content-Transfer-Encoding: '
                      '"{}"'.format(cte))

    # by now, bp is defined, sp is not.
    if as_string:
        try:
            sp = bp.decode(enc)
        except LookupError:
            # enc is unknown;
            # fall back to guessing the correct encoding using libmagic
            sp = helper.try_decode(bp)
        except UnicodeDecodeError as emsg:
            # the mail contains chars that are not enc-encoded.
            # libmagic works better than just ignoring those
            logging.debug('Decoding failure: {}'.format(emsg))
            sp = helper.try_decode(bp)
        return sp
    return bp


MISSING_HTML_MSG = ("This message contains a text/html part that was not "
                    "rendered due to a missing mailcap entry. "
                    "Please refer to item 5 in our FAQ: "
                    "http://alot.rtfd.io/en/latest/faq.html")


def extract_body(mail):
    """Returns a string view of a Message.

    This consults :ref:`prefer_plaintext <prefer-plaintext>`
    to determine if a "text/plain" alternative is preferred over a "text/html"
    part.

    :param mail: the mail to use
    :type mail: :class:`email.message.EmailMessage`
    :returns: The combined text of any parts to be used
    :rtype: str
    """

    if settings.get('prefer_plaintext'):
        preferencelist = ('plain', 'html')
    else:
        preferencelist = ('html', 'plain')

    body_part = mail.get_body(preferencelist)
    if body_part is None:  # if no part matching preferencelist was found
        return ""

    displaystring = ""

    if body_part.get_content_type() == 'text/plain':
        displaystring = string_sanitize(remove_cte(body_part, as_string=True))
    else:
        rendered_payload = render_part(body_part)
        if rendered_payload:  # handler had output
            displaystring = string_sanitize(rendered_payload)
        else:
            if body_part.get_content_type() == 'text/html':
                displaystring = MISSING_HTML_MSG
    return displaystring


def formataddr(pair):
    """ this is the inverse of email.utils.parseaddr:
    other than email.utils.formataddr, this
    - *will not* re-encode unicode strings, and
    - *will* re-introduce quotes around real names containing commas

    :param pair: a (real name, address) tuple
    :returns: the address alone if the name is empty, otherwise
              "name <address>"
    :rtype: str
    """
    name, address = pair
    if not name:
        return address
    elif ',' in name:
        name = "\"" + name + "\""
    return "{0} <{1}>".format(name, address)


def decode_header(header, normalize=False):
    """
    decode a header value to a unicode string

    values are usually a mixture of different substrings
    encoded in quoted printable using different encodings.
    This turns it into a single unicode string

    :param header: the header value
    :type header: str
    :param normalize: replace a newline and the whitespace following it
                      by a single space
    :type normalize: bool
    :rtype: str
    """
    logging.debug("unquoted header: |%s|", header)

    valuelist = email.header.decode_header(header)
    decoded_list = []
    for v, enc in valuelist:
        v = string_decode(v, enc)
        decoded_list.append(string_sanitize(v))
    value = ''.join(decoded_list)
    if normalize:
        value = re.sub(r'\n\s+', r' ', value)
    return value


def is_subdir_of(subpath, superpath):
    """Check whether `subpath` is located inside (or is equal to) `superpath`.

    Both paths are resolved with :func:`os.path.realpath` first. The
    comparison works on whole path components, so '/a/bc' is not considered
    to be inside '/a/b'.

    :param subpath: the path that is expected to be nested
    :type subpath: str
    :param superpath: the directory that is expected to contain `subpath`
    :type superpath: str
    :rtype: bool
    """
    # make both absolute (normcase is a no-op on POSIX)
    superpath = os.path.normcase(os.path.realpath(superpath))
    subpath = os.path.normcase(os.path.realpath(subpath))

    # commonprefix() compares character by character and would wrongly
    # accept sibling directories that merely share a name prefix;
    # commonpath() compares path components instead.
    # e.g. for /a/b/c/d.rst and /a/b the common path is /a/b
    try:
        return os.path.commonpath([subpath, superpath]) == superpath
    except ValueError:
        # raised if the paths have nothing in common to compare, e.g. they
        # are on different drives; then one cannot contain the other
        return False


def clear_my_address(my_account, value):
    """return recipient header without the addresses in my_account

    :param my_account: my account
    :type my_account: :class:`Account`
    :param value: a list of recipient or sender strings (with or without
        real names as taken from email headers)
    :type value: list(str)
    :returns: a new, potentially shortened list
    :rtype: list(str)
    """
    new_value = []
    for name, address in email.utils.getaddresses(value):
        if not my_account.matches_address(address):
            new_value.append(formataddr((name, address)))
    return new_value


def ensure_unique_address(recipients):
    """
    clean up a list of name,address pairs so that
    no address appears multiple times.

    If an address occurs more than once, the name of its last occurrence is
    kept. The result is sorted.
    """
    res = dict()
    for name, address in email.utils.getaddresses(recipients):
        res[address] = name
    logging.debug(res)
    urecipients = [formataddr((n, a)) for a, n in res.items()]
    return sorted(urecipients)