1from test import support 2 3from contextlib import contextmanager 4import errno 5import imaplib 6import os.path 7import socketserver 8import time 9import calendar 10import threading 11import socket 12 13from test.support import (reap_threads, verbose, transient_internet, 14 run_with_tz, run_with_locale, cpython_only) 15import unittest 16from unittest import mock 17from datetime import datetime, timezone, timedelta 18try: 19 import ssl 20except ImportError: 21 ssl = None 22 23CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") 24CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") 25 26 27class TestImaplib(unittest.TestCase): 28 29 def test_Internaldate2tuple(self): 30 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1)) 31 tt = imaplib.Internaldate2tuple( 32 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")') 33 self.assertEqual(time.mktime(tt), t0) 34 tt = imaplib.Internaldate2tuple( 35 b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")') 36 self.assertEqual(time.mktime(tt), t0) 37 tt = imaplib.Internaldate2tuple( 38 b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")') 39 self.assertEqual(time.mktime(tt), t0) 40 41 @run_with_tz('MST+07MDT,M4.1.0,M10.5.0') 42 def test_Internaldate2tuple_issue10941(self): 43 self.assertNotEqual(imaplib.Internaldate2tuple( 44 b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'), 45 imaplib.Internaldate2tuple( 46 b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) 47 48 def timevalues(self): 49 return [2000000000, 2000000000.0, time.localtime(2000000000), 50 (2033, 5, 18, 5, 33, 20, -1, -1, -1), 51 (2033, 5, 18, 5, 33, 20, -1, -1, 1), 52 datetime.fromtimestamp(2000000000, 53 timezone(timedelta(0, 2 * 60 * 60))), 54 '"18-May-2033 05:33:20 +0200"'] 55 56 @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') 57 # DST rules included to work around quirk where the Gnu C library may not 58 # otherwise restore the previous time zone 59 @run_with_tz('STD-1DST,M3.2.0,M11.1.0') 60 def test_Time2Internaldate(self): 61 expected = '"18-May-2033 05:33:20 +0200"' 62 63 for t in self.timevalues(): 64 internal = imaplib.Time2Internaldate(t) 65 self.assertEqual(internal, expected) 66 67 def test_that_Time2Internaldate_returns_a_result(self): 68 # Without tzset, we can check only that it successfully 69 # produces a result, not the correctness of the result itself, 70 # since the result depends on the timezone the machine is in. 71 for t in self.timevalues(): 72 imaplib.Time2Internaldate(t) 73 74 def test_imap4_host_default_value(self): 75 # Check whether the IMAP4_PORT is truly unavailable. 76 with socket.socket() as s: 77 try: 78 s.connect(('', imaplib.IMAP4_PORT)) 79 self.skipTest( 80 "Cannot run the test with local IMAP server running.") 81 except socket.error: 82 pass 83 84 # This is the exception that should be raised. 85 expected_errnos = support.get_socket_conn_refused_errs() 86 with self.assertRaises(OSError) as cm: 87 imaplib.IMAP4() 88 self.assertIn(cm.exception.errno, expected_errnos) 89 90 91if ssl: 92 class SecureTCPServer(socketserver.TCPServer): 93 94 def get_request(self): 95 newsocket, fromaddr = self.socket.accept() 96 context = ssl.SSLContext() 97 context.load_cert_chain(CERTFILE) 98 connstream = context.wrap_socket(newsocket, server_side=True) 99 return connstream, fromaddr 100 101 IMAP4_SSL = imaplib.IMAP4_SSL 102 103else: 104 105 class SecureTCPServer: 106 pass 107 108 IMAP4_SSL = None 109 110 111class SimpleIMAPHandler(socketserver.StreamRequestHandler): 112 timeout = 1 113 continuation = None 114 capabilities = '' 115 116 def setup(self): 117 super().setup() 118 self.server.logged = None 119 120 def _send(self, message): 121 if verbose: 122 print("SENT: %r" % message.strip()) 123 self.wfile.write(message) 124 125 def _send_line(self, message): 126 self._send(message + b'\r\n') 127 128 def _send_textline(self, message): 129 self._send_line(message.encode('ASCII')) 130 131 def _send_tagged(self, tag, code, message): 132 self._send_textline(' '.join((tag, code, message))) 133 134 def handle(self): 135 # Send a welcome message. 136 self._send_textline('* OK IMAP4rev1') 137 while 1: 138 # Gather up input until we receive a line terminator or we timeout. 139 # Accumulate read(1) because it's simpler to handle the differences 140 # between naked sockets and SSL sockets. 141 line = b'' 142 while 1: 143 try: 144 part = self.rfile.read(1) 145 if part == b'': 146 # Naked sockets return empty strings.. 147 return 148 line += part 149 except OSError: 150 # ..but SSLSockets raise exceptions. 151 return 152 if line.endswith(b'\r\n'): 153 break 154 155 if verbose: 156 print('GOT: %r' % line.strip()) 157 if self.continuation: 158 try: 159 self.continuation.send(line) 160 except StopIteration: 161 self.continuation = None 162 continue 163 splitline = line.decode('ASCII').split() 164 tag = splitline[0] 165 cmd = splitline[1] 166 args = splitline[2:] 167 168 if hasattr(self, 'cmd_' + cmd): 169 continuation = getattr(self, 'cmd_' + cmd)(tag, args) 170 if continuation: 171 self.continuation = continuation 172 next(continuation) 173 else: 174 self._send_tagged(tag, 'BAD', cmd + ' unknown') 175 176 def cmd_CAPABILITY(self, tag, args): 177 caps = ('IMAP4rev1 ' + self.capabilities 178 if self.capabilities 179 else 'IMAP4rev1') 180 self._send_textline('* CAPABILITY ' + caps) 181 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 182 183 def cmd_LOGOUT(self, tag, args): 184 self.server.logged = None 185 self._send_textline('* BYE IMAP4ref1 Server logging out') 186 self._send_tagged(tag, 'OK', 'LOGOUT completed') 187 188 def cmd_LOGIN(self, tag, args): 189 self.server.logged = args[0] 190 self._send_tagged(tag, 'OK', 'LOGIN completed') 191 192 193class NewIMAPTestsMixin(): 194 client = None 195 196 def _setup(self, imap_handler, connect=True): 197 """ 198 Sets up imap_handler for tests. imap_handler should inherit from either: 199 - SimpleIMAPHandler - for testing IMAP commands, 200 - socketserver.StreamRequestHandler - if raw access to stream is needed. 201 Returns (client, server). 202 """ 203 class TestTCPServer(self.server_class): 204 def handle_error(self, request, client_address): 205 """ 206 End request and raise the error if one occurs. 207 """ 208 self.close_request(request) 209 self.server_close() 210 raise 211 212 self.addCleanup(self._cleanup) 213 self.server = self.server_class((support.HOST, 0), imap_handler) 214 self.thread = threading.Thread( 215 name=self._testMethodName+'-server', 216 target=self.server.serve_forever, 217 # Short poll interval to make the test finish quickly. 218 # Time between requests is short enough that we won't wake 219 # up spuriously too many times. 220 kwargs={'poll_interval': 0.01}) 221 self.thread.daemon = True # In case this function raises. 222 self.thread.start() 223 224 if connect: 225 self.client = self.imap_class(*self.server.server_address) 226 227 return self.client, self.server 228 229 def _cleanup(self): 230 """ 231 Cleans up the test server. This method should not be called manually, 232 it is added to the cleanup queue in the _setup method already. 233 """ 234 # if logout was called already we'd raise an exception trying to 235 # shutdown the client once again 236 if self.client is not None and self.client.state != 'LOGOUT': 237 self.client.shutdown() 238 # cleanup the server 239 self.server.shutdown() 240 self.server.server_close() 241 support.join_thread(self.thread, 3.0) 242 # Explicitly clear the attribute to prevent dangling thread 243 self.thread = None 244 245 def test_EOF_without_complete_welcome_message(self): 246 # http://bugs.python.org/issue5949 247 class EOFHandler(socketserver.StreamRequestHandler): 248 def handle(self): 249 self.wfile.write(b'* OK') 250 _, server = self._setup(EOFHandler, connect=False) 251 self.assertRaises(imaplib.IMAP4.abort, self.imap_class, 252 *server.server_address) 253 254 def test_line_termination(self): 255 class BadNewlineHandler(SimpleIMAPHandler): 256 def cmd_CAPABILITY(self, tag, args): 257 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 258 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 259 _, server = self._setup(BadNewlineHandler, connect=False) 260 self.assertRaises(imaplib.IMAP4.abort, self.imap_class, 261 *server.server_address) 262 263 def test_enable_raises_error_if_not_AUTH(self): 264 class EnableHandler(SimpleIMAPHandler): 265 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 266 client, _ = self._setup(EnableHandler) 267 self.assertFalse(client.utf8_enabled) 268 with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'): 269 client.enable('foo') 270 self.assertFalse(client.utf8_enabled) 271 272 def test_enable_raises_error_if_no_capability(self): 273 client, _ = self._setup(SimpleIMAPHandler) 274 with self.assertRaisesRegex(imaplib.IMAP4.error, 275 'does not support ENABLE'): 276 client.enable('foo') 277 278 def test_enable_UTF8_raises_error_if_not_supported(self): 279 client, _ = self._setup(SimpleIMAPHandler) 280 typ, data = client.login('user', 'pass') 281 self.assertEqual(typ, 'OK') 282 with self.assertRaisesRegex(imaplib.IMAP4.error, 283 'does not support ENABLE'): 284 client.enable('UTF8=ACCEPT') 285 286 def test_enable_UTF8_True_append(self): 287 class UTF8AppendServer(SimpleIMAPHandler): 288 capabilities = 'ENABLE UTF8=ACCEPT' 289 def cmd_ENABLE(self, tag, args): 290 self._send_tagged(tag, 'OK', 'ENABLE successful') 291 def cmd_AUTHENTICATE(self, tag, args): 292 self._send_textline('+') 293 self.server.response = yield 294 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 295 def cmd_APPEND(self, tag, args): 296 self._send_textline('+') 297 self.server.response = yield 298 self._send_tagged(tag, 'OK', 'okay') 299 client, server = self._setup(UTF8AppendServer) 300 self.assertEqual(client._encoding, 'ascii') 301 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 302 self.assertEqual(code, 'OK') 303 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 304 code, _ = client.enable('UTF8=ACCEPT') 305 self.assertEqual(code, 'OK') 306 self.assertEqual(client._encoding, 'utf-8') 307 msg_string = 'Subject: üñí©öðé' 308 typ, data = client.append(None, None, None, msg_string.encode('utf-8')) 309 self.assertEqual(typ, 'OK') 310 self.assertEqual(server.response, 311 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')) 312 313 def test_search_disallows_charset_in_utf8_mode(self): 314 class UTF8Server(SimpleIMAPHandler): 315 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 316 def cmd_ENABLE(self, tag, args): 317 self._send_tagged(tag, 'OK', 'ENABLE successful') 318 def cmd_AUTHENTICATE(self, tag, args): 319 self._send_textline('+') 320 self.server.response = yield 321 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 322 client, _ = self._setup(UTF8Server) 323 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') 324 self.assertEqual(typ, 'OK') 325 typ, _ = client.enable('UTF8=ACCEPT') 326 self.assertEqual(typ, 'OK') 327 self.assertTrue(client.utf8_enabled) 328 with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'): 329 client.search('foo', 'bar') 330 331 def test_bad_auth_name(self): 332 class MyServer(SimpleIMAPHandler): 333 def cmd_AUTHENTICATE(self, tag, args): 334 self._send_tagged(tag, 'NO', 335 'unrecognized authentication type {}'.format(args[0])) 336 client, _ = self._setup(MyServer) 337 with self.assertRaisesRegex(imaplib.IMAP4.error, 338 'unrecognized authentication type METHOD'): 339 client.authenticate('METHOD', lambda: 1) 340 341 def test_invalid_authentication(self): 342 class MyServer(SimpleIMAPHandler): 343 def cmd_AUTHENTICATE(self, tag, args): 344 self._send_textline('+') 345 self.response = yield 346 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') 347 client, _ = self._setup(MyServer) 348 with self.assertRaisesRegex(imaplib.IMAP4.error, 349 r'\[AUTHENTICATIONFAILED\] invalid'): 350 client.authenticate('MYAUTH', lambda x: b'fake') 351 352 def test_valid_authentication_bytes(self): 353 class MyServer(SimpleIMAPHandler): 354 def cmd_AUTHENTICATE(self, tag, args): 355 self._send_textline('+') 356 self.server.response = yield 357 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 358 client, server = self._setup(MyServer) 359 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 360 self.assertEqual(code, 'OK') 361 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 362 363 def test_valid_authentication_plain_text(self): 364 class MyServer(SimpleIMAPHandler): 365 def cmd_AUTHENTICATE(self, tag, args): 366 self._send_textline('+') 367 self.server.response = yield 368 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 369 client, server = self._setup(MyServer) 370 code, _ = client.authenticate('MYAUTH', lambda x: 'fake') 371 self.assertEqual(code, 'OK') 372 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 373 374 def test_login_cram_md5_bytes(self): 375 class AuthHandler(SimpleIMAPHandler): 376 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 377 def cmd_AUTHENTICATE(self, tag, args): 378 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 379 'VzdG9uLm1jaS5uZXQ=') 380 r = yield 381 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 382 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 383 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 384 else: 385 self._send_tagged(tag, 'NO', 'No access') 386 client, _ = self._setup(AuthHandler) 387 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 388 ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf") 389 self.assertEqual(ret, "OK") 390 391 def test_login_cram_md5_plain_text(self): 392 class AuthHandler(SimpleIMAPHandler): 393 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 394 def cmd_AUTHENTICATE(self, tag, args): 395 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 396 'VzdG9uLm1jaS5uZXQ=') 397 r = yield 398 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 399 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 400 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 401 else: 402 self._send_tagged(tag, 'NO', 'No access') 403 client, _ = self._setup(AuthHandler) 404 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 405 ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf") 406 self.assertEqual(ret, "OK") 407 408 def test_aborted_authentication(self): 409 class MyServer(SimpleIMAPHandler): 410 def cmd_AUTHENTICATE(self, tag, args): 411 self._send_textline('+') 412 self.response = yield 413 if self.response == b'*\r\n': 414 self._send_tagged( 415 tag, 416 'NO', 417 '[AUTHENTICATIONFAILED] aborted') 418 else: 419 self._send_tagged(tag, 'OK', 'MYAUTH successful') 420 client, _ = self._setup(MyServer) 421 with self.assertRaisesRegex(imaplib.IMAP4.error, 422 r'\[AUTHENTICATIONFAILED\] aborted'): 423 client.authenticate('MYAUTH', lambda x: None) 424 425 @mock.patch('imaplib._MAXLINE', 10) 426 def test_linetoolong(self): 427 class TooLongHandler(SimpleIMAPHandler): 428 def handle(self): 429 # send response line longer than the limit set in the next line 430 self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n') 431 _, server = self._setup(TooLongHandler, connect=False) 432 with self.assertRaisesRegex(imaplib.IMAP4.error, 433 'got more than 10 bytes'): 434 self.imap_class(*server.server_address) 435 436 def test_simple_with_statement(self): 437 _, server = self._setup(SimpleIMAPHandler, connect=False) 438 with self.imap_class(*server.server_address): 439 pass 440 441 def test_with_statement(self): 442 _, server = self._setup(SimpleIMAPHandler, connect=False) 443 with self.imap_class(*server.server_address) as imap: 444 imap.login('user', 'pass') 445 self.assertEqual(server.logged, 'user') 446 self.assertIsNone(server.logged) 447 448 def test_with_statement_logout(self): 449 # It is legal to log out explicitly inside the with block 450 _, server = self._setup(SimpleIMAPHandler, connect=False) 451 with self.imap_class(*server.server_address) as imap: 452 imap.login('user', 'pass') 453 self.assertEqual(server.logged, 'user') 454 imap.logout() 455 self.assertIsNone(server.logged) 456 self.assertIsNone(server.logged) 457 458 # command tests 459 460 def test_login(self): 461 client, _ = self._setup(SimpleIMAPHandler) 462 typ, data = client.login('user', 'pass') 463 self.assertEqual(typ, 'OK') 464 self.assertEqual(data[0], b'LOGIN completed') 465 self.assertEqual(client.state, 'AUTH') 466 467 def test_logout(self): 468 client, _ = self._setup(SimpleIMAPHandler) 469 typ, data = client.login('user', 'pass') 470 self.assertEqual(typ, 'OK') 471 self.assertEqual(data[0], b'LOGIN completed') 472 typ, data = client.logout() 473 self.assertEqual(typ, 'BYE', (typ, data)) 474 self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data)) 475 self.assertEqual(client.state, 'LOGOUT') 476 477 def test_lsub(self): 478 class LsubCmd(SimpleIMAPHandler): 479 def cmd_LSUB(self, tag, args): 480 self._send_textline('* LSUB () "." directoryA') 481 return self._send_tagged(tag, 'OK', 'LSUB completed') 482 client, _ = self._setup(LsubCmd) 483 client.login('user', 'pass') 484 typ, data = client.lsub() 485 self.assertEqual(typ, 'OK') 486 self.assertEqual(data[0], b'() "." directoryA') 487 488 489class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase): 490 imap_class = imaplib.IMAP4 491 server_class = socketserver.TCPServer 492 493 494@unittest.skipUnless(ssl, "SSL not available") 495class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase): 496 imap_class = IMAP4_SSL 497 server_class = SecureTCPServer 498 499 def test_ssl_raises(self): 500 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 501 self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED) 502 self.assertEqual(ssl_context.check_hostname, True) 503 ssl_context.load_verify_locations(CAFILE) 504 505 with self.assertRaisesRegex(ssl.CertificateError, 506 "IP address mismatch, certificate is not valid for " 507 "'127.0.0.1'"): 508 _, server = self._setup(SimpleIMAPHandler) 509 client = self.imap_class(*server.server_address, 510 ssl_context=ssl_context) 511 client.shutdown() 512 513 def test_ssl_verified(self): 514 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 515 ssl_context.load_verify_locations(CAFILE) 516 517 _, server = self._setup(SimpleIMAPHandler) 518 client = self.imap_class("localhost", server.server_address[1], 519 ssl_context=ssl_context) 520 client.shutdown() 521 522 # Mock the private method _connect(), so mark the test as specific 523 # to CPython stdlib 524 @cpython_only 525 def test_certfile_arg_warn(self): 526 with support.check_warnings(('', DeprecationWarning)): 527 with mock.patch.object(self.imap_class, 'open'): 528 with mock.patch.object(self.imap_class, '_connect'): 529 self.imap_class('localhost', 143, certfile=CERTFILE) 530 531class ThreadedNetworkedTests(unittest.TestCase): 532 server_class = socketserver.TCPServer 533 imap_class = imaplib.IMAP4 534 535 def make_server(self, addr, hdlr): 536 537 class MyServer(self.server_class): 538 def handle_error(self, request, client_address): 539 self.close_request(request) 540 self.server_close() 541 raise 542 543 if verbose: 544 print("creating server") 545 server = MyServer(addr, hdlr) 546 self.assertEqual(server.server_address, server.socket.getsockname()) 547 548 if verbose: 549 print("server created") 550 print("ADDR =", addr) 551 print("CLASS =", self.server_class) 552 print("HDLR =", server.RequestHandlerClass) 553 554 t = threading.Thread( 555 name='%s serving' % self.server_class, 556 target=server.serve_forever, 557 # Short poll interval to make the test finish quickly. 558 # Time between requests is short enough that we won't wake 559 # up spuriously too many times. 560 kwargs={'poll_interval': 0.01}) 561 t.daemon = True # In case this function raises. 562 t.start() 563 if verbose: 564 print("server running") 565 return server, t 566 567 def reap_server(self, server, thread): 568 if verbose: 569 print("waiting for server") 570 server.shutdown() 571 server.server_close() 572 thread.join() 573 if verbose: 574 print("done") 575 576 @contextmanager 577 def reaped_server(self, hdlr): 578 server, thread = self.make_server((support.HOST, 0), hdlr) 579 try: 580 yield server 581 finally: 582 self.reap_server(server, thread) 583 584 @contextmanager 585 def reaped_pair(self, hdlr): 586 with self.reaped_server(hdlr) as server: 587 client = self.imap_class(*server.server_address) 588 try: 589 yield server, client 590 finally: 591 client.logout() 592 593 @reap_threads 594 def test_connect(self): 595 with self.reaped_server(SimpleIMAPHandler) as server: 596 client = self.imap_class(*server.server_address) 597 client.shutdown() 598 599 @reap_threads 600 def test_bracket_flags(self): 601 602 # This violates RFC 3501, which disallows ']' characters in tag names, 603 # but imaplib has allowed producing such tags forever, other programs 604 # also produce them (eg: OtherInbox's Organizer app as of 20140716), 605 # and Gmail, for example, accepts them and produces them. So we 606 # support them. See issue #21815. 607 608 class BracketFlagHandler(SimpleIMAPHandler): 609 610 def handle(self): 611 self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft'] 612 super().handle() 613 614 def cmd_AUTHENTICATE(self, tag, args): 615 self._send_textline('+') 616 self.server.response = yield 617 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 618 619 def cmd_SELECT(self, tag, args): 620 flag_msg = ' \\'.join(self.flags) 621 self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii')) 622 self._send_line(b'* 2 EXISTS') 623 self._send_line(b'* 0 RECENT') 624 msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.' 625 % flag_msg) 626 self._send_line(msg.encode('ascii')) 627 self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') 628 629 def cmd_STORE(self, tag, args): 630 new_flags = args[2].strip('(').strip(')').split() 631 self.flags.extend(new_flags) 632 flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags) 633 msg = '* %s FETCH %s' % (args[0], flags_msg) 634 self._send_line(msg.encode('ascii')) 635 self._send_tagged(tag, 'OK', 'STORE completed.') 636 637 with self.reaped_pair(BracketFlagHandler) as (server, client): 638 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 639 self.assertEqual(code, 'OK') 640 self.assertEqual(server.response, b'ZmFrZQ==\r\n') 641 client.select('test') 642 typ, [data] = client.store(b'1', "+FLAGS", "[test]") 643 self.assertIn(b'[test]', data) 644 client.select('test') 645 typ, [data] = client.response('PERMANENTFLAGS') 646 self.assertIn(b'[test]', data) 647 648 @reap_threads 649 def test_issue5949(self): 650 651 class EOFHandler(socketserver.StreamRequestHandler): 652 def handle(self): 653 # EOF without sending a complete welcome message. 654 self.wfile.write(b'* OK') 655 656 with self.reaped_server(EOFHandler) as server: 657 self.assertRaises(imaplib.IMAP4.abort, 658 self.imap_class, *server.server_address) 659 660 @reap_threads 661 def test_line_termination(self): 662 663 class BadNewlineHandler(SimpleIMAPHandler): 664 665 def cmd_CAPABILITY(self, tag, args): 666 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 667 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 668 669 with self.reaped_server(BadNewlineHandler) as server: 670 self.assertRaises(imaplib.IMAP4.abort, 671 self.imap_class, *server.server_address) 672 673 class UTF8Server(SimpleIMAPHandler): 674 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 675 676 def cmd_ENABLE(self, tag, args): 677 self._send_tagged(tag, 'OK', 'ENABLE successful') 678 679 def cmd_AUTHENTICATE(self, tag, args): 680 self._send_textline('+') 681 self.server.response = yield 682 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 683 684 @reap_threads 685 def test_enable_raises_error_if_not_AUTH(self): 686 with self.reaped_pair(self.UTF8Server) as (server, client): 687 self.assertFalse(client.utf8_enabled) 688 self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') 689 self.assertFalse(client.utf8_enabled) 690 691 # XXX Also need a test that enable after SELECT raises an error. 692 693 @reap_threads 694 def test_enable_raises_error_if_no_capability(self): 695 class NoEnableServer(self.UTF8Server): 696 capabilities = 'AUTH' 697 with self.reaped_pair(NoEnableServer) as (server, client): 698 self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') 699 700 @reap_threads 701 def test_enable_UTF8_raises_error_if_not_supported(self): 702 class NonUTF8Server(SimpleIMAPHandler): 703 pass 704 with self.assertRaises(imaplib.IMAP4.error): 705 with self.reaped_pair(NonUTF8Server) as (server, client): 706 typ, data = client.login('user', 'pass') 707 self.assertEqual(typ, 'OK') 708 client.enable('UTF8=ACCEPT') 709 pass 710 711 @reap_threads 712 def test_enable_UTF8_True_append(self): 713 714 class UTF8AppendServer(self.UTF8Server): 715 def cmd_APPEND(self, tag, args): 716 self._send_textline('+') 717 self.server.response = yield 718 self._send_tagged(tag, 'OK', 'okay') 719 720 with self.reaped_pair(UTF8AppendServer) as (server, client): 721 self.assertEqual(client._encoding, 'ascii') 722 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 723 self.assertEqual(code, 'OK') 724 self.assertEqual(server.response, 725 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 726 code, _ = client.enable('UTF8=ACCEPT') 727 self.assertEqual(code, 'OK') 728 self.assertEqual(client._encoding, 'utf-8') 729 msg_string = 'Subject: üñí©öðé' 730 typ, data = client.append( 731 None, None, None, msg_string.encode('utf-8')) 732 self.assertEqual(typ, 'OK') 733 self.assertEqual( 734 server.response, 735 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8') 736 ) 737 738 # XXX also need a test that makes sure that the Literal and Untagged_status 739 # regexes uses unicode in UTF8 mode instead of the default ASCII. 740 741 @reap_threads 742 def test_search_disallows_charset_in_utf8_mode(self): 743 with self.reaped_pair(self.UTF8Server) as (server, client): 744 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') 745 self.assertEqual(typ, 'OK') 746 typ, _ = client.enable('UTF8=ACCEPT') 747 self.assertEqual(typ, 'OK') 748 self.assertTrue(client.utf8_enabled) 749 self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar') 750 751 @reap_threads 752 def test_bad_auth_name(self): 753 754 class MyServer(SimpleIMAPHandler): 755 756 def cmd_AUTHENTICATE(self, tag, args): 757 self._send_tagged(tag, 'NO', 'unrecognized authentication ' 758 'type {}'.format(args[0])) 759 760 with self.reaped_pair(MyServer) as (server, client): 761 with self.assertRaises(imaplib.IMAP4.error): 762 client.authenticate('METHOD', lambda: 1) 763 764 @reap_threads 765 def test_invalid_authentication(self): 766 767 class MyServer(SimpleIMAPHandler): 768 769 def cmd_AUTHENTICATE(self, tag, args): 770 self._send_textline('+') 771 self.response = yield 772 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') 773 774 with self.reaped_pair(MyServer) as (server, client): 775 with self.assertRaises(imaplib.IMAP4.error): 776 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 777 778 @reap_threads 779 def test_valid_authentication(self): 780 781 class MyServer(SimpleIMAPHandler): 782 783 def cmd_AUTHENTICATE(self, tag, args): 784 self._send_textline('+') 785 self.server.response = yield 786 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 787 788 with self.reaped_pair(MyServer) as (server, client): 789 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 790 self.assertEqual(code, 'OK') 791 self.assertEqual(server.response, 792 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 793 794 with self.reaped_pair(MyServer) as (server, client): 795 code, data = client.authenticate('MYAUTH', lambda x: 'fake') 796 self.assertEqual(code, 'OK') 797 self.assertEqual(server.response, 798 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 799 800 @reap_threads 801 def test_login_cram_md5(self): 802 803 class AuthHandler(SimpleIMAPHandler): 804 805 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 806 807 def cmd_AUTHENTICATE(self, tag, args): 808 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 809 'VzdG9uLm1jaS5uZXQ=') 810 r = yield 811 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 812 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 813 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 814 else: 815 self._send_tagged(tag, 'NO', 'No access') 816 817 with self.reaped_pair(AuthHandler) as (server, client): 818 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 819 ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf") 820 self.assertEqual(ret, "OK") 821 822 with self.reaped_pair(AuthHandler) as (server, client): 823 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 824 ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf") 825 self.assertEqual(ret, "OK") 826 827 828 @reap_threads 829 def test_aborted_authentication(self): 830 831 class MyServer(SimpleIMAPHandler): 832 833 def cmd_AUTHENTICATE(self, tag, args): 834 self._send_textline('+') 835 self.response = yield 836 837 if self.response == b'*\r\n': 838 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted') 839 else: 840 self._send_tagged(tag, 'OK', 'MYAUTH successful') 841 842 with self.reaped_pair(MyServer) as (server, client): 843 with self.assertRaises(imaplib.IMAP4.error): 844 code, data = client.authenticate('MYAUTH', lambda x: None) 845 846 847 def test_linetoolong(self): 848 class TooLongHandler(SimpleIMAPHandler): 849 def handle(self): 850 # Send a very long response line 851 self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n') 852 853 with self.reaped_server(TooLongHandler) as server: 854 self.assertRaises(imaplib.IMAP4.error, 855 self.imap_class, *server.server_address) 856 857 @reap_threads 858 def test_simple_with_statement(self): 859 # simplest call 860 with self.reaped_server(SimpleIMAPHandler) as server: 861 with self.imap_class(*server.server_address): 862 pass 863 864 @reap_threads 865 def test_with_statement(self): 866 with self.reaped_server(SimpleIMAPHandler) as server: 867 with self.imap_class(*server.server_address) as imap: 868 imap.login('user', 'pass') 869 self.assertEqual(server.logged, 'user') 870 self.assertIsNone(server.logged) 871 872 @reap_threads 873 def test_with_statement_logout(self): 874 # what happens if already logout in the block? 875 with self.reaped_server(SimpleIMAPHandler) as server: 876 with self.imap_class(*server.server_address) as imap: 877 imap.login('user', 'pass') 878 self.assertEqual(server.logged, 'user') 879 imap.logout() 880 self.assertIsNone(server.logged) 881 self.assertIsNone(server.logged) 882 883 884@unittest.skipUnless(ssl, "SSL not available") 885class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests): 886 server_class = SecureTCPServer 887 imap_class = IMAP4_SSL 888 889 @reap_threads 890 def test_ssl_verified(self): 891 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 892 ssl_context.load_verify_locations(CAFILE) 893 894 with self.assertRaisesRegex( 895 ssl.CertificateError, 896 "IP address mismatch, certificate is not valid for " 897 "'127.0.0.1'"): 898 with self.reaped_server(SimpleIMAPHandler) as server: 899 client = self.imap_class(*server.server_address, 900 ssl_context=ssl_context) 901 client.shutdown() 902 903 with self.reaped_server(SimpleIMAPHandler) as server: 904 client = self.imap_class("localhost", server.server_address[1], 905 ssl_context=ssl_context) 906 client.shutdown() 907 908 909@unittest.skipUnless( 910 support.is_resource_enabled('network'), 'network resource disabled') 911@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 912class RemoteIMAPTest(unittest.TestCase): 913 host = 'cyrus.andrew.cmu.edu' 914 port = 143 915 username = 'anonymous' 916 password = 'pass' 917 imap_class = imaplib.IMAP4 918 919 def setUp(self): 920 with transient_internet(self.host): 921 self.server = self.imap_class(self.host, self.port) 922 923 def tearDown(self): 924 if self.server is not None: 925 with transient_internet(self.host): 926 self.server.logout() 927 928 def test_logincapa(self): 929 with transient_internet(self.host): 930 for cap in self.server.capabilities: 931 self.assertIsInstance(cap, str) 932 self.assertIn('LOGINDISABLED', self.server.capabilities) 933 self.assertIn('AUTH=ANONYMOUS', self.server.capabilities) 934 rs = self.server.login(self.username, self.password) 935 self.assertEqual(rs[0], 'OK') 936 937 def test_logout(self): 938 with transient_internet(self.host): 939 rs = self.server.logout() 940 self.server = None 941 self.assertEqual(rs[0], 'BYE', rs) 942 943 944@unittest.skipUnless(ssl, "SSL not available") 945@unittest.skipUnless( 946 support.is_resource_enabled('network'), 'network resource disabled') 947@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 948class RemoteIMAP_STARTTLSTest(RemoteIMAPTest): 949 950 def setUp(self): 951 super().setUp() 952 with transient_internet(self.host): 953 rs = self.server.starttls() 954 self.assertEqual(rs[0], 'OK') 955 956 def test_logincapa(self): 957 for cap in self.server.capabilities: 958 self.assertIsInstance(cap, str) 959 self.assertNotIn('LOGINDISABLED', self.server.capabilities) 960 961 962@unittest.skipUnless(ssl, "SSL not available") 963@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 964class RemoteIMAP_SSLTest(RemoteIMAPTest): 965 port = 993 966 imap_class = IMAP4_SSL 967 968 def setUp(self): 969 pass 970 971 def tearDown(self): 972 pass 973 974 def create_ssl_context(self): 975 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 976 ssl_context.check_hostname = False 977 ssl_context.verify_mode = ssl.CERT_NONE 978 ssl_context.load_cert_chain(CERTFILE) 979 return ssl_context 980 981 def check_logincapa(self, server): 982 try: 983 for cap in server.capabilities: 984 self.assertIsInstance(cap, str) 985 self.assertNotIn('LOGINDISABLED', server.capabilities) 986 self.assertIn('AUTH=PLAIN', server.capabilities) 987 rs = server.login(self.username, self.password) 988 self.assertEqual(rs[0], 'OK') 989 finally: 990 server.logout() 991 992 def test_logincapa(self): 993 with transient_internet(self.host): 994 _server = self.imap_class(self.host, self.port) 995 self.check_logincapa(_server) 996 997 def test_logout(self): 998 with transient_internet(self.host): 999 _server = self.imap_class(self.host, self.port) 1000 rs = _server.logout() 1001 self.assertEqual(rs[0], 'BYE', rs) 1002 1003 def test_ssl_context_certfile_exclusive(self): 1004 with transient_internet(self.host): 1005 self.assertRaises( 1006 ValueError, self.imap_class, self.host, self.port, 1007 certfile=CERTFILE, ssl_context=self.create_ssl_context()) 1008 1009 def test_ssl_context_keyfile_exclusive(self): 1010 with transient_internet(self.host): 1011 self.assertRaises( 1012 ValueError, self.imap_class, self.host, self.port, 1013 keyfile=CERTFILE, ssl_context=self.create_ssl_context()) 1014 1015 1016if __name__ == "__main__": 1017 unittest.main() 1018