1import unittest
2from test.test_email import TestEmailBase, parameterize
3import textwrap
4from email import policy
5from email.message import EmailMessage
6from email.contentmanager import ContentManager, raw_data_manager
7
8
@parameterize
class TestContentManager(TestEmailBase):
    # Exercises the handler registry of ContentManager: which keys a get/set
    # handler may be registered under, and which registration wins when
    # several keys could match the same message or object.
    #
    # NOTE(review): the ``*_params`` dicts and the ``*_as_*`` methods are
    # presumably expanded into individual tests by ``@parameterize`` (defined
    # outside this file), which supplies each tuple as ``(order, key)``.

    policy = policy.default
    message = EmailMessage

    # name -> (precedence, get-handler key).  The order test below expects a
    # key with a lower precedence number to win over one with a higher one.
    get_key_params = {
        'full_type':        (1, 'text/plain',),
        'maintype_only':    (2, 'text',),
        'null_key':         (3, '',),
        }

    def get_key_as_get_content_key(self, order, key):
        # A getter registered under ``key`` is used for a text/plain message
        # and receives the extra keyword arguments given to get_content().
        def foo_getter(msg, foo=None):
            bar = msg['X-Bar-Header']
            return foo, bar
        cm = ContentManager()
        cm.add_get_handler(key, foo_getter)
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        m['X-Bar-Header'] = 'foo'
        self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))

    def get_key_as_get_content_key_order(self, order, key):
        # The getter registered under ``key`` must win over getters that are
        # registered under every lower-priority key.
        def bar_getter(msg):
            return msg['X-Bar-Header']
        def foo_getter(msg):
            return msg['X-Foo-Header']
        cm = ContentManager()
        cm.add_get_handler(key, foo_getter)
        # Use distinct loop names so the ``key`` parameter is not shadowed.
        for precedence, other_key in self.get_key_params.values():
            if precedence > order:
                cm.add_get_handler(other_key, bar_getter)
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        m['X-Bar-Header'] = 'bar'
        m['X-Foo-Header'] = 'foo'
        self.assertEqual(cm.get_content(m), 'foo')

    def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
        # With no handler registered at all, the lookup fails with a KeyError
        # that mentions the content type.
        cm = ContentManager()
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        with self.assertRaisesRegex(KeyError, 'text/plain'):
            cm.get_content(m)

    # Small str hierarchy used as the object type in the set-handler tests,
    # together with the dotted paths under which handlers can be registered.
    class BaseThing(str):
        pass
    baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
    class Thing(BaseThing):
        pass
    testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'

    # name -> (precedence, set-handler key).  Lower numbers are expected to
    # take priority when a Thing instance is passed to set_content().
    set_key_params = {
        'type':             (0,  Thing,),
        'full_path':        (1,  testobject_full_path,),
        'qualname':         (2,  'TestContentManager.Thing',),
        'name':             (3,  'Thing',),
        'base_type':        (4,  BaseThing,),
        'base_full_path':   (5,  baseobject_full_path,),
        'base_qualname':    (6,  'TestContentManager.BaseThing',),
        'base_name':        (7,  'BaseThing',),
        'str_type':         (8,  str,),
        'str_full_path':    (9,  'builtins.str',),
        'str_name':         (10, 'str',),   # str name and qualname are the same
        'null_key':         (11, None,),
        }

    def set_key_as_set_content_key(self, order, key):
        # A setter registered under ``key`` is used for a Thing instance and
        # receives the extra keyword arguments given to set_content().
        def foo_setter(msg, obj, foo=None):
            msg['X-Foo-Header'] = foo
            msg.set_payload(obj)
        cm = ContentManager()
        cm.add_set_handler(key, foo_setter)
        m = self._make_message()
        msg_obj = self.Thing()
        cm.set_content(m, msg_obj, foo='bar')
        self.assertEqual(m['X-Foo-Header'], 'bar')
        self.assertEqual(m.get_payload(), msg_obj)

    def set_key_as_set_content_key_order(self, order, key):
        # The setter registered under ``key`` must win over setters that are
        # registered under every lower-priority key.
        def foo_setter(msg, obj):
            msg['X-FooBar-Header'] = 'foo'
            msg.set_payload(obj)
        def bar_setter(msg, obj):
            msg['X-FooBar-Header'] = 'bar'
        cm = ContentManager()
        cm.add_set_handler(key, foo_setter)
        # Iterate the *set* key table: the get-key table holds MIME type
        # strings, which can never match a Thing, so registering bar_setter
        # under them would leave the precedence check untested.
        for precedence, other_key in self.set_key_params.values():
            if precedence > order:
                cm.add_set_handler(other_key, bar_setter)
        m = self._make_message()
        msg_obj = self.Thing()
        cm.set_content(m, msg_obj)
        self.assertEqual(m['X-FooBar-Header'], 'foo')
        self.assertEqual(m.get_payload(), msg_obj)

    def test_set_content_raises_if_unknown_type_and_no_default(self):
        # With no handler registered, the KeyError names the object's type
        # by its full dotted path.
        cm = ContentManager()
        m = self._make_message()
        msg_obj = self.Thing()
        with self.assertRaisesRegex(KeyError, self.testobject_full_path):
            cm.set_content(m, msg_obj)

    def test_set_content_raises_if_called_on_multipart(self):
        # set_content() on a message already typed multipart/* is a TypeError.
        cm = ContentManager()
        m = self._make_message()
        m['Content-Type'] = 'multipart/foo'
        with self.assertRaises(TypeError):
            cm.set_content(m, 'test')

    def test_set_content_calls_clear_content(self):
        # The registered handler does nothing, so any change observed on the
        # message afterwards comes from set_content() itself: Content-*
        # headers and the payload are gone, other headers survive.
        m = self._make_message()
        m['Content-Foo'] = 'bar'
        m['Content-Type'] = 'text/html'
        m['To'] = 'test'
        m.set_payload('abc')
        cm = ContentManager()
        cm.add_set_handler(str, lambda *args, **kw: None)
        m.set_content('xyz', content_manager=cm)
        self.assertIsNone(m['Content-Foo'])
        self.assertIsNone(m['Content-Type'])
        self.assertEqual(m['To'], 'test')
        self.assertIsNone(m.get_payload())
133
134
135@parameterize
136class TestRawDataManager(TestEmailBase):
137    # Note: these tests are dependent on the order in which headers are added
138    # to the message objects by the code.  There's no defined ordering in
139    # RFC5322/MIME, so this makes the tests more fragile than the standards
140    # require.  However, if the header order changes it is best to understand
141    # *why*, and make sure it isn't a subtle bug in whatever change was
142    # applied.
143
144    policy = policy.default.clone(max_line_length=60,
145                                  content_manager=raw_data_manager)
146    message = EmailMessage
147
148    def test_get_text_plain(self):
149        m = self._str_msg(textwrap.dedent("""\
150            Content-Type: text/plain
151
152            Basic text.
153            """))
154        self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")
155
156    def test_get_text_html(self):
157        m = self._str_msg(textwrap.dedent("""\
158            Content-Type: text/html
159
160            <p>Basic text.</p>
161            """))
162        self.assertEqual(raw_data_manager.get_content(m),
163                         "<p>Basic text.</p>\n")
164
165    def test_get_text_plain_latin1(self):
166        m = self._bytes_msg(textwrap.dedent("""\
167            Content-Type: text/plain; charset=latin1
168
169            Basìc tëxt.
170            """).encode('latin1'))
171        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
172
173    def test_get_text_plain_latin1_quoted_printable(self):
174        m = self._str_msg(textwrap.dedent("""\
175            Content-Type: text/plain; charset="latin-1"
176            Content-Transfer-Encoding: quoted-printable
177
178            Bas=ECc t=EBxt.
179            """))
180        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
181
182    def test_get_text_plain_utf8_base64(self):
183        m = self._str_msg(textwrap.dedent("""\
184            Content-Type: text/plain; charset="utf8"
185            Content-Transfer-Encoding: base64
186
187            QmFzw6xjIHTDq3h0Lgo=
188            """))
189        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
190
191    def test_get_text_plain_bad_utf8_quoted_printable(self):
192        m = self._str_msg(textwrap.dedent("""\
193            Content-Type: text/plain; charset="utf8"
194            Content-Transfer-Encoding: quoted-printable
195
196            Bas=c3=acc t=c3=abxt=fd.
197            """))
198        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n")
199
200    def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
201        m = self._str_msg(textwrap.dedent("""\
202            Content-Type: text/plain; charset="utf8"
203            Content-Transfer-Encoding: quoted-printable
204
205            Bas=c3=acc t=c3=abxt=fd.
206            """))
207        self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
208                         "Basìc tëxt.\n")
209
210    def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
211        m = self._str_msg(textwrap.dedent("""\
212            Content-Type: text/plain; charset="utf8"
213            Content-Transfer-Encoding: base64
214
215            QmFzw6xjIHTDq3h0Lgo\xFF=
216            """))
217        self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
218                         "Basìc tëxt.\n")
219
220    def test_get_text_invalid_keyword(self):
221        m = self._str_msg(textwrap.dedent("""\
222            Content-Type: text/plain
223
224            Basic text.
225            """))
226        with self.assertRaises(TypeError):
227            raw_data_manager.get_content(m, foo='ignore')
228
229    def test_get_non_text(self):
230        template = textwrap.dedent("""\
231            Content-Type: {}
232            Content-Transfer-Encoding: base64
233
234            Ym9ndXMgZGF0YQ==
235            """)
236        for maintype in 'audio image video application'.split():
237            with self.subTest(maintype=maintype):
238                m = self._str_msg(template.format(maintype+'/foo'))
239                self.assertEqual(raw_data_manager.get_content(m), b"bogus data")
240
241    def test_get_non_text_invalid_keyword(self):
242        m = self._str_msg(textwrap.dedent("""\
243            Content-Type: image/jpg
244            Content-Transfer-Encoding: base64
245
246            Ym9ndXMgZGF0YQ==
247            """))
248        with self.assertRaises(TypeError):
249            raw_data_manager.get_content(m, errors='ignore')
250
251    def test_get_raises_on_multipart(self):
252        m = self._str_msg(textwrap.dedent("""\
253            Content-Type: multipart/mixed; boundary="==="
254
255            --===
256            --===--
257            """))
258        with self.assertRaises(KeyError):
259            raw_data_manager.get_content(m)
260
261    def test_get_message_rfc822_and_external_body(self):
262        template = textwrap.dedent("""\
263            Content-Type: message/{}
264
265            To: foo@example.com
266            From: bar@example.com
267            Subject: example
268
269            an example message
270            """)
271        for subtype in 'rfc822 external-body'.split():
272            with self.subTest(subtype=subtype):
273                m = self._str_msg(template.format(subtype))
274                sub_msg = raw_data_manager.get_content(m)
275                self.assertIsInstance(sub_msg, self.message)
276                self.assertEqual(raw_data_manager.get_content(sub_msg),
277                                 "an example message\n")
278                self.assertEqual(sub_msg['to'], 'foo@example.com')
279                self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')
280
281    def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
282        m = self._str_msg(textwrap.dedent("""\
283            Content-Type: message/partial
284
285            To: foo@example.com
286            From: bar@example.com
287            Subject: example
288
289            The real body is in another message.
290            """))
291        self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')
292
293    def test_set_text_plain(self):
294        m = self._make_message()
295        content = "Simple message.\n"
296        raw_data_manager.set_content(m, content)
297        self.assertEqual(str(m), textwrap.dedent("""\
298            Content-Type: text/plain; charset="utf-8"
299            Content-Transfer-Encoding: 7bit
300
301            Simple message.
302            """))
303        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
304        self.assertEqual(m.get_content(), content)
305
306    def test_set_text_plain_null(self):
307        m = self._make_message()
308        content = ''
309        raw_data_manager.set_content(m, content)
310        self.assertEqual(str(m), textwrap.dedent("""\
311            Content-Type: text/plain; charset="utf-8"
312            Content-Transfer-Encoding: 7bit
313
314
315            """))
316        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), '\n')
317        self.assertEqual(m.get_content(), '\n')
318
319    def test_set_text_html(self):
320        m = self._make_message()
321        content = "<p>Simple message.</p>\n"
322        raw_data_manager.set_content(m, content, subtype='html')
323        self.assertEqual(str(m), textwrap.dedent("""\
324            Content-Type: text/html; charset="utf-8"
325            Content-Transfer-Encoding: 7bit
326
327            <p>Simple message.</p>
328            """))
329        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
330        self.assertEqual(m.get_content(), content)
331
332    def test_set_text_charset_latin_1(self):
333        m = self._make_message()
334        content = "Simple message.\n"
335        raw_data_manager.set_content(m, content, charset='latin-1')
336        self.assertEqual(str(m), textwrap.dedent("""\
337            Content-Type: text/plain; charset="iso-8859-1"
338            Content-Transfer-Encoding: 7bit
339
340            Simple message.
341            """))
342        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
343        self.assertEqual(m.get_content(), content)
344
345    def test_set_text_plain_long_line_heuristics(self):
346        m = self._make_message()
347        content = ("Simple but long message that is over 78 characters"
348                   " long to force transfer encoding.\n")
349        raw_data_manager.set_content(m, content)
350        self.assertEqual(str(m), textwrap.dedent("""\
351            Content-Type: text/plain; charset="utf-8"
352            Content-Transfer-Encoding: quoted-printable
353
354            Simple but long message that is over 78 characters long to =
355            force transfer encoding.
356            """))
357        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
358        self.assertEqual(m.get_content(), content)
359
360    def test_set_text_short_line_minimal_non_ascii_heuristics(self):
361        m = self._make_message()
362        content = "et là il est monté sur moi et il commence à m'éto.\n"
363        raw_data_manager.set_content(m, content)
364        self.assertEqual(bytes(m), textwrap.dedent("""\
365            Content-Type: text/plain; charset="utf-8"
366            Content-Transfer-Encoding: 8bit
367
368            et là il est monté sur moi et il commence à m'éto.
369            """).encode('utf-8'))
370        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
371        self.assertEqual(m.get_content(), content)
372
373    def test_set_text_long_line_minimal_non_ascii_heuristics(self):
374        m = self._make_message()
375        content = ("j'ai un problème de python. il est sorti de son"
376                   " vivarium.  et là il est monté sur moi et il commence"
377                   " à m'éto.\n")
378        raw_data_manager.set_content(m, content)
379        self.assertEqual(bytes(m), textwrap.dedent("""\
380            Content-Type: text/plain; charset="utf-8"
381            Content-Transfer-Encoding: quoted-printable
382
383            j'ai un probl=C3=A8me de python. il est sorti de son vivari=
384            um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
385            =C3=A0 m'=C3=A9to.
386            """).encode('utf-8'))
387        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
388        self.assertEqual(m.get_content(), content)
389
390    def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
391        m = self._make_message()
392        content = '\n'*10 + (
393                  "j'ai un problème de python. il est sorti de son"
394                  " vivarium.  et là il est monté sur moi et il commence"
395                  " à m'éto.\n")
396        raw_data_manager.set_content(m, content)
397        self.assertEqual(bytes(m), textwrap.dedent("""\
398            Content-Type: text/plain; charset="utf-8"
399            Content-Transfer-Encoding: quoted-printable
400            """ + '\n'*10 + """
401            j'ai un probl=C3=A8me de python. il est sorti de son vivari=
402            um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
403            =C3=A0 m'=C3=A9to.
404            """).encode('utf-8'))
405        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
406        self.assertEqual(m.get_content(), content)
407
408    def test_set_text_maximal_non_ascii_heuristics(self):
409        m = self._make_message()
410        content = "áàäéèęöő.\n"
411        raw_data_manager.set_content(m, content)
412        self.assertEqual(bytes(m), textwrap.dedent("""\
413            Content-Type: text/plain; charset="utf-8"
414            Content-Transfer-Encoding: 8bit
415
416            áàäéèęöő.
417            """).encode('utf-8'))
418        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
419        self.assertEqual(m.get_content(), content)
420
421    def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
422        m = self._make_message()
423        content = '\n'*10 + "áàäéèęöő.\n"
424        raw_data_manager.set_content(m, content)
425        self.assertEqual(bytes(m), textwrap.dedent("""\
426            Content-Type: text/plain; charset="utf-8"
427            Content-Transfer-Encoding: 8bit
428            """ + '\n'*10 + """
429            áàäéèęöő.
430            """).encode('utf-8'))
431        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
432        self.assertEqual(m.get_content(), content)
433
434    def test_set_text_long_line_maximal_non_ascii_heuristics(self):
435        m = self._make_message()
436        content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
437                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
438                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
439        raw_data_manager.set_content(m, content)
440        self.assertEqual(bytes(m), textwrap.dedent("""\
441            Content-Type: text/plain; charset="utf-8"
442            Content-Transfer-Encoding: base64
443
444            w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
445            tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
446            xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
447            qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
448            w6TDqcOoxJnDtsWRLgo=
449            """).encode('utf-8'))
450        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
451        self.assertEqual(m.get_content(), content)
452
453    def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
454        # Yes, it chooses "wrong" here.  It's a heuristic.  So this result
455        # could change if we come up with a better heuristic.
456        m = self._make_message()
457        content = ('\n'*10 +
458                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
459                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
460                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
461        raw_data_manager.set_content(m, "\n"*10 +
462                                        "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
463                                        "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
464                                        "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
465        self.assertEqual(bytes(m), textwrap.dedent("""\
466            Content-Type: text/plain; charset="utf-8"
467            Content-Transfer-Encoding: quoted-printable
468            """ + '\n'*10 + """
469            =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
470            =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
471            =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
472            =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
473            =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
474            =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
475            =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
476            =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
477            =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
478            =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
479            =C5=91.
480            """).encode('utf-8'))
481        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
482        self.assertEqual(m.get_content(), content)
483
484    def test_set_text_non_ascii_with_cte_7bit_raises(self):
485        m = self._make_message()
486        with self.assertRaises(UnicodeError):
487            raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit')
488
489    def test_set_text_non_ascii_with_charset_ascii_raises(self):
490        m = self._make_message()
491        with self.assertRaises(UnicodeError):
492            raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii')
493
494    def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
495        m = self._make_message()
496        with self.assertRaises(UnicodeError):
497            raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii')
498
499    def test_set_message(self):
500        m = self._make_message()
501        m['Subject'] = "Forwarded message"
502        content = self._make_message()
503        content['To'] = 'python@vivarium.org'
504        content['From'] = 'police@monty.org'
505        content['Subject'] = "get back in your box"
506        content.set_content("Or face the comfy chair.")
507        raw_data_manager.set_content(m, content)
508        self.assertEqual(str(m), textwrap.dedent("""\
509            Subject: Forwarded message
510            Content-Type: message/rfc822
511            Content-Transfer-Encoding: 8bit
512
513            To: python@vivarium.org
514            From: police@monty.org
515            Subject: get back in your box
516            Content-Type: text/plain; charset="utf-8"
517            Content-Transfer-Encoding: 7bit
518            MIME-Version: 1.0
519
520            Or face the comfy chair.
521            """))
522        payload = m.get_payload(0)
523        self.assertIsInstance(payload, self.message)
524        self.assertEqual(str(payload), str(content))
525        self.assertIsInstance(m.get_content(), self.message)
526        self.assertEqual(str(m.get_content()), str(content))
527
528    def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
529        m = self._make_message()
530        m['Subject'] = "Escape report"
531        content = self._make_message()
532        content['To'] = 'police@monty.org'
533        content['From'] = 'victim@monty.org'
534        content['Subject'] = "Help"
535        content.set_content("j'ai un problème de python. il est sorti de son"
536                            " vivarium.")
537        raw_data_manager.set_content(m, content)
538        self.assertEqual(bytes(m), textwrap.dedent("""\
539            Subject: Escape report
540            Content-Type: message/rfc822
541            Content-Transfer-Encoding: 8bit
542
543            To: police@monty.org
544            From: victim@monty.org
545            Subject: Help
546            Content-Type: text/plain; charset="utf-8"
547            Content-Transfer-Encoding: 8bit
548            MIME-Version: 1.0
549
550            j'ai un problème de python. il est sorti de son vivarium.
551            """).encode('utf-8'))
552        # The choice of base64 for the body encoding is because generator
553        # doesn't bother with heuristics and uses it unconditionally for utf-8
554        # text.
555        # XXX: the first cte should be 7bit, too...that's a generator bug.
556        # XXX: the line length in the body also looks like a generator bug.
557        self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
558                         textwrap.dedent("""\
559            Subject: Escape report
560            Content-Type: message/rfc822
561            Content-Transfer-Encoding: 8bit
562
563            To: police@monty.org
564            From: victim@monty.org
565            Subject: Help
566            Content-Type: text/plain; charset="utf-8"
567            Content-Transfer-Encoding: base64
568            MIME-Version: 1.0
569
570            aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
571            Lgo=
572            """))
573        self.assertIsInstance(m.get_content(), self.message)
574        self.assertEqual(str(m.get_content()), str(content))
575
576    def test_set_message_invalid_cte_raises(self):
577        m = self._make_message()
578        content = self._make_message()
579        for cte in 'quoted-printable base64'.split():
580            for subtype in 'rfc822 external-body'.split():
581                with self.subTest(cte=cte, subtype=subtype):
582                    with self.assertRaises(ValueError) as ar:
583                        m.set_content(content, subtype, cte=cte)
584                    exc = str(ar.exception)
585                    self.assertIn(cte, exc)
586                    self.assertIn(subtype, exc)
587        subtype = 'external-body'
588        for cte in '8bit binary'.split():
589            with self.subTest(cte=cte, subtype=subtype):
590                with self.assertRaises(ValueError) as ar:
591                    m.set_content(content, subtype, cte=cte)
592                exc = str(ar.exception)
593                self.assertIn(cte, exc)
594                self.assertIn(subtype, exc)
595
596    def test_set_image_jpg(self):
597        for content in (b"bogus content",
598                        bytearray(b"bogus content"),
599                        memoryview(b"bogus content")):
600            with self.subTest(content=content):
601                m = self._make_message()
602                raw_data_manager.set_content(m, content, 'image', 'jpeg')
603                self.assertEqual(str(m), textwrap.dedent("""\
604                    Content-Type: image/jpeg
605                    Content-Transfer-Encoding: base64
606
607                    Ym9ndXMgY29udGVudA==
608                    """))
609                self.assertEqual(m.get_payload(decode=True), content)
610                self.assertEqual(m.get_content(), content)
611
612    def test_set_audio_aif_with_quoted_printable_cte(self):
613        # Why you would use qp, I don't know, but it is technically supported.
614        # XXX: the incorrect line length is because binascii.b2a_qp doesn't
615        # support a line length parameter, but we must use it to get newline
616        # encoding.
617        # XXX: what about that lack of tailing newline?  Do we actually handle
618        # that correctly in all cases?  That is, if the *source* has an
619        # unencoded newline, do we add an extra newline to the returned payload
620        # or not?  And can that actually be disambiguated based on the RFC?
621        m = self._make_message()
622        content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
623        m.set_content(content, 'audio', 'aif', cte='quoted-printable')
624        self.assertEqual(bytes(m), textwrap.dedent("""\
625            Content-Type: audio/aif
626            Content-Transfer-Encoding: quoted-printable
627            MIME-Version: 1.0
628
629            b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
630            zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1'))
631        self.assertEqual(m.get_payload(decode=True), content)
632        self.assertEqual(m.get_content(), content)
633
634    def test_set_video_mpeg_with_binary_cte(self):
635        m = self._make_message()
636        content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
637        m.set_content(content, 'video', 'mpeg', cte='binary')
638        self.assertEqual(bytes(m), textwrap.dedent("""\
639            Content-Type: video/mpeg
640            Content-Transfer-Encoding: binary
641            MIME-Version: 1.0
642
643            """).encode('ascii') +
644            # XXX: the second \n ought to be a \r, but generator gets it wrong.
645            # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
646            b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' +
647            b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz')
648        self.assertEqual(m.get_payload(decode=True), content)
649        self.assertEqual(m.get_content(), content)
650
651    def test_set_application_octet_stream_with_8bit_cte(self):
652        # In 8bit mode, universal line end logic applies.  It is up to the
653        # application to make sure the lines are short enough; we don't check.
654        m = self._make_message()
655        content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
656        m.set_content(content, 'application', 'octet-stream', cte='8bit')
657        self.assertEqual(bytes(m), textwrap.dedent("""\
658            Content-Type: application/octet-stream
659            Content-Transfer-Encoding: 8bit
660            MIME-Version: 1.0
661
662            """).encode('ascii') +
663            b'b\xFFgus\tcon\nt\nent\n' +
664            b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n')
665        self.assertEqual(m.get_payload(decode=True), content)
666        self.assertEqual(m.get_content(), content)
667
668    def test_set_headers_from_header_objects(self):
669        m = self._make_message()
670        content = "Simple message.\n"
671        header_factory = self.policy.header_factory
672        raw_data_manager.set_content(m, content, headers=(
673            header_factory("To", "foo@example.com"),
674            header_factory("From", "foo@example.com"),
675            header_factory("Subject", "I'm talking to myself.")))
676        self.assertEqual(str(m), textwrap.dedent("""\
677            Content-Type: text/plain; charset="utf-8"
678            To: foo@example.com
679            From: foo@example.com
680            Subject: I'm talking to myself.
681            Content-Transfer-Encoding: 7bit
682
683            Simple message.
684            """))
685
686    def test_set_headers_from_strings(self):
687        m = self._make_message()
688        content = "Simple message.\n"
689        raw_data_manager.set_content(m, content, headers=(
690            "X-Foo-Header: foo",
691            "X-Bar-Header: bar",))
692        self.assertEqual(str(m), textwrap.dedent("""\
693            Content-Type: text/plain; charset="utf-8"
694            X-Foo-Header: foo
695            X-Bar-Header: bar
696            Content-Transfer-Encoding: 7bit
697
698            Simple message.
699            """))
700
701    def test_set_headers_with_invalid_duplicate_string_header_raises(self):
702        m = self._make_message()
703        content = "Simple message.\n"
704        with self.assertRaisesRegex(ValueError, 'Content-Type'):
705            raw_data_manager.set_content(m, content, headers=(
706                "Content-Type: foo/bar",)
707                )
708
709    def test_set_headers_with_invalid_duplicate_header_header_raises(self):
710        m = self._make_message()
711        content = "Simple message.\n"
712        header_factory = self.policy.header_factory
713        with self.assertRaisesRegex(ValueError, 'Content-Type'):
714            raw_data_manager.set_content(m, content, headers=(
715                header_factory("Content-Type", " foo/bar"),)
716                )
717
718    def test_set_headers_with_defective_string_header_raises(self):
719        m = self._make_message()
720        content = "Simple message.\n"
721        with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
722            raw_data_manager.set_content(m, content, headers=(
723                'To: a@fairly@@invalid@address',)
724                )
725            print(m['To'].defects)
726
727    def test_set_headers_with_defective_header_header_raises(self):
728        m = self._make_message()
729        content = "Simple message.\n"
730        header_factory = self.policy.header_factory
731        with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
732            raw_data_manager.set_content(m, content, headers=(
733                header_factory('To', 'a@fairly@@invalid@address'),)
734                )
735            print(m['To'].defects)
736
737    def test_set_disposition_inline(self):
738        m = self._make_message()
739        m.set_content('foo', disposition='inline')
740        self.assertEqual(m['Content-Disposition'], 'inline')
741
742    def test_set_disposition_attachment(self):
743        m = self._make_message()
744        m.set_content('foo', disposition='attachment')
745        self.assertEqual(m['Content-Disposition'], 'attachment')
746
747    def test_set_disposition_foo(self):
748        m = self._make_message()
749        m.set_content('foo', disposition='foo')
750        self.assertEqual(m['Content-Disposition'], 'foo')
751
752    # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
753    # would cause 'foo' above to raise.
754
755    def test_set_filename(self):
756        m = self._make_message()
757        m.set_content('foo', filename='bar.txt')
758        self.assertEqual(m['Content-Disposition'],
759                         'attachment; filename="bar.txt"')
760
761    def test_set_filename_and_disposition_inline(self):
762        m = self._make_message()
763        m.set_content('foo', disposition='inline', filename='bar.txt')
764        self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"')
765
766    def test_set_non_ascii_filename(self):
767        m = self._make_message()
768        m.set_content('foo', filename='ábárî.txt')
769        self.assertEqual(bytes(m), textwrap.dedent("""\
770            Content-Type: text/plain; charset="utf-8"
771            Content-Transfer-Encoding: 7bit
772            Content-Disposition: attachment;
773             filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
774            MIME-Version: 1.0
775
776            foo
777            """).encode('ascii'))
778
779    content_object_params = {
780        'text_plain': ('content', ()),
781        'text_html': ('content', ('html',)),
782        'application_octet_stream': (b'content',
783                                     ('application', 'octet_stream')),
784        'image_jpeg': (b'content', ('image', 'jpeg')),
785        'message_rfc822': (message(), ()),
786        'message_external_body': (message(), ('external-body',)),
787        }
788
789    def content_object_as_header_receiver(self, obj, mimetype):
790        m = self._make_message()
791        m.set_content(obj, *mimetype, headers=(
792            'To: foo@example.com',
793            'From: bar@simple.net'))
794        self.assertEqual(m['to'], 'foo@example.com')
795        self.assertEqual(m['from'], 'bar@simple.net')
796
797    def content_object_as_disposition_inline_receiver(self, obj, mimetype):
798        m = self._make_message()
799        m.set_content(obj, *mimetype, disposition='inline')
800        self.assertEqual(m['Content-Disposition'], 'inline')
801
802    def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
803        m = self._make_message()
804        m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt')
805        self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"')
806        self.assertEqual(m.get_filename(), "bár.txt")
807        self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt")
808
809    def content_object_as_cid_receiver(self, obj, mimetype):
810        m = self._make_message()
811        m.set_content(obj, *mimetype, cid='some_random_stuff')
812        self.assertEqual(m['Content-ID'], 'some_random_stuff')
813
814    def content_object_as_params_receiver(self, obj, mimetype):
815        m = self._make_message()
816        params = {'foo': 'bár', 'abc': 'xyz'}
817        m.set_content(obj, *mimetype, params=params)
818        if isinstance(obj, str):
819            params['charset'] = 'utf-8'
820        self.assertEqual(m['Content-Type'].params, params)
821
822
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
825