1import asyncore
2import base64
3import email.mime.text
4from email.message import EmailMessage
5from email.base64mime import body_encode as encode_base64
6import email.utils
7import hmac
8import socket
9import smtpd
10import smtplib
11import io
12import re
13import sys
14import time
15import select
16import errno
17import textwrap
18
19import unittest
20from test import support, mock_socket
21from unittest.mock import Mock
22
# The threaded test classes below are decorated with
# unittest.skipUnless(threading, ...), so a missing module only skips them.
try:
    import threading
except ImportError:
    threading = None

# Host the test servers bind to and the clients connect to.
HOST = support.HOST
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
33
if sys.platform == 'darwin':
    # On darwin select.poll reports a select.POLLHUP at the end of the
    # tests; install a no-op handler on the channel class so that event
    # is simply ignored.
    def handle_expt(self):
        return None
    smtpd.SMTPChannel.handle_expt = handle_expt
40
41
def server(evt, buf, serv):
    """Send *buf* to the first client that connects to *serv*, then stop.

    Intended to run in a helper thread.

    evt  -- event that is set once the socket is listening and set again
            when the function is finished.
    buf  -- bytes to transmit to the accepted connection.
    serv -- bound socket to listen on; it is always closed on exit.

    A socket.timeout raised by accept() is swallowed.  Any error raised
    while sending propagates to the caller, but the accepted connection
    is closed first so that it cannot leak.
    """
    serv.listen()
    evt.set()
    try:
        conn, _addr = serv.accept()
    except socket.timeout:
        pass
    else:
        try:
            # Bound the number of select/send rounds so a stalled client
            # cannot keep this thread alive forever.
            attempts = 500
            while buf and attempts > 0:
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                attempts -= 1
        finally:
            # Close even when send() raises (e.g. the peer reset the
            # connection after rejecting the data).
            conn.close()
    finally:
        serv.close()
        evt.set()
63
class GeneralTests(unittest.TestCase):
    """Basic smtplib.SMTP behaviour, exercised against the mock socket.

    Test methods use comments rather than docstrings so that verbose
    unittest output keeps showing the method names.
    """

    def setUp(self):
        # Route smtplib's socket usage through the mock for each test.
        self.port = 25
        smtplib.socket = mock_socket

    def tearDown(self):
        # Put the real socket module back.
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        raw = "abc\n.jkl\rfoo\r\n..blue"
        quoted = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(quoted, smtplib.quotedata(raw))

    def testBasic1(self):
        # Connecting with separate host and port arguments works.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port)
        client.close()

    def testSourceAddress(self):
        # The source_address argument is stored on the client object.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port,
                              source_address=('127.0.0.1', 19876))
        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
        client.close()

    def testBasic2(self):
        # Connecting with the port embedded in the host string works.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP("{}:{}".format(HOST, self.port))
        client.close()

    def testLocalHostName(self):
        # A caller-supplied local_hostname is kept as given.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        # Without an explicit timeout the (mock) default timeout applies.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            client = smtplib.SMTP(HOST, self.port)
        finally:
            # Always reset the default, even if the connect failed.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        # timeout=None must win over a non-None default timeout.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutValue(self):
        # An explicit timeout is applied to the socket.
        mock_socket.reply_with(b"220 Hola mundo")
        client = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def test_debuglevel(self):
        # Debug level 1 writes a "connect:" line to stderr.
        mock_socket.reply_with(b"220 Hello world")
        client = smtplib.SMTP()
        client.set_debuglevel(1)
        with support.captured_stderr() as captured:
            client.connect(HOST, self.port)
        client.close()
        self.assertRegex(captured.getvalue(),
                         re.compile(r"^connect:", re.MULTILINE))

    def test_debuglevel_2(self):
        # Debug level 2 prefixes each debug line with a timestamp.
        mock_socket.reply_with(b"220 Hello world")
        client = smtplib.SMTP()
        client.set_debuglevel(2)
        with support.captured_stderr() as captured:
            client.connect(HOST, self.port)
        client.close()
        timestamped = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                                 re.MULTILINE)
        self.assertRegex(captured.getvalue(), timestamped)
156
157
# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Pump the asyncore loop for *serv* until the client is finished.

    serv       -- server object; its close() method is called on exit paths.
    serv_evt   -- event set when the thread starts and again when it ends.
    client_evt -- event the client sets once its conversation is over.

    At most 1000 poll rounds of 0.01 seconds are run.
    """
    serv_evt.set()

    try:
        poll_once = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_once(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
189
# Marker lines the DebuggingServer tests below expect around each message
# body in the captured server output.
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
192
193# NOTE: Some SMTP objects in the tests below are created with a non-default
194# local_hostname argument to the constructor, since (on some systems) the FQDN
195# lookup caused by the default local_hostname sometimes takes so long that the
196# test server times out, causing the test to fail.
197
198# Test behavior of smtpd.DebuggingServer
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
    """Drive smtplib against an smtpd.DebuggingServer run in a thread.

    Test methods use comments rather than docstrings so that verbose
    unittest output keeps showing the method names.
    """

    maxDiff = None

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM

    # ---- private helpers (not collected as tests) ----

    def _connect(self):
        # Open a client connection to the test server with a fixed
        # local_hostname and a short timeout.
        return smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=3)

    def _quit_after_send(self, client):
        # XXX(nnorwitz): the send tests are flaky and die with a bad file
        # descriptor in asyncore.  This sleep might help, but should really
        # be fixed properly by using an Event variable.
        time.sleep(0.01)
        client.quit()

    def _await_server(self):
        # Tell the server thread the client is done, wait for it to finish
        # and flush the captured output.
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    @staticmethod
    def _message(*headers):
        # Build the standard test message and set (name, value) headers
        # on it in the order given.
        msg = email.mime.text.MIMEText('A test message')
        for name, value in headers:
            msg[name] = value
        return msg

    @staticmethod
    def _framed(body):
        # Expected captured output for a message with the given text.
        return '{}{}\n{}'.format(MSG_BEGIN, body, MSG_END)

    @staticmethod
    def _recips_re(addr):
        # Pattern matching a "recips:" debug line that contains addr.
        return re.compile(r"^recips: .*'{}'.*$".format(addr), re.MULTILINE)

    # ---- tests ----

    def testBasic(self):
        # connect
        self._connect().quit()

    def testSourceAddress(self):
        # connect
        port = support.find_unused_port()
        source = ('127.0.0.1', port)
        try:
            client = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                                  timeout=3, source_address=source)
            self.assertEqual(client.source_address, source)
            self.assertEqual(client.local_hostname, 'localhost')
            client.quit()
        except OSError as exc:
            if exc.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def testNOOP(self):
        client = self._connect()
        self.assertEqual(client.noop(), (250, b'OK'))
        client.quit()

    def testRSET(self):
        client = self._connect()
        self.assertEqual(client.rset(), (250, b'OK'))
        client.quit()

    def testELHO(self):
        # The EHLO reply is expected to be a 250 listing SIZE and HELP.
        client = self._connect()
        self.assertEqual(client.ehlo(), (250, b'\nSIZE 33554432\nHELP'))
        client.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        client = self._connect()
        client.putcmd('EXPN')
        self.assertEqual(client.getreply(), (502, b'EXPN not implemented'))
        client.quit()

    def testVRFY(self):
        client = self._connect()
        reply = (252, b'Cannot VRFY user, but will accept message '
                      b'and attempt delivery')
        # vrfy() and verify() must give the same answer.
        self.assertEqual(client.vrfy('nobody@nowhere.com'), reply)
        self.assertEqual(client.verify('nobody@nowhere.com'), reply)
        client.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        client = self._connect()
        client.helo()
        self.assertEqual(client.helo(), (503, b'Duplicate HELO/EHLO'))
        client.quit()

    def testHELP(self):
        client = self._connect()
        self.assertEqual(client.help(),
                         b'Supported commands: EHLO HELO MAIL '
                         b'RCPT DATA RSET NOOP QUIT VRFY')
        client.quit()

    def testSend(self):
        # connect and send mail
        text = 'A test message'
        client = self._connect()
        client.sendmail('John', 'Sally', text)
        self._quit_after_send(client)

        self._await_server()
        self.assertEqual(self.output.getvalue(), self._framed(text))

    def testSendBinary(self):
        # A bytes payload comes out as the same text on the server side.
        payload = b'A test message'
        client = self._connect()
        client.sendmail('John', 'Sally', payload)
        self._quit_after_send(client)

        self._await_server()
        self.assertEqual(self.output.getvalue(),
                         self._framed(payload.decode('ascii')))

    def testSendNeedingDotQuote(self):
        # Issue 12283
        text = '.A test\n.mes.sage.'
        client = self._connect()
        client.sendmail('John', 'Sally', text)
        self._quit_after_send(client)

        self._await_server()
        self.assertEqual(self.output.getvalue(), self._framed(text))

    def testSendNullSender(self):
        # The null sender '<>' must reach the server unchanged.
        text = 'A test message'
        client = self._connect()
        client.sendmail('<>', 'Sally', text)
        self._quit_after_send(client)

        self._await_server()
        self.assertEqual(self.output.getvalue(), self._framed(text))
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout, re.compile("^sender: <>$", re.MULTILINE))

    def testSendMessage(self):
        msg = self._message()
        client = self._connect()
        client.send_message(msg, from_addr='John', to_addrs='Sally')
        self._quit_after_send(client)

        self._await_server()
        # Add the X-Peer header that DebuggingServer adds
        msg['X-Peer'] = socket.gethostbyname('localhost')
        self.assertEqual(self.output.getvalue(),
                         self._framed(msg.as_string()))

    def testSendMessageWithAddresses(self):
        msg = self._message(
            ('From', 'foo@bar.com'),
            ('To', 'John'),
            ('CC', 'Sally, Fred'),
            ('Bcc', 'John Root <root@localhost>, '
                    '"Dinsdale" <warped@silly.walks.com>'),
        )
        client = self._connect()
        client.send_message(msg)
        self._quit_after_send(client)
        # make sure the Bcc header is still in the message.
        self.assertEqual(msg['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                     '<warped@silly.walks.com>')

        self._await_server()
        # Add the X-Peer header that DebuggingServer adds
        msg['X-Peer'] = socket.gethostbyname('localhost')
        # The Bcc header should not be transmitted.
        del msg['Bcc']
        self.assertEqual(self.output.getvalue(),
                         self._framed(msg.as_string()))
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: foo@bar.com$", re.MULTILINE))
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     'warped@silly.walks.com'):
            self.assertRegex(debugout, self._recips_re(addr))

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        msg = self._message(
            ('From', 'foo@bar.com'),
            ('To', 'John, Dinsdale'),
        )
        client = self._connect()
        client.send_message(msg)
        self._quit_after_send(client)

        self._await_server()
        # Add the X-Peer header that DebuggingServer adds
        msg['X-Peer'] = socket.gethostbyname('localhost')
        self.assertEqual(self.output.getvalue(),
                         self._framed(msg.as_string()))
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: foo@bar.com$", re.MULTILINE))
        for addr in ('John', 'Dinsdale'):
            self.assertRegex(debugout, self._recips_re(addr))

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        msg = self._message(
            ('From', 'foo@bar.com'),
            ('To', 'John, Dinsdale'),
        )
        client = self._connect()
        client.send_message(msg, from_addr='joe@example.com',
                            to_addrs='foo@example.net')
        self._quit_after_send(client)

        self._await_server()
        # Add the X-Peer header that DebuggingServer adds
        msg['X-Peer'] = socket.gethostbyname('localhost')
        self.assertEqual(self.output.getvalue(),
                         self._framed(msg.as_string()))
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: joe@example.com$", re.MULTILINE))
        # The header recipients must not have been used ...
        for addr in ('John', 'Dinsdale'):
            self.assertNotRegex(debugout, self._recips_re(addr))
        # ... only the explicitly passed one.
        self.assertRegex(
            debugout,
            re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE))

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        msg = self._message(
            ('From', 'Bernard, Bianca'),
            ('Sender', 'the_rescuers@Rescue-Aid-Society.com'),
            ('To', 'John, Dinsdale'),
        )
        client = self._connect()
        client.send_message(msg)
        self._quit_after_send(client)

        self._await_server()
        # Add the X-Peer header that DebuggingServer adds
        msg['X-Peer'] = socket.gethostbyname('localhost')
        self.assertEqual(self.output.getvalue(),
                         self._framed(msg.as_string()))
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(
            debugout,
            re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$",
                       re.MULTILINE))
        for addr in ('John', 'Dinsdale'):
            self.assertRegex(debugout, self._recips_re(addr))

    def testSendMessageResent(self):
        # With one set of Resent-* headers, those supply sender/recipients.
        msg = self._message(
            ('From', 'foo@bar.com'),
            ('To', 'John'),
            ('CC', 'Sally, Fred'),
            ('Bcc', 'John Root <root@localhost>, '
                    '"Dinsdale" <warped@silly.walks.com>'),
            ('Resent-Date', 'Thu, 1 Jan 1970 17:42:00 +0000'),
            ('Resent-From', 'holy@grail.net'),
            ('Resent-To', 'Martha <my_mom@great.cooker.com>, Jeff'),
            ('Resent-Bcc', 'doe@losthope.net'),
        )
        client = self._connect()
        client.send_message(msg)
        self._quit_after_send(client)

        self._await_server()
        # The Resent-Bcc headers are deleted before serialization.
        del msg['Bcc']
        del msg['Resent-Bcc']
        # Add the X-Peer header that DebuggingServer adds
        msg['X-Peer'] = socket.gethostbyname('localhost')
        self.assertEqual(self.output.getvalue(),
                         self._framed(msg.as_string()))
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertRegex(debugout,
                         re.compile("^sender: holy@grail.net$", re.MULTILINE))
        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
            self.assertRegex(debugout, self._recips_re(addr))

    def testSendMessageMultipleResentRaises(self):
        # Two sets of Resent-* headers must make send_message raise.
        msg = self._message(
            ('From', 'foo@bar.com'),
            ('To', 'John'),
            ('CC', 'Sally, Fred'),
            ('Bcc', 'John Root <root@localhost>, '
                    '"Dinsdale" <warped@silly.walks.com>'),
            ('Resent-Date', 'Thu, 1 Jan 1970 17:42:00 +0000'),
            ('Resent-From', 'holy@grail.net'),
            ('Resent-To', 'Martha <my_mom@great.cooker.com>, Jeff'),
            ('Resent-Bcc', 'doe@losthope.net'),
            ('Resent-Date', 'Thu, 2 Jan 1970 17:42:00 +0000'),
            ('Resent-To', 'holy@grail.net'),
            ('Resent-From', 'Martha <my_mom@great.cooker.com>, Jeff'),
        )
        client = self._connect()
        with self.assertRaises(ValueError):
            client.send_message(msg)
        client.close()
556
class NonConnectingTests(unittest.TestCase):
    """Checks that need no server at all."""

    def testNotConnected(self):
        # Test various operations on an unconnected SMTP object that
        # should raise exceptions (at present the attempt in SMTP.send
        # to reference the nonexistent 'sock' attribute of the SMTP object
        # causes an AttributeError)
        client = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.send('test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError
        with self.assertRaises(OSError):
            smtplib.SMTP("localhost", "bogus")
        with self.assertRaises(OSError):
            smtplib.SMTP("localhost:bogus")
575
576
class DefaultArgumentsTests(unittest.TestCase):
    """send_message() must not leak mail options between calls."""

    def setUp(self):
        # A message with a non-ASCII sender, and an SMTP object whose
        # network-facing methods are replaced by mocks.
        self.msg = EmailMessage()
        self.msg['From'] = 'Páolo <főo@bar.com>'
        self.smtp = smtplib.SMTP()
        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
        self.smtp.has_extn = Mock()
        self.smtp.sendmail = Mock()

    def testSendMessage(self):
        # Sending twice must yield the same options both times.
        wanted = ('SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg)
        self.smtp.send_message(self.msg)
        calls = self.smtp.sendmail.call_args_list
        for index in (0, 1):
            # Positional argument 3 of sendmail() is mail_options.
            self.assertEqual(calls[index][0][3], wanted)

    def testSendMessageWithMailOptions(self):
        # The caller's list is extended for the call but not mutated.
        mail_options = ['STARTTLS']
        wanted = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        sent_options = self.smtp.sendmail.call_args_list[0][0][3]
        self.assertEqual(sent_options, wanted)
602
603
604# test response of client to a non-successful HELO message
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):
    """Client behaviour when the server greets with a non-220 code."""

    def setUp(self):
        # Mock the socket layer and queue up the bad greeting.
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        # Capture anything printed during the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        # The constructor itself must fail with SMTPConnectError.
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                         timeout=3)
623
624
@unittest.skipUnless(threading, 'Threading required for this test.')
class TooLongLineTests(unittest.TestCase):
    """Client behaviour when the server sends an over-long reply line."""

    # A single reply line twice as long as smtplib accepts.
    respdata = b''.join((b'250 OK', b'.' * (smtplib._MAXLINE * 2), b'\n'))

    def setUp(self):
        # Capture anything printed during the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        # Start a one-shot server thread that sends respdata.
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = support.bind_port(self.sock)
        worker = threading.Thread(
            target=server, args=(self.evt, self.respdata, self.sock))
        worker.start()
        self.addCleanup(worker.join)
        # Wait until the server is listening, then re-arm the event so
        # tearDown can wait for the server to finish.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout

    def testLineTooLong(self):
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                         timeout=3)
652
653
# Address -> display name of the users the simulated server knows.
sim_users = {
    'Mr.A@somewhere.com': 'John A',
    'Ms.B@xn--fo-fka.com': 'Sally B',
    'Mrs.C@somewhereesle.com': 'Ruth C',
}

# (user, password) pair the simulated AUTH mechanisms check against.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')

# Base64 challenge sent by the simulated CRAM-MD5 mechanism.
sim_cram_md5_challenge = (
    'PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    'dAZWx3b29kLmlubm9zb2Z0LmNvbT4='
)

# List name -> member addresses, used by the simulated EXPN command.
sim_lists = {
    'list-1': ['Mr.A@somewhere.com', 'Mrs.C@somewhereesle.com'],
    'list-2': ['Ms.B@xn--fo-fka.com'],
}
665
666# Simulated SMTP channel & server
class ResponseException(Exception):
    """Error an AUTH handler can raise to abort the exchange.

    SimSMTPChannel.found_terminator catches it and reads ``smtp_code`` and
    ``smtp_error`` from the instance; this class does not define those
    attributes itself, so whoever raises it is presumably expected to set
    them.
    """
668class SimSMTPChannel(smtpd.SMTPChannel):
669
670    quit_response = None
671    mail_response = None
672    rcpt_response = None
673    data_response = None
674    rcpt_count = 0
675    rset_count = 0
676    disconnect = 0
677    AUTH = 99    # Add protocol state to enable auth testing.
678    authenticated_user = None
679
680    def __init__(self, extra_features, *args, **kw):
681        self._extrafeatures = ''.join(
682            [ "250-{0}\r\n".format(x) for x in extra_features ])
683        super(SimSMTPChannel, self).__init__(*args, **kw)
684
685    # AUTH related stuff.  It would be nice if support for this were in smtpd.
686    def found_terminator(self):
687        if self.smtp_state == self.AUTH:
688            line = self._emptystring.join(self.received_lines)
689            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
690            self.received_lines = []
691            try:
692                self.auth_object(line)
693            except ResponseException as e:
694                self.smtp_state = self.COMMAND
695                self.push('%s %s' % (e.smtp_code, e.smtp_error))
696                return
697        super().found_terminator()
698
699
700    def smtp_AUTH(self, arg):
701        if not self.seen_greeting:
702            self.push('503 Error: send EHLO first')
703            return
704        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
705            self.push('500 Error: command "AUTH" not recognized')
706            return
707        if self.authenticated_user is not None:
708            self.push(
709                '503 Bad sequence of commands: already authenticated')
710            return
711        args = arg.split()
712        if len(args) not in [1, 2]:
713            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
714            return
715        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
716        try:
717            self.auth_object = getattr(self, auth_object_name)
718        except AttributeError:
719            self.push('504 Command parameter not implemented: unsupported '
720                      ' authentication mechanism {!r}'.format(auth_object_name))
721            return
722        self.smtp_state = self.AUTH
723        self.auth_object(args[1] if len(args) == 2 else None)
724
725    def _authenticated(self, user, valid):
726        if valid:
727            self.authenticated_user = user
728            self.push('235 Authentication Succeeded')
729        else:
730            self.push('535 Authentication credentials invalid')
731        self.smtp_state = self.COMMAND
732
733    def _decode_base64(self, string):
734        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
735
736    def _auth_plain(self, arg=None):
737        if arg is None:
738            self.push('334 ')
739        else:
740            logpass = self._decode_base64(arg)
741            try:
742                *_, user, password = logpass.split('\0')
743            except ValueError as e:
744                self.push('535 Splitting response {!r} into user and password'
745                          ' failed: {}'.format(logpass, e))
746                return
747            self._authenticated(user, password == sim_auth[1])
748
749    def _auth_login(self, arg=None):
750        if arg is None:
751            # base64 encoded 'Username:'
752            self.push('334 VXNlcm5hbWU6')
753        elif not hasattr(self, '_auth_login_user'):
754            self._auth_login_user = self._decode_base64(arg)
755            # base64 encoded 'Password:'
756            self.push('334 UGFzc3dvcmQ6')
757        else:
758            password = self._decode_base64(arg)
759            self._authenticated(self._auth_login_user, password == sim_auth[1])
760            del self._auth_login_user
761
762    def _auth_cram_md5(self, arg=None):
763        if arg is None:
764            self.push('334 {}'.format(sim_cram_md5_challenge))
765        else:
766            logpass = self._decode_base64(arg)
767            try:
768                user, hashed_pass = logpass.split()
769            except ValueError as e:
770                self.push('535 Splitting response {!r} into user and password '
771                          'failed: {}'.format(logpass, e))
772                return False
773            valid_hashed_pass = hmac.HMAC(
774                sim_auth[1].encode('ascii'),
775                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
776                'md5').hexdigest()
777            self._authenticated(user, hashed_pass == valid_hashed_pass)
778    # end AUTH related stuff.
779
780    def smtp_EHLO(self, arg):
781        resp = ('250-testhost\r\n'
782                '250-EXPN\r\n'
783                '250-SIZE 20000000\r\n'
784                '250-STARTTLS\r\n'
785                '250-DELIVERBY\r\n')
786        resp = resp + self._extrafeatures + '250 HELP'
787        self.push(resp)
788        self.seen_greeting = arg
789        self.extended_smtp = True
790
791    def smtp_VRFY(self, arg):
792        # For max compatibility smtplib should be sending the raw address.
793        if arg in sim_users:
794            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
795        else:
796            self.push('550 No such user: %s' % arg)
797
798    def smtp_EXPN(self, arg):
799        list_name = arg.lower()
800        if list_name in sim_lists:
801            user_list = sim_lists[list_name]
802            for n, user_email in enumerate(user_list):
803                quoted_addr = smtplib.quoteaddr(user_email)
804                if n < len(user_list) - 1:
805                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
806                else:
807                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
808        else:
809            self.push('550 No access for you!')
810
811    def smtp_QUIT(self, arg):
812        if self.quit_response is None:
813            super(SimSMTPChannel, self).smtp_QUIT(arg)
814        else:
815            self.push(self.quit_response)
816            self.close_when_done()
817
818    def smtp_MAIL(self, arg):
819        if self.mail_response is None:
820            super().smtp_MAIL(arg)
821        else:
822            self.push(self.mail_response)
823            if self.disconnect:
824                self.close_when_done()
825
826    def smtp_RCPT(self, arg):
827        if self.rcpt_response is None:
828            super().smtp_RCPT(arg)
829            return
830        self.rcpt_count += 1
831        self.push(self.rcpt_response[self.rcpt_count-1])
832
833    def smtp_RSET(self, arg):
834        self.rset_count += 1
835        super().smtp_RSET(arg)
836
837    def smtp_DATA(self, arg):
838        if self.data_response is None:
839            super().smtp_DATA(arg)
840        else:
841            self.push(self.data_response)
842
    def handle_error(self):
        # Bare ``raise``: re-raise the exception that is currently being
        # handled rather than dealing with it here.
        # NOTE(review): presumably this replaces the default asyncore error
        # handling so that failures surface in the test run -- confirm.
        raise
845
846
class SimSMTPServer(smtpd.SMTPServer):
    """Simulated SMTP server used by the tests in this file.

    Accepted connections are wrapped in ``channel_class``, which receives the
    list of extra EHLO features registered through add_feature().  The
    envelope sender and recipients of the last processed message are kept in
    ``self._addresses`` under the keys 'from' and 'tos'.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        # Set our own state first, then let smtpd.SMTPServer do its setup
        # with the unchanged arguments.
        self._extra_features = []
        self._addresses = {}
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Keep a reference to the channel so tests can poke at it
        # (e.g. set canned responses) through self.serv._SMTPchannel.
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        # Record only the envelope addresses.  **kwargs is accepted (and
        # ignored) because smtpd passes mail_options/rcpt_options as keyword
        # arguments when decode_data is false, and its documentation asks
        # implementations to accept arbitrary keywords.
        self._addresses['from'] = mailfrom
        self._addresses['tos'] = rcpttos

    def add_feature(self, feature):
        # The same list object is handed to every channel created later.
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the active exception instead of handling it here.
        raise
870
871
872# Test various SMTP & ESMTP commands/behaviors that require a simulated server
873# (i.e., something with more features than DebuggingServer)
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):
    """Exercise smtplib against a SimSMTPServer running in a helper thread.

    Connections opened through _connect() register ``smtp.close`` as a
    cleanup, so the client socket is released even when an assertion fails
    before the explicit quit()/close() at the end of a test is reached.
    """

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()

    def _connect(self, timeout=15):
        """Return an SMTP client connected to the test server.

        The client's close() is registered as a cleanup; close() is safe to
        call again after the test has already quit or closed the connection.
        """
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=timeout)
        self.addCleanup(smtp.close)
        return smtp

    def testBasic(self):
        # smoke test
        smtp = self._connect()
        smtp.quit()

    def testEHLO(self):
        smtp = self._connect()

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = self._connect()

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                # A fresh connection per mechanism; each one is cleaned up
                # even if its sub-test fails.
                smtp = self._connect()
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = self._connect()
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        # The channel class must be swapped before the connection is made.
        self.serv.channel_class = MySimSMTPChannel
        smtp = self._connect()
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        smtp = self._connect(timeout=3)
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        smtp = self._connect(timeout=3)
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        # NOTE(review): the escape below is U+1F60 followed by the digit 9,
        # not U+1F609 (that would need an 8-digit \U escape) -- confirm intent.
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        smtp = self._connect(timeout=3)
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        smtp = self._connect(timeout=3)

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))

        self.assertDictEqual(smtp.send_message(message), {})

        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1130
1131
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    process_message() stores everything it is given in ``last_*``
    attributes so tests can inspect the most recent delivery.
    """

    def __init__(self, *args, **kw):
        # The base SMTP server turns these on automatically, but our test
        # server is set up to munge the EHLO response, so we need to provide
        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Forward the server's decoding/SMTPUTF8 settings to the channel and
        # remember the channel for the tests.
        channel_options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **channel_options)

    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
                                                             rcpt_options=None):
        # Record the whole delivery; nothing is returned.
        self.last_peer, self.last_mailfrom = peer, mailfrom
        self.last_rcpttos, self.last_message = rcpttos, data
        self.last_mail_options = mail_options
        self.last_rcpt_options = rcpt_options
1156
1157
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPUTF8SimTests(unittest.TestCase):
    """smtplib tests run against a SimSMTPUTF8Server in a helper thread."""

    maxDiff = None

    def setUp(self):
        # Replace socket.getfqdn for the duration of the test.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Port 0 asks for any unused port; the real one is read back below.
        server_address = (HOST, 0)
        self.serv = SimSMTPUTF8Server(server_address, ('nowhere', -1),
                                      decode_data=False,
                                      enable_SMTPUTF8=True)
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # Block until the server thread signals, then re-arm the event so
        # tearDown can wait on it again.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Tell the server thread the client is done, then wait for it.
        self.client_evt.set()
        self.serv_evt.wait()
        self.thread.join()

    def _open_client(self):
        """Return a connected SMTP client whose close() runs at cleanup."""
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(client.close)
        return client

    def _assert_utf8_options_recorded(self):
        """Check the MAIL/RCPT options the server saw for the last message."""
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_test_server_supports_extensions(self):
        client = self._open_client()
        client.ehlo()
        self.assertTrue(client.does_esmtp)
        self.assertTrue(client.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        payload = '¡a test message containing unicode!'.encode('utf-8')
        client = self._open_client()
        client.sendmail('Jőhn', 'Sálly', payload,
                        mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, payload)
        self._assert_utf8_options_recorded()

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        payload = '¡a test message containing unicode!'.encode('utf-8')
        client = self._open_client()
        client.ehlo()
        ok_reply = (250, b'OK')
        self.assertEqual(
            client.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
            ok_reply)
        self.assertEqual(client.rcpt('János'), ok_reply)
        self.assertEqual(client.data(payload), ok_reply)
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, payload)
        self._assert_utf8_options_recorded()

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        message = EmailMessage()
        message['From'] = "Páolo <főo@bar.com>"
        message['To'] = 'Dinsdale'
        message['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        message.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received /r/n to /n, so we can't easily test that
        # we are successfully sending /r/n :(.
        expected = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \u1F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        client = self._open_client()
        self.assertEqual(client.send_message(message), {})
        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), expected)
        self._assert_utf8_options_recorded()
1260
1261
# Base64 text of the AUTH PLAIN initial response b'\0psu\0doesnotexist'
# (i.e. user 'psu' with password 'doesnotexist', as used by the tests
# below); eol='' keeps the encoded value free of a trailing line ending.
EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
1263
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    """Channel whose AUTH accepts only ``PLAIN <initial-response>``.

    The single accepted initial response is the module-level
    EXPECTED_RESPONSE; every other AUTH command gets a 571 reply.
    """

    def smtp_AUTH(self, arg):
        # RFC 4954's AUTH command allows for an optional initial-response.
        # Not all AUTH methods support this; some require a challenge.  AUTH
        # PLAIN does support it, so test that here.  See issue #15014.
        #
        # A bare "AUTH" may reach us with arg of None or '' -- treat that as
        # "no arguments" so the client gets a reply instead of the channel
        # raising while parsing.
        args = arg.split() if arg else []
        # AUTH PLAIN <initial-response> with the response base 64
        # encoded.  Hard code the expected response for the test.
        if (len(args) == 2
                and args[0].lower() == 'plain'
                and args[1] == EXPECTED_RESPONSE):
            self.push('235 Ok')
            return
        self.push('571 Bad authentication')
1278
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    # Identical to SimSMTPServer except for the channel class used for
    # accepted connections.
    channel_class = SimSMTPAUTHInitialResponseChannel
1281
1282
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    """Check that AUTH PLAIN is sent with an initial response.

    The server only accepts ``AUTH PLAIN <initial-response>``, so both
    login() and auth() succeed only if smtplib sends the credentials on the
    AUTH line itself.
    """

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()

    def testAUTH_PLAIN_initial_response_login(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost', timeout=15)
        # Make sure the socket is released even if login() raises.
        self.addCleanup(smtp.close)
        # login() raises on failure, so reaching close() means success.
        smtp.login('psu', 'doesnotexist')
        smtp.close()

    def testAUTH_PLAIN_initial_response_auth(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost', timeout=15)
        # Make sure the socket is released even if auth() raises.
        self.addCleanup(smtp.close)
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, response = smtp.auth('plain', smtp.auth_plain)
        smtp.close()
        self.assertEqual(code, 235)
1327
1328
# Run this module's tests when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
1331