1"""Test script for ftplib module."""
2
3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4# environment
5
6import ftplib
7import socket
8import io
9import errno
10import os
11import threading
12import time
13try:
14    import ssl
15except ImportError:
16    ssl = None
17
18from unittest import TestCase, skipUnless
19from test import support
20from test.support import threading_helper
21from test.support import socket_helper
22from test.support import warnings_helper
23from test.support.socket_helper import HOST, HOSTv6
24
25import warnings
26with warnings.catch_warnings():
27    warnings.simplefilter('ignore', DeprecationWarning)
28    import asyncore
29    import asynchat
30
31
# Timeout applied to the sockets and FTP clients created in this module.
TIMEOUT = support.LOOPBACK_TIMEOUT
# Encoding used by the dummy server and the clients unless a test picks one.
DEFAULT_ENCODING = 'utf-8'
# the dummy data returned by server over the data channel when
# RETR, LIST, NLST, MLSD commands are issued
# Each payload contains the non-ASCII character \xAE.
RETR_DATA = 'abcde12345\r\n' * 1000 + 'non-ascii char \xAE\r\n'
LIST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
NLST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
# One MLSD entry per line: "fact=value;" pairs, a space, then the entry name.
MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
             "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
             "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
             "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
             "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
             "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
             "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
             "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
             "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
             "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
             "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file4\r\n"
             "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char\r\n"
             "type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n")
56
57
class DummyDTPHandler(asynchat.async_chat):
    """Data-channel side of the dummy FTP server.

    Bytes received from the client are decoded and accumulated on the owning
    control-channel handler (``baseclass.last_received_data``); outgoing
    payloads are encoded with that handler's encoding before being sent.
    """

    # Becomes True once handle_close() has done its work, so that repeated
    # close notifications for the same connection are ignored.
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # Every new data connection starts with an empty receive buffer.
        baseclass.last_received_data = ''
        self.encoding = baseclass.encoding

    def handle_read(self):
        # Undecodable bytes are replaced rather than raising.
        chunk = self.recv(1024)
        text = chunk.decode(self.encoding, 'replace')
        self.baseclass.last_received_data += text

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if self.dtp_conn_closed:
            return
        self.baseclass.push('226 transfer complete')
        self.close()
        self.dtp_conn_closed = True

    def push(self, what):
        """Send *what* over the data channel.

        A payload queued by a test in ``baseclass.next_data`` takes the place
        of *what* (and is consumed).  An empty payload sends nothing and just
        schedules the connection to be closed.
        """
        control = self.baseclass
        override = control.next_data
        if override is not None:
            what, control.next_data = override, None
        if what:
            super().push(what.encode(self.encoding))
            return None
        return self.close_when_done()

    def handle_error(self):
        raise Exception
90
91
class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler of the dummy FTP server.

    Each CRLF-terminated line received from the client is split into a
    command word and an argument string and dispatched to the ``cmd_NAME``
    method named after the lower-cased command word.  Handlers answer with
    canned replies; a command without a handler gets a 550 reply.

    Attributes the tests read or set:
        last_received_cmd: lower-cased word of the most recent command.
        last_received_data: text collected from the data channel.
        next_response: when non-empty, pushed once before the next command
            is handled, then cleared.
        next_data: when not None, replaces the payload of the next
            data-channel push, then reset to None.
        rest: argument of the last REST command; consumed by RETR.
        next_retr_data: payload served by RETR.
        fake_pasv_server_ip: IPv4 address advertised in PASV replies.
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn, encoding=DEFAULT_ENCODING):
        asynchat.async_chat.__init__(self, conn)
        # push() encodes replies with self.encoding, so it is assigned before
        # the greeting is sent; otherwise the greeting would be encoded with
        # whatever ``encoding`` attribute the base class provides.
        self.encoding = encoding
        # tells the socket to handle urgent data inline (ABOR command)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        self.next_response = ''
        self.next_data = None
        self.rest = None
        self.next_retr_data = RETR_DATA
        self.push('220 welcome')
        # We use this as the string IPv4 address to direct the client
        # to in response to a PASV command.  To test security behavior.
        # https://bugs.python.org/issue43285/.
        self.fake_pasv_server_ip = '252.253.254.255'

    def collect_incoming_data(self, data):
        # Buffer raw bytes until the line terminator is seen.
        self.in_buffer.append(data)

    def found_terminator(self):
        """Decode the buffered command line and dispatch it."""
        line = b''.join(self.in_buffer).decode(self.encoding)
        self.in_buffer = []
        if self.next_response:
            # A test injected a reply: send it ahead of the regular one.
            self.push(self.next_response)
            self.next_response = ''
        # Everything before the first space is the command word; the rest
        # (possibly empty) is handed to the handler verbatim.
        word, _, arg = line.partition(' ')
        cmd = word.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is None:
            self.push('550 command "%s" not understood.' %cmd)
        else:
            method(arg)

    def handle_error(self):
        raise Exception

    def push(self, data):
        """Encode the str *data* and send it followed by CRLF."""
        asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n')

    def cmd_port(self, arg):
        # arg is "h1,h2,h3,h4,p1,p2": four address bytes and two port bytes.
        addr = [int(field) for field in arg.split(',')]
        ip = '%d.%d.%d.%d' %tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        # The listening socket is bound to the control connection's local
        # address, but the reply advertises fake_pasv_server_ip instead.
        with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            ip = self.fake_pasv_server_ip.replace('.', ',')
            # Integer split of the port into its high and low byte.
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
            # Blocks (up to TIMEOUT) until the client connects.
            conn, _ = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        # The first character of arg is the field delimiter; the fields in
        # between are address family, address and port.
        _af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        with socket.create_server((self.socket.getsockname()[0], 0),
                                  family=socket.AF_INET6) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' %port)
            # Blocks (up to TIMEOUT) until the client connects.
            conn, _ = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_noop(self, arg):
        self.push('200 noop ok')

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' %arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_abor(self, arg):
        self.push('226 abor ok')

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        # Remembered as received (a str); consumed by the next RETR.
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        self.push('125 retr ok')
        # Honour a preceding REST by skipping that many characters.
        offset = int(self.rest) if self.rest is not None else 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_opts(self, arg):
        self.push('200 opts ok')

    def cmd_mlsd(self, arg):
        self.push('125 mlsd ok')
        self.dtp.push(MLSD_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')
267
268
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    Every accepted connection is wrapped in ``handler``; the most recent one
    is kept in ``handler_instance`` so tests can inspect or steer it.
    ``host`` and ``port`` hold the address the listening socket is bound to.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self.encoding = encoding

    def start(self):
        """Start the server thread and wait until run() has begun."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        """Poll asyncore until stop() is called or no channel is left."""
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            # The lock is held for one poll at a time; the with-statement
            # releases it even when a handler raises out of the loop.
            with self.active_lock:
                asyncore.loop(timeout=0.1, count=1)
        asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop in run() to finish and wait for the thread to exit."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn, encoding=self.encoding)

    def handle_connect(self):
        self.close()
    # Read events on the listening socket are handled the same way.
    handle_read = handle_connect

    def writable(self):
        # The listening socket never has anything to write.
        return 0

    def handle_error(self):
        raise Exception
318
319
320if ssl is not None:
321
    # Certificate chain loaded by SSLConnection.secure_connection(); looked up
    # in the directory of this file.
    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    # NOTE(review): presumably the CA certificate clients use to verify the
    # server -- it is not referenced by the code visible here; confirm.
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
324
325    class SSLConnection(asyncore.dispatcher):
326        """An asyncore.dispatcher subclass supporting TLS/SSL."""
327
328        _ssl_accepting = False
329        _ssl_closing = False
330
331        def secure_connection(self):
332            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
333            context.load_cert_chain(CERTFILE)
334            socket = context.wrap_socket(self.socket,
335                                         suppress_ragged_eofs=False,
336                                         server_side=True,
337                                         do_handshake_on_connect=False)
338            self.del_channel()
339            self.set_socket(socket)
340            self._ssl_accepting = True
341
342        def _do_ssl_handshake(self):
343            try:
344                self.socket.do_handshake()
345            except ssl.SSLError as err:
346                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
347                                   ssl.SSL_ERROR_WANT_WRITE):
348                    return
349                elif err.args[0] == ssl.SSL_ERROR_EOF:
350                    return self.handle_close()
351                # TODO: SSLError does not expose alert information
352                elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
353                    return self.handle_close()
354                raise
355            except OSError as err:
356                if err.args[0] == errno.ECONNABORTED:
357                    return self.handle_close()
358            else:
359                self._ssl_accepting = False
360
361        def _do_ssl_shutdown(self):
362            self._ssl_closing = True
363            try:
364                self.socket = self.socket.unwrap()
365            except ssl.SSLError as err:
366                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
367                                   ssl.SSL_ERROR_WANT_WRITE):
368                    return
369            except OSError:
370                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
371                # from OpenSSL's SSL_shutdown(), corresponding to a
372                # closed socket condition. See also:
373                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
374                pass
375            self._ssl_closing = False
376            if getattr(self, '_ccc', False) is False:
377                super(SSLConnection, self).close()
378            else:
379                pass
380
381        def handle_read_event(self):
382            if self._ssl_accepting:
383                self._do_ssl_handshake()
384            elif self._ssl_closing:
385                self._do_ssl_shutdown()
386            else:
387                super(SSLConnection, self).handle_read_event()
388
389        def handle_write_event(self):
390            if self._ssl_accepting:
391                self._do_ssl_handshake()
392            elif self._ssl_closing:
393                self._do_ssl_shutdown()
394            else:
395                super(SSLConnection, self).handle_write_event()
396
397        def send(self, data):
398            try:
399                return super(SSLConnection, self).send(data)
400            except ssl.SSLError as err:
401                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
402                                   ssl.SSL_ERROR_WANT_READ,
403                                   ssl.SSL_ERROR_WANT_WRITE):
404                    return 0
405                raise
406
407        def recv(self, buffer_size):
408            try:
409                return super(SSLConnection, self).recv(buffer_size)
410            except ssl.SSLError as err:
411                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
412                                   ssl.SSL_ERROR_WANT_WRITE):
413                    return b''
414                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
415                    self.handle_close()
416                    return b''
417                raise
418
419        def handle_error(self):
420            raise Exception
421
422        def close(self):
423            if (isinstance(self.socket, ssl.SSLSocket) and
424                    self.socket._sslobj is not None):
425                self._do_ssl_shutdown()
426            else:
427                super(SSLConnection, self).close()
428
429
430    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
431        """A DummyDTPHandler subclass supporting TLS/SSL."""
432
433        def __init__(self, conn, baseclass):
434            DummyDTPHandler.__init__(self, conn, baseclass)
435            if self.baseclass.secure_data_channel:
436                self.secure_connection()
437
438
439    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
440        """A DummyFTPHandler subclass supporting TLS/SSL."""
441
442        dtp_handler = DummyTLS_DTPHandler
443
444        def __init__(self, conn, encoding=DEFAULT_ENCODING):
445            DummyFTPHandler.__init__(self, conn, encoding=encoding)
446            self.secure_data_channel = False
447            self._ccc = False
448
449        def cmd_auth(self, line):
450            """Set up secure control channel."""
451            self.push('234 AUTH TLS successful')
452            self.secure_connection()
453
454        def cmd_ccc(self, line):
455            self.push('220 Reverting back to clear-text')
456            self._ccc = True
457            self._do_ssl_shutdown()
458
459        def cmd_pbsz(self, line):
460            """Negotiate size of buffer for secure data transfer.
461            For TLS/SSL the only valid value for the parameter is '0'.
462            Any other value is accepted but ignored.
463            """
464            self.push('200 PBSZ=0 successful.')
465
466        def cmd_prot(self, line):
467            """Setup un/secure data channel."""
468            arg = line.upper()
469            if arg == 'C':
470                self.push('200 Protection set to Clear')
471                self.secure_data_channel = False
472            elif arg == 'P':
473                self.push('200 Protection set to Private')
474                self.secure_data_channel = True
475            else:
476                self.push("502 Unrecognized PROT type (use C or P).")
477
478
    class DummyTLS_FTPServer(DummyFTPServer):
        """DummyFTPServer whose connections are handled by DummyTLS_FTPHandler."""
        handler = DummyTLS_FTPHandler
481
482
483class TestFTPClass(TestCase):
484
485    def setUp(self, encoding=DEFAULT_ENCODING):
486        self.server = DummyFTPServer((HOST, 0), encoding=encoding)
487        self.server.start()
488        self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding)
489        self.client.connect(self.server.host, self.server.port)
490
    def tearDown(self):
        # Close the client first, then stop the server (stop() joins the
        # server thread).
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        # Close whatever channels are still registered with asyncore.
        asyncore.close_all(ignore_all=True)
497
    def check_data(self, received, expected):
        # Assert that *received* equals *expected*.  The lengths are compared
        # first; NOTE(review): presumably so a size mismatch fails with a
        # short message before the full values are compared -- confirm.
        self.assertEqual(len(received), len(expected))
        self.assertEqual(received, expected)
501
    def test_getwelcome(self):
        # '220 welcome' is the greeting DummyFTPHandler pushes on connect.
        self.assertEqual(self.client.getwelcome(), '220 welcome')
504
505    def test_sanitize(self):
506        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
507        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
508        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
509
510    def test_exceptions(self):
511        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
512        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
513        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
514        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
515        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
516        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
517        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
518        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
519
520    def test_all_errors(self):
521        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
522                      ftplib.error_proto, ftplib.Error, OSError,
523                      EOFError)
524        for x in exceptions:
525            try:
526                raise x('exception not included in all_errors set')
527            except ftplib.all_errors:
528                pass
529
530    def test_set_pasv(self):
531        # passive mode is supposed to be enabled by default
532        self.assertTrue(self.client.passiveserver)
533        self.client.set_pasv(True)
534        self.assertTrue(self.client.passiveserver)
535        self.client.set_pasv(False)
536        self.assertFalse(self.client.passiveserver)
537
538    def test_voidcmd(self):
539        self.client.voidcmd('echo 200')
540        self.client.voidcmd('echo 299')
541        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
542        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
543
    def test_login(self):
        # login() with its default arguments must succeed; the dummy handler
        # answers USER with 331 and PASS with 230.
        self.client.login()
546
    def test_acct(self):
        # acct() must succeed; the dummy handler answers ACCT with 230.
        self.client.acct('passwd')
549
550    def test_rename(self):
551        self.client.rename('a', 'b')
552        self.server.handler_instance.next_response = '200'
553        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
554
555    def test_delete(self):
556        self.client.delete('foo')
557        self.server.handler_instance.next_response = '199'
558        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
559
    def test_size(self):
        # Only checks that size() does not raise; the dummy handler answers
        # SIZE with '250 1000'.  The returned value is not inspected.
        self.client.size('foo')
562
563    def test_mkd(self):
564        dir = self.client.mkd('/foo')
565        self.assertEqual(dir, '/foo')
566
    def test_rmd(self):
        # rmd() must succeed; the dummy handler answers RMD with 250.
        self.client.rmd('foo')
569
570    def test_cwd(self):
571        dir = self.client.cwd('/foo')
572        self.assertEqual(dir, '250 cwd ok')
573
574    def test_pwd(self):
575        dir = self.client.pwd()
576        self.assertEqual(dir, 'pwd ok')
577
578    def test_quit(self):
579        self.assertEqual(self.client.quit(), '221 quit ok')
580        # Ensure the connection gets closed; sock attribute should be None
581        self.assertEqual(self.client.sock, None)
582
    def test_abort(self):
        # abort() must succeed; the dummy handler answers ABOR with 226.
        self.client.abort()
585
586    def test_retrbinary(self):
587        def callback(data):
588            received.append(data.decode(self.client.encoding))
589        received = []
590        self.client.retrbinary('retr', callback)
591        self.check_data(''.join(received), RETR_DATA)
592
593    def test_retrbinary_rest(self):
594        def callback(data):
595            received.append(data.decode(self.client.encoding))
596        for rest in (0, 10, 20):
597            received = []
598            self.client.retrbinary('retr', callback, rest=rest)
599            self.check_data(''.join(received), RETR_DATA[rest:])
600
601    def test_retrlines(self):
602        received = []
603        self.client.retrlines('retr', received.append)
604        self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
605
606    def test_storbinary(self):
607        f = io.BytesIO(RETR_DATA.encode(self.client.encoding))
608        self.client.storbinary('stor', f)
609        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
610        # test new callback arg
611        flag = []
612        f.seek(0)
613        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
614        self.assertTrue(flag)
615
616    def test_storbinary_rest(self):
617        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
618        f = io.BytesIO(data)
619        for r in (30, '30'):
620            f.seek(0)
621            self.client.storbinary('stor', f, rest=r)
622            self.assertEqual(self.server.handler_instance.rest, str(r))
623
624    def test_storlines(self):
625        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
626        f = io.BytesIO(data)
627        self.client.storlines('stor', f)
628        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
629        # test new callback arg
630        flag = []
631        f.seek(0)
632        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
633        self.assertTrue(flag)
634
635        f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
636        # storlines() expects a binary file, not a text file
637        with warnings_helper.check_warnings(('', BytesWarning), quiet=True):
638            self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
639
640    def test_nlst(self):
641        self.client.nlst()
642        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
643
644    def test_dir(self):
645        l = []
646        self.client.dir(lambda x: l.append(x))
647        self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
648
649    def test_mlsd(self):
650        list(self.client.mlsd())
651        list(self.client.mlsd(path='/'))
652        list(self.client.mlsd(path='/', facts=['size', 'type']))
653
654        ls = list(self.client.mlsd())
655        for name, facts in ls:
656            self.assertIsInstance(name, str)
657            self.assertIsInstance(facts, dict)
658            self.assertTrue(name)
659            self.assertIn('type', facts)
660            self.assertIn('perm', facts)
661            self.assertIn('unique', facts)
662
663        def set_data(data):
664            self.server.handler_instance.next_data = data
665
666        def test_entry(line, type=None, perm=None, unique=None, name=None):
667            type = 'type' if type is None else type
668            perm = 'perm' if perm is None else perm
669            unique = 'unique' if unique is None else unique
670            name = 'name' if name is None else name
671            set_data(line)
672            _name, facts = next(self.client.mlsd())
673            self.assertEqual(_name, name)
674            self.assertEqual(facts['type'], type)
675            self.assertEqual(facts['perm'], perm)
676            self.assertEqual(facts['unique'], unique)
677
678        # plain
679        test_entry('type=type;perm=perm;unique=unique; name\r\n')
680        # "=" in fact value
681        test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
682        test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
683        test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
684        test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
685        # spaces in name
686        test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
687        test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
688        test_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
689        test_entry('type=type;perm=perm;unique=unique; n am  e\r\n', name="n am  e")
690        # ";" in name
691        test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
692        test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
693        test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
694        test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
695        # case sensitiveness
696        set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
697        _name, facts = next(self.client.mlsd())
698        for x in facts:
699            self.assertTrue(x.islower())
700        # no data (directory empty)
701        set_data('')
702        self.assertRaises(StopIteration, next, self.client.mlsd())
703        set_data('')
704        for x in self.client.mlsd():
705            self.fail("unexpected data %s" % x)
706
707    def test_makeport(self):
708        with self.client.makeport():
709            # IPv4 is in use, just make sure send_eprt has not been used
710            self.assertEqual(self.server.handler_instance.last_received_cmd,
711                                'port')
712
713    def test_makepasv(self):
714        host, port = self.client.makepasv()
715        conn = socket.create_connection((host, port), timeout=TIMEOUT)
716        conn.close()
717        # IPv4 is in use, just make sure send_epsv has not been used
718        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
719
720    def test_makepasv_issue43285_security_disabled(self):
721        """Test the opt-in to the old vulnerable behavior."""
722        self.client.trust_server_pasv_ipv4_address = True
723        bad_host, port = self.client.makepasv()
724        self.assertEqual(
725                bad_host, self.server.handler_instance.fake_pasv_server_ip)
726        # Opening and closing a connection keeps the dummy server happy
727        # instead of timing out on accept.
728        socket.create_connection((self.client.sock.getpeername()[0], port),
729                                 timeout=TIMEOUT).close()
730
731    def test_makepasv_issue43285_security_enabled_default(self):
732        self.assertFalse(self.client.trust_server_pasv_ipv4_address)
733        trusted_host, port = self.client.makepasv()
734        self.assertNotEqual(
735                trusted_host, self.server.handler_instance.fake_pasv_server_ip)
736        # Opening and closing a connection keeps the dummy server happy
737        # instead of timing out on accept.
738        socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()
739
    def test_with_statement(self):
        # Disconnect the client created by setUp(); each scenario below
        # binds a fresh FTP instance to self.client.
        self.client.quit()

        def is_client_connected():
            # Connected means: a socket is present and a NOOP round-trip
            # succeeds.
            if self.client.sock is None:
                return False
            try:
                self.client.sendcmd('noop')
            except (OSError, EOFError):
                return False
            return True

        # base test
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.assertTrue(is_client_connected())
        # Leaving the block must have sent QUIT and closed the connection.
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # QUIT sent inside the with block
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.client.quit()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # force a wrong response code to be sent on QUIT: error_perm
        # is expected and the connection is supposed to be closed
        try:
            with ftplib.FTP(timeout=TIMEOUT) as self.client:
                self.client.connect(self.server.host, self.server.port)
                self.client.sendcmd('noop')
                # Sent by the server ahead of its regular reply to the next
                # command, which is the QUIT issued on leaving the block.
                self.server.handler_instance.next_response = '550 error on quit'
        except ftplib.error_perm as err:
            self.assertEqual(str(err), '550 error on quit')
        else:
            self.fail('Exception not raised')
        # needed to give the threaded server some time to set the attribute
        # which otherwise would still be == 'noop'
        time.sleep(0.1)
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())
784
785    def test_source_address(self):
786        self.client.quit()
787        port = socket_helper.find_unused_port()
788        try:
789            self.client.connect(self.server.host, self.server.port,
790                                source_address=(HOST, port))
791            self.assertEqual(self.client.sock.getsockname()[1], port)
792            self.client.quit()
793        except OSError as e:
794            if e.errno == errno.EADDRINUSE:
795                self.skipTest("couldn't bind to port %d" % port)
796            raise
797
798    def test_source_address_passive_connection(self):
799        port = socket_helper.find_unused_port()
800        self.client.source_address = (HOST, port)
801        try:
802            with self.client.transfercmd('list') as sock:
803                self.assertEqual(sock.getsockname()[1], port)
804        except OSError as e:
805            if e.errno == errno.EADDRINUSE:
806                self.skipTest("couldn't bind to port %d" % port)
807            raise
808
809    def test_parse257(self):
810        self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
811        self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
812        self.assertEqual(ftplib.parse257('257 ""'), '')
813        self.assertEqual(ftplib.parse257('257 "" created'), '')
814        self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
815        # The 257 response is supposed to include the directory
816        # name and in case it contains embedded double-quotes
817        # they must be doubled (see RFC-959, chapter 7, appendix 2).
818        self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
819        self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
820
821    def test_line_too_long(self):
822        self.assertRaises(ftplib.Error, self.client.sendcmd,
823                          'x' * self.client.maxline * 2)
824
825    def test_retrlines_too_long(self):
826        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
827        received = []
828        self.assertRaises(ftplib.Error,
829                          self.client.retrlines, 'retr', received.append)
830
831    def test_storlines_too_long(self):
832        f = io.BytesIO(b'x' * self.client.maxline * 2)
833        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
834
835    def test_encoding_param(self):
836        encodings = ['latin-1', 'utf-8']
837        for encoding in encodings:
838            with self.subTest(encoding=encoding):
839                self.tearDown()
840                self.setUp(encoding=encoding)
841                self.assertEqual(encoding, self.client.encoding)
842                self.test_retrbinary()
843                self.test_storbinary()
844                self.test_retrlines()
845                new_dir = self.client.mkd('/non-ascii dir \xAE')
846                self.check_data(new_dir, '/non-ascii dir \xAE')
847        # Check default encoding
848        client = ftplib.FTP(timeout=TIMEOUT)
849        self.assertEqual(DEFAULT_ENCODING, client.encoding)
850
851
@skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """FTP client tests run against a dummy server bound to HOSTv6."""

    def setUp(self):
        # Start the dummy server on the IPv6 host, then connect a fresh
        # client to the host/port the server reports.
        self.server = server = DummyFTPServer((HOSTv6, 0),
                                              af=socket.AF_INET6,
                                              encoding=DEFAULT_ENCODING)
        server.start()
        self.client = client = ftplib.FTP(timeout=TIMEOUT,
                                          encoding=DEFAULT_ENCODING)
        client.connect(server.host, server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Drop our reference to the server so that no dangling thread is
        # reported once the test is over.
        self.server = None
        asyncore.close_all(ignore_all=True)

    def _check_retr(self):
        # Download the 'retr' payload and compare it with RETR_DATA.
        chunks = []

        def collect(data):
            chunks.append(data.decode(self.client.encoding))

        self.client.retrbinary('retr', collect)
        text = ''.join(chunks)
        self.assertEqual(len(text), len(RETR_DATA))
        self.assertEqual(text, RETR_DATA)

    def test_af(self):
        # After connecting to the IPv6 server the client reports AF_INET6.
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        # makeport() on this connection must reach the server as EPRT.
        with self.client.makeport():
            last_cmd = self.server.handler_instance.last_received_cmd
            self.assertEqual(last_cmd, 'eprt')

    def test_makepasv(self):
        # makepasv() must reach the server as EPSV; the advertised address
        # is connected to once and closed again before checking.
        host, port = self.client.makepasv()
        socket.create_connection((host, port), timeout=TIMEOUT).close()
        last_cmd = self.server.handler_instance.last_received_cmd
        self.assertEqual(last_cmd, 'epsv')

    def test_transfer(self):
        # Same download, first in passive and then in active mode.
        for passive in (True, False):
            self.client.set_pasv(passive)
            self._check_retr()
896
897
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Run the whole TestFTPClass suite again over TLS.

    setUp() issues AUTH on the control connection and PROT P for the
    data connection before each inherited test runs.
    """

    def setUp(self, encoding=DEFAULT_ENCODING):
        # The same encoding is handed to both the server and the client.
        tls_server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
        self.server = tls_server
        tls_server.start()
        tls_client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
        self.client = tls_client
        tls_client.connect(tls_server.host, tls_server.port)
        # enable TLS: control connection first, then the data connection
        tls_client.auth()
        tls_client.prot_p()
912
913
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests.

    setUp() connects the client but does not issue AUTH or PROT, so each
    test decides when the control and data connections get secured.
    """

    def setUp(self, encoding=DEFAULT_ENCODING):
        # The encoding is handed to BOTH ends.  Passing it only to the
        # server would leave the client on ftplib's default encoding, so
        # the two would disagree for any non-default value (the tests
        # below encode expected data with self.client.encoding).
        self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def test_control_connection(self):
        # The control socket only becomes an SSLSocket once auth() is called.
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text
        with self.client.transfercmd('list') as sock:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
            self.assertEqual(sock.recv(1024),
                             LIST_DATA.encode(self.client.encoding))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIsInstance(sock, ssl.SSLSocket)
            # consume from SSL socket to finalize handshake and avoid
            # "SSLError [SSL] shutdown while in init"
            self.assertEqual(sock.recv(1024),
                             LIST_DATA.encode(self.client.encoding))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        with self.client.transfercmd('list') as sock:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
            self.assertEqual(sock.recv(1024),
                             LIST_DATA.encode(self.client.encoding))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        # A second auth() on an already secured control connection must
        # raise ValueError.
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # Certificate and hostname verification are switched off on this
        # context.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        # An explicit context combined with keyfile and/or certfile is
        # expected to be rejected with ValueError.
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        # The context given to the constructor must be the one used for
        # the control connection ...
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        # ... and for protected data connections as well.
        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)

    def test_ccc(self):
        # ccc() on a control connection that was never secured must fail.
        self.assertRaises(ValueError, self.client.ccc)
        self.client.login(secure=True)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.ccc()
        # Unwrapping once more must fail -- presumably because ccc() has
        # already removed the TLS layer from the control socket.
        self.assertRaises(ValueError, self.client.sock.unwrap)

    @skipUnless(False, "FIXME: bpo-32706")
    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            with self.client.transfercmd("list"):
                pass
        self.client.quit()

        # Same two steps against "localhost", which are expected to succeed.
        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        with self.client.transfercmd("list"):
            pass
1035
1036
class TestTimeouts(TestCase):
    """Check which timeout ends up on the FTP control socket.

    Every test connects to a minimal one-shot server implemented by
    server() and run in a background thread.
    """

    def setUp(self):
        # Event used by the server thread to signal its progress.
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Bounds accept() in server(): without a client the thread gets a
        # TimeoutError instead of blocking forever.
        self.sock.settimeout(20)
        self.port = socket_helper.bind_port(self.sock)
        self.server_thread = threading.Thread(target=self.server)
        self.server_thread.daemon = True
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # Point ftplib's default port at the test server so that the tests
        # can connect by giving only HOST; restored in tearDown().
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        # Undo the class-level port patch made in setUp().
        ftplib.FTP.port = self.old_port
        self.server_thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.server_thread = None

    def server(self):
        # Thread target.  This method sets the evt twice:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        # The listening socket is closed in the finally clause; no further
        # signal is sent for that.
        self.sock.listen()
        # (1) Signal the caller that we are ready to accept the connection.
        self.evt.set()
        try:
            conn, addr = self.sock.accept()
        except TimeoutError:
            # No client connected within the socket timeout set in setUp().
            pass
        else:
            # Send a single reply line, then close our sending side.
            conn.sendall(b"1 Hola mundo\n")
            conn.shutdown(socket.SHUT_WR)
            # (2) Signal the caller that it is safe to close the socket.
            self.evt.set()
            conn.close()
        finally:
            self.sock.close()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST)
        finally:
            # Always restore the process-wide default, even if connecting
            # failed.
            socket.setdefaulttimeout(None)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        # Wait for signal (2) from server() before closing our side.
        self.evt.wait()
        ftp.close()

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            # Always restore the process-wide default, even if connecting
            # failed.
            socket.setdefaulttimeout(None)
        self.assertIsNone(ftp.sock.gettimeout())
        # Wait for signal (2) from server() before closing our side.
        self.evt.wait()
        ftp.close()

    def testTimeoutValue(self):
        # a value
        ftp = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        # Wait for signal (2) from server() before closing our side.
        self.evt.wait()
        ftp.close()

        # bpo-39259: a timeout of 0 must be rejected with ValueError
        with self.assertRaises(ValueError):
            ftplib.FTP(HOST, timeout=0)

    def testTimeoutConnect(self):
        # timeout given to connect() rather than to the constructor
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        # Wait for signal (2) from server() before closing our side.
        self.evt.wait()
        ftp.close()

    def testTimeoutDifferentOrder(self):
        # timeout given to the constructor, host only given to connect()
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        # Wait for signal (2) from server() before closing our side.
        self.evt.wait()
        ftp.close()

    def testTimeoutDirectAccess(self):
        # timeout set through the attribute before connecting
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        # Wait for signal (2) from server() before closing our side.
        self.evt.wait()
        ftp.close()
1136
1137
class MiscTestCase(TestCase):
    """Module-level checks that need no FTP connection."""

    def test__all__(self):
        # Names handed to check__all__() as deliberately missing from
        # ftplib.__all__, grouped only for readability.
        constants = {'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF'}
        others = {
            'Error',
            'parse150', 'parse227', 'parse229', 'parse257',
            'print_line', 'ftpcp', 'test',
        }
        support.check__all__(self, ftplib, not_exported=constants | others)
1145
1146
def test_main():
    """Run every test class of this module under the threading helper."""
    test_classes = (
        TestFTPClass,
        TestTimeouts,
        TestIPv6Environment,
        TestTLS_FTPClassMixin,
        TestTLS_FTPClass,
        MiscTestCase,
    )

    # The value returned by threading_setup() is handed back to
    # threading_cleanup() once the run is over, even if it failed.
    saved_threads = threading_helper.threading_setup()
    try:
        support.run_unittest(*test_classes)
    finally:
        threading_helper.threading_cleanup(*saved_threads)
1158
1159
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
1162