# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Test cases for L{twisted.mail.pop3} module.
"""

import StringIO
import hmac
import base64
import itertools

from zope.interface import implements

from twisted.internet import defer

from twisted.trial import unittest, util
from twisted import mail
import twisted.mail.protocols
import twisted.mail.pop3
import twisted.internet.protocol
from twisted import internet
from twisted.mail import pop3
from twisted.protocols import loopback
from twisted.python import failure
from twisted.python.util import OrderedDict

from twisted import cred
import twisted.cred.portal
import twisted.cred.checkers
import twisted.cred.credentials

from twisted.test.proto_helpers import LineSendingProtocol


class UtilityTestCase(unittest.TestCase):
    """
    Test the various helper functions and classes used by the POP3 server
    protocol implementation.
    """

    def testLineBuffering(self):
        """
        Test creating a LineBuffer and feeding it some lines.  The lines should
        build up in its internal buffer for a while and then get spat out to
        the writer.
        """
        output = []
        source = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
        c = pop3._IteratorBuffer(output.extend, source, 6)
        i = iter(c)
        self.assertEqual(output, [])  # nothing is buffered
        next(i)
        self.assertEqual(output, [])  # '012' is buffered
        next(i)
        self.assertEqual(output, [])  # '012345' is buffered
        next(i)
        self.assertEqual(output, ['012', '345', '6'])  # nothing is buffered
        for _ in range(5):
            next(i)
        self.assertEqual(output, ['012', '345', '6', '7', '8', '9', '012', '345'])


    def testFinishLineBuffering(self):
        """
        Test that a LineBuffer flushes everything when its iterator is
        exhausted, and itself raises StopIteration.
        """
        output = []
        source = iter(['a', 'b', 'c'])
        c = pop3._IteratorBuffer(output.extend, source, 5)
        for i in c:
            pass
        self.assertEqual(output, ['a', 'b', 'c'])


    def testSuccessResponseFormatter(self):
        """
        Test that the thing that spits out POP3 'success responses' works
        right.
        """
        self.assertEqual(
            pop3.successResponse('Great.'),
            '+OK Great.\r\n')


    def testStatLineFormatter(self):
        """
        Test that the function which formats stat lines does so appropriately.
        """
        statLine = list(pop3.formatStatResponse([]))[-1]
        self.assertEqual(statLine, '+OK 0 0\r\n')

        statLine = list(pop3.formatStatResponse([10, 31, 0, 10101]))[-1]
        self.assertEqual(statLine, '+OK 4 10142\r\n')


    def testListLineFormatter(self):
        """
        Test that the function which formats the lines in response to a LIST
        command does so appropriately.
        """
        listLines = list(pop3.formatListResponse([]))
        self.assertEqual(
            listLines,
            ['+OK 0\r\n', '.\r\n'])

        listLines = list(pop3.formatListResponse([1, 2, 3, 100]))
        self.assertEqual(
            listLines,
            ['+OK 4\r\n', '1 1\r\n', '2 2\r\n', '3 3\r\n', '4 100\r\n', '.\r\n'])



    def testUIDListLineFormatter(self):
        """
        Test that the function which formats lines in response to a UIDL
        command does so appropriately.
        """
        UIDs = ['abc', 'def', 'ghi']
        listLines = list(pop3.formatUIDListResponse([], UIDs.__getitem__))
        self.assertEqual(
            listLines,
            ['+OK \r\n', '.\r\n'])

        listLines = list(pop3.formatUIDListResponse([123, 431, 591], UIDs.__getitem__))
        self.assertEqual(
            listLines,
            ['+OK \r\n', '1 abc\r\n', '2 def\r\n', '3 ghi\r\n', '.\r\n'])

        listLines = list(pop3.formatUIDListResponse([0, None, 591], UIDs.__getitem__))
        self.assertEqual(
            listLines,
            ['+OK \r\n', '1 abc\r\n', '3 ghi\r\n', '.\r\n'])



class MyVirtualPOP3(mail.protocols.VirtualPOP3):
    """
    A L{mail.protocols.VirtualPOP3} with a fixed challenge string which
    always authenticates APOP logins against the C{'baz.com'} domain of its
    service.
    """

    magic = '<moshez>'

    def authenticateUserAPOP(self, user, digest):
        """
        Split C{user} into a local part and a domain and hand the request to
        the C{'baz.com'} domain object, passing along this protocol's magic.
        """
        user, domain = self.lookupDomain(user)
        return self.service.domains['baz.com'].authenticateUserAPOP(user, digest, self.magic, domain)



class DummyDomain:
    """
    A minimal domain which keeps a list of message strings per user name and
    accepts every APOP login for a known user.
    """

    def __init__(self):
        self.users = {}

    def addUser(self, name):
        """
        Register C{name} with an empty list of messages.
        """
        self.users[name] = []

    def addMessage(self, name, message):
        """
        Append C{message} to the messages stored for C{name}.
        """
        self.users[name].append(message)

    def authenticateUserAPOP(self, name, digest, magic, domain):
        """
        Return an (interface, mailbox, logout) triple for C{name}.  The
        C{digest}, C{magic} and C{domain} arguments are not checked.
        """
        return pop3.IMailbox, ListMailbox(self.users[name]), lambda: None



class ListMailbox:
    """
    A mailbox backed by a plain list of message strings.
    """

    def __init__(self, list):
        self.list = list

    def listMessages(self, i=None):
        """
        Return the length of message C{i}, or the lengths of all messages when
        C{i} is C{None}.
        """
        if i is None:
            return map(len, self.list)
        return len(self.list[i])

    def getMessage(self, i):
        """
        Return a file-like object wrapping the text of message C{i}.
        """
        return StringIO.StringIO(self.list[i])

    def getUidl(self, i):
        """
        Use the message index itself as the unique identifier.
        """
        return i

    def deleteMessage(self, i):
        """
        Mark message C{i} as deleted by replacing it with an empty string.
        """
        self.list[i] = ''

    def sync(self):
        """
        Nothing to persist for an in-memory list.
        """
        pass



class MyPOP3Downloader(pop3.POP3Client):
    """
    A client which logs in with APOP, retrieves message 1 into
    C{self.message} and then quits.
    """

    def handle_WELCOME(self, line):
        """
        Let the base class process the greeting, then start the APOP login.
        """
        pop3.POP3Client.handle_WELCOME(self, line)
        self.apop('hello@baz.com', 'world')

    def handle_APOP(self, line):
        """
        Require a C{+OK} response to APOP, then request message 1.

        @raise AssertionError: if the response code is not C{+OK}.
        """
        parts = line.split()
        code = parts[0]
        if code != '+OK':
            # Single-argument print(...) behaves the same as the print
            # statement on Python 2.
            print(parts)
            raise AssertionError('code is ' + code)
        self.lines = []
        self.retr(1)

    def handle_RETR_continue(self, line):
        """
        Collect one line of the message being retrieved.
        """
        self.lines.append(line)

    def handle_RETR_end(self):
        """
        Assemble the collected lines into C{self.message} and log out.
        """
        self.message = '\n'.join(self.lines) + '\n'
        self.quit()

    def handle_QUIT(self, line):
        """
        Require a C{+OK} response to QUIT.

        @raise AssertionError: if the response does not start with C{+OK}.
        """
        if line[:3] != '+OK':
            raise AssertionError('code is ' + line)



class POP3TestCase(unittest.TestCase):
    """
    Run a L{MyVirtualPOP3} server over a loopback connection against a
    scripted client and against L{MyPOP3Downloader}.
    """

    message = '''\
Subject: urgent

Someone set up us the bomb!
'''

    expectedOutput = '''\
+OK <moshez>\015
+OK Authentication succeeded\015
+OK \015
1 0\015
.\015
+OK %d\015
Subject: urgent\015
\015
Someone set up us the bomb!\015
.\015
+OK \015
''' % len(message)

    def setUp(self):
        """
        Build a factory whose C{'baz.com'} domain has a user C{'hello'} with
        one message.
        """
        self.factory = internet.protocol.Factory()
        self.factory.domains = {}
        self.factory.domains['baz.com'] = DummyDomain()
        self.factory.domains['baz.com'].addUser('hello')
        self.factory.domains['baz.com'].addMessage('hello', self.message)

    def testMessages(self):
        """
        Send APOP, UIDL, RETR and QUIT and compare the complete server output
        with C{expectedOutput}.
        """
        client = LineSendingProtocol([
            'APOP hello@baz.com world',
            'UIDL',
            'RETR 1',
            'QUIT',
        ])
        server = MyVirtualPOP3()
        server.service = self.factory
        def check(ignored):
            output = '\r\n'.join(client.response) + '\r\n'
            self.assertEqual(output, self.expectedOutput)
        return loopback.loopbackTCP(server, client).addCallback(check)

    def testLoopback(self):
        """
        Check that L{MyPOP3Downloader} ends up with the stored message.
        """
        protocol = MyVirtualPOP3()
        protocol.service = self.factory
        clientProtocol = MyPOP3Downloader()
        def check(ignored):
            self.assertEqual(clientProtocol.message, self.message)
            protocol.connectionLost(
                failure.Failure(Exception("Test harness disconnect")))
        d = loopback.loopbackAsync(protocol, clientProtocol)
        return d.addCallback(check)
    testLoopback.suppress = [util.suppress(message="twisted.mail.pop3.POP3Client is deprecated")]



class DummyPOP3(pop3.POP3):
    """
    A POP3 server which accepts every APOP login and serves a fresh
    L{DummyMailbox}.
    """

    magic = '<moshez>'

    def authenticateUserAPOP(self, user, password):
        """
        Return an (interface, mailbox, logout) triple without checking
        C{user} or C{password}.
        """
        return pop3.IMailbox, DummyMailbox(ValueError), lambda: None



class DummyMailbox(pop3.Mailbox):
    """
    A mailbox holding a copy of the class-level C{messages} list which raises
    a configurable exception type for out-of-bounds indexes.
    """

    messages = ['From: moshe\nTo: moshe\n\nHow are you, friend?\n']

    def __init__(self, exceptionType):
        # Copy so that mutations in one test do not leak into the class
        # attribute shared by all instances.
        self.messages = DummyMailbox.messages[:]
        self.exceptionType = exceptionType

    def listMessages(self, i=None):
        """
        Return the length of message C{i}, or the lengths of all messages when
        C{i} is C{None}.

        @raise self.exceptionType: if C{i} is past the end of the mailbox.
        """
        if i is None:
            return map(len, self.messages)
        if i >= len(self.messages):
            raise self.exceptionType()
        return len(self.messages[i])

    def getMessage(self, i):
        """
        Return a file-like object wrapping the text of message C{i}.
        """
        return StringIO.StringIO(self.messages[i])

    def getUidl(self, i):
        """
        Return the index as a string.

        @raise self.exceptionType: if C{i} is past the end of the mailbox.
        """
        if i >= len(self.messages):
            raise self.exceptionType()
        return str(i)

    def deleteMessage(self, i):
        """
        Mark message C{i} as deleted by replacing it with an empty string.
        """
        self.messages[i] = ''


class AnotherPOP3TestCase(unittest.TestCase):
    """
    Drive L{DummyPOP3} over a loopback connection with scripted clients.
    """

    def runTest(self, lines, expectedOutput):
        """
        Send C{lines} to a L{DummyPOP3} and compare what the client received
        with C{expectedOutput}, a list of lines.
        """
        dummy = DummyPOP3()
        client = LineSendingProtocol(lines)
        d = loopback.loopbackAsync(dummy, client)
        return d.addCallback(self._cbRunTest, client, dummy, expectedOutput)


    def _cbRunTest(self, ignored, client, dummy, expectedOutput):
        self.assertEqual('\r\n'.join(expectedOutput),
                         '\r\n'.join(client.response))
        dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
        return ignored


    def test_buffer(self):
        """
        Test a lot of different POP3 commands in an extremely pipelined
        scenario.

        This test may cover legitimate behavior, but the intent and
        granularity are not very good.  It would likely be an improvement to
        split it into a number of smaller, more focused tests.
        """
        return self.runTest(
            ["APOP moshez dummy",
             "LIST",
             "UIDL",
             "RETR 1",
             "RETR 2",
             "DELE 1",
             "RETR 1",
             "QUIT"],
            ['+OK <moshez>',
             '+OK Authentication succeeded',
             '+OK 1',
             '1 44',
             '.',
             '+OK ',
             '1 0',
             '.',
             '+OK 44',
             'From: moshe',
             'To: moshe',
             '',
             'How are you, friend?',
             '.',
             '-ERR Bad message number argument',
             '+OK ',
             '-ERR message deleted',
             '+OK '])


    def test_noop(self):
        """
        Test the no-op command.
        """
        return self.runTest(
            ['APOP spiv dummy',
             'NOOP',
             'QUIT'],
            ['+OK <moshez>',
             '+OK Authentication succeeded',
             '+OK ',
             '+OK '])


    def testAuthListing(self):
        """
        Test that a bare AUTH command lists the factory's challengers.
        """
        p = DummyPOP3()
        p.factory = internet.protocol.Factory()
        p.factory.challengers = {'Auth1': None, 'secondAuth': None, 'authLast': None}
        client = LineSendingProtocol([
            "AUTH",
            "QUIT",
            ])

        d = loopback.loopbackAsync(p, client)
        return d.addCallback(self._cbTestAuthListing, client)

    def _cbTestAuthListing(self, ignored, client):
        self.assertTrue(client.response[1].startswith('+OK'))
        # The challengers live in a dict, so compare in sorted order.
        self.assertEqual(sorted(client.response[2:5]),
                         ["AUTH1", "AUTHLAST", "SECONDAUTH"])
        self.assertEqual(client.response[5], ".")

    def testIllegalPASS(self):
        """
        Test that PASS without a preceding USER is rejected.
        """
        dummy = DummyPOP3()
        client = LineSendingProtocol([
            "PASS fooz",
            "QUIT"
            ])
        d = loopback.loopbackAsync(dummy, client)
        return d.addCallback(self._cbTestIllegalPASS, client, dummy)

    def _cbTestIllegalPASS(self, ignored, client, dummy):
        expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
        self.assertEqual(expected_output, '\r\n'.join(client.response) + '\r\n')
        dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))

    def testEmptyPASS(self):
        """
        Test that PASS with an empty argument and no preceding USER is
        rejected.
        """
        dummy = DummyPOP3()
        client = LineSendingProtocol([
            "PASS ",
            "QUIT"
            ])
        d = loopback.loopbackAsync(dummy, client)
        return d.addCallback(self._cbTestEmptyPASS, client, dummy)

    def _cbTestEmptyPASS(self, ignored, client, dummy):
        expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
        self.assertEqual(expected_output, '\r\n'.join(client.response) + '\r\n')
        dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))


class TestServerFactory:
    """
    An L{pop3.IServerFactory} provider returning fixed capability values.
    The C{pue} and C{puld} attributes control what the per-user queries
    report.
    """
    implements(pop3.IServerFactory)

    def cap_IMPLEMENTATION(self):
        return "Test Implementation String"

    def cap_EXPIRE(self):
        return 60

    challengers = OrderedDict([("SCHEME_1", None), ("SCHEME_2", None)])

    def cap_LOGIN_DELAY(self):
        return 120

    pue = True
    def perUserExpiration(self):
        return self.pue

    puld = True
    def perUserLoginDelay(self):
        return self.puld


class TestMailbox:
    """
    A stand-in mailbox carrying only per-user capability values.
    """
    loginDelay = 100
    messageExpiration = 25


class CapabilityTestCase(unittest.TestCase):
    """
    Tests for the capabilities reported with per-user expiration and login
    delay enabled on the factory.
    """
    def setUp(self):
        """
        Collect three capability listings: C{caps} from
        C{listCapabilities()}, C{pcaps} from a CAPA issued before a mailbox
        is attached and C{lpcaps} from a CAPA issued after C{p.mbox} is set.
        """
        s = StringIO.StringIO()
        p = pop3.POP3()
        p.factory = TestServerFactory()
        p.transport = internet.protocol.FileWrapper(s)
        p.connectionMade()
        p.do_CAPA()

        self.caps = p.listCapabilities()
        self.pcaps = s.getvalue().splitlines()

        s = StringIO.StringIO()
        p.mbox = TestMailbox()
        p.transport = internet.protocol.FileWrapper(s)
        p.do_CAPA()

        self.lpcaps = s.getvalue().splitlines()
        p.connectionLost(failure.Failure(Exception("Test harness disconnect")))

    def contained(self, s, *caps):
        """
        Assert that C{s} is an element of every listing in C{caps}.
        """
        for c in caps:
            self.assertIn(s, c)

    def testUIDL(self):
        self.contained("UIDL", self.caps, self.pcaps, self.lpcaps)

    def testTOP(self):
        self.contained("TOP", self.caps, self.pcaps, self.lpcaps)

    def testUSER(self):
        self.contained("USER", self.caps, self.pcaps, self.lpcaps)

    def testEXPIRE(self):
        self.contained("EXPIRE 60 USER", self.caps, self.pcaps)
        self.contained("EXPIRE 25", self.lpcaps)

    def testIMPLEMENTATION(self):
        self.contained(
            "IMPLEMENTATION Test Implementation String",
            self.caps, self.pcaps, self.lpcaps
        )

    def testSASL(self):
        self.contained(
            "SASL SCHEME_1 SCHEME_2",
            self.caps, self.pcaps, self.lpcaps
        )

    def testLOGIN_DELAY(self):
        self.contained("LOGIN-DELAY 120 USER", self.caps, self.pcaps)
        self.assertIn("LOGIN-DELAY 100", self.lpcaps)



class GlobalCapabilitiesTestCase(unittest.TestCase):
    """
    Tests for the capabilities reported with per-user expiration and login
    delay disabled on the factory.
    """
    def setUp(self):
        """
        Collect the same three capability listings as
        L{CapabilityTestCase.setUp}, with C{pue} and C{puld} set to C{False}
        on the factory.
        """
        s = StringIO.StringIO()
        p = pop3.POP3()
        p.factory = TestServerFactory()
        p.factory.pue = p.factory.puld = False
        p.transport = internet.protocol.FileWrapper(s)
        p.connectionMade()
        p.do_CAPA()

        self.caps = p.listCapabilities()
        self.pcaps = s.getvalue().splitlines()

        s = StringIO.StringIO()
        p.mbox = TestMailbox()
        p.transport = internet.protocol.FileWrapper(s)
        p.do_CAPA()

        self.lpcaps = s.getvalue().splitlines()
        p.connectionLost(failure.Failure(Exception("Test harness disconnect")))

    def contained(self, s, *caps):
        """
        Assert that C{s} is an element of every listing in C{caps}.
        """
        for c in caps:
            self.assertIn(s, c)

    def testEXPIRE(self):
        self.contained("EXPIRE 60", self.caps, self.pcaps, self.lpcaps)

    def testLOGIN_DELAY(self):
        self.contained("LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps)



class TestRealm:
    """
    A realm which only knows the avatar C{'testuser'}.
    """
    def requestAvatar(self, avatarId, mind, *interfaces):
        if avatarId == 'testuser':
            return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
        # Any other avatar id means the test itself is broken.
        assert False



class SASLTestCase(unittest.TestCase):
    """
    Tests for SASL authentication through the AUTH command.
    """
    def testValidLogin(self):
        """
        Test that a correct CRAM-MD5 response to the server's challenge
        results in a mailbox being attached and a C{+OK} reply.
        """
        p = pop3.POP3()
        p.factory = TestServerFactory()
        p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
        p.portal = cred.portal.Portal(TestRealm())
        ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
        ch.addUser('testuser', 'testpassword')
        p.portal.registerChecker(ch)

        s = StringIO.StringIO()
        p.transport = internet.protocol.FileWrapper(s)
        p.connectionMade()

        p.lineReceived("CAPA")
        self.assertIn("SASL CRAM-MD5", s.getvalue())

        p.lineReceived("AUTH CRAM-MD5")
        # The last line written is "+ <base64 challenge>"; drop the "+ ".
        chal = s.getvalue().splitlines()[-1][2:]
        chal = base64.decodestring(chal)
        response = hmac.HMAC('testpassword', chal).hexdigest()

        p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n'))
        self.assertTrue(p.mbox)
        self.assertIn("+OK", s.getvalue().splitlines()[-1])
        p.connectionLost(failure.Failure(Exception("Test harness disconnect")))



class CommandMixin:
    """
    Tests for all the commands a POP3 server is allowed to receive.
    """

    extraMessage = '''\
From: guy
To: fellow

More message text for you.
'''


    def setUp(self):
        """
        Make a POP3 server protocol instance hooked up to a simple mailbox and
        a transport that buffers output to a StringIO.
        """
        p = pop3.POP3()
        p.mbox = self.mailboxType(self.exceptionType)
        p.schedule = list
        self.pop3Server = p

        s = StringIO.StringIO()
        p.transport = internet.protocol.FileWrapper(s)
        p.connectionMade()
        # Discard the greeting so each test only sees its own output.
        s.truncate(0)
        self.pop3Transport = s


    def tearDown(self):
        """
        Disconnect the server protocol so it can clean up anything it might
        need to clean up.
        """
        self.pop3Server.connectionLost(failure.Failure(Exception("Test harness disconnect")))


    def _flush(self):
        """
        Do some of the things that the reactor would take care of, if the
        reactor were actually running.
        """
        # Oh man FileWrapper is pooh.
        self.pop3Server.transport._checkProducer()


    def testLIST(self):
        """
        Test the two forms of list: with a message index number, which should
        return a short-form response, and without a message index number, which
        should return a long-form response, one line per message.
        """
        p = self.pop3Server
        s = self.pop3Transport

        p.lineReceived("LIST 1")
        self._flush()
        self.assertEqual(s.getvalue(), "+OK 1 44\r\n")
        s.truncate(0)

        p.lineReceived("LIST")
        self._flush()
        self.assertEqual(s.getvalue(), "+OK 1\r\n1 44\r\n.\r\n")


    def testLISTWithBadArgument(self):
        """
        Test that non-integers and out-of-bound integers produce appropriate
        error responses.
        """
        p = self.pop3Server
        s = self.pop3Transport

        p.lineReceived("LIST a")
        self.assertEqual(
            s.getvalue(),
            "-ERR Invalid message-number: 'a'\r\n")
        s.truncate(0)

        p.lineReceived("LIST 0")
        self.assertEqual(
            s.getvalue(),
            "-ERR Invalid message-number: 0\r\n")
        s.truncate(0)

        p.lineReceived("LIST 2")
        self.assertEqual(
            s.getvalue(),
            "-ERR Invalid message-number: 2\r\n")
        s.truncate(0)


    def testUIDL(self):
        """
        Test the two forms of the UIDL command.  These are just like the two
        forms of the LIST command.
        """
        p = self.pop3Server
        s = self.pop3Transport

        p.lineReceived("UIDL 1")
        self.assertEqual(s.getvalue(), "+OK 0\r\n")
        s.truncate(0)

        p.lineReceived("UIDL")
        self._flush()
        self.assertEqual(s.getvalue(), "+OK \r\n1 0\r\n.\r\n")


    def testUIDLWithBadArgument(self):
        """
        Test that UIDL with a non-integer or an out-of-bounds integer produces
        the appropriate error response.
        """
        p = self.pop3Server
        s = self.pop3Transport

        p.lineReceived("UIDL a")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)

        p.lineReceived("UIDL 0")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)

        p.lineReceived("UIDL 2")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)


    def testSTAT(self):
        """
        Test the single form of the STAT command, which returns a short-form
        response of the number of messages in the mailbox and their total size.
        """
        p = self.pop3Server
        s = self.pop3Transport

        p.lineReceived("STAT")
        self._flush()
        self.assertEqual(s.getvalue(), "+OK 1 44\r\n")


    def testRETR(self):
        """
        Test downloading a message.
        """
        p = self.pop3Server
        s = self.pop3Transport

        p.lineReceived("RETR 1")
        self._flush()
        self.assertEqual(
            s.getvalue(),
            "+OK 44\r\n"
            "From: moshe\r\n"
            "To: moshe\r\n"
            "\r\n"
            "How are you, friend?\r\n"
            ".\r\n")
        s.truncate(0)


    def testRETRWithBadArgument(self):
        """
        Test that trying to download a message with a bad argument, either not
        an integer or an out-of-bounds integer, fails with the appropriate
        error response.
        """
        p = self.pop3Server
        s = self.pop3Transport

        p.lineReceived("RETR a")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)

        p.lineReceived("RETR 0")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)

        p.lineReceived("RETR 2")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)


    def testTOP(self):
        """
        Test downloading the headers and part of the body of a message.
        """
        p = self.pop3Server
        s = self.pop3Transport
        p.mbox.messages.append(self.extraMessage)

        p.lineReceived("TOP 1 0")
        self._flush()
        self.assertEqual(
            s.getvalue(),
            "+OK Top of message follows\r\n"
            "From: moshe\r\n"
            "To: moshe\r\n"
            "\r\n"
            ".\r\n")


    def testTOPWithBadArgument(self):
        """
        Test that trying to download a message with a bad argument, either a
        message number which isn't an integer or is an out-of-bounds integer or
        a number of lines which isn't an integer or is a negative integer,
        fails with the appropriate error response.
        """
        p = self.pop3Server
        s = self.pop3Transport
        p.mbox.messages.append(self.extraMessage)

        p.lineReceived("TOP 1 a")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad line count argument\r\n")
        s.truncate(0)

        p.lineReceived("TOP 1 -1")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad line count argument\r\n")
        s.truncate(0)

        p.lineReceived("TOP a 1")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)

        p.lineReceived("TOP 0 1")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)

        p.lineReceived("TOP 3 1")
        self.assertEqual(
            s.getvalue(),
            "-ERR Bad message number argument\r\n")
        s.truncate(0)


    def testLAST(self):
        """
        Test the exceedingly pointless LAST command, which tells you the
        highest message index which you have already downloaded.
        """
        p = self.pop3Server
        s = self.pop3Transport
        p.mbox.messages.append(self.extraMessage)

        p.lineReceived('LAST')
        self.assertEqual(
            s.getvalue(),
            "+OK 0\r\n")
        s.truncate(0)


    def testRetrieveUpdatesHighest(self):
        """
        Test that issuing a RETR command updates the LAST response.
        """
        p = self.pop3Server
        s = self.pop3Transport
        p.mbox.messages.append(self.extraMessage)

        p.lineReceived('RETR 2')
        self._flush()
        s.truncate(0)
        p.lineReceived('LAST')
        self.assertEqual(
            s.getvalue(),
            '+OK 2\r\n')
        s.truncate(0)


    def testTopUpdatesHighest(self):
        """
        Test that issuing a TOP command updates the LAST response.
        """
        p = self.pop3Server
        s = self.pop3Transport
        p.mbox.messages.append(self.extraMessage)

        p.lineReceived('TOP 2 10')
        self._flush()
        s.truncate(0)
        p.lineReceived('LAST')
        self.assertEqual(
            s.getvalue(),
            '+OK 2\r\n')


    def testHighestOnlyProgresses(self):
        """
        Test that downloading a message with a smaller index than the current
        LAST response doesn't change the LAST response.
        """
        p = self.pop3Server
        s = self.pop3Transport
        p.mbox.messages.append(self.extraMessage)

        p.lineReceived('RETR 2')
        self._flush()
        p.lineReceived('TOP 1 10')
        self._flush()
        s.truncate(0)
        p.lineReceived('LAST')
        self.assertEqual(
            s.getvalue(),
            '+OK 2\r\n')


    def testResetClearsHighest(self):
        """
        Test that issuing RSET changes the LAST response to 0.
        """
        p = self.pop3Server
        s = self.pop3Transport
        p.mbox.messages.append(self.extraMessage)

        p.lineReceived('RETR 2')
        self._flush()
        p.lineReceived('RSET')
        s.truncate(0)
        p.lineReceived('LAST')
        self.assertEqual(
            s.getvalue(),
            '+OK 0\r\n')



_listMessageDeprecation = (
    "twisted.mail.pop3.IMailbox.listMessages may not "
    "raise IndexError for out-of-bounds message numbers: "
    "raise ValueError instead.")
_listMessageSuppression = util.suppress(
    message=_listMessageDeprecation,
    category=PendingDeprecationWarning)

_getUidlDeprecation = (
    "twisted.mail.pop3.IMailbox.getUidl may not "
    "raise IndexError for out-of-bounds message numbers: "
    "raise ValueError instead.")
_getUidlSuppression = util.suppress(
    message=_getUidlDeprecation,
    category=PendingDeprecationWarning)

class IndexErrorCommandTestCase(CommandMixin, unittest.TestCase):
    """
    Run all of the command tests against a mailbox which raises IndexError
    when an out of bounds request is made.  This behavior will be deprecated
    shortly and then removed.
    """
    exceptionType = IndexError
    mailboxType = DummyMailbox

    # The overrides below only exist to attach warning suppressions to the
    # inherited tests.
    def testLISTWithBadArgument(self):
        return CommandMixin.testLISTWithBadArgument(self)
    testLISTWithBadArgument.suppress = [_listMessageSuppression]


    def testUIDLWithBadArgument(self):
        return CommandMixin.testUIDLWithBadArgument(self)
    testUIDLWithBadArgument.suppress = [_getUidlSuppression]


    def testTOPWithBadArgument(self):
        return CommandMixin.testTOPWithBadArgument(self)
    testTOPWithBadArgument.suppress = [_listMessageSuppression]


    def testRETRWithBadArgument(self):
        return CommandMixin.testRETRWithBadArgument(self)
    testRETRWithBadArgument.suppress = [_listMessageSuppression]



class ValueErrorCommandTestCase(CommandMixin, unittest.TestCase):
    """
    Run all of the command tests against a mailbox which raises ValueError
    when an out of bounds request is made.  This is the correct behavior and
    after support for mailboxes which raise IndexError is removed, this will
    become just C{CommandTestCase}.
    """
    exceptionType = ValueError
    mailboxType = DummyMailbox



class SyncDeferredMailbox(DummyMailbox):
    """
    Mailbox which has a listMessages implementation which returns a Deferred
    which has already fired.
    """
    def listMessages(self, n=None):
        return defer.succeed(DummyMailbox.listMessages(self, n))



class IndexErrorSyncDeferredCommandTestCase(IndexErrorCommandTestCase):
    """
    Run all of the L{IndexErrorCommandTestCase} tests with a
    synchronous-Deferred returning IMailbox implementation.
    """
    mailboxType = SyncDeferredMailbox



class ValueErrorSyncDeferredCommandTestCase(ValueErrorCommandTestCase):
    """
    Run all of the L{ValueErrorCommandTestCase} tests with a
    synchronous-Deferred returning IMailbox implementation.
    """
    mailboxType = SyncDeferredMailbox



class AsyncDeferredMailbox(DummyMailbox):
    """
    Mailbox which has a listMessages implementation which returns a Deferred
    which has not yet fired.
    """
    def __init__(self, *a, **kw):
        self.waiting = []
        DummyMailbox.__init__(self, *a, **kw)


    def listMessages(self, n=None):
        d = defer.Deferred()
        # The result is computed right away (so an out-of-bounds index still
        # raises synchronously) but is only delivered when the test case's
        # _flush method fires the Deferreds queued here.
        self.waiting.append((d, DummyMailbox.listMessages(self, n)))
        return d



class IndexErrorAsyncDeferredCommandTestCase(IndexErrorCommandTestCase):
    """
    Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred
    returning IMailbox implementation.
    """
    mailboxType = AsyncDeferredMailbox

    def _flush(self):
        """
        Fire whatever Deferreds we've built up in our mailbox.
        """
        while self.pop3Server.mbox.waiting:
            d, a = self.pop3Server.mbox.waiting.pop()
            d.callback(a)
        IndexErrorCommandTestCase._flush(self)



class ValueErrorAsyncDeferredCommandTestCase(ValueErrorCommandTestCase):
    """
    Run all of the L{ValueErrorCommandTestCase} tests with an asynchronous-Deferred
    returning IMailbox implementation.
    """
    mailboxType = AsyncDeferredMailbox

    def _flush(self):
        """
        Fire whatever Deferreds we've built up in our mailbox.
        """
        while self.pop3Server.mbox.waiting:
            d, a = self.pop3Server.mbox.waiting.pop()
            d.callback(a)
        ValueErrorCommandTestCase._flush(self)



class POP3MiscTestCase(unittest.TestCase):
    """
    Miscellaneous tests more to do with module/package structure than
    anything to do with the Post Office Protocol.
    """
    def test_all(self):
        """
        This test checks that all names listed in
        twisted.mail.pop3.__all__ are actually present in the module.
        """
        mod = twisted.mail.pop3
        for attr in mod.__all__:
            self.assertTrue(hasattr(mod, attr))