1import textwrap
2import unittest
3import contextlib
4from email import policy
5from email import errors
6from test.test_email import TestEmailBase
7
8
9class TestDefectsBase:
10
11    policy = policy.default
12    raise_expected = False
13
14    @contextlib.contextmanager
15    def _raise_point(self, defect):
16        yield
17
18    def test_same_boundary_inner_outer(self):
19        source = textwrap.dedent("""\
20            Subject: XX
21            From: xx@xx.dk
22            To: XX
23            Mime-version: 1.0
24            Content-type: multipart/mixed;
25               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
26
27            --MS_Mac_OE_3071477847_720252_MIME_Part
28            Content-type: multipart/alternative;
29               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
30
31            --MS_Mac_OE_3071477847_720252_MIME_Part
32            Content-type: text/plain; charset="ISO-8859-1"
33            Content-transfer-encoding: quoted-printable
34
35            text
36
37            --MS_Mac_OE_3071477847_720252_MIME_Part
38            Content-type: text/html; charset="ISO-8859-1"
39            Content-transfer-encoding: quoted-printable
40
41            <HTML></HTML>
42
43            --MS_Mac_OE_3071477847_720252_MIME_Part--
44
45            --MS_Mac_OE_3071477847_720252_MIME_Part
46            Content-type: image/gif; name="xx.gif";
47            Content-disposition: attachment
48            Content-transfer-encoding: base64
49
50            Some removed base64 encoded chars.
51
52            --MS_Mac_OE_3071477847_720252_MIME_Part--
53
54            """)
55        # XXX better would be to actually detect the duplicate.
56        with self._raise_point(errors.StartBoundaryNotFoundDefect):
57            msg = self._str_msg(source)
58        if self.raise_expected: return
59        inner = msg.get_payload(0)
60        self.assertTrue(hasattr(inner, 'defects'))
61        self.assertEqual(len(self.get_defects(inner)), 1)
62        self.assertIsInstance(self.get_defects(inner)[0],
63                              errors.StartBoundaryNotFoundDefect)
64
65    def test_multipart_no_boundary(self):
66        source = textwrap.dedent("""\
67            Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
68            From: foobar
69            Subject: broken mail
70            MIME-Version: 1.0
71            Content-Type: multipart/report; report-type=delivery-status;
72
73            --JAB03225.986577786/zinfandel.lacita.com
74
75            One part
76
77            --JAB03225.986577786/zinfandel.lacita.com
78            Content-Type: message/delivery-status
79
80            Header: Another part
81
82            --JAB03225.986577786/zinfandel.lacita.com--
83            """)
84        with self._raise_point(errors.NoBoundaryInMultipartDefect):
85            msg = self._str_msg(source)
86        if self.raise_expected: return
87        self.assertIsInstance(msg.get_payload(), str)
88        self.assertEqual(len(self.get_defects(msg)), 2)
89        self.assertIsInstance(self.get_defects(msg)[0],
90                              errors.NoBoundaryInMultipartDefect)
91        self.assertIsInstance(self.get_defects(msg)[1],
92                              errors.MultipartInvariantViolationDefect)
93
94    multipart_msg = textwrap.dedent("""\
95        Date: Wed, 14 Nov 2007 12:56:23 GMT
96        From: foo@bar.invalid
97        To: foo@bar.invalid
98        Subject: Content-Transfer-Encoding: base64 and multipart
99        MIME-Version: 1.0
100        Content-Type: multipart/mixed;
101            boundary="===============3344438784458119861=="{}
102
103        --===============3344438784458119861==
104        Content-Type: text/plain
105
106        Test message
107
108        --===============3344438784458119861==
109        Content-Type: application/octet-stream
110        Content-Transfer-Encoding: base64
111
112        YWJj
113
114        --===============3344438784458119861==--
115        """)
116
117    def test_multipart_invalid_cte(self):
118        with self._raise_point(
119                errors.InvalidMultipartContentTransferEncodingDefect):
120            msg = self._str_msg(
121                    self.multipart_msg.format(
122                        "\nContent-Transfer-Encoding: base64"))
123        if self.raise_expected: return
124        self.assertEqual(len(self.get_defects(msg)), 1)
125        self.assertIsInstance(self.get_defects(msg)[0],
126            errors.InvalidMultipartContentTransferEncodingDefect)
127
128    def test_multipart_no_cte_no_defect(self):
129        if self.raise_expected: return
130        msg = self._str_msg(self.multipart_msg.format(''))
131        self.assertEqual(len(self.get_defects(msg)), 0)
132
133    def test_multipart_valid_cte_no_defect(self):
134        if self.raise_expected: return
135        for cte in ('7bit', '8bit', 'BINary'):
136            msg = self._str_msg(
137                self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
138            self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
139
140    def test_lying_multipart(self):
141        source = textwrap.dedent("""\
142            From: "Allison Dunlap" <xxx@example.com>
143            To: yyy@example.com
144            Subject: 64423
145            Date: Sun, 11 Jul 2004 16:09:27 -0300
146            MIME-Version: 1.0
147            Content-Type: multipart/alternative;
148
149            Blah blah blah
150            """)
151        with self._raise_point(errors.NoBoundaryInMultipartDefect):
152            msg = self._str_msg(source)
153        if self.raise_expected: return
154        self.assertTrue(hasattr(msg, 'defects'))
155        self.assertEqual(len(self.get_defects(msg)), 2)
156        self.assertIsInstance(self.get_defects(msg)[0],
157                              errors.NoBoundaryInMultipartDefect)
158        self.assertIsInstance(self.get_defects(msg)[1],
159                              errors.MultipartInvariantViolationDefect)
160
161    def test_missing_start_boundary(self):
162        source = textwrap.dedent("""\
163            Content-Type: multipart/mixed; boundary="AAA"
164            From: Mail Delivery Subsystem <xxx@example.com>
165            To: yyy@example.com
166
167            --AAA
168
169            Stuff
170
171            --AAA
172            Content-Type: message/rfc822
173
174            From: webmaster@python.org
175            To: zzz@example.com
176            Content-Type: multipart/mixed; boundary="BBB"
177
178            --BBB--
179
180            --AAA--
181
182            """)
183        # The message structure is:
184        #
185        # multipart/mixed
186        #    text/plain
187        #    message/rfc822
188        #        multipart/mixed [*]
189        #
190        # [*] This message is missing its start boundary
191        with self._raise_point(errors.StartBoundaryNotFoundDefect):
192            outer = self._str_msg(source)
193        if self.raise_expected: return
194        bad = outer.get_payload(1).get_payload(0)
195        self.assertEqual(len(self.get_defects(bad)), 1)
196        self.assertIsInstance(self.get_defects(bad)[0],
197                              errors.StartBoundaryNotFoundDefect)
198
199    def test_first_line_is_continuation_header(self):
200        with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
201            msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
202        if self.raise_expected: return
203        self.assertEqual(msg.keys(), ['Subject'])
204        self.assertEqual(msg.get_payload(), 'body')
205        self.assertEqual(len(self.get_defects(msg)), 1)
206        self.assertDefectsEqual(self.get_defects(msg),
207                                 [errors.FirstHeaderLineIsContinuationDefect])
208        self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
209
210    def test_missing_header_body_separator(self):
211        # Our heuristic if we see a line that doesn't look like a header (no
212        # leading whitespace but no ':') is to assume that the blank line that
213        # separates the header from the body is missing, and to stop parsing
214        # headers and start parsing the body.
215        with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
216            msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
217        if self.raise_expected: return
218        self.assertEqual(msg.keys(), ['Subject'])
219        self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
220        self.assertDefectsEqual(self.get_defects(msg),
221                                [errors.MissingHeaderBodySeparatorDefect])
222
223    def test_bad_padding_in_base64_payload(self):
224        source = textwrap.dedent("""\
225            Subject: test
226            MIME-Version: 1.0
227            Content-Type: text/plain; charset="utf-8"
228            Content-Transfer-Encoding: base64
229
230            dmk
231            """)
232        msg = self._str_msg(source)
233        with self._raise_point(errors.InvalidBase64PaddingDefect):
234            payload = msg.get_payload(decode=True)
235        if self.raise_expected: return
236        self.assertEqual(payload, b'vi')
237        self.assertDefectsEqual(self.get_defects(msg),
238                                [errors.InvalidBase64PaddingDefect])
239
240    def test_invalid_chars_in_base64_payload(self):
241        source = textwrap.dedent("""\
242            Subject: test
243            MIME-Version: 1.0
244            Content-Type: text/plain; charset="utf-8"
245            Content-Transfer-Encoding: base64
246
247            dm\x01k===
248            """)
249        msg = self._str_msg(source)
250        with self._raise_point(errors.InvalidBase64CharactersDefect):
251            payload = msg.get_payload(decode=True)
252        if self.raise_expected: return
253        self.assertEqual(payload, b'vi')
254        self.assertDefectsEqual(self.get_defects(msg),
255                                [errors.InvalidBase64CharactersDefect])
256
257    def test_invalid_length_of_base64_payload(self):
258        source = textwrap.dedent("""\
259            Subject: test
260            MIME-Version: 1.0
261            Content-Type: text/plain; charset="utf-8"
262            Content-Transfer-Encoding: base64
263
264            abcde
265            """)
266        msg = self._str_msg(source)
267        with self._raise_point(errors.InvalidBase64LengthDefect):
268            payload = msg.get_payload(decode=True)
269        if self.raise_expected: return
270        self.assertEqual(payload, b'abcde')
271        self.assertDefectsEqual(self.get_defects(msg),
272                                [errors.InvalidBase64LengthDefect])
273
274    def test_missing_ending_boundary(self):
275        source = textwrap.dedent("""\
276            To: 1@harrydomain4.com
277            Subject: Fwd: 1
278            MIME-Version: 1.0
279            Content-Type: multipart/alternative;
280             boundary="------------000101020201080900040301"
281
282            --------------000101020201080900040301
283            Content-Type: text/plain; charset=ISO-8859-1
284            Content-Transfer-Encoding: 7bit
285
286            Alternative 1
287
288            --------------000101020201080900040301
289            Content-Type: text/html; charset=ISO-8859-1
290            Content-Transfer-Encoding: 7bit
291
292            Alternative 2
293
294            """)
295        with self._raise_point(errors.CloseBoundaryNotFoundDefect):
296            msg = self._str_msg(source)
297        if self.raise_expected: return
298        self.assertEqual(len(msg.get_payload()), 2)
299        self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
300        self.assertDefectsEqual(self.get_defects(msg),
301                                [errors.CloseBoundaryNotFoundDefect])
302
303
304class TestDefectDetection(TestDefectsBase, TestEmailBase):
305
306    def get_defects(self, obj):
307        return obj.defects
308
309
310class TestDefectCapture(TestDefectsBase, TestEmailBase):
311
312    class CapturePolicy(policy.EmailPolicy):
313        captured = None
314        def register_defect(self, obj, defect):
315            self.captured.append(defect)
316
317    def setUp(self):
318        self.policy = self.CapturePolicy(captured=list())
319
320    def get_defects(self, obj):
321        return self.policy.captured
322
323
324class TestDefectRaising(TestDefectsBase, TestEmailBase):
325
326    policy = TestDefectsBase.policy
327    policy = policy.clone(raise_on_defect=True)
328    raise_expected = True
329
330    @contextlib.contextmanager
331    def _raise_point(self, defect):
332        with self.assertRaises(defect):
333            yield
334
335
336if __name__ == '__main__':
337    unittest.main()
338