1# encoding: utf-8 2# Copyright (C) 2017 Lucas Hoffmann 3# Copyright © 2017 Dylan Baker 4# This file is released under the GNU GPL, version 3 or a later revision. 5# For further details see the COPYING file 6import base64 7import codecs 8import email 9import email.header 10import email.mime.application 11import email.policy 12import email.utils 13from email.message import EmailMessage 14import io 15import os 16import os.path 17import shutil 18import tempfile 19import unittest 20from unittest import mock 21 22import gpg 23 24from alot import crypto 25from alot.db import utils 26from alot.errors import GPGProblem 27from alot.account import Account 28from ..utilities import make_key, make_uid, TestCaseClassCleanup 29 30 31class TestGetParams(unittest.TestCase): 32 33 mailstring = '\n'.join([ 34 'From: me', 35 'To: you', 36 'Subject: header field capitalisation', 37 'Content-type: text/plain; charset=utf-8', 38 'X-Header: param=one; and=two; or=three', 39 "X-Quoted: param=utf-8''%C3%9Cmlaut; second=plain%C3%9C", 40 'X-UPPERCASE: PARAM1=ONE; PARAM2=TWO' 41 '\n', 42 'content' 43 ]) 44 mail = email.message_from_string(mailstring) 45 46 def test_returns_content_type_parameters_by_default(self): 47 actual = utils.get_params(self.mail) 48 expected = {'text/plain': '', 'charset': 'utf-8'} 49 self.assertDictEqual(actual, expected) 50 51 def test_can_return_params_of_any_header_field(self): 52 actual = utils.get_params(self.mail, header='x-header') 53 expected = {'param': 'one', 'and': 'two', 'or': 'three'} 54 self.assertDictEqual(actual, expected) 55 56 @unittest.expectedFailure 57 def test_parameters_are_decoded(self): 58 actual = utils.get_params(self.mail, header='x-quoted') 59 expected = {'param': 'Ümlaut', 'second': 'plain%C3%9C'} 60 self.assertDictEqual(actual, expected) 61 62 def test_parameters_names_are_converted_to_lowercase(self): 63 actual = utils.get_params(self.mail, header='x-uppercase') 64 expected = {'param1': 'ONE', 'param2': 'TWO'} 65 self.assertDictEqual(actual, expected) 66 67 def test_returns_empty_dict_if_header_not_present(self): 68 actual = utils.get_params(self.mail, header='x-header-not-present') 69 self.assertDictEqual(actual, dict()) 70 71 def test_returns_failobj_if_header_not_present(self): 72 failobj = [('my special failobj for the test', 'needs to be a pair!')] 73 actual = utils.get_params(self.mail, header='x-header-not-present', 74 failobj=failobj) 75 expected = dict(failobj) 76 self.assertEqual(actual, expected) 77 78 79class TestIsSubdirOf(unittest.TestCase): 80 81 def test_both_paths_absolute_matching(self): 82 superpath = '/a/b' 83 subpath = '/a/b/c/d.rst' 84 result = utils.is_subdir_of(subpath, superpath) 85 self.assertTrue(result) 86 87 def test_both_paths_absolute_not_matching(self): 88 superpath = '/a/z' 89 subpath = '/a/b/c/d.rst' 90 result = utils.is_subdir_of(subpath, superpath) 91 self.assertFalse(result) 92 93 def test_both_paths_relative_matching(self): 94 superpath = 'a/b' 95 subpath = 'a/b/c/d.rst' 96 result = utils.is_subdir_of(subpath, superpath) 97 self.assertTrue(result) 98 99 def test_both_paths_relative_not_matching(self): 100 superpath = 'a/z' 101 subpath = 'a/b/c/d.rst' 102 result = utils.is_subdir_of(subpath, superpath) 103 self.assertFalse(result) 104 105 def test_relative_path_and_absolute_path_matching(self): 106 superpath = 'a/b' 107 subpath = os.path.join(os.getcwd(), 'a/b/c/d.rst') 108 result = utils.is_subdir_of(subpath, superpath) 109 self.assertTrue(result) 110 111 112class TestExtractHeader(unittest.TestCase): 113 114 mailstring = '\n'.join([ 115 'From: me', 116 'To: you', 117 'Subject: header field capitalisation', 118 'Content-type: text/plain; charset=utf-8', 119 'X-Header: param=one; and=two; or=three', 120 "X-Quoted: param=utf-8''%C3%9Cmlaut; second=plain%C3%9C", 121 'X-UPPERCASE: PARAM1=ONE; PARAM2=TWO' 122 '\n', 123 'content' 124 ]) 125 mail = email.message_from_string(mailstring) 126 127 def test_default_arguments_yield_all_headers(self): 128 actual = utils.extract_headers(self.mail) 129 # collect all lines until the first empty line, hence all header lines 130 expected = [] 131 for line in self.mailstring.splitlines(): 132 if not line: 133 break 134 expected.append(line) 135 expected = '\n'.join(expected) + '\n' 136 self.assertEqual(actual, expected) 137 138 def test_single_headers_can_be_retrieved(self): 139 actual = utils.extract_headers(self.mail, ['from']) 140 expected = 'from: me\n' 141 self.assertEqual(actual, expected) 142 143 def test_multible_headers_can_be_retrieved_in_predevined_order(self): 144 headers = ['x-header', 'to', 'x-uppercase'] 145 actual = utils.extract_headers(self.mail, headers) 146 expected = 'x-header: param=one; and=two; or=three\nto: you\n' \ 147 'x-uppercase: PARAM1=ONE; PARAM2=TWO\n' 148 self.assertEqual(actual, expected) 149 150 def test_headers_can_be_retrieved_multible_times(self): 151 headers = ['from', 'from'] 152 actual = utils.extract_headers(self.mail, headers) 153 expected = 'from: me\nfrom: me\n' 154 self.assertEqual(actual, expected) 155 156 def test_case_is_prserved_in_header_keys_but_irelevant(self): 157 headers = ['FROM', 'from'] 158 actual = utils.extract_headers(self.mail, headers) 159 expected = 'FROM: me\nfrom: me\n' 160 self.assertEqual(actual, expected) 161 162 @unittest.expectedFailure 163 def test_header_values_are_not_decoded(self): 164 actual = utils.extract_headers(self.mail, ['x-quoted']) 165 expected = "x-quoted: param=utf-8''%C3%9Cmlaut; second=plain%C3%9C\n", 166 self.assertEqual(actual, expected) 167 168 169class TestDecodeHeader(unittest.TestCase): 170 171 @staticmethod 172 def _quote(unicode_string, encoding): 173 """Turn a unicode string into a RFC2047 quoted ascii string 174 175 :param unicode_string: the string to encode 176 :type unicode_string: unicode 177 :param encoding: the encoding to use, 'utf-8', 'iso-8859-1', ... 178 :type encoding: str 179 :returns: the encoded string 180 :rtype: str 181 """ 182 string = unicode_string.encode(encoding) 183 output = b'=?' + encoding.encode('ascii') + b'?Q?' 184 for byte in string: 185 output += b'=' + codecs.encode(bytes([byte]), 'hex').upper() 186 return (output + b'?=').decode('ascii') 187 188 @staticmethod 189 def _base64(unicode_string, encoding): 190 """Turn a unicode string into a RFC2047 base64 encoded ascii string 191 192 :param unicode_string: the string to encode 193 :type unicode_string: unicode 194 :param encoding: the encoding to use, 'utf-8', 'iso-8859-1', ... 195 :type encoding: str 196 :returns: the encoded string 197 :rtype: str 198 """ 199 string = unicode_string.encode(encoding) 200 b64 = base64.encodebytes(string).strip() 201 result_bytes = b'=?' + encoding.encode('utf-8') + b'?B?' + b64 + b'?=' 202 result = result_bytes.decode('ascii') 203 return result 204 205 def _test(self, teststring, expected): 206 actual = utils.decode_header(teststring) 207 self.assertEqual(actual, expected) 208 209 def test_non_ascii_strings_are_returned_as_unicode_directly(self): 210 text = 'Nön ÄSCII string¡' 211 self._test(text, text) 212 213 def test_basic_utf_8_quoted(self): 214 expected = 'ÄÖÜäöü' 215 text = self._quote(expected, 'utf-8') 216 self._test(text, expected) 217 218 def test_basic_iso_8859_1_quoted(self): 219 expected = 'ÄÖÜäöü' 220 text = self._quote(expected, 'iso-8859-1') 221 self._test(text, expected) 222 223 def test_basic_windows_1252_quoted(self): 224 expected = 'ÄÖÜäöü' 225 text = self._quote(expected, 'windows-1252') 226 self._test(text, expected) 227 228 def test_basic_utf_8_base64(self): 229 expected = 'ÄÖÜäöü' 230 text = self._base64(expected, 'utf-8') 231 self._test(text, expected) 232 233 def test_basic_iso_8859_1_base64(self): 234 expected = 'ÄÖÜäöü' 235 text = self._base64(expected, 'iso-8859-1') 236 self._test(text, expected) 237 238 def test_basic_iso_1252_base64(self): 239 expected = 'ÄÖÜäöü' 240 text = self._base64(expected, 'windows-1252') 241 self._test(text, expected) 242 243 def test_quoted_words_can_be_interrupted(self): 244 part = 'ÄÖÜäöü' 245 text = self._base64(part, 'utf-8') + ' and ' + \ 246 self._quote(part, 'utf-8') 247 expected = 'ÄÖÜäöü and ÄÖÜäöü' 248 self._test(text, expected) 249 250 def test_different_encodings_can_be_mixed(self): 251 part = 'ÄÖÜäöü' 252 text = 'utf-8: ' + self._base64(part, 'utf-8') + \ 253 ' again: ' + self._quote(part, 'utf-8') + \ 254 ' latin1: ' + self._base64(part, 'iso-8859-1') + \ 255 ' and ' + self._quote(part, 'iso-8859-1') 256 expected = ( 257 'utf-8: ÄÖÜäöü ' 258 'again: ÄÖÜäöü ' 259 'latin1: ÄÖÜäöü and ÄÖÜäöü' 260 ) 261 self._test(text, expected) 262 263 def test_tabs_are_expanded_to_align_with_eigth_spaces(self): 264 text = 'tab: \t' 265 expected = 'tab: ' 266 self._test(text, expected) 267 268 def test_newlines_are_not_touched_by_default(self): 269 text = 'first\nsecond\n third\n fourth' 270 expected = 'first\nsecond\n third\n fourth' 271 self._test(text, expected) 272 273 def test_continuation_newlines_can_be_normalized(self): 274 text = 'first\nsecond\n third\n\tfourth\n \t fifth' 275 expected = 'first\nsecond third fourth fifth' 276 actual = utils.decode_header(text, normalize=True) 277 self.assertEqual(actual, expected) 278 279 def test_exchange_quotes_remain(self): 280 # issue #1347 281 expected = '"Mouse, Michaël" <x@y.z>' 282 text = self._quote(expected, 'utf-8') 283 self._test(text, expected) 284 285 286class TestAddSignatureHeaders(unittest.TestCase): 287 288 class FakeMail(object): 289 def __init__(self): 290 self.headers = [] 291 292 def add_header(self, header, value): 293 self.headers.append((header, value)) 294 295 def check(self, key, valid, error_msg=''): 296 mail = self.FakeMail() 297 298 with mock.patch('alot.db.utils.crypto.get_key', 299 mock.Mock(return_value=key)), \ 300 mock.patch('alot.db.utils.crypto.check_uid_validity', 301 mock.Mock(return_value=valid)): 302 utils.add_signature_headers(mail, [mock.Mock(fpr='')], error_msg) 303 304 return mail 305 306 def test_length_0(self): 307 mail = self.FakeMail() 308 utils.add_signature_headers(mail, [], '') 309 self.assertIn((utils.X_SIGNATURE_VALID_HEADER, 'False'), mail.headers) 310 self.assertIn( 311 (utils.X_SIGNATURE_MESSAGE_HEADER, 'Invalid: no signature found'), 312 mail.headers) 313 314 def test_valid(self): 315 key = make_key() 316 mail = self.check(key, True) 317 318 self.assertIn((utils.X_SIGNATURE_VALID_HEADER, 'True'), mail.headers) 319 self.assertIn( 320 (utils.X_SIGNATURE_MESSAGE_HEADER, 'Valid: mocked'), mail.headers) 321 322 def test_untrusted(self): 323 key = make_key() 324 mail = self.check(key, False) 325 326 self.assertIn((utils.X_SIGNATURE_VALID_HEADER, 'True'), mail.headers) 327 self.assertIn( 328 (utils.X_SIGNATURE_MESSAGE_HEADER, 'Untrusted: mocked'), 329 mail.headers) 330 331 def test_unicode_as_bytes(self): 332 mail = self.FakeMail() 333 key = make_key() 334 key.uids = [make_uid('andreá@example.com', uid='Andreá')] 335 mail = self.check(key, True) 336 337 self.assertIn((utils.X_SIGNATURE_VALID_HEADER, 'True'), mail.headers) 338 self.assertIn( 339 (utils.X_SIGNATURE_MESSAGE_HEADER, 'Valid: Andreá'), 340 mail.headers) 341 342 def test_error_message_unicode(self): 343 mail = self.check(mock.Mock(), mock.Mock(), 'error message') 344 self.assertIn((utils.X_SIGNATURE_VALID_HEADER, 'False'), mail.headers) 345 self.assertIn( 346 (utils.X_SIGNATURE_MESSAGE_HEADER, 'Invalid: error message'), 347 mail.headers) 348 349 def test_get_key_fails(self): 350 mail = self.FakeMail() 351 with mock.patch('alot.db.utils.crypto.get_key', 352 mock.Mock(side_effect=GPGProblem('', 0))): 353 utils.add_signature_headers(mail, [mock.Mock(fpr='')], '') 354 self.assertIn((utils.X_SIGNATURE_VALID_HEADER, 'False'), mail.headers) 355 self.assertIn( 356 (utils.X_SIGNATURE_MESSAGE_HEADER, 'Untrusted: '), 357 mail.headers) 358 359 360class TestMessageFromFile(TestCaseClassCleanup): 361 362 @classmethod 363 def setUpClass(cls): 364 home = tempfile.mkdtemp() 365 cls.addClassCleanup(shutil.rmtree, home) 366 mock_home = mock.patch.dict(os.environ, {'GNUPGHOME': home}) 367 mock_home.start() 368 cls.addClassCleanup(mock_home.stop) 369 370 with gpg.core.Context() as ctx: 371 search_dir = os.path.join(os.path.dirname(__file__), 372 '../static/gpg-keys') 373 for each in os.listdir(search_dir): 374 if os.path.splitext(each)[1] == '.gpg': 375 with open(os.path.join(search_dir, each)) as f: 376 ctx.op_import(f) 377 378 cls.keys = [ 379 ctx.get_key("DD19862809A7573A74058FF255937AFBB156245D")] 380 381 def test_erase_alot_header_signature_valid(self): 382 """Alot uses special headers for passing certain kinds of information, 383 it's important that information isn't passed in from the original 384 message as a way to trick the user. 385 """ 386 m = email.message.Message() 387 m.add_header(utils.X_SIGNATURE_VALID_HEADER, 'Bad') 388 message = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 389 self.assertIs(message.get(utils.X_SIGNATURE_VALID_HEADER), None) 390 391 def test_erase_alot_header_message(self): 392 m = email.message.Message() 393 m.add_header(utils.X_SIGNATURE_MESSAGE_HEADER, 'Bad') 394 message = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 395 self.assertIs(message.get(utils.X_SIGNATURE_MESSAGE_HEADER), None) 396 397 def test_plain_mail(self): 398 m = email.mime.text.MIMEText('This is some text', 'plain', 'utf-8') 399 m['Subject'] = 'test' 400 m['From'] = 'me' 401 m['To'] = 'Nobody' 402 message = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 403 self.assertEqual(message.get_payload(), 'This is some text') 404 405 def _make_signed(self): 406 """Create a signed message that is multipart/signed.""" 407 text = b'This is some text' 408 t = email.mime.text.MIMEText(text, 'plain', 'utf-8') 409 _, sig = crypto.detached_signature_for( 410 t.as_bytes(policy=email.policy.SMTP), self.keys) 411 s = email.mime.application.MIMEApplication( 412 sig, 'pgp-signature', email.encoders.encode_7or8bit) 413 m = email.mime.multipart.MIMEMultipart('signed', None, [t, s]) 414 m.set_param('protocol', 'application/pgp-signature') 415 m.set_param('micalg', 'pgp-sha256') 416 return m 417 418 def test_signed_headers_included(self): 419 """Headers are added to the message.""" 420 m = self._make_signed() 421 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 422 self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m) 423 self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m) 424 425 def test_signed_valid(self): 426 """Test that the signature is valid.""" 427 m = self._make_signed() 428 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 429 self.assertEqual(m[utils.X_SIGNATURE_VALID_HEADER], 'True') 430 431 def test_signed_correct_from(self): 432 """Test that the signature is valid.""" 433 m = self._make_signed() 434 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 435 # Don't test for valid/invalid since that might change 436 self.assertIn( 437 'ambig <ambig@example.com>', m[utils.X_SIGNATURE_MESSAGE_HEADER]) 438 439 def test_signed_wrong_mimetype_second_payload(self): 440 m = self._make_signed() 441 m.get_payload(1).set_type('text/plain') 442 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 443 self.assertIn('expected Content-Type: ', 444 m[utils.X_SIGNATURE_MESSAGE_HEADER]) 445 446 def test_signed_wrong_micalg(self): 447 m = self._make_signed() 448 m.set_param('micalg', 'foo') 449 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 450 self.assertIn('expected micalg=pgp-...', 451 m[utils.X_SIGNATURE_MESSAGE_HEADER]) 452 453 def test_signed_micalg_cap(self): 454 """The micalg parameter should be normalized to lower case. 455 456 From RFC 3156 § 5 457 458 The "micalg" parameter for the "application/pgp-signature" protocol 459 MUST contain exactly one hash-symbol of the format "pgp-<hash- 460 identifier>", where <hash-identifier> identifies the Message 461 Integrity Check (MIC) algorithm used to generate the signature. 462 Hash-symbols are constructed from the text names registered in [1] 463 or according to the mechanism defined in that document by 464 converting the text name to lower case and prefixing it with the 465 four characters "pgp-". 466 467 The spec is pretty clear that this is supposed to be lower cased. 468 """ 469 m = self._make_signed() 470 m.set_param('micalg', 'PGP-SHA1') 471 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 472 self.assertIn('expected micalg=pgp-', 473 m[utils.X_SIGNATURE_MESSAGE_HEADER]) 474 475 def test_signed_more_than_two_messages(self): 476 """Per the spec only 2 payloads may be encapsulated inside the 477 multipart/signed payload, while it might be nice to cover more than 2 478 payloads (Postel's law), it would introduce serious complexity 479 since we would also need to cover those payloads being misordered. 480 Since getting the right number of payloads and getting them in the 481 right order should be fairly easy to implement correctly enforcing that 482 there are only two payloads seems reasonable. 483 """ 484 m = self._make_signed() 485 m.attach(email.mime.text.MIMEText('foo')) 486 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 487 self.assertIn('expected exactly two messages, got 3', 488 m[utils.X_SIGNATURE_MESSAGE_HEADER]) 489 490 # TODO: The case of more than two payloads, or the payloads being out of 491 # order. Also for the encrypted case. 492 493 def _make_encrypted(self, signed=False): 494 """Create an encrypted (and optionally signed) message.""" 495 if signed: 496 t = self._make_signed() 497 else: 498 text = b'This is some text' 499 t = email.mime.text.MIMEText(text, 'plain', 'utf-8') 500 enc = crypto.encrypt(t.as_bytes(policy=email.policy.SMTP), self.keys) 501 e = email.mime.application.MIMEApplication( 502 enc, 'octet-stream', email.encoders.encode_7or8bit) 503 504 f = email.mime.application.MIMEApplication( 505 b'Version: 1', 'pgp-encrypted', email.encoders.encode_7or8bit) 506 507 m = email.mime.multipart.MIMEMultipart('encrypted', None, [f, e]) 508 m.set_param('protocol', 'application/pgp-encrypted') 509 510 return m 511 512 def test_encrypted_length(self): 513 # It seems string that we just attach the unsigned message to the end 514 # of the mail, rather than replacing the whole encrypted payload with 515 # it's unencrypted equivalent 516 m = self._make_encrypted() 517 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 518 self.assertEqual(len(m.get_payload()), 3) 519 520 def test_encrypted_unsigned_is_decrypted(self): 521 m = self._make_encrypted() 522 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 523 # Check using m.walk, since we're not checking for ordering, just 524 # existence. 525 self.assertIn('This is some text', [n.get_payload() for n in m.walk()]) 526 527 def test_encrypted_unsigned_doesnt_add_signed_headers(self): 528 """Since the message isn't signed, it shouldn't have headers saying 529 that there is a signature. 530 """ 531 m = self._make_encrypted() 532 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 533 self.assertNotIn(utils.X_SIGNATURE_VALID_HEADER, m) 534 self.assertNotIn(utils.X_SIGNATURE_MESSAGE_HEADER, m) 535 536 def test_encrypted_signed_is_decrypted(self): 537 m = self._make_encrypted(True) 538 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 539 self.assertIn('This is some text', [n.get_payload() for n in m.walk()]) 540 541 def test_encrypted_signed_headers(self): 542 """Since the message is signed, it should have headers saying that 543 there is a signature. 544 """ 545 m = self._make_encrypted(True) 546 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 547 self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m) 548 self.assertIn( 549 'ambig <ambig@example.com>', m[utils.X_SIGNATURE_MESSAGE_HEADER]) 550 551 # TODO: tests for the RFC 2440 style combined signed/encrypted blob 552 553 def test_encrypted_wrong_mimetype_first_payload(self): 554 m = self._make_encrypted() 555 m.get_payload(0).set_type('text/plain') 556 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 557 self.assertIn('Malformed OpenPGP message:', 558 m.get_payload(2).get_payload()) 559 560 def test_encrypted_wrong_mimetype_second_payload(self): 561 m = self._make_encrypted() 562 m.get_payload(1).set_type('text/plain') 563 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 564 self.assertIn('Malformed OpenPGP message:', 565 m.get_payload(2).get_payload()) 566 567 def test_signed_in_multipart_mixed(self): 568 """It is valid to encapsulate a multipart/signed payload inside a 569 multipart/mixed payload, verify that works. 570 """ 571 s = self._make_signed() 572 m = email.mime.multipart.MIMEMultipart('mixed', None, [s]) 573 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 574 self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m) 575 self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m) 576 577 def test_encrypted_unsigned_in_multipart_mixed(self): 578 """It is valid to encapsulate a multipart/encrypted payload inside a 579 multipart/mixed payload, verify that works. 580 """ 581 s = self._make_encrypted() 582 m = email.mime.multipart.MIMEMultipart('mixed', None, [s]) 583 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 584 self.assertIn('This is some text', [n.get_payload() for n in m.walk()]) 585 self.assertNotIn(utils.X_SIGNATURE_VALID_HEADER, m) 586 self.assertNotIn(utils.X_SIGNATURE_MESSAGE_HEADER, m) 587 588 def test_encrypted_signed_in_multipart_mixed(self): 589 """It is valid to encapsulate a multipart/encrypted payload inside a 590 multipart/mixed payload, verify that works when the multipart/encrypted 591 contains a multipart/signed. 592 """ 593 s = self._make_encrypted(True) 594 m = email.mime.multipart.MIMEMultipart('mixed', None, [s]) 595 m = utils.decrypted_message_from_file(io.StringIO(m.as_string())) 596 self.assertIn('This is some text', [n.get_payload() for n in m.walk()]) 597 self.assertIn(utils.X_SIGNATURE_VALID_HEADER, m) 598 self.assertIn(utils.X_SIGNATURE_MESSAGE_HEADER, m) 599 600 601class TestExtractBody(unittest.TestCase): 602 603 @staticmethod 604 def _set_basic_headers(mail): 605 mail['Subject'] = 'Test email' 606 mail['To'] = 'foo@example.com' 607 mail['From'] = 'bar@example.com' 608 609 def test_single_text_plain(self): 610 mail = EmailMessage() 611 self._set_basic_headers(mail) 612 mail.set_content('This is an email') 613 actual = utils.extract_body(mail) 614 615 expected = 'This is an email\n' 616 617 self.assertEqual(actual, expected) 618 619 @unittest.expectedFailure 620 # This makes no sense 621 def test_two_text_plain(self): 622 mail = email.mime.multipart.MIMEMultipart() 623 self._set_basic_headers(mail) 624 mail.attach(email.mime.text.MIMEText('This is an email')) 625 mail.attach(email.mime.text.MIMEText('This is a second part')) 626 627 actual = utils.extract_body(mail) 628 expected = 'This is an email\n\nThis is a second part' 629 630 self.assertEqual(actual, expected) 631 632 def test_text_plain_with_attachment_text(self): 633 mail = EmailMessage() 634 self._set_basic_headers(mail) 635 mail.set_content('This is an email') 636 mail.add_attachment('this shouldnt be displayed') 637 638 actual = utils.extract_body(mail) 639 expected = 'This is an email\n' 640 641 self.assertEqual(actual, expected) 642 643 def _make_mixed_plain_html(self): 644 mail = EmailMessage() 645 self._set_basic_headers(mail) 646 mail.set_content('This is an email') 647 mail.add_alternative( 648 '<!DOCTYPE html><html><body>This is an html email</body></html>', 649 subtype='html') 650 return mail 651 652 @mock.patch('alot.db.utils.settings.get', mock.Mock(return_value=True)) 653 def test_prefer_plaintext_mixed(self): 654 expected = 'This is an email\n' 655 mail = self._make_mixed_plain_html() 656 actual = utils.extract_body(mail) 657 658 self.assertEqual(actual, expected) 659 660 # Mock the handler to cat, so that no transformations of the html are made 661 # making the result non-deterministic 662 @mock.patch('alot.db.utils.settings.get', mock.Mock(return_value=False)) 663 @mock.patch('alot.db.utils.settings.mailcap_find_match', 664 mock.Mock(return_value=(None, {'view': 'cat'}))) 665 def test_prefer_html_mixed(self): 666 expected = '<!DOCTYPE html><html><body>This is an html email</body></html>\n' 667 mail = self._make_mixed_plain_html() 668 actual = utils.extract_body(mail) 669 670 self.assertEqual(actual, expected) 671 672 def _make_html_only(self): 673 mail = EmailMessage() 674 self._set_basic_headers(mail) 675 mail.set_content( 676 '<!DOCTYPE html><html><body>This is an html email</body></html>', 677 subtype='html') 678 return mail 679 680 @mock.patch('alot.db.utils.settings.get', mock.Mock(return_value=True)) 681 @mock.patch('alot.db.utils.settings.mailcap_find_match', 682 mock.Mock(return_value=(None, {'view': 'cat'}))) 683 def test_prefer_plaintext_only(self): 684 expected = '<!DOCTYPE html><html><body>This is an html email</body></html>\n' 685 mail = self._make_html_only() 686 actual = utils.extract_body(mail) 687 688 self.assertEqual(actual, expected) 689 690 # Mock the handler to cat, so that no transformations of the html are made 691 # making the result non-deterministic 692 @mock.patch('alot.db.utils.settings.get', mock.Mock(return_value=False)) 693 @mock.patch('alot.db.utils.settings.mailcap_find_match', 694 mock.Mock(return_value=(None, {'view': 'cat'}))) 695 def test_prefer_html_only(self): 696 expected = '<!DOCTYPE html><html><body>This is an html email</body></html>\n' 697 mail = self._make_html_only() 698 actual = utils.extract_body(mail) 699 700 self.assertEqual(actual, expected) 701 702 def test_simple_utf8_file(self): 703 mail = email.message_from_binary_file( 704 open('tests/static/mail/utf8.eml', 'rb'), 705 _class=email.message.EmailMessage) 706 actual = utils.extract_body(mail) 707 expected = "Liebe Grüße!\n" 708 709 self.assertEqual(actual, expected) 710 711class TestMessageFromString(unittest.TestCase): 712 713 """Tests for decrypted_message_from_string. 714 715 Because the implementation is that this is a wrapper around 716 decrypted_message_from_file, it's not important to have a large swath of 717 tests, just enough to show that things are being passed correctly. 718 """ 719 720 def test(self): 721 m = email.mime.text.MIMEText('This is some text', 'plain', 'utf-8') 722 m['Subject'] = 'test' 723 m['From'] = 'me' 724 m['To'] = 'Nobody' 725 message = utils.decrypted_message_from_string(m.as_string()) 726 self.assertEqual(message.get_payload(), 'This is some text') 727 728 729class TestRemoveCte(unittest.TestCase): 730 731 def test_char_vs_cte_mismatch(self): # #1291 732 with open('tests/static/mail/broken-utf8.eml') as fp: 733 mail = email.message_from_file(fp) 734 # This should not raise an UnicodeDecodeError. 735 with self.assertLogs(level='DEBUG') as cm: # keep logs 736 utils.remove_cte(mail, as_string=True) 737 # We expect no Exceptions but a complaint in the log 738 logmsg = 'DEBUG:root:Decoding failure: \'utf-8\' codec can\'t decode '\ 739 'byte 0xa1 in position 14: invalid start byte' 740 self.assertIn(logmsg, cm.output) 741 742 def test_malformed_cte_value(self): 743 with open('tests/static/mail/malformed-header-CTE.eml') as fp: 744 mail = email.message_from_file(fp) 745 746 with self.assertLogs(level='INFO') as cm: # keep logs 747 utils.remove_cte(mail, as_string=True) 748 749 # We expect no Exceptions but a complaint in the log 750 logmsg = 'INFO:root:Unknown Content-Transfer-Encoding: "7bit;"' 751 self.assertEqual(cm.output, [logmsg]) 752 753 def test_unknown_cte_value(self): 754 with open('tests/static/mail/malformed-header-CTE-2.eml') as fp: 755 mail = email.message_from_file(fp) 756 757 with self.assertLogs(level='DEBUG') as cm: # keep logs 758 utils.remove_cte(mail, as_string=True) 759 760 # We expect no Exceptions but a complaint in the log 761 logmsg = 'DEBUG:root:failed to interpret Content-Transfer-Encoding: '\ 762 '"normal"' 763 self.assertIn(logmsg, cm.output) 764 765 766class Test_ensure_unique_address(unittest.TestCase): 767 768 foo = 'foo <foo@example.com>' 769 foo2 = 'foo the fanzy <foo@example.com>' 770 bar = 'bar <bar@example.com>' 771 baz = 'baz <baz@example.com>' 772 773 def test_unique_lists_are_unchanged(self): 774 expected = sorted([self.foo, self.bar]) 775 actual = utils.ensure_unique_address(expected) 776 self.assertListEqual(actual, expected) 777 778 def test_equal_entries_are_detected(self): 779 actual = utils.ensure_unique_address( 780 [self.foo, self.bar, self.foo]) 781 expected = sorted([self.foo, self.bar]) 782 self.assertListEqual(actual, expected) 783 784 def test_same_address_with_different_name_is_detected(self): 785 actual = utils.ensure_unique_address( 786 [self.foo, self.foo2]) 787 expected = [self.foo2] 788 self.assertListEqual(actual, expected) 789 790 791class _AccountTestClass(Account): 792 """Implements stubs for ABC methods.""" 793 794 def send_mail(self, mail): 795 pass 796 797 798class TestClearMyAddress(unittest.TestCase): 799 800 me1 = 'me@example.com' 801 me2 = 'ME@example.com' 802 me3 = 'me+label@example.com' 803 me4 = 'ME+label@example.com' 804 me_regex = r'me\+.*@example.com' 805 me_named = 'alot team <me@example.com>' 806 you = 'you@example.com' 807 named = 'somebody you know <somebody@example.com>' 808 imposter = 'alot team <imposter@example.com>' 809 mine = _AccountTestClass( 810 address=me1, aliases=[], alias_regexp=me_regex, case_sensitive_username=True) 811 812 813 def test_empty_input_returns_empty_list(self): 814 self.assertListEqual( 815 utils.clear_my_address(self.mine, []), []) 816 817 def test_only_my_emails_result_in_empty_list(self): 818 expected = [] 819 actual = utils.clear_my_address( 820 self.mine, [self.me1, self.me3, self.me_named]) 821 self.assertListEqual(actual, expected) 822 823 def test_other_emails_are_untouched(self): 824 input_ = [self.you, self.me1, self.me_named, self.named] 825 expected = [self.you, self.named] 826 actual = utils.clear_my_address(self.mine, input_) 827 self.assertListEqual(actual, expected) 828 829 def test_case_matters(self): 830 input_ = [self.me1, self.me2, self.me3, self.me4] 831 expected = [self.me2, self.me4] 832 actual = utils.clear_my_address(self.mine, input_) 833 self.assertListEqual(actual, expected) 834 835 def test_same_address_with_different_real_name_is_removed(self): 836 input_ = [self.me_named, self.you] 837 expected = [self.you] 838 actual = utils.clear_my_address(self.mine, input_) 839 self.assertListEqual(actual, expected) 840 841 842class TestFormataddr(unittest.TestCase): 843 844 address = 'me@example.com' 845 umlauts_and_comma = '"Ö, Ä" <a@b.c>' 846 847 def test_is_inverse(self): 848 self.assertEqual( 849 utils.formataddr(email.utils.parseaddr(self.umlauts_and_comma)), 850 self.umlauts_and_comma 851 ) 852 853 def test_address_only(self): 854 self.assertEqual(utils.formataddr(("", self.address)), self.address) 855 856 def test_name_and_address_no_comma(self): 857 self.assertEqual( 858 utils.formataddr(("Me", self.address)), 859 "Me <me@example.com>" 860 ) 861 def test_name_and_address_with_comma(self): 862 self.assertEqual( 863 utils.formataddr(("Last, Name", self.address)), 864 "\"Last, Name\" <me@example.com>" 865 ) 866