1import asyncore 2import base64 3import email.mime.text 4from email.message import EmailMessage 5from email.base64mime import body_encode as encode_base64 6import email.utils 7import hashlib 8import hmac 9import socket 10import smtpd 11import smtplib 12import io 13import re 14import sys 15import time 16import select 17import errno 18import textwrap 19import threading 20 21import unittest 22from test import support, mock_socket 23from test.support import HOST 24from test.support import threading_setup, threading_cleanup, join_thread 25from test.support import requires_hashdigest 26from unittest.mock import Mock 27 28 29if sys.platform == 'darwin': 30 # select.poll returns a select.POLLHUP at the end of the tests 31 # on darwin, so just ignore it 32 def handle_expt(self): 33 pass 34 smtpd.SMTPChannel.handle_expt = handle_expt 35 36 37def server(evt, buf, serv): 38 serv.listen() 39 evt.set() 40 try: 41 conn, addr = serv.accept() 42 except socket.timeout: 43 pass 44 else: 45 n = 500 46 while buf and n > 0: 47 r, w, e = select.select([], [conn], []) 48 if w: 49 sent = conn.send(buf) 50 buf = buf[sent:] 51 52 n -= 1 53 54 conn.close() 55 finally: 56 serv.close() 57 evt.set() 58 59class GeneralTests(unittest.TestCase): 60 61 def setUp(self): 62 smtplib.socket = mock_socket 63 self.port = 25 64 65 def tearDown(self): 66 smtplib.socket = socket 67 68 # This method is no longer used but is retained for backward compatibility, 69 # so test to make sure it still works. 70 def testQuoteData(self): 71 teststr = "abc\n.jkl\rfoo\r\n..blue" 72 expected = "abc\r\n..jkl\r\nfoo\r\n...blue" 73 self.assertEqual(expected, smtplib.quotedata(teststr)) 74 75 def testBasic1(self): 76 mock_socket.reply_with(b"220 Hola mundo") 77 # connects 78 smtp = smtplib.SMTP(HOST, self.port) 79 smtp.close() 80 81 def testSourceAddress(self): 82 mock_socket.reply_with(b"220 Hola mundo") 83 # connects 84 smtp = smtplib.SMTP(HOST, self.port, 85 source_address=('127.0.0.1',19876)) 86 self.assertEqual(smtp.source_address, ('127.0.0.1', 19876)) 87 smtp.close() 88 89 def testBasic2(self): 90 mock_socket.reply_with(b"220 Hola mundo") 91 # connects, include port in host name 92 smtp = smtplib.SMTP("%s:%s" % (HOST, self.port)) 93 smtp.close() 94 95 def testLocalHostName(self): 96 mock_socket.reply_with(b"220 Hola mundo") 97 # check that supplied local_hostname is used 98 smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost") 99 self.assertEqual(smtp.local_hostname, "testhost") 100 smtp.close() 101 102 def testTimeoutDefault(self): 103 mock_socket.reply_with(b"220 Hola mundo") 104 self.assertIsNone(mock_socket.getdefaulttimeout()) 105 mock_socket.setdefaulttimeout(30) 106 self.assertEqual(mock_socket.getdefaulttimeout(), 30) 107 try: 108 smtp = smtplib.SMTP(HOST, self.port) 109 finally: 110 mock_socket.setdefaulttimeout(None) 111 self.assertEqual(smtp.sock.gettimeout(), 30) 112 smtp.close() 113 114 def testTimeoutNone(self): 115 mock_socket.reply_with(b"220 Hola mundo") 116 self.assertIsNone(socket.getdefaulttimeout()) 117 socket.setdefaulttimeout(30) 118 try: 119 smtp = smtplib.SMTP(HOST, self.port, timeout=None) 120 finally: 121 socket.setdefaulttimeout(None) 122 self.assertIsNone(smtp.sock.gettimeout()) 123 smtp.close() 124 125 def testTimeoutValue(self): 126 mock_socket.reply_with(b"220 Hola mundo") 127 smtp = smtplib.SMTP(HOST, self.port, timeout=30) 128 self.assertEqual(smtp.sock.gettimeout(), 30) 129 smtp.close() 130 131 def test_debuglevel(self): 132 mock_socket.reply_with(b"220 Hello world") 133 smtp = smtplib.SMTP() 134 smtp.set_debuglevel(1) 135 with support.captured_stderr() as stderr: 136 smtp.connect(HOST, self.port) 137 smtp.close() 138 expected = re.compile(r"^connect:", re.MULTILINE) 139 self.assertRegex(stderr.getvalue(), expected) 140 141 def test_debuglevel_2(self): 142 mock_socket.reply_with(b"220 Hello world") 143 smtp = smtplib.SMTP() 144 smtp.set_debuglevel(2) 145 with support.captured_stderr() as stderr: 146 smtp.connect(HOST, self.port) 147 smtp.close() 148 expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ", 149 re.MULTILINE) 150 self.assertRegex(stderr.getvalue(), expected) 151 152 153# Test server thread using the specified SMTP server class 154def debugging_server(serv, serv_evt, client_evt): 155 serv_evt.set() 156 157 try: 158 if hasattr(select, 'poll'): 159 poll_fun = asyncore.poll2 160 else: 161 poll_fun = asyncore.poll 162 163 n = 1000 164 while asyncore.socket_map and n > 0: 165 poll_fun(0.01, asyncore.socket_map) 166 167 # when the client conversation is finished, it will 168 # set client_evt, and it's then ok to kill the server 169 if client_evt.is_set(): 170 serv.close() 171 break 172 173 n -= 1 174 175 except socket.timeout: 176 pass 177 finally: 178 if not client_evt.is_set(): 179 # allow some time for the client to read the result 180 time.sleep(0.5) 181 serv.close() 182 asyncore.close_all() 183 serv_evt.set() 184 185MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' 186MSG_END = '------------ END MESSAGE ------------\n' 187 188# NOTE: Some SMTP objects in the tests below are created with a non-default 189# local_hostname argument to the constructor, since (on some systems) the FQDN 190# lookup caused by the default local_hostname sometimes takes so long that the 191# test server times out, causing the test to fail. 192 193# Test behavior of smtpd.DebuggingServer 194class DebuggingServerTests(unittest.TestCase): 195 196 maxDiff = None 197 198 def setUp(self): 199 self.thread_key = threading_setup() 200 self.real_getfqdn = socket.getfqdn 201 socket.getfqdn = mock_socket.getfqdn 202 # temporarily replace sys.stdout to capture DebuggingServer output 203 self.old_stdout = sys.stdout 204 self.output = io.StringIO() 205 sys.stdout = self.output 206 207 self.serv_evt = threading.Event() 208 self.client_evt = threading.Event() 209 # Capture SMTPChannel debug output 210 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM 211 smtpd.DEBUGSTREAM = io.StringIO() 212 # Pick a random unused port by passing 0 for the port number 213 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), 214 decode_data=True) 215 # Keep a note of what server host and port were assigned 216 self.host, self.port = self.serv.socket.getsockname()[:2] 217 serv_args = (self.serv, self.serv_evt, self.client_evt) 218 self.thread = threading.Thread(target=debugging_server, args=serv_args) 219 self.thread.start() 220 221 # wait until server thread has assigned a port number 222 self.serv_evt.wait() 223 self.serv_evt.clear() 224 225 def tearDown(self): 226 socket.getfqdn = self.real_getfqdn 227 # indicate that the client is finished 228 self.client_evt.set() 229 # wait for the server thread to terminate 230 self.serv_evt.wait() 231 join_thread(self.thread) 232 # restore sys.stdout 233 sys.stdout = self.old_stdout 234 # restore DEBUGSTREAM 235 smtpd.DEBUGSTREAM.close() 236 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM 237 del self.thread 238 self.doCleanups() 239 threading_cleanup(*self.thread_key) 240 241 def get_output_without_xpeer(self): 242 test_output = self.output.getvalue() 243 return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2', 244 test_output, flags=re.MULTILINE|re.DOTALL) 245 246 def testBasic(self): 247 # connect 248 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 249 smtp.quit() 250 251 def testSourceAddress(self): 252 # connect 253 src_port = support.find_unused_port() 254 try: 255 smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', 256 timeout=3, source_address=(self.host, src_port)) 257 self.addCleanup(smtp.close) 258 self.assertEqual(smtp.source_address, (self.host, src_port)) 259 self.assertEqual(smtp.local_hostname, 'localhost') 260 smtp.quit() 261 except OSError as e: 262 if e.errno == errno.EADDRINUSE: 263 self.skipTest("couldn't bind to source port %d" % src_port) 264 raise 265 266 def testNOOP(self): 267 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 268 self.addCleanup(smtp.close) 269 expected = (250, b'OK') 270 self.assertEqual(smtp.noop(), expected) 271 smtp.quit() 272 273 def testRSET(self): 274 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 275 self.addCleanup(smtp.close) 276 expected = (250, b'OK') 277 self.assertEqual(smtp.rset(), expected) 278 smtp.quit() 279 280 def testELHO(self): 281 # EHLO isn't implemented in DebuggingServer 282 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 283 self.addCleanup(smtp.close) 284 expected = (250, b'\nSIZE 33554432\nHELP') 285 self.assertEqual(smtp.ehlo(), expected) 286 smtp.quit() 287 288 def testEXPNNotImplemented(self): 289 # EXPN isn't implemented in DebuggingServer 290 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 291 self.addCleanup(smtp.close) 292 expected = (502, b'EXPN not implemented') 293 smtp.putcmd('EXPN') 294 self.assertEqual(smtp.getreply(), expected) 295 smtp.quit() 296 297 def test_issue43124_putcmd_escapes_newline(self): 298 # see: https://bugs.python.org/issue43124 299 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 300 timeout=10) # support.LOOPBACK_TIMEOUT in newer Pythons 301 self.addCleanup(smtp.close) 302 with self.assertRaises(ValueError) as exc: 303 smtp.putcmd('helo\nX-INJECTED') 304 self.assertIn("prohibited newline characters", str(exc.exception)) 305 smtp.quit() 306 307 def testVRFY(self): 308 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 309 self.addCleanup(smtp.close) 310 expected = (252, b'Cannot VRFY user, but will accept message ' + \ 311 b'and attempt delivery') 312 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) 313 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) 314 smtp.quit() 315 316 def testSecondHELO(self): 317 # check that a second HELO returns a message that it's a duplicate 318 # (this behavior is specific to smtpd.SMTPChannel) 319 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 320 self.addCleanup(smtp.close) 321 smtp.helo() 322 expected = (503, b'Duplicate HELO/EHLO') 323 self.assertEqual(smtp.helo(), expected) 324 smtp.quit() 325 326 def testHELP(self): 327 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 328 self.addCleanup(smtp.close) 329 self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ 330 b'RCPT DATA RSET NOOP QUIT VRFY') 331 smtp.quit() 332 333 def testSend(self): 334 # connect and send mail 335 m = 'A test message' 336 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 337 self.addCleanup(smtp.close) 338 smtp.sendmail('John', 'Sally', m) 339 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor 340 # in asyncore. This sleep might help, but should really be fixed 341 # properly by using an Event variable. 342 time.sleep(0.01) 343 smtp.quit() 344 345 self.client_evt.set() 346 self.serv_evt.wait() 347 self.output.flush() 348 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 349 self.assertEqual(self.output.getvalue(), mexpect) 350 351 def testSendBinary(self): 352 m = b'A test message' 353 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 354 self.addCleanup(smtp.close) 355 smtp.sendmail('John', 'Sally', m) 356 # XXX (see comment in testSend) 357 time.sleep(0.01) 358 smtp.quit() 359 360 self.client_evt.set() 361 self.serv_evt.wait() 362 self.output.flush() 363 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) 364 self.assertEqual(self.output.getvalue(), mexpect) 365 366 def testSendNeedingDotQuote(self): 367 # Issue 12283 368 m = '.A test\n.mes.sage.' 369 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 370 self.addCleanup(smtp.close) 371 smtp.sendmail('John', 'Sally', m) 372 # XXX (see comment in testSend) 373 time.sleep(0.01) 374 smtp.quit() 375 376 self.client_evt.set() 377 self.serv_evt.wait() 378 self.output.flush() 379 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 380 self.assertEqual(self.output.getvalue(), mexpect) 381 382 def test_issue43124_escape_localhostname(self): 383 # see: https://bugs.python.org/issue43124 384 # connect and send mail 385 m = 'wazzuuup\nlinetwo' 386 smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED', 387 timeout=10) # support.LOOPBACK_TIMEOUT in newer Pythons 388 self.addCleanup(smtp.close) 389 with self.assertRaises(ValueError) as exc: 390 smtp.sendmail("hi@me.com", "you@me.com", m) 391 self.assertIn( 392 "prohibited newline characters: ehlo hi\\nX-INJECTED", 393 str(exc.exception), 394 ) 395 # XXX (see comment in testSend) 396 time.sleep(0.01) 397 smtp.quit() 398 399 debugout = smtpd.DEBUGSTREAM.getvalue() 400 self.assertNotIn("X-INJECTED", debugout) 401 402 def test_issue43124_escape_options(self): 403 # see: https://bugs.python.org/issue43124 404 # connect and send mail 405 m = 'wazzuuup\nlinetwo' 406 smtp = smtplib.SMTP( 407 HOST, self.port, local_hostname='localhost', 408 timeout=10) # support.LOOPBACK_TIMEOUT in newer Pythons 409 410 self.addCleanup(smtp.close) 411 smtp.sendmail("hi@me.com", "you@me.com", m) 412 with self.assertRaises(ValueError) as exc: 413 smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"]) 414 msg = str(exc.exception) 415 self.assertIn("prohibited newline characters", msg) 416 self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg) 417 # XXX (see comment in testSend) 418 time.sleep(0.01) 419 smtp.quit() 420 421 debugout = smtpd.DEBUGSTREAM.getvalue() 422 self.assertNotIn("X-OPTION", debugout) 423 self.assertNotIn("X-OPTION2", debugout) 424 self.assertNotIn("X-INJECTED-1", debugout) 425 self.assertNotIn("X-INJECTED-2", debugout) 426 427 def testSendNullSender(self): 428 m = 'A test message' 429 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 430 self.addCleanup(smtp.close) 431 smtp.sendmail('<>', 'Sally', m) 432 # XXX (see comment in testSend) 433 time.sleep(0.01) 434 smtp.quit() 435 436 self.client_evt.set() 437 self.serv_evt.wait() 438 self.output.flush() 439 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 440 self.assertEqual(self.output.getvalue(), mexpect) 441 debugout = smtpd.DEBUGSTREAM.getvalue() 442 sender = re.compile("^sender: <>$", re.MULTILINE) 443 self.assertRegex(debugout, sender) 444 445 def testSendMessage(self): 446 m = email.mime.text.MIMEText('A test message') 447 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 448 self.addCleanup(smtp.close) 449 smtp.send_message(m, from_addr='John', to_addrs='Sally') 450 # XXX (see comment in testSend) 451 time.sleep(0.01) 452 smtp.quit() 453 454 self.client_evt.set() 455 self.serv_evt.wait() 456 self.output.flush() 457 # Remove the X-Peer header that DebuggingServer adds as figuring out 458 # exactly what IP address format is put there is not easy (and 459 # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is 460 # not always the same as socket.gethostbyname(HOST). :( 461 test_output = self.get_output_without_xpeer() 462 del m['X-Peer'] 463 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 464 self.assertEqual(test_output, mexpect) 465 466 def testSendMessageWithAddresses(self): 467 m = email.mime.text.MIMEText('A test message') 468 m['From'] = 'foo@bar.com' 469 m['To'] = 'John' 470 m['CC'] = 'Sally, Fred' 471 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 472 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 473 self.addCleanup(smtp.close) 474 smtp.send_message(m) 475 # XXX (see comment in testSend) 476 time.sleep(0.01) 477 smtp.quit() 478 # make sure the Bcc header is still in the message. 479 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" ' 480 '<warped@silly.walks.com>') 481 482 self.client_evt.set() 483 self.serv_evt.wait() 484 self.output.flush() 485 # Remove the X-Peer header that DebuggingServer adds. 486 test_output = self.get_output_without_xpeer() 487 del m['X-Peer'] 488 # The Bcc header should not be transmitted. 489 del m['Bcc'] 490 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 491 self.assertEqual(test_output, mexpect) 492 debugout = smtpd.DEBUGSTREAM.getvalue() 493 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 494 self.assertRegex(debugout, sender) 495 for addr in ('John', 'Sally', 'Fred', 'root@localhost', 496 'warped@silly.walks.com'): 497 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 498 re.MULTILINE) 499 self.assertRegex(debugout, to_addr) 500 501 def testSendMessageWithSomeAddresses(self): 502 # Make sure nothing breaks if not all of the three 'to' headers exist 503 m = email.mime.text.MIMEText('A test message') 504 m['From'] = 'foo@bar.com' 505 m['To'] = 'John, Dinsdale' 506 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 507 self.addCleanup(smtp.close) 508 smtp.send_message(m) 509 # XXX (see comment in testSend) 510 time.sleep(0.01) 511 smtp.quit() 512 513 self.client_evt.set() 514 self.serv_evt.wait() 515 self.output.flush() 516 # Remove the X-Peer header that DebuggingServer adds. 517 test_output = self.get_output_without_xpeer() 518 del m['X-Peer'] 519 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 520 self.assertEqual(test_output, mexpect) 521 debugout = smtpd.DEBUGSTREAM.getvalue() 522 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 523 self.assertRegex(debugout, sender) 524 for addr in ('John', 'Dinsdale'): 525 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 526 re.MULTILINE) 527 self.assertRegex(debugout, to_addr) 528 529 def testSendMessageWithSpecifiedAddresses(self): 530 # Make sure addresses specified in call override those in message. 531 m = email.mime.text.MIMEText('A test message') 532 m['From'] = 'foo@bar.com' 533 m['To'] = 'John, Dinsdale' 534 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 535 self.addCleanup(smtp.close) 536 smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net') 537 # XXX (see comment in testSend) 538 time.sleep(0.01) 539 smtp.quit() 540 541 self.client_evt.set() 542 self.serv_evt.wait() 543 self.output.flush() 544 # Remove the X-Peer header that DebuggingServer adds. 545 test_output = self.get_output_without_xpeer() 546 del m['X-Peer'] 547 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 548 self.assertEqual(test_output, mexpect) 549 debugout = smtpd.DEBUGSTREAM.getvalue() 550 sender = re.compile("^sender: joe@example.com$", re.MULTILINE) 551 self.assertRegex(debugout, sender) 552 for addr in ('John', 'Dinsdale'): 553 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 554 re.MULTILINE) 555 self.assertNotRegex(debugout, to_addr) 556 recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE) 557 self.assertRegex(debugout, recip) 558 559 def testSendMessageWithMultipleFrom(self): 560 # Sender overrides To 561 m = email.mime.text.MIMEText('A test message') 562 m['From'] = 'Bernard, Bianca' 563 m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com' 564 m['To'] = 'John, Dinsdale' 565 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 566 self.addCleanup(smtp.close) 567 smtp.send_message(m) 568 # XXX (see comment in testSend) 569 time.sleep(0.01) 570 smtp.quit() 571 572 self.client_evt.set() 573 self.serv_evt.wait() 574 self.output.flush() 575 # Remove the X-Peer header that DebuggingServer adds. 576 test_output = self.get_output_without_xpeer() 577 del m['X-Peer'] 578 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 579 self.assertEqual(test_output, mexpect) 580 debugout = smtpd.DEBUGSTREAM.getvalue() 581 sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE) 582 self.assertRegex(debugout, sender) 583 for addr in ('John', 'Dinsdale'): 584 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 585 re.MULTILINE) 586 self.assertRegex(debugout, to_addr) 587 588 def testSendMessageResent(self): 589 m = email.mime.text.MIMEText('A test message') 590 m['From'] = 'foo@bar.com' 591 m['To'] = 'John' 592 m['CC'] = 'Sally, Fred' 593 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 594 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 595 m['Resent-From'] = 'holy@grail.net' 596 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 597 m['Resent-Bcc'] = 'doe@losthope.net' 598 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 599 self.addCleanup(smtp.close) 600 smtp.send_message(m) 601 # XXX (see comment in testSend) 602 time.sleep(0.01) 603 smtp.quit() 604 605 self.client_evt.set() 606 self.serv_evt.wait() 607 self.output.flush() 608 # The Resent-Bcc headers are deleted before serialization. 609 del m['Bcc'] 610 del m['Resent-Bcc'] 611 # Remove the X-Peer header that DebuggingServer adds. 612 test_output = self.get_output_without_xpeer() 613 del m['X-Peer'] 614 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 615 self.assertEqual(test_output, mexpect) 616 debugout = smtpd.DEBUGSTREAM.getvalue() 617 sender = re.compile("^sender: holy@grail.net$", re.MULTILINE) 618 self.assertRegex(debugout, sender) 619 for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'): 620 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 621 re.MULTILINE) 622 self.assertRegex(debugout, to_addr) 623 624 def testSendMessageMultipleResentRaises(self): 625 m = email.mime.text.MIMEText('A test message') 626 m['From'] = 'foo@bar.com' 627 m['To'] = 'John' 628 m['CC'] = 'Sally, Fred' 629 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 630 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 631 m['Resent-From'] = 'holy@grail.net' 632 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 633 m['Resent-Bcc'] = 'doe@losthope.net' 634 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000' 635 m['Resent-To'] = 'holy@grail.net' 636 m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff' 637 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 638 self.addCleanup(smtp.close) 639 with self.assertRaises(ValueError): 640 smtp.send_message(m) 641 smtp.close() 642 643class NonConnectingTests(unittest.TestCase): 644 645 def testNotConnected(self): 646 # Test various operations on an unconnected SMTP object that 647 # should raise exceptions (at present the attempt in SMTP.send 648 # to reference the nonexistent 'sock' attribute of the SMTP object 649 # causes an AttributeError) 650 smtp = smtplib.SMTP() 651 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) 652 self.assertRaises(smtplib.SMTPServerDisconnected, 653 smtp.send, 'test msg') 654 655 def testNonnumericPort(self): 656 # check that non-numeric port raises OSError 657 self.assertRaises(OSError, smtplib.SMTP, 658 "localhost", "bogus") 659 self.assertRaises(OSError, smtplib.SMTP, 660 "localhost:bogus") 661 662 def testSockAttributeExists(self): 663 # check that sock attribute is present outside of a connect() call 664 # (regression test, the previous behavior raised an 665 # AttributeError: 'SMTP' object has no attribute 'sock') 666 with smtplib.SMTP() as smtp: 667 self.assertIsNone(smtp.sock) 668 669 670class DefaultArgumentsTests(unittest.TestCase): 671 672 def setUp(self): 673 self.msg = EmailMessage() 674 self.msg['From'] = 'Páolo <főo@bar.com>' 675 self.smtp = smtplib.SMTP() 676 self.smtp.ehlo = Mock(return_value=(200, 'OK')) 677 self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock() 678 679 def testSendMessage(self): 680 expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME') 681 self.smtp.send_message(self.msg) 682 self.smtp.send_message(self.msg) 683 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 684 expected_mail_options) 685 self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3], 686 expected_mail_options) 687 688 def testSendMessageWithMailOptions(self): 689 mail_options = ['STARTTLS'] 690 expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME') 691 self.smtp.send_message(self.msg, None, None, mail_options) 692 self.assertEqual(mail_options, ['STARTTLS']) 693 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 694 expected_mail_options) 695 696 697# test response of client to a non-successful HELO message 698class BadHELOServerTests(unittest.TestCase): 699 700 def setUp(self): 701 smtplib.socket = mock_socket 702 mock_socket.reply_with(b"199 no hello for you!") 703 self.old_stdout = sys.stdout 704 self.output = io.StringIO() 705 sys.stdout = self.output 706 self.port = 25 707 708 def tearDown(self): 709 smtplib.socket = socket 710 sys.stdout = self.old_stdout 711 712 def testFailingHELO(self): 713 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 714 HOST, self.port, 'localhost', 3) 715 716 717class TooLongLineTests(unittest.TestCase): 718 respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n' 719 720 def setUp(self): 721 self.thread_key = threading_setup() 722 self.old_stdout = sys.stdout 723 self.output = io.StringIO() 724 sys.stdout = self.output 725 726 self.evt = threading.Event() 727 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 728 self.sock.settimeout(15) 729 self.port = support.bind_port(self.sock) 730 servargs = (self.evt, self.respdata, self.sock) 731 self.thread = threading.Thread(target=server, args=servargs) 732 self.thread.start() 733 self.evt.wait() 734 self.evt.clear() 735 736 def tearDown(self): 737 self.evt.wait() 738 sys.stdout = self.old_stdout 739 join_thread(self.thread) 740 del self.thread 741 self.doCleanups() 742 threading_cleanup(*self.thread_key) 743 744 def testLineTooLong(self): 745 self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, 746 HOST, self.port, 'localhost', 3) 747 748 749sim_users = {'Mr.A@somewhere.com':'John A', 750 'Ms.B@xn--fo-fka.com':'Sally B', 751 'Mrs.C@somewhereesle.com':'Ruth C', 752 } 753 754sim_auth = ('Mr.A@somewhere.com', 'somepassword') 755sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 756 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 757sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 758 'list-2':['Ms.B@xn--fo-fka.com',], 759 } 760 761# Simulated SMTP channel & server 762class ResponseException(Exception): pass 763class SimSMTPChannel(smtpd.SMTPChannel): 764 765 quit_response = None 766 mail_response = None 767 rcpt_response = None 768 data_response = None 769 rcpt_count = 0 770 rset_count = 0 771 disconnect = 0 772 AUTH = 99 # Add protocol state to enable auth testing. 773 authenticated_user = None 774 775 def __init__(self, extra_features, *args, **kw): 776 self._extrafeatures = ''.join( 777 [ "250-{0}\r\n".format(x) for x in extra_features ]) 778 super(SimSMTPChannel, self).__init__(*args, **kw) 779 780 # AUTH related stuff. It would be nice if support for this were in smtpd. 781 def found_terminator(self): 782 if self.smtp_state == self.AUTH: 783 line = self._emptystring.join(self.received_lines) 784 print('Data:', repr(line), file=smtpd.DEBUGSTREAM) 785 self.received_lines = [] 786 try: 787 self.auth_object(line) 788 except ResponseException as e: 789 self.smtp_state = self.COMMAND 790 self.push('%s %s' % (e.smtp_code, e.smtp_error)) 791 return 792 super().found_terminator() 793 794 795 def smtp_AUTH(self, arg): 796 if not self.seen_greeting: 797 self.push('503 Error: send EHLO first') 798 return 799 if not self.extended_smtp or 'AUTH' not in self._extrafeatures: 800 self.push('500 Error: command "AUTH" not recognized') 801 return 802 if self.authenticated_user is not None: 803 self.push( 804 '503 Bad sequence of commands: already authenticated') 805 return 806 args = arg.split() 807 if len(args) not in [1, 2]: 808 self.push('501 Syntax: AUTH <mechanism> [initial-response]') 809 return 810 auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') 811 try: 812 self.auth_object = getattr(self, auth_object_name) 813 except AttributeError: 814 self.push('504 Command parameter not implemented: unsupported ' 815 ' authentication mechanism {!r}'.format(auth_object_name)) 816 return 817 self.smtp_state = self.AUTH 818 self.auth_object(args[1] if len(args) == 2 else None) 819 820 def _authenticated(self, user, valid): 821 if valid: 822 self.authenticated_user = user 823 self.push('235 Authentication Succeeded') 824 else: 825 self.push('535 Authentication credentials invalid') 826 self.smtp_state = self.COMMAND 827 828 def _decode_base64(self, string): 829 return base64.decodebytes(string.encode('ascii')).decode('utf-8') 830 831 def _auth_plain(self, arg=None): 832 if arg is None: 833 self.push('334 ') 834 else: 835 logpass = self._decode_base64(arg) 836 try: 837 *_, user, password = logpass.split('\0') 838 except ValueError as e: 839 self.push('535 Splitting response {!r} into user and password' 840 ' failed: {}'.format(logpass, e)) 841 return 842 self._authenticated(user, password == sim_auth[1]) 843 844 def _auth_login(self, arg=None): 845 if arg is None: 846 # base64 encoded 'Username:' 847 self.push('334 VXNlcm5hbWU6') 848 elif not hasattr(self, '_auth_login_user'): 849 self._auth_login_user = self._decode_base64(arg) 850 # base64 encoded 'Password:' 851 self.push('334 UGFzc3dvcmQ6') 852 else: 853 password = self._decode_base64(arg) 854 self._authenticated(self._auth_login_user, password == sim_auth[1]) 855 del self._auth_login_user 856 857 def _auth_buggy(self, arg=None): 858 # This AUTH mechanism will 'trap' client in a neverending 334 859 # base64 encoded 'BuGgYbUgGy' 860 self.push('334 QnVHZ1liVWdHeQ==') 861 862 def _auth_cram_md5(self, arg=None): 863 if arg is None: 864 self.push('334 {}'.format(sim_cram_md5_challenge)) 865 else: 866 logpass = self._decode_base64(arg) 867 try: 868 user, hashed_pass = logpass.split() 869 except ValueError as e: 870 self.push('535 Splitting response {!r} into user and password ' 871 'failed: {}'.format(logpass, e)) 872 return False 873 valid_hashed_pass = hmac.HMAC( 874 sim_auth[1].encode('ascii'), 875 self._decode_base64(sim_cram_md5_challenge).encode('ascii'), 876 'md5').hexdigest() 877 self._authenticated(user, hashed_pass == valid_hashed_pass) 878 # end AUTH related stuff. 879 880 def smtp_EHLO(self, arg): 881 resp = ('250-testhost\r\n' 882 '250-EXPN\r\n' 883 '250-SIZE 20000000\r\n' 884 '250-STARTTLS\r\n' 885 '250-DELIVERBY\r\n') 886 resp = resp + self._extrafeatures + '250 HELP' 887 self.push(resp) 888 self.seen_greeting = arg 889 self.extended_smtp = True 890 891 def smtp_VRFY(self, arg): 892 # For max compatibility smtplib should be sending the raw address. 893 if arg in sim_users: 894 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 895 else: 896 self.push('550 No such user: %s' % arg) 897 898 def smtp_EXPN(self, arg): 899 list_name = arg.lower() 900 if list_name in sim_lists: 901 user_list = sim_lists[list_name] 902 for n, user_email in enumerate(user_list): 903 quoted_addr = smtplib.quoteaddr(user_email) 904 if n < len(user_list) - 1: 905 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 906 else: 907 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 908 else: 909 self.push('550 No access for you!') 910 911 def smtp_QUIT(self, arg): 912 if self.quit_response is None: 913 super(SimSMTPChannel, self).smtp_QUIT(arg) 914 else: 915 self.push(self.quit_response) 916 self.close_when_done() 917 918 def smtp_MAIL(self, arg): 919 if self.mail_response is None: 920 super().smtp_MAIL(arg) 921 else: 922 self.push(self.mail_response) 923 if self.disconnect: 924 self.close_when_done() 925 926 def smtp_RCPT(self, arg): 927 if self.rcpt_response is None: 928 super().smtp_RCPT(arg) 929 return 930 self.rcpt_count += 1 931 self.push(self.rcpt_response[self.rcpt_count-1]) 932 933 def smtp_RSET(self, arg): 934 self.rset_count += 1 935 super().smtp_RSET(arg) 936 937 def smtp_DATA(self, arg): 938 if self.data_response is None: 939 super().smtp_DATA(arg) 940 else: 941 self.push(self.data_response) 942 943 def handle_error(self): 944 raise 945 946 947class SimSMTPServer(smtpd.SMTPServer): 948 949 channel_class = SimSMTPChannel 950 951 def __init__(self, *args, **kw): 952 self._extra_features = [] 953 self._addresses = {} 954 smtpd.SMTPServer.__init__(self, *args, **kw) 955 956 def handle_accepted(self, conn, addr): 957 self._SMTPchannel = self.channel_class( 958 self._extra_features, self, conn, addr, 959 decode_data=self._decode_data) 960 961 def process_message(self, peer, mailfrom, rcpttos, data): 962 self._addresses['from'] = mailfrom 963 self._addresses['tos'] = rcpttos 964 965 def add_feature(self, feature): 966 self._extra_features.append(feature) 967 968 def handle_error(self): 969 raise 970 971 972# Test various SMTP & ESMTP commands/behaviors that require a simulated server 973# (i.e., something with more features than DebuggingServer) 974class SMTPSimTests(unittest.TestCase): 975 976 def setUp(self): 977 self.thread_key = threading_setup() 978 self.real_getfqdn = socket.getfqdn 979 socket.getfqdn = mock_socket.getfqdn 980 self.serv_evt = threading.Event() 981 self.client_evt = threading.Event() 982 # Pick a random unused port by passing 0 for the port number 983 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 984 # Keep a note of what port was assigned 985 self.port = self.serv.socket.getsockname()[1] 986 serv_args = (self.serv, self.serv_evt, self.client_evt) 987 self.thread = threading.Thread(target=debugging_server, args=serv_args) 988 self.thread.start() 989 990 # wait until server thread has assigned a port number 991 self.serv_evt.wait() 992 self.serv_evt.clear() 993 994 def tearDown(self): 995 socket.getfqdn = self.real_getfqdn 996 # indicate that the client is finished 997 self.client_evt.set() 998 # wait for the server thread to terminate 999 self.serv_evt.wait() 1000 join_thread(self.thread) 1001 del self.thread 1002 self.doCleanups() 1003 threading_cleanup(*self.thread_key) 1004 1005 def testBasic(self): 1006 # smoke test 1007 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1008 smtp.quit() 1009 1010 def testEHLO(self): 1011 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1012 1013 # no features should be present before the EHLO 1014 self.assertEqual(smtp.esmtp_features, {}) 1015 1016 # features expected from the test server 1017 expected_features = {'expn':'', 1018 'size': '20000000', 1019 'starttls': '', 1020 'deliverby': '', 1021 'help': '', 1022 } 1023 1024 smtp.ehlo() 1025 self.assertEqual(smtp.esmtp_features, expected_features) 1026 for k in expected_features: 1027 self.assertTrue(smtp.has_extn(k)) 1028 self.assertFalse(smtp.has_extn('unsupported-feature')) 1029 smtp.quit() 1030 1031 def testVRFY(self): 1032 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1033 1034 for addr_spec, name in sim_users.items(): 1035 expected_known = (250, bytes('%s %s' % 1036 (name, smtplib.quoteaddr(addr_spec)), 1037 "ascii")) 1038 self.assertEqual(smtp.vrfy(addr_spec), expected_known) 1039 1040 u = 'nobody@nowhere.com' 1041 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 1042 self.assertEqual(smtp.vrfy(u), expected_unknown) 1043 smtp.quit() 1044 1045 def testEXPN(self): 1046 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1047 1048 for listname, members in sim_lists.items(): 1049 users = [] 1050 for m in members: 1051 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 1052 expected_known = (250, bytes('\n'.join(users), "ascii")) 1053 self.assertEqual(smtp.expn(listname), expected_known) 1054 1055 u = 'PSU-Members-List' 1056 expected_unknown = (550, b'No access for you!') 1057 self.assertEqual(smtp.expn(u), expected_unknown) 1058 smtp.quit() 1059 1060 def testAUTH_PLAIN(self): 1061 self.serv.add_feature("AUTH PLAIN") 1062 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1063 resp = smtp.login(sim_auth[0], sim_auth[1]) 1064 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1065 smtp.close() 1066 1067 def testAUTH_LOGIN(self): 1068 self.serv.add_feature("AUTH LOGIN") 1069 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1070 resp = smtp.login(sim_auth[0], sim_auth[1]) 1071 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1072 smtp.close() 1073 1074 def testAUTH_LOGIN_initial_response_ok(self): 1075 self.serv.add_feature("AUTH LOGIN") 1076 with smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) as smtp: 1077 smtp.user, smtp.password = sim_auth 1078 smtp.ehlo("test_auth_login") 1079 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True) 1080 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1081 1082 def testAUTH_LOGIN_initial_response_notok(self): 1083 self.serv.add_feature("AUTH LOGIN") 1084 with smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) as smtp: 1085 smtp.user, smtp.password = sim_auth 1086 smtp.ehlo("test_auth_login") 1087 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False) 1088 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1089 1090 def testAUTH_BUGGY(self): 1091 self.serv.add_feature("AUTH BUGGY") 1092 1093 def auth_buggy(challenge=None): 1094 self.assertEqual(b"BuGgYbUgGy", challenge) 1095 return "\0" 1096 1097 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1098 try: 1099 smtp.user, smtp.password = sim_auth 1100 smtp.ehlo("test_auth_buggy") 1101 expect = r"^Server AUTH mechanism infinite loop.*" 1102 with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm: 1103 smtp.auth("BUGGY", auth_buggy, initial_response_ok=False) 1104 finally: 1105 smtp.close() 1106 1107 @requires_hashdigest('md5') 1108 def testAUTH_CRAM_MD5(self): 1109 self.serv.add_feature("AUTH CRAM-MD5") 1110 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1111 resp = smtp.login(sim_auth[0], sim_auth[1]) 1112 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1113 smtp.close() 1114 1115 def testAUTH_multiple(self): 1116 # Test that multiple authentication methods are tried. 1117 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 1118 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1119 resp = smtp.login(sim_auth[0], sim_auth[1]) 1120 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1121 smtp.close() 1122 1123 def test_auth_function(self): 1124 supported = {'PLAIN', 'LOGIN'} 1125 try: 1126 hashlib.md5() 1127 except ValueError: 1128 pass 1129 else: 1130 supported.add('CRAM-MD5') 1131 for mechanism in supported: 1132 self.serv.add_feature("AUTH {}".format(mechanism)) 1133 for mechanism in supported: 1134 with self.subTest(mechanism=mechanism): 1135 smtp = smtplib.SMTP(HOST, self.port, 1136 local_hostname='localhost', timeout=15) 1137 smtp.ehlo('foo') 1138 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 1139 method = 'auth_' + mechanism.lower().replace('-', '_') 1140 resp = smtp.auth(mechanism, getattr(smtp, method)) 1141 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1142 smtp.close() 1143 1144 def test_quit_resets_greeting(self): 1145 smtp = smtplib.SMTP(HOST, self.port, 1146 local_hostname='localhost', 1147 timeout=15) 1148 code, message = smtp.ehlo() 1149 self.assertEqual(code, 250) 1150 self.assertIn('size', smtp.esmtp_features) 1151 smtp.quit() 1152 self.assertNotIn('size', smtp.esmtp_features) 1153 smtp.connect(HOST, self.port) 1154 self.assertNotIn('size', smtp.esmtp_features) 1155 smtp.ehlo_or_helo_if_needed() 1156 self.assertIn('size', smtp.esmtp_features) 1157 smtp.quit() 1158 1159 def test_with_statement(self): 1160 with smtplib.SMTP(HOST, self.port) as smtp: 1161 code, message = smtp.noop() 1162 self.assertEqual(code, 250) 1163 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1164 with smtplib.SMTP(HOST, self.port) as smtp: 1165 smtp.close() 1166 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1167 1168 def test_with_statement_QUIT_failure(self): 1169 with self.assertRaises(smtplib.SMTPResponseException) as error: 1170 with smtplib.SMTP(HOST, self.port) as smtp: 1171 smtp.noop() 1172 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1173 self.assertEqual(error.exception.smtp_code, 421) 1174 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1175 1176 #TODO: add tests for correct AUTH method fallback now that the 1177 #test infrastructure can support it. 1178 1179 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1180 def test__rest_from_mail_cmd(self): 1181 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1182 smtp.noop() 1183 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1184 self.serv._SMTPchannel.disconnect = True 1185 with self.assertRaises(smtplib.SMTPSenderRefused): 1186 smtp.sendmail('John', 'Sally', 'test message') 1187 self.assertIsNone(smtp.sock) 1188 1189 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1190 def test_421_from_mail_cmd(self): 1191 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1192 smtp.noop() 1193 self.serv._SMTPchannel.mail_response = '421 closing connection' 1194 with self.assertRaises(smtplib.SMTPSenderRefused): 1195 smtp.sendmail('John', 'Sally', 'test message') 1196 self.assertIsNone(smtp.sock) 1197 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1198 1199 def test_421_from_rcpt_cmd(self): 1200 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1201 smtp.noop() 1202 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1203 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1204 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1205 self.assertIsNone(smtp.sock) 1206 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1207 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1208 1209 def test_421_from_data_cmd(self): 1210 class MySimSMTPChannel(SimSMTPChannel): 1211 def found_terminator(self): 1212 if self.smtp_state == self.DATA: 1213 self.push('421 closing') 1214 else: 1215 super().found_terminator() 1216 self.serv.channel_class = MySimSMTPChannel 1217 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1218 smtp.noop() 1219 with self.assertRaises(smtplib.SMTPDataError): 1220 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1221 self.assertIsNone(smtp.sock) 1222 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1223 1224 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1225 smtp = smtplib.SMTP( 1226 HOST, self.port, local_hostname='localhost', timeout=3) 1227 self.addCleanup(smtp.close) 1228 smtp.ehlo() 1229 self.assertTrue(smtp.does_esmtp) 1230 self.assertFalse(smtp.has_extn('smtputf8')) 1231 self.assertRaises( 1232 smtplib.SMTPNotSupportedError, 1233 smtp.sendmail, 1234 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1235 self.assertRaises( 1236 smtplib.SMTPNotSupportedError, 1237 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1238 1239 def test_send_unicode_without_SMTPUTF8(self): 1240 smtp = smtplib.SMTP( 1241 HOST, self.port, local_hostname='localhost', timeout=3) 1242 self.addCleanup(smtp.close) 1243 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1244 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1245 1246 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1247 # This test is located here and not in the SMTPUTF8SimTests 1248 # class because it needs a "regular" SMTP server to work 1249 msg = EmailMessage() 1250 msg['From'] = "Páolo <főo@bar.com>" 1251 msg['To'] = 'Dinsdale' 1252 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1253 smtp = smtplib.SMTP( 1254 HOST, self.port, local_hostname='localhost', timeout=3) 1255 self.addCleanup(smtp.close) 1256 with self.assertRaises(smtplib.SMTPNotSupportedError): 1257 smtp.send_message(msg) 1258 1259 def test_name_field_not_included_in_envelop_addresses(self): 1260 smtp = smtplib.SMTP( 1261 HOST, self.port, local_hostname='localhost', timeout=3 1262 ) 1263 self.addCleanup(smtp.close) 1264 1265 message = EmailMessage() 1266 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 1267 message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 1268 1269 self.assertDictEqual(smtp.send_message(message), {}) 1270 1271 self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 1272 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 1273 1274 1275class SimSMTPUTF8Server(SimSMTPServer): 1276 1277 def __init__(self, *args, **kw): 1278 # The base SMTP server turns these on automatically, but our test 1279 # server is set up to munge the EHLO response, so we need to provide 1280 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 1281 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1282 smtpd.SMTPServer.__init__(self, *args, **kw) 1283 1284 def handle_accepted(self, conn, addr): 1285 self._SMTPchannel = self.channel_class( 1286 self._extra_features, self, conn, addr, 1287 decode_data=self._decode_data, 1288 enable_SMTPUTF8=self.enable_SMTPUTF8, 1289 ) 1290 1291 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1292 rcpt_options=None): 1293 self.last_peer = peer 1294 self.last_mailfrom = mailfrom 1295 self.last_rcpttos = rcpttos 1296 self.last_message = data 1297 self.last_mail_options = mail_options 1298 self.last_rcpt_options = rcpt_options 1299 1300 1301class SMTPUTF8SimTests(unittest.TestCase): 1302 1303 maxDiff = None 1304 1305 def setUp(self): 1306 self.thread_key = threading_setup() 1307 self.real_getfqdn = socket.getfqdn 1308 socket.getfqdn = mock_socket.getfqdn 1309 self.serv_evt = threading.Event() 1310 self.client_evt = threading.Event() 1311 # Pick a random unused port by passing 0 for the port number 1312 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1313 decode_data=False, 1314 enable_SMTPUTF8=True) 1315 # Keep a note of what port was assigned 1316 self.port = self.serv.socket.getsockname()[1] 1317 serv_args = (self.serv, self.serv_evt, self.client_evt) 1318 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1319 self.thread.start() 1320 1321 # wait until server thread has assigned a port number 1322 self.serv_evt.wait() 1323 self.serv_evt.clear() 1324 1325 def tearDown(self): 1326 socket.getfqdn = self.real_getfqdn 1327 # indicate that the client is finished 1328 self.client_evt.set() 1329 # wait for the server thread to terminate 1330 self.serv_evt.wait() 1331 join_thread(self.thread) 1332 del self.thread 1333 self.doCleanups() 1334 threading_cleanup(*self.thread_key) 1335 1336 def test_test_server_supports_extensions(self): 1337 smtp = smtplib.SMTP( 1338 HOST, self.port, local_hostname='localhost', timeout=3) 1339 self.addCleanup(smtp.close) 1340 smtp.ehlo() 1341 self.assertTrue(smtp.does_esmtp) 1342 self.assertTrue(smtp.has_extn('smtputf8')) 1343 1344 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1345 m = '¡a test message containing unicode!'.encode('utf-8') 1346 smtp = smtplib.SMTP( 1347 HOST, self.port, local_hostname='localhost', timeout=3) 1348 self.addCleanup(smtp.close) 1349 smtp.sendmail('Jőhn', 'Sálly', m, 1350 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1351 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1352 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1353 self.assertEqual(self.serv.last_message, m) 1354 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1355 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1356 self.assertEqual(self.serv.last_rcpt_options, []) 1357 1358 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1359 m = '¡a test message containing unicode!'.encode('utf-8') 1360 smtp = smtplib.SMTP( 1361 HOST, self.port, local_hostname='localhost', timeout=3) 1362 self.addCleanup(smtp.close) 1363 smtp.ehlo() 1364 self.assertEqual( 1365 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1366 (250, b'OK')) 1367 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1368 self.assertEqual(smtp.data(m), (250, b'OK')) 1369 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1370 self.assertEqual(self.serv.last_rcpttos, ['János']) 1371 self.assertEqual(self.serv.last_message, m) 1372 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1373 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1374 self.assertEqual(self.serv.last_rcpt_options, []) 1375 1376 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1377 msg = EmailMessage() 1378 msg['From'] = "Páolo <főo@bar.com>" 1379 msg['To'] = 'Dinsdale' 1380 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1381 # XXX I don't know why I need two \n's here, but this is an existing 1382 # bug (if it is one) and not a problem with the new functionality. 1383 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1384 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1385 # we are successfully sending /r/n :(. 1386 expected = textwrap.dedent("""\ 1387 From: Páolo <főo@bar.com> 1388 To: Dinsdale 1389 Subject: Nudge nudge, wink, wink \u1F609 1390 Content-Type: text/plain; charset="utf-8" 1391 Content-Transfer-Encoding: 8bit 1392 MIME-Version: 1.0 1393 1394 oh là là, know what I mean, know what I mean? 1395 """) 1396 smtp = smtplib.SMTP( 1397 HOST, self.port, local_hostname='localhost', timeout=3) 1398 self.addCleanup(smtp.close) 1399 self.assertEqual(smtp.send_message(msg), {}) 1400 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1401 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1402 self.assertEqual(self.serv.last_message.decode(), expected) 1403 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1404 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1405 self.assertEqual(self.serv.last_rcpt_options, []) 1406 1407 1408EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1409 1410class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1411 def smtp_AUTH(self, arg): 1412 # RFC 4954's AUTH command allows for an optional initial-response. 1413 # Not all AUTH methods support this; some require a challenge. AUTH 1414 # PLAIN does those, so test that here. See issue #15014. 1415 args = arg.split() 1416 if args[0].lower() == 'plain': 1417 if len(args) == 2: 1418 # AUTH PLAIN <initial-response> with the response base 64 1419 # encoded. Hard code the expected response for the test. 1420 if args[1] == EXPECTED_RESPONSE: 1421 self.push('235 Ok') 1422 return 1423 self.push('571 Bad authentication') 1424 1425class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1426 channel_class = SimSMTPAUTHInitialResponseChannel 1427 1428 1429class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1430 def setUp(self): 1431 self.thread_key = threading_setup() 1432 self.real_getfqdn = socket.getfqdn 1433 socket.getfqdn = mock_socket.getfqdn 1434 self.serv_evt = threading.Event() 1435 self.client_evt = threading.Event() 1436 # Pick a random unused port by passing 0 for the port number 1437 self.serv = SimSMTPAUTHInitialResponseServer( 1438 (HOST, 0), ('nowhere', -1), decode_data=True) 1439 # Keep a note of what port was assigned 1440 self.port = self.serv.socket.getsockname()[1] 1441 serv_args = (self.serv, self.serv_evt, self.client_evt) 1442 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1443 self.thread.start() 1444 1445 # wait until server thread has assigned a port number 1446 self.serv_evt.wait() 1447 self.serv_evt.clear() 1448 1449 def tearDown(self): 1450 socket.getfqdn = self.real_getfqdn 1451 # indicate that the client is finished 1452 self.client_evt.set() 1453 # wait for the server thread to terminate 1454 self.serv_evt.wait() 1455 join_thread(self.thread) 1456 del self.thread 1457 self.doCleanups() 1458 threading_cleanup(*self.thread_key) 1459 1460 def testAUTH_PLAIN_initial_response_login(self): 1461 self.serv.add_feature('AUTH PLAIN') 1462 smtp = smtplib.SMTP(HOST, self.port, 1463 local_hostname='localhost', timeout=15) 1464 smtp.login('psu', 'doesnotexist') 1465 smtp.close() 1466 1467 def testAUTH_PLAIN_initial_response_auth(self): 1468 self.serv.add_feature('AUTH PLAIN') 1469 smtp = smtplib.SMTP(HOST, self.port, 1470 local_hostname='localhost', timeout=15) 1471 smtp.user = 'psu' 1472 smtp.password = 'doesnotexist' 1473 code, response = smtp.auth('plain', smtp.auth_plain) 1474 smtp.close() 1475 self.assertEqual(code, 235) 1476 1477 1478if __name__ == '__main__': 1479 unittest.main() 1480