1"""Test script for ftplib module."""
2
3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4# environment
5
6import ftplib
7import socket
8import io
9import errno
10import os
11import threading
12import time
13import unittest
14try:
15    import ssl
16except ImportError:
17    ssl = None
18
19from unittest import TestCase, skipUnless
20from test import support
21from test.support import threading_helper
22from test.support import socket_helper
23from test.support import warnings_helper
24from test.support.socket_helper import HOST, HOSTv6
25
26import warnings
27with warnings.catch_warnings():
28    warnings.simplefilter('ignore', DeprecationWarning)
29    import asyncore
30    import asynchat
31
32
33TIMEOUT = support.LOOPBACK_TIMEOUT
34DEFAULT_ENCODING = 'utf-8'
35# the dummy data returned by server over the data channel when
36# RETR, LIST, NLST, MLSD commands are issued
37RETR_DATA = 'abcde12345\r\n' * 1000 + 'non-ascii char \xAE\r\n'
38LIST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
39NLST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
40MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
41             "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
42             "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
43             "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
44             "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
45             "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
46             "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
47             "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
48             "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
49             "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n"
50             "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
51             "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
52             "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
53             "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
54             "type=file;perm=r;unique==keVO1+1G4; file4\r\n"
55             "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char\r\n"
56             "type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n")
57
58
59class DummyDTPHandler(asynchat.async_chat):
60    dtp_conn_closed = False
61
62    def __init__(self, conn, baseclass):
63        asynchat.async_chat.__init__(self, conn)
64        self.baseclass = baseclass
65        self.baseclass.last_received_data = ''
66        self.encoding = baseclass.encoding
67
68    def handle_read(self):
69        new_data = self.recv(1024).decode(self.encoding, 'replace')
70        self.baseclass.last_received_data += new_data
71
72    def handle_close(self):
73        # XXX: this method can be called many times in a row for a single
74        # connection, including in clear-text (non-TLS) mode.
75        # (behaviour witnessed with test_data_connection)
76        if not self.dtp_conn_closed:
77            self.baseclass.push('226 transfer complete')
78            self.close()
79            self.dtp_conn_closed = True
80
81    def push(self, what):
82        if self.baseclass.next_data is not None:
83            what = self.baseclass.next_data
84            self.baseclass.next_data = None
85        if not what:
86            return self.close_when_done()
87        super(DummyDTPHandler, self).push(what.encode(self.encoding))
88
89    def handle_error(self):
90        raise Exception
91
92
93class DummyFTPHandler(asynchat.async_chat):
94
95    dtp_handler = DummyDTPHandler
96
97    def __init__(self, conn, encoding=DEFAULT_ENCODING):
98        asynchat.async_chat.__init__(self, conn)
99        # tells the socket to handle urgent data inline (ABOR command)
100        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
101        self.set_terminator(b"\r\n")
102        self.in_buffer = []
103        self.dtp = None
104        self.last_received_cmd = None
105        self.last_received_data = ''
106        self.next_response = ''
107        self.next_data = None
108        self.rest = None
109        self.next_retr_data = RETR_DATA
110        self.push('220 welcome')
111        self.encoding = encoding
112        # We use this as the string IPv4 address to direct the client
113        # to in response to a PASV command.  To test security behavior.
114        # https://bugs.python.org/issue43285/.
115        self.fake_pasv_server_ip = '252.253.254.255'
116
117    def collect_incoming_data(self, data):
118        self.in_buffer.append(data)
119
120    def found_terminator(self):
121        line = b''.join(self.in_buffer).decode(self.encoding)
122        self.in_buffer = []
123        if self.next_response:
124            self.push(self.next_response)
125            self.next_response = ''
126        cmd = line.split(' ')[0].lower()
127        self.last_received_cmd = cmd
128        space = line.find(' ')
129        if space != -1:
130            arg = line[space + 1:]
131        else:
132            arg = ""
133        if hasattr(self, 'cmd_' + cmd):
134            method = getattr(self, 'cmd_' + cmd)
135            method(arg)
136        else:
137            self.push('550 command "%s" not understood.' %cmd)
138
139    def handle_error(self):
140        raise Exception
141
142    def push(self, data):
143        asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n')
144
145    def cmd_port(self, arg):
146        addr = list(map(int, arg.split(',')))
147        ip = '%d.%d.%d.%d' %tuple(addr[:4])
148        port = (addr[4] * 256) + addr[5]
149        s = socket.create_connection((ip, port), timeout=TIMEOUT)
150        self.dtp = self.dtp_handler(s, baseclass=self)
151        self.push('200 active data connection established')
152
153    def cmd_pasv(self, arg):
154        with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
155            sock.settimeout(TIMEOUT)
156            port = sock.getsockname()[1]
157            ip = self.fake_pasv_server_ip
158            ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
159            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
160            conn, addr = sock.accept()
161            self.dtp = self.dtp_handler(conn, baseclass=self)
162
163    def cmd_eprt(self, arg):
164        af, ip, port = arg.split(arg[0])[1:-1]
165        port = int(port)
166        s = socket.create_connection((ip, port), timeout=TIMEOUT)
167        self.dtp = self.dtp_handler(s, baseclass=self)
168        self.push('200 active data connection established')
169
170    def cmd_epsv(self, arg):
171        with socket.create_server((self.socket.getsockname()[0], 0),
172                                  family=socket.AF_INET6) as sock:
173            sock.settimeout(TIMEOUT)
174            port = sock.getsockname()[1]
175            self.push('229 entering extended passive mode (|||%d|)' %port)
176            conn, addr = sock.accept()
177            self.dtp = self.dtp_handler(conn, baseclass=self)
178
179    def cmd_echo(self, arg):
180        # sends back the received string (used by the test suite)
181        self.push(arg)
182
183    def cmd_noop(self, arg):
184        self.push('200 noop ok')
185
186    def cmd_user(self, arg):
187        self.push('331 username ok')
188
189    def cmd_pass(self, arg):
190        self.push('230 password ok')
191
192    def cmd_acct(self, arg):
193        self.push('230 acct ok')
194
195    def cmd_rnfr(self, arg):
196        self.push('350 rnfr ok')
197
198    def cmd_rnto(self, arg):
199        self.push('250 rnto ok')
200
201    def cmd_dele(self, arg):
202        self.push('250 dele ok')
203
204    def cmd_cwd(self, arg):
205        self.push('250 cwd ok')
206
207    def cmd_size(self, arg):
208        self.push('250 1000')
209
210    def cmd_mkd(self, arg):
211        self.push('257 "%s"' %arg)
212
213    def cmd_rmd(self, arg):
214        self.push('250 rmd ok')
215
216    def cmd_pwd(self, arg):
217        self.push('257 "pwd ok"')
218
219    def cmd_type(self, arg):
220        self.push('200 type ok')
221
222    def cmd_quit(self, arg):
223        self.push('221 quit ok')
224        self.close()
225
226    def cmd_abor(self, arg):
227        self.push('226 abor ok')
228
229    def cmd_stor(self, arg):
230        self.push('125 stor ok')
231
232    def cmd_rest(self, arg):
233        self.rest = arg
234        self.push('350 rest ok')
235
236    def cmd_retr(self, arg):
237        self.push('125 retr ok')
238        if self.rest is not None:
239            offset = int(self.rest)
240        else:
241            offset = 0
242        self.dtp.push(self.next_retr_data[offset:])
243        self.dtp.close_when_done()
244        self.rest = None
245
246    def cmd_list(self, arg):
247        self.push('125 list ok')
248        self.dtp.push(LIST_DATA)
249        self.dtp.close_when_done()
250
251    def cmd_nlst(self, arg):
252        self.push('125 nlst ok')
253        self.dtp.push(NLST_DATA)
254        self.dtp.close_when_done()
255
256    def cmd_opts(self, arg):
257        self.push('200 opts ok')
258
259    def cmd_mlsd(self, arg):
260        self.push('125 mlsd ok')
261        self.dtp.push(MLSD_DATA)
262        self.dtp.close_when_done()
263
264    def cmd_setlongretr(self, arg):
265        # For testing. Next RETR will return long line.
266        self.next_retr_data = 'x' * int(arg)
267        self.push('125 setlongretr ok')
268
269
270class DummyFTPServer(asyncore.dispatcher, threading.Thread):
271
272    handler = DummyFTPHandler
273
274    def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING):
275        threading.Thread.__init__(self)
276        asyncore.dispatcher.__init__(self)
277        self.daemon = True
278        self.create_socket(af, socket.SOCK_STREAM)
279        self.bind(address)
280        self.listen(5)
281        self.active = False
282        self.active_lock = threading.Lock()
283        self.host, self.port = self.socket.getsockname()[:2]
284        self.handler_instance = None
285        self.encoding = encoding
286
287    def start(self):
288        assert not self.active
289        self.__flag = threading.Event()
290        threading.Thread.start(self)
291        self.__flag.wait()
292
293    def run(self):
294        self.active = True
295        self.__flag.set()
296        while self.active and asyncore.socket_map:
297            self.active_lock.acquire()
298            asyncore.loop(timeout=0.1, count=1)
299            self.active_lock.release()
300        asyncore.close_all(ignore_all=True)
301
302    def stop(self):
303        assert self.active
304        self.active = False
305        self.join()
306
307    def handle_accepted(self, conn, addr):
308        self.handler_instance = self.handler(conn, encoding=self.encoding)
309
310    def handle_connect(self):
311        self.close()
312    handle_read = handle_connect
313
314    def writable(self):
315        return 0
316
317    def handle_error(self):
318        raise Exception
319
320
321if ssl is not None:
322
323    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
324    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
325
326    class SSLConnection(asyncore.dispatcher):
327        """An asyncore.dispatcher subclass supporting TLS/SSL."""
328
329        _ssl_accepting = False
330        _ssl_closing = False
331
332        def secure_connection(self):
333            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
334            context.load_cert_chain(CERTFILE)
335            socket = context.wrap_socket(self.socket,
336                                         suppress_ragged_eofs=False,
337                                         server_side=True,
338                                         do_handshake_on_connect=False)
339            self.del_channel()
340            self.set_socket(socket)
341            self._ssl_accepting = True
342
343        def _do_ssl_handshake(self):
344            try:
345                self.socket.do_handshake()
346            except ssl.SSLError as err:
347                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
348                                   ssl.SSL_ERROR_WANT_WRITE):
349                    return
350                elif err.args[0] == ssl.SSL_ERROR_EOF:
351                    return self.handle_close()
352                # TODO: SSLError does not expose alert information
353                elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
354                    return self.handle_close()
355                raise
356            except OSError as err:
357                if err.args[0] == errno.ECONNABORTED:
358                    return self.handle_close()
359            else:
360                self._ssl_accepting = False
361
362        def _do_ssl_shutdown(self):
363            self._ssl_closing = True
364            try:
365                self.socket = self.socket.unwrap()
366            except ssl.SSLError as err:
367                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
368                                   ssl.SSL_ERROR_WANT_WRITE):
369                    return
370            except OSError:
371                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
372                # from OpenSSL's SSL_shutdown(), corresponding to a
373                # closed socket condition. See also:
374                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
375                pass
376            self._ssl_closing = False
377            if getattr(self, '_ccc', False) is False:
378                super(SSLConnection, self).close()
379            else:
380                pass
381
382        def handle_read_event(self):
383            if self._ssl_accepting:
384                self._do_ssl_handshake()
385            elif self._ssl_closing:
386                self._do_ssl_shutdown()
387            else:
388                super(SSLConnection, self).handle_read_event()
389
390        def handle_write_event(self):
391            if self._ssl_accepting:
392                self._do_ssl_handshake()
393            elif self._ssl_closing:
394                self._do_ssl_shutdown()
395            else:
396                super(SSLConnection, self).handle_write_event()
397
398        def send(self, data):
399            try:
400                return super(SSLConnection, self).send(data)
401            except ssl.SSLError as err:
402                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
403                                   ssl.SSL_ERROR_WANT_READ,
404                                   ssl.SSL_ERROR_WANT_WRITE):
405                    return 0
406                raise
407
408        def recv(self, buffer_size):
409            try:
410                return super(SSLConnection, self).recv(buffer_size)
411            except ssl.SSLError as err:
412                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
413                                   ssl.SSL_ERROR_WANT_WRITE):
414                    return b''
415                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
416                    self.handle_close()
417                    return b''
418                raise
419
420        def handle_error(self):
421            raise Exception
422
423        def close(self):
424            if (isinstance(self.socket, ssl.SSLSocket) and
425                    self.socket._sslobj is not None):
426                self._do_ssl_shutdown()
427            else:
428                super(SSLConnection, self).close()
429
430
431    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
432        """A DummyDTPHandler subclass supporting TLS/SSL."""
433
434        def __init__(self, conn, baseclass):
435            DummyDTPHandler.__init__(self, conn, baseclass)
436            if self.baseclass.secure_data_channel:
437                self.secure_connection()
438
439
440    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
441        """A DummyFTPHandler subclass supporting TLS/SSL."""
442
443        dtp_handler = DummyTLS_DTPHandler
444
445        def __init__(self, conn, encoding=DEFAULT_ENCODING):
446            DummyFTPHandler.__init__(self, conn, encoding=encoding)
447            self.secure_data_channel = False
448            self._ccc = False
449
450        def cmd_auth(self, line):
451            """Set up secure control channel."""
452            self.push('234 AUTH TLS successful')
453            self.secure_connection()
454
455        def cmd_ccc(self, line):
456            self.push('220 Reverting back to clear-text')
457            self._ccc = True
458            self._do_ssl_shutdown()
459
460        def cmd_pbsz(self, line):
461            """Negotiate size of buffer for secure data transfer.
462            For TLS/SSL the only valid value for the parameter is '0'.
463            Any other value is accepted but ignored.
464            """
465            self.push('200 PBSZ=0 successful.')
466
467        def cmd_prot(self, line):
468            """Setup un/secure data channel."""
469            arg = line.upper()
470            if arg == 'C':
471                self.push('200 Protection set to Clear')
472                self.secure_data_channel = False
473            elif arg == 'P':
474                self.push('200 Protection set to Private')
475                self.secure_data_channel = True
476            else:
477                self.push("502 Unrecognized PROT type (use C or P).")
478
479
480    class DummyTLS_FTPServer(DummyFTPServer):
481        handler = DummyTLS_FTPHandler
482
483
484class TestFTPClass(TestCase):
485
486    def setUp(self, encoding=DEFAULT_ENCODING):
487        self.server = DummyFTPServer((HOST, 0), encoding=encoding)
488        self.server.start()
489        self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding)
490        self.client.connect(self.server.host, self.server.port)
491
492    def tearDown(self):
493        self.client.close()
494        self.server.stop()
495        # Explicitly clear the attribute to prevent dangling thread
496        self.server = None
497        asyncore.close_all(ignore_all=True)
498
499    def check_data(self, received, expected):
500        self.assertEqual(len(received), len(expected))
501        self.assertEqual(received, expected)
502
503    def test_getwelcome(self):
504        self.assertEqual(self.client.getwelcome(), '220 welcome')
505
506    def test_sanitize(self):
507        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
508        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
509        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
510
511    def test_exceptions(self):
512        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
513        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
514        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
515        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
516        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
517        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
518        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
519        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
520
521    def test_all_errors(self):
522        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
523                      ftplib.error_proto, ftplib.Error, OSError,
524                      EOFError)
525        for x in exceptions:
526            try:
527                raise x('exception not included in all_errors set')
528            except ftplib.all_errors:
529                pass
530
531    def test_set_pasv(self):
532        # passive mode is supposed to be enabled by default
533        self.assertTrue(self.client.passiveserver)
534        self.client.set_pasv(True)
535        self.assertTrue(self.client.passiveserver)
536        self.client.set_pasv(False)
537        self.assertFalse(self.client.passiveserver)
538
539    def test_voidcmd(self):
540        self.client.voidcmd('echo 200')
541        self.client.voidcmd('echo 299')
542        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
543        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
544
545    def test_login(self):
546        self.client.login()
547
548    def test_acct(self):
549        self.client.acct('passwd')
550
551    def test_rename(self):
552        self.client.rename('a', 'b')
553        self.server.handler_instance.next_response = '200'
554        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
555
556    def test_delete(self):
557        self.client.delete('foo')
558        self.server.handler_instance.next_response = '199'
559        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
560
561    def test_size(self):
562        self.client.size('foo')
563
564    def test_mkd(self):
565        dir = self.client.mkd('/foo')
566        self.assertEqual(dir, '/foo')
567
568    def test_rmd(self):
569        self.client.rmd('foo')
570
571    def test_cwd(self):
572        dir = self.client.cwd('/foo')
573        self.assertEqual(dir, '250 cwd ok')
574
575    def test_pwd(self):
576        dir = self.client.pwd()
577        self.assertEqual(dir, 'pwd ok')
578
579    def test_quit(self):
580        self.assertEqual(self.client.quit(), '221 quit ok')
581        # Ensure the connection gets closed; sock attribute should be None
582        self.assertEqual(self.client.sock, None)
583
584    def test_abort(self):
585        self.client.abort()
586
587    def test_retrbinary(self):
588        def callback(data):
589            received.append(data.decode(self.client.encoding))
590        received = []
591        self.client.retrbinary('retr', callback)
592        self.check_data(''.join(received), RETR_DATA)
593
594    def test_retrbinary_rest(self):
595        def callback(data):
596            received.append(data.decode(self.client.encoding))
597        for rest in (0, 10, 20):
598            received = []
599            self.client.retrbinary('retr', callback, rest=rest)
600            self.check_data(''.join(received), RETR_DATA[rest:])
601
602    def test_retrlines(self):
603        received = []
604        self.client.retrlines('retr', received.append)
605        self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
606
607    def test_storbinary(self):
608        f = io.BytesIO(RETR_DATA.encode(self.client.encoding))
609        self.client.storbinary('stor', f)
610        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
611        # test new callback arg
612        flag = []
613        f.seek(0)
614        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
615        self.assertTrue(flag)
616
617    def test_storbinary_rest(self):
618        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
619        f = io.BytesIO(data)
620        for r in (30, '30'):
621            f.seek(0)
622            self.client.storbinary('stor', f, rest=r)
623            self.assertEqual(self.server.handler_instance.rest, str(r))
624
625    def test_storlines(self):
626        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
627        f = io.BytesIO(data)
628        self.client.storlines('stor', f)
629        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
630        # test new callback arg
631        flag = []
632        f.seek(0)
633        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
634        self.assertTrue(flag)
635
636        f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
637        # storlines() expects a binary file, not a text file
638        with warnings_helper.check_warnings(('', BytesWarning), quiet=True):
639            self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
640
641    def test_nlst(self):
642        self.client.nlst()
643        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
644
645    def test_dir(self):
646        l = []
647        self.client.dir(lambda x: l.append(x))
648        self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
649
650    def test_mlsd(self):
651        list(self.client.mlsd())
652        list(self.client.mlsd(path='/'))
653        list(self.client.mlsd(path='/', facts=['size', 'type']))
654
655        ls = list(self.client.mlsd())
656        for name, facts in ls:
657            self.assertIsInstance(name, str)
658            self.assertIsInstance(facts, dict)
659            self.assertTrue(name)
660            self.assertIn('type', facts)
661            self.assertIn('perm', facts)
662            self.assertIn('unique', facts)
663
664        def set_data(data):
665            self.server.handler_instance.next_data = data
666
667        def test_entry(line, type=None, perm=None, unique=None, name=None):
668            type = 'type' if type is None else type
669            perm = 'perm' if perm is None else perm
670            unique = 'unique' if unique is None else unique
671            name = 'name' if name is None else name
672            set_data(line)
673            _name, facts = next(self.client.mlsd())
674            self.assertEqual(_name, name)
675            self.assertEqual(facts['type'], type)
676            self.assertEqual(facts['perm'], perm)
677            self.assertEqual(facts['unique'], unique)
678
679        # plain
680        test_entry('type=type;perm=perm;unique=unique; name\r\n')
681        # "=" in fact value
682        test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
683        test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
684        test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
685        test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
686        # spaces in name
687        test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
688        test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
689        test_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
690        test_entry('type=type;perm=perm;unique=unique; n am  e\r\n', name="n am  e")
691        # ";" in name
692        test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
693        test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
694        test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
695        test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
696        # case sensitiveness
697        set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
698        _name, facts = next(self.client.mlsd())
699        for x in facts:
700            self.assertTrue(x.islower())
701        # no data (directory empty)
702        set_data('')
703        self.assertRaises(StopIteration, next, self.client.mlsd())
704        set_data('')
705        for x in self.client.mlsd():
706            self.fail("unexpected data %s" % x)
707
708    def test_makeport(self):
709        with self.client.makeport():
710            # IPv4 is in use, just make sure send_eprt has not been used
711            self.assertEqual(self.server.handler_instance.last_received_cmd,
712                                'port')
713
714    def test_makepasv(self):
715        host, port = self.client.makepasv()
716        conn = socket.create_connection((host, port), timeout=TIMEOUT)
717        conn.close()
718        # IPv4 is in use, just make sure send_epsv has not been used
719        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
720
721    def test_makepasv_issue43285_security_disabled(self):
722        """Test the opt-in to the old vulnerable behavior."""
723        self.client.trust_server_pasv_ipv4_address = True
724        bad_host, port = self.client.makepasv()
725        self.assertEqual(
726                bad_host, self.server.handler_instance.fake_pasv_server_ip)
727        # Opening and closing a connection keeps the dummy server happy
728        # instead of timing out on accept.
729        socket.create_connection((self.client.sock.getpeername()[0], port),
730                                 timeout=TIMEOUT).close()
731
732    def test_makepasv_issue43285_security_enabled_default(self):
733        self.assertFalse(self.client.trust_server_pasv_ipv4_address)
734        trusted_host, port = self.client.makepasv()
735        self.assertNotEqual(
736                trusted_host, self.server.handler_instance.fake_pasv_server_ip)
737        # Opening and closing a connection keeps the dummy server happy
738        # instead of timing out on accept.
739        socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()
740
741    def test_with_statement(self):
742        self.client.quit()
743
744        def is_client_connected():
745            if self.client.sock is None:
746                return False
747            try:
748                self.client.sendcmd('noop')
749            except (OSError, EOFError):
750                return False
751            return True
752
753        # base test
754        with ftplib.FTP(timeout=TIMEOUT) as self.client:
755            self.client.connect(self.server.host, self.server.port)
756            self.client.sendcmd('noop')
757            self.assertTrue(is_client_connected())
758        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
759        self.assertFalse(is_client_connected())
760
761        # QUIT sent inside the with block
762        with ftplib.FTP(timeout=TIMEOUT) as self.client:
763            self.client.connect(self.server.host, self.server.port)
764            self.client.sendcmd('noop')
765            self.client.quit()
766        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
767        self.assertFalse(is_client_connected())
768
769        # force a wrong response code to be sent on QUIT: error_perm
770        # is expected and the connection is supposed to be closed
771        try:
772            with ftplib.FTP(timeout=TIMEOUT) as self.client:
773                self.client.connect(self.server.host, self.server.port)
774                self.client.sendcmd('noop')
775                self.server.handler_instance.next_response = '550 error on quit'
776        except ftplib.error_perm as err:
777            self.assertEqual(str(err), '550 error on quit')
778        else:
779            self.fail('Exception not raised')
780        # needed to give the threaded server some time to set the attribute
781        # which otherwise would still be == 'noop'
782        time.sleep(0.1)
783        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
784        self.assertFalse(is_client_connected())
785
786    def test_source_address(self):
787        self.client.quit()
788        port = socket_helper.find_unused_port()
789        try:
790            self.client.connect(self.server.host, self.server.port,
791                                source_address=(HOST, port))
792            self.assertEqual(self.client.sock.getsockname()[1], port)
793            self.client.quit()
794        except OSError as e:
795            if e.errno == errno.EADDRINUSE:
796                self.skipTest("couldn't bind to port %d" % port)
797            raise
798
799    def test_source_address_passive_connection(self):
800        port = socket_helper.find_unused_port()
801        self.client.source_address = (HOST, port)
802        try:
803            with self.client.transfercmd('list') as sock:
804                self.assertEqual(sock.getsockname()[1], port)
805        except OSError as e:
806            if e.errno == errno.EADDRINUSE:
807                self.skipTest("couldn't bind to port %d" % port)
808            raise
809
810    def test_parse257(self):
811        self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
812        self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
813        self.assertEqual(ftplib.parse257('257 ""'), '')
814        self.assertEqual(ftplib.parse257('257 "" created'), '')
815        self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
816        # The 257 response is supposed to include the directory
817        # name and in case it contains embedded double-quotes
818        # they must be doubled (see RFC-959, chapter 7, appendix 2).
819        self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
820        self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
821
822    def test_line_too_long(self):
823        self.assertRaises(ftplib.Error, self.client.sendcmd,
824                          'x' * self.client.maxline * 2)
825
826    def test_retrlines_too_long(self):
827        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
828        received = []
829        self.assertRaises(ftplib.Error,
830                          self.client.retrlines, 'retr', received.append)
831
832    def test_storlines_too_long(self):
833        f = io.BytesIO(b'x' * self.client.maxline * 2)
834        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
835
836    def test_encoding_param(self):
837        encodings = ['latin-1', 'utf-8']
838        for encoding in encodings:
839            with self.subTest(encoding=encoding):
840                self.tearDown()
841                self.setUp(encoding=encoding)
842                self.assertEqual(encoding, self.client.encoding)
843                self.test_retrbinary()
844                self.test_storbinary()
845                self.test_retrlines()
846                new_dir = self.client.mkd('/non-ascii dir \xAE')
847                self.check_data(new_dir, '/non-ascii dir \xAE')
848        # Check default encoding
849        client = ftplib.FTP(timeout=TIMEOUT)
850        self.assertEqual(DEFAULT_ENCODING, client.encoding)
851
852
853@skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
854class TestIPv6Environment(TestCase):
855
856    def setUp(self):
857        self.server = DummyFTPServer((HOSTv6, 0),
858                                     af=socket.AF_INET6,
859                                     encoding=DEFAULT_ENCODING)
860        self.server.start()
861        self.client = ftplib.FTP(timeout=TIMEOUT, encoding=DEFAULT_ENCODING)
862        self.client.connect(self.server.host, self.server.port)
863
864    def tearDown(self):
865        self.client.close()
866        self.server.stop()
867        # Explicitly clear the attribute to prevent dangling thread
868        self.server = None
869        asyncore.close_all(ignore_all=True)
870
871    def test_af(self):
872        self.assertEqual(self.client.af, socket.AF_INET6)
873
874    def test_makeport(self):
875        with self.client.makeport():
876            self.assertEqual(self.server.handler_instance.last_received_cmd,
877                                'eprt')
878
879    def test_makepasv(self):
880        host, port = self.client.makepasv()
881        conn = socket.create_connection((host, port), timeout=TIMEOUT)
882        conn.close()
883        self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')
884
885    def test_transfer(self):
886        def retr():
887            def callback(data):
888                received.append(data.decode(self.client.encoding))
889            received = []
890            self.client.retrbinary('retr', callback)
891            self.assertEqual(len(''.join(received)), len(RETR_DATA))
892            self.assertEqual(''.join(received), RETR_DATA)
893        self.client.set_pasv(True)
894        retr()
895        self.client.set_pasv(False)
896        retr()
897
898
899@skipUnless(ssl, "SSL not available")
900class TestTLS_FTPClassMixin(TestFTPClass):
901    """Repeat TestFTPClass tests starting the TLS layer for both control
902    and data connections first.
903    """
904
905    def setUp(self, encoding=DEFAULT_ENCODING):
906        self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
907        self.server.start()
908        self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
909        self.client.connect(self.server.host, self.server.port)
910        # enable TLS
911        self.client.auth()
912        self.client.prot_p()
913
914
915@skipUnless(ssl, "SSL not available")
916class TestTLS_FTPClass(TestCase):
917    """Specific TLS_FTP class tests."""
918
919    def setUp(self, encoding=DEFAULT_ENCODING):
920        self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
921        self.server.start()
922        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
923        self.client.connect(self.server.host, self.server.port)
924
925    def tearDown(self):
926        self.client.close()
927        self.server.stop()
928        # Explicitly clear the attribute to prevent dangling thread
929        self.server = None
930        asyncore.close_all(ignore_all=True)
931
932    def test_control_connection(self):
933        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
934        self.client.auth()
935        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
936
937    def test_data_connection(self):
938        # clear text
939        with self.client.transfercmd('list') as sock:
940            self.assertNotIsInstance(sock, ssl.SSLSocket)
941            self.assertEqual(sock.recv(1024),
942                             LIST_DATA.encode(self.client.encoding))
943        self.assertEqual(self.client.voidresp(), "226 transfer complete")
944
945        # secured, after PROT P
946        self.client.prot_p()
947        with self.client.transfercmd('list') as sock:
948            self.assertIsInstance(sock, ssl.SSLSocket)
949            # consume from SSL socket to finalize handshake and avoid
950            # "SSLError [SSL] shutdown while in init"
951            self.assertEqual(sock.recv(1024),
952                             LIST_DATA.encode(self.client.encoding))
953        self.assertEqual(self.client.voidresp(), "226 transfer complete")
954
955        # PROT C is issued, the connection must be in cleartext again
956        self.client.prot_c()
957        with self.client.transfercmd('list') as sock:
958            self.assertNotIsInstance(sock, ssl.SSLSocket)
959            self.assertEqual(sock.recv(1024),
960                             LIST_DATA.encode(self.client.encoding))
961        self.assertEqual(self.client.voidresp(), "226 transfer complete")
962
963    def test_login(self):
964        # login() is supposed to implicitly secure the control connection
965        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
966        self.client.login()
967        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
968        # make sure that AUTH TLS doesn't get issued again
969        self.client.login()
970
971    def test_auth_issued_twice(self):
972        self.client.auth()
973        self.assertRaises(ValueError, self.client.auth)
974
975    def test_context(self):
976        self.client.quit()
977        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
978        ctx.check_hostname = False
979        ctx.verify_mode = ssl.CERT_NONE
980        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
981                          context=ctx)
982        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
983                          context=ctx)
984        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
985                          keyfile=CERTFILE, context=ctx)
986
987        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
988        self.client.connect(self.server.host, self.server.port)
989        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
990        self.client.auth()
991        self.assertIs(self.client.sock.context, ctx)
992        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
993
994        self.client.prot_p()
995        with self.client.transfercmd('list') as sock:
996            self.assertIs(sock.context, ctx)
997            self.assertIsInstance(sock, ssl.SSLSocket)
998
999    def test_ccc(self):
1000        self.assertRaises(ValueError, self.client.ccc)
1001        self.client.login(secure=True)
1002        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
1003        self.client.ccc()
1004        self.assertRaises(ValueError, self.client.sock.unwrap)
1005
1006    @skipUnless(False, "FIXME: bpo-32706")
1007    def test_check_hostname(self):
1008        self.client.quit()
1009        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1010        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1011        self.assertEqual(ctx.check_hostname, True)
1012        ctx.load_verify_locations(CAFILE)
1013        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
1014
1015        # 127.0.0.1 doesn't match SAN
1016        self.client.connect(self.server.host, self.server.port)
1017        with self.assertRaises(ssl.CertificateError):
1018            self.client.auth()
1019        # exception quits connection
1020
1021        self.client.connect(self.server.host, self.server.port)
1022        self.client.prot_p()
1023        with self.assertRaises(ssl.CertificateError):
1024            with self.client.transfercmd("list") as sock:
1025                pass
1026        self.client.quit()
1027
1028        self.client.connect("localhost", self.server.port)
1029        self.client.auth()
1030        self.client.quit()
1031
1032        self.client.connect("localhost", self.server.port)
1033        self.client.prot_p()
1034        with self.client.transfercmd("list") as sock:
1035            pass
1036
1037
1038class TestTimeouts(TestCase):
1039
1040    def setUp(self):
1041        self.evt = threading.Event()
1042        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1043        self.sock.settimeout(20)
1044        self.port = socket_helper.bind_port(self.sock)
1045        self.server_thread = threading.Thread(target=self.server)
1046        self.server_thread.daemon = True
1047        self.server_thread.start()
1048        # Wait for the server to be ready.
1049        self.evt.wait()
1050        self.evt.clear()
1051        self.old_port = ftplib.FTP.port
1052        ftplib.FTP.port = self.port
1053
1054    def tearDown(self):
1055        ftplib.FTP.port = self.old_port
1056        self.server_thread.join()
1057        # Explicitly clear the attribute to prevent dangling thread
1058        self.server_thread = None
1059
1060    def server(self):
1061        # This method sets the evt 3 times:
1062        #  1) when the connection is ready to be accepted.
1063        #  2) when it is safe for the caller to close the connection
1064        #  3) when we have closed the socket
1065        self.sock.listen()
1066        # (1) Signal the caller that we are ready to accept the connection.
1067        self.evt.set()
1068        try:
1069            conn, addr = self.sock.accept()
1070        except TimeoutError:
1071            pass
1072        else:
1073            conn.sendall(b"1 Hola mundo\n")
1074            conn.shutdown(socket.SHUT_WR)
1075            # (2) Signal the caller that it is safe to close the socket.
1076            self.evt.set()
1077            conn.close()
1078        finally:
1079            self.sock.close()
1080
1081    def testTimeoutDefault(self):
1082        # default -- use global socket timeout
1083        self.assertIsNone(socket.getdefaulttimeout())
1084        socket.setdefaulttimeout(30)
1085        try:
1086            ftp = ftplib.FTP(HOST)
1087        finally:
1088            socket.setdefaulttimeout(None)
1089        self.assertEqual(ftp.sock.gettimeout(), 30)
1090        self.evt.wait()
1091        ftp.close()
1092
1093    def testTimeoutNone(self):
1094        # no timeout -- do not use global socket timeout
1095        self.assertIsNone(socket.getdefaulttimeout())
1096        socket.setdefaulttimeout(30)
1097        try:
1098            ftp = ftplib.FTP(HOST, timeout=None)
1099        finally:
1100            socket.setdefaulttimeout(None)
1101        self.assertIsNone(ftp.sock.gettimeout())
1102        self.evt.wait()
1103        ftp.close()
1104
1105    def testTimeoutValue(self):
1106        # a value
1107        ftp = ftplib.FTP(HOST, timeout=30)
1108        self.assertEqual(ftp.sock.gettimeout(), 30)
1109        self.evt.wait()
1110        ftp.close()
1111
1112        # bpo-39259
1113        with self.assertRaises(ValueError):
1114            ftplib.FTP(HOST, timeout=0)
1115
1116    def testTimeoutConnect(self):
1117        ftp = ftplib.FTP()
1118        ftp.connect(HOST, timeout=30)
1119        self.assertEqual(ftp.sock.gettimeout(), 30)
1120        self.evt.wait()
1121        ftp.close()
1122
1123    def testTimeoutDifferentOrder(self):
1124        ftp = ftplib.FTP(timeout=30)
1125        ftp.connect(HOST)
1126        self.assertEqual(ftp.sock.gettimeout(), 30)
1127        self.evt.wait()
1128        ftp.close()
1129
1130    def testTimeoutDirectAccess(self):
1131        ftp = ftplib.FTP()
1132        ftp.timeout = 30
1133        ftp.connect(HOST)
1134        self.assertEqual(ftp.sock.gettimeout(), 30)
1135        self.evt.wait()
1136        ftp.close()
1137
1138
1139class MiscTestCase(TestCase):
1140    def test__all__(self):
1141        not_exported = {
1142            'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF', 'Error',
1143            'parse150', 'parse227', 'parse229', 'parse257', 'print_line',
1144            'ftpcp', 'test'}
1145        support.check__all__(self, ftplib, not_exported=not_exported)
1146
1147
1148def setUpModule():
1149    thread_info = threading_helper.threading_setup()
1150    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
1151
1152
1153if __name__ == '__main__':
1154    unittest.main()
1155