1# encoding=utf-8
2# Copyright (C) 2011-2012  Patrick Totzke <patricktotzke@gmail.com>
3# Copyright © 2017 Dylan Baker <dylan@pnwbakers.com>
4# This file is released under the GNU GPL, version 3 or a later revision.
5# For further details see the COPYING file
6import os
7import email
8import email.charset as charset
9import email.policy
10import email.utils
11import tempfile
12import re
13import logging
14import mailcap
15import io
16import base64
17import quopri
18
19from .. import crypto
20from .. import helper
21from ..errors import GPGProblem
22from ..settings.const import settings
23from ..helper import string_sanitize
24from ..helper import string_decode
25from ..helper import parse_mailcap_nametemplate
26from ..helper import split_commandstring
27
# Register how the 'utf-8' charset is to be encoded by the email package:
# quoted-printable for both headers and bodies, with 'utf-8' as the output
# charset.
charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')

# Names of the pseudo headers that add_signature_headers() attaches to a mail
# to report the outcome of the OpenPGP signature verification.
X_SIGNATURE_VALID_HEADER = 'X-Alot-OpenPGP-Signature-Valid'
X_SIGNATURE_MESSAGE_HEADER = 'X-Alot-OpenPGP-Signature-Message'

# MIME types that the code below compares against the content type of
# OpenPGP message parts and against the 'protocol' Content-Type parameter.
_APP_PGP_SIG = 'application/pgp-signature'
_APP_PGP_ENC = 'application/pgp-encrypted'
35
36
def add_signature_headers(mail, sigs, error_msg):
    '''Record the outcome of a signature check as pseudo headers on `mail`.

    Two headers are added: one holding the string ``'True'`` or ``'False'``
    and one holding a human readable summary that starts with ``Valid:``,
    ``Untrusted:`` or ``Invalid:``.

    :param mail: :class:`email.message.Message` the message to entitle
    :param sigs: list of :class:`gpg.results.Signature`
    :param error_msg: An error message if there is one, or None
    :type error_msg: :class:`str` or `None`
    '''
    assert error_msg is None or isinstance(error_msg, str)

    signer = ''
    key_known = True
    trusted = False

    if not sigs:
        error_msg = error_msg or 'no signature found'
    elif not error_msg:
        first_sig = sigs[0]
        try:
            key = crypto.get_key(first_sig.fpr)
            # Pick the first uid of the key that passes the validity check.
            trusted_uid = next(
                (uid for uid in key.uids
                 if crypto.check_uid_validity(key, uid.email)),
                None)
            if trusted_uid is None:
                # No trusted uid found; fall back to the key's first uid.
                signer = key.uids[0].uid
            else:
                signer = trusted_uid.uid
                trusted = True
        except GPGProblem:
            # The key could not be looked up: report the bare fingerprint.
            signer = first_sig.fpr
            key_known = False

    if error_msg:
        summary = 'Invalid: {}'.format(error_msg)
    elif trusted:
        summary = 'Valid: {}'.format(signer)
    else:
        summary = 'Untrusted: {}'.format(signer)

    verified = not error_msg and key_known
    mail.add_header(X_SIGNATURE_VALID_HEADER,
                    'True' if verified else 'False')
    mail.add_header(X_SIGNATURE_MESSAGE_HEADER, summary)
79
80
def get_params(mail, failobj=None, header='content-type', unquote=True):
    '''Return the parameters of a header as a dict with lower-cased keys.

    RFC 2045 specifies that parameter names are case-insensitive, so
    every name is lower-cased before it is used as a key.

    :param mail: :class:`email.message.Message`
    :param failobj: iterable of (name, value) pairs to use instead if no
        such header is found; a false value is replaced by an empty list
    :param header: the header to search for parameters, default
        'content-type'
    :param unquote: unquote the values
    :returns: a `dict` containing the parameters
    '''
    fallback = failobj or []
    normalized = {}
    for name, value in mail.get_params(fallback, header, unquote):
        normalized[name.lower()] = value
    return normalized
95
96
def _handle_signatures(original, message, params):
    """Verify a detached OpenPGP signature and report the result.

    RFC 3156 is quite strict about the layout of a signed message:
    * exactly two messages
    * the second is of type 'application/pgp-signature'
    * the second contains the detached signature

    Layout violations and :class:`GPGProblem` errors raised during
    verification are turned into an error message; the outcome is always
    passed on to :func:`add_signature_headers`.

    :param original: The original top-level mail. The special headers are
        attached to this message
    :type original: :class:`email.message.Message`
    :param message: The multipart/signed payload to verify
    :type message: :class:`email.message.Message`
    :param params: the message parameters as returned by :func:`get_params`
    :type params: dict[str, str]
    """
    problem = None
    parts = message.get_payload()
    if len(parts) == 2:
        sig_type = message.get_payload(1).get_content_type()
        if sig_type != _APP_PGP_SIG:
            problem = 'expected Content-Type: {0}, got: {1}'.format(
                _APP_PGP_SIG, sig_type)
    else:
        problem = 'expected exactly two messages, got {0}'.format(
            len(parts))

    # TODO: RFC 3156 says the alg has to be lower case, but I've seen a message
    # with 'PGP-'. maybe we should be more permissive here, or maybe not, this
    # is crypto stuff...
    micalg = params.get('micalg', 'nothing')
    if not micalg.startswith('pgp-'):
        # note: this replaces any layout problem recorded above
        problem = 'expected micalg=pgp-..., got: {0}'.format(micalg)

    signatures = []
    if not problem:
        try:
            signatures = crypto.verify_detached(
                message.get_payload(0).as_bytes(policy=email.policy.SMTP),
                message.get_payload(1).get_payload(decode=True))
        except GPGProblem as err:
            problem = str(err)

    add_signature_headers(original, signatures, problem)
140
141
def _handle_encrypted(original, message, session_keys=None):
    """Handle encrypted messages helper.

    RFC 3156 is quite strict:
    * exactly two messages
    * the first is of type 'application/pgp-encrypted'
    * the first contains 'Version: 1'
    * the second is of type 'application/octet-stream'
    * the second contains the encrypted and possibly signed data

    The part count and the two content types are checked here; the
    'Version: 1' marker is not. If the layout is wrong or decryption
    fails, a text part describing the problem is attached to `original`
    instead of the decrypted message.

    :param original: The original top-level mail. The decrypted message (or
        the error notice) and the special headers are attached to this
        message
    :type original: :class:`email.message.Message`
    :param message: The multipart/encrypted payload to decrypt
    :type message: :class:`email.message.Message`
    :param session_keys: a list OpenPGP session keys
    :type session_keys: [str]
    """
    malformed = False

    # Check the number of parts first: indexing a missing second part would
    # otherwise raise IndexError for truncated or hand-crafted messages.
    parts = message.get_payload()
    if len(parts) != 2:
        malformed = 'expected exactly two messages, got {0}'.format(
            len(parts))
    else:
        ct = message.get_payload(0).get_content_type()
        if ct != _APP_PGP_ENC:
            malformed = 'expected Content-Type: {0}, got: {1}'.format(
                _APP_PGP_ENC, ct)

        want = 'application/octet-stream'
        ct = message.get_payload(1).get_content_type()
        if ct != want:
            malformed = 'expected Content-Type: {0}, got: {1}'.format(
                want, ct)

    if not malformed:
        # This should be safe because PGP uses US-ASCII characters only
        payload = message.get_payload(1).get_payload().encode('ascii')
        try:
            sigs, d = crypto.decrypt_verify(payload, session_keys)
        except GPGProblem as e:
            # signature verification failures end up here too if the combined
            # method is used, currently this prevents the interpretation of the
            # recovered plain text mail. maybe that's a feature.
            malformed = str(e)
        else:
            n = decrypted_message_from_bytes(d, session_keys)

            # add the decrypted message to message. note that n contains all
            # the attachments, no need to walk over n here.
            original.attach(n)

            original.defects.extend(n.defects)

            # there are two methods for both signed and encrypted data, one is
            # called 'RFC 1847 Encapsulation' by RFC 3156, and one is the
            # 'Combined method'.
            if not sigs:
                # 'RFC 1847 Encapsulation', the signature is a detached
                # signature found in the recovered mime message of type
                # multipart/signed.
                if X_SIGNATURE_VALID_HEADER in n:
                    for k in (X_SIGNATURE_VALID_HEADER,
                              X_SIGNATURE_MESSAGE_HEADER):
                        original[k] = n[k]
            else:
                # 'Combined method', the signatures are returned by the
                # decrypt_verify function.

                # note that if we reached this point, we know the signatures
                # are valid. if they were not valid, the else block of the
                # current try would not have been executed
                add_signature_headers(original, sigs, '')

    if malformed:
        # Attach a notice describing the problem in place of the content.
        msg = 'Malformed OpenPGP message: {0}'.format(malformed)
        content = email.message_from_string(msg,
                                            _class=email.message.EmailMessage,
                                            policy=email.policy.SMTP)
        content.set_charset('utf-8')
        original.attach(content)
218
219
def decrypted_message_from_file(handle, session_keys=None):
    '''Reads a mail from the given file-like object and returns an email
    object, very much like email.message_from_file. In addition to
    that OpenPGP encrypted data is detected and decrypted. If this
    succeeds, any mime messages found in the recovered plaintext
    message are added to the returned message object.

    The mail is parsed with :data:`email.policy.SMTP`. Without an explicit
    policy the parser would combine the
    :class:`~email.message.EmailMessage` class with the legacy compat32
    policy, under which headers are plain strings and
    :meth:`~email.message.EmailMessage.get_body` can fail.

    :param handle: a file-like object
    :param session_keys: a list OpenPGP session keys
    :returns: :class:`email.message.Message` possibly augmented with
              decrypted data
    '''
    parsed = email.message_from_file(handle,
                                     _class=email.message.EmailMessage,
                                     policy=email.policy.SMTP)
    return decrypted_message_from_message(parsed, session_keys)
235
236
def decrypted_message_from_message(m, session_keys=None):
    '''Detect OpenPGP signed or encrypted data in an email object and
    process it. Signature results end up as pseudo headers on `m`, and a
    successfully decrypted message is attached to `m`.

    Both the top level message and the first part of a multipart/mixed
    message are inspected.

    :param m: an email object
    :param session_keys: a list OpenPGP session keys
    :returns: :class:`email.message.Message` possibly augmented with
              decrypted data
    '''
    # make sure no one smuggles a token in (data from m is untrusted)
    for pseudo_header in (X_SIGNATURE_VALID_HEADER,
                          X_SIGNATURE_MESSAGE_HEADER):
        del m[pseudo_header]

    if not m.is_multipart():
        return m

    params = get_params(m)
    subtype = m.get_content_subtype()
    protocol = params.get('protocol')

    if subtype == 'signed' and protocol == _APP_PGP_SIG:
        # OpenPGP signed data
        _handle_signatures(m, m, params)
    elif (subtype == 'encrypted' and protocol == _APP_PGP_ENC and
          'Version: 1' in m.get_payload(0).get_payload()):
        # OpenPGP encrypted data
        _handle_encrypted(m, m, session_keys)
    elif subtype == 'mixed':
        # Either of the above may also be wrapped in a multipart/mixed
        # segment; only its first part is considered.
        first = m.get_payload(0)
        if first.is_multipart():
            inner_params = get_params(first)
            inner_subtype = first.get_content_subtype()
            inner_protocol = inner_params.get('protocol')

            if inner_subtype == 'signed' and inner_protocol == _APP_PGP_SIG:
                _handle_signatures(m, first, inner_params)
            elif (inner_subtype == 'encrypted' and
                  inner_protocol == _APP_PGP_ENC):
                _handle_encrypted(m, first, session_keys)

    return m
281
282
def decrypted_message_from_string(s, session_keys=None):
    '''Read a mail from the given string.

    The string is wrapped in an :class:`io.StringIO` object and handed to
    :func:`decrypted_message_from_file`; please refer to the documentation
    of that function for details.

    :param s: the mail as a string
    :param session_keys: a list OpenPGP session keys
    '''
    text_handle = io.StringIO(s)
    return decrypted_message_from_file(text_handle, session_keys)
294
295
def decrypted_message_from_bytes(bytestring, session_keys=None):
    """Parse raw bytes into a message and process OpenPGP content.

    The bytes are parsed into an :class:`email.message.EmailMessage` using
    :data:`email.policy.SMTP`; the result is then passed through
    :func:`decrypted_message_from_message`.

    :param bytes bytestring: an email message as raw bytes
    :param session_keys: a list OpenPGP session keys
    """
    parsed = email.message_from_bytes(
        bytestring,
        _class=email.message.EmailMessage,
        policy=email.policy.SMTP)
    return decrypted_message_from_message(parsed, session_keys)
307
308
def extract_headers(mail, headers=None):
    """
    returns subset of this messages headers as human-readable format:
    all header values are decoded, the resulting string has
    one line "KEY: VALUE" for each header.

    If `headers` is None, every header of the mail is included, one line
    per occurrence, so that repeated headers (e.g. several "Received"
    lines) each show their own value. Otherwise one line is produced per
    requested name, using the first matching header of the mail; a
    requested header that is absent yields a line with an empty value.

    :param mail: the mail to use
    :type mail: :class:`email.message.EmailMessage`
    :param headers: headers to extract
    :type headers: list of str
    :returns: the formatted header lines, each terminated by a newline
    :rtype: str
    """
    lines = []
    if headers is None:
        # Iterate over (name, value) pairs rather than names: looking a
        # repeated name up again would always return its first value.
        for key, raw_value in mail.items():
            lines.append('%s: %s\n' % (key, decode_header(raw_value)))
    else:
        for key in headers:
            value = ''
            if key in mail:
                value = decode_header(mail.get(key, ''))
            lines.append('%s: %s\n' % (key, value))
    return ''.join(lines)
329
330
def render_part(part, field_key='copiousoutput'):
    """
    renders a non-multipart email part into displayable plaintext by piping its
    payload through an external script. The handler itself is determined by
    the mailcap entry for this part's ctype.

    :param part: the part to render
    :param field_key: mailcap field an entry has to carry to be considered
    :returns: the output of the handler command, or None if there is no
        matching mailcap entry or the handler produced no output
    """
    ctype = part.get_content_type()
    raw_payload = remove_cte(part)
    rendered_payload = None
    # get mime handler
    _, entry = settings.mailcap_find_match(ctype, key=field_key)
    if entry is None:
        return rendered_payload

    tempfile_name = None
    stdin = None
    handler_raw_commandstring = entry['view']
    try:
        # in case the mailcap defined command contains no '%s',
        # we pipe the files content to the handling command via stdin
        if '%s' in handler_raw_commandstring:
            # open tempfile, respect mailcaps nametemplate
            nametemplate = entry.get('nametemplate', '%s')
            prefix, suffix = parse_mailcap_nametemplate(nametemplate)
            with tempfile.NamedTemporaryFile(
                    delete=False, prefix=prefix, suffix=suffix) \
                    as tmpfile:
                # remember the name first so the file is removed even if
                # writing the payload fails
                tempfile_name = tmpfile.name
                tmpfile.write(raw_payload)
        else:
            stdin = raw_payload

        # read parameter, create handler command; a part without a
        # Content-Type header has no parameters at all
        parms = tuple('='.join(p) for p in part.get_params() or ())

        # create and call external command
        # NOTE(review): ctype and parms are taken from the mail itself and
        # end up in the command line -- confirm that split_commandstring and
        # helper.call_cmd never hand this string to a shell.
        cmd = mailcap.subst(handler_raw_commandstring, ctype,
                            filename=tempfile_name, plist=parms)
        logging.debug('command: %s', cmd)
        logging.debug('parms: %s', str(parms))
        cmdlist = split_commandstring(cmd)
        # call handler
        stdout, _, _ = helper.call_cmd(cmdlist, stdin=stdin)
        if stdout:
            rendered_payload = stdout
    finally:
        # remove tempfile, also when building or running the command failed
        if tempfile_name:
            os.unlink(tempfile_name)

    return rendered_payload
379
380
def remove_cte(part, as_string=False):
    """Undo the Content-Transfer-Encoding of a MIME part.

    The payload of `part` is returned as str or bytes, for display or to be
    handed to an external program. In the raw file the payload may be
    encoded, e.g. in base64, quoted-printable, 7bit, or 8bit. The
    Content-Transfer-Encoding header (assumed to be 7bit if missing) selects
    how the payload is interpreted.

    The header value is matched by substring, so slightly incorrect values
    (common in spam messages) are still recognised; unknown values are
    reported with an INFO-level log message and yield an empty result.

    ..Note:: All this may be deprecated in favour of
             `email.contentmanager.raw_data_manager` (v3.6+)

    :param email.message.EmailMessage part: The part to decode
    :param bool as_string: If true return a str, otherwise return bytes
    :returns: The mail with any Content-Transfer-Encoding removed
    :rtype: Union[str, bytes]
    """
    charset_name = part.get_content_charset() or 'ascii'
    cte = str(part.get('content-transfer-encoding', '7bit')).lower().strip()
    payload = part.get_payload()

    logging.debug('Content-Transfer-Encoding: "{}"'.format(cte))
    known_encodings = ('quoted-printable', 'base64', '7bit', '8bit',
                       'binary')
    if cte not in known_encodings:
        logging.info('Unknown Content-Transfer-Encoding: "{}"'.format(cte))

    # 7bit and binary: the payload is already a usable str
    if '7bit' in cte or 'binary' in cte:
        logging.debug('assuming Content-Transfer-Encoding: 7bit')
        return payload if as_string else payload.encode('utf-8')

    # every remaining case produces bytes first; turning them into a str
    # happens at the end if requested
    if '8bit' in cte:
        logging.debug('assuming Content-Transfer-Encoding: 8bit')
        # Python's mail library may decode 8bit as raw-unicode-escape, so
        # we need to encode that back to bytes so we can decode it using
        # the correct encoding, or it might not, in which case assume that
        # the str representation we got is correct.
        raw = payload.encode('raw-unicode-escape')
    elif 'quoted-printable' in cte:
        logging.debug('assuming Content-Transfer-Encoding: quoted-printable')
        raw = quopri.decodestring(payload.encode('ascii'))
    elif 'base64' in cte:
        logging.debug('assuming Content-Transfer-Encoding: base64')
        raw = base64.b64decode(payload)
    else:
        logging.debug('failed to interpret Content-Transfer-Encoding: '
                      '"{}"'.format(cte))
        raw = b''

    if not as_string:
        return raw

    try:
        return raw.decode(charset_name)
    except LookupError:
        # the charset is unknown;
        # fall back to guessing the correct encoding using libmagic
        return helper.try_decode(raw)
    except UnicodeDecodeError as emsg:
        # the mail contains chars that are not encoded in the charset.
        # libmagic works better than just ignoring those
        logging.debug('Decoding failure: {}'.format(emsg))
        return helper.try_decode(raw)
458
459
# Text shown by extract_body() in place of a text/html body when rendering
# the part produced no output.
MISSING_HTML_MSG = ("This message contains a text/html part that was not "
                    "rendered due to a missing mailcap entry. "
                    "Please refer to item 5 in our FAQ: "
                    "http://alot.rtfd.io/en/latest/faq.html")
464
465
def extract_body(mail):
    """Build the text to display for the body of a Message.

    The :ref:`prefer_plaintext <prefer-plaintext>` setting decides whether
    a "text/plain" alternative is looked for before a "text/html" one or
    the other way around. A text/plain body is decoded directly, anything
    else is passed to :func:`render_part`.

    :param mail: the mail to use
    :type mail: :class:`email.message.EmailMessage`
    :returns: The text of the selected body part; an empty string if there
        is no suitable part or it could not be rendered (a notice is
        returned instead for unrendered text/html parts)
    :rtype: str
    """
    if settings.get('prefer_plaintext'):
        preference = ('plain', 'html')
    else:
        preference = ('html', 'plain')

    body_part = mail.get_body(preference)
    if body_part is None:  # nothing matching the preference was found
        return ""

    if body_part.get_content_type() == 'text/plain':
        return string_sanitize(remove_cte(body_part, as_string=True))

    rendered = render_part(body_part)
    if rendered:  # handler had output
        return string_sanitize(rendered)

    if body_part.get_content_type() == 'text/html':
        return MISSING_HTML_MSG
    return ""
500
501
def formataddr(pair):
    """ this is the inverse of email.utils.parseaddr:
    other than email.utils.formataddr, this
    - *will not* re-encode unicode strings, and
    - *will* re-introduce quotes around real names containing commas

    :param pair: a (realname, address) tuple
    :returns: the bare address if the real name is empty, otherwise
        "realname <address>"
    """
    realname, addr = pair
    if not realname:
        return addr
    shown_name = '"' + realname + '"' if ',' in realname else realname
    return "{0} <{1}>".format(shown_name, addr)
514
515
def decode_header(header, normalize=False):
    """
    decode a header value to a unicode string

    values are usually a mixture of different substrings
    encoded in quoted printable using different encodings.
    Each substring is decoded and sanitized, and the results are
    concatenated into a single unicode string.

    :param header: the header value
    :type header: str
    :param normalize: replace every newline together with the whitespace
        following it by a single space
    :type normalize: bool
    :rtype: str
    """
    logging.debug("unquoted header: |%s|", header)

    pieces = [string_sanitize(string_decode(chunk, encoding))
              for chunk, encoding in email.header.decode_header(header)]
    decoded = ''.join(pieces)
    if not normalize:
        return decoded
    return re.sub(r'\n\s+', r' ', decoded)
541
542
def is_subdir_of(subpath, superpath):
    """Check whether `subpath` is `superpath` itself or located below it.

    Both paths are resolved with :func:`os.path.realpath` first, so
    relative paths and symlinks are taken into account.

    :param subpath: the path to test
    :type subpath: str
    :param superpath: the directory that is supposed to contain `subpath`
    :type superpath: str
    :rtype: bool
    """
    # make both absolute
    superpath = os.path.realpath(superpath)
    subpath = os.path.realpath(subpath)

    # Compare whole path components: a character-wise common prefix would
    # wrongly report /a/bc as being located below /a/b.
    try:
        return os.path.commonpath([subpath, superpath]) == superpath
    except ValueError:
        # raised e.g. for paths on different drives: no common ancestor
        return False
551
552
def clear_my_address(my_account, value):
    """Drop every address that belongs to `my_account` from a recipient list.

    The given strings are parsed into (name, address) pairs; pairs whose
    address is matched by the account are left out and the remaining ones
    are formatted again.

    :param my_account: my account
    :type my_account: :class:`Account`
    :param value: a list of recipient or sender strings (with or without
        real names as taken from email headers)
    :type value: list(str)
    :returns: a new, potentially shortened list
    :rtype: list(str)
    """
    return [formataddr((realname, addr))
            for realname, addr in email.utils.getaddresses(value)
            if not my_account.matches_address(addr)]
569
570
def ensure_unique_address(recipients):
    """
    clean up a list of name,address pairs so that
    no address appears multiple times.

    If an address occurs more than once, the name given with its last
    occurrence is kept. The result is sorted.

    :param recipients: recipient strings as taken from email headers
    :returns: sorted list of formatted, unique recipients
    """
    name_by_address = {
        addr: realname
        for realname, addr in email.utils.getaddresses(recipients)}
    logging.debug(name_by_address)
    return sorted(formataddr((realname, addr))
                  for addr, realname in name_by_address.items())
582