import asyncore
import base64
import email.mime.text
from email.message import EmailMessage
from email.base64mime import body_encode as encode_base64
import email.utils
import hmac
import socket
import smtpd
import smtplib
import io
import re
import sys
import time
import select
import errno
import textwrap
import threading

import unittest
from test import support, mock_socket
from test.support import HOST, HOSTv4, HOSTv6
from unittest.mock import Mock


if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    smtpd.SMTPChannel.handle_expt = handle_expt


def server(evt, buf, serv):
    """Accept one connection on *serv* and send it *buf*.

    evt  -- threading.Event; set once when the socket is listening and
            set again when the function is about to return.
    buf  -- bytes to write to the accepted connection.
    serv -- bound (not yet listening) socket; always closed on exit.

    A socket.timeout raised by accept() is swallowed so that a client
    that never connects does not turn into a thread traceback.
    """
    serv.listen()
    evt.set()
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        try:
            # Bounded number of attempts so a stuck peer cannot keep the
            # thread alive forever.
            n = 500
            while buf and n > 0:
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                n -= 1
        finally:
            # Close the connection even when send() raises (for example
            # because the client hung up early); previously it leaked.
            conn.close()
    finally:
        serv.close()
        evt.set()


class GeneralTests(unittest.TestCase):
    # Client-side tests that run against mock_socket instead of a real
    # server.  Comments (not docstrings) are used on the test methods so
    # that unittest's verbose output keeps showing the method names.

    def setUp(self):
        # Route every socket call made by smtplib through the mock.
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port)
        smtp.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port,
                            source_address=('127.0.0.1', 19876))
        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
        smtp.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = smtplib.SMTP(HOST, self.port)
        finally:
            # Always restore the default so other tests are unaffected.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def test_debuglevel(self):
        mock_socket.reply_with(b"220 Hello world")
        smtp = smtplib.SMTP()
        smtp.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            smtp.connect(HOST, self.port)
        smtp.close()
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        mock_socket.reply_with(b"220 Hello world")
        smtp = smtplib.SMTP()
        smtp.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            smtp.connect(HOST, self.port)
        smtp.close()
        # At debug level 2 each line is expected to carry a timestamp prefix.
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)


# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Run the asyncore loop for *serv* until the client is done.

    serv       -- server object; closed before returning.
    serv_evt   -- threading.Event; set once on entry and once on exit.
    client_evt -- threading.Event the test sets when its conversation
                  with the server is finished.
    """
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        # Bounded number of polls so a test that never sets client_evt
        # cannot keep the thread alive forever.
        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()


MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.
189 190# Test behavior of smtpd.DebuggingServer 191class DebuggingServerTests(unittest.TestCase): 192 193 maxDiff = None 194 195 def setUp(self): 196 self.real_getfqdn = socket.getfqdn 197 socket.getfqdn = mock_socket.getfqdn 198 # temporarily replace sys.stdout to capture DebuggingServer output 199 self.old_stdout = sys.stdout 200 self.output = io.StringIO() 201 sys.stdout = self.output 202 203 self.serv_evt = threading.Event() 204 self.client_evt = threading.Event() 205 # Capture SMTPChannel debug output 206 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM 207 smtpd.DEBUGSTREAM = io.StringIO() 208 # Pick a random unused port by passing 0 for the port number 209 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), 210 decode_data=True) 211 # Keep a note of what server host and port were assigned 212 self.host, self.port = self.serv.socket.getsockname()[:2] 213 serv_args = (self.serv, self.serv_evt, self.client_evt) 214 self.thread = threading.Thread(target=debugging_server, args=serv_args) 215 self.thread.start() 216 217 # wait until server thread has assigned a port number 218 self.serv_evt.wait() 219 self.serv_evt.clear() 220 221 def tearDown(self): 222 socket.getfqdn = self.real_getfqdn 223 # indicate that the client is finished 224 self.client_evt.set() 225 # wait for the server thread to terminate 226 self.serv_evt.wait() 227 self.thread.join() 228 # restore sys.stdout 229 sys.stdout = self.old_stdout 230 # restore DEBUGSTREAM 231 smtpd.DEBUGSTREAM.close() 232 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM 233 234 def get_output_without_xpeer(self): 235 test_output = self.output.getvalue() 236 return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2', 237 test_output, flags=re.MULTILINE|re.DOTALL) 238 239 def testBasic(self): 240 # connect 241 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 242 smtp.quit() 243 244 def testSourceAddress(self): 245 # connect 246 src_port = support.find_unused_port() 247 try: 248 smtp = smtplib.SMTP(self.host, 
self.port, local_hostname='localhost', 249 timeout=3, source_address=(self.host, src_port)) 250 self.assertEqual(smtp.source_address, (self.host, src_port)) 251 self.assertEqual(smtp.local_hostname, 'localhost') 252 smtp.quit() 253 except OSError as e: 254 if e.errno == errno.EADDRINUSE: 255 self.skipTest("couldn't bind to source port %d" % src_port) 256 raise 257 258 def testNOOP(self): 259 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 260 expected = (250, b'OK') 261 self.assertEqual(smtp.noop(), expected) 262 smtp.quit() 263 264 def testRSET(self): 265 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 266 expected = (250, b'OK') 267 self.assertEqual(smtp.rset(), expected) 268 smtp.quit() 269 270 def testELHO(self): 271 # EHLO isn't implemented in DebuggingServer 272 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 273 expected = (250, b'\nSIZE 33554432\nHELP') 274 self.assertEqual(smtp.ehlo(), expected) 275 smtp.quit() 276 277 def testEXPNNotImplemented(self): 278 # EXPN isn't implemented in DebuggingServer 279 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 280 expected = (502, b'EXPN not implemented') 281 smtp.putcmd('EXPN') 282 self.assertEqual(smtp.getreply(), expected) 283 smtp.quit() 284 285 def test_issue43124_putcmd_escapes_newline(self): 286 # see: https://bugs.python.org/issue43124 287 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 288 timeout=10) # support.LOOPBACK_TIMEOUT in newer Pythons 289 self.addCleanup(smtp.close) 290 with self.assertRaises(ValueError) as exc: 291 smtp.putcmd('helo\nX-INJECTED') 292 self.assertIn("prohibited newline characters", str(exc.exception)) 293 smtp.quit() 294 295 def testVRFY(self): 296 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 297 expected = (252, b'Cannot VRFY user, but will accept message ' + \ 298 b'and attempt delivery') 299 
self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) 300 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) 301 smtp.quit() 302 303 def testSecondHELO(self): 304 # check that a second HELO returns a message that it's a duplicate 305 # (this behavior is specific to smtpd.SMTPChannel) 306 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 307 smtp.helo() 308 expected = (503, b'Duplicate HELO/EHLO') 309 self.assertEqual(smtp.helo(), expected) 310 smtp.quit() 311 312 def testHELP(self): 313 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 314 self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ 315 b'RCPT DATA RSET NOOP QUIT VRFY') 316 smtp.quit() 317 318 def testSend(self): 319 # connect and send mail 320 m = 'A test message' 321 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 322 smtp.sendmail('John', 'Sally', m) 323 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor 324 # in asyncore. This sleep might help, but should really be fixed 325 # properly by using an Event variable. 326 time.sleep(0.01) 327 smtp.quit() 328 329 self.client_evt.set() 330 self.serv_evt.wait() 331 self.output.flush() 332 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 333 self.assertEqual(self.output.getvalue(), mexpect) 334 335 def testSendBinary(self): 336 m = b'A test message' 337 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 338 smtp.sendmail('John', 'Sally', m) 339 # XXX (see comment in testSend) 340 time.sleep(0.01) 341 smtp.quit() 342 343 self.client_evt.set() 344 self.serv_evt.wait() 345 self.output.flush() 346 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) 347 self.assertEqual(self.output.getvalue(), mexpect) 348 349 def testSendNeedingDotQuote(self): 350 # Issue 12283 351 m = '.A test\n.mes.sage.' 
352 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 353 smtp.sendmail('John', 'Sally', m) 354 # XXX (see comment in testSend) 355 time.sleep(0.01) 356 smtp.quit() 357 358 self.client_evt.set() 359 self.serv_evt.wait() 360 self.output.flush() 361 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 362 self.assertEqual(self.output.getvalue(), mexpect) 363 364 def test_issue43124_escape_localhostname(self): 365 # see: https://bugs.python.org/issue43124 366 # connect and send mail 367 m = 'wazzuuup\nlinetwo' 368 smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED', 369 timeout=10) # support.LOOPBACK_TIMEOUT in newer Pythons 370 self.addCleanup(smtp.close) 371 with self.assertRaises(ValueError) as exc: 372 smtp.sendmail("hi@me.com", "you@me.com", m) 373 self.assertIn( 374 "prohibited newline characters: ehlo hi\\nX-INJECTED", 375 str(exc.exception), 376 ) 377 # XXX (see comment in testSend) 378 time.sleep(0.01) 379 smtp.quit() 380 381 debugout = smtpd.DEBUGSTREAM.getvalue() 382 self.assertNotIn("X-INJECTED", debugout) 383 384 def test_issue43124_escape_options(self): 385 # see: https://bugs.python.org/issue43124 386 # connect and send mail 387 m = 'wazzuuup\nlinetwo' 388 smtp = smtplib.SMTP( 389 HOST, self.port, local_hostname='localhost', 390 timeout=10) # support.LOOPBACK_TIMEOUT in newer Pythons 391 392 self.addCleanup(smtp.close) 393 smtp.sendmail("hi@me.com", "you@me.com", m) 394 with self.assertRaises(ValueError) as exc: 395 smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"]) 396 msg = str(exc.exception) 397 self.assertIn("prohibited newline characters", msg) 398 self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg) 399 # XXX (see comment in testSend) 400 time.sleep(0.01) 401 smtp.quit() 402 403 debugout = smtpd.DEBUGSTREAM.getvalue() 404 self.assertNotIn("X-OPTION", debugout) 405 self.assertNotIn("X-OPTION2", debugout) 406 self.assertNotIn("X-INJECTED-1", debugout) 407 
self.assertNotIn("X-INJECTED-2", debugout) 408 409 def testSendNullSender(self): 410 m = 'A test message' 411 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 412 smtp.sendmail('<>', 'Sally', m) 413 # XXX (see comment in testSend) 414 time.sleep(0.01) 415 smtp.quit() 416 417 self.client_evt.set() 418 self.serv_evt.wait() 419 self.output.flush() 420 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 421 self.assertEqual(self.output.getvalue(), mexpect) 422 debugout = smtpd.DEBUGSTREAM.getvalue() 423 sender = re.compile("^sender: <>$", re.MULTILINE) 424 self.assertRegex(debugout, sender) 425 426 def testSendMessage(self): 427 m = email.mime.text.MIMEText('A test message') 428 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 429 smtp.send_message(m, from_addr='John', to_addrs='Sally') 430 # XXX (see comment in testSend) 431 time.sleep(0.01) 432 smtp.quit() 433 434 self.client_evt.set() 435 self.serv_evt.wait() 436 self.output.flush() 437 # Remove the X-Peer header that DebuggingServer adds as figuring out 438 # exactly what IP address format is put there is not easy (and 439 # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is 440 # not always the same as socket.gethostbyname(HOST). :( 441 test_output = self.get_output_without_xpeer() 442 del m['X-Peer'] 443 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 444 self.assertEqual(test_output, mexpect) 445 446 def testSendMessageWithAddresses(self): 447 m = email.mime.text.MIMEText('A test message') 448 m['From'] = 'foo@bar.com' 449 m['To'] = 'John' 450 m['CC'] = 'Sally, Fred' 451 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 452 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 453 smtp.send_message(m) 454 # XXX (see comment in testSend) 455 time.sleep(0.01) 456 smtp.quit() 457 # make sure the Bcc header is still in the message. 
458 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" ' 459 '<warped@silly.walks.com>') 460 461 self.client_evt.set() 462 self.serv_evt.wait() 463 self.output.flush() 464 # Remove the X-Peer header that DebuggingServer adds. 465 test_output = self.get_output_without_xpeer() 466 del m['X-Peer'] 467 # The Bcc header should not be transmitted. 468 del m['Bcc'] 469 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 470 self.assertEqual(test_output, mexpect) 471 debugout = smtpd.DEBUGSTREAM.getvalue() 472 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 473 self.assertRegex(debugout, sender) 474 for addr in ('John', 'Sally', 'Fred', 'root@localhost', 475 'warped@silly.walks.com'): 476 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 477 re.MULTILINE) 478 self.assertRegex(debugout, to_addr) 479 480 def testSendMessageWithSomeAddresses(self): 481 # Make sure nothing breaks if not all of the three 'to' headers exist 482 m = email.mime.text.MIMEText('A test message') 483 m['From'] = 'foo@bar.com' 484 m['To'] = 'John, Dinsdale' 485 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 486 smtp.send_message(m) 487 # XXX (see comment in testSend) 488 time.sleep(0.01) 489 smtp.quit() 490 491 self.client_evt.set() 492 self.serv_evt.wait() 493 self.output.flush() 494 # Remove the X-Peer header that DebuggingServer adds. 495 test_output = self.get_output_without_xpeer() 496 del m['X-Peer'] 497 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 498 self.assertEqual(test_output, mexpect) 499 debugout = smtpd.DEBUGSTREAM.getvalue() 500 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 501 self.assertRegex(debugout, sender) 502 for addr in ('John', 'Dinsdale'): 503 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 504 re.MULTILINE) 505 self.assertRegex(debugout, to_addr) 506 507 def testSendMessageWithSpecifiedAddresses(self): 508 # Make sure addresses specified in call override those in message. 
509 m = email.mime.text.MIMEText('A test message') 510 m['From'] = 'foo@bar.com' 511 m['To'] = 'John, Dinsdale' 512 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 513 smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net') 514 # XXX (see comment in testSend) 515 time.sleep(0.01) 516 smtp.quit() 517 518 self.client_evt.set() 519 self.serv_evt.wait() 520 self.output.flush() 521 # Remove the X-Peer header that DebuggingServer adds. 522 test_output = self.get_output_without_xpeer() 523 del m['X-Peer'] 524 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 525 self.assertEqual(test_output, mexpect) 526 debugout = smtpd.DEBUGSTREAM.getvalue() 527 sender = re.compile("^sender: joe@example.com$", re.MULTILINE) 528 self.assertRegex(debugout, sender) 529 for addr in ('John', 'Dinsdale'): 530 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 531 re.MULTILINE) 532 self.assertNotRegex(debugout, to_addr) 533 recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE) 534 self.assertRegex(debugout, recip) 535 536 def testSendMessageWithMultipleFrom(self): 537 # Sender overrides To 538 m = email.mime.text.MIMEText('A test message') 539 m['From'] = 'Bernard, Bianca' 540 m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com' 541 m['To'] = 'John, Dinsdale' 542 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 543 smtp.send_message(m) 544 # XXX (see comment in testSend) 545 time.sleep(0.01) 546 smtp.quit() 547 548 self.client_evt.set() 549 self.serv_evt.wait() 550 self.output.flush() 551 # Remove the X-Peer header that DebuggingServer adds. 
552 test_output = self.get_output_without_xpeer() 553 del m['X-Peer'] 554 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 555 self.assertEqual(test_output, mexpect) 556 debugout = smtpd.DEBUGSTREAM.getvalue() 557 sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE) 558 self.assertRegex(debugout, sender) 559 for addr in ('John', 'Dinsdale'): 560 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 561 re.MULTILINE) 562 self.assertRegex(debugout, to_addr) 563 564 def testSendMessageResent(self): 565 m = email.mime.text.MIMEText('A test message') 566 m['From'] = 'foo@bar.com' 567 m['To'] = 'John' 568 m['CC'] = 'Sally, Fred' 569 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 570 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 571 m['Resent-From'] = 'holy@grail.net' 572 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 573 m['Resent-Bcc'] = 'doe@losthope.net' 574 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 575 smtp.send_message(m) 576 # XXX (see comment in testSend) 577 time.sleep(0.01) 578 smtp.quit() 579 580 self.client_evt.set() 581 self.serv_evt.wait() 582 self.output.flush() 583 # The Resent-Bcc headers are deleted before serialization. 584 del m['Bcc'] 585 del m['Resent-Bcc'] 586 # Remove the X-Peer header that DebuggingServer adds. 
587 test_output = self.get_output_without_xpeer() 588 del m['X-Peer'] 589 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 590 self.assertEqual(test_output, mexpect) 591 debugout = smtpd.DEBUGSTREAM.getvalue() 592 sender = re.compile("^sender: holy@grail.net$", re.MULTILINE) 593 self.assertRegex(debugout, sender) 594 for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'): 595 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 596 re.MULTILINE) 597 self.assertRegex(debugout, to_addr) 598 599 def testSendMessageMultipleResentRaises(self): 600 m = email.mime.text.MIMEText('A test message') 601 m['From'] = 'foo@bar.com' 602 m['To'] = 'John' 603 m['CC'] = 'Sally, Fred' 604 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 605 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 606 m['Resent-From'] = 'holy@grail.net' 607 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 608 m['Resent-Bcc'] = 'doe@losthope.net' 609 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000' 610 m['Resent-To'] = 'holy@grail.net' 611 m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff' 612 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) 613 with self.assertRaises(ValueError): 614 smtp.send_message(m) 615 smtp.close() 616 617class NonConnectingTests(unittest.TestCase): 618 619 def testNotConnected(self): 620 # Test various operations on an unconnected SMTP object that 621 # should raise exceptions (at present the attempt in SMTP.send 622 # to reference the nonexistent 'sock' attribute of the SMTP object 623 # causes an AttributeError) 624 smtp = smtplib.SMTP() 625 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) 626 self.assertRaises(smtplib.SMTPServerDisconnected, 627 smtp.send, 'test msg') 628 629 def testNonnumericPort(self): 630 # check that non-numeric port raises OSError 631 self.assertRaises(OSError, smtplib.SMTP, 632 "localhost", "bogus") 633 self.assertRaises(OSError, smtplib.SMTP, 
634 "localhost:bogus") 635 636 637class DefaultArgumentsTests(unittest.TestCase): 638 639 def setUp(self): 640 self.msg = EmailMessage() 641 self.msg['From'] = 'Páolo <főo@bar.com>' 642 self.smtp = smtplib.SMTP() 643 self.smtp.ehlo = Mock(return_value=(200, 'OK')) 644 self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock() 645 646 def testSendMessage(self): 647 expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME') 648 self.smtp.send_message(self.msg) 649 self.smtp.send_message(self.msg) 650 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 651 expected_mail_options) 652 self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3], 653 expected_mail_options) 654 655 def testSendMessageWithMailOptions(self): 656 mail_options = ['STARTTLS'] 657 expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME') 658 self.smtp.send_message(self.msg, None, None, mail_options) 659 self.assertEqual(mail_options, ['STARTTLS']) 660 self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 661 expected_mail_options) 662 663 664# test response of client to a non-successful HELO message 665class BadHELOServerTests(unittest.TestCase): 666 667 def setUp(self): 668 smtplib.socket = mock_socket 669 mock_socket.reply_with(b"199 no hello for you!") 670 self.old_stdout = sys.stdout 671 self.output = io.StringIO() 672 sys.stdout = self.output 673 self.port = 25 674 675 def tearDown(self): 676 smtplib.socket = socket 677 sys.stdout = self.old_stdout 678 679 def testFailingHELO(self): 680 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 681 HOST, self.port, 'localhost', 3) 682 683 684class TooLongLineTests(unittest.TestCase): 685 respdata = b'250 OK' + (b'.' 
* smtplib._MAXLINE * 2) + b'\n' 686 687 def setUp(self): 688 self.old_stdout = sys.stdout 689 self.output = io.StringIO() 690 sys.stdout = self.output 691 692 self.evt = threading.Event() 693 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 694 self.sock.settimeout(15) 695 self.port = support.bind_port(self.sock) 696 servargs = (self.evt, self.respdata, self.sock) 697 thread = threading.Thread(target=server, args=servargs) 698 thread.start() 699 self.addCleanup(thread.join) 700 self.evt.wait() 701 self.evt.clear() 702 703 def tearDown(self): 704 self.evt.wait() 705 sys.stdout = self.old_stdout 706 707 def testLineTooLong(self): 708 self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, 709 HOST, self.port, 'localhost', 3) 710 711 712sim_users = {'Mr.A@somewhere.com':'John A', 713 'Ms.B@xn--fo-fka.com':'Sally B', 714 'Mrs.C@somewhereesle.com':'Ruth C', 715 } 716 717sim_auth = ('Mr.A@somewhere.com', 'somepassword') 718sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 719 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 720sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 721 'list-2':['Ms.B@xn--fo-fka.com',], 722 } 723 724# Simulated SMTP channel & server 725class ResponseException(Exception): pass 726class SimSMTPChannel(smtpd.SMTPChannel): 727 728 quit_response = None 729 mail_response = None 730 rcpt_response = None 731 data_response = None 732 rcpt_count = 0 733 rset_count = 0 734 disconnect = 0 735 AUTH = 99 # Add protocol state to enable auth testing. 736 authenticated_user = None 737 738 def __init__(self, extra_features, *args, **kw): 739 self._extrafeatures = ''.join( 740 [ "250-{0}\r\n".format(x) for x in extra_features ]) 741 super(SimSMTPChannel, self).__init__(*args, **kw) 742 743 # AUTH related stuff. It would be nice if support for this were in smtpd. 
744 def found_terminator(self): 745 if self.smtp_state == self.AUTH: 746 line = self._emptystring.join(self.received_lines) 747 print('Data:', repr(line), file=smtpd.DEBUGSTREAM) 748 self.received_lines = [] 749 try: 750 self.auth_object(line) 751 except ResponseException as e: 752 self.smtp_state = self.COMMAND 753 self.push('%s %s' % (e.smtp_code, e.smtp_error)) 754 return 755 super().found_terminator() 756 757 758 def smtp_AUTH(self, arg): 759 if not self.seen_greeting: 760 self.push('503 Error: send EHLO first') 761 return 762 if not self.extended_smtp or 'AUTH' not in self._extrafeatures: 763 self.push('500 Error: command "AUTH" not recognized') 764 return 765 if self.authenticated_user is not None: 766 self.push( 767 '503 Bad sequence of commands: already authenticated') 768 return 769 args = arg.split() 770 if len(args) not in [1, 2]: 771 self.push('501 Syntax: AUTH <mechanism> [initial-response]') 772 return 773 auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') 774 try: 775 self.auth_object = getattr(self, auth_object_name) 776 except AttributeError: 777 self.push('504 Command parameter not implemented: unsupported ' 778 ' authentication mechanism {!r}'.format(auth_object_name)) 779 return 780 self.smtp_state = self.AUTH 781 self.auth_object(args[1] if len(args) == 2 else None) 782 783 def _authenticated(self, user, valid): 784 if valid: 785 self.authenticated_user = user 786 self.push('235 Authentication Succeeded') 787 else: 788 self.push('535 Authentication credentials invalid') 789 self.smtp_state = self.COMMAND 790 791 def _decode_base64(self, string): 792 return base64.decodebytes(string.encode('ascii')).decode('utf-8') 793 794 def _auth_plain(self, arg=None): 795 if arg is None: 796 self.push('334 ') 797 else: 798 logpass = self._decode_base64(arg) 799 try: 800 *_, user, password = logpass.split('\0') 801 except ValueError as e: 802 self.push('535 Splitting response {!r} into user and password' 803 ' failed: {}'.format(logpass, e)) 
804 return 805 self._authenticated(user, password == sim_auth[1]) 806 807 def _auth_login(self, arg=None): 808 if arg is None: 809 # base64 encoded 'Username:' 810 self.push('334 VXNlcm5hbWU6') 811 elif not hasattr(self, '_auth_login_user'): 812 self._auth_login_user = self._decode_base64(arg) 813 # base64 encoded 'Password:' 814 self.push('334 UGFzc3dvcmQ6') 815 else: 816 password = self._decode_base64(arg) 817 self._authenticated(self._auth_login_user, password == sim_auth[1]) 818 del self._auth_login_user 819 820 def _auth_cram_md5(self, arg=None): 821 if arg is None: 822 self.push('334 {}'.format(sim_cram_md5_challenge)) 823 else: 824 logpass = self._decode_base64(arg) 825 try: 826 user, hashed_pass = logpass.split() 827 except ValueError as e: 828 self.push('535 Splitting response {!r} into user and password ' 829 'failed: {}'.format(logpass, e)) 830 return False 831 valid_hashed_pass = hmac.HMAC( 832 sim_auth[1].encode('ascii'), 833 self._decode_base64(sim_cram_md5_challenge).encode('ascii'), 834 'md5').hexdigest() 835 self._authenticated(user, hashed_pass == valid_hashed_pass) 836 # end AUTH related stuff. 837 838 def smtp_EHLO(self, arg): 839 resp = ('250-testhost\r\n' 840 '250-EXPN\r\n' 841 '250-SIZE 20000000\r\n' 842 '250-STARTTLS\r\n' 843 '250-DELIVERBY\r\n') 844 resp = resp + self._extrafeatures + '250 HELP' 845 self.push(resp) 846 self.seen_greeting = arg 847 self.extended_smtp = True 848 849 def smtp_VRFY(self, arg): 850 # For max compatibility smtplib should be sending the raw address. 
851 if arg in sim_users: 852 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 853 else: 854 self.push('550 No such user: %s' % arg) 855 856 def smtp_EXPN(self, arg): 857 list_name = arg.lower() 858 if list_name in sim_lists: 859 user_list = sim_lists[list_name] 860 for n, user_email in enumerate(user_list): 861 quoted_addr = smtplib.quoteaddr(user_email) 862 if n < len(user_list) - 1: 863 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 864 else: 865 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 866 else: 867 self.push('550 No access for you!') 868 869 def smtp_QUIT(self, arg): 870 if self.quit_response is None: 871 super(SimSMTPChannel, self).smtp_QUIT(arg) 872 else: 873 self.push(self.quit_response) 874 self.close_when_done() 875 876 def smtp_MAIL(self, arg): 877 if self.mail_response is None: 878 super().smtp_MAIL(arg) 879 else: 880 self.push(self.mail_response) 881 if self.disconnect: 882 self.close_when_done() 883 884 def smtp_RCPT(self, arg): 885 if self.rcpt_response is None: 886 super().smtp_RCPT(arg) 887 return 888 self.rcpt_count += 1 889 self.push(self.rcpt_response[self.rcpt_count-1]) 890 891 def smtp_RSET(self, arg): 892 self.rset_count += 1 893 super().smtp_RSET(arg) 894 895 def smtp_DATA(self, arg): 896 if self.data_response is None: 897 super().smtp_DATA(arg) 898 else: 899 self.push(self.data_response) 900 901 def handle_error(self): 902 raise 903 904 905class SimSMTPServer(smtpd.SMTPServer): 906 907 channel_class = SimSMTPChannel 908 909 def __init__(self, *args, **kw): 910 self._extra_features = [] 911 self._addresses = {} 912 smtpd.SMTPServer.__init__(self, *args, **kw) 913 914 def handle_accepted(self, conn, addr): 915 self._SMTPchannel = self.channel_class( 916 self._extra_features, self, conn, addr, 917 decode_data=self._decode_data) 918 919 def process_message(self, peer, mailfrom, rcpttos, data): 920 self._addresses['from'] = mailfrom 921 self._addresses['tos'] = rcpttos 922 923 def 
add_feature(self, feature): 924 self._extra_features.append(feature) 925 926 def handle_error(self): 927 raise 928 929 930# Test various SMTP & ESMTP commands/behaviors that require a simulated server 931# (i.e., something with more features than DebuggingServer) 932class SMTPSimTests(unittest.TestCase): 933 934 def setUp(self): 935 self.real_getfqdn = socket.getfqdn 936 socket.getfqdn = mock_socket.getfqdn 937 self.serv_evt = threading.Event() 938 self.client_evt = threading.Event() 939 # Pick a random unused port by passing 0 for the port number 940 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 941 # Keep a note of what port was assigned 942 self.port = self.serv.socket.getsockname()[1] 943 serv_args = (self.serv, self.serv_evt, self.client_evt) 944 self.thread = threading.Thread(target=debugging_server, args=serv_args) 945 self.thread.start() 946 947 # wait until server thread has assigned a port number 948 self.serv_evt.wait() 949 self.serv_evt.clear() 950 951 def tearDown(self): 952 socket.getfqdn = self.real_getfqdn 953 # indicate that the client is finished 954 self.client_evt.set() 955 # wait for the server thread to terminate 956 self.serv_evt.wait() 957 self.thread.join() 958 959 def testBasic(self): 960 # smoke test 961 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 962 smtp.quit() 963 964 def testEHLO(self): 965 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 966 967 # no features should be present before the EHLO 968 self.assertEqual(smtp.esmtp_features, {}) 969 970 # features expected from the test server 971 expected_features = {'expn':'', 972 'size': '20000000', 973 'starttls': '', 974 'deliverby': '', 975 'help': '', 976 } 977 978 smtp.ehlo() 979 self.assertEqual(smtp.esmtp_features, expected_features) 980 for k in expected_features: 981 self.assertTrue(smtp.has_extn(k)) 982 self.assertFalse(smtp.has_extn('unsupported-feature')) 983 smtp.quit() 984 985 def 
testVRFY(self): 986 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 987 988 for addr_spec, name in sim_users.items(): 989 expected_known = (250, bytes('%s %s' % 990 (name, smtplib.quoteaddr(addr_spec)), 991 "ascii")) 992 self.assertEqual(smtp.vrfy(addr_spec), expected_known) 993 994 u = 'nobody@nowhere.com' 995 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 996 self.assertEqual(smtp.vrfy(u), expected_unknown) 997 smtp.quit() 998 999 def testEXPN(self): 1000 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1001 1002 for listname, members in sim_lists.items(): 1003 users = [] 1004 for m in members: 1005 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 1006 expected_known = (250, bytes('\n'.join(users), "ascii")) 1007 self.assertEqual(smtp.expn(listname), expected_known) 1008 1009 u = 'PSU-Members-List' 1010 expected_unknown = (550, b'No access for you!') 1011 self.assertEqual(smtp.expn(u), expected_unknown) 1012 smtp.quit() 1013 1014 def testAUTH_PLAIN(self): 1015 self.serv.add_feature("AUTH PLAIN") 1016 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1017 resp = smtp.login(sim_auth[0], sim_auth[1]) 1018 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1019 smtp.close() 1020 1021 def testAUTH_LOGIN(self): 1022 self.serv.add_feature("AUTH LOGIN") 1023 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1024 resp = smtp.login(sim_auth[0], sim_auth[1]) 1025 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1026 smtp.close() 1027 1028 def testAUTH_CRAM_MD5(self): 1029 self.serv.add_feature("AUTH CRAM-MD5") 1030 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1031 resp = smtp.login(sim_auth[0], sim_auth[1]) 1032 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1033 smtp.close() 1034 1035 def testAUTH_multiple(self): 1036 # Test that multiple authentication methods are tried. 
1037 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 1038 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1039 resp = smtp.login(sim_auth[0], sim_auth[1]) 1040 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1041 smtp.close() 1042 1043 def test_auth_function(self): 1044 supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'} 1045 for mechanism in supported: 1046 self.serv.add_feature("AUTH {}".format(mechanism)) 1047 for mechanism in supported: 1048 with self.subTest(mechanism=mechanism): 1049 smtp = smtplib.SMTP(HOST, self.port, 1050 local_hostname='localhost', timeout=15) 1051 smtp.ehlo('foo') 1052 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 1053 method = 'auth_' + mechanism.lower().replace('-', '_') 1054 resp = smtp.auth(mechanism, getattr(smtp, method)) 1055 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1056 smtp.close() 1057 1058 def test_quit_resets_greeting(self): 1059 smtp = smtplib.SMTP(HOST, self.port, 1060 local_hostname='localhost', 1061 timeout=15) 1062 code, message = smtp.ehlo() 1063 self.assertEqual(code, 250) 1064 self.assertIn('size', smtp.esmtp_features) 1065 smtp.quit() 1066 self.assertNotIn('size', smtp.esmtp_features) 1067 smtp.connect(HOST, self.port) 1068 self.assertNotIn('size', smtp.esmtp_features) 1069 smtp.ehlo_or_helo_if_needed() 1070 self.assertIn('size', smtp.esmtp_features) 1071 smtp.quit() 1072 1073 def test_with_statement(self): 1074 with smtplib.SMTP(HOST, self.port) as smtp: 1075 code, message = smtp.noop() 1076 self.assertEqual(code, 250) 1077 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1078 with smtplib.SMTP(HOST, self.port) as smtp: 1079 smtp.close() 1080 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1081 1082 def test_with_statement_QUIT_failure(self): 1083 with self.assertRaises(smtplib.SMTPResponseException) as error: 1084 with smtplib.SMTP(HOST, self.port) as smtp: 1085 smtp.noop() 1086 
self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1087 self.assertEqual(error.exception.smtp_code, 421) 1088 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1089 1090 #TODO: add tests for correct AUTH method fallback now that the 1091 #test infrastructure can support it. 1092 1093 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1094 def test__rest_from_mail_cmd(self): 1095 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1096 smtp.noop() 1097 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1098 self.serv._SMTPchannel.disconnect = True 1099 with self.assertRaises(smtplib.SMTPSenderRefused): 1100 smtp.sendmail('John', 'Sally', 'test message') 1101 self.assertIsNone(smtp.sock) 1102 1103 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1104 def test_421_from_mail_cmd(self): 1105 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1106 smtp.noop() 1107 self.serv._SMTPchannel.mail_response = '421 closing connection' 1108 with self.assertRaises(smtplib.SMTPSenderRefused): 1109 smtp.sendmail('John', 'Sally', 'test message') 1110 self.assertIsNone(smtp.sock) 1111 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1112 1113 def test_421_from_rcpt_cmd(self): 1114 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1115 smtp.noop() 1116 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1117 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1118 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1119 self.assertIsNone(smtp.sock) 1120 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1121 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1122 1123 def test_421_from_data_cmd(self): 1124 class MySimSMTPChannel(SimSMTPChannel): 1125 def found_terminator(self): 1126 if self.smtp_state == self.DATA: 1127 self.push('421 closing') 1128 
else: 1129 super().found_terminator() 1130 self.serv.channel_class = MySimSMTPChannel 1131 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) 1132 smtp.noop() 1133 with self.assertRaises(smtplib.SMTPDataError): 1134 smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 1135 self.assertIsNone(smtp.sock) 1136 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1137 1138 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1139 smtp = smtplib.SMTP( 1140 HOST, self.port, local_hostname='localhost', timeout=3) 1141 self.addCleanup(smtp.close) 1142 smtp.ehlo() 1143 self.assertTrue(smtp.does_esmtp) 1144 self.assertFalse(smtp.has_extn('smtputf8')) 1145 self.assertRaises( 1146 smtplib.SMTPNotSupportedError, 1147 smtp.sendmail, 1148 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1149 self.assertRaises( 1150 smtplib.SMTPNotSupportedError, 1151 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1152 1153 def test_send_unicode_without_SMTPUTF8(self): 1154 smtp = smtplib.SMTP( 1155 HOST, self.port, local_hostname='localhost', timeout=3) 1156 self.addCleanup(smtp.close) 1157 self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 1158 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1159 1160 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1161 # This test is located here and not in the SMTPUTF8SimTests 1162 # class because it needs a "regular" SMTP server to work 1163 msg = EmailMessage() 1164 msg['From'] = "Páolo <főo@bar.com>" 1165 msg['To'] = 'Dinsdale' 1166 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1167 smtp = smtplib.SMTP( 1168 HOST, self.port, local_hostname='localhost', timeout=3) 1169 self.addCleanup(smtp.close) 1170 with self.assertRaises(smtplib.SMTPNotSupportedError): 1171 smtp.send_message(msg) 1172 1173 def test_name_field_not_included_in_envelop_addresses(self): 1174 smtp = smtplib.SMTP( 1175 HOST, self.port, 
local_hostname='localhost', timeout=3 1176 ) 1177 self.addCleanup(smtp.close) 1178 1179 message = EmailMessage() 1180 message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 1181 message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 1182 1183 self.assertDictEqual(smtp.send_message(message), {}) 1184 1185 self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 1186 self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 1187 1188 1189class SimSMTPUTF8Server(SimSMTPServer): 1190 1191 def __init__(self, *args, **kw): 1192 # The base SMTP server turns these on automatically, but our test 1193 # server is set up to munge the EHLO response, so we need to provide 1194 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 1195 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1196 smtpd.SMTPServer.__init__(self, *args, **kw) 1197 1198 def handle_accepted(self, conn, addr): 1199 self._SMTPchannel = self.channel_class( 1200 self._extra_features, self, conn, addr, 1201 decode_data=self._decode_data, 1202 enable_SMTPUTF8=self.enable_SMTPUTF8, 1203 ) 1204 1205 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1206 rcpt_options=None): 1207 self.last_peer = peer 1208 self.last_mailfrom = mailfrom 1209 self.last_rcpttos = rcpttos 1210 self.last_message = data 1211 self.last_mail_options = mail_options 1212 self.last_rcpt_options = rcpt_options 1213 1214 1215class SMTPUTF8SimTests(unittest.TestCase): 1216 1217 maxDiff = None 1218 1219 def setUp(self): 1220 self.real_getfqdn = socket.getfqdn 1221 socket.getfqdn = mock_socket.getfqdn 1222 self.serv_evt = threading.Event() 1223 self.client_evt = threading.Event() 1224 # Pick a random unused port by passing 0 for the port number 1225 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1226 decode_data=False, 1227 enable_SMTPUTF8=True) 1228 # Keep a note of what port was assigned 1229 self.port = self.serv.socket.getsockname()[1] 
1230 serv_args = (self.serv, self.serv_evt, self.client_evt) 1231 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1232 self.thread.start() 1233 1234 # wait until server thread has assigned a port number 1235 self.serv_evt.wait() 1236 self.serv_evt.clear() 1237 1238 def tearDown(self): 1239 socket.getfqdn = self.real_getfqdn 1240 # indicate that the client is finished 1241 self.client_evt.set() 1242 # wait for the server thread to terminate 1243 self.serv_evt.wait() 1244 self.thread.join() 1245 1246 def test_test_server_supports_extensions(self): 1247 smtp = smtplib.SMTP( 1248 HOST, self.port, local_hostname='localhost', timeout=3) 1249 self.addCleanup(smtp.close) 1250 smtp.ehlo() 1251 self.assertTrue(smtp.does_esmtp) 1252 self.assertTrue(smtp.has_extn('smtputf8')) 1253 1254 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1255 m = '¡a test message containing unicode!'.encode('utf-8') 1256 smtp = smtplib.SMTP( 1257 HOST, self.port, local_hostname='localhost', timeout=3) 1258 self.addCleanup(smtp.close) 1259 smtp.sendmail('Jőhn', 'Sálly', m, 1260 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1261 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1262 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1263 self.assertEqual(self.serv.last_message, m) 1264 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1265 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1266 self.assertEqual(self.serv.last_rcpt_options, []) 1267 1268 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1269 m = '¡a test message containing unicode!'.encode('utf-8') 1270 smtp = smtplib.SMTP( 1271 HOST, self.port, local_hostname='localhost', timeout=3) 1272 self.addCleanup(smtp.close) 1273 smtp.ehlo() 1274 self.assertEqual( 1275 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1276 (250, b'OK')) 1277 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1278 self.assertEqual(smtp.data(m), (250, b'OK')) 1279 
self.assertEqual(self.serv.last_mailfrom, 'Jő') 1280 self.assertEqual(self.serv.last_rcpttos, ['János']) 1281 self.assertEqual(self.serv.last_message, m) 1282 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1283 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1284 self.assertEqual(self.serv.last_rcpt_options, []) 1285 1286 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1287 msg = EmailMessage() 1288 msg['From'] = "Páolo <főo@bar.com>" 1289 msg['To'] = 'Dinsdale' 1290 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1291 # XXX I don't know why I need two \n's here, but this is an existing 1292 # bug (if it is one) and not a problem with the new functionality. 1293 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1294 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1295 # we are successfully sending /r/n :(. 1296 expected = textwrap.dedent("""\ 1297 From: Páolo <főo@bar.com> 1298 To: Dinsdale 1299 Subject: Nudge nudge, wink, wink \u1F609 1300 Content-Type: text/plain; charset="utf-8" 1301 Content-Transfer-Encoding: 8bit 1302 MIME-Version: 1.0 1303 1304 oh là là, know what I mean, know what I mean? 1305 """) 1306 smtp = smtplib.SMTP( 1307 HOST, self.port, local_hostname='localhost', timeout=3) 1308 self.addCleanup(smtp.close) 1309 self.assertEqual(smtp.send_message(msg), {}) 1310 self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 1311 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1312 self.assertEqual(self.serv.last_message.decode(), expected) 1313 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1314 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1315 self.assertEqual(self.serv.last_rcpt_options, []) 1316 1317 1318EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1319 1320class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1321 def smtp_AUTH(self, arg): 1322 # RFC 4954's AUTH command allows for an optional initial-response. 
1323 # Not all AUTH methods support this; some require a challenge. AUTH 1324 # PLAIN does those, so test that here. See issue #15014. 1325 args = arg.split() 1326 if args[0].lower() == 'plain': 1327 if len(args) == 2: 1328 # AUTH PLAIN <initial-response> with the response base 64 1329 # encoded. Hard code the expected response for the test. 1330 if args[1] == EXPECTED_RESPONSE: 1331 self.push('235 Ok') 1332 return 1333 self.push('571 Bad authentication') 1334 1335class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1336 channel_class = SimSMTPAUTHInitialResponseChannel 1337 1338 1339class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1340 def setUp(self): 1341 self.real_getfqdn = socket.getfqdn 1342 socket.getfqdn = mock_socket.getfqdn 1343 self.serv_evt = threading.Event() 1344 self.client_evt = threading.Event() 1345 # Pick a random unused port by passing 0 for the port number 1346 self.serv = SimSMTPAUTHInitialResponseServer( 1347 (HOST, 0), ('nowhere', -1), decode_data=True) 1348 # Keep a note of what port was assigned 1349 self.port = self.serv.socket.getsockname()[1] 1350 serv_args = (self.serv, self.serv_evt, self.client_evt) 1351 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1352 self.thread.start() 1353 1354 # wait until server thread has assigned a port number 1355 self.serv_evt.wait() 1356 self.serv_evt.clear() 1357 1358 def tearDown(self): 1359 socket.getfqdn = self.real_getfqdn 1360 # indicate that the client is finished 1361 self.client_evt.set() 1362 # wait for the server thread to terminate 1363 self.serv_evt.wait() 1364 self.thread.join() 1365 1366 def testAUTH_PLAIN_initial_response_login(self): 1367 self.serv.add_feature('AUTH PLAIN') 1368 smtp = smtplib.SMTP(HOST, self.port, 1369 local_hostname='localhost', timeout=15) 1370 smtp.login('psu', 'doesnotexist') 1371 smtp.close() 1372 1373 def testAUTH_PLAIN_initial_response_auth(self): 1374 self.serv.add_feature('AUTH PLAIN') 1375 smtp = smtplib.SMTP(HOST, 
self.port, 1376 local_hostname='localhost', timeout=15) 1377 smtp.user = 'psu' 1378 smtp.password = 'doesnotexist' 1379 code, response = smtp.auth('plain', smtp.auth_plain) 1380 smtp.close() 1381 self.assertEqual(code, 235) 1382 1383 1384if __name__ == '__main__': 1385 unittest.main() 1386