import os
import sys
import time
import stat
import socket
import email
import email.generator
import email.message
import re
import io
import tempfile
from test import support
from test.support import os_helper
import unittest
import textwrap
import mailbox
import glob


class TestBase:
    """Helpers shared by the mailbox test cases in this file."""

    all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage,
                         mailbox.mboxMessage, mailbox.MHMessage,
                         mailbox.BabylMessage, mailbox.MMDFMessage)

    def _check_sample(self, msg):
        # Inspect a mailbox.Message representation of the sample message
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        for key, value in _sample_headers.items():
            self.assertIn(value, msg.get_all(key))
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
        for i, payload in enumerate(_sample_payloads):
            part = msg.get_payload(i)
            self.assertIsInstance(part, email.message.Message)
            self.assertNotIsInstance(part, mailbox.Message)
            self.assertEqual(part.get_payload(), payload)

    def _delete_recursively(self, target):
        # Delete a file or delete a directory recursively
        if os.path.isdir(target):
            os_helper.rmtree(target)
        elif os.path.exists(target):
            os_helper.unlink(target)


class TestMailbox(TestBase):
    """Tests of the generic mailbox interface.

    Concrete test cases subclass this, set ``_factory`` to build the
    mailbox under test and provide ``assertMailboxEmpty()``, which several
    tests here call.
    """

    maxDiff = None

    _factory = None     # Overridden by subclasses to reuse tests
    _template = 'From: foo\n\n%s\n'

    def setUp(self):
        # Start every test from a fresh, empty mailbox at TESTFN
        self._path = os_helper.TESTFN
        self._delete_recursively(self._path)
        self._box = self._factory(self._path)

    def tearDown(self):
        # Close the mailbox and remove whatever it left on disk
        self._box.close()
        self._delete_recursively(self._path)

    def test_add(self):
        # Add copies of a sample message
        keys = []
        keys.append(self._box.add(self._template % 0))
        self.assertEqual(len(self._box), 1)
        keys.append(self._box.add(mailbox.Message(_sample_message)))
        self.assertEqual(len(self._box), 2)
        keys.append(self._box.add(email.message_from_string(_sample_message)))
        self.assertEqual(len(self._box), 3)
        keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
        self.assertEqual(len(self._box), 4)
        keys.append(self._box.add(_sample_message))
        self.assertEqual(len(self._box), 5)
        keys.append(self._box.add(_bytes_sample_message))
        self.assertEqual(len(self._box), 6)
        with self.assertWarns(DeprecationWarning):
            keys.append(self._box.add(
                io.TextIOWrapper(io.BytesIO(_bytes_sample_message), encoding="utf-8")))
        self.assertEqual(len(self._box), 7)
        self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
        for i in (1, 2, 3, 4, 5, 6):
            self._check_sample(self._box[keys[i]])

    _nonascii_msg = textwrap.dedent("""\
            From: foo
            Subject: Falinaptár házhozszállítással. Már rendeltél?

            0
            """)

    def test_add_invalid_8bit_bytes_header(self):
        # Bytes with a non-ASCII header are stored and returned unchanged
        key = self._box.add(self._nonascii_msg.encode('latin-1'))
        self.assertEqual(len(self._box), 1)
        self.assertEqual(self._box.get_bytes(key),
                         self._nonascii_msg.encode('latin-1'))

    def test_invalid_nonascii_header_as_string(self):
        # Only the Subject line (second line of the message) is added
        subj = self._nonascii_msg.splitlines()[1]
        key = self._box.add(subj.encode('latin-1'))
        self.assertEqual(self._box.get_string(key),
            'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
            'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')

    def test_add_nonascii_string_header_raises(self):
        # A str message with a non-ASCII header is rejected and not stored
        with self.assertRaisesRegex(ValueError, "ASCII-only"):
            self._box.add(self._nonascii_msg)
        self._box.flush()
        self.assertEqual(len(self._box), 0)
        self.assertMailboxEmpty()

    def test_add_that_raises_leaves_mailbox_empty(self):
        # A failure while serializing must not leave a partial message
        def raiser(*args, **kw):
            raise Exception("a fake error")
        support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
        with self.assertRaises(Exception):
            self._box.add(email.message_from_string("From: Alphöso"))
        self.assertEqual(len(self._box), 0)
        self._box.close()
        self.assertMailboxEmpty()

    _non_latin_bin_msg = textwrap.dedent("""\
        From: foo@bar.com
        To: báz
        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
        \tJean de Baddie
        Mime-Version: 1.0
        Content-Type: text/plain; charset="utf-8"
        Content-Transfer-Encoding: 8bit

        Да, они летят.
        """).encode('utf-8')

    def test_add_8bit_body(self):
        # An 8bit UTF-8 body round-trips through bytes, file and Message
        key = self._box.add(self._non_latin_bin_msg)
        self.assertEqual(self._box.get_bytes(key),
                         self._non_latin_bin_msg)
        with self._box.get_file(key) as f:
            self.assertEqual(f.read(),
                             self._non_latin_bin_msg.replace(b'\n',
                                os.linesep.encode()))
        self.assertEqual(self._box[key].get_payload(),
                         "Да, они летят.\n")

    def test_add_binary_file(self):
        # Add a message from a file opened in binary mode
        with tempfile.TemporaryFile('wb+') as f:
            f.write(_bytes_sample_message)
            f.seek(0)
            key = self._box.add(f)
        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
            _bytes_sample_message.split(b'\n'))

    def test_add_binary_nonascii_file(self):
        # Add a non-ASCII message from a file opened in binary mode
        with tempfile.TemporaryFile('wb+') as f:
            f.write(self._non_latin_bin_msg)
            f.seek(0)
            key = self._box.add(f)
        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
            self._non_latin_bin_msg.split(b'\n'))

    def test_add_text_file_warns(self):
        # Adding from a text-mode file works but is deprecated
        with tempfile.TemporaryFile('w+', encoding='utf-8') as f:
            f.write(_sample_message)
            f.seek(0)
            with self.assertWarns(DeprecationWarning):
                key = self._box.add(f)
        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
            _bytes_sample_message.split(b'\n'))

    def test_add_StringIO_warns(self):
        # Adding from a StringIO works but is deprecated
        with self.assertWarns(DeprecationWarning):
            key = self._box.add(io.StringIO(self._template % "0"))
        self.assertEqual(self._box.get_string(key), self._template % "0")

    def test_add_nonascii_StringIO_raises(self):
        # A StringIO with a non-ASCII header is rejected and not stored
        with self.assertWarns(DeprecationWarning):
            with self.assertRaisesRegex(ValueError, "ASCII-only"):
                self._box.add(io.StringIO(self._nonascii_msg))
        self.assertEqual(len(self._box), 0)
        self._box.close()
        self.assertMailboxEmpty()

    def test_remove(self):
        # Remove messages using remove()
        self._test_remove_or_delitem(self._box.remove)

    def test_delitem(self):
        # Remove messages using __delitem__()
        self._test_remove_or_delitem(self._box.__delitem__)

    def _test_remove_or_delitem(self, method):
        # (Used by test_remove() and test_delitem().)
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)
        self.assertEqual(len(self._box), 2)
        method(key0)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key0])
        self.assertRaises(KeyError, lambda: method(key0))
        self.assertEqual(self._box.get_string(key1), self._template % 1)
        key2 = self._box.add(self._template % 2)
        self.assertEqual(len(self._box), 2)
        method(key2)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key2])
        self.assertRaises(KeyError, lambda: method(key2))
        self.assertEqual(self._box.get_string(key1), self._template % 1)
        method(key1)
        self.assertEqual(len(self._box), 0)
        self.assertRaises(KeyError, lambda: self._box[key1])
        self.assertRaises(KeyError, lambda: method(key1))

    def test_discard(self, repetitions=10):
        # Discard messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)
        self.assertEqual(len(self._box), 2)
        self._box.discard(key0)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key0])
        # Discarding a key that is already gone must be a silent no-op
        self._box.discard(key0)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key0])
        # The other message must be untouched
        self.assertEqual(self._box.get_string(key1), self._template % 1)

    def test_get(self):
        # Retrieve messages using get()
        key0 = self._box.add(self._template % 0)
        msg = self._box.get(key0)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertIsNone(self._box.get('foo'))
        self.assertIs(self._box.get('foo', False), False)
        self._box.close()
        self._box = self._factory(self._path)
        key1 = self._box.add(self._template % 1)
        msg = self._box.get(key1)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '1\n')

    def test_getitem(self):
        # Retrieve message using __getitem__()
        key0 = self._box.add(self._template % 0)
        msg = self._box[key0]
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self._box.discard(key0)
        self.assertRaises(KeyError, lambda: self._box[key0])

    def test_get_message(self):
        # Get Message representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        msg0 = self._box.get_message(key0)
        self.assertIsInstance(msg0, mailbox.Message)
        self.assertEqual(msg0['from'], 'foo')
        self.assertEqual(msg0.get_payload(), '0\n')
        self._check_sample(self._box.get_message(key1))

    def test_get_bytes(self):
        # Get bytes representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        self.assertEqual(self._box.get_bytes(key0),
            (self._template % 0).encode('ascii'))
        self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)

    def test_get_string(self):
        # Get string representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        self.assertEqual(self._box.get_string(key0), self._template % 0)
        self.assertEqual(self._box.get_string(key1).split('\n'),
                         _sample_message.split('\n'))

    def test_get_file(self):
        # Get file representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        with self._box.get_file(key0) as file:
            data0 = file.read()
        with self._box.get_file(key1) as file:
            data1 = file.read()
        self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
                         self._template % 0)
        self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
                         _sample_message)

    def test_get_file_can_be_closed_twice(self):
        # Issue 11700
        key = self._box.add(_sample_message)
        f = self._box.get_file(key)
        f.close()
        f.close()

    def test_iterkeys(self):
        # Get keys using iterkeys()
        self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)

    def test_keys(self):
        # Get keys using keys()
        self._check_iteration(self._box.keys, do_keys=True, do_values=False)

    def test_itervalues(self):
        # Get values using itervalues()
        self._check_iteration(self._box.itervalues, do_keys=False,
                              do_values=True)

    def test_iter(self):
        # Get values using __iter__()
        self._check_iteration(self._box.__iter__, do_keys=False,
                              do_values=True)

    def test_values(self):
        # Get values using values()
        self._check_iteration(self._box.values, do_keys=False, do_values=True)

    def test_iteritems(self):
        # Get keys and values using iteritems()
        self._check_iteration(self._box.iteritems, do_keys=True,
                              do_values=True)

    def test_items(self):
        # Get keys and values using items()
        self._check_iteration(self._box.items, do_keys=True, do_values=True)

    def _check_iteration(self, method, do_keys, do_values, repetitions=10):
        # (Used by the iteration tests above.)  `method` must yield nothing
        # on the empty mailbox, then keys, values or (key, value) pairs
        # according to do_keys/do_values once messages have been added.
        for value in method():
            self.fail("Not empty")
        keys, values = [], []
        for i in range(repetitions):
            keys.append(self._box.add(self._template % i))
            values.append(self._template % i)
        if do_keys and not do_values:
            returned_keys = list(method())
        elif do_values and not do_keys:
            returned_values = list(method())
        else:
            returned_keys, returned_values = [], []
            for key, value in method():
                returned_keys.append(key)
                returned_values.append(value)
        if do_keys:
            self.assertEqual(len(keys), len(returned_keys))
            self.assertEqual(set(keys), set(returned_keys))
        if do_values:
            count = 0
            for value in returned_values:
                self.assertEqual(value['from'], 'foo')
                self.assertLess(int(value.get_payload()), repetitions)
                count += 1
            self.assertEqual(len(values), count)

    def test_contains(self):
        # Check existence of keys using __contains__()
        self.assertNotIn('foo', self._box)
        key0 = self._box.add(self._template % 0)
        self.assertIn(key0, self._box)
        self.assertNotIn('foo', self._box)
        key1 = self._box.add(self._template % 1)
        self.assertIn(key1, self._box)
        self.assertIn(key0, self._box)
        self.assertNotIn('foo', self._box)
        self._box.remove(key0)
        self.assertNotIn(key0, self._box)
        self.assertIn(key1, self._box)
        self.assertNotIn('foo', self._box)
        self._box.remove(key1)
        self.assertNotIn(key1, self._box)
        self.assertNotIn(key0, self._box)
        self.assertNotIn('foo', self._box)

    def test_len(self, repetitions=10):
        # Get message count
        keys = []
        for i in range(repetitions):
            self.assertEqual(len(self._box), i)
            keys.append(self._box.add(self._template % i))
            self.assertEqual(len(self._box), i + 1)
        for i in range(repetitions):
            self.assertEqual(len(self._box), repetitions - i)
            self._box.remove(keys[i])
            self.assertEqual(len(self._box), repetitions - i - 1)

    def test_set_item(self):
        # Modify messages using __setitem__()
        key0 = self._box.add(self._template % 'original 0')
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'original 0')
        key1 = self._box.add(self._template % 'original 1')
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'original 1')
        self._box[key0] = self._template % 'changed 0'
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'changed 0')
        self._box[key1] = self._template % 'changed 1'
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'changed 1')
        self._box[key0] = _sample_message
        self._check_sample(self._box[key0])
        self._box[key1] = self._box[key0]
        self._check_sample(self._box[key1])
        self._box[key0] = self._template % 'original 0'
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'original 0')
        self._check_sample(self._box[key1])
        self.assertRaises(KeyError,
                          lambda: self._box.__setitem__('foo', 'bar'))
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self.assertEqual(len(self._box), 2)

    def test_clear(self, iterations=10):
        # Remove all messages using clear()
        keys = []
        for i in range(iterations):
            # Record each key; otherwise the checks below iterate over an
            # empty list and verify nothing.
            keys.append(self._box.add(self._template % i))
        for i, key in enumerate(keys):
            self.assertEqual(self._box.get_string(key), self._template % i)
        self._box.clear()
        self.assertEqual(len(self._box), 0)
        for i, key in enumerate(keys):
            self.assertRaises(KeyError, lambda: self._box.get_string(key))

    def test_pop(self):
        # Get and remove a message using pop()
        key0 = self._box.add(self._template % 0)
        self.assertIn(key0, self._box)
        key1 = self._box.add(self._template % 1)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
        self.assertNotIn(key0, self._box)
        self.assertIn(key1, self._box)
        key2 = self._box.add(self._template % 2)
        self.assertIn(key2, self._box)
        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
        self.assertNotIn(key2, self._box)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
        self.assertNotIn(key1, self._box)
        self.assertEqual(len(self._box), 0)

    def test_popitem(self, iterations=10):
        # Get and remove an arbitrary (key, message) using popitem()
        keys = []
        for i in range(iterations):
            keys.append(self._box.add(self._template % i))
        seen = []
        for i in range(iterations):
            key, msg = self._box.popitem()
            self.assertIn(key, keys)
            self.assertNotIn(key, seen)
            seen.append(key)
            # The payload is the index at which the message was added
            self.assertEqual(int(msg.get_payload()), keys.index(key))
        self.assertEqual(len(self._box), 0)
        for key in keys:
            self.assertRaises(KeyError, lambda: self._box[key])

    def test_update(self):
        # Modify multiple messages using update()
        key0 = self._box.add(self._template % 'original 0')
        key1 = self._box.add(self._template % 'original 1')
        key2 = self._box.add(self._template % 'original 2')
        self._box.update({key0: self._template % 'changed 0',
                          key2: _sample_message})
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'changed 0')
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'original 1')
        self._check_sample(self._box[key2])
        self._box.update([(key2, self._template % 'changed 2'),
                          (key1, self._template % 'changed 1'),
                          (key0, self._template % 'original 0')])
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'original 0')
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'changed 1')
        self.assertEqual(self._box.get_string(key2),
                         self._template % 'changed 2')
        # An unknown key raises KeyError, but the valid entry passed in the
        # same call is still applied (checked by the assertions below).
        self.assertRaises(KeyError,
                          lambda: self._box.update({'foo': 'bar',
                                          key0: self._template % "changed 0"}))
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                         self._template % "changed 0")
        self.assertEqual(self._box.get_string(key1),
                         self._template % "changed 1")
        self.assertEqual(self._box.get_string(key2),
                         self._template % "changed 2")

    def test_flush(self):
        # Write changes to disk
        self._test_flush_or_close(self._box.flush, True)

    def test_popitem_and_flush_twice(self):
        # See #15036.
        self._box.add(self._template % 0)
        self._box.add(self._template % 1)
        self._box.flush()

        self._box.popitem()
        self._box.flush()
        self._box.popitem()
        self._box.flush()

    def test_lock_unlock(self):
        # Lock and unlock the mailbox
        self.assertFalse(os.path.exists(self._get_lock_path()))
        self._box.lock()
        self.assertTrue(os.path.exists(self._get_lock_path()))
        self._box.unlock()
        self.assertFalse(os.path.exists(self._get_lock_path()))

    def test_close(self):
        # Close mailbox and flush changes to disk
        self._test_flush_or_close(self._box.close, False)

    def _test_flush_or_close(self, method, should_call_close):
        # (Used by test_flush() and test_close().)  Add three messages,
        # call `method`, then reopen the mailbox and check they persisted.
        contents = [self._template % i for i in range(3)]
        self._box.add(contents[0])
        self._box.add(contents[1])
        self._box.add(contents[2])
        oldbox = self._box
        method()
        if should_call_close:
            self._box.close()
        self._box = self._factory(self._path)
        keys = self._box.keys()
        self.assertEqual(len(keys), 3)
        for key in keys:
            self.assertIn(self._box.get_string(key), contents)
        oldbox.close()

    def test_dump_message(self):
        # Write message representations to disk
        for source in (email.message_from_string(_sample_message),
                       _sample_message, io.BytesIO(_bytes_sample_message)):
            output = io.BytesIO()
            self._box._dump_message(source, output)
            self.assertEqual(output.getvalue(),
                _bytes_sample_message.replace(b'\n', os.linesep.encode()))
        output = io.BytesIO()
        self.assertRaises(TypeError,
                          lambda: self._box._dump_message(None, output))

    def _get_lock_path(self):
        # Return the path of the dot lock file. May be overridden.
        return self._path + '.lock'


class TestMailboxSuperclass(TestBase, unittest.TestCase):
    """Check that the abstract mailbox.Mailbox base class implements nothing."""

    def test_notimplemented(self):
        # Test that all Mailbox methods raise NotImplementedError.
        box = mailbox.Mailbox('path')
        self.assertRaises(NotImplementedError, lambda: box.add(''))
        self.assertRaises(NotImplementedError, lambda: box.remove(''))
        self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
        self.assertRaises(NotImplementedError, lambda: box.discard(''))
        self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
        self.assertRaises(NotImplementedError, lambda: box.iterkeys())
        self.assertRaises(NotImplementedError, lambda: box.keys())
        self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__())
        self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__())
        self.assertRaises(NotImplementedError, lambda: box.values())
        self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__())
        self.assertRaises(NotImplementedError, lambda: box.items())
        self.assertRaises(NotImplementedError, lambda: box.get(''))
        self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
        self.assertRaises(NotImplementedError, lambda: box.get_message(''))
        self.assertRaises(NotImplementedError, lambda: box.get_string(''))
        self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
        self.assertRaises(NotImplementedError, lambda: box.get_file(''))
        self.assertRaises(NotImplementedError, lambda: '' in box)
        self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
        self.assertRaises(NotImplementedError, lambda: box.__len__())
        self.assertRaises(NotImplementedError, lambda: box.clear())
        self.assertRaises(NotImplementedError, lambda: box.pop(''))
        self.assertRaises(NotImplementedError, lambda: box.popitem())
        self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
        self.assertRaises(NotImplementedError, lambda: box.flush())
        self.assertRaises(NotImplementedError, lambda: box.lock())
        self.assertRaises(NotImplementedError, lambda: box.unlock())
        self.assertRaises(NotImplementedError, lambda: box.close())


class TestMaildir(TestMailbox, unittest.TestCase):
    """Run the generic mailbox tests against mailbox.Maildir, plus
    Maildir-specific tests."""

    _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)

    def setUp(self):
        TestMailbox.setUp(self)
        # Use '!' instead of the default info separator on these platforms
        if (os.name == 'nt') or (sys.platform == 'cygwin'):
            self._box.colon = '!'

    def assertMailboxEmpty(self):
        # No leftover files may remain in the 'tmp' subdirectory
        self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])

    def test_add_MM(self):
        # Add a MaildirMessage instance
        msg = mailbox.MaildirMessage(self._template % 0)
        msg.set_subdir('cur')
        msg.set_info('foo')
        key = self._box.add(msg)
        self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
                                                 (key, self._box.colon))))

    def test_get_MM(self):
        # Get a MaildirMessage instance
        msg = mailbox.MaildirMessage(self._template % 0)
        msg.set_subdir('cur')
        msg.set_flags('RF')
        key = self._box.add(msg)
        msg_returned = self._box.get_message(key)
        self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
        self.assertEqual(msg_returned.get_subdir(), 'cur')
        self.assertEqual(msg_returned.get_flags(), 'FR')

    def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1\n')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        # Assigning a plain string afterwards keeps the existing flags
        # (the 'S' flag is still asserted below).
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3\n')

    def test_consistent_factory(self):
        # Add a message.
        msg = mailbox.MaildirMessage(self._template % 0)
        msg.set_subdir('cur')
        msg.set_flags('RF')
        key = self._box.add(msg)

        # Create new mailbox with a custom message factory and check that
        # get_message() uses it.
        class FakeMessage(mailbox.MaildirMessage):
            pass
        box = mailbox.Maildir(self._path, factory=FakeMessage)
        box.colon = self._box.colon
        msg2 = box.get_message(key)
        self.assertIsInstance(msg2, FakeMessage)

    def test_initialize_new(self):
        # Initialize a non-existent mailbox
        self.tearDown()
        self._box = mailbox.Maildir(self._path)
        self._check_basics()
        self._delete_recursively(self._path)
        self._box = self._factory(self._path, factory=None)
        self._check_basics()

    def test_initialize_existing(self):
        # Initialize an existing mailbox
        self.tearDown()
        for subdir in '', 'tmp', 'new', 'cur':
            os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
        self._box = mailbox.Maildir(self._path)
        self._check_basics()

    def _check_basics(self, factory=None):
        # (Used by test_initialize_new() and test_initialize_existing().)
        self.assertEqual(self._box._path, os.path.abspath(self._path))
        self.assertEqual(self._box._factory, factory)
        for subdir in '', 'tmp', 'new', 'cur':
            path = os.path.join(self._path, subdir)
            mode = os.stat(path)[stat.ST_MODE]
            self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)

    def test_list_folders(self):
        # List folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 3)
        self.assertEqual(set(self._box.list_folders()),
                         {'one', 'two', 'three'})

    def test_get_folder(self):
        # Open folders
        self._box.add_folder('foo.bar')
        folder0 = self._box.get_folder('foo.bar')
        folder0.add(self._template % 'bar')
        self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
        folder1 = self._box.get_folder('foo.bar')
        self.assertEqual(folder1.get_string(folder1.keys()[0]),
                         self._template % 'bar')

    def test_add_and_remove_folders(self):
        # Delete folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), {'one', 'two'})
        self._box.remove_folder('one')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), {'two'})
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), {'two', 'three'})
        self._box.remove_folder('three')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), {'two'})
        self._box.remove_folder('two')
        self.assertEqual(len(self._box.list_folders()), 0)
        self.assertEqual(self._box.list_folders(), [])

    def test_clean(self):
        # Remove old files from 'tmp'
        foo_path = os.path.join(self._path, 'tmp', 'foo')
        bar_path = os.path.join(self._path, 'tmp', 'bar')
        with open(foo_path, 'w', encoding='utf-8') as f:
            f.write("@")
        with open(bar_path, 'w', encoding='utf-8') as f:
            f.write("@")
        self._box.clean()
        self.assertTrue(os.path.exists(foo_path))
        self.assertTrue(os.path.exists(bar_path))
        foo_stat = os.stat(foo_path)
        # Backdate foo's access time by just over 36 hours (129600 s) so
        # that clean() removes it; bar is left fresh and must survive.
        os.utime(foo_path, (time.time() - 129600 - 2,
                            foo_stat.st_mtime))
        self._box.clean()
        self.assertFalse(os.path.exists(foo_path))
        self.assertTrue(os.path.exists(bar_path))

    def test_create_tmp(self, repetitions=10):
        # Create files in tmp directory
        hostname = socket.gethostname()
        if '/' in hostname:
            hostname = hostname.replace('/', r'\057')
        if ':' in hostname:
            hostname = hostname.replace(':', r'\072')
        pid = os.getpid()
        pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
                             r"Q(?P<Q>\d+)\.(?P<host>[^:/]*)")
        previous_groups = None
        for x in range(repetitions):
            tmp_file = self._box._create_tmp()
            # Make sure the handle is released even if an assertion below
            # fails; closing an already closed file is harmless.
            self.addCleanup(tmp_file.close)
            head, tail = os.path.split(tmp_file.name)
            self.assertEqual(head, os.path.abspath(os.path.join(self._path,
                                                                "tmp")),
                             "File in wrong location: '%s'" % head)
            match = pattern.match(tail)
            self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
            groups = match.groups()
            if previous_groups is not None:
                self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
                             "Non-monotonic seconds: '%s' before '%s'" %
                             (previous_groups[0], groups[0]))
                if int(groups[0]) == int(previous_groups[0]):
                    # The M field holds up to six digits (see the pattern
                    # above), i.e. the sub-second part in microseconds.
                    self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
                                "Non-monotonic microseconds: '%s' before '%s'" %
                                (previous_groups[1], groups[1]))
                self.assertEqual(int(groups[2]), pid,
                             "Process ID mismatch: '%s' should be '%s'" %
                             (groups[2], pid))
                self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1,
                             "Non-sequential counter: '%s' before '%s'" %
                             (previous_groups[3], groups[3]))
                self.assertEqual(groups[4], hostname,
                             "Host name mismatch: '%s' should be '%s'" %
                             (groups[4], hostname))
            previous_groups = groups
            tmp_file.write(_bytes_sample_message)
            tmp_file.seek(0)
            self.assertEqual(tmp_file.read(), _bytes_sample_message)
            tmp_file.close()
        file_count = len(os.listdir(os.path.join(self._path, "tmp")))
        self.assertEqual(file_count, repetitions,
                     "Wrong file count: '%s' should be '%s'" %
                     (file_count, repetitions))

    def test_refresh(self):
        # Update the table of contents
        self.assertEqual(self._box._toc, {})
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)
        self.assertEqual(self._box._toc, {})
        self._box._refresh()
        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
                                          key1: os.path.join('new', key1)})
        key2 = self._box.add(self._template % 2)
        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
                                          key1: os.path.join('new', key1)})
        self._box._refresh()
        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
                                          key1: os.path.join('new', key1),
                                          key2: os.path.join('new', key2)})

    def test_refresh_after_safety_period(self):
        # Issue #13254: Call _refresh after the "file system safety
        # period" of 2 seconds has passed; _toc should still be
        # updated because this is the first call to _refresh.
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)

        self._box = self._factory(self._path)
        self.assertEqual(self._box._toc, {})

        # Emulate sleeping. Instead of sleeping for 2 seconds, use the
        # skew factor to make _refresh think that the filesystem
        # safety period has passed and re-reading the _toc is only
        # required if mtimes differ.
        self._box._skewfactor = -3

        self._box._refresh()
        self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))

    def test_lookup(self):
        # Look up message subpaths in the TOC
        self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
        key0 = self._box.add(self._template % 0)
        self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
        os.remove(os.path.join(self._path, 'new', key0))
        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
        # Be sure that the TOC is read back from disk (see issue #6896
        # about bad mtime behaviour on some systems).
        self._box.flush()
        self.assertRaises(KeyError, lambda: self._box._lookup(key0))
        self.assertEqual(self._box._toc, {})

    def test_lock_unlock(self):
        # Lock and unlock the mailbox. For Maildir, this does nothing.
        self._box.lock()
        self._box.unlock()

    def test_folder(self):
        # Test for bug #1569790: verify that folders returned by .get_folder()
        # use the same factory function.
        def dummy_factory(s):
            return None
        box = self._factory(self._path, factory=dummy_factory)
        folder = box.add_folder('folder1')
        self.assertIs(folder._factory, dummy_factory)

        folder1_alias = box.get_folder('folder1')
        self.assertIs(folder1_alias._factory, dummy_factory)

    def test_directory_in_folder(self):
        # Test that mailboxes still work if there's a stray extra directory
        # in a folder.
        for i in range(10):
            self._box.add(mailbox.Message(_sample_message))

        # Create a stray directory
        os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))

        # Check that looping still works with the directory present.
        for msg in self._box:
            pass

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    def test_file_permissions(self):
        # Verify that message files are created without execute permissions
        msg = mailbox.MaildirMessage(self._template % 0)
        orig_umask = os.umask(0)
        try:
            key = self._box.add(msg)
        finally:
            os.umask(orig_umask)
        path = os.path.join(self._path, self._box._lookup(key))
        mode = os.stat(path).st_mode
        self.assertFalse(mode & 0o111)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    def test_folder_file_perms(self):
        # From bug #3228, we want to verify that the file created inside a Maildir
        # subfolder isn't marked as executable.
        orig_umask = os.umask(0)
        try:
            subfolder = self._box.add_folder('subfolder')
        finally:
            os.umask(orig_umask)

        path = os.path.join(subfolder._path, 'maildirfolder')
        st = os.stat(path)
        perms = st.st_mode
        self.assertFalse((perms & 0o111)) # Execute bits should all be off.

    def test_reread(self):
        # Do an initial unconditional refresh
        self._box._refresh()

        # Put the last modified times more than two seconds into the past
        # (because mtime may have a two second granularity)
        for subdir in ('cur', 'new'):
            os.utime(os.path.join(self._box._path, subdir),
                     (time.time()-5,)*2)

        # Because mtime has a two second granularity in worst case (FAT), a
        # refresh is done unconditionally if called for within
        # two-second-plus-a-bit of the last one, just in case the mbox has
        # changed; so now we have to wait for that interval to expire.
        #
        # Because this is a test, emulate sleeping. Instead of
        # sleeping for 2 seconds, use the skew factor to make _refresh
        # think that 2 seconds have passed and re-reading the _toc is
        # only required if mtimes differ.
        self._box._skewfactor = -3

        # Re-reading causes the ._toc attribute to be assigned a new dictionary
        # object, so we'll check that the ._toc attribute isn't a different
        # object.
        orig_toc = self._box._toc
        def refreshed():
            return self._box._toc is not orig_toc

        self._box._refresh()
        self.assertFalse(refreshed())

        # Now, write something into cur and remove it.  This changes
        # the mtime and should cause a re-read. Note that "sleep
        # emulation" is still in effect, as skewfactor is -3.
        filename = os.path.join(self._path, 'cur', 'stray-file')
        os_helper.create_empty_file(filename)
        os.unlink(filename)
        self._box._refresh()
        self.assertTrue(refreshed())


class _TestSingleFile(TestMailbox):
    '''Common tests for single-file mailboxes'''

    def test_add_doesnt_rewrite(self):
        # When only adding messages, flush() should not rewrite the
        # mailbox file. See issue #9559.

        # Inode number changes if the contents are written to another
        # file which is then renamed over the original file. So we
        # must check that the inode number doesn't change.
        inode_before = os.stat(self._path).st_ino

        self._box.add(self._template % 0)
        self._box.flush()

        inode_after = os.stat(self._path).st_ino
        self.assertEqual(inode_before, inode_after)

        # Make sure the message was really added
        self._box.close()
        self._box = self._factory(self._path)
        self.assertEqual(len(self._box), 1)

    def test_permissions_after_flush(self):
        '''Check the mailbox file's permissions after flush() (issue #5346).'''
        # See issue #5346

        # Make the mailbox world writable. It's unlikely that the new
        # mailbox file would have these permissions after flush(),
        # because umask usually prevents it.
965 mode = os.stat(self._path).st_mode | 0o666 966 os.chmod(self._path, mode) 967 968 self._box.add(self._template % 0) 969 i = self._box.add(self._template % 1) 970 # Need to remove one message to make flush() create a new file 971 self._box.remove(i) 972 self._box.flush() 973 974 self.assertEqual(os.stat(self._path).st_mode, mode) 975 976 977class _TestMboxMMDF(_TestSingleFile): 978 979 def tearDown(self): 980 super().tearDown() 981 self._box.close() 982 self._delete_recursively(self._path) 983 for lock_remnant in glob.glob(glob.escape(self._path) + '.*'): 984 os_helper.unlink(lock_remnant) 985 986 def assertMailboxEmpty(self): 987 with open(self._path, 'rb') as f: 988 self.assertEqual(f.readlines(), []) 989 990 def test_get_bytes_from(self): 991 # Get bytes representations of messages with _unixfrom. 992 unixfrom = 'From foo@bar blah\n' 993 key0 = self._box.add(unixfrom + self._template % 0) 994 key1 = self._box.add(unixfrom + _sample_message) 995 self.assertEqual(self._box.get_bytes(key0, from_=False), 996 (self._template % 0).encode('ascii')) 997 self.assertEqual(self._box.get_bytes(key1, from_=False), 998 _bytes_sample_message) 999 self.assertEqual(self._box.get_bytes(key0, from_=True), 1000 (unixfrom + self._template % 0).encode('ascii')) 1001 self.assertEqual(self._box.get_bytes(key1, from_=True), 1002 unixfrom.encode('ascii') + _bytes_sample_message) 1003 1004 def test_get_string_from(self): 1005 # Get string representations of messages with _unixfrom. 
1006 unixfrom = 'From foo@bar blah\n' 1007 key0 = self._box.add(unixfrom + self._template % 0) 1008 key1 = self._box.add(unixfrom + _sample_message) 1009 self.assertEqual(self._box.get_string(key0, from_=False), 1010 self._template % 0) 1011 self.assertEqual(self._box.get_string(key1, from_=False).split('\n'), 1012 _sample_message.split('\n')) 1013 self.assertEqual(self._box.get_string(key0, from_=True), 1014 unixfrom + self._template % 0) 1015 self.assertEqual(self._box.get_string(key1, from_=True).split('\n'), 1016 (unixfrom + _sample_message).split('\n')) 1017 1018 def test_add_from_string(self): 1019 # Add a string starting with 'From ' to the mailbox 1020 key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n') 1021 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 1022 self.assertEqual(self._box[key].get_payload(), '0\n') 1023 1024 def test_add_from_bytes(self): 1025 # Add a byte string starting with 'From ' to the mailbox 1026 key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n') 1027 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 1028 self.assertEqual(self._box[key].get_payload(), '0\n') 1029 1030 def test_add_mbox_or_mmdf_message(self): 1031 # Add an mboxMessage or MMDFMessage 1032 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1033 msg = class_('From foo@bar blah\nFrom: foo\n\n0\n') 1034 key = self._box.add(msg) 1035 1036 def test_open_close_open(self): 1037 # Open and inspect previously-created mailbox 1038 values = [self._template % i for i in range(3)] 1039 for value in values: 1040 self._box.add(value) 1041 self._box.close() 1042 mtime = os.path.getmtime(self._path) 1043 self._box = self._factory(self._path) 1044 self.assertEqual(len(self._box), 3) 1045 for key in self._box.iterkeys(): 1046 self.assertIn(self._box.get_string(key), values) 1047 self._box.close() 1048 self.assertEqual(mtime, os.path.getmtime(self._path)) 1049 1050 def test_add_and_close(self): 1051 # Verifying that closing a mailbox doesn't 
change added items 1052 self._box.add(_sample_message) 1053 for i in range(3): 1054 self._box.add(self._template % i) 1055 self._box.add(_sample_message) 1056 self._box._file.flush() 1057 self._box._file.seek(0) 1058 contents = self._box._file.read() 1059 self._box.close() 1060 with open(self._path, 'rb') as f: 1061 self.assertEqual(contents, f.read()) 1062 self._box = self._factory(self._path) 1063 1064 @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().") 1065 @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().") 1066 def test_lock_conflict(self): 1067 # Fork off a child process that will lock the mailbox temporarily, 1068 # unlock it and exit. 1069 c, p = socket.socketpair() 1070 self.addCleanup(c.close) 1071 self.addCleanup(p.close) 1072 1073 pid = os.fork() 1074 if pid == 0: 1075 # child 1076 try: 1077 # lock the mailbox, and signal the parent it can proceed 1078 self._box.lock() 1079 c.send(b'c') 1080 1081 # wait until the parent is done, and unlock the mailbox 1082 c.recv(1) 1083 self._box.unlock() 1084 finally: 1085 os._exit(0) 1086 1087 # In the parent, wait until the child signals it locked the mailbox. 1088 p.recv(1) 1089 try: 1090 self.assertRaises(mailbox.ExternalClashError, 1091 self._box.lock) 1092 finally: 1093 # Signal the child it can now release the lock and exit. 1094 p.send(b'p') 1095 # Wait for child to exit. Locking should now succeed. 1096 support.wait_process(pid, exitcode=0) 1097 1098 self._box.lock() 1099 self._box.unlock() 1100 1101 def test_relock(self): 1102 # Test case for bug #1575506: the mailbox class was locking the 1103 # wrong file object in its flush() method. 
1104 msg = "Subject: sub\n\nbody\n" 1105 key1 = self._box.add(msg) 1106 self._box.flush() 1107 self._box.close() 1108 1109 self._box = self._factory(self._path) 1110 self._box.lock() 1111 key2 = self._box.add(msg) 1112 self._box.flush() 1113 self.assertTrue(self._box._locked) 1114 self._box.close() 1115 1116 1117class TestMbox(_TestMboxMMDF, unittest.TestCase): 1118 1119 _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) 1120 1121 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 1122 def test_file_perms(self): 1123 # From bug #3228, we want to verify that the mailbox file isn't executable, 1124 # even if the umask is set to something that would leave executable bits set. 1125 # We only run this test on platforms that support umask. 1126 try: 1127 old_umask = os.umask(0o077) 1128 self._box.close() 1129 os.unlink(self._path) 1130 self._box = mailbox.mbox(self._path, create=True) 1131 self._box.add('') 1132 self._box.close() 1133 finally: 1134 os.umask(old_umask) 1135 1136 st = os.stat(self._path) 1137 perms = st.st_mode 1138 self.assertFalse((perms & 0o111)) # Execute bits should all be off. 
1139 1140 def test_terminating_newline(self): 1141 message = email.message.Message() 1142 message['From'] = 'john@example.com' 1143 message.set_payload('No newline at the end') 1144 i = self._box.add(message) 1145 1146 # A newline should have been appended to the payload 1147 message = self._box.get(i) 1148 self.assertEqual(message.get_payload(), 'No newline at the end\n') 1149 1150 def test_message_separator(self): 1151 # Check there's always a single blank line after each message 1152 self._box.add('From: foo\n\n0') # No newline at the end 1153 with open(self._path, encoding='utf-8') as f: 1154 data = f.read() 1155 self.assertEqual(data[-3:], '0\n\n') 1156 1157 self._box.add('From: foo\n\n0\n') # Newline at the end 1158 with open(self._path, encoding='utf-8') as f: 1159 data = f.read() 1160 self.assertEqual(data[-3:], '0\n\n') 1161 1162 1163class TestMMDF(_TestMboxMMDF, unittest.TestCase): 1164 1165 _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) 1166 1167 1168class TestMH(TestMailbox, unittest.TestCase): 1169 1170 _factory = lambda self, path, factory=None: mailbox.MH(path, factory) 1171 1172 def assertMailboxEmpty(self): 1173 self.assertEqual(os.listdir(self._path), ['.mh_sequences']) 1174 1175 def test_list_folders(self): 1176 # List folders 1177 self._box.add_folder('one') 1178 self._box.add_folder('two') 1179 self._box.add_folder('three') 1180 self.assertEqual(len(self._box.list_folders()), 3) 1181 self.assertEqual(set(self._box.list_folders()), 1182 set(('one', 'two', 'three'))) 1183 1184 def test_get_folder(self): 1185 # Open folders 1186 def dummy_factory (s): 1187 return None 1188 self._box = self._factory(self._path, dummy_factory) 1189 1190 new_folder = self._box.add_folder('foo.bar') 1191 folder0 = self._box.get_folder('foo.bar') 1192 folder0.add(self._template % 'bar') 1193 self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar'))) 1194 folder1 = self._box.get_folder('foo.bar') 1195 
self.assertEqual(folder1.get_string(folder1.keys()[0]), 1196 self._template % 'bar') 1197 1198 # Test for bug #1569790: verify that folders returned by .get_folder() 1199 # use the same factory function. 1200 self.assertIs(new_folder._factory, self._box._factory) 1201 self.assertIs(folder0._factory, self._box._factory) 1202 1203 def test_add_and_remove_folders(self): 1204 # Delete folders 1205 self._box.add_folder('one') 1206 self._box.add_folder('two') 1207 self.assertEqual(len(self._box.list_folders()), 2) 1208 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 1209 self._box.remove_folder('one') 1210 self.assertEqual(len(self._box.list_folders()), 1) 1211 self.assertEqual(set(self._box.list_folders()), set(('two',))) 1212 self._box.add_folder('three') 1213 self.assertEqual(len(self._box.list_folders()), 2) 1214 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 1215 self._box.remove_folder('three') 1216 self.assertEqual(len(self._box.list_folders()), 1) 1217 self.assertEqual(set(self._box.list_folders()), set(('two',))) 1218 self._box.remove_folder('two') 1219 self.assertEqual(len(self._box.list_folders()), 0) 1220 self.assertEqual(self._box.list_folders(), []) 1221 1222 def test_sequences(self): 1223 # Get and set sequences 1224 self.assertEqual(self._box.get_sequences(), {}) 1225 msg0 = mailbox.MHMessage(self._template % 0) 1226 msg0.add_sequence('foo') 1227 key0 = self._box.add(msg0) 1228 self.assertEqual(self._box.get_sequences(), {'foo':[key0]}) 1229 msg1 = mailbox.MHMessage(self._template % 1) 1230 msg1.set_sequences(['bar', 'replied', 'foo']) 1231 key1 = self._box.add(msg1) 1232 self.assertEqual(self._box.get_sequences(), 1233 {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]}) 1234 msg0.set_sequences(['flagged']) 1235 self._box[key0] = msg0 1236 self.assertEqual(self._box.get_sequences(), 1237 {'foo':[key1], 'bar':[key1], 'replied':[key1], 1238 'flagged':[key0]}) 1239 self._box.remove(key1) 1240 
self.assertEqual(self._box.get_sequences(), {'flagged':[key0]}) 1241 1242 def test_issue2625(self): 1243 msg0 = mailbox.MHMessage(self._template % 0) 1244 msg0.add_sequence('foo') 1245 key0 = self._box.add(msg0) 1246 refmsg0 = self._box.get_message(key0) 1247 1248 def test_issue7627(self): 1249 msg0 = mailbox.MHMessage(self._template % 0) 1250 key0 = self._box.add(msg0) 1251 self._box.lock() 1252 self._box.remove(key0) 1253 self._box.unlock() 1254 1255 def test_pack(self): 1256 # Pack the contents of the mailbox 1257 msg0 = mailbox.MHMessage(self._template % 0) 1258 msg1 = mailbox.MHMessage(self._template % 1) 1259 msg2 = mailbox.MHMessage(self._template % 2) 1260 msg3 = mailbox.MHMessage(self._template % 3) 1261 msg0.set_sequences(['foo', 'unseen']) 1262 msg1.set_sequences(['foo']) 1263 msg2.set_sequences(['foo', 'flagged']) 1264 msg3.set_sequences(['foo', 'bar', 'replied']) 1265 key0 = self._box.add(msg0) 1266 key1 = self._box.add(msg1) 1267 key2 = self._box.add(msg2) 1268 key3 = self._box.add(msg3) 1269 self.assertEqual(self._box.get_sequences(), 1270 {'foo':[key0,key1,key2,key3], 'unseen':[key0], 1271 'flagged':[key2], 'bar':[key3], 'replied':[key3]}) 1272 self._box.remove(key2) 1273 self.assertEqual(self._box.get_sequences(), 1274 {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3], 1275 'replied':[key3]}) 1276 self._box.pack() 1277 self.assertEqual(self._box.keys(), [1, 2, 3]) 1278 key0 = key0 1279 key1 = key0 + 1 1280 key2 = key1 + 1 1281 self.assertEqual(self._box.get_sequences(), 1282 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]}) 1283 1284 # Test case for packing while holding the mailbox locked. 
1285 key0 = self._box.add(msg1) 1286 key1 = self._box.add(msg1) 1287 key2 = self._box.add(msg1) 1288 key3 = self._box.add(msg1) 1289 1290 self._box.remove(key0) 1291 self._box.remove(key2) 1292 self._box.lock() 1293 self._box.pack() 1294 self._box.unlock() 1295 self.assertEqual(self._box.get_sequences(), 1296 {'foo':[1, 2, 3, 4, 5], 1297 'unseen':[1], 'bar':[3], 'replied':[3]}) 1298 1299 def _get_lock_path(self): 1300 return os.path.join(self._path, '.mh_sequences.lock') 1301 1302 1303class TestBabyl(_TestSingleFile, unittest.TestCase): 1304 1305 _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) 1306 1307 def assertMailboxEmpty(self): 1308 with open(self._path, 'rb') as f: 1309 self.assertEqual(f.readlines(), []) 1310 1311 def tearDown(self): 1312 super().tearDown() 1313 self._box.close() 1314 self._delete_recursively(self._path) 1315 for lock_remnant in glob.glob(glob.escape(self._path) + '.*'): 1316 os_helper.unlink(lock_remnant) 1317 1318 def test_labels(self): 1319 # Get labels from the mailbox 1320 self.assertEqual(self._box.get_labels(), []) 1321 msg0 = mailbox.BabylMessage(self._template % 0) 1322 msg0.add_label('foo') 1323 key0 = self._box.add(msg0) 1324 self.assertEqual(self._box.get_labels(), ['foo']) 1325 msg1 = mailbox.BabylMessage(self._template % 1) 1326 msg1.set_labels(['bar', 'answered', 'foo']) 1327 key1 = self._box.add(msg1) 1328 self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar'])) 1329 msg0.set_labels(['blah', 'filed']) 1330 self._box[key0] = msg0 1331 self.assertEqual(set(self._box.get_labels()), 1332 set(['foo', 'bar', 'blah'])) 1333 self._box.remove(key1) 1334 self.assertEqual(set(self._box.get_labels()), set(['blah'])) 1335 1336 1337class FakeFileLikeObject: 1338 1339 def __init__(self): 1340 self.closed = False 1341 1342 def close(self): 1343 self.closed = True 1344 1345 1346class FakeMailBox(mailbox.Mailbox): 1347 1348 def __init__(self): 1349 mailbox.Mailbox.__init__(self, '', lambda file: None) 1350 
self.files = [FakeFileLikeObject() for i in range(10)] 1351 1352 def get_file(self, key): 1353 return self.files[key] 1354 1355 1356class TestFakeMailBox(unittest.TestCase): 1357 1358 def test_closing_fd(self): 1359 box = FakeMailBox() 1360 for i in range(10): 1361 self.assertFalse(box.files[i].closed) 1362 for i in range(10): 1363 box[i] 1364 for i in range(10): 1365 self.assertTrue(box.files[i].closed) 1366 1367 1368class TestMessage(TestBase, unittest.TestCase): 1369 1370 _factory = mailbox.Message # Overridden by subclasses to reuse tests 1371 1372 def setUp(self): 1373 self._path = os_helper.TESTFN 1374 1375 def tearDown(self): 1376 self._delete_recursively(self._path) 1377 1378 def test_initialize_with_eMM(self): 1379 # Initialize based on email.message.Message instance 1380 eMM = email.message_from_string(_sample_message) 1381 msg = self._factory(eMM) 1382 self._post_initialize_hook(msg) 1383 self._check_sample(msg) 1384 1385 def test_initialize_with_string(self): 1386 # Initialize based on string 1387 msg = self._factory(_sample_message) 1388 self._post_initialize_hook(msg) 1389 self._check_sample(msg) 1390 1391 def test_initialize_with_file(self): 1392 # Initialize based on contents of file 1393 with open(self._path, 'w+', encoding='utf-8') as f: 1394 f.write(_sample_message) 1395 f.seek(0) 1396 msg = self._factory(f) 1397 self._post_initialize_hook(msg) 1398 self._check_sample(msg) 1399 1400 def test_initialize_with_binary_file(self): 1401 # Initialize based on contents of binary file 1402 with open(self._path, 'wb+') as f: 1403 f.write(_bytes_sample_message) 1404 f.seek(0) 1405 msg = self._factory(f) 1406 self._post_initialize_hook(msg) 1407 self._check_sample(msg) 1408 1409 def test_initialize_with_nothing(self): 1410 # Initialize without arguments 1411 msg = self._factory() 1412 self._post_initialize_hook(msg) 1413 self.assertIsInstance(msg, email.message.Message) 1414 self.assertIsInstance(msg, mailbox.Message) 1415 self.assertIsInstance(msg, 
self._factory) 1416 self.assertEqual(msg.keys(), []) 1417 self.assertFalse(msg.is_multipart()) 1418 self.assertIsNone(msg.get_payload()) 1419 1420 def test_initialize_incorrectly(self): 1421 # Initialize with invalid argument 1422 self.assertRaises(TypeError, lambda: self._factory(object())) 1423 1424 def test_all_eMM_attributes_exist(self): 1425 # Issue 12537 1426 eMM = email.message_from_string(_sample_message) 1427 msg = self._factory(_sample_message) 1428 for attr in eMM.__dict__: 1429 self.assertIn(attr, msg.__dict__, 1430 '{} attribute does not exist'.format(attr)) 1431 1432 def test_become_message(self): 1433 # Take on the state of another message 1434 eMM = email.message_from_string(_sample_message) 1435 msg = self._factory() 1436 msg._become_message(eMM) 1437 self._check_sample(msg) 1438 1439 def test_explain_to(self): 1440 # Copy self's format-specific data to other message formats. 1441 # This test is superficial; better ones are in TestMessageConversion. 1442 msg = self._factory() 1443 for class_ in self.all_mailbox_types: 1444 other_msg = class_() 1445 msg._explain_to(other_msg) 1446 other_msg = email.message.Message() 1447 self.assertRaises(TypeError, lambda: msg._explain_to(other_msg)) 1448 1449 def _post_initialize_hook(self, msg): 1450 # Overridden by subclasses to check extra things after initialization 1451 pass 1452 1453 1454class TestMaildirMessage(TestMessage, unittest.TestCase): 1455 1456 _factory = mailbox.MaildirMessage 1457 1458 def _post_initialize_hook(self, msg): 1459 self.assertEqual(msg._subdir, 'new') 1460 self.assertEqual(msg._info, '') 1461 1462 def test_subdir(self): 1463 # Use get_subdir() and set_subdir() 1464 msg = mailbox.MaildirMessage(_sample_message) 1465 self.assertEqual(msg.get_subdir(), 'new') 1466 msg.set_subdir('cur') 1467 self.assertEqual(msg.get_subdir(), 'cur') 1468 msg.set_subdir('new') 1469 self.assertEqual(msg.get_subdir(), 'new') 1470 self.assertRaises(ValueError, lambda: msg.set_subdir('tmp')) 1471 
self.assertEqual(msg.get_subdir(), 'new') 1472 msg.set_subdir('new') 1473 self.assertEqual(msg.get_subdir(), 'new') 1474 self._check_sample(msg) 1475 1476 def test_flags(self): 1477 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1478 msg = mailbox.MaildirMessage(_sample_message) 1479 self.assertEqual(msg.get_flags(), '') 1480 self.assertEqual(msg.get_subdir(), 'new') 1481 msg.set_flags('F') 1482 self.assertEqual(msg.get_subdir(), 'new') 1483 self.assertEqual(msg.get_flags(), 'F') 1484 msg.set_flags('SDTP') 1485 self.assertEqual(msg.get_flags(), 'DPST') 1486 msg.add_flag('FT') 1487 self.assertEqual(msg.get_flags(), 'DFPST') 1488 msg.remove_flag('TDRP') 1489 self.assertEqual(msg.get_flags(), 'FS') 1490 self.assertEqual(msg.get_subdir(), 'new') 1491 self._check_sample(msg) 1492 1493 def test_date(self): 1494 # Use get_date() and set_date() 1495 msg = mailbox.MaildirMessage(_sample_message) 1496 self.assertLess(abs(msg.get_date() - time.time()), 60) 1497 msg.set_date(0.0) 1498 self.assertEqual(msg.get_date(), 0.0) 1499 1500 def test_info(self): 1501 # Use get_info() and set_info() 1502 msg = mailbox.MaildirMessage(_sample_message) 1503 self.assertEqual(msg.get_info(), '') 1504 msg.set_info('1,foo=bar') 1505 self.assertEqual(msg.get_info(), '1,foo=bar') 1506 self.assertRaises(TypeError, lambda: msg.set_info(None)) 1507 self._check_sample(msg) 1508 1509 def test_info_and_flags(self): 1510 # Test interaction of info and flag methods 1511 msg = mailbox.MaildirMessage(_sample_message) 1512 self.assertEqual(msg.get_info(), '') 1513 msg.set_flags('SF') 1514 self.assertEqual(msg.get_flags(), 'FS') 1515 self.assertEqual(msg.get_info(), '2,FS') 1516 msg.set_info('1,') 1517 self.assertEqual(msg.get_flags(), '') 1518 self.assertEqual(msg.get_info(), '1,') 1519 msg.remove_flag('RPT') 1520 self.assertEqual(msg.get_flags(), '') 1521 self.assertEqual(msg.get_info(), '1,') 1522 msg.add_flag('D') 1523 self.assertEqual(msg.get_flags(), 'D') 1524 
self.assertEqual(msg.get_info(), '2,D') 1525 self._check_sample(msg) 1526 1527 1528class _TestMboxMMDFMessage: 1529 1530 _factory = mailbox._mboxMMDFMessage 1531 1532 def _post_initialize_hook(self, msg): 1533 self._check_from(msg) 1534 1535 def test_initialize_with_unixfrom(self): 1536 # Initialize with a message that already has a _unixfrom attribute 1537 msg = mailbox.Message(_sample_message) 1538 msg.set_unixfrom('From foo@bar blah') 1539 msg = mailbox.mboxMessage(msg) 1540 self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from()) 1541 1542 def test_from(self): 1543 # Get and set "From " line 1544 msg = mailbox.mboxMessage(_sample_message) 1545 self._check_from(msg) 1546 msg.set_from('foo bar') 1547 self.assertEqual(msg.get_from(), 'foo bar') 1548 msg.set_from('foo@bar', True) 1549 self._check_from(msg, 'foo@bar') 1550 msg.set_from('blah@temp', time.localtime()) 1551 self._check_from(msg, 'blah@temp') 1552 1553 def test_flags(self): 1554 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1555 msg = mailbox.mboxMessage(_sample_message) 1556 self.assertEqual(msg.get_flags(), '') 1557 msg.set_flags('F') 1558 self.assertEqual(msg.get_flags(), 'F') 1559 msg.set_flags('XODR') 1560 self.assertEqual(msg.get_flags(), 'RODX') 1561 msg.add_flag('FA') 1562 self.assertEqual(msg.get_flags(), 'RODFAX') 1563 msg.remove_flag('FDXA') 1564 self.assertEqual(msg.get_flags(), 'RO') 1565 self._check_sample(msg) 1566 1567 def _check_from(self, msg, sender=None): 1568 # Check contents of "From " line 1569 if sender is None: 1570 sender = "MAILER-DAEMON" 1571 self.assertIsNotNone(re.match( 1572 sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}", 1573 msg.get_from())) 1574 1575 1576class TestMboxMessage(_TestMboxMMDFMessage, TestMessage): 1577 1578 _factory = mailbox.mboxMessage 1579 1580 1581class TestMHMessage(TestMessage, unittest.TestCase): 1582 1583 _factory = mailbox.MHMessage 1584 1585 def _post_initialize_hook(self, msg): 1586 
self.assertEqual(msg._sequences, []) 1587 1588 def test_sequences(self): 1589 # Get, set, join, and leave sequences 1590 msg = mailbox.MHMessage(_sample_message) 1591 self.assertEqual(msg.get_sequences(), []) 1592 msg.set_sequences(['foobar']) 1593 self.assertEqual(msg.get_sequences(), ['foobar']) 1594 msg.set_sequences([]) 1595 self.assertEqual(msg.get_sequences(), []) 1596 msg.add_sequence('unseen') 1597 self.assertEqual(msg.get_sequences(), ['unseen']) 1598 msg.add_sequence('flagged') 1599 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1600 msg.add_sequence('flagged') 1601 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1602 msg.remove_sequence('unseen') 1603 self.assertEqual(msg.get_sequences(), ['flagged']) 1604 msg.add_sequence('foobar') 1605 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1606 msg.remove_sequence('replied') 1607 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1608 msg.set_sequences(['foobar', 'replied']) 1609 self.assertEqual(msg.get_sequences(), ['foobar', 'replied']) 1610 1611 1612class TestBabylMessage(TestMessage, unittest.TestCase): 1613 1614 _factory = mailbox.BabylMessage 1615 1616 def _post_initialize_hook(self, msg): 1617 self.assertEqual(msg._labels, []) 1618 1619 def test_labels(self): 1620 # Get, set, join, and leave labels 1621 msg = mailbox.BabylMessage(_sample_message) 1622 self.assertEqual(msg.get_labels(), []) 1623 msg.set_labels(['foobar']) 1624 self.assertEqual(msg.get_labels(), ['foobar']) 1625 msg.set_labels([]) 1626 self.assertEqual(msg.get_labels(), []) 1627 msg.add_label('filed') 1628 self.assertEqual(msg.get_labels(), ['filed']) 1629 msg.add_label('resent') 1630 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1631 msg.add_label('resent') 1632 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1633 msg.remove_label('filed') 1634 self.assertEqual(msg.get_labels(), ['resent']) 1635 msg.add_label('foobar') 1636 self.assertEqual(msg.get_labels(), ['resent', 
'foobar']) 1637 msg.remove_label('unseen') 1638 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1639 msg.set_labels(['foobar', 'answered']) 1640 self.assertEqual(msg.get_labels(), ['foobar', 'answered']) 1641 1642 def test_visible(self): 1643 # Get, set, and update visible headers 1644 msg = mailbox.BabylMessage(_sample_message) 1645 visible = msg.get_visible() 1646 self.assertEqual(visible.keys(), []) 1647 self.assertIsNone(visible.get_payload()) 1648 visible['User-Agent'] = 'FooBar 1.0' 1649 visible['X-Whatever'] = 'Blah' 1650 self.assertEqual(msg.get_visible().keys(), []) 1651 msg.set_visible(visible) 1652 visible = msg.get_visible() 1653 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1654 self.assertEqual(visible['User-Agent'], 'FooBar 1.0') 1655 self.assertEqual(visible['X-Whatever'], 'Blah') 1656 self.assertIsNone(visible.get_payload()) 1657 msg.update_visible() 1658 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1659 self.assertIsNone(visible.get_payload()) 1660 visible = msg.get_visible() 1661 self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To', 1662 'Subject']) 1663 for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'): 1664 self.assertEqual(visible[header], msg[header]) 1665 1666 1667class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage): 1668 1669 _factory = mailbox.MMDFMessage 1670 1671 1672class TestMessageConversion(TestBase, unittest.TestCase): 1673 1674 def test_plain_to_x(self): 1675 # Convert Message to all formats 1676 for class_ in self.all_mailbox_types: 1677 msg_plain = mailbox.Message(_sample_message) 1678 msg = class_(msg_plain) 1679 self._check_sample(msg) 1680 1681 def test_x_to_plain(self): 1682 # Convert all formats to Message 1683 for class_ in self.all_mailbox_types: 1684 msg = class_(_sample_message) 1685 msg_plain = mailbox.Message(msg) 1686 self._check_sample(msg_plain) 1687 1688 def test_x_from_bytes(self): 1689 # Convert all formats to Message 1690 for class_ in 
self.all_mailbox_types: 1691 msg = class_(_bytes_sample_message) 1692 self._check_sample(msg) 1693 1694 def test_x_to_invalid(self): 1695 # Convert all formats to an invalid format 1696 for class_ in self.all_mailbox_types: 1697 self.assertRaises(TypeError, lambda: class_(False)) 1698 1699 def test_type_specific_attributes_removed_on_conversion(self): 1700 reference = {class_: class_(_sample_message).__dict__ 1701 for class_ in self.all_mailbox_types} 1702 for class1 in self.all_mailbox_types: 1703 for class2 in self.all_mailbox_types: 1704 if class1 is class2: 1705 continue 1706 source = class1(_sample_message) 1707 target = class2(source) 1708 type_specific = [a for a in reference[class1] 1709 if a not in reference[class2]] 1710 for attr in type_specific: 1711 self.assertNotIn(attr, target.__dict__, 1712 "while converting {} to {}".format(class1, class2)) 1713 1714 def test_maildir_to_maildir(self): 1715 # Convert MaildirMessage to MaildirMessage 1716 msg_maildir = mailbox.MaildirMessage(_sample_message) 1717 msg_maildir.set_flags('DFPRST') 1718 msg_maildir.set_subdir('cur') 1719 date = msg_maildir.get_date() 1720 msg = mailbox.MaildirMessage(msg_maildir) 1721 self._check_sample(msg) 1722 self.assertEqual(msg.get_flags(), 'DFPRST') 1723 self.assertEqual(msg.get_subdir(), 'cur') 1724 self.assertEqual(msg.get_date(), date) 1725 1726 def test_maildir_to_mboxmmdf(self): 1727 # Convert MaildirMessage to mboxmessage and MMDFMessage 1728 pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'), 1729 ('T', 'D'), ('DFPRST', 'RDFA')) 1730 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1731 msg_maildir = mailbox.MaildirMessage(_sample_message) 1732 msg_maildir.set_date(0.0) 1733 for setting, result in pairs: 1734 msg_maildir.set_flags(setting) 1735 msg = class_(msg_maildir) 1736 self.assertEqual(msg.get_flags(), result) 1737 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' % 1738 time.asctime(time.gmtime(0.0))) 1739 msg_maildir.set_subdir('cur') 1740 
self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA') 1741 1742 def test_maildir_to_mh(self): 1743 # Convert MaildirMessage to MHMessage 1744 msg_maildir = mailbox.MaildirMessage(_sample_message) 1745 pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']), 1746 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []), 1747 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged'])) 1748 for setting, result in pairs: 1749 msg_maildir.set_flags(setting) 1750 self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(), 1751 result) 1752 1753 def test_maildir_to_babyl(self): 1754 # Convert MaildirMessage to Babyl 1755 msg_maildir = mailbox.MaildirMessage(_sample_message) 1756 pairs = (('D', ['unseen']), ('F', ['unseen']), 1757 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']), 1758 ('S', []), ('T', ['unseen', 'deleted']), 1759 ('DFPRST', ['deleted', 'answered', 'forwarded'])) 1760 for setting, result in pairs: 1761 msg_maildir.set_flags(setting) 1762 self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(), 1763 result) 1764 1765 def test_mboxmmdf_to_maildir(self): 1766 # Convert mboxMessage and MMDFMessage to MaildirMessage 1767 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1768 msg_mboxMMDF = class_(_sample_message) 1769 msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0)) 1770 pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'), 1771 ('RODFA', 'FRST')) 1772 for setting, result in pairs: 1773 msg_mboxMMDF.set_flags(setting) 1774 msg = mailbox.MaildirMessage(msg_mboxMMDF) 1775 self.assertEqual(msg.get_flags(), result) 1776 self.assertEqual(msg.get_date(), 0.0) 1777 msg_mboxMMDF.set_flags('O') 1778 self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(), 1779 'cur') 1780 1781 def test_mboxmmdf_to_mboxmmdf(self): 1782 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage 1783 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1784 msg_mboxMMDF = class_(_sample_message) 1785 
msg_mboxMMDF.set_flags('RODFA') 1786 msg_mboxMMDF.set_from('foo@bar') 1787 for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1788 msg2 = class2_(msg_mboxMMDF) 1789 self.assertEqual(msg2.get_flags(), 'RODFA') 1790 self.assertEqual(msg2.get_from(), 'foo@bar') 1791 1792 def test_mboxmmdf_to_mh(self): 1793 # Convert mboxMessage and MMDFMessage to MHMessage 1794 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1795 msg_mboxMMDF = class_(_sample_message) 1796 pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), 1797 ('F', ['unseen', 'flagged']), 1798 ('A', ['unseen', 'replied']), 1799 ('RODFA', ['replied', 'flagged'])) 1800 for setting, result in pairs: 1801 msg_mboxMMDF.set_flags(setting) 1802 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(), 1803 result) 1804 1805 def test_mboxmmdf_to_babyl(self): 1806 # Convert mboxMessage and MMDFMessage to BabylMessage 1807 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1808 msg = class_(_sample_message) 1809 pairs = (('R', []), ('O', ['unseen']), 1810 ('D', ['unseen', 'deleted']), ('F', ['unseen']), 1811 ('A', ['unseen', 'answered']), 1812 ('RODFA', ['deleted', 'answered'])) 1813 for setting, result in pairs: 1814 msg.set_flags(setting) 1815 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1816 1817 def test_mh_to_maildir(self): 1818 # Convert MHMessage to MaildirMessage 1819 pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) 1820 for setting, result in pairs: 1821 msg = mailbox.MHMessage(_sample_message) 1822 msg.add_sequence(setting) 1823 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1824 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1825 msg = mailbox.MHMessage(_sample_message) 1826 msg.add_sequence('unseen') 1827 msg.add_sequence('replied') 1828 msg.add_sequence('flagged') 1829 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR') 1830 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 
1831 1832 def test_mh_to_mboxmmdf(self): 1833 # Convert MHMessage to mboxMessage and MMDFMessage 1834 pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) 1835 for setting, result in pairs: 1836 msg = mailbox.MHMessage(_sample_message) 1837 msg.add_sequence(setting) 1838 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1839 self.assertEqual(class_(msg).get_flags(), result) 1840 msg = mailbox.MHMessage(_sample_message) 1841 msg.add_sequence('unseen') 1842 msg.add_sequence('replied') 1843 msg.add_sequence('flagged') 1844 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1845 self.assertEqual(class_(msg).get_flags(), 'OFA') 1846 1847 def test_mh_to_mh(self): 1848 # Convert MHMessage to MHMessage 1849 msg = mailbox.MHMessage(_sample_message) 1850 msg.add_sequence('unseen') 1851 msg.add_sequence('replied') 1852 msg.add_sequence('flagged') 1853 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1854 ['unseen', 'replied', 'flagged']) 1855 1856 def test_mh_to_babyl(self): 1857 # Convert MHMessage to BabylMessage 1858 pairs = (('unseen', ['unseen']), ('replied', ['answered']), 1859 ('flagged', [])) 1860 for setting, result in pairs: 1861 msg = mailbox.MHMessage(_sample_message) 1862 msg.add_sequence(setting) 1863 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1864 msg = mailbox.MHMessage(_sample_message) 1865 msg.add_sequence('unseen') 1866 msg.add_sequence('replied') 1867 msg.add_sequence('flagged') 1868 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), 1869 ['unseen', 'answered']) 1870 1871 def test_babyl_to_maildir(self): 1872 # Convert BabylMessage to MaildirMessage 1873 pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), 1874 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), 1875 ('resent', 'PS')) 1876 for setting, result in pairs: 1877 msg = mailbox.BabylMessage(_sample_message) 1878 msg.add_label(setting) 1879 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1880 
self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1881 msg = mailbox.BabylMessage(_sample_message) 1882 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1883 'edited', 'resent'): 1884 msg.add_label(label) 1885 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT') 1886 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1887 1888 def test_babyl_to_mboxmmdf(self): 1889 # Convert BabylMessage to mboxMessage and MMDFMessage 1890 pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), 1891 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), 1892 ('resent', 'RO')) 1893 for setting, result in pairs: 1894 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1895 msg = mailbox.BabylMessage(_sample_message) 1896 msg.add_label(setting) 1897 self.assertEqual(class_(msg).get_flags(), result) 1898 msg = mailbox.BabylMessage(_sample_message) 1899 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1900 'edited', 'resent'): 1901 msg.add_label(label) 1902 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1903 self.assertEqual(class_(msg).get_flags(), 'ODA') 1904 1905 def test_babyl_to_mh(self): 1906 # Convert BabylMessage to MHMessage 1907 pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), 1908 ('answered', ['replied']), ('forwarded', []), ('edited', []), 1909 ('resent', [])) 1910 for setting, result in pairs: 1911 msg = mailbox.BabylMessage(_sample_message) 1912 msg.add_label(setting) 1913 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result) 1914 msg = mailbox.BabylMessage(_sample_message) 1915 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1916 'edited', 'resent'): 1917 msg.add_label(label) 1918 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1919 ['unseen', 'replied']) 1920 1921 def test_babyl_to_babyl(self): 1922 # Convert BabylMessage to BabylMessage 1923 msg = mailbox.BabylMessage(_sample_message) 1924 
msg.update_visible() 1925 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1926 'edited', 'resent'): 1927 msg.add_label(label) 1928 msg2 = mailbox.BabylMessage(msg) 1929 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed', 1930 'answered', 'forwarded', 'edited', 1931 'resent']) 1932 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys()) 1933 for key in msg.get_visible().keys(): 1934 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key]) 1935 1936 1937class TestProxyFileBase(TestBase): 1938 1939 def _test_read(self, proxy): 1940 # Read by byte 1941 proxy.seek(0) 1942 self.assertEqual(proxy.read(), b'bar') 1943 proxy.seek(1) 1944 self.assertEqual(proxy.read(), b'ar') 1945 proxy.seek(0) 1946 self.assertEqual(proxy.read(2), b'ba') 1947 proxy.seek(1) 1948 self.assertEqual(proxy.read(-1), b'ar') 1949 proxy.seek(2) 1950 self.assertEqual(proxy.read(1000), b'r') 1951 1952 def _test_readline(self, proxy): 1953 # Read by line 1954 linesep = os.linesep.encode() 1955 proxy.seek(0) 1956 self.assertEqual(proxy.readline(), b'foo' + linesep) 1957 self.assertEqual(proxy.readline(), b'bar' + linesep) 1958 self.assertEqual(proxy.readline(), b'fred' + linesep) 1959 self.assertEqual(proxy.readline(), b'bob') 1960 proxy.seek(2) 1961 self.assertEqual(proxy.readline(), b'o' + linesep) 1962 proxy.seek(6 + 2 * len(os.linesep)) 1963 self.assertEqual(proxy.readline(), b'fred' + linesep) 1964 proxy.seek(6 + 2 * len(os.linesep)) 1965 self.assertEqual(proxy.readline(2), b'fr') 1966 self.assertEqual(proxy.readline(-10), b'ed' + linesep) 1967 1968 def _test_readlines(self, proxy): 1969 # Read multiple lines 1970 linesep = os.linesep.encode() 1971 proxy.seek(0) 1972 self.assertEqual(proxy.readlines(), [b'foo' + linesep, 1973 b'bar' + linesep, 1974 b'fred' + linesep, b'bob']) 1975 proxy.seek(0) 1976 self.assertEqual(proxy.readlines(2), [b'foo' + linesep]) 1977 proxy.seek(3 + len(linesep)) 1978 self.assertEqual(proxy.readlines(4 + 
len(linesep)), 1979 [b'bar' + linesep, b'fred' + linesep]) 1980 proxy.seek(3) 1981 self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep, 1982 b'fred' + linesep, b'bob']) 1983 1984 def _test_iteration(self, proxy): 1985 # Iterate by line 1986 linesep = os.linesep.encode() 1987 proxy.seek(0) 1988 iterator = iter(proxy) 1989 self.assertEqual(next(iterator), b'foo' + linesep) 1990 self.assertEqual(next(iterator), b'bar' + linesep) 1991 self.assertEqual(next(iterator), b'fred' + linesep) 1992 self.assertEqual(next(iterator), b'bob') 1993 self.assertRaises(StopIteration, next, iterator) 1994 1995 def _test_seek_and_tell(self, proxy): 1996 # Seek and use tell to check position 1997 linesep = os.linesep.encode() 1998 proxy.seek(3) 1999 self.assertEqual(proxy.tell(), 3) 2000 self.assertEqual(proxy.read(len(linesep)), linesep) 2001 proxy.seek(2, 1) 2002 self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep) 2003 proxy.seek(-3 - len(linesep), 2) 2004 self.assertEqual(proxy.read(3), b'bar') 2005 proxy.seek(2, 0) 2006 self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep) 2007 proxy.seek(100) 2008 self.assertFalse(proxy.read()) 2009 2010 def _test_close(self, proxy): 2011 # Close a file 2012 self.assertFalse(proxy.closed) 2013 proxy.close() 2014 self.assertTrue(proxy.closed) 2015 # Issue 11700 subsequent closes should be a no-op. 
2016 proxy.close() 2017 self.assertTrue(proxy.closed) 2018 2019 2020class TestProxyFile(TestProxyFileBase, unittest.TestCase): 2021 2022 def setUp(self): 2023 self._path = os_helper.TESTFN 2024 self._file = open(self._path, 'wb+') 2025 2026 def tearDown(self): 2027 self._file.close() 2028 self._delete_recursively(self._path) 2029 2030 def test_initialize(self): 2031 # Initialize and check position 2032 self._file.write(b'foo') 2033 pos = self._file.tell() 2034 proxy0 = mailbox._ProxyFile(self._file) 2035 self.assertEqual(proxy0.tell(), pos) 2036 self.assertEqual(self._file.tell(), pos) 2037 proxy1 = mailbox._ProxyFile(self._file, 0) 2038 self.assertEqual(proxy1.tell(), 0) 2039 self.assertEqual(self._file.tell(), pos) 2040 2041 def test_read(self): 2042 self._file.write(b'bar') 2043 self._test_read(mailbox._ProxyFile(self._file)) 2044 2045 def test_readline(self): 2046 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2047 os.linesep), 'ascii')) 2048 self._test_readline(mailbox._ProxyFile(self._file)) 2049 2050 def test_readlines(self): 2051 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2052 os.linesep), 'ascii')) 2053 self._test_readlines(mailbox._ProxyFile(self._file)) 2054 2055 def test_iteration(self): 2056 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2057 os.linesep), 'ascii')) 2058 self._test_iteration(mailbox._ProxyFile(self._file)) 2059 2060 def test_seek_and_tell(self): 2061 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2062 self._test_seek_and_tell(mailbox._ProxyFile(self._file)) 2063 2064 def test_close(self): 2065 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2066 self._test_close(mailbox._ProxyFile(self._file)) 2067 2068 2069class TestPartialFile(TestProxyFileBase, unittest.TestCase): 2070 2071 def setUp(self): 2072 self._path = os_helper.TESTFN 2073 self._file = open(self._path, 'wb+') 2074 2075 def tearDown(self): 2076 
self._file.close() 2077 self._delete_recursively(self._path) 2078 2079 def test_initialize(self): 2080 # Initialize and check position 2081 self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii')) 2082 pos = self._file.tell() 2083 proxy = mailbox._PartialFile(self._file, 2, 5) 2084 self.assertEqual(proxy.tell(), 0) 2085 self.assertEqual(self._file.tell(), pos) 2086 2087 def test_read(self): 2088 self._file.write(bytes('***bar***', 'ascii')) 2089 self._test_read(mailbox._PartialFile(self._file, 3, 6)) 2090 2091 def test_readline(self): 2092 self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' % 2093 (os.linesep, os.linesep, os.linesep), 'ascii')) 2094 self._test_readline(mailbox._PartialFile(self._file, 5, 2095 18 + 3 * len(os.linesep))) 2096 2097 def test_readlines(self): 2098 self._file.write(bytes('foo%sbar%sfred%sbob?????' % 2099 (os.linesep, os.linesep, os.linesep), 'ascii')) 2100 self._test_readlines(mailbox._PartialFile(self._file, 0, 2101 13 + 3 * len(os.linesep))) 2102 2103 def test_iteration(self): 2104 self._file.write(bytes('____foo%sbar%sfred%sbob####' % 2105 (os.linesep, os.linesep, os.linesep), 'ascii')) 2106 self._test_iteration(mailbox._PartialFile(self._file, 4, 2107 17 + 3 * len(os.linesep))) 2108 2109 def test_seek_and_tell(self): 2110 self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii')) 2111 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, 2112 9 + 2 * len(os.linesep))) 2113 2114 def test_close(self): 2115 self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii')) 2116 self._test_close(mailbox._PartialFile(self._file, 1, 2117 6 + 3 * len(os.linesep))) 2118 2119 2120## Start: tests from the original module (for backward compatibility). 2121 2122FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n" 2123DUMMY_MESSAGE = """\ 2124From: some.body@dummy.domain 2125To: me@my.domain 2126Subject: Simple Test 2127 2128This is a dummy message. 
2129""" 2130 2131class MaildirTestCase(unittest.TestCase): 2132 2133 def setUp(self): 2134 # create a new maildir mailbox to work with: 2135 self._dir = os_helper.TESTFN 2136 if os.path.isdir(self._dir): 2137 os_helper.rmtree(self._dir) 2138 elif os.path.isfile(self._dir): 2139 os_helper.unlink(self._dir) 2140 os.mkdir(self._dir) 2141 os.mkdir(os.path.join(self._dir, "cur")) 2142 os.mkdir(os.path.join(self._dir, "tmp")) 2143 os.mkdir(os.path.join(self._dir, "new")) 2144 self._counter = 1 2145 self._msgfiles = [] 2146 2147 def tearDown(self): 2148 list(map(os.unlink, self._msgfiles)) 2149 os_helper.rmdir(os.path.join(self._dir, "cur")) 2150 os_helper.rmdir(os.path.join(self._dir, "tmp")) 2151 os_helper.rmdir(os.path.join(self._dir, "new")) 2152 os_helper.rmdir(self._dir) 2153 2154 def createMessage(self, dir, mbox=False): 2155 t = int(time.time() % 1000000) 2156 pid = self._counter 2157 self._counter += 1 2158 filename = ".".join((str(t), str(pid), "myhostname", "mydomain")) 2159 tmpname = os.path.join(self._dir, "tmp", filename) 2160 newname = os.path.join(self._dir, dir, filename) 2161 with open(tmpname, "w", encoding="utf-8") as fp: 2162 self._msgfiles.append(tmpname) 2163 if mbox: 2164 fp.write(FROM_) 2165 fp.write(DUMMY_MESSAGE) 2166 try: 2167 os.link(tmpname, newname) 2168 except (AttributeError, PermissionError): 2169 with open(newname, "w") as fp: 2170 fp.write(DUMMY_MESSAGE) 2171 self._msgfiles.append(newname) 2172 return tmpname 2173 2174 def test_empty_maildir(self): 2175 """Test an empty maildir mailbox""" 2176 # Test for regression on bug #117490: 2177 # Make sure the boxes attribute actually gets set. 
2178 self.mbox = mailbox.Maildir(os_helper.TESTFN) 2179 #self.assertTrue(hasattr(self.mbox, "boxes")) 2180 #self.assertEqual(len(self.mbox.boxes), 0) 2181 self.assertIsNone(self.mbox.next()) 2182 self.assertIsNone(self.mbox.next()) 2183 2184 def test_nonempty_maildir_cur(self): 2185 self.createMessage("cur") 2186 self.mbox = mailbox.Maildir(os_helper.TESTFN) 2187 #self.assertEqual(len(self.mbox.boxes), 1) 2188 self.assertIsNotNone(self.mbox.next()) 2189 self.assertIsNone(self.mbox.next()) 2190 self.assertIsNone(self.mbox.next()) 2191 2192 def test_nonempty_maildir_new(self): 2193 self.createMessage("new") 2194 self.mbox = mailbox.Maildir(os_helper.TESTFN) 2195 #self.assertEqual(len(self.mbox.boxes), 1) 2196 self.assertIsNotNone(self.mbox.next()) 2197 self.assertIsNone(self.mbox.next()) 2198 self.assertIsNone(self.mbox.next()) 2199 2200 def test_nonempty_maildir_both(self): 2201 self.createMessage("cur") 2202 self.createMessage("new") 2203 self.mbox = mailbox.Maildir(os_helper.TESTFN) 2204 #self.assertEqual(len(self.mbox.boxes), 2) 2205 self.assertIsNotNone(self.mbox.next()) 2206 self.assertIsNotNone(self.mbox.next()) 2207 self.assertIsNone(self.mbox.next()) 2208 self.assertIsNone(self.mbox.next()) 2209 2210## End: tests from the original module (for backward compatibility). 
2211 2212 2213_sample_message = """\ 2214Return-Path: <gkj@gregorykjohnson.com> 2215X-Original-To: gkj+person@localhost 2216Delivered-To: gkj+person@localhost 2217Received: from localhost (localhost [127.0.0.1]) 2218 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2219 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2220Delivered-To: gkj@sundance.gregorykjohnson.com 2221Received: from localhost [127.0.0.1] 2222 by localhost with POP3 (fetchmail-6.2.5) 2223 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2224Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2225 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2226 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2227Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) 2228 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2229Date: Wed, 13 Jul 2005 17:23:11 -0400 2230From: "Gregory K. Johnson" <gkj@gregorykjohnson.com> 2231To: gkj@gregorykjohnson.com 2232Subject: Sample message 2233Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com> 2234Mime-Version: 1.0 2235Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" 2236Content-Disposition: inline 2237User-Agent: Mutt/1.5.9i 2238 2239 2240--NMuMz9nt05w80d4+ 2241Content-Type: text/plain; charset=us-ascii 2242Content-Disposition: inline 2243 2244This is a sample message. 2245 2246-- 2247Gregory K. 
Johnson 2248 2249--NMuMz9nt05w80d4+ 2250Content-Type: application/octet-stream 2251Content-Disposition: attachment; filename="text.gz" 2252Content-Transfer-Encoding: base64 2253 2254H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 22553FYlAAAA 2256 2257--NMuMz9nt05w80d4+-- 2258""" 2259 2260_bytes_sample_message = _sample_message.encode('ascii') 2261 2262_sample_headers = { 2263 "Return-Path":"<gkj@gregorykjohnson.com>", 2264 "X-Original-To":"gkj+person@localhost", 2265 "Delivered-To":"gkj+person@localhost", 2266 "Received":"""from localhost (localhost [127.0.0.1]) 2267 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2268 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2269 "Delivered-To":"gkj@sundance.gregorykjohnson.com", 2270 "Received":"""from localhost [127.0.0.1] 2271 by localhost with POP3 (fetchmail-6.2.5) 2272 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2273 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2274 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2275 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2276 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) 2277 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2278 "Date":"Wed, 13 Jul 2005 17:23:11 -0400", 2279 "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""", 2280 "To":"gkj@gregorykjohnson.com", 2281 "Subject":"Sample message", 2282 "Mime-Version":"1.0", 2283 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", 2284 "Content-Disposition":"inline", 2285 "User-Agent": "Mutt/1.5.9i" } 2286 2287_sample_payloads = ("""This is a sample message. 2288 2289-- 2290Gregory K. 
Johnson 2291""", 2292"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 22933FYlAAAA 2294""") 2295 2296 2297class MiscTestCase(unittest.TestCase): 2298 def test__all__(self): 2299 support.check__all__(self, mailbox, 2300 not_exported={"linesep", "fcntl"}) 2301 2302 2303def tearDownModule(): 2304 support.reap_children() 2305 2306 2307if __name__ == '__main__': 2308 unittest.main() 2309