1import os
2import sys
3import time
4import stat
5import socket
6import email
7import email.message
8import re
9import io
10import tempfile
11from test import support
12from test.support import os_helper
13import unittest
14import textwrap
15import mailbox
16import glob
17
18
19class TestBase:
20
21    all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage,
22                         mailbox.mboxMessage, mailbox.MHMessage,
23                         mailbox.BabylMessage, mailbox.MMDFMessage)
24
25    def _check_sample(self, msg):
26        # Inspect a mailbox.Message representation of the sample message
27        self.assertIsInstance(msg, email.message.Message)
28        self.assertIsInstance(msg, mailbox.Message)
29        for key, value in _sample_headers.items():
30            self.assertIn(value, msg.get_all(key))
31        self.assertTrue(msg.is_multipart())
32        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
33        for i, payload in enumerate(_sample_payloads):
34            part = msg.get_payload(i)
35            self.assertIsInstance(part, email.message.Message)
36            self.assertNotIsInstance(part, mailbox.Message)
37            self.assertEqual(part.get_payload(), payload)
38
39    def _delete_recursively(self, target):
40        # Delete a file or delete a directory recursively
41        if os.path.isdir(target):
42            os_helper.rmtree(target)
43        elif os.path.exists(target):
44            os_helper.unlink(target)
45
46
47class TestMailbox(TestBase):
48
49    maxDiff = None
50
51    _factory = None     # Overridden by subclasses to reuse tests
52    _template = 'From: foo\n\n%s\n'
53
54    def setUp(self):
55        self._path = os_helper.TESTFN
56        self._delete_recursively(self._path)
57        self._box = self._factory(self._path)
58
59    def tearDown(self):
60        self._box.close()
61        self._delete_recursively(self._path)
62
63    def test_add(self):
64        # Add copies of a sample message
65        keys = []
66        keys.append(self._box.add(self._template % 0))
67        self.assertEqual(len(self._box), 1)
68        keys.append(self._box.add(mailbox.Message(_sample_message)))
69        self.assertEqual(len(self._box), 2)
70        keys.append(self._box.add(email.message_from_string(_sample_message)))
71        self.assertEqual(len(self._box), 3)
72        keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
73        self.assertEqual(len(self._box), 4)
74        keys.append(self._box.add(_sample_message))
75        self.assertEqual(len(self._box), 5)
76        keys.append(self._box.add(_bytes_sample_message))
77        self.assertEqual(len(self._box), 6)
78        with self.assertWarns(DeprecationWarning):
79            keys.append(self._box.add(
80                io.TextIOWrapper(io.BytesIO(_bytes_sample_message), encoding="utf-8")))
81        self.assertEqual(len(self._box), 7)
82        self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
83        for i in (1, 2, 3, 4, 5, 6):
84            self._check_sample(self._box[keys[i]])
85
86    _nonascii_msg = textwrap.dedent("""\
87            From: foo
88            Subject: Falinaptár házhozszállítással. Már rendeltél?
89
90            0
91            """)
92
93    def test_add_invalid_8bit_bytes_header(self):
94        key = self._box.add(self._nonascii_msg.encode('latin-1'))
95        self.assertEqual(len(self._box), 1)
96        self.assertEqual(self._box.get_bytes(key),
97            self._nonascii_msg.encode('latin-1'))
98
99    def test_invalid_nonascii_header_as_string(self):
100        subj = self._nonascii_msg.splitlines()[1]
101        key = self._box.add(subj.encode('latin-1'))
102        self.assertEqual(self._box.get_string(key),
103            'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
104            'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
105
106    def test_add_nonascii_string_header_raises(self):
107        with self.assertRaisesRegex(ValueError, "ASCII-only"):
108            self._box.add(self._nonascii_msg)
109        self._box.flush()
110        self.assertEqual(len(self._box), 0)
111        self.assertMailboxEmpty()
112
113    def test_add_that_raises_leaves_mailbox_empty(self):
114        def raiser(*args, **kw):
115            raise Exception("a fake error")
116        support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
117        with self.assertRaises(Exception):
118            self._box.add(email.message_from_string("From: Alphöso"))
119        self.assertEqual(len(self._box), 0)
120        self._box.close()
121        self.assertMailboxEmpty()
122
123    _non_latin_bin_msg = textwrap.dedent("""\
124        From: foo@bar.com
125        To: báz
126        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
127        \tJean de Baddie
128        Mime-Version: 1.0
129        Content-Type: text/plain; charset="utf-8"
130        Content-Transfer-Encoding: 8bit
131
132        Да, они летят.
133        """).encode('utf-8')
134
135    def test_add_8bit_body(self):
136        key = self._box.add(self._non_latin_bin_msg)
137        self.assertEqual(self._box.get_bytes(key),
138                         self._non_latin_bin_msg)
139        with self._box.get_file(key) as f:
140            self.assertEqual(f.read(),
141                             self._non_latin_bin_msg.replace(b'\n',
142                                os.linesep.encode()))
143        self.assertEqual(self._box[key].get_payload(),
144                        "Да, они летят.\n")
145
146    def test_add_binary_file(self):
147        with tempfile.TemporaryFile('wb+') as f:
148            f.write(_bytes_sample_message)
149            f.seek(0)
150            key = self._box.add(f)
151        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
152            _bytes_sample_message.split(b'\n'))
153
154    def test_add_binary_nonascii_file(self):
155        with tempfile.TemporaryFile('wb+') as f:
156            f.write(self._non_latin_bin_msg)
157            f.seek(0)
158            key = self._box.add(f)
159        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
160            self._non_latin_bin_msg.split(b'\n'))
161
162    def test_add_text_file_warns(self):
163        with tempfile.TemporaryFile('w+', encoding='utf-8') as f:
164            f.write(_sample_message)
165            f.seek(0)
166            with self.assertWarns(DeprecationWarning):
167                key = self._box.add(f)
168        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
169            _bytes_sample_message.split(b'\n'))
170
171    def test_add_StringIO_warns(self):
172        with self.assertWarns(DeprecationWarning):
173            key = self._box.add(io.StringIO(self._template % "0"))
174        self.assertEqual(self._box.get_string(key), self._template % "0")
175
176    def test_add_nonascii_StringIO_raises(self):
177        with self.assertWarns(DeprecationWarning):
178            with self.assertRaisesRegex(ValueError, "ASCII-only"):
179                self._box.add(io.StringIO(self._nonascii_msg))
180        self.assertEqual(len(self._box), 0)
181        self._box.close()
182        self.assertMailboxEmpty()
183
184    def test_remove(self):
185        # Remove messages using remove()
186        self._test_remove_or_delitem(self._box.remove)
187
188    def test_delitem(self):
189        # Remove messages using __delitem__()
190        self._test_remove_or_delitem(self._box.__delitem__)
191
192    def _test_remove_or_delitem(self, method):
193        # (Used by test_remove() and test_delitem().)
194        key0 = self._box.add(self._template % 0)
195        key1 = self._box.add(self._template % 1)
196        self.assertEqual(len(self._box), 2)
197        method(key0)
198        self.assertEqual(len(self._box), 1)
199        self.assertRaises(KeyError, lambda: self._box[key0])
200        self.assertRaises(KeyError, lambda: method(key0))
201        self.assertEqual(self._box.get_string(key1), self._template % 1)
202        key2 = self._box.add(self._template % 2)
203        self.assertEqual(len(self._box), 2)
204        method(key2)
205        self.assertEqual(len(self._box), 1)
206        self.assertRaises(KeyError, lambda: self._box[key2])
207        self.assertRaises(KeyError, lambda: method(key2))
208        self.assertEqual(self._box.get_string(key1), self._template % 1)
209        method(key1)
210        self.assertEqual(len(self._box), 0)
211        self.assertRaises(KeyError, lambda: self._box[key1])
212        self.assertRaises(KeyError, lambda: method(key1))
213
214    def test_discard(self, repetitions=10):
215        # Discard messages
216        key0 = self._box.add(self._template % 0)
217        key1 = self._box.add(self._template % 1)
218        self.assertEqual(len(self._box), 2)
219        self._box.discard(key0)
220        self.assertEqual(len(self._box), 1)
221        self.assertRaises(KeyError, lambda: self._box[key0])
222        self._box.discard(key0)
223        self.assertEqual(len(self._box), 1)
224        self.assertRaises(KeyError, lambda: self._box[key0])
225
226    def test_get(self):
227        # Retrieve messages using get()
228        key0 = self._box.add(self._template % 0)
229        msg = self._box.get(key0)
230        self.assertEqual(msg['from'], 'foo')
231        self.assertEqual(msg.get_payload(), '0\n')
232        self.assertIsNone(self._box.get('foo'))
233        self.assertIs(self._box.get('foo', False), False)
234        self._box.close()
235        self._box = self._factory(self._path)
236        key1 = self._box.add(self._template % 1)
237        msg = self._box.get(key1)
238        self.assertEqual(msg['from'], 'foo')
239        self.assertEqual(msg.get_payload(), '1\n')
240
241    def test_getitem(self):
242        # Retrieve message using __getitem__()
243        key0 = self._box.add(self._template % 0)
244        msg = self._box[key0]
245        self.assertEqual(msg['from'], 'foo')
246        self.assertEqual(msg.get_payload(), '0\n')
247        self.assertRaises(KeyError, lambda: self._box['foo'])
248        self._box.discard(key0)
249        self.assertRaises(KeyError, lambda: self._box[key0])
250
251    def test_get_message(self):
252        # Get Message representations of messages
253        key0 = self._box.add(self._template % 0)
254        key1 = self._box.add(_sample_message)
255        msg0 = self._box.get_message(key0)
256        self.assertIsInstance(msg0, mailbox.Message)
257        self.assertEqual(msg0['from'], 'foo')
258        self.assertEqual(msg0.get_payload(), '0\n')
259        self._check_sample(self._box.get_message(key1))
260
261    def test_get_bytes(self):
262        # Get bytes representations of messages
263        key0 = self._box.add(self._template % 0)
264        key1 = self._box.add(_sample_message)
265        self.assertEqual(self._box.get_bytes(key0),
266            (self._template % 0).encode('ascii'))
267        self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
268
269    def test_get_string(self):
270        # Get string representations of messages
271        key0 = self._box.add(self._template % 0)
272        key1 = self._box.add(_sample_message)
273        self.assertEqual(self._box.get_string(key0), self._template % 0)
274        self.assertEqual(self._box.get_string(key1).split('\n'),
275                         _sample_message.split('\n'))
276
277    def test_get_file(self):
278        # Get file representations of messages
279        key0 = self._box.add(self._template % 0)
280        key1 = self._box.add(_sample_message)
281        with self._box.get_file(key0) as file:
282            data0 = file.read()
283        with self._box.get_file(key1) as file:
284            data1 = file.read()
285        self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
286                         self._template % 0)
287        self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
288                         _sample_message)
289
290    def test_get_file_can_be_closed_twice(self):
291        # Issue 11700
292        key = self._box.add(_sample_message)
293        f = self._box.get_file(key)
294        f.close()
295        f.close()
296
297    def test_iterkeys(self):
298        # Get keys using iterkeys()
299        self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
300
301    def test_keys(self):
302        # Get keys using keys()
303        self._check_iteration(self._box.keys, do_keys=True, do_values=False)
304
305    def test_itervalues(self):
306        # Get values using itervalues()
307        self._check_iteration(self._box.itervalues, do_keys=False,
308                              do_values=True)
309
310    def test_iter(self):
311        # Get values using __iter__()
312        self._check_iteration(self._box.__iter__, do_keys=False,
313                              do_values=True)
314
315    def test_values(self):
316        # Get values using values()
317        self._check_iteration(self._box.values, do_keys=False, do_values=True)
318
319    def test_iteritems(self):
320        # Get keys and values using iteritems()
321        self._check_iteration(self._box.iteritems, do_keys=True,
322                              do_values=True)
323
324    def test_items(self):
325        # Get keys and values using items()
326        self._check_iteration(self._box.items, do_keys=True, do_values=True)
327
328    def _check_iteration(self, method, do_keys, do_values, repetitions=10):
329        for value in method():
330            self.fail("Not empty")
331        keys, values = [], []
332        for i in range(repetitions):
333            keys.append(self._box.add(self._template % i))
334            values.append(self._template % i)
335        if do_keys and not do_values:
336            returned_keys = list(method())
337        elif do_values and not do_keys:
338            returned_values = list(method())
339        else:
340            returned_keys, returned_values = [], []
341            for key, value in method():
342                returned_keys.append(key)
343                returned_values.append(value)
344        if do_keys:
345            self.assertEqual(len(keys), len(returned_keys))
346            self.assertEqual(set(keys), set(returned_keys))
347        if do_values:
348            count = 0
349            for value in returned_values:
350                self.assertEqual(value['from'], 'foo')
351                self.assertLess(int(value.get_payload()), repetitions)
352                count += 1
353            self.assertEqual(len(values), count)
354
355    def test_contains(self):
356        # Check existence of keys using __contains__()
357        self.assertNotIn('foo', self._box)
358        key0 = self._box.add(self._template % 0)
359        self.assertIn(key0, self._box)
360        self.assertNotIn('foo', self._box)
361        key1 = self._box.add(self._template % 1)
362        self.assertIn(key1, self._box)
363        self.assertIn(key0, self._box)
364        self.assertNotIn('foo', self._box)
365        self._box.remove(key0)
366        self.assertNotIn(key0, self._box)
367        self.assertIn(key1, self._box)
368        self.assertNotIn('foo', self._box)
369        self._box.remove(key1)
370        self.assertNotIn(key1, self._box)
371        self.assertNotIn(key0, self._box)
372        self.assertNotIn('foo', self._box)
373
374    def test_len(self, repetitions=10):
375        # Get message count
376        keys = []
377        for i in range(repetitions):
378            self.assertEqual(len(self._box), i)
379            keys.append(self._box.add(self._template % i))
380            self.assertEqual(len(self._box), i + 1)
381        for i in range(repetitions):
382            self.assertEqual(len(self._box), repetitions - i)
383            self._box.remove(keys[i])
384            self.assertEqual(len(self._box), repetitions - i - 1)
385
386    def test_set_item(self):
387        # Modify messages using __setitem__()
388        key0 = self._box.add(self._template % 'original 0')
389        self.assertEqual(self._box.get_string(key0),
390                         self._template % 'original 0')
391        key1 = self._box.add(self._template % 'original 1')
392        self.assertEqual(self._box.get_string(key1),
393                         self._template % 'original 1')
394        self._box[key0] = self._template % 'changed 0'
395        self.assertEqual(self._box.get_string(key0),
396                         self._template % 'changed 0')
397        self._box[key1] = self._template % 'changed 1'
398        self.assertEqual(self._box.get_string(key1),
399                         self._template % 'changed 1')
400        self._box[key0] = _sample_message
401        self._check_sample(self._box[key0])
402        self._box[key1] = self._box[key0]
403        self._check_sample(self._box[key1])
404        self._box[key0] = self._template % 'original 0'
405        self.assertEqual(self._box.get_string(key0),
406                     self._template % 'original 0')
407        self._check_sample(self._box[key1])
408        self.assertRaises(KeyError,
409                          lambda: self._box.__setitem__('foo', 'bar'))
410        self.assertRaises(KeyError, lambda: self._box['foo'])
411        self.assertEqual(len(self._box), 2)
412
413    def test_clear(self, iterations=10):
414        # Remove all messages using clear()
415        keys = []
416        for i in range(iterations):
417            self._box.add(self._template % i)
418        for i, key in enumerate(keys):
419            self.assertEqual(self._box.get_string(key), self._template % i)
420        self._box.clear()
421        self.assertEqual(len(self._box), 0)
422        for i, key in enumerate(keys):
423            self.assertRaises(KeyError, lambda: self._box.get_string(key))
424
425    def test_pop(self):
426        # Get and remove a message using pop()
427        key0 = self._box.add(self._template % 0)
428        self.assertIn(key0, self._box)
429        key1 = self._box.add(self._template % 1)
430        self.assertIn(key1, self._box)
431        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
432        self.assertNotIn(key0, self._box)
433        self.assertIn(key1, self._box)
434        key2 = self._box.add(self._template % 2)
435        self.assertIn(key2, self._box)
436        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
437        self.assertNotIn(key2, self._box)
438        self.assertIn(key1, self._box)
439        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
440        self.assertNotIn(key1, self._box)
441        self.assertEqual(len(self._box), 0)
442
443    def test_popitem(self, iterations=10):
444        # Get and remove an arbitrary (key, message) using popitem()
445        keys = []
446        for i in range(10):
447            keys.append(self._box.add(self._template % i))
448        seen = []
449        for i in range(10):
450            key, msg = self._box.popitem()
451            self.assertIn(key, keys)
452            self.assertNotIn(key, seen)
453            seen.append(key)
454            self.assertEqual(int(msg.get_payload()), keys.index(key))
455        self.assertEqual(len(self._box), 0)
456        for key in keys:
457            self.assertRaises(KeyError, lambda: self._box[key])
458
459    def test_update(self):
460        # Modify multiple messages using update()
461        key0 = self._box.add(self._template % 'original 0')
462        key1 = self._box.add(self._template % 'original 1')
463        key2 = self._box.add(self._template % 'original 2')
464        self._box.update({key0: self._template % 'changed 0',
465                          key2: _sample_message})
466        self.assertEqual(len(self._box), 3)
467        self.assertEqual(self._box.get_string(key0),
468                     self._template % 'changed 0')
469        self.assertEqual(self._box.get_string(key1),
470                     self._template % 'original 1')
471        self._check_sample(self._box[key2])
472        self._box.update([(key2, self._template % 'changed 2'),
473                    (key1, self._template % 'changed 1'),
474                    (key0, self._template % 'original 0')])
475        self.assertEqual(len(self._box), 3)
476        self.assertEqual(self._box.get_string(key0),
477                     self._template % 'original 0')
478        self.assertEqual(self._box.get_string(key1),
479                     self._template % 'changed 1')
480        self.assertEqual(self._box.get_string(key2),
481                     self._template % 'changed 2')
482        self.assertRaises(KeyError,
483                          lambda: self._box.update({'foo': 'bar',
484                                          key0: self._template % "changed 0"}))
485        self.assertEqual(len(self._box), 3)
486        self.assertEqual(self._box.get_string(key0),
487                     self._template % "changed 0")
488        self.assertEqual(self._box.get_string(key1),
489                     self._template % "changed 1")
490        self.assertEqual(self._box.get_string(key2),
491                     self._template % "changed 2")
492
493    def test_flush(self):
494        # Write changes to disk
495        self._test_flush_or_close(self._box.flush, True)
496
497    def test_popitem_and_flush_twice(self):
498        # See #15036.
499        self._box.add(self._template % 0)
500        self._box.add(self._template % 1)
501        self._box.flush()
502
503        self._box.popitem()
504        self._box.flush()
505        self._box.popitem()
506        self._box.flush()
507
508    def test_lock_unlock(self):
509        # Lock and unlock the mailbox
510        self.assertFalse(os.path.exists(self._get_lock_path()))
511        self._box.lock()
512        self.assertTrue(os.path.exists(self._get_lock_path()))
513        self._box.unlock()
514        self.assertFalse(os.path.exists(self._get_lock_path()))
515
516    def test_close(self):
517        # Close mailbox and flush changes to disk
518        self._test_flush_or_close(self._box.close, False)
519
520    def _test_flush_or_close(self, method, should_call_close):
521        contents = [self._template % i for i in range(3)]
522        self._box.add(contents[0])
523        self._box.add(contents[1])
524        self._box.add(contents[2])
525        oldbox = self._box
526        method()
527        if should_call_close:
528            self._box.close()
529        self._box = self._factory(self._path)
530        keys = self._box.keys()
531        self.assertEqual(len(keys), 3)
532        for key in keys:
533            self.assertIn(self._box.get_string(key), contents)
534        oldbox.close()
535
536    def test_dump_message(self):
537        # Write message representations to disk
538        for input in (email.message_from_string(_sample_message),
539                      _sample_message, io.BytesIO(_bytes_sample_message)):
540            output = io.BytesIO()
541            self._box._dump_message(input, output)
542            self.assertEqual(output.getvalue(),
543                _bytes_sample_message.replace(b'\n', os.linesep.encode()))
544        output = io.BytesIO()
545        self.assertRaises(TypeError,
546                          lambda: self._box._dump_message(None, output))
547
548    def _get_lock_path(self):
549        # Return the path of the dot lock file. May be overridden.
550        return self._path + '.lock'
551
552
553class TestMailboxSuperclass(TestBase, unittest.TestCase):
554
555    def test_notimplemented(self):
556        # Test that all Mailbox methods raise NotImplementedException.
557        box = mailbox.Mailbox('path')
558        self.assertRaises(NotImplementedError, lambda: box.add(''))
559        self.assertRaises(NotImplementedError, lambda: box.remove(''))
560        self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
561        self.assertRaises(NotImplementedError, lambda: box.discard(''))
562        self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
563        self.assertRaises(NotImplementedError, lambda: box.iterkeys())
564        self.assertRaises(NotImplementedError, lambda: box.keys())
565        self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__())
566        self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__())
567        self.assertRaises(NotImplementedError, lambda: box.values())
568        self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__())
569        self.assertRaises(NotImplementedError, lambda: box.items())
570        self.assertRaises(NotImplementedError, lambda: box.get(''))
571        self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
572        self.assertRaises(NotImplementedError, lambda: box.get_message(''))
573        self.assertRaises(NotImplementedError, lambda: box.get_string(''))
574        self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
575        self.assertRaises(NotImplementedError, lambda: box.get_file(''))
576        self.assertRaises(NotImplementedError, lambda: '' in box)
577        self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
578        self.assertRaises(NotImplementedError, lambda: box.__len__())
579        self.assertRaises(NotImplementedError, lambda: box.clear())
580        self.assertRaises(NotImplementedError, lambda: box.pop(''))
581        self.assertRaises(NotImplementedError, lambda: box.popitem())
582        self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
583        self.assertRaises(NotImplementedError, lambda: box.flush())
584        self.assertRaises(NotImplementedError, lambda: box.lock())
585        self.assertRaises(NotImplementedError, lambda: box.unlock())
586        self.assertRaises(NotImplementedError, lambda: box.close())
587
588
589class TestMaildir(TestMailbox, unittest.TestCase):
590
591    _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
592
593    def setUp(self):
594        TestMailbox.setUp(self)
595        if (os.name == 'nt') or (sys.platform == 'cygwin'):
596            self._box.colon = '!'
597
598    def assertMailboxEmpty(self):
599        self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])
600
601    def test_add_MM(self):
602        # Add a MaildirMessage instance
603        msg = mailbox.MaildirMessage(self._template % 0)
604        msg.set_subdir('cur')
605        msg.set_info('foo')
606        key = self._box.add(msg)
607        self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
608                                                 (key, self._box.colon))))
609
610    def test_get_MM(self):
611        # Get a MaildirMessage instance
612        msg = mailbox.MaildirMessage(self._template % 0)
613        msg.set_subdir('cur')
614        msg.set_flags('RF')
615        key = self._box.add(msg)
616        msg_returned = self._box.get_message(key)
617        self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
618        self.assertEqual(msg_returned.get_subdir(), 'cur')
619        self.assertEqual(msg_returned.get_flags(), 'FR')
620
621    def test_set_MM(self):
622        # Set with a MaildirMessage instance
623        msg0 = mailbox.MaildirMessage(self._template % 0)
624        msg0.set_flags('TP')
625        key = self._box.add(msg0)
626        msg_returned = self._box.get_message(key)
627        self.assertEqual(msg_returned.get_subdir(), 'new')
628        self.assertEqual(msg_returned.get_flags(), 'PT')
629        msg1 = mailbox.MaildirMessage(self._template % 1)
630        self._box[key] = msg1
631        msg_returned = self._box.get_message(key)
632        self.assertEqual(msg_returned.get_subdir(), 'new')
633        self.assertEqual(msg_returned.get_flags(), '')
634        self.assertEqual(msg_returned.get_payload(), '1\n')
635        msg2 = mailbox.MaildirMessage(self._template % 2)
636        msg2.set_info('2,S')
637        self._box[key] = msg2
638        self._box[key] = self._template % 3
639        msg_returned = self._box.get_message(key)
640        self.assertEqual(msg_returned.get_subdir(), 'new')
641        self.assertEqual(msg_returned.get_flags(), 'S')
642        self.assertEqual(msg_returned.get_payload(), '3\n')
643
644    def test_consistent_factory(self):
645        # Add a message.
646        msg = mailbox.MaildirMessage(self._template % 0)
647        msg.set_subdir('cur')
648        msg.set_flags('RF')
649        key = self._box.add(msg)
650
651        # Create new mailbox with
652        class FakeMessage(mailbox.MaildirMessage):
653            pass
654        box = mailbox.Maildir(self._path, factory=FakeMessage)
655        box.colon = self._box.colon
656        msg2 = box.get_message(key)
657        self.assertIsInstance(msg2, FakeMessage)
658
659    def test_initialize_new(self):
660        # Initialize a non-existent mailbox
661        self.tearDown()
662        self._box = mailbox.Maildir(self._path)
663        self._check_basics()
664        self._delete_recursively(self._path)
665        self._box = self._factory(self._path, factory=None)
666        self._check_basics()
667
668    def test_initialize_existing(self):
669        # Initialize an existing mailbox
670        self.tearDown()
671        for subdir in '', 'tmp', 'new', 'cur':
672            os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
673        self._box = mailbox.Maildir(self._path)
674        self._check_basics()
675
676    def _check_basics(self, factory=None):
677        # (Used by test_open_new() and test_open_existing().)
678        self.assertEqual(self._box._path, os.path.abspath(self._path))
679        self.assertEqual(self._box._factory, factory)
680        for subdir in '', 'tmp', 'new', 'cur':
681            path = os.path.join(self._path, subdir)
682            mode = os.stat(path)[stat.ST_MODE]
683            self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
684
685    def test_list_folders(self):
686        # List folders
687        self._box.add_folder('one')
688        self._box.add_folder('two')
689        self._box.add_folder('three')
690        self.assertEqual(len(self._box.list_folders()), 3)
691        self.assertEqual(set(self._box.list_folders()),
692                     set(('one', 'two', 'three')))
693
694    def test_get_folder(self):
695        # Open folders
696        self._box.add_folder('foo.bar')
697        folder0 = self._box.get_folder('foo.bar')
698        folder0.add(self._template % 'bar')
699        self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
700        folder1 = self._box.get_folder('foo.bar')
701        self.assertEqual(folder1.get_string(folder1.keys()[0]),
702                         self._template % 'bar')
703
704    def test_add_and_remove_folders(self):
705        # Delete folders
706        self._box.add_folder('one')
707        self._box.add_folder('two')
708        self.assertEqual(len(self._box.list_folders()), 2)
709        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
710        self._box.remove_folder('one')
711        self.assertEqual(len(self._box.list_folders()), 1)
712        self.assertEqual(set(self._box.list_folders()), set(('two',)))
713        self._box.add_folder('three')
714        self.assertEqual(len(self._box.list_folders()), 2)
715        self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
716        self._box.remove_folder('three')
717        self.assertEqual(len(self._box.list_folders()), 1)
718        self.assertEqual(set(self._box.list_folders()), set(('two',)))
719        self._box.remove_folder('two')
720        self.assertEqual(len(self._box.list_folders()), 0)
721        self.assertEqual(self._box.list_folders(), [])
722
723    def test_clean(self):
724        # Remove old files from 'tmp'
725        foo_path = os.path.join(self._path, 'tmp', 'foo')
726        bar_path = os.path.join(self._path, 'tmp', 'bar')
727        with open(foo_path, 'w', encoding='utf-8') as f:
728            f.write("@")
729        with open(bar_path, 'w', encoding='utf-8') as f:
730            f.write("@")
731        self._box.clean()
732        self.assertTrue(os.path.exists(foo_path))
733        self.assertTrue(os.path.exists(bar_path))
734        foo_stat = os.stat(foo_path)
735        os.utime(foo_path, (time.time() - 129600 - 2,
736                            foo_stat.st_mtime))
737        self._box.clean()
738        self.assertFalse(os.path.exists(foo_path))
739        self.assertTrue(os.path.exists(bar_path))
740
741    def test_create_tmp(self, repetitions=10):
742        # Create files in tmp directory
743        hostname = socket.gethostname()
744        if '/' in hostname:
745            hostname = hostname.replace('/', r'\057')
746        if ':' in hostname:
747            hostname = hostname.replace(':', r'\072')
748        pid = os.getpid()
749        pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
750                             r"Q(?P<Q>\d+)\.(?P<host>[^:/]*)")
751        previous_groups = None
752        for x in range(repetitions):
753            tmp_file = self._box._create_tmp()
754            head, tail = os.path.split(tmp_file.name)
755            self.assertEqual(head, os.path.abspath(os.path.join(self._path,
756                                                                "tmp")),
757                             "File in wrong location: '%s'" % head)
758            match = pattern.match(tail)
759            self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
760            groups = match.groups()
761            if previous_groups is not None:
762                self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
763                             "Non-monotonic seconds: '%s' before '%s'" %
764                             (previous_groups[0], groups[0]))
765                if int(groups[0]) == int(previous_groups[0]):
766                    self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
767                                "Non-monotonic milliseconds: '%s' before '%s'" %
768                                (previous_groups[1], groups[1]))
769                self.assertEqual(int(groups[2]), pid,
770                             "Process ID mismatch: '%s' should be '%s'" %
771                             (groups[2], pid))
772                self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1,
773                             "Non-sequential counter: '%s' before '%s'" %
774                             (previous_groups[3], groups[3]))
775                self.assertEqual(groups[4], hostname,
776                             "Host name mismatch: '%s' should be '%s'" %
777                             (groups[4], hostname))
778            previous_groups = groups
779            tmp_file.write(_bytes_sample_message)
780            tmp_file.seek(0)
781            self.assertEqual(tmp_file.read(), _bytes_sample_message)
782            tmp_file.close()
783        file_count = len(os.listdir(os.path.join(self._path, "tmp")))
784        self.assertEqual(file_count, repetitions,
785                     "Wrong file count: '%s' should be '%s'" %
786                     (file_count, repetitions))
787
788    def test_refresh(self):
789        # Update the table of contents
790        self.assertEqual(self._box._toc, {})
791        key0 = self._box.add(self._template % 0)
792        key1 = self._box.add(self._template % 1)
793        self.assertEqual(self._box._toc, {})
794        self._box._refresh()
795        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
796                                          key1: os.path.join('new', key1)})
797        key2 = self._box.add(self._template % 2)
798        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
799                                          key1: os.path.join('new', key1)})
800        self._box._refresh()
801        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
802                                          key1: os.path.join('new', key1),
803                                          key2: os.path.join('new', key2)})
804
805    def test_refresh_after_safety_period(self):
806        # Issue #13254: Call _refresh after the "file system safety
807        # period" of 2 seconds has passed; _toc should still be
808        # updated because this is the first call to _refresh.
809        key0 = self._box.add(self._template % 0)
810        key1 = self._box.add(self._template % 1)
811
812        self._box = self._factory(self._path)
813        self.assertEqual(self._box._toc, {})
814
815        # Emulate sleeping. Instead of sleeping for 2 seconds, use the
816        # skew factor to make _refresh think that the filesystem
817        # safety period has passed and re-reading the _toc is only
818        # required if mtimes differ.
819        self._box._skewfactor = -3
820
821        self._box._refresh()
822        self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))
823
824    def test_lookup(self):
825        # Look up message subpaths in the TOC
826        self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
827        key0 = self._box.add(self._template % 0)
828        self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
829        os.remove(os.path.join(self._path, 'new', key0))
830        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
831        # Be sure that the TOC is read back from disk (see issue #6896
832        # about bad mtime behaviour on some systems).
833        self._box.flush()
834        self.assertRaises(KeyError, lambda: self._box._lookup(key0))
835        self.assertEqual(self._box._toc, {})
836
837    def test_lock_unlock(self):
838        # Lock and unlock the mailbox. For Maildir, this does nothing.
839        self._box.lock()
840        self._box.unlock()
841
842    def test_folder (self):
843        # Test for bug #1569790: verify that folders returned by .get_folder()
844        # use the same factory function.
845        def dummy_factory (s):
846            return None
847        box = self._factory(self._path, factory=dummy_factory)
848        folder = box.add_folder('folder1')
849        self.assertIs(folder._factory, dummy_factory)
850
851        folder1_alias = box.get_folder('folder1')
852        self.assertIs(folder1_alias._factory, dummy_factory)
853
854    def test_directory_in_folder (self):
855        # Test that mailboxes still work if there's a stray extra directory
856        # in a folder.
857        for i in range(10):
858            self._box.add(mailbox.Message(_sample_message))
859
860        # Create a stray directory
861        os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
862
863        # Check that looping still works with the directory present.
864        for msg in self._box:
865            pass
866
867    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
868    def test_file_permissions(self):
869        # Verify that message files are created without execute permissions
870        msg = mailbox.MaildirMessage(self._template % 0)
871        orig_umask = os.umask(0)
872        try:
873            key = self._box.add(msg)
874        finally:
875            os.umask(orig_umask)
876        path = os.path.join(self._path, self._box._lookup(key))
877        mode = os.stat(path).st_mode
878        self.assertFalse(mode & 0o111)
879
880    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
881    def test_folder_file_perms(self):
882        # From bug #3228, we want to verify that the file created inside a Maildir
883        # subfolder isn't marked as executable.
884        orig_umask = os.umask(0)
885        try:
886            subfolder = self._box.add_folder('subfolder')
887        finally:
888            os.umask(orig_umask)
889
890        path = os.path.join(subfolder._path, 'maildirfolder')
891        st = os.stat(path)
892        perms = st.st_mode
893        self.assertFalse((perms & 0o111)) # Execute bits should all be off.
894
895    def test_reread(self):
896        # Do an initial unconditional refresh
897        self._box._refresh()
898
899        # Put the last modified times more than two seconds into the past
900        # (because mtime may have a two second granularity)
901        for subdir in ('cur', 'new'):
902            os.utime(os.path.join(self._box._path, subdir),
903                     (time.time()-5,)*2)
904
905        # Because mtime has a two second granularity in worst case (FAT), a
906        # refresh is done unconditionally if called for within
907        # two-second-plus-a-bit of the last one, just in case the mbox has
908        # changed; so now we have to wait for that interval to expire.
909        #
910        # Because this is a test, emulate sleeping. Instead of
911        # sleeping for 2 seconds, use the skew factor to make _refresh
912        # think that 2 seconds have passed and re-reading the _toc is
913        # only required if mtimes differ.
914        self._box._skewfactor = -3
915
916        # Re-reading causes the ._toc attribute to be assigned a new dictionary
917        # object, so we'll check that the ._toc attribute isn't a different
918        # object.
919        orig_toc = self._box._toc
920        def refreshed():
921            return self._box._toc is not orig_toc
922
923        self._box._refresh()
924        self.assertFalse(refreshed())
925
926        # Now, write something into cur and remove it.  This changes
927        # the mtime and should cause a re-read. Note that "sleep
928        # emulation" is still in effect, as skewfactor is -3.
929        filename = os.path.join(self._path, 'cur', 'stray-file')
930        os_helper.create_empty_file(filename)
931        os.unlink(filename)
932        self._box._refresh()
933        self.assertTrue(refreshed())
934
935
936class _TestSingleFile(TestMailbox):
937    '''Common tests for single-file mailboxes'''
938
939    def test_add_doesnt_rewrite(self):
940        # When only adding messages, flush() should not rewrite the
941        # mailbox file. See issue #9559.
942
943        # Inode number changes if the contents are written to another
944        # file which is then renamed over the original file. So we
945        # must check that the inode number doesn't change.
946        inode_before = os.stat(self._path).st_ino
947
948        self._box.add(self._template % 0)
949        self._box.flush()
950
951        inode_after = os.stat(self._path).st_ino
952        self.assertEqual(inode_before, inode_after)
953
954        # Make sure the message was really added
955        self._box.close()
956        self._box = self._factory(self._path)
957        self.assertEqual(len(self._box), 1)
958
959    def test_permissions_after_flush(self):
960        # See issue #5346
961
962        # Make the mailbox world writable. It's unlikely that the new
963        # mailbox file would have these permissions after flush(),
964        # because umask usually prevents it.
965        mode = os.stat(self._path).st_mode | 0o666
966        os.chmod(self._path, mode)
967
968        self._box.add(self._template % 0)
969        i = self._box.add(self._template % 1)
970        # Need to remove one message to make flush() create a new file
971        self._box.remove(i)
972        self._box.flush()
973
974        self.assertEqual(os.stat(self._path).st_mode, mode)
975
976
977class _TestMboxMMDF(_TestSingleFile):
978
979    def tearDown(self):
980        super().tearDown()
981        self._box.close()
982        self._delete_recursively(self._path)
983        for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
984            os_helper.unlink(lock_remnant)
985
986    def assertMailboxEmpty(self):
987        with open(self._path, 'rb') as f:
988            self.assertEqual(f.readlines(), [])
989
990    def test_get_bytes_from(self):
991        # Get bytes representations of messages with _unixfrom.
992        unixfrom = 'From foo@bar blah\n'
993        key0 = self._box.add(unixfrom + self._template % 0)
994        key1 = self._box.add(unixfrom + _sample_message)
995        self.assertEqual(self._box.get_bytes(key0, from_=False),
996            (self._template % 0).encode('ascii'))
997        self.assertEqual(self._box.get_bytes(key1, from_=False),
998            _bytes_sample_message)
999        self.assertEqual(self._box.get_bytes(key0, from_=True),
1000            (unixfrom + self._template % 0).encode('ascii'))
1001        self.assertEqual(self._box.get_bytes(key1, from_=True),
1002            unixfrom.encode('ascii') + _bytes_sample_message)
1003
1004    def test_get_string_from(self):
1005        # Get string representations of messages with _unixfrom.
1006        unixfrom = 'From foo@bar blah\n'
1007        key0 = self._box.add(unixfrom + self._template % 0)
1008        key1 = self._box.add(unixfrom + _sample_message)
1009        self.assertEqual(self._box.get_string(key0, from_=False),
1010                         self._template % 0)
1011        self.assertEqual(self._box.get_string(key1, from_=False).split('\n'),
1012                         _sample_message.split('\n'))
1013        self.assertEqual(self._box.get_string(key0, from_=True),
1014                         unixfrom + self._template % 0)
1015        self.assertEqual(self._box.get_string(key1, from_=True).split('\n'),
1016                         (unixfrom + _sample_message).split('\n'))
1017
1018    def test_add_from_string(self):
1019        # Add a string starting with 'From ' to the mailbox
1020        key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
1021        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
1022        self.assertEqual(self._box[key].get_payload(), '0\n')
1023
1024    def test_add_from_bytes(self):
1025        # Add a byte string starting with 'From ' to the mailbox
1026        key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n')
1027        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
1028        self.assertEqual(self._box[key].get_payload(), '0\n')
1029
1030    def test_add_mbox_or_mmdf_message(self):
1031        # Add an mboxMessage or MMDFMessage
1032        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1033            msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
1034            key = self._box.add(msg)
1035
1036    def test_open_close_open(self):
1037        # Open and inspect previously-created mailbox
1038        values = [self._template % i for i in range(3)]
1039        for value in values:
1040            self._box.add(value)
1041        self._box.close()
1042        mtime = os.path.getmtime(self._path)
1043        self._box = self._factory(self._path)
1044        self.assertEqual(len(self._box), 3)
1045        for key in self._box.iterkeys():
1046            self.assertIn(self._box.get_string(key), values)
1047        self._box.close()
1048        self.assertEqual(mtime, os.path.getmtime(self._path))
1049
1050    def test_add_and_close(self):
1051        # Verifying that closing a mailbox doesn't change added items
1052        self._box.add(_sample_message)
1053        for i in range(3):
1054            self._box.add(self._template % i)
1055        self._box.add(_sample_message)
1056        self._box._file.flush()
1057        self._box._file.seek(0)
1058        contents = self._box._file.read()
1059        self._box.close()
1060        with open(self._path, 'rb') as f:
1061            self.assertEqual(contents, f.read())
1062        self._box = self._factory(self._path)
1063
1064    @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().")
1065    @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
1066    def test_lock_conflict(self):
1067        # Fork off a child process that will lock the mailbox temporarily,
1068        # unlock it and exit.
1069        c, p = socket.socketpair()
1070        self.addCleanup(c.close)
1071        self.addCleanup(p.close)
1072
1073        pid = os.fork()
1074        if pid == 0:
1075            # child
1076            try:
1077                # lock the mailbox, and signal the parent it can proceed
1078                self._box.lock()
1079                c.send(b'c')
1080
1081                # wait until the parent is done, and unlock the mailbox
1082                c.recv(1)
1083                self._box.unlock()
1084            finally:
1085                os._exit(0)
1086
1087        # In the parent, wait until the child signals it locked the mailbox.
1088        p.recv(1)
1089        try:
1090            self.assertRaises(mailbox.ExternalClashError,
1091                              self._box.lock)
1092        finally:
1093            # Signal the child it can now release the lock and exit.
1094            p.send(b'p')
1095            # Wait for child to exit.  Locking should now succeed.
1096            support.wait_process(pid, exitcode=0)
1097
1098        self._box.lock()
1099        self._box.unlock()
1100
1101    def test_relock(self):
1102        # Test case for bug #1575506: the mailbox class was locking the
1103        # wrong file object in its flush() method.
1104        msg = "Subject: sub\n\nbody\n"
1105        key1 = self._box.add(msg)
1106        self._box.flush()
1107        self._box.close()
1108
1109        self._box = self._factory(self._path)
1110        self._box.lock()
1111        key2 = self._box.add(msg)
1112        self._box.flush()
1113        self.assertTrue(self._box._locked)
1114        self._box.close()
1115
1116
1117class TestMbox(_TestMboxMMDF, unittest.TestCase):
1118
1119    _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
1120
1121    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
1122    def test_file_perms(self):
1123        # From bug #3228, we want to verify that the mailbox file isn't executable,
1124        # even if the umask is set to something that would leave executable bits set.
1125        # We only run this test on platforms that support umask.
1126        try:
1127            old_umask = os.umask(0o077)
1128            self._box.close()
1129            os.unlink(self._path)
1130            self._box = mailbox.mbox(self._path, create=True)
1131            self._box.add('')
1132            self._box.close()
1133        finally:
1134            os.umask(old_umask)
1135
1136        st = os.stat(self._path)
1137        perms = st.st_mode
1138        self.assertFalse((perms & 0o111)) # Execute bits should all be off.
1139
1140    def test_terminating_newline(self):
1141        message = email.message.Message()
1142        message['From'] = 'john@example.com'
1143        message.set_payload('No newline at the end')
1144        i = self._box.add(message)
1145
1146        # A newline should have been appended to the payload
1147        message = self._box.get(i)
1148        self.assertEqual(message.get_payload(), 'No newline at the end\n')
1149
1150    def test_message_separator(self):
1151        # Check there's always a single blank line after each message
1152        self._box.add('From: foo\n\n0')  # No newline at the end
1153        with open(self._path, encoding='utf-8') as f:
1154            data = f.read()
1155            self.assertEqual(data[-3:], '0\n\n')
1156
1157        self._box.add('From: foo\n\n0\n')  # Newline at the end
1158        with open(self._path, encoding='utf-8') as f:
1159            data = f.read()
1160            self.assertEqual(data[-3:], '0\n\n')
1161
1162
1163class TestMMDF(_TestMboxMMDF, unittest.TestCase):
1164
1165    _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
1166
1167
1168class TestMH(TestMailbox, unittest.TestCase):
1169
1170    _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
1171
1172    def assertMailboxEmpty(self):
1173        self.assertEqual(os.listdir(self._path), ['.mh_sequences'])
1174
1175    def test_list_folders(self):
1176        # List folders
1177        self._box.add_folder('one')
1178        self._box.add_folder('two')
1179        self._box.add_folder('three')
1180        self.assertEqual(len(self._box.list_folders()), 3)
1181        self.assertEqual(set(self._box.list_folders()),
1182                     set(('one', 'two', 'three')))
1183
1184    def test_get_folder(self):
1185        # Open folders
1186        def dummy_factory (s):
1187            return None
1188        self._box = self._factory(self._path, dummy_factory)
1189
1190        new_folder = self._box.add_folder('foo.bar')
1191        folder0 = self._box.get_folder('foo.bar')
1192        folder0.add(self._template % 'bar')
1193        self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
1194        folder1 = self._box.get_folder('foo.bar')
1195        self.assertEqual(folder1.get_string(folder1.keys()[0]),
1196                         self._template % 'bar')
1197
1198        # Test for bug #1569790: verify that folders returned by .get_folder()
1199        # use the same factory function.
1200        self.assertIs(new_folder._factory, self._box._factory)
1201        self.assertIs(folder0._factory, self._box._factory)
1202
1203    def test_add_and_remove_folders(self):
1204        # Delete folders
1205        self._box.add_folder('one')
1206        self._box.add_folder('two')
1207        self.assertEqual(len(self._box.list_folders()), 2)
1208        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
1209        self._box.remove_folder('one')
1210        self.assertEqual(len(self._box.list_folders()), 1)
1211        self.assertEqual(set(self._box.list_folders()), set(('two',)))
1212        self._box.add_folder('three')
1213        self.assertEqual(len(self._box.list_folders()), 2)
1214        self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
1215        self._box.remove_folder('three')
1216        self.assertEqual(len(self._box.list_folders()), 1)
1217        self.assertEqual(set(self._box.list_folders()), set(('two',)))
1218        self._box.remove_folder('two')
1219        self.assertEqual(len(self._box.list_folders()), 0)
1220        self.assertEqual(self._box.list_folders(), [])
1221
1222    def test_sequences(self):
1223        # Get and set sequences
1224        self.assertEqual(self._box.get_sequences(), {})
1225        msg0 = mailbox.MHMessage(self._template % 0)
1226        msg0.add_sequence('foo')
1227        key0 = self._box.add(msg0)
1228        self.assertEqual(self._box.get_sequences(), {'foo':[key0]})
1229        msg1 = mailbox.MHMessage(self._template % 1)
1230        msg1.set_sequences(['bar', 'replied', 'foo'])
1231        key1 = self._box.add(msg1)
1232        self.assertEqual(self._box.get_sequences(),
1233                     {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
1234        msg0.set_sequences(['flagged'])
1235        self._box[key0] = msg0
1236        self.assertEqual(self._box.get_sequences(),
1237                     {'foo':[key1], 'bar':[key1], 'replied':[key1],
1238                      'flagged':[key0]})
1239        self._box.remove(key1)
1240        self.assertEqual(self._box.get_sequences(), {'flagged':[key0]})
1241
1242    def test_issue2625(self):
1243        msg0 = mailbox.MHMessage(self._template % 0)
1244        msg0.add_sequence('foo')
1245        key0 = self._box.add(msg0)
1246        refmsg0 = self._box.get_message(key0)
1247
1248    def test_issue7627(self):
1249        msg0 = mailbox.MHMessage(self._template % 0)
1250        key0 = self._box.add(msg0)
1251        self._box.lock()
1252        self._box.remove(key0)
1253        self._box.unlock()
1254
1255    def test_pack(self):
1256        # Pack the contents of the mailbox
1257        msg0 = mailbox.MHMessage(self._template % 0)
1258        msg1 = mailbox.MHMessage(self._template % 1)
1259        msg2 = mailbox.MHMessage(self._template % 2)
1260        msg3 = mailbox.MHMessage(self._template % 3)
1261        msg0.set_sequences(['foo', 'unseen'])
1262        msg1.set_sequences(['foo'])
1263        msg2.set_sequences(['foo', 'flagged'])
1264        msg3.set_sequences(['foo', 'bar', 'replied'])
1265        key0 = self._box.add(msg0)
1266        key1 = self._box.add(msg1)
1267        key2 = self._box.add(msg2)
1268        key3 = self._box.add(msg3)
1269        self.assertEqual(self._box.get_sequences(),
1270                     {'foo':[key0,key1,key2,key3], 'unseen':[key0],
1271                      'flagged':[key2], 'bar':[key3], 'replied':[key3]})
1272        self._box.remove(key2)
1273        self.assertEqual(self._box.get_sequences(),
1274                     {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
1275                      'replied':[key3]})
1276        self._box.pack()
1277        self.assertEqual(self._box.keys(), [1, 2, 3])
1278        key0 = key0
1279        key1 = key0 + 1
1280        key2 = key1 + 1
1281        self.assertEqual(self._box.get_sequences(),
1282                     {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
1283
1284        # Test case for packing while holding the mailbox locked.
1285        key0 = self._box.add(msg1)
1286        key1 = self._box.add(msg1)
1287        key2 = self._box.add(msg1)
1288        key3 = self._box.add(msg1)
1289
1290        self._box.remove(key0)
1291        self._box.remove(key2)
1292        self._box.lock()
1293        self._box.pack()
1294        self._box.unlock()
1295        self.assertEqual(self._box.get_sequences(),
1296                     {'foo':[1, 2, 3, 4, 5],
1297                      'unseen':[1], 'bar':[3], 'replied':[3]})
1298
1299    def _get_lock_path(self):
1300        return os.path.join(self._path, '.mh_sequences.lock')
1301
1302
1303class TestBabyl(_TestSingleFile, unittest.TestCase):
1304
1305    _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
1306
1307    def assertMailboxEmpty(self):
1308        with open(self._path, 'rb') as f:
1309            self.assertEqual(f.readlines(), [])
1310
1311    def tearDown(self):
1312        super().tearDown()
1313        self._box.close()
1314        self._delete_recursively(self._path)
1315        for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
1316            os_helper.unlink(lock_remnant)
1317
1318    def test_labels(self):
1319        # Get labels from the mailbox
1320        self.assertEqual(self._box.get_labels(), [])
1321        msg0 = mailbox.BabylMessage(self._template % 0)
1322        msg0.add_label('foo')
1323        key0 = self._box.add(msg0)
1324        self.assertEqual(self._box.get_labels(), ['foo'])
1325        msg1 = mailbox.BabylMessage(self._template % 1)
1326        msg1.set_labels(['bar', 'answered', 'foo'])
1327        key1 = self._box.add(msg1)
1328        self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))
1329        msg0.set_labels(['blah', 'filed'])
1330        self._box[key0] = msg0
1331        self.assertEqual(set(self._box.get_labels()),
1332                     set(['foo', 'bar', 'blah']))
1333        self._box.remove(key1)
1334        self.assertEqual(set(self._box.get_labels()), set(['blah']))
1335
1336
1337class FakeFileLikeObject:
1338
1339    def __init__(self):
1340        self.closed = False
1341
1342    def close(self):
1343        self.closed = True
1344
1345
1346class FakeMailBox(mailbox.Mailbox):
1347
1348    def __init__(self):
1349        mailbox.Mailbox.__init__(self, '', lambda file: None)
1350        self.files = [FakeFileLikeObject() for i in range(10)]
1351
1352    def get_file(self, key):
1353        return self.files[key]
1354
1355
1356class TestFakeMailBox(unittest.TestCase):
1357
1358    def test_closing_fd(self):
1359        box = FakeMailBox()
1360        for i in range(10):
1361            self.assertFalse(box.files[i].closed)
1362        for i in range(10):
1363            box[i]
1364        for i in range(10):
1365            self.assertTrue(box.files[i].closed)
1366
1367
1368class TestMessage(TestBase, unittest.TestCase):
1369
1370    _factory = mailbox.Message      # Overridden by subclasses to reuse tests
1371
1372    def setUp(self):
1373        self._path = os_helper.TESTFN
1374
1375    def tearDown(self):
1376        self._delete_recursively(self._path)
1377
1378    def test_initialize_with_eMM(self):
1379        # Initialize based on email.message.Message instance
1380        eMM = email.message_from_string(_sample_message)
1381        msg = self._factory(eMM)
1382        self._post_initialize_hook(msg)
1383        self._check_sample(msg)
1384
1385    def test_initialize_with_string(self):
1386        # Initialize based on string
1387        msg = self._factory(_sample_message)
1388        self._post_initialize_hook(msg)
1389        self._check_sample(msg)
1390
1391    def test_initialize_with_file(self):
1392        # Initialize based on contents of file
1393        with open(self._path, 'w+', encoding='utf-8') as f:
1394            f.write(_sample_message)
1395            f.seek(0)
1396            msg = self._factory(f)
1397            self._post_initialize_hook(msg)
1398            self._check_sample(msg)
1399
1400    def test_initialize_with_binary_file(self):
1401        # Initialize based on contents of binary file
1402        with open(self._path, 'wb+') as f:
1403            f.write(_bytes_sample_message)
1404            f.seek(0)
1405            msg = self._factory(f)
1406            self._post_initialize_hook(msg)
1407            self._check_sample(msg)
1408
1409    def test_initialize_with_nothing(self):
1410        # Initialize without arguments
1411        msg = self._factory()
1412        self._post_initialize_hook(msg)
1413        self.assertIsInstance(msg, email.message.Message)
1414        self.assertIsInstance(msg, mailbox.Message)
1415        self.assertIsInstance(msg, self._factory)
1416        self.assertEqual(msg.keys(), [])
1417        self.assertFalse(msg.is_multipart())
1418        self.assertIsNone(msg.get_payload())
1419
1420    def test_initialize_incorrectly(self):
1421        # Initialize with invalid argument
1422        self.assertRaises(TypeError, lambda: self._factory(object()))
1423
1424    def test_all_eMM_attributes_exist(self):
1425        # Issue 12537
1426        eMM = email.message_from_string(_sample_message)
1427        msg = self._factory(_sample_message)
1428        for attr in eMM.__dict__:
1429            self.assertIn(attr, msg.__dict__,
1430                '{} attribute does not exist'.format(attr))
1431
1432    def test_become_message(self):
1433        # Take on the state of another message
1434        eMM = email.message_from_string(_sample_message)
1435        msg = self._factory()
1436        msg._become_message(eMM)
1437        self._check_sample(msg)
1438
1439    def test_explain_to(self):
1440        # Copy self's format-specific data to other message formats.
1441        # This test is superficial; better ones are in TestMessageConversion.
1442        msg = self._factory()
1443        for class_ in self.all_mailbox_types:
1444            other_msg = class_()
1445            msg._explain_to(other_msg)
1446        other_msg = email.message.Message()
1447        self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
1448
1449    def _post_initialize_hook(self, msg):
1450        # Overridden by subclasses to check extra things after initialization
1451        pass
1452
1453
1454class TestMaildirMessage(TestMessage, unittest.TestCase):
1455
1456    _factory = mailbox.MaildirMessage
1457
1458    def _post_initialize_hook(self, msg):
1459        self.assertEqual(msg._subdir, 'new')
1460        self.assertEqual(msg._info, '')
1461
1462    def test_subdir(self):
1463        # Use get_subdir() and set_subdir()
1464        msg = mailbox.MaildirMessage(_sample_message)
1465        self.assertEqual(msg.get_subdir(), 'new')
1466        msg.set_subdir('cur')
1467        self.assertEqual(msg.get_subdir(), 'cur')
1468        msg.set_subdir('new')
1469        self.assertEqual(msg.get_subdir(), 'new')
1470        self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
1471        self.assertEqual(msg.get_subdir(), 'new')
1472        msg.set_subdir('new')
1473        self.assertEqual(msg.get_subdir(), 'new')
1474        self._check_sample(msg)
1475
1476    def test_flags(self):
1477        # Use get_flags(), set_flags(), add_flag(), remove_flag()
1478        msg = mailbox.MaildirMessage(_sample_message)
1479        self.assertEqual(msg.get_flags(), '')
1480        self.assertEqual(msg.get_subdir(), 'new')
1481        msg.set_flags('F')
1482        self.assertEqual(msg.get_subdir(), 'new')
1483        self.assertEqual(msg.get_flags(), 'F')
1484        msg.set_flags('SDTP')
1485        self.assertEqual(msg.get_flags(), 'DPST')
1486        msg.add_flag('FT')
1487        self.assertEqual(msg.get_flags(), 'DFPST')
1488        msg.remove_flag('TDRP')
1489        self.assertEqual(msg.get_flags(), 'FS')
1490        self.assertEqual(msg.get_subdir(), 'new')
1491        self._check_sample(msg)
1492
1493    def test_date(self):
1494        # Use get_date() and set_date()
1495        msg = mailbox.MaildirMessage(_sample_message)
1496        self.assertLess(abs(msg.get_date() - time.time()), 60)
1497        msg.set_date(0.0)
1498        self.assertEqual(msg.get_date(), 0.0)
1499
1500    def test_info(self):
1501        # Use get_info() and set_info()
1502        msg = mailbox.MaildirMessage(_sample_message)
1503        self.assertEqual(msg.get_info(), '')
1504        msg.set_info('1,foo=bar')
1505        self.assertEqual(msg.get_info(), '1,foo=bar')
1506        self.assertRaises(TypeError, lambda: msg.set_info(None))
1507        self._check_sample(msg)
1508
1509    def test_info_and_flags(self):
1510        # Test interaction of info and flag methods
1511        msg = mailbox.MaildirMessage(_sample_message)
1512        self.assertEqual(msg.get_info(), '')
1513        msg.set_flags('SF')
1514        self.assertEqual(msg.get_flags(), 'FS')
1515        self.assertEqual(msg.get_info(), '2,FS')
1516        msg.set_info('1,')
1517        self.assertEqual(msg.get_flags(), '')
1518        self.assertEqual(msg.get_info(), '1,')
1519        msg.remove_flag('RPT')
1520        self.assertEqual(msg.get_flags(), '')
1521        self.assertEqual(msg.get_info(), '1,')
1522        msg.add_flag('D')
1523        self.assertEqual(msg.get_flags(), 'D')
1524        self.assertEqual(msg.get_info(), '2,D')
1525        self._check_sample(msg)
1526
1527
1528class _TestMboxMMDFMessage:
1529
1530    _factory = mailbox._mboxMMDFMessage
1531
1532    def _post_initialize_hook(self, msg):
1533        self._check_from(msg)
1534
1535    def test_initialize_with_unixfrom(self):
1536        # Initialize with a message that already has a _unixfrom attribute
1537        msg = mailbox.Message(_sample_message)
1538        msg.set_unixfrom('From foo@bar blah')
1539        msg = mailbox.mboxMessage(msg)
1540        self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())
1541
1542    def test_from(self):
1543        # Get and set "From " line
1544        msg = mailbox.mboxMessage(_sample_message)
1545        self._check_from(msg)
1546        msg.set_from('foo bar')
1547        self.assertEqual(msg.get_from(), 'foo bar')
1548        msg.set_from('foo@bar', True)
1549        self._check_from(msg, 'foo@bar')
1550        msg.set_from('blah@temp', time.localtime())
1551        self._check_from(msg, 'blah@temp')
1552
1553    def test_flags(self):
1554        # Use get_flags(), set_flags(), add_flag(), remove_flag()
1555        msg = mailbox.mboxMessage(_sample_message)
1556        self.assertEqual(msg.get_flags(), '')
1557        msg.set_flags('F')
1558        self.assertEqual(msg.get_flags(), 'F')
1559        msg.set_flags('XODR')
1560        self.assertEqual(msg.get_flags(), 'RODX')
1561        msg.add_flag('FA')
1562        self.assertEqual(msg.get_flags(), 'RODFAX')
1563        msg.remove_flag('FDXA')
1564        self.assertEqual(msg.get_flags(), 'RO')
1565        self._check_sample(msg)
1566
1567    def _check_from(self, msg, sender=None):
1568        # Check contents of "From " line
1569        if sender is None:
1570            sender = "MAILER-DAEMON"
1571        self.assertIsNotNone(re.match(
1572                sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}",
1573                msg.get_from()))
1574
1575
1576class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
1577
1578    _factory = mailbox.mboxMessage
1579
1580
1581class TestMHMessage(TestMessage, unittest.TestCase):
1582
1583    _factory = mailbox.MHMessage
1584
1585    def _post_initialize_hook(self, msg):
1586        self.assertEqual(msg._sequences, [])
1587
1588    def test_sequences(self):
1589        # Get, set, join, and leave sequences
1590        msg = mailbox.MHMessage(_sample_message)
1591        self.assertEqual(msg.get_sequences(), [])
1592        msg.set_sequences(['foobar'])
1593        self.assertEqual(msg.get_sequences(), ['foobar'])
1594        msg.set_sequences([])
1595        self.assertEqual(msg.get_sequences(), [])
1596        msg.add_sequence('unseen')
1597        self.assertEqual(msg.get_sequences(), ['unseen'])
1598        msg.add_sequence('flagged')
1599        self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
1600        msg.add_sequence('flagged')
1601        self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
1602        msg.remove_sequence('unseen')
1603        self.assertEqual(msg.get_sequences(), ['flagged'])
1604        msg.add_sequence('foobar')
1605        self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
1606        msg.remove_sequence('replied')
1607        self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
1608        msg.set_sequences(['foobar', 'replied'])
1609        self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
1610
1611
1612class TestBabylMessage(TestMessage, unittest.TestCase):
1613
1614    _factory = mailbox.BabylMessage
1615
1616    def _post_initialize_hook(self, msg):
1617        self.assertEqual(msg._labels, [])
1618
1619    def test_labels(self):
1620        # Get, set, join, and leave labels
1621        msg = mailbox.BabylMessage(_sample_message)
1622        self.assertEqual(msg.get_labels(), [])
1623        msg.set_labels(['foobar'])
1624        self.assertEqual(msg.get_labels(), ['foobar'])
1625        msg.set_labels([])
1626        self.assertEqual(msg.get_labels(), [])
1627        msg.add_label('filed')
1628        self.assertEqual(msg.get_labels(), ['filed'])
1629        msg.add_label('resent')
1630        self.assertEqual(msg.get_labels(), ['filed', 'resent'])
1631        msg.add_label('resent')
1632        self.assertEqual(msg.get_labels(), ['filed', 'resent'])
1633        msg.remove_label('filed')
1634        self.assertEqual(msg.get_labels(), ['resent'])
1635        msg.add_label('foobar')
1636        self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
1637        msg.remove_label('unseen')
1638        self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
1639        msg.set_labels(['foobar', 'answered'])
1640        self.assertEqual(msg.get_labels(), ['foobar', 'answered'])
1641
1642    def test_visible(self):
1643        # Get, set, and update visible headers
1644        msg = mailbox.BabylMessage(_sample_message)
1645        visible = msg.get_visible()
1646        self.assertEqual(visible.keys(), [])
1647        self.assertIsNone(visible.get_payload())
1648        visible['User-Agent'] = 'FooBar 1.0'
1649        visible['X-Whatever'] = 'Blah'
1650        self.assertEqual(msg.get_visible().keys(), [])
1651        msg.set_visible(visible)
1652        visible = msg.get_visible()
1653        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
1654        self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
1655        self.assertEqual(visible['X-Whatever'], 'Blah')
1656        self.assertIsNone(visible.get_payload())
1657        msg.update_visible()
1658        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
1659        self.assertIsNone(visible.get_payload())
1660        visible = msg.get_visible()
1661        self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
1662                                          'Subject'])
1663        for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
1664            self.assertEqual(visible[header], msg[header])
1665
1666
1667class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
1668
1669    _factory = mailbox.MMDFMessage
1670
1671
1672class TestMessageConversion(TestBase, unittest.TestCase):
1673
1674    def test_plain_to_x(self):
1675        # Convert Message to all formats
1676        for class_ in self.all_mailbox_types:
1677            msg_plain = mailbox.Message(_sample_message)
1678            msg = class_(msg_plain)
1679            self._check_sample(msg)
1680
1681    def test_x_to_plain(self):
1682        # Convert all formats to Message
1683        for class_ in self.all_mailbox_types:
1684            msg = class_(_sample_message)
1685            msg_plain = mailbox.Message(msg)
1686            self._check_sample(msg_plain)
1687
1688    def test_x_from_bytes(self):
1689        # Convert all formats to Message
1690        for class_ in self.all_mailbox_types:
1691            msg = class_(_bytes_sample_message)
1692            self._check_sample(msg)
1693
1694    def test_x_to_invalid(self):
1695        # Convert all formats to an invalid format
1696        for class_ in self.all_mailbox_types:
1697            self.assertRaises(TypeError, lambda: class_(False))
1698
1699    def test_type_specific_attributes_removed_on_conversion(self):
1700        reference = {class_: class_(_sample_message).__dict__
1701                        for class_ in self.all_mailbox_types}
1702        for class1 in self.all_mailbox_types:
1703            for class2 in self.all_mailbox_types:
1704                if class1 is class2:
1705                    continue
1706                source = class1(_sample_message)
1707                target = class2(source)
1708                type_specific = [a for a in reference[class1]
1709                                   if a not in reference[class2]]
1710                for attr in type_specific:
1711                    self.assertNotIn(attr, target.__dict__,
1712                        "while converting {} to {}".format(class1, class2))
1713
1714    def test_maildir_to_maildir(self):
1715        # Convert MaildirMessage to MaildirMessage
1716        msg_maildir = mailbox.MaildirMessage(_sample_message)
1717        msg_maildir.set_flags('DFPRST')
1718        msg_maildir.set_subdir('cur')
1719        date = msg_maildir.get_date()
1720        msg = mailbox.MaildirMessage(msg_maildir)
1721        self._check_sample(msg)
1722        self.assertEqual(msg.get_flags(), 'DFPRST')
1723        self.assertEqual(msg.get_subdir(), 'cur')
1724        self.assertEqual(msg.get_date(), date)
1725
1726    def test_maildir_to_mboxmmdf(self):
1727        # Convert MaildirMessage to mboxmessage and MMDFMessage
1728        pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
1729                 ('T', 'D'), ('DFPRST', 'RDFA'))
1730        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1731            msg_maildir = mailbox.MaildirMessage(_sample_message)
1732            msg_maildir.set_date(0.0)
1733            for setting, result in pairs:
1734                msg_maildir.set_flags(setting)
1735                msg = class_(msg_maildir)
1736                self.assertEqual(msg.get_flags(), result)
1737                self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' %
1738                             time.asctime(time.gmtime(0.0)))
1739            msg_maildir.set_subdir('cur')
1740            self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA')
1741
1742    def test_maildir_to_mh(self):
1743        # Convert MaildirMessage to MHMessage
1744        msg_maildir = mailbox.MaildirMessage(_sample_message)
1745        pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
1746                 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
1747                 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
1748        for setting, result in pairs:
1749            msg_maildir.set_flags(setting)
1750            self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(),
1751                             result)
1752
1753    def test_maildir_to_babyl(self):
1754        # Convert MaildirMessage to Babyl
1755        msg_maildir = mailbox.MaildirMessage(_sample_message)
1756        pairs = (('D', ['unseen']), ('F', ['unseen']),
1757                 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
1758                 ('S', []), ('T', ['unseen', 'deleted']),
1759                 ('DFPRST', ['deleted', 'answered', 'forwarded']))
1760        for setting, result in pairs:
1761            msg_maildir.set_flags(setting)
1762            self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(),
1763                             result)
1764
1765    def test_mboxmmdf_to_maildir(self):
1766        # Convert mboxMessage and MMDFMessage to MaildirMessage
1767        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1768            msg_mboxMMDF = class_(_sample_message)
1769            msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
1770            pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
1771                     ('RODFA', 'FRST'))
1772            for setting, result in pairs:
1773                msg_mboxMMDF.set_flags(setting)
1774                msg = mailbox.MaildirMessage(msg_mboxMMDF)
1775                self.assertEqual(msg.get_flags(), result)
1776                self.assertEqual(msg.get_date(), 0.0)
1777            msg_mboxMMDF.set_flags('O')
1778            self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(),
1779                             'cur')
1780
1781    def test_mboxmmdf_to_mboxmmdf(self):
1782        # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
1783        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1784            msg_mboxMMDF = class_(_sample_message)
1785            msg_mboxMMDF.set_flags('RODFA')
1786            msg_mboxMMDF.set_from('foo@bar')
1787            for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1788                msg2 = class2_(msg_mboxMMDF)
1789                self.assertEqual(msg2.get_flags(), 'RODFA')
1790                self.assertEqual(msg2.get_from(), 'foo@bar')
1791
1792    def test_mboxmmdf_to_mh(self):
1793        # Convert mboxMessage and MMDFMessage to MHMessage
1794        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1795            msg_mboxMMDF = class_(_sample_message)
1796            pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
1797                     ('F', ['unseen', 'flagged']),
1798                     ('A', ['unseen', 'replied']),
1799                     ('RODFA', ['replied', 'flagged']))
1800            for setting, result in pairs:
1801                msg_mboxMMDF.set_flags(setting)
1802                self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(),
1803                                 result)
1804
1805    def test_mboxmmdf_to_babyl(self):
1806        # Convert mboxMessage and MMDFMessage to BabylMessage
1807        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1808            msg = class_(_sample_message)
1809            pairs = (('R', []), ('O', ['unseen']),
1810                     ('D', ['unseen', 'deleted']), ('F', ['unseen']),
1811                     ('A', ['unseen', 'answered']),
1812                     ('RODFA', ['deleted', 'answered']))
1813            for setting, result in pairs:
1814                msg.set_flags(setting)
1815                self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
1816
1817    def test_mh_to_maildir(self):
1818        # Convert MHMessage to MaildirMessage
1819        pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
1820        for setting, result in pairs:
1821            msg = mailbox.MHMessage(_sample_message)
1822            msg.add_sequence(setting)
1823            self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
1824            self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1825        msg = mailbox.MHMessage(_sample_message)
1826        msg.add_sequence('unseen')
1827        msg.add_sequence('replied')
1828        msg.add_sequence('flagged')
1829        self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR')
1830        self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1831
1832    def test_mh_to_mboxmmdf(self):
1833        # Convert MHMessage to mboxMessage and MMDFMessage
1834        pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
1835        for setting, result in pairs:
1836            msg = mailbox.MHMessage(_sample_message)
1837            msg.add_sequence(setting)
1838            for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1839                self.assertEqual(class_(msg).get_flags(), result)
1840        msg = mailbox.MHMessage(_sample_message)
1841        msg.add_sequence('unseen')
1842        msg.add_sequence('replied')
1843        msg.add_sequence('flagged')
1844        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1845            self.assertEqual(class_(msg).get_flags(), 'OFA')
1846
1847    def test_mh_to_mh(self):
1848        # Convert MHMessage to MHMessage
1849        msg = mailbox.MHMessage(_sample_message)
1850        msg.add_sequence('unseen')
1851        msg.add_sequence('replied')
1852        msg.add_sequence('flagged')
1853        self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
1854                         ['unseen', 'replied', 'flagged'])
1855
1856    def test_mh_to_babyl(self):
1857        # Convert MHMessage to BabylMessage
1858        pairs = (('unseen', ['unseen']), ('replied', ['answered']),
1859                 ('flagged', []))
1860        for setting, result in pairs:
1861            msg = mailbox.MHMessage(_sample_message)
1862            msg.add_sequence(setting)
1863            self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
1864        msg = mailbox.MHMessage(_sample_message)
1865        msg.add_sequence('unseen')
1866        msg.add_sequence('replied')
1867        msg.add_sequence('flagged')
1868        self.assertEqual(mailbox.BabylMessage(msg).get_labels(),
1869                         ['unseen', 'answered'])
1870
1871    def test_babyl_to_maildir(self):
1872        # Convert BabylMessage to MaildirMessage
1873        pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
1874                 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
1875                 ('resent', 'PS'))
1876        for setting, result in pairs:
1877            msg = mailbox.BabylMessage(_sample_message)
1878            msg.add_label(setting)
1879            self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
1880            self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1881        msg = mailbox.BabylMessage(_sample_message)
1882        for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1883                      'edited', 'resent'):
1884            msg.add_label(label)
1885        self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT')
1886        self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1887
1888    def test_babyl_to_mboxmmdf(self):
1889        # Convert BabylMessage to mboxMessage and MMDFMessage
1890        pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
1891                 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
1892                 ('resent', 'RO'))
1893        for setting, result in pairs:
1894            for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1895                msg = mailbox.BabylMessage(_sample_message)
1896                msg.add_label(setting)
1897                self.assertEqual(class_(msg).get_flags(), result)
1898        msg = mailbox.BabylMessage(_sample_message)
1899        for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1900                      'edited', 'resent'):
1901            msg.add_label(label)
1902        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1903            self.assertEqual(class_(msg).get_flags(), 'ODA')
1904
1905    def test_babyl_to_mh(self):
1906        # Convert BabylMessage to MHMessage
1907        pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
1908                 ('answered', ['replied']), ('forwarded', []), ('edited', []),
1909                 ('resent', []))
1910        for setting, result in pairs:
1911            msg = mailbox.BabylMessage(_sample_message)
1912            msg.add_label(setting)
1913            self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result)
1914        msg = mailbox.BabylMessage(_sample_message)
1915        for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1916                      'edited', 'resent'):
1917            msg.add_label(label)
1918        self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
1919                         ['unseen', 'replied'])
1920
1921    def test_babyl_to_babyl(self):
1922        # Convert BabylMessage to BabylMessage
1923        msg = mailbox.BabylMessage(_sample_message)
1924        msg.update_visible()
1925        for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1926                      'edited', 'resent'):
1927            msg.add_label(label)
1928        msg2 = mailbox.BabylMessage(msg)
1929        self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed',
1930                                             'answered', 'forwarded', 'edited',
1931                                             'resent'])
1932        self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys())
1933        for key in msg.get_visible().keys():
1934            self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key])
1935
1936
1937class TestProxyFileBase(TestBase):
1938
1939    def _test_read(self, proxy):
1940        # Read by byte
1941        proxy.seek(0)
1942        self.assertEqual(proxy.read(), b'bar')
1943        proxy.seek(1)
1944        self.assertEqual(proxy.read(), b'ar')
1945        proxy.seek(0)
1946        self.assertEqual(proxy.read(2), b'ba')
1947        proxy.seek(1)
1948        self.assertEqual(proxy.read(-1), b'ar')
1949        proxy.seek(2)
1950        self.assertEqual(proxy.read(1000), b'r')
1951
1952    def _test_readline(self, proxy):
1953        # Read by line
1954        linesep = os.linesep.encode()
1955        proxy.seek(0)
1956        self.assertEqual(proxy.readline(), b'foo' + linesep)
1957        self.assertEqual(proxy.readline(), b'bar' + linesep)
1958        self.assertEqual(proxy.readline(), b'fred' + linesep)
1959        self.assertEqual(proxy.readline(), b'bob')
1960        proxy.seek(2)
1961        self.assertEqual(proxy.readline(), b'o' + linesep)
1962        proxy.seek(6 + 2 * len(os.linesep))
1963        self.assertEqual(proxy.readline(), b'fred' + linesep)
1964        proxy.seek(6 + 2 * len(os.linesep))
1965        self.assertEqual(proxy.readline(2), b'fr')
1966        self.assertEqual(proxy.readline(-10), b'ed' + linesep)
1967
1968    def _test_readlines(self, proxy):
1969        # Read multiple lines
1970        linesep = os.linesep.encode()
1971        proxy.seek(0)
1972        self.assertEqual(proxy.readlines(), [b'foo' + linesep,
1973                                           b'bar' + linesep,
1974                                           b'fred' + linesep, b'bob'])
1975        proxy.seek(0)
1976        self.assertEqual(proxy.readlines(2), [b'foo' + linesep])
1977        proxy.seek(3 + len(linesep))
1978        self.assertEqual(proxy.readlines(4 + len(linesep)),
1979                     [b'bar' + linesep, b'fred' + linesep])
1980        proxy.seek(3)
1981        self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep,
1982                                               b'fred' + linesep, b'bob'])
1983
1984    def _test_iteration(self, proxy):
1985        # Iterate by line
1986        linesep = os.linesep.encode()
1987        proxy.seek(0)
1988        iterator = iter(proxy)
1989        self.assertEqual(next(iterator), b'foo' + linesep)
1990        self.assertEqual(next(iterator), b'bar' + linesep)
1991        self.assertEqual(next(iterator), b'fred' + linesep)
1992        self.assertEqual(next(iterator), b'bob')
1993        self.assertRaises(StopIteration, next, iterator)
1994
1995    def _test_seek_and_tell(self, proxy):
1996        # Seek and use tell to check position
1997        linesep = os.linesep.encode()
1998        proxy.seek(3)
1999        self.assertEqual(proxy.tell(), 3)
2000        self.assertEqual(proxy.read(len(linesep)), linesep)
2001        proxy.seek(2, 1)
2002        self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep)
2003        proxy.seek(-3 - len(linesep), 2)
2004        self.assertEqual(proxy.read(3), b'bar')
2005        proxy.seek(2, 0)
2006        self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep)
2007        proxy.seek(100)
2008        self.assertFalse(proxy.read())
2009
2010    def _test_close(self, proxy):
2011        # Close a file
2012        self.assertFalse(proxy.closed)
2013        proxy.close()
2014        self.assertTrue(proxy.closed)
2015        # Issue 11700 subsequent closes should be a no-op.
2016        proxy.close()
2017        self.assertTrue(proxy.closed)
2018
2019
2020class TestProxyFile(TestProxyFileBase, unittest.TestCase):
2021
2022    def setUp(self):
2023        self._path = os_helper.TESTFN
2024        self._file = open(self._path, 'wb+')
2025
2026    def tearDown(self):
2027        self._file.close()
2028        self._delete_recursively(self._path)
2029
2030    def test_initialize(self):
2031        # Initialize and check position
2032        self._file.write(b'foo')
2033        pos = self._file.tell()
2034        proxy0 = mailbox._ProxyFile(self._file)
2035        self.assertEqual(proxy0.tell(), pos)
2036        self.assertEqual(self._file.tell(), pos)
2037        proxy1 = mailbox._ProxyFile(self._file, 0)
2038        self.assertEqual(proxy1.tell(), 0)
2039        self.assertEqual(self._file.tell(), pos)
2040
2041    def test_read(self):
2042        self._file.write(b'bar')
2043        self._test_read(mailbox._ProxyFile(self._file))
2044
2045    def test_readline(self):
2046        self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2047                                                  os.linesep), 'ascii'))
2048        self._test_readline(mailbox._ProxyFile(self._file))
2049
2050    def test_readlines(self):
2051        self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2052                                                  os.linesep), 'ascii'))
2053        self._test_readlines(mailbox._ProxyFile(self._file))
2054
2055    def test_iteration(self):
2056        self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2057                                                  os.linesep), 'ascii'))
2058        self._test_iteration(mailbox._ProxyFile(self._file))
2059
2060    def test_seek_and_tell(self):
2061        self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
2062        self._test_seek_and_tell(mailbox._ProxyFile(self._file))
2063
2064    def test_close(self):
2065        self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
2066        self._test_close(mailbox._ProxyFile(self._file))
2067
2068
2069class TestPartialFile(TestProxyFileBase, unittest.TestCase):
2070
2071    def setUp(self):
2072        self._path = os_helper.TESTFN
2073        self._file = open(self._path, 'wb+')
2074
2075    def tearDown(self):
2076        self._file.close()
2077        self._delete_recursively(self._path)
2078
2079    def test_initialize(self):
2080        # Initialize and check position
2081        self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii'))
2082        pos = self._file.tell()
2083        proxy = mailbox._PartialFile(self._file, 2, 5)
2084        self.assertEqual(proxy.tell(), 0)
2085        self.assertEqual(self._file.tell(), pos)
2086
2087    def test_read(self):
2088        self._file.write(bytes('***bar***', 'ascii'))
2089        self._test_read(mailbox._PartialFile(self._file, 3, 6))
2090
2091    def test_readline(self):
2092        self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' %
2093                         (os.linesep, os.linesep, os.linesep), 'ascii'))
2094        self._test_readline(mailbox._PartialFile(self._file, 5,
2095                                                 18 + 3 * len(os.linesep)))
2096
2097    def test_readlines(self):
2098        self._file.write(bytes('foo%sbar%sfred%sbob?????' %
2099                         (os.linesep, os.linesep, os.linesep), 'ascii'))
2100        self._test_readlines(mailbox._PartialFile(self._file, 0,
2101                                                  13 + 3 * len(os.linesep)))
2102
2103    def test_iteration(self):
2104        self._file.write(bytes('____foo%sbar%sfred%sbob####' %
2105                         (os.linesep, os.linesep, os.linesep), 'ascii'))
2106        self._test_iteration(mailbox._PartialFile(self._file, 4,
2107                                                  17 + 3 * len(os.linesep)))
2108
2109    def test_seek_and_tell(self):
2110        self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii'))
2111        self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
2112                                                      9 + 2 * len(os.linesep)))
2113
2114    def test_close(self):
2115        self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii'))
2116        self._test_close(mailbox._PartialFile(self._file, 1,
2117                                              6 + 3 * len(os.linesep)))
2118
2119
2120## Start: tests from the original module (for backward compatibility).
2121
2122FROM_ = "From some.body@dummy.domain  Sat Jul 24 13:43:35 2004\n"
2123DUMMY_MESSAGE = """\
2124From: some.body@dummy.domain
2125To: me@my.domain
2126Subject: Simple Test
2127
2128This is a dummy message.
2129"""
2130
2131class MaildirTestCase(unittest.TestCase):
2132
2133    def setUp(self):
2134        # create a new maildir mailbox to work with:
2135        self._dir = os_helper.TESTFN
2136        if os.path.isdir(self._dir):
2137            os_helper.rmtree(self._dir)
2138        elif os.path.isfile(self._dir):
2139            os_helper.unlink(self._dir)
2140        os.mkdir(self._dir)
2141        os.mkdir(os.path.join(self._dir, "cur"))
2142        os.mkdir(os.path.join(self._dir, "tmp"))
2143        os.mkdir(os.path.join(self._dir, "new"))
2144        self._counter = 1
2145        self._msgfiles = []
2146
2147    def tearDown(self):
2148        list(map(os.unlink, self._msgfiles))
2149        os_helper.rmdir(os.path.join(self._dir, "cur"))
2150        os_helper.rmdir(os.path.join(self._dir, "tmp"))
2151        os_helper.rmdir(os.path.join(self._dir, "new"))
2152        os_helper.rmdir(self._dir)
2153
2154    def createMessage(self, dir, mbox=False):
2155        t = int(time.time() % 1000000)
2156        pid = self._counter
2157        self._counter += 1
2158        filename = ".".join((str(t), str(pid), "myhostname", "mydomain"))
2159        tmpname = os.path.join(self._dir, "tmp", filename)
2160        newname = os.path.join(self._dir, dir, filename)
2161        with open(tmpname, "w", encoding="utf-8") as fp:
2162            self._msgfiles.append(tmpname)
2163            if mbox:
2164                fp.write(FROM_)
2165            fp.write(DUMMY_MESSAGE)
2166        try:
2167            os.link(tmpname, newname)
2168        except (AttributeError, PermissionError):
2169            with open(newname, "w") as fp:
2170                fp.write(DUMMY_MESSAGE)
2171        self._msgfiles.append(newname)
2172        return tmpname
2173
2174    def test_empty_maildir(self):
2175        """Test an empty maildir mailbox"""
2176        # Test for regression on bug #117490:
2177        # Make sure the boxes attribute actually gets set.
2178        self.mbox = mailbox.Maildir(os_helper.TESTFN)
2179        #self.assertTrue(hasattr(self.mbox, "boxes"))
2180        #self.assertEqual(len(self.mbox.boxes), 0)
2181        self.assertIsNone(self.mbox.next())
2182        self.assertIsNone(self.mbox.next())
2183
2184    def test_nonempty_maildir_cur(self):
2185        self.createMessage("cur")
2186        self.mbox = mailbox.Maildir(os_helper.TESTFN)
2187        #self.assertEqual(len(self.mbox.boxes), 1)
2188        self.assertIsNotNone(self.mbox.next())
2189        self.assertIsNone(self.mbox.next())
2190        self.assertIsNone(self.mbox.next())
2191
2192    def test_nonempty_maildir_new(self):
2193        self.createMessage("new")
2194        self.mbox = mailbox.Maildir(os_helper.TESTFN)
2195        #self.assertEqual(len(self.mbox.boxes), 1)
2196        self.assertIsNotNone(self.mbox.next())
2197        self.assertIsNone(self.mbox.next())
2198        self.assertIsNone(self.mbox.next())
2199
2200    def test_nonempty_maildir_both(self):
2201        self.createMessage("cur")
2202        self.createMessage("new")
2203        self.mbox = mailbox.Maildir(os_helper.TESTFN)
2204        #self.assertEqual(len(self.mbox.boxes), 2)
2205        self.assertIsNotNone(self.mbox.next())
2206        self.assertIsNotNone(self.mbox.next())
2207        self.assertIsNone(self.mbox.next())
2208        self.assertIsNone(self.mbox.next())
2209
2210## End: tests from the original module (for backward compatibility).
2211
2212
2213_sample_message = """\
2214Return-Path: <gkj@gregorykjohnson.com>
2215X-Original-To: gkj+person@localhost
2216Delivered-To: gkj+person@localhost
2217Received: from localhost (localhost [127.0.0.1])
2218        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
2219        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
2220Delivered-To: gkj@sundance.gregorykjohnson.com
2221Received: from localhost [127.0.0.1]
2222        by localhost with POP3 (fetchmail-6.2.5)
2223        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
2224Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
2225        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
2226        for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
2227Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
2228        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
2229Date: Wed, 13 Jul 2005 17:23:11 -0400
2230From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
2231To: gkj@gregorykjohnson.com
2232Subject: Sample message
2233Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
2234Mime-Version: 1.0
2235Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
2236Content-Disposition: inline
2237User-Agent: Mutt/1.5.9i
2238
2239
2240--NMuMz9nt05w80d4+
2241Content-Type: text/plain; charset=us-ascii
2242Content-Disposition: inline
2243
2244This is a sample message.
2245
2246--
2247Gregory K. Johnson
2248
2249--NMuMz9nt05w80d4+
2250Content-Type: application/octet-stream
2251Content-Disposition: attachment; filename="text.gz"
2252Content-Transfer-Encoding: base64
2253
2254H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
22553FYlAAAA
2256
2257--NMuMz9nt05w80d4+--
2258"""
2259
2260_bytes_sample_message = _sample_message.encode('ascii')
2261
2262_sample_headers = {
2263    "Return-Path":"<gkj@gregorykjohnson.com>",
2264    "X-Original-To":"gkj+person@localhost",
2265    "Delivered-To":"gkj+person@localhost",
2266    "Received":"""from localhost (localhost [127.0.0.1])
2267        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
2268        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
2269    "Delivered-To":"gkj@sundance.gregorykjohnson.com",
2270    "Received":"""from localhost [127.0.0.1]
2271        by localhost with POP3 (fetchmail-6.2.5)
2272        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
2273    "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
2274        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
2275        for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
2276    "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
2277        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
2278    "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
2279    "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
2280    "To":"gkj@gregorykjohnson.com",
2281    "Subject":"Sample message",
2282    "Mime-Version":"1.0",
2283    "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
2284    "Content-Disposition":"inline",
2285    "User-Agent": "Mutt/1.5.9i" }
2286
2287_sample_payloads = ("""This is a sample message.
2288
2289--
2290Gregory K. Johnson
2291""",
2292"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
22933FYlAAAA
2294""")
2295
2296
2297class MiscTestCase(unittest.TestCase):
2298    def test__all__(self):
2299        support.check__all__(self, mailbox,
2300                             not_exported={"linesep", "fcntl"})
2301
2302
2303def tearDownModule():
2304    support.reap_children()
2305
2306
2307if __name__ == '__main__':
2308    unittest.main()
2309