1import asyncore
2import base64
3import email.mime.text
4from email.message import EmailMessage
5from email.base64mime import body_encode as encode_base64
6import email.utils
7import hashlib
8import hmac
9import socket
10import smtpd
11import smtplib
12import io
13import re
14import sys
15import time
16import select
17import errno
18import textwrap
19import threading
20
21import unittest
22from test import support, mock_socket
23from test.support import HOST
24from test.support import threading_setup, threading_cleanup, join_thread
25from test.support import requires_hashdigest
26from unittest.mock import Mock
27
28
if sys.platform == 'darwin':
    def handle_expt(self):
        """Ignore exceptional-condition events on the channel.

        On darwin, select.poll returns a select.POLLHUP at the end of the
        tests, so the event is deliberately dropped.
        """
    smtpd.SMTPChannel.handle_expt = handle_expt
35
36
def server(evt, buf, serv):
    """Send canned bytes to the first client that connects, then shut down.

    evt  -- event set once after serv starts listening and again when this
            function is finished.
    buf  -- bytes to push to the client.
    serv -- socket to listen on; it is always closed before returning.

    A socket.timeout from accept() is swallowed; any other error propagates
    after both the accepted connection and serv have been closed.
    """
    serv.listen()
    evt.set()
    try:
        conn, _ = serv.accept()
    except socket.timeout:
        # Nobody connected before the listening socket timed out.
        pass
    else:
        try:
            # Bound the number of rounds so a stalled client cannot keep
            # this thread alive forever.
            n = 500
            while buf and n > 0:
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                n -= 1
        finally:
            # Release the connection even when select()/send() raised.
            conn.close()
    finally:
        serv.close()
        evt.set()
58
class GeneralTests(unittest.TestCase):
    """Basic smtplib.SMTP behaviour, checked against the mock socket module."""

    def setUp(self):
        # Make smtplib talk to mock_socket instead of the real socket module.
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port)
        smtp.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port,
                source_address=('127.0.0.1',19876))
        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
        smtp.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        # With no explicit timeout the socket picks up the default timeout.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = smtplib.SMTP(HOST, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        # An explicit timeout=None must win over a non-None default timeout.
        # The default is set on mock_socket -- the module smtplib is using
        # while this class runs -- so the real, process-wide socket default
        # is never touched.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def test_debuglevel(self):
        # At debug level 1 the trace lines start directly with the event name.
        mock_socket.reply_with(b"220 Hello world")
        smtp = smtplib.SMTP()
        smtp.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            smtp.connect(HOST, self.port)
        smtp.close()
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        # At debug level 2 every trace line is prefixed with a timestamp.
        mock_socket.reply_with(b"220 Hello world")
        smtp = smtplib.SMTP()
        smtp.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            smtp.connect(HOST, self.port)
        smtp.close()
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)
151
152
153# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Drive the asyncore loop for serv until the client is finished.

    serv_evt is set once on entry and once more on exit.  The loop polls
    asyncore.socket_map for at most 1000 rounds and stops early as soon as
    client_evt is set, closing serv at that point.
    """
    serv_evt.set()

    try:
        poll_fun = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
184
# Delimiter lines the tests expect around each message in the captured
# server output (see the mexpect strings built from them).
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
187
188# NOTE: Some SMTP objects in the tests below are created with a non-default
189# local_hostname argument to the constructor, since (on some systems) the FQDN
190# lookup caused by the default local_hostname sometimes takes so long that the
191# test server times out, causing the test to fail.
192
193# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(unittest.TestCase):
    """Run smtplib client calls against an smtpd.DebuggingServer thread.

    setUp starts the server on an unused port in a background thread and
    captures both sys.stdout (DebuggingServer output) and smtpd.DEBUGSTREAM
    (SMTPChannel debug output); tearDown stops the thread and restores
    everything that was patched.
    """

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what server host and port were assigned
        self.host, self.port = self.serv.socket.getsockname()[:2]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    # -- helpers shared by the tests below ---------------------------------

    def _connect(self):
        """Return a client connected to the test server, closed on cleanup."""
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        return smtp

    def _quit(self, smtp):
        """Pause briefly, then end the SMTP session."""
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

    def _wait_for_server(self):
        """Tell the server thread the client is done and wait for it to stop."""
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    @staticmethod
    def _expected_output(body):
        """Return the captured-output text expected for one message body."""
        return '%s%s\n%s' % (MSG_BEGIN, body, MSG_END)

    @staticmethod
    def _recips_pattern(addr):
        """Return a regex matching a debug 'recips:' line that lists addr."""
        return re.compile(r"^recips: .*'{}'.*$".format(addr), re.MULTILINE)

    def get_output_without_xpeer(self):
        """Return the captured server output with the X-Peer line removed."""
        test_output = self.output.getvalue()
        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
                      test_output, flags=re.MULTILINE|re.DOTALL)

    # -- tests ---------------------------------------------------------------

    def testBasic(self):
        # connect
        smtp = self._connect()
        smtp.quit()

    def testSourceAddress(self):
        # connect
        src_port = support.find_unused_port()
        try:
            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
                                timeout=3, source_address=(self.host, src_port))
            self.addCleanup(smtp.close)
            self.assertEqual(smtp.source_address, (self.host, src_port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as e:
            # The "unused" port may have been taken in the meantime.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to source port %d" % src_port)
            raise

    def testNOOP(self):
        smtp = self._connect()
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = self._connect()
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testELHO(self):
        # The EHLO reply is expected to advertise SIZE and HELP
        smtp = self._connect()
        expected = (250, b'\nSIZE 33554432\nHELP')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = self._connect()
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def test_issue43124_putcmd_escapes_newline(self):
        # see: https://bugs.python.org/issue43124
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=10)  # support.LOOPBACK_TIMEOUT in newer Pythons
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError) as exc:
            smtp.putcmd('helo\nX-INJECTED')
        self.assertIn("prohibited newline characters", str(exc.exception))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()
        expected = (252, b'Cannot VRFY user, but will accept message ' + \
                         b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = self._connect()
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = self._connect()
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._quit(smtp)

        self._wait_for_server()
        mexpect = self._expected_output(m)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendBinary(self):
        m = b'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._quit(smtp)

        self._wait_for_server()
        mexpect = self._expected_output(m.decode('ascii'))
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._quit(smtp)

        self._wait_for_server()
        mexpect = self._expected_output(m)
        self.assertEqual(self.output.getvalue(), mexpect)

    def test_issue43124_escape_localhostname(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        m = 'wazzuuup\nlinetwo'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
                            timeout=10)  # support.LOOPBACK_TIMEOUT in newer Pythons
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError) as exc:
            smtp.sendmail("hi@me.com", "you@me.com", m)
        self.assertIn(
            "prohibited newline characters: ehlo hi\\nX-INJECTED",
            str(exc.exception),
        )
        self._quit(smtp)

        # The injected text must never have reached the server.
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-INJECTED", debugout)

    def test_issue43124_escape_options(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        m = 'wazzuuup\nlinetwo'
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=10)  # support.LOOPBACK_TIMEOUT in newer Pythons

        self.addCleanup(smtp.close)
        smtp.sendmail("hi@me.com", "you@me.com", m)
        with self.assertRaises(ValueError) as exc:
            smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
        msg = str(exc.exception)
        self.assertIn("prohibited newline characters", msg)
        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
        self._quit(smtp)

        # None of the rejected option text must have reached the server.
        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-OPTION", debugout)
        self.assertNotIn("X-OPTION2", debugout)
        self.assertNotIn("X-INJECTED-1", debugout)
        self.assertNotIn("X-INJECTED-2", debugout)

    def testSendNullSender(self):
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('<>', 'Sally', m)
        self._quit(smtp)

        self._wait_for_server()
        mexpect = self._expected_output(m)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: <>$", re.MULTILINE)
        self.assertRegex(debugout, sender)

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = self._connect()
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        self._quit(smtp)

        self._wait_for_server()
        # Remove the X-Peer header that DebuggingServer adds as figuring out
        # exactly what IP address format is put there is not easy (and
        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
        # not always the same as socket.gethostbyname(HOST). :(
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = self._expected_output(m.as_string())
        self.assertEqual(test_output, mexpect)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = self._connect()
        smtp.send_message(m)
        self._quit(smtp)
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')

        self._wait_for_server()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        # The Bcc header should not be transmitted.
        del m['Bcc']
        mexpect = self._expected_output(m.as_string())
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     'warped@silly.walks.com'):
            self.assertRegex(debugout, self._recips_pattern(addr))

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._quit(smtp)

        self._wait_for_server()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = self._expected_output(m.as_string())
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            self.assertRegex(debugout, self._recips_pattern(addr))

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        self._quit(smtp)

        self._wait_for_server()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = self._expected_output(m.as_string())
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        # The addresses from the message headers must not be used ...
        for addr in ('John', 'Dinsdale'):
            self.assertNotRegex(debugout, self._recips_pattern(addr))
        # ... only the one passed explicitly.
        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
        self.assertRegex(debugout, recip)

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._quit(smtp)

        self._wait_for_server()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = self._expected_output(m.as_string())
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            self.assertRegex(debugout, self._recips_pattern(addr))

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = self._connect()
        smtp.send_message(m)
        self._quit(smtp)

        self._wait_for_server()
        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = self._expected_output(m.as_string())
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
            self.assertRegex(debugout, self._recips_pattern(addr))

    def testSendMessageMultipleResentRaises(self):
        # A message carrying two sets of Resent-* headers must be rejected.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = self._connect()
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()
642
class NonConnectingTests(unittest.TestCase):
    """Checks on smtplib.SMTP that never talk to a server."""

    def testNotConnected(self):
        # Operations that need a connection must raise
        # SMTPServerDisconnected on an SMTP object that was never connected.
        client = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.send('test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError, whether the port is
        # passed separately or embedded in the host string
        for ctor_args in (("localhost", "bogus"), ("localhost:bogus",)):
            with self.assertRaises(OSError):
                smtplib.SMTP(*ctor_args)

    def testSockAttributeExists(self):
        # check that sock attribute is present outside of a connect() call
        # (regression test, the previous behavior raised an
        #  AttributeError: 'SMTP' object has no attribute 'sock')
        client = smtplib.SMTP()
        with client:
            self.assertIsNone(client.sock)
668
669
class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options send_message() hands on to sendmail()."""

    def setUp(self):
        # A message whose From header contains non-ASCII characters.
        message = EmailMessage()
        message['From'] = 'Páolo <főo@bar.com>'
        self.msg = message

        # An unconnected client whose network-facing methods are mocked, so
        # the arguments given to sendmail() can be inspected.
        client = smtplib.SMTP()
        client.ehlo = Mock(return_value=(200, 'OK'))
        client.has_extn = Mock()
        client.sendmail = Mock()
        self.smtp = client

    def testSendMessage(self):
        # Sending the same message twice must yield the same options on
        # both sendmail() calls.
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        for _ in range(2):
            self.smtp.send_message(self.msg)
        for call_index in (0, 1):
            positional = self.smtp.sendmail.call_args_list[call_index][0]
            self.assertEqual(positional[3], expected_mail_options)

    def testSendMessageWithMailOptions(self):
        mail_options = ['STARTTLS']
        self.smtp.send_message(self.msg, None, None, mail_options)
        # The caller's list must be left untouched ...
        self.assertEqual(mail_options, ['STARTTLS'])
        # ... while sendmail() receives it extended with the extra options.
        positional = self.smtp.sendmail.call_args_list[0][0]
        self.assertEqual(positional[3],
                         ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME'))
695
696
697# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    """The client must refuse a server whose first reply is not a success."""

    def setUp(self):
        self.port = 25
        # Capture anything printed to stdout while the test runs.
        self.output = io.StringIO()
        self.old_stdout = sys.stdout
        sys.stdout = self.output
        # Queue a "199" reply on the mock socket that smtplib will use.
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")

    def tearDown(self):
        sys.stdout = self.old_stdout
        smtplib.socket = socket

    def testFailingHELO(self):
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
715
716
class TooLongLineTests(unittest.TestCase):
    """The client must reject a reply line longer than smtplib allows."""

    # A "250" reply followed by twice smtplib._MAXLINE dots on a single line.
    respdata = b'250 OK' + b'.' * (smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.thread_key = threading_setup()
        # Capture anything printed to stdout while the test runs.
        self.output = io.StringIO()
        self.old_stdout = sys.stdout
        sys.stdout = self.output

        self.evt = threading.Event()
        # Socket handed to the server() thread, which sends respdata to
        # the first client that connects.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.settimeout(15)
        self.sock = listener
        self.port = support.bind_port(listener)
        self.thread = threading.Thread(
            target=server, args=(self.evt, self.respdata, listener))
        self.thread.start()
        # Wait for server() to signal that it is listening, then reset the
        # event so tearDown can wait for the "finished" signal.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # server() sets the event a second time when it is done.
        self.evt.wait()
        sys.stdout = self.old_stdout
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
747
748
# Canned fixtures (users, credentials, CRAM-MD5 challenge, mailing lists)
# for the simulated SMTP server.
sim_users = {
    'Mr.A@somewhere.com': 'John A',
    'Ms.B@xn--fo-fka.com': 'Sally B',
    'Mrs.C@somewhereesle.com': 'Ruth C',
}

sim_auth = ('Mr.A@somewhere.com', 'somepassword')
sim_cram_md5_challenge = (
    'PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    'dAZWx3b29kLmlubm9zb2Z0LmNvbT4='
)
sim_lists = {
    'list-1': ['Mr.A@somewhere.com', 'Mrs.C@somewhereesle.com'],
    'list-2': ['Ms.B@xn--fo-fka.com'],
}
760
761# Simulated SMTP channel & server
class ResponseException(Exception):
    """Error caught around the auth handler call in found_terminator()."""
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd channel with scriptable replies and a simulated AUTH command.

    Tests steer a live channel by assigning the ``*_response`` attributes
    below; while one of them is None the corresponding command is handled by
    smtpd.SMTPChannel.  The first constructor argument is an iterable of
    extra EHLO feature strings; the remaining arguments go to the base class.
    """

    quit_response = None
    mail_response = None
    # Sequence of replies, consumed one per RCPT command.
    rcpt_response = None
    data_response = None
    # Number of RCPT commands answered from rcpt_response.
    rcpt_count = 0
    # Number of RSET commands received.
    rset_count = 0
    # When true, the connection is closed after mail_response is sent.
    disconnect = 0
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None

    def __init__(self, extra_features, *args, **kw):
        # Pre-render the additional "250-<feature>" lines of the EHLO reply.
        self._extrafeatures = ''.join(
            "250-{0}\r\n".format(x) for x in extra_features)
        super().__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        """Feed a received line to the active AUTH handler while in AUTH state.

        In every other state the line is processed by smtpd.SMTPChannel.
        """
        if self.smtp_state == self.AUTH:
            line = self._emptystring.join(self.received_lines)
            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
            self.received_lines = []
            try:
                self.auth_object(line)
            except ResponseException as e:
                # The handler aborted the exchange: leave AUTH state and send
                # the reply described by the exception's smtp_code and
                # smtp_error attributes.
                self.smtp_state = self.COMMAND
                self.push('%s %s' % (e.smtp_code, e.smtp_error))
            return
        super().found_terminator()

    def smtp_AUTH(self, arg):
        """Handle ``AUTH <mechanism> [initial-response]``.

        The mechanism is mapped to a ``_auth_<mechanism>`` method (lower case,
        '-' replaced by '_'), which is stored as self.auth_object and called
        with the initial response, or None if there was none.
        """
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        # smtpd passes arg=None for a command without arguments; treat that
        # as a syntax error instead of failing on None.split().
        args = arg.split() if arg else []
        if len(args) not in (1, 2):
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            # Report the mechanism as the client sent it, not the name of the
            # handler attribute that was looked up.
            self.push('504 Command parameter not implemented: unsupported '
                      'authentication mechanism {!r}'.format(args[0]))
            return
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        """Send the final AUTH reply and return to the COMMAND state.

        If *valid* is true, *user* is recorded as authenticated_user.
        """
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        """Return the text encoded in the base64 str *string*."""
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        """AUTH PLAIN: send an empty challenge, then check the password."""
        if arg is None:
            self.push('334 ')
        else:
            logpass = self._decode_base64(arg)
            try:
                # The response is NUL-separated; only the last two fields
                # (user, password) are used.
                *_, user, password = logpass.split('\0')
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                # The exchange is over: without leaving the AUTH state the
                # client's next command would be consumed as auth data.
                self.smtp_state = self.COMMAND
                return
            self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        """AUTH LOGIN: ask for the user name, then for the password."""
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            # First response: remember the user name until the password
            # arrives.
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            del self._auth_login_user

    def _auth_buggy(self, arg=None):
        # This AUTH mechanism will 'trap' client in a neverending 334
        # base64 encoded 'BuGgYbUgGy'
        self.push('334 QnVHZ1liVWdHeQ==')

    def _auth_cram_md5(self, arg=None):
        """AUTH CRAM-MD5: send the fixed challenge, then verify the digest."""
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
        else:
            logpass = self._decode_base64(arg)
            try:
                user, hashed_pass = logpass.split()
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password '
                          'failed: {}'.format(logpass, e))
                # See _auth_plain(): a 535 reply ends the AUTH exchange.
                self.smtp_state = self.COMMAND
                return
            # Expected digest: HMAC-MD5 of the decoded challenge, keyed with
            # the password.
            valid_hashed_pass = hmac.HMAC(
                sim_auth[1].encode('ascii'),
                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
                'md5').hexdigest()
            self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        """Reply with a fixed feature list plus the configured extras."""
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        """Expand a list from sim_lists into one reply line per member."""
        list_name = arg.lower()
        if list_name in sim_lists:
            user_list = sim_lists[list_name]
            for n, user_email in enumerate(user_list):
                quoted_addr = smtplib.quoteaddr(user_email)
                # Every line but the last uses the "250-" continuation form.
                if n < len(user_list) - 1:
                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
                else:
                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
        else:
            self.push('550 No access for you!')

    def smtp_QUIT(self, arg):
        if self.quit_response is None:
            super().smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

    def smtp_MAIL(self, arg):
        if self.mail_response is None:
            super().smtp_MAIL(arg)
        else:
            self.push(self.mail_response)
            if self.disconnect:
                self.close_when_done()

    def smtp_RCPT(self, arg):
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        # Answer the n-th RCPT command with the n-th canned reply.
        self.rcpt_count += 1
        self.push(self.rcpt_response[self.rcpt_count-1])

    def smtp_RSET(self, arg):
        # Count the command, then let smtpd handle it.
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        if self.data_response is None:
            super().smtp_DATA(arg)
        else:
            self.push(self.data_response)

    def handle_error(self):
        # Re-raise the exception being handled instead of using the
        # inherited error handling.
        raise
945
946
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd server that creates SimSMTPChannel objects for its connections.

    The most recently created channel is kept on ``_SMTPchannel`` so tests can
    adjust its canned responses, and the envelope of the last processed
    message is recorded in ``_addresses`` under the keys 'from' and 'tos'.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        # Set up before the base initializer runs; both attributes are read
        # by the handlers below.
        self._extra_features = []
        self._addresses = {}
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Hand the extra EHLO features to the channel and remember it.
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Record the envelope addresses of the received message.

        smtpd passes the extra keywords ``mail_options`` and ``rcpt_options``
        when the server was created with a false ``decode_data``; they are
        accepted and ignored so the server works in that mode as well.
        """
        self._addresses['from'] = mailfrom
        self._addresses['tos'] = rcpttos

    def add_feature(self, feature):
        """Advertise *feature* in the EHLO reply of channels created later."""
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the exception being handled instead of using the
        # inherited error handling.
        raise
970
971
972# Test various SMTP & ESMTP commands/behaviors that require a simulated server
973# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):
    """Run smtplib against a SimSMTPServer served from a helper thread.

    Clients that are not managed by a ``with`` block are registered with
    addCleanup(), so their sockets are closed even when an assertion fails
    before the explicit quit()/close() call is reached.
    """

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        # Run cleanups (client sockets) before checking for leaked threads.
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def testBasic(self):
        # smoke test
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.quit()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN_initial_response_ok(self):
        self.serv.add_feature("AUTH LOGIN")
        with smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_LOGIN_initial_response_notok(self):
        self.serv.add_feature("AUTH LOGIN")
        with smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_BUGGY(self):
        self.serv.add_feature("AUTH BUGGY")

        def auth_buggy(challenge=None):
            self.assertEqual(b"BuGgYbUgGy", challenge)
            return "\0"

        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        try:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_buggy")
            expect = r"^Server AUTH mechanism infinite loop.*"
            with self.assertRaisesRegex(smtplib.SMTPException, expect):
                smtp.auth("BUGGY", auth_buggy, initial_response_ok=False)
        finally:
            smtp.close()

    @requires_hashdigest('md5')
    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'PLAIN', 'LOGIN'}
        # CRAM-MD5 can only be exercised when md5 is available.
        try:
            hashlib.md5()
        except ValueError:
            pass
        else:
            supported.add('CRAM-MD5')
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = smtplib.SMTP(HOST, self.port,
                                    local_hostname='localhost', timeout=15)
                # Close this client before the next mechanism connects, even
                # if the assertion below fails.
                try:
                    smtp.ehlo('foo')
                    smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                    method = 'auth_' + mechanism.lower().replace('-', '_')
                    resp = smtp.auth(mechanism, getattr(smtp, method))
                    self.assertEqual(resp, (235, b'Authentication Succeeded'))
                finally:
                    smtp.close()

    def test_quit_resets_greeting(self):
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost',
                            timeout=15)
        self.addCleanup(smtp.close)
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.noop()
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                # Answer the end of the message data with 421.
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class = MySimSMTPChannel
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        # As in the other 421 tests, the client must close without sending
        # RSET.  (rcpt_count is only updated when rcpt_response is set, which
        # this test never does, so checking it would prove nothing.)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(smtp.close)
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3
        )
        self.addCleanup(smtp.close)

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))

        self.assertDictEqual(smtp.send_message(message), {})

        # Only the bare addresses may reach the server's envelope.
        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
1273
1274
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    Each processed message is remembered in the ``last_*`` attributes so
    that tests can inspect what the server received.
    """

    def __init__(self, *args, **kw):
        # smtpd would normally announce these two extensions by itself, but
        # the simulated channel builds its own EHLO reply, so they have to be
        # listed here.  SimSMTPServer.__init__ is skipped on purpose: the
        # next initializer in the MRO, smtpd.SMTPServer's, is called directly.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        super(SimSMTPServer, self).__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        # Same as the parent, but the channel is also told whether SMTPUTF8
        # is enabled.
        channel_options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **channel_options)

    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
                                                             rcpt_options=None):
        # Keep every argument of the latest call for inspection by the tests.
        self.last_peer, self.last_mailfrom = peer, mailfrom
        self.last_rcpttos, self.last_message = rcpttos, data
        self.last_mail_options = mail_options
        self.last_rcpt_options = rcpt_options
1299
1300
class SMTPUTF8SimTests(unittest.TestCase):
    """Run smtplib against a SimSMTPUTF8Server served from a helper thread."""

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_setup()
        # Swap in the mock getfqdn; tearDown() puts the real one back.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Passing 0 as the port number picks a random unused port, which is
        # then read back from the listening socket.
        self.serv = SimSMTPUTF8Server(
            (HOST, 0), ('nowhere', -1),
            decode_data=False, enable_SMTPUTF8=True)
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Tell the server thread that the client is done, then wait for it
        # to terminate.
        self.client_evt.set()
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def _connect(self):
        # Open a client to the test server and register its close() as a
        # cleanup.
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost', timeout=3)
        self.addCleanup(client.close)
        return client

    def _check_utf8_options(self):
        # The last message must have been sent with both MAIL options and
        # without any RCPT options.
        server = self.serv
        self.assertIn('BODY=8BITMIME', server.last_mail_options)
        self.assertIn('SMTPUTF8', server.last_mail_options)
        self.assertEqual(server.last_rcpt_options, [])

    def test_test_server_supports_extensions(self):
        client = self._connect()
        client.ehlo()
        self.assertTrue(client.does_esmtp)
        self.assertTrue(client.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        payload = '¡a test message containing unicode!'.encode('utf-8')
        client = self._connect()
        client.sendmail('Jőhn', 'Sálly', payload,
                        mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, payload)
        self._check_utf8_options()

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        payload = '¡a test message containing unicode!'.encode('utf-8')
        client = self._connect()
        client.ehlo()
        mail_reply = client.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(mail_reply, (250, b'OK'))
        self.assertEqual(client.rcpt('János'), (250, b'OK'))
        self.assertEqual(client.data(payload), (250, b'OK'))
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, payload)
        self._check_utf8_options()

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        message = EmailMessage()
        message['From'] = "Páolo <főo@bar.com>"
        message['To'] = 'Dinsdale'
        message['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        message.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received /r/n to /n, so we can't easily test that
        # we are successfully sending /r/n :(.
        wire_text = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \u1F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        client = self._connect()
        refused = client.send_message(message)
        self.assertEqual(refused, {})
        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), wire_text)
        self._check_utf8_options()
1406
1407
# Base64 form (without a trailing line break) of the NUL-separated
# credentials that SimSMTPAUTHInitialResponseChannel accepts as the AUTH PLAIN
# initial response.
_PLAIN_CREDENTIALS = b'\0psu\0doesnotexist'
EXPECTED_RESPONSE = encode_base64(_PLAIN_CREDENTIALS, eol='')
1409
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    """Channel that only accepts AUTH PLAIN sent with an initial response."""

    def smtp_AUTH(self, arg):
        # RFC 4954's AUTH command allows for an optional initial-response.
        # Not all AUTH methods support this; some require a challenge.  AUTH
        # PLAIN does support it, so test that here.  See issue #15014.
        #
        # smtpd passes arg=None for a bare "AUTH" line; treat a missing or
        # empty argument as a failed authentication instead of raising.
        args = arg.split() if arg else []
        # AUTH PLAIN <initial-response> with the response base 64
        # encoded.  Hard code the expected response for the test.
        if (len(args) == 2 and args[0].lower() == 'plain'
                and args[1] == EXPECTED_RESPONSE):
            self.push('235 Ok')
            return
        self.push('571 Bad authentication')
1424
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    """SimSMTPServer whose connections use the initial-response AUTH channel."""

    channel_class = SimSMTPAUTHInitialResponseChannel
1427
1428
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    """Check that smtplib sends AUTH PLAIN together with an initial response.

    The server only replies 235 when the credentials arrive on the AUTH
    command line itself.  Clients are registered with addCleanup() so their
    sockets are closed even when login()/auth() raises.
    """

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        # Run cleanups (client sockets) before checking for leaked threads.
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def testAUTH_PLAIN_initial_response_login(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        # login() raises unless the server answers with a success code.
        smtp.login('psu', 'doesnotexist')
        smtp.close()

    def testAUTH_PLAIN_initial_response_auth(self):
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, response = smtp.auth('plain', smtp.auth_plain)
        smtp.close()
        self.assertEqual(code, 235)
1476
1477
# Run this module's tests when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
1480