1import base64
2import email.mime.text
3from email.message import EmailMessage
4from email.base64mime import body_encode as encode_base64
5import email.utils
6import hashlib
7import hmac
8import socket
9import smtplib
10import io
11import re
12import sys
13import time
14import select
15import errno
16import textwrap
17import threading
18
19import unittest
20from test import support, mock_socket
21from test.support import hashlib_helper
22from test.support import socket_helper
23from test.support import threading_helper
24from unittest.mock import Mock
25
26import warnings
27with warnings.catch_warnings():
28    warnings.simplefilter('ignore', DeprecationWarning)
29    import asyncore
30    import smtpd
31
32HOST = socket_helper.HOST
33
34if sys.platform == 'darwin':
35    # select.poll returns a select.POLLHUP at the end of the tests
36    # on darwin, so just ignore it
37    def handle_expt(self):
38        pass
39    smtpd.SMTPChannel.handle_expt = handle_expt
40
41
42def server(evt, buf, serv):
43    serv.listen()
44    evt.set()
45    try:
46        conn, addr = serv.accept()
47    except TimeoutError:
48        pass
49    else:
50        n = 500
51        while buf and n > 0:
52            r, w, e = select.select([], [conn], [])
53            if w:
54                sent = conn.send(buf)
55                buf = buf[sent:]
56
57            n -= 1
58
59        conn.close()
60    finally:
61        serv.close()
62        evt.set()
63
64class GeneralTests:
65
66    def setUp(self):
67        smtplib.socket = mock_socket
68        self.port = 25
69
70    def tearDown(self):
71        smtplib.socket = socket
72
73    # This method is no longer used but is retained for backward compatibility,
74    # so test to make sure it still works.
75    def testQuoteData(self):
76        teststr  = "abc\n.jkl\rfoo\r\n..blue"
77        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
78        self.assertEqual(expected, smtplib.quotedata(teststr))
79
80    def testBasic1(self):
81        mock_socket.reply_with(b"220 Hola mundo")
82        # connects
83        client = self.client(HOST, self.port)
84        client.close()
85
86    def testSourceAddress(self):
87        mock_socket.reply_with(b"220 Hola mundo")
88        # connects
89        client = self.client(HOST, self.port,
90                             source_address=('127.0.0.1',19876))
91        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
92        client.close()
93
94    def testBasic2(self):
95        mock_socket.reply_with(b"220 Hola mundo")
96        # connects, include port in host name
97        client = self.client("%s:%s" % (HOST, self.port))
98        client.close()
99
100    def testLocalHostName(self):
101        mock_socket.reply_with(b"220 Hola mundo")
102        # check that supplied local_hostname is used
103        client = self.client(HOST, self.port, local_hostname="testhost")
104        self.assertEqual(client.local_hostname, "testhost")
105        client.close()
106
107    def testTimeoutDefault(self):
108        mock_socket.reply_with(b"220 Hola mundo")
109        self.assertIsNone(mock_socket.getdefaulttimeout())
110        mock_socket.setdefaulttimeout(30)
111        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
112        try:
113            client = self.client(HOST, self.port)
114        finally:
115            mock_socket.setdefaulttimeout(None)
116        self.assertEqual(client.sock.gettimeout(), 30)
117        client.close()
118
119    def testTimeoutNone(self):
120        mock_socket.reply_with(b"220 Hola mundo")
121        self.assertIsNone(socket.getdefaulttimeout())
122        socket.setdefaulttimeout(30)
123        try:
124            client = self.client(HOST, self.port, timeout=None)
125        finally:
126            socket.setdefaulttimeout(None)
127        self.assertIsNone(client.sock.gettimeout())
128        client.close()
129
130    def testTimeoutZero(self):
131        mock_socket.reply_with(b"220 Hola mundo")
132        with self.assertRaises(ValueError):
133            self.client(HOST, self.port, timeout=0)
134
135    def testTimeoutValue(self):
136        mock_socket.reply_with(b"220 Hola mundo")
137        client = self.client(HOST, self.port, timeout=30)
138        self.assertEqual(client.sock.gettimeout(), 30)
139        client.close()
140
141    def test_debuglevel(self):
142        mock_socket.reply_with(b"220 Hello world")
143        client = self.client()
144        client.set_debuglevel(1)
145        with support.captured_stderr() as stderr:
146            client.connect(HOST, self.port)
147        client.close()
148        expected = re.compile(r"^connect:", re.MULTILINE)
149        self.assertRegex(stderr.getvalue(), expected)
150
151    def test_debuglevel_2(self):
152        mock_socket.reply_with(b"220 Hello world")
153        client = self.client()
154        client.set_debuglevel(2)
155        with support.captured_stderr() as stderr:
156            client.connect(HOST, self.port)
157        client.close()
158        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
159                              re.MULTILINE)
160        self.assertRegex(stderr.getvalue(), expected)
161
162
163class SMTPGeneralTests(GeneralTests, unittest.TestCase):
164
165    client = smtplib.SMTP
166
167
168class LMTPGeneralTests(GeneralTests, unittest.TestCase):
169
170    client = smtplib.LMTP
171
172    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
173    def testUnixDomainSocketTimeoutDefault(self):
174        local_host = '/some/local/lmtp/delivery/program'
175        mock_socket.reply_with(b"220 Hello world")
176        try:
177            client = self.client(local_host, self.port)
178        finally:
179            mock_socket.setdefaulttimeout(None)
180        self.assertIsNone(client.sock.gettimeout())
181        client.close()
182
183    def testTimeoutZero(self):
184        super().testTimeoutZero()
185        local_host = '/some/local/lmtp/delivery/program'
186        with self.assertRaises(ValueError):
187            self.client(local_host, timeout=0)
188
189# Test server thread using the specified SMTP server class
190def debugging_server(serv, serv_evt, client_evt):
191    serv_evt.set()
192
193    try:
194        if hasattr(select, 'poll'):
195            poll_fun = asyncore.poll2
196        else:
197            poll_fun = asyncore.poll
198
199        n = 1000
200        while asyncore.socket_map and n > 0:
201            poll_fun(0.01, asyncore.socket_map)
202
203            # when the client conversation is finished, it will
204            # set client_evt, and it's then ok to kill the server
205            if client_evt.is_set():
206                serv.close()
207                break
208
209            n -= 1
210
211    except TimeoutError:
212        pass
213    finally:
214        if not client_evt.is_set():
215            # allow some time for the client to read the result
216            time.sleep(0.5)
217            serv.close()
218        asyncore.close_all()
219        serv_evt.set()
220
221MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
222MSG_END = '------------ END MESSAGE ------------\n'
223
224# NOTE: Some SMTP objects in the tests below are created with a non-default
225# local_hostname argument to the constructor, since (on some systems) the FQDN
226# lookup caused by the default local_hostname sometimes takes so long that the
227# test server times out, causing the test to fail.
228
229# Test behavior of smtpd.DebuggingServer
230class DebuggingServerTests(unittest.TestCase):
231
232    maxDiff = None
233
234    def setUp(self):
235        self.thread_key = threading_helper.threading_setup()
236        self.real_getfqdn = socket.getfqdn
237        socket.getfqdn = mock_socket.getfqdn
238        # temporarily replace sys.stdout to capture DebuggingServer output
239        self.old_stdout = sys.stdout
240        self.output = io.StringIO()
241        sys.stdout = self.output
242
243        self.serv_evt = threading.Event()
244        self.client_evt = threading.Event()
245        # Capture SMTPChannel debug output
246        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
247        smtpd.DEBUGSTREAM = io.StringIO()
248        # Pick a random unused port by passing 0 for the port number
249        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
250                                          decode_data=True)
251        # Keep a note of what server host and port were assigned
252        self.host, self.port = self.serv.socket.getsockname()[:2]
253        serv_args = (self.serv, self.serv_evt, self.client_evt)
254        self.thread = threading.Thread(target=debugging_server, args=serv_args)
255        self.thread.start()
256
257        # wait until server thread has assigned a port number
258        self.serv_evt.wait()
259        self.serv_evt.clear()
260
261    def tearDown(self):
262        socket.getfqdn = self.real_getfqdn
263        # indicate that the client is finished
264        self.client_evt.set()
265        # wait for the server thread to terminate
266        self.serv_evt.wait()
267        threading_helper.join_thread(self.thread)
268        # restore sys.stdout
269        sys.stdout = self.old_stdout
270        # restore DEBUGSTREAM
271        smtpd.DEBUGSTREAM.close()
272        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
273        del self.thread
274        self.doCleanups()
275        threading_helper.threading_cleanup(*self.thread_key)
276
277    def get_output_without_xpeer(self):
278        test_output = self.output.getvalue()
279        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
280                      test_output, flags=re.MULTILINE|re.DOTALL)
281
282    def testBasic(self):
283        # connect
284        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
285                            timeout=support.LOOPBACK_TIMEOUT)
286        smtp.quit()
287
288    def testSourceAddress(self):
289        # connect
290        src_port = socket_helper.find_unused_port()
291        try:
292            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
293                                timeout=support.LOOPBACK_TIMEOUT,
294                                source_address=(self.host, src_port))
295            self.addCleanup(smtp.close)
296            self.assertEqual(smtp.source_address, (self.host, src_port))
297            self.assertEqual(smtp.local_hostname, 'localhost')
298            smtp.quit()
299        except OSError as e:
300            if e.errno == errno.EADDRINUSE:
301                self.skipTest("couldn't bind to source port %d" % src_port)
302            raise
303
304    def testNOOP(self):
305        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
306                            timeout=support.LOOPBACK_TIMEOUT)
307        self.addCleanup(smtp.close)
308        expected = (250, b'OK')
309        self.assertEqual(smtp.noop(), expected)
310        smtp.quit()
311
312    def testRSET(self):
313        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
314                            timeout=support.LOOPBACK_TIMEOUT)
315        self.addCleanup(smtp.close)
316        expected = (250, b'OK')
317        self.assertEqual(smtp.rset(), expected)
318        smtp.quit()
319
320    def testELHO(self):
321        # EHLO isn't implemented in DebuggingServer
322        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
323                            timeout=support.LOOPBACK_TIMEOUT)
324        self.addCleanup(smtp.close)
325        expected = (250, b'\nSIZE 33554432\nHELP')
326        self.assertEqual(smtp.ehlo(), expected)
327        smtp.quit()
328
329    def testEXPNNotImplemented(self):
330        # EXPN isn't implemented in DebuggingServer
331        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
332                            timeout=support.LOOPBACK_TIMEOUT)
333        self.addCleanup(smtp.close)
334        expected = (502, b'EXPN not implemented')
335        smtp.putcmd('EXPN')
336        self.assertEqual(smtp.getreply(), expected)
337        smtp.quit()
338
339    def test_issue43124_putcmd_escapes_newline(self):
340        # see: https://bugs.python.org/issue43124
341        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
342                            timeout=support.LOOPBACK_TIMEOUT)
343        self.addCleanup(smtp.close)
344        with self.assertRaises(ValueError) as exc:
345            smtp.putcmd('helo\nX-INJECTED')
346        self.assertIn("prohibited newline characters", str(exc.exception))
347        smtp.quit()
348
349    def testVRFY(self):
350        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
351                            timeout=support.LOOPBACK_TIMEOUT)
352        self.addCleanup(smtp.close)
353        expected = (252, b'Cannot VRFY user, but will accept message ' + \
354                         b'and attempt delivery')
355        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
356        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
357        smtp.quit()
358
359    def testSecondHELO(self):
360        # check that a second HELO returns a message that it's a duplicate
361        # (this behavior is specific to smtpd.SMTPChannel)
362        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
363                            timeout=support.LOOPBACK_TIMEOUT)
364        self.addCleanup(smtp.close)
365        smtp.helo()
366        expected = (503, b'Duplicate HELO/EHLO')
367        self.assertEqual(smtp.helo(), expected)
368        smtp.quit()
369
370    def testHELP(self):
371        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
372                            timeout=support.LOOPBACK_TIMEOUT)
373        self.addCleanup(smtp.close)
374        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
375                                      b'RCPT DATA RSET NOOP QUIT VRFY')
376        smtp.quit()
377
378    def testSend(self):
379        # connect and send mail
380        m = 'A test message'
381        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
382                            timeout=support.LOOPBACK_TIMEOUT)
383        self.addCleanup(smtp.close)
384        smtp.sendmail('John', 'Sally', m)
385        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
386        # in asyncore.  This sleep might help, but should really be fixed
387        # properly by using an Event variable.
388        time.sleep(0.01)
389        smtp.quit()
390
391        self.client_evt.set()
392        self.serv_evt.wait()
393        self.output.flush()
394        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
395        self.assertEqual(self.output.getvalue(), mexpect)
396
397    def testSendBinary(self):
398        m = b'A test message'
399        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
400                            timeout=support.LOOPBACK_TIMEOUT)
401        self.addCleanup(smtp.close)
402        smtp.sendmail('John', 'Sally', m)
403        # XXX (see comment in testSend)
404        time.sleep(0.01)
405        smtp.quit()
406
407        self.client_evt.set()
408        self.serv_evt.wait()
409        self.output.flush()
410        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
411        self.assertEqual(self.output.getvalue(), mexpect)
412
413    def testSendNeedingDotQuote(self):
414        # Issue 12283
415        m = '.A test\n.mes.sage.'
416        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
417                            timeout=support.LOOPBACK_TIMEOUT)
418        self.addCleanup(smtp.close)
419        smtp.sendmail('John', 'Sally', m)
420        # XXX (see comment in testSend)
421        time.sleep(0.01)
422        smtp.quit()
423
424        self.client_evt.set()
425        self.serv_evt.wait()
426        self.output.flush()
427        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
428        self.assertEqual(self.output.getvalue(), mexpect)
429
430    def test_issue43124_escape_localhostname(self):
431        # see: https://bugs.python.org/issue43124
432        # connect and send mail
433        m = 'wazzuuup\nlinetwo'
434        smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
435                            timeout=support.LOOPBACK_TIMEOUT)
436        self.addCleanup(smtp.close)
437        with self.assertRaises(ValueError) as exc:
438            smtp.sendmail("hi@me.com", "you@me.com", m)
439        self.assertIn(
440            "prohibited newline characters: ehlo hi\\nX-INJECTED",
441            str(exc.exception),
442        )
443        # XXX (see comment in testSend)
444        time.sleep(0.01)
445        smtp.quit()
446
447        debugout = smtpd.DEBUGSTREAM.getvalue()
448        self.assertNotIn("X-INJECTED", debugout)
449
450    def test_issue43124_escape_options(self):
451        # see: https://bugs.python.org/issue43124
452        # connect and send mail
453        m = 'wazzuuup\nlinetwo'
454        smtp = smtplib.SMTP(
455            HOST, self.port, local_hostname='localhost',
456            timeout=support.LOOPBACK_TIMEOUT)
457
458        self.addCleanup(smtp.close)
459        smtp.sendmail("hi@me.com", "you@me.com", m)
460        with self.assertRaises(ValueError) as exc:
461            smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
462        msg = str(exc.exception)
463        self.assertIn("prohibited newline characters", msg)
464        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
465        # XXX (see comment in testSend)
466        time.sleep(0.01)
467        smtp.quit()
468
469        debugout = smtpd.DEBUGSTREAM.getvalue()
470        self.assertNotIn("X-OPTION", debugout)
471        self.assertNotIn("X-OPTION2", debugout)
472        self.assertNotIn("X-INJECTED-1", debugout)
473        self.assertNotIn("X-INJECTED-2", debugout)
474
475    def testSendNullSender(self):
476        m = 'A test message'
477        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
478                            timeout=support.LOOPBACK_TIMEOUT)
479        self.addCleanup(smtp.close)
480        smtp.sendmail('<>', 'Sally', m)
481        # XXX (see comment in testSend)
482        time.sleep(0.01)
483        smtp.quit()
484
485        self.client_evt.set()
486        self.serv_evt.wait()
487        self.output.flush()
488        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
489        self.assertEqual(self.output.getvalue(), mexpect)
490        debugout = smtpd.DEBUGSTREAM.getvalue()
491        sender = re.compile("^sender: <>$", re.MULTILINE)
492        self.assertRegex(debugout, sender)
493
494    def testSendMessage(self):
495        m = email.mime.text.MIMEText('A test message')
496        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
497                            timeout=support.LOOPBACK_TIMEOUT)
498        self.addCleanup(smtp.close)
499        smtp.send_message(m, from_addr='John', to_addrs='Sally')
500        # XXX (see comment in testSend)
501        time.sleep(0.01)
502        smtp.quit()
503
504        self.client_evt.set()
505        self.serv_evt.wait()
506        self.output.flush()
507        # Remove the X-Peer header that DebuggingServer adds as figuring out
508        # exactly what IP address format is put there is not easy (and
509        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
510        # not always the same as socket.gethostbyname(HOST). :(
511        test_output = self.get_output_without_xpeer()
512        del m['X-Peer']
513        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
514        self.assertEqual(test_output, mexpect)
515
516    def testSendMessageWithAddresses(self):
517        m = email.mime.text.MIMEText('A test message')
518        m['From'] = 'foo@bar.com'
519        m['To'] = 'John'
520        m['CC'] = 'Sally, Fred'
521        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
522        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
523                            timeout=support.LOOPBACK_TIMEOUT)
524        self.addCleanup(smtp.close)
525        smtp.send_message(m)
526        # XXX (see comment in testSend)
527        time.sleep(0.01)
528        smtp.quit()
529        # make sure the Bcc header is still in the message.
530        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
531                                    '<warped@silly.walks.com>')
532
533        self.client_evt.set()
534        self.serv_evt.wait()
535        self.output.flush()
536        # Remove the X-Peer header that DebuggingServer adds.
537        test_output = self.get_output_without_xpeer()
538        del m['X-Peer']
539        # The Bcc header should not be transmitted.
540        del m['Bcc']
541        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
542        self.assertEqual(test_output, mexpect)
543        debugout = smtpd.DEBUGSTREAM.getvalue()
544        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
545        self.assertRegex(debugout, sender)
546        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
547                     'warped@silly.walks.com'):
548            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
549                                 re.MULTILINE)
550            self.assertRegex(debugout, to_addr)
551
552    def testSendMessageWithSomeAddresses(self):
553        # Make sure nothing breaks if not all of the three 'to' headers exist
554        m = email.mime.text.MIMEText('A test message')
555        m['From'] = 'foo@bar.com'
556        m['To'] = 'John, Dinsdale'
557        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
558                            timeout=support.LOOPBACK_TIMEOUT)
559        self.addCleanup(smtp.close)
560        smtp.send_message(m)
561        # XXX (see comment in testSend)
562        time.sleep(0.01)
563        smtp.quit()
564
565        self.client_evt.set()
566        self.serv_evt.wait()
567        self.output.flush()
568        # Remove the X-Peer header that DebuggingServer adds.
569        test_output = self.get_output_without_xpeer()
570        del m['X-Peer']
571        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
572        self.assertEqual(test_output, mexpect)
573        debugout = smtpd.DEBUGSTREAM.getvalue()
574        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
575        self.assertRegex(debugout, sender)
576        for addr in ('John', 'Dinsdale'):
577            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
578                                 re.MULTILINE)
579            self.assertRegex(debugout, to_addr)
580
581    def testSendMessageWithSpecifiedAddresses(self):
582        # Make sure addresses specified in call override those in message.
583        m = email.mime.text.MIMEText('A test message')
584        m['From'] = 'foo@bar.com'
585        m['To'] = 'John, Dinsdale'
586        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
587                            timeout=support.LOOPBACK_TIMEOUT)
588        self.addCleanup(smtp.close)
589        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
590        # XXX (see comment in testSend)
591        time.sleep(0.01)
592        smtp.quit()
593
594        self.client_evt.set()
595        self.serv_evt.wait()
596        self.output.flush()
597        # Remove the X-Peer header that DebuggingServer adds.
598        test_output = self.get_output_without_xpeer()
599        del m['X-Peer']
600        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
601        self.assertEqual(test_output, mexpect)
602        debugout = smtpd.DEBUGSTREAM.getvalue()
603        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
604        self.assertRegex(debugout, sender)
605        for addr in ('John', 'Dinsdale'):
606            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
607                                 re.MULTILINE)
608            self.assertNotRegex(debugout, to_addr)
609        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
610        self.assertRegex(debugout, recip)
611
612    def testSendMessageWithMultipleFrom(self):
613        # Sender overrides To
614        m = email.mime.text.MIMEText('A test message')
615        m['From'] = 'Bernard, Bianca'
616        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
617        m['To'] = 'John, Dinsdale'
618        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
619                            timeout=support.LOOPBACK_TIMEOUT)
620        self.addCleanup(smtp.close)
621        smtp.send_message(m)
622        # XXX (see comment in testSend)
623        time.sleep(0.01)
624        smtp.quit()
625
626        self.client_evt.set()
627        self.serv_evt.wait()
628        self.output.flush()
629        # Remove the X-Peer header that DebuggingServer adds.
630        test_output = self.get_output_without_xpeer()
631        del m['X-Peer']
632        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
633        self.assertEqual(test_output, mexpect)
634        debugout = smtpd.DEBUGSTREAM.getvalue()
635        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
636        self.assertRegex(debugout, sender)
637        for addr in ('John', 'Dinsdale'):
638            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
639                                 re.MULTILINE)
640            self.assertRegex(debugout, to_addr)
641
642    def testSendMessageResent(self):
643        m = email.mime.text.MIMEText('A test message')
644        m['From'] = 'foo@bar.com'
645        m['To'] = 'John'
646        m['CC'] = 'Sally, Fred'
647        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
648        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
649        m['Resent-From'] = 'holy@grail.net'
650        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
651        m['Resent-Bcc'] = 'doe@losthope.net'
652        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
653                            timeout=support.LOOPBACK_TIMEOUT)
654        self.addCleanup(smtp.close)
655        smtp.send_message(m)
656        # XXX (see comment in testSend)
657        time.sleep(0.01)
658        smtp.quit()
659
660        self.client_evt.set()
661        self.serv_evt.wait()
662        self.output.flush()
663        # The Resent-Bcc headers are deleted before serialization.
664        del m['Bcc']
665        del m['Resent-Bcc']
666        # Remove the X-Peer header that DebuggingServer adds.
667        test_output = self.get_output_without_xpeer()
668        del m['X-Peer']
669        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
670        self.assertEqual(test_output, mexpect)
671        debugout = smtpd.DEBUGSTREAM.getvalue()
672        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
673        self.assertRegex(debugout, sender)
674        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
675            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
676                                 re.MULTILINE)
677            self.assertRegex(debugout, to_addr)
678
679    def testSendMessageMultipleResentRaises(self):
680        m = email.mime.text.MIMEText('A test message')
681        m['From'] = 'foo@bar.com'
682        m['To'] = 'John'
683        m['CC'] = 'Sally, Fred'
684        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
685        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
686        m['Resent-From'] = 'holy@grail.net'
687        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
688        m['Resent-Bcc'] = 'doe@losthope.net'
689        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
690        m['Resent-To'] = 'holy@grail.net'
691        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
692        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
693                            timeout=support.LOOPBACK_TIMEOUT)
694        self.addCleanup(smtp.close)
695        with self.assertRaises(ValueError):
696            smtp.send_message(m)
697        smtp.close()
698
699class NonConnectingTests(unittest.TestCase):
700
701    def testNotConnected(self):
702        # Test various operations on an unconnected SMTP object that
703        # should raise exceptions (at present the attempt in SMTP.send
704        # to reference the nonexistent 'sock' attribute of the SMTP object
705        # causes an AttributeError)
706        smtp = smtplib.SMTP()
707        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
708        self.assertRaises(smtplib.SMTPServerDisconnected,
709                          smtp.send, 'test msg')
710
711    def testNonnumericPort(self):
712        # check that non-numeric port raises OSError
713        self.assertRaises(OSError, smtplib.SMTP,
714                          "localhost", "bogus")
715        self.assertRaises(OSError, smtplib.SMTP,
716                          "localhost:bogus")
717
718    def testSockAttributeExists(self):
719        # check that sock attribute is present outside of a connect() call
720        # (regression test, the previous behavior raised an
721        #  AttributeError: 'SMTP' object has no attribute 'sock')
722        with smtplib.SMTP() as smtp:
723            self.assertIsNone(smtp.sock)
724
725
726class DefaultArgumentsTests(unittest.TestCase):
727
728    def setUp(self):
729        self.msg = EmailMessage()
730        self.msg['From'] = 'Páolo <főo@bar.com>'
731        self.smtp = smtplib.SMTP()
732        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
733        self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()
734
735    def testSendMessage(self):
736        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
737        self.smtp.send_message(self.msg)
738        self.smtp.send_message(self.msg)
739        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
740                         expected_mail_options)
741        self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
742                         expected_mail_options)
743
744    def testSendMessageWithMailOptions(self):
745        mail_options = ['STARTTLS']
746        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
747        self.smtp.send_message(self.msg, None, None, mail_options)
748        self.assertEqual(mail_options, ['STARTTLS'])
749        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
750                         expected_mail_options)
751
752
753# test response of client to a non-successful HELO message
754class BadHELOServerTests(unittest.TestCase):
755
756    def setUp(self):
757        smtplib.socket = mock_socket
758        mock_socket.reply_with(b"199 no hello for you!")
759        self.old_stdout = sys.stdout
760        self.output = io.StringIO()
761        sys.stdout = self.output
762        self.port = 25
763
764    def tearDown(self):
765        smtplib.socket = socket
766        sys.stdout = self.old_stdout
767
768    def testFailingHELO(self):
769        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
770                            HOST, self.port, 'localhost', 3)
771
772
773class TooLongLineTests(unittest.TestCase):
774    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
775
776    def setUp(self):
777        self.thread_key = threading_helper.threading_setup()
778        self.old_stdout = sys.stdout
779        self.output = io.StringIO()
780        sys.stdout = self.output
781
782        self.evt = threading.Event()
783        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
784        self.sock.settimeout(15)
785        self.port = socket_helper.bind_port(self.sock)
786        servargs = (self.evt, self.respdata, self.sock)
787        self.thread = threading.Thread(target=server, args=servargs)
788        self.thread.start()
789        self.evt.wait()
790        self.evt.clear()
791
792    def tearDown(self):
793        self.evt.wait()
794        sys.stdout = self.old_stdout
795        threading_helper.join_thread(self.thread)
796        del self.thread
797        self.doCleanups()
798        threading_helper.threading_cleanup(*self.thread_key)
799
800    def testLineTooLong(self):
801        self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
802                          HOST, self.port, 'localhost', 3)
803
804
805sim_users = {'Mr.A@somewhere.com':'John A',
806             'Ms.B@xn--fo-fka.com':'Sally B',
807             'Mrs.C@somewhereesle.com':'Ruth C',
808            }
809
810sim_auth = ('Mr.A@somewhere.com', 'somepassword')
811sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
812                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
813sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
814             'list-2':['Ms.B@xn--fo-fka.com',],
815            }
816
817# Simulated SMTP channel & server
818class ResponseException(Exception): pass
819class SimSMTPChannel(smtpd.SMTPChannel):
820
821    quit_response = None
822    mail_response = None
823    rcpt_response = None
824    data_response = None
825    rcpt_count = 0
826    rset_count = 0
827    disconnect = 0
828    AUTH = 99    # Add protocol state to enable auth testing.
829    authenticated_user = None
830
831    def __init__(self, extra_features, *args, **kw):
832        self._extrafeatures = ''.join(
833            [ "250-{0}\r\n".format(x) for x in extra_features ])
834        super(SimSMTPChannel, self).__init__(*args, **kw)
835
836    # AUTH related stuff.  It would be nice if support for this were in smtpd.
837    def found_terminator(self):
838        if self.smtp_state == self.AUTH:
839            line = self._emptystring.join(self.received_lines)
840            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
841            self.received_lines = []
842            try:
843                self.auth_object(line)
844            except ResponseException as e:
845                self.smtp_state = self.COMMAND
846                self.push('%s %s' % (e.smtp_code, e.smtp_error))
847            return
848        super().found_terminator()
849
850
851    def smtp_AUTH(self, arg):
852        if not self.seen_greeting:
853            self.push('503 Error: send EHLO first')
854            return
855        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
856            self.push('500 Error: command "AUTH" not recognized')
857            return
858        if self.authenticated_user is not None:
859            self.push(
860                '503 Bad sequence of commands: already authenticated')
861            return
862        args = arg.split()
863        if len(args) not in [1, 2]:
864            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
865            return
866        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
867        try:
868            self.auth_object = getattr(self, auth_object_name)
869        except AttributeError:
870            self.push('504 Command parameter not implemented: unsupported '
871                      ' authentication mechanism {!r}'.format(auth_object_name))
872            return
873        self.smtp_state = self.AUTH
874        self.auth_object(args[1] if len(args) == 2 else None)
875
876    def _authenticated(self, user, valid):
877        if valid:
878            self.authenticated_user = user
879            self.push('235 Authentication Succeeded')
880        else:
881            self.push('535 Authentication credentials invalid')
882        self.smtp_state = self.COMMAND
883
884    def _decode_base64(self, string):
885        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
886
887    def _auth_plain(self, arg=None):
888        if arg is None:
889            self.push('334 ')
890        else:
891            logpass = self._decode_base64(arg)
892            try:
893                *_, user, password = logpass.split('\0')
894            except ValueError as e:
895                self.push('535 Splitting response {!r} into user and password'
896                          ' failed: {}'.format(logpass, e))
897                return
898            self._authenticated(user, password == sim_auth[1])
899
900    def _auth_login(self, arg=None):
901        if arg is None:
902            # base64 encoded 'Username:'
903            self.push('334 VXNlcm5hbWU6')
904        elif not hasattr(self, '_auth_login_user'):
905            self._auth_login_user = self._decode_base64(arg)
906            # base64 encoded 'Password:'
907            self.push('334 UGFzc3dvcmQ6')
908        else:
909            password = self._decode_base64(arg)
910            self._authenticated(self._auth_login_user, password == sim_auth[1])
911            del self._auth_login_user
912
913    def _auth_buggy(self, arg=None):
914        # This AUTH mechanism will 'trap' client in a neverending 334
915        # base64 encoded 'BuGgYbUgGy'
916        self.push('334 QnVHZ1liVWdHeQ==')
917
918    def _auth_cram_md5(self, arg=None):
919        if arg is None:
920            self.push('334 {}'.format(sim_cram_md5_challenge))
921        else:
922            logpass = self._decode_base64(arg)
923            try:
924                user, hashed_pass = logpass.split()
925            except ValueError as e:
926                self.push('535 Splitting response {!r} into user and password '
927                          'failed: {}'.format(logpass, e))
928                return False
929            valid_hashed_pass = hmac.HMAC(
930                sim_auth[1].encode('ascii'),
931                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
932                'md5').hexdigest()
933            self._authenticated(user, hashed_pass == valid_hashed_pass)
934    # end AUTH related stuff.
935
936    def smtp_EHLO(self, arg):
937        resp = ('250-testhost\r\n'
938                '250-EXPN\r\n'
939                '250-SIZE 20000000\r\n'
940                '250-STARTTLS\r\n'
941                '250-DELIVERBY\r\n')
942        resp = resp + self._extrafeatures + '250 HELP'
943        self.push(resp)
944        self.seen_greeting = arg
945        self.extended_smtp = True
946
947    def smtp_VRFY(self, arg):
948        # For max compatibility smtplib should be sending the raw address.
949        if arg in sim_users:
950            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
951        else:
952            self.push('550 No such user: %s' % arg)
953
954    def smtp_EXPN(self, arg):
955        list_name = arg.lower()
956        if list_name in sim_lists:
957            user_list = sim_lists[list_name]
958            for n, user_email in enumerate(user_list):
959                quoted_addr = smtplib.quoteaddr(user_email)
960                if n < len(user_list) - 1:
961                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
962                else:
963                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
964        else:
965            self.push('550 No access for you!')
966
967    def smtp_QUIT(self, arg):
968        if self.quit_response is None:
969            super(SimSMTPChannel, self).smtp_QUIT(arg)
970        else:
971            self.push(self.quit_response)
972            self.close_when_done()
973
974    def smtp_MAIL(self, arg):
975        if self.mail_response is None:
976            super().smtp_MAIL(arg)
977        else:
978            self.push(self.mail_response)
979            if self.disconnect:
980                self.close_when_done()
981
982    def smtp_RCPT(self, arg):
983        if self.rcpt_response is None:
984            super().smtp_RCPT(arg)
985            return
986        self.rcpt_count += 1
987        self.push(self.rcpt_response[self.rcpt_count-1])
988
989    def smtp_RSET(self, arg):
990        self.rset_count += 1
991        super().smtp_RSET(arg)
992
993    def smtp_DATA(self, arg):
994        if self.data_response is None:
995            super().smtp_DATA(arg)
996        else:
997            self.push(self.data_response)
998
999    def handle_error(self):
1000        raise
1001
1002
1003class SimSMTPServer(smtpd.SMTPServer):
1004
1005    channel_class = SimSMTPChannel
1006
1007    def __init__(self, *args, **kw):
1008        self._extra_features = []
1009        self._addresses = {}
1010        smtpd.SMTPServer.__init__(self, *args, **kw)
1011
1012    def handle_accepted(self, conn, addr):
1013        self._SMTPchannel = self.channel_class(
1014            self._extra_features, self, conn, addr,
1015            decode_data=self._decode_data)
1016
1017    def process_message(self, peer, mailfrom, rcpttos, data):
1018        self._addresses['from'] = mailfrom
1019        self._addresses['tos'] = rcpttos
1020
1021    def add_feature(self, feature):
1022        self._extra_features.append(feature)
1023
1024    def handle_error(self):
1025        raise
1026
1027
1028# Test various SMTP & ESMTP commands/behaviors that require a simulated server
1029# (i.e., something with more features than DebuggingServer)
1030class SMTPSimTests(unittest.TestCase):
1031
1032    def setUp(self):
1033        self.thread_key = threading_helper.threading_setup()
1034        self.real_getfqdn = socket.getfqdn
1035        socket.getfqdn = mock_socket.getfqdn
1036        self.serv_evt = threading.Event()
1037        self.client_evt = threading.Event()
1038        # Pick a random unused port by passing 0 for the port number
1039        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
1040        # Keep a note of what port was assigned
1041        self.port = self.serv.socket.getsockname()[1]
1042        serv_args = (self.serv, self.serv_evt, self.client_evt)
1043        self.thread = threading.Thread(target=debugging_server, args=serv_args)
1044        self.thread.start()
1045
1046        # wait until server thread has assigned a port number
1047        self.serv_evt.wait()
1048        self.serv_evt.clear()
1049
1050    def tearDown(self):
1051        socket.getfqdn = self.real_getfqdn
1052        # indicate that the client is finished
1053        self.client_evt.set()
1054        # wait for the server thread to terminate
1055        self.serv_evt.wait()
1056        threading_helper.join_thread(self.thread)
1057        del self.thread
1058        self.doCleanups()
1059        threading_helper.threading_cleanup(*self.thread_key)
1060
1061    def testBasic(self):
1062        # smoke test
1063        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1064                            timeout=support.LOOPBACK_TIMEOUT)
1065        smtp.quit()
1066
1067    def testEHLO(self):
1068        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1069                            timeout=support.LOOPBACK_TIMEOUT)
1070
1071        # no features should be present before the EHLO
1072        self.assertEqual(smtp.esmtp_features, {})
1073
1074        # features expected from the test server
1075        expected_features = {'expn':'',
1076                             'size': '20000000',
1077                             'starttls': '',
1078                             'deliverby': '',
1079                             'help': '',
1080                             }
1081
1082        smtp.ehlo()
1083        self.assertEqual(smtp.esmtp_features, expected_features)
1084        for k in expected_features:
1085            self.assertTrue(smtp.has_extn(k))
1086        self.assertFalse(smtp.has_extn('unsupported-feature'))
1087        smtp.quit()
1088
1089    def testVRFY(self):
1090        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1091                            timeout=support.LOOPBACK_TIMEOUT)
1092
1093        for addr_spec, name in sim_users.items():
1094            expected_known = (250, bytes('%s %s' %
1095                                         (name, smtplib.quoteaddr(addr_spec)),
1096                                         "ascii"))
1097            self.assertEqual(smtp.vrfy(addr_spec), expected_known)
1098
1099        u = 'nobody@nowhere.com'
1100        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
1101        self.assertEqual(smtp.vrfy(u), expected_unknown)
1102        smtp.quit()
1103
1104    def testEXPN(self):
1105        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1106                            timeout=support.LOOPBACK_TIMEOUT)
1107
1108        for listname, members in sim_lists.items():
1109            users = []
1110            for m in members:
1111                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
1112            expected_known = (250, bytes('\n'.join(users), "ascii"))
1113            self.assertEqual(smtp.expn(listname), expected_known)
1114
1115        u = 'PSU-Members-List'
1116        expected_unknown = (550, b'No access for you!')
1117        self.assertEqual(smtp.expn(u), expected_unknown)
1118        smtp.quit()
1119
1120    def testAUTH_PLAIN(self):
1121        self.serv.add_feature("AUTH PLAIN")
1122        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1123                            timeout=support.LOOPBACK_TIMEOUT)
1124        resp = smtp.login(sim_auth[0], sim_auth[1])
1125        self.assertEqual(resp, (235, b'Authentication Succeeded'))
1126        smtp.close()
1127
1128    def testAUTH_LOGIN(self):
1129        self.serv.add_feature("AUTH LOGIN")
1130        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1131                            timeout=support.LOOPBACK_TIMEOUT)
1132        resp = smtp.login(sim_auth[0], sim_auth[1])
1133        self.assertEqual(resp, (235, b'Authentication Succeeded'))
1134        smtp.close()
1135
1136    def testAUTH_LOGIN_initial_response_ok(self):
1137        self.serv.add_feature("AUTH LOGIN")
1138        with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1139                          timeout=support.LOOPBACK_TIMEOUT) as smtp:
1140            smtp.user, smtp.password = sim_auth
1141            smtp.ehlo("test_auth_login")
1142            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True)
1143            self.assertEqual(resp, (235, b'Authentication Succeeded'))
1144
1145    def testAUTH_LOGIN_initial_response_notok(self):
1146        self.serv.add_feature("AUTH LOGIN")
1147        with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1148                          timeout=support.LOOPBACK_TIMEOUT) as smtp:
1149            smtp.user, smtp.password = sim_auth
1150            smtp.ehlo("test_auth_login")
1151            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False)
1152            self.assertEqual(resp, (235, b'Authentication Succeeded'))
1153
1154    def testAUTH_BUGGY(self):
1155        self.serv.add_feature("AUTH BUGGY")
1156
1157        def auth_buggy(challenge=None):
1158            self.assertEqual(b"BuGgYbUgGy", challenge)
1159            return "\0"
1160
1161        smtp = smtplib.SMTP(
1162            HOST, self.port, local_hostname='localhost',
1163            timeout=support.LOOPBACK_TIMEOUT
1164        )
1165        try:
1166            smtp.user, smtp.password = sim_auth
1167            smtp.ehlo("test_auth_buggy")
1168            expect = r"^Server AUTH mechanism infinite loop.*"
1169            with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm:
1170                smtp.auth("BUGGY", auth_buggy, initial_response_ok=False)
1171        finally:
1172            smtp.close()
1173
1174    @hashlib_helper.requires_hashdigest('md5')
1175    def testAUTH_CRAM_MD5(self):
1176        self.serv.add_feature("AUTH CRAM-MD5")
1177        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1178                            timeout=support.LOOPBACK_TIMEOUT)
1179        resp = smtp.login(sim_auth[0], sim_auth[1])
1180        self.assertEqual(resp, (235, b'Authentication Succeeded'))
1181        smtp.close()
1182
1183    @hashlib_helper.requires_hashdigest('md5')
1184    def testAUTH_multiple(self):
1185        # Test that multiple authentication methods are tried.
1186        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
1187        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1188                            timeout=support.LOOPBACK_TIMEOUT)
1189        resp = smtp.login(sim_auth[0], sim_auth[1])
1190        self.assertEqual(resp, (235, b'Authentication Succeeded'))
1191        smtp.close()
1192
1193    def test_auth_function(self):
1194        supported = {'PLAIN', 'LOGIN'}
1195        try:
1196            hashlib.md5()
1197        except ValueError:
1198            pass
1199        else:
1200            supported.add('CRAM-MD5')
1201        for mechanism in supported:
1202            self.serv.add_feature("AUTH {}".format(mechanism))
1203        for mechanism in supported:
1204            with self.subTest(mechanism=mechanism):
1205                smtp = smtplib.SMTP(HOST, self.port,
1206                                    local_hostname='localhost',
1207                                    timeout=support.LOOPBACK_TIMEOUT)
1208                smtp.ehlo('foo')
1209                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
1210                method = 'auth_' + mechanism.lower().replace('-', '_')
1211                resp = smtp.auth(mechanism, getattr(smtp, method))
1212                self.assertEqual(resp, (235, b'Authentication Succeeded'))
1213                smtp.close()
1214
1215    def test_quit_resets_greeting(self):
1216        smtp = smtplib.SMTP(HOST, self.port,
1217                            local_hostname='localhost',
1218                            timeout=support.LOOPBACK_TIMEOUT)
1219        code, message = smtp.ehlo()
1220        self.assertEqual(code, 250)
1221        self.assertIn('size', smtp.esmtp_features)
1222        smtp.quit()
1223        self.assertNotIn('size', smtp.esmtp_features)
1224        smtp.connect(HOST, self.port)
1225        self.assertNotIn('size', smtp.esmtp_features)
1226        smtp.ehlo_or_helo_if_needed()
1227        self.assertIn('size', smtp.esmtp_features)
1228        smtp.quit()
1229
1230    def test_with_statement(self):
1231        with smtplib.SMTP(HOST, self.port) as smtp:
1232            code, message = smtp.noop()
1233            self.assertEqual(code, 250)
1234        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
1235        with smtplib.SMTP(HOST, self.port) as smtp:
1236            smtp.close()
1237        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
1238
1239    def test_with_statement_QUIT_failure(self):
1240        with self.assertRaises(smtplib.SMTPResponseException) as error:
1241            with smtplib.SMTP(HOST, self.port) as smtp:
1242                smtp.noop()
1243                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
1244        self.assertEqual(error.exception.smtp_code, 421)
1245        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
1246
1247    #TODO: add tests for correct AUTH method fallback now that the
1248    #test infrastructure can support it.
1249
1250    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
1251    def test__rest_from_mail_cmd(self):
1252        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1253                            timeout=support.LOOPBACK_TIMEOUT)
1254        smtp.noop()
1255        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
1256        self.serv._SMTPchannel.disconnect = True
1257        with self.assertRaises(smtplib.SMTPSenderRefused):
1258            smtp.sendmail('John', 'Sally', 'test message')
1259        self.assertIsNone(smtp.sock)
1260
1261    # Issue 5713: make sure close, not rset, is called if we get a 421 error
1262    def test_421_from_mail_cmd(self):
1263        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1264                            timeout=support.LOOPBACK_TIMEOUT)
1265        smtp.noop()
1266        self.serv._SMTPchannel.mail_response = '421 closing connection'
1267        with self.assertRaises(smtplib.SMTPSenderRefused):
1268            smtp.sendmail('John', 'Sally', 'test message')
1269        self.assertIsNone(smtp.sock)
1270        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
1271
1272    def test_421_from_rcpt_cmd(self):
1273        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1274                            timeout=support.LOOPBACK_TIMEOUT)
1275        smtp.noop()
1276        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
1277        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
1278            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
1279        self.assertIsNone(smtp.sock)
1280        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
1281        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})
1282
1283    def test_421_from_data_cmd(self):
1284        class MySimSMTPChannel(SimSMTPChannel):
1285            def found_terminator(self):
1286                if self.smtp_state == self.DATA:
1287                    self.push('421 closing')
1288                else:
1289                    super().found_terminator()
1290        self.serv.channel_class = MySimSMTPChannel
1291        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1292                            timeout=support.LOOPBACK_TIMEOUT)
1293        smtp.noop()
1294        with self.assertRaises(smtplib.SMTPDataError):
1295            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
1296        self.assertIsNone(smtp.sock)
1297        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
1298
1299    def test_smtputf8_NotSupportedError_if_no_server_support(self):
1300        smtp = smtplib.SMTP(
1301            HOST, self.port, local_hostname='localhost',
1302            timeout=support.LOOPBACK_TIMEOUT)
1303        self.addCleanup(smtp.close)
1304        smtp.ehlo()
1305        self.assertTrue(smtp.does_esmtp)
1306        self.assertFalse(smtp.has_extn('smtputf8'))
1307        self.assertRaises(
1308            smtplib.SMTPNotSupportedError,
1309            smtp.sendmail,
1310            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
1311        self.assertRaises(
1312            smtplib.SMTPNotSupportedError,
1313            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
1314
1315    def test_send_unicode_without_SMTPUTF8(self):
1316        smtp = smtplib.SMTP(
1317            HOST, self.port, local_hostname='localhost',
1318            timeout=support.LOOPBACK_TIMEOUT)
1319        self.addCleanup(smtp.close)
1320        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
1321        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
1322
1323    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
1324        # This test is located here and not in the SMTPUTF8SimTests
1325        # class because it needs a "regular" SMTP server to work
1326        msg = EmailMessage()
1327        msg['From'] = "Páolo <főo@bar.com>"
1328        msg['To'] = 'Dinsdale'
1329        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
1330        smtp = smtplib.SMTP(
1331            HOST, self.port, local_hostname='localhost',
1332            timeout=support.LOOPBACK_TIMEOUT)
1333        self.addCleanup(smtp.close)
1334        with self.assertRaises(smtplib.SMTPNotSupportedError):
1335            smtp.send_message(msg)
1336
1337    def test_name_field_not_included_in_envelop_addresses(self):
1338        smtp = smtplib.SMTP(
1339            HOST, self.port, local_hostname='localhost',
1340            timeout=support.LOOPBACK_TIMEOUT)
1341        self.addCleanup(smtp.close)
1342
1343        message = EmailMessage()
1344        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
1345        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))
1346
1347        self.assertDictEqual(smtp.send_message(message), {})
1348
1349        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
1350        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1351
1352
1353class SimSMTPUTF8Server(SimSMTPServer):
1354
1355    def __init__(self, *args, **kw):
1356        # The base SMTP server turns these on automatically, but our test
1357        # server is set up to munge the EHLO response, so we need to provide
1358        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
1359        self._extra_features = ['SMTPUTF8', '8BITMIME']
1360        smtpd.SMTPServer.__init__(self, *args, **kw)
1361
1362    def handle_accepted(self, conn, addr):
1363        self._SMTPchannel = self.channel_class(
1364            self._extra_features, self, conn, addr,
1365            decode_data=self._decode_data,
1366            enable_SMTPUTF8=self.enable_SMTPUTF8,
1367        )
1368
1369    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
1370                                                             rcpt_options=None):
1371        self.last_peer = peer
1372        self.last_mailfrom = mailfrom
1373        self.last_rcpttos = rcpttos
1374        self.last_message = data
1375        self.last_mail_options = mail_options
1376        self.last_rcpt_options = rcpt_options
1377
1378
1379class SMTPUTF8SimTests(unittest.TestCase):
1380
1381    maxDiff = None
1382
1383    def setUp(self):
1384        self.thread_key = threading_helper.threading_setup()
1385        self.real_getfqdn = socket.getfqdn
1386        socket.getfqdn = mock_socket.getfqdn
1387        self.serv_evt = threading.Event()
1388        self.client_evt = threading.Event()
1389        # Pick a random unused port by passing 0 for the port number
1390        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
1391                                      decode_data=False,
1392                                      enable_SMTPUTF8=True)
1393        # Keep a note of what port was assigned
1394        self.port = self.serv.socket.getsockname()[1]
1395        serv_args = (self.serv, self.serv_evt, self.client_evt)
1396        self.thread = threading.Thread(target=debugging_server, args=serv_args)
1397        self.thread.start()
1398
1399        # wait until server thread has assigned a port number
1400        self.serv_evt.wait()
1401        self.serv_evt.clear()
1402
1403    def tearDown(self):
1404        socket.getfqdn = self.real_getfqdn
1405        # indicate that the client is finished
1406        self.client_evt.set()
1407        # wait for the server thread to terminate
1408        self.serv_evt.wait()
1409        threading_helper.join_thread(self.thread)
1410        del self.thread
1411        self.doCleanups()
1412        threading_helper.threading_cleanup(*self.thread_key)
1413
1414    def test_test_server_supports_extensions(self):
1415        smtp = smtplib.SMTP(
1416            HOST, self.port, local_hostname='localhost',
1417            timeout=support.LOOPBACK_TIMEOUT)
1418        self.addCleanup(smtp.close)
1419        smtp.ehlo()
1420        self.assertTrue(smtp.does_esmtp)
1421        self.assertTrue(smtp.has_extn('smtputf8'))
1422
1423    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
1424        m = '¡a test message containing unicode!'.encode('utf-8')
1425        smtp = smtplib.SMTP(
1426            HOST, self.port, local_hostname='localhost',
1427            timeout=support.LOOPBACK_TIMEOUT)
1428        self.addCleanup(smtp.close)
1429        smtp.sendmail('Jőhn', 'Sálly', m,
1430                      mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
1431        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
1432        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
1433        self.assertEqual(self.serv.last_message, m)
1434        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1435        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1436        self.assertEqual(self.serv.last_rcpt_options, [])
1437
1438    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
1439        m = '¡a test message containing unicode!'.encode('utf-8')
1440        smtp = smtplib.SMTP(
1441            HOST, self.port, local_hostname='localhost',
1442            timeout=support.LOOPBACK_TIMEOUT)
1443        self.addCleanup(smtp.close)
1444        smtp.ehlo()
1445        self.assertEqual(
1446            smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
1447            (250, b'OK'))
1448        self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
1449        self.assertEqual(smtp.data(m), (250, b'OK'))
1450        self.assertEqual(self.serv.last_mailfrom, 'Jő')
1451        self.assertEqual(self.serv.last_rcpttos, ['János'])
1452        self.assertEqual(self.serv.last_message, m)
1453        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1454        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1455        self.assertEqual(self.serv.last_rcpt_options, [])
1456
1457    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
1458        msg = EmailMessage()
1459        msg['From'] = "Páolo <főo@bar.com>"
1460        msg['To'] = 'Dinsdale'
1461        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
1462        # XXX I don't know why I need two \n's here, but this is an existing
1463        # bug (if it is one) and not a problem with the new functionality.
1464        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
1465        # XXX smtpd converts received /r/n to /n, so we can't easily test that
1466        # we are successfully sending /r/n :(.
1467        expected = textwrap.dedent("""\
1468            From: Páolo <főo@bar.com>
1469            To: Dinsdale
1470            Subject: Nudge nudge, wink, wink \u1F609
1471            Content-Type: text/plain; charset="utf-8"
1472            Content-Transfer-Encoding: 8bit
1473            MIME-Version: 1.0
1474
1475            oh là là, know what I mean, know what I mean?
1476            """)
1477        smtp = smtplib.SMTP(
1478            HOST, self.port, local_hostname='localhost',
1479            timeout=support.LOOPBACK_TIMEOUT)
1480        self.addCleanup(smtp.close)
1481        self.assertEqual(smtp.send_message(msg), {})
1482        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
1483        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
1484        self.assertEqual(self.serv.last_message.decode(), expected)
1485        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
1486        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
1487        self.assertEqual(self.serv.last_rcpt_options, [])
1488
1489
1490EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
1491
1492class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
1493    def smtp_AUTH(self, arg):
1494        # RFC 4954's AUTH command allows for an optional initial-response.
1495        # Not all AUTH methods support this; some require a challenge.  AUTH
1496        # PLAIN does those, so test that here.  See issue #15014.
1497        args = arg.split()
1498        if args[0].lower() == 'plain':
1499            if len(args) == 2:
1500                # AUTH PLAIN <initial-response> with the response base 64
1501                # encoded.  Hard code the expected response for the test.
1502                if args[1] == EXPECTED_RESPONSE:
1503                    self.push('235 Ok')
1504                    return
1505        self.push('571 Bad authentication')
1506
1507class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
1508    channel_class = SimSMTPAUTHInitialResponseChannel
1509
1510
1511class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
1512    def setUp(self):
1513        self.thread_key = threading_helper.threading_setup()
1514        self.real_getfqdn = socket.getfqdn
1515        socket.getfqdn = mock_socket.getfqdn
1516        self.serv_evt = threading.Event()
1517        self.client_evt = threading.Event()
1518        # Pick a random unused port by passing 0 for the port number
1519        self.serv = SimSMTPAUTHInitialResponseServer(
1520            (HOST, 0), ('nowhere', -1), decode_data=True)
1521        # Keep a note of what port was assigned
1522        self.port = self.serv.socket.getsockname()[1]
1523        serv_args = (self.serv, self.serv_evt, self.client_evt)
1524        self.thread = threading.Thread(target=debugging_server, args=serv_args)
1525        self.thread.start()
1526
1527        # wait until server thread has assigned a port number
1528        self.serv_evt.wait()
1529        self.serv_evt.clear()
1530
1531    def tearDown(self):
1532        socket.getfqdn = self.real_getfqdn
1533        # indicate that the client is finished
1534        self.client_evt.set()
1535        # wait for the server thread to terminate
1536        self.serv_evt.wait()
1537        threading_helper.join_thread(self.thread)
1538        del self.thread
1539        self.doCleanups()
1540        threading_helper.threading_cleanup(*self.thread_key)
1541
1542    def testAUTH_PLAIN_initial_response_login(self):
1543        self.serv.add_feature('AUTH PLAIN')
1544        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1545                            timeout=support.LOOPBACK_TIMEOUT)
1546        smtp.login('psu', 'doesnotexist')
1547        smtp.close()
1548
1549    def testAUTH_PLAIN_initial_response_auth(self):
1550        self.serv.add_feature('AUTH PLAIN')
1551        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
1552                            timeout=support.LOOPBACK_TIMEOUT)
1553        smtp.user = 'psu'
1554        smtp.password = 'doesnotexist'
1555        code, response = smtp.auth('plain', smtp.auth_plain)
1556        smtp.close()
1557        self.assertEqual(code, 235)
1558
1559
1560if __name__ == '__main__':
1561    unittest.main()
1562