1"""Test script for ftplib module.""" 2 3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS 4# environment 5 6import ftplib 7import socket 8import io 9import errno 10import os 11import threading 12import time 13try: 14 import ssl 15except ImportError: 16 ssl = None 17 18from unittest import TestCase, skipUnless 19from test import support 20from test.support import threading_helper 21from test.support import socket_helper 22from test.support import warnings_helper 23from test.support.socket_helper import HOST, HOSTv6 24 25import warnings 26with warnings.catch_warnings(): 27 warnings.simplefilter('ignore', DeprecationWarning) 28 import asyncore 29 import asynchat 30 31 32TIMEOUT = support.LOOPBACK_TIMEOUT 33DEFAULT_ENCODING = 'utf-8' 34# the dummy data returned by server over the data channel when 35# RETR, LIST, NLST, MLSD commands are issued 36RETR_DATA = 'abcde12345\r\n' * 1000 + 'non-ascii char \xAE\r\n' 37LIST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n' 38NLST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n' 39MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n" 40 "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n" 41 "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n" 42 "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n" 43 "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n" 44 "type=file;perm=awr;unique==keVO1+8G4; writable\r\n" 45 "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n" 46 "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n" 47 "type=file;perm=r;unique==keVO1+EG4; two words\r\n" 48 "type=file;perm=r;unique==keVO1+IH4; leading space\r\n" 49 "type=file;perm=r;unique==keVO1+1G4; file1\r\n" 50 "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n" 51 "type=file;perm=r;unique==keVO1+1G4; file2\r\n" 52 "type=file;perm=r;unique==keVO1+1G4; file3\r\n" 53 "type=file;perm=r;unique==keVO1+1G4; file4\r\n" 54 "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char\r\n" 55 "type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n") 56 57 58class DummyDTPHandler(asynchat.async_chat): 59 dtp_conn_closed = False 60 61 def __init__(self, conn, baseclass): 62 asynchat.async_chat.__init__(self, conn) 63 self.baseclass = baseclass 64 self.baseclass.last_received_data = '' 65 self.encoding = baseclass.encoding 66 67 def handle_read(self): 68 new_data = self.recv(1024).decode(self.encoding, 'replace') 69 self.baseclass.last_received_data += new_data 70 71 def handle_close(self): 72 # XXX: this method can be called many times in a row for a single 73 # connection, including in clear-text (non-TLS) mode. 74 # (behaviour witnessed with test_data_connection) 75 if not self.dtp_conn_closed: 76 self.baseclass.push('226 transfer complete') 77 self.close() 78 self.dtp_conn_closed = True 79 80 def push(self, what): 81 if self.baseclass.next_data is not None: 82 what = self.baseclass.next_data 83 self.baseclass.next_data = None 84 if not what: 85 return self.close_when_done() 86 super(DummyDTPHandler, self).push(what.encode(self.encoding)) 87 88 def handle_error(self): 89 raise Exception 90 91 92class DummyFTPHandler(asynchat.async_chat): 93 94 dtp_handler = DummyDTPHandler 95 96 def __init__(self, conn, encoding=DEFAULT_ENCODING): 97 asynchat.async_chat.__init__(self, conn) 98 # tells the socket to handle urgent data inline (ABOR command) 99 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1) 100 self.set_terminator(b"\r\n") 101 self.in_buffer = [] 102 self.dtp = None 103 self.last_received_cmd = None 104 self.last_received_data = '' 105 self.next_response = '' 106 self.next_data = None 107 self.rest = None 108 self.next_retr_data = RETR_DATA 109 self.push('220 welcome') 110 self.encoding = encoding 111 # We use this as the string IPv4 address to direct the client 112 # to in response to a PASV command. To test security behavior. 113 # https://bugs.python.org/issue43285/. 114 self.fake_pasv_server_ip = '252.253.254.255' 115 116 def collect_incoming_data(self, data): 117 self.in_buffer.append(data) 118 119 def found_terminator(self): 120 line = b''.join(self.in_buffer).decode(self.encoding) 121 self.in_buffer = [] 122 if self.next_response: 123 self.push(self.next_response) 124 self.next_response = '' 125 cmd = line.split(' ')[0].lower() 126 self.last_received_cmd = cmd 127 space = line.find(' ') 128 if space != -1: 129 arg = line[space + 1:] 130 else: 131 arg = "" 132 if hasattr(self, 'cmd_' + cmd): 133 method = getattr(self, 'cmd_' + cmd) 134 method(arg) 135 else: 136 self.push('550 command "%s" not understood.' %cmd) 137 138 def handle_error(self): 139 raise Exception 140 141 def push(self, data): 142 asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n') 143 144 def cmd_port(self, arg): 145 addr = list(map(int, arg.split(','))) 146 ip = '%d.%d.%d.%d' %tuple(addr[:4]) 147 port = (addr[4] * 256) + addr[5] 148 s = socket.create_connection((ip, port), timeout=TIMEOUT) 149 self.dtp = self.dtp_handler(s, baseclass=self) 150 self.push('200 active data connection established') 151 152 def cmd_pasv(self, arg): 153 with socket.create_server((self.socket.getsockname()[0], 0)) as sock: 154 sock.settimeout(TIMEOUT) 155 port = sock.getsockname()[1] 156 ip = self.fake_pasv_server_ip 157 ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256 158 self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2)) 159 conn, addr = sock.accept() 160 self.dtp = self.dtp_handler(conn, baseclass=self) 161 162 def cmd_eprt(self, arg): 163 af, ip, port = arg.split(arg[0])[1:-1] 164 port = int(port) 165 s = socket.create_connection((ip, port), timeout=TIMEOUT) 166 self.dtp = self.dtp_handler(s, baseclass=self) 167 self.push('200 active data connection established') 168 169 def cmd_epsv(self, arg): 170 with socket.create_server((self.socket.getsockname()[0], 0), 171 family=socket.AF_INET6) as sock: 172 sock.settimeout(TIMEOUT) 173 port = sock.getsockname()[1] 174 self.push('229 entering extended passive mode (|||%d|)' %port) 175 conn, addr = sock.accept() 176 self.dtp = self.dtp_handler(conn, baseclass=self) 177 178 def cmd_echo(self, arg): 179 # sends back the received string (used by the test suite) 180 self.push(arg) 181 182 def cmd_noop(self, arg): 183 self.push('200 noop ok') 184 185 def cmd_user(self, arg): 186 self.push('331 username ok') 187 188 def cmd_pass(self, arg): 189 self.push('230 password ok') 190 191 def cmd_acct(self, arg): 192 self.push('230 acct ok') 193 194 def cmd_rnfr(self, arg): 195 self.push('350 rnfr ok') 196 197 def cmd_rnto(self, arg): 198 self.push('250 rnto ok') 199 200 def cmd_dele(self, arg): 201 self.push('250 dele ok') 202 203 def cmd_cwd(self, arg): 204 self.push('250 cwd ok') 205 206 def cmd_size(self, arg): 207 self.push('250 1000') 208 209 def cmd_mkd(self, arg): 210 self.push('257 "%s"' %arg) 211 212 def cmd_rmd(self, arg): 213 self.push('250 rmd ok') 214 215 def cmd_pwd(self, arg): 216 self.push('257 "pwd ok"') 217 218 def cmd_type(self, arg): 219 self.push('200 type ok') 220 221 def cmd_quit(self, arg): 222 self.push('221 quit ok') 223 self.close() 224 225 def cmd_abor(self, arg): 226 self.push('226 abor ok') 227 228 def cmd_stor(self, arg): 229 self.push('125 stor ok') 230 231 def cmd_rest(self, arg): 232 self.rest = arg 233 self.push('350 rest ok') 234 235 def cmd_retr(self, arg): 236 self.push('125 retr ok') 237 if self.rest is not None: 238 offset = int(self.rest) 239 else: 240 offset = 0 241 self.dtp.push(self.next_retr_data[offset:]) 242 self.dtp.close_when_done() 243 self.rest = None 244 245 def cmd_list(self, arg): 246 self.push('125 list ok') 247 self.dtp.push(LIST_DATA) 248 self.dtp.close_when_done() 249 250 def cmd_nlst(self, arg): 251 self.push('125 nlst ok') 252 self.dtp.push(NLST_DATA) 253 self.dtp.close_when_done() 254 255 def cmd_opts(self, arg): 256 self.push('200 opts ok') 257 258 def cmd_mlsd(self, arg): 259 self.push('125 mlsd ok') 260 self.dtp.push(MLSD_DATA) 261 self.dtp.close_when_done() 262 263 def cmd_setlongretr(self, arg): 264 # For testing. Next RETR will return long line. 265 self.next_retr_data = 'x' * int(arg) 266 self.push('125 setlongretr ok') 267 268 269class DummyFTPServer(asyncore.dispatcher, threading.Thread): 270 271 handler = DummyFTPHandler 272 273 def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING): 274 threading.Thread.__init__(self) 275 asyncore.dispatcher.__init__(self) 276 self.daemon = True 277 self.create_socket(af, socket.SOCK_STREAM) 278 self.bind(address) 279 self.listen(5) 280 self.active = False 281 self.active_lock = threading.Lock() 282 self.host, self.port = self.socket.getsockname()[:2] 283 self.handler_instance = None 284 self.encoding = encoding 285 286 def start(self): 287 assert not self.active 288 self.__flag = threading.Event() 289 threading.Thread.start(self) 290 self.__flag.wait() 291 292 def run(self): 293 self.active = True 294 self.__flag.set() 295 while self.active and asyncore.socket_map: 296 self.active_lock.acquire() 297 asyncore.loop(timeout=0.1, count=1) 298 self.active_lock.release() 299 asyncore.close_all(ignore_all=True) 300 301 def stop(self): 302 assert self.active 303 self.active = False 304 self.join() 305 306 def handle_accepted(self, conn, addr): 307 self.handler_instance = self.handler(conn, encoding=self.encoding) 308 309 def handle_connect(self): 310 self.close() 311 handle_read = handle_connect 312 313 def writable(self): 314 return 0 315 316 def handle_error(self): 317 raise Exception 318 319 320if ssl is not None: 321 322 CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem") 323 CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem") 324 325 class SSLConnection(asyncore.dispatcher): 326 """An asyncore.dispatcher subclass supporting TLS/SSL.""" 327 328 _ssl_accepting = False 329 _ssl_closing = False 330 331 def secure_connection(self): 332 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 333 context.load_cert_chain(CERTFILE) 334 socket = context.wrap_socket(self.socket, 335 suppress_ragged_eofs=False, 336 server_side=True, 337 do_handshake_on_connect=False) 338 self.del_channel() 339 self.set_socket(socket) 340 self._ssl_accepting = True 341 342 def _do_ssl_handshake(self): 343 try: 344 self.socket.do_handshake() 345 except ssl.SSLError as err: 346 if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 347 ssl.SSL_ERROR_WANT_WRITE): 348 return 349 elif err.args[0] == ssl.SSL_ERROR_EOF: 350 return self.handle_close() 351 # TODO: SSLError does not expose alert information 352 elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]: 353 return self.handle_close() 354 raise 355 except OSError as err: 356 if err.args[0] == errno.ECONNABORTED: 357 return self.handle_close() 358 else: 359 self._ssl_accepting = False 360 361 def _do_ssl_shutdown(self): 362 self._ssl_closing = True 363 try: 364 self.socket = self.socket.unwrap() 365 except ssl.SSLError as err: 366 if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 367 ssl.SSL_ERROR_WANT_WRITE): 368 return 369 except OSError: 370 # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return 371 # from OpenSSL's SSL_shutdown(), corresponding to a 372 # closed socket condition. See also: 373 # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html 374 pass 375 self._ssl_closing = False 376 if getattr(self, '_ccc', False) is False: 377 super(SSLConnection, self).close() 378 else: 379 pass 380 381 def handle_read_event(self): 382 if self._ssl_accepting: 383 self._do_ssl_handshake() 384 elif self._ssl_closing: 385 self._do_ssl_shutdown() 386 else: 387 super(SSLConnection, self).handle_read_event() 388 389 def handle_write_event(self): 390 if self._ssl_accepting: 391 self._do_ssl_handshake() 392 elif self._ssl_closing: 393 self._do_ssl_shutdown() 394 else: 395 super(SSLConnection, self).handle_write_event() 396 397 def send(self, data): 398 try: 399 return super(SSLConnection, self).send(data) 400 except ssl.SSLError as err: 401 if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN, 402 ssl.SSL_ERROR_WANT_READ, 403 ssl.SSL_ERROR_WANT_WRITE): 404 return 0 405 raise 406 407 def recv(self, buffer_size): 408 try: 409 return super(SSLConnection, self).recv(buffer_size) 410 except ssl.SSLError as err: 411 if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 412 ssl.SSL_ERROR_WANT_WRITE): 413 return b'' 414 if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN): 415 self.handle_close() 416 return b'' 417 raise 418 419 def handle_error(self): 420 raise Exception 421 422 def close(self): 423 if (isinstance(self.socket, ssl.SSLSocket) and 424 self.socket._sslobj is not None): 425 self._do_ssl_shutdown() 426 else: 427 super(SSLConnection, self).close() 428 429 430 class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler): 431 """A DummyDTPHandler subclass supporting TLS/SSL.""" 432 433 def __init__(self, conn, baseclass): 434 DummyDTPHandler.__init__(self, conn, baseclass) 435 if self.baseclass.secure_data_channel: 436 self.secure_connection() 437 438 439 class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler): 440 """A DummyFTPHandler subclass supporting TLS/SSL.""" 441 442 dtp_handler = DummyTLS_DTPHandler 443 444 def __init__(self, conn, encoding=DEFAULT_ENCODING): 445 DummyFTPHandler.__init__(self, conn, encoding=encoding) 446 self.secure_data_channel = False 447 self._ccc = False 448 449 def cmd_auth(self, line): 450 """Set up secure control channel.""" 451 self.push('234 AUTH TLS successful') 452 self.secure_connection() 453 454 def cmd_ccc(self, line): 455 self.push('220 Reverting back to clear-text') 456 self._ccc = True 457 self._do_ssl_shutdown() 458 459 def cmd_pbsz(self, line): 460 """Negotiate size of buffer for secure data transfer. 461 For TLS/SSL the only valid value for the parameter is '0'. 462 Any other value is accepted but ignored. 463 """ 464 self.push('200 PBSZ=0 successful.') 465 466 def cmd_prot(self, line): 467 """Setup un/secure data channel.""" 468 arg = line.upper() 469 if arg == 'C': 470 self.push('200 Protection set to Clear') 471 self.secure_data_channel = False 472 elif arg == 'P': 473 self.push('200 Protection set to Private') 474 self.secure_data_channel = True 475 else: 476 self.push("502 Unrecognized PROT type (use C or P).") 477 478 479 class DummyTLS_FTPServer(DummyFTPServer): 480 handler = DummyTLS_FTPHandler 481 482 483class TestFTPClass(TestCase): 484 485 def setUp(self, encoding=DEFAULT_ENCODING): 486 self.server = DummyFTPServer((HOST, 0), encoding=encoding) 487 self.server.start() 488 self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding) 489 self.client.connect(self.server.host, self.server.port) 490 491 def tearDown(self): 492 self.client.close() 493 self.server.stop() 494 # Explicitly clear the attribute to prevent dangling thread 495 self.server = None 496 asyncore.close_all(ignore_all=True) 497 498 def check_data(self, received, expected): 499 self.assertEqual(len(received), len(expected)) 500 self.assertEqual(received, expected) 501 502 def test_getwelcome(self): 503 self.assertEqual(self.client.getwelcome(), '220 welcome') 504 505 def test_sanitize(self): 506 self.assertEqual(self.client.sanitize('foo'), repr('foo')) 507 self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****')) 508 self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****')) 509 510 def test_exceptions(self): 511 self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0') 512 self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0') 513 self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0') 514 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400') 515 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499') 516 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500') 517 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599') 518 self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999') 519 520 def test_all_errors(self): 521 exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm, 522 ftplib.error_proto, ftplib.Error, OSError, 523 EOFError) 524 for x in exceptions: 525 try: 526 raise x('exception not included in all_errors set') 527 except ftplib.all_errors: 528 pass 529 530 def test_set_pasv(self): 531 # passive mode is supposed to be enabled by default 532 self.assertTrue(self.client.passiveserver) 533 self.client.set_pasv(True) 534 self.assertTrue(self.client.passiveserver) 535 self.client.set_pasv(False) 536 self.assertFalse(self.client.passiveserver) 537 538 def test_voidcmd(self): 539 self.client.voidcmd('echo 200') 540 self.client.voidcmd('echo 299') 541 self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199') 542 self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300') 543 544 def test_login(self): 545 self.client.login() 546 547 def test_acct(self): 548 self.client.acct('passwd') 549 550 def test_rename(self): 551 self.client.rename('a', 'b') 552 self.server.handler_instance.next_response = '200' 553 self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b') 554 555 def test_delete(self): 556 self.client.delete('foo') 557 self.server.handler_instance.next_response = '199' 558 self.assertRaises(ftplib.error_reply, self.client.delete, 'foo') 559 560 def test_size(self): 561 self.client.size('foo') 562 563 def test_mkd(self): 564 dir = self.client.mkd('/foo') 565 self.assertEqual(dir, '/foo') 566 567 def test_rmd(self): 568 self.client.rmd('foo') 569 570 def test_cwd(self): 571 dir = self.client.cwd('/foo') 572 self.assertEqual(dir, '250 cwd ok') 573 574 def test_pwd(self): 575 dir = self.client.pwd() 576 self.assertEqual(dir, 'pwd ok') 577 578 def test_quit(self): 579 self.assertEqual(self.client.quit(), '221 quit ok') 580 # Ensure the connection gets closed; sock attribute should be None 581 self.assertEqual(self.client.sock, None) 582 583 def test_abort(self): 584 self.client.abort() 585 586 def test_retrbinary(self): 587 def callback(data): 588 received.append(data.decode(self.client.encoding)) 589 received = [] 590 self.client.retrbinary('retr', callback) 591 self.check_data(''.join(received), RETR_DATA) 592 593 def test_retrbinary_rest(self): 594 def callback(data): 595 received.append(data.decode(self.client.encoding)) 596 for rest in (0, 10, 20): 597 received = [] 598 self.client.retrbinary('retr', callback, rest=rest) 599 self.check_data(''.join(received), RETR_DATA[rest:]) 600 601 def test_retrlines(self): 602 received = [] 603 self.client.retrlines('retr', received.append) 604 self.check_data(''.join(received), RETR_DATA.replace('\r\n', '')) 605 606 def test_storbinary(self): 607 f = io.BytesIO(RETR_DATA.encode(self.client.encoding)) 608 self.client.storbinary('stor', f) 609 self.check_data(self.server.handler_instance.last_received_data, RETR_DATA) 610 # test new callback arg 611 flag = [] 612 f.seek(0) 613 self.client.storbinary('stor', f, callback=lambda x: flag.append(None)) 614 self.assertTrue(flag) 615 616 def test_storbinary_rest(self): 617 data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding) 618 f = io.BytesIO(data) 619 for r in (30, '30'): 620 f.seek(0) 621 self.client.storbinary('stor', f, rest=r) 622 self.assertEqual(self.server.handler_instance.rest, str(r)) 623 624 def test_storlines(self): 625 data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding) 626 f = io.BytesIO(data) 627 self.client.storlines('stor', f) 628 self.check_data(self.server.handler_instance.last_received_data, RETR_DATA) 629 # test new callback arg 630 flag = [] 631 f.seek(0) 632 self.client.storlines('stor foo', f, callback=lambda x: flag.append(None)) 633 self.assertTrue(flag) 634 635 f = io.StringIO(RETR_DATA.replace('\r\n', '\n')) 636 # storlines() expects a binary file, not a text file 637 with warnings_helper.check_warnings(('', BytesWarning), quiet=True): 638 self.assertRaises(TypeError, self.client.storlines, 'stor foo', f) 639 640 def test_nlst(self): 641 self.client.nlst() 642 self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1]) 643 644 def test_dir(self): 645 l = [] 646 self.client.dir(lambda x: l.append(x)) 647 self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', '')) 648 649 def test_mlsd(self): 650 list(self.client.mlsd()) 651 list(self.client.mlsd(path='/')) 652 list(self.client.mlsd(path='/', facts=['size', 'type'])) 653 654 ls = list(self.client.mlsd()) 655 for name, facts in ls: 656 self.assertIsInstance(name, str) 657 self.assertIsInstance(facts, dict) 658 self.assertTrue(name) 659 self.assertIn('type', facts) 660 self.assertIn('perm', facts) 661 self.assertIn('unique', facts) 662 663 def set_data(data): 664 self.server.handler_instance.next_data = data 665 666 def test_entry(line, type=None, perm=None, unique=None, name=None): 667 type = 'type' if type is None else type 668 perm = 'perm' if perm is None else perm 669 unique = 'unique' if unique is None else unique 670 name = 'name' if name is None else name 671 set_data(line) 672 _name, facts = next(self.client.mlsd()) 673 self.assertEqual(_name, name) 674 self.assertEqual(facts['type'], type) 675 self.assertEqual(facts['perm'], perm) 676 self.assertEqual(facts['unique'], unique) 677 678 # plain 679 test_entry('type=type;perm=perm;unique=unique; name\r\n') 680 # "=" in fact value 681 test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe") 682 test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type") 683 test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe") 684 test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====") 685 # spaces in name 686 test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me") 687 test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ") 688 test_entry('type=type;perm=perm;unique=unique; name\r\n', name=" name") 689 test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e") 690 # ";" in name 691 test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me") 692 test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name") 693 test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;") 694 test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;") 695 # case sensitiveness 696 set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n') 697 _name, facts = next(self.client.mlsd()) 698 for x in facts: 699 self.assertTrue(x.islower()) 700 # no data (directory empty) 701 set_data('') 702 self.assertRaises(StopIteration, next, self.client.mlsd()) 703 set_data('') 704 for x in self.client.mlsd(): 705 self.fail("unexpected data %s" % x) 706 707 def test_makeport(self): 708 with self.client.makeport(): 709 # IPv4 is in use, just make sure send_eprt has not been used 710 self.assertEqual(self.server.handler_instance.last_received_cmd, 711 'port') 712 713 def test_makepasv(self): 714 host, port = self.client.makepasv() 715 conn = socket.create_connection((host, port), timeout=TIMEOUT) 716 conn.close() 717 # IPv4 is in use, just make sure send_epsv has not been used 718 self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv') 719 720 def test_makepasv_issue43285_security_disabled(self): 721 """Test the opt-in to the old vulnerable behavior.""" 722 self.client.trust_server_pasv_ipv4_address = True 723 bad_host, port = self.client.makepasv() 724 self.assertEqual( 725 bad_host, self.server.handler_instance.fake_pasv_server_ip) 726 # Opening and closing a connection keeps the dummy server happy 727 # instead of timing out on accept. 728 socket.create_connection((self.client.sock.getpeername()[0], port), 729 timeout=TIMEOUT).close() 730 731 def test_makepasv_issue43285_security_enabled_default(self): 732 self.assertFalse(self.client.trust_server_pasv_ipv4_address) 733 trusted_host, port = self.client.makepasv() 734 self.assertNotEqual( 735 trusted_host, self.server.handler_instance.fake_pasv_server_ip) 736 # Opening and closing a connection keeps the dummy server happy 737 # instead of timing out on accept. 738 socket.create_connection((trusted_host, port), timeout=TIMEOUT).close() 739 740 def test_with_statement(self): 741 self.client.quit() 742 743 def is_client_connected(): 744 if self.client.sock is None: 745 return False 746 try: 747 self.client.sendcmd('noop') 748 except (OSError, EOFError): 749 return False 750 return True 751 752 # base test 753 with ftplib.FTP(timeout=TIMEOUT) as self.client: 754 self.client.connect(self.server.host, self.server.port) 755 self.client.sendcmd('noop') 756 self.assertTrue(is_client_connected()) 757 self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') 758 self.assertFalse(is_client_connected()) 759 760 # QUIT sent inside the with block 761 with ftplib.FTP(timeout=TIMEOUT) as self.client: 762 self.client.connect(self.server.host, self.server.port) 763 self.client.sendcmd('noop') 764 self.client.quit() 765 self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') 766 self.assertFalse(is_client_connected()) 767 768 # force a wrong response code to be sent on QUIT: error_perm 769 # is expected and the connection is supposed to be closed 770 try: 771 with ftplib.FTP(timeout=TIMEOUT) as self.client: 772 self.client.connect(self.server.host, self.server.port) 773 self.client.sendcmd('noop') 774 self.server.handler_instance.next_response = '550 error on quit' 775 except ftplib.error_perm as err: 776 self.assertEqual(str(err), '550 error on quit') 777 else: 778 self.fail('Exception not raised') 779 # needed to give the threaded server some time to set the attribute 780 # which otherwise would still be == 'noop' 781 time.sleep(0.1) 782 self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') 783 self.assertFalse(is_client_connected()) 784 785 def test_source_address(self): 786 self.client.quit() 787 port = socket_helper.find_unused_port() 788 try: 789 self.client.connect(self.server.host, self.server.port, 790 source_address=(HOST, port)) 791 self.assertEqual(self.client.sock.getsockname()[1], port) 792 self.client.quit() 793 except OSError as e: 794 if e.errno == errno.EADDRINUSE: 795 self.skipTest("couldn't bind to port %d" % port) 796 raise 797 798 def test_source_address_passive_connection(self): 799 port = socket_helper.find_unused_port() 800 self.client.source_address = (HOST, port) 801 try: 802 with self.client.transfercmd('list') as sock: 803 self.assertEqual(sock.getsockname()[1], port) 804 except OSError as e: 805 if e.errno == errno.EADDRINUSE: 806 self.skipTest("couldn't bind to port %d" % port) 807 raise 808 809 def test_parse257(self): 810 self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar') 811 self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar') 812 self.assertEqual(ftplib.parse257('257 ""'), '') 813 self.assertEqual(ftplib.parse257('257 "" created'), '') 814 self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"') 815 # The 257 response is supposed to include the directory 816 # name and in case it contains embedded double-quotes 817 # they must be doubled (see RFC-959, chapter 7, appendix 2). 818 self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar') 819 self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar') 820 821 def test_line_too_long(self): 822 self.assertRaises(ftplib.Error, self.client.sendcmd, 823 'x' * self.client.maxline * 2) 824 825 def test_retrlines_too_long(self): 826 self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2)) 827 received = [] 828 self.assertRaises(ftplib.Error, 829 self.client.retrlines, 'retr', received.append) 830 831 def test_storlines_too_long(self): 832 f = io.BytesIO(b'x' * self.client.maxline * 2) 833 self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f) 834 835 def test_encoding_param(self): 836 encodings = ['latin-1', 'utf-8'] 837 for encoding in encodings: 838 with self.subTest(encoding=encoding): 839 self.tearDown() 840 self.setUp(encoding=encoding) 841 self.assertEqual(encoding, self.client.encoding) 842 self.test_retrbinary() 843 self.test_storbinary() 844 self.test_retrlines() 845 new_dir = self.client.mkd('/non-ascii dir \xAE') 846 self.check_data(new_dir, '/non-ascii dir \xAE') 847 # Check default encoding 848 client = ftplib.FTP(timeout=TIMEOUT) 849 self.assertEqual(DEFAULT_ENCODING, client.encoding) 850 851 852@skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") 853class TestIPv6Environment(TestCase): 854 855 def setUp(self): 856 self.server = DummyFTPServer((HOSTv6, 0), 857 af=socket.AF_INET6, 858 encoding=DEFAULT_ENCODING) 859 self.server.start() 860 self.client = ftplib.FTP(timeout=TIMEOUT, encoding=DEFAULT_ENCODING) 861 self.client.connect(self.server.host, self.server.port) 862 863 def tearDown(self): 864 self.client.close() 865 self.server.stop() 866 # Explicitly clear the attribute to prevent dangling thread 867 self.server = None 868 asyncore.close_all(ignore_all=True) 869 870 def test_af(self): 871 self.assertEqual(self.client.af, socket.AF_INET6) 872 873 def test_makeport(self): 874 with self.client.makeport(): 875 self.assertEqual(self.server.handler_instance.last_received_cmd, 876 'eprt') 877 878 def test_makepasv(self): 879 host, port = self.client.makepasv() 880 conn = socket.create_connection((host, port), timeout=TIMEOUT) 881 conn.close() 882 self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv') 883 884 def test_transfer(self): 885 def retr(): 886 def callback(data): 887 received.append(data.decode(self.client.encoding)) 888 received = [] 889 self.client.retrbinary('retr', callback) 890 self.assertEqual(len(''.join(received)), len(RETR_DATA)) 891 self.assertEqual(''.join(received), RETR_DATA) 892 self.client.set_pasv(True) 893 retr() 894 self.client.set_pasv(False) 895 retr() 896 897 898@skipUnless(ssl, "SSL not available") 899class TestTLS_FTPClassMixin(TestFTPClass): 900 """Repeat TestFTPClass tests starting the TLS layer for both control 901 and data connections first. 902 """ 903 904 def setUp(self, encoding=DEFAULT_ENCODING): 905 self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding) 906 self.server.start() 907 self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding) 908 self.client.connect(self.server.host, self.server.port) 909 # enable TLS 910 self.client.auth() 911 self.client.prot_p() 912 913 914@skipUnless(ssl, "SSL not available") 915class TestTLS_FTPClass(TestCase): 916 """Specific TLS_FTP class tests.""" 917 918 def setUp(self, encoding=DEFAULT_ENCODING): 919 self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding) 920 self.server.start() 921 self.client = ftplib.FTP_TLS(timeout=TIMEOUT) 922 self.client.connect(self.server.host, self.server.port) 923 924 def tearDown(self): 925 self.client.close() 926 self.server.stop() 927 # Explicitly clear the attribute to prevent dangling thread 928 self.server = None 929 asyncore.close_all(ignore_all=True) 930 931 def test_control_connection(self): 932 self.assertNotIsInstance(self.client.sock, ssl.SSLSocket) 933 self.client.auth() 934 self.assertIsInstance(self.client.sock, ssl.SSLSocket) 935 936 def test_data_connection(self): 937 # clear text 938 with self.client.transfercmd('list') as sock: 939 self.assertNotIsInstance(sock, ssl.SSLSocket) 940 self.assertEqual(sock.recv(1024), 941 LIST_DATA.encode(self.client.encoding)) 942 self.assertEqual(self.client.voidresp(), "226 transfer complete") 943 944 # secured, after PROT P 945 self.client.prot_p() 946 with self.client.transfercmd('list') as sock: 947 self.assertIsInstance(sock, ssl.SSLSocket) 948 # consume from SSL socket to finalize handshake and avoid 949 # "SSLError [SSL] shutdown while in init" 950 self.assertEqual(sock.recv(1024), 951 LIST_DATA.encode(self.client.encoding)) 952 self.assertEqual(self.client.voidresp(), "226 transfer complete") 953 954 # PROT C is issued, the connection must be in cleartext again 955 self.client.prot_c() 956 with self.client.transfercmd('list') as sock: 957 self.assertNotIsInstance(sock, ssl.SSLSocket) 958 self.assertEqual(sock.recv(1024), 959 LIST_DATA.encode(self.client.encoding)) 960 self.assertEqual(self.client.voidresp(), "226 transfer complete") 961 962 def test_login(self): 963 # login() is supposed to implicitly secure the control connection 964 self.assertNotIsInstance(self.client.sock, ssl.SSLSocket) 965 self.client.login() 966 self.assertIsInstance(self.client.sock, ssl.SSLSocket) 967 # make sure that AUTH TLS doesn't get issued again 968 self.client.login() 969 970 def test_auth_issued_twice(self): 971 self.client.auth() 972 self.assertRaises(ValueError, self.client.auth) 973 974 def test_context(self): 975 self.client.quit() 976 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 977 ctx.check_hostname = False 978 ctx.verify_mode = ssl.CERT_NONE 979 self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE, 980 context=ctx) 981 self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE, 982 context=ctx) 983 self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE, 984 keyfile=CERTFILE, context=ctx) 985 986 self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT) 987 self.client.connect(self.server.host, self.server.port) 988 self.assertNotIsInstance(self.client.sock, ssl.SSLSocket) 989 self.client.auth() 990 self.assertIs(self.client.sock.context, ctx) 991 self.assertIsInstance(self.client.sock, ssl.SSLSocket) 992 993 self.client.prot_p() 994 with self.client.transfercmd('list') as sock: 995 self.assertIs(sock.context, ctx) 996 self.assertIsInstance(sock, ssl.SSLSocket) 997 998 def test_ccc(self): 999 self.assertRaises(ValueError, self.client.ccc) 1000 self.client.login(secure=True) 1001 self.assertIsInstance(self.client.sock, ssl.SSLSocket) 1002 self.client.ccc() 1003 self.assertRaises(ValueError, self.client.sock.unwrap) 1004 1005 @skipUnless(False, "FIXME: bpo-32706") 1006 def test_check_hostname(self): 1007 self.client.quit() 1008 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1009 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1010 self.assertEqual(ctx.check_hostname, True) 1011 ctx.load_verify_locations(CAFILE) 1012 self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT) 1013 1014 # 127.0.0.1 doesn't match SAN 1015 self.client.connect(self.server.host, self.server.port) 1016 with self.assertRaises(ssl.CertificateError): 1017 self.client.auth() 1018 # exception quits connection 1019 1020 self.client.connect(self.server.host, self.server.port) 1021 self.client.prot_p() 1022 with self.assertRaises(ssl.CertificateError): 1023 with self.client.transfercmd("list") as sock: 1024 pass 1025 self.client.quit() 1026 1027 self.client.connect("localhost", self.server.port) 1028 self.client.auth() 1029 self.client.quit() 1030 1031 self.client.connect("localhost", self.server.port) 1032 self.client.prot_p() 1033 with self.client.transfercmd("list") as sock: 1034 pass 1035 1036 1037class TestTimeouts(TestCase): 1038 1039 def setUp(self): 1040 self.evt = threading.Event() 1041 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1042 self.sock.settimeout(20) 1043 self.port = socket_helper.bind_port(self.sock) 1044 self.server_thread = threading.Thread(target=self.server) 1045 self.server_thread.daemon = True 1046 self.server_thread.start() 1047 # Wait for the server to be ready. 1048 self.evt.wait() 1049 self.evt.clear() 1050 self.old_port = ftplib.FTP.port 1051 ftplib.FTP.port = self.port 1052 1053 def tearDown(self): 1054 ftplib.FTP.port = self.old_port 1055 self.server_thread.join() 1056 # Explicitly clear the attribute to prevent dangling thread 1057 self.server_thread = None 1058 1059 def server(self): 1060 # This method sets the evt 3 times: 1061 # 1) when the connection is ready to be accepted. 1062 # 2) when it is safe for the caller to close the connection 1063 # 3) when we have closed the socket 1064 self.sock.listen() 1065 # (1) Signal the caller that we are ready to accept the connection. 1066 self.evt.set() 1067 try: 1068 conn, addr = self.sock.accept() 1069 except TimeoutError: 1070 pass 1071 else: 1072 conn.sendall(b"1 Hola mundo\n") 1073 conn.shutdown(socket.SHUT_WR) 1074 # (2) Signal the caller that it is safe to close the socket. 1075 self.evt.set() 1076 conn.close() 1077 finally: 1078 self.sock.close() 1079 1080 def testTimeoutDefault(self): 1081 # default -- use global socket timeout 1082 self.assertIsNone(socket.getdefaulttimeout()) 1083 socket.setdefaulttimeout(30) 1084 try: 1085 ftp = ftplib.FTP(HOST) 1086 finally: 1087 socket.setdefaulttimeout(None) 1088 self.assertEqual(ftp.sock.gettimeout(), 30) 1089 self.evt.wait() 1090 ftp.close() 1091 1092 def testTimeoutNone(self): 1093 # no timeout -- do not use global socket timeout 1094 self.assertIsNone(socket.getdefaulttimeout()) 1095 socket.setdefaulttimeout(30) 1096 try: 1097 ftp = ftplib.FTP(HOST, timeout=None) 1098 finally: 1099 socket.setdefaulttimeout(None) 1100 self.assertIsNone(ftp.sock.gettimeout()) 1101 self.evt.wait() 1102 ftp.close() 1103 1104 def testTimeoutValue(self): 1105 # a value 1106 ftp = ftplib.FTP(HOST, timeout=30) 1107 self.assertEqual(ftp.sock.gettimeout(), 30) 1108 self.evt.wait() 1109 ftp.close() 1110 1111 # bpo-39259 1112 with self.assertRaises(ValueError): 1113 ftplib.FTP(HOST, timeout=0) 1114 1115 def testTimeoutConnect(self): 1116 ftp = ftplib.FTP() 1117 ftp.connect(HOST, timeout=30) 1118 self.assertEqual(ftp.sock.gettimeout(), 30) 1119 self.evt.wait() 1120 ftp.close() 1121 1122 def testTimeoutDifferentOrder(self): 1123 ftp = ftplib.FTP(timeout=30) 1124 ftp.connect(HOST) 1125 self.assertEqual(ftp.sock.gettimeout(), 30) 1126 self.evt.wait() 1127 ftp.close() 1128 1129 def testTimeoutDirectAccess(self): 1130 ftp = ftplib.FTP() 1131 ftp.timeout = 30 1132 ftp.connect(HOST) 1133 self.assertEqual(ftp.sock.gettimeout(), 30) 1134 self.evt.wait() 1135 ftp.close() 1136 1137 1138class MiscTestCase(TestCase): 1139 def test__all__(self): 1140 not_exported = { 1141 'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF', 'Error', 1142 'parse150', 'parse227', 'parse229', 'parse257', 'print_line', 1143 'ftpcp', 'test'} 1144 support.check__all__(self, ftplib, not_exported=not_exported) 1145 1146 1147def test_main(): 1148 tests = [TestFTPClass, TestTimeouts, 1149 TestIPv6Environment, 1150 TestTLS_FTPClassMixin, TestTLS_FTPClass, 1151 MiscTestCase] 1152 1153 thread_info = threading_helper.threading_setup() 1154 try: 1155 support.run_unittest(*tests) 1156 finally: 1157 threading_helper.threading_cleanup(*thread_info) 1158 1159 1160if __name__ == '__main__': 1161 test_main() 1162