1from test import support
2from test.support import socket_helper
3
4from contextlib import contextmanager
5import imaplib
6import os.path
7import socketserver
8import time
9import calendar
10import threading
11import socket
12
13from test.support import (verbose,
14                          run_with_tz, run_with_locale, cpython_only)
15from test.support import hashlib_helper
16from test.support import threading_helper
17from test.support import warnings_helper
18import unittest
19from unittest import mock
20from datetime import datetime, timezone, timedelta
21try:
22    import ssl
23except ImportError:
24    ssl = None
25
# Certificate chain loaded by the SSL test server (CERTFILE) and the CA
# bundle loaded by verifying clients (CAFILE).  Both are looked up in this
# file's directory, or in the current directory when __file__ has no
# directory component.
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")
28
29
class TestImaplib(unittest.TestCase):
    # Tests for imaplib's date conversion helpers and for the IMAP4
    # constructor's default host; none of them needs a test server.

    def test_Internaldate2tuple(self):
        # The same instant written with three different UTC offsets must
        # convert to the same timestamp.
        expected = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
        samples = (
            b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")',
            b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")',
            b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")',
        )
        for raw in samples:
            parsed = imaplib.Internaldate2tuple(raw)
            self.assertEqual(time.mktime(parsed), expected)

    @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
    def test_Internaldate2tuple_issue10941(self):
        # Two instants one hour apart must not collapse into the same tuple
        # under the time zone configured above.
        first = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")')
        second = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')
        self.assertNotEqual(first, second)

    def timevalues(self):
        # One value per input form that the tests below feed to
        # Time2Internaldate: int, float, struct_time, two time tuples,
        # an aware datetime and an already formatted string.
        stamp = 2000000000
        plus_two_hours = timezone(timedelta(0, 2 * 60 * 60))
        return [
            stamp,
            float(stamp),
            time.localtime(stamp),
            (2033, 5, 18, 5, 33, 20, -1, -1, -1),
            (2033, 5, 18, 5, 33, 20, -1, -1, 1),
            datetime.fromtimestamp(stamp, plus_two_hours),
            '"18-May-2033 05:33:20 +0200"',
        ]

    @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
    # DST rules included to work around quirk where the GNU C library may not
    # otherwise restore the previous time zone
    @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
    def test_Time2Internaldate(self):
        wanted = '"18-May-2033 05:33:20 +0200"'
        for value in self.timevalues():
            self.assertEqual(imaplib.Time2Internaldate(value), wanted)

    def test_that_Time2Internaldate_returns_a_result(self):
        # Without tzset the result depends on the machine's time zone, so
        # only check that every input form converts without raising.
        for value in self.timevalues():
            imaplib.Time2Internaldate(value)

    def test_imap4_host_default_value(self):
        # The test only makes sense when nothing listens on IMAP4_PORT.
        with socket.socket() as probe:
            try:
                probe.connect(('', imaplib.IMAP4_PORT))
            except OSError:
                pass
            else:
                self.skipTest(
                    "Cannot run the test with local IMAP server running.")

        # With no host given, the connection attempt must be refused.
        refused = socket_helper.get_socket_conn_refused_errs()
        with self.assertRaises(OSError) as caught:
            imaplib.IMAP4()
        self.assertIn(caught.exception.errno, refused)
92
93
if not ssl:

    # Placeholders so that the class statements further down still evaluate
    # when the ssl module is unavailable.
    class SecureTCPServer:
        pass

    IMAP4_SSL = None

else:

    class SecureTCPServer(socketserver.TCPServer):
        # TCPServer whose accepted connections are wrapped in server-side TLS.

        def get_request(self):
            # Accept first, then build a context from CERTFILE and wrap the
            # accepted socket with it.
            raw_sock, peer = self.socket.accept()
            tls = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            tls.load_cert_chain(CERTFILE)
            return tls.wrap_socket(raw_sock, server_side=True), peer

    IMAP4_SSL = imaplib.IMAP4_SSL
112
113
class SimpleIMAPHandler(socketserver.StreamRequestHandler):
    """Scripted IMAP server side used by the tests.

    Each command line "<tag> <CMD> <args...>" is dispatched to a method
    named cmd_<CMD>(tag, args); unknown commands are answered with BAD.
    A cmd_* method may be a generator: it is advanced to its first yield
    and every following input line is sent into it until it finishes.
    """
    timeout = support.LOOPBACK_TIMEOUT
    continuation = None
    capabilities = ''

    def setup(self):
        super().setup()
        # State shared with the test through the server object.
        self.server.logged = None
        self.server.is_selected = False

    def _send(self, message):
        if verbose:
            print("SENT: %r" % message.strip())
        self.wfile.write(message)

    def _send_line(self, message):
        self._send(message + b'\r\n')

    def _send_textline(self, message):
        self._send_line(message.encode('ASCII'))

    def _send_tagged(self, tag, code, message):
        self._send_textline(' '.join([tag, code, message]))

    def _read_line(self):
        # Return the next CRLF-terminated line, or None when the peer is
        # gone.  Reading one byte at a time keeps plain and SSL sockets on
        # the same code path.
        buf = b''
        while not buf.endswith(b'\r\n'):
            try:
                chunk = self.rfile.read(1)
            except OSError:
                # SSL sockets raise instead of returning an empty read.
                return None
            if chunk == b'':
                # Plain sockets signal EOF with an empty read.
                return None
            buf += chunk
        return buf

    def handle(self):
        # Greet the client, then serve commands until the connection ends.
        self._send_textline('* OK IMAP4rev1')
        while True:
            line = self._read_line()
            if line is None:
                return

            if verbose:
                print('GOT: %r' % line.strip())

            if self.continuation:
                # A generator command is waiting for this line.
                try:
                    self.continuation.send(line)
                except StopIteration:
                    self.continuation = None
                continue

            words = line.decode('ASCII').split()
            tag, cmd, args = words[0], words[1], words[2:]

            method = getattr(self, 'cmd_' + cmd, None)
            if method is None:
                self._send_tagged(tag, 'BAD', cmd + ' unknown')
                continue

            pending = method(tag, args)
            if pending:
                # Generator command: run it up to its first yield.
                self.continuation = pending
                next(pending)

    def cmd_CAPABILITY(self, tag, args):
        caps = 'IMAP4rev1'
        if self.capabilities:
            caps += ' ' + self.capabilities
        self._send_textline('* CAPABILITY ' + caps)
        self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    def cmd_LOGOUT(self, tag, args):
        self.server.logged = None
        self._send_textline('* BYE IMAP4ref1 Server logging out')
        self._send_tagged(tag, 'OK', 'LOGOUT completed')

    def cmd_LOGIN(self, tag, args):
        # Remember the user name so that tests can inspect it.
        self.server.logged = args[0]
        self._send_tagged(tag, 'OK', 'LOGIN completed')

    def cmd_SELECT(self, tag, args):
        self.server.is_selected = True
        self._send_line(b'* 2 EXISTS')
        self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')

    def cmd_UNSELECT(self, tag, args):
        if not self.server.is_selected:
            self._send_tagged(tag, 'BAD', 'No mailbox selected')
            return
        self.server.is_selected = False
        self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)')
207
208
209class NewIMAPTestsMixin():
210    client = None
211
    def _setup(self, imap_handler, connect=True):
        """
        Sets up imap_handler for tests. imap_handler should inherit from either:
        - SimpleIMAPHandler - for testing IMAP commands,
        - socketserver.StreamRequestHandler - if raw access to stream is needed.

        Starts self.server_class on an ephemeral port of socket_helper.HOST in
        a background thread and registers self._cleanup as a test cleanup.
        When connect is true, a self.imap_class client is connected to the
        server and stored in self.client.

        Returns (client, server); client is self.client, which stays None
        when connect is false and no client was set before.
        """
        # NOTE(review): TestTCPServer is defined here, but the server below is
        # created from self.server_class, so this handle_error override is
        # never installed - confirm whether that is intended.
        class TestTCPServer(self.server_class):
            def handle_error(self, request, client_address):
                """
                End request and raise the error if one occurs.
                """
                self.close_request(request)
                self.server_close()
                raise

        # Registered before anything is started so that the cleanup also runs
        # when connecting the client below raises.
        self.addCleanup(self._cleanup)
        self.server = self.server_class((socket_helper.HOST, 0), imap_handler)
        self.thread = threading.Thread(
            name=self._testMethodName+'-server',
            target=self.server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        self.thread.daemon = True  # In case this function raises.
        self.thread.start()

        if connect:
            self.client = self.imap_class(*self.server.server_address)

        return self.client, self.server
244
245    def _cleanup(self):
246        """
247        Cleans up the test server. This method should not be called manually,
248        it is added to the cleanup queue in the _setup method already.
249        """
250        # if logout was called already we'd raise an exception trying to
251        # shutdown the client once again
252        if self.client is not None and self.client.state != 'LOGOUT':
253            self.client.shutdown()
254        # cleanup the server
255        self.server.shutdown()
256        self.server.server_close()
257        threading_helper.join_thread(self.thread)
258        # Explicitly clear the attribute to prevent dangling thread
259        self.thread = None
260
261    def test_EOF_without_complete_welcome_message(self):
262        # http://bugs.python.org/issue5949
263        class EOFHandler(socketserver.StreamRequestHandler):
264            def handle(self):
265                self.wfile.write(b'* OK')
266        _, server = self._setup(EOFHandler, connect=False)
267        self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
268                          *server.server_address)
269
270    def test_line_termination(self):
271        class BadNewlineHandler(SimpleIMAPHandler):
272            def cmd_CAPABILITY(self, tag, args):
273                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
274                self._send_tagged(tag, 'OK', 'CAPABILITY completed')
275        _, server = self._setup(BadNewlineHandler, connect=False)
276        self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
277                          *server.server_address)
278
279    def test_enable_raises_error_if_not_AUTH(self):
280        class EnableHandler(SimpleIMAPHandler):
281            capabilities = 'AUTH ENABLE UTF8=ACCEPT'
282        client, _ = self._setup(EnableHandler)
283        self.assertFalse(client.utf8_enabled)
284        with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
285            client.enable('foo')
286        self.assertFalse(client.utf8_enabled)
287
288    def test_enable_raises_error_if_no_capability(self):
289        client, _ = self._setup(SimpleIMAPHandler)
290        with self.assertRaisesRegex(imaplib.IMAP4.error,
291                'does not support ENABLE'):
292            client.enable('foo')
293
294    def test_enable_UTF8_raises_error_if_not_supported(self):
295        client, _ = self._setup(SimpleIMAPHandler)
296        typ, data = client.login('user', 'pass')
297        self.assertEqual(typ, 'OK')
298        with self.assertRaisesRegex(imaplib.IMAP4.error,
299                'does not support ENABLE'):
300            client.enable('UTF8=ACCEPT')
301
302    def test_enable_UTF8_True_append(self):
303        class UTF8AppendServer(SimpleIMAPHandler):
304            capabilities = 'ENABLE UTF8=ACCEPT'
305            def cmd_ENABLE(self, tag, args):
306                self._send_tagged(tag, 'OK', 'ENABLE successful')
307            def cmd_AUTHENTICATE(self, tag, args):
308                self._send_textline('+')
309                self.server.response = yield
310                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
311            def cmd_APPEND(self, tag, args):
312                self._send_textline('+')
313                self.server.response = yield
314                self._send_tagged(tag, 'OK', 'okay')
315        client, server = self._setup(UTF8AppendServer)
316        self.assertEqual(client._encoding, 'ascii')
317        code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
318        self.assertEqual(code, 'OK')
319        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
320        code, _ = client.enable('UTF8=ACCEPT')
321        self.assertEqual(code, 'OK')
322        self.assertEqual(client._encoding, 'utf-8')
323        msg_string = 'Subject: üñí©öðé'
324        typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
325        self.assertEqual(typ, 'OK')
326        self.assertEqual(server.response,
327            ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))
328
329    def test_search_disallows_charset_in_utf8_mode(self):
330        class UTF8Server(SimpleIMAPHandler):
331            capabilities = 'AUTH ENABLE UTF8=ACCEPT'
332            def cmd_ENABLE(self, tag, args):
333                self._send_tagged(tag, 'OK', 'ENABLE successful')
334            def cmd_AUTHENTICATE(self, tag, args):
335                self._send_textline('+')
336                self.server.response = yield
337                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
338        client, _ = self._setup(UTF8Server)
339        typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
340        self.assertEqual(typ, 'OK')
341        typ, _ = client.enable('UTF8=ACCEPT')
342        self.assertEqual(typ, 'OK')
343        self.assertTrue(client.utf8_enabled)
344        with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
345            client.search('foo', 'bar')
346
347    def test_bad_auth_name(self):
348        class MyServer(SimpleIMAPHandler):
349            def cmd_AUTHENTICATE(self, tag, args):
350                self._send_tagged(tag, 'NO',
351                    'unrecognized authentication type {}'.format(args[0]))
352        client, _ = self._setup(MyServer)
353        with self.assertRaisesRegex(imaplib.IMAP4.error,
354                'unrecognized authentication type METHOD'):
355            client.authenticate('METHOD', lambda: 1)
356
357    def test_invalid_authentication(self):
358        class MyServer(SimpleIMAPHandler):
359            def cmd_AUTHENTICATE(self, tag, args):
360                self._send_textline('+')
361                self.response = yield
362                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
363        client, _ = self._setup(MyServer)
364        with self.assertRaisesRegex(imaplib.IMAP4.error,
365                r'\[AUTHENTICATIONFAILED\] invalid'):
366            client.authenticate('MYAUTH', lambda x: b'fake')
367
368    def test_valid_authentication_bytes(self):
369        class MyServer(SimpleIMAPHandler):
370            def cmd_AUTHENTICATE(self, tag, args):
371                self._send_textline('+')
372                self.server.response = yield
373                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
374        client, server = self._setup(MyServer)
375        code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
376        self.assertEqual(code, 'OK')
377        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
378
379    def test_valid_authentication_plain_text(self):
380        class MyServer(SimpleIMAPHandler):
381            def cmd_AUTHENTICATE(self, tag, args):
382                self._send_textline('+')
383                self.server.response = yield
384                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
385        client, server = self._setup(MyServer)
386        code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
387        self.assertEqual(code, 'OK')
388        self.assertEqual(server.response, b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
389
390    @hashlib_helper.requires_hashdigest('md5')
391    def test_login_cram_md5_bytes(self):
392        class AuthHandler(SimpleIMAPHandler):
393            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
394            def cmd_AUTHENTICATE(self, tag, args):
395                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
396                                    'VzdG9uLm1jaS5uZXQ=')
397                r = yield
398                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
399                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
400                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
401                else:
402                    self._send_tagged(tag, 'NO', 'No access')
403        client, _ = self._setup(AuthHandler)
404        self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
405        ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
406        self.assertEqual(ret, "OK")
407
408    @hashlib_helper.requires_hashdigest('md5')
409    def test_login_cram_md5_plain_text(self):
410        class AuthHandler(SimpleIMAPHandler):
411            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
412            def cmd_AUTHENTICATE(self, tag, args):
413                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
414                                    'VzdG9uLm1jaS5uZXQ=')
415                r = yield
416                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
417                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
418                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
419                else:
420                    self._send_tagged(tag, 'NO', 'No access')
421        client, _ = self._setup(AuthHandler)
422        self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
423        ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
424        self.assertEqual(ret, "OK")
425
426    def test_aborted_authentication(self):
427        class MyServer(SimpleIMAPHandler):
428            def cmd_AUTHENTICATE(self, tag, args):
429                self._send_textline('+')
430                self.response = yield
431                if self.response == b'*\r\n':
432                    self._send_tagged(
433                        tag,
434                        'NO',
435                        '[AUTHENTICATIONFAILED] aborted')
436                else:
437                    self._send_tagged(tag, 'OK', 'MYAUTH successful')
438        client, _ = self._setup(MyServer)
439        with self.assertRaisesRegex(imaplib.IMAP4.error,
440                r'\[AUTHENTICATIONFAILED\] aborted'):
441            client.authenticate('MYAUTH', lambda x: None)
442
443    @mock.patch('imaplib._MAXLINE', 10)
444    def test_linetoolong(self):
445        class TooLongHandler(SimpleIMAPHandler):
446            def handle(self):
447                # send response line longer than the limit set in the next line
448                self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
449        _, server = self._setup(TooLongHandler, connect=False)
450        with self.assertRaisesRegex(imaplib.IMAP4.error,
451                'got more than 10 bytes'):
452            self.imap_class(*server.server_address)
453
454    def test_simple_with_statement(self):
455        _, server = self._setup(SimpleIMAPHandler, connect=False)
456        with self.imap_class(*server.server_address):
457            pass
458
459    def test_imaplib_timeout_test(self):
460        _, server = self._setup(SimpleIMAPHandler)
461        addr = server.server_address[1]
462        client = self.imap_class("localhost", addr, timeout=None)
463        self.assertEqual(client.sock.timeout, None)
464        client.shutdown()
465        client = self.imap_class("localhost", addr, timeout=support.LOOPBACK_TIMEOUT)
466        self.assertEqual(client.sock.timeout, support.LOOPBACK_TIMEOUT)
467        client.shutdown()
468        with self.assertRaises(ValueError):
469            client = self.imap_class("localhost", addr, timeout=0)
470
471    def test_imaplib_timeout_functionality_test(self):
472        class TimeoutHandler(SimpleIMAPHandler):
473            def handle(self):
474                time.sleep(1)
475                SimpleIMAPHandler.handle(self)
476
477        _, server = self._setup(TimeoutHandler)
478        addr = server.server_address[1]
479        with self.assertRaises(TimeoutError):
480            client = self.imap_class("localhost", addr, timeout=0.001)
481
482    def test_with_statement(self):
483        _, server = self._setup(SimpleIMAPHandler, connect=False)
484        with self.imap_class(*server.server_address) as imap:
485            imap.login('user', 'pass')
486            self.assertEqual(server.logged, 'user')
487        self.assertIsNone(server.logged)
488
489    def test_with_statement_logout(self):
490        # It is legal to log out explicitly inside the with block
491        _, server = self._setup(SimpleIMAPHandler, connect=False)
492        with self.imap_class(*server.server_address) as imap:
493            imap.login('user', 'pass')
494            self.assertEqual(server.logged, 'user')
495            imap.logout()
496            self.assertIsNone(server.logged)
497        self.assertIsNone(server.logged)
498
499    # command tests
500
501    def test_login(self):
502        client, _ = self._setup(SimpleIMAPHandler)
503        typ, data = client.login('user', 'pass')
504        self.assertEqual(typ, 'OK')
505        self.assertEqual(data[0], b'LOGIN completed')
506        self.assertEqual(client.state, 'AUTH')
507
508    def test_logout(self):
509        client, _ = self._setup(SimpleIMAPHandler)
510        typ, data = client.login('user', 'pass')
511        self.assertEqual(typ, 'OK')
512        self.assertEqual(data[0], b'LOGIN completed')
513        typ, data = client.logout()
514        self.assertEqual(typ, 'BYE', (typ, data))
515        self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data))
516        self.assertEqual(client.state, 'LOGOUT')
517
518    def test_lsub(self):
519        class LsubCmd(SimpleIMAPHandler):
520            def cmd_LSUB(self, tag, args):
521                self._send_textline('* LSUB () "." directoryA')
522                return self._send_tagged(tag, 'OK', 'LSUB completed')
523        client, _ = self._setup(LsubCmd)
524        client.login('user', 'pass')
525        typ, data = client.lsub()
526        self.assertEqual(typ, 'OK')
527        self.assertEqual(data[0], b'() "." directoryA')
528
529    def test_unselect(self):
530        client, _ = self._setup(SimpleIMAPHandler)
531        client.login('user', 'pass')
532        typ, data = client.select()
533        self.assertEqual(typ, 'OK')
534        self.assertEqual(data[0], b'2')
535
536        typ, data = client.unselect()
537        self.assertEqual(typ, 'OK')
538        self.assertEqual(data[0], b'Returned to authenticated state. (Success)')
539        self.assertEqual(client.state, 'AUTH')
540
541
class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
    # Runs the mixin's tests with a plain TCP server and the IMAP4 client.
    server_class = socketserver.TCPServer
    imap_class = imaplib.IMAP4
545
546
@unittest.skipUnless(ssl, "SSL not available")
class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
    # Runs the mixin's tests over TLS and adds tests for certificate
    # verification and the deprecated certfile argument.
    imap_class = IMAP4_SSL
    server_class = SecureTCPServer

    def test_ssl_raises(self):
        # Connecting by IP address with hostname checking enabled must fail
        # certificate verification.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ssl_context.check_hostname, True)
        ssl_context.load_verify_locations(CAFILE)

        # connect=False: the server handles one connection at a time, so an
        # idle client opened by _setup() would stall the client under test
        # until the handler's read times out.  The server is also started
        # outside the assertRaisesRegex block so that only the verifying
        # client can satisfy the assertion.
        _, server = self._setup(SimpleIMAPHandler, connect=False)
        with self.assertRaisesRegex(ssl.CertificateError,
                "IP address mismatch, certificate is not valid for "
                "'127.0.0.1'"):
            client = self.imap_class(*server.server_address,
                                     ssl_context=ssl_context)
            # Only reached if verification unexpectedly succeeds.
            client.shutdown()

    def test_ssl_verified(self):
        # Connecting as "localhost" with the CA loaded must pass
        # verification.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.load_verify_locations(CAFILE)

        # connect=False for the same reason as in test_ssl_raises.
        _, server = self._setup(SimpleIMAPHandler, connect=False)
        client = self.imap_class("localhost", server.server_address[1],
                                 ssl_context=ssl_context)
        client.shutdown()

    # Mock the private method _connect(), so mark the test as specific
    # to CPython stdlib
    @cpython_only
    def test_certfile_arg_warn(self):
        # Passing certfile must emit a DeprecationWarning; open() and
        # _connect() are patched out so no connection is attempted.
        with warnings_helper.check_warnings(('', DeprecationWarning)):
            with mock.patch.object(self.imap_class, 'open'):
                with mock.patch.object(self.imap_class, '_connect'):
                    self.imap_class('localhost', 143, certfile=CERTFILE)
583
584class ThreadedNetworkedTests(unittest.TestCase):
585    server_class = socketserver.TCPServer
586    imap_class = imaplib.IMAP4
587
588    def make_server(self, addr, hdlr):
589
590        class MyServer(self.server_class):
591            def handle_error(self, request, client_address):
592                self.close_request(request)
593                self.server_close()
594                raise
595
596        if verbose:
597            print("creating server")
598        server = MyServer(addr, hdlr)
599        self.assertEqual(server.server_address, server.socket.getsockname())
600
601        if verbose:
602            print("server created")
603            print("ADDR =", addr)
604            print("CLASS =", self.server_class)
605            print("HDLR =", server.RequestHandlerClass)
606
607        t = threading.Thread(
608            name='%s serving' % self.server_class,
609            target=server.serve_forever,
610            # Short poll interval to make the test finish quickly.
611            # Time between requests is short enough that we won't wake
612            # up spuriously too many times.
613            kwargs={'poll_interval': 0.01})
614        t.daemon = True  # In case this function raises.
615        t.start()
616        if verbose:
617            print("server running")
618        return server, t
619
620    def reap_server(self, server, thread):
621        if verbose:
622            print("waiting for server")
623        server.shutdown()
624        server.server_close()
625        thread.join()
626        if verbose:
627            print("done")
628
629    @contextmanager
630    def reaped_server(self, hdlr):
631        server, thread = self.make_server((socket_helper.HOST, 0), hdlr)
632        try:
633            yield server
634        finally:
635            self.reap_server(server, thread)
636
637    @contextmanager
638    def reaped_pair(self, hdlr):
639        with self.reaped_server(hdlr) as server:
640            client = self.imap_class(*server.server_address)
641            try:
642                yield server, client
643            finally:
644                client.logout()
645
646    @threading_helper.reap_threads
647    def test_connect(self):
648        with self.reaped_server(SimpleIMAPHandler) as server:
649            client = self.imap_class(*server.server_address)
650            client.shutdown()
651
652    @threading_helper.reap_threads
653    def test_bracket_flags(self):
654
655        # This violates RFC 3501, which disallows ']' characters in tag names,
656        # but imaplib has allowed producing such tags forever, other programs
657        # also produce them (eg: OtherInbox's Organizer app as of 20140716),
658        # and Gmail, for example, accepts them and produces them.  So we
659        # support them.  See issue #21815.
660
661        class BracketFlagHandler(SimpleIMAPHandler):
662
663            def handle(self):
664                self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
665                super().handle()
666
667            def cmd_AUTHENTICATE(self, tag, args):
668                self._send_textline('+')
669                self.server.response = yield
670                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
671
672            def cmd_SELECT(self, tag, args):
673                flag_msg = ' \\'.join(self.flags)
674                self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii'))
675                self._send_line(b'* 2 EXISTS')
676                self._send_line(b'* 0 RECENT')
677                msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.'
678                        % flag_msg)
679                self._send_line(msg.encode('ascii'))
680                self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')
681
682            def cmd_STORE(self, tag, args):
683                new_flags = args[2].strip('(').strip(')').split()
684                self.flags.extend(new_flags)
685                flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags)
686                msg = '* %s FETCH %s' % (args[0], flags_msg)
687                self._send_line(msg.encode('ascii'))
688                self._send_tagged(tag, 'OK', 'STORE completed.')
689
690        with self.reaped_pair(BracketFlagHandler) as (server, client):
691            code, data = client.authenticate('MYAUTH', lambda x: b'fake')
692            self.assertEqual(code, 'OK')
693            self.assertEqual(server.response, b'ZmFrZQ==\r\n')
694            client.select('test')
695            typ, [data] = client.store(b'1', "+FLAGS", "[test]")
696            self.assertIn(b'[test]', data)
697            client.select('test')
698            typ, [data] = client.response('PERMANENTFLAGS')
699            self.assertIn(b'[test]', data)
700
701    @threading_helper.reap_threads
702    def test_issue5949(self):
703
704        class EOFHandler(socketserver.StreamRequestHandler):
705            def handle(self):
706                # EOF without sending a complete welcome message.
707                self.wfile.write(b'* OK')
708
709        with self.reaped_server(EOFHandler) as server:
710            self.assertRaises(imaplib.IMAP4.abort,
711                              self.imap_class, *server.server_address)
712
713    @threading_helper.reap_threads
714    def test_line_termination(self):
715
716        class BadNewlineHandler(SimpleIMAPHandler):
717
718            def cmd_CAPABILITY(self, tag, args):
719                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
720                self._send_tagged(tag, 'OK', 'CAPABILITY completed')
721
722        with self.reaped_server(BadNewlineHandler) as server:
723            self.assertRaises(imaplib.IMAP4.abort,
724                              self.imap_class, *server.server_address)
725
726    class UTF8Server(SimpleIMAPHandler):
727        capabilities = 'AUTH ENABLE UTF8=ACCEPT'
728
729        def cmd_ENABLE(self, tag, args):
730            self._send_tagged(tag, 'OK', 'ENABLE successful')
731
732        def cmd_AUTHENTICATE(self, tag, args):
733            self._send_textline('+')
734            self.server.response = yield
735            self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
736
737    @threading_helper.reap_threads
738    def test_enable_raises_error_if_not_AUTH(self):
739        with self.reaped_pair(self.UTF8Server) as (server, client):
740            self.assertFalse(client.utf8_enabled)
741            self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
742            self.assertFalse(client.utf8_enabled)
743
744    # XXX Also need a test that enable after SELECT raises an error.
745
746    @threading_helper.reap_threads
747    def test_enable_raises_error_if_no_capability(self):
748        class NoEnableServer(self.UTF8Server):
749            capabilities = 'AUTH'
750        with self.reaped_pair(NoEnableServer) as (server, client):
751            self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
752
753    @threading_helper.reap_threads
754    def test_enable_UTF8_raises_error_if_not_supported(self):
755        class NonUTF8Server(SimpleIMAPHandler):
756            pass
757        with self.assertRaises(imaplib.IMAP4.error):
758            with self.reaped_pair(NonUTF8Server) as (server, client):
759                typ, data = client.login('user', 'pass')
760                self.assertEqual(typ, 'OK')
761                client.enable('UTF8=ACCEPT')
762                pass
763
764    @threading_helper.reap_threads
765    def test_enable_UTF8_True_append(self):
766
767        class UTF8AppendServer(self.UTF8Server):
768            def cmd_APPEND(self, tag, args):
769                self._send_textline('+')
770                self.server.response = yield
771                self._send_tagged(tag, 'OK', 'okay')
772
773        with self.reaped_pair(UTF8AppendServer) as (server, client):
774            self.assertEqual(client._encoding, 'ascii')
775            code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
776            self.assertEqual(code, 'OK')
777            self.assertEqual(server.response,
778                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
779            code, _ = client.enable('UTF8=ACCEPT')
780            self.assertEqual(code, 'OK')
781            self.assertEqual(client._encoding, 'utf-8')
782            msg_string = 'Subject: üñí©öðé'
783            typ, data = client.append(
784                None, None, None, msg_string.encode('utf-8'))
785            self.assertEqual(typ, 'OK')
786            self.assertEqual(
787                server.response,
788                ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')
789            )
790
    # XXX Also need a test that makes sure that the Literal and Untagged_status
    # regexes use Unicode in UTF8 mode instead of the default ASCII.
793
794    @threading_helper.reap_threads
795    def test_search_disallows_charset_in_utf8_mode(self):
796        with self.reaped_pair(self.UTF8Server) as (server, client):
797            typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
798            self.assertEqual(typ, 'OK')
799            typ, _ = client.enable('UTF8=ACCEPT')
800            self.assertEqual(typ, 'OK')
801            self.assertTrue(client.utf8_enabled)
802            self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar')
803
804    @threading_helper.reap_threads
805    def test_bad_auth_name(self):
806
807        class MyServer(SimpleIMAPHandler):
808
809            def cmd_AUTHENTICATE(self, tag, args):
810                self._send_tagged(tag, 'NO', 'unrecognized authentication '
811                                  'type {}'.format(args[0]))
812
813        with self.reaped_pair(MyServer) as (server, client):
814            with self.assertRaises(imaplib.IMAP4.error):
815                client.authenticate('METHOD', lambda: 1)
816
817    @threading_helper.reap_threads
818    def test_invalid_authentication(self):
819
820        class MyServer(SimpleIMAPHandler):
821
822            def cmd_AUTHENTICATE(self, tag, args):
823                self._send_textline('+')
824                self.response = yield
825                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
826
827        with self.reaped_pair(MyServer) as (server, client):
828            with self.assertRaises(imaplib.IMAP4.error):
829                code, data = client.authenticate('MYAUTH', lambda x: b'fake')
830
831    @threading_helper.reap_threads
832    def test_valid_authentication(self):
833
834        class MyServer(SimpleIMAPHandler):
835
836            def cmd_AUTHENTICATE(self, tag, args):
837                self._send_textline('+')
838                self.server.response = yield
839                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
840
841        with self.reaped_pair(MyServer) as (server, client):
842            code, data = client.authenticate('MYAUTH', lambda x: b'fake')
843            self.assertEqual(code, 'OK')
844            self.assertEqual(server.response,
845                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
846
847        with self.reaped_pair(MyServer) as (server, client):
848            code, data = client.authenticate('MYAUTH', lambda x: 'fake')
849            self.assertEqual(code, 'OK')
850            self.assertEqual(server.response,
851                             b'ZmFrZQ==\r\n')  # b64 encoded 'fake'
852
853    @threading_helper.reap_threads
854    @hashlib_helper.requires_hashdigest('md5')
855    def test_login_cram_md5(self):
856
857        class AuthHandler(SimpleIMAPHandler):
858
859            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
860
861            def cmd_AUTHENTICATE(self, tag, args):
862                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
863                                    'VzdG9uLm1jaS5uZXQ=')
864                r = yield
865                if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
866                         b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
867                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
868                else:
869                    self._send_tagged(tag, 'NO', 'No access')
870
871        with self.reaped_pair(AuthHandler) as (server, client):
872            self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
873            ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
874            self.assertEqual(ret, "OK")
875
876        with self.reaped_pair(AuthHandler) as (server, client):
877            self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
878            ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
879            self.assertEqual(ret, "OK")
880
881
882    @threading_helper.reap_threads
883    def test_aborted_authentication(self):
884
885        class MyServer(SimpleIMAPHandler):
886
887            def cmd_AUTHENTICATE(self, tag, args):
888                self._send_textline('+')
889                self.response = yield
890
891                if self.response == b'*\r\n':
892                    self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')
893                else:
894                    self._send_tagged(tag, 'OK', 'MYAUTH successful')
895
896        with self.reaped_pair(MyServer) as (server, client):
897            with self.assertRaises(imaplib.IMAP4.error):
898                code, data = client.authenticate('MYAUTH', lambda x: None)
899
900
901    def test_linetoolong(self):
902        class TooLongHandler(SimpleIMAPHandler):
903            def handle(self):
904                # Send a very long response line
905                self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n')
906
907        with self.reaped_server(TooLongHandler) as server:
908            self.assertRaises(imaplib.IMAP4.error,
909                              self.imap_class, *server.server_address)
910
911    @threading_helper.reap_threads
912    def test_simple_with_statement(self):
913        # simplest call
914        with self.reaped_server(SimpleIMAPHandler) as server:
915            with self.imap_class(*server.server_address):
916                pass
917
918    @threading_helper.reap_threads
919    def test_with_statement(self):
920        with self.reaped_server(SimpleIMAPHandler) as server:
921            with self.imap_class(*server.server_address) as imap:
922                imap.login('user', 'pass')
923                self.assertEqual(server.logged, 'user')
924            self.assertIsNone(server.logged)
925
926    @threading_helper.reap_threads
927    def test_with_statement_logout(self):
928        # what happens if already logout in the block?
929        with self.reaped_server(SimpleIMAPHandler) as server:
930            with self.imap_class(*server.server_address) as imap:
931                imap.login('user', 'pass')
932                self.assertEqual(server.logged, 'user')
933                imap.logout()
934                self.assertIsNone(server.logged)
935            self.assertIsNone(server.logged)
936
937    @threading_helper.reap_threads
938    @cpython_only
939    def test_dump_ur(self):
940        # See: http://bugs.python.org/issue26543
941        untagged_resp_dict = {'READ-WRITE': [b'']}
942
943        with self.reaped_server(SimpleIMAPHandler) as server:
944            with self.imap_class(*server.server_address) as imap:
945                with mock.patch.object(imap, '_mesg') as mock_mesg:
946                    imap._dump_ur(untagged_resp_dict)
947                    mock_mesg.assert_called_with(
948                        "untagged responses dump:READ-WRITE: [b'']"
949                    )
950
951
@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests):
    # Same tests as the base class, with the server and client classes
    # replaced by their TLS counterparts.
    server_class = SecureTCPServer
    imap_class = IMAP4_SSL

    @threading_helper.reap_threads
    def test_ssl_verified(self):
        # Client context that verifies the server certificate against CAFILE.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_verify_locations(CAFILE)

        # Connecting through the server's own address is expected to be
        # rejected with this certificate error.
        mismatch = ("IP address mismatch, certificate is not valid for "
                    "'127.0.0.1'")
        with self.assertRaisesRegex(ssl.CertificateError, mismatch), \
                self.reaped_server(SimpleIMAPHandler) as server:
            client = self.imap_class(*server.server_address,
                                     ssl_context=ctx)
            client.shutdown()

        # Connecting to the same kind of server as "localhost" must succeed.
        with self.reaped_server(SimpleIMAPHandler) as server:
            port = server.server_address[1]
            client = self.imap_class("localhost", port, ssl_context=ctx)
            client.shutdown()
975
976
@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAPTest(unittest.TestCase):
    # Tests against a real remote server; currently skipped unconditionally
    # by the decorator above.
    host = 'cyrus.andrew.cmu.edu'
    port = 143
    username = 'anonymous'
    password = 'pass'
    imap_class = imaplib.IMAP4

    def setUp(self):
        # Open one connection per test.
        with socket_helper.transient_internet(self.host):
            self.server = self.imap_class(self.host, self.port)

    def tearDown(self):
        # test_logout() sets self.server to None after logging out itself.
        if self.server is None:
            return
        with socket_helper.transient_internet(self.host):
            self.server.logout()

    def test_logincapa(self):
        with socket_helper.transient_internet(self.host):
            advertised = self.server.capabilities
            for cap in advertised:
                self.assertIsInstance(cap, str)
            self.assertIn('LOGINDISABLED', advertised)
            self.assertIn('AUTH=ANONYMOUS', advertised)
            status = self.server.login(self.username, self.password)[0]
            self.assertEqual(status, 'OK')

    def test_logout(self):
        with socket_helper.transient_internet(self.host):
            rs = self.server.logout()
            # Tell tearDown() that the connection is already closed.
            self.server = None
            self.assertEqual(rs[0], 'BYE', rs)
1010
1011
@unittest.skipUnless(ssl, "SSL not available")
@unittest.skipUnless(
    support.is_resource_enabled('network'), 'network resource disabled')
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
    # Variant of RemoteIMAPTest that issues STARTTLS on the connection
    # before each test runs.

    def setUp(self):
        super().setUp()
        with socket_helper.transient_internet(self.host):
            status = self.server.starttls()[0]
            self.assertEqual(status, 'OK')

    def test_logincapa(self):
        advertised = self.server.capabilities
        for cap in advertised:
            self.assertIsInstance(cap, str)
        self.assertNotIn('LOGINDISABLED', advertised)
1028
1029
@unittest.skipUnless(ssl, "SSL not available")
@unittest.skip('cyrus.andrew.cmu.edu blocks connections')
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    # Variant of RemoteIMAPTest using the SSL client class on port 993.
    # Each test opens its own connection, so the inherited setUp/tearDown
    # are replaced by no-ops.
    port = 993
    imap_class = IMAP4_SSL

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def create_ssl_context(self):
        # Client context with hostname checking and certificate
        # verification switched off, carrying CERTFILE as its cert chain.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        ctx.load_cert_chain(CERTFILE)
        return ctx

    def check_logincapa(self, server):
        # Shared capability/login assertions; always logs out afterwards.
        try:
            advertised = server.capabilities
            for cap in advertised:
                self.assertIsInstance(cap, str)
            self.assertNotIn('LOGINDISABLED', advertised)
            self.assertIn('AUTH=PLAIN', advertised)
            status = server.login(self.username, self.password)[0]
            self.assertEqual(status, 'OK')
        finally:
            server.logout()

    def test_logincapa(self):
        with socket_helper.transient_internet(self.host):
            connection = self.imap_class(self.host, self.port)
            self.check_logincapa(connection)

    def test_logout(self):
        with socket_helper.transient_internet(self.host):
            connection = self.imap_class(self.host, self.port)
            rs = connection.logout()
            self.assertEqual(rs[0], 'BYE', rs)

    def test_ssl_context_certfile_exclusive(self):
        # Passing certfile together with ssl_context must raise ValueError.
        with socket_helper.transient_internet(self.host):
            ctx = self.create_ssl_context()
            with self.assertRaises(ValueError):
                self.imap_class(self.host, self.port,
                                certfile=CERTFILE, ssl_context=ctx)

    def test_ssl_context_keyfile_exclusive(self):
        # Passing keyfile together with ssl_context must raise ValueError.
        with socket_helper.transient_internet(self.host):
            ctx = self.create_ssl_context()
            with self.assertRaises(ValueError):
                self.imap_class(self.host, self.port,
                                keyfile=CERTFILE, ssl_context=ctx)
1082
1083
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
1086