1import base64 2import email.mime.text 3from email.message import EmailMessage 4from email.base64mime import body_encode as encode_base64 5import email.utils 6import hashlib 7import hmac 8import socket 9import smtplib 10import io 11import re 12import sys 13import time 14import select 15import errno 16import textwrap 17import threading 18 19import unittest 20from test import support, mock_socket 21from test.support import hashlib_helper 22from test.support import socket_helper 23from test.support import threading_helper 24from unittest.mock import Mock 25 26import warnings 27with warnings.catch_warnings(): 28 warnings.simplefilter('ignore', DeprecationWarning) 29 import asyncore 30 import smtpd 31 32HOST = socket_helper.HOST 33 34if sys.platform == 'darwin': 35 # select.poll returns a select.POLLHUP at the end of the tests 36 # on darwin, so just ignore it 37 def handle_expt(self): 38 pass 39 smtpd.SMTPChannel.handle_expt = handle_expt 40 41 42def server(evt, buf, serv): 43 serv.listen() 44 evt.set() 45 try: 46 conn, addr = serv.accept() 47 except TimeoutError: 48 pass 49 else: 50 n = 500 51 while buf and n > 0: 52 r, w, e = select.select([], [conn], []) 53 if w: 54 sent = conn.send(buf) 55 buf = buf[sent:] 56 57 n -= 1 58 59 conn.close() 60 finally: 61 serv.close() 62 evt.set() 63 64class GeneralTests: 65 66 def setUp(self): 67 smtplib.socket = mock_socket 68 self.port = 25 69 70 def tearDown(self): 71 smtplib.socket = socket 72 73 # This method is no longer used but is retained for backward compatibility, 74 # so test to make sure it still works. 75 def testQuoteData(self): 76 teststr = "abc\n.jkl\rfoo\r\n..blue" 77 expected = "abc\r\n..jkl\r\nfoo\r\n...blue" 78 self.assertEqual(expected, smtplib.quotedata(teststr)) 79 80 def testBasic1(self): 81 mock_socket.reply_with(b"220 Hola mundo") 82 # connects 83 client = self.client(HOST, self.port) 84 client.close() 85 86 def testSourceAddress(self): 87 mock_socket.reply_with(b"220 Hola mundo") 88 # connects 89 client = self.client(HOST, self.port, 90 source_address=('127.0.0.1',19876)) 91 self.assertEqual(client.source_address, ('127.0.0.1', 19876)) 92 client.close() 93 94 def testBasic2(self): 95 mock_socket.reply_with(b"220 Hola mundo") 96 # connects, include port in host name 97 client = self.client("%s:%s" % (HOST, self.port)) 98 client.close() 99 100 def testLocalHostName(self): 101 mock_socket.reply_with(b"220 Hola mundo") 102 # check that supplied local_hostname is used 103 client = self.client(HOST, self.port, local_hostname="testhost") 104 self.assertEqual(client.local_hostname, "testhost") 105 client.close() 106 107 def testTimeoutDefault(self): 108 mock_socket.reply_with(b"220 Hola mundo") 109 self.assertIsNone(mock_socket.getdefaulttimeout()) 110 mock_socket.setdefaulttimeout(30) 111 self.assertEqual(mock_socket.getdefaulttimeout(), 30) 112 try: 113 client = self.client(HOST, self.port) 114 finally: 115 mock_socket.setdefaulttimeout(None) 116 self.assertEqual(client.sock.gettimeout(), 30) 117 client.close() 118 119 def testTimeoutNone(self): 120 mock_socket.reply_with(b"220 Hola mundo") 121 self.assertIsNone(socket.getdefaulttimeout()) 122 socket.setdefaulttimeout(30) 123 try: 124 client = self.client(HOST, self.port, timeout=None) 125 finally: 126 socket.setdefaulttimeout(None) 127 self.assertIsNone(client.sock.gettimeout()) 128 client.close() 129 130 def testTimeoutZero(self): 131 mock_socket.reply_with(b"220 Hola mundo") 132 with self.assertRaises(ValueError): 133 self.client(HOST, self.port, timeout=0) 134 135 def testTimeoutValue(self): 136 mock_socket.reply_with(b"220 Hola mundo") 137 client = self.client(HOST, self.port, timeout=30) 138 self.assertEqual(client.sock.gettimeout(), 30) 139 client.close() 140 141 def test_debuglevel(self): 142 mock_socket.reply_with(b"220 Hello world") 143 client = self.client() 144 client.set_debuglevel(1) 145 with support.captured_stderr() as stderr: 146 client.connect(HOST, self.port) 147 client.close() 148 expected = re.compile(r"^connect:", re.MULTILINE) 149 self.assertRegex(stderr.getvalue(), expected) 150 151 def test_debuglevel_2(self): 152 mock_socket.reply_with(b"220 Hello world") 153 client = self.client() 154 client.set_debuglevel(2) 155 with support.captured_stderr() as stderr: 156 client.connect(HOST, self.port) 157 client.close() 158 expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ", 159 re.MULTILINE) 160 self.assertRegex(stderr.getvalue(), expected) 161 162 163class SMTPGeneralTests(GeneralTests, unittest.TestCase): 164 165 client = smtplib.SMTP 166 167 168class LMTPGeneralTests(GeneralTests, unittest.TestCase): 169 170 client = smtplib.LMTP 171 172 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket") 173 def testUnixDomainSocketTimeoutDefault(self): 174 local_host = '/some/local/lmtp/delivery/program' 175 mock_socket.reply_with(b"220 Hello world") 176 try: 177 client = self.client(local_host, self.port) 178 finally: 179 mock_socket.setdefaulttimeout(None) 180 self.assertIsNone(client.sock.gettimeout()) 181 client.close() 182 183 def testTimeoutZero(self): 184 super().testTimeoutZero() 185 local_host = '/some/local/lmtp/delivery/program' 186 with self.assertRaises(ValueError): 187 self.client(local_host, timeout=0) 188 189# Test server thread using the specified SMTP server class 190def debugging_server(serv, serv_evt, client_evt): 191 serv_evt.set() 192 193 try: 194 if hasattr(select, 'poll'): 195 poll_fun = asyncore.poll2 196 else: 197 poll_fun = asyncore.poll 198 199 n = 1000 200 while asyncore.socket_map and n > 0: 201 poll_fun(0.01, asyncore.socket_map) 202 203 # when the client conversation is finished, it will 204 # set client_evt, and it's then ok to kill the server 205 if client_evt.is_set(): 206 serv.close() 207 break 208 209 n -= 1 210 211 except TimeoutError: 212 pass 213 finally: 214 if not client_evt.is_set(): 215 # allow some time for the client to read the result 216 time.sleep(0.5) 217 serv.close() 218 asyncore.close_all() 219 serv_evt.set() 220 221MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' 222MSG_END = '------------ END MESSAGE ------------\n' 223 224# NOTE: Some SMTP objects in the tests below are created with a non-default 225# local_hostname argument to the constructor, since (on some systems) the FQDN 226# lookup caused by the default local_hostname sometimes takes so long that the 227# test server times out, causing the test to fail. 228 229# Test behavior of smtpd.DebuggingServer 230class DebuggingServerTests(unittest.TestCase): 231 232 maxDiff = None 233 234 def setUp(self): 235 self.thread_key = threading_helper.threading_setup() 236 self.real_getfqdn = socket.getfqdn 237 socket.getfqdn = mock_socket.getfqdn 238 # temporarily replace sys.stdout to capture DebuggingServer output 239 self.old_stdout = sys.stdout 240 self.output = io.StringIO() 241 sys.stdout = self.output 242 243 self.serv_evt = threading.Event() 244 self.client_evt = threading.Event() 245 # Capture SMTPChannel debug output 246 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM 247 smtpd.DEBUGSTREAM = io.StringIO() 248 # Pick a random unused port by passing 0 for the port number 249 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), 250 decode_data=True) 251 # Keep a note of what server host and port were assigned 252 self.host, self.port = self.serv.socket.getsockname()[:2] 253 serv_args = (self.serv, self.serv_evt, self.client_evt) 254 self.thread = threading.Thread(target=debugging_server, args=serv_args) 255 self.thread.start() 256 257 # wait until server thread has assigned a port number 258 self.serv_evt.wait() 259 self.serv_evt.clear() 260 261 def tearDown(self): 262 socket.getfqdn = self.real_getfqdn 263 # indicate that the client is finished 264 self.client_evt.set() 265 # wait for the server thread to terminate 266 self.serv_evt.wait() 267 threading_helper.join_thread(self.thread) 268 # restore sys.stdout 269 sys.stdout = self.old_stdout 270 # restore DEBUGSTREAM 271 smtpd.DEBUGSTREAM.close() 272 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM 273 del self.thread 274 self.doCleanups() 275 threading_helper.threading_cleanup(*self.thread_key) 276 277 def get_output_without_xpeer(self): 278 test_output = self.output.getvalue() 279 return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2', 280 test_output, flags=re.MULTILINE|re.DOTALL) 281 282 def testBasic(self): 283 # connect 284 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 285 timeout=support.LOOPBACK_TIMEOUT) 286 smtp.quit() 287 288 def testSourceAddress(self): 289 # connect 290 src_port = socket_helper.find_unused_port() 291 try: 292 smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', 293 timeout=support.LOOPBACK_TIMEOUT, 294 source_address=(self.host, src_port)) 295 self.addCleanup(smtp.close) 296 self.assertEqual(smtp.source_address, (self.host, src_port)) 297 self.assertEqual(smtp.local_hostname, 'localhost') 298 smtp.quit() 299 except OSError as e: 300 if e.errno == errno.EADDRINUSE: 301 self.skipTest("couldn't bind to source port %d" % src_port) 302 raise 303 304 def testNOOP(self): 305 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 306 timeout=support.LOOPBACK_TIMEOUT) 307 self.addCleanup(smtp.close) 308 expected = (250, b'OK') 309 self.assertEqual(smtp.noop(), expected) 310 smtp.quit() 311 312 def testRSET(self): 313 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 314 timeout=support.LOOPBACK_TIMEOUT) 315 self.addCleanup(smtp.close) 316 expected = (250, b'OK') 317 self.assertEqual(smtp.rset(), expected) 318 smtp.quit() 319 320 def testELHO(self): 321 # EHLO isn't implemented in DebuggingServer 322 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 323 timeout=support.LOOPBACK_TIMEOUT) 324 self.addCleanup(smtp.close) 325 expected = (250, b'\nSIZE 33554432\nHELP') 326 self.assertEqual(smtp.ehlo(), expected) 327 smtp.quit() 328 329 def testEXPNNotImplemented(self): 330 # EXPN isn't implemented in DebuggingServer 331 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 332 timeout=support.LOOPBACK_TIMEOUT) 333 self.addCleanup(smtp.close) 334 expected = (502, b'EXPN not implemented') 335 smtp.putcmd('EXPN') 336 self.assertEqual(smtp.getreply(), expected) 337 smtp.quit() 338 339 def test_issue43124_putcmd_escapes_newline(self): 340 # see: https://bugs.python.org/issue43124 341 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 342 timeout=support.LOOPBACK_TIMEOUT) 343 self.addCleanup(smtp.close) 344 with self.assertRaises(ValueError) as exc: 345 smtp.putcmd('helo\nX-INJECTED') 346 self.assertIn("prohibited newline characters", str(exc.exception)) 347 smtp.quit() 348 349 def testVRFY(self): 350 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 351 timeout=support.LOOPBACK_TIMEOUT) 352 self.addCleanup(smtp.close) 353 expected = (252, b'Cannot VRFY user, but will accept message ' + \ 354 b'and attempt delivery') 355 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) 356 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) 357 smtp.quit() 358 359 def testSecondHELO(self): 360 # check that a second HELO returns a message that it's a duplicate 361 # (this behavior is specific to smtpd.SMTPChannel) 362 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 363 timeout=support.LOOPBACK_TIMEOUT) 364 self.addCleanup(smtp.close) 365 smtp.helo() 366 expected = (503, b'Duplicate HELO/EHLO') 367 self.assertEqual(smtp.helo(), expected) 368 smtp.quit() 369 370 def testHELP(self): 371 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 372 timeout=support.LOOPBACK_TIMEOUT) 373 self.addCleanup(smtp.close) 374 self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ 375 b'RCPT DATA RSET NOOP QUIT VRFY') 376 smtp.quit() 377 378 def testSend(self): 379 # connect and send mail 380 m = 'A test message' 381 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 382 timeout=support.LOOPBACK_TIMEOUT) 383 self.addCleanup(smtp.close) 384 smtp.sendmail('John', 'Sally', m) 385 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor 386 # in asyncore. This sleep might help, but should really be fixed 387 # properly by using an Event variable. 388 time.sleep(0.01) 389 smtp.quit() 390 391 self.client_evt.set() 392 self.serv_evt.wait() 393 self.output.flush() 394 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 395 self.assertEqual(self.output.getvalue(), mexpect) 396 397 def testSendBinary(self): 398 m = b'A test message' 399 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 400 timeout=support.LOOPBACK_TIMEOUT) 401 self.addCleanup(smtp.close) 402 smtp.sendmail('John', 'Sally', m) 403 # XXX (see comment in testSend) 404 time.sleep(0.01) 405 smtp.quit() 406 407 self.client_evt.set() 408 self.serv_evt.wait() 409 self.output.flush() 410 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) 411 self.assertEqual(self.output.getvalue(), mexpect) 412 413 def testSendNeedingDotQuote(self): 414 # Issue 12283 415 m = '.A test\n.mes.sage.' 416 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 417 timeout=support.LOOPBACK_TIMEOUT) 418 self.addCleanup(smtp.close) 419 smtp.sendmail('John', 'Sally', m) 420 # XXX (see comment in testSend) 421 time.sleep(0.01) 422 smtp.quit() 423 424 self.client_evt.set() 425 self.serv_evt.wait() 426 self.output.flush() 427 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 428 self.assertEqual(self.output.getvalue(), mexpect) 429 430 def test_issue43124_escape_localhostname(self): 431 # see: https://bugs.python.org/issue43124 432 # connect and send mail 433 m = 'wazzuuup\nlinetwo' 434 smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED', 435 timeout=support.LOOPBACK_TIMEOUT) 436 self.addCleanup(smtp.close) 437 with self.assertRaises(ValueError) as exc: 438 smtp.sendmail("hi@me.com", "you@me.com", m) 439 self.assertIn( 440 "prohibited newline characters: ehlo hi\\nX-INJECTED", 441 str(exc.exception), 442 ) 443 # XXX (see comment in testSend) 444 time.sleep(0.01) 445 smtp.quit() 446 447 debugout = smtpd.DEBUGSTREAM.getvalue() 448 self.assertNotIn("X-INJECTED", debugout) 449 450 def test_issue43124_escape_options(self): 451 # see: https://bugs.python.org/issue43124 452 # connect and send mail 453 m = 'wazzuuup\nlinetwo' 454 smtp = smtplib.SMTP( 455 HOST, self.port, local_hostname='localhost', 456 timeout=support.LOOPBACK_TIMEOUT) 457 458 self.addCleanup(smtp.close) 459 smtp.sendmail("hi@me.com", "you@me.com", m) 460 with self.assertRaises(ValueError) as exc: 461 smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"]) 462 msg = str(exc.exception) 463 self.assertIn("prohibited newline characters", msg) 464 self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg) 465 # XXX (see comment in testSend) 466 time.sleep(0.01) 467 smtp.quit() 468 469 debugout = smtpd.DEBUGSTREAM.getvalue() 470 self.assertNotIn("X-OPTION", debugout) 471 self.assertNotIn("X-OPTION2", debugout) 472 self.assertNotIn("X-INJECTED-1", debugout) 473 self.assertNotIn("X-INJECTED-2", debugout) 474 475 def testSendNullSender(self): 476 m = 'A test message' 477 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 478 timeout=support.LOOPBACK_TIMEOUT) 479 self.addCleanup(smtp.close) 480 smtp.sendmail('<>', 'Sally', m) 481 # XXX (see comment in testSend) 482 time.sleep(0.01) 483 smtp.quit() 484 485 self.client_evt.set() 486 self.serv_evt.wait() 487 self.output.flush() 488 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 489 self.assertEqual(self.output.getvalue(), mexpect) 490 debugout = smtpd.DEBUGSTREAM.getvalue() 491 sender = re.compile("^sender: <>$", re.MULTILINE) 492 self.assertRegex(debugout, sender) 493 494 def testSendMessage(self): 495 m = email.mime.text.MIMEText('A test message') 496 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 497 timeout=support.LOOPBACK_TIMEOUT) 498 self.addCleanup(smtp.close) 499 smtp.send_message(m, from_addr='John', to_addrs='Sally') 500 # XXX (see comment in testSend) 501 time.sleep(0.01) 502 smtp.quit() 503 504 self.client_evt.set() 505 self.serv_evt.wait() 506 self.output.flush() 507 # Remove the X-Peer header that DebuggingServer adds as figuring out 508 # exactly what IP address format is put there is not easy (and 509 # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is 510 # not always the same as socket.gethostbyname(HOST). :( 511 test_output = self.get_output_without_xpeer() 512 del m['X-Peer'] 513 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 514 self.assertEqual(test_output, mexpect) 515 516 def testSendMessageWithAddresses(self): 517 m = email.mime.text.MIMEText('A test message') 518 m['From'] = 'foo@bar.com' 519 m['To'] = 'John' 520 m['CC'] = 'Sally, Fred' 521 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 522 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 523 timeout=support.LOOPBACK_TIMEOUT) 524 self.addCleanup(smtp.close) 525 smtp.send_message(m) 526 # XXX (see comment in testSend) 527 time.sleep(0.01) 528 smtp.quit() 529 # make sure the Bcc header is still in the message. 530 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" ' 531 '<warped@silly.walks.com>') 532 533 self.client_evt.set() 534 self.serv_evt.wait() 535 self.output.flush() 536 # Remove the X-Peer header that DebuggingServer adds. 537 test_output = self.get_output_without_xpeer() 538 del m['X-Peer'] 539 # The Bcc header should not be transmitted. 540 del m['Bcc'] 541 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 542 self.assertEqual(test_output, mexpect) 543 debugout = smtpd.DEBUGSTREAM.getvalue() 544 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 545 self.assertRegex(debugout, sender) 546 for addr in ('John', 'Sally', 'Fred', 'root@localhost', 547 'warped@silly.walks.com'): 548 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 549 re.MULTILINE) 550 self.assertRegex(debugout, to_addr) 551 552 def testSendMessageWithSomeAddresses(self): 553 # Make sure nothing breaks if not all of the three 'to' headers exist 554 m = email.mime.text.MIMEText('A test message') 555 m['From'] = 'foo@bar.com' 556 m['To'] = 'John, Dinsdale' 557 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 558 timeout=support.LOOPBACK_TIMEOUT) 559 self.addCleanup(smtp.close) 560 smtp.send_message(m) 561 # XXX (see comment in testSend) 562 time.sleep(0.01) 563 smtp.quit() 564 565 self.client_evt.set() 566 self.serv_evt.wait() 567 self.output.flush() 568 # Remove the X-Peer header that DebuggingServer adds. 569 test_output = self.get_output_without_xpeer() 570 del m['X-Peer'] 571 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 572 self.assertEqual(test_output, mexpect) 573 debugout = smtpd.DEBUGSTREAM.getvalue() 574 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 575 self.assertRegex(debugout, sender) 576 for addr in ('John', 'Dinsdale'): 577 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 578 re.MULTILINE) 579 self.assertRegex(debugout, to_addr) 580 581 def testSendMessageWithSpecifiedAddresses(self): 582 # Make sure addresses specified in call override those in message. 583 m = email.mime.text.MIMEText('A test message') 584 m['From'] = 'foo@bar.com' 585 m['To'] = 'John, Dinsdale' 586 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 587 timeout=support.LOOPBACK_TIMEOUT) 588 self.addCleanup(smtp.close) 589 smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net') 590 # XXX (see comment in testSend) 591 time.sleep(0.01) 592 smtp.quit() 593 594 self.client_evt.set() 595 self.serv_evt.wait() 596 self.output.flush() 597 # Remove the X-Peer header that DebuggingServer adds. 598 test_output = self.get_output_without_xpeer() 599 del m['X-Peer'] 600 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 601 self.assertEqual(test_output, mexpect) 602 debugout = smtpd.DEBUGSTREAM.getvalue() 603 sender = re.compile("^sender: joe@example.com$", re.MULTILINE) 604 self.assertRegex(debugout, sender) 605 for addr in ('John', 'Dinsdale'): 606 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 607 re.MULTILINE) 608 self.assertNotRegex(debugout, to_addr) 609 recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE) 610 self.assertRegex(debugout, recip) 611 612 def testSendMessageWithMultipleFrom(self): 613 # Sender overrides To 614 m = email.mime.text.MIMEText('A test message') 615 m['From'] = 'Bernard, Bianca' 616 m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com' 617 m['To'] = 'John, Dinsdale' 618 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 619 timeout=support.LOOPBACK_TIMEOUT) 620 self.addCleanup(smtp.close) 621 smtp.send_message(m) 622 # XXX (see comment in testSend) 623 time.sleep(0.01) 624 smtp.quit() 625 626 self.client_evt.set() 627 self.serv_evt.wait() 628 self.output.flush() 629 # Remove the X-Peer header that DebuggingServer adds. 630 test_output = self.get_output_without_xpeer() 631 del m['X-Peer'] 632 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 633 self.assertEqual(test_output, mexpect) 634 debugout = smtpd.DEBUGSTREAM.getvalue() 635 sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE) 636 self.assertRegex(debugout, sender) 637 for addr in ('John', 'Dinsdale'): 638 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 639 re.MULTILINE) 640 self.assertRegex(debugout, to_addr) 641 642 def testSendMessageResent(self): 643 m = email.mime.text.MIMEText('A test message') 644 m['From'] = 'foo@bar.com' 645 m['To'] = 'John' 646 m['CC'] = 'Sally, Fred' 647 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 648 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 649 m['Resent-From'] = 'holy@grail.net' 650 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 651 m['Resent-Bcc'] = 'doe@losthope.net' 652 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 653 timeout=support.LOOPBACK_TIMEOUT) 654 self.addCleanup(smtp.close) 655 smtp.send_message(m) 656 # XXX (see comment in testSend) 657 time.sleep(0.01) 658 smtp.quit() 659 660 self.client_evt.set() 661 self.serv_evt.wait() 662 self.output.flush() 663 # The Resent-Bcc headers are deleted before serialization. 664 del m['Bcc'] 665 del m['Resent-Bcc'] 666 # Remove the X-Peer header that DebuggingServer adds. 667 test_output = self.get_output_without_xpeer() 668 del m['X-Peer'] 669 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 670 self.assertEqual(test_output, mexpect) 671 debugout = smtpd.DEBUGSTREAM.getvalue() 672 sender = re.compile("^sender: holy@grail.net$", re.MULTILINE) 673 self.assertRegex(debugout, sender) 674 for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'): 675 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 676 re.MULTILINE) 677 self.assertRegex(debugout, to_addr) 678 679 def testSendMessageMultipleResentRaises(self): 680 m = email.mime.text.MIMEText('A test message') 681 m['From'] = 'foo@bar.com' 682 m['To'] = 'John' 683 m['CC'] = 'Sally, Fred' 684 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 685 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 686 m['Resent-From'] = 'holy@grail.net' 687 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 688 m['Resent-Bcc'] = 'doe@losthope.net' 689 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000' 690 m['Resent-To'] = 'holy@grail.net' 691 m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff' 692 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 693 timeout=support.LOOPBACK_TIMEOUT) 694 self.addCleanup(smtp.close) 695 with self.assertRaises(ValueError): 696 smtp.send_message(m) 697 smtp.close() 698 699class NonConnectingTests(unittest.TestCase): 700 701 def testNotConnected(self): 702 # Test various operations on an unconnected SMTP object that 703 # should raise exceptions (at present the attempt in SMTP.send 704 # to reference the nonexistent 'sock' attribute of the SMTP object 705 # causes an AttributeError) 706 smtp = smtplib.SMTP() 707 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) 708 self.assertRaises(smtplib.SMTPServerDisconnected, 709 smtp.send, 'test msg') 710 711 def testNonnumericPort(self): 712 # check that non-numeric port raises OSError 713 self.assertRaises(OSError, smtplib.SMTP, 714 "localhost", "bogus") 715 self.assertRaises(OSError, smtplib.SMTP, 716 "localhost:bogus") 717 718 def testSockAttributeExists(self): 719 # check that sock attribute is present outside of a connect() call 720 # (regression test, the previous behavior raised an 721 # AttributeError: 'SMTP' object has no attribute 'sock') 722 with smtplib.SMTP() as smtp: 723 self.assertIsNone(smtp.sock) 724 725 726class DefaultArgumentsTests(unittest.TestCase): 727 728 def setUp(self): 729 self.msg = EmailMessage() 730 self.msg['From'] = 'Páolo <főo@bar.com>' 731 self.smtp = smtplib.SMTP() 732 self.smtp.ehlo = Mock(return_value=(200, 'OK')) 733 self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock() 734 735 def testSendMessage(self): 736 expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME') 737 self.smtp.send_message(self.msg) 738 self.smtp.send_message(self.msg) 739 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 740 expected_mail_options) 741 self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3], 742 expected_mail_options) 743 744 def testSendMessageWithMailOptions(self): 745 mail_options = ['STARTTLS'] 746 expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME') 747 self.smtp.send_message(self.msg, None, None, mail_options) 748 self.assertEqual(mail_options, ['STARTTLS']) 749 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 750 expected_mail_options) 751 752 753# test response of client to a non-successful HELO message 754class BadHELOServerTests(unittest.TestCase): 755 756 def setUp(self): 757 smtplib.socket = mock_socket 758 mock_socket.reply_with(b"199 no hello for you!") 759 self.old_stdout = sys.stdout 760 self.output = io.StringIO() 761 sys.stdout = self.output 762 self.port = 25 763 764 def tearDown(self): 765 smtplib.socket = socket 766 sys.stdout = self.old_stdout 767 768 def testFailingHELO(self): 769 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 770 HOST, self.port, 'localhost', 3) 771 772 773class TooLongLineTests(unittest.TestCase): 774 respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n' 775 776 def setUp(self): 777 self.thread_key = threading_helper.threading_setup() 778 self.old_stdout = sys.stdout 779 self.output = io.StringIO() 780 sys.stdout = self.output 781 782 self.evt = threading.Event() 783 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 784 self.sock.settimeout(15) 785 self.port = socket_helper.bind_port(self.sock) 786 servargs = (self.evt, self.respdata, self.sock) 787 self.thread = threading.Thread(target=server, args=servargs) 788 self.thread.start() 789 self.evt.wait() 790 self.evt.clear() 791 792 def tearDown(self): 793 self.evt.wait() 794 sys.stdout = self.old_stdout 795 threading_helper.join_thread(self.thread) 796 del self.thread 797 self.doCleanups() 798 threading_helper.threading_cleanup(*self.thread_key) 799 800 def testLineTooLong(self): 801 self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, 802 HOST, self.port, 'localhost', 3) 803 804 805sim_users = {'Mr.A@somewhere.com':'John A', 806 'Ms.B@xn--fo-fka.com':'Sally B', 807 'Mrs.C@somewhereesle.com':'Ruth C', 808 } 809 810sim_auth = ('Mr.A@somewhere.com', 'somepassword') 811sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 812 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 813sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 814 'list-2':['Ms.B@xn--fo-fka.com',], 815 } 816 817# Simulated SMTP channel & server 818class ResponseException(Exception): pass 819class SimSMTPChannel(smtpd.SMTPChannel): 820 821 quit_response = None 822 mail_response = None 823 rcpt_response = None 824 data_response = None 825 rcpt_count = 0 826 rset_count = 0 827 disconnect = 0 828 AUTH = 99 # Add protocol state to enable auth testing. 829 authenticated_user = None 830 831 def __init__(self, extra_features, *args, **kw): 832 self._extrafeatures = ''.join( 833 [ "250-{0}\r\n".format(x) for x in extra_features ]) 834 super(SimSMTPChannel, self).__init__(*args, **kw) 835 836 # AUTH related stuff. It would be nice if support for this were in smtpd. 837 def found_terminator(self): 838 if self.smtp_state == self.AUTH: 839 line = self._emptystring.join(self.received_lines) 840 print('Data:', repr(line), file=smtpd.DEBUGSTREAM) 841 self.received_lines = [] 842 try: 843 self.auth_object(line) 844 except ResponseException as e: 845 self.smtp_state = self.COMMAND 846 self.push('%s %s' % (e.smtp_code, e.smtp_error)) 847 return 848 super().found_terminator() 849 850 851 def smtp_AUTH(self, arg): 852 if not self.seen_greeting: 853 self.push('503 Error: send EHLO first') 854 return 855 if not self.extended_smtp or 'AUTH' not in self._extrafeatures: 856 self.push('500 Error: command "AUTH" not recognized') 857 return 858 if self.authenticated_user is not None: 859 self.push( 860 '503 Bad sequence of commands: already authenticated') 861 return 862 args = arg.split() 863 if len(args) not in [1, 2]: 864 self.push('501 Syntax: AUTH <mechanism> [initial-response]') 865 return 866 auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') 867 try: 868 self.auth_object = getattr(self, auth_object_name) 869 except AttributeError: 870 self.push('504 Command parameter not implemented: unsupported ' 871 ' authentication mechanism {!r}'.format(auth_object_name)) 872 return 873 self.smtp_state = self.AUTH 874 self.auth_object(args[1] if len(args) == 2 else None) 875 876 def _authenticated(self, user, valid): 877 if valid: 878 self.authenticated_user = user 879 self.push('235 Authentication Succeeded') 880 else: 881 self.push('535 Authentication credentials invalid') 882 self.smtp_state = self.COMMAND 883 884 def _decode_base64(self, string): 885 return base64.decodebytes(string.encode('ascii')).decode('utf-8') 886 887 def _auth_plain(self, arg=None): 888 if arg is None: 889 self.push('334 ') 890 else: 891 logpass = self._decode_base64(arg) 892 try: 893 *_, user, password = logpass.split('\0') 894 except ValueError as e: 895 self.push('535 Splitting response {!r} into user and password' 896 ' failed: {}'.format(logpass, e)) 897 return 898 self._authenticated(user, password == sim_auth[1]) 899 900 def _auth_login(self, arg=None): 901 if arg is None: 902 # base64 encoded 'Username:' 903 self.push('334 VXNlcm5hbWU6') 904 elif not hasattr(self, '_auth_login_user'): 905 self._auth_login_user = self._decode_base64(arg) 906 # base64 encoded 'Password:' 907 self.push('334 UGFzc3dvcmQ6') 908 else: 909 password = self._decode_base64(arg) 910 self._authenticated(self._auth_login_user, password == sim_auth[1]) 911 del self._auth_login_user 912 913 def _auth_buggy(self, arg=None): 914 # This AUTH mechanism will 'trap' client in a neverending 334 915 # base64 encoded 'BuGgYbUgGy' 916 self.push('334 QnVHZ1liVWdHeQ==') 917 918 def _auth_cram_md5(self, arg=None): 919 if arg is None: 920 self.push('334 {}'.format(sim_cram_md5_challenge)) 921 else: 922 logpass = self._decode_base64(arg) 923 try: 924 user, hashed_pass = logpass.split() 925 except ValueError as e: 926 self.push('535 Splitting response {!r} into user and password ' 927 'failed: {}'.format(logpass, e)) 928 return False 929 valid_hashed_pass = hmac.HMAC( 930 sim_auth[1].encode('ascii'), 931 self._decode_base64(sim_cram_md5_challenge).encode('ascii'), 932 'md5').hexdigest() 933 self._authenticated(user, hashed_pass == valid_hashed_pass) 934 # end AUTH related stuff. 935 936 def smtp_EHLO(self, arg): 937 resp = ('250-testhost\r\n' 938 '250-EXPN\r\n' 939 '250-SIZE 20000000\r\n' 940 '250-STARTTLS\r\n' 941 '250-DELIVERBY\r\n') 942 resp = resp + self._extrafeatures + '250 HELP' 943 self.push(resp) 944 self.seen_greeting = arg 945 self.extended_smtp = True 946 947 def smtp_VRFY(self, arg): 948 # For max compatibility smtplib should be sending the raw address. 949 if arg in sim_users: 950 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 951 else: 952 self.push('550 No such user: %s' % arg) 953 954 def smtp_EXPN(self, arg): 955 list_name = arg.lower() 956 if list_name in sim_lists: 957 user_list = sim_lists[list_name] 958 for n, user_email in enumerate(user_list): 959 quoted_addr = smtplib.quoteaddr(user_email) 960 if n < len(user_list) - 1: 961 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 962 else: 963 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 964 else: 965 self.push('550 No access for you!') 966 967 def smtp_QUIT(self, arg): 968 if self.quit_response is None: 969 super(SimSMTPChannel, self).smtp_QUIT(arg) 970 else: 971 self.push(self.quit_response) 972 self.close_when_done() 973 974 def smtp_MAIL(self, arg): 975 if self.mail_response is None: 976 super().smtp_MAIL(arg) 977 else: 978 self.push(self.mail_response) 979 if self.disconnect: 980 self.close_when_done() 981 982 def smtp_RCPT(self, arg): 983 if self.rcpt_response is None: 984 super().smtp_RCPT(arg) 985 return 986 self.rcpt_count += 1 987 self.push(self.rcpt_response[self.rcpt_count-1]) 988 989 def smtp_RSET(self, arg): 990 self.rset_count += 1 991 super().smtp_RSET(arg) 992 993 def smtp_DATA(self, arg): 994 if self.data_response is None: 995 super().smtp_DATA(arg) 996 else: 997 self.push(self.data_response) 998 999 def handle_error(self): 1000 raise 1001 1002 1003class SimSMTPServer(smtpd.SMTPServer): 1004 1005 channel_class = SimSMTPChannel 1006 1007 def __init__(self, *args, **kw): 1008 self._extra_features = [] 1009 self._addresses = {} 1010 smtpd.SMTPServer.__init__(self, *args, **kw) 1011 1012 def handle_accepted(self, conn, addr): 1013 self._SMTPchannel = self.channel_class( 1014 self._extra_features, self, conn, addr, 1015 decode_data=self._decode_data) 1016 1017 def process_message(self, peer, mailfrom, rcpttos, data): 1018 self._addresses['from'] = mailfrom 1019 self._addresses['tos'] = rcpttos 1020 1021 def add_feature(self, feature): 1022 self._extra_features.append(feature) 1023 1024 def handle_error(self): 1025 raise 1026 1027 1028# Test various SMTP & ESMTP commands/behaviors that require a simulated server 1029# (i.e., something with more features than DebuggingServer) 1030class SMTPSimTests(unittest.TestCase): 1031 1032 def setUp(self): 1033 self.thread_key = threading_helper.threading_setup() 1034 self.real_getfqdn = socket.getfqdn 1035 socket.getfqdn = mock_socket.getfqdn 1036 self.serv_evt = threading.Event() 1037 self.client_evt = threading.Event() 1038 # Pick a random unused port by passing 0 for the port number 1039 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 1040 # Keep a note of what port was assigned 1041 self.port = self.serv.socket.getsockname()[1] 1042 serv_args = (self.serv, self.serv_evt, self.client_evt) 1043 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1044 self.thread.start() 1045 1046 # wait until server thread has assigned a port number 1047 self.serv_evt.wait() 1048 self.serv_evt.clear() 1049 1050 def tearDown(self): 1051 socket.getfqdn = self.real_getfqdn 1052 # indicate that the client is finished 1053 self.client_evt.set() 1054 # wait for the server thread to terminate 1055 self.serv_evt.wait() 1056 threading_helper.join_thread(self.thread) 1057 del self.thread 1058 self.doCleanups() 1059 threading_helper.threading_cleanup(*self.thread_key) 1060 1061 def testBasic(self): 1062 # smoke test 1063 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1064 timeout=support.LOOPBACK_TIMEOUT) 1065 smtp.quit() 1066 1067 def testEHLO(self): 1068 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1069 timeout=support.LOOPBACK_TIMEOUT) 1070 1071 # no features should be present before the EHLO 1072 self.assertEqual(smtp.esmtp_features, {}) 1073 1074 # features expected from the test server 1075 expected_features = {'expn':'', 1076 'size': '20000000', 1077 'starttls': '', 1078 'deliverby': '', 1079 'help': '', 1080 } 1081 1082 smtp.ehlo() 1083 self.assertEqual(smtp.esmtp_features, expected_features) 1084 for k in expected_features: 1085 self.assertTrue(smtp.has_extn(k)) 1086 self.assertFalse(smtp.has_extn('unsupported-feature')) 1087 smtp.quit() 1088 1089 def testVRFY(self): 1090 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1091 timeout=support.LOOPBACK_TIMEOUT) 1092 1093 for addr_spec, name in sim_users.items(): 1094 expected_known = (250, bytes('%s %s' % 1095 (name, smtplib.quoteaddr(addr_spec)), 1096 "ascii")) 1097 self.assertEqual(smtp.vrfy(addr_spec), expected_known) 1098 1099 u = 'nobody@nowhere.com' 1100 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 1101 self.assertEqual(smtp.vrfy(u), expected_unknown) 1102 smtp.quit() 1103 1104 def testEXPN(self): 1105 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1106 timeout=support.LOOPBACK_TIMEOUT) 1107 1108 for listname, members in sim_lists.items(): 1109 users = [] 1110 for m in members: 1111 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 1112 expected_known = (250, bytes('\n'.join(users), "ascii")) 1113 self.assertEqual(smtp.expn(listname), expected_known) 1114 1115 u = 'PSU-Members-List' 1116 expected_unknown = (550, b'No access for you!') 1117 self.assertEqual(smtp.expn(u), expected_unknown) 1118 smtp.quit() 1119 1120 def testAUTH_PLAIN(self): 1121 self.serv.add_feature("AUTH PLAIN") 1122 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1123 timeout=support.LOOPBACK_TIMEOUT) 1124 resp = smtp.login(sim_auth[0], sim_auth[1]) 1125 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1126 smtp.close() 1127 1128 def testAUTH_LOGIN(self): 1129 self.serv.add_feature("AUTH LOGIN") 1130 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1131 timeout=support.LOOPBACK_TIMEOUT) 1132 resp = smtp.login(sim_auth[0], sim_auth[1]) 1133 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1134 smtp.close() 1135 1136 def testAUTH_LOGIN_initial_response_ok(self): 1137 self.serv.add_feature("AUTH LOGIN") 1138 with smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1139 timeout=support.LOOPBACK_TIMEOUT) as smtp: 1140 smtp.user, smtp.password = sim_auth 1141 smtp.ehlo("test_auth_login") 1142 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True) 1143 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1144 1145 def testAUTH_LOGIN_initial_response_notok(self): 1146 self.serv.add_feature("AUTH LOGIN") 1147 with smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1148 timeout=support.LOOPBACK_TIMEOUT) as smtp: 1149 smtp.user, smtp.password = sim_auth 1150 smtp.ehlo("test_auth_login") 1151 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False) 1152 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1153 1154 def testAUTH_BUGGY(self): 1155 self.serv.add_feature("AUTH BUGGY") 1156 1157 def auth_buggy(challenge=None): 1158 self.assertEqual(b"BuGgYbUgGy", challenge) 1159 return "\0" 1160 1161 smtp = smtplib.SMTP( 1162 HOST, self.port, local_hostname='localhost', 1163 timeout=support.LOOPBACK_TIMEOUT 1164 ) 1165 try: 1166 smtp.user, smtp.password = sim_auth 1167 smtp.ehlo("test_auth_buggy") 1168 expect = r"^Server AUTH mechanism infinite loop.*" 1169 with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm: 1170 smtp.auth("BUGGY", auth_buggy, initial_response_ok=False) 1171 finally: 1172 smtp.close() 1173 1174 @hashlib_helper.requires_hashdigest('md5') 1175 def testAUTH_CRAM_MD5(self): 1176 self.serv.add_feature("AUTH CRAM-MD5") 1177 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1178 timeout=support.LOOPBACK_TIMEOUT) 1179 resp = smtp.login(sim_auth[0], sim_auth[1]) 1180 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1181 smtp.close() 1182 1183 @hashlib_helper.requires_hashdigest('md5') 1184 def testAUTH_multiple(self): 1185 # Test that multiple authentication methods are tried. 1186 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 1187 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1188 timeout=support.LOOPBACK_TIMEOUT) 1189 resp = smtp.login(sim_auth[0], sim_auth[1]) 1190 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1191 smtp.close() 1192 1193 def test_auth_function(self): 1194 supported = {'PLAIN', 'LOGIN'} 1195 try: 1196 hashlib.md5() 1197 except ValueError: 1198 pass 1199 else: 1200 supported.add('CRAM-MD5') 1201 for mechanism in supported: 1202 self.serv.add_feature("AUTH {}".format(mechanism)) 1203 for mechanism in supported: 1204 with self.subTest(mechanism=mechanism): 1205 smtp = smtplib.SMTP(HOST, self.port, 1206 local_hostname='localhost', 1207 timeout=support.LOOPBACK_TIMEOUT) 1208 smtp.ehlo('foo') 1209 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 1210 method = 'auth_' + mechanism.lower().replace('-', '_') 1211 resp = smtp.auth(mechanism, getattr(smtp, method)) 1212 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1213 smtp.close() 1214 1215 def test_quit_resets_greeting(self): 1216 smtp = smtplib.SMTP(HOST, self.port, 1217 local_hostname='localhost', 1218 timeout=support.LOOPBACK_TIMEOUT) 1219 code, message = smtp.ehlo() 1220 self.assertEqual(code, 250) 1221 self.assertIn('size', smtp.esmtp_features) 1222 smtp.quit() 1223 self.assertNotIn('size', smtp.esmtp_features) 1224 smtp.connect(HOST, self.port) 1225 self.assertNotIn('size', smtp.esmtp_features) 1226 smtp.ehlo_or_helo_if_needed() 1227 self.assertIn('size', smtp.esmtp_features) 1228 smtp.quit() 1229 1230 def test_with_statement(self): 1231 with smtplib.SMTP(HOST, self.port) as smtp: 1232 code, message = smtp.noop() 1233 self.assertEqual(code, 250) 1234 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1235 with smtplib.SMTP(HOST, self.port) as smtp: 1236 smtp.close() 1237 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1238 1239 def test_with_statement_QUIT_failure(self): 1240 with self.assertRaises(smtplib.SMTPResponseException) as error: 1241 with smtplib.SMTP(HOST, self.port) as smtp: 1242 smtp.noop() 1243 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1244 self.assertEqual(error.exception.smtp_code, 421) 1245 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1246 1247 #TODO: add tests for correct AUTH method fallback now that the 1248 #test infrastructure can support it. 1249 1250 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1251 def test__rest_from_mail_cmd(self): 1252 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1253 timeout=support.LOOPBACK_TIMEOUT) 1254 smtp.noop() 1255 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1256 self.serv._SMTPchannel.disconnect = True 1257 with self.assertRaises(smtplib.SMTPSenderRefused): 1258 smtp.sendmail('John', 'Sally', 'test message') 1259 self.assertIsNone(smtp.sock) 1260 1261 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1262 def test_421_from_mail_cmd(self): 1263 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1264 timeout=support.LOOPBACK_TIMEOUT) 1265 smtp.noop() 1266 self.serv._SMTPchannel.mail_response = '421 closing connection' 1267 with self.assertRaises(smtplib.SMTPSenderRefused): 1268 smtp.sendmail('John', 'Sally', 'test message') 1269 self.assertIsNone(smtp.sock) 1270 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1271 1272 def test_421_from_rcpt_cmd(self): 1273 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1274 timeout=support.LOOPBACK_TIMEOUT) 1275 smtp.noop() 1276 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1277 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1278 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1279 self.assertIsNone(smtp.sock) 1280 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1281 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1282 1283 def test_421_from_data_cmd(self): 1284 class MySimSMTPChannel(SimSMTPChannel): 1285 def found_terminator(self): 1286 if self.smtp_state == self.DATA: 1287 self.push('421 closing') 1288 else: 1289 super().found_terminator() 1290 self.serv.channel_class = MySimSMTPChannel 1291 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1292 timeout=support.LOOPBACK_TIMEOUT) 1293 smtp.noop() 1294 with self.assertRaises(smtplib.SMTPDataError): 1295 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1296 self.assertIsNone(smtp.sock) 1297 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1298 1299 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1300 smtp = smtplib.SMTP( 1301 HOST, self.port, local_hostname='localhost', 1302 timeout=support.LOOPBACK_TIMEOUT) 1303 self.addCleanup(smtp.close) 1304 smtp.ehlo() 1305 self.assertTrue(smtp.does_esmtp) 1306 self.assertFalse(smtp.has_extn('smtputf8')) 1307 self.assertRaises( 1308 smtplib.SMTPNotSupportedError, 1309 smtp.sendmail, 1310 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1311 self.assertRaises( 1312 smtplib.SMTPNotSupportedError, 1313 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1314 1315 def test_send_unicode_without_SMTPUTF8(self): 1316 smtp = smtplib.SMTP( 1317 HOST, self.port, local_hostname='localhost', 1318 timeout=support.LOOPBACK_TIMEOUT) 1319 self.addCleanup(smtp.close) 1320 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1321 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1322 1323 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1324 # This test is located here and not in the SMTPUTF8SimTests 1325 # class because it needs a "regular" SMTP server to work 1326 msg = EmailMessage() 1327 msg['From'] = "Páolo <főo@bar.com>" 1328 msg['To'] = 'Dinsdale' 1329 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1330 smtp = smtplib.SMTP( 1331 HOST, self.port, local_hostname='localhost', 1332 timeout=support.LOOPBACK_TIMEOUT) 1333 self.addCleanup(smtp.close) 1334 with self.assertRaises(smtplib.SMTPNotSupportedError): 1335 smtp.send_message(msg) 1336 1337 def test_name_field_not_included_in_envelop_addresses(self): 1338 smtp = smtplib.SMTP( 1339 HOST, self.port, local_hostname='localhost', 1340 timeout=support.LOOPBACK_TIMEOUT) 1341 self.addCleanup(smtp.close) 1342 1343 message = EmailMessage() 1344 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 1345 message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 1346 1347 self.assertDictEqual(smtp.send_message(message), {}) 1348 1349 self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 1350 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 1351 1352 1353class SimSMTPUTF8Server(SimSMTPServer): 1354 1355 def __init__(self, *args, **kw): 1356 # The base SMTP server turns these on automatically, but our test 1357 # server is set up to munge the EHLO response, so we need to provide 1358 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 1359 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1360 smtpd.SMTPServer.__init__(self, *args, **kw) 1361 1362 def handle_accepted(self, conn, addr): 1363 self._SMTPchannel = self.channel_class( 1364 self._extra_features, self, conn, addr, 1365 decode_data=self._decode_data, 1366 enable_SMTPUTF8=self.enable_SMTPUTF8, 1367 ) 1368 1369 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1370 rcpt_options=None): 1371 self.last_peer = peer 1372 self.last_mailfrom = mailfrom 1373 self.last_rcpttos = rcpttos 1374 self.last_message = data 1375 self.last_mail_options = mail_options 1376 self.last_rcpt_options = rcpt_options 1377 1378 1379class SMTPUTF8SimTests(unittest.TestCase): 1380 1381 maxDiff = None 1382 1383 def setUp(self): 1384 self.thread_key = threading_helper.threading_setup() 1385 self.real_getfqdn = socket.getfqdn 1386 socket.getfqdn = mock_socket.getfqdn 1387 self.serv_evt = threading.Event() 1388 self.client_evt = threading.Event() 1389 # Pick a random unused port by passing 0 for the port number 1390 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1391 decode_data=False, 1392 enable_SMTPUTF8=True) 1393 # Keep a note of what port was assigned 1394 self.port = self.serv.socket.getsockname()[1] 1395 serv_args = (self.serv, self.serv_evt, self.client_evt) 1396 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1397 self.thread.start() 1398 1399 # wait until server thread has assigned a port number 1400 self.serv_evt.wait() 1401 self.serv_evt.clear() 1402 1403 def tearDown(self): 1404 socket.getfqdn = self.real_getfqdn 1405 # indicate that the client is finished 1406 self.client_evt.set() 1407 # wait for the server thread to terminate 1408 self.serv_evt.wait() 1409 threading_helper.join_thread(self.thread) 1410 del self.thread 1411 self.doCleanups() 1412 threading_helper.threading_cleanup(*self.thread_key) 1413 1414 def test_test_server_supports_extensions(self): 1415 smtp = smtplib.SMTP( 1416 HOST, self.port, local_hostname='localhost', 1417 timeout=support.LOOPBACK_TIMEOUT) 1418 self.addCleanup(smtp.close) 1419 smtp.ehlo() 1420 self.assertTrue(smtp.does_esmtp) 1421 self.assertTrue(smtp.has_extn('smtputf8')) 1422 1423 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1424 m = '¡a test message containing unicode!'.encode('utf-8') 1425 smtp = smtplib.SMTP( 1426 HOST, self.port, local_hostname='localhost', 1427 timeout=support.LOOPBACK_TIMEOUT) 1428 self.addCleanup(smtp.close) 1429 smtp.sendmail('Jőhn', 'Sálly', m, 1430 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1431 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1432 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1433 self.assertEqual(self.serv.last_message, m) 1434 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1435 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1436 self.assertEqual(self.serv.last_rcpt_options, []) 1437 1438 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1439 m = '¡a test message containing unicode!'.encode('utf-8') 1440 smtp = smtplib.SMTP( 1441 HOST, self.port, local_hostname='localhost', 1442 timeout=support.LOOPBACK_TIMEOUT) 1443 self.addCleanup(smtp.close) 1444 smtp.ehlo() 1445 self.assertEqual( 1446 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1447 (250, b'OK')) 1448 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1449 self.assertEqual(smtp.data(m), (250, b'OK')) 1450 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1451 self.assertEqual(self.serv.last_rcpttos, ['János']) 1452 self.assertEqual(self.serv.last_message, m) 1453 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1454 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1455 self.assertEqual(self.serv.last_rcpt_options, []) 1456 1457 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1458 msg = EmailMessage() 1459 msg['From'] = "Páolo <főo@bar.com>" 1460 msg['To'] = 'Dinsdale' 1461 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1462 # XXX I don't know why I need two \n's here, but this is an existing 1463 # bug (if it is one) and not a problem with the new functionality. 1464 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1465 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1466 # we are successfully sending /r/n :(. 1467 expected = textwrap.dedent("""\ 1468 From: Páolo <főo@bar.com> 1469 To: Dinsdale 1470 Subject: Nudge nudge, wink, wink \u1F609 1471 Content-Type: text/plain; charset="utf-8" 1472 Content-Transfer-Encoding: 8bit 1473 MIME-Version: 1.0 1474 1475 oh là là, know what I mean, know what I mean? 1476 """) 1477 smtp = smtplib.SMTP( 1478 HOST, self.port, local_hostname='localhost', 1479 timeout=support.LOOPBACK_TIMEOUT) 1480 self.addCleanup(smtp.close) 1481 self.assertEqual(smtp.send_message(msg), {}) 1482 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1483 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1484 self.assertEqual(self.serv.last_message.decode(), expected) 1485 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1486 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1487 self.assertEqual(self.serv.last_rcpt_options, []) 1488 1489 1490EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1491 1492class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1493 def smtp_AUTH(self, arg): 1494 # RFC 4954's AUTH command allows for an optional initial-response. 1495 # Not all AUTH methods support this; some require a challenge. AUTH 1496 # PLAIN does those, so test that here. See issue #15014. 1497 args = arg.split() 1498 if args[0].lower() == 'plain': 1499 if len(args) == 2: 1500 # AUTH PLAIN <initial-response> with the response base 64 1501 # encoded. Hard code the expected response for the test. 1502 if args[1] == EXPECTED_RESPONSE: 1503 self.push('235 Ok') 1504 return 1505 self.push('571 Bad authentication') 1506 1507class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1508 channel_class = SimSMTPAUTHInitialResponseChannel 1509 1510 1511class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1512 def setUp(self): 1513 self.thread_key = threading_helper.threading_setup() 1514 self.real_getfqdn = socket.getfqdn 1515 socket.getfqdn = mock_socket.getfqdn 1516 self.serv_evt = threading.Event() 1517 self.client_evt = threading.Event() 1518 # Pick a random unused port by passing 0 for the port number 1519 self.serv = SimSMTPAUTHInitialResponseServer( 1520 (HOST, 0), ('nowhere', -1), decode_data=True) 1521 # Keep a note of what port was assigned 1522 self.port = self.serv.socket.getsockname()[1] 1523 serv_args = (self.serv, self.serv_evt, self.client_evt) 1524 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1525 self.thread.start() 1526 1527 # wait until server thread has assigned a port number 1528 self.serv_evt.wait() 1529 self.serv_evt.clear() 1530 1531 def tearDown(self): 1532 socket.getfqdn = self.real_getfqdn 1533 # indicate that the client is finished 1534 self.client_evt.set() 1535 # wait for the server thread to terminate 1536 self.serv_evt.wait() 1537 threading_helper.join_thread(self.thread) 1538 del self.thread 1539 self.doCleanups() 1540 threading_helper.threading_cleanup(*self.thread_key) 1541 1542 def testAUTH_PLAIN_initial_response_login(self): 1543 self.serv.add_feature('AUTH PLAIN') 1544 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1545 timeout=support.LOOPBACK_TIMEOUT) 1546 smtp.login('psu', 'doesnotexist') 1547 smtp.close() 1548 1549 def testAUTH_PLAIN_initial_response_auth(self): 1550 self.serv.add_feature('AUTH PLAIN') 1551 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1552 timeout=support.LOOPBACK_TIMEOUT) 1553 smtp.user = 'psu' 1554 smtp.password = 'doesnotexist' 1555 code, response = smtp.auth('plain', smtp.auth_plain) 1556 smtp.close() 1557 self.assertEqual(code, 235) 1558 1559 1560if __name__ == '__main__': 1561 unittest.main() 1562