1import asyncore
2import base64
3import email.mime.text
4from email.message import EmailMessage
5from email.base64mime import body_encode as encode_base64
6import email.utils
7import hmac
8import socket
9import smtpd
10import smtplib
11import io
12import re
13import sys
14import time
15import select
16import errno
17import textwrap
18import threading
19
20import unittest
21from test import support, mock_socket
22from test.support import HOST, HOSTv4, HOSTv6
23from unittest.mock import Mock
24
25
if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        """Do nothing when the channel is told about an exceptional condition."""
        pass
    # Install the no-op on the class so every SMTPChannel created by the
    # tests picks it up.
    smtpd.SMTPChannel.handle_expt = handle_expt
32
33
def server(evt, buf, serv):
    """Accept a single connection on *serv* and write *buf* to it.

    Intended to run in a helper thread.

    evt  -- threading.Event; set once the socket is listening and set again
            when the function is done.
    buf  -- bytes to send to the client.
    serv -- bound socket; it is put into listening mode here and always
            closed before returning.

    A socket.timeout from accept() is swallowed.  Any error raised while
    sending propagates to the caller, but only after both the accepted
    connection and *serv* have been closed.
    """
    serv.listen()
    evt.set()
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        # Close the accepted socket even when send() raises (for example
        # because the client hung up early); otherwise it would be leaked.
        try:
            # Bound the number of select/send rounds so a stuck client
            # cannot keep this thread alive forever.
            n = 500
            while buf and n > 0:
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                n -= 1
        finally:
            conn.close()
    finally:
        serv.close()
        evt.set()
55
class GeneralTests(unittest.TestCase):
    """Basic smtplib.SMTP checks that run against ``mock_socket``."""

    def setUp(self):
        # Make smtplib use the mock instead of the real socket module.
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        # Undo the patch applied in setUp().
        smtplib.socket = socket

    def testQuoteData(self):
        # quotedata() is no longer used but is retained for backward
        # compatibility, so make sure it still works.
        raw = "abc\n.jkl\rfoo\r\n..blue"
        quoted = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(quoted, smtplib.quotedata(raw))

    def testBasic1(self):
        # Connect with separate host and port arguments.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port)
        client.close()

    def testSourceAddress(self):
        # The source_address argument is kept on the instance.
        mock_socket.reply_with(b"220 Hola mundo")
        bind_to = ('127.0.0.1', 19876)
        client = smtplib.SMTP(HOST, self.port, source_address=bind_to)
        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
        client.close()

    def testBasic2(self):
        # Connect with the port embedded in the host string.
        mock_socket.reply_with(b"220 Hola mundo")
        host_and_port = "%s:%s" % (HOST, self.port)
        client = smtplib.SMTP(host_and_port)
        client.close()

    def testLocalHostName(self):
        # A caller-supplied local_hostname must be used as given.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        # Without a timeout argument the module default timeout applies.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            client = smtplib.SMTP(HOST, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        # An explicit timeout=None must win over the default timeout.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutValue(self):
        # An explicit numeric timeout ends up on the socket.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def test_debuglevel(self):
        # Debug level 1 writes a bare "connect:" line to stderr.
        mock_socket.reply_with(b"220 Hello world")
        client = smtplib.SMTP()
        client.set_debuglevel(1)
        with support.captured_stderr() as captured:
            client.connect(HOST, self.port)
        client.close()
        pattern = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(captured.getvalue(), pattern)

    def test_debuglevel_2(self):
        # Debug level 2 prefixes the same line with a timestamp.
        mock_socket.reply_with(b"220 Hello world")
        client = smtplib.SMTP()
        client.set_debuglevel(2)
        with support.captured_stderr() as captured:
            client.connect(HOST, self.port)
        client.close()
        pattern = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                             re.MULTILINE)
        self.assertRegex(captured.getvalue(), pattern)
148
149
150# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Run the asyncore loop for *serv* until the client is finished.

    serv       -- the SMTP server object to close when done.
    serv_evt   -- event set on entry (server is running) and again on exit.
    client_evt -- event the test sets once its SMTP conversation is over.

    The loop polls at most 1000 times with a 0.01 second timeout each.
    """
    serv_evt.set()

    try:
        # Prefer the poll()-based loop where the platform provides poll().
        poll_once = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_once(0.01, asyncore.socket_map)

            # Once the client conversation is finished it sets client_evt,
            # and it is then safe to shut the server down.
            if client_evt.is_set():
                serv.close()
                break

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # Give the client some time to read the result first.
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
181
# Delimiter lines the DebuggingServer tests expect around each message in the
# captured stdout: MSG_BEGIN before the message text, MSG_END after it.
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
184
185# NOTE: Some SMTP objects in the tests below are created with a non-default
186# local_hostname argument to the constructor, since (on some systems) the FQDN
187# lookup caused by the default local_hostname sometimes takes so long that the
188# test server times out, causing the test to fail.
189
190# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(unittest.TestCase):
    """Run smtplib against a live smtpd.DebuggingServer and inspect its output.

    setUp() starts the server on a background thread running
    debugging_server().  What the server prints is captured from sys.stdout
    into self.output, and the SMTPChannel trace is captured by replacing
    smtpd.DEBUGSTREAM with a StringIO.
    """

    maxDiff = None

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # DebuggingServer output goes to sys.stdout; divert it to a buffer.
        self.old_stdout = sys.stdout
        self.output = sys.stdout = io.StringIO()

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Divert the SMTPChannel debug trace as well.
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Passing 0 as the port number picks a random unused port.
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Remember which host and port were actually assigned.
        bound_addr = self.serv.socket.getsockname()
        self.host, self.port = bound_addr[:2]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # debugging_server() sets serv_evt as soon as it starts; wait for
        # that, then re-arm the event so it can signal shutdown later.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Tell the server thread that the client is finished ...
        self.client_evt.set()
        # ... and wait for it to terminate.
        self.serv_evt.wait()
        self.thread.join()
        # Put sys.stdout and DEBUGSTREAM back.
        sys.stdout = self.old_stdout
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM

    # ----- helpers -------------------------------------------------------

    def get_output_without_xpeer(self):
        """Return the captured stdout with the first X-Peer header line removed."""
        captured = self.output.getvalue()
        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
                      captured, flags=re.MULTILINE|re.DOTALL)

    def _connect(self):
        """Open a client connection to the test server (3 second timeout)."""
        return smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=3)

    def _pause_then_quit(self, smtp):
        """Sleep briefly, then send QUIT on *smtp*."""
        # XXX(nnorwitz): these tests are flaky and die with a bad file
        # descriptor in asyncore.  This sleep might help, but should really
        # be fixed properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

    def _wait_for_server(self):
        """Signal that the client is done and wait for the server to stop."""
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    @staticmethod
    def _text_message(*headers):
        """Build the standard MIMEText message and add (name, value) headers in order."""
        message = email.mime.text.MIMEText('A test message')
        for name, value in headers:
            message[name] = value
        return message

    @staticmethod
    def _recips_re(addr):
        """Regex matching a 'recips:' trace line that contains *addr* quoted."""
        return re.compile(r"^recips: .*'{}'.*$".format(addr), re.MULTILINE)

    def _check_plain_delivery(self, sender, payload, printed):
        """sendmail() *payload* to 'Sally' and expect *printed* on stdout."""
        smtp = self._connect()
        smtp.sendmail(sender, 'Sally', payload)
        self._pause_then_quit(smtp)
        self._wait_for_server()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, printed, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def _assert_message_printed(self, message):
        """Compare captured stdout with *message* as serialized.

        The X-Peer header that DebuggingServer adds is removed on both sides
        because figuring out exactly what IP address format is put there is
        not easy (and irrelevant to the tests).  Typically 127.0.0.1 or ::1,
        but it is not always the same as socket.gethostbyname(HOST).
        """
        test_output = self.get_output_without_xpeer()
        del message['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, message.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)

    # ----- simple command tests -------------------------------------------

    def testBasic(self):
        # Connect and disconnect.
        smtp = self._connect()
        smtp.quit()

    def testSourceAddress(self):
        # Connect from an explicit local (host, port) pair.
        src_port = support.find_unused_port()
        source = (self.host, src_port)
        try:
            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
                                timeout=3, source_address=source)
            self.assertEqual(smtp.source_address, (self.host, src_port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as exc:
            if exc.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to source port %d" % src_port)
            raise

    def testNOOP(self):
        smtp = self._connect()
        self.assertEqual(smtp.noop(), (250, b'OK'))
        smtp.quit()

    def testRSET(self):
        smtp = self._connect()
        self.assertEqual(smtp.rset(), (250, b'OK'))
        smtp.quit()

    def testELHO(self):
        # EHLO is answered with 250 and the server's extension list.
        smtp = self._connect()
        self.assertEqual(smtp.ehlo(), (250, b'\nSIZE 33554432\nHELP'))
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = self._connect()
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), (502, b'EXPN not implemented'))
        smtp.quit()

    def test_issue43124_putcmd_escapes_newline(self):
        # see: https://bugs.python.org/issue43124
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=10)  # support.LOOPBACK_TIMEOUT in newer Pythons
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError) as caught:
            smtp.putcmd('helo\nX-INJECTED')
        self.assertIn("prohibited newline characters", str(caught.exception))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()
        reply = (252, b'Cannot VRFY user, but will accept message '
                      b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), reply)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), reply)
        smtp.quit()

    def testSecondHELO(self):
        # A second HELO is reported as a duplicate
        # (this behavior is specific to smtpd.SMTPChannel).
        smtp = self._connect()
        smtp.helo()
        self.assertEqual(smtp.helo(), (503, b'Duplicate HELO/EHLO'))
        smtp.quit()

    def testHELP(self):
        smtp = self._connect()
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL '
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    # ----- sendmail() tests -----------------------------------------------

    def testSend(self):
        # Connect and send a plain str message.
        text = 'A test message'
        self._check_plain_delivery('John', text, text)

    def testSendBinary(self):
        # A bytes message is printed as its ASCII decoding.
        data = b'A test message'
        self._check_plain_delivery('John', data, data.decode('ascii'))

    def testSendNeedingDotQuote(self):
        # Issue 12283
        text = '.A test\n.mes.sage.'
        self._check_plain_delivery('John', text, text)

    def test_issue43124_escape_localhostname(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        body = 'wazzuuup\nlinetwo'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
                            timeout=10)  # support.LOOPBACK_TIMEOUT in newer Pythons
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError) as caught:
            smtp.sendmail("hi@me.com", "you@me.com", body)
        self.assertIn(
            "prohibited newline characters: ehlo hi\\nX-INJECTED",
            str(caught.exception),
        )
        self._pause_then_quit(smtp)

        trace = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-INJECTED", trace)

    def test_issue43124_escape_options(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        body = 'wazzuuup\nlinetwo'
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=10)  # support.LOOPBACK_TIMEOUT in newer Pythons

        self.addCleanup(smtp.close)
        smtp.sendmail("hi@me.com", "you@me.com", body)
        with self.assertRaises(ValueError) as caught:
            smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
        error_text = str(caught.exception)
        self.assertIn("prohibited newline characters", error_text)
        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", error_text)
        self._pause_then_quit(smtp)

        trace = smtpd.DEBUGSTREAM.getvalue()
        for forbidden in ("X-OPTION", "X-OPTION2",
                          "X-INJECTED-1", "X-INJECTED-2"):
            self.assertNotIn(forbidden, trace)

    def testSendNullSender(self):
        text = 'A test message'
        self._check_plain_delivery('<>', text, text)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout, re.compile("^sender: <>$", re.MULTILINE))

    # ----- send_message() tests -------------------------------------------

    def testSendMessage(self):
        m = self._text_message()
        smtp = self._connect()
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        self._pause_then_quit(smtp)

        self._wait_for_server()
        self._assert_message_printed(m)

    def testSendMessageWithAddresses(self):
        m = self._text_message(
            ('From', 'foo@bar.com'),
            ('To', 'John'),
            ('CC', 'Sally, Fred'),
            ('Bcc', 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'),
        )
        smtp = self._connect()
        smtp.send_message(m)
        self._pause_then_quit(smtp)
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')

        self._wait_for_server()
        # The Bcc header should not be transmitted.
        del m['Bcc']
        self._assert_message_printed(m)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: foo@bar.com$", re.MULTILINE))
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     'warped@silly.walks.com'):
            self.assertRegex(debugout, self._recips_re(addr))

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = self._text_message(
            ('From', 'foo@bar.com'),
            ('To', 'John, Dinsdale'),
        )
        smtp = self._connect()
        smtp.send_message(m)
        self._pause_then_quit(smtp)

        self._wait_for_server()
        self._assert_message_printed(m)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: foo@bar.com$", re.MULTILINE))
        for addr in ('John', 'Dinsdale'):
            self.assertRegex(debugout, self._recips_re(addr))

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = self._text_message(
            ('From', 'foo@bar.com'),
            ('To', 'John, Dinsdale'),
        )
        smtp = self._connect()
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        self._pause_then_quit(smtp)

        self._wait_for_server()
        self._assert_message_printed(m)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: joe@example.com$", re.MULTILINE))
        for addr in ('John', 'Dinsdale'):
            self.assertNotRegex(debugout, self._recips_re(addr))
        self.assertRegex(debugout, self._recips_re('foo@example.net'))

    def testSendMessageWithMultipleFrom(self):
        # With several addresses in From, the Sender header supplies the
        # envelope sender.
        m = self._text_message(
            ('From', 'Bernard, Bianca'),
            ('Sender', 'the_rescuers@Rescue-Aid-Society.com'),
            ('To', 'John, Dinsdale'),
        )
        smtp = self._connect()
        smtp.send_message(m)
        self._pause_then_quit(smtp)

        self._wait_for_server()
        self._assert_message_printed(m)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(
            debugout,
            re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE))
        for addr in ('John', 'Dinsdale'):
            self.assertRegex(debugout, self._recips_re(addr))

    def testSendMessageResent(self):
        m = self._text_message(
            ('From', 'foo@bar.com'),
            ('To', 'John'),
            ('CC', 'Sally, Fred'),
            ('Bcc', 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'),
            ('Resent-Date', 'Thu, 1 Jan 1970 17:42:00 +0000'),
            ('Resent-From', 'holy@grail.net'),
            ('Resent-To', 'Martha <my_mom@great.cooker.com>, Jeff'),
            ('Resent-Bcc', 'doe@losthope.net'),
        )
        smtp = self._connect()
        smtp.send_message(m)
        self._pause_then_quit(smtp)

        self._wait_for_server()
        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        self._assert_message_printed(m)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: holy@grail.net$", re.MULTILINE))
        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
            self.assertRegex(debugout, self._recips_re(addr))

    def testSendMessageMultipleResentRaises(self):
        # Two sets of Resent-* headers make send_message() raise ValueError.
        m = self._text_message(
            ('From', 'foo@bar.com'),
            ('To', 'John'),
            ('CC', 'Sally, Fred'),
            ('Bcc', 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'),
            ('Resent-Date', 'Thu, 1 Jan 1970 17:42:00 +0000'),
            ('Resent-From', 'holy@grail.net'),
            ('Resent-To', 'Martha <my_mom@great.cooker.com>, Jeff'),
            ('Resent-Bcc', 'doe@losthope.net'),
            ('Resent-Date', 'Thu, 2 Jan 1970 17:42:00 +0000'),
            ('Resent-To', 'holy@grail.net'),
            ('Resent-From', 'Martha <my_mom@great.cooker.com>, Jeff'),
        )
        smtp = self._connect()
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()
616
class NonConnectingTests(unittest.TestCase):
    """Checks that need no SMTP server at all."""

    def testNotConnected(self):
        # Operations on an SMTP object that was never connected must raise
        # SMTPServerDisconnected.
        smtp = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.send('test msg')

    def testNonnumericPort(self):
        # A non-numeric port raises OSError, whether it is passed separately
        # or embedded in the host string.
        with self.assertRaises(OSError):
            smtplib.SMTP("localhost", "bogus")
        with self.assertRaises(OSError):
            smtplib.SMTP("localhost:bogus")
635
636
class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options send_message() hands to sendmail().

    ehlo, has_extn and sendmail are replaced by Mock objects, so nothing is
    sent; the tests only look at the arguments sendmail() was called with.
    """

    def setUp(self):
        self.msg = EmailMessage()
        self.msg['From'] = 'Páolo <főo@bar.com>'
        self.smtp = smtplib.SMTP()
        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
        self.smtp.has_extn = Mock()
        self.smtp.sendmail = Mock()

    def _mail_options_of_call(self, index):
        """Return the fourth positional argument of the index-th sendmail() call."""
        return self.smtp.sendmail.call_args_list[index][0][3]

    def testSendMessage(self):
        # Sending twice must yield the same options both times, i.e. nothing
        # accumulates between calls.
        wanted = ('SMTPUTF8', 'BODY=8BITMIME')
        for _ in range(2):
            self.smtp.send_message(self.msg)
        for index in (0, 1):
            self.assertEqual(self._mail_options_of_call(index), wanted)

    def testSendMessageWithMailOptions(self):
        # The caller's list is extended in a copy, never in place.
        mail_options = ['STARTTLS']
        wanted = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        self.assertEqual(self._mail_options_of_call(0), wanted)
662
663
664# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    """The client must reject a server whose greeting is not a success reply."""

    def setUp(self):
        smtplib.socket = mock_socket
        # Queue a 199 reply as the banner the next connection will read.
        mock_socket.reply_with(b"199 no hello for you!")
        self.old_stdout = sys.stdout
        self.output = sys.stdout = io.StringIO()
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
682
683
class TooLongLineTests(unittest.TestCase):
    """The client must give up on a reply line longer than smtplib._MAXLINE."""

    # A single reply line twice as long as the client accepts.
    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.old_stdout = sys.stdout
        self.output = sys.stdout = io.StringIO()

        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = support.bind_port(self.sock)
        worker = threading.Thread(
            target=server, args=(self.evt, self.respdata, self.sock))
        worker.start()
        self.addCleanup(worker.join)
        # server() sets the event once it is listening; wait for that and
        # re-arm the event for the shutdown signal awaited in tearDown().
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout

    def testLineTooLong(self):
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
710
711
# Fixture data for the simulated SMTP server.

# Address -> display name.
sim_users = {
    'Mr.A@somewhere.com': 'John A',
    'Ms.B@xn--fo-fka.com': 'Sally B',
    'Mrs.C@somewhereesle.com': 'Ruth C',
}

# (user, password) pair; the auth handlers compare against the password.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')

# Base64 text sent as the CRAM-MD5 challenge.
sim_cram_md5_challenge = (
    'PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    'dAZWx3b29kLmlubm9zb2Z0LmNvbT4='
)

# List name -> member addresses.
sim_lists = {
    'list-1': ['Mr.A@somewhere.com', 'Mrs.C@somewhereesle.com'],
    'list-2': ['Ms.B@xn--fo-fka.com'],
}
723
724# Simulated SMTP channel & server
class ResponseException(Exception):
    """Exception caught around the auth handler call in SimSMTPChannel.

    The handler that catches it reads ``smtp_code`` and ``smtp_error`` from
    the instance; this class itself does not define or set them.
    """
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd.SMTPChannel subclass that adds an AUTH command for the tests."""

    # Class-level defaults.  None of the *_response / *_count / disconnect
    # attributes is referenced by the methods visible in this part of the
    # file -- presumably they are consumed by command handlers defined
    # further down (TODO confirm).
    quit_response = None
    mail_response = None
    rcpt_response = None
    data_response = None
    rcpt_count = 0
    rset_count = 0
    disconnect = 0
    AUTH = 99    # Add protocol state to enable auth testing.
    # Set by _authenticated() on success; smtp_AUTH refuses a second AUTH
    # while this is not None.
    authenticated_user = None
737
738    def __init__(self, extra_features, *args, **kw):
739        self._extrafeatures = ''.join(
740            [ "250-{0}\r\n".format(x) for x in extra_features ])
741        super(SimSMTPChannel, self).__init__(*args, **kw)
742
743    # AUTH related stuff.  It would be nice if support for this were in smtpd.
744    def found_terminator(self):
745        if self.smtp_state == self.AUTH:
746            line = self._emptystring.join(self.received_lines)
747            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
748            self.received_lines = []
749            try:
750                self.auth_object(line)
751            except ResponseException as e:
752                self.smtp_state = self.COMMAND
753                self.push('%s %s' % (e.smtp_code, e.smtp_error))
754                return
755        super().found_terminator()
756
757
758    def smtp_AUTH(self, arg):
759        if not self.seen_greeting:
760            self.push('503 Error: send EHLO first')
761            return
762        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
763            self.push('500 Error: command "AUTH" not recognized')
764            return
765        if self.authenticated_user is not None:
766            self.push(
767                '503 Bad sequence of commands: already authenticated')
768            return
769        args = arg.split()
770        if len(args) not in [1, 2]:
771            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
772            return
773        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
774        try:
775            self.auth_object = getattr(self, auth_object_name)
776        except AttributeError:
777            self.push('504 Command parameter not implemented: unsupported '
778                      ' authentication mechanism {!r}'.format(auth_object_name))
779            return
780        self.smtp_state = self.AUTH
781        self.auth_object(args[1] if len(args) == 2 else None)
782
783    def _authenticated(self, user, valid):
784        if valid:
785            self.authenticated_user = user
786            self.push('235 Authentication Succeeded')
787        else:
788            self.push('535 Authentication credentials invalid')
789        self.smtp_state = self.COMMAND
790
791    def _decode_base64(self, string):
792        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
793
794    def _auth_plain(self, arg=None):
795        if arg is None:
796            self.push('334 ')
797        else:
798            logpass = self._decode_base64(arg)
799            try:
800                *_, user, password = logpass.split('\0')
801            except ValueError as e:
802                self.push('535 Splitting response {!r} into user and password'
803                          ' failed: {}'.format(logpass, e))
804                return
805            self._authenticated(user, password == sim_auth[1])
806
807    def _auth_login(self, arg=None):
808        if arg is None:
809            # base64 encoded 'Username:'
810            self.push('334 VXNlcm5hbWU6')
811        elif not hasattr(self, '_auth_login_user'):
812            self._auth_login_user = self._decode_base64(arg)
813            # base64 encoded 'Password:'
814            self.push('334 UGFzc3dvcmQ6')
815        else:
816            password = self._decode_base64(arg)
817            self._authenticated(self._auth_login_user, password == sim_auth[1])
818            del self._auth_login_user
819
820    def _auth_cram_md5(self, arg=None):
821        if arg is None:
822            self.push('334 {}'.format(sim_cram_md5_challenge))
823        else:
824            logpass = self._decode_base64(arg)
825            try:
826                user, hashed_pass = logpass.split()
827            except ValueError as e:
828                self.push('535 Splitting response {!r} into user and password '
829                          'failed: {}'.format(logpass, e))
830                return False
831            valid_hashed_pass = hmac.HMAC(
832                sim_auth[1].encode('ascii'),
833                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
834                'md5').hexdigest()
835            self._authenticated(user, hashed_pass == valid_hashed_pass)
836    # end AUTH related stuff.
837
838    def smtp_EHLO(self, arg):
839        resp = ('250-testhost\r\n'
840                '250-EXPN\r\n'
841                '250-SIZE 20000000\r\n'
842                '250-STARTTLS\r\n'
843                '250-DELIVERBY\r\n')
844        resp = resp + self._extrafeatures + '250 HELP'
845        self.push(resp)
846        self.seen_greeting = arg
847        self.extended_smtp = True
848
849    def smtp_VRFY(self, arg):
850        # For max compatibility smtplib should be sending the raw address.
851        if arg in sim_users:
852            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
853        else:
854            self.push('550 No such user: %s' % arg)
855
856    def smtp_EXPN(self, arg):
857        list_name = arg.lower()
858        if list_name in sim_lists:
859            user_list = sim_lists[list_name]
860            for n, user_email in enumerate(user_list):
861                quoted_addr = smtplib.quoteaddr(user_email)
862                if n < len(user_list) - 1:
863                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
864                else:
865                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
866        else:
867            self.push('550 No access for you!')
868
869    def smtp_QUIT(self, arg):
870        if self.quit_response is None:
871            super(SimSMTPChannel, self).smtp_QUIT(arg)
872        else:
873            self.push(self.quit_response)
874            self.close_when_done()
875
876    def smtp_MAIL(self, arg):
877        if self.mail_response is None:
878            super().smtp_MAIL(arg)
879        else:
880            self.push(self.mail_response)
881            if self.disconnect:
882                self.close_when_done()
883
884    def smtp_RCPT(self, arg):
885        if self.rcpt_response is None:
886            super().smtp_RCPT(arg)
887            return
888        self.rcpt_count += 1
889        self.push(self.rcpt_response[self.rcpt_count-1])
890
891    def smtp_RSET(self, arg):
892        self.rset_count += 1
893        super().smtp_RSET(arg)
894
895    def smtp_DATA(self, arg):
896        if self.data_response is None:
897            super().smtp_DATA(arg)
898        else:
899            self.push(self.data_response)
900
    def handle_error(self):
        # Re-raise the exception that is currently being handled instead of
        # dealing with it here.  NOTE(review): presumably this replaces the
        # base class's error handling so that failures inside the channel
        # propagate to the test run -- confirm against asyncore.
        raise
903
904
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd server used by the simulation tests.

    Accepted connections are served by ``channel_class``, which receives the
    list of extra features registered through add_feature().  The envelope
    of the last processed message is kept in ``_addresses``, and the most
    recently created channel in ``_SMTPchannel``.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        # Both containers are created before the base initializer runs.
        self._extra_features = []
        self._addresses = {}
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        """Create a channel for the new connection and remember it."""
        channel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data):
        """Record the envelope sender and recipients; the data is ignored."""
        self._addresses.update({'from': mailfrom, 'tos': rcpttos})

    def add_feature(self, feature):
        """Register an extra feature for channels created from now on."""
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the exception being handled rather than dealing with it
        # here.
        raise
928
929
930# Test various SMTP & ESMTP commands/behaviors that require a simulated server
931# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):
    """smtplib client tests run against a SimSMTPServer in a helper thread.

    Tests obtain their client through _connect(), which registers the close
    as a cleanup, so a failing assertion cannot leave the socket open.
    """

    def setUp(self):
        # socket.getfqdn is replaced by the mock_socket version for the
        # duration of the test; tearDown restores the real function.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()

    def _connect(self, timeout=15):
        # Open a client connection to the test server.  The close is
        # registered as a cleanup so the socket is released even when the
        # test fails before reaching its own quit()/close() call; calling
        # close() again after that is harmless.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=timeout)
        self.addCleanup(smtp.close)
        return smtp

    def testBasic(self):
        # smoke test
        smtp = self._connect()
        smtp.quit()

    def testEHLO(self):
        smtp = self._connect()

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = self._connect()

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        # Every mechanism gets a fresh connection and is driven through the
        # low-level auth() API with the matching auth_* method.
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = self._connect()
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = self._connect()
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        # The channel class is swapped before connecting so that the message
        # data is answered with a 421 reply.
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class = MySimSMTPChannel
        smtp = self._connect()
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        smtp = self._connect(timeout=3)
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        smtp = self._connect(timeout=3)
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        # U+1F609 needs the eight-digit \U escape; '\u1F609' would be
        # U+1F60 followed by the digit 9.
        msg['Subject'] = 'Nudge nudge, wink, wink \U0001F609'
        smtp = self._connect(timeout=3)
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        smtp = self._connect(timeout=3)

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))

        self.assertDictEqual(smtp.send_message(message), {})

        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1187
1188
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    The arguments of the most recent process_message() call are kept in the
    ``last_*`` attributes for the tests to inspect.
    """

    def __init__(self, *args, **kw):
        # The base SMTP server turns these on automatically, but our test
        # server is set up to munge the EHLO response, so we need to provide
        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        """Create a channel for the new connection and remember it."""
        channel_options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **channel_options)

    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
                                                             rcpt_options=None):
        """Store every argument of the call in the matching last_* attribute."""
        (self.last_peer, self.last_mailfrom,
         self.last_rcpttos, self.last_message) = peer, mailfrom, rcpttos, data
        self.last_mail_options = mail_options
        self.last_rcpt_options = rcpt_options
1213
1214
class SMTPUTF8SimTests(unittest.TestCase):
    """smtplib SMTPUTF8 tests run against a SimSMTPUTF8Server in a thread."""

    maxDiff = None

    def setUp(self):
        # socket.getfqdn is replaced by the mock_socket version for the
        # duration of the test; tearDown restores the real function.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
                                      decode_data=False,
                                      enable_SMTPUTF8=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()

    def test_test_server_supports_extensions(self):
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertTrue(smtp.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        smtp.sendmail('Jőhn', 'Sálly', m,
                      mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, m)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertEqual(
            smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
            (250, b'OK'))
        self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
        self.assertEqual(smtp.data(m), (250, b'OK'))
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, m)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        # U+1F609 needs the eight-digit \U escape; '\u1F609' would be
        # U+1F60 followed by the digit 9.  The expected text below uses the
        # same escape.
        msg['Subject'] = 'Nudge nudge, wink, wink \U0001F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received \r\n to \n, so we can't easily test that
        # we are successfully sending \r\n :(.
        expected = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \U0001F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        self.assertEqual(smtp.send_message(msg), {})
        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), expected)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])
1316
1317
# Base64 text (no line ending) of the NUL-separated PLAIN credentials for
# user 'psu' with password 'doesnotexist'.
EXPECTED_RESPONSE = base64.b64encode(b'\0psu\0doesnotexist').decode('ascii')
1319
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    """Channel that accepts only AUTH PLAIN with the expected initial
    response and rejects every other AUTH request."""

    def smtp_AUTH(self, arg):
        # RFC 4954's AUTH command allows for an optional initial-response.
        # Not all AUTH methods support this; some require a challenge.  AUTH
        # PLAIN does support it, so test that here.  See issue #15014.
        #
        # A bare "AUTH" can arrive with no argument text at all (empty or
        # None); it is rejected like any other unacceptable request instead
        # of failing on args[0].
        args = arg.split() if arg else []
        # AUTH PLAIN <initial-response> with the response base 64
        # encoded.  Hard code the expected response for the test.
        if (len(args) == 2 and args[0].lower() == 'plain'
                and args[1] == EXPECTED_RESPONSE):
            self.push('235 Ok')
            return
        self.push('571 Bad authentication')
1334
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    """SimSMTPServer whose connections are served by
    SimSMTPAUTHInitialResponseChannel."""
    channel_class = SimSMTPAUTHInitialResponseChannel
1337
1338
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    """Check that smtplib sends the AUTH PLAIN credentials as an initial
    response, using a server that accepts nothing else."""

    def setUp(self):
        # socket.getfqdn is replaced by the mock_socket version for the
        # duration of the test; tearDown restores the real function.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()

    def testAUTH_PLAIN_initial_response_login(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost', timeout=15)
        # Registered as a cleanup so the socket is closed even when login()
        # raises; the explicit close() below then becomes a no-op repeat.
        self.addCleanup(smtp.close)
        smtp.login('psu', 'doesnotexist')
        smtp.close()

    def testAUTH_PLAIN_initial_response_auth(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost', timeout=15)
        # Registered as a cleanup so the socket is closed even when auth()
        # raises.
        self.addCleanup(smtp.close)
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, response = smtp.auth('plain', smtp.auth_plain)
        smtp.close()
        self.assertEqual(code, 235)
1382
1383
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
1386