1# -*- test-case-name: twisted.mail.test.test_imap -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5 6""" 7Test case for twisted.mail.imap4 8""" 9 10import base64 11import codecs 12import functools 13import locale 14import os 15import uuid 16from collections import OrderedDict 17from io import BytesIO 18from itertools import chain 19from typing import List, Optional, Tuple, Type 20from unittest import skipIf 21 22from zope.interface import implementer 23from zope.interface.verify import verifyClass, verifyObject 24 25from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse 26from twisted.cred.credentials import ( 27 CramMD5Credentials, 28 IUsernameHashedPassword, 29 IUsernamePassword, 30) 31from twisted.cred.error import UnauthorizedLogin 32from twisted.cred.portal import IRealm, Portal 33from twisted.internet import defer, error, interfaces, reactor 34from twisted.internet.task import Clock 35from twisted.mail import imap4 36from twisted.mail.imap4 import MessageSet 37from twisted.mail.interfaces import ( 38 IChallengeResponse, 39 IClientAuthentication, 40 ICloseableMailboxIMAP, 41) 42from twisted.protocols import loopback 43from twisted.python import failure, log, util 44from twisted.python.compat import iterbytes, nativeString, networkString 45from twisted.test.proto_helpers import StringTransport, StringTransportWithDisconnection 46from twisted.trial.unittest import SynchronousTestCase, TestCase 47 48try: 49 from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext 50except ImportError: 51 ClientTLSContext = None # type: ignore[assignment,misc] 52 ServerTLSContext = None # type: ignore[assignment,misc] 53 54 55def strip(f): 56 return lambda result, f=f: f() 57 58 59class IMAP4UTF7Tests(TestCase): 60 tests = [ 61 ["Hello world", b"Hello world"], 62 ["Hello & world", b"Hello &- world"], 63 ["Hello\xffworld", b"Hello&AP8-world"], 64 ["\xff\xfe\xfd\xfc", b"&AP8A,gD9APw-"], 65 [ 66 "~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317", 67 b"~peter/mail/&ZeVnLIqe-/&U,BTFw-", 68 ], # example from RFC 2060 69 ] 70 71 def test_encodeWithErrors(self): 72 """ 73 Specifying an error policy to C{unicode.encode} with the 74 I{imap4-utf-7} codec should produce the same result as not 75 specifying the error policy. 76 """ 77 text = "Hello world" 78 self.assertEqual( 79 text.encode("imap4-utf-7", "strict"), text.encode("imap4-utf-7") 80 ) 81 82 def test_decodeWithErrors(self): 83 """ 84 Similar to L{test_encodeWithErrors}, but for C{bytes.decode}. 85 """ 86 bytes = b"Hello world" 87 self.assertEqual( 88 bytes.decode("imap4-utf-7", "strict"), bytes.decode("imap4-utf-7") 89 ) 90 91 def test_encodeAmpersand(self): 92 """ 93 Unicode strings that contain an ampersand (C{&}) can be 94 encoded to bytes with the I{imap4-utf-7} codec. 95 """ 96 text = "&Hello&\N{VULGAR FRACTION ONE HALF}&" 97 self.assertEqual( 98 text.encode("imap4-utf-7"), 99 b"&-Hello&-&AL0-&-", 100 ) 101 102 def test_decodeWithoutFinalASCIIShift(self): 103 """ 104 An I{imap4-utf-7} encoded string that does not shift back to 105 ASCII (i.e., it lacks a final C{-}) can be decoded. 106 """ 107 self.assertEqual( 108 b"&AL0".decode("imap4-utf-7"), 109 "\N{VULGAR FRACTION ONE HALF}", 110 ) 111 112 def test_getreader(self): 113 """ 114 C{codecs.getreader('imap4-utf-7')} returns the I{imap4-utf-7} stream 115 reader class. 116 """ 117 reader = codecs.getreader("imap4-utf-7")(BytesIO(b"Hello&AP8-world")) 118 self.assertEqual(reader.read(), "Hello\xffworld") 119 120 def test_getwriter(self): 121 """ 122 C{codecs.getwriter('imap4-utf-7')} returns the I{imap4-utf-7} stream 123 writer class. 124 """ 125 output = BytesIO() 126 writer = codecs.getwriter("imap4-utf-7")(output) 127 writer.write("Hello\xffworld") 128 self.assertEqual(output.getvalue(), b"Hello&AP8-world") 129 130 def test_encode(self): 131 """ 132 The I{imap4-utf-7} can be used to encode a unicode string into a byte 133 string according to the IMAP4 modified UTF-7 encoding rules. 134 """ 135 for (input, output) in self.tests: 136 self.assertEqual(input.encode("imap4-utf-7"), output) 137 138 def test_decode(self): 139 """ 140 The I{imap4-utf-7} can be used to decode a byte string into a unicode 141 string according to the IMAP4 modified UTF-7 encoding rules. 142 """ 143 for (input, output) in self.tests: 144 self.assertEqual(input, output.decode("imap4-utf-7")) 145 146 def test_printableSingletons(self): 147 """ 148 The IMAP4 modified UTF-7 implementation encodes all printable 149 characters which are in ASCII using the corresponding ASCII byte. 150 """ 151 # All printables represent themselves 152 for o in chain(range(0x20, 0x26), range(0x27, 0x7F)): 153 charbyte = chr(o).encode() 154 self.assertEqual(charbyte, chr(o).encode("imap4-utf-7")) 155 self.assertEqual(chr(o), charbyte.decode("imap4-utf-7")) 156 self.assertEqual("&".encode("imap4-utf-7"), b"&-") 157 self.assertEqual(b"&-".decode("imap4-utf-7"), "&") 158 159 160class BufferingConsumer: 161 def __init__(self): 162 self.buffer = [] 163 164 def write(self, bytes): 165 self.buffer.append(bytes) 166 if self.consumer: 167 self.consumer.resumeProducing() 168 169 def registerProducer(self, consumer, streaming): 170 self.consumer = consumer 171 self.consumer.resumeProducing() 172 173 def unregisterProducer(self): 174 self.consumer = None 175 176 177class MessageProducerTests(SynchronousTestCase): 178 def testSinglePart(self): 179 body = b"This is body text. Rar." 180 headers = OrderedDict() 181 headers["from"] = "sender@host" 182 headers["to"] = "recipient@domain" 183 headers["subject"] = "booga booga boo" 184 headers["content-type"] = "text/plain" 185 186 msg = FakeyMessage(headers, (), None, body, 123, None) 187 188 c = BufferingConsumer() 189 p = imap4.MessageProducer(msg) 190 d = p.beginProducing(c) 191 192 def cbProduced(result): 193 self.assertIdentical(result, p) 194 self.assertEqual( 195 b"".join(c.buffer), 196 b"{119}\r\n" 197 b"From: sender@host\r\n" 198 b"To: recipient@domain\r\n" 199 b"Subject: booga booga boo\r\n" 200 b"Content-Type: text/plain\r\n" 201 b"\r\n" + body, 202 ) 203 204 return d.addCallback(cbProduced) 205 206 def testSingleMultiPart(self): 207 outerBody = b"" 208 innerBody = b"Contained body message text. Squarge." 209 headers = OrderedDict() 210 headers["from"] = "sender@host" 211 headers["to"] = "recipient@domain" 212 headers["subject"] = "booga booga boo" 213 headers["content-type"] = 'multipart/alternative; boundary="xyz"' 214 215 innerHeaders = OrderedDict() 216 innerHeaders["subject"] = "this is subject text" 217 innerHeaders["content-type"] = "text/plain" 218 msg = FakeyMessage( 219 headers, 220 (), 221 None, 222 outerBody, 223 123, 224 [FakeyMessage(innerHeaders, (), None, innerBody, None, None)], 225 ) 226 227 c = BufferingConsumer() 228 p = imap4.MessageProducer(msg) 229 d = p.beginProducing(c) 230 231 def cbProduced(result): 232 self.failUnlessIdentical(result, p) 233 234 self.assertEqual( 235 b"".join(c.buffer), 236 b"{239}\r\n" 237 b"From: sender@host\r\n" 238 b"To: recipient@domain\r\n" 239 b"Subject: booga booga boo\r\n" 240 b'Content-Type: multipart/alternative; boundary="xyz"\r\n' 241 b"\r\n" 242 b"\r\n" 243 b"--xyz\r\n" 244 b"Subject: this is subject text\r\n" 245 b"Content-Type: text/plain\r\n" 246 b"\r\n" + innerBody + b"\r\n--xyz--\r\n", 247 ) 248 249 return d.addCallback(cbProduced) 250 251 def testMultipleMultiPart(self): 252 outerBody = b"" 253 innerBody1 = b"Contained body message text. Squarge." 254 innerBody2 = b"Secondary <i>message</i> text of squarge body." 255 headers = OrderedDict() 256 headers["from"] = "sender@host" 257 headers["to"] = "recipient@domain" 258 headers["subject"] = "booga booga boo" 259 headers["content-type"] = 'multipart/alternative; boundary="xyz"' 260 innerHeaders = OrderedDict() 261 innerHeaders["subject"] = "this is subject text" 262 innerHeaders["content-type"] = "text/plain" 263 innerHeaders2 = OrderedDict() 264 innerHeaders2["subject"] = "<b>this is subject</b>" 265 innerHeaders2["content-type"] = "text/html" 266 msg = FakeyMessage( 267 headers, 268 (), 269 None, 270 outerBody, 271 123, 272 [ 273 FakeyMessage(innerHeaders, (), None, innerBody1, None, None), 274 FakeyMessage(innerHeaders2, (), None, innerBody2, None, None), 275 ], 276 ) 277 278 c = BufferingConsumer() 279 p = imap4.MessageProducer(msg) 280 d = p.beginProducing(c) 281 282 def cbProduced(result): 283 self.failUnlessIdentical(result, p) 284 285 self.assertEqual( 286 b"".join(c.buffer), 287 b"{354}\r\n" 288 b"From: sender@host\r\n" 289 b"To: recipient@domain\r\n" 290 b"Subject: booga booga boo\r\n" 291 b'Content-Type: multipart/alternative; boundary="xyz"\r\n' 292 b"\r\n" 293 b"\r\n" 294 b"--xyz\r\n" 295 b"Subject: this is subject text\r\n" 296 b"Content-Type: text/plain\r\n" 297 b"\r\n" + innerBody1 + b"\r\n--xyz\r\n" 298 b"Subject: <b>this is subject</b>\r\n" 299 b"Content-Type: text/html\r\n" 300 b"\r\n" + innerBody2 + b"\r\n--xyz--\r\n", 301 ) 302 303 return d.addCallback(cbProduced) 304 305 def test_multiPartNoBoundary(self): 306 """ 307 A boundary is generated if none is provided. 308 """ 309 outerBody = b"" 310 innerBody = b"Contained body message text. Squarge." 311 headers = OrderedDict() 312 headers["from"] = "sender@host" 313 headers["to"] = "recipient@domain" 314 headers["subject"] = "booga booga boo" 315 headers["content-type"] = "multipart/alternative" 316 317 innerHeaders = OrderedDict() 318 innerHeaders["subject"] = "this is subject text" 319 innerHeaders["content-type"] = "text/plain" 320 msg = FakeyMessage( 321 headers, 322 (), 323 None, 324 outerBody, 325 123, 326 [FakeyMessage(innerHeaders, (), None, innerBody, None, None)], 327 ) 328 329 c = BufferingConsumer() 330 p = imap4.MessageProducer(msg) 331 p._uuid4 = lambda: uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") 332 333 d = p.beginProducing(c) 334 335 def cbProduced(result): 336 self.failUnlessIdentical(result, p) 337 self.assertEqual( 338 b"".join(c.buffer), 339 b"{341}\r\n" 340 b"From: sender@host\r\n" 341 b"To: recipient@domain\r\n" 342 b"Subject: booga booga boo\r\n" 343 b"Content-Type: multipart/alternative; boundary=" 344 b'"----=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"' 345 b"\r\n" 346 b"\r\n" 347 b"\r\n" 348 b"------=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\r\n" 349 b"Subject: this is subject text\r\n" 350 b"Content-Type: text/plain\r\n" 351 b"\r\n" 352 + innerBody 353 + b"\r\n------=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa--\r\n", 354 ) 355 356 return d.addCallback(cbProduced) 357 358 def test_multiPartNoQuotes(self): 359 """ 360 A boundary without does not have them added. 361 """ 362 outerBody = b"" 363 innerBody = b"Contained body message text. Squarge." 364 headers = OrderedDict() 365 headers["from"] = "sender@host" 366 headers["to"] = "recipient@domain" 367 headers["subject"] = "booga booga boo" 368 headers["content-type"] = "multipart/alternative; boundary=xyz" 369 370 innerHeaders = OrderedDict() 371 innerHeaders["subject"] = "this is subject text" 372 innerHeaders["content-type"] = "text/plain" 373 msg = FakeyMessage( 374 headers, 375 (), 376 None, 377 outerBody, 378 123, 379 [FakeyMessage(innerHeaders, (), None, innerBody, None, None)], 380 ) 381 382 c = BufferingConsumer() 383 p = imap4.MessageProducer(msg) 384 d = p.beginProducing(c) 385 386 def cbProduced(result): 387 self.failUnlessIdentical(result, p) 388 self.assertEqual( 389 b"".join(c.buffer), 390 b"{237}\r\n" 391 b"From: sender@host\r\n" 392 b"To: recipient@domain\r\n" 393 b"Subject: booga booga boo\r\n" 394 b"Content-Type: multipart/alternative; boundary=" 395 b"xyz" 396 b"\r\n" 397 b"\r\n" 398 b"\r\n" 399 b"--xyz\r\n" 400 b"Subject: this is subject text\r\n" 401 b"Content-Type: text/plain\r\n" 402 b"\r\n" + innerBody + b"\r\n--xyz--\r\n", 403 ) 404 405 return d.addCallback(cbProduced) 406 407 408class MessageSetTests(SynchronousTestCase): 409 """ 410 Tests for L{MessageSet}. 411 """ 412 413 def test_equalityIterationAndAddition(self): 414 """ 415 Test the following properties of L{MessageSet} addition and 416 equality: 417 418 1. Two empty L{MessageSet}s are equal to each other; 419 420 2. A L{MessageSet} is not equal to any other object; 421 422 2. Adding a L{MessageSet} and another L{MessageSet} or an 423 L{int} representing a single message or a sequence of 424 L{int}s representing a sequence of message numbers 425 produces a new L{MessageSet} that: 426 427 3. Has a length equal to the number of messages within 428 each sequence of message numbers; 429 430 4. Yields each message number in ascending order when 431 iterated over; 432 433 6. L{MessageSet.add} with a single message or a start and 434 end message satisfies 3 and 4 above. 435 """ 436 m1 = MessageSet() 437 m2 = MessageSet() 438 439 self.assertEqual(m1, m2) 440 self.assertNotEqual(m1, ()) 441 442 m1 = m1 + 1 443 self.assertEqual(len(m1), 1) 444 self.assertEqual(list(m1), [1]) 445 446 m1 = m1 + (1, 3) 447 self.assertEqual(len(m1), 3) 448 self.assertEqual(list(m1), [1, 2, 3]) 449 450 m2 = m2 + (1, 3) 451 self.assertEqual(m1, m2) 452 self.assertEqual(list(m1 + m2), [1, 2, 3]) 453 454 m1.add(5) 455 self.assertEqual(len(m1), 4) 456 self.assertEqual(list(m1), [1, 2, 3, 5]) 457 458 self.assertNotEqual(m1, m2) 459 460 m1.add(6, 8) 461 self.assertEqual(len(m1), 7) 462 self.assertEqual(list(m1), [1, 2, 3, 5, 6, 7, 8]) 463 464 def test_lengthWithWildcardRange(self): 465 """ 466 A L{MessageSet} that has a range that ends with L{None} raises 467 a L{TypeError} when its length is requested. 468 """ 469 self.assertRaises(TypeError, len, MessageSet(1, None)) 470 471 def test_reprSanity(self): 472 """ 473 L{MessageSet.__repr__} does not raise an exception 474 """ 475 repr(MessageSet(1, 2)) 476 477 def test_stringRepresentationWithWildcards(self): 478 """ 479 In a L{MessageSet}, in the presence of wildcards, if the 480 highest message id is known, the wildcard should get replaced 481 by that high value. 482 """ 483 inputs = [ 484 imap4.parseIdList(b"*"), 485 imap4.parseIdList(b"1:*"), 486 imap4.parseIdList(b"3:*", 6), 487 imap4.parseIdList(b"*:2", 6), 488 ] 489 490 outputs = [ 491 "*", 492 "1:*", 493 "3:6", 494 "2:6", 495 ] 496 497 for i, o in zip(inputs, outputs): 498 self.assertEqual(str(i), o) 499 500 def test_stringRepresentationWithInversion(self): 501 """ 502 In a L{MessageSet}, inverting the high and low numbers in a 503 range doesn't affect the meaning of the range. For example, 504 3:2 displays just like 2:3, because according to the RFC they 505 have the same meaning. 506 """ 507 inputs = [ 508 imap4.parseIdList(b"2:3"), 509 imap4.parseIdList(b"3:2"), 510 ] 511 512 outputs = [ 513 "2:3", 514 "2:3", 515 ] 516 517 for i, o in zip(inputs, outputs): 518 self.assertEqual(str(i), o) 519 520 def test_createWithSingleMessageNumber(self): 521 """ 522 Creating a L{MessageSet} with a single message number adds 523 only that message to the L{MessageSet}; its serialized form 524 includes only that message number, its length is one, and it 525 yields only that message number. 526 """ 527 m = MessageSet(1) 528 self.assertEqual(str(m), "1") 529 self.assertEqual(len(m), 1) 530 self.assertEqual(list(m), [1]) 531 532 def test_createWithSequence(self): 533 """ 534 Creating a L{MessageSet} with both a start and end message 535 number adds the sequence between to the L{MessageSet}; its 536 serialized form consists that range, its length is the length 537 of the sequence, and it yields the message numbers inclusively 538 between the start and end. 539 """ 540 m = MessageSet(1, 10) 541 self.assertEqual(str(m), "1:10") 542 self.assertEqual(len(m), 10) 543 self.assertEqual(list(m), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) 544 545 def test_createWithSingleWildcard(self): 546 """ 547 Creating a L{MessageSet} with a single L{None}, representing 548 C{*}, adds C{*} to the range; its serialized form includes 549 only C{*}, its length is one, but it cannot be iterated over 550 because its endpoint is unknown. 551 """ 552 m = MessageSet(None) 553 self.assertEqual(str(m), "*") 554 self.assertEqual(len(m), 1) 555 self.assertRaises(TypeError, list, m) 556 557 def test_setLastSingleWildcard(self): 558 """ 559 Setting L{MessageSet.last} replaces L{None}, representing 560 C{*}, with that number, making that L{MessageSet} iterable. 561 """ 562 singleMessageReplaced = MessageSet(None) 563 singleMessageReplaced.last = 10 564 self.assertEqual(list(singleMessageReplaced), [10]) 565 566 rangeReplaced = MessageSet(3, None) 567 rangeReplaced.last = 1 568 self.assertEqual(list(rangeReplaced), [1, 2, 3]) 569 570 def test_setLastWithWildcardRange(self): 571 """ 572 Setting L{MessageSet.last} replaces L{None} in all ranges. 573 """ 574 m = MessageSet(1, None) 575 m.add(2, None) 576 m.last = 5 577 self.assertEqual(list(m), [1, 2, 3, 4, 5]) 578 579 def test_setLastTwiceFails(self): 580 """ 581 L{MessageSet.last} cannot be set twice. 582 """ 583 m = MessageSet(1, None) 584 m.last = 2 585 with self.assertRaises(ValueError): 586 m.last = 3 587 588 def test_lastOverridesNoneInAdd(self): 589 """ 590 Adding a L{None}, representing C{*}, or a sequence that 591 includes L{None} to a L{MessageSet} whose 592 L{last<MessageSet.last>} property has been set replaces all 593 occurrences of L{None} with the value of 594 L{last<MessageSet.last>}. 595 """ 596 hasLast = MessageSet(1) 597 hasLast.last = 4 598 599 hasLast.add(None) 600 self.assertEqual(list(hasLast), [1, 4]) 601 602 self.assertEqual(list(hasLast + (None, 5)), [1, 4, 5]) 603 604 hasLast.add(3, None) 605 self.assertEqual(list(hasLast), [1, 3, 4]) 606 607 def test_getLast(self): 608 """ 609 Accessing L{MessageSet.last} returns the last value. 610 """ 611 m = MessageSet(1, None) 612 m.last = 2 613 self.assertEqual(m.last, 2) 614 615 def test_extend(self): 616 """ 617 L{MessageSet.extend} accepts as its arugment an L{int} or 618 L{None}, or a sequence L{int}s or L{None}s of length two, or 619 another L{MessageSet}, combining its argument with its 620 instance's existing ranges. 621 """ 622 extendWithInt = MessageSet() 623 extendWithInt.extend(1) 624 self.assertEqual(list(extendWithInt), [1]) 625 626 extendWithNone = MessageSet() 627 extendWithNone.extend(None) 628 self.assertEqual(str(extendWithNone), "*") 629 630 extendWithSequenceOfInts = MessageSet() 631 extendWithSequenceOfInts.extend((1, 3)) 632 self.assertEqual(list(extendWithSequenceOfInts), [1, 2, 3]) 633 634 extendWithSequenceOfNones = MessageSet() 635 extendWithSequenceOfNones.extend((None, None)) 636 self.assertEqual(str(extendWithSequenceOfNones), "*") 637 638 extendWithMessageSet = MessageSet() 639 extendWithMessageSet.extend(MessageSet(1, 3)) 640 self.assertEqual(list(extendWithMessageSet), [1, 2, 3]) 641 642 def test_contains(self): 643 """ 644 A L{MessageSet} contains a number if the number falls within 645 one of its ranges, and raises L{TypeError} if any range 646 contains L{None}. 647 """ 648 hasFive = MessageSet(1, 7) 649 doesNotHaveFive = MessageSet(1, 4) + MessageSet(6, 7) 650 651 self.assertIn(5, hasFive) 652 self.assertNotIn(5, doesNotHaveFive) 653 654 hasFiveButHasNone = hasFive + None 655 with self.assertRaises(TypeError): 656 5 in hasFiveButHasNone 657 658 hasFiveButHasNoneInSequence = hasFive + (10, 12) 659 hasFiveButHasNoneInSequence.add(8, None) 660 with self.assertRaises(TypeError): 661 5 in hasFiveButHasNoneInSequence 662 663 def test_rangesMerged(self): 664 """ 665 Adding a sequence of message numbers to a L{MessageSet} that 666 begins or ends immediately before or after an existing 667 sequence in that L{MessageSet}, or overlaps one, merges the two. 668 """ 669 670 mergeAfter = MessageSet(1, 3) 671 mergeBefore = MessageSet(6, 8) 672 673 mergeBetweenSequence = mergeAfter + mergeBefore 674 mergeBetweenNumber = mergeAfter + MessageSet(5, 7) 675 676 self.assertEqual(list(mergeAfter + (2, 4)), [1, 2, 3, 4]) 677 self.assertEqual(list(mergeAfter + (3, 5)), [1, 2, 3, 4, 5]) 678 679 self.assertEqual(list(mergeBefore + (5, 7)), [5, 6, 7, 8]) 680 self.assertEqual(list(mergeBefore + (4, 6)), [4, 5, 6, 7, 8]) 681 682 self.assertEqual(list(mergeBetweenSequence + (3, 5)), [1, 2, 3, 4, 5, 6, 7, 8]) 683 self.assertEqual( 684 list(mergeBetweenNumber + MessageSet(4)), [1, 2, 3, 4, 5, 6, 7] 685 ) 686 687 def test_seq_rangeExamples(self): 688 """ 689 Test the C{seq-range} examples from Section 9, "Formal Syntax" 690 of RFC 3501:: 691 692 Example: 2:4 and 4:2 are equivalent and indicate values 693 2, 3, and 4. 694 695 Example: a unique identifier sequence range of 696 3291:* includes the UID of the last message in 697 the mailbox, even if that value is less than 3291. 698 699 @see: U{http://tools.ietf.org/html/rfc3501#section-9} 700 """ 701 702 self.assertEqual(MessageSet(2, 4), MessageSet(4, 2)) 703 self.assertEqual(list(MessageSet(2, 4)), [2, 3, 4]) 704 705 m = MessageSet(3291, None) 706 m.last = 3290 707 self.assertEqual(list(m), [3290, 3291]) 708 709 def test_sequence_setExamples(self): 710 """ 711 Test the C{sequence-set} examples from Section 9, "Formal 712 Syntax" of RFC 3501. In particular, L{MessageSet} reorders 713 and coalesces overlaps:: 714 715 Example: a message sequence number set of 716 2,4:7,9,12:* for a mailbox with 15 messages is 717 equivalent to 2,4,5,6,7,9,12,13,14,15 718 719 Example: a message sequence number set of *:4,5:7 720 for a mailbox with 10 messages is equivalent to 721 10,9,8,7,6,5,4,5,6,7 and MAY be reordered and 722 overlap coalesced to be 4,5,6,7,8,9,10. 723 724 @see: U{http://tools.ietf.org/html/rfc3501#section-9} 725 """ 726 fromFifteenMessages = ( 727 MessageSet(2) + MessageSet(4, 7) + MessageSet(9) + MessageSet(12, None) 728 ) 729 fromFifteenMessages.last = 15 730 self.assertEqual( 731 ",".join(str(i) for i in fromFifteenMessages), "2,4,5,6,7,9,12,13,14,15" 732 ) 733 734 fromTenMessages = MessageSet(None, 4) + MessageSet(5, 7) 735 fromTenMessages.last = 10 736 self.assertEqual(",".join(str(i) for i in fromTenMessages), "4,5,6,7,8,9,10") 737 738 739class IMAP4HelperTests(TestCase): 740 """ 741 Tests for various helper utilities in the IMAP4 module. 742 """ 743 744 def test_commandRepr(self): 745 """ 746 L{imap4.Command}'s C{repr} does not raise an exception. 747 """ 748 repr(imap4.Command(b"COMMAND", [b"arg"], (b"extra"))) 749 750 def test_fileProducer(self): 751 b = ((b"x" * 1) + (b"y" * 1) + (b"z" * 1)) * 10 752 c = BufferingConsumer() 753 f = BytesIO(b) 754 p = imap4.FileProducer(f) 755 d = p.beginProducing(c) 756 757 def cbProduced(result): 758 self.failUnlessIdentical(result, p) 759 self.assertEqual(b"{%d}\r\n%b" % (len(b), b), b"".join(c.buffer)) 760 return result 761 762 def cbResume(result): 763 # Calling resumeProducing after completion does not raise 764 # an exception 765 p.resumeProducing() 766 return result 767 768 d.addCallback(cbProduced) 769 d.addCallback(cbResume) 770 # The second cbProduced ensures calling resumeProducing after 771 # completion does not change the result. 772 return d.addCallback(cbProduced) 773 774 def test_wildcard(self): 775 cases = [ 776 [ 777 "foo/%gum/bar", 778 ["foo/bar", "oo/lalagum/bar", "foo/gumx/bar", "foo/gum/baz"], 779 ["foo/xgum/bar", "foo/gum/bar"], 780 ], 781 [ 782 "foo/x%x/bar", 783 ["foo", "bar", "fuz fuz fuz", "foo/*/bar", "foo/xyz/bar", "foo/xx/baz"], 784 ["foo/xyx/bar", "foo/xx/bar", "foo/xxxxxxxxxxxxxx/bar"], 785 ], 786 [ 787 "foo/xyz*abc/bar", 788 ["foo/xyz/bar", "foo/abc/bar", "foo/xyzab/cbar", "foo/xyza/bcbar"], 789 ["foo/xyzabc/bar", "foo/xyz/abc/bar", "foo/xyz/123/abc/bar"], 790 ], 791 ] 792 793 for (wildcard, fail, succeed) in cases: 794 wildcard = imap4.wildcardToRegexp(wildcard, "/") 795 for x in fail: 796 self.assertFalse(wildcard.match(x)) 797 for x in succeed: 798 self.assertTrue(wildcard.match(x)) 799 800 def test_wildcardNoDelim(self): 801 cases = [ 802 [ 803 "foo/%gum/bar", 804 ["foo/bar", "oo/lalagum/bar", "foo/gumx/bar", "foo/gum/baz"], 805 ["foo/xgum/bar", "foo/gum/bar", "foo/x/gum/bar"], 806 ], 807 [ 808 "foo/x%x/bar", 809 ["foo", "bar", "fuz fuz fuz", "foo/*/bar", "foo/xyz/bar", "foo/xx/baz"], 810 ["foo/xyx/bar", "foo/xx/bar", "foo/xxxxxxxxxxxxxx/bar", "foo/x/x/bar"], 811 ], 812 [ 813 "foo/xyz*abc/bar", 814 ["foo/xyz/bar", "foo/abc/bar", "foo/xyzab/cbar", "foo/xyza/bcbar"], 815 ["foo/xyzabc/bar", "foo/xyz/abc/bar", "foo/xyz/123/abc/bar"], 816 ], 817 ] 818 819 for (wildcard, fail, succeed) in cases: 820 wildcard = imap4.wildcardToRegexp(wildcard, None) 821 for x in fail: 822 self.assertFalse(wildcard.match(x), x) 823 for x in succeed: 824 self.assertTrue(wildcard.match(x), x) 825 826 def test_headerFormatter(self): 827 """ 828 L{imap4._formatHeaders} accepts a C{dict} of header name/value pairs and 829 returns a string representing those headers in the standard multiline, 830 C{":"}-separated format. 831 """ 832 cases = [ 833 ( 834 {"Header1": "Value1", "Header2": "Value2"}, 835 b"Header2: Value2\r\nHeader1: Value1\r\n", 836 ), 837 ] 838 839 for (input, expected) in cases: 840 output = imap4._formatHeaders(input) 841 self.assertEqual( 842 sorted(output.splitlines(True)), sorted(expected.splitlines(True)) 843 ) 844 845 def test_quotedSplitter(self): 846 cases = [ 847 b"""Hello World""", 848 b'''Hello "World!"''', 849 b'''World "Hello" "How are you?"''', 850 b'''"Hello world" How "are you?"''', 851 b"""foo bar "baz buz" NIL""", 852 b'''foo bar "baz buz" "NIL"''', 853 b"""foo NIL "baz buz" bar""", 854 b"""foo "NIL" "baz buz" bar""", 855 b""""NIL" bar "baz buz" foo""", 856 b'oo \\"oo\\" oo', 857 b'"oo \\"oo\\" oo"', 858 b"oo \t oo", 859 b'"oo \t oo"', 860 b"oo \\t oo", 861 b'"oo \\t oo"', 862 br"oo \o oo", 863 br'"oo \o oo"', 864 b"oo \\o oo", 865 b'"oo \\o oo"', 866 ] 867 868 answers = [ 869 [b"Hello", b"World"], 870 [b"Hello", b"World!"], 871 [b"World", b"Hello", b"How are you?"], 872 [b"Hello world", b"How", b"are you?"], 873 [b"foo", b"bar", b"baz buz", None], 874 [b"foo", b"bar", b"baz buz", b"NIL"], 875 [b"foo", None, b"baz buz", b"bar"], 876 [b"foo", b"NIL", b"baz buz", b"bar"], 877 [b"NIL", b"bar", b"baz buz", b"foo"], 878 [b"oo", b'"oo"', b"oo"], 879 [b'oo "oo" oo'], 880 [b"oo", b"oo"], 881 [b"oo \t oo"], 882 [b"oo", b"\\t", b"oo"], 883 [b"oo \\t oo"], 884 [b"oo", br"\o", b"oo"], 885 [br"oo \o oo"], 886 [b"oo", b"\\o", b"oo"], 887 [b"oo \\o oo"], 888 ] 889 890 errors = [ 891 b'"mismatched quote', 892 b'mismatched quote"', 893 b'mismatched"quote', 894 b'"oops here is" another"', 895 ] 896 897 for s in errors: 898 self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s) 899 900 for (case, expected) in zip(cases, answers): 901 self.assertEqual(imap4.splitQuoted(case), expected) 902 903 def test_stringCollapser(self): 904 cases = [ 905 [b"a", b"b", b"c", b"d", b"e"], 906 [b"a", b" ", b'"', b"b", b"c", b" ", b'"', b" ", b"d", b"e"], 907 [[b"a", b"b", b"c"], b"d", b"e"], 908 [b"a", [b"b", b"c", b"d"], b"e"], 909 [b"a", b"b", [b"c", b"d", b"e"]], 910 [b'"', b"a", b" ", b'"', [b"b", b"c", b"d"], b'"', b" ", b"e", b'"'], 911 [b"a", [b'"', b" ", b"b", b"c", b" ", b" ", b'"'], b"d", b"e"], 912 ] 913 914 answers = [ 915 [b"abcde"], 916 [b"a", b"bc ", b"de"], 917 [[b"abc"], b"de"], 918 [b"a", [b"bcd"], b"e"], 919 [b"ab", [b"cde"]], 920 [b"a ", [b"bcd"], b" e"], 921 [b"a", [b" bc "], b"de"], 922 ] 923 924 for (case, expected) in zip(cases, answers): 925 self.assertEqual(imap4.collapseStrings(case), expected) 926 927 def test_parenParser(self): 928 s = b"\r\n".join([b"xx"] * 4) 929 930 def check(case, expected): 931 parsed = imap4.parseNestedParens(case) 932 self.assertEqual(parsed, [expected]) 933 # XXX This code used to work, but changes occurred within the 934 # imap4.py module which made it no longer necessary for *all* of it 935 # to work. In particular, only the part that makes 936 # 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out 937 # correctly no longer needs to work. So, I am loathe to delete the 938 # entire section of the test. --exarkun 939 940 # self.assertEqual(b'(' + imap4.collapseNestedLists(parsed) + b')', 941 # expected) 942 943 check( 944 b"(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%b)" % (len(s), s), 945 [b"BODY.PEEK", [b"HEADER.FIELDS.NOT", [b"subject", b"bcc", b"cc"]], s], 946 ) 947 check( 948 b'(FLAGS (\\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" ' 949 b"RFC822.SIZE 4286 ENVELOPE " 950 b'("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" ' 951 b'"IMAP4rev1 WG mtg summary and minutes" ' 952 b'(("Terry Gray" NIL gray cac.washington.edu)) ' 953 b'(("Terry Gray" NIL gray cac.washington.edu)) ' 954 b'(("Terry Gray" NIL gray cac.washington.edu)) ' 955 b"((NIL NIL imap cac.washington.edu)) " 956 b"((NIL NIL minutes CNRI.Reston.VA.US) " 957 b'("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL ' 958 b"<B27397-0100000@cac.washington.edu>) " 959 b"BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))", 960 [ 961 b"FLAGS", 962 [br"\Seen"], 963 b"INTERNALDATE", 964 b"17-Jul-1996 02:44:25 -0700", 965 b"RFC822.SIZE", 966 b"4286", 967 b"ENVELOPE", 968 [ 969 b"Wed, 17 Jul 1996 02:23:25 -0700 (PDT)", 970 b"IMAP4rev1 WG mtg summary and minutes", 971 [[b"Terry Gray", None, b"gray", b"cac.washington.edu"]], 972 [[b"Terry Gray", None, b"gray", b"cac.washington.edu"]], 973 [[b"Terry Gray", None, b"gray", b"cac.washington.edu"]], 974 [[None, None, b"imap", b"cac.washington.edu"]], 975 [ 976 [None, None, b"minutes", b"CNRI.Reston.VA.US"], 977 [b"John Klensin", None, b"KLENSIN", b"INFOODS.MIT.EDU"], 978 ], 979 None, 980 None, 981 b"<B27397-0100000@cac.washington.edu>", 982 ], 983 b"BODY", 984 [ 985 b"TEXT", 986 b"PLAIN", 987 [b"CHARSET", b"US-ASCII"], 988 None, 989 None, 990 b"7BIT", 991 b"3028", 992 b"92", 993 ], 994 ], 995 ) 996 997 check(b'("oo \\"oo\\" oo")', [b'oo "oo" oo']) 998 check(b'("oo \\\\ oo")', [b"oo \\\\ oo"]) 999 check(b'("oo \\ oo")', [b"oo \\ oo"]) 1000 1001 check(b'("oo \\o")', [b"oo \\o"]) 1002 check(br'("oo \o")', [br"oo \o"]) 1003 check(br"(oo \o)", [b"oo", br"\o"]) 1004 check(b"(oo \\o)", [b"oo", b"\\o"]) 1005 1006 def test_fetchParserSimple(self): 1007 cases = [ 1008 ["ENVELOPE", "Envelope", "envelope"], 1009 ["FLAGS", "Flags", "flags"], 1010 ["INTERNALDATE", "InternalDate", "internaldate"], 1011 ["RFC822.HEADER", "RFC822Header", "rfc822.header"], 1012 ["RFC822.SIZE", "RFC822Size", "rfc822.size"], 1013 ["RFC822.TEXT", "RFC822Text", "rfc822.text"], 1014 ["RFC822", "RFC822", "rfc822"], 1015 ["UID", "UID", "uid"], 1016 ["BODYSTRUCTURE", "BodyStructure", "bodystructure"], 1017 ] 1018 1019 for (inp, outp, asString) in cases: 1020 inp = inp.encode("ascii") 1021 p = imap4._FetchParser() 1022 p.parseString(inp) 1023 self.assertEqual(len(p.result), 1) 1024 self.assertTrue(isinstance(p.result[0], getattr(p, outp))) 1025 self.assertEqual(str(p.result[0]), asString) 1026 1027 def test_fetchParserMacros(self): 1028 cases = [ 1029 [b"ALL", (4, [b"flags", b"internaldate", b"rfc822.size", b"envelope"])], 1030 [ 1031 b"FULL", 1032 (5, [b"flags", b"internaldate", b"rfc822.size", b"envelope", b"body"]), 1033 ], 1034 [b"FAST", (3, [b"flags", b"internaldate", b"rfc822.size"])], 1035 ] 1036 1037 for (inp, outp) in cases: 1038 p = imap4._FetchParser() 1039 p.parseString(inp) 1040 self.assertEqual(len(p.result), outp[0]) 1041 expectedResult = [str(token).lower().encode("ascii") for token in p.result] 1042 expectedResult.sort() 1043 outp[1].sort() 1044 self.assertEqual(expectedResult, outp[1]) 1045 1046 def test_fetchParserBody(self): 1047 P = imap4._FetchParser 1048 1049 p = P() 1050 p.parseString(b"BODY") 1051 self.assertEqual(len(p.result), 1) 1052 self.assertTrue(isinstance(p.result[0], p.Body)) 1053 self.assertEqual(p.result[0].peek, False) 1054 self.assertEqual(p.result[0].header, None) 1055 self.assertEqual(str(p.result[0]), "BODY") 1056 1057 p = P() 1058 p.parseString(b"BODY.PEEK") 1059 self.assertEqual(len(p.result), 1) 1060 self.assertTrue(isinstance(p.result[0], p.Body)) 1061 self.assertEqual(p.result[0].peek, True) 1062 self.assertEqual(str(p.result[0]), "BODY") 1063 1064 p = P() 1065 p.parseString(b"BODY[]") 1066 self.assertEqual(len(p.result), 1) 1067 self.assertTrue(isinstance(p.result[0], p.Body)) 1068 self.assertEqual(p.result[0].empty, True) 1069 self.assertEqual(str(p.result[0]), "BODY[]") 1070 1071 p = P() 1072 p.parseString(b"BODY[HEADER]") 1073 self.assertEqual(len(p.result), 1) 1074 self.assertTrue(isinstance(p.result[0], p.Body)) 1075 self.assertEqual(p.result[0].peek, False) 1076 self.assertTrue(isinstance(p.result[0].header, p.Header)) 1077 self.assertEqual(p.result[0].header.negate, True) 1078 self.assertEqual(p.result[0].header.fields, ()) 1079 self.assertEqual(p.result[0].empty, False) 1080 self.assertEqual(str(p.result[0]), "BODY[HEADER]") 1081 1082 p = P() 1083 p.parseString(b"BODY.PEEK[HEADER]") 1084 self.assertEqual(len(p.result), 1) 1085 self.assertTrue(isinstance(p.result[0], p.Body)) 1086 self.assertEqual(p.result[0].peek, True) 1087 self.assertTrue(isinstance(p.result[0].header, p.Header)) 1088 self.assertEqual(p.result[0].header.negate, True) 1089 self.assertEqual(p.result[0].header.fields, ()) 1090 self.assertEqual(p.result[0].empty, False) 1091 self.assertEqual(str(p.result[0]), "BODY[HEADER]") 1092 1093 p = P() 1094 p.parseString(b"BODY[HEADER.FIELDS (Subject Cc Message-Id)]") 1095 self.assertEqual(len(p.result), 1) 1096 self.assertTrue(isinstance(p.result[0], p.Body)) 1097 self.assertEqual(p.result[0].peek, False) 1098 self.assertTrue(isinstance(p.result[0].header, p.Header)) 1099 self.assertEqual(p.result[0].header.negate, False) 1100 self.assertEqual(p.result[0].header.fields, [b"SUBJECT", b"CC", b"MESSAGE-ID"]) 1101 self.assertEqual(p.result[0].empty, False) 1102 self.assertEqual( 1103 bytes(p.result[0]), b"BODY[HEADER.FIELDS (Subject Cc Message-Id)]" 1104 ) 1105 1106 p = P() 1107 p.parseString(b"BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]") 1108 self.assertEqual(len(p.result), 1) 1109 self.assertTrue(isinstance(p.result[0], p.Body)) 1110 self.assertEqual(p.result[0].peek, True) 1111 self.assertTrue(isinstance(p.result[0].header, p.Header)) 1112 self.assertEqual(p.result[0].header.negate, False) 1113 self.assertEqual(p.result[0].header.fields, [b"SUBJECT", b"CC", b"MESSAGE-ID"]) 1114 self.assertEqual(p.result[0].empty, False) 1115 self.assertEqual( 1116 bytes(p.result[0]), b"BODY[HEADER.FIELDS (Subject Cc Message-Id)]" 1117 ) 1118 1119 p = P() 1120 p.parseString(b"BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]") 1121 self.assertEqual(len(p.result), 1) 1122 self.assertTrue(isinstance(p.result[0], p.Body)) 1123 self.assertEqual(p.result[0].peek, True) 1124 self.assertTrue(isinstance(p.result[0].header, p.Header)) 1125 self.assertEqual(p.result[0].header.negate, True) 1126 self.assertEqual(p.result[0].header.fields, [b"SUBJECT", b"CC", b"MESSAGE-ID"]) 1127 self.assertEqual(p.result[0].empty, False) 1128 self.assertEqual( 1129 bytes(p.result[0]), b"BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]" 1130 ) 1131 1132 p = P() 1133 p.parseString(b"BODY[1.MIME]<10.50>") 1134 self.assertEqual(len(p.result), 1) 1135 self.assertTrue(isinstance(p.result[0], p.Body)) 1136 self.assertEqual(p.result[0].peek, False) 1137 self.assertTrue(isinstance(p.result[0].mime, p.MIME)) 1138 self.assertEqual(p.result[0].part, (0,)) 1139 self.assertEqual(p.result[0].partialBegin, 10) 1140 self.assertEqual(p.result[0].partialLength, 50) 1141 self.assertEqual(p.result[0].empty, False) 1142 self.assertEqual(bytes(p.result[0]), b"BODY[1.MIME]<10.50>") 1143 1144 p = P() 1145 p.parseString( 1146 b"BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>" 1147 ) 1148 self.assertEqual(len(p.result), 1) 1149 self.assertTrue(isinstance(p.result[0], p.Body)) 1150 self.assertEqual(p.result[0].peek, True) 1151 self.assertTrue(isinstance(p.result[0].header, p.Header)) 1152 self.assertEqual(p.result[0].part, (0, 2, 8, 10)) 1153 self.assertEqual(p.result[0].header.fields, [b"MESSAGE-ID", b"DATE"]) 1154 self.assertEqual(p.result[0].partialBegin, 103) 1155 self.assertEqual(p.result[0].partialLength, 69) 1156 self.assertEqual(p.result[0].empty, False) 1157 self.assertEqual( 1158 bytes(p.result[0]), 1159 b"BODY[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>", 1160 ) 1161 1162 def test_fetchParserQuotedHeader(self): 1163 """ 1164 Parsing a C{BODY} whose C{HEADER} values require quoting 1165 results in a object that perserves that quoting when 1166 serialized. 1167 """ 1168 p = imap4._FetchParser() 1169 p.parseString(b"BODY[HEADER.FIELDS ((Quoted)]") 1170 self.assertEqual(len(p.result), 1) 1171 self.assertEqual(p.result[0].peek, False) 1172 self.assertIsInstance(p.result[0], p.Body) 1173 self.assertIsInstance(p.result[0].header, p.Header) 1174 self.assertEqual(bytes(p.result[0]), b'BODY[HEADER.FIELDS ("(Quoted")]') 1175 1176 def test_fetchParserEmptyString(self): 1177 """ 1178 Parsing an empty string results in no data. 1179 """ 1180 p = imap4._FetchParser() 1181 p.parseString(b"") 1182 self.assertFalse(len(p.result)) 1183 1184 def test_fetchParserUnknownAttribute(self): 1185 """ 1186 Parsing a string with an unknown attribute raises an 1187 L{Exception}. 1188 """ 1189 p = imap4._FetchParser() 1190 self.assertRaises(Exception, p.parseString, b"UNKNOWN") 1191 1192 def test_fetchParserIncompleteStringEndsInWhitespace(self): 1193 """ 1194 Parsing a string that prematurely ends in whitespace raises an 1195 L{Exception}. 1196 """ 1197 p = imap4._FetchParser() 1198 self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS ") 1199 1200 def test_fetchParserExpectedWhitespace(self): 1201 """ 1202 Parsing a string that contains an unexpected character rather 1203 than whitespace raises an L{Exception}. 1204 """ 1205 p = imap4._FetchParser() 1206 self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS!]") 1207 1208 def test_fetchParserTextSection(self): 1209 """ 1210 A C{BODY} can contain a C{TEXT} section. 1211 """ 1212 p = imap4._FetchParser() 1213 p.parseString(b"BODY[TEXT]") 1214 self.assertEqual(len(p.result), 1) 1215 self.assertIsInstance(p.result[0], p.Body) 1216 self.assertEqual(p.result[0].peek, False) 1217 self.assertIsInstance(p.result[0].text, p.Text) 1218 self.assertEqual(bytes(p.result[0]), b"BODY[TEXT]") 1219 1220 def test_fetchParserUnknownSection(self): 1221 """ 1222 Parsing a C{BODY} with an unknown section raises an 1223 L{Exception}. 1224 """ 1225 p = imap4._FetchParser() 1226 self.assertRaises(Exception, p.parseString, b"BODY[UNKNOWN]") 1227 1228 def test_fetchParserMissingSectionClose(self): 1229 """ 1230 Parsing a C{BODY} with an unterminated section list raises an 1231 L{Exception}. 1232 """ 1233 p = imap4._FetchParser() 1234 self.assertRaises(Exception, p.parseString, b"BODY[HEADER") 1235 p = imap4._FetchParser() 1236 self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS (SUBJECT)") 1237 1238 def test_fetchParserHeaderMissingParentheses(self): 1239 """ 1240 Parsing a C{BODY} whose C{HEADER.FIELDS} list does not begin 1241 with an open parenthesis (C{(}) or end with a close 1242 parenthesis (C{)}) raises an L{Exception}. 1243 """ 1244 p = imap4._FetchParser() 1245 self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS Missing)]") 1246 p = imap4._FetchParser() 1247 self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS (Missing]") 1248 1249 def test_fetchParserDotlessPartial(self): 1250 """ 1251 Parsing a C{BODY} with a range that lacks a period (C{.}) 1252 raises an L{Exception}. 1253 """ 1254 p = imap4._FetchParser() 1255 self.assertRaises(Exception, p.parseString, b"BODY<01>") 1256 1257 def test_fetchParserUnclosedPartial(self): 1258 """ 1259 Parsing a C{BODY} with a partial range that's missing its 1260 closing greater than sign (C{>}) raises an L{EXCEPTION}. 1261 """ 1262 p = imap4._FetchParser() 1263 self.assertRaises(Exception, p.parseString, b"BODY<0") 1264 1265 def test_files(self): 1266 inputStructure = [ 1267 "foo", 1268 "bar", 1269 "baz", 1270 BytesIO(b"this is a file\r\n"), 1271 "buz", 1272 "biz", 1273 ] 1274 1275 output = b'"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz" "biz"' 1276 1277 self.assertEqual(imap4.collapseNestedLists(inputStructure), output) 1278 1279 def test_quoteAvoider(self): 1280 input = [ 1281 b"foo", 1282 imap4.DontQuoteMe(b"bar"), 1283 b"baz", 1284 BytesIO(b"this is a file\r\n"), 1285 b"this is\r\nquoted", 1286 imap4.DontQuoteMe(b"buz"), 1287 b"", 1288 ] 1289 1290 output = ( 1291 b'"foo" bar "baz"' 1292 b" {16}\r\nthis is a file\r\n " 1293 b"{15}\r\nthis is\r\nquoted" 1294 b' buz ""' 1295 ) 1296 1297 self.assertEqual(imap4.collapseNestedLists(input), output) 1298 1299 def test_literals(self): 1300 cases = [ 1301 (b"({10}\r\n0123456789)", [[b"0123456789"]]), 1302 ] 1303 1304 for (case, expected) in cases: 1305 self.assertEqual(imap4.parseNestedParens(case), expected) 1306 1307 def test_queryBuilder(self): 1308 inputs = [ 1309 imap4.Query(flagged=1), 1310 imap4.Query(sorted=1, unflagged=1, deleted=1), 1311 imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)), 1312 imap4.Query(before="today"), 1313 imap4.Or(imap4.Query(deleted=1), imap4.Query(unseen=1), imap4.Query(new=1)), 1314 imap4.Or( 1315 imap4.Not( 1316 imap4.Or( 1317 imap4.Query(sorted=1, since="yesterday", smaller=1000), 1318 imap4.Query(sorted=1, before="tuesday", larger=10000), 1319 imap4.Query(sorted=1, unseen=1, deleted=1, before="today"), 1320 imap4.Not(imap4.Query(subject="spam")), 1321 ), 1322 ), 1323 imap4.Not(imap4.Query(uid="1:5")), 1324 ), 1325 ] 1326 1327 outputs = [ 1328 "FLAGGED", 1329 "(DELETED UNFLAGGED)", 1330 "(OR FLAGGED DELETED)", 1331 '(BEFORE "today")', 1332 "(OR DELETED (OR UNSEEN NEW))", 1333 '(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) ' # Continuing 1334 '(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE ' # Some more 1335 '"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) ' # And more 1336 "(NOT (UID 1:5)))", 1337 ] 1338 1339 for (query, expected) in zip(inputs, outputs): 1340 self.assertEqual(query, expected) 1341 1342 def test_queryKeywordFlagWithQuotes(self): 1343 """ 1344 When passed the C{keyword} argument, L{imap4.Query} returns an unquoted 1345 string. 1346 1347 @see: U{http://tools.ietf.org/html/rfc3501#section-9} 1348 @see: U{http://tools.ietf.org/html/rfc3501#section-6.4.4} 1349 """ 1350 query = imap4.Query(keyword="twisted") 1351 self.assertEqual("(KEYWORD twisted)", query) 1352 1353 def test_queryUnkeywordFlagWithQuotes(self): 1354 """ 1355 When passed the C{unkeyword} argument, L{imap4.Query} returns an 1356 unquoted string. 1357 1358 @see: U{http://tools.ietf.org/html/rfc3501#section-9} 1359 @see: U{http://tools.ietf.org/html/rfc3501#section-6.4.4} 1360 """ 1361 query = imap4.Query(unkeyword="twisted") 1362 self.assertEqual("(UNKEYWORD twisted)", query) 1363 1364 def test_queryWithMesssageSet(self): 1365 """ 1366 When passed a L{MessageSet}, L{imap4.Query} returns a query 1367 containing a quoted string representing the ID sequence. 1368 """ 1369 query = imap4.Query(messages=imap4.MessageSet(1, None)) 1370 self.assertEqual(query, '(MESSAGES "1:*")') 1371 1372 def test_queryWithInteger(self): 1373 """ 1374 When passed an L{int}, L{imap4.Query} returns a query 1375 containing a quoted integer. 1376 """ 1377 query = imap4.Query(messages=1) 1378 self.assertEqual(query, '(MESSAGES "1")') 1379 1380 def test_queryOrIllegalQuery(self): 1381 """ 1382 An L{imap4.Or} query with less than two arguments raises an 1383 L{imap4.IllegalQueryError}. 1384 """ 1385 self.assertRaises(imap4.IllegalQueryError, imap4.Or, imap4.Query(messages=1)) 1386 1387 def _keywordFilteringTest(self, keyword): 1388 """ 1389 Helper to implement tests for value filtering of KEYWORD and UNKEYWORD 1390 queries. 1391 1392 @param keyword: A native string giving the name of the L{imap4.Query} 1393 keyword argument to test. 1394 """ 1395 # Check all the printable exclusions 1396 self.assertEqual( 1397 f"({keyword.upper()} twistedrocks)", 1398 imap4.Query(**{keyword: r'twisted (){%*"\] rocks'}), 1399 ) 1400 1401 # Check all the non-printable exclusions 1402 self.assertEqual( 1403 f"({keyword.upper()} twistedrocks)", 1404 imap4.Query( 1405 **{ 1406 keyword: "twisted %s rocks" 1407 % ("".join(chr(ch) for ch in range(33)),) 1408 } 1409 ), 1410 ) 1411 1412 def test_queryKeywordFlag(self): 1413 r""" 1414 When passed the C{keyword} argument, L{imap4.Query} returns an 1415 C{atom} that consists of one or more non-special characters. 1416 1417 List of the invalid characters: 1418 1419 ( ) { % * " \ ] CTL SP 1420 1421 @see: U{ABNF definition of CTL and SP<https://tools.ietf.org/html/rfc2234>} 1422 @see: U{IMAP4 grammar<http://tools.ietf.org/html/rfc3501#section-9>} 1423 @see: U{IMAP4 SEARCH specification<http://tools.ietf.org/html/rfc3501#section-6.4.4>} 1424 """ 1425 self._keywordFilteringTest("keyword") 1426 1427 def test_queryUnkeywordFlag(self): 1428 r""" 1429 When passed the C{unkeyword} argument, L{imap4.Query} returns an 1430 C{atom} that consists of one or more non-special characters. 1431 1432 List of the invalid characters: 1433 1434 ( ) { % * " \ ] CTL SP 1435 1436 @see: U{ABNF definition of CTL and SP<https://tools.ietf.org/html/rfc2234>} 1437 @see: U{IMAP4 grammar<http://tools.ietf.org/html/rfc3501#section-9>} 1438 @see: U{IMAP4 SEARCH specification<http://tools.ietf.org/html/rfc3501#section-6.4.4>} 1439 """ 1440 self._keywordFilteringTest("unkeyword") 1441 1442 def test_invalidIdListParser(self): 1443 """ 1444 Trying to parse an invalid representation of a sequence range raises an 1445 L{IllegalIdentifierError}. 1446 """ 1447 inputs = [b"*:*", b"foo", b"4:", b"bar:5"] 1448 1449 for input in inputs: 1450 self.assertRaises( 1451 imap4.IllegalIdentifierError, imap4.parseIdList, input, 12345 1452 ) 1453 1454 def test_invalidIdListParserNonPositive(self): 1455 """ 1456 Zeroes and negative values are not accepted in id range expressions. RFC 1457 3501 states that sequence numbers and sequence ranges consist of 1458 non-negative numbers (RFC 3501 section 9, the seq-number grammar item). 1459 """ 1460 inputs = [b"0:5", b"0:0", b"*:0", b"0", b"-3:5", b"1:-2", b"-1"] 1461 1462 for input in inputs: 1463 self.assertRaises( 1464 imap4.IllegalIdentifierError, imap4.parseIdList, input, 12345 1465 ) 1466 1467 def test_parseIdList(self): 1468 """ 1469 The function to parse sequence ranges yields appropriate L{MessageSet} 1470 objects. 1471 """ 1472 inputs = [ 1473 b"1:*", 1474 b"5:*", 1475 b"1:2,5:*", 1476 b"*", 1477 b"1", 1478 b"1,2", 1479 b"1,3,5", 1480 b"1:10", 1481 b"1:10,11", 1482 b"1:5,10:20", 1483 b"1,5:10", 1484 b"1,5:10,15:20", 1485 b"1:10,15,20:25", 1486 b"4:2", 1487 ] 1488 1489 outputs = [ 1490 MessageSet(1, None), 1491 MessageSet(5, None), 1492 MessageSet(5, None) + MessageSet(1, 2), 1493 MessageSet(None, None), 1494 MessageSet(1), 1495 MessageSet(1, 2), 1496 MessageSet(1) + MessageSet(3) + MessageSet(5), 1497 MessageSet(1, 10), 1498 MessageSet(1, 11), 1499 MessageSet(1, 5) + MessageSet(10, 20), 1500 MessageSet(1) + MessageSet(5, 10), 1501 MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20), 1502 MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25), 1503 MessageSet(2, 4), 1504 ] 1505 1506 lengths = [None, None, None, 1, 1, 2, 3, 10, 11, 16, 7, 13, 17, 3] 1507 1508 for (input, expected) in zip(inputs, outputs): 1509 self.assertEqual(imap4.parseIdList(input), expected) 1510 1511 for (input, expected) in zip(inputs, lengths): 1512 if expected is None: 1513 self.assertRaises(TypeError, len, imap4.parseIdList(input)) 1514 else: 1515 L = len(imap4.parseIdList(input)) 1516 self.assertEqual(L, expected, f"len({input!r}) = {L!r} != {expected!r}") 1517 1518 def test_parseTimeInvalidFormat(self): 1519 """ 1520 L{imap4.parseTime} raises L{ValueError} when given a a time 1521 string whose format is invalid. 1522 """ 1523 self.assertRaises(ValueError, imap4.parseTime, "invalid") 1524 1525 def test_parseTimeInvalidValues(self): 1526 """ 1527 L{imap4.parseTime} raises L{ValueError} when given a time 1528 string composed of invalid values. 1529 """ 1530 invalidStrings = [ 1531 "invalid-July-2017", 1532 "2-invalid-2017", 1533 "2-July-invalid", 1534 ] 1535 for invalid in invalidStrings: 1536 self.assertRaises(ValueError, imap4.parseTime, invalid) 1537 1538 def test_statusRequestHelper(self): 1539 """ 1540 L{imap4.statusRequestHelper} builds a L{dict} mapping the 1541 requested status names to values extracted from the provided 1542 L{IMailboxIMAP}'s. 1543 """ 1544 mbox = SimpleMailbox() 1545 1546 expected = { 1547 "MESSAGES": mbox.getMessageCount(), 1548 "RECENT": mbox.getRecentCount(), 1549 "UIDNEXT": mbox.getUIDNext(), 1550 "UIDVALIDITY": mbox.getUIDValidity(), 1551 "UNSEEN": mbox.getUnseenCount(), 1552 } 1553 1554 result = imap4.statusRequestHelper(mbox, expected.keys()) 1555 1556 self.assertEqual(expected, result) 1557 1558 1559@implementer(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox) 1560class SimpleMailbox: 1561 flags = ("\\Flag1", "Flag2", "\\AnotherSysFlag", "LastFlag") 1562 messages: List[Tuple[bytes, list, bytes, int]] = [] 1563 mUID = 0 1564 rw = 1 1565 closed = False 1566 1567 def __init__(self): 1568 self.listeners = [] 1569 self.addListener = self.listeners.append 1570 self.removeListener = self.listeners.remove 1571 1572 def getFlags(self): 1573 return self.flags 1574 1575 def getUIDValidity(self): 1576 return 42 1577 1578 def getUIDNext(self): 1579 return len(self.messages) + 1 1580 1581 def getMessageCount(self): 1582 return 9 1583 1584 def getRecentCount(self): 1585 return 3 1586 1587 def getUnseenCount(self): 1588 return 4 1589 1590 def isWriteable(self): 1591 return self.rw 1592 1593 def destroy(self): 1594 pass 1595 1596 def getHierarchicalDelimiter(self): 1597 return "/" 1598 1599 def requestStatus(self, names): 1600 r = {} 1601 if "MESSAGES" in names: 1602 r["MESSAGES"] = self.getMessageCount() 1603 if "RECENT" in names: 1604 r["RECENT"] = self.getRecentCount() 1605 if "UIDNEXT" in names: 1606 r["UIDNEXT"] = self.getMessageCount() + 1 1607 if "UIDVALIDITY" in names: 1608 r["UIDVALIDITY"] = self.getUID() 1609 if "UNSEEN" in names: 1610 r["UNSEEN"] = self.getUnseenCount() 1611 return defer.succeed(r) 1612 1613 def addMessage(self, message, flags, date=None): 1614 self.messages.append((message, flags, date, self.mUID)) 1615 self.mUID += 1 1616 return defer.succeed(None) 1617 1618 def expunge(self): 1619 delete = [] 1620 for i in self.messages: 1621 if "\\Deleted" in i[1]: 1622 delete.append(i) 1623 for i in delete: 1624 self.messages.remove(i) 1625 return [i[3] for i in delete] 1626 1627 def close(self): 1628 self.closed = True 1629 1630 def fetch(self, messages, uid): 1631 # IMailboxIMAP.fetch 1632 pass 1633 1634 def getUID(self, message): 1635 # IMailboxIMAP.getUID 1636 pass 1637 1638 def store(self, messages, flags, mode, uid): 1639 # IMailboxIMAP.store 1640 pass 1641 1642 1643@implementer(imap4.IMailboxInfo, imap4.IMailbox) 1644class UncloseableMailbox: 1645 """ 1646 A mailbox that cannot be closed. 1647 """ 1648 1649 flags = ("\\Flag1", "Flag2", "\\AnotherSysFlag", "LastFlag") 1650 messages: List[Tuple[bytes, list, bytes, int]] = [] 1651 mUID = 0 1652 rw = 1 1653 closed = False 1654 1655 def __init__(self): 1656 self.listeners = [] 1657 self.addListener = self.listeners.append 1658 self.removeListener = self.listeners.remove 1659 1660 def getFlags(self): 1661 """ 1662 The flags 1663 1664 @return: A sequence of flags. 1665 """ 1666 return self.flags 1667 1668 def getUIDValidity(self): 1669 """ 1670 The UID validity value. 1671 1672 @return: The value. 1673 """ 1674 return 42 1675 1676 def getUIDNext(self): 1677 """ 1678 The next UID. 1679 1680 @return: The UID. 1681 """ 1682 return len(self.messages) + 1 1683 1684 def getMessageCount(self): 1685 """ 1686 The number of messages. 1687 1688 @return: The number. 1689 """ 1690 return 9 1691 1692 def getRecentCount(self): 1693 """ 1694 The recent messages. 1695 1696 @return: The number. 1697 """ 1698 return 3 1699 1700 def getUnseenCount(self): 1701 """ 1702 The recent messages. 1703 1704 @return: The number. 1705 """ 1706 return 4 1707 1708 def isWriteable(self): 1709 """ 1710 The recent messages. 1711 1712 @return: Whether or not the mailbox is writable. 1713 """ 1714 return self.rw 1715 1716 def destroy(self): 1717 """ 1718 Destroy this mailbox. 1719 """ 1720 pass 1721 1722 def getHierarchicalDelimiter(self): 1723 """ 1724 Return the hierarchical delimiter. 1725 1726 @return: The delimiter. 1727 """ 1728 return "/" 1729 1730 def requestStatus(self, names): 1731 """ 1732 Return the mailbox's status. 1733 1734 @param names: The status items to include. 1735 1736 @return: A L{dict} of status data. 1737 """ 1738 r = {} 1739 if "MESSAGES" in names: 1740 r["MESSAGES"] = self.getMessageCount() 1741 if "RECENT" in names: 1742 r["RECENT"] = self.getRecentCount() 1743 if "UIDNEXT" in names: 1744 r["UIDNEXT"] = self.getMessageCount() + 1 1745 if "UIDVALIDITY" in names: 1746 r["UIDVALIDITY"] = self.getUID() 1747 if "UNSEEN" in names: 1748 r["UNSEEN"] = self.getUnseenCount() 1749 return defer.succeed(r) 1750 1751 def addMessage(self, message, flags, date=None): 1752 """ 1753 Add a message to the mailbox. 1754 1755 @param message: The message body. 1756 1757 @param flags: The message flags. 1758 1759 @param date: The message date. 1760 1761 @return: A L{Deferred} that fires when the message has been 1762 added. 1763 """ 1764 self.messages.append((message, flags, date, self.mUID)) 1765 self.mUID += 1 1766 return defer.succeed(None) 1767 1768 def expunge(self): 1769 """ 1770 Delete messages marked for deletion. 1771 1772 @return: A L{list} of deleted message IDs. 1773 """ 1774 delete = [] 1775 for i in self.messages: 1776 if "\\Deleted" in i[1]: 1777 delete.append(i) 1778 for i in delete: 1779 self.messages.remove(i) 1780 return [i[3] for i in delete] 1781 1782 def fetch(self, messages, uid): 1783 # IMailboxIMAP.fetch 1784 pass 1785 1786 def getUID(self, message): 1787 # IMailboxIMAP.getUID 1788 pass 1789 1790 def store(self, messages, flags, mode, uid): 1791 # IMailboxIMAP.store 1792 pass 1793 1794 1795class AccountWithoutNamespaces(imap4.MemoryAccountWithoutNamespaces): 1796 """ 1797 An in-memory account that does not provide L{INamespacePresenter}. 1798 """ 1799 1800 mailboxFactory = SimpleMailbox 1801 1802 def _emptyMailbox(self, name, id): 1803 return self.mailboxFactory() 1804 1805 def select(self, name, rw=1): 1806 mbox = imap4.MemoryAccount.select(self, name) 1807 if mbox is not None: 1808 mbox.rw = rw 1809 return mbox 1810 1811 1812class Account(AccountWithoutNamespaces, imap4.MemoryAccount): 1813 """ 1814 An in-memory account that provides L{INamespacePresenter}. 1815 """ 1816 1817 1818class SimpleServer(imap4.IMAP4Server): 1819 theAccount = Account(b"testuser") 1820 1821 def __init__(self, *args, **kw): 1822 imap4.IMAP4Server.__init__(self, *args, **kw) 1823 realm = TestRealm(accountHolder=self) 1824 portal = Portal(realm) 1825 c = InMemoryUsernamePasswordDatabaseDontUse() 1826 c.addUser(b"testuser", b"password-test") 1827 self.checker = c 1828 self.portal = portal 1829 portal.registerChecker(c) 1830 self.timeoutTest = False 1831 1832 def lineReceived(self, line): 1833 if self.timeoutTest: 1834 # Do not send a response 1835 return 1836 1837 imap4.IMAP4Server.lineReceived(self, line) 1838 1839 1840class SimpleClient(imap4.IMAP4Client): 1841 def __init__(self, deferred, contextFactory=None): 1842 imap4.IMAP4Client.__init__(self, contextFactory) 1843 self.deferred = deferred 1844 self.events = [] 1845 1846 def serverGreeting(self, caps): 1847 self.deferred.callback(None) 1848 1849 def modeChanged(self, writeable): 1850 self.events.append(["modeChanged", writeable]) 1851 self.transport.loseConnection() 1852 1853 def flagsChanged(self, newFlags): 1854 self.events.append(["flagsChanged", newFlags]) 1855 self.transport.loseConnection() 1856 1857 def newMessages(self, exists, recent): 1858 self.events.append(["newMessages", exists, recent]) 1859 self.transport.loseConnection() 1860 1861 1862class IMAP4HelperMixin: 1863 1864 serverCTX: Optional[ServerTLSContext] = None 1865 clientCTX: Optional[ClientTLSContext] = None 1866 1867 def setUp(self): 1868 d = defer.Deferred() 1869 self.server = SimpleServer(contextFactory=self.serverCTX) 1870 self.client = SimpleClient(d, contextFactory=self.clientCTX) 1871 self.connected = d 1872 1873 SimpleMailbox.messages = [] 1874 theAccount = Account(b"testuser") 1875 theAccount.mboxType = SimpleMailbox 1876 SimpleServer.theAccount = theAccount 1877 1878 def tearDown(self): 1879 del self.server 1880 del self.client 1881 del self.connected 1882 1883 def _cbStopClient(self, ignore): 1884 self.client.transport.loseConnection() 1885 1886 def _ebGeneral(self, failure): 1887 self.client.transport.loseConnection() 1888 self.server.transport.loseConnection() 1889 log.err(failure, "Problem with " + str(self)) 1890 1891 def loopback(self): 1892 return loopback.loopbackAsync(self.server, self.client) 1893 1894 def assertClientFailureMessage(self, failure, expected): 1895 """ 1896 Assert that the provided failure is an L{IMAP4Exception} with 1897 the given message. 1898 1899 @param failure: A failure whose value L{IMAP4Exception} 1900 @type failure: L{failure.Failure} 1901 1902 @param expected: The expected failure message. 1903 @type expected: L{bytes} 1904 """ 1905 failure.trap(imap4.IMAP4Exception) 1906 message = str(failure.value) 1907 expected = repr(expected) 1908 1909 self.assertEqual(message, expected) 1910 1911 1912class IMAP4ServerTests(IMAP4HelperMixin, TestCase): 1913 def testCapability(self): 1914 caps = {} 1915 1916 def getCaps(): 1917 def gotCaps(c): 1918 caps.update(c) 1919 self.server.transport.loseConnection() 1920 1921 return self.client.getCapabilities().addCallback(gotCaps) 1922 1923 d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral) 1924 d = defer.gatherResults([self.loopback(), d1]) 1925 expected = {b"IMAP4rev1": None, b"NAMESPACE": None, b"IDLE": None} 1926 return d.addCallback(lambda _: self.assertEqual(expected, caps)) 1927 1928 def testCapabilityWithAuth(self): 1929 caps = {} 1930 self.server.challengers[b"CRAM-MD5"] = CramMD5Credentials 1931 1932 def getCaps(): 1933 def gotCaps(c): 1934 caps.update(c) 1935 self.server.transport.loseConnection() 1936 1937 return self.client.getCapabilities().addCallback(gotCaps) 1938 1939 d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral) 1940 d = defer.gatherResults([self.loopback(), d1]) 1941 1942 expCap = { 1943 b"IMAP4rev1": None, 1944 b"NAMESPACE": None, 1945 b"IDLE": None, 1946 b"AUTH": [b"CRAM-MD5"], 1947 } 1948 1949 return d.addCallback(lambda _: self.assertEqual(expCap, caps)) 1950 1951 def testLogout(self): 1952 self.loggedOut = 0 1953 1954 def logout(): 1955 def setLoggedOut(): 1956 self.loggedOut = 1 1957 1958 self.client.logout().addCallback(strip(setLoggedOut)) 1959 1960 self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral) 1961 d = self.loopback() 1962 return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1)) 1963 1964 def testNoop(self): 1965 self.responses = None 1966 1967 def noop(): 1968 def setResponses(responses): 1969 self.responses = responses 1970 self.server.transport.loseConnection() 1971 1972 self.client.noop().addCallback(setResponses) 1973 1974 self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral) 1975 d = self.loopback() 1976 return d.addCallback(lambda _: self.assertEqual(self.responses, [])) 1977 1978 def testLogin(self): 1979 def login(): 1980 d = self.client.login(b"testuser", b"password-test") 1981 d.addCallback(self._cbStopClient) 1982 1983 d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral) 1984 d = defer.gatherResults([d1, self.loopback()]) 1985 return d.addCallback(self._cbTestLogin) 1986 1987 def _cbTestLogin(self, ignored): 1988 self.assertEqual(self.server.account, SimpleServer.theAccount) 1989 self.assertEqual(self.server.state, "auth") 1990 1991 def testFailedLogin(self): 1992 def login(): 1993 d = self.client.login(b"testuser", b"wrong-password") 1994 d.addBoth(self._cbStopClient) 1995 1996 d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral) 1997 d2 = self.loopback() 1998 d = defer.gatherResults([d1, d2]) 1999 return d.addCallback(self._cbTestFailedLogin) 2000 2001 def _cbTestFailedLogin(self, ignored): 2002 self.assertEqual(self.server.account, None) 2003 self.assertEqual(self.server.state, "unauth") 2004 2005 def test_loginWithoutPortal(self): 2006 """ 2007 Attempting to log into a server that has no L{Portal} results 2008 in a failed login. 2009 """ 2010 self.server.portal = None 2011 2012 def login(): 2013 d = self.client.login(b"testuser", b"wrong-password") 2014 d.addBoth(self._cbStopClient) 2015 2016 d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral) 2017 d2 = self.loopback() 2018 d = defer.gatherResults([d1, d2]) 2019 return d.addCallback(self._cbTestFailedLogin) 2020 2021 def test_nonIAccountAvatar(self): 2022 """ 2023 The server responds with a C{BAD} response when its portal 2024 attempts to log a user in with checker that claims to support 2025 L{IAccount} but returns an an avatar interface that is not 2026 L{IAccount}. 2027 """ 2028 2029 def brokenRequestAvatar(*_, **__): 2030 return ("Not IAccount", "Not an account", lambda: None) 2031 2032 self.server.portal.realm.requestAvatar = brokenRequestAvatar 2033 2034 def login(): 2035 d = self.client.login(b"testuser", b"password-test") 2036 d.addBoth(self._cbStopClient) 2037 2038 d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral) 2039 d2 = self.loopback() 2040 d = defer.gatherResults([d1, d2]) 2041 return d.addCallback(self._cbTestFailedLogin) 2042 2043 def test_loginException(self): 2044 """ 2045 Any exception raised by L{IMAP4Server.authenticateLogin} that 2046 is not L{UnauthorizedLogin} is logged results in a C{BAD} 2047 response. 2048 """ 2049 2050 class UnexpectedException(Exception): 2051 """ 2052 An unexpected exception. 2053 """ 2054 2055 def raisesUnexpectedException(user, passwd): 2056 raise UnexpectedException("Whoops") 2057 2058 self.server.authenticateLogin = raisesUnexpectedException 2059 2060 def login(): 2061 return self.client.login(b"testuser", b"password-test") 2062 2063 d1 = self.connected.addCallback(strip(login)) 2064 2065 d1.addErrback(self.assertClientFailureMessage, b"Server error: Whoops") 2066 2067 @d1.addCallback 2068 def assertErrorLogged(_): 2069 self.assertTrue(self.flushLoggedErrors(UnexpectedException)) 2070 2071 d1.addErrback(self._ebGeneral) 2072 d1.addBoth(self._cbStopClient) 2073 2074 d2 = self.loopback() 2075 d = defer.gatherResults([d1, d2]) 2076 return d.addCallback(self._cbTestFailedLogin) 2077 2078 def testLoginRequiringQuoting(self): 2079 self.server.checker.users = {b"{test}user": b"{test}password"} 2080 2081 def login(): 2082 d = self.client.login(b"{test}user", b"{test}password") 2083 d.addErrback(log.err, "Problem with " + str(self)) 2084 d.addCallback(self._cbStopClient) 2085 2086 d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral) 2087 d = defer.gatherResults([self.loopback(), d1]) 2088 return d.addCallback(self._cbTestLoginRequiringQuoting) 2089 2090 def _cbTestLoginRequiringQuoting(self, ignored): 2091 self.assertEqual(self.server.account, SimpleServer.theAccount) 2092 self.assertEqual(self.server.state, "auth") 2093 2094 def testNamespace(self): 2095 self.namespaceArgs = None 2096 2097 def login(): 2098 return self.client.login(b"testuser", b"password-test") 2099 2100 def namespace(): 2101 def gotNamespace(args): 2102 self.namespaceArgs = args 2103 self._cbStopClient(None) 2104 2105 return self.client.namespace().addCallback(gotNamespace) 2106 2107 d1 = self.connected.addCallback(strip(login)) 2108 d1.addCallback(strip(namespace)) 2109 d1.addErrback(self._ebGeneral) 2110 d2 = self.loopback() 2111 d = defer.gatherResults([d1, d2]) 2112 2113 @d.addCallback 2114 def assertAllPairsNativeStrings(ignored): 2115 for namespaces in self.namespaceArgs: 2116 for pair in namespaces: 2117 for value in pair: 2118 self.assertIsInstance(value, str) 2119 return self.namespaceArgs 2120 2121 d.addCallback(self.assertEqual, [[["", "/"]], [], []]) 2122 return d 2123 2124 def test_mailboxWithoutNamespace(self): 2125 """ 2126 A mailbox that does not provide L{INamespacePresenter} returns 2127 empty L{list}s for its personal, shared, and user namespaces. 2128 """ 2129 self.server.theAccount = AccountWithoutNamespaces(b"testuser") 2130 self.namespaceArgs = None 2131 2132 def login(): 2133 return self.client.login(b"testuser", b"password-test") 2134 2135 def namespace(): 2136 def gotNamespace(args): 2137 self.namespaceArgs = args 2138 self._cbStopClient(None) 2139 2140 return self.client.namespace().addCallback(gotNamespace) 2141 2142 d1 = self.connected.addCallback(strip(login)) 2143 d1.addCallback(strip(namespace)) 2144 d1.addErrback(self._ebGeneral) 2145 d2 = self.loopback() 2146 d = defer.gatherResults([d1, d2]) 2147 d.addCallback(lambda _: self.namespaceArgs) 2148 d.addCallback(self.assertEqual, [[], [], []]) 2149 return d 2150 2151 def testSelect(self): 2152 SimpleServer.theAccount.addMailbox("test-mailbox") 2153 self.selectedArgs = None 2154 2155 def login(): 2156 return self.client.login(b"testuser", b"password-test") 2157 2158 def select(): 2159 def selected(args): 2160 self.selectedArgs = args 2161 self._cbStopClient(None) 2162 2163 d = self.client.select("test-mailbox") 2164 d.addCallback(selected) 2165 return d 2166 2167 d1 = self.connected.addCallback(strip(login)) 2168 d1.addCallback(strip(select)) 2169 d1.addErrback(self._ebGeneral) 2170 d2 = self.loopback() 2171 return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect) 2172 2173 def test_selectWithoutMailbox(self): 2174 """ 2175 A client that selects a mailbox that does not exist receives a 2176 C{NO} response. 2177 """ 2178 2179 def login(): 2180 return self.client.login(b"testuser", b"password-test") 2181 2182 def select(): 2183 return self.client.select("test-mailbox") 2184 2185 self.connected.addCallback(strip(login)) 2186 self.connected.addCallback(strip(select)) 2187 self.connected.addErrback(self.assertClientFailureMessage, b"No such mailbox") 2188 self.connected.addCallback(self._cbStopClient) 2189 self.connected.addErrback(self._ebGeneral) 2190 2191 connectionComplete = defer.gatherResults([self.connected, self.loopback()]) 2192 2193 @connectionComplete.addCallback 2194 def assertNoMailboxSelected(_): 2195 self.assertIsNone(self.server.mbox) 2196 2197 return connectionComplete 2198 2199 def _cbTestSelect(self, ignored): 2200 mbox = SimpleServer.theAccount.mailboxes["TEST-MAILBOX"] 2201 self.assertEqual(self.server.mbox, mbox) 2202 self.assertEqual( 2203 self.selectedArgs, 2204 { 2205 "EXISTS": 9, 2206 "RECENT": 3, 2207 "UIDVALIDITY": 42, 2208 "FLAGS": ("\\Flag1", "Flag2", "\\AnotherSysFlag", "LastFlag"), 2209 "READ-WRITE": True, 2210 }, 2211 ) 2212 2213 def test_examine(self): 2214 """ 2215 L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and 2216 returns a L{Deferred} which fires with a C{dict} with as many of the 2217 following keys as the server includes in its response: C{'FLAGS'}, 2218 C{'EXISTS'}, C{'RECENT'}, C{'UNSEEN'}, C{'READ-WRITE'}, C{'READ-ONLY'}, 2219 C{'UIDVALIDITY'}, and C{'PERMANENTFLAGS'}. 2220 2221 Unfortunately the server doesn't generate all of these so it's hard to 2222 test the client's handling of them here. See 2223 L{IMAP4ClientExamineTests} below. 2224 2225 See U{RFC 3501<http://www.faqs.org/rfcs/rfc3501.html>}, section 6.3.2, 2226 for details. 2227 """ 2228 SimpleServer.theAccount.addMailbox("test-mailbox") 2229 self.examinedArgs = None 2230 2231 def login(): 2232 return self.client.login(b"testuser", b"password-test") 2233 2234 def examine(): 2235 def examined(args): 2236 self.examinedArgs = args 2237 self._cbStopClient(None) 2238 2239 d = self.client.examine("test-mailbox") 2240 d.addCallback(examined) 2241 return d 2242 2243 d1 = self.connected.addCallback(strip(login)) 2244 d1.addCallback(strip(examine)) 2245 d1.addErrback(self._ebGeneral) 2246 d2 = self.loopback() 2247 d = defer.gatherResults([d1, d2]) 2248 return d.addCallback(self._cbTestExamine) 2249 2250 def _cbTestExamine(self, ignored): 2251 mbox = SimpleServer.theAccount.mailboxes["TEST-MAILBOX"] 2252 self.assertEqual(self.server.mbox, mbox) 2253 self.assertEqual( 2254 self.examinedArgs, 2255 { 2256 "EXISTS": 9, 2257 "RECENT": 3, 2258 "UIDVALIDITY": 42, 2259 "FLAGS": ("\\Flag1", "Flag2", "\\AnotherSysFlag", "LastFlag"), 2260 "READ-WRITE": False, 2261 }, 2262 ) 2263 2264 def testCreate(self): 2265 succeed = ("testbox", "test/box", "test/", "test/box/box", "INBOX") 2266 fail = ("testbox", "test/box") 2267 2268 def cb(): 2269 self.result.append(1) 2270 2271 def eb(failure): 2272 self.result.append(0) 2273 2274 def login(): 2275 return self.client.login(b"testuser", b"password-test") 2276 2277 def create(): 2278 for name in succeed + fail: 2279 d = self.client.create(name) 2280 d.addCallback(strip(cb)).addErrback(eb) 2281 d.addCallbacks(self._cbStopClient, self._ebGeneral) 2282 2283 self.result = [] 2284 d1 = self.connected.addCallback(strip(login)).addCallback(strip(create)) 2285 d2 = self.loopback() 2286 d = defer.gatherResults([d1, d2]) 2287 return d.addCallback(self._cbTestCreate, succeed, fail) 2288 2289 def _cbTestCreate(self, ignored, succeed, fail): 2290 self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail)) 2291 mbox = sorted(SimpleServer.theAccount.mailboxes) 2292 answers = sorted(["inbox", "testbox", "test/box", "test", "test/box/box"]) 2293 self.assertEqual(mbox, [a.upper() for a in answers]) 2294 2295 def testDelete(self): 2296 SimpleServer.theAccount.addMailbox("delete/me") 2297 2298 def login(): 2299 return self.client.login(b"testuser", b"password-test") 2300 2301 def delete(): 2302 return self.client.delete("delete/me") 2303 2304 d1 = self.connected.addCallback(strip(login)) 2305 d1.addCallbacks(strip(delete), self._ebGeneral) 2306 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2307 d2 = self.loopback() 2308 d = defer.gatherResults([d1, d2]) 2309 d.addCallback( 2310 lambda _: self.assertEqual(list(SimpleServer.theAccount.mailboxes), []) 2311 ) 2312 return d 2313 2314 def testDeleteWithInferiorHierarchicalNames(self): 2315 """ 2316 Attempting to delete a mailbox with hierarchically inferior 2317 names fails with an informative error. 2318 2319 @see: U{https://tools.ietf.org/html/rfc3501#section-6.3.4} 2320 2321 @return: A L{Deferred} with assertions. 2322 """ 2323 SimpleServer.theAccount.addMailbox("delete") 2324 SimpleServer.theAccount.addMailbox("delete/me") 2325 2326 def login(): 2327 return self.client.login(b"testuser", b"password-test") 2328 2329 def delete(): 2330 return self.client.delete("delete") 2331 2332 def assertIMAPException(failure): 2333 failure.trap(imap4.IMAP4Exception) 2334 self.assertEqual( 2335 str(failure.value), 2336 str(b'Name "DELETE" has inferior hierarchical names'), 2337 ) 2338 2339 loggedIn = self.connected.addCallback(strip(login)) 2340 loggedIn.addCallbacks(strip(delete), self._ebGeneral) 2341 loggedIn.addErrback(assertIMAPException) 2342 loggedIn.addCallbacks(self._cbStopClient) 2343 2344 loopedBack = self.loopback() 2345 d = defer.gatherResults([loggedIn, loopedBack]) 2346 d.addCallback( 2347 lambda _: self.assertEqual( 2348 sorted(SimpleServer.theAccount.mailboxes), ["DELETE", "DELETE/ME"] 2349 ) 2350 ) 2351 return d 2352 2353 def testIllegalInboxDelete(self): 2354 self.stashed = None 2355 2356 def login(): 2357 return self.client.login(b"testuser", b"password-test") 2358 2359 def delete(): 2360 return self.client.delete("inbox") 2361 2362 def stash(result): 2363 self.stashed = result 2364 2365 d1 = self.connected.addCallback(strip(login)) 2366 d1.addCallbacks(strip(delete), self._ebGeneral) 2367 d1.addBoth(stash) 2368 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2369 d2 = self.loopback() 2370 d = defer.gatherResults([d1, d2]) 2371 d.addCallback( 2372 lambda _: self.assertTrue(isinstance(self.stashed, failure.Failure)) 2373 ) 2374 return d 2375 2376 def testNonExistentDelete(self): 2377 def login(): 2378 return self.client.login(b"testuser", b"password-test") 2379 2380 def delete(): 2381 return self.client.delete("delete/me") 2382 2383 def deleteFailed(failure): 2384 self.failure = failure 2385 2386 self.failure = None 2387 d1 = self.connected.addCallback(strip(login)) 2388 d1.addCallback(strip(delete)).addErrback(deleteFailed) 2389 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2390 d2 = self.loopback() 2391 d = defer.gatherResults([d1, d2]) 2392 d.addCallback( 2393 lambda _: self.assertEqual(str(self.failure.value), str(b"No such mailbox")) 2394 ) 2395 return d 2396 2397 def testIllegalDelete(self): 2398 m = SimpleMailbox() 2399 m.flags = (r"\Noselect",) 2400 SimpleServer.theAccount.addMailbox("delete", m) 2401 SimpleServer.theAccount.addMailbox("delete/me") 2402 2403 def login(): 2404 return self.client.login(b"testuser", b"password-test") 2405 2406 def delete(): 2407 return self.client.delete("delete") 2408 2409 def deleteFailed(failure): 2410 self.failure = failure 2411 2412 self.failure = None 2413 d1 = self.connected.addCallback(strip(login)) 2414 d1.addCallback(strip(delete)).addErrback(deleteFailed) 2415 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2416 d2 = self.loopback() 2417 d = defer.gatherResults([d1, d2]) 2418 expected = str( 2419 b"Hierarchically inferior mailboxes exist " b"and \\Noselect is set" 2420 ) 2421 d.addCallback(lambda _: self.assertEqual(str(self.failure.value), expected)) 2422 return d 2423 2424 def testRename(self): 2425 SimpleServer.theAccount.addMailbox("oldmbox") 2426 2427 def login(): 2428 return self.client.login(b"testuser", b"password-test") 2429 2430 def rename(): 2431 return self.client.rename(b"oldmbox", b"newname") 2432 2433 d1 = self.connected.addCallback(strip(login)) 2434 d1.addCallbacks(strip(rename), self._ebGeneral) 2435 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2436 d2 = self.loopback() 2437 d = defer.gatherResults([d1, d2]) 2438 d.addCallback( 2439 lambda _: self.assertEqual( 2440 list(SimpleServer.theAccount.mailboxes.keys()), ["NEWNAME"] 2441 ) 2442 ) 2443 return d 2444 2445 def testIllegalInboxRename(self): 2446 self.stashed = None 2447 2448 def login(): 2449 return self.client.login(b"testuser", b"password-test") 2450 2451 def rename(): 2452 return self.client.rename("inbox", "frotz") 2453 2454 def stash(stuff): 2455 self.stashed = stuff 2456 2457 d1 = self.connected.addCallback(strip(login)) 2458 d1.addCallbacks(strip(rename), self._ebGeneral) 2459 d1.addBoth(stash) 2460 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2461 d2 = self.loopback() 2462 d = defer.gatherResults([d1, d2]) 2463 d.addCallback( 2464 lambda _: self.assertTrue(isinstance(self.stashed, failure.Failure)) 2465 ) 2466 return d 2467 2468 def testHierarchicalRename(self): 2469 SimpleServer.theAccount.create("oldmbox/m1") 2470 SimpleServer.theAccount.create("oldmbox/m2") 2471 2472 def login(): 2473 return self.client.login(b"testuser", b"password-test") 2474 2475 def rename(): 2476 return self.client.rename("oldmbox", "newname") 2477 2478 d1 = self.connected.addCallback(strip(login)) 2479 d1.addCallbacks(strip(rename), self._ebGeneral) 2480 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2481 d2 = self.loopback() 2482 d = defer.gatherResults([d1, d2]) 2483 return d.addCallback(self._cbTestHierarchicalRename) 2484 2485 def _cbTestHierarchicalRename(self, ignored): 2486 mboxes = SimpleServer.theAccount.mailboxes.keys() 2487 expected = ["newname", "newname/m1", "newname/m2"] 2488 mboxes = list(sorted(mboxes)) 2489 self.assertEqual(mboxes, [s.upper() for s in expected]) 2490 2491 def testSubscribe(self): 2492 def login(): 2493 return self.client.login(b"testuser", b"password-test") 2494 2495 def subscribe(): 2496 return self.client.subscribe("this/mbox") 2497 2498 d1 = self.connected.addCallback(strip(login)) 2499 d1.addCallbacks(strip(subscribe), self._ebGeneral) 2500 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2501 d2 = self.loopback() 2502 d = defer.gatherResults([d1, d2]) 2503 d.addCallback( 2504 lambda _: self.assertEqual( 2505 SimpleServer.theAccount.subscriptions, ["THIS/MBOX"] 2506 ) 2507 ) 2508 return d 2509 2510 def testUnsubscribe(self): 2511 SimpleServer.theAccount.subscriptions = ["THIS/MBOX", "THAT/MBOX"] 2512 2513 def login(): 2514 return self.client.login(b"testuser", b"password-test") 2515 2516 def unsubscribe(): 2517 return self.client.unsubscribe("this/mbox") 2518 2519 d1 = self.connected.addCallback(strip(login)) 2520 d1.addCallbacks(strip(unsubscribe), self._ebGeneral) 2521 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2522 d2 = self.loopback() 2523 d = defer.gatherResults([d1, d2]) 2524 d.addCallback( 2525 lambda _: self.assertEqual( 2526 SimpleServer.theAccount.subscriptions, ["THAT/MBOX"] 2527 ) 2528 ) 2529 return d 2530 2531 def _listSetup(self, f): 2532 SimpleServer.theAccount.addMailbox("root/subthing") 2533 SimpleServer.theAccount.addMailbox("root/another-thing") 2534 SimpleServer.theAccount.addMailbox("non-root/subthing") 2535 2536 def login(): 2537 return self.client.login(b"testuser", b"password-test") 2538 2539 def listed(answers): 2540 self.listed = answers 2541 2542 self.listed = None 2543 d1 = self.connected.addCallback(strip(login)) 2544 d1.addCallbacks(strip(f), self._ebGeneral) 2545 d1.addCallbacks(listed, self._ebGeneral) 2546 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2547 d2 = self.loopback() 2548 return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed) 2549 2550 def assertListDelimiterAndMailboxAreStrings(self, results): 2551 """ 2552 Assert a C{LIST} response's delimiter and mailbox are native 2553 strings. 2554 2555 @param results: A list of tuples as returned by 2556 L{IMAP4Client.list} or L{IMAP4Client.lsub}. 2557 """ 2558 for result in results: 2559 self.assertIsInstance(result[1], str, "delimiter %r is not a str") 2560 self.assertIsInstance(result[2], str, "mailbox %r is not a str") 2561 return results 2562 2563 def testList(self): 2564 def mailboxList(): 2565 return self.client.list("root", "%") 2566 2567 d = self._listSetup(mailboxList) 2568 2569 @d.addCallback 2570 def assertListContents(listed): 2571 expectedContents = [ 2572 (sorted(SimpleMailbox.flags), "/", "ROOT/SUBTHING"), 2573 (sorted(SimpleMailbox.flags), "/", "ROOT/ANOTHER-THING"), 2574 ] 2575 2576 for _ in range(2): 2577 flags, delimiter, mailbox = listed.pop(0) 2578 self.assertIn( 2579 (sorted(flags), delimiter, mailbox), 2580 expectedContents, 2581 ) 2582 2583 self.assertFalse(listed, f"More results than expected: {listed!r}") 2584 2585 return d 2586 2587 def testLSub(self): 2588 SimpleServer.theAccount.subscribe("ROOT/SUBTHING") 2589 2590 def lsub(): 2591 return self.client.lsub("root", "%") 2592 2593 d = self._listSetup(lsub) 2594 d.addCallback(self.assertListDelimiterAndMailboxAreStrings) 2595 d.addCallback(self.assertEqual, [(SimpleMailbox.flags, "/", "ROOT/SUBTHING")]) 2596 return d 2597 2598 def testStatus(self): 2599 SimpleServer.theAccount.addMailbox("root/subthing") 2600 2601 def login(): 2602 return self.client.login(b"testuser", b"password-test") 2603 2604 def status(): 2605 return self.client.status("root/subthing", "MESSAGES", "UIDNEXT", "UNSEEN") 2606 2607 def statused(result): 2608 self.statused = result 2609 2610 self.statused = None 2611 d1 = self.connected.addCallback(strip(login)) 2612 d1.addCallbacks(strip(status), self._ebGeneral) 2613 d1.addCallbacks(statused, self._ebGeneral) 2614 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2615 d2 = self.loopback() 2616 d = defer.gatherResults([d1, d2]) 2617 d.addCallback( 2618 lambda _: self.assertEqual( 2619 self.statused, {"MESSAGES": 9, "UIDNEXT": b"10", "UNSEEN": 4} 2620 ) 2621 ) 2622 return d 2623 2624 def testFailedStatus(self): 2625 def login(): 2626 return self.client.login(b"testuser", b"password-test") 2627 2628 def status(): 2629 return self.client.status( 2630 "root/nonexistent", "MESSAGES", "UIDNEXT", "UNSEEN" 2631 ) 2632 2633 def statused(result): 2634 self.statused = result 2635 2636 def failed(failure): 2637 self.failure = failure 2638 2639 self.statused = self.failure = None 2640 d1 = self.connected.addCallback(strip(login)) 2641 d1.addCallbacks(strip(status), self._ebGeneral) 2642 d1.addCallbacks(statused, failed) 2643 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2644 d2 = self.loopback() 2645 return defer.gatherResults([d1, d2]).addCallback(self._cbTestFailedStatus) 2646 2647 def _cbTestFailedStatus(self, ignored): 2648 self.assertEqual(self.statused, None) 2649 self.assertEqual(self.failure.value.args, (b"Could not open mailbox",)) 2650 2651 def testFullAppend(self): 2652 infile = util.sibpath(__file__, "rfc822.message") 2653 SimpleServer.theAccount.addMailbox("root/subthing") 2654 2655 def login(): 2656 return self.client.login(b"testuser", b"password-test") 2657 2658 @defer.inlineCallbacks 2659 def append(): 2660 with open(infile, "rb") as message: 2661 result = yield self.client.append( 2662 "root/subthing", 2663 message, 2664 ("\\SEEN", "\\DELETED"), 2665 "Tue, 17 Jun 2003 11:22:16 -0600 (MDT)", 2666 ) 2667 defer.returnValue(result) 2668 2669 d1 = self.connected.addCallback(strip(login)) 2670 d1.addCallbacks(strip(append), self._ebGeneral) 2671 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2672 d2 = self.loopback() 2673 2674 d = defer.gatherResults([d1, d2]) 2675 2676 return d.addCallback(self._cbTestFullAppend, infile) 2677 2678 def _cbTestFullAppend(self, ignored, infile): 2679 mb = SimpleServer.theAccount.mailboxes["ROOT/SUBTHING"] 2680 self.assertEqual(1, len(mb.messages)) 2681 self.assertEqual( 2682 (["\\SEEN", "\\DELETED"], b"Tue, 17 Jun 2003 11:22:16 -0600 (MDT)", 0), 2683 mb.messages[0][1:], 2684 ) 2685 with open(infile, "rb") as f: 2686 self.assertEqual(f.read(), mb.messages[0][0].getvalue()) 2687 2688 def testPartialAppend(self): 2689 infile = util.sibpath(__file__, "rfc822.message") 2690 SimpleServer.theAccount.addMailbox("PARTIAL/SUBTHING") 2691 2692 def login(): 2693 return self.client.login(b"testuser", b"password-test") 2694 2695 @defer.inlineCallbacks 2696 def append(): 2697 with open(infile, "rb") as message: 2698 result = yield self.client.sendCommand( 2699 imap4.Command( 2700 b"APPEND", 2701 # Using networkString is cheating! In this 2702 # particular case the mailbox name happens to 2703 # be ASCII. In real code, the mailbox would 2704 # be encoded with imap4-utf-7. 2705 networkString( 2706 "PARTIAL/SUBTHING " 2707 '(\\SEEN) "Right now" ' 2708 "{%d}" % (os.path.getsize(infile),) 2709 ), 2710 (), 2711 self.client._IMAP4Client__cbContinueAppend, 2712 message, 2713 ) 2714 ) 2715 defer.returnValue(result) 2716 2717 d1 = self.connected.addCallback(strip(login)) 2718 d1.addCallbacks(strip(append), self._ebGeneral) 2719 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2720 d2 = self.loopback() 2721 d = defer.gatherResults([d1, d2]) 2722 return d.addCallback(self._cbTestPartialAppend, infile) 2723 2724 def _cbTestPartialAppend(self, ignored, infile): 2725 mb = SimpleServer.theAccount.mailboxes["PARTIAL/SUBTHING"] 2726 self.assertEqual(1, len(mb.messages)) 2727 self.assertEqual((["\\SEEN"], b"Right now", 0), mb.messages[0][1:]) 2728 with open(infile, "rb") as f: 2729 self.assertEqual(f.read(), mb.messages[0][0].getvalue()) 2730 2731 def _testCheck(self): 2732 SimpleServer.theAccount.addMailbox(b"root/subthing") 2733 2734 def login(): 2735 return self.client.login(b"testuser", b"password-test") 2736 2737 def select(): 2738 return self.client.select(b"root/subthing") 2739 2740 def check(): 2741 return self.client.check() 2742 2743 d = self.connected.addCallback(strip(login)) 2744 d.addCallbacks(strip(select), self._ebGeneral) 2745 d.addCallbacks(strip(check), self._ebGeneral) 2746 d.addCallbacks(self._cbStopClient, self._ebGeneral) 2747 return self.loopback() 2748 2749 def test_check(self): 2750 """ 2751 Trigger the L{imap.IMAP4Server._cbSelectWork} callback 2752 by selecting an mbox. 2753 """ 2754 return self._testCheck() 2755 2756 def test_checkFail(self): 2757 """ 2758 Trigger the L{imap.IMAP4Server._ebSelectWork} errback 2759 by failing when we select an mbox. 2760 """ 2761 2762 def failSelect(self, name, rw=1): 2763 raise imap4.IllegalMailboxEncoding("encoding") 2764 2765 def checkResponse(ignore): 2766 failures = self.flushLoggedErrors() 2767 self.assertEqual(failures[1].value.args[0], b"SELECT failed: Server error") 2768 2769 self.patch(Account, "select", failSelect) 2770 d = self._testCheck() 2771 return d.addCallback(checkResponse) 2772 2773 def testClose(self): 2774 m = SimpleMailbox() 2775 m.messages = [ 2776 (b"Message 1", ("\\Deleted", "AnotherFlag"), None, 0), 2777 (b"Message 2", ("AnotherFlag",), None, 1), 2778 (b"Message 3", ("\\Deleted",), None, 2), 2779 ] 2780 SimpleServer.theAccount.addMailbox("mailbox", m) 2781 2782 def login(): 2783 return self.client.login(b"testuser", b"password-test") 2784 2785 def select(): 2786 return self.client.select(b"mailbox") 2787 2788 def close(): 2789 return self.client.close() 2790 2791 d = self.connected.addCallback(strip(login)) 2792 d.addCallbacks(strip(select), self._ebGeneral) 2793 d.addCallbacks(strip(close), self._ebGeneral) 2794 d.addCallbacks(self._cbStopClient, self._ebGeneral) 2795 d2 = self.loopback() 2796 return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m) 2797 2798 def _cbTestClose(self, ignored, m): 2799 self.assertEqual(len(m.messages), 1) 2800 self.assertEqual(m.messages[0], (b"Message 2", ("AnotherFlag",), None, 1)) 2801 self.assertTrue(m.closed) 2802 2803 def testExpunge(self): 2804 m = SimpleMailbox() 2805 m.messages = [ 2806 (b"Message 1", ("\\Deleted", "AnotherFlag"), None, 0), 2807 (b"Message 2", ("AnotherFlag",), None, 1), 2808 (b"Message 3", ("\\Deleted",), None, 2), 2809 ] 2810 SimpleServer.theAccount.addMailbox("mailbox", m) 2811 2812 def login(): 2813 return self.client.login(b"testuser", b"password-test") 2814 2815 def select(): 2816 return self.client.select("mailbox") 2817 2818 def expunge(): 2819 return self.client.expunge() 2820 2821 def expunged(results): 2822 self.assertFalse(self.server.mbox is None) 2823 self.results = results 2824 2825 self.results = None 2826 d1 = self.connected.addCallback(strip(login)) 2827 d1.addCallbacks(strip(select), self._ebGeneral) 2828 d1.addCallbacks(strip(expunge), self._ebGeneral) 2829 d1.addCallbacks(expunged, self._ebGeneral) 2830 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 2831 d2 = self.loopback() 2832 d = defer.gatherResults([d1, d2]) 2833 return d.addCallback(self._cbTestExpunge, m) 2834 2835 def _cbTestExpunge(self, ignored, m): 2836 self.assertEqual(len(m.messages), 1) 2837 self.assertEqual(m.messages[0], (b"Message 2", ("AnotherFlag",), None, 1)) 2838 2839 self.assertEqual(self.results, [0, 2]) 2840 2841 2842class IMAP4ServerParsingTests(SynchronousTestCase): 2843 """ 2844 Test L{imap4.IMAP4Server}'s command parsing. 2845 """ 2846 2847 def setUp(self): 2848 self.transport = StringTransport() 2849 self.server = imap4.IMAP4Server() 2850 self.server.makeConnection(self.transport) 2851 self.transport.clear() 2852 2853 def tearDown(self): 2854 self.server.connectionLost(failure.Failure(error.ConnectionDone())) 2855 2856 def test_parseMethodExceptionLogged(self): 2857 """ 2858 L{imap4.IMAP4Server} logs exceptions raised by parse methods. 2859 """ 2860 2861 class UnhandledException(Exception): 2862 """ 2863 An unhandled exception. 2864 """ 2865 2866 def raisesValueError(line): 2867 raise UnhandledException 2868 2869 self.server.parseState = "command" 2870 self.server.parse_command = raisesValueError 2871 2872 self.server.lineReceived(b"invalid") 2873 2874 self.assertTrue(self.flushLoggedErrors(UnhandledException)) 2875 2876 def test_missingCommand(self): 2877 """ 2878 L{imap4.IMAP4Server.parse_command} sends a C{BAD} response to 2879 a line that includes a tag but no command. 2880 """ 2881 self.server.parse_command(b"001") 2882 2883 self.assertEqual(self.transport.value(), b"001 BAD Missing command\r\n") 2884 2885 self.server.connectionLost( 2886 failure.Failure(error.ConnectionDone("Done")), 2887 ) 2888 2889 def test_emptyLine(self): 2890 """ 2891 L{imap4.IMAP4Server.parse_command} sends a C{BAD} response to 2892 an empty line. 2893 """ 2894 self.server.parse_command(b"") 2895 2896 self.assertEqual(self.transport.value(), b"* BAD Null command\r\n") 2897 2898 def assertParseExceptionResponse(self, exception, tag, expectedResponse): 2899 """ 2900 Assert that the given exception results in the expected 2901 response. 2902 2903 @param exception: The exception to raise. 2904 @type exception: L{Exception} 2905 2906 @param tag: The IMAP tag. 2907 2908 @type: L{bytes} 2909 2910 @param expectedResponse: The expected bad response. 2911 @type expectedResponse: L{bytes} 2912 """ 2913 2914 def raises(tag, cmd, rest): 2915 raise exception 2916 2917 self.server.dispatchCommand = raises 2918 2919 self.server.parse_command(b" ".join([tag, b"invalid"])) 2920 2921 self.assertEqual(self.transport.value(), b" ".join([tag, expectedResponse])) 2922 2923 def test_parsingRaisesIllegalClientResponse(self): 2924 """ 2925 When a parsing method raises L{IllegalClientResponse}, the 2926 server sends a C{BAD} response. 2927 """ 2928 self.assertParseExceptionResponse( 2929 imap4.IllegalClientResponse("client response"), 2930 b"001", 2931 b"BAD Illegal syntax: client response\r\n", 2932 ) 2933 2934 def test_parsingRaisesIllegalOperationResponse(self): 2935 """ 2936 When a parsing method raises L{IllegalOperation}, the server 2937 sends a C{NO} response. 2938 """ 2939 self.assertParseExceptionResponse( 2940 imap4.IllegalOperation("operation"), 2941 b"001", 2942 b"NO Illegal operation: operation\r\n", 2943 ) 2944 2945 def test_parsingRaisesIllegalMailboxEncoding(self): 2946 """ 2947 When a parsing method raises L{IllegalMailboxEncoding}, the 2948 server sends a C{NO} response. 2949 """ 2950 self.assertParseExceptionResponse( 2951 imap4.IllegalMailboxEncoding("encoding"), 2952 b"001", 2953 b"NO Illegal mailbox name: encoding\r\n", 2954 ) 2955 2956 def test_unsupportedCommand(self): 2957 """ 2958 L{imap4.IMAP4Server} responds to an unsupported command with a 2959 C{BAD} response. 2960 """ 2961 self.server.lineReceived(b"001 HULLABALOO") 2962 self.assertEqual(self.transport.value(), b"001 BAD Unsupported command\r\n") 2963 2964 def test_tooManyArgumentsForCommand(self): 2965 """ 2966 L{imap4.IMAP4Server} responds with a C{BAD} response to a 2967 command with more arguments than expected. 2968 """ 2969 self.server.lineReceived(b"001 LOGIN A B C") 2970 self.assertEqual( 2971 self.transport.value(), 2972 ( 2973 b"001 BAD Illegal syntax:" 2974 + b" Too many arguments for command: " 2975 + repr(b"C").encode("utf-8") 2976 + b"\r\n" 2977 ), 2978 ) 2979 2980 def assertCommandExceptionResponse(self, exception, tag, expectedResponse): 2981 """ 2982 Assert that the given exception results in the expected 2983 response. 2984 2985 @param exception: The exception to raise. 2986 @type exception: L{Exception} 2987 2988 @param: The IMAP tag. 2989 2990 @type: L{bytes} 2991 2992 @param expectedResponse: The expected bad response. 2993 @type expectedResponse: L{bytes} 2994 """ 2995 2996 def raises(serverInstance, tag, user, passwd): 2997 raise exception 2998 2999 self.assertEqual(self.server.state, "unauth") 3000 3001 self.server.unauth_LOGIN = (raises,) + self.server.unauth_LOGIN[1:] 3002 3003 self.server.dispatchCommand(tag, b"LOGIN", b"user passwd") 3004 3005 self.assertEqual(self.transport.value(), b" ".join([tag, expectedResponse])) 3006 3007 def test_commandRaisesIllegalClientResponse(self): 3008 """ 3009 When a command raises L{IllegalClientResponse}, the 3010 server sends a C{BAD} response. 3011 """ 3012 self.assertCommandExceptionResponse( 3013 imap4.IllegalClientResponse("client response"), 3014 b"001", 3015 b"BAD Illegal syntax: client response\r\n", 3016 ) 3017 3018 def test_commandRaisesIllegalOperationResponse(self): 3019 """ 3020 When a command raises L{IllegalOperation}, the server sends a 3021 C{NO} response. 3022 """ 3023 self.assertCommandExceptionResponse( 3024 imap4.IllegalOperation("operation"), 3025 b"001", 3026 b"NO Illegal operation: operation\r\n", 3027 ) 3028 3029 def test_commandRaisesIllegalMailboxEncoding(self): 3030 """ 3031 When a command raises L{IllegalMailboxEncoding}, the server 3032 sends a C{NO} response. 3033 """ 3034 self.assertCommandExceptionResponse( 3035 imap4.IllegalMailboxEncoding("encoding"), 3036 b"001", 3037 b"NO Illegal mailbox name: encoding\r\n", 3038 ) 3039 3040 def test_commandRaisesUnhandledException(self): 3041 """ 3042 Wehn a command raises an unhandled exception, the server sends 3043 a C{BAD} response and logs the exception. 3044 """ 3045 3046 class UnhandledException(Exception): 3047 """ 3048 An unhandled exception. 3049 """ 3050 3051 self.assertCommandExceptionResponse( 3052 UnhandledException("unhandled"), 3053 b"001", 3054 b"BAD Server error: unhandled\r\n", 3055 ) 3056 3057 self.assertTrue(self.flushLoggedErrors(UnhandledException)) 3058 3059 def test_stringLiteralTooLong(self): 3060 """ 3061 A string literal whose length exceeds the maximum allowed 3062 length results in a C{BAD} response. 3063 """ 3064 self.server._literalStringLimit = 4 3065 self.server.lineReceived(b"001 LOGIN {5}\r\n") 3066 3067 self.assertEqual( 3068 self.transport.value(), 3069 b"001 BAD Illegal syntax: Literal too long!" 3070 b" I accept at most 4 octets\r\n", 3071 ) 3072 3073 def test_arg_astringEmptyLine(self): 3074 """ 3075 An empty string argument raises L{imap4.IllegalClientResponse}. 3076 """ 3077 for empty in [b"", b"\r\n", b" "]: 3078 self.assertRaises( 3079 imap4.IllegalClientResponse, self.server.arg_astring, empty 3080 ) 3081 3082 def test_arg_astringUnmatchedQuotes(self): 3083 """ 3084 An unmatched quote in a string argument raises 3085 L{imap4.IllegalClientResponse}. 3086 """ 3087 self.assertRaises( 3088 imap4.IllegalClientResponse, self.server.arg_astring, b'"open' 3089 ) 3090 3091 def test_arg_astringUnmatchedLiteralBraces(self): 3092 """ 3093 An unmatched brace in a string literal's size raises 3094 L{imap4.IllegalClientResponse}. 3095 """ 3096 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_astring, b"{0") 3097 3098 def test_arg_astringInvalidLiteralSize(self): 3099 """ 3100 A non-integral string literal size raises 3101 L{imap4.IllegalClientResponse}. 3102 """ 3103 self.assertRaises( 3104 imap4.IllegalClientResponse, self.server.arg_astring, b"{[object Object]}" 3105 ) 3106 3107 def test_arg_atomEmptyLine(self): 3108 """ 3109 An empty atom raises L{IllegalClientResponse}. 3110 """ 3111 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_atom, b"") 3112 3113 def test_arg_atomMalformedAtom(self): 3114 """ 3115 A malformed atom raises L{IllegalClientResponse}. 3116 """ 3117 self.assertRaises( 3118 imap4.IllegalClientResponse, self.server.arg_atom, b" not an atom " 3119 ) 3120 3121 def test_arg_plistEmptyLine(self): 3122 """ 3123 An empty parenthesized list raises L{IllegalClientResponse}. 3124 """ 3125 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_plist, b"") 3126 3127 def test_arg_plistUnmatchedParentheses(self): 3128 """ 3129 A parenthesized with unmatched parentheses raises 3130 L{IllegalClientResponse}. 3131 """ 3132 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_plist, b"(foo") 3133 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_plist, b"foo)") 3134 3135 def test_arg_literalEmptyLine(self): 3136 """ 3137 An empty file literal raises L{IllegalClientResponse}. 3138 """ 3139 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_literal, b"") 3140 3141 def test_arg_literalUnmatchedBraces(self): 3142 """ 3143 A literal with unmatched braces raises 3144 L{IllegalClientResponse}. 3145 """ 3146 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_literal, b"{10") 3147 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_literal, b"10}") 3148 3149 def test_arg_literalInvalidLiteralSize(self): 3150 """ 3151 A non-integral literal size raises 3152 L{imap4.IllegalClientResponse}. 3153 """ 3154 self.assertRaises( 3155 imap4.IllegalClientResponse, self.server.arg_literal, b"{[object Object]}" 3156 ) 3157 3158 def test_arg_seqsetReturnsRest(self): 3159 """ 3160 A sequence set returns the unparsed portion of a line. 3161 """ 3162 sequence = b"1:* blah blah blah" 3163 _, rest = self.server.arg_seqset(sequence) 3164 self.assertEqual(rest, b"blah blah blah") 3165 3166 def test_arg_seqsetInvalidSequence(self): 3167 """ 3168 An invalid sequence raises L{imap4.IllegalClientResponse}. 3169 """ 3170 self.assertRaises(imap4.IllegalClientResponse, self.server.arg_seqset, b"x:y") 3171 3172 def test_arg_flaglistOneFlag(self): 3173 """ 3174 A single flag that is not contained in a list is parsed. 3175 """ 3176 flag = b"flag" 3177 parsed, rest = self.server.arg_flaglist(flag) 3178 self.assertEqual(parsed, [flag]) 3179 self.assertFalse(rest) 3180 3181 def test_arg_flaglistMismatchedParentehses(self): 3182 """ 3183 A list of flags with unmatched parentheses raises 3184 L{imap4.IllegalClientResponse}. 3185 """ 3186 self.assertRaises( 3187 imap4.IllegalClientResponse, 3188 self.server.arg_flaglist, 3189 b"(invalid", 3190 ) 3191 3192 def test_arg_flaglistMalformedFlag(self): 3193 """ 3194 A list of flags that contains a malformed flag raises 3195 L{imap4.IllegalClientResponse}. 3196 """ 3197 self.assertRaises( 3198 imap4.IllegalClientResponse, self.server.arg_flaglist, b"(first \x00)" 3199 ) 3200 self.assertRaises( 3201 imap4.IllegalClientResponse, self.server.arg_flaglist, b"(first \x00second)" 3202 ) 3203 3204 def test_opt_plistMissingOpenParenthesis(self): 3205 """ 3206 A line that does not begin with an open parenthesis (C{(}) is 3207 parsed as L{None}, and the remainder is the whole line. 3208 """ 3209 line = b"not (" 3210 plist, remainder = self.server.opt_plist(line) 3211 self.assertIsNone(plist) 3212 self.assertEqual(remainder, line) 3213 3214 def test_opt_datetimeMissingOpenQuote(self): 3215 """ 3216 A line that does not begin with a double quote (C{"}) is 3217 parsed as L{None}, and the remainder is the whole line. 3218 """ 3219 line = b'not "' 3220 dt, remainder = self.server.opt_datetime(line) 3221 self.assertIsNone(dt) 3222 self.assertEqual(remainder, line) 3223 3224 def test_opt_datetimeMissingCloseQuote(self): 3225 """ 3226 A line that does not have a closing double quote (C{"}) raises 3227 L{imap4.IllegalClientResponse}. 3228 """ 3229 line = b'"21-Jul-2017 19:37:07 -0700' 3230 self.assertRaises(imap4.IllegalClientResponse, self.server.opt_datetime, line) 3231 3232 def test_opt_charsetMissingIdentifier(self): 3233 """ 3234 A line that contains C{CHARSET} but no character set 3235 identifier raises L{imap4.IllegalClientResponse}. 3236 """ 3237 line = b"CHARSET" 3238 self.assertRaises(imap4.IllegalClientResponse, self.server.opt_charset, line) 3239 3240 def test_opt_charsetEndOfLine(self): 3241 """ 3242 A line that ends with a C{CHARSET} identifier is parsed as 3243 that identifier, and the remainder is the empty string. 3244 """ 3245 line = b"CHARSET UTF-8" 3246 identifier, remainder = self.server.opt_charset(line) 3247 self.assertEqual(identifier, b"UTF-8") 3248 self.assertEqual(remainder, b"") 3249 3250 def test_opt_charsetWithRemainder(self): 3251 """ 3252 A line that has additional data after a C{CHARSET} identifier 3253 is parsed as that identifier, and the remainder is that 3254 additional data. 3255 """ 3256 line = b"CHARSET UTF-8 remainder" 3257 identifier, remainder = self.server.opt_charset(line) 3258 self.assertEqual(identifier, b"UTF-8") 3259 self.assertEqual(remainder, b"remainder") 3260 3261 3262class IMAP4ServerSearchTests(IMAP4HelperMixin, TestCase): 3263 """ 3264 Tests for the behavior of the search_* functions in L{imap4.IMAP4Server}. 3265 """ 3266 3267 def setUp(self): 3268 IMAP4HelperMixin.setUp(self) 3269 self.earlierQuery = ["10-Dec-2009"] 3270 self.sameDateQuery = ["13-Dec-2009"] 3271 self.laterQuery = ["16-Dec-2009"] 3272 self.seq = 0 3273 self.msg = FakeyMessage( 3274 {"date": "Mon, 13 Dec 2009 21:25:10 GMT"}, 3275 [], 3276 "13 Dec 2009 00:00:00 GMT", 3277 "", 3278 1234, 3279 None, 3280 ) 3281 3282 def test_searchSentBefore(self): 3283 """ 3284 L{imap4.IMAP4Server.search_SENTBEFORE} returns True if the message date 3285 is earlier than the query date. 3286 """ 3287 self.assertFalse( 3288 self.server.search_SENTBEFORE(self.earlierQuery, self.seq, self.msg) 3289 ) 3290 self.assertTrue( 3291 self.server.search_SENTBEFORE(self.laterQuery, self.seq, self.msg) 3292 ) 3293 3294 def test_searchWildcard(self): 3295 """ 3296 L{imap4.IMAP4Server.search_UID} returns True if the message UID is in 3297 the search range. 3298 """ 3299 self.assertFalse( 3300 self.server.search_UID([b"2:3"], self.seq, self.msg, (1, 1234)) 3301 ) 3302 # 2:* should get translated to 2:<max UID> and then to 1:2 3303 self.assertTrue(self.server.search_UID([b"2:*"], self.seq, self.msg, (1, 1234))) 3304 self.assertTrue(self.server.search_UID([b"*"], self.seq, self.msg, (1, 1234))) 3305 3306 def test_searchWildcardHigh(self): 3307 """ 3308 L{imap4.IMAP4Server.search_UID} should return True if there is a 3309 wildcard, because a wildcard means "highest UID in the mailbox". 3310 """ 3311 self.assertTrue( 3312 self.server.search_UID([b"1235:*"], self.seq, self.msg, (1234, 1)) 3313 ) 3314 3315 def test_reversedSearchTerms(self): 3316 """ 3317 L{imap4.IMAP4Server.search_SENTON} returns True if the message date is 3318 the same as the query date. 3319 """ 3320 msgset = imap4.parseIdList(b"4:2") 3321 self.assertEqual(list(msgset), [2, 3, 4]) 3322 3323 def test_searchSentOn(self): 3324 """ 3325 L{imap4.IMAP4Server.search_SENTON} returns True if the message date is 3326 the same as the query date. 3327 """ 3328 self.assertFalse( 3329 self.server.search_SENTON(self.earlierQuery, self.seq, self.msg) 3330 ) 3331 self.assertTrue( 3332 self.server.search_SENTON(self.sameDateQuery, self.seq, self.msg) 3333 ) 3334 self.assertFalse(self.server.search_SENTON(self.laterQuery, self.seq, self.msg)) 3335 3336 def test_searchSentSince(self): 3337 """ 3338 L{imap4.IMAP4Server.search_SENTSINCE} returns True if the message date 3339 is later than the query date. 3340 """ 3341 self.assertTrue( 3342 self.server.search_SENTSINCE(self.earlierQuery, self.seq, self.msg) 3343 ) 3344 self.assertFalse( 3345 self.server.search_SENTSINCE(self.laterQuery, self.seq, self.msg) 3346 ) 3347 3348 def test_searchOr(self): 3349 """ 3350 L{imap4.IMAP4Server.search_OR} returns true if either of the two 3351 expressions supplied to it returns true and returns false if neither 3352 does. 3353 """ 3354 self.assertTrue( 3355 self.server.search_OR( 3356 ["SENTSINCE"] + self.earlierQuery + ["SENTSINCE"] + self.laterQuery, 3357 self.seq, 3358 self.msg, 3359 (None, None), 3360 ) 3361 ) 3362 self.assertTrue( 3363 self.server.search_OR( 3364 ["SENTSINCE"] + self.laterQuery + ["SENTSINCE"] + self.earlierQuery, 3365 self.seq, 3366 self.msg, 3367 (None, None), 3368 ) 3369 ) 3370 self.assertFalse( 3371 self.server.search_OR( 3372 ["SENTON"] + self.laterQuery + ["SENTSINCE"] + self.laterQuery, 3373 self.seq, 3374 self.msg, 3375 (None, None), 3376 ) 3377 ) 3378 3379 def test_searchNot(self): 3380 """ 3381 L{imap4.IMAP4Server.search_NOT} returns the negation of the result 3382 of the expression supplied to it. 3383 """ 3384 self.assertFalse( 3385 self.server.search_NOT( 3386 ["SENTSINCE"] + self.earlierQuery, self.seq, self.msg, (None, None) 3387 ) 3388 ) 3389 self.assertTrue( 3390 self.server.search_NOT( 3391 ["SENTON"] + self.laterQuery, self.seq, self.msg, (None, None) 3392 ) 3393 ) 3394 3395 def test_searchBefore(self): 3396 """ 3397 L{imap4.IMAP4Server.search_BEFORE} returns True if the 3398 internal message date is before the query date. 3399 """ 3400 self.assertFalse( 3401 self.server.search_BEFORE(self.earlierQuery, self.seq, self.msg) 3402 ) 3403 self.assertFalse( 3404 self.server.search_BEFORE(self.sameDateQuery, self.seq, self.msg) 3405 ) 3406 self.assertTrue(self.server.search_BEFORE(self.laterQuery, self.seq, self.msg)) 3407 3408 def test_searchOn(self): 3409 """ 3410 L{imap4.IMAP4Server.search_ON} returns True if the 3411 internal message date is the same as the query date. 3412 """ 3413 self.assertFalse(self.server.search_ON(self.earlierQuery, self.seq, self.msg)) 3414 self.assertFalse(self.server.search_ON(self.sameDateQuery, self.seq, self.msg)) 3415 self.assertFalse(self.server.search_ON(self.laterQuery, self.seq, self.msg)) 3416 3417 def test_searchSince(self): 3418 """ 3419 L{imap4.IMAP4Server.search_SINCE} returns True if the 3420 internal message date is greater than the query date. 3421 """ 3422 self.assertTrue(self.server.search_SINCE(self.earlierQuery, self.seq, self.msg)) 3423 self.assertTrue( 3424 self.server.search_SINCE(self.sameDateQuery, self.seq, self.msg) 3425 ) 3426 self.assertFalse(self.server.search_SINCE(self.laterQuery, self.seq, self.msg)) 3427 3428 3429@implementer(IRealm) 3430class TestRealm: 3431 """ 3432 A L{IRealm} for tests. 3433 3434 @cvar theAccount: An C{Account} instance. Tests can set this to 3435 ensure predictable account retrieval. 3436 """ 3437 3438 theAccount = None 3439 3440 def __init__(self, accountHolder=None): 3441 """ 3442 Create a realm for testing. 3443 3444 @param accountHolder: (optional) An object whose C{theAccount} 3445 attribute will be returned instead of 3446 L{TestRealm.theAccount}. Attribute access occurs on every 3447 avatar request, so any modifications to 3448 C{accountHolder.theAccount} will be reflected here. 3449 """ 3450 if accountHolder: 3451 self._getAccount = lambda: accountHolder.theAccount 3452 else: 3453 self._getAccount = lambda: self.theAccount 3454 3455 def requestAvatar(self, avatarId, mind, *interfaces): 3456 return imap4.IAccount, self._getAccount(), lambda: None 3457 3458 3459class TestChecker: 3460 credentialInterfaces = (IUsernameHashedPassword, IUsernamePassword) 3461 3462 users = {b"testuser": b"secret"} 3463 3464 def requestAvatarId(self, credentials): 3465 if credentials.username in self.users: 3466 return defer.maybeDeferred( 3467 credentials.checkPassword, self.users[credentials.username] 3468 ).addCallback(self._cbCheck, credentials.username) 3469 3470 def _cbCheck(self, result, username): 3471 if result: 3472 return username 3473 raise UnauthorizedLogin() 3474 3475 3476class AuthenticatorTests(IMAP4HelperMixin, TestCase): 3477 def setUp(self): 3478 IMAP4HelperMixin.setUp(self) 3479 3480 realm = TestRealm() 3481 realm.theAccount = Account(b"testuser") 3482 self.portal = Portal(realm) 3483 self.portal.registerChecker(TestChecker()) 3484 self.server.portal = self.portal 3485 3486 self.authenticated = 0 3487 self.account = realm.theAccount 3488 3489 def test_customChallengers(self): 3490 """ 3491 L{imap4.IMAP4Server} accepts a L{dict} mapping challenge type 3492 names to L{twisted.mail.interfaces.IChallengeResponse} 3493 providers. 3494 """ 3495 3496 @implementer(IChallengeResponse, IUsernamePassword) 3497 class SPECIALAuth: 3498 def getChallenge(self): 3499 return b"SPECIAL" 3500 3501 def setResponse(self, response): 3502 self.username, self.password = response.split(None, 1) 3503 3504 def moreChallenges(self): 3505 return False 3506 3507 def checkPassword(self, password): 3508 self.password = self.password 3509 3510 special = SPECIALAuth() 3511 verifyObject(IChallengeResponse, special) 3512 3513 server = imap4.IMAP4Server({b"SPECIAL": SPECIALAuth}) 3514 server.portal = self.portal 3515 3516 transport = StringTransport() 3517 server.makeConnection(transport) 3518 self.addCleanup(server.connectionLost, error.ConnectionDone("Connection done.")) 3519 3520 self.assertIn(b"AUTH=SPECIAL", transport.value()) 3521 3522 transport.clear() 3523 server.dataReceived(b"001 AUTHENTICATE SPECIAL\r\n") 3524 3525 self.assertIn(base64.b64encode(special.getChallenge()), transport.value()) 3526 3527 transport.clear() 3528 server.dataReceived(base64.b64encode(b"username password") + b"\r\n") 3529 3530 self.assertEqual(transport.value(), b"001 OK Authentication successful\r\n") 3531 3532 def test_unsupportedMethod(self): 3533 """ 3534 An unsupported C{AUTHENTICATE} method results in a negative 3535 response. 3536 """ 3537 server = imap4.IMAP4Server() 3538 server.portal = self.portal 3539 3540 transport = StringTransport() 3541 server.makeConnection(transport) 3542 self.addCleanup(server.connectionLost, error.ConnectionDone("Connection done.")) 3543 3544 transport.clear() 3545 3546 server.dataReceived(b"001 AUTHENTICATE UNKNOWN\r\n") 3547 self.assertEqual( 3548 transport.value(), b"001 NO AUTHENTICATE method unsupported\r\n" 3549 ) 3550 3551 def test_missingPortal(self): 3552 """ 3553 An L{imap4.IMAP4Server} that is missing a L{Portal} responds 3554 negatively to an authentication 3555 """ 3556 self.server.challengers[b"LOGIN"] = imap4.LOGINCredentials 3557 3558 cAuth = imap4.LOGINAuthenticator(b"testuser") 3559 self.client.registerAuthenticator(cAuth) 3560 3561 self.server.portal = None 3562 3563 def auth(): 3564 return self.client.authenticate(b"secret") 3565 3566 d = self.connected.addCallback(strip(auth)) 3567 d.addErrback( 3568 self.assertClientFailureMessage, b"Temporary authentication failure" 3569 ) 3570 d.addCallbacks(self._cbStopClient, self._ebGeneral) 3571 3572 return defer.gatherResults([d, self.loopback()]) 3573 3574 def test_challengerRaisesException(self): 3575 """ 3576 When a challenger's 3577 L{getChallenge<IChallengeResponse.getChallenge>} method raises 3578 any exception, a C{NO} response is sent. 3579 """ 3580 3581 @implementer(IChallengeResponse) 3582 class ValueErrorAuthChallenge: 3583 message = b"A challenge failure" 3584 3585 def getChallenge(self): 3586 raise ValueError(self.message) 3587 3588 def setResponse(self, response): 3589 """ 3590 Never called. 3591 3592 @param response: See L{IChallengeResponse.setResponse} 3593 """ 3594 3595 def moreChallenges(self): 3596 """ 3597 Never called. 3598 """ 3599 3600 @implementer(IClientAuthentication) 3601 class ValueErrorAuthenticator: 3602 def getName(self): 3603 return b"ERROR" 3604 3605 def challengeResponse(self, secret, chal): 3606 return b"IGNORED" 3607 3608 bad = ValueErrorAuthChallenge() 3609 verifyObject(IChallengeResponse, bad) 3610 3611 self.server.challengers[b"ERROR"] = ValueErrorAuthChallenge 3612 self.client.registerAuthenticator(ValueErrorAuthenticator()) 3613 3614 def auth(): 3615 return self.client.authenticate(b"secret") 3616 3617 d = self.connected.addCallback(strip(auth)) 3618 d.addErrback( 3619 self.assertClientFailureMessage, 3620 ("Server error: " + str(ValueErrorAuthChallenge.message)).encode("ascii"), 3621 ) 3622 d.addCallbacks(self._cbStopClient, self._ebGeneral) 3623 3624 return defer.gatherResults([d, self.loopback()]) 3625 3626 def test_authNotBase64(self): 3627 """ 3628 A client that responds with a challenge that cannot be decoded 3629 as Base 64 receives an L{IllegalClientResponse}. 3630 """ 3631 3632 @implementer(IChallengeResponse) 3633 class NotBase64AuthChallenge: 3634 message = b"Malformed Response - not base64" 3635 3636 def getChallenge(self): 3637 return b"SomeChallenge" 3638 3639 def setResponse(self, response): 3640 """ 3641 Never called. 3642 3643 @param response: See L{IChallengeResponse.setResponse} 3644 """ 3645 3646 def moreChallenges(self): 3647 """ 3648 Never called. 3649 """ 3650 3651 notBase64 = NotBase64AuthChallenge() 3652 verifyObject(IChallengeResponse, notBase64) 3653 3654 server = imap4.IMAP4Server() 3655 server.portal = self.portal 3656 server.challengers[b"NOTBASE64"] = NotBase64AuthChallenge 3657 3658 transport = StringTransport() 3659 server.makeConnection(transport) 3660 self.addCleanup(server.connectionLost, error.ConnectionDone("Connection done.")) 3661 3662 self.assertIn(b"AUTH=NOTBASE64", transport.value()) 3663 3664 transport.clear() 3665 server.dataReceived(b"001 AUTHENTICATE NOTBASE64\r\n") 3666 3667 self.assertIn(base64.b64encode(notBase64.getChallenge()), transport.value()) 3668 3669 transport.clear() 3670 server.dataReceived(b"\x00 Not base64\r\n") 3671 3672 self.assertEqual( 3673 transport.value(), 3674 b"".join([b"001 NO Authentication failed: ", notBase64.message, b"\r\n"]), 3675 ) 3676 3677 def test_unhandledCredentials(self): 3678 """ 3679 A challenger that causes the login to fail 3680 L{UnhandledCredentials} results in an C{NO} response. 3681 3682 @return: A L{Deferred} that fires when the authorization has 3683 failed. 3684 """ 3685 realm = TestRealm() 3686 portal = Portal(realm) 3687 # This portal has no checkers, so all logins will fail with 3688 # UnhandledCredentials 3689 self.server.portal = portal 3690 3691 self.server.challengers[b"LOGIN"] = loginCred = imap4.LOGINCredentials 3692 3693 verifyClass(IChallengeResponse, loginCred) 3694 3695 cAuth = imap4.LOGINAuthenticator(b"testuser") 3696 self.client.registerAuthenticator(cAuth) 3697 3698 def auth(): 3699 return self.client.authenticate(b"secret") 3700 3701 d1 = self.connected.addCallback(strip(auth)) 3702 d1.addErrback( 3703 self.assertClientFailureMessage, 3704 b"Authentication failed: server misconfigured", 3705 ) 3706 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 3707 d = defer.gatherResults([self.loopback(), d1]) 3708 return d 3709 3710 def test_unexpectedLoginFailure(self): 3711 """ 3712 If the portal raises an exception other than 3713 L{UnauthorizedLogin} or L{UnhandledCredentials}, the server 3714 responds with a C{BAD} response and the exception is logged. 3715 """ 3716 3717 class UnexpectedException(Exception): 3718 """ 3719 An unexpected exception. 3720 """ 3721 3722 class FailingChecker: 3723 """ 3724 A credentials checker whose L{requestAvatarId} method 3725 raises L{UnexpectedException}. 3726 """ 3727 3728 credentialInterfaces = (IUsernameHashedPassword, IUsernamePassword) 3729 3730 def requestAvatarId(self, credentials): 3731 raise UnexpectedException("Unexpected error.") 3732 3733 realm = TestRealm() 3734 portal = Portal(realm) 3735 portal.registerChecker(FailingChecker()) 3736 self.server.portal = portal 3737 3738 self.server.challengers[b"LOGIN"] = loginCred = imap4.LOGINCredentials 3739 3740 verifyClass(IChallengeResponse, loginCred) 3741 3742 cAuth = imap4.LOGINAuthenticator(b"testuser") 3743 self.client.registerAuthenticator(cAuth) 3744 3745 def auth(): 3746 return self.client.authenticate(b"secret") 3747 3748 def assertUnexpectedExceptionLogged(): 3749 self.assertTrue(self.flushLoggedErrors(UnexpectedException)) 3750 3751 d1 = self.connected.addCallback(strip(auth)) 3752 d1.addErrback( 3753 self.assertClientFailureMessage, b"Server error: login failed unexpectedly" 3754 ) 3755 d1.addCallback(strip(assertUnexpectedExceptionLogged)) 3756 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 3757 d = defer.gatherResults([self.loopback(), d1]) 3758 return d 3759 3760 def testCramMD5(self): 3761 self.server.challengers[b"CRAM-MD5"] = CramMD5Credentials 3762 cAuth = imap4.CramMD5ClientAuthenticator(b"testuser") 3763 self.client.registerAuthenticator(cAuth) 3764 3765 def auth(): 3766 return self.client.authenticate(b"secret") 3767 3768 def authed(): 3769 self.authenticated = 1 3770 3771 d1 = self.connected.addCallback(strip(auth)) 3772 d1.addCallbacks(strip(authed), self._ebGeneral) 3773 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 3774 d2 = self.loopback() 3775 d = defer.gatherResults([d1, d2]) 3776 return d.addCallback(self._cbTestCramMD5) 3777 3778 def _cbTestCramMD5(self, ignored): 3779 self.assertEqual(self.authenticated, 1) 3780 self.assertEqual(self.server.account, self.account) 3781 3782 def testFailedCramMD5(self): 3783 self.server.challengers[b"CRAM-MD5"] = CramMD5Credentials 3784 cAuth = imap4.CramMD5ClientAuthenticator(b"testuser") 3785 self.client.registerAuthenticator(cAuth) 3786 3787 def misauth(): 3788 return self.client.authenticate(b"not the secret") 3789 3790 def authed(): 3791 self.authenticated = 1 3792 3793 def misauthed(): 3794 self.authenticated = -1 3795 3796 d1 = self.connected.addCallback(strip(misauth)) 3797 d1.addCallbacks(strip(authed), strip(misauthed)) 3798 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 3799 d = defer.gatherResults([self.loopback(), d1]) 3800 return d.addCallback(self._cbTestFailedCramMD5) 3801 3802 def _cbTestFailedCramMD5(self, ignored): 3803 self.assertEqual(self.authenticated, -1) 3804 self.assertEqual(self.server.account, None) 3805 3806 def testLOGIN(self): 3807 self.server.challengers[b"LOGIN"] = loginCred = imap4.LOGINCredentials 3808 3809 verifyClass(IChallengeResponse, loginCred) 3810 3811 cAuth = imap4.LOGINAuthenticator(b"testuser") 3812 self.client.registerAuthenticator(cAuth) 3813 3814 def auth(): 3815 return self.client.authenticate(b"secret") 3816 3817 def authed(): 3818 self.authenticated = 1 3819 3820 d1 = self.connected.addCallback(strip(auth)) 3821 d1.addCallbacks(strip(authed), self._ebGeneral) 3822 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 3823 d = defer.gatherResults([self.loopback(), d1]) 3824 return d.addCallback(self._cbTestLOGIN) 3825 3826 def _cbTestLOGIN(self, ignored): 3827 self.assertEqual(self.authenticated, 1) 3828 self.assertEqual(self.server.account, self.account) 3829 3830 def testFailedLOGIN(self): 3831 self.server.challengers[b"LOGIN"] = imap4.LOGINCredentials 3832 cAuth = imap4.LOGINAuthenticator(b"testuser") 3833 self.client.registerAuthenticator(cAuth) 3834 3835 def misauth(): 3836 return self.client.authenticate(b"not the secret") 3837 3838 def authed(): 3839 self.authenticated = 1 3840 3841 def misauthed(): 3842 self.authenticated = -1 3843 3844 d1 = self.connected.addCallback(strip(misauth)) 3845 d1.addCallbacks(strip(authed), strip(misauthed)) 3846 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 3847 d = defer.gatherResults([self.loopback(), d1]) 3848 return d.addCallback(self._cbTestFailedLOGIN) 3849 3850 def _cbTestFailedLOGIN(self, ignored): 3851 self.assertEqual(self.authenticated, -1) 3852 self.assertEqual(self.server.account, None) 3853 3854 def testPLAIN(self): 3855 self.server.challengers[b"PLAIN"] = plainCred = imap4.PLAINCredentials 3856 3857 verifyClass(IChallengeResponse, plainCred) 3858 3859 cAuth = imap4.PLAINAuthenticator(b"testuser") 3860 self.client.registerAuthenticator(cAuth) 3861 3862 def auth(): 3863 return self.client.authenticate(b"secret") 3864 3865 def authed(): 3866 self.authenticated = 1 3867 3868 d1 = self.connected.addCallback(strip(auth)) 3869 d1.addCallbacks(strip(authed), self._ebGeneral) 3870 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 3871 d = defer.gatherResults([self.loopback(), d1]) 3872 return d.addCallback(self._cbTestPLAIN) 3873 3874 def _cbTestPLAIN(self, ignored): 3875 self.assertEqual(self.authenticated, 1) 3876 self.assertEqual(self.server.account, self.account) 3877 3878 def testFailedPLAIN(self): 3879 self.server.challengers[b"PLAIN"] = imap4.PLAINCredentials 3880 cAuth = imap4.PLAINAuthenticator(b"testuser") 3881 self.client.registerAuthenticator(cAuth) 3882 3883 def misauth(): 3884 return self.client.authenticate(b"not the secret") 3885 3886 def authed(): 3887 self.authenticated = 1 3888 3889 def misauthed(): 3890 self.authenticated = -1 3891 3892 d1 = self.connected.addCallback(strip(misauth)) 3893 d1.addCallbacks(strip(authed), strip(misauthed)) 3894 d1.addCallbacks(self._cbStopClient, self._ebGeneral) 3895 d = defer.gatherResults([self.loopback(), d1]) 3896 return d.addCallback(self._cbTestFailedPLAIN) 3897 3898 def _cbTestFailedPLAIN(self, ignored): 3899 self.assertEqual(self.authenticated, -1) 3900 self.assertEqual(self.server.account, None) 3901 3902 3903class SASLPLAINTests(TestCase): 3904 """ 3905 Tests for I{SASL PLAIN} authentication, as implemented by 3906 L{imap4.PLAINAuthenticator} and L{imap4.PLAINCredentials}. 3907 3908 @see: U{http://www.faqs.org/rfcs/rfc2595.html} 3909 @see: U{http://www.faqs.org/rfcs/rfc4616.html} 3910 """ 3911 3912 def test_authenticatorChallengeResponse(self): 3913 """ 3914 L{PLAINAuthenticator.challengeResponse} returns challenge strings of 3915 the form:: 3916 3917 NUL<authn-id>NUL<secret> 3918 """ 3919 username = b"testuser" 3920 secret = b"secret" 3921 chal = b"challenge" 3922 cAuth = imap4.PLAINAuthenticator(username) 3923 response = cAuth.challengeResponse(secret, chal) 3924 self.assertEqual(response, b"\0" + username + b"\0" + secret) 3925 3926 def test_credentialsSetResponse(self): 3927 """ 3928 L{PLAINCredentials.setResponse} parses challenge strings of the 3929 form:: 3930 3931 NUL<authn-id>NUL<secret> 3932 """ 3933 cred = imap4.PLAINCredentials() 3934 cred.setResponse(b"\0testuser\0secret") 3935 self.assertEqual(cred.username, b"testuser") 3936 self.assertEqual(cred.password, b"secret") 3937 3938 def test_credentialsInvalidResponse(self): 3939 """ 3940 L{PLAINCredentials.setResponse} raises L{imap4.IllegalClientResponse} 3941 when passed a string not of the expected form. 3942 """ 3943 cred = imap4.PLAINCredentials() 3944 self.assertRaises(imap4.IllegalClientResponse, cred.setResponse, b"hello") 3945 self.assertRaises( 3946 imap4.IllegalClientResponse, cred.setResponse, b"hello\0world" 3947 ) 3948 self.assertRaises( 3949 imap4.IllegalClientResponse, cred.setResponse, b"hello\0world\0Zoom!\0" 3950 ) 3951 3952 3953class UnsolicitedResponseTests(IMAP4HelperMixin, TestCase): 3954 def testReadWrite(self): 3955 def login(): 3956 return self.client.login(b"testuser", b"password-test") 3957 3958 def loggedIn(): 3959 self.server.modeChanged(1) 3960 3961 d1 = self.connected.addCallback(strip(login)) 3962 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) 3963 d = defer.gatherResults([self.loopback(), d1]) 3964 return d.addCallback(self._cbTestReadWrite) 3965 3966 def _cbTestReadWrite(self, ignored): 3967 E = self.client.events 3968 self.assertEqual(E, [["modeChanged", 1]]) 3969 3970 def testReadOnly(self): 3971 def login(): 3972 return self.client.login(b"testuser", b"password-test") 3973 3974 def loggedIn(): 3975 self.server.modeChanged(0) 3976 3977 d1 = self.connected.addCallback(strip(login)) 3978 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) 3979 d = defer.gatherResults([self.loopback(), d1]) 3980 return d.addCallback(self._cbTestReadOnly) 3981 3982 def _cbTestReadOnly(self, ignored): 3983 E = self.client.events 3984 self.assertEqual(E, [["modeChanged", 0]]) 3985 3986 def testFlagChange(self): 3987 flags = {1: ["\\Answered", "\\Deleted"], 5: [], 10: ["\\Recent"]} 3988 3989 def login(): 3990 return self.client.login(b"testuser", b"password-test") 3991 3992 def loggedIn(): 3993 self.server.flagsChanged(flags) 3994 3995 d1 = self.connected.addCallback(strip(login)) 3996 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) 3997 d = defer.gatherResults([self.loopback(), d1]) 3998 return d.addCallback(self._cbTestFlagChange, flags) 3999 4000 def _cbTestFlagChange(self, ignored, flags): 4001 E = self.client.events 4002 expect = [["flagsChanged", {x[0]: x[1]}] for x in flags.items()] 4003 E.sort(key=lambda o: o[0]) 4004 expect.sort(key=lambda o: o[0]) 4005 self.assertEqual(E, expect) 4006 4007 def testNewMessages(self): 4008 def login(): 4009 return self.client.login(b"testuser", b"password-test") 4010 4011 def loggedIn(): 4012 self.server.newMessages(10, None) 4013 4014 d1 = self.connected.addCallback(strip(login)) 4015 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) 4016 d = defer.gatherResults([self.loopback(), d1]) 4017 return d.addCallback(self._cbTestNewMessages) 4018 4019 def _cbTestNewMessages(self, ignored): 4020 E = self.client.events 4021 self.assertEqual(E, [["newMessages", 10, None]]) 4022 4023 def testNewRecentMessages(self): 4024 def login(): 4025 return self.client.login(b"testuser", b"password-test") 4026 4027 def loggedIn(): 4028 self.server.newMessages(None, 10) 4029 4030 d1 = self.connected.addCallback(strip(login)) 4031 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) 4032 d = defer.gatherResults([self.loopback(), d1]) 4033 return d.addCallback(self._cbTestNewRecentMessages) 4034 4035 def _cbTestNewRecentMessages(self, ignored): 4036 E = self.client.events 4037 self.assertEqual(E, [["newMessages", None, 10]]) 4038 4039 def testNewMessagesAndRecent(self): 4040 def login(): 4041 return self.client.login(b"testuser", b"password-test") 4042 4043 def loggedIn(): 4044 self.server.newMessages(20, 10) 4045 4046 d1 = self.connected.addCallback(strip(login)) 4047 d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral) 4048 d = defer.gatherResults([self.loopback(), d1]) 4049 return d.addCallback(self._cbTestNewMessagesAndRecent) 4050 4051 def _cbTestNewMessagesAndRecent(self, ignored): 4052 E = self.client.events 4053 self.assertEqual(E, [["newMessages", 20, None], ["newMessages", None, 10]]) 4054 4055 4056class ClientCapabilityTests(TestCase): 4057 """ 4058 Tests for issuance of the CAPABILITY command and handling of its response. 4059 """ 4060 4061 def setUp(self): 4062 """ 4063 Create an L{imap4.IMAP4Client} connected to a L{StringTransport}. 4064 """ 4065 self.transport = StringTransport() 4066 self.protocol = imap4.IMAP4Client() 4067 self.protocol.makeConnection(self.transport) 4068 self.protocol.dataReceived(b"* OK [IMAP4rev1]\r\n") 4069 4070 def test_simpleAtoms(self): 4071 """ 4072 A capability response consisting only of atoms without C{'='} in them 4073 should result in a dict mapping those atoms to L{None}. 4074 """ 4075 capabilitiesResult = self.protocol.getCapabilities(useCache=False) 4076 self.protocol.dataReceived(b"* CAPABILITY IMAP4rev1 LOGINDISABLED\r\n") 4077 self.protocol.dataReceived(b"0001 OK Capability completed.\r\n") 4078 4079 def gotCapabilities(capabilities): 4080 self.assertEqual(capabilities, {b"IMAP4rev1": None, b"LOGINDISABLED": None}) 4081 4082 capabilitiesResult.addCallback(gotCapabilities) 4083 return capabilitiesResult 4084 4085 def test_categoryAtoms(self): 4086 """ 4087 A capability response consisting of atoms including C{'='} should have 4088 those atoms split on that byte and have capabilities in the same 4089 category aggregated into lists in the resulting dictionary. 4090 4091 (n.b. - I made up the word "category atom"; the protocol has no notion 4092 of structure here, but rather allows each capability to define the 4093 semantics of its entry in the capability response in a freeform manner. 4094 If I had realized this earlier, the API for capabilities would look 4095 different. As it is, we can hope that no one defines any crazy 4096 semantics which are incompatible with this API, or try to figure out a 4097 better API when someone does. -exarkun) 4098 """ 4099 capabilitiesResult = self.protocol.getCapabilities(useCache=False) 4100 self.protocol.dataReceived(b"* CAPABILITY IMAP4rev1 AUTH=LOGIN AUTH=PLAIN\r\n") 4101 self.protocol.dataReceived(b"0001 OK Capability completed.\r\n") 4102 4103 def gotCapabilities(capabilities): 4104 self.assertEqual( 4105 capabilities, {b"IMAP4rev1": None, b"AUTH": [b"LOGIN", b"PLAIN"]} 4106 ) 4107 4108 capabilitiesResult.addCallback(gotCapabilities) 4109 return capabilitiesResult 4110 4111 def test_mixedAtoms(self): 4112 """ 4113 A capability response consisting of both simple and category atoms of 4114 the same type should result in a list containing L{None} as well as the 4115 values for the category. 4116 """ 4117 capabilitiesResult = self.protocol.getCapabilities(useCache=False) 4118 # Exercise codepath for both orderings of =-having and =-missing 4119 # capabilities. 4120 self.protocol.dataReceived( 4121 b"* CAPABILITY IMAP4rev1 FOO FOO=BAR BAR=FOO BAR\r\n" 4122 ) 4123 self.protocol.dataReceived(b"0001 OK Capability completed.\r\n") 4124 4125 def gotCapabilities(capabilities): 4126 self.assertEqual( 4127 capabilities, 4128 {b"IMAP4rev1": None, b"FOO": [None, b"BAR"], b"BAR": [b"FOO", None]}, 4129 ) 4130 4131 capabilitiesResult.addCallback(gotCapabilities) 4132 return capabilitiesResult 4133 4134 4135class StillSimplerClient(imap4.IMAP4Client): 4136 """ 4137 An IMAP4 client which keeps track of unsolicited flag changes. 4138 """ 4139 4140 def __init__(self): 4141 imap4.IMAP4Client.__init__(self) 4142 self.flags = {} 4143 4144 def flagsChanged(self, newFlags): 4145 self.flags.update(newFlags) 4146 4147 4148class HandCraftedTests(IMAP4HelperMixin, TestCase): 4149 def testTrailingLiteral(self): 4150 transport = StringTransport() 4151 c = imap4.IMAP4Client() 4152 c.makeConnection(transport) 4153 c.lineReceived(b"* OK [IMAP4rev1]") 4154 4155 def cbCheckTransport(ignored): 4156 self.assertEqual( 4157 transport.value().splitlines()[-1], 4158 b"0003 FETCH 1 (RFC822)", 4159 ) 4160 4161 def cbSelect(ignored): 4162 d = c.fetchMessage("1") 4163 c.dataReceived( 4164 b"* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n" 4165 ) 4166 c.dataReceived(b"0003 OK FETCH\r\n") 4167 d.addCallback(cbCheckTransport) 4168 return d 4169 4170 def cbLogin(ignored): 4171 d = c.select("inbox") 4172 c.lineReceived(b"0002 OK SELECT") 4173 d.addCallback(cbSelect) 4174 return d 4175 4176 d = c.login(b"blah", b"blah") 4177 c.dataReceived(b"0001 OK LOGIN\r\n") 4178 d.addCallback(cbLogin) 4179 return d 4180 4181 def test_fragmentedStringLiterals(self): 4182 """ 4183 String literals whose data is not immediately available are 4184 parsed. 4185 """ 4186 self.server.checker.addUser(b"testuser", b"password-test") 4187 transport = StringTransport() 4188 self.server.makeConnection(transport) 4189 4190 transport.clear() 4191 self.server.dataReceived(b"01 LOGIN {8}\r\n") 4192 self.assertEqual(transport.value(), b"+ Ready for 8 octets of text\r\n") 4193 4194 transport.clear() 4195 self.server.dataReceived(b"testuser {13}\r\n") 4196 self.assertEqual(transport.value(), b"+ Ready for 13 octets of text\r\n") 4197 4198 transport.clear() 4199 self.server.dataReceived(b"password") 4200 self.assertNot(transport.value()) 4201 self.server.dataReceived(b"-test\r\n") 4202 self.assertEqual(transport.value(), b"01 OK LOGIN succeeded\r\n") 4203 self.assertEqual(self.server.state, "auth") 4204 4205 self.server.connectionLost(error.ConnectionDone("Connection done.")) 4206 4207 def test_emptyStringLiteral(self): 4208 """ 4209 Empty string literals are parsed. 4210 """ 4211 self.server.checker.users = {b"": b""} 4212 transport = StringTransport() 4213 self.server.makeConnection(transport) 4214 4215 transport.clear() 4216 self.server.dataReceived(b"01 LOGIN {0}\r\n") 4217 self.assertEqual(transport.value(), b"+ Ready for 0 octets of text\r\n") 4218 4219 transport.clear() 4220 self.server.dataReceived(b"{0}\r\n") 4221 self.assertEqual(transport.value(), b"01 OK LOGIN succeeded\r\n") 4222 self.assertEqual(self.server.state, "auth") 4223 4224 self.server.connectionLost(error.ConnectionDone("Connection done.")) 4225 4226 def test_unsolicitedResponseMixedWithSolicitedResponse(self): 4227 """ 4228 If unsolicited data is received along with solicited data in the 4229 response to a I{FETCH} command issued by L{IMAP4Client.fetchSpecific}, 4230 the unsolicited data is passed to the appropriate callback and not 4231 included in the result with which the L{Deferred} returned by 4232 L{IMAP4Client.fetchSpecific} fires. 4233 """ 4234 transport = StringTransport() 4235 c = StillSimplerClient() 4236 c.makeConnection(transport) 4237 c.lineReceived(b"* OK [IMAP4rev1]") 4238 4239 def login(): 4240 d = c.login(b"blah", b"blah") 4241 c.dataReceived(b"0001 OK LOGIN\r\n") 4242 return d 4243 4244 def select(): 4245 d = c.select("inbox") 4246 c.lineReceived(b"0002 OK SELECT") 4247 return d 4248 4249 def fetch(): 4250 d = c.fetchSpecific( 4251 "1:*", headerType="HEADER.FIELDS", headerArgs=["SUBJECT"] 4252 ) 4253 c.dataReceived(b'* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {38}\r\n') 4254 c.dataReceived(b"Subject: Suprise for your woman...\r\n") 4255 c.dataReceived(b"\r\n") 4256 c.dataReceived(b")\r\n") 4257 c.dataReceived(b"* 1 FETCH (FLAGS (\\Seen))\r\n") 4258 c.dataReceived(b'* 2 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {75}\r\n') 4259 c.dataReceived( 4260 b"Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n" 4261 ) 4262 c.dataReceived(b"\r\n") 4263 c.dataReceived(b")\r\n") 4264 c.dataReceived(b"0003 OK FETCH completed\r\n") 4265 return d 4266 4267 def test(res): 4268 self.assertEqual( 4269 transport.value().splitlines()[-1], 4270 b"0003 FETCH 1:* BODY[HEADER.FIELDS (SUBJECT)]", 4271 ) 4272 4273 self.assertEqual( 4274 res, 4275 { 4276 1: [ 4277 [ 4278 "BODY", 4279 ["HEADER.FIELDS", ["SUBJECT"]], 4280 "Subject: Suprise for your woman...\r\n\r\n", 4281 ] 4282 ], 4283 2: [ 4284 [ 4285 "BODY", 4286 ["HEADER.FIELDS", ["SUBJECT"]], 4287 "Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n\r\n", 4288 ] 4289 ], 4290 }, 4291 ) 4292 4293 self.assertEqual(c.flags, {1: ["\\Seen"]}) 4294 4295 return ( 4296 login() 4297 .addCallback(strip(select)) 4298 .addCallback(strip(fetch)) 4299 .addCallback(test) 4300 ) 4301 4302 def test_literalWithoutPrecedingWhitespace(self): 4303 """ 4304 Literals should be recognized even when they are not preceded by 4305 whitespace. 4306 """ 4307 transport = StringTransport() 4308 protocol = imap4.IMAP4Client() 4309 4310 protocol.makeConnection(transport) 4311 protocol.lineReceived(b"* OK [IMAP4rev1]") 4312 4313 def login(): 4314 d = protocol.login(b"blah", b"blah") 4315 protocol.dataReceived(b"0001 OK LOGIN\r\n") 4316 return d 4317 4318 def select(): 4319 d = protocol.select(b"inbox") 4320 protocol.lineReceived(b"0002 OK SELECT") 4321 return d 4322 4323 def fetch(): 4324 d = protocol.fetchSpecific( 4325 "1:*", headerType="HEADER.FIELDS", headerArgs=["SUBJECT"] 4326 ) 4327 protocol.dataReceived( 4328 b'* 1 FETCH (BODY[HEADER.FIELDS ({7}\r\nSUBJECT)] "Hello")\r\n' 4329 ) 4330 protocol.dataReceived(b"0003 OK FETCH completed\r\n") 4331 return d 4332 4333 def test(result): 4334 self.assertEqual( 4335 transport.value().splitlines()[-1], 4336 b"0003 FETCH 1:* BODY[HEADER.FIELDS (SUBJECT)]", 4337 ) 4338 self.assertEqual( 4339 result, {1: [["BODY", ["HEADER.FIELDS", ["SUBJECT"]], "Hello"]]} 4340 ) 4341 4342 d = login() 4343 d.addCallback(strip(select)) 4344 d.addCallback(strip(fetch)) 4345 d.addCallback(test) 4346 return d 4347 4348 def test_nonIntegerLiteralLength(self): 4349 """ 4350 If the server sends a literal length which cannot be parsed as an 4351 integer, L{IMAP4Client.lineReceived} should cause the protocol to be 4352 disconnected by raising L{imap4.IllegalServerResponse}. 4353 """ 4354 transport = StringTransport() 4355 protocol = imap4.IMAP4Client() 4356 4357 protocol.makeConnection(transport) 4358 protocol.lineReceived(b"* OK [IMAP4rev1]") 4359 4360 def login(): 4361 d = protocol.login(b"blah", b"blah") 4362 protocol.dataReceived(b"0001 OK LOGIN\r\n") 4363 return d 4364 4365 def select(): 4366 d = protocol.select("inbox") 4367 protocol.lineReceived(b"0002 OK SELECT") 4368 return d 4369 4370 def fetch(): 4371 protocol.fetchSpecific( 4372 "1:*", headerType="HEADER.FIELDS", headerArgs=["SUBJECT"] 4373 ) 4374 4375 self.assertEqual( 4376 transport.value().splitlines()[-1], 4377 b"0003 FETCH 1:* BODY[HEADER.FIELDS (SUBJECT)]", 4378 ) 4379 4380 self.assertRaises( 4381 imap4.IllegalServerResponse, 4382 protocol.dataReceived, 4383 b"* 1 FETCH {xyz}\r\n...", 4384 ) 4385 4386 d = login() 4387 d.addCallback(strip(select)) 4388 d.addCallback(strip(fetch)) 4389 return d 4390 4391 def test_flagsChangedInsideFetchSpecificResponse(self): 4392 """ 4393 Any unrequested flag information received along with other requested 4394 information in an untagged I{FETCH} received in response to a request 4395 issued with L{IMAP4Client.fetchSpecific} is passed to the 4396 C{flagsChanged} callback. 4397 """ 4398 transport = StringTransport() 4399 c = StillSimplerClient() 4400 c.makeConnection(transport) 4401 c.lineReceived(b"* OK [IMAP4rev1]") 4402 4403 def login(): 4404 d = c.login(b"blah", b"blah") 4405 c.dataReceived(b"0001 OK LOGIN\r\n") 4406 return d 4407 4408 def select(): 4409 d = c.select("inbox") 4410 c.lineReceived(b"0002 OK SELECT") 4411 return d 4412 4413 def fetch(): 4414 d = c.fetchSpecific( 4415 b"1:*", headerType="HEADER.FIELDS", headerArgs=["SUBJECT"] 4416 ) 4417 # This response includes FLAGS after the requested data. 4418 c.dataReceived(b'* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {22}\r\n') 4419 c.dataReceived(b"Subject: subject one\r\n") 4420 c.dataReceived(b" FLAGS (\\Recent))\r\n") 4421 # And this one includes it before! Either is possible. 4422 c.dataReceived( 4423 b'* 2 FETCH (FLAGS (\\Seen) BODY[HEADER.FIELDS ("SUBJECT")] {22}\r\n' 4424 ) 4425 c.dataReceived(b"Subject: subject two\r\n") 4426 c.dataReceived(b")\r\n") 4427 c.dataReceived(b"0003 OK FETCH completed\r\n") 4428 return d 4429 4430 def test(res): 4431 self.assertEqual( 4432 res, 4433 { 4434 1: [ 4435 [ 4436 "BODY", 4437 ["HEADER.FIELDS", ["SUBJECT"]], 4438 "Subject: subject one\r\n", 4439 ] 4440 ], 4441 2: [ 4442 [ 4443 "BODY", 4444 ["HEADER.FIELDS", ["SUBJECT"]], 4445 "Subject: subject two\r\n", 4446 ] 4447 ], 4448 }, 4449 ) 4450 4451 self.assertEqual(c.flags, {1: ["\\Recent"], 2: ["\\Seen"]}) 4452 4453 return ( 4454 login() 4455 .addCallback(strip(select)) 4456 .addCallback(strip(fetch)) 4457 .addCallback(test) 4458 ) 4459 4460 def test_flagsChangedInsideFetchMessageResponse(self): 4461 """ 4462 Any unrequested flag information received along with other requested 4463 information in an untagged I{FETCH} received in response to a request 4464 issued with L{IMAP4Client.fetchMessage} is passed to the 4465 C{flagsChanged} callback. 4466 """ 4467 transport = StringTransport() 4468 c = StillSimplerClient() 4469 c.makeConnection(transport) 4470 c.lineReceived(b"* OK [IMAP4rev1]") 4471 4472 def login(): 4473 d = c.login(b"blah", b"blah") 4474 c.dataReceived(b"0001 OK LOGIN\r\n") 4475 return d 4476 4477 def select(): 4478 d = c.select("inbox") 4479 c.lineReceived(b"0002 OK SELECT") 4480 return d 4481 4482 def fetch(): 4483 d = c.fetchMessage("1:*") 4484 c.dataReceived(b"* 1 FETCH (RFC822 {24}\r\n") 4485 c.dataReceived(b"Subject: first subject\r\n") 4486 c.dataReceived(b" FLAGS (\\Seen))\r\n") 4487 c.dataReceived(b"* 2 FETCH (FLAGS (\\Recent \\Seen) RFC822 {25}\r\n") 4488 c.dataReceived(b"Subject: second subject\r\n") 4489 c.dataReceived(b")\r\n") 4490 c.dataReceived(b"0003 OK FETCH completed\r\n") 4491 return d 4492 4493 def test(res): 4494 self.assertEqual( 4495 transport.value().splitlines()[-1], 4496 b"0003 FETCH 1:* (RFC822)", 4497 ) 4498 4499 self.assertEqual( 4500 res, 4501 { 4502 1: {"RFC822": "Subject: first subject\r\n"}, 4503 2: {"RFC822": "Subject: second subject\r\n"}, 4504 }, 4505 ) 4506 4507 self.assertEqual(c.flags, {1: ["\\Seen"], 2: ["\\Recent", "\\Seen"]}) 4508 4509 return ( 4510 login() 4511 .addCallback(strip(select)) 4512 .addCallback(strip(fetch)) 4513 .addCallback(test) 4514 ) 4515 4516 def test_authenticationChallengeDecodingException(self): 4517 """ 4518 When decoding a base64 encoded authentication message from the server, 4519 decoding errors are logged and then the client closes the connection. 4520 """ 4521 transport = StringTransportWithDisconnection() 4522 protocol = imap4.IMAP4Client() 4523 transport.protocol = protocol 4524 4525 protocol.makeConnection(transport) 4526 protocol.lineReceived( 4527 b"* OK [CAPABILITY IMAP4rev1 IDLE NAMESPACE AUTH=CRAM-MD5] " 4528 b"Twisted IMAP4rev1 Ready" 4529 ) 4530 cAuth = imap4.CramMD5ClientAuthenticator(b"testuser") 4531 protocol.registerAuthenticator(cAuth) 4532 4533 d = protocol.authenticate("secret") 4534 # Should really be something describing the base64 decode error. See 4535 # #6021. 4536 self.assertFailure(d, error.ConnectionDone) 4537 4538 protocol.dataReceived(b"+ Something bad! and bad\r\n") 4539 4540 # This should not really be logged. See #6021. 4541 logged = self.flushLoggedErrors(imap4.IllegalServerResponse) 4542 self.assertEqual(len(logged), 1) 4543 self.assertEqual(logged[0].value.args[0], b"Something bad! and bad") 4544 return d 4545 4546 4547class PreauthIMAP4ClientMixin: 4548 """ 4549 Mixin for L{SynchronousTestCase} subclasses which 4550 provides a C{setUp} method which creates an L{IMAP4Client} 4551 connected to a L{StringTransport} and puts it into the 4552 I{authenticated} state. 4553 4554 @ivar transport: A L{StringTransport} to which C{client} is 4555 connected. 4556 4557 @ivar client: An L{IMAP4Client} which is connected to 4558 C{transport}. 4559 """ 4560 4561 clientProtocol: Type[imap4.IMAP4Client] = imap4.IMAP4Client 4562 4563 def setUp(self): 4564 """ 4565 Create an IMAP4Client connected to a fake transport and in the 4566 authenticated state. 4567 """ 4568 self.transport = StringTransport() 4569 self.client = self.clientProtocol() 4570 self.client.makeConnection(self.transport) 4571 self.client.dataReceived(b"* PREAUTH Hello unittest\r\n") 4572 4573 4574class SelectionTestsMixin(PreauthIMAP4ClientMixin): 4575 """ 4576 Mixin for test cases which defines tests which apply to both I{EXAMINE} and 4577 I{SELECT} support. 4578 """ 4579 4580 def _examineOrSelect(self): 4581 """ 4582 Issue either an I{EXAMINE} or I{SELECT} command (depending on 4583 C{self.method}), assert that the correct bytes are written to the 4584 transport, and return the L{Deferred} returned by whichever method was 4585 called. 4586 """ 4587 d = getattr(self.client, self.method)("foobox") 4588 self.assertEqual( 4589 self.transport.value(), b"0001 " + self.command + b" foobox\r\n" 4590 ) 4591 return d 4592 4593 def _response(self, *lines): 4594 """ 4595 Deliver the given (unterminated) response lines to C{self.client} and 4596 then deliver a tagged SELECT or EXAMINE completion line to finish the 4597 SELECT or EXAMINE response. 4598 """ 4599 for line in lines: 4600 self.client.dataReceived(line + b"\r\n") 4601 self.client.dataReceived( 4602 b"0001 OK [READ-ONLY] " + self.command + b" completed\r\n" 4603 ) 4604 4605 def test_exists(self): 4606 """ 4607 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4608 I{EXISTS} response, the L{Deferred} return by L{IMAP4Client.select} or 4609 L{IMAP4Client.examine} fires with a C{dict} including the value 4610 associated with the C{'EXISTS'} key. 4611 """ 4612 d = self._examineOrSelect() 4613 self._response(b"* 3 EXISTS") 4614 self.assertEqual(self.successResultOf(d), {"READ-WRITE": False, "EXISTS": 3}) 4615 4616 def test_nonIntegerExists(self): 4617 """ 4618 If the server returns a non-integer EXISTS value in its response to a 4619 I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by 4620 L{IMAP4Client.select} or L{IMAP4Client.examine} fails with 4621 L{IllegalServerResponse}. 4622 """ 4623 d = self._examineOrSelect() 4624 self._response(b"* foo EXISTS") 4625 self.failureResultOf(d, imap4.IllegalServerResponse) 4626 4627 def test_recent(self): 4628 """ 4629 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4630 I{RECENT} response, the L{Deferred} return by L{IMAP4Client.select} or 4631 L{IMAP4Client.examine} fires with a C{dict} including the value 4632 associated with the C{'RECENT'} key. 4633 """ 4634 d = self._examineOrSelect() 4635 self._response(b"* 5 RECENT") 4636 self.assertEqual(self.successResultOf(d), {"READ-WRITE": False, "RECENT": 5}) 4637 4638 def test_nonIntegerRecent(self): 4639 """ 4640 If the server returns a non-integer RECENT value in its response to a 4641 I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by 4642 L{IMAP4Client.select} or L{IMAP4Client.examine} fails with 4643 L{IllegalServerResponse}. 4644 """ 4645 d = self._examineOrSelect() 4646 self._response(b"* foo RECENT") 4647 self.failureResultOf(d, imap4.IllegalServerResponse) 4648 4649 def test_unseen(self): 4650 """ 4651 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4652 I{UNSEEN} response, the L{Deferred} returned by L{IMAP4Client.select} or 4653 L{IMAP4Client.examine} fires with a C{dict} including the value 4654 associated with the C{'UNSEEN'} key. 4655 """ 4656 d = self._examineOrSelect() 4657 self._response(b"* OK [UNSEEN 8] Message 8 is first unseen") 4658 self.assertEqual(self.successResultOf(d), {"READ-WRITE": False, "UNSEEN": 8}) 4659 4660 def test_nonIntegerUnseen(self): 4661 """ 4662 If the server returns a non-integer UNSEEN value in its response to a 4663 I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by 4664 L{IMAP4Client.select} or L{IMAP4Client.examine} fails with 4665 L{IllegalServerResponse}. 4666 """ 4667 d = self._examineOrSelect() 4668 self._response(b"* OK [UNSEEN foo] Message foo is first unseen") 4669 self.failureResultOf(d, imap4.IllegalServerResponse) 4670 4671 def test_uidvalidity(self): 4672 """ 4673 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4674 I{UIDVALIDITY} response, the L{Deferred} returned by 4675 L{IMAP4Client.select} or L{IMAP4Client.examine} fires with a C{dict} 4676 including the value associated with the C{'UIDVALIDITY'} key. 4677 """ 4678 d = self._examineOrSelect() 4679 self._response(b"* OK [UIDVALIDITY 12345] UIDs valid") 4680 self.assertEqual( 4681 self.successResultOf(d), {"READ-WRITE": False, "UIDVALIDITY": 12345} 4682 ) 4683 4684 def test_nonIntegerUIDVALIDITY(self): 4685 """ 4686 If the server returns a non-integer UIDVALIDITY value in its response to 4687 a I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by 4688 L{IMAP4Client.select} or L{IMAP4Client.examine} fails with 4689 L{IllegalServerResponse}. 4690 """ 4691 d = self._examineOrSelect() 4692 self._response(b"* OK [UIDVALIDITY foo] UIDs valid") 4693 self.failureResultOf(d, imap4.IllegalServerResponse) 4694 4695 def test_uidnext(self): 4696 """ 4697 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4698 I{UIDNEXT} response, the L{Deferred} returned by L{IMAP4Client.select} 4699 or L{IMAP4Client.examine} fires with a C{dict} including the value 4700 associated with the C{'UIDNEXT'} key. 4701 """ 4702 d = self._examineOrSelect() 4703 self._response(b"* OK [UIDNEXT 4392] Predicted next UID") 4704 self.assertEqual( 4705 self.successResultOf(d), {"READ-WRITE": False, "UIDNEXT": 4392} 4706 ) 4707 4708 def test_nonIntegerUIDNEXT(self): 4709 """ 4710 If the server returns a non-integer UIDNEXT value in its response to a 4711 I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by 4712 L{IMAP4Client.select} or L{IMAP4Client.examine} fails with 4713 L{IllegalServerResponse}. 4714 """ 4715 d = self._examineOrSelect() 4716 self._response(b"* OK [UIDNEXT foo] Predicted next UID") 4717 self.failureResultOf(d, imap4.IllegalServerResponse) 4718 4719 def test_flags(self): 4720 """ 4721 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4722 I{FLAGS} response, the L{Deferred} returned by L{IMAP4Client.select} or 4723 L{IMAP4Client.examine} fires with a C{dict} including the value 4724 associated with the C{'FLAGS'} key. 4725 """ 4726 d = self._examineOrSelect() 4727 self._response(b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)") 4728 self.assertEqual( 4729 self.successResultOf(d), 4730 { 4731 "READ-WRITE": False, 4732 "FLAGS": ("\\Answered", "\\Flagged", "\\Deleted", "\\Seen", "\\Draft"), 4733 }, 4734 ) 4735 4736 def test_permanentflags(self): 4737 """ 4738 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4739 I{FLAGS} response, the L{Deferred} returned by L{IMAP4Client.select} or 4740 L{IMAP4Client.examine} fires with a C{dict} including the value 4741 associated with the C{'FLAGS'} key. 4742 """ 4743 d = self._examineOrSelect() 4744 self._response( 4745 b"* OK [PERMANENTFLAGS (\\Starred)] Just one permanent flag in " 4746 b"that list up there" 4747 ) 4748 self.assertEqual( 4749 self.successResultOf(d), 4750 {"READ-WRITE": False, "PERMANENTFLAGS": ("\\Starred",)}, 4751 ) 4752 4753 def test_unrecognizedOk(self): 4754 """ 4755 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4756 I{OK} with unrecognized response code text, parsing does not fail. 4757 """ 4758 d = self._examineOrSelect() 4759 self._response(b"* OK [X-MADE-UP] I just made this response text up.") 4760 # The value won't show up in the result. It would be okay if it did 4761 # someday, perhaps. This shouldn't ever happen, though. 4762 self.assertEqual(self.successResultOf(d), {"READ-WRITE": False}) 4763 4764 def test_bareOk(self): 4765 """ 4766 If the server response to a I{SELECT} or I{EXAMINE} command includes an 4767 I{OK} with no response code text, parsing does not fail. 4768 """ 4769 d = self._examineOrSelect() 4770 self._response(b"* OK") 4771 self.assertEqual(self.successResultOf(d), {"READ-WRITE": False}) 4772 4773 4774class IMAP4ClientExamineTests(SelectionTestsMixin, SynchronousTestCase): 4775 """ 4776 Tests for the L{IMAP4Client.examine} method. 4777 4778 An example of usage of the EXAMINE command from RFC 3501, section 6.3.2:: 4779 4780 S: * 17 EXISTS 4781 S: * 2 RECENT 4782 S: * OK [UNSEEN 8] Message 8 is first unseen 4783 S: * OK [UIDVALIDITY 3857529045] UIDs valid 4784 S: * OK [UIDNEXT 4392] Predicted next UID 4785 S: * FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft) 4786 S: * OK [PERMANENTFLAGS ()] No permanent flags permitted 4787 S: A932 OK [READ-ONLY] EXAMINE completed 4788 """ 4789 4790 method = "examine" 4791 command = b"EXAMINE" 4792 4793 4794class IMAP4ClientSelectTests(SelectionTestsMixin, SynchronousTestCase): 4795 r""" 4796 Tests for the L{IMAP4Client.select} method. 4797 4798 An example of usage of the SELECT command from RFC 3501, section 6.3.1:: 4799 4800 C: A142 SELECT INBOX 4801 S: * 172 EXISTS 4802 S: * 1 RECENT 4803 S: * OK [UNSEEN 12] Message 12 is first unseen 4804 S: * OK [UIDVALIDITY 3857529045] UIDs valid 4805 S: * OK [UIDNEXT 4392] Predicted next UID 4806 S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft) 4807 S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited 4808 S: A142 OK [READ-WRITE] SELECT completed 4809 """ 4810 4811 method = "select" 4812 command = b"SELECT" 4813 4814 4815class IMAP4ClientExpungeTests(PreauthIMAP4ClientMixin, SynchronousTestCase): 4816 """ 4817 Tests for the L{IMAP4Client.expunge} method. 4818 4819 An example of usage of the EXPUNGE command from RFC 3501, section 6.4.3:: 4820 4821 C: A202 EXPUNGE 4822 S: * 3 EXPUNGE 4823 S: * 3 EXPUNGE 4824 S: * 5 EXPUNGE 4825 S: * 8 EXPUNGE 4826 S: A202 OK EXPUNGE completed 4827 """ 4828 4829 def _expunge(self): 4830 d = self.client.expunge() 4831 self.assertEqual(self.transport.value(), b"0001 EXPUNGE\r\n") 4832 self.transport.clear() 4833 return d 4834 4835 def _response(self, sequenceNumbers): 4836 for number in sequenceNumbers: 4837 self.client.lineReceived(networkString(f"* {number} EXPUNGE")) 4838 self.client.lineReceived(b"0001 OK EXPUNGE COMPLETED") 4839 4840 def test_expunge(self): 4841 """ 4842 L{IMAP4Client.expunge} sends the I{EXPUNGE} command and returns a 4843 L{Deferred} which fires with a C{list} of message sequence numbers 4844 given by the server's response. 4845 """ 4846 d = self._expunge() 4847 self._response([3, 3, 5, 8]) 4848 self.assertEqual(self.successResultOf(d), [3, 3, 5, 8]) 4849 4850 def test_nonIntegerExpunged(self): 4851 """ 4852 If the server responds with a non-integer where a message sequence 4853 number is expected, the L{Deferred} returned by L{IMAP4Client.expunge} 4854 fails with L{IllegalServerResponse}. 4855 """ 4856 d = self._expunge() 4857 self._response([3, 3, "foo", 8]) 4858 self.failureResultOf(d, imap4.IllegalServerResponse) 4859 4860 4861class IMAP4ClientSearchTests(PreauthIMAP4ClientMixin, SynchronousTestCase): 4862 """ 4863 Tests for the L{IMAP4Client.search} method. 4864 4865 An example of usage of the SEARCH command from RFC 3501, section 6.4.4:: 4866 4867 C: A282 SEARCH FLAGGED SINCE 1-Feb-1994 NOT FROM "Smith" 4868 S: * SEARCH 2 84 882 4869 S: A282 OK SEARCH completed 4870 C: A283 SEARCH TEXT "string not in mailbox" 4871 S: * SEARCH 4872 S: A283 OK SEARCH completed 4873 C: A284 SEARCH CHARSET UTF-8 TEXT {6} 4874 C: XXXXXX 4875 S: * SEARCH 43 4876 S: A284 OK SEARCH completed 4877 """ 4878 4879 def _search(self): 4880 d = self.client.search(imap4.Query(text="ABCDEF")) 4881 self.assertEqual(self.transport.value(), b'0001 SEARCH (TEXT "ABCDEF")\r\n') 4882 return d 4883 4884 def _response(self, messageNumbers): 4885 self.client.lineReceived( 4886 b"* SEARCH " + networkString(" ".join(map(str, messageNumbers))) 4887 ) 4888 self.client.lineReceived(b"0001 OK SEARCH completed") 4889 4890 def test_search(self): 4891 """ 4892 L{IMAP4Client.search} sends the I{SEARCH} command and returns a 4893 L{Deferred} which fires with a C{list} of message sequence numbers 4894 given by the server's response. 4895 """ 4896 d = self._search() 4897 self._response([2, 5, 10]) 4898 self.assertEqual(self.successResultOf(d), [2, 5, 10]) 4899 4900 def test_nonIntegerFound(self): 4901 """ 4902 If the server responds with a non-integer where a message sequence 4903 number is expected, the L{Deferred} returned by L{IMAP4Client.search} 4904 fails with L{IllegalServerResponse}. 4905 """ 4906 d = self._search() 4907 self._response([2, "foo", 10]) 4908 self.failureResultOf(d, imap4.IllegalServerResponse) 4909 4910 4911class IMAP4ClientFetchTests(PreauthIMAP4ClientMixin, SynchronousTestCase): 4912 """ 4913 Tests for the L{IMAP4Client.fetch} method. 4914 4915 See RFC 3501, section 6.4.5. 4916 """ 4917 4918 def test_fetchUID(self): 4919 """ 4920 L{IMAP4Client.fetchUID} sends the I{FETCH UID} command and returns a 4921 L{Deferred} which fires with a C{dict} mapping message sequence numbers 4922 to C{dict}s mapping C{'UID'} to that message's I{UID} in the server's 4923 response. 4924 """ 4925 d = self.client.fetchUID("1:7") 4926 self.assertEqual(self.transport.value(), b"0001 FETCH 1:7 (UID)\r\n") 4927 self.client.lineReceived(b"* 2 FETCH (UID 22)") 4928 self.client.lineReceived(b"* 3 FETCH (UID 23)") 4929 self.client.lineReceived(b"* 4 FETCH (UID 24)") 4930 self.client.lineReceived(b"* 5 FETCH (UID 25)") 4931 self.client.lineReceived(b"0001 OK FETCH completed") 4932 self.assertEqual( 4933 self.successResultOf(d), 4934 {2: {"UID": "22"}, 3: {"UID": "23"}, 4: {"UID": "24"}, 5: {"UID": "25"}}, 4935 ) 4936 4937 def test_fetchUIDNonIntegerFound(self): 4938 """ 4939 If the server responds with a non-integer where a message sequence 4940 number is expected, the L{Deferred} returned by L{IMAP4Client.fetchUID} 4941 fails with L{IllegalServerResponse}. 4942 """ 4943 d = self.client.fetchUID("1") 4944 self.assertEqual(self.transport.value(), b"0001 FETCH 1 (UID)\r\n") 4945 self.client.lineReceived(b"* foo FETCH (UID 22)") 4946 self.client.lineReceived(b"0001 OK FETCH completed") 4947 self.failureResultOf(d, imap4.IllegalServerResponse) 4948 4949 def test_incompleteFetchUIDResponse(self): 4950 """ 4951 If the server responds with an incomplete I{FETCH} response line, the 4952 L{Deferred} returned by L{IMAP4Client.fetchUID} fails with 4953 L{IllegalServerResponse}. 4954 """ 4955 d = self.client.fetchUID("1:7") 4956 self.assertEqual(self.transport.value(), b"0001 FETCH 1:7 (UID)\r\n") 4957 self.client.lineReceived(b"* 2 FETCH (UID 22)") 4958 self.client.lineReceived(b"* 3 FETCH (UID)") 4959 self.client.lineReceived(b"* 4 FETCH (UID 24)") 4960 self.client.lineReceived(b"0001 OK FETCH completed") 4961 self.failureResultOf(d, imap4.IllegalServerResponse) 4962 4963 def test_fetchBody(self): 4964 """ 4965 L{IMAP4Client.fetchBody} sends the I{FETCH BODY} command and returns a 4966 L{Deferred} which fires with a C{dict} mapping message sequence numbers 4967 to C{dict}s mapping C{'RFC822.TEXT'} to that message's body as given in 4968 the server's response. 4969 """ 4970 d = self.client.fetchBody("3") 4971 self.assertEqual(self.transport.value(), b"0001 FETCH 3 (RFC822.TEXT)\r\n") 4972 self.client.lineReceived(b'* 3 FETCH (RFC822.TEXT "Message text")') 4973 self.client.lineReceived(b"0001 OK FETCH completed") 4974 self.assertEqual(self.successResultOf(d), {3: {"RFC822.TEXT": "Message text"}}) 4975 4976 def test_fetchSpecific(self): 4977 """ 4978 L{IMAP4Client.fetchSpecific} sends the I{BODY[]} command if no 4979 parameters beyond the message set to retrieve are given. It returns a 4980 L{Deferred} which fires with a C{dict} mapping message sequence numbers 4981 to C{list}s of corresponding message data given by the server's 4982 response. 4983 """ 4984 d = self.client.fetchSpecific("7") 4985 self.assertEqual(self.transport.value(), b"0001 FETCH 7 BODY[]\r\n") 4986 self.client.lineReceived(b'* 7 FETCH (BODY[] "Some body")') 4987 self.client.lineReceived(b"0001 OK FETCH completed") 4988 self.assertEqual(self.successResultOf(d), {7: [["BODY", [], "Some body"]]}) 4989 4990 def test_fetchSpecificPeek(self): 4991 """ 4992 L{IMAP4Client.fetchSpecific} issues a I{BODY.PEEK[]} command if passed 4993 C{True} for the C{peek} parameter. 4994 """ 4995 d = self.client.fetchSpecific("6", peek=True) 4996 self.assertEqual(self.transport.value(), b"0001 FETCH 6 BODY.PEEK[]\r\n") 4997 # BODY.PEEK responses are just BODY 4998 self.client.lineReceived(b'* 6 FETCH (BODY[] "Some body")') 4999 self.client.lineReceived(b"0001 OK FETCH completed") 5000 self.assertEqual(self.successResultOf(d), {6: [["BODY", [], "Some body"]]}) 5001 5002 def test_fetchSpecificNumbered(self): 5003 """ 5004 L{IMAP4Client.fetchSpecific}, when passed a sequence for 5005 C{headerNumber}, sends the I{BODY[N.M]} command. It returns a 5006 L{Deferred} which fires with a C{dict} mapping message sequence numbers 5007 to C{list}s of corresponding message data given by the server's 5008 response. 5009 """ 5010 d = self.client.fetchSpecific("7", headerNumber=(1, 2, 3)) 5011 self.assertEqual(self.transport.value(), b"0001 FETCH 7 BODY[1.2.3]\r\n") 5012 self.client.lineReceived(b'* 7 FETCH (BODY[1.2.3] "Some body")') 5013 self.client.lineReceived(b"0001 OK FETCH completed") 5014 self.assertEqual( 5015 self.successResultOf(d), {7: [["BODY", ["1.2.3"], "Some body"]]} 5016 ) 5017 5018 def test_fetchSpecificText(self): 5019 """ 5020 L{IMAP4Client.fetchSpecific}, when passed C{'TEXT'} for C{headerType}, 5021 sends the I{BODY[TEXT]} command. It returns a L{Deferred} which fires 5022 with a C{dict} mapping message sequence numbers to C{list}s of 5023 corresponding message data given by the server's response. 5024 """ 5025 d = self.client.fetchSpecific("8", headerType="TEXT") 5026 self.assertEqual(self.transport.value(), b"0001 FETCH 8 BODY[TEXT]\r\n") 5027 self.client.lineReceived(b'* 8 FETCH (BODY[TEXT] "Some body")') 5028 self.client.lineReceived(b"0001 OK FETCH completed") 5029 self.assertEqual( 5030 self.successResultOf(d), {8: [["BODY", ["TEXT"], "Some body"]]} 5031 ) 5032 5033 def test_fetchSpecificNumberedText(self): 5034 """ 5035 If passed a value for the C{headerNumber} parameter and C{'TEXT'} for 5036 the C{headerType} parameter, L{IMAP4Client.fetchSpecific} sends a 5037 I{BODY[number.TEXT]} request and returns a L{Deferred} which fires with 5038 a C{dict} mapping message sequence numbers to C{list}s of message data 5039 given by the server's response. 5040 """ 5041 d = self.client.fetchSpecific("4", headerType="TEXT", headerNumber=7) 5042 self.assertEqual(self.transport.value(), b"0001 FETCH 4 BODY[7.TEXT]\r\n") 5043 self.client.lineReceived(b'* 4 FETCH (BODY[7.TEXT] "Some body")') 5044 self.client.lineReceived(b"0001 OK FETCH completed") 5045 self.assertEqual( 5046 self.successResultOf(d), {4: [["BODY", ["7.TEXT"], "Some body"]]} 5047 ) 5048 5049 def test_incompleteFetchSpecificTextResponse(self): 5050 """ 5051 If the server responds to a I{BODY[TEXT]} request with a I{FETCH} line 5052 which is truncated after the I{BODY[TEXT]} tokens, the L{Deferred} 5053 returned by L{IMAP4Client.fetchUID} fails with 5054 L{IllegalServerResponse}. 5055 """ 5056 d = self.client.fetchSpecific("8", headerType="TEXT") 5057 self.assertEqual(self.transport.value(), b"0001 FETCH 8 BODY[TEXT]\r\n") 5058 self.client.lineReceived(b"* 8 FETCH (BODY[TEXT])") 5059 self.client.lineReceived(b"0001 OK FETCH completed") 5060 self.failureResultOf(d, imap4.IllegalServerResponse) 5061 5062 def test_fetchSpecificMIME(self): 5063 """ 5064 L{IMAP4Client.fetchSpecific}, when passed C{'MIME'} for C{headerType}, 5065 sends the I{BODY[MIME]} command. It returns a L{Deferred} which fires 5066 with a C{dict} mapping message sequence numbers to C{list}s of 5067 corresponding message data given by the server's response. 5068 """ 5069 d = self.client.fetchSpecific("8", headerType="MIME") 5070 self.assertEqual(self.transport.value(), b"0001 FETCH 8 BODY[MIME]\r\n") 5071 self.client.lineReceived(b'* 8 FETCH (BODY[MIME] "Some body")') 5072 self.client.lineReceived(b"0001 OK FETCH completed") 5073 self.assertEqual( 5074 self.successResultOf(d), {8: [["BODY", ["MIME"], "Some body"]]} 5075 ) 5076 5077 def test_fetchSpecificPartial(self): 5078 """ 5079 L{IMAP4Client.fetchSpecific}, when passed C{offset} and C{length}, 5080 sends a partial content request (like I{BODY[TEXT]<offset.length>}). 5081 It returns a L{Deferred} which fires with a C{dict} mapping message 5082 sequence numbers to C{list}s of corresponding message data given by the 5083 server's response. 5084 """ 5085 d = self.client.fetchSpecific("9", headerType="TEXT", offset=17, length=3) 5086 self.assertEqual(self.transport.value(), b"0001 FETCH 9 BODY[TEXT]<17.3>\r\n") 5087 self.client.lineReceived(b'* 9 FETCH (BODY[TEXT]<17> "foo")') 5088 self.client.lineReceived(b"0001 OK FETCH completed") 5089 self.assertEqual( 5090 self.successResultOf(d), {9: [["BODY", ["TEXT"], "<17>", "foo"]]} 5091 ) 5092 5093 def test_incompleteFetchSpecificPartialResponse(self): 5094 """ 5095 If the server responds to a I{BODY[TEXT]} request with a I{FETCH} line 5096 which is truncated after the I{BODY[TEXT]<offset>} tokens, the 5097 L{Deferred} returned by L{IMAP4Client.fetchUID} fails with 5098 L{IllegalServerResponse}. 5099 """ 5100 d = self.client.fetchSpecific("8", headerType="TEXT") 5101 self.assertEqual(self.transport.value(), b"0001 FETCH 8 BODY[TEXT]\r\n") 5102 self.client.lineReceived(b"* 8 FETCH (BODY[TEXT]<17>)") 5103 self.client.lineReceived(b"0001 OK FETCH completed") 5104 self.failureResultOf(d, imap4.IllegalServerResponse) 5105 5106 def test_fetchSpecificHTML(self): 5107 """ 5108 If the body of a message begins with I{<} and ends with I{>} (as, 5109 for example, HTML bodies typically will), this is still interpreted 5110 as the body by L{IMAP4Client.fetchSpecific} (and particularly, not 5111 as a length indicator for a response to a request for a partial 5112 body). 5113 """ 5114 d = self.client.fetchSpecific("7") 5115 self.assertEqual(self.transport.value(), b"0001 FETCH 7 BODY[]\r\n") 5116 self.client.lineReceived(b'* 7 FETCH (BODY[] "<html>test</html>")') 5117 self.client.lineReceived(b"0001 OK FETCH completed") 5118 self.assertEqual( 5119 self.successResultOf(d), {7: [["BODY", [], "<html>test</html>"]]} 5120 ) 5121 5122 def assertFetchSpecificFieldsWithEmptyList(self, section): 5123 """ 5124 Assert that the provided C{BODY} section, when invoked with no 5125 arguments, produces an empty list, and that it returns a 5126 L{Deferred} which fires with a C{dict} mapping message 5127 sequence numbers to C{list}s of corresponding message data 5128 given by the server's response. 5129 5130 @param section: The C{BODY} section to test: either 5131 C{'HEADER.FIELDS'} or C{'HEADER.FIELDS.NOT'} 5132 @type section: L{str} 5133 """ 5134 d = self.client.fetchSpecific("10", headerType=section) 5135 self.assertEqual( 5136 self.transport.value(), 5137 b"0001 FETCH 10 BODY[" + section.encode("ascii") + b" ()]\r\n", 5138 ) 5139 # It's unclear what the response would look like - would it be 5140 # an empty string? No IMAP server parses an empty list of headers 5141 self.client.lineReceived( 5142 b"* 10 FETCH (BODY[" + section.encode("ascii") + b' ()] "")' 5143 ) 5144 self.client.lineReceived(b"0001 OK FETCH completed") 5145 self.assertEqual(self.successResultOf(d), {10: [["BODY", [section, []], ""]]}) 5146 5147 def test_fetchSpecificHeaderFieldsWithoutHeaders(self): 5148 """ 5149 L{IMAP4Client.fetchSpecific}, when passed C{'HEADER.FIELDS'} 5150 for C{headerType} but no C{headerArgs}, sends the 5151 I{BODY[HEADER.FIELDS]} command with no arguments. It returns 5152 a L{Deferred} which fires with a C{dict} mapping message 5153 sequence numbers to C{list}s of corresponding message data 5154 given by the server's response. 5155 """ 5156 self.assertFetchSpecificFieldsWithEmptyList("HEADER.FIELDS") 5157 5158 def test_fetchSpecificHeaderFieldsNotWithoutHeaders(self): 5159 """ 5160 L{IMAP4Client.fetchSpecific}, when passed 5161 C{'HEADER.FIELDS.NOT'} for C{headerType} but no C{headerArgs}, 5162 sends the I{BODY[HEADER.FIELDS.NOT]} command with no 5163 arguments. It returns a L{Deferred} which fires with a 5164 C{dict} mapping message sequence numbers to C{list}s of 5165 corresponding message data given by the server's response. 5166 """ 5167 self.assertFetchSpecificFieldsWithEmptyList("HEADER.FIELDS.NOT") 5168 5169 def test_fetchSpecificHeader(self): 5170 """ 5171 L{IMAP4Client.fetchSpecific}, when passed C{'HEADER'} for 5172 C{headerType}, sends the I{BODY[HEADER]} command. It returns 5173 a L{Deferred} which fires with a C{dict} mapping message 5174 sequence numbers to C{list}s of corresponding message data 5175 given by the server's response. 5176 """ 5177 d = self.client.fetchSpecific("11", headerType="HEADER") 5178 self.assertEqual(self.transport.value(), b"0001 FETCH 11 BODY[HEADER]\r\n") 5179 self.client.lineReceived( 5180 b"* 11 FETCH (BODY[HEADER]" 5181 b' "From: someone@localhost\r\nSubject: Some subject")' 5182 ) 5183 self.client.lineReceived(b"0001 OK FETCH completed") 5184 self.assertEqual( 5185 self.successResultOf(d), 5186 { 5187 11: [ 5188 [ 5189 "BODY", 5190 ["HEADER"], 5191 "From: someone@localhost\r\nSubject: Some subject", 5192 ] 5193 ] 5194 }, 5195 ) 5196 5197 5198class IMAP4ClientStoreTests(PreauthIMAP4ClientMixin, TestCase): 5199 r""" 5200 Tests for the L{IMAP4Client.setFlags}, L{IMAP4Client.addFlags}, and 5201 L{IMAP4Client.removeFlags} methods. 5202 5203 An example of usage of the STORE command, in terms of which these three 5204 methods are implemented, from RFC 3501, section 6.4.6:: 5205 5206 C: A003 STORE 2:4 +FLAGS (\Deleted) 5207 S: * 2 FETCH (FLAGS (\Deleted \Seen)) 5208 S: * 3 FETCH (FLAGS (\Deleted)) 5209 S: * 4 FETCH (FLAGS (\Deleted \Flagged \Seen)) 5210 S: A003 OK STORE completed 5211 """ 5212 5213 clientProtocol = StillSimplerClient 5214 5215 def _flagsTest(self, method, item): 5216 """ 5217 Test a non-silent flag modifying method. Call the method, assert that 5218 the correct bytes are sent, deliver a I{FETCH} response, and assert 5219 that the result of the Deferred returned by the method is correct. 5220 5221 @param method: The name of the method to test. 5222 @param item: The data item which is expected to be specified. 5223 """ 5224 d = getattr(self.client, method)("3", ("\\Read", "\\Seen"), False) 5225 self.assertEqual( 5226 self.transport.value(), b"0001 STORE 3 " + item + b" (\\Read \\Seen)\r\n" 5227 ) 5228 self.client.lineReceived(b"* 3 FETCH (FLAGS (\\Read \\Seen))") 5229 self.client.lineReceived(b"0001 OK STORE completed") 5230 self.assertEqual(self.successResultOf(d), {3: {"FLAGS": ["\\Read", "\\Seen"]}}) 5231 5232 def _flagsSilentlyTest(self, method, item): 5233 """ 5234 Test a silent flag modifying method. Call the method, assert that the 5235 correct bytes are sent, deliver an I{OK} response, and assert that the 5236 result of the Deferred returned by the method is correct. 5237 5238 @param method: The name of the method to test. 5239 @param item: The data item which is expected to be specified. 5240 """ 5241 d = getattr(self.client, method)("3", ("\\Read", "\\Seen"), True) 5242 self.assertEqual( 5243 self.transport.value(), b"0001 STORE 3 " + item + b" (\\Read \\Seen)\r\n" 5244 ) 5245 self.client.lineReceived(b"0001 OK STORE completed") 5246 self.assertEqual(self.successResultOf(d), {}) 5247 5248 def _flagsSilentlyWithUnsolicitedDataTest(self, method, item): 5249 """ 5250 Test unsolicited data received in response to a silent flag modifying 5251 method. Call the method, assert that the correct bytes are sent, 5252 deliver the unsolicited I{FETCH} response, and assert that the result 5253 of the Deferred returned by the method is correct. 5254 5255 @param method: The name of the method to test. 5256 @param item: The data item which is expected to be specified. 5257 """ 5258 d = getattr(self.client, method)("3", ("\\Read", "\\Seen"), True) 5259 self.assertEqual( 5260 self.transport.value(), b"0001 STORE 3 " + item + b" (\\Read \\Seen)\r\n" 5261 ) 5262 self.client.lineReceived(b"* 2 FETCH (FLAGS (\\Read \\Seen))") 5263 self.client.lineReceived(b"0001 OK STORE completed") 5264 self.assertEqual(self.successResultOf(d), {}) 5265 self.assertEqual(self.client.flags, {2: ["\\Read", "\\Seen"]}) 5266 5267 def test_setFlags(self): 5268 """ 5269 When passed a C{False} value for the C{silent} parameter, 5270 L{IMAP4Client.setFlags} sends the I{STORE} command with a I{FLAGS} data 5271 item and returns a L{Deferred} which fires with a C{dict} mapping 5272 message sequence numbers to C{dict}s mapping C{'FLAGS'} to the new 5273 flags of those messages. 5274 """ 5275 self._flagsTest("setFlags", b"FLAGS") 5276 5277 def test_setFlagsSilently(self): 5278 """ 5279 When passed a C{True} value for the C{silent} parameter, 5280 L{IMAP4Client.setFlags} sends the I{STORE} command with a 5281 I{FLAGS.SILENT} data item and returns a L{Deferred} which fires with an 5282 empty dictionary. 5283 """ 5284 self._flagsSilentlyTest("setFlags", b"FLAGS.SILENT") 5285 5286 def test_setFlagsSilentlyWithUnsolicitedData(self): 5287 """ 5288 If unsolicited flag data is received in response to a I{STORE} 5289 I{FLAGS.SILENT} request, that data is passed to the C{flagsChanged} 5290 callback. 5291 """ 5292 self._flagsSilentlyWithUnsolicitedDataTest("setFlags", b"FLAGS.SILENT") 5293 5294 def test_addFlags(self): 5295 """ 5296 L{IMAP4Client.addFlags} is like L{IMAP4Client.setFlags}, but sends 5297 I{+FLAGS} instead of I{FLAGS}. 5298 """ 5299 self._flagsTest("addFlags", b"+FLAGS") 5300 5301 def test_addFlagsSilently(self): 5302 """ 5303 L{IMAP4Client.addFlags} with a C{True} value for C{silent} behaves like 5304 L{IMAP4Client.setFlags} with a C{True} value for C{silent}, but it 5305 sends I{+FLAGS.SILENT} instead of I{FLAGS.SILENT}. 5306 """ 5307 self._flagsSilentlyTest("addFlags", b"+FLAGS.SILENT") 5308 5309 def test_addFlagsSilentlyWithUnsolicitedData(self): 5310 """ 5311 L{IMAP4Client.addFlags} behaves like L{IMAP4Client.setFlags} when used 5312 in silent mode and unsolicited data is received. 5313 """ 5314 self._flagsSilentlyWithUnsolicitedDataTest("addFlags", b"+FLAGS.SILENT") 5315 5316 def test_removeFlags(self): 5317 """ 5318 L{IMAP4Client.removeFlags} is like L{IMAP4Client.setFlags}, but sends 5319 I{-FLAGS} instead of I{FLAGS}. 5320 """ 5321 self._flagsTest("removeFlags", b"-FLAGS") 5322 5323 def test_removeFlagsSilently(self): 5324 """ 5325 L{IMAP4Client.removeFlags} with a C{True} value for C{silent} behaves 5326 like L{IMAP4Client.setFlags} with a C{True} value for C{silent}, but it 5327 sends I{-FLAGS.SILENT} instead of I{FLAGS.SILENT}. 5328 """ 5329 self._flagsSilentlyTest("removeFlags", b"-FLAGS.SILENT") 5330 5331 def test_removeFlagsSilentlyWithUnsolicitedData(self): 5332 """ 5333 L{IMAP4Client.removeFlags} behaves like L{IMAP4Client.setFlags} when 5334 used in silent mode and unsolicited data is received. 5335 """ 5336 self._flagsSilentlyWithUnsolicitedDataTest("removeFlags", b"-FLAGS.SILENT") 5337 5338 5339class IMAP4ClientStatusTests(PreauthIMAP4ClientMixin, SynchronousTestCase): 5340 """ 5341 Tests for the L{IMAP4Client.status} method. 5342 5343 An example of usage of the STATUS command from RFC 3501, section 5344 5.1.2:: 5345 5346 C: A042 STATUS blurdybloop (UIDNEXT MESSAGES) 5347 S: * STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292) 5348 S: A042 OK STATUS completed 5349 5350 @see: U{https://tools.ietf.org/html/rfc3501#section-5.1.2} 5351 """ 5352 5353 def testUnknownName(self): 5354 """ 5355 Only allow sending the C{STATUS} names defined in RFC 3501. 5356 5357 @see: U{https://tools.ietf.org/html/rfc3501#section-5.1.2} 5358 """ 5359 exc = self.assertRaises( 5360 ValueError, 5361 self.client.status, 5362 "ignored", 5363 "IMPOSSIBLE?!", 5364 ) 5365 self.assertEqual(str(exc), "Unknown names: " + repr({"IMPOSSIBLE?!"})) 5366 5367 def testUndecodableName(self): 5368 """ 5369 C{STATUS} names that cannot be decoded as ASCII cause the 5370 status Deferred to fail with L{IllegalServerResponse} 5371 """ 5372 5373 d = self.client.status("blurdybloop", "MESSAGES") 5374 self.assertEqual( 5375 self.transport.value(), 5376 b"0001 STATUS blurdybloop (MESSAGES)\r\n", 5377 ) 5378 5379 self.client.lineReceived( 5380 b"* STATUS blurdybloop " b'(MESSAGES 1 ASCIINAME "OK" NOT\xffASCII "NO")' 5381 ) 5382 self.client.lineReceived(b"0001 OK STATUS completed") 5383 self.failureResultOf(d, imap4.IllegalServerResponse) 5384 5385 5386class IMAP4ClientCopyTests(PreauthIMAP4ClientMixin, SynchronousTestCase): 5387 """ 5388 Tests for the L{IMAP4Client.copy} method. 5389 5390 An example of the C{COPY} command, which this method implements, 5391 from RFC 3501, section 6.4.7:: 5392 5393 C: A003 COPY 2:4 MEETING 5394 S: A003 OK COPY completed 5395 """ 5396 5397 clientProtocol = StillSimplerClient 5398 5399 def test_copySequenceNumbers(self): 5400 """ 5401 L{IMAP4Client.copy} copies the messages identified by their 5402 sequence numbers to the mailbox, returning a L{Deferred} that 5403 succeeds with a true value. 5404 """ 5405 d = self.client.copy("2:3", "MEETING", uid=False) 5406 5407 self.assertEqual( 5408 self.transport.value(), 5409 b"0001 COPY 2:3 MEETING\r\n", 5410 ) 5411 5412 self.client.lineReceived(b"0001 OK COPY completed") 5413 self.assertEqual(self.successResultOf(d), ([], b"OK COPY completed")) 5414 5415 def test_copySequenceNumbersFails(self): 5416 """ 5417 L{IMAP4Client.copy} returns a L{Deferred} that fails with an 5418 L{IMAP4Exception} when the messages specified by the given 5419 sequence numbers could not be copied to the mailbox. 5420 """ 5421 d = self.client.copy("2:3", "MEETING", uid=False) 5422 5423 self.assertEqual( 5424 self.transport.value(), 5425 b"0001 COPY 2:3 MEETING\r\n", 5426 ) 5427 5428 self.client.lineReceived(b"0001 BAD COPY failed") 5429 self.assertIsInstance(self.failureResultOf(d).value, imap4.IMAP4Exception) 5430 5431 def test_copyUIDs(self): 5432 """ 5433 L{IMAP4Client.copy} copies the messages identified by their 5434 UIDs to the mailbox, returning a L{Deferred} that succeeds 5435 with a true value. 5436 """ 5437 d = self.client.copy("2:3", "MEETING", uid=True) 5438 5439 self.assertEqual( 5440 self.transport.value(), 5441 b"0001 UID COPY 2:3 MEETING\r\n", 5442 ) 5443 5444 self.client.lineReceived(b"0001 OK COPY completed") 5445 self.assertEqual(self.successResultOf(d), ([], b"OK COPY completed")) 5446 5447 def test_copyUIDsFails(self): 5448 """ 5449 L{IMAP4Client.copy} returns a L{Deferred} that fails with an 5450 L{IMAP4Exception} when the messages specified by the given 5451 UIDs could not be copied to the mailbox. 5452 """ 5453 d = self.client.copy("2:3", "MEETING", uid=True) 5454 5455 self.assertEqual( 5456 self.transport.value(), 5457 b"0001 UID COPY 2:3 MEETING\r\n", 5458 ) 5459 5460 self.client.lineReceived(b"0001 BAD COPY failed") 5461 self.assertIsInstance(self.failureResultOf(d).value, imap4.IMAP4Exception) 5462 5463 5464class FakeyServer(imap4.IMAP4Server): 5465 state = "select" 5466 timeout = None 5467 5468 def sendServerGreeting(self): 5469 pass 5470 5471 5472@implementer(imap4.IMessage) 5473class FakeyMessage(util.FancyStrMixin): 5474 showAttributes = ("headers", "flags", "date", "_body", "uid") 5475 5476 def __init__(self, headers, flags, date, body, uid, subpart): 5477 self.headers = headers 5478 self.flags = flags 5479 self._body = body 5480 self.size = len(body) 5481 self.date = date 5482 self.uid = uid 5483 self.subpart = subpart 5484 5485 def getHeaders(self, negate, *names): 5486 self.got_headers = negate, names 5487 return self.headers 5488 5489 def getFlags(self): 5490 return self.flags 5491 5492 def getInternalDate(self): 5493 return self.date 5494 5495 def getBodyFile(self): 5496 return BytesIO(self._body) 5497 5498 def getSize(self): 5499 return self.size 5500 5501 def getUID(self): 5502 return self.uid 5503 5504 def isMultipart(self): 5505 return self.subpart is not None 5506 5507 def getSubPart(self, part): 5508 self.got_subpart = part 5509 return self.subpart[part] 5510 5511 5512class NewStoreTests(TestCase, IMAP4HelperMixin): 5513 result = None 5514 storeArgs = None 5515 5516 def setUp(self): 5517 self.received_messages = self.received_uid = None 5518 5519 self.server = imap4.IMAP4Server() 5520 self.server.state = "select" 5521 self.server.mbox = self 5522 self.connected = defer.Deferred() 5523 self.client = SimpleClient(self.connected) 5524 5525 def addListener(self, x): 5526 pass 5527 5528 def removeListener(self, x): 5529 pass 5530 5531 def store(self, *args, **kw): 5532 self.storeArgs = args, kw 5533 return self.response 5534 5535 def _storeWork(self): 5536 def connected(): 5537 return self.function(self.messages, self.flags, self.silent, self.uid) 5538 5539 def result(R): 5540 self.result = R 5541 5542 self.connected.addCallback(strip(connected)).addCallback(result).addCallback( 5543 self._cbStopClient 5544 ).addErrback(self._ebGeneral) 5545 5546 def check(ignored): 5547 self.assertEqual(self.result, self.expected) 5548 self.assertEqual(self.storeArgs, self.expectedArgs) 5549 5550 d = loopback.loopbackTCP(self.server, self.client, noisy=False) 5551 d.addCallback(check) 5552 return d 5553 5554 def testSetFlags(self, uid=0): 5555 self.function = self.client.setFlags 5556 self.messages = "1,5,9" 5557 self.flags = ["\\A", "\\B", "C"] 5558 self.silent = False 5559 self.uid = uid 5560 self.response = { 5561 1: ["\\A", "\\B", "C"], 5562 5: ["\\A", "\\B", "C"], 5563 9: ["\\A", "\\B", "C"], 5564 } 5565 self.expected = { 5566 1: {"FLAGS": ["\\A", "\\B", "C"]}, 5567 5: {"FLAGS": ["\\A", "\\B", "C"]}, 5568 9: {"FLAGS": ["\\A", "\\B", "C"]}, 5569 } 5570 msg = imap4.MessageSet() 5571 msg.add(1) 5572 msg.add(5) 5573 msg.add(9) 5574 self.expectedArgs = ((msg, ["\\A", "\\B", "C"], 0), {"uid": 0}) 5575 return self._storeWork() 5576 5577 5578class GetBodyStructureTests(TestCase): 5579 """ 5580 Tests for L{imap4.getBodyStructure}, a helper for constructing a list which 5581 directly corresponds to the wire information needed for a I{BODY} or 5582 I{BODYSTRUCTURE} response. 5583 """ 5584 5585 def test_singlePart(self): 5586 """ 5587 L{imap4.getBodyStructure} accepts a L{IMessagePart} provider and returns 5588 a list giving the basic fields for the I{BODY} response for that 5589 message. 5590 """ 5591 body = b"hello, world" 5592 major = "image" 5593 minor = "jpeg" 5594 charset = "us-ascii" 5595 identifier = "some kind of id" 5596 description = "great justice" 5597 encoding = "maximum" 5598 msg = FakeyMessage( 5599 { 5600 "content-type": major + "/" + minor + "; charset=" + charset + "; x=y", 5601 "content-id": identifier, 5602 "content-description": description, 5603 "content-transfer-encoding": encoding, 5604 }, 5605 (), 5606 b"", 5607 body, 5608 123, 5609 None, 5610 ) 5611 structure = imap4.getBodyStructure(msg) 5612 self.assertEqual( 5613 [ 5614 major, 5615 minor, 5616 ["charset", charset, "x", "y"], 5617 identifier, 5618 description, 5619 encoding, 5620 len(body), 5621 ], 5622 structure, 5623 ) 5624 5625 def test_emptyContentType(self): 5626 """ 5627 L{imap4.getBodyStructure} returns L{None} for the major and 5628 minor MIME types of a L{IMessagePart} provider whose headers 5629 lack a C{Content-Type}, or have an empty value for it. 5630 """ 5631 missing = FakeyMessage({}, (), b"", b"", 123, None) 5632 missingContentTypeStructure = imap4.getBodyStructure(missing) 5633 missingMajor, missingMinor = missingContentTypeStructure[:2] 5634 self.assertIs(None, missingMajor) 5635 self.assertIs(None, missingMinor) 5636 5637 empty = FakeyMessage({"content-type": ""}, (), b"", b"", 123, None) 5638 emptyContentTypeStructure = imap4.getBodyStructure(empty) 5639 emptyMajor, emptyMinor = emptyContentTypeStructure[:2] 5640 self.assertIs(None, emptyMajor) 5641 self.assertIs(None, emptyMinor) 5642 5643 newline = FakeyMessage({"content-type": "\n"}, (), b"", b"", 123, None) 5644 newlineContentTypeStructure = imap4.getBodyStructure(newline) 5645 newlineMajor, newlineMinor = newlineContentTypeStructure[:2] 5646 self.assertIs(None, newlineMajor) 5647 self.assertIs(None, newlineMinor) 5648 5649 def test_onlyMajorContentType(self): 5650 """ 5651 L{imap4.getBodyStructure} returns only a non-L{None} major 5652 MIME type for a L{IMessagePart} provider whose headers only 5653 have a main a C{Content-Type}. 5654 """ 5655 main = FakeyMessage({"content-type": "main"}, (), b"", b"", 123, None) 5656 mainStructure = imap4.getBodyStructure(main) 5657 mainMajor, mainMinor = mainStructure[:2] 5658 self.assertEqual(mainMajor, "main") 5659 self.assertIs(mainMinor, None) 5660 5661 def test_singlePartExtended(self): 5662 """ 5663 L{imap4.getBodyStructure} returns a list giving the basic and extended 5664 fields for a I{BODYSTRUCTURE} response if passed C{True} for the 5665 C{extended} parameter. 5666 """ 5667 body = b"hello, world" 5668 major = "image" 5669 minor = "jpeg" 5670 charset = "us-ascii" 5671 identifier = "some kind of id" 5672 description = "great justice" 5673 encoding = "maximum" 5674 md5 = "abcdefabcdef" 5675 msg = FakeyMessage( 5676 { 5677 "content-type": major + "/" + minor + "; charset=" + charset + "; x=y", 5678 "content-id": identifier, 5679 "content-description": description, 5680 "content-transfer-encoding": encoding, 5681 "content-md5": md5, 5682 "content-disposition": "attachment; name=foo; size=bar", 5683 "content-language": "fr", 5684 "content-location": "France", 5685 }, 5686 (), 5687 "", 5688 body, 5689 123, 5690 None, 5691 ) 5692 structure = imap4.getBodyStructure(msg, extended=True) 5693 self.assertEqual( 5694 [ 5695 major, 5696 minor, 5697 ["charset", charset, "x", "y"], 5698 identifier, 5699 description, 5700 encoding, 5701 len(body), 5702 md5, 5703 ["attachment", ["name", "foo", "size", "bar"]], 5704 "fr", 5705 "France", 5706 ], 5707 structure, 5708 ) 5709 5710 def test_singlePartWithMissing(self): 5711 """ 5712 For fields with no information contained in the message headers, 5713 L{imap4.getBodyStructure} fills in L{None} values in its result. 5714 """ 5715 major = "image" 5716 minor = "jpeg" 5717 body = b"hello, world" 5718 msg = FakeyMessage( 5719 {"content-type": major + "/" + minor}, (), b"", body, 123, None 5720 ) 5721 structure = imap4.getBodyStructure(msg, extended=True) 5722 self.assertEqual( 5723 [major, minor, None, None, None, None, len(body), None, None, None, None], 5724 structure, 5725 ) 5726 5727 def test_textPart(self): 5728 """ 5729 For a I{text/*} message, the number of lines in the message body are 5730 included after the common single-part basic fields. 5731 """ 5732 body = b"hello, world\nhow are you?\ngoodbye\n" 5733 major = "text" 5734 minor = "jpeg" 5735 charset = "us-ascii" 5736 identifier = "some kind of id" 5737 description = "great justice" 5738 encoding = "maximum" 5739 msg = FakeyMessage( 5740 { 5741 "content-type": major + "/" + minor + "; charset=" + charset + "; x=y", 5742 "content-id": identifier, 5743 "content-description": description, 5744 "content-transfer-encoding": encoding, 5745 }, 5746 (), 5747 b"", 5748 body, 5749 123, 5750 None, 5751 ) 5752 structure = imap4.getBodyStructure(msg) 5753 self.assertEqual( 5754 [ 5755 major, 5756 minor, 5757 ["charset", charset, "x", "y"], 5758 identifier, 5759 description, 5760 encoding, 5761 len(body), 5762 len(body.splitlines()), 5763 ], 5764 structure, 5765 ) 5766 5767 def test_rfc822Message(self): 5768 """ 5769 For a I{message/rfc822} message, the common basic fields are followed 5770 by information about the contained message. 5771 """ 5772 body = b"hello, world\nhow are you?\ngoodbye\n" 5773 major = "text" 5774 minor = "jpeg" 5775 charset = "us-ascii" 5776 identifier = "some kind of id" 5777 description = "great justice" 5778 encoding = "maximum" 5779 msg = FakeyMessage( 5780 { 5781 "content-type": major + "/" + minor + "; charset=" + charset + "; x=y", 5782 "from": "Alice <alice@example.com>", 5783 "to": "Bob <bob@example.com>", 5784 "content-id": identifier, 5785 "content-description": description, 5786 "content-transfer-encoding": encoding, 5787 }, 5788 (), 5789 "", 5790 body, 5791 123, 5792 None, 5793 ) 5794 5795 container = FakeyMessage( 5796 { 5797 "content-type": "message/rfc822", 5798 }, 5799 (), 5800 b"", 5801 b"", 5802 123, 5803 [msg], 5804 ) 5805 5806 structure = imap4.getBodyStructure(container) 5807 self.assertEqual( 5808 [ 5809 "message", 5810 "rfc822", 5811 None, 5812 None, 5813 None, 5814 None, 5815 0, 5816 imap4.getEnvelope(msg), 5817 imap4.getBodyStructure(msg), 5818 3, 5819 ], 5820 structure, 5821 ) 5822 5823 def test_multiPart(self): 5824 """ 5825 For a I{multipart/*} message, L{imap4.getBodyStructure} returns a list 5826 containing the body structure information for each part of the message 5827 followed by an element giving the MIME subtype of the message. 5828 """ 5829 oneSubPart = FakeyMessage( 5830 { 5831 "content-type": "image/jpeg; x=y", 5832 "content-id": "some kind of id", 5833 "content-description": "great justice", 5834 "content-transfer-encoding": "maximum", 5835 }, 5836 (), 5837 b"", 5838 b"hello world", 5839 123, 5840 None, 5841 ) 5842 5843 anotherSubPart = FakeyMessage( 5844 { 5845 "content-type": "text/plain; charset=us-ascii", 5846 }, 5847 (), 5848 b"", 5849 b"some stuff", 5850 321, 5851 None, 5852 ) 5853 5854 container = FakeyMessage( 5855 { 5856 "content-type": "multipart/related", 5857 }, 5858 (), 5859 b"", 5860 b"", 5861 555, 5862 [oneSubPart, anotherSubPart], 5863 ) 5864 5865 self.assertEqual( 5866 [ 5867 imap4.getBodyStructure(oneSubPart), 5868 imap4.getBodyStructure(anotherSubPart), 5869 "related", 5870 ], 5871 imap4.getBodyStructure(container), 5872 ) 5873 5874 def test_multiPartExtended(self): 5875 """ 5876 When passed a I{multipart/*} message and C{True} for the C{extended} 5877 argument, L{imap4.getBodyStructure} includes extended structure 5878 information from the parts of the multipart message and extended 5879 structure information about the multipart message itself. 5880 """ 5881 oneSubPart = FakeyMessage( 5882 { 5883 b"content-type": b"image/jpeg; x=y", 5884 b"content-id": b"some kind of id", 5885 b"content-description": b"great justice", 5886 b"content-transfer-encoding": b"maximum", 5887 }, 5888 (), 5889 b"", 5890 b"hello world", 5891 123, 5892 None, 5893 ) 5894 5895 anotherSubPart = FakeyMessage( 5896 { 5897 b"content-type": b"text/plain; charset=us-ascii", 5898 }, 5899 (), 5900 b"", 5901 b"some stuff", 5902 321, 5903 None, 5904 ) 5905 5906 container = FakeyMessage( 5907 { 5908 "content-type": "multipart/related; foo=bar", 5909 "content-language": "es", 5910 "content-location": "Spain", 5911 "content-disposition": "attachment; name=monkeys", 5912 }, 5913 (), 5914 b"", 5915 b"", 5916 555, 5917 [oneSubPart, anotherSubPart], 5918 ) 5919 5920 self.assertEqual( 5921 [ 5922 imap4.getBodyStructure(oneSubPart, extended=True), 5923 imap4.getBodyStructure(anotherSubPart, extended=True), 5924 "related", 5925 ["foo", "bar"], 5926 ["attachment", ["name", "monkeys"]], 5927 "es", 5928 "Spain", 5929 ], 5930 imap4.getBodyStructure(container, extended=True), 5931 ) 5932 5933 5934class NewFetchTests(TestCase, IMAP4HelperMixin): 5935 def setUp(self): 5936 self.received_messages = self.received_uid = None 5937 self.result = None 5938 5939 self.server = imap4.IMAP4Server() 5940 self.server.state = "select" 5941 self.server.mbox = self 5942 self.connected = defer.Deferred() 5943 self.client = SimpleClient(self.connected) 5944 5945 def addListener(self, x): 5946 pass 5947 5948 def removeListener(self, x): 5949 pass 5950 5951 def fetch(self, messages, uid): 5952 self.received_messages = messages 5953 self.received_uid = uid 5954 return iter(zip(range(len(self.msgObjs)), self.msgObjs)) 5955 5956 def _fetchWork(self, uid): 5957 if uid: 5958 for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs): 5959 self.expected[i]["UID"] = str(msg.getUID()) 5960 5961 def result(R): 5962 self.result = R 5963 5964 self.connected.addCallback( 5965 lambda _: self.function(self.messages, uid) 5966 ).addCallback(result).addCallback(self._cbStopClient).addErrback( 5967 self._ebGeneral 5968 ) 5969 5970 d = loopback.loopbackTCP(self.server, self.client, noisy=False) 5971 d.addCallback(lambda x: self.assertEqual(self.result, self.expected)) 5972 return d 5973 5974 def testFetchUID(self): 5975 self.function = lambda m, u: self.client.fetchUID(m) 5976 5977 self.messages = "7" 5978 self.msgObjs = [ 5979 FakeyMessage({}, (), b"", b"", 12345, None), 5980 FakeyMessage({}, (), b"", b"", 999, None), 5981 FakeyMessage({}, (), b"", b"", 10101, None), 5982 ] 5983 self.expected = { 5984 0: {"UID": "12345"}, 5985 1: {"UID": "999"}, 5986 2: {"UID": "10101"}, 5987 } 5988 return self._fetchWork(0) 5989 5990 def testFetchFlags(self, uid=0): 5991 self.function = self.client.fetchFlags 5992 self.messages = "9" 5993 self.msgObjs = [ 5994 FakeyMessage({}, ["FlagA", "FlagB", "\\FlagC"], b"", b"", 54321, None), 5995 FakeyMessage({}, ["\\FlagC", "FlagA", "FlagB"], b"", b"", 12345, None), 5996 ] 5997 self.expected = { 5998 0: {"FLAGS": ["FlagA", "FlagB", "\\FlagC"]}, 5999 1: {"FLAGS": ["\\FlagC", "FlagA", "FlagB"]}, 6000 } 6001 return self._fetchWork(uid) 6002 6003 def testFetchFlagsUID(self): 6004 return self.testFetchFlags(1) 6005 6006 def testFetchInternalDate(self, uid=0): 6007 self.function = self.client.fetchInternalDate 6008 self.messages = "13" 6009 self.msgObjs = [ 6010 FakeyMessage({}, (), b"Fri, 02 Nov 2003 21:25:10 GMT", b"", 23232, None), 6011 FakeyMessage({}, (), b"Thu, 29 Dec 2013 11:31:52 EST", b"", 101, None), 6012 FakeyMessage({}, (), b"Mon, 10 Mar 1992 02:44:30 CST", b"", 202, None), 6013 FakeyMessage({}, (), b"Sat, 11 Jan 2000 14:40:24 PST", b"", 303, None), 6014 ] 6015 self.expected = { 6016 0: {"INTERNALDATE": "02-Nov-2003 21:25:10 +0000"}, 6017 1: {"INTERNALDATE": "29-Dec-2013 11:31:52 -0500"}, 6018 2: {"INTERNALDATE": "10-Mar-1992 02:44:30 -0600"}, 6019 3: {"INTERNALDATE": "11-Jan-2000 14:40:24 -0800"}, 6020 } 6021 return self._fetchWork(uid) 6022 6023 def testFetchInternalDateUID(self): 6024 return self.testFetchInternalDate(1) 6025 6026 # if alternate locale is not available, the previous test will be skipped, 6027 # please install this locale for it to run. Avoid using locale.getlocale 6028 # to learn the current locale; its values don't round-trip well on all 6029 # platforms. Fortunately setlocale returns a value which does round-trip 6030 # well. 6031 currentLocale = locale.setlocale(locale.LC_ALL, None) 6032 try: 6033 locale.setlocale(locale.LC_ALL, "es_AR.UTF8") 6034 except locale.Error: 6035 noEsARLocale = True 6036 else: 6037 locale.setlocale(locale.LC_ALL, currentLocale) 6038 noEsARLocale = False 6039 6040 @skipIf(noEsARLocale, "The es_AR.UTF8 locale is not installed.") 6041 def test_fetchInternalDateLocaleIndependent(self): 6042 """ 6043 The month name in the date is locale independent. 6044 """ 6045 # Fake that we're in a language where December is not Dec 6046 currentLocale = locale.setlocale(locale.LC_ALL, None) 6047 locale.setlocale(locale.LC_ALL, "es_AR.UTF8") 6048 self.addCleanup(locale.setlocale, locale.LC_ALL, currentLocale) 6049 return self.testFetchInternalDate(1) 6050 6051 def testFetchEnvelope(self, uid=0): 6052 self.function = self.client.fetchEnvelope 6053 self.messages = "15" 6054 self.msgObjs = [ 6055 FakeyMessage( 6056 { 6057 "from": "user@domain", 6058 "to": "resu@domain", 6059 "date": "thursday", 6060 "subject": "it is a message", 6061 "message-id": "id-id-id-yayaya", 6062 }, 6063 (), 6064 b"", 6065 b"", 6066 65656, 6067 None, 6068 ), 6069 ] 6070 self.expected = { 6071 0: { 6072 "ENVELOPE": [ 6073 "thursday", 6074 "it is a message", 6075 [[None, None, "user", "domain"]], 6076 [[None, None, "user", "domain"]], 6077 [[None, None, "user", "domain"]], 6078 [[None, None, "resu", "domain"]], 6079 None, 6080 None, 6081 None, 6082 "id-id-id-yayaya", 6083 ] 6084 } 6085 } 6086 return self._fetchWork(uid) 6087 6088 def testFetchEnvelopeUID(self): 6089 return self.testFetchEnvelope(1) 6090 6091 def test_fetchBodyStructure(self, uid=0): 6092 """ 6093 L{IMAP4Client.fetchBodyStructure} issues a I{FETCH BODYSTRUCTURE} 6094 command and returns a Deferred which fires with a structure giving the 6095 result of parsing the server's response. The structure is a list 6096 reflecting the parenthesized data sent by the server, as described by 6097 RFC 3501, section 7.4.2. 6098 """ 6099 self.function = self.client.fetchBodyStructure 6100 self.messages = "3:9,10:*" 6101 self.msgObjs = [ 6102 FakeyMessage( 6103 { 6104 "content-type": 'text/plain; name=thing; key="value"', 6105 "content-id": "this-is-the-content-id", 6106 "content-description": "describing-the-content-goes-here!", 6107 "content-transfer-encoding": "8BIT", 6108 "content-md5": "abcdef123456", 6109 "content-disposition": "attachment; filename=monkeys", 6110 "content-language": "es", 6111 "content-location": "http://example.com/monkeys", 6112 }, 6113 (), 6114 "", 6115 b"Body\nText\nGoes\nHere\n", 6116 919293, 6117 None, 6118 ) 6119 ] 6120 self.expected = { 6121 0: { 6122 "BODYSTRUCTURE": [ 6123 "text", 6124 "plain", 6125 ["key", "value", "name", "thing"], 6126 "this-is-the-content-id", 6127 "describing-the-content-goes-here!", 6128 "8BIT", 6129 "20", 6130 "4", 6131 "abcdef123456", 6132 ["attachment", ["filename", "monkeys"]], 6133 "es", 6134 "http://example.com/monkeys", 6135 ] 6136 } 6137 } 6138 return self._fetchWork(uid) 6139 6140 def testFetchBodyStructureUID(self): 6141 """ 6142 If passed C{True} for the C{uid} argument, C{fetchBodyStructure} can 6143 also issue a I{UID FETCH BODYSTRUCTURE} command. 6144 """ 6145 return self.test_fetchBodyStructure(1) 6146 6147 def test_fetchBodyStructureMultipart(self, uid=0): 6148 """ 6149 L{IMAP4Client.fetchBodyStructure} can also parse the response to a 6150 I{FETCH BODYSTRUCTURE} command for a multipart message. 6151 """ 6152 self.function = self.client.fetchBodyStructure 6153 self.messages = "3:9,10:*" 6154 innerMessage = FakeyMessage( 6155 { 6156 "content-type": 'text/plain; name=thing; key="value"', 6157 "content-id": "this-is-the-content-id", 6158 "content-description": "describing-the-content-goes-here!", 6159 "content-transfer-encoding": "8BIT", 6160 "content-language": "fr", 6161 "content-md5": "123456abcdef", 6162 "content-disposition": "inline", 6163 "content-location": "outer space", 6164 }, 6165 (), 6166 b"", 6167 b"Body\nText\nGoes\nHere\n", 6168 919293, 6169 None, 6170 ) 6171 self.msgObjs = [ 6172 FakeyMessage( 6173 { 6174 "content-type": 'multipart/mixed; boundary="xyz"', 6175 "content-language": "en", 6176 "content-location": "nearby", 6177 }, 6178 (), 6179 b"", 6180 b"", 6181 919293, 6182 [innerMessage], 6183 ) 6184 ] 6185 self.expected = { 6186 0: { 6187 "BODYSTRUCTURE": [ 6188 [ 6189 "text", 6190 "plain", 6191 ["key", "value", "name", "thing"], 6192 "this-is-the-content-id", 6193 "describing-the-content-goes-here!", 6194 "8BIT", 6195 "20", 6196 "4", 6197 "123456abcdef", 6198 ["inline", None], 6199 "fr", 6200 "outer space", 6201 ], 6202 "mixed", 6203 ["boundary", "xyz"], 6204 None, 6205 "en", 6206 "nearby", 6207 ] 6208 } 6209 } 6210 return self._fetchWork(uid) 6211 6212 def testFetchSimplifiedBody(self, uid=0): 6213 self.function = self.client.fetchSimplifiedBody 6214 self.messages = "21" 6215 self.msgObjs = [ 6216 FakeyMessage( 6217 {}, 6218 (), 6219 b"", 6220 b"Yea whatever", 6221 91825, 6222 [ 6223 FakeyMessage( 6224 {"content-type": "image/jpg"}, 6225 (), 6226 b"", 6227 b"Body Body Body", 6228 None, 6229 None, 6230 ) 6231 ], 6232 ) 6233 ] 6234 self.expected = {0: {"BODY": [None, None, None, None, None, None, "12"]}} 6235 6236 return self._fetchWork(uid) 6237 6238 def testFetchSimplifiedBodyUID(self): 6239 return self.testFetchSimplifiedBody(1) 6240 6241 def testFetchSimplifiedBodyText(self, uid=0): 6242 self.function = self.client.fetchSimplifiedBody 6243 self.messages = "21" 6244 self.msgObjs = [ 6245 FakeyMessage( 6246 {"content-type": "text/plain"}, (), b"", b"Yea whatever", 91825, None 6247 ) 6248 ] 6249 self.expected = { 6250 0: {"BODY": ["text", "plain", None, None, None, None, "12", "1"]} 6251 } 6252 6253 return self._fetchWork(uid) 6254 6255 def testFetchSimplifiedBodyTextUID(self): 6256 return self.testFetchSimplifiedBodyText(1) 6257 6258 def testFetchSimplifiedBodyRFC822(self, uid=0): 6259 self.function = self.client.fetchSimplifiedBody 6260 self.messages = "21" 6261 self.msgObjs = [ 6262 FakeyMessage( 6263 {"content-type": "message/rfc822"}, 6264 (), 6265 b"", 6266 b"Yea whatever", 6267 91825, 6268 [ 6269 FakeyMessage( 6270 {"content-type": "image/jpg"}, 6271 (), 6272 "", 6273 b"Body Body Body", 6274 None, 6275 None, 6276 ) 6277 ], 6278 ) 6279 ] 6280 self.expected = { 6281 0: { 6282 "BODY": [ 6283 "message", 6284 "rfc822", 6285 None, 6286 None, 6287 None, 6288 None, 6289 "12", 6290 [ 6291 None, 6292 None, 6293 [[None, None, None]], 6294 [[None, None, None]], 6295 None, 6296 None, 6297 None, 6298 None, 6299 None, 6300 None, 6301 ], 6302 ["image", "jpg", None, None, None, None, "14"], 6303 "1", 6304 ] 6305 } 6306 } 6307 6308 return self._fetchWork(uid) 6309 6310 def testFetchSimplifiedBodyRFC822UID(self): 6311 return self.testFetchSimplifiedBodyRFC822(1) 6312 6313 def test_fetchSimplifiedBodyMultipart(self): 6314 """ 6315 L{IMAP4Client.fetchSimplifiedBody} returns a dictionary mapping message 6316 sequence numbers to fetch responses for the corresponding messages. In 6317 particular, for a multipart message, the value in the dictionary maps 6318 the string C{"BODY"} to a list giving the body structure information for 6319 that message, in the form of a list of subpart body structure 6320 information followed by the subtype of the message (eg C{"alternative"} 6321 for a I{multipart/alternative} message). This structure is self-similar 6322 in the case where a subpart is itself multipart. 6323 """ 6324 self.function = self.client.fetchSimplifiedBody 6325 self.messages = "21" 6326 6327 # A couple non-multipart messages to use as the inner-most payload 6328 singles = [ 6329 FakeyMessage( 6330 {"content-type": "text/plain"}, (), b"date", b"Stuff", 54321, None 6331 ), 6332 FakeyMessage( 6333 {"content-type": "text/html"}, (), b"date", b"Things", 32415, None 6334 ), 6335 ] 6336 6337 # A multipart/alternative message containing the above non-multipart 6338 # messages. This will be the payload of the outer-most message. 6339 alternative = FakeyMessage( 6340 {"content-type": "multipart/alternative"}, 6341 (), 6342 b"", 6343 b"Irrelevant", 6344 12345, 6345 singles, 6346 ) 6347 6348 # The outer-most message, also with a multipart type, containing just 6349 # the single middle message. 6350 mixed = FakeyMessage( 6351 # The message is multipart/mixed 6352 {"content-type": "multipart/mixed"}, 6353 (), 6354 b"", 6355 b"RootOf", 6356 98765, 6357 [alternative], 6358 ) 6359 6360 self.msgObjs = [mixed] 6361 6362 self.expected = { 6363 0: { 6364 "BODY": [ 6365 [ 6366 ["text", "plain", None, None, None, None, "5", "1"], 6367 ["text", "html", None, None, None, None, "6", "1"], 6368 "alternative", 6369 ], 6370 "mixed", 6371 ] 6372 } 6373 } 6374 6375 return self._fetchWork(False) 6376 6377 def testFetchMessage(self, uid=0): 6378 self.function = self.client.fetchMessage 6379 self.messages = "1,3,7,10101" 6380 self.msgObjs = [ 6381 FakeyMessage({"Header": "Value"}, (), b"", b"BODY TEXT\r\n", 91, None), 6382 ] 6383 self.expected = {0: {"RFC822": "Header: Value\r\n\r\nBODY TEXT\r\n"}} 6384 return self._fetchWork(uid) 6385 6386 def testFetchMessageUID(self): 6387 return self.testFetchMessage(1) 6388 6389 def testFetchHeaders(self, uid=0): 6390 self.function = self.client.fetchHeaders 6391 self.messages = "9,6,2" 6392 self.msgObjs = [ 6393 FakeyMessage({"H1": "V1", "H2": "V2"}, (), b"", b"", 99, None), 6394 ] 6395 6396 headers = nativeString(imap4._formatHeaders({"H1": "V1", "H2": "V2"})) 6397 6398 self.expected = { 6399 0: {"RFC822.HEADER": headers}, 6400 } 6401 return self._fetchWork(uid) 6402 6403 def testFetchHeadersUID(self): 6404 return self.testFetchHeaders(1) 6405 6406 def testFetchBody(self, uid=0): 6407 self.function = self.client.fetchBody 6408 self.messages = "1,2,3,4,5,6,7" 6409 self.msgObjs = [ 6410 FakeyMessage({"Header": "Value"}, (), "", b"Body goes here\r\n", 171, None), 6411 ] 6412 self.expected = { 6413 0: {"RFC822.TEXT": "Body goes here\r\n"}, 6414 } 6415 return self._fetchWork(uid) 6416 6417 def testFetchBodyUID(self): 6418 return self.testFetchBody(1) 6419 6420 def testFetchBodyParts(self): 6421 """ 6422 Test the server's handling of requests for specific body sections. 6423 """ 6424 self.function = self.client.fetchSpecific 6425 self.messages = "1" 6426 outerBody = "" 6427 innerBody1 = b"Contained body message text. Squarge." 6428 innerBody2 = b"Secondary <i>message</i> text of squarge body." 6429 headers = OrderedDict() 6430 headers["from"] = "sender@host" 6431 headers["to"] = "recipient@domain" 6432 headers["subject"] = "booga booga boo" 6433 headers["content-type"] = 'multipart/alternative; boundary="xyz"' 6434 innerHeaders = OrderedDict() 6435 innerHeaders["subject"] = "this is subject text" 6436 innerHeaders["content-type"] = "text/plain" 6437 innerHeaders2 = OrderedDict() 6438 innerHeaders2["subject"] = "<b>this is subject</b>" 6439 innerHeaders2["content-type"] = "text/html" 6440 self.msgObjs = [ 6441 FakeyMessage( 6442 headers, 6443 (), 6444 None, 6445 outerBody, 6446 123, 6447 [ 6448 FakeyMessage(innerHeaders, (), None, innerBody1, None, None), 6449 FakeyMessage(innerHeaders2, (), None, innerBody2, None, None), 6450 ], 6451 ) 6452 ] 6453 self.expected = {0: [["BODY", ["1"], "Contained body message text. Squarge."]]} 6454 6455 def result(R): 6456 self.result = R 6457 6458 self.connected.addCallback( 6459 lambda _: self.function(self.messages, headerNumber=1) 6460 ) 6461 self.connected.addCallback(result) 6462 self.connected.addCallback(self._cbStopClient) 6463 self.connected.addErrback(self._ebGeneral) 6464 6465 d = loopback.loopbackTCP(self.server, self.client, noisy=False) 6466 d.addCallback(lambda ign: self.assertEqual(self.result, self.expected)) 6467 return d 6468 6469 def test_fetchBodyPartOfNonMultipart(self): 6470 """ 6471 Single-part messages have an implicit first part which clients 6472 should be able to retrieve explicitly. Test that a client 6473 requesting part 1 of a text/plain message receives the body of the 6474 text/plain part. 6475 """ 6476 self.function = self.client.fetchSpecific 6477 self.messages = "1" 6478 parts = [1] 6479 outerBody = b"DA body" 6480 headers = OrderedDict() 6481 headers["from"] = "sender@host" 6482 headers["to"] = "recipient@domain" 6483 headers["subject"] = "booga booga boo" 6484 headers["content-type"] = "text/plain" 6485 self.msgObjs = [FakeyMessage(headers, (), None, outerBody, 123, None)] 6486 6487 self.expected = {0: [["BODY", ["1"], "DA body"]]} 6488 6489 def result(R): 6490 self.result = R 6491 6492 self.connected.addCallback( 6493 lambda _: self.function(self.messages, headerNumber=parts) 6494 ) 6495 self.connected.addCallback(result) 6496 self.connected.addCallback(self._cbStopClient) 6497 self.connected.addErrback(self._ebGeneral) 6498 6499 d = loopback.loopbackTCP(self.server, self.client, noisy=False) 6500 d.addCallback(lambda ign: self.assertEqual(self.result, self.expected)) 6501 return d 6502 6503 def testFetchSize(self, uid=0): 6504 self.function = self.client.fetchSize 6505 self.messages = "1:100,2:*" 6506 self.msgObjs = [ 6507 FakeyMessage({}, (), b"", b"x" * 20, 123, None), 6508 ] 6509 self.expected = { 6510 0: {"RFC822.SIZE": "20"}, 6511 } 6512 return self._fetchWork(uid) 6513 6514 def testFetchSizeUID(self): 6515 return self.testFetchSize(1) 6516 6517 def testFetchFull(self, uid=0): 6518 self.function = self.client.fetchFull 6519 self.messages = "1,3" 6520 self.msgObjs = [ 6521 FakeyMessage( 6522 {}, 6523 ("\\XYZ", "\\YZX", "Abc"), 6524 b"Sun, 25 Jul 2010 06:20:30 -0400 (EDT)", 6525 b"xyz" * 2, 6526 654, 6527 None, 6528 ), 6529 FakeyMessage( 6530 {}, 6531 ("\\One", "\\Two", "Three"), 6532 b"Mon, 14 Apr 2003 19:43:44 -0400", 6533 b"abc" * 4, 6534 555, 6535 None, 6536 ), 6537 ] 6538 self.expected = { 6539 0: { 6540 "FLAGS": ["\\XYZ", "\\YZX", "Abc"], 6541 "INTERNALDATE": "25-Jul-2010 06:20:30 -0400", 6542 "RFC822.SIZE": "6", 6543 "ENVELOPE": [ 6544 None, 6545 None, 6546 [[None, None, None]], 6547 [[None, None, None]], 6548 None, 6549 None, 6550 None, 6551 None, 6552 None, 6553 None, 6554 ], 6555 "BODY": [None, None, None, None, None, None, "6"], 6556 }, 6557 1: { 6558 "FLAGS": ["\\One", "\\Two", "Three"], 6559 "INTERNALDATE": "14-Apr-2003 19:43:44 -0400", 6560 "RFC822.SIZE": "12", 6561 "ENVELOPE": [ 6562 None, 6563 None, 6564 [[None, None, None]], 6565 [[None, None, None]], 6566 None, 6567 None, 6568 None, 6569 None, 6570 None, 6571 None, 6572 ], 6573 "BODY": [None, None, None, None, None, None, "12"], 6574 }, 6575 } 6576 return self._fetchWork(uid) 6577 6578 def testFetchFullUID(self): 6579 return self.testFetchFull(1) 6580 6581 def testFetchAll(self, uid=0): 6582 self.function = self.client.fetchAll 6583 self.messages = "1,2:3" 6584 self.msgObjs = [ 6585 FakeyMessage( 6586 {}, (), b"Mon, 14 Apr 2003 19:43:44 +0400", b"Lalala", 10101, None 6587 ), 6588 FakeyMessage( 6589 {}, (), b"Tue, 15 Apr 2003 19:43:44 +0200", b"Alalal", 20202, None 6590 ), 6591 ] 6592 self.expected = { 6593 0: { 6594 "ENVELOPE": [ 6595 None, 6596 None, 6597 [[None, None, None]], 6598 [[None, None, None]], 6599 None, 6600 None, 6601 None, 6602 None, 6603 None, 6604 None, 6605 ], 6606 "RFC822.SIZE": "6", 6607 "INTERNALDATE": "14-Apr-2003 19:43:44 +0400", 6608 "FLAGS": [], 6609 }, 6610 1: { 6611 "ENVELOPE": [ 6612 None, 6613 None, 6614 [[None, None, None]], 6615 [[None, None, None]], 6616 None, 6617 None, 6618 None, 6619 None, 6620 None, 6621 None, 6622 ], 6623 "RFC822.SIZE": "6", 6624 "INTERNALDATE": "15-Apr-2003 19:43:44 +0200", 6625 "FLAGS": [], 6626 }, 6627 } 6628 return self._fetchWork(uid) 6629 6630 def testFetchAllUID(self): 6631 return self.testFetchAll(1) 6632 6633 def testFetchFast(self, uid=0): 6634 self.function = self.client.fetchFast 6635 self.messages = "1" 6636 self.msgObjs = [ 6637 FakeyMessage({}, ("\\X",), b"19 Mar 2003 19:22:21 -0500", b"", 9, None), 6638 ] 6639 self.expected = { 6640 0: { 6641 "FLAGS": ["\\X"], 6642 "INTERNALDATE": "19-Mar-2003 19:22:21 -0500", 6643 "RFC822.SIZE": "0", 6644 }, 6645 } 6646 return self._fetchWork(uid) 6647 6648 def testFetchFastUID(self): 6649 return self.testFetchFast(1) 6650 6651 6652class DefaultSearchTests(IMAP4HelperMixin, TestCase): 6653 """ 6654 Test the behavior of the server's SEARCH implementation, particularly in 6655 the face of unhandled search terms. 6656 """ 6657 6658 def setUp(self): 6659 self.server = imap4.IMAP4Server() 6660 self.server.state = "select" 6661 self.server.mbox = self 6662 self.connected = defer.Deferred() 6663 self.client = SimpleClient(self.connected) 6664 self.msgObjs = [ 6665 FakeyMessage({}, (), b"", b"", 999, None), 6666 FakeyMessage({}, (), b"", b"", 10101, None), 6667 FakeyMessage({}, (), b"", b"", 12345, None), 6668 FakeyMessage({}, (), b"", b"", 20001, None), 6669 FakeyMessage({}, (), b"", b"", 20002, None), 6670 ] 6671 6672 def fetch(self, messages, uid): 6673 """ 6674 Pretend to be a mailbox and let C{self.server} lookup messages on me. 6675 """ 6676 return list(zip(range(1, len(self.msgObjs) + 1), self.msgObjs)) 6677 6678 def _messageSetSearchTest(self, queryTerms, expectedMessages): 6679 """ 6680 Issue a search with given query and verify that the returned messages 6681 match the given expected messages. 6682 6683 @param queryTerms: A string giving the search query. 6684 @param expectedMessages: A list of the message sequence numbers 6685 expected as the result of the search. 6686 @return: A L{Deferred} which fires when the test is complete. 6687 """ 6688 6689 def search(): 6690 return self.client.search(queryTerms) 6691 6692 d = self.connected.addCallback(strip(search)) 6693 6694 def searched(results): 6695 self.assertEqual(results, expectedMessages) 6696 6697 d.addCallback(searched) 6698 d.addCallback(self._cbStopClient) 6699 d.addErrback(self._ebGeneral) 6700 self.loopback() 6701 return d 6702 6703 def test_searchMessageSet(self): 6704 """ 6705 Test that a search which starts with a message set properly limits 6706 the search results to messages in that set. 6707 """ 6708 return self._messageSetSearchTest("1", [1]) 6709 6710 def test_searchMessageSetWithStar(self): 6711 """ 6712 If the search filter ends with a star, all the message from the 6713 starting point are returned. 6714 """ 6715 return self._messageSetSearchTest("2:*", [2, 3, 4, 5]) 6716 6717 def test_searchMessageSetWithStarFirst(self): 6718 """ 6719 If the search filter starts with a star, the result should be identical 6720 with if the filter would end with a star. 6721 """ 6722 return self._messageSetSearchTest("*:2", [2, 3, 4, 5]) 6723 6724 def test_searchMessageSetUIDWithStar(self): 6725 """ 6726 If the search filter ends with a star, all the message from the 6727 starting point are returned (also for the SEARCH UID case). 6728 """ 6729 return self._messageSetSearchTest("UID 10000:*", [2, 3, 4, 5]) 6730 6731 def test_searchMessageSetUIDWithStarFirst(self): 6732 """ 6733 If the search filter starts with a star, the result should be identical 6734 with if the filter would end with a star (also for the SEARCH UID case). 6735 """ 6736 return self._messageSetSearchTest("UID *:10000", [2, 3, 4, 5]) 6737 6738 def test_searchMessageSetUIDWithStarAndHighStart(self): 6739 """ 6740 A search filter of 1234:* should include the UID of the last message in 6741 the mailbox, even if its UID is less than 1234. 6742 """ 6743 # in our fake mbox the highest message UID is 20002 6744 return self._messageSetSearchTest("UID 30000:*", [5]) 6745 6746 def test_searchMessageSetWithList(self): 6747 """ 6748 If the search filter contains nesting terms, one of which includes a 6749 message sequence set with a wildcard, IT ALL WORKS GOOD. 6750 """ 6751 # 6 is bigger than the biggest message sequence number, but that's 6752 # okay, because N:* includes the biggest message sequence number even 6753 # if N is bigger than that (read the rfc nub). 6754 return self._messageSetSearchTest("(6:*)", [5]) 6755 6756 def test_searchOr(self): 6757 """ 6758 If the search filter contains an I{OR} term, all messages 6759 which match either subexpression are returned. 6760 """ 6761 return self._messageSetSearchTest("OR 1 2", [1, 2]) 6762 6763 def test_searchOrMessageSet(self): 6764 """ 6765 If the search filter contains an I{OR} term with a 6766 subexpression which includes a message sequence set wildcard, 6767 all messages in that set are considered for inclusion in the 6768 results. 6769 """ 6770 return self._messageSetSearchTest("OR 2:* 2:*", [2, 3, 4, 5]) 6771 6772 def test_searchNot(self): 6773 """ 6774 If the search filter contains a I{NOT} term, all messages 6775 which do not match the subexpression are returned. 6776 """ 6777 return self._messageSetSearchTest("NOT 3", [1, 2, 4, 5]) 6778 6779 def test_searchNotMessageSet(self): 6780 """ 6781 If the search filter contains a I{NOT} term with a 6782 subexpression which includes a message sequence set wildcard, 6783 no messages in that set are considered for inclusion in the 6784 result. 6785 """ 6786 return self._messageSetSearchTest("NOT 2:*", [1]) 6787 6788 def test_searchAndMessageSet(self): 6789 """ 6790 If the search filter contains multiple terms implicitly 6791 conjoined with a message sequence set wildcard, only the 6792 intersection of the results of each term are returned. 6793 """ 6794 return self._messageSetSearchTest("2:* 3", [3]) 6795 6796 def test_searchInvalidCriteria(self): 6797 """ 6798 If the search criteria is not a valid key, a NO result is returned to 6799 the client (resulting in an error callback), and an IllegalQueryError is 6800 logged on the server side. 6801 """ 6802 queryTerms = "FOO" 6803 6804 def search(): 6805 return self.client.search(queryTerms) 6806 6807 d = self.connected.addCallback(strip(search)) 6808 d = self.assertFailure(d, imap4.IMAP4Exception) 6809 6810 def errorReceived(results): 6811 """ 6812 Verify that the server logs an IllegalQueryError and the 6813 client raises an IMAP4Exception with 'Search failed:...' 6814 """ 6815 self.client.transport.loseConnection() 6816 self.server.transport.loseConnection() 6817 6818 # Check what the server logs 6819 errors = self.flushLoggedErrors(imap4.IllegalQueryError) 6820 self.assertEqual(len(errors), 1) 6821 6822 # Verify exception given to client has the correct message 6823 self.assertEqual( 6824 str(b"SEARCH failed: Invalid search command FOO"), 6825 str(results), 6826 ) 6827 6828 d.addCallback(errorReceived) 6829 d.addErrback(self._ebGeneral) 6830 self.loopback() 6831 return d 6832 6833 6834@implementer(imap4.ISearchableMailbox) 6835class FetchSearchStoreTests(TestCase, IMAP4HelperMixin): 6836 def setUp(self): 6837 self.expected = self.result = None 6838 self.server_received_query = None 6839 self.server_received_uid = None 6840 self.server_received_parts = None 6841 self.server_received_messages = None 6842 6843 self.server = imap4.IMAP4Server() 6844 self.server.state = "select" 6845 self.server.mbox = self 6846 self.connected = defer.Deferred() 6847 self.client = SimpleClient(self.connected) 6848 6849 def search(self, query, uid): 6850 # Look for a specific bad query, so we can verify we handle it properly 6851 if query == [b"FOO"]: 6852 raise imap4.IllegalQueryError("FOO is not a valid search criteria") 6853 6854 self.server_received_query = query 6855 self.server_received_uid = uid 6856 return self.expected 6857 6858 def addListener(self, *a, **kw): 6859 pass 6860 6861 removeListener = addListener 6862 6863 def _searchWork(self, uid): 6864 def search(): 6865 return self.client.search(self.query, uid=uid) 6866 6867 def result(R): 6868 self.result = R 6869 6870 self.connected.addCallback(strip(search)).addCallback(result).addCallback( 6871 self._cbStopClient 6872 ).addErrback(self._ebGeneral) 6873 6874 def check(ignored): 6875 # Ensure no short-circuiting weirdness is going on 6876 self.assertFalse(self.result is self.expected) 6877 6878 self.assertEqual(self.result, self.expected) 6879 self.assertEqual(self.uid, self.server_received_uid) 6880 self.assertEqual( 6881 # Queries should be decoded as ASCII unless a charset 6882 # identifier is provided. See #9201. 6883 imap4.parseNestedParens(self.query.encode("charmap")), 6884 self.server_received_query, 6885 ) 6886 6887 d = loopback.loopbackTCP(self.server, self.client, noisy=False) 6888 d.addCallback(check) 6889 return d 6890 6891 def testSearch(self): 6892 self.query = imap4.Or( 6893 imap4.Query(header=("subject", "substring")), 6894 imap4.Query(larger=1024, smaller=4096), 6895 ) 6896 self.expected = [1, 4, 5, 7] 6897 self.uid = 0 6898 return self._searchWork(0) 6899 6900 def testUIDSearch(self): 6901 self.query = imap4.Or( 6902 imap4.Query(header=("subject", "substring")), 6903 imap4.Query(larger=1024, smaller=4096), 6904 ) 6905 self.uid = 1 6906 self.expected = [1, 2, 3] 6907 return self._searchWork(1) 6908 6909 def getUID(self, msg): 6910 try: 6911 return self.expected[msg]["UID"] 6912 except (TypeError, IndexError): 6913 return self.expected[msg - 1] 6914 except KeyError: 6915 return 42 6916 6917 def fetch(self, messages, uid): 6918 self.server_received_uid = uid 6919 self.server_received_messages = str(messages) 6920 return self.expected 6921 6922 def _fetchWork(self, fetch): 6923 def result(R): 6924 self.result = R 6925 6926 self.connected.addCallback(strip(fetch)).addCallback(result).addCallback( 6927 self._cbStopClient 6928 ).addErrback(self._ebGeneral) 6929 6930 def check(ignored): 6931 # Ensure no short-circuiting weirdness is going on 6932 self.assertFalse(self.result is self.expected) 6933 6934 self.parts and self.parts.sort() 6935 self.server_received_parts and self.server_received_parts.sort() 6936 6937 if self.uid: 6938 for (k, v) in self.expected.items(): 6939 v["UID"] = str(k) 6940 6941 self.assertEqual(self.result, self.expected) 6942 self.assertEqual(self.uid, self.server_received_uid) 6943 self.assertEqual(self.parts, self.server_received_parts) 6944 self.assertEqual( 6945 imap4.parseIdList(self.messages), 6946 imap4.parseIdList(self.server_received_messages), 6947 ) 6948 6949 d = loopback.loopbackTCP(self.server, self.client, noisy=False) 6950 d.addCallback(check) 6951 return d 6952 6953 def test_invalidTerm(self): 6954 """ 6955 If, as part of a search, an ISearchableMailbox raises an 6956 IllegalQueryError (e.g. due to invalid search criteria), client sees a 6957 failure response, and an IllegalQueryError is logged on the server. 6958 """ 6959 query = "FOO" 6960 6961 def search(): 6962 return self.client.search(query) 6963 6964 d = self.connected.addCallback(strip(search)) 6965 d = self.assertFailure(d, imap4.IMAP4Exception) 6966 6967 def errorReceived(results): 6968 """ 6969 Verify that the server logs an IllegalQueryError and the 6970 client raises an IMAP4Exception with 'Search failed:...' 6971 """ 6972 self.client.transport.loseConnection() 6973 self.server.transport.loseConnection() 6974 6975 # Check what the server logs 6976 errors = self.flushLoggedErrors(imap4.IllegalQueryError) 6977 self.assertEqual(len(errors), 1) 6978 6979 # Verify exception given to client has the correct message 6980 self.assertEqual( 6981 str(b"SEARCH failed: FOO is not a valid search criteria"), str(results) 6982 ) 6983 6984 d.addCallback(errorReceived) 6985 d.addErrback(self._ebGeneral) 6986 self.loopback() 6987 return d 6988 6989 6990class FakeMailbox: 6991 def __init__(self): 6992 self.args = [] 6993 6994 def addMessage(self, body, flags, date): 6995 self.args.append((body, flags, date)) 6996 return defer.succeed(None) 6997 6998 6999@implementer(imap4.IMessageFile) 7000class FeaturefulMessage: 7001 def getFlags(self): 7002 return "flags" 7003 7004 def getInternalDate(self): 7005 return "internaldate" 7006 7007 def open(self): 7008 return BytesIO(b"open") 7009 7010 7011@implementer(imap4.IMessageCopier) 7012class MessageCopierMailbox: 7013 def __init__(self): 7014 self.msgs = [] 7015 7016 def copy(self, msg): 7017 self.msgs.append(msg) 7018 return len(self.msgs) 7019 7020 7021class CopyWorkerTests(TestCase): 7022 def testFeaturefulMessage(self): 7023 s = imap4.IMAP4Server() 7024 7025 # Yes. I am grabbing this uber-non-public method to test it. 7026 # It is complex. It needs to be tested directly! 7027 # Perhaps it should be refactored, simplified, or split up into 7028 # not-so-private components, but that is a task for another day. 7029 7030 # Ha ha! Addendum! Soon it will be split up, and this test will 7031 # be re-written to just use the default adapter for IMailbox to 7032 # IMessageCopier and call .copy on that adapter. 7033 f = s._IMAP4Server__cbCopy 7034 7035 m = FakeMailbox() 7036 d = f([(i, FeaturefulMessage()) for i in range(1, 11)], "tag", m) 7037 7038 def cbCopy(results): 7039 for a in m.args: 7040 self.assertEqual(a[0].read(), b"open") 7041 self.assertEqual(a[1], "flags") 7042 self.assertEqual(a[2], "internaldate") 7043 7044 for (status, result) in results: 7045 self.assertTrue(status) 7046 self.assertEqual(result, None) 7047 7048 return d.addCallback(cbCopy) 7049 7050 def testUnfeaturefulMessage(self): 7051 s = imap4.IMAP4Server() 7052 7053 # See above comment 7054 f = s._IMAP4Server__cbCopy 7055 7056 m = FakeMailbox() 7057 msgs = [ 7058 FakeyMessage( 7059 {"Header-Counter": str(i)}, (), b"Date", b"Body %d" % (i,), i + 10, None 7060 ) 7061 for i in range(1, 11) 7062 ] 7063 d = f([im for im in zip(range(1, 11), msgs)], "tag", m) 7064 7065 def cbCopy(results): 7066 seen = [] 7067 for a in m.args: 7068 seen.append(a[0].read()) 7069 self.assertEqual(a[1], ()) 7070 self.assertEqual(a[2], b"Date") 7071 7072 seen.sort() 7073 exp = sorted( 7074 b"Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1, 11) 7075 ) 7076 self.assertEqual(seen, exp) 7077 7078 for (status, result) in results: 7079 self.assertTrue(status) 7080 self.assertEqual(result, None) 7081 7082 return d.addCallback(cbCopy) 7083 7084 def testMessageCopier(self): 7085 s = imap4.IMAP4Server() 7086 7087 # See above comment 7088 f = s._IMAP4Server__cbCopy 7089 7090 m = MessageCopierMailbox() 7091 msgs = [object() for i in range(1, 11)] 7092 d = f([im for im in zip(range(1, 11), msgs)], b"tag", m) 7093 7094 def cbCopy(results): 7095 self.assertEqual(results, list(zip([1] * 10, range(1, 11)))) 7096 for (orig, new) in zip(msgs, m.msgs): 7097 self.assertIdentical(orig, new) 7098 7099 return d.addCallback(cbCopy) 7100 7101 7102@skipIf(not ClientTLSContext, "OpenSSL not present") 7103@skipIf(not interfaces.IReactorSSL(reactor, None), "Reactor doesn't support SSL") 7104class TLSTests(IMAP4HelperMixin, TestCase): 7105 serverCTX = None 7106 clientCTX = None 7107 if ServerTLSContext: 7108 serverCTX = ServerTLSContext() 7109 if ClientTLSContext: 7110 clientCTX = ClientTLSContext() 7111 7112 def loopback(self): 7113 return loopback.loopbackTCP(self.server, self.client, noisy=False) 7114 7115 def testAPileOfThings(self): 7116 SimpleServer.theAccount.addMailbox(b"inbox") 7117 called = [] 7118 7119 def login(): 7120 called.append(None) 7121 return self.client.login(b"testuser", b"password-test") 7122 7123 def list(): 7124 called.append(None) 7125 return self.client.list(b"inbox", b"%") 7126 7127 def status(): 7128 called.append(None) 7129 return self.client.status(b"inbox", "UIDNEXT") 7130 7131 def examine(): 7132 called.append(None) 7133 return self.client.examine(b"inbox") 7134 7135 def logout(): 7136 called.append(None) 7137 return self.client.logout() 7138 7139 self.client.requireTransportSecurity = True 7140 7141 methods = [login, list, status, examine, logout] 7142 for method in methods: 7143 self.connected.addCallback(strip(method)) 7144 7145 self.connected.addCallbacks(self._cbStopClient, self._ebGeneral) 7146 7147 def check(ignored): 7148 self.assertEqual(self.server.startedTLS, True) 7149 self.assertEqual(self.client.startedTLS, True) 7150 self.assertEqual(len(called), len(methods)) 7151 7152 d = self.loopback() 7153 d.addCallback(check) 7154 return d 7155 7156 def testLoginLogin(self): 7157 self.server.checker.addUser(b"testuser", b"password-test") 7158 success = [] 7159 self.client.registerAuthenticator(imap4.LOGINAuthenticator(b"testuser")) 7160 self.connected.addCallback( 7161 lambda _: self.client.authenticate(b"password-test") 7162 ).addCallback(lambda _: self.client.logout()).addCallback( 7163 success.append 7164 ).addCallback( 7165 self._cbStopClient 7166 ).addErrback( 7167 self._ebGeneral 7168 ) 7169 7170 d = self.loopback() 7171 d.addCallback(lambda x: self.assertEqual(len(success), 1)) 7172 return d 7173 7174 def startTLSAndAssertSession(self): 7175 """ 7176 Begin a C{STARTTLS} sequence and assert that it results in a 7177 TLS session. 7178 7179 @return: A L{Deferred} that fires when the underlying 7180 connection between the client and server has been terminated. 7181 """ 7182 success = [] 7183 self.connected.addCallback(strip(self.client.startTLS)) 7184 7185 def checkSecure(ignored): 7186 self.assertTrue(interfaces.ISSLTransport.providedBy(self.client.transport)) 7187 7188 self.connected.addCallback(checkSecure) 7189 self.connected.addCallback(success.append) 7190 7191 d = self.loopback() 7192 d.addCallback(lambda x: self.assertTrue(success)) 7193 return defer.gatherResults([d, self.connected]) 7194 7195 def test_startTLS(self): 7196 """ 7197 L{IMAP4Client.startTLS} triggers TLS negotiation and returns a 7198 L{Deferred} which fires after the client's transport is using 7199 encryption. 7200 """ 7201 disconnected = self.startTLSAndAssertSession() 7202 self.connected.addCallback(self._cbStopClient) 7203 self.connected.addErrback(self._ebGeneral) 7204 return disconnected 7205 7206 def test_doubleSTARTTLS(self): 7207 """ 7208 A server that receives a second C{STARTTLS} sends a C{NO} 7209 response. 7210 """ 7211 7212 class DoubleSTARTTLSClient(SimpleClient): 7213 def startTLS(self): 7214 if not self.startedTLS: 7215 return SimpleClient.startTLS(self) 7216 7217 return self.sendCommand(imap4.Command(b"STARTTLS")) 7218 7219 self.client = DoubleSTARTTLSClient( 7220 self.connected, contextFactory=self.clientCTX 7221 ) 7222 7223 disconnected = self.startTLSAndAssertSession() 7224 7225 self.connected.addCallback(strip(self.client.startTLS)) 7226 self.connected.addErrback( 7227 self.assertClientFailureMessage, b"TLS already negotiated" 7228 ) 7229 7230 self.connected.addCallback(self._cbStopClient) 7231 self.connected.addErrback(self._ebGeneral) 7232 7233 return disconnected 7234 7235 def test_startTLSWithExistingChallengers(self): 7236 """ 7237 Starting a TLS negotiation with an L{IMAP4Server} that already 7238 has C{LOGIN} and C{PLAIN} L{IChallengeResponse} factories uses 7239 those factories. 7240 """ 7241 self.server.challengers = { 7242 b"LOGIN": imap4.LOGINCredentials, 7243 b"PLAIN": imap4.PLAINCredentials, 7244 } 7245 7246 @defer.inlineCallbacks 7247 def assertLOGINandPLAIN(): 7248 capabilities = yield self.client.getCapabilities() 7249 self.assertIn(b"AUTH", capabilities) 7250 self.assertIn(b"LOGIN", capabilities[b"AUTH"]) 7251 self.assertIn(b"PLAIN", capabilities[b"AUTH"]) 7252 7253 self.connected.addCallback(strip(assertLOGINandPLAIN)) 7254 7255 disconnected = self.startTLSAndAssertSession() 7256 7257 self.connected.addCallback(strip(assertLOGINandPLAIN)) 7258 7259 self.connected.addCallback(self._cbStopClient) 7260 self.connected.addErrback(self._ebGeneral) 7261 7262 return disconnected 7263 7264 def test_loginBeforeSTARTTLS(self): 7265 """ 7266 A client that attempts to log in before issuing the 7267 C{STARTTLS} command receives a C{NO} response. 7268 """ 7269 # Prevent the client from issuing STARTTLS. 7270 self.client.startTLS = lambda: defer.succeed( 7271 ([], "OK Begin TLS negotiation now") 7272 ) 7273 self.connected.addCallback( 7274 lambda _: self.client.login(b"wrong", b"time"), 7275 ) 7276 7277 self.connected.addErrback( 7278 self.assertClientFailureMessage, 7279 b"LOGIN is disabled before STARTTLS", 7280 ) 7281 7282 self.connected.addCallback(self._cbStopClient) 7283 self.connected.addErrback(self._ebGeneral) 7284 7285 return defer.gatherResults([self.loopback(), self.connected]) 7286 7287 def testFailedStartTLS(self): 7288 failures = [] 7289 7290 def breakServerTLS(ign): 7291 self.server.canStartTLS = False 7292 7293 self.connected.addCallback(breakServerTLS) 7294 self.connected.addCallback(lambda ign: self.client.startTLS()) 7295 self.connected.addErrback( 7296 lambda err: failures.append(err.trap(imap4.IMAP4Exception)) 7297 ) 7298 self.connected.addCallback(self._cbStopClient) 7299 self.connected.addErrback(self._ebGeneral) 7300 7301 def check(ignored): 7302 self.assertTrue(failures) 7303 self.assertIdentical(failures[0], imap4.IMAP4Exception) 7304 7305 return self.loopback().addCallback(check) 7306 7307 7308class SlowMailbox(SimpleMailbox): 7309 howSlow = 2 7310 callLater = None 7311 fetchDeferred = None 7312 7313 # Not a very nice implementation of fetch(), but it'll 7314 # do for the purposes of testing. 7315 def fetch(self, messages, uid): 7316 d = defer.Deferred() 7317 self.callLater(self.howSlow, d.callback, ()) 7318 self.fetchDeferred.callback(None) 7319 return d 7320 7321 7322class TimeoutTests(IMAP4HelperMixin, TestCase): 7323 def test_serverTimeout(self): 7324 """ 7325 The *client* has a timeout mechanism which will close connections that 7326 are inactive for a period. 7327 """ 7328 c = Clock() 7329 self.server.timeoutTest = True 7330 self.client.timeout = 5 # seconds 7331 self.client.callLater = c.callLater 7332 self.selectedArgs = None 7333 7334 def login(): 7335 d = self.client.login(b"testuser", b"password-test") 7336 c.advance(5) 7337 d.addErrback(timedOut) 7338 return d 7339 7340 def timedOut(failure): 7341 self._cbStopClient(None) 7342 failure.trap(error.TimeoutError) 7343 7344 d = self.connected.addCallback(strip(login)) 7345 d.addErrback(self._ebGeneral) 7346 return defer.gatherResults([d, self.loopback()]) 7347 7348 def test_serverTimesOut(self): 7349 """ 7350 The server times out a connection. 7351 """ 7352 c = Clock() 7353 self.server.callLater = c.callLater 7354 7355 def login(): 7356 return self.client.login(b"testuser", b"password-test") 7357 7358 def expireTime(): 7359 c.advance(self.server.POSTAUTH_TIMEOUT * 2) 7360 7361 d = self.connected.addCallback(strip(login)) 7362 d.addCallback(strip(expireTime)) 7363 7364 # The loopback method's Deferred fires the connection is 7365 # closed, and the server closes the connection as a result of 7366 # expireTime. 7367 return defer.gatherResults([d, self.loopback()]) 7368 7369 def test_serverUnselectsMailbox(self): 7370 """ 7371 The server unsets the selected mailbox when timing out a 7372 connection. 7373 """ 7374 self.patch(SimpleServer.theAccount, "mailboxFactory", UncloseableMailbox) 7375 SimpleServer.theAccount.addMailbox("mailbox-test") 7376 mbox = SimpleServer.theAccount.mailboxes["MAILBOX-TEST"] 7377 self.assertFalse(ICloseableMailboxIMAP.providedBy(mbox)) 7378 7379 c = Clock() 7380 self.server.callLater = c.callLater 7381 7382 def login(): 7383 return self.client.login(b"testuser", b"password-test") 7384 7385 def select(): 7386 return self.client.select("mailbox-test") 7387 7388 def assertSet(): 7389 self.assertIs(mbox, self.server.mbox) 7390 7391 def expireTime(): 7392 c.advance(self.server.POSTAUTH_TIMEOUT * 2) 7393 7394 def assertUnset(): 7395 self.assertFalse(self.server.mbox) 7396 7397 d = self.connected.addCallback(strip(login)) 7398 d.addCallback(strip(select)) 7399 d.addCallback(strip(assertSet)) 7400 d.addCallback(strip(expireTime)) 7401 d.addCallback(strip(assertUnset)) 7402 7403 # The loopback method's Deferred fires the connection is 7404 # closed, and the server closes the connection as a result of 7405 # expireTime. 7406 return defer.gatherResults([d, self.loopback()]) 7407 7408 def test_serverTimesOutAndClosesMailbox(self): 7409 """ 7410 The server closes the selected, closeable mailbox when timing 7411 out a connection. 7412 """ 7413 SimpleServer.theAccount.addMailbox("mailbox-test") 7414 mbox = SimpleServer.theAccount.mailboxes["MAILBOX-TEST"] 7415 verifyObject(ICloseableMailboxIMAP, mbox) 7416 7417 c = Clock() 7418 self.server.callLater = c.callLater 7419 7420 def login(): 7421 return self.client.login(b"testuser", b"password-test") 7422 7423 def select(): 7424 return self.client.select("mailbox-test") 7425 7426 def assertMailboxOpen(): 7427 self.assertFalse(mbox.closed) 7428 7429 def expireTime(): 7430 c.advance(self.server.POSTAUTH_TIMEOUT * 2) 7431 7432 def assertMailboxClosed(): 7433 self.assertTrue(mbox.closed) 7434 7435 d = self.connected.addCallback(strip(login)) 7436 d.addCallback(strip(select)) 7437 d.addCallback(strip(assertMailboxOpen)) 7438 d.addCallback(strip(expireTime)) 7439 d.addCallback(strip(assertMailboxClosed)) 7440 7441 # The loopback method's Deferred fires the connection is 7442 # closed, and the server closes the connection as a result of 7443 # expireTime. 7444 return defer.gatherResults([d, self.loopback()]) 7445 7446 def test_longFetchDoesntTimeout(self): 7447 """ 7448 The connection timeout does not take effect during fetches. 7449 """ 7450 c = Clock() 7451 SlowMailbox.callLater = c.callLater 7452 SlowMailbox.fetchDeferred = defer.Deferred() 7453 self.server.callLater = c.callLater 7454 SimpleServer.theAccount.mailboxFactory = SlowMailbox 7455 SimpleServer.theAccount.addMailbox("mailbox-test") 7456 7457 self.server.setTimeout(1) 7458 7459 def login(): 7460 return self.client.login(b"testuser", b"password-test") 7461 7462 def select(): 7463 self.server.setTimeout(1) 7464 return self.client.select("mailbox-test") 7465 7466 def fetch(): 7467 return self.client.fetchUID("1:*") 7468 7469 def stillConnected(): 7470 self.assertNotEqual(self.server.state, "timeout") 7471 7472 def cbAdvance(ignored): 7473 for i in range(4): 7474 c.advance(0.5) 7475 7476 SlowMailbox.fetchDeferred.addCallback(cbAdvance) 7477 7478 d1 = self.connected.addCallback(strip(login)) 7479 d1.addCallback(strip(select)) 7480 d1.addCallback(strip(fetch)) 7481 d1.addCallback(strip(stillConnected)) 7482 d1.addCallback(self._cbStopClient) 7483 d1.addErrback(self._ebGeneral) 7484 d = defer.gatherResults([d1, self.loopback()]) 7485 return d 7486 7487 def test_idleClientDoesDisconnect(self): 7488 """ 7489 The *server* has a timeout mechanism which will close connections that 7490 are inactive for a period. 7491 """ 7492 c = Clock() 7493 # Hook up our server protocol 7494 transport = StringTransportWithDisconnection() 7495 transport.protocol = self.server 7496 self.server.callLater = c.callLater 7497 self.server.makeConnection(transport) 7498 7499 # Make sure we can notice when the connection goes away 7500 lost = [] 7501 connLost = self.server.connectionLost 7502 self.server.connectionLost = lambda reason: ( 7503 lost.append(None), 7504 connLost(reason), 7505 )[1] 7506 7507 # 2/3rds of the idle timeout elapses... 7508 c.pump([0.0] + [self.server.timeOut / 3.0] * 2) 7509 self.assertFalse(lost, lost) 7510 7511 # Now some more 7512 c.pump([0.0, self.server.timeOut / 2.0]) 7513 self.assertTrue(lost) 7514 7515 7516class DisconnectionTests(TestCase): 7517 def testClientDisconnectFailsDeferreds(self): 7518 c = imap4.IMAP4Client() 7519 t = StringTransportWithDisconnection() 7520 c.makeConnection(t) 7521 d = self.assertFailure( 7522 c.login(b"testuser", "example.com"), error.ConnectionDone 7523 ) 7524 c.connectionLost(error.ConnectionDone("Connection closed")) 7525 return d 7526 7527 7528class SynchronousMailbox: 7529 """ 7530 Trivial, in-memory mailbox implementation which can produce a message 7531 synchronously. 7532 """ 7533 7534 def __init__(self, messages): 7535 self.messages = messages 7536 7537 def fetch(self, msgset, uid): 7538 assert not uid, "Cannot handle uid requests." 7539 for msg in msgset: 7540 yield msg, self.messages[msg - 1] 7541 7542 7543class PipeliningTests(TestCase): 7544 """ 7545 Tests for various aspects of the IMAP4 server's pipelining support. 7546 """ 7547 7548 messages = [ 7549 FakeyMessage({}, [], b"", b"0", None, None), 7550 FakeyMessage({}, [], b"", b"1", None, None), 7551 FakeyMessage({}, [], b"", b"2", None, None), 7552 ] 7553 7554 def setUp(self): 7555 self.iterators = [] 7556 7557 self.transport = StringTransport() 7558 self.server = imap4.IMAP4Server(None, None, self.iterateInReactor) 7559 self.server.makeConnection(self.transport) 7560 7561 mailbox = SynchronousMailbox(self.messages) 7562 7563 # Skip over authentication and folder selection 7564 self.server.state = "select" 7565 self.server.mbox = mailbox 7566 7567 # Get rid of any greeting junk 7568 self.transport.clear() 7569 7570 def iterateInReactor(self, iterator): 7571 """ 7572 A fake L{imap4.iterateInReactor} that records the iterators it 7573 receives. 7574 7575 @param iterator: An iterator. 7576 7577 @return: A L{Deferred} associated with this iterator. 7578 """ 7579 d = defer.Deferred() 7580 self.iterators.append((iterator, d)) 7581 return d 7582 7583 def flushPending(self, asLongAs=lambda: True): 7584 """ 7585 Advance pending iterators enqueued with L{iterateInReactor} in 7586 a round-robin fashion, resuming the transport's producer until 7587 it has completed. This ensures bodies are flushed. 7588 7589 @param asLongAs: (optional) An optional predicate function. 7590 Flushing iterators continues as long as there are 7591 iterators and this returns L{True}. 7592 """ 7593 while self.iterators and asLongAs(): 7594 for e in self.iterators[0][0]: 7595 while self.transport.producer: 7596 self.transport.producer.resumeProducing() 7597 else: 7598 self.iterators.pop(0)[1].callback(None) 7599 7600 def tearDown(self): 7601 self.server.connectionLost(failure.Failure(error.ConnectionDone())) 7602 7603 def test_synchronousFetch(self): 7604 """ 7605 Test that pipelined FETCH commands which can be responded to 7606 synchronously are responded to correctly. 7607 """ 7608 # Here's some pipelined stuff 7609 self.server.dataReceived( 7610 b"01 FETCH 1 BODY[]\r\n" b"02 FETCH 2 BODY[]\r\n" b"03 FETCH 3 BODY[]\r\n" 7611 ) 7612 7613 self.flushPending() 7614 7615 self.assertEqual( 7616 self.transport.value(), 7617 b"".join( 7618 [ 7619 b"* 1 FETCH (BODY[] )\r\n", 7620 networkString( 7621 "01 OK FETCH completed\r\n{5}\r\n\r\n\r\n%s" 7622 % (nativeString(self.messages[0].getBodyFile().read()),) 7623 ), 7624 b"* 2 FETCH (BODY[] )\r\n", 7625 networkString( 7626 "02 OK FETCH completed\r\n{5}\r\n\r\n\r\n%s" 7627 % (nativeString(self.messages[1].getBodyFile().read()),) 7628 ), 7629 b"* 3 FETCH (BODY[] )\r\n", 7630 networkString( 7631 "03 OK FETCH completed\r\n{5}\r\n\r\n\r\n%s" 7632 % (nativeString(self.messages[2].getBodyFile().read()),) 7633 ), 7634 ] 7635 ), 7636 ) 7637 7638 def test_bufferedServerStatus(self): 7639 """ 7640 When a server status change occurs during an ongoing FETCH 7641 command, the server status is buffered until the FETCH 7642 completes. 7643 """ 7644 self.server.dataReceived(b"01 FETCH 1,2 BODY[]\r\n") 7645 7646 # Two iterations yields the untagged response and the first 7647 # fetched message's body 7648 twice = functools.partial(next, iter([True, True, False])) 7649 self.flushPending(asLongAs=twice) 7650 7651 self.assertEqual( 7652 self.transport.value(), 7653 b"".join( 7654 [ 7655 # The untagged response... 7656 b"* 1 FETCH (BODY[] )\r\n", 7657 # ...and its body 7658 networkString( 7659 "{5}\r\n\r\n\r\n%s" 7660 % (nativeString(self.messages[0].getBodyFile().read()),) 7661 ), 7662 ] 7663 ), 7664 ) 7665 7666 self.transport.clear() 7667 7668 # A server status change... 7669 self.server.modeChanged(writeable=True) 7670 7671 # ...remains buffered... 7672 self.assertFalse(self.transport.value()) 7673 7674 self.flushPending() 7675 7676 self.assertEqual( 7677 self.transport.value(), 7678 b"".join( 7679 [ 7680 # The untagged response... 7681 b"* 2 FETCH (BODY[] )\r\n", 7682 # ...the status change... 7683 b"* [READ-WRITE]\r\n", 7684 # ...and the completion status and final message's body 7685 networkString( 7686 "01 OK FETCH completed\r\n{5}\r\n\r\n\r\n%s" 7687 % (nativeString(self.messages[1].getBodyFile().read()),) 7688 ), 7689 ] 7690 ), 7691 ) 7692 7693 7694class IMAP4ServerFetchTests(TestCase): 7695 """ 7696 This test case is for the FETCH tests that require 7697 a C{StringTransport}. 7698 """ 7699 7700 def setUp(self): 7701 self.transport = StringTransport() 7702 self.server = imap4.IMAP4Server() 7703 self.server.state = "select" 7704 self.server.makeConnection(self.transport) 7705 7706 def test_fetchWithPartialValidArgument(self): 7707 """ 7708 If by any chance, extra bytes got appended at the end of a valid 7709 FETCH arguments, the client should get a BAD - arguments invalid 7710 response. 7711 7712 See U{RFC 3501<http://tools.ietf.org/html/rfc3501#section-6.4.5>}, 7713 section 6.4.5, 7714 """ 7715 # We need to clear out the welcome message. 7716 self.transport.clear() 7717 # Let's send out the faulty command. 7718 self.server.dataReceived(b"0001 FETCH 1 FULLL\r\n") 7719 expected = b"0001 BAD Illegal syntax: Invalid Argument\r\n" 7720 self.assertEqual(self.transport.value(), expected) 7721 self.transport.clear() 7722 self.server.connectionLost(error.ConnectionDone("Connection closed")) 7723 7724 7725class LiteralTestsMixin: 7726 """ 7727 Shared tests for literal classes. 7728 7729 @ivar literalFactory: A callable that returns instances of the 7730 literal under test. 7731 """ 7732 7733 def setUp(self): 7734 """ 7735 Shared setup. 7736 """ 7737 self.deferred = defer.Deferred() 7738 7739 def test_partialWrite(self): 7740 """ 7741 The literal returns L{None} when given less data than the 7742 literal requires. 7743 """ 7744 literal = self.literalFactory(1024, self.deferred) 7745 self.assertIs(None, literal.write(b"incomplete")) 7746 self.assertNoResult(self.deferred) 7747 7748 def test_exactWrite(self): 7749 """ 7750 The literal returns an empty L{bytes} instance when given 7751 exactly the data the literal requires. 7752 """ 7753 data = b"complete" 7754 literal = self.literalFactory(len(data), self.deferred) 7755 leftover = literal.write(data) 7756 7757 self.assertIsInstance(leftover, bytes) 7758 self.assertFalse(leftover) 7759 self.assertNoResult(self.deferred) 7760 7761 def test_overlongWrite(self): 7762 """ 7763 The literal returns any left over L{bytes} when given more 7764 data than the literal requires. 7765 """ 7766 data = b"completeleftover" 7767 literal = self.literalFactory(len(b"complete"), self.deferred) 7768 7769 leftover = literal.write(data) 7770 7771 self.assertEqual(leftover, b"leftover") 7772 7773 def test_emptyLiteral(self): 7774 """ 7775 The literal returns an empty L{bytes} instance 7776 when given an empty L{bytes} instance. 7777 """ 7778 literal = self.literalFactory(0, self.deferred) 7779 data = b"leftover" 7780 7781 leftover = literal.write(data) 7782 7783 self.assertEqual(leftover, data) 7784 7785 7786class LiteralStringTests(LiteralTestsMixin, SynchronousTestCase): 7787 """ 7788 Tests for L{self.literalFactory}. 7789 """ 7790 7791 literalFactory = imap4.LiteralString 7792 7793 def test_callback(self): 7794 """ 7795 Calling L{imap4.LiteralString.callback} with a line fires the 7796 instance's L{Deferred} with a 2-L{tuple} whose first element 7797 is the collected data and whose second is the provided line. 7798 """ 7799 data = b"data" 7800 extra = b"extra" 7801 7802 literal = imap4.LiteralString(len(data), self.deferred) 7803 7804 for c in iterbytes(data): 7805 literal.write(c) 7806 7807 literal.callback(b"extra") 7808 7809 result = self.successResultOf(self.deferred) 7810 self.assertEqual(result, (data, extra)) 7811 7812 7813class LiteralFileTests(LiteralTestsMixin, TestCase): 7814 """ 7815 Tests for L{imap4.LiteralFile}. 7816 """ 7817 7818 literalFactory = imap4.LiteralFile 7819 7820 def test_callback(self): 7821 """ 7822 Calling L{imap4.LiteralFile.callback} with a line fires the 7823 instance's L{Deferred} with a 2-L{tuple} whose first element 7824 is the file and whose second is the provided line. 7825 """ 7826 data = b"data" 7827 extra = b"extra" 7828 7829 literal = imap4.LiteralFile(len(data), self.deferred) 7830 7831 for c in iterbytes(data): 7832 literal.write(c) 7833 7834 literal.callback(b"extra") 7835 7836 result = self.successResultOf(self.deferred) 7837 self.assertEqual(len(result), 2) 7838 7839 dataFile, extra = result 7840 self.assertEqual(dataFile.read(), b"data") 7841 7842 def test_callbackSpooledToDisk(self): 7843 """ 7844 A L{imap4.LiteralFile} whose size exceeds the maximum 7845 in-memory size spools its content to disk, and invoking its 7846 L{callback} with a line fires the instance's L{Deferred} with 7847 a 2-L{tuple} whose first element is the spooled file and whose second 7848 is the provided line. 7849 """ 7850 data = b"data" 7851 extra = b"extra" 7852 7853 self.patch(imap4.LiteralFile, "_memoryFileLimit", 1) 7854 7855 literal = imap4.LiteralFile(len(data), self.deferred) 7856 7857 for c in iterbytes(data): 7858 literal.write(c) 7859 7860 literal.callback(b"extra") 7861 7862 result = self.successResultOf(self.deferred) 7863 self.assertEqual(len(result), 2) 7864 7865 dataFile, extra = result 7866 self.assertEqual(dataFile.read(), b"data") 7867 7868 7869class WriteBufferTests(SynchronousTestCase): 7870 """ 7871 Tests for L{imap4.WriteBuffer}. 7872 """ 7873 7874 def setUp(self): 7875 self.transport = StringTransport() 7876 7877 def test_partialWrite(self): 7878 """ 7879 L{imap4.WriteBuffer} buffers writes that are smaller than its 7880 buffer size. 7881 """ 7882 buf = imap4.WriteBuffer(self.transport) 7883 data = b"x" * buf.bufferSize 7884 7885 buf.write(data) 7886 7887 self.assertFalse(self.transport.value()) 7888 7889 def test_overlongWrite(self): 7890 """ 7891 L{imap4.WriteBuffer} writes data without buffering it when 7892 the size of the data exceeds the size of its buffer. 7893 """ 7894 buf = imap4.WriteBuffer(self.transport) 7895 data = b"x" * (buf.bufferSize + 1) 7896 7897 buf.write(data) 7898 7899 self.assertEqual(self.transport.value(), data) 7900 7901 def test_writesImplyFlush(self): 7902 """ 7903 L{imap4.WriteBuffer} buffers writes until its buffer's size 7904 exceeds its maximum value. 7905 """ 7906 buf = imap4.WriteBuffer(self.transport) 7907 firstData = b"x" * buf.bufferSize 7908 secondData = b"y" 7909 7910 buf.write(firstData) 7911 7912 self.assertFalse(self.transport.value()) 7913 7914 buf.write(secondData) 7915 7916 self.assertEqual(self.transport.value(), firstData + secondData) 7917 7918 def test_explicitFlush(self): 7919 """ 7920 L{imap4.WriteBuffer.flush} flushes the buffer even when its 7921 size is smaller than the buffer size. 7922 """ 7923 buf = imap4.WriteBuffer(self.transport) 7924 data = b"x" * (buf.bufferSize) 7925 7926 buf.write(data) 7927 7928 self.assertFalse(self.transport.value()) 7929 7930 buf.flush() 7931 7932 self.assertEqual(self.transport.value(), data) 7933 7934 def test_explicitFlushEmptyBuffer(self): 7935 """ 7936 L{imap4.WriteBuffer.flush} has no effect if when the buffer is 7937 empty. 7938 """ 7939 buf = imap4.WriteBuffer(self.transport) 7940 7941 buf.flush() 7942 7943 self.assertFalse(self.transport.value()) 7944