1# -*- test-case-name: twisted.mail.test.test_imap -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5
6"""
7Test case for twisted.mail.imap4
8"""
9
10import base64
11import codecs
12import functools
13import locale
14import os
15import uuid
16from collections import OrderedDict
17from io import BytesIO
18from itertools import chain
19from typing import List, Optional, Tuple, Type
20from unittest import skipIf
21
22from zope.interface import implementer
23from zope.interface.verify import verifyClass, verifyObject
24
25from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
26from twisted.cred.credentials import (
27    CramMD5Credentials,
28    IUsernameHashedPassword,
29    IUsernamePassword,
30)
31from twisted.cred.error import UnauthorizedLogin
32from twisted.cred.portal import IRealm, Portal
33from twisted.internet import defer, error, interfaces, reactor
34from twisted.internet.task import Clock
35from twisted.mail import imap4
36from twisted.mail.imap4 import MessageSet
37from twisted.mail.interfaces import (
38    IChallengeResponse,
39    IClientAuthentication,
40    ICloseableMailboxIMAP,
41)
42from twisted.protocols import loopback
43from twisted.python import failure, log, util
44from twisted.python.compat import iterbytes, nativeString, networkString
45from twisted.test.proto_helpers import StringTransport, StringTransportWithDisconnection
46from twisted.trial.unittest import SynchronousTestCase, TestCase
47
48try:
49    from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
50except ImportError:
51    ClientTLSContext = None  # type: ignore[assignment,misc]
52    ServerTLSContext = None  # type: ignore[assignment,misc]
53
54
def strip(f):
    """
    Wrap a zero-argument callable so that it can be called with one
    argument, which is ignored.

    @param f: A callable that takes no arguments.

    @return: A callable accepting a single positional argument (for example
        a callback result) which discards it and returns C{f()}.
    """

    def discardResult(result, f=f):
        # C{f} is bound as a default argument, mirroring the signature of
        # the equivalent C{lambda result, f=f: f()}.
        return f()

    return discardResult
57
58
class IMAP4UTF7Tests(TestCase):
    """
    Tests for the I{imap4-utf-7} codec.
    """

    # Pairs of [decoded text, encoded bytes] shared by test_encode and
    # test_decode.
    tests = [
        ["Hello world", b"Hello world"],
        ["Hello & world", b"Hello &- world"],
        ["Hello\xffworld", b"Hello&AP8-world"],
        ["\xff\xfe\xfd\xfc", b"&AP8A,gD9APw-"],
        [
            "~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317",
            b"~peter/mail/&ZeVnLIqe-/&U,BTFw-",
        ],  # example from RFC 2060
    ]

    def test_encodeWithErrors(self):
        """
        Encoding with the I{imap4-utf-7} codec gives the same bytes whether
        or not an explicit error policy is passed to C{str.encode}.
        """
        plain = "Hello world"
        withPolicy = plain.encode("imap4-utf-7", "strict")
        withoutPolicy = plain.encode("imap4-utf-7")
        self.assertEqual(withPolicy, withoutPolicy)

    def test_decodeWithErrors(self):
        """
        Decoding with the I{imap4-utf-7} codec gives the same text whether
        or not an explicit error policy is passed to C{bytes.decode}.
        """
        encoded = b"Hello world"
        withPolicy = encoded.decode("imap4-utf-7", "strict")
        withoutPolicy = encoded.decode("imap4-utf-7")
        self.assertEqual(withPolicy, withoutPolicy)

    def test_encodeAmpersand(self):
        """
        Text containing ampersands (C{&}) next to a non-ASCII character can
        be encoded with the I{imap4-utf-7} codec.
        """
        encoded = "&Hello&\N{VULGAR FRACTION ONE HALF}&".encode("imap4-utf-7")
        self.assertEqual(encoded, b"&-Hello&-&AL0-&-")

    def test_decodeWithoutFinalASCIIShift(self):
        """
        I{imap4-utf-7} bytes that never shift back to ASCII (that is, which
        lack the closing C{-}) can still be decoded.
        """
        decoded = b"&AL0".decode("imap4-utf-7")
        self.assertEqual(decoded, "\N{VULGAR FRACTION ONE HALF}")

    def test_getreader(self):
        """
        The stream reader obtained from C{codecs.getreader('imap4-utf-7')}
        decodes I{imap4-utf-7} bytes read from the wrapped stream.
        """
        readerFactory = codecs.getreader("imap4-utf-7")
        stream = readerFactory(BytesIO(b"Hello&AP8-world"))
        self.assertEqual(stream.read(), "Hello\xffworld")

    def test_getwriter(self):
        """
        The stream writer obtained from C{codecs.getwriter('imap4-utf-7')}
        encodes text to I{imap4-utf-7} bytes on the wrapped stream.
        """
        sink = BytesIO()
        writerFactory = codecs.getwriter("imap4-utf-7")
        writer = writerFactory(sink)
        writer.write("Hello\xffworld")
        self.assertEqual(sink.getvalue(), b"Hello&AP8-world")

    def test_encode(self):
        """
        Every text in L{tests} encodes, via the I{imap4-utf-7} codec, to the
        bytes it is paired with.
        """
        for text, encoded in self.tests:
            self.assertEqual(text.encode("imap4-utf-7"), encoded)

    def test_decode(self):
        """
        Every byte string in L{tests} decodes, via the I{imap4-utf-7} codec,
        to the text it is paired with.
        """
        for text, encoded in self.tests:
            self.assertEqual(text, encoded.decode("imap4-utf-7"))

    def test_printableSingletons(self):
        """
        Each printable ASCII character other than C{&} is encoded as its own
        single ASCII byte and decoded back to itself; C{&} is encoded as
        C{&-}.
        """
        # 0x26 is "&", which is checked separately below.
        printableExceptAmpersand = chain(range(0x20, 0x26), range(0x27, 0x7F))
        for codepoint in printableExceptAmpersand:
            character = chr(codepoint)
            asciiByte = character.encode()
            self.assertEqual(asciiByte, character.encode("imap4-utf-7"))
            self.assertEqual(character, asciiByte.decode("imap4-utf-7"))

        self.assertEqual("&".encode("imap4-utf-7"), b"&-")
        self.assertEqual(b"&-".decode("imap4-utf-7"), "&")
158
159
class BufferingConsumer:
    """
    A minimal consumer for tests: it records every chunk written to it and
    immediately asks the registered producer for more.

    @ivar buffer: The chunks passed to L{write}, in the order received.

    @ivar consumer: The object most recently passed to L{registerProducer},
        or L{None} when nothing is registered.  Despite the attribute's
        name, this is the producer whose C{resumeProducing} gets called.
    """

    def __init__(self):
        self.buffer = []
        # Start with no producer so that write() is usable (and simply
        # buffers) before registerProducer() has been called.
        self.consumer = None

    def write(self, bytes):
        """
        Record C{bytes} and, if a producer is registered, ask it to resume.

        @param bytes: The chunk to append to L{buffer}.
        """
        self.buffer.append(bytes)
        if self.consumer:
            self.consumer.resumeProducing()

    def registerProducer(self, consumer, streaming):
        """
        Remember the producer and immediately ask it to start producing.

        @param consumer: The producer to register; it must provide a
            C{resumeProducing} method.

        @param streaming: Accepted for interface compatibility; unused.
        """
        self.consumer = consumer
        self.consumer.resumeProducing()

    def unregisterProducer(self):
        """
        Forget the registered producer; later writes are only buffered.
        """
        self.consumer = None
175
176
class MessageProducerTests(SynchronousTestCase):
    """
    Tests for L{imap4.MessageProducer}.
    """

    def _outerHeaders(self, contentType):
        """
        Build the top-level headers shared by every message in these tests.

        @param contentType: The value for the C{content-type} header.

        @return: An L{OrderedDict} with the keys C{from}, C{to}, C{subject}
            and C{content-type}, in that order.
        """
        return OrderedDict(
            [
                ("from", "sender@host"),
                ("to", "recipient@domain"),
                ("subject", "booga booga boo"),
                ("content-type", contentType),
            ]
        )

    def _partHeaders(self, subject, contentType):
        """
        Build the headers for a sub-part of a multipart message.

        @param subject: The value for the C{subject} header.

        @param contentType: The value for the C{content-type} header.

        @return: An L{OrderedDict} with the keys C{subject} and
            C{content-type}, in that order.
        """
        return OrderedDict([("subject", subject), ("content-type", contentType)])

    def testSinglePart(self):
        """
        A message with no sub-parts is produced as a length-prefixed literal
        holding its headers, an empty line and its body.
        """
        body = b"This is body text.  Rar."
        headers = self._outerHeaders("text/plain")
        msg = FakeyMessage(headers, (), None, body, 123, None)

        consumer = BufferingConsumer()
        producer = imap4.MessageProducer(msg)
        d = producer.beginProducing(consumer)

        def cbProduced(result):
            self.assertIdentical(result, producer)
            self.assertEqual(
                b"".join(consumer.buffer),
                b"{119}\r\n"
                b"From: sender@host\r\n"
                b"To: recipient@domain\r\n"
                b"Subject: booga booga boo\r\n"
                b"Content-Type: text/plain\r\n"
                b"\r\n" + body,
            )

        return d.addCallback(cbProduced)

    def testSingleMultiPart(self):
        """
        A multipart message with one sub-part is produced with the sub-part
        wrapped in the boundary named by the outer C{content-type} header.
        """
        outerBody = b""
        innerBody = b"Contained body message text.  Squarge."
        headers = self._outerHeaders('multipart/alternative; boundary="xyz"')
        innerHeaders = self._partHeaders("this is subject text", "text/plain")
        msg = FakeyMessage(
            headers,
            (),
            None,
            outerBody,
            123,
            [FakeyMessage(innerHeaders, (), None, innerBody, None, None)],
        )

        consumer = BufferingConsumer()
        producer = imap4.MessageProducer(msg)
        d = producer.beginProducing(consumer)

        def cbProduced(result):
            self.assertIdentical(result, producer)

            self.assertEqual(
                b"".join(consumer.buffer),
                b"{239}\r\n"
                b"From: sender@host\r\n"
                b"To: recipient@domain\r\n"
                b"Subject: booga booga boo\r\n"
                b'Content-Type: multipart/alternative; boundary="xyz"\r\n'
                b"\r\n"
                b"\r\n"
                b"--xyz\r\n"
                b"Subject: this is subject text\r\n"
                b"Content-Type: text/plain\r\n"
                b"\r\n" + innerBody + b"\r\n--xyz--\r\n",
            )

        return d.addCallback(cbProduced)

    def testMultipleMultiPart(self):
        """
        A multipart message with two sub-parts is produced with each
        sub-part preceded by the boundary and a closing boundary at the end.
        """
        outerBody = b""
        innerBody1 = b"Contained body message text.  Squarge."
        innerBody2 = b"Secondary <i>message</i> text of squarge body."
        headers = self._outerHeaders('multipart/alternative; boundary="xyz"')
        innerHeaders = self._partHeaders("this is subject text", "text/plain")
        innerHeaders2 = self._partHeaders("<b>this is subject</b>", "text/html")
        msg = FakeyMessage(
            headers,
            (),
            None,
            outerBody,
            123,
            [
                FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
                FakeyMessage(innerHeaders2, (), None, innerBody2, None, None),
            ],
        )

        consumer = BufferingConsumer()
        producer = imap4.MessageProducer(msg)
        d = producer.beginProducing(consumer)

        def cbProduced(result):
            self.assertIdentical(result, producer)

            self.assertEqual(
                b"".join(consumer.buffer),
                b"{354}\r\n"
                b"From: sender@host\r\n"
                b"To: recipient@domain\r\n"
                b"Subject: booga booga boo\r\n"
                b'Content-Type: multipart/alternative; boundary="xyz"\r\n'
                b"\r\n"
                b"\r\n"
                b"--xyz\r\n"
                b"Subject: this is subject text\r\n"
                b"Content-Type: text/plain\r\n"
                b"\r\n" + innerBody1 + b"\r\n--xyz\r\n"
                b"Subject: <b>this is subject</b>\r\n"
                b"Content-Type: text/html\r\n"
                b"\r\n" + innerBody2 + b"\r\n--xyz--\r\n",
            )

        return d.addCallback(cbProduced)

    def test_multiPartNoBoundary(self):
        """
        A boundary is generated if none is provided.
        """
        outerBody = b""
        innerBody = b"Contained body message text.  Squarge."
        headers = self._outerHeaders("multipart/alternative")
        innerHeaders = self._partHeaders("this is subject text", "text/plain")
        msg = FakeyMessage(
            headers,
            (),
            None,
            outerBody,
            123,
            [FakeyMessage(innerHeaders, (), None, innerBody, None, None)],
        )

        # Pin the UUID source so the generated boundary is predictable; the
        # expected boundary below is "----=" followed by the UUID's hex form.
        fixedUUID = uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")
        boundary = b"----=" + fixedUUID.hex.encode("ascii")

        consumer = BufferingConsumer()
        producer = imap4.MessageProducer(msg)
        producer._uuid4 = lambda: fixedUUID

        d = producer.beginProducing(consumer)

        def cbProduced(result):
            self.assertIdentical(result, producer)
            expected = (
                b"{341}\r\n"
                b"From: sender@host\r\n"
                b"To: recipient@domain\r\n"
                b"Subject: booga booga boo\r\n"
                b'Content-Type: multipart/alternative; boundary="'
                + boundary
                + b'"'
                b"\r\n"
                b"\r\n"
                b"\r\n"
                b"--" + boundary + b"\r\n"
                b"Subject: this is subject text\r\n"
                b"Content-Type: text/plain\r\n"
                b"\r\n" + innerBody + b"\r\n--" + boundary + b"--\r\n"
            )
            self.assertEqual(b"".join(consumer.buffer), expected)

        return d.addCallback(cbProduced)

    def test_multiPartNoQuotes(self):
        """
        A boundary without quotes does not have them added.
        """
        outerBody = b""
        innerBody = b"Contained body message text.  Squarge."
        headers = self._outerHeaders("multipart/alternative; boundary=xyz")
        innerHeaders = self._partHeaders("this is subject text", "text/plain")
        msg = FakeyMessage(
            headers,
            (),
            None,
            outerBody,
            123,
            [FakeyMessage(innerHeaders, (), None, innerBody, None, None)],
        )

        consumer = BufferingConsumer()
        producer = imap4.MessageProducer(msg)
        d = producer.beginProducing(consumer)

        def cbProduced(result):
            self.assertIdentical(result, producer)
            self.assertEqual(
                b"".join(consumer.buffer),
                b"{237}\r\n"
                b"From: sender@host\r\n"
                b"To: recipient@domain\r\n"
                b"Subject: booga booga boo\r\n"
                b"Content-Type: multipart/alternative; boundary="
                b"xyz"
                b"\r\n"
                b"\r\n"
                b"\r\n"
                b"--xyz\r\n"
                b"Subject: this is subject text\r\n"
                b"Content-Type: text/plain\r\n"
                b"\r\n" + innerBody + b"\r\n--xyz--\r\n",
            )

        return d.addCallback(cbProduced)
406
407
class MessageSetTests(SynchronousTestCase):
    """
    Tests for L{MessageSet}.
    """

    def test_equalityIterationAndAddition(self):
        """
        Exercise L{MessageSet} equality, addition, length and iteration:

            1. Two empty L{MessageSet}s compare equal;

            2. A L{MessageSet} does not compare equal to an object of
               another type;

            3. Adding an L{int}, a two-element sequence of L{int}s or
               another L{MessageSet} to a L{MessageSet} produces a
               L{MessageSet} whose length is the number of messages it
               covers;

            4. Iterating over the result yields each message number in
               ascending order;

            5. L{MessageSet.add}, given a single message or a start and an
               end message, satisfies 3 and 4 as well.
        """

        def assertMessages(messageSet, expected):
            # Check the length first and the iteration order second.
            self.assertEqual(len(messageSet), len(expected))
            self.assertEqual(list(messageSet), expected)

        first = MessageSet()
        second = MessageSet()

        self.assertEqual(first, second)
        self.assertNotEqual(first, ())

        first = first + 1
        assertMessages(first, [1])

        first = first + (1, 3)
        assertMessages(first, [1, 2, 3])

        second = second + (1, 3)
        self.assertEqual(first, second)
        self.assertEqual(list(first + second), [1, 2, 3])

        first.add(5)
        assertMessages(first, [1, 2, 3, 5])

        self.assertNotEqual(first, second)

        first.add(6, 8)
        assertMessages(first, [1, 2, 3, 5, 6, 7, 8])

    def test_lengthWithWildcardRange(self):
        """
        Asking for the length of a L{MessageSet} containing a range whose
        end is L{None} raises L{TypeError}.
        """
        openEnded = MessageSet(1, None)
        with self.assertRaises(TypeError):
            len(openEnded)

    def test_reprSanity(self):
        """
        Calling C{repr} on a L{MessageSet} does not raise an exception.
        """
        messageSet = MessageSet(1, 2)
        repr(messageSet)

    def test_stringRepresentationWithWildcards(self):
        """
        When the highest message id is supplied while parsing, a wildcard
        is rendered as that id; when it is not, the wildcard is rendered as
        C{*}.
        """
        parsedAndRendered = [
            (imap4.parseIdList(b"*"), "*"),
            (imap4.parseIdList(b"1:*"), "1:*"),
            (imap4.parseIdList(b"3:*", 6), "3:6"),
            (imap4.parseIdList(b"*:2", 6), "2:6"),
        ]

        for messageSet, rendered in parsedAndRendered:
            self.assertEqual(str(messageSet), rendered)

    def test_stringRepresentationWithInversion(self):
        """
        Swapping the low and high numbers of a range does not change how
        the parsed L{MessageSet} is rendered: C{3:2} is displayed as C{2:3},
        because the RFC gives both the same meaning.
        """
        parsedAndRendered = [
            (imap4.parseIdList(b"2:3"), "2:3"),
            (imap4.parseIdList(b"3:2"), "2:3"),
        ]

        for messageSet, rendered in parsedAndRendered:
            self.assertEqual(str(messageSet), rendered)

    def test_createWithSingleMessageNumber(self):
        """
        A L{MessageSet} created from one message number holds exactly that
        message: it serializes to that number, has a length of one and
        yields only that number.
        """
        single = MessageSet(1)
        self.assertEqual(str(single), "1")
        self.assertEqual(len(single), 1)
        self.assertEqual(list(single), [1])

    def test_createWithSequence(self):
        """
        A L{MessageSet} created from a start and an end message number
        holds the whole range: it serializes to C{start:end}, its length is
        the size of the range and it yields every number from the start to
        the end inclusive.
        """
        oneToTen = MessageSet(1, 10)
        self.assertEqual(str(oneToTen), "1:10")
        self.assertEqual(len(oneToTen), 10)
        self.assertEqual(list(oneToTen), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

    def test_createWithSingleWildcard(self):
        """
        A L{MessageSet} created from a lone L{None} (standing for C{*})
        serializes to C{*} and has a length of one, but iterating over it
        raises L{TypeError} because its endpoint is unknown.
        """
        wildcard = MessageSet(None)
        self.assertEqual(str(wildcard), "*")
        self.assertEqual(len(wildcard), 1)
        with self.assertRaises(TypeError):
            list(wildcard)

    def test_setLastSingleWildcard(self):
        """
        Assigning L{MessageSet.last} substitutes that number for L{None}
        (standing for C{*}), after which the L{MessageSet} can be iterated.
        """
        loneWildcard = MessageSet(None)
        loneWildcard.last = 10
        self.assertEqual(list(loneWildcard), [10])

        openRange = MessageSet(3, None)
        openRange.last = 1
        self.assertEqual(list(openRange), [1, 2, 3])

    def test_setLastWithWildcardRange(self):
        """
        Assigning L{MessageSet.last} substitutes for L{None} in every range.
        """
        openRanges = MessageSet(1, None)
        openRanges.add(2, None)
        openRanges.last = 5
        self.assertEqual(list(openRanges), [1, 2, 3, 4, 5])

    def test_setLastTwiceFails(self):
        """
        Assigning L{MessageSet.last} a second time raises L{ValueError}.
        """
        openRange = MessageSet(1, None)
        openRange.last = 2
        with self.assertRaises(ValueError):
            openRange.last = 3

    def test_lastOverridesNoneInAdd(self):
        """
        Once L{last<MessageSet.last>} has been set on a L{MessageSet},
        adding L{None} (standing for C{*}) or a sequence containing L{None}
        uses the value of L{last<MessageSet.last>} in place of each
        L{None}.
        """
        withLast = MessageSet(1)
        withLast.last = 4

        withLast.add(None)
        self.assertEqual(list(withLast), [1, 4])

        self.assertEqual(list(withLast + (None, 5)), [1, 4, 5])

        withLast.add(3, None)
        self.assertEqual(list(withLast), [1, 3, 4])

    def test_getLast(self):
        """
        Reading L{MessageSet.last} gives back the value assigned to it.
        """
        openRange = MessageSet(1, None)
        openRange.last = 2
        self.assertEqual(openRange.last, 2)

    def test_extend(self):
        """
        L{MessageSet.extend} accepts an L{int} or L{None}, a two-element
        sequence of L{int}s or L{None}s, or another L{MessageSet}, and
        merges that argument into the instance's existing ranges.
        """
        fromInt = MessageSet()
        fromInt.extend(1)
        self.assertEqual(list(fromInt), [1])

        fromNone = MessageSet()
        fromNone.extend(None)
        self.assertEqual(str(fromNone), "*")

        fromIntPair = MessageSet()
        fromIntPair.extend((1, 3))
        self.assertEqual(list(fromIntPair), [1, 2, 3])

        fromNonePair = MessageSet()
        fromNonePair.extend((None, None))
        self.assertEqual(str(fromNonePair), "*")

        fromMessageSet = MessageSet()
        fromMessageSet.extend(MessageSet(1, 3))
        self.assertEqual(list(fromMessageSet), [1, 2, 3])

    def test_contains(self):
        """
        A number is C{in} a L{MessageSet} when it falls inside one of the
        set's ranges; the membership test raises L{TypeError} when any
        range contains L{None}.
        """
        withFive = MessageSet(1, 7)
        withoutFive = MessageSet(1, 4) + MessageSet(6, 7)

        self.assertIn(5, withFive)
        self.assertNotIn(5, withoutFive)

        withFiveAndWildcard = withFive + None
        with self.assertRaises(TypeError):
            5 in withFiveAndWildcard

        withFiveAndOpenRange = withFive + (10, 12)
        withFiveAndOpenRange.add(8, None)
        with self.assertRaises(TypeError):
            5 in withFiveAndOpenRange

    def test_rangesMerged(self):
        """
        Adding a range that overlaps, or sits directly next to, a range
        already in a L{MessageSet} merges the two into one.
        """
        lowRange = MessageSet(1, 3)
        highRange = MessageSet(6, 8)

        lowAndHigh = lowRange + highRange
        lowAndMiddle = lowRange + MessageSet(5, 7)

        self.assertEqual(list(lowRange + (2, 4)), [1, 2, 3, 4])
        self.assertEqual(list(lowRange + (3, 5)), [1, 2, 3, 4, 5])

        self.assertEqual(list(highRange + (5, 7)), [5, 6, 7, 8])
        self.assertEqual(list(highRange + (4, 6)), [4, 5, 6, 7, 8])

        self.assertEqual(list(lowAndHigh + (3, 5)), [1, 2, 3, 4, 5, 6, 7, 8])
        self.assertEqual(list(lowAndMiddle + MessageSet(4)), [1, 2, 3, 4, 5, 6, 7])

    def test_seq_rangeExamples(self):
        """
        Check the C{seq-range} examples from Section 9, "Formal Syntax",
        of RFC 3501::

            Example: 2:4 and 4:2 are equivalent and indicate values
                     2, 3, and 4.

            Example: a unique identifier sequence range of
                     3291:* includes the UID of the last message in
                     the mailbox, even if that value is less than 3291.

        @see: U{http://tools.ietf.org/html/rfc3501#section-9}
        """
        ascending = MessageSet(2, 4)
        descending = MessageSet(4, 2)
        self.assertEqual(ascending, descending)
        self.assertEqual(list(MessageSet(2, 4)), [2, 3, 4])

        uidRange = MessageSet(3291, None)
        uidRange.last = 3290
        self.assertEqual(list(uidRange), [3290, 3291])

    def test_sequence_setExamples(self):
        """
        Check the C{sequence-set} examples from Section 9, "Formal
        Syntax", of RFC 3501.  Note that L{MessageSet} reorders the
        numbers and coalesces overlapping ranges::

            Example: a message sequence number set of
                     2,4:7,9,12:* for a mailbox with 15 messages is
                     equivalent to 2,4,5,6,7,9,12,13,14,15

            Example: a message sequence number set of *:4,5:7
                     for a mailbox with 10 messages is equivalent to
                     10,9,8,7,6,5,4,5,6,7 and MAY be reordered and
                     overlap coalesced to be 4,5,6,7,8,9,10.

        @see: U{http://tools.ietf.org/html/rfc3501#section-9}
        """
        fifteenMessages = (
            MessageSet(2) + MessageSet(4, 7) + MessageSet(9) + MessageSet(12, None)
        )
        fifteenMessages.last = 15
        renderedFifteen = ",".join(map(str, fifteenMessages))
        self.assertEqual(renderedFifteen, "2,4,5,6,7,9,12,13,14,15")

        tenMessages = MessageSet(None, 4) + MessageSet(5, 7)
        tenMessages.last = 10
        renderedTen = ",".join(map(str, tenMessages))
        self.assertEqual(renderedTen, "4,5,6,7,8,9,10")
737
738
739class IMAP4HelperTests(TestCase):
740    """
741    Tests for various helper utilities in the IMAP4 module.
742    """
743
744    def test_commandRepr(self):
745        """
746        L{imap4.Command}'s C{repr} does not raise an exception.
747        """
748        repr(imap4.Command(b"COMMAND", [b"arg"], (b"extra")))
749
750    def test_fileProducer(self):
751        b = ((b"x" * 1) + (b"y" * 1) + (b"z" * 1)) * 10
752        c = BufferingConsumer()
753        f = BytesIO(b)
754        p = imap4.FileProducer(f)
755        d = p.beginProducing(c)
756
757        def cbProduced(result):
758            self.failUnlessIdentical(result, p)
759            self.assertEqual(b"{%d}\r\n%b" % (len(b), b), b"".join(c.buffer))
760            return result
761
762        def cbResume(result):
763            # Calling resumeProducing after completion does not raise
764            # an exception
765            p.resumeProducing()
766            return result
767
768        d.addCallback(cbProduced)
769        d.addCallback(cbResume)
770        # The second cbProduced ensures calling resumeProducing after
771        # completion does not change the result.
772        return d.addCallback(cbProduced)
773
774    def test_wildcard(self):
775        cases = [
776            [
777                "foo/%gum/bar",
778                ["foo/bar", "oo/lalagum/bar", "foo/gumx/bar", "foo/gum/baz"],
779                ["foo/xgum/bar", "foo/gum/bar"],
780            ],
781            [
782                "foo/x%x/bar",
783                ["foo", "bar", "fuz fuz fuz", "foo/*/bar", "foo/xyz/bar", "foo/xx/baz"],
784                ["foo/xyx/bar", "foo/xx/bar", "foo/xxxxxxxxxxxxxx/bar"],
785            ],
786            [
787                "foo/xyz*abc/bar",
788                ["foo/xyz/bar", "foo/abc/bar", "foo/xyzab/cbar", "foo/xyza/bcbar"],
789                ["foo/xyzabc/bar", "foo/xyz/abc/bar", "foo/xyz/123/abc/bar"],
790            ],
791        ]
792
793        for (wildcard, fail, succeed) in cases:
794            wildcard = imap4.wildcardToRegexp(wildcard, "/")
795            for x in fail:
796                self.assertFalse(wildcard.match(x))
797            for x in succeed:
798                self.assertTrue(wildcard.match(x))
799
800    def test_wildcardNoDelim(self):
801        cases = [
802            [
803                "foo/%gum/bar",
804                ["foo/bar", "oo/lalagum/bar", "foo/gumx/bar", "foo/gum/baz"],
805                ["foo/xgum/bar", "foo/gum/bar", "foo/x/gum/bar"],
806            ],
807            [
808                "foo/x%x/bar",
809                ["foo", "bar", "fuz fuz fuz", "foo/*/bar", "foo/xyz/bar", "foo/xx/baz"],
810                ["foo/xyx/bar", "foo/xx/bar", "foo/xxxxxxxxxxxxxx/bar", "foo/x/x/bar"],
811            ],
812            [
813                "foo/xyz*abc/bar",
814                ["foo/xyz/bar", "foo/abc/bar", "foo/xyzab/cbar", "foo/xyza/bcbar"],
815                ["foo/xyzabc/bar", "foo/xyz/abc/bar", "foo/xyz/123/abc/bar"],
816            ],
817        ]
818
819        for (wildcard, fail, succeed) in cases:
820            wildcard = imap4.wildcardToRegexp(wildcard, None)
821            for x in fail:
822                self.assertFalse(wildcard.match(x), x)
823            for x in succeed:
824                self.assertTrue(wildcard.match(x), x)
825
826    def test_headerFormatter(self):
827        """
828        L{imap4._formatHeaders} accepts a C{dict} of header name/value pairs and
829        returns a string representing those headers in the standard multiline,
830        C{":"}-separated format.
831        """
832        cases = [
833            (
834                {"Header1": "Value1", "Header2": "Value2"},
835                b"Header2: Value2\r\nHeader1: Value1\r\n",
836            ),
837        ]
838
839        for (input, expected) in cases:
840            output = imap4._formatHeaders(input)
841            self.assertEqual(
842                sorted(output.splitlines(True)), sorted(expected.splitlines(True))
843            )
844
845    def test_quotedSplitter(self):
846        cases = [
847            b"""Hello World""",
848            b'''Hello "World!"''',
849            b'''World "Hello" "How are you?"''',
850            b'''"Hello world" How "are you?"''',
851            b"""foo bar "baz buz" NIL""",
852            b'''foo bar "baz buz" "NIL"''',
853            b"""foo NIL "baz buz" bar""",
854            b"""foo "NIL" "baz buz" bar""",
855            b""""NIL" bar "baz buz" foo""",
856            b'oo \\"oo\\" oo',
857            b'"oo \\"oo\\" oo"',
858            b"oo \t oo",
859            b'"oo \t oo"',
860            b"oo \\t oo",
861            b'"oo \\t oo"',
862            br"oo \o oo",
863            br'"oo \o oo"',
864            b"oo \\o oo",
865            b'"oo \\o oo"',
866        ]
867
868        answers = [
869            [b"Hello", b"World"],
870            [b"Hello", b"World!"],
871            [b"World", b"Hello", b"How are you?"],
872            [b"Hello world", b"How", b"are you?"],
873            [b"foo", b"bar", b"baz buz", None],
874            [b"foo", b"bar", b"baz buz", b"NIL"],
875            [b"foo", None, b"baz buz", b"bar"],
876            [b"foo", b"NIL", b"baz buz", b"bar"],
877            [b"NIL", b"bar", b"baz buz", b"foo"],
878            [b"oo", b'"oo"', b"oo"],
879            [b'oo "oo" oo'],
880            [b"oo", b"oo"],
881            [b"oo \t oo"],
882            [b"oo", b"\\t", b"oo"],
883            [b"oo \\t oo"],
884            [b"oo", br"\o", b"oo"],
885            [br"oo \o oo"],
886            [b"oo", b"\\o", b"oo"],
887            [b"oo \\o oo"],
888        ]
889
890        errors = [
891            b'"mismatched quote',
892            b'mismatched quote"',
893            b'mismatched"quote',
894            b'"oops here is" another"',
895        ]
896
897        for s in errors:
898            self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s)
899
900        for (case, expected) in zip(cases, answers):
901            self.assertEqual(imap4.splitQuoted(case), expected)
902
903    def test_stringCollapser(self):
904        cases = [
905            [b"a", b"b", b"c", b"d", b"e"],
906            [b"a", b" ", b'"', b"b", b"c", b" ", b'"', b" ", b"d", b"e"],
907            [[b"a", b"b", b"c"], b"d", b"e"],
908            [b"a", [b"b", b"c", b"d"], b"e"],
909            [b"a", b"b", [b"c", b"d", b"e"]],
910            [b'"', b"a", b" ", b'"', [b"b", b"c", b"d"], b'"', b" ", b"e", b'"'],
911            [b"a", [b'"', b" ", b"b", b"c", b" ", b" ", b'"'], b"d", b"e"],
912        ]
913
914        answers = [
915            [b"abcde"],
916            [b"a", b"bc ", b"de"],
917            [[b"abc"], b"de"],
918            [b"a", [b"bcd"], b"e"],
919            [b"ab", [b"cde"]],
920            [b"a ", [b"bcd"], b" e"],
921            [b"a", [b" bc  "], b"de"],
922        ]
923
924        for (case, expected) in zip(cases, answers):
925            self.assertEqual(imap4.collapseStrings(case), expected)
926
927    def test_parenParser(self):
928        s = b"\r\n".join([b"xx"] * 4)
929
930        def check(case, expected):
931            parsed = imap4.parseNestedParens(case)
932            self.assertEqual(parsed, [expected])
933            # XXX This code used to work, but changes occurred within the
934            # imap4.py module which made it no longer necessary for *all* of it
935            # to work.  In particular, only the part that makes
936            # 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out
937            # correctly no longer needs to work.  So, I am loathe to delete the
938            # entire section of the test. --exarkun
939
940            # self.assertEqual(b'(' + imap4.collapseNestedLists(parsed) + b')',
941            #                  expected)
942
943        check(
944            b"(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%b)" % (len(s), s),
945            [b"BODY.PEEK", [b"HEADER.FIELDS.NOT", [b"subject", b"bcc", b"cc"]], s],
946        )
947        check(
948            b'(FLAGS (\\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
949            b"RFC822.SIZE 4286 ENVELOPE "
950            b'("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
951            b'"IMAP4rev1 WG mtg summary and minutes" '
952            b'(("Terry Gray" NIL gray cac.washington.edu)) '
953            b'(("Terry Gray" NIL gray cac.washington.edu)) '
954            b'(("Terry Gray" NIL gray cac.washington.edu)) '
955            b"((NIL NIL imap cac.washington.edu)) "
956            b"((NIL NIL minutes CNRI.Reston.VA.US) "
957            b'("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL '
958            b"<B27397-0100000@cac.washington.edu>) "
959            b"BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))",
960            [
961                b"FLAGS",
962                [br"\Seen"],
963                b"INTERNALDATE",
964                b"17-Jul-1996 02:44:25 -0700",
965                b"RFC822.SIZE",
966                b"4286",
967                b"ENVELOPE",
968                [
969                    b"Wed, 17 Jul 1996 02:23:25 -0700 (PDT)",
970                    b"IMAP4rev1 WG mtg summary and minutes",
971                    [[b"Terry Gray", None, b"gray", b"cac.washington.edu"]],
972                    [[b"Terry Gray", None, b"gray", b"cac.washington.edu"]],
973                    [[b"Terry Gray", None, b"gray", b"cac.washington.edu"]],
974                    [[None, None, b"imap", b"cac.washington.edu"]],
975                    [
976                        [None, None, b"minutes", b"CNRI.Reston.VA.US"],
977                        [b"John Klensin", None, b"KLENSIN", b"INFOODS.MIT.EDU"],
978                    ],
979                    None,
980                    None,
981                    b"<B27397-0100000@cac.washington.edu>",
982                ],
983                b"BODY",
984                [
985                    b"TEXT",
986                    b"PLAIN",
987                    [b"CHARSET", b"US-ASCII"],
988                    None,
989                    None,
990                    b"7BIT",
991                    b"3028",
992                    b"92",
993                ],
994            ],
995        )
996
997        check(b'("oo \\"oo\\" oo")', [b'oo "oo" oo'])
998        check(b'("oo \\\\ oo")', [b"oo \\\\ oo"])
999        check(b'("oo \\ oo")', [b"oo \\ oo"])
1000
1001        check(b'("oo \\o")', [b"oo \\o"])
1002        check(br'("oo \o")', [br"oo \o"])
1003        check(br"(oo \o)", [b"oo", br"\o"])
1004        check(b"(oo \\o)", [b"oo", b"\\o"])
1005
1006    def test_fetchParserSimple(self):
1007        cases = [
1008            ["ENVELOPE", "Envelope", "envelope"],
1009            ["FLAGS", "Flags", "flags"],
1010            ["INTERNALDATE", "InternalDate", "internaldate"],
1011            ["RFC822.HEADER", "RFC822Header", "rfc822.header"],
1012            ["RFC822.SIZE", "RFC822Size", "rfc822.size"],
1013            ["RFC822.TEXT", "RFC822Text", "rfc822.text"],
1014            ["RFC822", "RFC822", "rfc822"],
1015            ["UID", "UID", "uid"],
1016            ["BODYSTRUCTURE", "BodyStructure", "bodystructure"],
1017        ]
1018
1019        for (inp, outp, asString) in cases:
1020            inp = inp.encode("ascii")
1021            p = imap4._FetchParser()
1022            p.parseString(inp)
1023            self.assertEqual(len(p.result), 1)
1024            self.assertTrue(isinstance(p.result[0], getattr(p, outp)))
1025            self.assertEqual(str(p.result[0]), asString)
1026
1027    def test_fetchParserMacros(self):
1028        cases = [
1029            [b"ALL", (4, [b"flags", b"internaldate", b"rfc822.size", b"envelope"])],
1030            [
1031                b"FULL",
1032                (5, [b"flags", b"internaldate", b"rfc822.size", b"envelope", b"body"]),
1033            ],
1034            [b"FAST", (3, [b"flags", b"internaldate", b"rfc822.size"])],
1035        ]
1036
1037        for (inp, outp) in cases:
1038            p = imap4._FetchParser()
1039            p.parseString(inp)
1040            self.assertEqual(len(p.result), outp[0])
1041            expectedResult = [str(token).lower().encode("ascii") for token in p.result]
1042            expectedResult.sort()
1043            outp[1].sort()
1044            self.assertEqual(expectedResult, outp[1])
1045
1046    def test_fetchParserBody(self):
1047        P = imap4._FetchParser
1048
1049        p = P()
1050        p.parseString(b"BODY")
1051        self.assertEqual(len(p.result), 1)
1052        self.assertTrue(isinstance(p.result[0], p.Body))
1053        self.assertEqual(p.result[0].peek, False)
1054        self.assertEqual(p.result[0].header, None)
1055        self.assertEqual(str(p.result[0]), "BODY")
1056
1057        p = P()
1058        p.parseString(b"BODY.PEEK")
1059        self.assertEqual(len(p.result), 1)
1060        self.assertTrue(isinstance(p.result[0], p.Body))
1061        self.assertEqual(p.result[0].peek, True)
1062        self.assertEqual(str(p.result[0]), "BODY")
1063
1064        p = P()
1065        p.parseString(b"BODY[]")
1066        self.assertEqual(len(p.result), 1)
1067        self.assertTrue(isinstance(p.result[0], p.Body))
1068        self.assertEqual(p.result[0].empty, True)
1069        self.assertEqual(str(p.result[0]), "BODY[]")
1070
1071        p = P()
1072        p.parseString(b"BODY[HEADER]")
1073        self.assertEqual(len(p.result), 1)
1074        self.assertTrue(isinstance(p.result[0], p.Body))
1075        self.assertEqual(p.result[0].peek, False)
1076        self.assertTrue(isinstance(p.result[0].header, p.Header))
1077        self.assertEqual(p.result[0].header.negate, True)
1078        self.assertEqual(p.result[0].header.fields, ())
1079        self.assertEqual(p.result[0].empty, False)
1080        self.assertEqual(str(p.result[0]), "BODY[HEADER]")
1081
1082        p = P()
1083        p.parseString(b"BODY.PEEK[HEADER]")
1084        self.assertEqual(len(p.result), 1)
1085        self.assertTrue(isinstance(p.result[0], p.Body))
1086        self.assertEqual(p.result[0].peek, True)
1087        self.assertTrue(isinstance(p.result[0].header, p.Header))
1088        self.assertEqual(p.result[0].header.negate, True)
1089        self.assertEqual(p.result[0].header.fields, ())
1090        self.assertEqual(p.result[0].empty, False)
1091        self.assertEqual(str(p.result[0]), "BODY[HEADER]")
1092
1093        p = P()
1094        p.parseString(b"BODY[HEADER.FIELDS (Subject Cc Message-Id)]")
1095        self.assertEqual(len(p.result), 1)
1096        self.assertTrue(isinstance(p.result[0], p.Body))
1097        self.assertEqual(p.result[0].peek, False)
1098        self.assertTrue(isinstance(p.result[0].header, p.Header))
1099        self.assertEqual(p.result[0].header.negate, False)
1100        self.assertEqual(p.result[0].header.fields, [b"SUBJECT", b"CC", b"MESSAGE-ID"])
1101        self.assertEqual(p.result[0].empty, False)
1102        self.assertEqual(
1103            bytes(p.result[0]), b"BODY[HEADER.FIELDS (Subject Cc Message-Id)]"
1104        )
1105
1106        p = P()
1107        p.parseString(b"BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]")
1108        self.assertEqual(len(p.result), 1)
1109        self.assertTrue(isinstance(p.result[0], p.Body))
1110        self.assertEqual(p.result[0].peek, True)
1111        self.assertTrue(isinstance(p.result[0].header, p.Header))
1112        self.assertEqual(p.result[0].header.negate, False)
1113        self.assertEqual(p.result[0].header.fields, [b"SUBJECT", b"CC", b"MESSAGE-ID"])
1114        self.assertEqual(p.result[0].empty, False)
1115        self.assertEqual(
1116            bytes(p.result[0]), b"BODY[HEADER.FIELDS (Subject Cc Message-Id)]"
1117        )
1118
1119        p = P()
1120        p.parseString(b"BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]")
1121        self.assertEqual(len(p.result), 1)
1122        self.assertTrue(isinstance(p.result[0], p.Body))
1123        self.assertEqual(p.result[0].peek, True)
1124        self.assertTrue(isinstance(p.result[0].header, p.Header))
1125        self.assertEqual(p.result[0].header.negate, True)
1126        self.assertEqual(p.result[0].header.fields, [b"SUBJECT", b"CC", b"MESSAGE-ID"])
1127        self.assertEqual(p.result[0].empty, False)
1128        self.assertEqual(
1129            bytes(p.result[0]), b"BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]"
1130        )
1131
1132        p = P()
1133        p.parseString(b"BODY[1.MIME]<10.50>")
1134        self.assertEqual(len(p.result), 1)
1135        self.assertTrue(isinstance(p.result[0], p.Body))
1136        self.assertEqual(p.result[0].peek, False)
1137        self.assertTrue(isinstance(p.result[0].mime, p.MIME))
1138        self.assertEqual(p.result[0].part, (0,))
1139        self.assertEqual(p.result[0].partialBegin, 10)
1140        self.assertEqual(p.result[0].partialLength, 50)
1141        self.assertEqual(p.result[0].empty, False)
1142        self.assertEqual(bytes(p.result[0]), b"BODY[1.MIME]<10.50>")
1143
1144        p = P()
1145        p.parseString(
1146            b"BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>"
1147        )
1148        self.assertEqual(len(p.result), 1)
1149        self.assertTrue(isinstance(p.result[0], p.Body))
1150        self.assertEqual(p.result[0].peek, True)
1151        self.assertTrue(isinstance(p.result[0].header, p.Header))
1152        self.assertEqual(p.result[0].part, (0, 2, 8, 10))
1153        self.assertEqual(p.result[0].header.fields, [b"MESSAGE-ID", b"DATE"])
1154        self.assertEqual(p.result[0].partialBegin, 103)
1155        self.assertEqual(p.result[0].partialLength, 69)
1156        self.assertEqual(p.result[0].empty, False)
1157        self.assertEqual(
1158            bytes(p.result[0]),
1159            b"BODY[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>",
1160        )
1161
1162    def test_fetchParserQuotedHeader(self):
1163        """
1164        Parsing a C{BODY} whose C{HEADER} values require quoting
1165        results in a object that perserves that quoting when
1166        serialized.
1167        """
1168        p = imap4._FetchParser()
1169        p.parseString(b"BODY[HEADER.FIELDS ((Quoted)]")
1170        self.assertEqual(len(p.result), 1)
1171        self.assertEqual(p.result[0].peek, False)
1172        self.assertIsInstance(p.result[0], p.Body)
1173        self.assertIsInstance(p.result[0].header, p.Header)
1174        self.assertEqual(bytes(p.result[0]), b'BODY[HEADER.FIELDS ("(Quoted")]')
1175
1176    def test_fetchParserEmptyString(self):
1177        """
1178        Parsing an empty string results in no data.
1179        """
1180        p = imap4._FetchParser()
1181        p.parseString(b"")
1182        self.assertFalse(len(p.result))
1183
1184    def test_fetchParserUnknownAttribute(self):
1185        """
1186        Parsing a string with an unknown attribute raises an
1187        L{Exception}.
1188        """
1189        p = imap4._FetchParser()
1190        self.assertRaises(Exception, p.parseString, b"UNKNOWN")
1191
1192    def test_fetchParserIncompleteStringEndsInWhitespace(self):
1193        """
1194        Parsing a string that prematurely ends in whitespace raises an
1195        L{Exception}.
1196        """
1197        p = imap4._FetchParser()
1198        self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS  ")
1199
1200    def test_fetchParserExpectedWhitespace(self):
1201        """
1202        Parsing a string that contains an unexpected character rather
1203        than whitespace raises an L{Exception}.
1204        """
1205        p = imap4._FetchParser()
1206        self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS!]")
1207
1208    def test_fetchParserTextSection(self):
1209        """
1210        A C{BODY} can contain a C{TEXT} section.
1211        """
1212        p = imap4._FetchParser()
1213        p.parseString(b"BODY[TEXT]")
1214        self.assertEqual(len(p.result), 1)
1215        self.assertIsInstance(p.result[0], p.Body)
1216        self.assertEqual(p.result[0].peek, False)
1217        self.assertIsInstance(p.result[0].text, p.Text)
1218        self.assertEqual(bytes(p.result[0]), b"BODY[TEXT]")
1219
1220    def test_fetchParserUnknownSection(self):
1221        """
1222        Parsing a C{BODY} with an unknown section raises an
1223        L{Exception}.
1224        """
1225        p = imap4._FetchParser()
1226        self.assertRaises(Exception, p.parseString, b"BODY[UNKNOWN]")
1227
1228    def test_fetchParserMissingSectionClose(self):
1229        """
1230        Parsing a C{BODY} with an unterminated section list raises an
1231        L{Exception}.
1232        """
1233        p = imap4._FetchParser()
1234        self.assertRaises(Exception, p.parseString, b"BODY[HEADER")
1235        p = imap4._FetchParser()
1236        self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS (SUBJECT)")
1237
1238    def test_fetchParserHeaderMissingParentheses(self):
1239        """
1240        Parsing a C{BODY} whose C{HEADER.FIELDS} list does not begin
1241        with an open parenthesis (C{(}) or end with a close
1242        parenthesis (C{)}) raises an L{Exception}.
1243        """
1244        p = imap4._FetchParser()
1245        self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS Missing)]")
1246        p = imap4._FetchParser()
1247        self.assertRaises(Exception, p.parseString, b"BODY[HEADER.FIELDS (Missing]")
1248
1249    def test_fetchParserDotlessPartial(self):
1250        """
1251        Parsing a C{BODY} with a range that lacks a period (C{.})
1252        raises an L{Exception}.
1253        """
1254        p = imap4._FetchParser()
1255        self.assertRaises(Exception, p.parseString, b"BODY<01>")
1256
1257    def test_fetchParserUnclosedPartial(self):
1258        """
1259        Parsing a C{BODY} with a partial range that's missing its
1260        closing greater than sign (C{>}) raises an L{EXCEPTION}.
1261        """
1262        p = imap4._FetchParser()
1263        self.assertRaises(Exception, p.parseString, b"BODY<0")
1264
1265    def test_files(self):
1266        inputStructure = [
1267            "foo",
1268            "bar",
1269            "baz",
1270            BytesIO(b"this is a file\r\n"),
1271            "buz",
1272            "biz",
1273        ]
1274
1275        output = b'"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz" "biz"'
1276
1277        self.assertEqual(imap4.collapseNestedLists(inputStructure), output)
1278
1279    def test_quoteAvoider(self):
1280        input = [
1281            b"foo",
1282            imap4.DontQuoteMe(b"bar"),
1283            b"baz",
1284            BytesIO(b"this is a file\r\n"),
1285            b"this is\r\nquoted",
1286            imap4.DontQuoteMe(b"buz"),
1287            b"",
1288        ]
1289
1290        output = (
1291            b'"foo" bar "baz"'
1292            b" {16}\r\nthis is a file\r\n "
1293            b"{15}\r\nthis is\r\nquoted"
1294            b' buz ""'
1295        )
1296
1297        self.assertEqual(imap4.collapseNestedLists(input), output)
1298
1299    def test_literals(self):
1300        cases = [
1301            (b"({10}\r\n0123456789)", [[b"0123456789"]]),
1302        ]
1303
1304        for (case, expected) in cases:
1305            self.assertEqual(imap4.parseNestedParens(case), expected)
1306
1307    def test_queryBuilder(self):
1308        inputs = [
1309            imap4.Query(flagged=1),
1310            imap4.Query(sorted=1, unflagged=1, deleted=1),
1311            imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)),
1312            imap4.Query(before="today"),
1313            imap4.Or(imap4.Query(deleted=1), imap4.Query(unseen=1), imap4.Query(new=1)),
1314            imap4.Or(
1315                imap4.Not(
1316                    imap4.Or(
1317                        imap4.Query(sorted=1, since="yesterday", smaller=1000),
1318                        imap4.Query(sorted=1, before="tuesday", larger=10000),
1319                        imap4.Query(sorted=1, unseen=1, deleted=1, before="today"),
1320                        imap4.Not(imap4.Query(subject="spam")),
1321                    ),
1322                ),
1323                imap4.Not(imap4.Query(uid="1:5")),
1324            ),
1325        ]
1326
1327        outputs = [
1328            "FLAGGED",
1329            "(DELETED UNFLAGGED)",
1330            "(OR FLAGGED DELETED)",
1331            '(BEFORE "today")',
1332            "(OR DELETED (OR UNSEEN NEW))",
1333            '(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) '  # Continuing
1334            '(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE '  # Some more
1335            '"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) '  # And more
1336            "(NOT (UID 1:5)))",
1337        ]
1338
1339        for (query, expected) in zip(inputs, outputs):
1340            self.assertEqual(query, expected)
1341
1342    def test_queryKeywordFlagWithQuotes(self):
1343        """
1344        When passed the C{keyword} argument, L{imap4.Query} returns an unquoted
1345        string.
1346
1347        @see: U{http://tools.ietf.org/html/rfc3501#section-9}
1348        @see: U{http://tools.ietf.org/html/rfc3501#section-6.4.4}
1349        """
1350        query = imap4.Query(keyword="twisted")
1351        self.assertEqual("(KEYWORD twisted)", query)
1352
1353    def test_queryUnkeywordFlagWithQuotes(self):
1354        """
1355        When passed the C{unkeyword} argument, L{imap4.Query} returns an
1356        unquoted string.
1357
1358        @see: U{http://tools.ietf.org/html/rfc3501#section-9}
1359        @see: U{http://tools.ietf.org/html/rfc3501#section-6.4.4}
1360        """
1361        query = imap4.Query(unkeyword="twisted")
1362        self.assertEqual("(UNKEYWORD twisted)", query)
1363
1364    def test_queryWithMesssageSet(self):
1365        """
1366        When passed a L{MessageSet}, L{imap4.Query} returns a query
1367        containing a quoted string representing the ID sequence.
1368        """
1369        query = imap4.Query(messages=imap4.MessageSet(1, None))
1370        self.assertEqual(query, '(MESSAGES "1:*")')
1371
1372    def test_queryWithInteger(self):
1373        """
1374        When passed an L{int}, L{imap4.Query} returns a query
1375        containing a quoted integer.
1376        """
1377        query = imap4.Query(messages=1)
1378        self.assertEqual(query, '(MESSAGES "1")')
1379
1380    def test_queryOrIllegalQuery(self):
1381        """
1382        An L{imap4.Or} query with less than two arguments raises an
1383        L{imap4.IllegalQueryError}.
1384        """
1385        self.assertRaises(imap4.IllegalQueryError, imap4.Or, imap4.Query(messages=1))
1386
1387    def _keywordFilteringTest(self, keyword):
1388        """
1389        Helper to implement tests for value filtering of KEYWORD and UNKEYWORD
1390        queries.
1391
1392        @param keyword: A native string giving the name of the L{imap4.Query}
1393            keyword argument to test.
1394        """
1395        # Check all the printable exclusions
1396        self.assertEqual(
1397            f"({keyword.upper()} twistedrocks)",
1398            imap4.Query(**{keyword: r'twisted (){%*"\] rocks'}),
1399        )
1400
1401        # Check all the non-printable exclusions
1402        self.assertEqual(
1403            f"({keyword.upper()} twistedrocks)",
1404            imap4.Query(
1405                **{
1406                    keyword: "twisted %s rocks"
1407                    % ("".join(chr(ch) for ch in range(33)),)
1408                }
1409            ),
1410        )
1411
1412    def test_queryKeywordFlag(self):
1413        r"""
1414        When passed the C{keyword} argument, L{imap4.Query} returns an
1415        C{atom} that consists of one or more non-special characters.
1416
1417        List of the invalid characters:
1418
1419            ( ) { % * " \ ] CTL SP
1420
1421        @see: U{ABNF definition of CTL and SP<https://tools.ietf.org/html/rfc2234>}
1422        @see: U{IMAP4 grammar<http://tools.ietf.org/html/rfc3501#section-9>}
1423        @see: U{IMAP4 SEARCH specification<http://tools.ietf.org/html/rfc3501#section-6.4.4>}
1424        """
1425        self._keywordFilteringTest("keyword")
1426
1427    def test_queryUnkeywordFlag(self):
1428        r"""
1429        When passed the C{unkeyword} argument, L{imap4.Query} returns an
1430        C{atom} that consists of one or more non-special characters.
1431
1432        List of the invalid characters:
1433
1434            ( ) { % * " \ ] CTL SP
1435
1436        @see: U{ABNF definition of CTL and SP<https://tools.ietf.org/html/rfc2234>}
1437        @see: U{IMAP4 grammar<http://tools.ietf.org/html/rfc3501#section-9>}
1438        @see: U{IMAP4 SEARCH specification<http://tools.ietf.org/html/rfc3501#section-6.4.4>}
1439        """
1440        self._keywordFilteringTest("unkeyword")
1441
1442    def test_invalidIdListParser(self):
1443        """
1444        Trying to parse an invalid representation of a sequence range raises an
1445        L{IllegalIdentifierError}.
1446        """
1447        inputs = [b"*:*", b"foo", b"4:", b"bar:5"]
1448
1449        for input in inputs:
1450            self.assertRaises(
1451                imap4.IllegalIdentifierError, imap4.parseIdList, input, 12345
1452            )
1453
1454    def test_invalidIdListParserNonPositive(self):
1455        """
1456        Zeroes and negative values are not accepted in id range expressions. RFC
1457        3501 states that sequence numbers and sequence ranges consist of
1458        non-negative numbers (RFC 3501 section 9, the seq-number grammar item).
1459        """
1460        inputs = [b"0:5", b"0:0", b"*:0", b"0", b"-3:5", b"1:-2", b"-1"]
1461
1462        for input in inputs:
1463            self.assertRaises(
1464                imap4.IllegalIdentifierError, imap4.parseIdList, input, 12345
1465            )
1466
1467    def test_parseIdList(self):
1468        """
1469        The function to parse sequence ranges yields appropriate L{MessageSet}
1470        objects.
1471        """
1472        inputs = [
1473            b"1:*",
1474            b"5:*",
1475            b"1:2,5:*",
1476            b"*",
1477            b"1",
1478            b"1,2",
1479            b"1,3,5",
1480            b"1:10",
1481            b"1:10,11",
1482            b"1:5,10:20",
1483            b"1,5:10",
1484            b"1,5:10,15:20",
1485            b"1:10,15,20:25",
1486            b"4:2",
1487        ]
1488
1489        outputs = [
1490            MessageSet(1, None),
1491            MessageSet(5, None),
1492            MessageSet(5, None) + MessageSet(1, 2),
1493            MessageSet(None, None),
1494            MessageSet(1),
1495            MessageSet(1, 2),
1496            MessageSet(1) + MessageSet(3) + MessageSet(5),
1497            MessageSet(1, 10),
1498            MessageSet(1, 11),
1499            MessageSet(1, 5) + MessageSet(10, 20),
1500            MessageSet(1) + MessageSet(5, 10),
1501            MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20),
1502            MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25),
1503            MessageSet(2, 4),
1504        ]
1505
1506        lengths = [None, None, None, 1, 1, 2, 3, 10, 11, 16, 7, 13, 17, 3]
1507
1508        for (input, expected) in zip(inputs, outputs):
1509            self.assertEqual(imap4.parseIdList(input), expected)
1510
1511        for (input, expected) in zip(inputs, lengths):
1512            if expected is None:
1513                self.assertRaises(TypeError, len, imap4.parseIdList(input))
1514            else:
1515                L = len(imap4.parseIdList(input))
1516                self.assertEqual(L, expected, f"len({input!r}) = {L!r} != {expected!r}")
1517
1518    def test_parseTimeInvalidFormat(self):
1519        """
1520        L{imap4.parseTime} raises L{ValueError} when given a a time
1521        string whose format is invalid.
1522        """
1523        self.assertRaises(ValueError, imap4.parseTime, "invalid")
1524
1525    def test_parseTimeInvalidValues(self):
1526        """
1527        L{imap4.parseTime} raises L{ValueError} when given a time
1528        string composed of invalid values.
1529        """
1530        invalidStrings = [
1531            "invalid-July-2017",
1532            "2-invalid-2017",
1533            "2-July-invalid",
1534        ]
1535        for invalid in invalidStrings:
1536            self.assertRaises(ValueError, imap4.parseTime, invalid)
1537
1538    def test_statusRequestHelper(self):
1539        """
1540        L{imap4.statusRequestHelper} builds a L{dict} mapping the
1541        requested status names to values extracted from the provided
1542        L{IMailboxIMAP}'s.
1543        """
1544        mbox = SimpleMailbox()
1545
1546        expected = {
1547            "MESSAGES": mbox.getMessageCount(),
1548            "RECENT": mbox.getRecentCount(),
1549            "UIDNEXT": mbox.getUIDNext(),
1550            "UIDVALIDITY": mbox.getUIDValidity(),
1551            "UNSEEN": mbox.getUnseenCount(),
1552        }
1553
1554        result = imap4.statusRequestHelper(mbox, expected.keys())
1555
1556        self.assertEqual(expected, result)
1557
1558
@implementer(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox)
class SimpleMailbox:
    """
    A small in-memory mailbox that reports fixed counts and keeps its
    messages in a plain list.

    @cvar flags: The flags returned by L{getFlags}.
    @cvar messages: C{(message, flags, date, uid)} tuples recorded by
        L{addMessage}.  The list is defined on the class, so every instance
        that has not been given its own C{messages} appends to the same
        list.
    @ivar mUID: The uid L{addMessage} stores with the next message.
    @ivar rw: The value returned by L{isWriteable}.
    @ivar closed: Set to C{True} by L{close}.
    @ivar listeners: The objects registered through C{addListener} and not
        yet removed through C{removeListener}.
    """

    flags = ("\\Flag1", "Flag2", "\\AnotherSysFlag", "LastFlag")
    messages: List[Tuple[bytes, list, bytes, int]] = []
    mUID = 0
    rw = 1
    closed = False

    def __init__(self):
        self.listeners = []
        # Listener registration is delegated straight to the list.
        self.addListener = self.listeners.append
        self.removeListener = self.listeners.remove

    def getFlags(self):
        """
        @return: The sequence of flags in C{flags}.
        """
        return self.flags

    def getUIDValidity(self):
        """
        @return: The fixed UID validity value, C{42}.
        """
        return 42

    def getUIDNext(self):
        """
        @return: One more than the number of stored messages.
        """
        return len(self.messages) + 1

    def getMessageCount(self):
        """
        @return: The fixed message count, C{9}.
        """
        return 9

    def getRecentCount(self):
        """
        @return: The fixed recent message count, C{3}.
        """
        return 3

    def getUnseenCount(self):
        """
        @return: The fixed unseen message count, C{4}.
        """
        return 4

    def isWriteable(self):
        """
        @return: The value of C{rw}.
        """
        return self.rw

    def destroy(self):
        """
        Do nothing.
        """
        pass

    def getHierarchicalDelimiter(self):
        """
        @return: The hierarchy delimiter, C{"/"}.
        """
        return "/"

    def requestStatus(self, names):
        """
        Collect the requested status items.

        @param names: A container of status names; only C{"MESSAGES"},
            C{"RECENT"}, C{"UIDNEXT"}, C{"UIDVALIDITY"} and C{"UNSEEN"} are
            looked for.

        @return: A L{Deferred} that has already fired with a L{dict}
            mapping each requested name to its value.
        """
        r = {}
        if "MESSAGES" in names:
            r["MESSAGES"] = self.getMessageCount()
        if "RECENT" in names:
            r["RECENT"] = self.getRecentCount()
        if "UIDNEXT" in names:
            # NOTE(review): derived from the message count rather than from
            # getUIDNext(); left unchanged because callers outside this
            # class may rely on the value -- confirm before changing.
            r["UIDNEXT"] = self.getMessageCount() + 1
        if "UIDVALIDITY" in names:
            # getUID() requires a message argument and so could never be
            # called here; the validity value comes from getUIDValidity().
            r["UIDVALIDITY"] = self.getUIDValidity()
        if "UNSEEN" in names:
            r["UNSEEN"] = self.getUnseenCount()
        return defer.succeed(r)

    def addMessage(self, message, flags, date=None):
        """
        Record a message together with the current value of C{mUID}, then
        advance C{mUID} by one.

        @param message: The message to store.

        @param flags: The flags to store with the message.

        @param date: The date to store with the message.

        @return: A L{Deferred} that has already fired with L{None}.
        """
        self.messages.append((message, flags, date, self.mUID))
        self.mUID += 1
        return defer.succeed(None)

    def expunge(self):
        """
        Remove every stored message whose flags contain C{"\\\\Deleted"}.

        @return: A L{list} of the uids of the removed messages.
        """
        # Collect first, so the list is not modified while it is iterated.
        doomed = [entry for entry in self.messages if "\\Deleted" in entry[1]]
        for entry in doomed:
            self.messages.remove(entry)
        return [entry[3] for entry in doomed]

    def close(self):
        """
        Mark the mailbox as closed by setting C{closed} to C{True}.
        """
        self.closed = True

    def fetch(self, messages, uid):
        """
        Placeholder for C{IMailboxIMAP.fetch}; does nothing and returns
        L{None}.
        """
        pass

    def getUID(self, message):
        """
        Placeholder for C{IMailboxIMAP.getUID}; does nothing and returns
        L{None}.
        """
        pass

    def store(self, messages, flags, mode, uid):
        """
        Placeholder for C{IMailboxIMAP.store}; does nothing and returns
        L{None}.
        """
        pass
1641
1642
@implementer(imap4.IMailboxInfo, imap4.IMailbox)
class UncloseableMailbox:
    """
    A mailbox that cannot be closed.

    It declares only L{imap4.IMailboxInfo} and L{imap4.IMailbox} and
    defines no C{close} method.  Most of the values it reports are fixed.
    """

    flags = ("\\Flag1", "Flag2", "\\AnotherSysFlag", "LastFlag")
    # Class-level list of (message, flags, date, uid) tuples; it is shared by
    # every instance that does not rebind ``messages``.
    messages: List[Tuple[bytes, list, bytes, int]] = []
    # UID assigned to the next message passed to addMessage.
    mUID = 0
    # Value reported by isWriteable.
    rw = 1
    closed = False

    def __init__(self):
        self.listeners = []
        # Listener registration is delegated straight to the list's methods.
        self.addListener = self.listeners.append
        self.removeListener = self.listeners.remove

    def getFlags(self):
        """
        The flags

        @return: A sequence of flags.
        """
        return self.flags

    def getUIDValidity(self):
        """
        The UID validity value.

        @return: The value.
        """
        return 42

    def getUIDNext(self):
        """
        The next UID.

        @return: The UID.
        """
        return len(self.messages) + 1

    def getMessageCount(self):
        """
        The number of messages.

        @return: The number.
        """
        return 9

    def getRecentCount(self):
        """
        The number of recent messages.

        @return: The number.
        """
        return 3

    def getUnseenCount(self):
        """
        The number of unseen messages.

        @return: The number.
        """
        return 4

    def isWriteable(self):
        """
        Whether the mailbox is writable.

        @return: Whether or not the mailbox is writable.
        """
        return self.rw

    def destroy(self):
        """
        Destroy this mailbox.
        """
        pass

    def getHierarchicalDelimiter(self):
        """
        Return the hierarchical delimiter.

        @return: The delimiter.
        """
        return "/"

    def requestStatus(self, names):
        """
        Return the mailbox's status.

        @param names: The status items to include.

        @return: A L{Deferred} that has already fired with a L{dict} of
            the requested status data.
        """
        r = {}
        if "MESSAGES" in names:
            r["MESSAGES"] = self.getMessageCount()
        if "RECENT" in names:
            r["RECENT"] = self.getRecentCount()
        if "UIDNEXT" in names:
            # Derived from the fixed message count, not from getUIDNext().
            r["UIDNEXT"] = self.getMessageCount() + 1
        if "UIDVALIDITY" in names:
            # getUID() requires a message argument and is unrelated to UID
            # validity; the validity accessor is the right source.
            r["UIDVALIDITY"] = self.getUIDValidity()
        if "UNSEEN" in names:
            r["UNSEEN"] = self.getUnseenCount()
        return defer.succeed(r)

    def addMessage(self, message, flags, date=None):
        """
        Add a message to the mailbox.

        @param message: The message body.

        @param flags: The message flags.

        @param date: The message date.

        @return: A L{Deferred} that fires when the message has been
            added.
        """
        self.messages.append((message, flags, date, self.mUID))
        self.mUID += 1
        return defer.succeed(None)

    def expunge(self):
        """
        Delete messages marked for deletion.

        @return: A L{list} of deleted message IDs.
        """
        # Collect first, then remove, so the list is not mutated while it
        # is being iterated.
        delete = [i for i in self.messages if "\\Deleted" in i[1]]
        for i in delete:
            self.messages.remove(i)
        return [i[3] for i in delete]

    def fetch(self, messages, uid):
        # IMailboxIMAP.fetch
        pass

    def getUID(self, message):
        # IMailboxIMAP.getUID
        pass

    def store(self, messages, flags, mode, uid):
        # IMailboxIMAP.store
        pass
1793
1794
class AccountWithoutNamespaces(imap4.MemoryAccountWithoutNamespaces):
    """
    An in-memory account that does not provide L{INamespacePresenter}.

    @cvar mailboxFactory: The no-argument callable used to build each new,
        empty mailbox.
    """

    mailboxFactory = SimpleMailbox

    def _emptyMailbox(self, name, id):
        """
        Build an empty mailbox with C{mailboxFactory}.

        @param name: Ignored.

        @param id: Ignored.

        @return: A new mailbox.
        """
        factory = self.mailboxFactory
        return factory()

    def select(self, name, rw=1):
        """
        Select the named mailbox and store the requested access mode on it.

        @param name: The name of the mailbox to select.

        @param rw: The value stored as the mailbox's C{rw} attribute.

        @return: The selected mailbox, or L{None} when the underlying
            selection returned L{None}.
        """
        selected = imap4.MemoryAccount.select(self, name)
        if selected is None:
            return None
        selected.rw = rw
        return selected
1810
1811
class Account(AccountWithoutNamespaces, imap4.MemoryAccount):
    """
    An in-memory account that provides L{INamespacePresenter}.

    It defines nothing of its own; it only combines
    L{AccountWithoutNamespaces} with L{imap4.MemoryAccount}.
    """
1816
1817
class SimpleServer(imap4.IMAP4Server):
    """
    An L{imap4.IMAP4Server} set up with a portal and a single in-memory
    user, C{testuser}.

    @cvar theAccount: The L{Account} held at class level.

    @ivar timeoutTest: When true, received lines are dropped without any
        response being generated.
    """

    theAccount = Account(b"testuser")

    def __init__(self, *args, **kw):
        imap4.IMAP4Server.__init__(self, *args, **kw)
        portal = Portal(TestRealm(accountHolder=self))
        checker = InMemoryUsernamePasswordDatabaseDontUse()
        checker.addUser(b"testuser", b"password-test")
        self.checker = checker
        self.portal = portal
        portal.registerChecker(checker)
        self.timeoutTest = False

    def lineReceived(self, line):
        """
        Pass C{line} on to L{imap4.IMAP4Server.lineReceived} unless
        C{timeoutTest} is set, in which case the line is dropped.
        """
        if not self.timeoutTest:
            imap4.IMAP4Server.lineReceived(self, line)
1838
1839
class SimpleClient(imap4.IMAP4Client):
    """
    An L{imap4.IMAP4Client} that fires a L{Deferred} on the server greeting
    and records unsolicited mailbox notifications in C{events}, dropping the
    connection after each one.
    """

    def __init__(self, deferred, contextFactory=None):
        imap4.IMAP4Client.__init__(self, contextFactory)
        self.deferred = deferred
        self.events = []

    def _recordEventAndDisconnect(self, *event):
        """
        Append C{event} to C{events} as a L{list} and close the transport.
        """
        self.events.append(list(event))
        self.transport.loseConnection()

    def serverGreeting(self, caps):
        """
        Fire the L{Deferred} given at construction time with L{None}.
        """
        self.deferred.callback(None)

    def modeChanged(self, writeable):
        """
        Record a C{modeChanged} event and disconnect.
        """
        self._recordEventAndDisconnect("modeChanged", writeable)

    def flagsChanged(self, newFlags):
        """
        Record a C{flagsChanged} event and disconnect.
        """
        self._recordEventAndDisconnect("flagsChanged", newFlags)

    def newMessages(self, exists, recent):
        """
        Record a C{newMessages} event and disconnect.
        """
        self._recordEventAndDisconnect("newMessages", exists, recent)
1860
1861
class IMAP4HelperMixin:
    """
    Fixture shared by tests that run a L{SimpleClient} against a
    L{SimpleServer} over a loopback connection.
    """

    serverCTX: Optional[ServerTLSContext] = None
    clientCTX: Optional[ClientTLSContext] = None

    def setUp(self):
        """
        Create the server and client, and give L{SimpleServer} a fresh
        account and L{SimpleMailbox} an empty message list.
        """
        greeted = defer.Deferred()
        self.server = SimpleServer(contextFactory=self.serverCTX)
        self.client = SimpleClient(greeted, contextFactory=self.clientCTX)
        self.connected = greeted

        SimpleMailbox.messages = []
        freshAccount = Account(b"testuser")
        freshAccount.mboxType = SimpleMailbox
        SimpleServer.theAccount = freshAccount

    def tearDown(self):
        """
        Drop the references created by L{setUp}.
        """
        for attribute in ("server", "client", "connected"):
            delattr(self, attribute)

    def _cbStopClient(self, ignore):
        """
        Close the client's side of the connection.
        """
        self.client.transport.loseConnection()

    def _ebGeneral(self, failure):
        """
        Close both sides of the connection and log C{failure}.
        """
        for protocol in (self.client, self.server):
            protocol.transport.loseConnection()
        log.err(failure, f"Problem with {self!s}")

    def loopback(self):
        """
        Connect the server and the client to each other.

        @return: The result of L{loopback.loopbackAsync}.
        """
        return loopback.loopbackAsync(self.server, self.client)

    def assertClientFailureMessage(self, failure, expected):
        """
        Assert that the provided failure is an L{IMAP4Exception} with
        the given message.

        @param failure: A failure whose value is an L{IMAP4Exception}
        @type failure: L{failure.Failure}

        @param expected: The expected failure message.
        @type expected: L{bytes}
        """
        failure.trap(imap4.IMAP4Exception)
        # The exception's string form is compared with the repr of the
        # expected bytes.
        self.assertEqual(str(failure.value), repr(expected))
1910
1911
1912class IMAP4ServerTests(IMAP4HelperMixin, TestCase):
1913    def testCapability(self):
1914        caps = {}
1915
1916        def getCaps():
1917            def gotCaps(c):
1918                caps.update(c)
1919                self.server.transport.loseConnection()
1920
1921            return self.client.getCapabilities().addCallback(gotCaps)
1922
1923        d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
1924        d = defer.gatherResults([self.loopback(), d1])
1925        expected = {b"IMAP4rev1": None, b"NAMESPACE": None, b"IDLE": None}
1926        return d.addCallback(lambda _: self.assertEqual(expected, caps))
1927
1928    def testCapabilityWithAuth(self):
1929        caps = {}
1930        self.server.challengers[b"CRAM-MD5"] = CramMD5Credentials
1931
1932        def getCaps():
1933            def gotCaps(c):
1934                caps.update(c)
1935                self.server.transport.loseConnection()
1936
1937            return self.client.getCapabilities().addCallback(gotCaps)
1938
1939        d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
1940        d = defer.gatherResults([self.loopback(), d1])
1941
1942        expCap = {
1943            b"IMAP4rev1": None,
1944            b"NAMESPACE": None,
1945            b"IDLE": None,
1946            b"AUTH": [b"CRAM-MD5"],
1947        }
1948
1949        return d.addCallback(lambda _: self.assertEqual(expCap, caps))
1950
1951    def testLogout(self):
1952        self.loggedOut = 0
1953
1954        def logout():
1955            def setLoggedOut():
1956                self.loggedOut = 1
1957
1958            self.client.logout().addCallback(strip(setLoggedOut))
1959
1960        self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
1961        d = self.loopback()
1962        return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1))
1963
1964    def testNoop(self):
1965        self.responses = None
1966
1967        def noop():
1968            def setResponses(responses):
1969                self.responses = responses
1970                self.server.transport.loseConnection()
1971
1972            self.client.noop().addCallback(setResponses)
1973
1974        self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
1975        d = self.loopback()
1976        return d.addCallback(lambda _: self.assertEqual(self.responses, []))
1977
1978    def testLogin(self):
1979        def login():
1980            d = self.client.login(b"testuser", b"password-test")
1981            d.addCallback(self._cbStopClient)
1982
1983        d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
1984        d = defer.gatherResults([d1, self.loopback()])
1985        return d.addCallback(self._cbTestLogin)
1986
1987    def _cbTestLogin(self, ignored):
1988        self.assertEqual(self.server.account, SimpleServer.theAccount)
1989        self.assertEqual(self.server.state, "auth")
1990
1991    def testFailedLogin(self):
1992        def login():
1993            d = self.client.login(b"testuser", b"wrong-password")
1994            d.addBoth(self._cbStopClient)
1995
1996        d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
1997        d2 = self.loopback()
1998        d = defer.gatherResults([d1, d2])
1999        return d.addCallback(self._cbTestFailedLogin)
2000
2001    def _cbTestFailedLogin(self, ignored):
2002        self.assertEqual(self.server.account, None)
2003        self.assertEqual(self.server.state, "unauth")
2004
2005    def test_loginWithoutPortal(self):
2006        """
2007        Attempting to log into a server that has no L{Portal} results
2008        in a failed login.
2009        """
2010        self.server.portal = None
2011
2012        def login():
2013            d = self.client.login(b"testuser", b"wrong-password")
2014            d.addBoth(self._cbStopClient)
2015
2016        d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
2017        d2 = self.loopback()
2018        d = defer.gatherResults([d1, d2])
2019        return d.addCallback(self._cbTestFailedLogin)
2020
2021    def test_nonIAccountAvatar(self):
2022        """
2023        The server responds with a C{BAD} response when its portal
2024        attempts to log a user in with checker that claims to support
2025        L{IAccount} but returns an an avatar interface that is not
2026        L{IAccount}.
2027        """
2028
2029        def brokenRequestAvatar(*_, **__):
2030            return ("Not IAccount", "Not an account", lambda: None)
2031
2032        self.server.portal.realm.requestAvatar = brokenRequestAvatar
2033
2034        def login():
2035            d = self.client.login(b"testuser", b"password-test")
2036            d.addBoth(self._cbStopClient)
2037
2038        d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
2039        d2 = self.loopback()
2040        d = defer.gatherResults([d1, d2])
2041        return d.addCallback(self._cbTestFailedLogin)
2042
    def test_loginException(self):
        """
        Any exception raised by L{IMAP4Server.authenticateLogin} that
        is not L{UnauthorizedLogin} is logged and results in a C{BAD}
        response.
        """

        class UnexpectedException(Exception):
            """
            An unexpected exception.
            """

        def raisesUnexpectedException(user, passwd):
            raise UnexpectedException("Whoops")

        # Make every login attempt on this server raise.
        self.server.authenticateLogin = raisesUnexpectedException

        def login():
            return self.client.login(b"testuser", b"password-test")

        d1 = self.connected.addCallback(strip(login))

        # The client-side failure must carry the server's error text.
        d1.addErrback(self.assertClientFailureMessage, b"Server error: Whoops")

        # Runs after the errback above has handled the failure; it checks
        # that the exception was logged and flushes it from the log.
        @d1.addCallback
        def assertErrorLogged(_):
            self.assertTrue(self.flushLoggedErrors(UnexpectedException))

        d1.addErrback(self._ebGeneral)
        d1.addBoth(self._cbStopClient)

        d2 = self.loopback()
        d = defer.gatherResults([d1, d2])
        return d.addCallback(self._cbTestFailedLogin)
2077
2078    def testLoginRequiringQuoting(self):
2079        self.server.checker.users = {b"{test}user": b"{test}password"}
2080
2081        def login():
2082            d = self.client.login(b"{test}user", b"{test}password")
2083            d.addErrback(log.err, "Problem with " + str(self))
2084            d.addCallback(self._cbStopClient)
2085
2086        d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
2087        d = defer.gatherResults([self.loopback(), d1])
2088        return d.addCallback(self._cbTestLoginRequiringQuoting)
2089
2090    def _cbTestLoginRequiringQuoting(self, ignored):
2091        self.assertEqual(self.server.account, SimpleServer.theAccount)
2092        self.assertEqual(self.server.state, "auth")
2093
2094    def testNamespace(self):
2095        self.namespaceArgs = None
2096
2097        def login():
2098            return self.client.login(b"testuser", b"password-test")
2099
2100        def namespace():
2101            def gotNamespace(args):
2102                self.namespaceArgs = args
2103                self._cbStopClient(None)
2104
2105            return self.client.namespace().addCallback(gotNamespace)
2106
2107        d1 = self.connected.addCallback(strip(login))
2108        d1.addCallback(strip(namespace))
2109        d1.addErrback(self._ebGeneral)
2110        d2 = self.loopback()
2111        d = defer.gatherResults([d1, d2])
2112
2113        @d.addCallback
2114        def assertAllPairsNativeStrings(ignored):
2115            for namespaces in self.namespaceArgs:
2116                for pair in namespaces:
2117                    for value in pair:
2118                        self.assertIsInstance(value, str)
2119            return self.namespaceArgs
2120
2121        d.addCallback(self.assertEqual, [[["", "/"]], [], []])
2122        return d
2123
2124    def test_mailboxWithoutNamespace(self):
2125        """
2126        A mailbox that does not provide L{INamespacePresenter} returns
2127        empty L{list}s for its personal, shared, and user namespaces.
2128        """
2129        self.server.theAccount = AccountWithoutNamespaces(b"testuser")
2130        self.namespaceArgs = None
2131
2132        def login():
2133            return self.client.login(b"testuser", b"password-test")
2134
2135        def namespace():
2136            def gotNamespace(args):
2137                self.namespaceArgs = args
2138                self._cbStopClient(None)
2139
2140            return self.client.namespace().addCallback(gotNamespace)
2141
2142        d1 = self.connected.addCallback(strip(login))
2143        d1.addCallback(strip(namespace))
2144        d1.addErrback(self._ebGeneral)
2145        d2 = self.loopback()
2146        d = defer.gatherResults([d1, d2])
2147        d.addCallback(lambda _: self.namespaceArgs)
2148        d.addCallback(self.assertEqual, [[], [], []])
2149        return d
2150
2151    def testSelect(self):
2152        SimpleServer.theAccount.addMailbox("test-mailbox")
2153        self.selectedArgs = None
2154
2155        def login():
2156            return self.client.login(b"testuser", b"password-test")
2157
2158        def select():
2159            def selected(args):
2160                self.selectedArgs = args
2161                self._cbStopClient(None)
2162
2163            d = self.client.select("test-mailbox")
2164            d.addCallback(selected)
2165            return d
2166
2167        d1 = self.connected.addCallback(strip(login))
2168        d1.addCallback(strip(select))
2169        d1.addErrback(self._ebGeneral)
2170        d2 = self.loopback()
2171        return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
2172
2173    def test_selectWithoutMailbox(self):
2174        """
2175        A client that selects a mailbox that does not exist receives a
2176        C{NO} response.
2177        """
2178
2179        def login():
2180            return self.client.login(b"testuser", b"password-test")
2181
2182        def select():
2183            return self.client.select("test-mailbox")
2184
2185        self.connected.addCallback(strip(login))
2186        self.connected.addCallback(strip(select))
2187        self.connected.addErrback(self.assertClientFailureMessage, b"No such mailbox")
2188        self.connected.addCallback(self._cbStopClient)
2189        self.connected.addErrback(self._ebGeneral)
2190
2191        connectionComplete = defer.gatherResults([self.connected, self.loopback()])
2192
2193        @connectionComplete.addCallback
2194        def assertNoMailboxSelected(_):
2195            self.assertIsNone(self.server.mbox)
2196
2197        return connectionComplete
2198
2199    def _cbTestSelect(self, ignored):
2200        mbox = SimpleServer.theAccount.mailboxes["TEST-MAILBOX"]
2201        self.assertEqual(self.server.mbox, mbox)
2202        self.assertEqual(
2203            self.selectedArgs,
2204            {
2205                "EXISTS": 9,
2206                "RECENT": 3,
2207                "UIDVALIDITY": 42,
2208                "FLAGS": ("\\Flag1", "Flag2", "\\AnotherSysFlag", "LastFlag"),
2209                "READ-WRITE": True,
2210            },
2211        )
2212
2213    def test_examine(self):
2214        """
2215        L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and
2216        returns a L{Deferred} which fires with a C{dict} with as many of the
2217        following keys as the server includes in its response: C{'FLAGS'},
2218        C{'EXISTS'}, C{'RECENT'}, C{'UNSEEN'}, C{'READ-WRITE'}, C{'READ-ONLY'},
2219        C{'UIDVALIDITY'}, and C{'PERMANENTFLAGS'}.
2220
2221        Unfortunately the server doesn't generate all of these so it's hard to
2222        test the client's handling of them here.  See
2223        L{IMAP4ClientExamineTests} below.
2224
2225        See U{RFC 3501<http://www.faqs.org/rfcs/rfc3501.html>}, section 6.3.2,
2226        for details.
2227        """
2228        SimpleServer.theAccount.addMailbox("test-mailbox")
2229        self.examinedArgs = None
2230
2231        def login():
2232            return self.client.login(b"testuser", b"password-test")
2233
2234        def examine():
2235            def examined(args):
2236                self.examinedArgs = args
2237                self._cbStopClient(None)
2238
2239            d = self.client.examine("test-mailbox")
2240            d.addCallback(examined)
2241            return d
2242
2243        d1 = self.connected.addCallback(strip(login))
2244        d1.addCallback(strip(examine))
2245        d1.addErrback(self._ebGeneral)
2246        d2 = self.loopback()
2247        d = defer.gatherResults([d1, d2])
2248        return d.addCallback(self._cbTestExamine)
2249
2250    def _cbTestExamine(self, ignored):
2251        mbox = SimpleServer.theAccount.mailboxes["TEST-MAILBOX"]
2252        self.assertEqual(self.server.mbox, mbox)
2253        self.assertEqual(
2254            self.examinedArgs,
2255            {
2256                "EXISTS": 9,
2257                "RECENT": 3,
2258                "UIDVALIDITY": 42,
2259                "FLAGS": ("\\Flag1", "Flag2", "\\AnotherSysFlag", "LastFlag"),
2260                "READ-WRITE": False,
2261            },
2262        )
2263
    def testCreate(self):
        """
        Creating mailboxes succeeds for new names and fails for names that
        already exist; the outcomes are checked by L{_cbTestCreate}.
        """
        # Names in ``fail`` repeat names from ``succeed``, so by the time
        # they are sent those mailboxes have already been requested.
        succeed = ("testbox", "test/box", "test/", "test/box/box", "INBOX")
        fail = ("testbox", "test/box")

        def cb():
            # Record a successful CREATE.
            self.result.append(1)

        def eb(failure):
            # Record a failed CREATE.
            self.result.append(0)

        def login():
            return self.client.login(b"testuser", b"password-test")

        def create():
            for name in succeed + fail:
                d = self.client.create(name)
                d.addCallback(strip(cb)).addErrback(eb)
            # ``d`` is the Deferred of the last CREATE issued; the client is
            # stopped once that one completes.  Nothing is returned, so the
            # connected chain does not wait for the commands.
            d.addCallbacks(self._cbStopClient, self._ebGeneral)

        self.result = []
        d1 = self.connected.addCallback(strip(login)).addCallback(strip(create))
        d2 = self.loopback()
        d = defer.gatherResults([d1, d2])
        return d.addCallback(self._cbTestCreate, succeed, fail)
2288
2289    def _cbTestCreate(self, ignored, succeed, fail):
2290        self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail))
2291        mbox = sorted(SimpleServer.theAccount.mailboxes)
2292        answers = sorted(["inbox", "testbox", "test/box", "test", "test/box/box"])
2293        self.assertEqual(mbox, [a.upper() for a in answers])
2294
2295    def testDelete(self):
2296        SimpleServer.theAccount.addMailbox("delete/me")
2297
2298        def login():
2299            return self.client.login(b"testuser", b"password-test")
2300
2301        def delete():
2302            return self.client.delete("delete/me")
2303
2304        d1 = self.connected.addCallback(strip(login))
2305        d1.addCallbacks(strip(delete), self._ebGeneral)
2306        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2307        d2 = self.loopback()
2308        d = defer.gatherResults([d1, d2])
2309        d.addCallback(
2310            lambda _: self.assertEqual(list(SimpleServer.theAccount.mailboxes), [])
2311        )
2312        return d
2313
2314    def testDeleteWithInferiorHierarchicalNames(self):
2315        """
2316        Attempting to delete a mailbox with hierarchically inferior
2317        names fails with an informative error.
2318
2319        @see: U{https://tools.ietf.org/html/rfc3501#section-6.3.4}
2320
2321        @return: A L{Deferred} with assertions.
2322        """
2323        SimpleServer.theAccount.addMailbox("delete")
2324        SimpleServer.theAccount.addMailbox("delete/me")
2325
2326        def login():
2327            return self.client.login(b"testuser", b"password-test")
2328
2329        def delete():
2330            return self.client.delete("delete")
2331
2332        def assertIMAPException(failure):
2333            failure.trap(imap4.IMAP4Exception)
2334            self.assertEqual(
2335                str(failure.value),
2336                str(b'Name "DELETE" has inferior hierarchical names'),
2337            )
2338
2339        loggedIn = self.connected.addCallback(strip(login))
2340        loggedIn.addCallbacks(strip(delete), self._ebGeneral)
2341        loggedIn.addErrback(assertIMAPException)
2342        loggedIn.addCallbacks(self._cbStopClient)
2343
2344        loopedBack = self.loopback()
2345        d = defer.gatherResults([loggedIn, loopedBack])
2346        d.addCallback(
2347            lambda _: self.assertEqual(
2348                sorted(SimpleServer.theAccount.mailboxes), ["DELETE", "DELETE/ME"]
2349            )
2350        )
2351        return d
2352
2353    def testIllegalInboxDelete(self):
2354        self.stashed = None
2355
2356        def login():
2357            return self.client.login(b"testuser", b"password-test")
2358
2359        def delete():
2360            return self.client.delete("inbox")
2361
2362        def stash(result):
2363            self.stashed = result
2364
2365        d1 = self.connected.addCallback(strip(login))
2366        d1.addCallbacks(strip(delete), self._ebGeneral)
2367        d1.addBoth(stash)
2368        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2369        d2 = self.loopback()
2370        d = defer.gatherResults([d1, d2])
2371        d.addCallback(
2372            lambda _: self.assertTrue(isinstance(self.stashed, failure.Failure))
2373        )
2374        return d
2375
2376    def testNonExistentDelete(self):
2377        def login():
2378            return self.client.login(b"testuser", b"password-test")
2379
2380        def delete():
2381            return self.client.delete("delete/me")
2382
2383        def deleteFailed(failure):
2384            self.failure = failure
2385
2386        self.failure = None
2387        d1 = self.connected.addCallback(strip(login))
2388        d1.addCallback(strip(delete)).addErrback(deleteFailed)
2389        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2390        d2 = self.loopback()
2391        d = defer.gatherResults([d1, d2])
2392        d.addCallback(
2393            lambda _: self.assertEqual(str(self.failure.value), str(b"No such mailbox"))
2394        )
2395        return d
2396
2397    def testIllegalDelete(self):
2398        m = SimpleMailbox()
2399        m.flags = (r"\Noselect",)
2400        SimpleServer.theAccount.addMailbox("delete", m)
2401        SimpleServer.theAccount.addMailbox("delete/me")
2402
2403        def login():
2404            return self.client.login(b"testuser", b"password-test")
2405
2406        def delete():
2407            return self.client.delete("delete")
2408
2409        def deleteFailed(failure):
2410            self.failure = failure
2411
2412        self.failure = None
2413        d1 = self.connected.addCallback(strip(login))
2414        d1.addCallback(strip(delete)).addErrback(deleteFailed)
2415        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2416        d2 = self.loopback()
2417        d = defer.gatherResults([d1, d2])
2418        expected = str(
2419            b"Hierarchically inferior mailboxes exist " b"and \\Noselect is set"
2420        )
2421        d.addCallback(lambda _: self.assertEqual(str(self.failure.value), expected))
2422        return d
2423
2424    def testRename(self):
2425        SimpleServer.theAccount.addMailbox("oldmbox")
2426
2427        def login():
2428            return self.client.login(b"testuser", b"password-test")
2429
2430        def rename():
2431            return self.client.rename(b"oldmbox", b"newname")
2432
2433        d1 = self.connected.addCallback(strip(login))
2434        d1.addCallbacks(strip(rename), self._ebGeneral)
2435        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2436        d2 = self.loopback()
2437        d = defer.gatherResults([d1, d2])
2438        d.addCallback(
2439            lambda _: self.assertEqual(
2440                list(SimpleServer.theAccount.mailboxes.keys()), ["NEWNAME"]
2441            )
2442        )
2443        return d
2444
2445    def testIllegalInboxRename(self):
2446        self.stashed = None
2447
2448        def login():
2449            return self.client.login(b"testuser", b"password-test")
2450
2451        def rename():
2452            return self.client.rename("inbox", "frotz")
2453
2454        def stash(stuff):
2455            self.stashed = stuff
2456
2457        d1 = self.connected.addCallback(strip(login))
2458        d1.addCallbacks(strip(rename), self._ebGeneral)
2459        d1.addBoth(stash)
2460        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2461        d2 = self.loopback()
2462        d = defer.gatherResults([d1, d2])
2463        d.addCallback(
2464            lambda _: self.assertTrue(isinstance(self.stashed, failure.Failure))
2465        )
2466        return d
2467
2468    def testHierarchicalRename(self):
2469        SimpleServer.theAccount.create("oldmbox/m1")
2470        SimpleServer.theAccount.create("oldmbox/m2")
2471
2472        def login():
2473            return self.client.login(b"testuser", b"password-test")
2474
2475        def rename():
2476            return self.client.rename("oldmbox", "newname")
2477
2478        d1 = self.connected.addCallback(strip(login))
2479        d1.addCallbacks(strip(rename), self._ebGeneral)
2480        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2481        d2 = self.loopback()
2482        d = defer.gatherResults([d1, d2])
2483        return d.addCallback(self._cbTestHierarchicalRename)
2484
2485    def _cbTestHierarchicalRename(self, ignored):
2486        mboxes = SimpleServer.theAccount.mailboxes.keys()
2487        expected = ["newname", "newname/m1", "newname/m2"]
2488        mboxes = list(sorted(mboxes))
2489        self.assertEqual(mboxes, [s.upper() for s in expected])
2490
2491    def testSubscribe(self):
2492        def login():
2493            return self.client.login(b"testuser", b"password-test")
2494
2495        def subscribe():
2496            return self.client.subscribe("this/mbox")
2497
2498        d1 = self.connected.addCallback(strip(login))
2499        d1.addCallbacks(strip(subscribe), self._ebGeneral)
2500        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2501        d2 = self.loopback()
2502        d = defer.gatherResults([d1, d2])
2503        d.addCallback(
2504            lambda _: self.assertEqual(
2505                SimpleServer.theAccount.subscriptions, ["THIS/MBOX"]
2506            )
2507        )
2508        return d
2509
2510    def testUnsubscribe(self):
2511        SimpleServer.theAccount.subscriptions = ["THIS/MBOX", "THAT/MBOX"]
2512
2513        def login():
2514            return self.client.login(b"testuser", b"password-test")
2515
2516        def unsubscribe():
2517            return self.client.unsubscribe("this/mbox")
2518
2519        d1 = self.connected.addCallback(strip(login))
2520        d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
2521        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2522        d2 = self.loopback()
2523        d = defer.gatherResults([d1, d2])
2524        d.addCallback(
2525            lambda _: self.assertEqual(
2526                SimpleServer.theAccount.subscriptions, ["THAT/MBOX"]
2527            )
2528        )
2529        return d
2530
2531    def _listSetup(self, f):
2532        SimpleServer.theAccount.addMailbox("root/subthing")
2533        SimpleServer.theAccount.addMailbox("root/another-thing")
2534        SimpleServer.theAccount.addMailbox("non-root/subthing")
2535
2536        def login():
2537            return self.client.login(b"testuser", b"password-test")
2538
2539        def listed(answers):
2540            self.listed = answers
2541
2542        self.listed = None
2543        d1 = self.connected.addCallback(strip(login))
2544        d1.addCallbacks(strip(f), self._ebGeneral)
2545        d1.addCallbacks(listed, self._ebGeneral)
2546        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2547        d2 = self.loopback()
2548        return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
2549
2550    def assertListDelimiterAndMailboxAreStrings(self, results):
2551        """
2552        Assert a C{LIST} response's delimiter and mailbox are native
2553        strings.
2554
2555        @param results: A list of tuples as returned by
2556            L{IMAP4Client.list} or L{IMAP4Client.lsub}.
2557        """
2558        for result in results:
2559            self.assertIsInstance(result[1], str, "delimiter %r is not a str")
2560            self.assertIsInstance(result[2], str, "mailbox %r is not a str")
2561        return results
2562
2563    def testList(self):
2564        def mailboxList():
2565            return self.client.list("root", "%")
2566
2567        d = self._listSetup(mailboxList)
2568
2569        @d.addCallback
2570        def assertListContents(listed):
2571            expectedContents = [
2572                (sorted(SimpleMailbox.flags), "/", "ROOT/SUBTHING"),
2573                (sorted(SimpleMailbox.flags), "/", "ROOT/ANOTHER-THING"),
2574            ]
2575
2576            for _ in range(2):
2577                flags, delimiter, mailbox = listed.pop(0)
2578                self.assertIn(
2579                    (sorted(flags), delimiter, mailbox),
2580                    expectedContents,
2581                )
2582
2583            self.assertFalse(listed, f"More results than expected: {listed!r}")
2584
2585        return d
2586
2587    def testLSub(self):
2588        SimpleServer.theAccount.subscribe("ROOT/SUBTHING")
2589
2590        def lsub():
2591            return self.client.lsub("root", "%")
2592
2593        d = self._listSetup(lsub)
2594        d.addCallback(self.assertListDelimiterAndMailboxAreStrings)
2595        d.addCallback(self.assertEqual, [(SimpleMailbox.flags, "/", "ROOT/SUBTHING")])
2596        return d
2597
2598    def testStatus(self):
2599        SimpleServer.theAccount.addMailbox("root/subthing")
2600
2601        def login():
2602            return self.client.login(b"testuser", b"password-test")
2603
2604        def status():
2605            return self.client.status("root/subthing", "MESSAGES", "UIDNEXT", "UNSEEN")
2606
2607        def statused(result):
2608            self.statused = result
2609
2610        self.statused = None
2611        d1 = self.connected.addCallback(strip(login))
2612        d1.addCallbacks(strip(status), self._ebGeneral)
2613        d1.addCallbacks(statused, self._ebGeneral)
2614        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2615        d2 = self.loopback()
2616        d = defer.gatherResults([d1, d2])
2617        d.addCallback(
2618            lambda _: self.assertEqual(
2619                self.statused, {"MESSAGES": 9, "UIDNEXT": b"10", "UNSEEN": 4}
2620            )
2621        )
2622        return d
2623
2624    def testFailedStatus(self):
2625        def login():
2626            return self.client.login(b"testuser", b"password-test")
2627
2628        def status():
2629            return self.client.status(
2630                "root/nonexistent", "MESSAGES", "UIDNEXT", "UNSEEN"
2631            )
2632
2633        def statused(result):
2634            self.statused = result
2635
2636        def failed(failure):
2637            self.failure = failure
2638
2639        self.statused = self.failure = None
2640        d1 = self.connected.addCallback(strip(login))
2641        d1.addCallbacks(strip(status), self._ebGeneral)
2642        d1.addCallbacks(statused, failed)
2643        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2644        d2 = self.loopback()
2645        return defer.gatherResults([d1, d2]).addCallback(self._cbTestFailedStatus)
2646
2647    def _cbTestFailedStatus(self, ignored):
2648        self.assertEqual(self.statused, None)
2649        self.assertEqual(self.failure.value.args, (b"Could not open mailbox",))
2650
2651    def testFullAppend(self):
2652        infile = util.sibpath(__file__, "rfc822.message")
2653        SimpleServer.theAccount.addMailbox("root/subthing")
2654
2655        def login():
2656            return self.client.login(b"testuser", b"password-test")
2657
2658        @defer.inlineCallbacks
2659        def append():
2660            with open(infile, "rb") as message:
2661                result = yield self.client.append(
2662                    "root/subthing",
2663                    message,
2664                    ("\\SEEN", "\\DELETED"),
2665                    "Tue, 17 Jun 2003 11:22:16 -0600 (MDT)",
2666                )
2667                defer.returnValue(result)
2668
2669        d1 = self.connected.addCallback(strip(login))
2670        d1.addCallbacks(strip(append), self._ebGeneral)
2671        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2672        d2 = self.loopback()
2673
2674        d = defer.gatherResults([d1, d2])
2675
2676        return d.addCallback(self._cbTestFullAppend, infile)
2677
2678    def _cbTestFullAppend(self, ignored, infile):
2679        mb = SimpleServer.theAccount.mailboxes["ROOT/SUBTHING"]
2680        self.assertEqual(1, len(mb.messages))
2681        self.assertEqual(
2682            (["\\SEEN", "\\DELETED"], b"Tue, 17 Jun 2003 11:22:16 -0600 (MDT)", 0),
2683            mb.messages[0][1:],
2684        )
2685        with open(infile, "rb") as f:
2686            self.assertEqual(f.read(), mb.messages[0][0].getvalue())
2687
2688    def testPartialAppend(self):
2689        infile = util.sibpath(__file__, "rfc822.message")
2690        SimpleServer.theAccount.addMailbox("PARTIAL/SUBTHING")
2691
2692        def login():
2693            return self.client.login(b"testuser", b"password-test")
2694
2695        @defer.inlineCallbacks
2696        def append():
2697            with open(infile, "rb") as message:
2698                result = yield self.client.sendCommand(
2699                    imap4.Command(
2700                        b"APPEND",
2701                        # Using networkString is cheating!  In this
2702                        # particular case the mailbox name happens to
2703                        # be ASCII.  In real code, the mailbox would
2704                        # be encoded with imap4-utf-7.
2705                        networkString(
2706                            "PARTIAL/SUBTHING "
2707                            '(\\SEEN) "Right now" '
2708                            "{%d}" % (os.path.getsize(infile),)
2709                        ),
2710                        (),
2711                        self.client._IMAP4Client__cbContinueAppend,
2712                        message,
2713                    )
2714                )
2715                defer.returnValue(result)
2716
2717        d1 = self.connected.addCallback(strip(login))
2718        d1.addCallbacks(strip(append), self._ebGeneral)
2719        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2720        d2 = self.loopback()
2721        d = defer.gatherResults([d1, d2])
2722        return d.addCallback(self._cbTestPartialAppend, infile)
2723
2724    def _cbTestPartialAppend(self, ignored, infile):
2725        mb = SimpleServer.theAccount.mailboxes["PARTIAL/SUBTHING"]
2726        self.assertEqual(1, len(mb.messages))
2727        self.assertEqual((["\\SEEN"], b"Right now", 0), mb.messages[0][1:])
2728        with open(infile, "rb") as f:
2729            self.assertEqual(f.read(), mb.messages[0][0].getvalue())
2730
2731    def _testCheck(self):
2732        SimpleServer.theAccount.addMailbox(b"root/subthing")
2733
2734        def login():
2735            return self.client.login(b"testuser", b"password-test")
2736
2737        def select():
2738            return self.client.select(b"root/subthing")
2739
2740        def check():
2741            return self.client.check()
2742
2743        d = self.connected.addCallback(strip(login))
2744        d.addCallbacks(strip(select), self._ebGeneral)
2745        d.addCallbacks(strip(check), self._ebGeneral)
2746        d.addCallbacks(self._cbStopClient, self._ebGeneral)
2747        return self.loopback()
2748
2749    def test_check(self):
2750        """
2751        Trigger the L{imap.IMAP4Server._cbSelectWork} callback
2752        by selecting an mbox.
2753        """
2754        return self._testCheck()
2755
2756    def test_checkFail(self):
2757        """
2758        Trigger the L{imap.IMAP4Server._ebSelectWork} errback
2759        by failing when we select an mbox.
2760        """
2761
2762        def failSelect(self, name, rw=1):
2763            raise imap4.IllegalMailboxEncoding("encoding")
2764
2765        def checkResponse(ignore):
2766            failures = self.flushLoggedErrors()
2767            self.assertEqual(failures[1].value.args[0], b"SELECT failed: Server error")
2768
2769        self.patch(Account, "select", failSelect)
2770        d = self._testCheck()
2771        return d.addCallback(checkResponse)
2772
2773    def testClose(self):
2774        m = SimpleMailbox()
2775        m.messages = [
2776            (b"Message 1", ("\\Deleted", "AnotherFlag"), None, 0),
2777            (b"Message 2", ("AnotherFlag",), None, 1),
2778            (b"Message 3", ("\\Deleted",), None, 2),
2779        ]
2780        SimpleServer.theAccount.addMailbox("mailbox", m)
2781
2782        def login():
2783            return self.client.login(b"testuser", b"password-test")
2784
2785        def select():
2786            return self.client.select(b"mailbox")
2787
2788        def close():
2789            return self.client.close()
2790
2791        d = self.connected.addCallback(strip(login))
2792        d.addCallbacks(strip(select), self._ebGeneral)
2793        d.addCallbacks(strip(close), self._ebGeneral)
2794        d.addCallbacks(self._cbStopClient, self._ebGeneral)
2795        d2 = self.loopback()
2796        return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m)
2797
2798    def _cbTestClose(self, ignored, m):
2799        self.assertEqual(len(m.messages), 1)
2800        self.assertEqual(m.messages[0], (b"Message 2", ("AnotherFlag",), None, 1))
2801        self.assertTrue(m.closed)
2802
2803    def testExpunge(self):
2804        m = SimpleMailbox()
2805        m.messages = [
2806            (b"Message 1", ("\\Deleted", "AnotherFlag"), None, 0),
2807            (b"Message 2", ("AnotherFlag",), None, 1),
2808            (b"Message 3", ("\\Deleted",), None, 2),
2809        ]
2810        SimpleServer.theAccount.addMailbox("mailbox", m)
2811
2812        def login():
2813            return self.client.login(b"testuser", b"password-test")
2814
2815        def select():
2816            return self.client.select("mailbox")
2817
2818        def expunge():
2819            return self.client.expunge()
2820
2821        def expunged(results):
2822            self.assertFalse(self.server.mbox is None)
2823            self.results = results
2824
2825        self.results = None
2826        d1 = self.connected.addCallback(strip(login))
2827        d1.addCallbacks(strip(select), self._ebGeneral)
2828        d1.addCallbacks(strip(expunge), self._ebGeneral)
2829        d1.addCallbacks(expunged, self._ebGeneral)
2830        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
2831        d2 = self.loopback()
2832        d = defer.gatherResults([d1, d2])
2833        return d.addCallback(self._cbTestExpunge, m)
2834
2835    def _cbTestExpunge(self, ignored, m):
2836        self.assertEqual(len(m.messages), 1)
2837        self.assertEqual(m.messages[0], (b"Message 2", ("AnotherFlag",), None, 1))
2838
2839        self.assertEqual(self.results, [0, 2])
2840
2841
class IMAP4ServerParsingTests(SynchronousTestCase):
    """
    Tests for the command and argument parsing of
    L{imap4.IMAP4Server}.
    """

    def setUp(self):
        """
        Connect a new server to a string transport, then clear the
        transport so each test starts with an empty buffer.
        """
        transport = StringTransport()
        server = imap4.IMAP4Server()
        server.makeConnection(transport)
        transport.clear()

        self.transport = transport
        self.server = server

    def tearDown(self):
        """
        Tell the server its connection is done.
        """
        reason = failure.Failure(error.ConnectionDone())
        self.server.connectionLost(reason)

    def _assertRejected(self, parser, line):
        """
        Assert that C{parser} raises L{imap4.IllegalClientResponse}
        when called with C{line}.

        @param parser: The parsing callable under test.

        @param line: The input to pass to C{parser}.
        @type line: L{bytes}
        """
        self.assertRaises(imap4.IllegalClientResponse, parser, line)

    def test_parseMethodExceptionLogged(self):
        """
        L{imap4.IMAP4Server} logs exceptions raised by parse methods.
        """

        class UnhandledException(Exception):
            """
            An unhandled exception.
            """

        def raisesUnhandledException(line):
            raise UnhandledException

        self.server.parseState = "command"
        self.server.parse_command = raisesUnhandledException

        self.server.lineReceived(b"invalid")

        logged = self.flushLoggedErrors(UnhandledException)
        self.assertTrue(logged)

    def test_missingCommand(self):
        """
        L{imap4.IMAP4Server.parse_command} sends a C{BAD} response to
        a line that includes a tag but no command.
        """
        self.server.parse_command(b"001")

        written = self.transport.value()
        self.assertEqual(written, b"001 BAD Missing command\r\n")

        reason = failure.Failure(error.ConnectionDone("Done"))
        self.server.connectionLost(reason)

    def test_emptyLine(self):
        """
        L{imap4.IMAP4Server.parse_command} sends a C{BAD} response to
        an empty line.
        """
        self.server.parse_command(b"")

        written = self.transport.value()
        self.assertEqual(written, b"* BAD Null command\r\n")

    def assertParseExceptionResponse(self, exception, tag, expectedResponse):
        """
        Assert that the server writes the expected tagged response
        when C{dispatchCommand} raises the given exception while a
        command line is being parsed.

        @param exception: The exception to raise.
        @type exception: L{Exception}

        @param tag: The IMAP tag.
        @type tag: L{bytes}

        @param expectedResponse: The expected response, without the
            tag.
        @type expectedResponse: L{bytes}
        """

        def failingDispatch(tag, cmd, rest):
            raise exception

        self.server.dispatchCommand = failingDispatch

        commandLine = b" ".join((tag, b"invalid"))
        self.server.parse_command(commandLine)

        expectedLine = b" ".join((tag, expectedResponse))
        self.assertEqual(self.transport.value(), expectedLine)

    def test_parsingRaisesIllegalClientResponse(self):
        """
        When a parsing method raises L{IllegalClientResponse}, the
        server sends a C{BAD} response.
        """
        raised = imap4.IllegalClientResponse("client response")
        self.assertParseExceptionResponse(
            raised, b"001", b"BAD Illegal syntax: client response\r\n"
        )

    def test_parsingRaisesIllegalOperationResponse(self):
        """
        When a parsing method raises L{IllegalOperation}, the server
        sends a C{NO} response.
        """
        raised = imap4.IllegalOperation("operation")
        self.assertParseExceptionResponse(
            raised, b"001", b"NO Illegal operation: operation\r\n"
        )

    def test_parsingRaisesIllegalMailboxEncoding(self):
        """
        When a parsing method raises L{IllegalMailboxEncoding}, the
        server sends a C{NO} response.
        """
        raised = imap4.IllegalMailboxEncoding("encoding")
        self.assertParseExceptionResponse(
            raised, b"001", b"NO Illegal mailbox name: encoding\r\n"
        )

    def test_unsupportedCommand(self):
        """
        L{imap4.IMAP4Server} responds to an unsupported command with a
        C{BAD} response.
        """
        self.server.lineReceived(b"001 HULLABALOO")

        written = self.transport.value()
        self.assertEqual(written, b"001 BAD Unsupported command\r\n")

    def test_tooManyArgumentsForCommand(self):
        """
        L{imap4.IMAP4Server} responds with a C{BAD} response to a
        command with more arguments than expected.
        """
        self.server.lineReceived(b"001 LOGIN A B C")

        expected = b"".join(
            [
                b"001 BAD Illegal syntax:",
                b" Too many arguments for command: ",
                repr(b"C").encode("utf-8"),
                b"\r\n",
            ]
        )
        self.assertEqual(self.transport.value(), expected)

    def assertCommandExceptionResponse(self, exception, tag, expectedResponse):
        """
        Assert that the server writes the expected tagged response
        when the C{LOGIN} command handler raises the given exception.

        The handler is swapped in by replacing the first element of
        the server's C{unauth_LOGIN} tuple, and the server must be in
        the C{"unauth"} state.

        @param exception: The exception to raise.
        @type exception: L{Exception}

        @param tag: The IMAP tag.
        @type tag: L{bytes}

        @param expectedResponse: The expected response, without the
            tag.
        @type expectedResponse: L{bytes}
        """

        def failingLogin(serverInstance, tag, user, passwd):
            raise exception

        self.assertEqual(self.server.state, "unauth")

        argumentParsers = self.server.unauth_LOGIN[1:]
        self.server.unauth_LOGIN = (failingLogin,) + argumentParsers

        self.server.dispatchCommand(tag, b"LOGIN", b"user passwd")

        expectedLine = b" ".join((tag, expectedResponse))
        self.assertEqual(self.transport.value(), expectedLine)

    def test_commandRaisesIllegalClientResponse(self):
        """
        When a command raises L{IllegalClientResponse}, the
        server sends a C{BAD} response.
        """
        raised = imap4.IllegalClientResponse("client response")
        self.assertCommandExceptionResponse(
            raised, b"001", b"BAD Illegal syntax: client response\r\n"
        )

    def test_commandRaisesIllegalOperationResponse(self):
        """
        When a command raises L{IllegalOperation}, the server sends a
        C{NO} response.
        """
        raised = imap4.IllegalOperation("operation")
        self.assertCommandExceptionResponse(
            raised, b"001", b"NO Illegal operation: operation\r\n"
        )

    def test_commandRaisesIllegalMailboxEncoding(self):
        """
        When a command raises L{IllegalMailboxEncoding}, the server
        sends a C{NO} response.
        """
        raised = imap4.IllegalMailboxEncoding("encoding")
        self.assertCommandExceptionResponse(
            raised, b"001", b"NO Illegal mailbox name: encoding\r\n"
        )

    def test_commandRaisesUnhandledException(self):
        """
        When a command raises an unhandled exception, the server sends
        a C{BAD} response and logs the exception.
        """

        class UnhandledException(Exception):
            """
            An unhandled exception.
            """

        raised = UnhandledException("unhandled")
        self.assertCommandExceptionResponse(
            raised, b"001", b"BAD Server error: unhandled\r\n"
        )

        logged = self.flushLoggedErrors(UnhandledException)
        self.assertTrue(logged)

    def test_stringLiteralTooLong(self):
        """
        A string literal whose length exceeds the maximum allowed
        length results in a C{BAD} response.
        """
        self.server._literalStringLimit = 4
        self.server.lineReceived(b"001 LOGIN {5}\r\n")

        expected = (
            b"001 BAD Illegal syntax: Literal too long!"
            b" I accept at most 4 octets\r\n"
        )
        self.assertEqual(self.transport.value(), expected)

    def test_arg_astringEmptyLine(self):
        """
        An empty string argument raises L{imap4.IllegalClientResponse}.
        """
        for empty in (b"", b"\r\n", b" "):
            self._assertRejected(self.server.arg_astring, empty)

    def test_arg_astringUnmatchedQuotes(self):
        """
        An unmatched quote in a string argument raises
        L{imap4.IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_astring, b'"open')

    def test_arg_astringUnmatchedLiteralBraces(self):
        """
        An unmatched brace in a string literal's size raises
        L{imap4.IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_astring, b"{0")

    def test_arg_astringInvalidLiteralSize(self):
        """
        A non-integral string literal size raises
        L{imap4.IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_astring, b"{[object Object]}")

    def test_arg_atomEmptyLine(self):
        """
        An empty atom raises L{IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_atom, b"")

    def test_arg_atomMalformedAtom(self):
        """
        A malformed atom raises L{IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_atom, b" not an atom ")

    def test_arg_plistEmptyLine(self):
        """
        An empty parenthesized list raises L{IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_plist, b"")

    def test_arg_plistUnmatchedParentheses(self):
        """
        A parenthesized list with unmatched parentheses raises
        L{IllegalClientResponse}.
        """
        for unbalanced in (b"(foo", b"foo)"):
            self._assertRejected(self.server.arg_plist, unbalanced)

    def test_arg_literalEmptyLine(self):
        """
        An empty file literal raises L{IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_literal, b"")

    def test_arg_literalUnmatchedBraces(self):
        """
        A literal with unmatched braces raises
        L{IllegalClientResponse}.
        """
        for unbalanced in (b"{10", b"10}"):
            self._assertRejected(self.server.arg_literal, unbalanced)

    def test_arg_literalInvalidLiteralSize(self):
        """
        A non-integral literal size raises
        L{imap4.IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_literal, b"{[object Object]}")

    def test_arg_seqsetReturnsRest(self):
        """
        A sequence set returns the unparsed portion of a line.
        """
        _messages, unparsed = self.server.arg_seqset(b"1:* blah blah blah")
        self.assertEqual(unparsed, b"blah blah blah")

    def test_arg_seqsetInvalidSequence(self):
        """
        An invalid sequence raises L{imap4.IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_seqset, b"x:y")

    def test_arg_flaglistOneFlag(self):
        """
        A single flag that is not contained in a list is parsed.
        """
        bareFlag = b"flag"
        flags, unparsed = self.server.arg_flaglist(bareFlag)
        self.assertEqual(flags, [bareFlag])
        self.assertFalse(unparsed)

    def test_arg_flaglistMismatchedParentehses(self):
        """
        A list of flags with unmatched parentheses raises
        L{imap4.IllegalClientResponse}.
        """
        self._assertRejected(self.server.arg_flaglist, b"(invalid")

    def test_arg_flaglistMalformedFlag(self):
        """
        A list of flags that contains a malformed flag raises
        L{imap4.IllegalClientResponse}.
        """
        for malformed in (b"(first \x00)", b"(first \x00second)"):
            self._assertRejected(self.server.arg_flaglist, malformed)

    def test_opt_plistMissingOpenParenthesis(self):
        """
        A line that does not begin with an open parenthesis (C{(}) is
        parsed as L{None}, and the remainder is the whole line.
        """
        line = b"not ("
        parsed, unparsed = self.server.opt_plist(line)
        self.assertIsNone(parsed)
        self.assertEqual(unparsed, line)

    def test_opt_datetimeMissingOpenQuote(self):
        """
        A line that does not begin with a double quote (C{"}) is
        parsed as L{None}, and the remainder is the whole line.
        """
        line = b'not "'
        parsed, unparsed = self.server.opt_datetime(line)
        self.assertIsNone(parsed)
        self.assertEqual(unparsed, line)

    def test_opt_datetimeMissingCloseQuote(self):
        """
        A line that does not have a closing double quote (C{"}) raises
        L{imap4.IllegalClientResponse}.
        """
        self._assertRejected(self.server.opt_datetime, b'"21-Jul-2017 19:37:07 -0700')

    def test_opt_charsetMissingIdentifier(self):
        """
        A line that contains C{CHARSET} but no character set
        identifier raises L{imap4.IllegalClientResponse}.
        """
        self._assertRejected(self.server.opt_charset, b"CHARSET")

    def test_opt_charsetEndOfLine(self):
        """
        A line that ends with a C{CHARSET} identifier is parsed as
        that identifier, and the remainder is the empty string.
        """
        charset, unparsed = self.server.opt_charset(b"CHARSET UTF-8")
        self.assertEqual(charset, b"UTF-8")
        self.assertEqual(unparsed, b"")

    def test_opt_charsetWithRemainder(self):
        """
        A line that has additional data after a C{CHARSET} identifier
        is parsed as that identifier, and the remainder is that
        additional data.
        """
        charset, unparsed = self.server.opt_charset(b"CHARSET UTF-8 remainder")
        self.assertEqual(charset, b"UTF-8")
        self.assertEqual(unparsed, b"remainder")
3260
3261
class IMAP4ServerSearchTests(IMAP4HelperMixin, TestCase):
    """
    Tests for the C{search_*} methods of L{imap4.IMAP4Server}.
    """

    def setUp(self):
        """
        Build one message and three date queries: one before, one on
        and one after the message's date.
        """
        IMAP4HelperMixin.setUp(self)
        self.earlierQuery = ["10-Dec-2009"]
        self.sameDateQuery = ["13-Dec-2009"]
        self.laterQuery = ["16-Dec-2009"]
        self.seq = 0

        headers = {"date": "Mon, 13 Dec 2009 21:25:10 GMT"}
        self.msg = FakeyMessage(
            headers, [], "13 Dec 2009 00:00:00 GMT", "", 1234, None
        )

    def test_searchSentBefore(self):
        """
        L{imap4.IMAP4Server.search_SENTBEFORE} returns True if the message date
        is earlier than the query date.
        """
        search = self.server.search_SENTBEFORE

        beforeEarlier = search(self.earlierQuery, self.seq, self.msg)
        self.assertFalse(beforeEarlier)

        beforeLater = search(self.laterQuery, self.seq, self.msg)
        self.assertTrue(beforeLater)

    def test_searchWildcard(self):
        """
        L{imap4.IMAP4Server.search_UID} returns True if the message UID is in
        the search range.
        """
        search = self.server.search_UID

        outOfRange = search([b"2:3"], self.seq, self.msg, (1, 1234))
        self.assertFalse(outOfRange)

        # 2:* should get translated to 2:<max UID> and then to 1:2
        openEnded = search([b"2:*"], self.seq, self.msg, (1, 1234))
        self.assertTrue(openEnded)

        wildcardOnly = search([b"*"], self.seq, self.msg, (1, 1234))
        self.assertTrue(wildcardOnly)

    def test_searchWildcardHigh(self):
        """
        L{imap4.IMAP4Server.search_UID} should return True if there is a
        wildcard, because a wildcard means "highest UID in the mailbox".
        """
        matched = self.server.search_UID(
            [b"1235:*"], self.seq, self.msg, (1234, 1)
        )
        self.assertTrue(matched)

    def test_reversedSearchTerms(self):
        """
        L{imap4.parseIdList} accepts a range written with its larger
        bound first: C{4:2} yields the ids 2, 3 and 4.
        """
        parsed = imap4.parseIdList(b"4:2")
        self.assertEqual(list(parsed), [2, 3, 4])

    def test_searchSentOn(self):
        """
        L{imap4.IMAP4Server.search_SENTON} returns True if the message date is
        the same as the query date.
        """
        search = self.server.search_SENTON

        onEarlier = search(self.earlierQuery, self.seq, self.msg)
        self.assertFalse(onEarlier)

        onSameDate = search(self.sameDateQuery, self.seq, self.msg)
        self.assertTrue(onSameDate)

        onLater = search(self.laterQuery, self.seq, self.msg)
        self.assertFalse(onLater)

    def test_searchSentSince(self):
        """
        L{imap4.IMAP4Server.search_SENTSINCE} returns True if the message date
        is later than the query date.
        """
        search = self.server.search_SENTSINCE

        sinceEarlier = search(self.earlierQuery, self.seq, self.msg)
        self.assertTrue(sinceEarlier)

        sinceLater = search(self.laterQuery, self.seq, self.msg)
        self.assertFalse(sinceLater)

    def test_searchOr(self):
        """
        L{imap4.IMAP4Server.search_OR} returns true if either of the two
        expressions supplied to it returns true and returns false if neither
        does.
        """
        search = self.server.search_OR

        firstMatches = search(
            ["SENTSINCE", *self.earlierQuery, "SENTSINCE", *self.laterQuery],
            self.seq,
            self.msg,
            (None, None),
        )
        self.assertTrue(firstMatches)

        secondMatches = search(
            ["SENTSINCE", *self.laterQuery, "SENTSINCE", *self.earlierQuery],
            self.seq,
            self.msg,
            (None, None),
        )
        self.assertTrue(secondMatches)

        neitherMatches = search(
            ["SENTON", *self.laterQuery, "SENTSINCE", *self.laterQuery],
            self.seq,
            self.msg,
            (None, None),
        )
        self.assertFalse(neitherMatches)

    def test_searchNot(self):
        """
        L{imap4.IMAP4Server.search_NOT} returns the negation of the result
        of the expression supplied to it.
        """
        search = self.server.search_NOT

        negatedMatch = search(
            ["SENTSINCE", *self.earlierQuery], self.seq, self.msg, (None, None)
        )
        self.assertFalse(negatedMatch)

        negatedMiss = search(
            ["SENTON", *self.laterQuery], self.seq, self.msg, (None, None)
        )
        self.assertTrue(negatedMiss)

    def test_searchBefore(self):
        """
        L{imap4.IMAP4Server.search_BEFORE} returns True if the
        internal message date is before the query date.
        """
        search = self.server.search_BEFORE

        beforeEarlier = search(self.earlierQuery, self.seq, self.msg)
        self.assertFalse(beforeEarlier)

        beforeSameDate = search(self.sameDateQuery, self.seq, self.msg)
        self.assertFalse(beforeSameDate)

        beforeLater = search(self.laterQuery, self.seq, self.msg)
        self.assertTrue(beforeLater)

    def test_searchOn(self):
        """
        L{imap4.IMAP4Server.search_ON} reports no match for this
        message against the earlier, same-day and later query dates.

        NOTE(review): the same-day query is asserted not to match,
        which looks at odds with what C{ON} is meant to do; confirm
        whether this pins intended behavior or a known defect.
        """
        search = self.server.search_ON

        onEarlier = search(self.earlierQuery, self.seq, self.msg)
        self.assertFalse(onEarlier)

        onSameDate = search(self.sameDateQuery, self.seq, self.msg)
        self.assertFalse(onSameDate)

        onLater = search(self.laterQuery, self.seq, self.msg)
        self.assertFalse(onLater)

    def test_searchSince(self):
        """
        L{imap4.IMAP4Server.search_SINCE} returns True if the
        internal message date is on or after the query date.
        """
        search = self.server.search_SINCE

        sinceEarlier = search(self.earlierQuery, self.seq, self.msg)
        self.assertTrue(sinceEarlier)

        sinceSameDate = search(self.sameDateQuery, self.seq, self.msg)
        self.assertTrue(sinceSameDate)

        sinceLater = search(self.laterQuery, self.seq, self.msg)
        self.assertFalse(sinceLater)
3427
3428
@implementer(IRealm)
class TestRealm:
    """
    An L{IRealm} for tests that always hands out one shared account.

    @cvar theAccount: An C{Account} instance.  Tests can set this to
        ensure predictable account retrieval.
    """

    theAccount = None

    def __init__(self, accountHolder=None):
        """
        Create a realm for testing.

        @param accountHolder: (optional) An object whose C{theAccount}
            attribute is returned in place of this realm's own
            C{theAccount}.  The attribute is looked up on every avatar
            request, so later changes to C{accountHolder.theAccount}
            are picked up.
        """
        # A falsy holder, not only None, falls back to the realm itself.
        holder = accountHolder if accountHolder else self
        self._getAccount = lambda: holder.theAccount

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return the current account for any avatar request.

        @param avatarId: Unused.

        @param mind: Unused.

        @param interfaces: Unused.

        @return: A 3-tuple of L{imap4.IAccount}, the account, and a
            logout callable that does nothing.
        """

        def noopLogout():
            return None

        account = self._getAccount()
        return imap4.IAccount, account, noopLogout
3457
3458
class TestChecker:
    """
    A credentials checker for tests, backed by an in-memory L{dict}.

    @cvar credentialInterfaces: The credential interfaces this checker
        declares support for.
    @cvar users: Maps known usernames to their passwords.
    """

    credentialInterfaces = (IUsernameHashedPassword, IUsernamePassword)

    users = {b"testuser": b"secret"}

    def requestAvatarId(self, credentials):
        """
        Check C{credentials} against L{users}.

        @param credentials: An object with a C{username} attribute and a
            C{checkPassword} method.

        @return: For a known username, a L{Deferred} that fires with the
            username when C{checkPassword} reports success and fails
            with L{UnauthorizedLogin} otherwise.  For an unknown
            username, L{None}; no failure is produced in that case.
        """
        username = credentials.username
        if username not in self.users:
            return None
        checked = defer.maybeDeferred(credentials.checkPassword, self.users[username])
        checked.addCallback(self._cbCheck, username)
        return checked

    def _cbCheck(self, result, username):
        """
        Turn the outcome of a password check into an avatar ID.

        @param result: The value returned by C{checkPassword}.
        @param username: The username to return on success.

        @raise UnauthorizedLogin: If C{result} is false.
        """
        if not result:
            raise UnauthorizedLogin()
        return username
3474
3475
3476class AuthenticatorTests(IMAP4HelperMixin, TestCase):
3477    def setUp(self):
3478        IMAP4HelperMixin.setUp(self)
3479
3480        realm = TestRealm()
3481        realm.theAccount = Account(b"testuser")
3482        self.portal = Portal(realm)
3483        self.portal.registerChecker(TestChecker())
3484        self.server.portal = self.portal
3485
3486        self.authenticated = 0
3487        self.account = realm.theAccount
3488
3489    def test_customChallengers(self):
3490        """
3491        L{imap4.IMAP4Server} accepts a L{dict} mapping challenge type
3492        names to L{twisted.mail.interfaces.IChallengeResponse}
3493        providers.
3494        """
3495
3496        @implementer(IChallengeResponse, IUsernamePassword)
3497        class SPECIALAuth:
3498            def getChallenge(self):
3499                return b"SPECIAL"
3500
3501            def setResponse(self, response):
3502                self.username, self.password = response.split(None, 1)
3503
3504            def moreChallenges(self):
3505                return False
3506
3507            def checkPassword(self, password):
3508                self.password = self.password
3509
3510        special = SPECIALAuth()
3511        verifyObject(IChallengeResponse, special)
3512
3513        server = imap4.IMAP4Server({b"SPECIAL": SPECIALAuth})
3514        server.portal = self.portal
3515
3516        transport = StringTransport()
3517        server.makeConnection(transport)
3518        self.addCleanup(server.connectionLost, error.ConnectionDone("Connection done."))
3519
3520        self.assertIn(b"AUTH=SPECIAL", transport.value())
3521
3522        transport.clear()
3523        server.dataReceived(b"001 AUTHENTICATE SPECIAL\r\n")
3524
3525        self.assertIn(base64.b64encode(special.getChallenge()), transport.value())
3526
3527        transport.clear()
3528        server.dataReceived(base64.b64encode(b"username password") + b"\r\n")
3529
3530        self.assertEqual(transport.value(), b"001 OK Authentication successful\r\n")
3531
3532    def test_unsupportedMethod(self):
3533        """
3534        An unsupported C{AUTHENTICATE} method results in a negative
3535        response.
3536        """
3537        server = imap4.IMAP4Server()
3538        server.portal = self.portal
3539
3540        transport = StringTransport()
3541        server.makeConnection(transport)
3542        self.addCleanup(server.connectionLost, error.ConnectionDone("Connection done."))
3543
3544        transport.clear()
3545
3546        server.dataReceived(b"001 AUTHENTICATE UNKNOWN\r\n")
3547        self.assertEqual(
3548            transport.value(), b"001 NO AUTHENTICATE method unsupported\r\n"
3549        )
3550
3551    def test_missingPortal(self):
3552        """
3553        An L{imap4.IMAP4Server} that is missing a L{Portal} responds
3554        negatively to an authentication
3555        """
3556        self.server.challengers[b"LOGIN"] = imap4.LOGINCredentials
3557
3558        cAuth = imap4.LOGINAuthenticator(b"testuser")
3559        self.client.registerAuthenticator(cAuth)
3560
3561        self.server.portal = None
3562
3563        def auth():
3564            return self.client.authenticate(b"secret")
3565
3566        d = self.connected.addCallback(strip(auth))
3567        d.addErrback(
3568            self.assertClientFailureMessage, b"Temporary authentication failure"
3569        )
3570        d.addCallbacks(self._cbStopClient, self._ebGeneral)
3571
3572        return defer.gatherResults([d, self.loopback()])
3573
3574    def test_challengerRaisesException(self):
3575        """
3576        When a challenger's
3577        L{getChallenge<IChallengeResponse.getChallenge>} method raises
3578        any exception, a C{NO} response is sent.
3579        """
3580
3581        @implementer(IChallengeResponse)
3582        class ValueErrorAuthChallenge:
3583            message = b"A challenge failure"
3584
3585            def getChallenge(self):
3586                raise ValueError(self.message)
3587
3588            def setResponse(self, response):
3589                """
3590                Never called.
3591
3592                @param response: See L{IChallengeResponse.setResponse}
3593                """
3594
3595            def moreChallenges(self):
3596                """
3597                Never called.
3598                """
3599
3600        @implementer(IClientAuthentication)
3601        class ValueErrorAuthenticator:
3602            def getName(self):
3603                return b"ERROR"
3604
3605            def challengeResponse(self, secret, chal):
3606                return b"IGNORED"
3607
3608        bad = ValueErrorAuthChallenge()
3609        verifyObject(IChallengeResponse, bad)
3610
3611        self.server.challengers[b"ERROR"] = ValueErrorAuthChallenge
3612        self.client.registerAuthenticator(ValueErrorAuthenticator())
3613
3614        def auth():
3615            return self.client.authenticate(b"secret")
3616
3617        d = self.connected.addCallback(strip(auth))
3618        d.addErrback(
3619            self.assertClientFailureMessage,
3620            ("Server error: " + str(ValueErrorAuthChallenge.message)).encode("ascii"),
3621        )
3622        d.addCallbacks(self._cbStopClient, self._ebGeneral)
3623
3624        return defer.gatherResults([d, self.loopback()])
3625
3626    def test_authNotBase64(self):
3627        """
3628        A client that responds with a challenge that cannot be decoded
3629        as Base 64 receives an L{IllegalClientResponse}.
3630        """
3631
3632        @implementer(IChallengeResponse)
3633        class NotBase64AuthChallenge:
3634            message = b"Malformed Response - not base64"
3635
3636            def getChallenge(self):
3637                return b"SomeChallenge"
3638
3639            def setResponse(self, response):
3640                """
3641                Never called.
3642
3643                @param response: See L{IChallengeResponse.setResponse}
3644                """
3645
3646            def moreChallenges(self):
3647                """
3648                Never called.
3649                """
3650
3651        notBase64 = NotBase64AuthChallenge()
3652        verifyObject(IChallengeResponse, notBase64)
3653
3654        server = imap4.IMAP4Server()
3655        server.portal = self.portal
3656        server.challengers[b"NOTBASE64"] = NotBase64AuthChallenge
3657
3658        transport = StringTransport()
3659        server.makeConnection(transport)
3660        self.addCleanup(server.connectionLost, error.ConnectionDone("Connection done."))
3661
3662        self.assertIn(b"AUTH=NOTBASE64", transport.value())
3663
3664        transport.clear()
3665        server.dataReceived(b"001 AUTHENTICATE NOTBASE64\r\n")
3666
3667        self.assertIn(base64.b64encode(notBase64.getChallenge()), transport.value())
3668
3669        transport.clear()
3670        server.dataReceived(b"\x00 Not base64\r\n")
3671
3672        self.assertEqual(
3673            transport.value(),
3674            b"".join([b"001 NO Authentication failed: ", notBase64.message, b"\r\n"]),
3675        )
3676
3677    def test_unhandledCredentials(self):
3678        """
3679        A challenger that causes the login to fail
3680        L{UnhandledCredentials} results in an C{NO} response.
3681
3682        @return: A L{Deferred} that fires when the authorization has
3683            failed.
3684        """
3685        realm = TestRealm()
3686        portal = Portal(realm)
3687        # This portal has no checkers, so all logins will fail with
3688        # UnhandledCredentials
3689        self.server.portal = portal
3690
3691        self.server.challengers[b"LOGIN"] = loginCred = imap4.LOGINCredentials
3692
3693        verifyClass(IChallengeResponse, loginCred)
3694
3695        cAuth = imap4.LOGINAuthenticator(b"testuser")
3696        self.client.registerAuthenticator(cAuth)
3697
3698        def auth():
3699            return self.client.authenticate(b"secret")
3700
3701        d1 = self.connected.addCallback(strip(auth))
3702        d1.addErrback(
3703            self.assertClientFailureMessage,
3704            b"Authentication failed: server misconfigured",
3705        )
3706        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
3707        d = defer.gatherResults([self.loopback(), d1])
3708        return d
3709
3710    def test_unexpectedLoginFailure(self):
3711        """
3712        If the portal raises an exception other than
3713        L{UnauthorizedLogin} or L{UnhandledCredentials}, the server
3714        responds with a C{BAD} response and the exception is logged.
3715        """
3716
3717        class UnexpectedException(Exception):
3718            """
3719            An unexpected exception.
3720            """
3721
3722        class FailingChecker:
3723            """
3724            A credentials checker whose L{requestAvatarId} method
3725            raises L{UnexpectedException}.
3726            """
3727
3728            credentialInterfaces = (IUsernameHashedPassword, IUsernamePassword)
3729
3730            def requestAvatarId(self, credentials):
3731                raise UnexpectedException("Unexpected error.")
3732
3733        realm = TestRealm()
3734        portal = Portal(realm)
3735        portal.registerChecker(FailingChecker())
3736        self.server.portal = portal
3737
3738        self.server.challengers[b"LOGIN"] = loginCred = imap4.LOGINCredentials
3739
3740        verifyClass(IChallengeResponse, loginCred)
3741
3742        cAuth = imap4.LOGINAuthenticator(b"testuser")
3743        self.client.registerAuthenticator(cAuth)
3744
3745        def auth():
3746            return self.client.authenticate(b"secret")
3747
3748        def assertUnexpectedExceptionLogged():
3749            self.assertTrue(self.flushLoggedErrors(UnexpectedException))
3750
3751        d1 = self.connected.addCallback(strip(auth))
3752        d1.addErrback(
3753            self.assertClientFailureMessage, b"Server error: login failed unexpectedly"
3754        )
3755        d1.addCallback(strip(assertUnexpectedExceptionLogged))
3756        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
3757        d = defer.gatherResults([self.loopback(), d1])
3758        return d
3759
3760    def testCramMD5(self):
3761        self.server.challengers[b"CRAM-MD5"] = CramMD5Credentials
3762        cAuth = imap4.CramMD5ClientAuthenticator(b"testuser")
3763        self.client.registerAuthenticator(cAuth)
3764
3765        def auth():
3766            return self.client.authenticate(b"secret")
3767
3768        def authed():
3769            self.authenticated = 1
3770
3771        d1 = self.connected.addCallback(strip(auth))
3772        d1.addCallbacks(strip(authed), self._ebGeneral)
3773        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
3774        d2 = self.loopback()
3775        d = defer.gatherResults([d1, d2])
3776        return d.addCallback(self._cbTestCramMD5)
3777
3778    def _cbTestCramMD5(self, ignored):
3779        self.assertEqual(self.authenticated, 1)
3780        self.assertEqual(self.server.account, self.account)
3781
3782    def testFailedCramMD5(self):
3783        self.server.challengers[b"CRAM-MD5"] = CramMD5Credentials
3784        cAuth = imap4.CramMD5ClientAuthenticator(b"testuser")
3785        self.client.registerAuthenticator(cAuth)
3786
3787        def misauth():
3788            return self.client.authenticate(b"not the secret")
3789
3790        def authed():
3791            self.authenticated = 1
3792
3793        def misauthed():
3794            self.authenticated = -1
3795
3796        d1 = self.connected.addCallback(strip(misauth))
3797        d1.addCallbacks(strip(authed), strip(misauthed))
3798        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
3799        d = defer.gatherResults([self.loopback(), d1])
3800        return d.addCallback(self._cbTestFailedCramMD5)
3801
3802    def _cbTestFailedCramMD5(self, ignored):
3803        self.assertEqual(self.authenticated, -1)
3804        self.assertEqual(self.server.account, None)
3805
3806    def testLOGIN(self):
3807        self.server.challengers[b"LOGIN"] = loginCred = imap4.LOGINCredentials
3808
3809        verifyClass(IChallengeResponse, loginCred)
3810
3811        cAuth = imap4.LOGINAuthenticator(b"testuser")
3812        self.client.registerAuthenticator(cAuth)
3813
3814        def auth():
3815            return self.client.authenticate(b"secret")
3816
3817        def authed():
3818            self.authenticated = 1
3819
3820        d1 = self.connected.addCallback(strip(auth))
3821        d1.addCallbacks(strip(authed), self._ebGeneral)
3822        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
3823        d = defer.gatherResults([self.loopback(), d1])
3824        return d.addCallback(self._cbTestLOGIN)
3825
3826    def _cbTestLOGIN(self, ignored):
3827        self.assertEqual(self.authenticated, 1)
3828        self.assertEqual(self.server.account, self.account)
3829
3830    def testFailedLOGIN(self):
3831        self.server.challengers[b"LOGIN"] = imap4.LOGINCredentials
3832        cAuth = imap4.LOGINAuthenticator(b"testuser")
3833        self.client.registerAuthenticator(cAuth)
3834
3835        def misauth():
3836            return self.client.authenticate(b"not the secret")
3837
3838        def authed():
3839            self.authenticated = 1
3840
3841        def misauthed():
3842            self.authenticated = -1
3843
3844        d1 = self.connected.addCallback(strip(misauth))
3845        d1.addCallbacks(strip(authed), strip(misauthed))
3846        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
3847        d = defer.gatherResults([self.loopback(), d1])
3848        return d.addCallback(self._cbTestFailedLOGIN)
3849
3850    def _cbTestFailedLOGIN(self, ignored):
3851        self.assertEqual(self.authenticated, -1)
3852        self.assertEqual(self.server.account, None)
3853
3854    def testPLAIN(self):
3855        self.server.challengers[b"PLAIN"] = plainCred = imap4.PLAINCredentials
3856
3857        verifyClass(IChallengeResponse, plainCred)
3858
3859        cAuth = imap4.PLAINAuthenticator(b"testuser")
3860        self.client.registerAuthenticator(cAuth)
3861
3862        def auth():
3863            return self.client.authenticate(b"secret")
3864
3865        def authed():
3866            self.authenticated = 1
3867
3868        d1 = self.connected.addCallback(strip(auth))
3869        d1.addCallbacks(strip(authed), self._ebGeneral)
3870        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
3871        d = defer.gatherResults([self.loopback(), d1])
3872        return d.addCallback(self._cbTestPLAIN)
3873
3874    def _cbTestPLAIN(self, ignored):
3875        self.assertEqual(self.authenticated, 1)
3876        self.assertEqual(self.server.account, self.account)
3877
3878    def testFailedPLAIN(self):
3879        self.server.challengers[b"PLAIN"] = imap4.PLAINCredentials
3880        cAuth = imap4.PLAINAuthenticator(b"testuser")
3881        self.client.registerAuthenticator(cAuth)
3882
3883        def misauth():
3884            return self.client.authenticate(b"not the secret")
3885
3886        def authed():
3887            self.authenticated = 1
3888
3889        def misauthed():
3890            self.authenticated = -1
3891
3892        d1 = self.connected.addCallback(strip(misauth))
3893        d1.addCallbacks(strip(authed), strip(misauthed))
3894        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
3895        d = defer.gatherResults([self.loopback(), d1])
3896        return d.addCallback(self._cbTestFailedPLAIN)
3897
3898    def _cbTestFailedPLAIN(self, ignored):
3899        self.assertEqual(self.authenticated, -1)
3900        self.assertEqual(self.server.account, None)
3901
3902
class SASLPLAINTests(TestCase):
    """
    Tests for I{SASL PLAIN} authentication, as implemented by
    L{imap4.PLAINAuthenticator} and L{imap4.PLAINCredentials}.

    @see: U{http://www.faqs.org/rfcs/rfc2595.html}
    @see: U{http://www.faqs.org/rfcs/rfc4616.html}
    """

    def test_authenticatorChallengeResponse(self):
        """
        L{PLAINAuthenticator.challengeResponse} produces a response of
        the form::

            NUL<authn-id>NUL<secret>
        """
        user = b"testuser"
        password = b"secret"
        authenticator = imap4.PLAINAuthenticator(user)
        actual = authenticator.challengeResponse(password, b"challenge")
        expected = b"\0".join([b"", user, password])
        self.assertEqual(actual, expected)

    def test_credentialsSetResponse(self):
        """
        L{PLAINCredentials.setResponse} extracts the username and the
        password from a response of the form::

            NUL<authn-id>NUL<secret>
        """
        credentials = imap4.PLAINCredentials()
        credentials.setResponse(b"\0testuser\0secret")
        self.assertEqual(credentials.username, b"testuser")
        self.assertEqual(credentials.password, b"secret")

    def test_credentialsInvalidResponse(self):
        """
        L{PLAINCredentials.setResponse} raises
        L{imap4.IllegalClientResponse} for responses that do not have
        the expected form.
        """
        credentials = imap4.PLAINCredentials()
        malformedResponses = (
            b"hello",
            b"hello\0world",
            b"hello\0world\0Zoom!\0",
        )
        for malformed in malformedResponses:
            self.assertRaises(
                imap4.IllegalClientResponse, credentials.setResponse, malformed
            )
3951
3952
3953class UnsolicitedResponseTests(IMAP4HelperMixin, TestCase):
3954    def testReadWrite(self):
3955        def login():
3956            return self.client.login(b"testuser", b"password-test")
3957
3958        def loggedIn():
3959            self.server.modeChanged(1)
3960
3961        d1 = self.connected.addCallback(strip(login))
3962        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
3963        d = defer.gatherResults([self.loopback(), d1])
3964        return d.addCallback(self._cbTestReadWrite)
3965
3966    def _cbTestReadWrite(self, ignored):
3967        E = self.client.events
3968        self.assertEqual(E, [["modeChanged", 1]])
3969
3970    def testReadOnly(self):
3971        def login():
3972            return self.client.login(b"testuser", b"password-test")
3973
3974        def loggedIn():
3975            self.server.modeChanged(0)
3976
3977        d1 = self.connected.addCallback(strip(login))
3978        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
3979        d = defer.gatherResults([self.loopback(), d1])
3980        return d.addCallback(self._cbTestReadOnly)
3981
3982    def _cbTestReadOnly(self, ignored):
3983        E = self.client.events
3984        self.assertEqual(E, [["modeChanged", 0]])
3985
3986    def testFlagChange(self):
3987        flags = {1: ["\\Answered", "\\Deleted"], 5: [], 10: ["\\Recent"]}
3988
3989        def login():
3990            return self.client.login(b"testuser", b"password-test")
3991
3992        def loggedIn():
3993            self.server.flagsChanged(flags)
3994
3995        d1 = self.connected.addCallback(strip(login))
3996        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
3997        d = defer.gatherResults([self.loopback(), d1])
3998        return d.addCallback(self._cbTestFlagChange, flags)
3999
4000    def _cbTestFlagChange(self, ignored, flags):
4001        E = self.client.events
4002        expect = [["flagsChanged", {x[0]: x[1]}] for x in flags.items()]
4003        E.sort(key=lambda o: o[0])
4004        expect.sort(key=lambda o: o[0])
4005        self.assertEqual(E, expect)
4006
4007    def testNewMessages(self):
4008        def login():
4009            return self.client.login(b"testuser", b"password-test")
4010
4011        def loggedIn():
4012            self.server.newMessages(10, None)
4013
4014        d1 = self.connected.addCallback(strip(login))
4015        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
4016        d = defer.gatherResults([self.loopback(), d1])
4017        return d.addCallback(self._cbTestNewMessages)
4018
4019    def _cbTestNewMessages(self, ignored):
4020        E = self.client.events
4021        self.assertEqual(E, [["newMessages", 10, None]])
4022
4023    def testNewRecentMessages(self):
4024        def login():
4025            return self.client.login(b"testuser", b"password-test")
4026
4027        def loggedIn():
4028            self.server.newMessages(None, 10)
4029
4030        d1 = self.connected.addCallback(strip(login))
4031        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
4032        d = defer.gatherResults([self.loopback(), d1])
4033        return d.addCallback(self._cbTestNewRecentMessages)
4034
4035    def _cbTestNewRecentMessages(self, ignored):
4036        E = self.client.events
4037        self.assertEqual(E, [["newMessages", None, 10]])
4038
4039    def testNewMessagesAndRecent(self):
4040        def login():
4041            return self.client.login(b"testuser", b"password-test")
4042
4043        def loggedIn():
4044            self.server.newMessages(20, 10)
4045
4046        d1 = self.connected.addCallback(strip(login))
4047        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
4048        d = defer.gatherResults([self.loopback(), d1])
4049        return d.addCallback(self._cbTestNewMessagesAndRecent)
4050
4051    def _cbTestNewMessagesAndRecent(self, ignored):
4052        E = self.client.events
4053        self.assertEqual(E, [["newMessages", 20, None], ["newMessages", None, 10]])
4054
4055
4056class ClientCapabilityTests(TestCase):
4057    """
4058    Tests for issuance of the CAPABILITY command and handling of its response.
4059    """
4060
4061    def setUp(self):
4062        """
4063        Create an L{imap4.IMAP4Client} connected to a L{StringTransport}.
4064        """
4065        self.transport = StringTransport()
4066        self.protocol = imap4.IMAP4Client()
4067        self.protocol.makeConnection(self.transport)
4068        self.protocol.dataReceived(b"* OK [IMAP4rev1]\r\n")
4069
4070    def test_simpleAtoms(self):
4071        """
4072        A capability response consisting only of atoms without C{'='} in them
4073        should result in a dict mapping those atoms to L{None}.
4074        """
4075        capabilitiesResult = self.protocol.getCapabilities(useCache=False)
4076        self.protocol.dataReceived(b"* CAPABILITY IMAP4rev1 LOGINDISABLED\r\n")
4077        self.protocol.dataReceived(b"0001 OK Capability completed.\r\n")
4078
4079        def gotCapabilities(capabilities):
4080            self.assertEqual(capabilities, {b"IMAP4rev1": None, b"LOGINDISABLED": None})
4081
4082        capabilitiesResult.addCallback(gotCapabilities)
4083        return capabilitiesResult
4084
4085    def test_categoryAtoms(self):
4086        """
4087        A capability response consisting of atoms including C{'='} should have
4088        those atoms split on that byte and have capabilities in the same
4089        category aggregated into lists in the resulting dictionary.
4090
4091        (n.b. - I made up the word "category atom"; the protocol has no notion
4092        of structure here, but rather allows each capability to define the
4093        semantics of its entry in the capability response in a freeform manner.
4094        If I had realized this earlier, the API for capabilities would look
4095        different.  As it is, we can hope that no one defines any crazy
4096        semantics which are incompatible with this API, or try to figure out a
4097        better API when someone does. -exarkun)
4098        """
4099        capabilitiesResult = self.protocol.getCapabilities(useCache=False)
4100        self.protocol.dataReceived(b"* CAPABILITY IMAP4rev1 AUTH=LOGIN AUTH=PLAIN\r\n")
4101        self.protocol.dataReceived(b"0001 OK Capability completed.\r\n")
4102
4103        def gotCapabilities(capabilities):
4104            self.assertEqual(
4105                capabilities, {b"IMAP4rev1": None, b"AUTH": [b"LOGIN", b"PLAIN"]}
4106            )
4107
4108        capabilitiesResult.addCallback(gotCapabilities)
4109        return capabilitiesResult
4110
4111    def test_mixedAtoms(self):
4112        """
4113        A capability response consisting of both simple and category atoms of
4114        the same type should result in a list containing L{None} as well as the
4115        values for the category.
4116        """
4117        capabilitiesResult = self.protocol.getCapabilities(useCache=False)
4118        # Exercise codepath for both orderings of =-having and =-missing
4119        # capabilities.
4120        self.protocol.dataReceived(
4121            b"* CAPABILITY IMAP4rev1 FOO FOO=BAR BAR=FOO BAR\r\n"
4122        )
4123        self.protocol.dataReceived(b"0001 OK Capability completed.\r\n")
4124
4125        def gotCapabilities(capabilities):
4126            self.assertEqual(
4127                capabilities,
4128                {b"IMAP4rev1": None, b"FOO": [None, b"BAR"], b"BAR": [b"FOO", None]},
4129            )
4130
4131        capabilitiesResult.addCallback(gotCapabilities)
4132        return capabilitiesResult
4133
4134
class StillSimplerClient(imap4.IMAP4Client):
    """
    An IMAP4 client that accumulates every unsolicited flag change it is
    told about.

    @ivar flags: A L{dict} holding the merged contents of all
        C{flagsChanged} notifications received so far.
    """

    def __init__(self):
        super().__init__()
        self.flags = {}

    def flagsChanged(self, newFlags):
        """
        Merge C{newFlags} into L{flags}, overwriting existing entries.

        @param newFlags: The flag changes being reported.
        """
        self.flags.update(newFlags)
4146
4147
4148class HandCraftedTests(IMAP4HelperMixin, TestCase):
4149    def testTrailingLiteral(self):
4150        transport = StringTransport()
4151        c = imap4.IMAP4Client()
4152        c.makeConnection(transport)
4153        c.lineReceived(b"* OK [IMAP4rev1]")
4154
4155        def cbCheckTransport(ignored):
4156            self.assertEqual(
4157                transport.value().splitlines()[-1],
4158                b"0003 FETCH 1 (RFC822)",
4159            )
4160
4161        def cbSelect(ignored):
4162            d = c.fetchMessage("1")
4163            c.dataReceived(
4164                b"* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n"
4165            )
4166            c.dataReceived(b"0003 OK FETCH\r\n")
4167            d.addCallback(cbCheckTransport)
4168            return d
4169
4170        def cbLogin(ignored):
4171            d = c.select("inbox")
4172            c.lineReceived(b"0002 OK SELECT")
4173            d.addCallback(cbSelect)
4174            return d
4175
4176        d = c.login(b"blah", b"blah")
4177        c.dataReceived(b"0001 OK LOGIN\r\n")
4178        d.addCallback(cbLogin)
4179        return d
4180
4181    def test_fragmentedStringLiterals(self):
4182        """
4183        String literals whose data is not immediately available are
4184        parsed.
4185        """
4186        self.server.checker.addUser(b"testuser", b"password-test")
4187        transport = StringTransport()
4188        self.server.makeConnection(transport)
4189
4190        transport.clear()
4191        self.server.dataReceived(b"01 LOGIN {8}\r\n")
4192        self.assertEqual(transport.value(), b"+ Ready for 8 octets of text\r\n")
4193
4194        transport.clear()
4195        self.server.dataReceived(b"testuser {13}\r\n")
4196        self.assertEqual(transport.value(), b"+ Ready for 13 octets of text\r\n")
4197
4198        transport.clear()
4199        self.server.dataReceived(b"password")
4200        self.assertNot(transport.value())
4201        self.server.dataReceived(b"-test\r\n")
4202        self.assertEqual(transport.value(), b"01 OK LOGIN succeeded\r\n")
4203        self.assertEqual(self.server.state, "auth")
4204
4205        self.server.connectionLost(error.ConnectionDone("Connection done."))
4206
4207    def test_emptyStringLiteral(self):
4208        """
4209        Empty string literals are parsed.
4210        """
4211        self.server.checker.users = {b"": b""}
4212        transport = StringTransport()
4213        self.server.makeConnection(transport)
4214
4215        transport.clear()
4216        self.server.dataReceived(b"01 LOGIN {0}\r\n")
4217        self.assertEqual(transport.value(), b"+ Ready for 0 octets of text\r\n")
4218
4219        transport.clear()
4220        self.server.dataReceived(b"{0}\r\n")
4221        self.assertEqual(transport.value(), b"01 OK LOGIN succeeded\r\n")
4222        self.assertEqual(self.server.state, "auth")
4223
4224        self.server.connectionLost(error.ConnectionDone("Connection done."))
4225
4226    def test_unsolicitedResponseMixedWithSolicitedResponse(self):
4227        """
4228        If unsolicited data is received along with solicited data in the
4229        response to a I{FETCH} command issued by L{IMAP4Client.fetchSpecific},
4230        the unsolicited data is passed to the appropriate callback and not
4231        included in the result with which the L{Deferred} returned by
4232        L{IMAP4Client.fetchSpecific} fires.
4233        """
4234        transport = StringTransport()
4235        c = StillSimplerClient()
4236        c.makeConnection(transport)
4237        c.lineReceived(b"* OK [IMAP4rev1]")
4238
4239        def login():
4240            d = c.login(b"blah", b"blah")
4241            c.dataReceived(b"0001 OK LOGIN\r\n")
4242            return d
4243
4244        def select():
4245            d = c.select("inbox")
4246            c.lineReceived(b"0002 OK SELECT")
4247            return d
4248
4249        def fetch():
4250            d = c.fetchSpecific(
4251                "1:*", headerType="HEADER.FIELDS", headerArgs=["SUBJECT"]
4252            )
4253            c.dataReceived(b'* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {38}\r\n')
4254            c.dataReceived(b"Subject: Suprise for your woman...\r\n")
4255            c.dataReceived(b"\r\n")
4256            c.dataReceived(b")\r\n")
4257            c.dataReceived(b"* 1 FETCH (FLAGS (\\Seen))\r\n")
4258            c.dataReceived(b'* 2 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {75}\r\n')
4259            c.dataReceived(
4260                b"Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n"
4261            )
4262            c.dataReceived(b"\r\n")
4263            c.dataReceived(b")\r\n")
4264            c.dataReceived(b"0003 OK FETCH completed\r\n")
4265            return d
4266
4267        def test(res):
4268            self.assertEqual(
4269                transport.value().splitlines()[-1],
4270                b"0003 FETCH 1:* BODY[HEADER.FIELDS (SUBJECT)]",
4271            )
4272
4273            self.assertEqual(
4274                res,
4275                {
4276                    1: [
4277                        [
4278                            "BODY",
4279                            ["HEADER.FIELDS", ["SUBJECT"]],
4280                            "Subject: Suprise for your woman...\r\n\r\n",
4281                        ]
4282                    ],
4283                    2: [
4284                        [
4285                            "BODY",
4286                            ["HEADER.FIELDS", ["SUBJECT"]],
4287                            "Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n\r\n",
4288                        ]
4289                    ],
4290                },
4291            )
4292
4293            self.assertEqual(c.flags, {1: ["\\Seen"]})
4294
4295        return (
4296            login()
4297            .addCallback(strip(select))
4298            .addCallback(strip(fetch))
4299            .addCallback(test)
4300        )
4301
4302    def test_literalWithoutPrecedingWhitespace(self):
4303        """
4304        Literals should be recognized even when they are not preceded by
4305        whitespace.
4306        """
4307        transport = StringTransport()
4308        protocol = imap4.IMAP4Client()
4309
4310        protocol.makeConnection(transport)
4311        protocol.lineReceived(b"* OK [IMAP4rev1]")
4312
4313        def login():
4314            d = protocol.login(b"blah", b"blah")
4315            protocol.dataReceived(b"0001 OK LOGIN\r\n")
4316            return d
4317
4318        def select():
4319            d = protocol.select(b"inbox")
4320            protocol.lineReceived(b"0002 OK SELECT")
4321            return d
4322
4323        def fetch():
4324            d = protocol.fetchSpecific(
4325                "1:*", headerType="HEADER.FIELDS", headerArgs=["SUBJECT"]
4326            )
4327            protocol.dataReceived(
4328                b'* 1 FETCH (BODY[HEADER.FIELDS ({7}\r\nSUBJECT)] "Hello")\r\n'
4329            )
4330            protocol.dataReceived(b"0003 OK FETCH completed\r\n")
4331            return d
4332
4333        def test(result):
4334            self.assertEqual(
4335                transport.value().splitlines()[-1],
4336                b"0003 FETCH 1:* BODY[HEADER.FIELDS (SUBJECT)]",
4337            )
4338            self.assertEqual(
4339                result, {1: [["BODY", ["HEADER.FIELDS", ["SUBJECT"]], "Hello"]]}
4340            )
4341
4342        d = login()
4343        d.addCallback(strip(select))
4344        d.addCallback(strip(fetch))
4345        d.addCallback(test)
4346        return d
4347
4348    def test_nonIntegerLiteralLength(self):
4349        """
4350        If the server sends a literal length which cannot be parsed as an
4351        integer, L{IMAP4Client.lineReceived} should cause the protocol to be
4352        disconnected by raising L{imap4.IllegalServerResponse}.
4353        """
4354        transport = StringTransport()
4355        protocol = imap4.IMAP4Client()
4356
4357        protocol.makeConnection(transport)
4358        protocol.lineReceived(b"* OK [IMAP4rev1]")
4359
4360        def login():
4361            d = protocol.login(b"blah", b"blah")
4362            protocol.dataReceived(b"0001 OK LOGIN\r\n")
4363            return d
4364
4365        def select():
4366            d = protocol.select("inbox")
4367            protocol.lineReceived(b"0002 OK SELECT")
4368            return d
4369
4370        def fetch():
4371            protocol.fetchSpecific(
4372                "1:*", headerType="HEADER.FIELDS", headerArgs=["SUBJECT"]
4373            )
4374
4375            self.assertEqual(
4376                transport.value().splitlines()[-1],
4377                b"0003 FETCH 1:* BODY[HEADER.FIELDS (SUBJECT)]",
4378            )
4379
4380            self.assertRaises(
4381                imap4.IllegalServerResponse,
4382                protocol.dataReceived,
4383                b"* 1 FETCH {xyz}\r\n...",
4384            )
4385
4386        d = login()
4387        d.addCallback(strip(select))
4388        d.addCallback(strip(fetch))
4389        return d
4390
4391    def test_flagsChangedInsideFetchSpecificResponse(self):
4392        """
4393        Any unrequested flag information received along with other requested
4394        information in an untagged I{FETCH} received in response to a request
4395        issued with L{IMAP4Client.fetchSpecific} is passed to the
4396        C{flagsChanged} callback.
4397        """
4398        transport = StringTransport()
4399        c = StillSimplerClient()
4400        c.makeConnection(transport)
4401        c.lineReceived(b"* OK [IMAP4rev1]")
4402
4403        def login():
4404            d = c.login(b"blah", b"blah")
4405            c.dataReceived(b"0001 OK LOGIN\r\n")
4406            return d
4407
4408        def select():
4409            d = c.select("inbox")
4410            c.lineReceived(b"0002 OK SELECT")
4411            return d
4412
4413        def fetch():
4414            d = c.fetchSpecific(
4415                b"1:*", headerType="HEADER.FIELDS", headerArgs=["SUBJECT"]
4416            )
4417            # This response includes FLAGS after the requested data.
4418            c.dataReceived(b'* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {22}\r\n')
4419            c.dataReceived(b"Subject: subject one\r\n")
4420            c.dataReceived(b" FLAGS (\\Recent))\r\n")
4421            # And this one includes it before!  Either is possible.
4422            c.dataReceived(
4423                b'* 2 FETCH (FLAGS (\\Seen) BODY[HEADER.FIELDS ("SUBJECT")] {22}\r\n'
4424            )
4425            c.dataReceived(b"Subject: subject two\r\n")
4426            c.dataReceived(b")\r\n")
4427            c.dataReceived(b"0003 OK FETCH completed\r\n")
4428            return d
4429
4430        def test(res):
4431            self.assertEqual(
4432                res,
4433                {
4434                    1: [
4435                        [
4436                            "BODY",
4437                            ["HEADER.FIELDS", ["SUBJECT"]],
4438                            "Subject: subject one\r\n",
4439                        ]
4440                    ],
4441                    2: [
4442                        [
4443                            "BODY",
4444                            ["HEADER.FIELDS", ["SUBJECT"]],
4445                            "Subject: subject two\r\n",
4446                        ]
4447                    ],
4448                },
4449            )
4450
4451            self.assertEqual(c.flags, {1: ["\\Recent"], 2: ["\\Seen"]})
4452
4453        return (
4454            login()
4455            .addCallback(strip(select))
4456            .addCallback(strip(fetch))
4457            .addCallback(test)
4458        )
4459
4460    def test_flagsChangedInsideFetchMessageResponse(self):
4461        """
4462        Any unrequested flag information received along with other requested
4463        information in an untagged I{FETCH} received in response to a request
4464        issued with L{IMAP4Client.fetchMessage} is passed to the
4465        C{flagsChanged} callback.
4466        """
4467        transport = StringTransport()
4468        c = StillSimplerClient()
4469        c.makeConnection(transport)
4470        c.lineReceived(b"* OK [IMAP4rev1]")
4471
4472        def login():
4473            d = c.login(b"blah", b"blah")
4474            c.dataReceived(b"0001 OK LOGIN\r\n")
4475            return d
4476
4477        def select():
4478            d = c.select("inbox")
4479            c.lineReceived(b"0002 OK SELECT")
4480            return d
4481
4482        def fetch():
4483            d = c.fetchMessage("1:*")
4484            c.dataReceived(b"* 1 FETCH (RFC822 {24}\r\n")
4485            c.dataReceived(b"Subject: first subject\r\n")
4486            c.dataReceived(b" FLAGS (\\Seen))\r\n")
4487            c.dataReceived(b"* 2 FETCH (FLAGS (\\Recent \\Seen) RFC822 {25}\r\n")
4488            c.dataReceived(b"Subject: second subject\r\n")
4489            c.dataReceived(b")\r\n")
4490            c.dataReceived(b"0003 OK FETCH completed\r\n")
4491            return d
4492
4493        def test(res):
4494            self.assertEqual(
4495                transport.value().splitlines()[-1],
4496                b"0003 FETCH 1:* (RFC822)",
4497            )
4498
4499            self.assertEqual(
4500                res,
4501                {
4502                    1: {"RFC822": "Subject: first subject\r\n"},
4503                    2: {"RFC822": "Subject: second subject\r\n"},
4504                },
4505            )
4506
4507            self.assertEqual(c.flags, {1: ["\\Seen"], 2: ["\\Recent", "\\Seen"]})
4508
4509        return (
4510            login()
4511            .addCallback(strip(select))
4512            .addCallback(strip(fetch))
4513            .addCallback(test)
4514        )
4515
4516    def test_authenticationChallengeDecodingException(self):
4517        """
4518        When decoding a base64 encoded authentication message from the server,
4519        decoding errors are logged and then the client closes the connection.
4520        """
4521        transport = StringTransportWithDisconnection()
4522        protocol = imap4.IMAP4Client()
4523        transport.protocol = protocol
4524
4525        protocol.makeConnection(transport)
4526        protocol.lineReceived(
4527            b"* OK [CAPABILITY IMAP4rev1 IDLE NAMESPACE AUTH=CRAM-MD5] "
4528            b"Twisted IMAP4rev1 Ready"
4529        )
4530        cAuth = imap4.CramMD5ClientAuthenticator(b"testuser")
4531        protocol.registerAuthenticator(cAuth)
4532
4533        d = protocol.authenticate("secret")
4534        # Should really be something describing the base64 decode error.  See
4535        # #6021.
4536        self.assertFailure(d, error.ConnectionDone)
4537
4538        protocol.dataReceived(b"+ Something bad! and bad\r\n")
4539
4540        # This should not really be logged.  See #6021.
4541        logged = self.flushLoggedErrors(imap4.IllegalServerResponse)
4542        self.assertEqual(len(logged), 1)
4543        self.assertEqual(logged[0].value.args[0], b"Something bad! and bad")
4544        return d
4545
4546
class PreauthIMAP4ClientMixin:
    """
    A mixin for L{SynchronousTestCase} subclasses whose C{setUp} builds an
    L{IMAP4Client}, connects it to a L{StringTransport} and moves it into the
    I{authenticated} state.

    @cvar clientProtocol: The class which C{setUp} calls, without
        arguments, to create C{client}.

    @ivar transport: The L{StringTransport} to which C{client} is connected.

    @ivar client: The L{IMAP4Client} connected to C{transport}.
    """

    clientProtocol: Type[imap4.IMAP4Client] = imap4.IMAP4Client

    def setUp(self):
        """
        Connect a new client to a fake transport and deliver a I{PREAUTH}
        greeting to it, leaving it in the authenticated state.
        """
        self.transport = transport = StringTransport()
        self.client = client = self.clientProtocol()
        client.makeConnection(transport)
        # With a PREAUTH greeting no LOGIN exchange has to be simulated.
        client.dataReceived(b"* PREAUTH Hello unittest\r\n")
4572
4573
class SelectionTestsMixin(PreauthIMAP4ClientMixin):
    """
    Tests which apply equally to I{EXAMINE} and I{SELECT} support.

    Test cases mixing this in provide C{method}, the name of the
    L{IMAP4Client} method to call, and C{command}, the L{bytes} command name
    that method is expected to write.
    """

    def _examineOrSelect(self):
        """
        Call the client method named by C{self.method} for the mailbox
        C{"foobox"}, assert that the matching command was written to the
        transport, and return the L{Deferred} the method returned.
        """
        issueCommand = getattr(self.client, self.method)
        result = issueCommand("foobox")
        expectedBytes = b"0001 " + self.command + b" foobox\r\n"
        self.assertEqual(self.transport.value(), expectedBytes)
        return result

    def _response(self, *lines):
        """
        Deliver each of the given (unterminated) response lines to
        C{self.client}, followed by the tagged completion line which finishes
        the SELECT or EXAMINE response.
        """
        completion = b"0001 OK [READ-ONLY] " + self.command + b" completed"
        for line in lines + (completion,):
            self.client.dataReceived(line + b"\r\n")

    def _assertSelectionResult(self, line, expected):
        """
        Issue the command, answer it with C{line} and the completion line,
        and assert that the resulting L{Deferred} fired with C{expected}.
        """
        d = self._examineOrSelect()
        self._response(line)
        self.assertEqual(self.successResultOf(d), expected)

    def _assertSelectionFails(self, line):
        """
        Issue the command, answer it with C{line} and the completion line,
        and assert that the resulting L{Deferred} failed with
        L{IllegalServerResponse}.
        """
        d = self._examineOrSelect()
        self._response(line)
        self.failureResultOf(d, imap4.IllegalServerResponse)

    def test_exists(self):
        """
        An I{EXISTS} response to I{SELECT} or I{EXAMINE} is reported under
        the C{'EXISTS'} key of the C{dict} with which the L{Deferred}
        returned by L{IMAP4Client.select} or L{IMAP4Client.examine} fires.
        """
        self._assertSelectionResult(
            b"* 3 EXISTS", {"READ-WRITE": False, "EXISTS": 3}
        )

    def test_nonIntegerExists(self):
        """
        A non-integer EXISTS value in the response to I{SELECT} or
        I{EXAMINE} makes the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fail with L{IllegalServerResponse}.
        """
        self._assertSelectionFails(b"* foo EXISTS")

    def test_recent(self):
        """
        A I{RECENT} response to I{SELECT} or I{EXAMINE} is reported under
        the C{'RECENT'} key of the C{dict} with which the L{Deferred}
        returned by L{IMAP4Client.select} or L{IMAP4Client.examine} fires.
        """
        self._assertSelectionResult(
            b"* 5 RECENT", {"READ-WRITE": False, "RECENT": 5}
        )

    def test_nonIntegerRecent(self):
        """
        A non-integer RECENT value in the response to I{SELECT} or
        I{EXAMINE} makes the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fail with L{IllegalServerResponse}.
        """
        self._assertSelectionFails(b"* foo RECENT")

    def test_unseen(self):
        """
        An I{UNSEEN} response code in the response to I{SELECT} or
        I{EXAMINE} is reported under the C{'UNSEEN'} key of the C{dict} with
        which the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fires.
        """
        self._assertSelectionResult(
            b"* OK [UNSEEN 8] Message 8 is first unseen",
            {"READ-WRITE": False, "UNSEEN": 8},
        )

    def test_nonIntegerUnseen(self):
        """
        A non-integer UNSEEN value in the response to I{SELECT} or
        I{EXAMINE} makes the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fail with L{IllegalServerResponse}.
        """
        self._assertSelectionFails(
            b"* OK [UNSEEN foo] Message foo is first unseen"
        )

    def test_uidvalidity(self):
        """
        A I{UIDVALIDITY} response code in the response to I{SELECT} or
        I{EXAMINE} is reported under the C{'UIDVALIDITY'} key of the C{dict}
        with which the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fires.
        """
        self._assertSelectionResult(
            b"* OK [UIDVALIDITY 12345] UIDs valid",
            {"READ-WRITE": False, "UIDVALIDITY": 12345},
        )

    def test_nonIntegerUIDVALIDITY(self):
        """
        A non-integer UIDVALIDITY value in the response to I{SELECT} or
        I{EXAMINE} makes the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fail with L{IllegalServerResponse}.
        """
        self._assertSelectionFails(b"* OK [UIDVALIDITY foo] UIDs valid")

    def test_uidnext(self):
        """
        A I{UIDNEXT} response code in the response to I{SELECT} or
        I{EXAMINE} is reported under the C{'UIDNEXT'} key of the C{dict} with
        which the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fires.
        """
        self._assertSelectionResult(
            b"* OK [UIDNEXT 4392] Predicted next UID",
            {"READ-WRITE": False, "UIDNEXT": 4392},
        )

    def test_nonIntegerUIDNEXT(self):
        """
        A non-integer UIDNEXT value in the response to I{SELECT} or
        I{EXAMINE} makes the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fail with L{IllegalServerResponse}.
        """
        self._assertSelectionFails(b"* OK [UIDNEXT foo] Predicted next UID")

    def test_flags(self):
        """
        A I{FLAGS} response to I{SELECT} or I{EXAMINE} is reported, as a
        C{tuple} of flag names, under the C{'FLAGS'} key of the C{dict} with
        which the L{Deferred} returned by L{IMAP4Client.select} or
        L{IMAP4Client.examine} fires.
        """
        allFlags = ("\\Answered", "\\Flagged", "\\Deleted", "\\Seen", "\\Draft")
        self._assertSelectionResult(
            b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)",
            {"READ-WRITE": False, "FLAGS": allFlags},
        )

    def test_permanentflags(self):
        """
        A I{PERMANENTFLAGS} response code in the response to I{SELECT} or
        I{EXAMINE} is reported, as a C{tuple} of flag names, under the
        C{'PERMANENTFLAGS'} key of the C{dict} with which the L{Deferred}
        returned by L{IMAP4Client.select} or L{IMAP4Client.examine} fires.
        """
        self._assertSelectionResult(
            b"* OK [PERMANENTFLAGS (\\Starred)] Just one permanent flag in "
            b"that list up there",
            {"READ-WRITE": False, "PERMANENTFLAGS": ("\\Starred",)},
        )

    def test_unrecognizedOk(self):
        """
        An I{OK} carrying unrecognized response code text in the response to
        I{SELECT} or I{EXAMINE} does not make parsing fail.
        """
        # The value won't show up in the result.  It would be okay if it did
        # someday, perhaps.  This shouldn't ever happen, though.
        self._assertSelectionResult(
            b"* OK [X-MADE-UP] I just made this response text up.",
            {"READ-WRITE": False},
        )

    def test_bareOk(self):
        """
        An I{OK} with no response code text at all in the response to
        I{SELECT} or I{EXAMINE} does not make parsing fail.
        """
        self._assertSelectionResult(b"* OK", {"READ-WRITE": False})
4772
4773
class IMAP4ClientExamineTests(SelectionTestsMixin, SynchronousTestCase):
    r"""
    Run the shared selection tests against L{IMAP4Client.examine}.

    RFC 3501, section 6.3.2, gives this example of the EXAMINE command::

        C: A932 EXAMINE blurdybloop
        S: * 17 EXISTS
        S: * 2 RECENT
        S: * OK [UNSEEN 8] Message 8 is first unseen
        S: * OK [UIDVALIDITY 3857529045] UIDs valid
        S: * OK [UIDNEXT 4392] Predicted next UID
        S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft)
        S: * OK [PERMANENTFLAGS ()] No permanent flags permitted
        S: A932 OK [READ-ONLY] EXAMINE completed
    """

    # The command name expected on the wire and the client method sending it.
    command = b"EXAMINE"
    method = "examine"
4792
4793
class IMAP4ClientSelectTests(SelectionTestsMixin, SynchronousTestCase):
    r"""
    Run the shared selection tests against L{IMAP4Client.select}.

    RFC 3501, section 6.3.1, gives this example of the SELECT command::

        C: A142 SELECT INBOX
        S: * 172 EXISTS
        S: * 1 RECENT
        S: * OK [UNSEEN 12] Message 12 is first unseen
        S: * OK [UIDVALIDITY 3857529045] UIDs valid
        S: * OK [UIDNEXT 4392] Predicted next UID
        S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft)
        S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited
        S: A142 OK [READ-WRITE] SELECT completed
    """

    # The command name expected on the wire and the client method sending it.
    command = b"SELECT"
    method = "select"
4813
4814
class IMAP4ClientExpungeTests(PreauthIMAP4ClientMixin, SynchronousTestCase):
    """
    Tests for L{IMAP4Client.expunge}.

    RFC 3501, section 6.4.3, gives this example of the EXPUNGE command::

        C: A202 EXPUNGE
        S: * 3 EXPUNGE
        S: * 3 EXPUNGE
        S: * 5 EXPUNGE
        S: * 8 EXPUNGE
        S: A202 OK EXPUNGE completed
    """

    def _expunge(self):
        """
        Call L{IMAP4Client.expunge}, assert that the I{EXPUNGE} command was
        written, clear the transport and return the resulting L{Deferred}.
        """
        result = self.client.expunge()
        written = self.transport.value()
        self.assertEqual(written, b"0001 EXPUNGE\r\n")
        self.transport.clear()
        return result

    def _response(self, sequenceNumbers):
        """
        Deliver one untagged I{EXPUNGE} line per element of
        C{sequenceNumbers} and then the tagged completion line.
        """
        untagged = (
            networkString("* %s EXPUNGE" % (number,)) for number in sequenceNumbers
        )
        for line in chain(untagged, [b"0001 OK EXPUNGE COMPLETED"]):
            self.client.lineReceived(line)

    def test_expunge(self):
        """
        L{IMAP4Client.expunge} sends the I{EXPUNGE} command; the L{Deferred}
        it returns fires with a C{list} of the message sequence numbers
        reported by the server.
        """
        expunged = [3, 3, 5, 8]
        result = self._expunge()
        self._response(expunged)
        self.assertEqual(self.successResultOf(result), [3, 3, 5, 8])

    def test_nonIntegerExpunged(self):
        """
        A non-integer where the server should have sent a message sequence
        number makes the L{Deferred} returned by L{IMAP4Client.expunge} fail
        with L{IllegalServerResponse}.
        """
        result = self._expunge()
        self._response([3, 3, "foo", 8])
        self.failureResultOf(result, imap4.IllegalServerResponse)
4859
4860
class IMAP4ClientSearchTests(PreauthIMAP4ClientMixin, SynchronousTestCase):
    """
    Tests for L{IMAP4Client.search}.

    RFC 3501, section 6.4.4, gives this example of the SEARCH command::

        C: A282 SEARCH FLAGGED SINCE 1-Feb-1994 NOT FROM "Smith"
        S: * SEARCH 2 84 882
        S: A282 OK SEARCH completed
        C: A283 SEARCH TEXT "string not in mailbox"
        S: * SEARCH
        S: A283 OK SEARCH completed
        C: A284 SEARCH CHARSET UTF-8 TEXT {6}
        C: XXXXXX
        S: * SEARCH 43
        S: A284 OK SEARCH completed
    """

    def _search(self):
        """
        Search for the text C{"ABCDEF"}, assert that the matching I{SEARCH}
        command was written and return the resulting L{Deferred}.
        """
        query = imap4.Query(text="ABCDEF")
        result = self.client.search(query)
        written = self.transport.value()
        self.assertEqual(written, b'0001 SEARCH (TEXT "ABCDEF")\r\n')
        return result

    def _response(self, messageNumbers):
        """
        Deliver an untagged I{SEARCH} line listing C{messageNumbers} and then
        the tagged completion line.
        """
        listing = " ".join(str(number) for number in messageNumbers)
        self.client.lineReceived(b"* SEARCH " + networkString(listing))
        self.client.lineReceived(b"0001 OK SEARCH completed")

    def test_search(self):
        """
        L{IMAP4Client.search} sends the I{SEARCH} command; the L{Deferred} it
        returns fires with a C{list} of the message sequence numbers reported
        by the server.
        """
        result = self._search()
        self._response([2, 5, 10])
        found = self.successResultOf(result)
        self.assertEqual(found, [2, 5, 10])

    def test_nonIntegerFound(self):
        """
        A non-integer where the server should have sent a message sequence
        number makes the L{Deferred} returned by L{IMAP4Client.search} fail
        with L{IllegalServerResponse}.
        """
        result = self._search()
        self._response([2, "foo", 10])
        self.failureResultOf(result, imap4.IllegalServerResponse)
4909
4910
4911class IMAP4ClientFetchTests(PreauthIMAP4ClientMixin, SynchronousTestCase):
4912    """
4913    Tests for the L{IMAP4Client.fetch} method.
4914
4915    See RFC 3501, section 6.4.5.
4916    """
4917
4918    def test_fetchUID(self):
4919        """
4920        L{IMAP4Client.fetchUID} sends the I{FETCH UID} command and returns a
4921        L{Deferred} which fires with a C{dict} mapping message sequence numbers
4922        to C{dict}s mapping C{'UID'} to that message's I{UID} in the server's
4923        response.
4924        """
4925        d = self.client.fetchUID("1:7")
4926        self.assertEqual(self.transport.value(), b"0001 FETCH 1:7 (UID)\r\n")
4927        self.client.lineReceived(b"* 2 FETCH (UID 22)")
4928        self.client.lineReceived(b"* 3 FETCH (UID 23)")
4929        self.client.lineReceived(b"* 4 FETCH (UID 24)")
4930        self.client.lineReceived(b"* 5 FETCH (UID 25)")
4931        self.client.lineReceived(b"0001 OK FETCH completed")
4932        self.assertEqual(
4933            self.successResultOf(d),
4934            {2: {"UID": "22"}, 3: {"UID": "23"}, 4: {"UID": "24"}, 5: {"UID": "25"}},
4935        )
4936
4937    def test_fetchUIDNonIntegerFound(self):
4938        """
4939        If the server responds with a non-integer where a message sequence
4940        number is expected, the L{Deferred} returned by L{IMAP4Client.fetchUID}
4941        fails with L{IllegalServerResponse}.
4942        """
4943        d = self.client.fetchUID("1")
4944        self.assertEqual(self.transport.value(), b"0001 FETCH 1 (UID)\r\n")
4945        self.client.lineReceived(b"* foo FETCH (UID 22)")
4946        self.client.lineReceived(b"0001 OK FETCH completed")
4947        self.failureResultOf(d, imap4.IllegalServerResponse)
4948
4949    def test_incompleteFetchUIDResponse(self):
4950        """
4951        If the server responds with an incomplete I{FETCH} response line, the
4952        L{Deferred} returned by L{IMAP4Client.fetchUID} fails with
4953        L{IllegalServerResponse}.
4954        """
4955        d = self.client.fetchUID("1:7")
4956        self.assertEqual(self.transport.value(), b"0001 FETCH 1:7 (UID)\r\n")
4957        self.client.lineReceived(b"* 2 FETCH (UID 22)")
4958        self.client.lineReceived(b"* 3 FETCH (UID)")
4959        self.client.lineReceived(b"* 4 FETCH (UID 24)")
4960        self.client.lineReceived(b"0001 OK FETCH completed")
4961        self.failureResultOf(d, imap4.IllegalServerResponse)
4962
4963    def test_fetchBody(self):
4964        """
4965        L{IMAP4Client.fetchBody} sends the I{FETCH BODY} command and returns a
4966        L{Deferred} which fires with a C{dict} mapping message sequence numbers
4967        to C{dict}s mapping C{'RFC822.TEXT'} to that message's body as given in
4968        the server's response.
4969        """
4970        d = self.client.fetchBody("3")
4971        self.assertEqual(self.transport.value(), b"0001 FETCH 3 (RFC822.TEXT)\r\n")
4972        self.client.lineReceived(b'* 3 FETCH (RFC822.TEXT "Message text")')
4973        self.client.lineReceived(b"0001 OK FETCH completed")
4974        self.assertEqual(self.successResultOf(d), {3: {"RFC822.TEXT": "Message text"}})
4975
4976    def test_fetchSpecific(self):
4977        """
4978        L{IMAP4Client.fetchSpecific} sends the I{BODY[]} command if no
4979        parameters beyond the message set to retrieve are given.  It returns a
4980        L{Deferred} which fires with a C{dict} mapping message sequence numbers
4981        to C{list}s of corresponding message data given by the server's
4982        response.
4983        """
4984        d = self.client.fetchSpecific("7")
4985        self.assertEqual(self.transport.value(), b"0001 FETCH 7 BODY[]\r\n")
4986        self.client.lineReceived(b'* 7 FETCH (BODY[] "Some body")')
4987        self.client.lineReceived(b"0001 OK FETCH completed")
4988        self.assertEqual(self.successResultOf(d), {7: [["BODY", [], "Some body"]]})
4989
4990    def test_fetchSpecificPeek(self):
4991        """
4992        L{IMAP4Client.fetchSpecific} issues a I{BODY.PEEK[]} command if passed
4993        C{True} for the C{peek} parameter.
4994        """
4995        d = self.client.fetchSpecific("6", peek=True)
4996        self.assertEqual(self.transport.value(), b"0001 FETCH 6 BODY.PEEK[]\r\n")
4997        # BODY.PEEK responses are just BODY
4998        self.client.lineReceived(b'* 6 FETCH (BODY[] "Some body")')
4999        self.client.lineReceived(b"0001 OK FETCH completed")
5000        self.assertEqual(self.successResultOf(d), {6: [["BODY", [], "Some body"]]})
5001
5002    def test_fetchSpecificNumbered(self):
5003        """
5004        L{IMAP4Client.fetchSpecific}, when passed a sequence for
5005        C{headerNumber}, sends the I{BODY[N.M]} command.  It returns a
5006        L{Deferred} which fires with a C{dict} mapping message sequence numbers
5007        to C{list}s of corresponding message data given by the server's
5008        response.
5009        """
5010        d = self.client.fetchSpecific("7", headerNumber=(1, 2, 3))
5011        self.assertEqual(self.transport.value(), b"0001 FETCH 7 BODY[1.2.3]\r\n")
5012        self.client.lineReceived(b'* 7 FETCH (BODY[1.2.3] "Some body")')
5013        self.client.lineReceived(b"0001 OK FETCH completed")
5014        self.assertEqual(
5015            self.successResultOf(d), {7: [["BODY", ["1.2.3"], "Some body"]]}
5016        )
5017
5018    def test_fetchSpecificText(self):
5019        """
5020        L{IMAP4Client.fetchSpecific}, when passed C{'TEXT'} for C{headerType},
5021        sends the I{BODY[TEXT]} command.  It returns a L{Deferred} which fires
5022        with a C{dict} mapping message sequence numbers to C{list}s of
5023        corresponding message data given by the server's response.
5024        """
5025        d = self.client.fetchSpecific("8", headerType="TEXT")
5026        self.assertEqual(self.transport.value(), b"0001 FETCH 8 BODY[TEXT]\r\n")
5027        self.client.lineReceived(b'* 8 FETCH (BODY[TEXT] "Some body")')
5028        self.client.lineReceived(b"0001 OK FETCH completed")
5029        self.assertEqual(
5030            self.successResultOf(d), {8: [["BODY", ["TEXT"], "Some body"]]}
5031        )
5032
5033    def test_fetchSpecificNumberedText(self):
5034        """
5035        If passed a value for the C{headerNumber} parameter and C{'TEXT'} for
5036        the C{headerType} parameter, L{IMAP4Client.fetchSpecific} sends a
5037        I{BODY[number.TEXT]} request and returns a L{Deferred} which fires with
5038        a C{dict} mapping message sequence numbers to C{list}s of message data
5039        given by the server's response.
5040        """
5041        d = self.client.fetchSpecific("4", headerType="TEXT", headerNumber=7)
5042        self.assertEqual(self.transport.value(), b"0001 FETCH 4 BODY[7.TEXT]\r\n")
5043        self.client.lineReceived(b'* 4 FETCH (BODY[7.TEXT] "Some body")')
5044        self.client.lineReceived(b"0001 OK FETCH completed")
5045        self.assertEqual(
5046            self.successResultOf(d), {4: [["BODY", ["7.TEXT"], "Some body"]]}
5047        )
5048
5049    def test_incompleteFetchSpecificTextResponse(self):
5050        """
5051        If the server responds to a I{BODY[TEXT]} request with a I{FETCH} line
5052        which is truncated after the I{BODY[TEXT]} tokens, the L{Deferred}
5053        returned by L{IMAP4Client.fetchUID} fails with
5054        L{IllegalServerResponse}.
5055        """
5056        d = self.client.fetchSpecific("8", headerType="TEXT")
5057        self.assertEqual(self.transport.value(), b"0001 FETCH 8 BODY[TEXT]\r\n")
5058        self.client.lineReceived(b"* 8 FETCH (BODY[TEXT])")
5059        self.client.lineReceived(b"0001 OK FETCH completed")
5060        self.failureResultOf(d, imap4.IllegalServerResponse)
5061
5062    def test_fetchSpecificMIME(self):
5063        """
5064        L{IMAP4Client.fetchSpecific}, when passed C{'MIME'} for C{headerType},
5065        sends the I{BODY[MIME]} command.  It returns a L{Deferred} which fires
5066        with a C{dict} mapping message sequence numbers to C{list}s of
5067        corresponding message data given by the server's response.
5068        """
5069        d = self.client.fetchSpecific("8", headerType="MIME")
5070        self.assertEqual(self.transport.value(), b"0001 FETCH 8 BODY[MIME]\r\n")
5071        self.client.lineReceived(b'* 8 FETCH (BODY[MIME] "Some body")')
5072        self.client.lineReceived(b"0001 OK FETCH completed")
5073        self.assertEqual(
5074            self.successResultOf(d), {8: [["BODY", ["MIME"], "Some body"]]}
5075        )
5076
5077    def test_fetchSpecificPartial(self):
5078        """
5079        L{IMAP4Client.fetchSpecific}, when passed C{offset} and C{length},
5080        sends a partial content request (like I{BODY[TEXT]<offset.length>}).
5081        It returns a L{Deferred} which fires with a C{dict} mapping message
5082        sequence numbers to C{list}s of corresponding message data given by the
5083        server's response.
5084        """
5085        d = self.client.fetchSpecific("9", headerType="TEXT", offset=17, length=3)
5086        self.assertEqual(self.transport.value(), b"0001 FETCH 9 BODY[TEXT]<17.3>\r\n")
5087        self.client.lineReceived(b'* 9 FETCH (BODY[TEXT]<17> "foo")')
5088        self.client.lineReceived(b"0001 OK FETCH completed")
5089        self.assertEqual(
5090            self.successResultOf(d), {9: [["BODY", ["TEXT"], "<17>", "foo"]]}
5091        )
5092
5093    def test_incompleteFetchSpecificPartialResponse(self):
5094        """
5095        If the server responds to a I{BODY[TEXT]} request with a I{FETCH} line
5096        which is truncated after the I{BODY[TEXT]<offset>} tokens, the
5097        L{Deferred} returned by L{IMAP4Client.fetchUID} fails with
5098        L{IllegalServerResponse}.
5099        """
5100        d = self.client.fetchSpecific("8", headerType="TEXT")
5101        self.assertEqual(self.transport.value(), b"0001 FETCH 8 BODY[TEXT]\r\n")
5102        self.client.lineReceived(b"* 8 FETCH (BODY[TEXT]<17>)")
5103        self.client.lineReceived(b"0001 OK FETCH completed")
5104        self.failureResultOf(d, imap4.IllegalServerResponse)
5105
5106    def test_fetchSpecificHTML(self):
5107        """
5108        If the body of a message begins with I{<} and ends with I{>} (as,
5109        for example, HTML bodies typically will), this is still interpreted
5110        as the body by L{IMAP4Client.fetchSpecific} (and particularly, not
5111        as a length indicator for a response to a request for a partial
5112        body).
5113        """
5114        d = self.client.fetchSpecific("7")
5115        self.assertEqual(self.transport.value(), b"0001 FETCH 7 BODY[]\r\n")
5116        self.client.lineReceived(b'* 7 FETCH (BODY[] "<html>test</html>")')
5117        self.client.lineReceived(b"0001 OK FETCH completed")
5118        self.assertEqual(
5119            self.successResultOf(d), {7: [["BODY", [], "<html>test</html>"]]}
5120        )
5121
5122    def assertFetchSpecificFieldsWithEmptyList(self, section):
5123        """
5124        Assert that the provided C{BODY} section, when invoked with no
5125        arguments, produces an empty list, and that it returns a
5126        L{Deferred} which fires with a C{dict} mapping message
5127        sequence numbers to C{list}s of corresponding message data
5128        given by the server's response.
5129
5130        @param section: The C{BODY} section to test: either
5131            C{'HEADER.FIELDS'} or C{'HEADER.FIELDS.NOT'}
5132        @type section: L{str}
5133        """
5134        d = self.client.fetchSpecific("10", headerType=section)
5135        self.assertEqual(
5136            self.transport.value(),
5137            b"0001 FETCH 10 BODY[" + section.encode("ascii") + b" ()]\r\n",
5138        )
5139        # It's unclear what the response would look like - would it be
5140        # an empty string?  No IMAP server parses an empty list of headers
5141        self.client.lineReceived(
5142            b"* 10 FETCH (BODY[" + section.encode("ascii") + b' ()] "")'
5143        )
5144        self.client.lineReceived(b"0001 OK FETCH completed")
5145        self.assertEqual(self.successResultOf(d), {10: [["BODY", [section, []], ""]]})
5146
    def test_fetchSpecificHeaderFieldsWithoutHeaders(self):
        """
        L{IMAP4Client.fetchSpecific}, when passed C{'HEADER.FIELDS'}
        for C{headerType} but no C{headerArgs}, sends the
        I{BODY[HEADER.FIELDS ()]} command, that is, with an empty list
        of header names.  It returns a L{Deferred} which fires with a
        C{dict} mapping message sequence numbers to C{list}s of
        corresponding message data given by the server's response.
        """
        # All of the checking lives in the shared assertion helper.
        self.assertFetchSpecificFieldsWithEmptyList("HEADER.FIELDS")
5157
    def test_fetchSpecificHeaderFieldsNotWithoutHeaders(self):
        """
        L{IMAP4Client.fetchSpecific}, when passed
        C{'HEADER.FIELDS.NOT'} for C{headerType} but no C{headerArgs},
        sends the I{BODY[HEADER.FIELDS.NOT ()]} command, that is, with
        an empty list of header names.  It returns a L{Deferred} which
        fires with a C{dict} mapping message sequence numbers to
        C{list}s of corresponding message data given by the server's
        response.
        """
        # All of the checking lives in the shared assertion helper.
        self.assertFetchSpecificFieldsWithEmptyList("HEADER.FIELDS.NOT")
5168
5169    def test_fetchSpecificHeader(self):
5170        """
5171        L{IMAP4Client.fetchSpecific}, when passed C{'HEADER'} for
5172        C{headerType}, sends the I{BODY[HEADER]} command.  It returns
5173        a L{Deferred} which fires with a C{dict} mapping message
5174        sequence numbers to C{list}s of corresponding message data
5175        given by the server's response.
5176        """
5177        d = self.client.fetchSpecific("11", headerType="HEADER")
5178        self.assertEqual(self.transport.value(), b"0001 FETCH 11 BODY[HEADER]\r\n")
5179        self.client.lineReceived(
5180            b"* 11 FETCH (BODY[HEADER]"
5181            b' "From: someone@localhost\r\nSubject: Some subject")'
5182        )
5183        self.client.lineReceived(b"0001 OK FETCH completed")
5184        self.assertEqual(
5185            self.successResultOf(d),
5186            {
5187                11: [
5188                    [
5189                        "BODY",
5190                        ["HEADER"],
5191                        "From: someone@localhost\r\nSubject: Some subject",
5192                    ]
5193                ]
5194            },
5195        )
5196
5197
class IMAP4ClientStoreTests(PreauthIMAP4ClientMixin, TestCase):
    r"""
    Tests for L{IMAP4Client.setFlags}, L{IMAP4Client.addFlags}, and
    L{IMAP4Client.removeFlags}.

    All three are exercised here through the STORE command they emit.  An
    example of that command from RFC 3501, section 6.4.6::

        C: A003 STORE 2:4 +FLAGS (\Deleted)
        S: * 2 FETCH (FLAGS (\Deleted \Seen))
        S: * 3 FETCH (FLAGS (\Deleted))
        S: * 4 FETCH (FLAGS (\Deleted \Flagged \Seen))
        S: A003 OK STORE completed
    """

    clientProtocol = StillSimplerClient

    def _issueStoreAndCheckWire(self, method, item, silent):
        """
        Call the named flag modifying method for message C{"3"} with two
        flags and assert that the matching I{STORE} command was written to
        the transport.

        @param method: The name of the client method to call.
        @param item: The data item (as L{bytes}) expected in the command.
        @param silent: The value to pass for the C{silent} argument.

        @return: Whatever the called method returned.
        """
        flags = ("\\Read", "\\Seen")
        storing = getattr(self.client, method)("3", flags, silent)
        expectedCommand = b"0001 STORE 3 " + item + b" (\\Read \\Seen)\r\n"
        self.assertEqual(self.transport.value(), expectedCommand)
        return storing

    def _flagsTest(self, method, item):
        """
        Exercise a flag modifying method in non-silent mode: call it, check
        the bytes sent, deliver a I{FETCH} response followed by the tagged
        I{OK}, and check the result of the returned Deferred.

        @param method: The name of the method to test.
        @param item: The data item which is expected to be specified.
        """
        storing = self._issueStoreAndCheckWire(method, item, False)
        self.client.lineReceived(b"* 3 FETCH (FLAGS (\\Read \\Seen))")
        self.client.lineReceived(b"0001 OK STORE completed")
        expected = {3: {"FLAGS": ["\\Read", "\\Seen"]}}
        self.assertEqual(self.successResultOf(storing), expected)

    def _flagsSilentlyTest(self, method, item):
        """
        Exercise a flag modifying method in silent mode: call it, check the
        bytes sent, deliver only the tagged I{OK}, and check that the
        returned Deferred fires with an empty C{dict}.

        @param method: The name of the method to test.
        @param item: The data item which is expected to be specified.
        """
        storing = self._issueStoreAndCheckWire(method, item, True)
        self.client.lineReceived(b"0001 OK STORE completed")
        self.assertEqual(self.successResultOf(storing), {})

    def _flagsSilentlyWithUnsolicitedDataTest(self, method, item):
        """
        Exercise a flag modifying method in silent mode when the server
        nevertheless sends a I{FETCH} response: call the method, check the
        bytes sent, deliver the unsolicited data and the tagged I{OK}, then
        check both the Deferred's result and the flags recorded on the
        client.

        @param method: The name of the method to test.
        @param item: The data item which is expected to be specified.
        """
        storing = self._issueStoreAndCheckWire(method, item, True)
        # Note the unsolicited data concerns message 2, not the stored-to 3.
        self.client.lineReceived(b"* 2 FETCH (FLAGS (\\Read \\Seen))")
        self.client.lineReceived(b"0001 OK STORE completed")
        self.assertEqual(self.successResultOf(storing), {})
        self.assertEqual(self.client.flags, {2: ["\\Read", "\\Seen"]})

    def test_setFlags(self):
        """
        With a C{False} C{silent} argument, L{IMAP4Client.setFlags} sends
        I{STORE} with a I{FLAGS} data item.  Its L{Deferred} fires with a
        C{dict} mapping message sequence numbers to C{dict}s which map
        C{'FLAGS'} to the new flags of those messages.
        """
        self._flagsTest("setFlags", b"FLAGS")

    def test_setFlagsSilently(self):
        """
        With a C{True} C{silent} argument, L{IMAP4Client.setFlags} sends
        I{STORE} with a I{FLAGS.SILENT} data item.  Its L{Deferred} fires
        with an empty dictionary.
        """
        self._flagsSilentlyTest("setFlags", b"FLAGS.SILENT")

    def test_setFlagsSilentlyWithUnsolicitedData(self):
        """
        Flag data which arrives unsolicited in response to a I{STORE}
        I{FLAGS.SILENT} request is handed to the C{flagsChanged} callback.
        """
        self._flagsSilentlyWithUnsolicitedDataTest("setFlags", b"FLAGS.SILENT")

    def test_addFlags(self):
        """
        L{IMAP4Client.addFlags} works like L{IMAP4Client.setFlags} except
        that it sends I{+FLAGS} rather than I{FLAGS}.
        """
        self._flagsTest("addFlags", b"+FLAGS")

    def test_addFlagsSilently(self):
        """
        With a C{True} C{silent} argument, L{IMAP4Client.addFlags} works
        like silent L{IMAP4Client.setFlags} except that it sends
        I{+FLAGS.SILENT} rather than I{FLAGS.SILENT}.
        """
        self._flagsSilentlyTest("addFlags", b"+FLAGS.SILENT")

    def test_addFlagsSilentlyWithUnsolicitedData(self):
        """
        In silent mode, L{IMAP4Client.addFlags} treats unsolicited data the
        same way L{IMAP4Client.setFlags} does.
        """
        self._flagsSilentlyWithUnsolicitedDataTest("addFlags", b"+FLAGS.SILENT")

    def test_removeFlags(self):
        """
        L{IMAP4Client.removeFlags} works like L{IMAP4Client.setFlags} except
        that it sends I{-FLAGS} rather than I{FLAGS}.
        """
        self._flagsTest("removeFlags", b"-FLAGS")

    def test_removeFlagsSilently(self):
        """
        With a C{True} C{silent} argument, L{IMAP4Client.removeFlags} works
        like silent L{IMAP4Client.setFlags} except that it sends
        I{-FLAGS.SILENT} rather than I{FLAGS.SILENT}.
        """
        self._flagsSilentlyTest("removeFlags", b"-FLAGS.SILENT")

    def test_removeFlagsSilentlyWithUnsolicitedData(self):
        """
        In silent mode, L{IMAP4Client.removeFlags} treats unsolicited data
        the same way L{IMAP4Client.setFlags} does.
        """
        self._flagsSilentlyWithUnsolicitedDataTest("removeFlags", b"-FLAGS.SILENT")
5337
5338
class IMAP4ClientStatusTests(PreauthIMAP4ClientMixin, SynchronousTestCase):
    """
    Tests for the L{IMAP4Client.status} method.

    An example of usage of the STATUS command from RFC 3501, section
    6.3.10::

        C: A042 STATUS blurdybloop (UIDNEXT MESSAGES)
        S: * STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)
        S: A042 OK STATUS completed

    @see: U{https://tools.ietf.org/html/rfc3501#section-6.3.10}
    """

    def testUnknownName(self):
        """
        Only allow sending the C{STATUS} names defined in RFC 3501.

        @see: U{https://tools.ietf.org/html/rfc3501#section-6.3.10}
        """
        exc = self.assertRaises(
            ValueError,
            self.client.status,
            "ignored",
            "IMPOSSIBLE?!",
        )
        # The message embeds the repr of a set holding the rejected name.
        self.assertEqual(str(exc), "Unknown names: " + repr({"IMPOSSIBLE?!"}))

    def testUndecodableName(self):
        """
        C{STATUS} names that cannot be decoded as ASCII cause the
        status Deferred to fail with L{IllegalServerResponse}.
        """

        d = self.client.status("blurdybloop", "MESSAGES")
        self.assertEqual(
            self.transport.value(),
            b"0001 STATUS blurdybloop (MESSAGES)\r\n",
        )

        # The third name in the response contains the non-ASCII byte 0xff.
        self.client.lineReceived(
            b"* STATUS blurdybloop "
            b'(MESSAGES 1 ASCIINAME "OK" NOT\xffASCII "NO")'
        )
        self.client.lineReceived(b"0001 OK STATUS completed")
        self.failureResultOf(d, imap4.IllegalServerResponse)
5384
5385
class IMAP4ClientCopyTests(PreauthIMAP4ClientMixin, SynchronousTestCase):
    """
    Tests for the L{IMAP4Client.copy} method.

    The method implements the C{COPY} command; an example of it from
    RFC 3501, section 6.4.7::

        C: A003 COPY 2:4 MEETING
        S: A003 OK COPY completed
    """

    clientProtocol = StillSimplerClient

    def _issueCopyAndCheckWire(self, uid, expectedCommand):
        """
        Ask the client to copy messages C{"2:3"} to C{"MEETING"} and assert
        that the expected command was written to the transport.

        @param uid: The value to pass for the C{uid} argument.
        @param expectedCommand: The L{bytes} expected on the transport.

        @return: The L{Deferred} returned by L{IMAP4Client.copy}.
        """
        copying = self.client.copy("2:3", "MEETING", uid=uid)
        self.assertEqual(self.transport.value(), expectedCommand)
        return copying

    def test_copySequenceNumbers(self):
        """
        L{IMAP4Client.copy} copies messages named by sequence number to the
        mailbox; its L{Deferred} succeeds with a true value.
        """
        copying = self._issueCopyAndCheckWire(False, b"0001 COPY 2:3 MEETING\r\n")

        self.client.lineReceived(b"0001 OK COPY completed")
        outcome = self.successResultOf(copying)
        self.assertEqual(outcome, ([], b"OK COPY completed"))

    def test_copySequenceNumbersFails(self):
        """
        When the messages named by sequence number cannot be copied to the
        mailbox, the L{Deferred} from L{IMAP4Client.copy} fails with an
        L{IMAP4Exception}.
        """
        copying = self._issueCopyAndCheckWire(False, b"0001 COPY 2:3 MEETING\r\n")

        self.client.lineReceived(b"0001 BAD COPY failed")
        reason = self.failureResultOf(copying)
        self.assertIsInstance(reason.value, imap4.IMAP4Exception)

    def test_copyUIDs(self):
        """
        L{IMAP4Client.copy} copies messages named by UID to the mailbox;
        its L{Deferred} succeeds with a true value.
        """
        copying = self._issueCopyAndCheckWire(
            True, b"0001 UID COPY 2:3 MEETING\r\n"
        )

        self.client.lineReceived(b"0001 OK COPY completed")
        outcome = self.successResultOf(copying)
        self.assertEqual(outcome, ([], b"OK COPY completed"))

    def test_copyUIDsFails(self):
        """
        When the messages named by UID cannot be copied to the mailbox, the
        L{Deferred} from L{IMAP4Client.copy} fails with an
        L{IMAP4Exception}.
        """
        copying = self._issueCopyAndCheckWire(
            True, b"0001 UID COPY 2:3 MEETING\r\n"
        )

        self.client.lineReceived(b"0001 BAD COPY failed")
        reason = self.failureResultOf(copying)
        self.assertIsInstance(reason.value, imap4.IMAP4Exception)
5462
5463
class FakeyServer(imap4.IMAP4Server):
    """
    An L{imap4.IMAP4Server} subclass for tests: C{state} is preset to
    C{"select"}, C{timeout} is L{None}, and no greeting is sent.
    """

    # Class-level value for the server's state attribute.
    state = "select"
    # NOTE(review): presumably turns off the base class's idle timeout --
    # confirm against IMAP4Server.
    timeout = None

    def sendServerGreeting(self):
        """
        Do nothing in place of the inherited greeting.
        """
        pass
5470
5471
@implementer(imap4.IMessage)
class FakeyMessage(util.FancyStrMixin):
    """
    A message, declared to implement L{imap4.IMessage}, which simply hands
    back the fixed values it was constructed with.

    @ivar got_headers: Set by L{getHeaders} to the C{(negate, names)} pair
        it was most recently called with.
    @ivar got_subpart: Set by L{getSubPart} to the part most recently
        requested.
    """

    showAttributes = ("headers", "flags", "date", "_body", "uid")

    def __init__(self, headers, flags, date, body, uid, subpart):
        """
        @param headers: The value L{getHeaders} returns.
        @param flags: The value L{getFlags} returns.
        @param date: The value L{getInternalDate} returns.
        @param body: The L{bytes} served by L{getBodyFile}; its length is
            what L{getSize} returns.
        @param uid: The value L{getUID} returns.
        @param subpart: L{None} for a message which is not multipart,
            otherwise an object indexed by L{getSubPart}.
        """
        self.headers, self.flags, self.date = headers, flags, date
        self._body = body
        self.size = len(body)
        self.uid, self.subpart = uid, subpart

    def getHeaders(self, negate, *names):
        """
        Record the arguments and return every header, whatever was asked
        for.
        """
        self.got_headers = (negate, names)
        return self.headers

    def getFlags(self):
        """
        Return the flags given to the initializer.
        """
        return self.flags

    def getInternalDate(self):
        """
        Return the date given to the initializer.
        """
        return self.date

    def getBodyFile(self):
        """
        Return a new L{BytesIO} over the body.
        """
        return BytesIO(self._body)

    def getSize(self):
        """
        Return the length of the body, computed at construction time.
        """
        return self.size

    def getUID(self):
        """
        Return the UID given to the initializer.
        """
        return self.uid

    def isMultipart(self):
        """
        Report whether a subpart container was supplied.
        """
        return self.subpart is not None

    def getSubPart(self, part):
        """
        Record the requested part and return it from the subpart container.
        """
        self.got_subpart = part
        return self.subpart[part]
5510
5511
class NewStoreTests(TestCase, IMAP4HelperMixin):
    """
    Tests which drive a client's flag storing call through a real
    L{imap4.IMAP4Server} over loopback.  The test case installs itself as
    the server's mailbox, so L{store} below receives the server's call.
    """

    result = None
    storeArgs = None

    def setUp(self):
        """
        Create a server already in the C{"select"} state with this test
        case as its mailbox, and a client to talk to it.
        """
        self.received_messages = self.received_uid = None

        server = imap4.IMAP4Server()
        server.state = "select"
        server.mbox = self
        self.server = server

        self.connected = defer.Deferred()
        self.client = SimpleClient(self.connected)

    def addListener(self, x):
        """
        Ignore the listener.
        """
        pass

    def removeListener(self, x):
        """
        Ignore the listener.
        """
        pass

    def store(self, *args, **kw):
        """
        Remember the arguments of the call and return the canned response.
        """
        self.storeArgs = (args, kw)
        return self.response

    def _storeWork(self):
        """
        Run the configured client function once connected, remember its
        result, stop the client, and afterwards compare both the result and
        the arguments L{store} received with the expected values.

        @return: The L{Deferred} from the loopback connection, with the
            checks attached.
        """

        def invoke():
            return self.function(self.messages, self.flags, self.silent, self.uid)

        def remember(outcome):
            self.result = outcome

        def verify(ignored):
            self.assertEqual(self.result, self.expected)
            self.assertEqual(self.storeArgs, self.expectedArgs)

        whenConnected = self.connected
        whenConnected.addCallback(strip(invoke))
        whenConnected.addCallback(remember)
        whenConnected.addCallback(self._cbStopClient)
        whenConnected.addErrback(self._ebGeneral)

        finished = loopback.loopbackTCP(self.server, self.client, noisy=False)
        finished.addCallback(verify)
        return finished

    def testSetFlags(self, uid=0):
        """
        L{IMAP4Client.setFlags} for messages 1, 5 and 9 reaches the
        mailbox's C{store} with the matching message set and flags, and the
        client's result maps each message to its C{'FLAGS'}.
        """
        numbers = (1, 5, 9)
        flags = ["\\A", "\\B", "C"]

        self.function = self.client.setFlags
        self.messages = "1,5,9"
        self.flags = list(flags)
        self.silent = False
        self.uid = uid

        # Each entry gets its own list, as distinct objects.
        self.response = {number: list(flags) for number in numbers}
        self.expected = {number: {"FLAGS": list(flags)} for number in numbers}

        msg = imap4.MessageSet()
        for number in numbers:
            msg.add(number)
        self.expectedArgs = ((msg, list(flags), 0), {"uid": 0})
        return self._storeWork()
5576
5577
5578class GetBodyStructureTests(TestCase):
5579    """
5580    Tests for L{imap4.getBodyStructure}, a helper for constructing a list which
5581    directly corresponds to the wire information needed for a I{BODY} or
5582    I{BODYSTRUCTURE} response.
5583    """
5584
5585    def test_singlePart(self):
5586        """
5587        L{imap4.getBodyStructure} accepts a L{IMessagePart} provider and returns
5588        a list giving the basic fields for the I{BODY} response for that
5589        message.
5590        """
5591        body = b"hello, world"
5592        major = "image"
5593        minor = "jpeg"
5594        charset = "us-ascii"
5595        identifier = "some kind of id"
5596        description = "great justice"
5597        encoding = "maximum"
5598        msg = FakeyMessage(
5599            {
5600                "content-type": major + "/" + minor + "; charset=" + charset + "; x=y",
5601                "content-id": identifier,
5602                "content-description": description,
5603                "content-transfer-encoding": encoding,
5604            },
5605            (),
5606            b"",
5607            body,
5608            123,
5609            None,
5610        )
5611        structure = imap4.getBodyStructure(msg)
5612        self.assertEqual(
5613            [
5614                major,
5615                minor,
5616                ["charset", charset, "x", "y"],
5617                identifier,
5618                description,
5619                encoding,
5620                len(body),
5621            ],
5622            structure,
5623        )
5624
5625    def test_emptyContentType(self):
5626        """
5627        L{imap4.getBodyStructure} returns L{None} for the major and
5628        minor MIME types of a L{IMessagePart} provider whose headers
5629        lack a C{Content-Type}, or have an empty value for it.
5630        """
5631        missing = FakeyMessage({}, (), b"", b"", 123, None)
5632        missingContentTypeStructure = imap4.getBodyStructure(missing)
5633        missingMajor, missingMinor = missingContentTypeStructure[:2]
5634        self.assertIs(None, missingMajor)
5635        self.assertIs(None, missingMinor)
5636
5637        empty = FakeyMessage({"content-type": ""}, (), b"", b"", 123, None)
5638        emptyContentTypeStructure = imap4.getBodyStructure(empty)
5639        emptyMajor, emptyMinor = emptyContentTypeStructure[:2]
5640        self.assertIs(None, emptyMajor)
5641        self.assertIs(None, emptyMinor)
5642
5643        newline = FakeyMessage({"content-type": "\n"}, (), b"", b"", 123, None)
5644        newlineContentTypeStructure = imap4.getBodyStructure(newline)
5645        newlineMajor, newlineMinor = newlineContentTypeStructure[:2]
5646        self.assertIs(None, newlineMajor)
5647        self.assertIs(None, newlineMinor)
5648
5649    def test_onlyMajorContentType(self):
5650        """
5651        L{imap4.getBodyStructure} returns only a non-L{None} major
5652        MIME type for a L{IMessagePart} provider whose headers only
5653        have a main a C{Content-Type}.
5654        """
5655        main = FakeyMessage({"content-type": "main"}, (), b"", b"", 123, None)
5656        mainStructure = imap4.getBodyStructure(main)
5657        mainMajor, mainMinor = mainStructure[:2]
5658        self.assertEqual(mainMajor, "main")
5659        self.assertIs(mainMinor, None)
5660
5661    def test_singlePartExtended(self):
5662        """
5663        L{imap4.getBodyStructure} returns a list giving the basic and extended
5664        fields for a I{BODYSTRUCTURE} response if passed C{True} for the
5665        C{extended} parameter.
5666        """
5667        body = b"hello, world"
5668        major = "image"
5669        minor = "jpeg"
5670        charset = "us-ascii"
5671        identifier = "some kind of id"
5672        description = "great justice"
5673        encoding = "maximum"
5674        md5 = "abcdefabcdef"
5675        msg = FakeyMessage(
5676            {
5677                "content-type": major + "/" + minor + "; charset=" + charset + "; x=y",
5678                "content-id": identifier,
5679                "content-description": description,
5680                "content-transfer-encoding": encoding,
5681                "content-md5": md5,
5682                "content-disposition": "attachment; name=foo; size=bar",
5683                "content-language": "fr",
5684                "content-location": "France",
5685            },
5686            (),
5687            "",
5688            body,
5689            123,
5690            None,
5691        )
5692        structure = imap4.getBodyStructure(msg, extended=True)
5693        self.assertEqual(
5694            [
5695                major,
5696                minor,
5697                ["charset", charset, "x", "y"],
5698                identifier,
5699                description,
5700                encoding,
5701                len(body),
5702                md5,
5703                ["attachment", ["name", "foo", "size", "bar"]],
5704                "fr",
5705                "France",
5706            ],
5707            structure,
5708        )
5709
5710    def test_singlePartWithMissing(self):
5711        """
5712        For fields with no information contained in the message headers,
5713        L{imap4.getBodyStructure} fills in L{None} values in its result.
5714        """
5715        major = "image"
5716        minor = "jpeg"
5717        body = b"hello, world"
5718        msg = FakeyMessage(
5719            {"content-type": major + "/" + minor}, (), b"", body, 123, None
5720        )
5721        structure = imap4.getBodyStructure(msg, extended=True)
5722        self.assertEqual(
5723            [major, minor, None, None, None, None, len(body), None, None, None, None],
5724            structure,
5725        )
5726
5727    def test_textPart(self):
5728        """
5729        For a I{text/*} message, the number of lines in the message body are
5730        included after the common single-part basic fields.
5731        """
5732        body = b"hello, world\nhow are you?\ngoodbye\n"
5733        major = "text"
5734        minor = "jpeg"
5735        charset = "us-ascii"
5736        identifier = "some kind of id"
5737        description = "great justice"
5738        encoding = "maximum"
5739        msg = FakeyMessage(
5740            {
5741                "content-type": major + "/" + minor + "; charset=" + charset + "; x=y",
5742                "content-id": identifier,
5743                "content-description": description,
5744                "content-transfer-encoding": encoding,
5745            },
5746            (),
5747            b"",
5748            body,
5749            123,
5750            None,
5751        )
5752        structure = imap4.getBodyStructure(msg)
5753        self.assertEqual(
5754            [
5755                major,
5756                minor,
5757                ["charset", charset, "x", "y"],
5758                identifier,
5759                description,
5760                encoding,
5761                len(body),
5762                len(body.splitlines()),
5763            ],
5764            structure,
5765        )
5766
5767    def test_rfc822Message(self):
5768        """
5769        For a I{message/rfc822} message, the common basic fields are followed
5770        by information about the contained message.
5771        """
5772        body = b"hello, world\nhow are you?\ngoodbye\n"
5773        major = "text"
5774        minor = "jpeg"
5775        charset = "us-ascii"
5776        identifier = "some kind of id"
5777        description = "great justice"
5778        encoding = "maximum"
5779        msg = FakeyMessage(
5780            {
5781                "content-type": major + "/" + minor + "; charset=" + charset + "; x=y",
5782                "from": "Alice <alice@example.com>",
5783                "to": "Bob <bob@example.com>",
5784                "content-id": identifier,
5785                "content-description": description,
5786                "content-transfer-encoding": encoding,
5787            },
5788            (),
5789            "",
5790            body,
5791            123,
5792            None,
5793        )
5794
5795        container = FakeyMessage(
5796            {
5797                "content-type": "message/rfc822",
5798            },
5799            (),
5800            b"",
5801            b"",
5802            123,
5803            [msg],
5804        )
5805
5806        structure = imap4.getBodyStructure(container)
5807        self.assertEqual(
5808            [
5809                "message",
5810                "rfc822",
5811                None,
5812                None,
5813                None,
5814                None,
5815                0,
5816                imap4.getEnvelope(msg),
5817                imap4.getBodyStructure(msg),
5818                3,
5819            ],
5820            structure,
5821        )
5822
5823    def test_multiPart(self):
5824        """
5825        For a I{multipart/*} message, L{imap4.getBodyStructure} returns a list
5826        containing the body structure information for each part of the message
5827        followed by an element giving the MIME subtype of the message.
5828        """
5829        oneSubPart = FakeyMessage(
5830            {
5831                "content-type": "image/jpeg; x=y",
5832                "content-id": "some kind of id",
5833                "content-description": "great justice",
5834                "content-transfer-encoding": "maximum",
5835            },
5836            (),
5837            b"",
5838            b"hello world",
5839            123,
5840            None,
5841        )
5842
5843        anotherSubPart = FakeyMessage(
5844            {
5845                "content-type": "text/plain; charset=us-ascii",
5846            },
5847            (),
5848            b"",
5849            b"some stuff",
5850            321,
5851            None,
5852        )
5853
5854        container = FakeyMessage(
5855            {
5856                "content-type": "multipart/related",
5857            },
5858            (),
5859            b"",
5860            b"",
5861            555,
5862            [oneSubPart, anotherSubPart],
5863        )
5864
5865        self.assertEqual(
5866            [
5867                imap4.getBodyStructure(oneSubPart),
5868                imap4.getBodyStructure(anotherSubPart),
5869                "related",
5870            ],
5871            imap4.getBodyStructure(container),
5872        )
5873
5874    def test_multiPartExtended(self):
5875        """
5876        When passed a I{multipart/*} message and C{True} for the C{extended}
5877        argument, L{imap4.getBodyStructure} includes extended structure
5878        information from the parts of the multipart message and extended
5879        structure information about the multipart message itself.
5880        """
5881        oneSubPart = FakeyMessage(
5882            {
5883                b"content-type": b"image/jpeg; x=y",
5884                b"content-id": b"some kind of id",
5885                b"content-description": b"great justice",
5886                b"content-transfer-encoding": b"maximum",
5887            },
5888            (),
5889            b"",
5890            b"hello world",
5891            123,
5892            None,
5893        )
5894
5895        anotherSubPart = FakeyMessage(
5896            {
5897                b"content-type": b"text/plain; charset=us-ascii",
5898            },
5899            (),
5900            b"",
5901            b"some stuff",
5902            321,
5903            None,
5904        )
5905
5906        container = FakeyMessage(
5907            {
5908                "content-type": "multipart/related; foo=bar",
5909                "content-language": "es",
5910                "content-location": "Spain",
5911                "content-disposition": "attachment; name=monkeys",
5912            },
5913            (),
5914            b"",
5915            b"",
5916            555,
5917            [oneSubPart, anotherSubPart],
5918        )
5919
5920        self.assertEqual(
5921            [
5922                imap4.getBodyStructure(oneSubPart, extended=True),
5923                imap4.getBodyStructure(anotherSubPart, extended=True),
5924                "related",
5925                ["foo", "bar"],
5926                ["attachment", ["name", "monkeys"]],
5927                "es",
5928                "Spain",
5929            ],
5930            imap4.getBodyStructure(container, extended=True),
5931        )
5932
5933
5934class NewFetchTests(TestCase, IMAP4HelperMixin):
5935    def setUp(self):
5936        self.received_messages = self.received_uid = None
5937        self.result = None
5938
5939        self.server = imap4.IMAP4Server()
5940        self.server.state = "select"
5941        self.server.mbox = self
5942        self.connected = defer.Deferred()
5943        self.client = SimpleClient(self.connected)
5944
5945    def addListener(self, x):
5946        pass
5947
5948    def removeListener(self, x):
5949        pass
5950
5951    def fetch(self, messages, uid):
5952        self.received_messages = messages
5953        self.received_uid = uid
5954        return iter(zip(range(len(self.msgObjs)), self.msgObjs))
5955
5956    def _fetchWork(self, uid):
5957        if uid:
5958            for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs):
5959                self.expected[i]["UID"] = str(msg.getUID())
5960
5961        def result(R):
5962            self.result = R
5963
5964        self.connected.addCallback(
5965            lambda _: self.function(self.messages, uid)
5966        ).addCallback(result).addCallback(self._cbStopClient).addErrback(
5967            self._ebGeneral
5968        )
5969
5970        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
5971        d.addCallback(lambda x: self.assertEqual(self.result, self.expected))
5972        return d
5973
5974    def testFetchUID(self):
5975        self.function = lambda m, u: self.client.fetchUID(m)
5976
5977        self.messages = "7"
5978        self.msgObjs = [
5979            FakeyMessage({}, (), b"", b"", 12345, None),
5980            FakeyMessage({}, (), b"", b"", 999, None),
5981            FakeyMessage({}, (), b"", b"", 10101, None),
5982        ]
5983        self.expected = {
5984            0: {"UID": "12345"},
5985            1: {"UID": "999"},
5986            2: {"UID": "10101"},
5987        }
5988        return self._fetchWork(0)
5989
5990    def testFetchFlags(self, uid=0):
5991        self.function = self.client.fetchFlags
5992        self.messages = "9"
5993        self.msgObjs = [
5994            FakeyMessage({}, ["FlagA", "FlagB", "\\FlagC"], b"", b"", 54321, None),
5995            FakeyMessage({}, ["\\FlagC", "FlagA", "FlagB"], b"", b"", 12345, None),
5996        ]
5997        self.expected = {
5998            0: {"FLAGS": ["FlagA", "FlagB", "\\FlagC"]},
5999            1: {"FLAGS": ["\\FlagC", "FlagA", "FlagB"]},
6000        }
6001        return self._fetchWork(uid)
6002
6003    def testFetchFlagsUID(self):
6004        return self.testFetchFlags(1)
6005
6006    def testFetchInternalDate(self, uid=0):
6007        self.function = self.client.fetchInternalDate
6008        self.messages = "13"
6009        self.msgObjs = [
6010            FakeyMessage({}, (), b"Fri, 02 Nov 2003 21:25:10 GMT", b"", 23232, None),
6011            FakeyMessage({}, (), b"Thu, 29 Dec 2013 11:31:52 EST", b"", 101, None),
6012            FakeyMessage({}, (), b"Mon, 10 Mar 1992 02:44:30 CST", b"", 202, None),
6013            FakeyMessage({}, (), b"Sat, 11 Jan 2000 14:40:24 PST", b"", 303, None),
6014        ]
6015        self.expected = {
6016            0: {"INTERNALDATE": "02-Nov-2003 21:25:10 +0000"},
6017            1: {"INTERNALDATE": "29-Dec-2013 11:31:52 -0500"},
6018            2: {"INTERNALDATE": "10-Mar-1992 02:44:30 -0600"},
6019            3: {"INTERNALDATE": "11-Jan-2000 14:40:24 -0800"},
6020        }
6021        return self._fetchWork(uid)
6022
6023    def testFetchInternalDateUID(self):
6024        return self.testFetchInternalDate(1)
6025
6026    # if alternate locale is not available, the previous test will be skipped,
6027    # please install this locale for it to run.  Avoid using locale.getlocale
6028    # to learn the current locale; its values don't round-trip well on all
6029    # platforms.  Fortunately setlocale returns a value which does round-trip
6030    # well.
6031    currentLocale = locale.setlocale(locale.LC_ALL, None)
6032    try:
6033        locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
6034    except locale.Error:
6035        noEsARLocale = True
6036    else:
6037        locale.setlocale(locale.LC_ALL, currentLocale)
6038        noEsARLocale = False
6039
6040    @skipIf(noEsARLocale, "The es_AR.UTF8 locale is not installed.")
6041    def test_fetchInternalDateLocaleIndependent(self):
6042        """
6043        The month name in the date is locale independent.
6044        """
6045        # Fake that we're in a language where December is not Dec
6046        currentLocale = locale.setlocale(locale.LC_ALL, None)
6047        locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
6048        self.addCleanup(locale.setlocale, locale.LC_ALL, currentLocale)
6049        return self.testFetchInternalDate(1)
6050
6051    def testFetchEnvelope(self, uid=0):
6052        self.function = self.client.fetchEnvelope
6053        self.messages = "15"
6054        self.msgObjs = [
6055            FakeyMessage(
6056                {
6057                    "from": "user@domain",
6058                    "to": "resu@domain",
6059                    "date": "thursday",
6060                    "subject": "it is a message",
6061                    "message-id": "id-id-id-yayaya",
6062                },
6063                (),
6064                b"",
6065                b"",
6066                65656,
6067                None,
6068            ),
6069        ]
6070        self.expected = {
6071            0: {
6072                "ENVELOPE": [
6073                    "thursday",
6074                    "it is a message",
6075                    [[None, None, "user", "domain"]],
6076                    [[None, None, "user", "domain"]],
6077                    [[None, None, "user", "domain"]],
6078                    [[None, None, "resu", "domain"]],
6079                    None,
6080                    None,
6081                    None,
6082                    "id-id-id-yayaya",
6083                ]
6084            }
6085        }
6086        return self._fetchWork(uid)
6087
6088    def testFetchEnvelopeUID(self):
6089        return self.testFetchEnvelope(1)
6090
6091    def test_fetchBodyStructure(self, uid=0):
6092        """
6093        L{IMAP4Client.fetchBodyStructure} issues a I{FETCH BODYSTRUCTURE}
6094        command and returns a Deferred which fires with a structure giving the
6095        result of parsing the server's response.  The structure is a list
6096        reflecting the parenthesized data sent by the server, as described by
6097        RFC 3501, section 7.4.2.
6098        """
6099        self.function = self.client.fetchBodyStructure
6100        self.messages = "3:9,10:*"
6101        self.msgObjs = [
6102            FakeyMessage(
6103                {
6104                    "content-type": 'text/plain; name=thing; key="value"',
6105                    "content-id": "this-is-the-content-id",
6106                    "content-description": "describing-the-content-goes-here!",
6107                    "content-transfer-encoding": "8BIT",
6108                    "content-md5": "abcdef123456",
6109                    "content-disposition": "attachment; filename=monkeys",
6110                    "content-language": "es",
6111                    "content-location": "http://example.com/monkeys",
6112                },
6113                (),
6114                "",
6115                b"Body\nText\nGoes\nHere\n",
6116                919293,
6117                None,
6118            )
6119        ]
6120        self.expected = {
6121            0: {
6122                "BODYSTRUCTURE": [
6123                    "text",
6124                    "plain",
6125                    ["key", "value", "name", "thing"],
6126                    "this-is-the-content-id",
6127                    "describing-the-content-goes-here!",
6128                    "8BIT",
6129                    "20",
6130                    "4",
6131                    "abcdef123456",
6132                    ["attachment", ["filename", "monkeys"]],
6133                    "es",
6134                    "http://example.com/monkeys",
6135                ]
6136            }
6137        }
6138        return self._fetchWork(uid)
6139
6140    def testFetchBodyStructureUID(self):
6141        """
6142        If passed C{True} for the C{uid} argument, C{fetchBodyStructure} can
6143        also issue a I{UID FETCH BODYSTRUCTURE} command.
6144        """
6145        return self.test_fetchBodyStructure(1)
6146
6147    def test_fetchBodyStructureMultipart(self, uid=0):
6148        """
6149        L{IMAP4Client.fetchBodyStructure} can also parse the response to a
6150        I{FETCH BODYSTRUCTURE} command for a multipart message.
6151        """
6152        self.function = self.client.fetchBodyStructure
6153        self.messages = "3:9,10:*"
6154        innerMessage = FakeyMessage(
6155            {
6156                "content-type": 'text/plain; name=thing; key="value"',
6157                "content-id": "this-is-the-content-id",
6158                "content-description": "describing-the-content-goes-here!",
6159                "content-transfer-encoding": "8BIT",
6160                "content-language": "fr",
6161                "content-md5": "123456abcdef",
6162                "content-disposition": "inline",
6163                "content-location": "outer space",
6164            },
6165            (),
6166            b"",
6167            b"Body\nText\nGoes\nHere\n",
6168            919293,
6169            None,
6170        )
6171        self.msgObjs = [
6172            FakeyMessage(
6173                {
6174                    "content-type": 'multipart/mixed; boundary="xyz"',
6175                    "content-language": "en",
6176                    "content-location": "nearby",
6177                },
6178                (),
6179                b"",
6180                b"",
6181                919293,
6182                [innerMessage],
6183            )
6184        ]
6185        self.expected = {
6186            0: {
6187                "BODYSTRUCTURE": [
6188                    [
6189                        "text",
6190                        "plain",
6191                        ["key", "value", "name", "thing"],
6192                        "this-is-the-content-id",
6193                        "describing-the-content-goes-here!",
6194                        "8BIT",
6195                        "20",
6196                        "4",
6197                        "123456abcdef",
6198                        ["inline", None],
6199                        "fr",
6200                        "outer space",
6201                    ],
6202                    "mixed",
6203                    ["boundary", "xyz"],
6204                    None,
6205                    "en",
6206                    "nearby",
6207                ]
6208            }
6209        }
6210        return self._fetchWork(uid)
6211
6212    def testFetchSimplifiedBody(self, uid=0):
6213        self.function = self.client.fetchSimplifiedBody
6214        self.messages = "21"
6215        self.msgObjs = [
6216            FakeyMessage(
6217                {},
6218                (),
6219                b"",
6220                b"Yea whatever",
6221                91825,
6222                [
6223                    FakeyMessage(
6224                        {"content-type": "image/jpg"},
6225                        (),
6226                        b"",
6227                        b"Body Body Body",
6228                        None,
6229                        None,
6230                    )
6231                ],
6232            )
6233        ]
6234        self.expected = {0: {"BODY": [None, None, None, None, None, None, "12"]}}
6235
6236        return self._fetchWork(uid)
6237
6238    def testFetchSimplifiedBodyUID(self):
6239        return self.testFetchSimplifiedBody(1)
6240
6241    def testFetchSimplifiedBodyText(self, uid=0):
6242        self.function = self.client.fetchSimplifiedBody
6243        self.messages = "21"
6244        self.msgObjs = [
6245            FakeyMessage(
6246                {"content-type": "text/plain"}, (), b"", b"Yea whatever", 91825, None
6247            )
6248        ]
6249        self.expected = {
6250            0: {"BODY": ["text", "plain", None, None, None, None, "12", "1"]}
6251        }
6252
6253        return self._fetchWork(uid)
6254
6255    def testFetchSimplifiedBodyTextUID(self):
6256        return self.testFetchSimplifiedBodyText(1)
6257
6258    def testFetchSimplifiedBodyRFC822(self, uid=0):
6259        self.function = self.client.fetchSimplifiedBody
6260        self.messages = "21"
6261        self.msgObjs = [
6262            FakeyMessage(
6263                {"content-type": "message/rfc822"},
6264                (),
6265                b"",
6266                b"Yea whatever",
6267                91825,
6268                [
6269                    FakeyMessage(
6270                        {"content-type": "image/jpg"},
6271                        (),
6272                        "",
6273                        b"Body Body Body",
6274                        None,
6275                        None,
6276                    )
6277                ],
6278            )
6279        ]
6280        self.expected = {
6281            0: {
6282                "BODY": [
6283                    "message",
6284                    "rfc822",
6285                    None,
6286                    None,
6287                    None,
6288                    None,
6289                    "12",
6290                    [
6291                        None,
6292                        None,
6293                        [[None, None, None]],
6294                        [[None, None, None]],
6295                        None,
6296                        None,
6297                        None,
6298                        None,
6299                        None,
6300                        None,
6301                    ],
6302                    ["image", "jpg", None, None, None, None, "14"],
6303                    "1",
6304                ]
6305            }
6306        }
6307
6308        return self._fetchWork(uid)
6309
6310    def testFetchSimplifiedBodyRFC822UID(self):
6311        return self.testFetchSimplifiedBodyRFC822(1)
6312
6313    def test_fetchSimplifiedBodyMultipart(self):
6314        """
6315        L{IMAP4Client.fetchSimplifiedBody} returns a dictionary mapping message
6316        sequence numbers to fetch responses for the corresponding messages.  In
6317        particular, for a multipart message, the value in the dictionary maps
6318        the string C{"BODY"} to a list giving the body structure information for
6319        that message, in the form of a list of subpart body structure
6320        information followed by the subtype of the message (eg C{"alternative"}
6321        for a I{multipart/alternative} message).  This structure is self-similar
6322        in the case where a subpart is itself multipart.
6323        """
6324        self.function = self.client.fetchSimplifiedBody
6325        self.messages = "21"
6326
6327        # A couple non-multipart messages to use as the inner-most payload
6328        singles = [
6329            FakeyMessage(
6330                {"content-type": "text/plain"}, (), b"date", b"Stuff", 54321, None
6331            ),
6332            FakeyMessage(
6333                {"content-type": "text/html"}, (), b"date", b"Things", 32415, None
6334            ),
6335        ]
6336
6337        # A multipart/alternative message containing the above non-multipart
6338        # messages.  This will be the payload of the outer-most message.
6339        alternative = FakeyMessage(
6340            {"content-type": "multipart/alternative"},
6341            (),
6342            b"",
6343            b"Irrelevant",
6344            12345,
6345            singles,
6346        )
6347
6348        # The outer-most message, also with a multipart type, containing just
6349        # the single middle message.
6350        mixed = FakeyMessage(
6351            # The message is multipart/mixed
6352            {"content-type": "multipart/mixed"},
6353            (),
6354            b"",
6355            b"RootOf",
6356            98765,
6357            [alternative],
6358        )
6359
6360        self.msgObjs = [mixed]
6361
6362        self.expected = {
6363            0: {
6364                "BODY": [
6365                    [
6366                        ["text", "plain", None, None, None, None, "5", "1"],
6367                        ["text", "html", None, None, None, None, "6", "1"],
6368                        "alternative",
6369                    ],
6370                    "mixed",
6371                ]
6372            }
6373        }
6374
6375        return self._fetchWork(False)
6376
6377    def testFetchMessage(self, uid=0):
6378        self.function = self.client.fetchMessage
6379        self.messages = "1,3,7,10101"
6380        self.msgObjs = [
6381            FakeyMessage({"Header": "Value"}, (), b"", b"BODY TEXT\r\n", 91, None),
6382        ]
6383        self.expected = {0: {"RFC822": "Header: Value\r\n\r\nBODY TEXT\r\n"}}
6384        return self._fetchWork(uid)
6385
6386    def testFetchMessageUID(self):
6387        return self.testFetchMessage(1)
6388
6389    def testFetchHeaders(self, uid=0):
6390        self.function = self.client.fetchHeaders
6391        self.messages = "9,6,2"
6392        self.msgObjs = [
6393            FakeyMessage({"H1": "V1", "H2": "V2"}, (), b"", b"", 99, None),
6394        ]
6395
6396        headers = nativeString(imap4._formatHeaders({"H1": "V1", "H2": "V2"}))
6397
6398        self.expected = {
6399            0: {"RFC822.HEADER": headers},
6400        }
6401        return self._fetchWork(uid)
6402
6403    def testFetchHeadersUID(self):
6404        return self.testFetchHeaders(1)
6405
6406    def testFetchBody(self, uid=0):
6407        self.function = self.client.fetchBody
6408        self.messages = "1,2,3,4,5,6,7"
6409        self.msgObjs = [
6410            FakeyMessage({"Header": "Value"}, (), "", b"Body goes here\r\n", 171, None),
6411        ]
6412        self.expected = {
6413            0: {"RFC822.TEXT": "Body goes here\r\n"},
6414        }
6415        return self._fetchWork(uid)
6416
6417    def testFetchBodyUID(self):
6418        return self.testFetchBody(1)
6419
6420    def testFetchBodyParts(self):
6421        """
6422        Test the server's handling of requests for specific body sections.
6423        """
6424        self.function = self.client.fetchSpecific
6425        self.messages = "1"
6426        outerBody = ""
6427        innerBody1 = b"Contained body message text.  Squarge."
6428        innerBody2 = b"Secondary <i>message</i> text of squarge body."
6429        headers = OrderedDict()
6430        headers["from"] = "sender@host"
6431        headers["to"] = "recipient@domain"
6432        headers["subject"] = "booga booga boo"
6433        headers["content-type"] = 'multipart/alternative; boundary="xyz"'
6434        innerHeaders = OrderedDict()
6435        innerHeaders["subject"] = "this is subject text"
6436        innerHeaders["content-type"] = "text/plain"
6437        innerHeaders2 = OrderedDict()
6438        innerHeaders2["subject"] = "<b>this is subject</b>"
6439        innerHeaders2["content-type"] = "text/html"
6440        self.msgObjs = [
6441            FakeyMessage(
6442                headers,
6443                (),
6444                None,
6445                outerBody,
6446                123,
6447                [
6448                    FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
6449                    FakeyMessage(innerHeaders2, (), None, innerBody2, None, None),
6450                ],
6451            )
6452        ]
6453        self.expected = {0: [["BODY", ["1"], "Contained body message text.  Squarge."]]}
6454
6455        def result(R):
6456            self.result = R
6457
6458        self.connected.addCallback(
6459            lambda _: self.function(self.messages, headerNumber=1)
6460        )
6461        self.connected.addCallback(result)
6462        self.connected.addCallback(self._cbStopClient)
6463        self.connected.addErrback(self._ebGeneral)
6464
6465        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
6466        d.addCallback(lambda ign: self.assertEqual(self.result, self.expected))
6467        return d
6468
6469    def test_fetchBodyPartOfNonMultipart(self):
6470        """
6471        Single-part messages have an implicit first part which clients
6472        should be able to retrieve explicitly.  Test that a client
6473        requesting part 1 of a text/plain message receives the body of the
6474        text/plain part.
6475        """
6476        self.function = self.client.fetchSpecific
6477        self.messages = "1"
6478        parts = [1]
6479        outerBody = b"DA body"
6480        headers = OrderedDict()
6481        headers["from"] = "sender@host"
6482        headers["to"] = "recipient@domain"
6483        headers["subject"] = "booga booga boo"
6484        headers["content-type"] = "text/plain"
6485        self.msgObjs = [FakeyMessage(headers, (), None, outerBody, 123, None)]
6486
6487        self.expected = {0: [["BODY", ["1"], "DA body"]]}
6488
6489        def result(R):
6490            self.result = R
6491
6492        self.connected.addCallback(
6493            lambda _: self.function(self.messages, headerNumber=parts)
6494        )
6495        self.connected.addCallback(result)
6496        self.connected.addCallback(self._cbStopClient)
6497        self.connected.addErrback(self._ebGeneral)
6498
6499        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
6500        d.addCallback(lambda ign: self.assertEqual(self.result, self.expected))
6501        return d
6502
6503    def testFetchSize(self, uid=0):
6504        self.function = self.client.fetchSize
6505        self.messages = "1:100,2:*"
6506        self.msgObjs = [
6507            FakeyMessage({}, (), b"", b"x" * 20, 123, None),
6508        ]
6509        self.expected = {
6510            0: {"RFC822.SIZE": "20"},
6511        }
6512        return self._fetchWork(uid)
6513
6514    def testFetchSizeUID(self):
6515        return self.testFetchSize(1)
6516
6517    def testFetchFull(self, uid=0):
6518        self.function = self.client.fetchFull
6519        self.messages = "1,3"
6520        self.msgObjs = [
6521            FakeyMessage(
6522                {},
6523                ("\\XYZ", "\\YZX", "Abc"),
6524                b"Sun, 25 Jul 2010 06:20:30 -0400 (EDT)",
6525                b"xyz" * 2,
6526                654,
6527                None,
6528            ),
6529            FakeyMessage(
6530                {},
6531                ("\\One", "\\Two", "Three"),
6532                b"Mon, 14 Apr 2003 19:43:44 -0400",
6533                b"abc" * 4,
6534                555,
6535                None,
6536            ),
6537        ]
6538        self.expected = {
6539            0: {
6540                "FLAGS": ["\\XYZ", "\\YZX", "Abc"],
6541                "INTERNALDATE": "25-Jul-2010 06:20:30 -0400",
6542                "RFC822.SIZE": "6",
6543                "ENVELOPE": [
6544                    None,
6545                    None,
6546                    [[None, None, None]],
6547                    [[None, None, None]],
6548                    None,
6549                    None,
6550                    None,
6551                    None,
6552                    None,
6553                    None,
6554                ],
6555                "BODY": [None, None, None, None, None, None, "6"],
6556            },
6557            1: {
6558                "FLAGS": ["\\One", "\\Two", "Three"],
6559                "INTERNALDATE": "14-Apr-2003 19:43:44 -0400",
6560                "RFC822.SIZE": "12",
6561                "ENVELOPE": [
6562                    None,
6563                    None,
6564                    [[None, None, None]],
6565                    [[None, None, None]],
6566                    None,
6567                    None,
6568                    None,
6569                    None,
6570                    None,
6571                    None,
6572                ],
6573                "BODY": [None, None, None, None, None, None, "12"],
6574            },
6575        }
6576        return self._fetchWork(uid)
6577
6578    def testFetchFullUID(self):
6579        return self.testFetchFull(1)
6580
6581    def testFetchAll(self, uid=0):
6582        self.function = self.client.fetchAll
6583        self.messages = "1,2:3"
6584        self.msgObjs = [
6585            FakeyMessage(
6586                {}, (), b"Mon, 14 Apr 2003 19:43:44 +0400", b"Lalala", 10101, None
6587            ),
6588            FakeyMessage(
6589                {}, (), b"Tue, 15 Apr 2003 19:43:44 +0200", b"Alalal", 20202, None
6590            ),
6591        ]
6592        self.expected = {
6593            0: {
6594                "ENVELOPE": [
6595                    None,
6596                    None,
6597                    [[None, None, None]],
6598                    [[None, None, None]],
6599                    None,
6600                    None,
6601                    None,
6602                    None,
6603                    None,
6604                    None,
6605                ],
6606                "RFC822.SIZE": "6",
6607                "INTERNALDATE": "14-Apr-2003 19:43:44 +0400",
6608                "FLAGS": [],
6609            },
6610            1: {
6611                "ENVELOPE": [
6612                    None,
6613                    None,
6614                    [[None, None, None]],
6615                    [[None, None, None]],
6616                    None,
6617                    None,
6618                    None,
6619                    None,
6620                    None,
6621                    None,
6622                ],
6623                "RFC822.SIZE": "6",
6624                "INTERNALDATE": "15-Apr-2003 19:43:44 +0200",
6625                "FLAGS": [],
6626            },
6627        }
6628        return self._fetchWork(uid)
6629
6630    def testFetchAllUID(self):
6631        return self.testFetchAll(1)
6632
6633    def testFetchFast(self, uid=0):
6634        self.function = self.client.fetchFast
6635        self.messages = "1"
6636        self.msgObjs = [
6637            FakeyMessage({}, ("\\X",), b"19 Mar 2003 19:22:21 -0500", b"", 9, None),
6638        ]
6639        self.expected = {
6640            0: {
6641                "FLAGS": ["\\X"],
6642                "INTERNALDATE": "19-Mar-2003 19:22:21 -0500",
6643                "RFC822.SIZE": "0",
6644            },
6645        }
6646        return self._fetchWork(uid)
6647
6648    def testFetchFastUID(self):
6649        return self.testFetchFast(1)
6650
6651
class DefaultSearchTests(IMAP4HelperMixin, TestCase):
    """
    Exercise the SEARCH implementation built into L{imap4.IMAP4Server}, with
    an emphasis on message sets and on search terms the server does not
    handle.

    The test case itself acts as the selected mailbox: the server's C{mbox}
    attribute points back at this object.
    """

    def setUp(self):
        """
        Put a server straight into the C{"select"} state with this test case
        as its mailbox, create a client and build five fake messages.
        """
        server = imap4.IMAP4Server()
        server.state = "select"
        server.mbox = self
        self.server = server

        self.connected = defer.Deferred()
        self.client = SimpleClient(self.connected)

        # Sequence numbers 1 through 5 map onto these UIDs, in order.
        uids = (999, 10101, 12345, 20001, 20002)
        self.msgObjs = [FakeyMessage({}, (), b"", b"", uid, None) for uid in uids]

    def fetch(self, messages, uid):
        """
        Mailbox hook which lets C{self.server} look up messages on this test
        case.  Both arguments are ignored and every fake message is returned.

        @return: A L{list} of C{(sequence number, message)} pairs, numbered
            from 1.
        """
        return list(enumerate(self.msgObjs, 1))

    def _messageSetSearchTest(self, queryTerms, expectedMessages):
        """
        Send a search to the server and check which messages come back.

        @param queryTerms: A string giving the search query.
        @param expectedMessages: A list of the message sequence numbers
            expected as the result of the search.
        @return: A L{Deferred} which fires when the test is complete.
        """

        def runSearch(ignored):
            return self.client.search(queryTerms)

        def verify(found):
            self.assertEqual(found, expectedMessages)

        finished = self.connected.addCallback(runSearch)
        finished.addCallback(verify)
        finished.addCallback(self._cbStopClient)
        finished.addErrback(self._ebGeneral)
        self.loopback()
        return finished

    def test_searchMessageSet(self):
        """
        A search which starts with a message set only yields messages which
        belong to that set.
        """
        return self._messageSetSearchTest(queryTerms="1", expectedMessages=[1])

    def test_searchMessageSetWithStar(self):
        """
        A message set of the form C{N:*} matches every message from C{N}
        onwards.
        """
        return self._messageSetSearchTest(
            queryTerms="2:*", expectedMessages=[2, 3, 4, 5]
        )

    def test_searchMessageSetWithStarFirst(self):
        """
        A message set of the form C{*:N} gives the same result as C{N:*}.
        """
        return self._messageSetSearchTest(
            queryTerms="*:2", expectedMessages=[2, 3, 4, 5]
        )

    def test_searchMessageSetUIDWithStar(self):
        """
        A C{UID N:*} term matches every message from the starting UID
        onwards.
        """
        return self._messageSetSearchTest(
            queryTerms="UID 10000:*", expectedMessages=[2, 3, 4, 5]
        )

    def test_searchMessageSetUIDWithStarFirst(self):
        """
        A C{UID *:N} term gives the same result as C{UID N:*}.
        """
        return self._messageSetSearchTest(
            queryTerms="UID *:10000", expectedMessages=[2, 3, 4, 5]
        )

    def test_searchMessageSetUIDWithStarAndHighStart(self):
        """
        A C{UID N:*} term still matches the last message in the mailbox when
        that message's UID is lower than C{N}.
        """
        # The highest UID in the fake mailbox is 20002, which is below 30000.
        return self._messageSetSearchTest(
            queryTerms="UID 30000:*", expectedMessages=[5]
        )

    def test_searchMessageSetWithList(self):
        """
        A parenthesised (nested) term which contains a message sequence set
        with a wildcard is handled correctly.
        """
        # 6 exceeds the largest sequence number in the mailbox.  That is
        # fine: N:* always includes the largest sequence number, even when N
        # is greater than it.
        return self._messageSetSearchTest(queryTerms="(6:*)", expectedMessages=[5])

    def test_searchOr(self):
        """
        An I{OR} term matches every message which matches either of its
        subexpressions.
        """
        return self._messageSetSearchTest(
            queryTerms="OR 1 2", expectedMessages=[1, 2]
        )

    def test_searchOrMessageSet(self):
        """
        When a subexpression of an I{OR} term is a message sequence set with
        a wildcard, every message in that set is considered for inclusion in
        the results.
        """
        return self._messageSetSearchTest(
            queryTerms="OR 2:* 2:*", expectedMessages=[2, 3, 4, 5]
        )

    def test_searchNot(self):
        """
        A I{NOT} term matches every message which does not match its
        subexpression.
        """
        return self._messageSetSearchTest(
            queryTerms="NOT 3", expectedMessages=[1, 2, 4, 5]
        )

    def test_searchNotMessageSet(self):
        """
        When the subexpression of a I{NOT} term is a message sequence set
        with a wildcard, no message in that set appears in the result.
        """
        return self._messageSetSearchTest(queryTerms="NOT 2:*", expectedMessages=[1])

    def test_searchAndMessageSet(self):
        """
        Several terms, one of them a message sequence set with a wildcard,
        are implicitly conjoined: only messages matching all of them are
        returned.
        """
        return self._messageSetSearchTest(queryTerms="2:* 3", expectedMessages=[3])

    def test_searchInvalidCriteria(self):
        """
        A search key which is not valid makes the client's search fail with
        L{imap4.IMAP4Exception}, and an L{imap4.IllegalQueryError} is logged
        on the server side.
        """

        def runSearch(ignored):
            return self.client.search("FOO")

        failed = self.assertFailure(
            self.connected.addCallback(runSearch), imap4.IMAP4Exception
        )

        def errorReceived(results):
            """
            Check the error logged by the server and the text of the
            exception handed to the client.
            """
            for protocol in (self.client, self.server):
                protocol.transport.loseConnection()

            # Exactly one IllegalQueryError must have been logged.
            logged = self.flushLoggedErrors(imap4.IllegalQueryError)
            self.assertEqual(len(logged), 1)

            # The client's exception carries the failure text.
            expectedText = str(b"SEARCH failed: Invalid search command FOO")
            self.assertEqual(expectedText, str(results))

        failed.addCallback(errorReceived)
        failed.addErrback(self._ebGeneral)
        self.loopback()
        return failed
6832
6833
@implementer(imap4.ISearchableMailbox)
class FetchSearchStoreTests(TestCase, IMAP4HelperMixin):
    """
    Tests in which the test case itself is the server's selected mailbox and
    records the arguments which the server passes to C{search} and C{fetch}.
    """

    def setUp(self):
        """
        Clear everything recorded so far and connect a server in the
        C{"select"} state to this test case and to a fresh client.
        """
        self.expected = None
        self.result = None

        self.server_received_query = None
        self.server_received_uid = None
        self.server_received_parts = None
        self.server_received_messages = None

        server = imap4.IMAP4Server()
        server.state = "select"
        server.mbox = self
        self.server = server

        self.connected = defer.Deferred()
        self.client = SimpleClient(self.connected)

    def search(self, query, uid):
        """
        Mailbox search hook: remember C{query} and C{uid} and answer with
        C{self.expected}.

        @raise imap4.IllegalQueryError: If C{query} is exactly C{[b"FOO"]}.
        """
        # One specific bad query is rejected so that the handling of such
        # errors can be verified.
        if query == [b"FOO"]:
            raise imap4.IllegalQueryError("FOO is not a valid search criteria")

        self.server_received_query, self.server_received_uid = query, uid
        return self.expected

    def addListener(self, *a, **kw):
        """
        Accept and ignore any arguments.
        """
        pass

    removeListener = addListener

    def _searchWork(self, uid):
        """
        Search for C{self.query} over a loopback connection, then compare what
        the client received and what this mailbox was asked for against the
        expectations stored on the test case.

        @param uid: Passed to the client's C{search} as its C{uid} argument.
        @return: A L{Deferred} which fires once the checks have run.
        """

        def runSearch(ignored):
            return self.client.search(self.query, uid=uid)

        def remember(found):
            self.result = found

        pipeline = self.connected
        pipeline.addCallback(runSearch)
        pipeline.addCallback(remember)
        pipeline.addCallback(self._cbStopClient)
        pipeline.addErrback(self._ebGeneral)

        def check(ignored):
            # Guard against short-circuiting: the client's result must not be
            # the very object this mailbox returned.
            self.assertIsNot(self.result, self.expected)

            self.assertEqual(self.result, self.expected)
            self.assertEqual(self.uid, self.server_received_uid)

            # Queries should be decoded as ASCII unless a charset
            # identifier is provided.  See #9201.
            parsedQuery = imap4.parseNestedParens(self.query.encode("charmap"))
            self.assertEqual(parsedQuery, self.server_received_query)

        done = loopback.loopbackTCP(self.server, self.client, noisy=False)
        done.addCallback(check)
        return done

    def testSearch(self):
        """
        A search issued with C{uid=0} reaches the mailbox unchanged and its
        results reach the client.
        """
        self.uid = 0
        self.expected = [1, 4, 5, 7]
        bySubject = imap4.Query(header=("subject", "substring"))
        bySize = imap4.Query(larger=1024, smaller=4096)
        self.query = imap4.Or(bySubject, bySize)
        return self._searchWork(0)

    def testUIDSearch(self):
        """
        A search issued with C{uid=1} reaches the mailbox unchanged and its
        results reach the client.
        """
        self.uid = 1
        self.expected = [1, 2, 3]
        bySubject = imap4.Query(header=("subject", "substring"))
        bySize = imap4.Query(larger=1024, smaller=4096)
        self.query = imap4.Or(bySubject, bySize)
        return self._searchWork(1)

    def getUID(self, msg):
        """
        Derive a UID for C{msg} from C{self.expected}.

        @return: C{self.expected[msg]["UID"]} when that lookup works;
            C{self.expected[msg - 1]} when it fails with L{TypeError} or
            L{IndexError}; C{42} when it fails with L{KeyError}.
        """
        try:
            uid = self.expected[msg]["UID"]
        except (TypeError, IndexError):
            return self.expected[msg - 1]
        except KeyError:
            return 42
        return uid

    def fetch(self, messages, uid):
        """
        Mailbox fetch hook: remember C{uid} and the string form of
        C{messages}, then answer with C{self.expected}.
        """
        self.server_received_messages = str(messages)
        self.server_received_uid = uid
        return self.expected

    def _fetchWork(self, fetch):
        """
        Call C{fetch} once connected, then compare the result and the
        arguments recorded by this mailbox against the expectations stored on
        the test case.

        @param fetch: A callable taking no arguments which issues the fetch.
        @return: A L{Deferred} which fires once the checks have run.
        """

        def remember(fetched):
            self.result = fetched

        pipeline = self.connected
        pipeline.addCallback(lambda ignored: fetch())
        pipeline.addCallback(remember)
        pipeline.addCallback(self._cbStopClient)
        pipeline.addErrback(self._ebGeneral)

        def check(ignored):
            # Guard against short-circuiting: the client's result must not be
            # the very object this mailbox returned.
            self.assertIsNot(self.result, self.expected)

            # Sort both part lists, when present, so that ordering does not
            # matter in the comparison below.
            if self.parts:
                self.parts.sort()
            if self.server_received_parts:
                self.server_received_parts.sort()

            if self.uid:
                for key, value in self.expected.items():
                    value["UID"] = str(key)

            self.assertEqual(self.result, self.expected)
            self.assertEqual(self.uid, self.server_received_uid)
            self.assertEqual(self.parts, self.server_received_parts)

            requested = imap4.parseIdList(self.messages)
            received = imap4.parseIdList(self.server_received_messages)
            self.assertEqual(requested, received)

        done = loopback.loopbackTCP(self.server, self.client, noisy=False)
        done.addCallback(check)
        return done

    def test_invalidTerm(self):
        """
        When the L{imap4.ISearchableMailbox} raises
        L{imap4.IllegalQueryError} during a search, the client's search fails
        with L{imap4.IMAP4Exception} and the error is logged on the server.
        """

        def runSearch(ignored):
            return self.client.search("FOO")

        failed = self.assertFailure(
            self.connected.addCallback(runSearch), imap4.IMAP4Exception
        )

        def errorReceived(results):
            """
            Check the error logged by the server and the text of the
            exception handed to the client.
            """
            for protocol in (self.client, self.server):
                protocol.transport.loseConnection()

            # Exactly one IllegalQueryError must have been logged.
            logged = self.flushLoggedErrors(imap4.IllegalQueryError)
            self.assertEqual(len(logged), 1)

            # The client's exception carries the mailbox's message.
            expectedText = str(b"SEARCH failed: FOO is not a valid search criteria")
            self.assertEqual(expectedText, str(results))

        failed.addCallback(errorReceived)
        failed.addErrback(self._ebGeneral)
        self.loopback()
        return failed
6988
6989
class FakeMailbox:
    """
    A minimal mailbox stand-in which only remembers the messages added to it.

    @ivar args: A L{list} of C{(body, flags, date)} tuples, one for each call
        to L{addMessage}, in call order.
    """

    def __init__(self):
        self.args = []

    def addMessage(self, body, flags, date):
        """
        Record the arguments of this call.

        @return: A L{Deferred} which has already fired with L{None}.
        """
        record = (body, flags, date)
        self.args.append(record)
        return defer.succeed(None)
6997
6998
@implementer(imap4.IMessageFile)
class FeaturefulMessage:
    """
    A message declared to implement L{imap4.IMessageFile} whose every
    accessor returns a fixed, easily recognised value.
    """

    def getFlags(self):
        """
        @return: The constant string C{"flags"}.
        """
        return "flags"

    def getInternalDate(self):
        """
        @return: The constant string C{"internaldate"}.
        """
        return "internaldate"

    def open(self):
        """
        @return: A new L{BytesIO} holding C{b"open"}.
        """
        contents = b"open"
        return BytesIO(contents)
7009
7010
@implementer(imap4.IMessageCopier)
class MessageCopierMailbox:
    """
    A mailbox declared to implement L{imap4.IMessageCopier} which keeps every
    message it is asked to copy.

    @ivar msgs: The copied messages, in the order they were received.
    """

    def __init__(self):
        self.msgs = []

    def copy(self, msg):
        """
        Keep C{msg}.

        @return: The number of messages kept so far, including this one.
        """
        stored = self.msgs
        stored.append(msg)
        return len(stored)
7019
7020
class CopyWorkerTests(TestCase):
    """
    Direct tests for the private C{__cbCopy} method of
    L{imap4.IMAP4Server}, using different kinds of message and mailbox.
    """

    def testFeaturefulMessage(self):
        """
        Messages implementing L{imap4.IMessageFile} are added to the target
        mailbox using the values from C{open}, C{getFlags} and
        C{getInternalDate}.
        """
        server = imap4.IMAP4Server()

        # The method under test is private and is reached through its
        # name-mangled attribute on purpose: it is complex enough to deserve
        # a direct test.  It could be refactored, simplified or split into
        # less private pieces, but not today.

        # Addendum: once it has been split up, this test is meant to be
        # rewritten to use the default adapter from IMailbox to
        # IMessageCopier and to call .copy on that adapter.
        copyMessages = server._IMAP4Server__cbCopy

        mailbox = FakeMailbox()
        numbered = [(seq, FeaturefulMessage()) for seq in range(1, 11)]
        d = copyMessages(numbered, "tag", mailbox)

        def cbCopy(results):
            for body, flags, date in mailbox.args:
                self.assertEqual(body.read(), b"open")
                self.assertEqual(flags, "flags")
                self.assertEqual(date, "internaldate")

            for status, result in results:
                self.assertTrue(status)
                self.assertEqual(result, None)

        return d.addCallback(cbCopy)

    def testUnfeaturefulMessage(self):
        """
        L{FakeyMessage} instances are added to the target mailbox with a
        body made of their header, a blank line and their body text, along
        with their flags and date.
        """
        server = imap4.IMAP4Server()

        # Private method reached through its mangled name; see the comment in
        # testFeaturefulMessage.
        copyMessages = server._IMAP4Server__cbCopy

        mailbox = FakeMailbox()
        msgs = []
        for i in range(1, 11):
            headers = {"Header-Counter": str(i)}
            msgs.append(
                FakeyMessage(headers, (), b"Date", b"Body %d" % (i,), i + 10, None)
            )
        d = copyMessages(list(enumerate(msgs, 1)), "tag", mailbox)

        def cbCopy(results):
            bodies = []
            for body, flags, date in mailbox.args:
                bodies.append(body.read())
                self.assertEqual(flags, ())
                self.assertEqual(date, b"Date")

            # The order in which messages were added is not significant.
            expectedBodies = [
                b"Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1, 11)
            ]
            self.assertEqual(sorted(bodies), sorted(expectedBodies))

            for status, result in results:
                self.assertTrue(status)
                self.assertEqual(result, None)

        return d.addCallback(cbCopy)

    def testMessageCopier(self):
        """
        When the target mailbox is a L{MessageCopierMailbox}, each message
        object is passed to its C{copy} method untouched and the values it
        returns show up in the results.
        """
        server = imap4.IMAP4Server()

        # Private method reached through its mangled name; see the comment in
        # testFeaturefulMessage.
        copyMessages = server._IMAP4Server__cbCopy

        mailbox = MessageCopierMailbox()
        msgs = [object() for _ in range(10)]
        d = copyMessages(list(enumerate(msgs, 1)), b"tag", mailbox)

        def cbCopy(results):
            self.assertEqual(results, [(1, count) for count in range(1, 11)])
            for orig, new in zip(msgs, mailbox.msgs):
                self.assertIs(orig, new)

        return d.addCallback(cbCopy)
7100
7101
@skipIf(not ClientTLSContext, "OpenSSL not present")
@skipIf(not interfaces.IReactorSSL(reactor, None), "Reactor doesn't support SSL")
class TLSTests(IMAP4HelperMixin, TestCase):
    """
    Tests for C{STARTTLS} between an IMAP4 client and server connected over
    loopback TCP.
    """

    # The contexts are only built when the TLS helpers could be imported.
    serverCTX = ServerTLSContext() if ServerTLSContext else None
    clientCTX = ClientTLSContext() if ClientTLSContext else None

    def loopback(self):
        """
        Connect C{self.server} and C{self.client} over loopback TCP.

        @return: The L{Deferred} returned by L{loopback.loopbackTCP}.
        """
        return loopback.loopbackTCP(self.server, self.client, noisy=False)

    def testAPileOfThings(self):
        """
        With transport security required on the client, a login, list,
        status, examine and logout sequence runs every step, and both sides
        report that TLS was started.
        """
        SimpleServer.theAccount.addMailbox(b"inbox")
        called = []

        def counted(action):
            """
            Wrap C{action} so that it notes that it ran and ignores the
            result of the previous step.
            """

            def runStep(ignored):
                called.append(None)
                return action()

            return runStep

        steps = [
            lambda: self.client.login(b"testuser", b"password-test"),
            lambda: self.client.list(b"inbox", b"%"),
            lambda: self.client.status(b"inbox", "UIDNEXT"),
            lambda: self.client.examine(b"inbox"),
            lambda: self.client.logout(),
        ]

        self.client.requireTransportSecurity = True

        for action in steps:
            self.connected.addCallback(counted(action))

        self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)

        def check(ignored):
            self.assertEqual(self.server.startedTLS, True)
            self.assertEqual(self.client.startedTLS, True)
            self.assertEqual(len(called), len(steps))

        finished = self.loopback()
        finished.addCallback(check)
        return finished

    def testLoginLogin(self):
        """
        A client with a registered L{imap4.LOGINAuthenticator} can
        authenticate and then log out.
        """
        self.server.checker.addUser(b"testuser", b"password-test")
        success = []
        self.client.registerAuthenticator(imap4.LOGINAuthenticator(b"testuser"))

        pipeline = self.connected
        pipeline.addCallback(lambda _: self.client.authenticate(b"password-test"))
        pipeline.addCallback(lambda _: self.client.logout())
        pipeline.addCallback(success.append)
        pipeline.addCallback(self._cbStopClient)
        pipeline.addErrback(self._ebGeneral)

        def check(ignored):
            return self.assertEqual(len(success), 1)

        finished = self.loopback()
        finished.addCallback(check)
        return finished

    def startTLSAndAssertSession(self):
        """
        Start a C{STARTTLS} sequence and assert that the client's transport
        provides L{interfaces.ISSLTransport} afterwards.

        @return: A L{Deferred} that fires when the underlying
            connection between the client and server has been terminated.
        """
        success = []

        def checkSecure(ignored):
            secure = interfaces.ISSLTransport.providedBy(self.client.transport)
            self.assertTrue(secure)

        self.connected.addCallback(strip(self.client.startTLS))
        self.connected.addCallback(checkSecure)
        self.connected.addCallback(success.append)

        disconnected = self.loopback()
        disconnected.addCallback(lambda x: self.assertTrue(success))
        return defer.gatherResults([disconnected, self.connected])

    def test_startTLS(self):
        """
        L{IMAP4Client.startTLS} triggers TLS negotiation and returns a
        L{Deferred} which fires once the client's transport is using
        encryption.
        """
        finished = self.startTLSAndAssertSession()
        self.connected.addCallback(self._cbStopClient)
        self.connected.addErrback(self._ebGeneral)
        return finished

    def test_doubleSTARTTLS(self):
        """
        A server that receives a second C{STARTTLS} sends a C{NO}
        response.
        """

        class DoubleSTARTTLSClient(SimpleClient):
            def startTLS(self):
                # Once TLS is up, send a raw STARTTLS command instead of
                # going through the normal client logic again.
                if self.startedTLS:
                    return self.sendCommand(imap4.Command(b"STARTTLS"))

                return SimpleClient.startTLS(self)

        self.client = DoubleSTARTTLSClient(
            self.connected, contextFactory=self.clientCTX
        )

        finished = self.startTLSAndAssertSession()

        self.connected.addCallback(strip(self.client.startTLS))
        self.connected.addErrback(
            self.assertClientFailureMessage, b"TLS already negotiated"
        )

        self.connected.addCallback(self._cbStopClient)
        self.connected.addErrback(self._ebGeneral)

        return finished

    def test_startTLSWithExistingChallengers(self):
        """
        Starting a TLS negotiation with an L{IMAP4Server} that already
        has C{LOGIN} and C{PLAIN} L{IChallengeResponse} factories uses
        those factories.
        """
        self.server.challengers = {
            b"LOGIN": imap4.LOGINCredentials,
            b"PLAIN": imap4.PLAINCredentials,
        }

        @defer.inlineCallbacks
        def assertLOGINandPLAIN():
            capabilities = yield self.client.getCapabilities()
            self.assertIn(b"AUTH", capabilities)
            mechanisms = capabilities[b"AUTH"]
            self.assertIn(b"LOGIN", mechanisms)
            self.assertIn(b"PLAIN", mechanisms)

        # The capabilities are checked both before and after TLS is started.
        self.connected.addCallback(strip(assertLOGINandPLAIN))

        finished = self.startTLSAndAssertSession()

        self.connected.addCallback(strip(assertLOGINandPLAIN))

        self.connected.addCallback(self._cbStopClient)
        self.connected.addErrback(self._ebGeneral)

        return finished

    def test_loginBeforeSTARTTLS(self):
        """
        A client that attempts to log in before issuing the
        C{STARTTLS} command receives a C{NO} response.
        """

        # Prevent the client from issuing STARTTLS.
        def fakeStartTLS():
            return defer.succeed(([], "OK Begin TLS negotiation now"))

        self.client.startTLS = fakeStartTLS

        self.connected.addCallback(
            lambda _: self.client.login(b"wrong", b"time"),
        )
        self.connected.addErrback(
            self.assertClientFailureMessage,
            b"LOGIN is disabled before STARTTLS",
        )

        self.connected.addCallback(self._cbStopClient)
        self.connected.addErrback(self._ebGeneral)

        return defer.gatherResults([self.loopback(), self.connected])

    def testFailedStartTLS(self):
        """
        When the server's C{canStartTLS} is false, the client's C{startTLS}
        fails with L{imap4.IMAP4Exception}.
        """
        failures = []

        def breakServerTLS(ign):
            self.server.canStartTLS = False

        def recordFailure(err):
            # trap() returns the matching exception class, or re-raises.
            failures.append(err.trap(imap4.IMAP4Exception))

        self.connected.addCallback(breakServerTLS)
        self.connected.addCallback(lambda ign: self.client.startTLS())
        self.connected.addErrback(recordFailure)
        self.connected.addCallback(self._cbStopClient)
        self.connected.addErrback(self._ebGeneral)

        def check(ignored):
            self.assertTrue(failures)
            self.assertIs(failures[0], imap4.IMAP4Exception)

        return self.loopback().addCallback(check)
7306
7307
class SlowMailbox(SimpleMailbox):
    """
    A mailbox whose C{fetch} only produces its (empty) result after a delay.

    @cvar howSlow: The delay passed to C{callLater}.
    @cvar callLater: The scheduling callable; it must be set before C{fetch}
        is used.
    @cvar fetchDeferred: A L{Deferred} fired with L{None} by C{fetch}; it
        must be set before C{fetch} is used.
    """

    howSlow = 2
    callLater = None
    fetchDeferred = None

    # This is a crude fetch() implementation, but it is good enough for
    # testing.
    def fetch(self, messages, uid):
        """
        Schedule an empty result and announce that a fetch has begun.

        @return: A L{Deferred} which fires with C{()} after C{howSlow} has
            elapsed according to C{callLater}.
        """
        pending = defer.Deferred()
        # The delayed result is scheduled before fetchDeferred fires, so it
        # is already pending when the callbacks of fetchDeferred run.
        self.callLater(self.howSlow, pending.callback, ())
        self.fetchDeferred.callback(None)
        return pending
7320
7321
class TimeoutTests(IMAP4HelperMixin, TestCase):
    """
    Tests for the timeout behaviour of the IMAP4 client and server, driven by
    a L{Clock} so that time only passes when a test advances it.
    """

    def test_serverTimeout(self):
        """
        The *client* has a timeout mechanism which will close connections that
        are inactive for a period.
        """
        c = Clock()
        self.server.timeoutTest = True
        self.client.timeout = 5  # seconds
        self.client.callLater = c.callLater
        self.selectedArgs = None

        def login():
            d = self.client.login(b"testuser", b"password-test")
            # Advance the fake clock by the full client timeout.
            c.advance(5)
            d.addErrback(timedOut)
            return d

        def timedOut(reason):
            self._cbStopClient(None)
            # Any failure other than a timeout is re-raised by trap().
            reason.trap(error.TimeoutError)

        d = self.connected.addCallback(strip(login))
        d.addErrback(self._ebGeneral)
        return defer.gatherResults([d, self.loopback()])

    def test_serverTimesOut(self):
        """
        The server times out a connection.
        """
        c = Clock()
        self.server.callLater = c.callLater

        def login():
            return self.client.login(b"testuser", b"password-test")

        def expireTime():
            c.advance(self.server.POSTAUTH_TIMEOUT * 2)

        d = self.connected.addCallback(strip(login))
        d.addCallback(strip(expireTime))

        # The loopback method's Deferred fires when the connection is
        # closed, and the server closes the connection as a result of
        # expireTime.
        return defer.gatherResults([d, self.loopback()])

    def test_serverUnselectsMailbox(self):
        """
        The server unsets the selected mailbox when timing out a
        connection.
        """
        self.patch(SimpleServer.theAccount, "mailboxFactory", UncloseableMailbox)
        SimpleServer.theAccount.addMailbox("mailbox-test")
        mbox = SimpleServer.theAccount.mailboxes["MAILBOX-TEST"]
        self.assertFalse(ICloseableMailboxIMAP.providedBy(mbox))

        c = Clock()
        self.server.callLater = c.callLater

        def login():
            return self.client.login(b"testuser", b"password-test")

        def select():
            return self.client.select("mailbox-test")

        def assertSet():
            self.assertIs(mbox, self.server.mbox)

        def expireTime():
            c.advance(self.server.POSTAUTH_TIMEOUT * 2)

        def assertUnset():
            self.assertFalse(self.server.mbox)

        d = self.connected.addCallback(strip(login))
        d.addCallback(strip(select))
        d.addCallback(strip(assertSet))
        d.addCallback(strip(expireTime))
        d.addCallback(strip(assertUnset))

        # The loopback method's Deferred fires when the connection is
        # closed, and the server closes the connection as a result of
        # expireTime.
        return defer.gatherResults([d, self.loopback()])

    def test_serverTimesOutAndClosesMailbox(self):
        """
        The server closes the selected, closeable mailbox when timing
        out a connection.
        """
        SimpleServer.theAccount.addMailbox("mailbox-test")
        mbox = SimpleServer.theAccount.mailboxes["MAILBOX-TEST"]
        verifyObject(ICloseableMailboxIMAP, mbox)

        c = Clock()
        self.server.callLater = c.callLater

        def login():
            return self.client.login(b"testuser", b"password-test")

        def select():
            return self.client.select("mailbox-test")

        def assertMailboxOpen():
            self.assertFalse(mbox.closed)

        def expireTime():
            c.advance(self.server.POSTAUTH_TIMEOUT * 2)

        def assertMailboxClosed():
            self.assertTrue(mbox.closed)

        d = self.connected.addCallback(strip(login))
        d.addCallback(strip(select))
        d.addCallback(strip(assertMailboxOpen))
        d.addCallback(strip(expireTime))
        d.addCallback(strip(assertMailboxClosed))

        # The loopback method's Deferred fires when the connection is
        # closed, and the server closes the connection as a result of
        # expireTime.
        return defer.gatherResults([d, self.loopback()])

    def test_longFetchDoesntTimeout(self):
        """
        The connection timeout does not take effect during fetches.
        """
        c = Clock()
        fetchStarted = defer.Deferred()

        # SlowMailbox is configured through class attributes and the account
        # is shared through SimpleServer.  patch() is used instead of plain
        # assignment so that the previous values are restored once this test
        # has finished and cannot leak into other tests.
        self.patch(SlowMailbox, "callLater", c.callLater)
        self.patch(SlowMailbox, "fetchDeferred", fetchStarted)
        self.server.callLater = c.callLater
        self.patch(SimpleServer.theAccount, "mailboxFactory", SlowMailbox)
        SimpleServer.theAccount.addMailbox("mailbox-test")

        self.server.setTimeout(1)

        def login():
            return self.client.login(b"testuser", b"password-test")

        def select():
            self.server.setTimeout(1)
            return self.client.select("mailbox-test")

        def fetch():
            return self.client.fetchUID("1:*")

        def stillConnected():
            self.assertNotEqual(self.server.state, "timeout")

        def cbAdvance(ignored):
            # Two seconds in total, in half-second steps: as long as
            # SlowMailbox.howSlow and twice the one-second timeout set above.
            for _ in range(4):
                c.advance(0.5)

        # SlowMailbox.fetch fires this Deferred as soon as a fetch has begun.
        fetchStarted.addCallback(cbAdvance)

        d1 = self.connected.addCallback(strip(login))
        d1.addCallback(strip(select))
        d1.addCallback(strip(fetch))
        d1.addCallback(strip(stillConnected))
        d1.addCallback(self._cbStopClient)
        d1.addErrback(self._ebGeneral)
        d = defer.gatherResults([d1, self.loopback()])
        return d

    def test_idleClientDoesDisconnect(self):
        """
        The *server* has a timeout mechanism which will close connections that
        are inactive for a period.
        """
        c = Clock()
        # Hook up our server protocol
        transport = StringTransportWithDisconnection()
        transport.protocol = self.server
        self.server.callLater = c.callLater
        self.server.makeConnection(transport)

        # Make sure we can notice when the connection goes away.  The wrapper
        # records the call and still returns what the real connectionLost
        # returns.
        lost = []
        connLost = self.server.connectionLost
        self.server.connectionLost = lambda reason: (
            lost.append(None),
            connLost(reason),
        )[1]

        # 2/3rds of the idle timeout elapses...
        c.pump([0.0] + [self.server.timeOut / 3.0] * 2)
        self.assertFalse(lost, lost)

        # Now some more
        c.pump([0.0, self.server.timeOut / 2.0])
        self.assertTrue(lost)
7514
7515
class DisconnectionTests(TestCase):
    """
    Tests for what happens to a client's outstanding L{Deferred}s when its
    connection goes away.
    """

    def testClientDisconnectFailsDeferreds(self):
        """
        A login which is still outstanding when the connection is lost fails
        with L{error.ConnectionDone}.
        """
        client = imap4.IMAP4Client()
        client.makeConnection(StringTransportWithDisconnection())

        pendingLogin = client.login(b"testuser", "example.com")
        failed = self.assertFailure(pendingLogin, error.ConnectionDone)

        client.connectionLost(error.ConnectionDone("Connection closed"))
        return failed
7526
7527
class SynchronousMailbox:
    """
    A trivial mailbox which keeps its messages in memory and can therefore
    produce them synchronously.

    @ivar messages: The sequence of messages; sequence number C{n} refers to
        C{messages[n - 1]}.
    """

    def __init__(self, messages):
        self.messages = messages

    def fetch(self, msgset, uid):
        """
        Lazily produce the requested messages.

        This is a generator, so nothing (the C{uid} check included) happens
        until iteration starts.

        @param msgset: An iterable of message sequence numbers.
        @param uid: Must be false; UID requests are not supported.
        @return: An iterator of C{(sequence number, message)} pairs.
        """
        assert not uid, "Cannot handle uid requests."
        yield from ((seq, self.messages[seq - 1]) for seq in msgset)
7541
7542
class PipeliningTests(TestCase):
    """
    Exercise the IMAP4 server's handling of pipelined commands and of
    events that occur while a command is still producing its response.

    @cvar messages: The messages held by the mailbox selected in L{setUp}.

    @ivar iterators: The C{(iterator, Deferred)} pairs collected by
        L{iterateInReactor} that L{flushPending} has not yet run.
    """

    messages = [
        FakeyMessage({}, [], b"", body, None, None) for body in (b"0", b"1", b"2")
    ]

    def setUp(self):
        """
        Connect a server to a L{StringTransport}, put it directly into the
        selected state with a L{SynchronousMailbox}, and discard whatever
        was written while connecting.
        """
        self.iterators = []
        self.transport = StringTransport()

        self.server = imap4.IMAP4Server(None, None, self.iterateInReactor)
        server = self.server
        server.makeConnection(self.transport)

        # Bypass authentication and folder selection entirely.
        server.state = "select"
        server.mbox = SynchronousMailbox(self.messages)

        # Drop the greeting so tests only see their own responses.
        self.transport.clear()

    def iterateInReactor(self, iterator):
        """
        Stand in for L{imap4.iterateInReactor}: remember the iterator
        instead of running it.

        @param iterator: An iterator.

        @return: A L{Deferred} paired with C{iterator}; L{flushPending}
            fires it once the iterator has been exhausted.
        """
        finished = defer.Deferred()
        self.iterators.append((iterator, finished))
        return finished

    def flushPending(self, asLongAs=lambda: True):
        """
        Run the iterators recorded by L{iterateInReactor}, oldest first.
        Each one is run to exhaustion; after every step the transport's
        producer is resumed until the transport no longer has one, so that
        message bodies get written out.  Once an iterator is exhausted its
        L{Deferred} is fired with L{None}.

        @param asLongAs: A no-argument predicate consulted before each
            recorded iterator is started.  Flushing stops as soon as it
            returns a false value or no iterators remain.  By default it
            is always true.
        """
        while self.iterators and asLongAs():
            pending = self.iterators[0][0]
            for _ in pending:
                while self.transport.producer:
                    self.transport.producer.resumeProducing()
            # The oldest iterator is done: retire it and notify its owner.
            self.iterators.pop(0)[1].callback(None)

    def tearDown(self):
        """
        Disconnect the server.
        """
        self.server.connectionLost(failure.Failure(error.ConnectionDone()))

    def _bodyText(self, index):
        """
        Read the body of one of L{messages} as a native string.

        @param index: The 0-based index into L{messages}.

        @return: The message body, converted with L{nativeString}.
        """
        return nativeString(self.messages[index].getBodyFile().read())

    def test_synchronousFetch(self):
        """
        Several FETCH commands delivered in a single chunk of data, each of
        which can be answered synchronously, all receive the correct
        responses in order.
        """
        pipelined = b"".join(
            [
                b"01 FETCH 1 BODY[]\r\n",
                b"02 FETCH 2 BODY[]\r\n",
                b"03 FETCH 3 BODY[]\r\n",
            ]
        )
        self.server.dataReceived(pipelined)

        self.flushPending()

        received = self.transport.value()
        expected = [
            b"* 1 FETCH (BODY[] )\r\n",
            networkString(
                "01 OK FETCH completed\r\n{5}\r\n\r\n\r\n%s" % (self._bodyText(0),)
            ),
            b"* 2 FETCH (BODY[] )\r\n",
            networkString(
                "02 OK FETCH completed\r\n{5}\r\n\r\n\r\n%s" % (self._bodyText(1),)
            ),
            b"* 3 FETCH (BODY[] )\r\n",
            networkString(
                "03 OK FETCH completed\r\n{5}\r\n\r\n\r\n%s" % (self._bodyText(2),)
            ),
        ]
        self.assertEqual(received, b"".join(expected))

    def test_bufferedServerStatus(self):
        """
        A server status change that happens while a FETCH is still in
        progress is held back and only written out as that FETCH finishes.
        """
        self.server.dataReceived(b"01 FETCH 1,2 BODY[]\r\n")

        # Permit exactly two rounds of flushing: enough for the untagged
        # response and the first fetched message's body.
        permits = iter([True, True, False])
        self.flushPending(asLongAs=functools.partial(next, permits))

        firstBatch = self.transport.value()
        expectedFirst = [
            # The untagged response...
            b"* 1 FETCH (BODY[] )\r\n",
            # ...followed by its body.
            networkString("{5}\r\n\r\n\r\n%s" % (self._bodyText(0),)),
        ]
        self.assertEqual(firstBatch, b"".join(expectedFirst))

        self.transport.clear()

        # Change the server status in the middle of the FETCH...
        self.server.modeChanged(writeable=True)

        # ...and observe that nothing has been written yet.
        self.assertFalse(self.transport.value())

        self.flushPending()

        secondBatch = self.transport.value()
        expectedSecond = [
            # The untagged response...
            b"* 2 FETCH (BODY[] )\r\n",
            # ...then the buffered status change...
            b"* [READ-WRITE]\r\n",
            # ...then the completion status and the last message's body.
            networkString(
                "01 OK FETCH completed\r\n{5}\r\n\r\n\r\n%s" % (self._bodyText(1),)
            ),
        ]
        self.assertEqual(secondBatch, b"".join(expectedSecond))
7692
7693
class IMAP4ServerFetchTests(TestCase):
    """
    This test case is for the FETCH tests that require
    a C{StringTransport}.
    """

    def setUp(self):
        """
        Connect an L{imap4.IMAP4Server}, already placed in the selected
        state, to a L{StringTransport}, and arrange for it to be
        disconnected when the test finishes.
        """
        self.transport = StringTransport()
        self.server = imap4.IMAP4Server()
        self.server.state = "select"
        self.server.makeConnection(self.transport)
        # Registered as a cleanup, rather than called as the last statement
        # of the test, so the server is disconnected even when an assertion
        # fails part-way through.
        # NOTE(review): presumably this matters because a connected server
        # keeps state pending in the reactor (e.g. its timeout) - confirm.
        self.addCleanup(
            self.server.connectionLost, error.ConnectionDone("Connection closed")
        )

    def test_fetchWithPartialValidArgument(self):
        """
        If extra bytes are appended to an otherwise valid FETCH argument,
        the client gets a BAD response reporting an invalid argument.

        See U{RFC 3501<http://tools.ietf.org/html/rfc3501#section-6.4.5>},
        section 6.4.5.
        """
        # We need to clear out the welcome message.
        self.transport.clear()
        # "FULLL" is the valid "FULL" macro with one extra trailing byte.
        self.server.dataReceived(b"0001 FETCH 1 FULLL\r\n")
        expected = b"0001 BAD Illegal syntax: Invalid Argument\r\n"
        self.assertEqual(self.transport.value(), expected)
7723
7724
class LiteralTestsMixin:
    """
    Tests that apply to every literal implementation.  Concrete test cases
    mix this class in and supply C{literalFactory}.

    @ivar literalFactory: A callable, invoked as
        C{literalFactory(size, deferred)}, that returns an instance of the
        literal under test.

    @ivar deferred: The L{Deferred} handed to each literal; created afresh
        by L{setUp}.
    """

    def setUp(self):
        """
        Create the L{Deferred} that the literal under test is given.
        """
        self.deferred = defer.Deferred()

    def test_partialWrite(self):
        """
        Writing fewer bytes than the literal's size returns L{None} and
        leaves the L{Deferred} unfired.
        """
        underfilled = self.literalFactory(1024, self.deferred)
        outcome = underfilled.write(b"incomplete")

        self.assertIs(None, outcome)
        self.assertNoResult(self.deferred)

    def test_exactWrite(self):
        """
        Writing exactly as many bytes as the literal's size returns an
        empty L{bytes} instance and leaves the L{Deferred} unfired.
        """
        payload = b"complete"
        remainder = self.literalFactory(len(payload), self.deferred).write(payload)

        self.assertIsInstance(remainder, bytes)
        self.assertFalse(remainder)
        self.assertNoResult(self.deferred)

    def test_overlongWrite(self):
        """
        Writing more bytes than the literal's size returns the surplus
        L{bytes}.
        """
        literal = self.literalFactory(len(b"complete"), self.deferred)

        self.assertEqual(literal.write(b"completeleftover"), b"leftover")

    def test_emptyLiteral(self):
        """
        A literal whose size is zero consumes nothing: everything written
        to it is returned as left over L{bytes}.
        """
        payload = b"leftover"
        zeroLength = self.literalFactory(0, self.deferred)

        self.assertEqual(zeroLength.write(payload), payload)
7784
7785
class LiteralStringTests(LiteralTestsMixin, SynchronousTestCase):
    """
    Tests for L{imap4.LiteralString}.
    """

    literalFactory = imap4.LiteralString

    def test_callback(self):
        """
        After a L{imap4.LiteralString} has been filled one byte at a time,
        calling its C{callback} with a line fires the instance's
        L{Deferred} with a 2-L{tuple} of the collected data and that line.
        """
        collected = b"data"
        line = b"extra"

        literal = imap4.LiteralString(len(collected), self.deferred)
        for byte in iterbytes(collected):
            literal.write(byte)
        literal.callback(line)

        self.assertEqual(self.successResultOf(self.deferred), (collected, line))
7811
7812
class LiteralFileTests(LiteralTestsMixin, TestCase):
    """
    Tests for L{imap4.LiteralFile}.
    """

    literalFactory = imap4.LiteralFile

    def _fillAndCallback(self, data, line):
        """
        Write C{data} one byte at a time into a new L{imap4.LiteralFile}
        sized to hold exactly C{data}, call its C{callback} with C{line},
        and unpack what the literal's L{Deferred} fired with.

        @param data: The L{bytes} to write into the literal.

        @param line: The L{bytes} passed to the literal's C{callback}.

        @return: The 2-L{tuple} the L{Deferred} fired with, as
            C{(dataFile, receivedLine)}.
        """
        literal = imap4.LiteralFile(len(data), self.deferred)

        for c in iterbytes(data):
            literal.write(c)

        literal.callback(line)

        result = self.successResultOf(self.deferred)
        self.assertEqual(len(result), 2)

        dataFile, receivedLine = result
        # Close the file object once the test is over so that a file backed
        # by the filesystem is not left open.
        self.addCleanup(dataFile.close)
        return dataFile, receivedLine

    def test_callback(self):
        """
        Calling L{imap4.LiteralFile.callback} with a line fires the
        instance's L{Deferred} with a 2-L{tuple} whose first element
        is the file and whose second is the provided line.
        """
        data = b"data"
        extra = b"extra"

        dataFile, receivedLine = self._fillAndCallback(data, extra)

        self.assertEqual(dataFile.read(), data)
        # Unpacked under its own name so the expected line is not
        # overwritten before it can be compared.
        self.assertEqual(receivedLine, extra)

    def test_callbackSpooledToDisk(self):
        """
        A L{imap4.LiteralFile} whose size exceeds the maximum
        in-memory size spools its content to disk, and invoking its
        L{callback} with a line fires the instance's L{Deferred} with
        a 2-L{tuple} whose first element is the spooled file and whose second
        is the provided line.
        """
        data = b"data"
        extra = b"extra"

        # Lower the limit below len(data) so this literal exceeds it.
        self.patch(imap4.LiteralFile, "_memoryFileLimit", 1)

        dataFile, receivedLine = self._fillAndCallback(data, extra)

        self.assertEqual(dataFile.read(), data)
        self.assertEqual(receivedLine, extra)
7867
7868
class WriteBufferTests(SynchronousTestCase):
    """
    Tests for L{imap4.WriteBuffer}.
    """

    def setUp(self):
        """
        Create the L{StringTransport} that each buffer under test wraps.
        """
        self.transport = StringTransport()

    def test_partialWrite(self):
        """
        A single write of exactly C{bufferSize} bytes is held by the
        L{imap4.WriteBuffer}; nothing reaches the transport.
        """
        writer = imap4.WriteBuffer(self.transport)

        writer.write(b"x" * writer.bufferSize)

        self.assertFalse(self.transport.value())

    def test_overlongWrite(self):
        """
        A single write larger than C{bufferSize} is passed straight
        through to the transport by L{imap4.WriteBuffer}.
        """
        writer = imap4.WriteBuffer(self.transport)
        oversized = b"x" * (writer.bufferSize + 1)

        writer.write(oversized)

        self.assertEqual(self.transport.value(), oversized)

    def test_writesImplyFlush(self):
        """
        L{imap4.WriteBuffer} holds on to written data until the total
        exceeds C{bufferSize}, at which point everything written so far
        reaches the transport.
        """
        writer = imap4.WriteBuffer(self.transport)
        filler, overflow = b"x" * writer.bufferSize, b"y"

        writer.write(filler)
        self.assertFalse(self.transport.value())

        writer.write(overflow)
        self.assertEqual(self.transport.value(), filler + overflow)

    def test_explicitFlush(self):
        """
        L{imap4.WriteBuffer.flush} sends buffered data to the transport
        even though the buffer has not exceeded C{bufferSize}.
        """
        writer = imap4.WriteBuffer(self.transport)
        buffered = b"x" * (writer.bufferSize)

        writer.write(buffered)
        self.assertFalse(self.transport.value())

        writer.flush()
        self.assertEqual(self.transport.value(), buffered)

    def test_explicitFlushEmptyBuffer(self):
        """
        L{imap4.WriteBuffer.flush} writes nothing to the transport when
        the buffer is empty.
        """
        writer = imap4.WriteBuffer(self.transport)

        writer.flush()

        self.assertFalse(self.transport.value())
7944