import asyncore
import base64
import email.mime.text
from email.message import EmailMessage
from email.base64mime import body_encode as encode_base64
import email.utils
import hmac
import socket
import smtpd
import smtplib
import io
import re
import sys
import time
import select
import errno
import textwrap

import unittest
from test import support, mock_socket
from unittest.mock import Mock

HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"

try:
    import threading
except ImportError:
    threading = None

# Overrides the literal above with the host name the test suite is
# configured to use.
HOST = support.HOST

if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    smtpd.SMTPChannel.handle_expt = handle_expt


def server(evt, buf, serv):
    """Serve *buf* to a single client of the listening socket *serv*.

    *evt* is set once the socket is listening and again when the server is
    done, so the caller can synchronise with both points.  A timeout while
    waiting for the client is ignored.  At most 500 select() rounds are
    spent sending the data.
    """
    serv.listen()
    evt.set()
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        # Close the accepted connection even if select()/send() raises,
        # so that a failing client does not leak the socket.
        try:
            n = 500
            while buf and n > 0:
                r, w, e = select.select([], [conn], [])
                if w:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                n -= 1
        finally:
            conn.close()
    finally:
        serv.close()
        evt.set()


class GeneralTests(unittest.TestCase):
    # Client-side tests that run against test.mock_socket instead of a
    # real network connection.

    def setUp(self):
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port)
        smtp.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port,
                            source_address=('127.0.0.1', 19876))
        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
        smtp.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = smtplib.SMTP(HOST, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        # An explicit timeout=None must win over the module default.
        # smtplib.socket is mock_socket here (see setUp), so the default
        # has to be set on mock_socket for the check to mean anything.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def test_debuglevel(self):
        mock_socket.reply_with(b"220 Hello world")
        smtp = smtplib.SMTP()
        smtp.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            smtp.connect(HOST, self.port)
        smtp.close()
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        # Debug level 2 prefixes each debug line with a timestamp.
        mock_socket.reply_with(b"220 Hello world")
        smtp = smtplib.SMTP()
        smtp.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            smtp.connect(HOST, self.port)
        smtp.close()
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)


# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Run the asyncore loop for *serv* until the client is finished.

    *serv_evt* is set when the thread starts and again when it has shut
    down.  The loop polls at most 1000 times (0.01s per poll) and stops
    early once *client_evt* is set by the test.
    """
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()

MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.

# Test behavior of smtpd.DebuggingServer
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):

    maxDiff = None

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM

    def _quit_and_wait_for_server(self, smtp):
        # Shared tail of the send tests: end the SMTP session, tell the
        # server thread the client is done, wait for it to shut down and
        # flush the captured output so it can be compared.
        #
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    def testBasic(self):
        # connect
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.quit()

    def testSourceAddress(self):
        # connect
        port = support.find_unused_port()
        try:
            smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                                timeout=3, source_address=('127.0.0.1', port))
            self.assertEqual(smtp.source_address, ('127.0.0.1', port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as e:
            # The "unused" port may have been taken in the meantime.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def testNOOP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testELHO(self):
        # The server answers EHLO with a 250 reply advertising SIZE and
        # HELP.  (An earlier comment here claimed EHLO was not implemented,
        # which contradicts the assertion below.)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (250, b'\nSIZE 33554432\nHELP')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (252, b'Cannot VRFY user, but will accept message ' + \
                         b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        self._quit_and_wait_for_server(smtp)
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendBinary(self):
        m = b'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        self._quit_and_wait_for_server(smtp)
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        self._quit_and_wait_for_server(smtp)
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNullSender(self):
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('<>', 'Sally', m)
        self._quit_and_wait_for_server(smtp)
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: <>$", re.MULTILINE)
        self.assertRegex(debugout, sender)

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        self._quit_and_wait_for_server(smtp)
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        self._quit_and_wait_for_server(smtp)
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')

        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        # The Bcc header should not be transmitted.
        del m['Bcc']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     'warped@silly.walks.com'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        self._quit_and_wait_for_server(smtp)
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        self._quit_and_wait_for_server(smtp)
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertNotRegex(debugout, to_addr)
        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
        self.assertRegex(debugout, recip)

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        self._quit_and_wait_for_server(smtp)
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        self._quit_and_wait_for_server(smtp)
        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageMultipleResentRaises(self):
        # A message carrying two sets of Resent-* headers must be rejected.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()

class NonConnectingTests(unittest.TestCase):

    def testNotConnected(self):
        # Test various operations on an unconnected SMTP object that
        # should raise SMTPServerDisconnected rather than leaking an
        # internal error about the missing socket.
        smtp = smtplib.SMTP()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
        self.assertRaises(smtplib.SMTPServerDisconnected,
                          smtp.send, 'test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost", "bogus")
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost:bogus")


class DefaultArgumentsTests(unittest.TestCase):
    # send_message() must not accumulate mail options between calls and
    # must not mutate the caller's mail_options list.

    def setUp(self):
        self.msg = EmailMessage()
        self.msg['From'] = 'Páolo <főo@bar.com>'
        self.smtp = smtplib.SMTP()
        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
        self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()

    def testSendMessage(self):
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg)
        self.smtp.send_message(self.msg)
        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
                         expected_mail_options)
        self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
                         expected_mail_options)

    def testSendMessageWithMailOptions(self):
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
                         expected_mail_options)


# test response of client to a non-successful HELO message
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):

    def setUp(self):
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
                          HOST, self.port, 'localhost', 3)


@unittest.skipUnless(threading, 'Threading required for this test.')
class TooLongLineTests(unittest.TestCase):
    # A greeting line twice as long as smtplib's limit.
    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = support.bind_port(self.sock)
        servargs = (self.evt, self.respdata, self.sock)
        thread = threading.Thread(target=server, args=servargs)
        thread.start()
        self.addCleanup(thread.join)
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout

    def testLineTooLong(self):
        self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
                          HOST, self.port, 'localhost', 3)


sim_users = {'Mr.A@somewhere.com':'John A',
             'Ms.B@xn--fo-fka.com':'Sally B',
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

sim_auth = ('Mr.A@somewhere.com', 'somepassword')
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
             'list-2':['Ms.B@xn--fo-fka.com',],
            }

# Simulated SMTP channel & server
class ResponseException(Exception): pass
class SimSMTPChannel(smtpd.SMTPChannel):
    """SMTP channel with canned responses and a minimal AUTH implementation.

    Tests set the *_response attributes on the live channel to force a
    particular reply to QUIT, MAIL, RCPT or DATA.
    """

    quit_response = None
    mail_response = None
    rcpt_response = None
    data_response = None
    rcpt_count = 0
    rset_count = 0
    disconnect = 0
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None

    def __init__(self, extra_features, *args, **kw):
        # Pre-render the extra EHLO feature lines advertised by smtp_EHLO.
        self._extrafeatures = ''.join(
            [ "250-{0}\r\n".format(x) for x in extra_features ])
        super().__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        if self.smtp_state == self.AUTH:
            # In the middle of an AUTH exchange the line is a response to
            # the current mechanism, not an SMTP command.
            line = self._emptystring.join(self.received_lines)
            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
            self.received_lines = []
            try:
                self.auth_object(line)
            except ResponseException as e:
                self.smtp_state = self.COMMAND
                self.push('%s %s' % (e.smtp_code, e.smtp_error))
            return
        super().found_terminator()


    def smtp_AUTH(self, arg):
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        args = arg.split()
        if len(args) not in (1, 2):
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        # Mechanisms are dispatched to _auth_<name> methods on this class.
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            self.push('504 Command parameter not implemented: unsupported '
                      'authentication mechanism {!r}'.format(auth_object_name))
            return
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        # Report the outcome of an AUTH exchange and leave the AUTH state.
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        if arg is None:
            self.push('334 ')
        else:
            logpass = self._decode_base64(arg)
            try:
                *_, user, password = logpass.split('\0')
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                return
            self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            del self._auth_login_user

    def _auth_cram_md5(self, arg=None):
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
        else:
            logpass = self._decode_base64(arg)
            try:
                user, hashed_pass = logpass.split()
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password '
                          'failed: {}'.format(logpass, e))
                # Plain return, as in the other _auth_* handlers; the
                # return value is not used by any caller in this class.
                return
            valid_hashed_pass = hmac.HMAC(
                sim_auth[1].encode('ascii'),
                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
                'md5').hexdigest()
            self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        list_name = arg.lower()
        if list_name in sim_lists:
            user_list = sim_lists[list_name]
            for n, user_email in enumerate(user_list):
                quoted_addr = smtplib.quoteaddr(user_email)
                # Every line but the last is a continuation line ("250-").
                if n < len(user_list) - 1:
                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
                else:
                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
        else:
            self.push('550 No access for you!')

    def smtp_QUIT(self, arg):
        if self.quit_response is None:
            super().smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

    def smtp_MAIL(self, arg):
        if self.mail_response is None:
            super().smtp_MAIL(arg)
        else:
            self.push(self.mail_response)
            if self.disconnect:
                self.close_when_done()

    def smtp_RCPT(self, arg):
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        # rcpt_response is a list with one canned reply per RCPT command.
        self.rcpt_count += 1
        self.push(self.rcpt_response[self.rcpt_count-1])

    def smtp_RSET(self, arg):
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        if self.data_response is None:
            super().smtp_DATA(arg)
        else:
            self.push(self.data_response)

    def handle_error(self):
        # Re-raise the exception currently being handled instead of
        # letting asyncore swallow it.
        raise


class SimSMTPServer(smtpd.SMTPServer):
    """SMTP server that creates SimSMTPChannel objects and records the
    envelope addresses of the last message it processed."""

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        self._extra_features = []
        self._addresses = {}
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Keep a reference to the channel so tests can poke at it.
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)

    def process_message(self, peer, mailfrom, rcpttos, data):
        self._addresses['from'] = mailfrom
        self._addresses['tos'] = rcpttos

    def add_feature(self, feature):
        self._extra_features.append(feature)

    def handle_error(self):
        raise


# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()

    def testBasic(self):
        # smoke test
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        smtp.quit()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        # Drive SMTP.auth() directly with each built-in auth_* callback.
        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = smtplib.SMTP(HOST, self.port,
                                    local_hostname='localhost', timeout=15)
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost',
                            timeout=15)
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1021 with smtplib.SMTP(HOST, self.port) as smtp: 1022 smtp.close() 1023 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1024 1025 def test_with_statement_QUIT_failure(self): 1026 with self.assertRaises(smtplib.SMTPResponseException) as error: 1027 with smtplib.SMTP(HOST, self.port) as smtp: 1028 smtp.noop() 1029 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1030 self.assertEqual(error.exception.smtp_code, 421) 1031 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1032 1033 #TODO: add tests for correct AUTH method fallback now that the 1034 #test infrastructure can support it. 1035 1036 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1037 def test__rest_from_mail_cmd(self): 1038 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1039 smtp.noop() 1040 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1041 self.serv._SMTPchannel.disconnect = True 1042 with self.assertRaises(smtplib.SMTPSenderRefused): 1043 smtp.sendmail('John', 'Sally', 'test message') 1044 self.assertIsNone(smtp.sock) 1045 1046 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1047 def test_421_from_mail_cmd(self): 1048 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1049 smtp.noop() 1050 self.serv._SMTPchannel.mail_response = '421 closing connection' 1051 with self.assertRaises(smtplib.SMTPSenderRefused): 1052 smtp.sendmail('John', 'Sally', 'test message') 1053 self.assertIsNone(smtp.sock) 1054 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1055 1056 def test_421_from_rcpt_cmd(self): 1057 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1058 smtp.noop() 1059 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1060 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1061 smtp.sendmail('John', 
['Sally', 'Frank', 'George'], 'test message') 1062 self.assertIsNone(smtp.sock) 1063 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1064 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1065 1066 def test_421_from_data_cmd(self): 1067 class MySimSMTPChannel(SimSMTPChannel): 1068 def found_terminator(self): 1069 if self.smtp_state == self.DATA: 1070 self.push('421 closing') 1071 else: 1072 super().found_terminator() 1073 self.serv.channel_class = MySimSMTPChannel 1074 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1075 smtp.noop() 1076 with self.assertRaises(smtplib.SMTPDataError): 1077 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1078 self.assertIsNone(smtp.sock) 1079 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1080 1081 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1082 smtp = smtplib.SMTP( 1083 HOST, self.port, local_hostname='localhost', timeout=3) 1084 self.addCleanup(smtp.close) 1085 smtp.ehlo() 1086 self.assertTrue(smtp.does_esmtp) 1087 self.assertFalse(smtp.has_extn('smtputf8')) 1088 self.assertRaises( 1089 smtplib.SMTPNotSupportedError, 1090 smtp.sendmail, 1091 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1092 self.assertRaises( 1093 smtplib.SMTPNotSupportedError, 1094 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1095 1096 def test_send_unicode_without_SMTPUTF8(self): 1097 smtp = smtplib.SMTP( 1098 HOST, self.port, local_hostname='localhost', timeout=3) 1099 self.addCleanup(smtp.close) 1100 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1101 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1102 1103 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1104 # This test is located here and not in the SMTPUTF8SimTests 1105 # class because it needs a "regular" SMTP server to work 1106 msg = EmailMessage() 1107 msg['From'] = "Páolo <főo@bar.com>" 1108 msg['To'] = 
'Dinsdale' 1109 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1110 smtp = smtplib.SMTP( 1111 HOST, self.port, local_hostname='localhost', timeout=3) 1112 self.addCleanup(smtp.close) 1113 with self.assertRaises(smtplib.SMTPNotSupportedError): 1114 smtp.send_message(msg) 1115 1116 def test_name_field_not_included_in_envelop_addresses(self): 1117 smtp = smtplib.SMTP( 1118 HOST, self.port, local_hostname='localhost', timeout=3 1119 ) 1120 self.addCleanup(smtp.close) 1121 1122 message = EmailMessage() 1123 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 1124 message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 1125 1126 self.assertDictEqual(smtp.send_message(message), {}) 1127 1128 self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 1129 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 1130 1131 1132class SimSMTPUTF8Server(SimSMTPServer): 1133 1134 def __init__(self, *args, **kw): 1135 # The base SMTP server turns these on automatically, but our test 1136 # server is set up to munge the EHLO response, so we need to provide 1137 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 
1138 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1139 smtpd.SMTPServer.__init__(self, *args, **kw) 1140 1141 def handle_accepted(self, conn, addr): 1142 self._SMTPchannel = self.channel_class( 1143 self._extra_features, self, conn, addr, 1144 decode_data=self._decode_data, 1145 enable_SMTPUTF8=self.enable_SMTPUTF8, 1146 ) 1147 1148 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1149 rcpt_options=None): 1150 self.last_peer = peer 1151 self.last_mailfrom = mailfrom 1152 self.last_rcpttos = rcpttos 1153 self.last_message = data 1154 self.last_mail_options = mail_options 1155 self.last_rcpt_options = rcpt_options 1156 1157 1158@unittest.skipUnless(threading, 'Threading required for this test.') 1159class SMTPUTF8SimTests(unittest.TestCase): 1160 1161 maxDiff = None 1162 1163 def setUp(self): 1164 self.real_getfqdn = socket.getfqdn 1165 socket.getfqdn = mock_socket.getfqdn 1166 self.serv_evt = threading.Event() 1167 self.client_evt = threading.Event() 1168 # Pick a random unused port by passing 0 for the port number 1169 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1170 decode_data=False, 1171 enable_SMTPUTF8=True) 1172 # Keep a note of what port was assigned 1173 self.port = self.serv.socket.getsockname()[1] 1174 serv_args = (self.serv, self.serv_evt, self.client_evt) 1175 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1176 self.thread.start() 1177 1178 # wait until server thread has assigned a port number 1179 self.serv_evt.wait() 1180 self.serv_evt.clear() 1181 1182 def tearDown(self): 1183 socket.getfqdn = self.real_getfqdn 1184 # indicate that the client is finished 1185 self.client_evt.set() 1186 # wait for the server thread to terminate 1187 self.serv_evt.wait() 1188 self.thread.join() 1189 1190 def test_test_server_supports_extensions(self): 1191 smtp = smtplib.SMTP( 1192 HOST, self.port, local_hostname='localhost', timeout=3) 1193 self.addCleanup(smtp.close) 1194 smtp.ehlo() 1195 
self.assertTrue(smtp.does_esmtp) 1196 self.assertTrue(smtp.has_extn('smtputf8')) 1197 1198 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1199 m = '¡a test message containing unicode!'.encode('utf-8') 1200 smtp = smtplib.SMTP( 1201 HOST, self.port, local_hostname='localhost', timeout=3) 1202 self.addCleanup(smtp.close) 1203 smtp.sendmail('Jőhn', 'Sálly', m, 1204 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1205 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1206 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1207 self.assertEqual(self.serv.last_message, m) 1208 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1209 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1210 self.assertEqual(self.serv.last_rcpt_options, []) 1211 1212 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1213 m = '¡a test message containing unicode!'.encode('utf-8') 1214 smtp = smtplib.SMTP( 1215 HOST, self.port, local_hostname='localhost', timeout=3) 1216 self.addCleanup(smtp.close) 1217 smtp.ehlo() 1218 self.assertEqual( 1219 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1220 (250, b'OK')) 1221 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1222 self.assertEqual(smtp.data(m), (250, b'OK')) 1223 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1224 self.assertEqual(self.serv.last_rcpttos, ['János']) 1225 self.assertEqual(self.serv.last_message, m) 1226 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1227 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1228 self.assertEqual(self.serv.last_rcpt_options, []) 1229 1230 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1231 msg = EmailMessage() 1232 msg['From'] = "Páolo <főo@bar.com>" 1233 msg['To'] = 'Dinsdale' 1234 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1235 # XXX I don't know why I need two \n's here, but this is an existing 1236 # bug (if it is one) and not a problem with the new functionality. 
1237 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1238 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1239 # we are successfully sending /r/n :(. 1240 expected = textwrap.dedent("""\ 1241 From: Páolo <főo@bar.com> 1242 To: Dinsdale 1243 Subject: Nudge nudge, wink, wink \u1F609 1244 Content-Type: text/plain; charset="utf-8" 1245 Content-Transfer-Encoding: 8bit 1246 MIME-Version: 1.0 1247 1248 oh là là, know what I mean, know what I mean? 1249 """) 1250 smtp = smtplib.SMTP( 1251 HOST, self.port, local_hostname='localhost', timeout=3) 1252 self.addCleanup(smtp.close) 1253 self.assertEqual(smtp.send_message(msg), {}) 1254 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1255 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1256 self.assertEqual(self.serv.last_message.decode(), expected) 1257 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1258 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1259 self.assertEqual(self.serv.last_rcpt_options, []) 1260 1261 1262EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1263 1264class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1265 def smtp_AUTH(self, arg): 1266 # RFC 4954's AUTH command allows for an optional initial-response. 1267 # Not all AUTH methods support this; some require a challenge. AUTH 1268 # PLAIN does those, so test that here. See issue #15014. 1269 args = arg.split() 1270 if args[0].lower() == 'plain': 1271 if len(args) == 2: 1272 # AUTH PLAIN <initial-response> with the response base 64 1273 # encoded. Hard code the expected response for the test. 
1274 if args[1] == EXPECTED_RESPONSE: 1275 self.push('235 Ok') 1276 return 1277 self.push('571 Bad authentication') 1278 1279class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1280 channel_class = SimSMTPAUTHInitialResponseChannel 1281 1282 1283@unittest.skipUnless(threading, 'Threading required for this test.') 1284class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1285 def setUp(self): 1286 self.real_getfqdn = socket.getfqdn 1287 socket.getfqdn = mock_socket.getfqdn 1288 self.serv_evt = threading.Event() 1289 self.client_evt = threading.Event() 1290 # Pick a random unused port by passing 0 for the port number 1291 self.serv = SimSMTPAUTHInitialResponseServer( 1292 (HOST, 0), ('nowhere', -1), decode_data=True) 1293 # Keep a note of what port was assigned 1294 self.port = self.serv.socket.getsockname()[1] 1295 serv_args = (self.serv, self.serv_evt, self.client_evt) 1296 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1297 self.thread.start() 1298 1299 # wait until server thread has assigned a port number 1300 self.serv_evt.wait() 1301 self.serv_evt.clear() 1302 1303 def tearDown(self): 1304 socket.getfqdn = self.real_getfqdn 1305 # indicate that the client is finished 1306 self.client_evt.set() 1307 # wait for the server thread to terminate 1308 self.serv_evt.wait() 1309 self.thread.join() 1310 1311 def testAUTH_PLAIN_initial_response_login(self): 1312 self.serv.add_feature('AUTH PLAIN') 1313 smtp = smtplib.SMTP(HOST, self.port, 1314 local_hostname='localhost', timeout=15) 1315 smtp.login('psu', 'doesnotexist') 1316 smtp.close() 1317 1318 def testAUTH_PLAIN_initial_response_auth(self): 1319 self.serv.add_feature('AUTH PLAIN') 1320 smtp = smtplib.SMTP(HOST, self.port, 1321 local_hostname='localhost', timeout=15) 1322 smtp.user = 'psu' 1323 smtp.password = 'doesnotexist' 1324 code, response = smtp.auth('plain', smtp.auth_plain) 1325 smtp.close() 1326 self.assertEqual(code, 235) 1327 1328 1329if __name__ == '__main__': 1330 
unittest.main() 1331