1import unittest 2import textwrap 3from test import support, mock_socket 4from test.support import socket_helper 5import socket 6import io 7import smtpd 8import asyncore 9 10 11class DummyServer(smtpd.SMTPServer): 12 def __init__(self, *args, **kwargs): 13 smtpd.SMTPServer.__init__(self, *args, **kwargs) 14 self.messages = [] 15 if self._decode_data: 16 self.return_status = 'return status' 17 else: 18 self.return_status = b'return status' 19 20 def process_message(self, peer, mailfrom, rcpttos, data, **kw): 21 self.messages.append((peer, mailfrom, rcpttos, data)) 22 if data == self.return_status: 23 return '250 Okish' 24 if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']: 25 return '250 SMTPUTF8 message okish' 26 27 28class DummyDispatcherBroken(Exception): 29 pass 30 31 32class BrokenDummyServer(DummyServer): 33 def listen(self, num): 34 raise DummyDispatcherBroken() 35 36 37class SMTPDServerTest(unittest.TestCase): 38 def setUp(self): 39 smtpd.socket = asyncore.socket = mock_socket 40 41 def test_process_message_unimplemented(self): 42 server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0), 43 decode_data=True) 44 conn, addr = server.accept() 45 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 46 47 def write_line(line): 48 channel.socket.queue_recv(line) 49 channel.handle_read() 50 51 write_line(b'HELO example') 52 write_line(b'MAIL From:eggs@example') 53 write_line(b'RCPT To:spam@example') 54 write_line(b'DATA') 55 self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') 56 57 def test_decode_data_and_enable_SMTPUTF8_raises(self): 58 self.assertRaises( 59 ValueError, 60 smtpd.SMTPServer, 61 (socket_helper.HOST, 0), 62 ('b', 0), 63 enable_SMTPUTF8=True, 64 decode_data=True) 65 66 def tearDown(self): 67 asyncore.close_all() 68 asyncore.socket = smtpd.socket = socket 69 70 71class DebuggingServerTest(unittest.TestCase): 72 73 def setUp(self): 74 smtpd.socket = asyncore.socket = mock_socket 75 76 def send_data(self, channel, data, enable_SMTPUTF8=False): 77 def write_line(line): 78 channel.socket.queue_recv(line) 79 channel.handle_read() 80 write_line(b'EHLO example') 81 if enable_SMTPUTF8: 82 write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8') 83 else: 84 write_line(b'MAIL From:eggs@example') 85 write_line(b'RCPT To:spam@example') 86 write_line(b'DATA') 87 write_line(data) 88 write_line(b'.') 89 90 def test_process_message_with_decode_data_true(self): 91 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 92 decode_data=True) 93 conn, addr = server.accept() 94 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 95 with support.captured_stdout() as s: 96 self.send_data(channel, b'From: test\n\nhello\n') 97 stdout = s.getvalue() 98 self.assertEqual(stdout, textwrap.dedent("""\ 99 ---------- MESSAGE FOLLOWS ---------- 100 From: test 101 X-Peer: peer-address 102 103 hello 104 ------------ END MESSAGE ------------ 105 """)) 106 107 def test_process_message_with_decode_data_false(self): 108 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0)) 109 conn, addr = server.accept() 110 channel = smtpd.SMTPChannel(server, conn, addr) 111 with support.captured_stdout() as s: 112 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') 113 stdout = s.getvalue() 114 self.assertEqual(stdout, textwrap.dedent("""\ 115 ---------- MESSAGE FOLLOWS ---------- 116 b'From: test' 117 b'X-Peer: peer-address' 118 b'' 119 b'h\\xc3\\xa9llo\\xff' 120 ------------ END MESSAGE ------------ 121 """)) 122 123 def test_process_message_with_enable_SMTPUTF8_true(self): 124 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 125 enable_SMTPUTF8=True) 126 conn, addr = server.accept() 127 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 128 with support.captured_stdout() as s: 129 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') 130 stdout = s.getvalue() 131 self.assertEqual(stdout, textwrap.dedent("""\ 132 ---------- MESSAGE FOLLOWS ---------- 133 b'From: test' 134 b'X-Peer: peer-address' 135 b'' 136 b'h\\xc3\\xa9llo\\xff' 137 ------------ END MESSAGE ------------ 138 """)) 139 140 def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): 141 server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 142 enable_SMTPUTF8=True) 143 conn, addr = server.accept() 144 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 145 with support.captured_stdout() as s: 146 self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n', 147 enable_SMTPUTF8=True) 148 stdout = s.getvalue() 149 self.assertEqual(stdout, textwrap.dedent("""\ 150 ---------- MESSAGE FOLLOWS ---------- 151 mail options: ['BODY=8BITMIME', 'SMTPUTF8'] 152 b'From: test' 153 b'X-Peer: peer-address' 154 b'' 155 b'h\\xc3\\xa9llo\\xff' 156 ------------ END MESSAGE ------------ 157 """)) 158 159 def tearDown(self): 160 asyncore.close_all() 161 asyncore.socket = smtpd.socket = socket 162 163 164class TestFamilyDetection(unittest.TestCase): 165 def setUp(self): 166 smtpd.socket = asyncore.socket = mock_socket 167 168 def tearDown(self): 169 asyncore.close_all() 170 asyncore.socket = smtpd.socket = socket 171 172 @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") 173 def test_socket_uses_IPv6(self): 174 server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0)) 175 self.assertEqual(server.socket.family, socket.AF_INET6) 176 177 def test_socket_uses_IPv4(self): 178 server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0)) 179 self.assertEqual(server.socket.family, socket.AF_INET) 180 181 182class TestRcptOptionParsing(unittest.TestCase): 183 error_response = (b'555 RCPT TO parameters not recognized or not ' 184 b'implemented\r\n') 185 186 def setUp(self): 187 smtpd.socket = asyncore.socket = mock_socket 188 self.old_debugstream = smtpd.DEBUGSTREAM 189 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 190 191 def tearDown(self): 192 asyncore.close_all() 193 asyncore.socket = smtpd.socket = socket 194 smtpd.DEBUGSTREAM = self.old_debugstream 195 196 def write_line(self, channel, line): 197 channel.socket.queue_recv(line) 198 channel.handle_read() 199 200 def test_params_rejected(self): 201 server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 202 conn, addr = server.accept() 203 channel = smtpd.SMTPChannel(server, conn, addr) 204 self.write_line(channel, b'EHLO example') 205 self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 206 self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar') 207 self.assertEqual(channel.socket.last, self.error_response) 208 209 def test_nothing_accepted(self): 210 server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 211 conn, addr = server.accept() 212 channel = smtpd.SMTPChannel(server, conn, addr) 213 self.write_line(channel, b'EHLO example') 214 self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 215 self.write_line(channel, b'RCPT to: <foo@example.com>') 216 self.assertEqual(channel.socket.last, b'250 OK\r\n') 217 218 219class TestMailOptionParsing(unittest.TestCase): 220 error_response = (b'555 MAIL FROM parameters not recognized or not ' 221 b'implemented\r\n') 222 223 def setUp(self): 224 smtpd.socket = asyncore.socket = mock_socket 225 self.old_debugstream = smtpd.DEBUGSTREAM 226 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 227 228 def tearDown(self): 229 asyncore.close_all() 230 asyncore.socket = smtpd.socket = socket 231 smtpd.DEBUGSTREAM = self.old_debugstream 232 233 def write_line(self, channel, line): 234 channel.socket.queue_recv(line) 235 channel.handle_read() 236 237 def test_with_decode_data_true(self): 238 server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) 239 conn, addr = server.accept() 240 channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 241 self.write_line(channel, b'EHLO example') 242 for line in [ 243 b'MAIL from: <foo@example.com> size=20 SMTPUTF8', 244 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', 245 b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN', 246 b'MAIL from: <foo@example.com> size=20 body=8bitmime', 247 ]: 248 self.write_line(channel, line) 249 self.assertEqual(channel.socket.last, self.error_response) 250 self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 251 self.assertEqual(channel.socket.last, b'250 OK\r\n') 252 253 def test_with_decode_data_false(self): 254 server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 255 conn, addr = server.accept() 256 channel = smtpd.SMTPChannel(server, conn, addr) 257 self.write_line(channel, b'EHLO example') 258 for line in [ 259 b'MAIL from: <foo@example.com> size=20 SMTPUTF8', 260 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', 261 ]: 262 self.write_line(channel, line) 263 self.assertEqual(channel.socket.last, self.error_response) 264 self.write_line( 265 channel, 266 b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN') 267 self.assertEqual( 268 channel.socket.last, 269 b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n') 270 self.write_line( 271 channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime') 272 self.assertEqual(channel.socket.last, b'250 OK\r\n') 273 274 def test_with_enable_smtputf8_true(self): 275 server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True) 276 conn, addr = server.accept() 277 channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 278 self.write_line(channel, b'EHLO example') 279 self.write_line( 280 channel, 281 b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8') 282 self.assertEqual(channel.socket.last, b'250 OK\r\n') 283 284 285class SMTPDChannelTest(unittest.TestCase): 286 def setUp(self): 287 smtpd.socket = asyncore.socket = mock_socket 288 self.old_debugstream = smtpd.DEBUGSTREAM 289 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 290 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 291 decode_data=True) 292 conn, addr = self.server.accept() 293 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 294 decode_data=True) 295 296 def tearDown(self): 297 asyncore.close_all() 298 asyncore.socket = smtpd.socket = socket 299 smtpd.DEBUGSTREAM = self.old_debugstream 300 301 def write_line(self, line): 302 self.channel.socket.queue_recv(line) 303 self.channel.handle_read() 304 305 def test_broken_connect(self): 306 self.assertRaises( 307 DummyDispatcherBroken, BrokenDummyServer, 308 (socket_helper.HOST, 0), ('b', 0), decode_data=True) 309 310 def test_decode_data_and_enable_SMTPUTF8_raises(self): 311 self.assertRaises( 312 ValueError, smtpd.SMTPChannel, 313 self.server, self.channel.conn, self.channel.addr, 314 enable_SMTPUTF8=True, decode_data=True) 315 316 def test_server_accept(self): 317 self.server.handle_accept() 318 319 def test_missing_data(self): 320 self.write_line(b'') 321 self.assertEqual(self.channel.socket.last, 322 b'500 Error: bad syntax\r\n') 323 324 def test_EHLO(self): 325 self.write_line(b'EHLO example') 326 self.assertEqual(self.channel.socket.last, b'250 HELP\r\n') 327 328 def test_EHLO_bad_syntax(self): 329 self.write_line(b'EHLO') 330 self.assertEqual(self.channel.socket.last, 331 b'501 Syntax: EHLO hostname\r\n') 332 333 def test_EHLO_duplicate(self): 334 self.write_line(b'EHLO example') 335 self.write_line(b'EHLO example') 336 self.assertEqual(self.channel.socket.last, 337 b'503 Duplicate HELO/EHLO\r\n') 338 339 def test_EHLO_HELO_duplicate(self): 340 self.write_line(b'EHLO example') 341 self.write_line(b'HELO example') 342 self.assertEqual(self.channel.socket.last, 343 b'503 Duplicate HELO/EHLO\r\n') 344 345 def test_HELO(self): 346 name = smtpd.socket.getfqdn() 347 self.write_line(b'HELO example') 348 self.assertEqual(self.channel.socket.last, 349 '250 {}\r\n'.format(name).encode('ascii')) 350 351 def test_HELO_EHLO_duplicate(self): 352 self.write_line(b'HELO example') 353 self.write_line(b'EHLO example') 354 self.assertEqual(self.channel.socket.last, 355 b'503 Duplicate HELO/EHLO\r\n') 356 357 def test_HELP(self): 358 self.write_line(b'HELP') 359 self.assertEqual(self.channel.socket.last, 360 b'250 Supported commands: EHLO HELO MAIL RCPT ' + \ 361 b'DATA RSET NOOP QUIT VRFY\r\n') 362 363 def test_HELP_command(self): 364 self.write_line(b'HELP MAIL') 365 self.assertEqual(self.channel.socket.last, 366 b'250 Syntax: MAIL FROM: <address>\r\n') 367 368 def test_HELP_command_unknown(self): 369 self.write_line(b'HELP SPAM') 370 self.assertEqual(self.channel.socket.last, 371 b'501 Supported commands: EHLO HELO MAIL RCPT ' + \ 372 b'DATA RSET NOOP QUIT VRFY\r\n') 373 374 def test_HELO_bad_syntax(self): 375 self.write_line(b'HELO') 376 self.assertEqual(self.channel.socket.last, 377 b'501 Syntax: HELO hostname\r\n') 378 379 def test_HELO_duplicate(self): 380 self.write_line(b'HELO example') 381 self.write_line(b'HELO example') 382 self.assertEqual(self.channel.socket.last, 383 b'503 Duplicate HELO/EHLO\r\n') 384 385 def test_HELO_parameter_rejected_when_extensions_not_enabled(self): 386 self.extended_smtp = False 387 self.write_line(b'HELO example') 388 self.write_line(b'MAIL from:<foo@example.com> SIZE=1234') 389 self.assertEqual(self.channel.socket.last, 390 b'501 Syntax: MAIL FROM: <address>\r\n') 391 392 def test_MAIL_allows_space_after_colon(self): 393 self.write_line(b'HELO example') 394 self.write_line(b'MAIL from: <foo@example.com>') 395 self.assertEqual(self.channel.socket.last, 396 b'250 OK\r\n') 397 398 def test_extended_MAIL_allows_space_after_colon(self): 399 self.write_line(b'EHLO example') 400 self.write_line(b'MAIL from: <foo@example.com> size=20') 401 self.assertEqual(self.channel.socket.last, 402 b'250 OK\r\n') 403 404 def test_NOOP(self): 405 self.write_line(b'NOOP') 406 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 407 408 def test_HELO_NOOP(self): 409 self.write_line(b'HELO example') 410 self.write_line(b'NOOP') 411 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 412 413 def test_NOOP_bad_syntax(self): 414 self.write_line(b'NOOP hi') 415 self.assertEqual(self.channel.socket.last, 416 b'501 Syntax: NOOP\r\n') 417 418 def test_QUIT(self): 419 self.write_line(b'QUIT') 420 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 421 422 def test_HELO_QUIT(self): 423 self.write_line(b'HELO example') 424 self.write_line(b'QUIT') 425 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 426 427 def test_QUIT_arg_ignored(self): 428 self.write_line(b'QUIT bye bye') 429 self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 430 431 def test_bad_state(self): 432 self.channel.smtp_state = 'BAD STATE' 433 self.write_line(b'HELO example') 434 self.assertEqual(self.channel.socket.last, 435 b'451 Internal confusion\r\n') 436 437 def test_command_too_long(self): 438 self.write_line(b'HELO example') 439 self.write_line(b'MAIL from: ' + 440 b'a' * self.channel.command_size_limit + 441 b'@example') 442 self.assertEqual(self.channel.socket.last, 443 b'500 Error: line too long\r\n') 444 445 def test_MAIL_command_limit_extended_with_SIZE(self): 446 self.write_line(b'EHLO example') 447 fill_len = self.channel.command_size_limit - len('MAIL from:<@example>') 448 self.write_line(b'MAIL from:<' + 449 b'a' * fill_len + 450 b'@example> SIZE=1234') 451 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 452 453 self.write_line(b'MAIL from:<' + 454 b'a' * (fill_len + 26) + 455 b'@example> SIZE=1234') 456 self.assertEqual(self.channel.socket.last, 457 b'500 Error: line too long\r\n') 458 459 def test_MAIL_command_rejects_SMTPUTF8_by_default(self): 460 self.write_line(b'EHLO example') 461 self.write_line( 462 b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8') 463 self.assertEqual(self.channel.socket.last[0:1], b'5') 464 465 def test_data_longer_than_default_data_size_limit(self): 466 # Hack the default so we don't have to generate so much data. 467 self.channel.data_size_limit = 1048 468 self.write_line(b'HELO example') 469 self.write_line(b'MAIL From:eggs@example') 470 self.write_line(b'RCPT To:spam@example') 471 self.write_line(b'DATA') 472 self.write_line(b'A' * self.channel.data_size_limit + 473 b'A\r\n.') 474 self.assertEqual(self.channel.socket.last, 475 b'552 Error: Too much mail data\r\n') 476 477 def test_MAIL_size_parameter(self): 478 self.write_line(b'EHLO example') 479 self.write_line(b'MAIL FROM:<eggs@example> SIZE=512') 480 self.assertEqual(self.channel.socket.last, 481 b'250 OK\r\n') 482 483 def test_MAIL_invalid_size_parameter(self): 484 self.write_line(b'EHLO example') 485 self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid') 486 self.assertEqual(self.channel.socket.last, 487 b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') 488 489 def test_MAIL_RCPT_unknown_parameters(self): 490 self.write_line(b'EHLO example') 491 self.write_line(b'MAIL FROM:<eggs@example> ham=green') 492 self.assertEqual(self.channel.socket.last, 493 b'555 MAIL FROM parameters not recognized or not implemented\r\n') 494 495 self.write_line(b'MAIL FROM:<eggs@example>') 496 self.write_line(b'RCPT TO:<eggs@example> ham=green') 497 self.assertEqual(self.channel.socket.last, 498 b'555 RCPT TO parameters not recognized or not implemented\r\n') 499 500 def test_MAIL_size_parameter_larger_than_default_data_size_limit(self): 501 self.channel.data_size_limit = 1048 502 self.write_line(b'EHLO example') 503 self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096') 504 self.assertEqual(self.channel.socket.last, 505 b'552 Error: message size exceeds fixed maximum message size\r\n') 506 507 def test_need_MAIL(self): 508 self.write_line(b'HELO example') 509 self.write_line(b'RCPT to:spam@example') 510 self.assertEqual(self.channel.socket.last, 511 b'503 Error: need MAIL command\r\n') 512 513 def test_MAIL_syntax_HELO(self): 514 self.write_line(b'HELO example') 515 self.write_line(b'MAIL from eggs@example') 516 self.assertEqual(self.channel.socket.last, 517 b'501 Syntax: MAIL FROM: <address>\r\n') 518 519 def test_MAIL_syntax_EHLO(self): 520 self.write_line(b'EHLO example') 521 self.write_line(b'MAIL from eggs@example') 522 self.assertEqual(self.channel.socket.last, 523 b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') 524 525 def test_MAIL_missing_address(self): 526 self.write_line(b'HELO example') 527 self.write_line(b'MAIL from:') 528 self.assertEqual(self.channel.socket.last, 529 b'501 Syntax: MAIL FROM: <address>\r\n') 530 531 def test_MAIL_chevrons(self): 532 self.write_line(b'HELO example') 533 self.write_line(b'MAIL from:<eggs@example>') 534 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 535 536 def test_MAIL_empty_chevrons(self): 537 self.write_line(b'EHLO example') 538 self.write_line(b'MAIL from:<>') 539 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 540 541 def test_MAIL_quoted_localpart(self): 542 self.write_line(b'EHLO example') 543 self.write_line(b'MAIL from: <"Fred Blogs"@example.com>') 544 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 545 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 546 547 def test_MAIL_quoted_localpart_no_angles(self): 548 self.write_line(b'EHLO example') 549 self.write_line(b'MAIL from: "Fred Blogs"@example.com') 550 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 551 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 552 553 def test_MAIL_quoted_localpart_with_size(self): 554 self.write_line(b'EHLO example') 555 self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000') 556 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 557 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 558 559 def test_MAIL_quoted_localpart_with_size_no_angles(self): 560 self.write_line(b'EHLO example') 561 self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000') 562 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 563 self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 564 565 def test_nested_MAIL(self): 566 self.write_line(b'HELO example') 567 self.write_line(b'MAIL from:eggs@example') 568 self.write_line(b'MAIL from:spam@example') 569 self.assertEqual(self.channel.socket.last, 570 b'503 Error: nested MAIL command\r\n') 571 572 def test_VRFY(self): 573 self.write_line(b'VRFY eggs@example') 574 self.assertEqual(self.channel.socket.last, 575 b'252 Cannot VRFY user, but will accept message and attempt ' + \ 576 b'delivery\r\n') 577 578 def test_VRFY_syntax(self): 579 self.write_line(b'VRFY') 580 self.assertEqual(self.channel.socket.last, 581 b'501 Syntax: VRFY <address>\r\n') 582 583 def test_EXPN_not_implemented(self): 584 self.write_line(b'EXPN') 585 self.assertEqual(self.channel.socket.last, 586 b'502 EXPN not implemented\r\n') 587 588 def test_no_HELO_MAIL(self): 589 self.write_line(b'MAIL from:<foo@example.com>') 590 self.assertEqual(self.channel.socket.last, 591 b'503 Error: send HELO first\r\n') 592 593 def test_need_RCPT(self): 594 self.write_line(b'HELO example') 595 self.write_line(b'MAIL From:eggs@example') 596 self.write_line(b'DATA') 597 self.assertEqual(self.channel.socket.last, 598 b'503 Error: need RCPT command\r\n') 599 600 def test_RCPT_syntax_HELO(self): 601 self.write_line(b'HELO example') 602 self.write_line(b'MAIL From: eggs@example') 603 self.write_line(b'RCPT to eggs@example') 604 self.assertEqual(self.channel.socket.last, 605 b'501 Syntax: RCPT TO: <address>\r\n') 606 607 def test_RCPT_syntax_EHLO(self): 608 self.write_line(b'EHLO example') 609 self.write_line(b'MAIL From: eggs@example') 610 self.write_line(b'RCPT to eggs@example') 611 self.assertEqual(self.channel.socket.last, 612 b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n') 613 614 def test_RCPT_lowercase_to_OK(self): 615 self.write_line(b'HELO example') 616 self.write_line(b'MAIL From: eggs@example') 617 self.write_line(b'RCPT to: <eggs@example>') 618 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 619 620 def test_no_HELO_RCPT(self): 621 self.write_line(b'RCPT to eggs@example') 622 self.assertEqual(self.channel.socket.last, 623 b'503 Error: send HELO first\r\n') 624 625 def test_data_dialog(self): 626 self.write_line(b'HELO example') 627 self.write_line(b'MAIL From:eggs@example') 628 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 629 self.write_line(b'RCPT To:spam@example') 630 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 631 632 self.write_line(b'DATA') 633 self.assertEqual(self.channel.socket.last, 634 b'354 End data with <CR><LF>.<CR><LF>\r\n') 635 self.write_line(b'data\r\nmore\r\n.') 636 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 637 self.assertEqual(self.server.messages, 638 [(('peer-address', 'peer-port'), 639 'eggs@example', 640 ['spam@example'], 641 'data\nmore')]) 642 643 def test_DATA_syntax(self): 644 self.write_line(b'HELO example') 645 self.write_line(b'MAIL From:eggs@example') 646 self.write_line(b'RCPT To:spam@example') 647 self.write_line(b'DATA spam') 648 self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n') 649 650 def test_no_HELO_DATA(self): 651 self.write_line(b'DATA spam') 652 self.assertEqual(self.channel.socket.last, 653 b'503 Error: send HELO first\r\n') 654 655 def test_data_transparency_section_4_5_2(self): 656 self.write_line(b'HELO example') 657 self.write_line(b'MAIL From:eggs@example') 658 self.write_line(b'RCPT To:spam@example') 659 self.write_line(b'DATA') 660 self.write_line(b'..\r\n.\r\n') 661 self.assertEqual(self.channel.received_data, '.') 662 663 def test_multiple_RCPT(self): 664 self.write_line(b'HELO example') 665 self.write_line(b'MAIL From:eggs@example') 666 self.write_line(b'RCPT To:spam@example') 667 self.write_line(b'RCPT To:ham@example') 668 self.write_line(b'DATA') 669 self.write_line(b'data\r\n.') 670 self.assertEqual(self.server.messages, 671 [(('peer-address', 'peer-port'), 672 'eggs@example', 673 ['spam@example','ham@example'], 674 'data')]) 675 676 def test_manual_status(self): 677 # checks that the Channel is able to return a custom status message 678 self.write_line(b'HELO example') 679 self.write_line(b'MAIL From:eggs@example') 680 self.write_line(b'RCPT To:spam@example') 681 self.write_line(b'DATA') 682 self.write_line(b'return status\r\n.') 683 self.assertEqual(self.channel.socket.last, b'250 Okish\r\n') 684 685 def test_RSET(self): 686 self.write_line(b'HELO example') 687 self.write_line(b'MAIL From:eggs@example') 688 self.write_line(b'RCPT To:spam@example') 689 self.write_line(b'RSET') 690 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 691 self.write_line(b'MAIL From:foo@example') 692 self.write_line(b'RCPT To:eggs@example') 693 self.write_line(b'DATA') 694 self.write_line(b'data\r\n.') 695 self.assertEqual(self.server.messages, 696 [(('peer-address', 'peer-port'), 697 'foo@example', 698 ['eggs@example'], 699 'data')]) 700 701 def test_HELO_RSET(self): 702 self.write_line(b'HELO example') 703 self.write_line(b'RSET') 704 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 705 706 def test_RSET_syntax(self): 707 self.write_line(b'RSET hi') 708 self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n') 709 710 def test_unknown_command(self): 711 self.write_line(b'UNKNOWN_CMD') 712 self.assertEqual(self.channel.socket.last, 713 b'500 Error: command "UNKNOWN_CMD" not ' + \ 714 b'recognized\r\n') 715 716 def test_attribute_deprecations(self): 717 with support.check_warnings(('', DeprecationWarning)): 718 spam = self.channel._SMTPChannel__server 719 with support.check_warnings(('', DeprecationWarning)): 720 self.channel._SMTPChannel__server = 'spam' 721 with support.check_warnings(('', DeprecationWarning)): 722 spam = self.channel._SMTPChannel__line 723 with support.check_warnings(('', DeprecationWarning)): 724 self.channel._SMTPChannel__line = 'spam' 725 with support.check_warnings(('', DeprecationWarning)): 726 spam = self.channel._SMTPChannel__state 727 with support.check_warnings(('', DeprecationWarning)): 728 self.channel._SMTPChannel__state = 'spam' 729 with support.check_warnings(('', DeprecationWarning)): 730 spam = self.channel._SMTPChannel__greeting 731 with support.check_warnings(('', DeprecationWarning)): 732 self.channel._SMTPChannel__greeting = 'spam' 733 with support.check_warnings(('', DeprecationWarning)): 734 spam = self.channel._SMTPChannel__mailfrom 735 with support.check_warnings(('', DeprecationWarning)): 736 self.channel._SMTPChannel__mailfrom = 'spam' 737 with support.check_warnings(('', DeprecationWarning)): 738 spam = self.channel._SMTPChannel__rcpttos 739 with support.check_warnings(('', DeprecationWarning)): 740 self.channel._SMTPChannel__rcpttos = 'spam' 741 with support.check_warnings(('', DeprecationWarning)): 742 spam = self.channel._SMTPChannel__data 743 with support.check_warnings(('', DeprecationWarning)): 744 self.channel._SMTPChannel__data = 'spam' 745 with support.check_warnings(('', DeprecationWarning)): 746 spam = self.channel._SMTPChannel__fqdn 747 with support.check_warnings(('', DeprecationWarning)): 748 self.channel._SMTPChannel__fqdn = 'spam' 749 with support.check_warnings(('', DeprecationWarning)): 750 spam = self.channel._SMTPChannel__peer 751 with support.check_warnings(('', DeprecationWarning)): 752 self.channel._SMTPChannel__peer = 'spam' 753 with support.check_warnings(('', DeprecationWarning)): 754 spam = self.channel._SMTPChannel__conn 755 with support.check_warnings(('', DeprecationWarning)): 756 self.channel._SMTPChannel__conn = 'spam' 757 with support.check_warnings(('', DeprecationWarning)): 758 spam = self.channel._SMTPChannel__addr 759 with support.check_warnings(('', DeprecationWarning)): 760 self.channel._SMTPChannel__addr = 'spam' 761 762@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") 763class SMTPDChannelIPv6Test(SMTPDChannelTest): 764 def setUp(self): 765 smtpd.socket = asyncore.socket = mock_socket 766 self.old_debugstream = smtpd.DEBUGSTREAM 767 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 768 self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0), 769 decode_data=True) 770 conn, addr = self.server.accept() 771 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 772 decode_data=True) 773 774class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): 775 776 def setUp(self): 777 smtpd.socket = asyncore.socket = mock_socket 778 self.old_debugstream = smtpd.DEBUGSTREAM 779 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 780 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 781 decode_data=True) 782 conn, addr = self.server.accept() 783 # Set DATA size limit to 32 bytes for easy testing 784 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32, 785 decode_data=True) 786 787 def tearDown(self): 788 asyncore.close_all() 789 asyncore.socket = smtpd.socket = socket 790 smtpd.DEBUGSTREAM = self.old_debugstream 791 792 def write_line(self, line): 793 self.channel.socket.queue_recv(line) 794 self.channel.handle_read() 795 796 def test_data_limit_dialog(self): 797 self.write_line(b'HELO example') 798 self.write_line(b'MAIL From:eggs@example') 799 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 800 self.write_line(b'RCPT To:spam@example') 801 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 802 803 self.write_line(b'DATA') 804 self.assertEqual(self.channel.socket.last, 805 b'354 End data with <CR><LF>.<CR><LF>\r\n') 806 self.write_line(b'data\r\nmore\r\n.') 807 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 808 self.assertEqual(self.server.messages, 809 [(('peer-address', 'peer-port'), 810 'eggs@example', 811 ['spam@example'], 812 'data\nmore')]) 813 814 def test_data_limit_dialog_too_much_data(self): 815 self.write_line(b'HELO example') 816 self.write_line(b'MAIL From:eggs@example') 817 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 818 self.write_line(b'RCPT To:spam@example') 819 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 820 821 self.write_line(b'DATA') 822 self.assertEqual(self.channel.socket.last, 823 b'354 End data with <CR><LF>.<CR><LF>\r\n') 824 self.write_line(b'This message is longer than 32 bytes\r\n.') 825 self.assertEqual(self.channel.socket.last, 826 b'552 Error: Too much mail data\r\n') 827 828 829class SMTPDChannelWithDecodeDataFalse(unittest.TestCase): 830 831 def setUp(self): 832 smtpd.socket = asyncore.socket = mock_socket 833 self.old_debugstream = smtpd.DEBUGSTREAM 834 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 835 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 836 conn, addr = self.server.accept() 837 self.channel = smtpd.SMTPChannel(self.server, conn, addr) 838 839 def tearDown(self): 840 asyncore.close_all() 841 asyncore.socket = smtpd.socket = socket 842 smtpd.DEBUGSTREAM = self.old_debugstream 843 844 def write_line(self, line): 845 self.channel.socket.queue_recv(line) 846 self.channel.handle_read() 847 848 def test_ascii_data(self): 849 self.write_line(b'HELO example') 850 self.write_line(b'MAIL From:eggs@example') 851 self.write_line(b'RCPT To:spam@example') 852 self.write_line(b'DATA') 853 self.write_line(b'plain ascii text') 854 self.write_line(b'.') 855 self.assertEqual(self.channel.received_data, b'plain ascii text') 856 857 def test_utf8_data(self): 858 self.write_line(b'HELO example') 859 self.write_line(b'MAIL From:eggs@example') 860 self.write_line(b'RCPT To:spam@example') 861 self.write_line(b'DATA') 862 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 863 self.write_line(b'and some plain ascii') 864 self.write_line(b'.') 865 self.assertEqual( 866 self.channel.received_data, 867 b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n' 868 b'and some plain ascii') 869 870 871class SMTPDChannelWithDecodeDataTrue(unittest.TestCase): 872 873 def setUp(self): 874 smtpd.socket = asyncore.socket = mock_socket 875 self.old_debugstream = smtpd.DEBUGSTREAM 876 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 877 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 878 decode_data=True) 879 conn, addr = self.server.accept() 880 # Set decode_data to True 881 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 882 decode_data=True) 883 884 def tearDown(self): 885 asyncore.close_all() 886 asyncore.socket = smtpd.socket = socket 887 smtpd.DEBUGSTREAM = self.old_debugstream 888 889 def write_line(self, line): 890 self.channel.socket.queue_recv(line) 891 self.channel.handle_read() 892 893 def test_ascii_data(self): 894 self.write_line(b'HELO example') 895 self.write_line(b'MAIL From:eggs@example') 896 self.write_line(b'RCPT To:spam@example') 897 self.write_line(b'DATA') 898 self.write_line(b'plain ascii text') 899 self.write_line(b'.') 900 self.assertEqual(self.channel.received_data, 'plain ascii text') 901 902 def test_utf8_data(self): 903 self.write_line(b'HELO example') 904 self.write_line(b'MAIL From:eggs@example') 905 self.write_line(b'RCPT To:spam@example') 906 self.write_line(b'DATA') 907 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 908 self.write_line(b'and some plain ascii') 909 self.write_line(b'.') 910 self.assertEqual( 911 self.channel.received_data, 912 'utf8 enriched text: żźć\nand some plain ascii') 913 914 915class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase): 916 def setUp(self): 917 smtpd.socket = asyncore.socket = mock_socket 918 self.old_debugstream = smtpd.DEBUGSTREAM 919 self.debug = smtpd.DEBUGSTREAM = io.StringIO() 920 self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 921 enable_SMTPUTF8=True) 922 conn, addr = self.server.accept() 923 self.channel = smtpd.SMTPChannel(self.server, conn, addr, 924 enable_SMTPUTF8=True) 925 926 def tearDown(self): 927 asyncore.close_all() 928 asyncore.socket = smtpd.socket = socket 929 smtpd.DEBUGSTREAM = self.old_debugstream 930 931 def write_line(self, line): 932 self.channel.socket.queue_recv(line) 933 self.channel.handle_read() 934 935 def test_MAIL_command_accepts_SMTPUTF8_when_announced(self): 936 self.write_line(b'EHLO example') 937 self.write_line( 938 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode( 939 'utf-8') 940 ) 941 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 942 943 def test_process_smtputf8_message(self): 944 self.write_line(b'EHLO example') 945 for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']: 946 self.write_line(b'MAIL from: <a@example> ' + mail_parameters) 947 self.assertEqual(self.channel.socket.last[0:3], b'250') 948 self.write_line(b'rcpt to:<b@example.com>') 949 self.assertEqual(self.channel.socket.last[0:3], b'250') 950 self.write_line(b'data') 951 self.assertEqual(self.channel.socket.last[0:3], b'354') 952 self.write_line(b'c\r\n.') 953 if mail_parameters == b'': 954 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 955 else: 956 self.assertEqual(self.channel.socket.last, 957 b'250 SMTPUTF8 message okish\r\n') 958 959 def test_utf8_data(self): 960 self.write_line(b'EHLO example') 961 self.write_line( 962 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8')) 963 self.assertEqual(self.channel.socket.last[0:3], b'250') 964 self.write_line('RCPT To:späm@examplé'.encode('utf-8')) 965 self.assertEqual(self.channel.socket.last[0:3], b'250') 966 self.write_line(b'DATA') 967 self.assertEqual(self.channel.socket.last[0:3], b'354') 968 self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 969 self.write_line(b'.') 970 self.assertEqual( 971 self.channel.received_data, 972 b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 973 974 def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self): 975 self.write_line(b'ehlo example') 976 fill_len = (512 + 26 + 10) - len('mail from:<@example>') 977 self.write_line(b'MAIL from:<' + 978 b'a' * (fill_len + 1) + 979 b'@example>') 980 self.assertEqual(self.channel.socket.last, 981 b'500 Error: line too long\r\n') 982 self.write_line(b'MAIL from:<' + 983 b'a' * fill_len + 984 b'@example>') 985 self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 986 987 def test_multiple_emails_with_extended_command_length(self): 988 self.write_line(b'ehlo example') 989 fill_len = (512 + 26 + 10) - len('mail from:<@example>') 990 for char in [b'a', b'b', b'c']: 991 self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>') 992 self.assertEqual(self.channel.socket.last[0:3], b'500') 993 self.write_line(b'MAIL from:<' + char * fill_len + b'@example>') 994 self.assertEqual(self.channel.socket.last[0:3], b'250') 995 self.write_line(b'rcpt to:<hans@example.com>') 996 self.assertEqual(self.channel.socket.last[0:3], b'250') 997 self.write_line(b'data') 998 self.assertEqual(self.channel.socket.last[0:3], b'354') 999 self.write_line(b'test\r\n.') 1000 self.assertEqual(self.channel.socket.last[0:3], b'250') 1001 1002 1003class MiscTestCase(unittest.TestCase): 1004 def test__all__(self): 1005 blacklist = { 1006 "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE", 1007 "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs", 1008 1009 } 1010 support.check__all__(self, smtpd, blacklist=blacklist) 1011 1012 1013if __name__ == "__main__": 1014 unittest.main() 1015