1"""Test script for ftplib module."""
2
3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4# environment
5
6import ftplib
7import asyncore
8import asynchat
9import socket
10import io
11import errno
12import os
13import threading
14import time
15try:
16    import ssl
17except ImportError:
18    ssl = None
19
20from unittest import TestCase, skipUnless
21from test import support
22from test.support import HOST, HOSTv6
23
# Timeout (in seconds) given to the FTP client and to the data sockets
# opened by the dummy server.
TIMEOUT = 3
# the dummy data returned by server over the data channel when
# RETR, LIST, NLST, MLSD commands are issued
RETR_DATA = 'abcde12345\r\n' * 1000
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'
# One "facts; name" line per entry.
MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
             "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
             "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
             "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
             "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
             "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
             "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
             "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
             "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
             "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
             "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
             "type=file;perm=r;unique==keVO1+1G4; file4\r\n")
45
46
class DummyDTPHandler(asynchat.async_chat):
    """Data-channel handler used by the dummy FTP server.

    *baseclass* is the control-channel handler: received data is
    accumulated on its ``last_received_data`` attribute and the end of
    a transfer is announced through its ``push()`` method.
    """

    # Set once handle_close() has run, so later calls become no-ops.
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # Every new data connection starts with an empty receive buffer.
        baseclass.last_received_data = ''

    def handle_read(self):
        """Append the next chunk of incoming data to the control handler."""
        chunk = self.recv(1024).decode('ascii')
        self.baseclass.last_received_data += chunk

    def handle_close(self):
        """Send the '226' reply and close the channel, only once."""
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if self.dtp_conn_closed:
            return
        self.baseclass.push('226 transfer complete')
        self.close()
        self.dtp_conn_closed = True

    def push(self, what):
        """Send *what* (a str) over the data channel.

        If the control handler has ``next_data`` set, that value is sent
        instead and the attribute is reset.  An empty payload sends
        nothing and just schedules the channel for closing.
        """
        override = self.baseclass.next_data
        if override is not None:
            what = override
            self.baseclass.next_data = None
        if what:
            super().push(what.encode('ascii'))
            return None
        return self.close_when_done()

    def handle_error(self):
        # Make any error in the handler surface instead of being logged.
        raise Exception
77
78
class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler of the dummy FTP server.

    Each received line is split into a command name and an argument and
    dispatched to the ``cmd_<name>`` method, if there is one; any other
    command is answered with a 550 reply.  Tests steer the handler
    through its attributes (``next_response``, ``next_data``,
    ``next_retr_data``) and inspect ``last_received_cmd`` and
    ``last_received_data``.
    """

    # Class wrapping each data connection; subclasses may override it.
    dtp_handler = DummyDTPHandler

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        # tells the socket to handle urgent data inline (ABOR command)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        self.next_response = ''
        self.next_data = None
        self.rest = None
        self.next_retr_data = RETR_DATA
        self.push('220 welcome')
        # We use this as the string IPv4 address to direct the client
        # to in response to a PASV command.  To test security behavior.
        # https://bugs.python.org/issue43285/.
        self.fake_pasv_server_ip = '252.253.254.255'

    def collect_incoming_data(self, data):
        """Buffer a fragment of the command line being received."""
        self.in_buffer.append(data)

    def found_terminator(self):
        """Process one complete command line.

        A pending ``next_response`` is sent (and cleared) before the
        command itself is dispatched.
        """
        line = b''.join(self.in_buffer).decode('ascii')
        self.in_buffer = []
        if self.next_response:
            self.push(self.next_response)
            self.next_response = ''
        # Everything after the first space is the argument ("" if none).
        name, _, arg = line.partition(' ')
        cmd = name.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('550 command "%s" not understood.' %cmd)

    def handle_error(self):
        # Make any error in the handler surface instead of being logged.
        raise Exception

    def push(self, data):
        """Send the str *data* as one reply line terminated by CRLF."""
        asynchat.async_chat.push(self, data.encode('ascii') + b'\r\n')

    def cmd_port(self, arg):
        """PORT: connect to the IPv4 address/port given as h1,h2,h3,h4,p1,p2."""
        addr = list(map(int, arg.split(',')))
        ip = '%d.%d.%d.%d' %tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        """PASV: listen on a free port and wait for the client to connect.

        The reply advertises ``fake_pasv_server_ip`` rather than the real
        listening address.
        """
        with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            ip = self.fake_pasv_server_ip.replace('.', ',')
            # Split the port into its high and low bytes with integer
            # arithmetic (true division would produce a float).
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
            conn, addr = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        """EPRT: connect to the address given as <d>af<d>ip<d>port<d>."""
        # The first character of the argument is the field delimiter.
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        """EPSV: listen on a free IPv6 port and wait for the client."""
        with socket.create_server((self.socket.getsockname()[0], 0),
                                  family=socket.AF_INET6) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' %port)
            conn, addr = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_noop(self, arg):
        self.push('200 noop ok')

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' %arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_abor(self, arg):
        self.push('226 abor ok')

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        """REST: remember the offset (kept as a str) for the next RETR."""
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        """RETR: send ``next_retr_data`` from the REST offset, then close."""
        self.push('125 retr ok')
        if self.rest is not None:
            offset = int(self.rest)
        else:
            offset = 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        # The offset applies to a single transfer only.
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_opts(self, arg):
        self.push('200 opts ok')

    def cmd_mlsd(self, arg):
        self.push('125 mlsd ok')
        self.dtp.push(MLSD_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')
253
254
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening FTP server that runs the asyncore loop in its own thread.

    *address* is the (host, port) pair to bind to and *af* the socket
    address family.  After construction ``host`` and ``port`` hold the
    address actually bound, and ``handler_instance`` refers to the
    handler created for the most recently accepted connection.
    """

    # Class instantiated for every accepted control connection.
    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None

    def start(self):
        """Start the server thread and wait until its loop is running."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        """Poll asyncore until stop() is called or no channel is left."""
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            # The context manager releases the lock even when a handler
            # raises out of asyncore.loop().
            with self.active_lock:
                asyncore.loop(timeout=0.1, count=1)
        asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop to finish and wait for the thread to exit."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        # The listening socket never has anything to write.
        return 0

    def handle_error(self):
        # Make any error in the server surface instead of being logged.
        raise Exception
303
304
305if ssl is not None:
306
    # Certificate chain loaded by the dummy TLS server; the file is looked
    # up in the directory containing this module.
    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    # NOTE(review): presumably the CA file used to verify CERTFILE; it is
    # not referenced in the visible part of the file -- confirm.
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
309
310    class SSLConnection(asyncore.dispatcher):
311        """An asyncore.dispatcher subclass supporting TLS/SSL."""
312
313        _ssl_accepting = False
314        _ssl_closing = False
315
316        def secure_connection(self):
317            context = ssl.SSLContext()
318            context.load_cert_chain(CERTFILE)
319            socket = context.wrap_socket(self.socket,
320                                         suppress_ragged_eofs=False,
321                                         server_side=True,
322                                         do_handshake_on_connect=False)
323            self.del_channel()
324            self.set_socket(socket)
325            self._ssl_accepting = True
326
327        def _do_ssl_handshake(self):
328            try:
329                self.socket.do_handshake()
330            except ssl.SSLError as err:
331                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
332                                   ssl.SSL_ERROR_WANT_WRITE):
333                    return
334                elif err.args[0] == ssl.SSL_ERROR_EOF:
335                    return self.handle_close()
336                # TODO: SSLError does not expose alert information
337                elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
338                    return self.handle_close()
339                raise
340            except OSError as err:
341                if err.args[0] == errno.ECONNABORTED:
342                    return self.handle_close()
343            else:
344                self._ssl_accepting = False
345
346        def _do_ssl_shutdown(self):
347            self._ssl_closing = True
348            try:
349                self.socket = self.socket.unwrap()
350            except ssl.SSLError as err:
351                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
352                                   ssl.SSL_ERROR_WANT_WRITE):
353                    return
354            except OSError as err:
355                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
356                # from OpenSSL's SSL_shutdown(), corresponding to a
357                # closed socket condition. See also:
358                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
359                pass
360            self._ssl_closing = False
361            if getattr(self, '_ccc', False) is False:
362                super(SSLConnection, self).close()
363            else:
364                pass
365
366        def handle_read_event(self):
367            if self._ssl_accepting:
368                self._do_ssl_handshake()
369            elif self._ssl_closing:
370                self._do_ssl_shutdown()
371            else:
372                super(SSLConnection, self).handle_read_event()
373
374        def handle_write_event(self):
375            if self._ssl_accepting:
376                self._do_ssl_handshake()
377            elif self._ssl_closing:
378                self._do_ssl_shutdown()
379            else:
380                super(SSLConnection, self).handle_write_event()
381
382        def send(self, data):
383            try:
384                return super(SSLConnection, self).send(data)
385            except ssl.SSLError as err:
386                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
387                                   ssl.SSL_ERROR_WANT_READ,
388                                   ssl.SSL_ERROR_WANT_WRITE):
389                    return 0
390                raise
391
392        def recv(self, buffer_size):
393            try:
394                return super(SSLConnection, self).recv(buffer_size)
395            except ssl.SSLError as err:
396                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
397                                   ssl.SSL_ERROR_WANT_WRITE):
398                    return b''
399                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
400                    self.handle_close()
401                    return b''
402                raise
403
404        def handle_error(self):
405            raise Exception
406
407        def close(self):
408            if (isinstance(self.socket, ssl.SSLSocket) and
409                    self.socket._sslobj is not None):
410                self._do_ssl_shutdown()
411            else:
412                super(SSLConnection, self).close()
413
414
415    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
416        """A DummyDTPHandler subclass supporting TLS/SSL."""
417
418        def __init__(self, conn, baseclass):
419            DummyDTPHandler.__init__(self, conn, baseclass)
420            if self.baseclass.secure_data_channel:
421                self.secure_connection()
422
423
424    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
425        """A DummyFTPHandler subclass supporting TLS/SSL."""
426
427        dtp_handler = DummyTLS_DTPHandler
428
429        def __init__(self, conn):
430            DummyFTPHandler.__init__(self, conn)
431            self.secure_data_channel = False
432            self._ccc = False
433
434        def cmd_auth(self, line):
435            """Set up secure control channel."""
436            self.push('234 AUTH TLS successful')
437            self.secure_connection()
438
439        def cmd_ccc(self, line):
440            self.push('220 Reverting back to clear-text')
441            self._ccc = True
442            self._do_ssl_shutdown()
443
444        def cmd_pbsz(self, line):
445            """Negotiate size of buffer for secure data transfer.
446            For TLS/SSL the only valid value for the parameter is '0'.
447            Any other value is accepted but ignored.
448            """
449            self.push('200 PBSZ=0 successful.')
450
451        def cmd_prot(self, line):
452            """Setup un/secure data channel."""
453            arg = line.upper()
454            if arg == 'C':
455                self.push('200 Protection set to Clear')
456                self.secure_data_channel = False
457            elif arg == 'P':
458                self.push('200 Protection set to Private')
459                self.secure_data_channel = True
460            else:
461                self.push("502 Unrecognized PROT type (use C or P).")
462
463
    class DummyTLS_FTPServer(DummyFTPServer):
        """A DummyFTPServer whose connections use DummyTLS_FTPHandler."""
        handler = DummyTLS_FTPHandler
466
467
class TestFTPClass(TestCase):

    def setUp(self):
        # Dummy server on an ephemeral port plus a client connected to it.
        self.server = DummyFTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def check_data(self, received, expected):
        # The lengths are compared before the content itself.
        self.assertEqual(len(received), len(expected))
        self.assertEqual(received, expected)

    def test_getwelcome(self):
        welcome = self.client.getwelcome()
        self.assertEqual(welcome, '220 welcome')

    def test_sanitize(self):
        sanitize = self.client.sanitize
        cases = (
            ('foo', 'foo'),
            ('pass 12345', 'pass *****'),
            ('PASS 12345', 'PASS *****'),
        )
        for raw, masked in cases:
            self.assertEqual(sanitize(raw), repr(masked))

    def test_exceptions(self):
        sendcmd = self.client.sendcmd
        # Line terminators inside a command are rejected by the client.
        for bad_cmd in ('echo 40\r\n0', 'echo 40\n0', 'echo 40\r0'):
            self.assertRaises(ValueError, sendcmd, bad_cmd)
        # The reply code echoed by the server selects the exception type.
        by_reply = (
            (ftplib.error_temp, 'echo 400'),
            (ftplib.error_temp, 'echo 499'),
            (ftplib.error_perm, 'echo 500'),
            (ftplib.error_perm, 'echo 599'),
            (ftplib.error_proto, 'echo 999'),
        )
        for exc_type, cmd in by_reply:
            self.assertRaises(exc_type, sendcmd, cmd)

    def test_all_errors(self):
        covered = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                   ftplib.error_proto, ftplib.Error, OSError,
                   EOFError)
        for exc_type in covered:
            # An exception missing from all_errors propagates and makes
            # the test error out.
            try:
                raise exc_type('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        client = self.client
        # passive mode is supposed to be enabled by default
        self.assertTrue(client.passiveserver)
        client.set_pasv(True)
        self.assertTrue(client.passiveserver)
        client.set_pasv(False)
        self.assertFalse(client.passiveserver)

    def test_voidcmd(self):
        voidcmd = self.client.voidcmd
        for accepted in ('echo 200', 'echo 299'):
            voidcmd(accepted)
        for rejected in ('echo 199', 'echo 300'):
            self.assertRaises(ftplib.error_reply, voidcmd, rejected)

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        # Make the server answer the next command with an unexpected code.
        self.server.handler_instance.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        # Make the server answer the next command with an unexpected code.
        self.server.handler_instance.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        created = self.client.mkd('/foo')
        self.assertEqual(created, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        reply = self.client.cwd('/foo')
        self.assertEqual(reply, '250 cwd ok')

    def test_pwd(self):
        current = self.client.pwd()
        self.assertEqual(current, 'pwd ok')

    def test_quit(self):
        reply = self.client.quit()
        self.assertEqual(reply, '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertEqual(self.client.sock, None)

    def test_abort(self):
        self.client.abort()

    def test_retrbinary(self):
        chunks = []

        def collect(data):
            chunks.append(data.decode('ascii'))

        self.client.retrbinary('retr', collect)
        self.check_data(''.join(chunks), RETR_DATA)

    def test_retrbinary_rest(self):
        for rest in (0, 10, 20):
            chunks = []

            def collect(data, sink=chunks):
                sink.append(data.decode('ascii'))

            self.client.retrbinary('retr', collect, rest=rest)
            self.check_data(''.join(chunks), RETR_DATA[rest:])

    def test_retrlines(self):
        lines = []
        self.client.retrlines('retr', lines.append)
        self.check_data(''.join(lines), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        payload = io.BytesIO(RETR_DATA.encode('ascii'))
        self.client.storbinary('stor', payload)
        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        calls = []
        payload.seek(0)
        self.client.storbinary('stor', payload,
                               callback=lambda x: calls.append(None))
        self.assertTrue(calls)

    def test_storbinary_rest(self):
        payload = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
        # The offset may be given either as an int or as a str.
        for offset in (30, '30'):
            payload.seek(0)
            self.client.storbinary('stor', payload, rest=offset)
            self.assertEqual(self.server.handler_instance.rest, str(offset))

    def test_storlines(self):
        payload = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
        self.client.storlines('stor', payload)
        self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        calls = []
        payload.seek(0)
        self.client.storlines('stor foo', payload,
                              callback=lambda x: calls.append(None))
        self.assertTrue(calls)

        text_file = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
        # storlines() expects a binary file, not a text file
        with support.check_warnings(('', BytesWarning), quiet=True):
            self.assertRaises(TypeError, self.client.storlines,
                              'stor foo', text_file)

    def test_nlst(self):
        self.client.nlst()
        names = self.client.nlst()
        self.assertEqual(names, NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        lines = []
        self.client.dir(lambda x: lines.append(x))
        self.assertEqual(''.join(lines), LIST_DATA.replace('\r\n', ''))

    def test_mlsd(self):
        list(self.client.mlsd())
        list(self.client.mlsd(path='/'))
        list(self.client.mlsd(path='/', facts=['size', 'type']))

        entries = list(self.client.mlsd())
        for name, facts in entries:
            self.assertIsInstance(name, str)
            self.assertIsInstance(facts, dict)
            self.assertTrue(name)
            for fact in ('type', 'perm', 'unique'):
                self.assertIn(fact, facts)

        def set_data(data):
            # Payload the server sends over the next data connection.
            self.server.handler_instance.next_data = data

        def check_entry(line, ftype='type', perm='perm', unique='unique',
                        name='name'):
            # Serve *line* as the listing and verify how it is parsed.
            set_data(line)
            parsed_name, facts = next(self.client.mlsd())
            self.assertEqual(parsed_name, name)
            self.assertEqual(facts['type'], ftype)
            self.assertEqual(facts['perm'], perm)
            self.assertEqual(facts['unique'], unique)

        # plain
        check_entry('type=type;perm=perm;unique=unique; name\r\n')
        # "=" in fact value
        check_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', ftype="ty=pe")
        check_entry('type==type;perm=perm;unique=unique; name\r\n', ftype="=type")
        check_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', ftype="t=y=pe")
        check_entry('type=====;perm=perm;unique=unique; name\r\n', ftype="====")
        # spaces in name
        check_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
        check_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
        check_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
        check_entry('type=type;perm=perm;unique=unique; n am  e\r\n', name="n am  e")
        # ";" in name
        check_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
        check_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
        check_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
        check_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
        # case sensitiveness
        set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
        _name, facts = next(self.client.mlsd())
        for fact_name in facts:
            self.assertTrue(fact_name.islower())
        # no data (directory empty)
        set_data('')
        self.assertRaises(StopIteration, next, self.client.mlsd())
        set_data('')
        for entry in self.client.mlsd():
            self.fail("unexpected data %s" % entry)

    def test_makeport(self):
        with self.client.makeport():
            # IPv4 is in use, just make sure send_eprt has not been used
            last_cmd = self.server.handler_instance.last_received_cmd
            self.assertEqual(last_cmd, 'port')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        socket.create_connection((host, port), timeout=TIMEOUT).close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')

    def test_makepasv_issue43285_security_disabled(self):
        """Test the opt-in to the old vulnerable behavior."""
        self.client.trust_server_pasv_ipv4_address = True
        bad_host, port = self.client.makepasv()
        advertised = self.server.handler_instance.fake_pasv_server_ip
        self.assertEqual(bad_host, advertised)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        peer_host = self.client.sock.getpeername()[0]
        socket.create_connection((peer_host, port), timeout=TIMEOUT).close()

    def test_makepasv_issue43285_security_enabled_default(self):
        self.assertFalse(self.client.trust_server_pasv_ipv4_address)
        trusted_host, port = self.client.makepasv()
        advertised = self.server.handler_instance.fake_pasv_server_ip
        self.assertNotEqual(trusted_host, advertised)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()

    def test_with_statement(self):
        self.client.quit()

        def is_client_connected():
            # A client counts as connected while it still has a socket and
            # a NOOP round trip succeeds.
            if self.client.sock is None:
                return False
            try:
                self.client.sendcmd('noop')
            except (OSError, EOFError):
                return False
            return True

        # base test
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.assertTrue(is_client_connected())
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # QUIT sent inside the with block
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.client.quit()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # force a wrong response code to be sent on QUIT: error_perm
        # is expected and the connection is supposed to be closed
        try:
            with ftplib.FTP(timeout=TIMEOUT) as self.client:
                self.client.connect(self.server.host, self.server.port)
                self.client.sendcmd('noop')
                self.server.handler_instance.next_response = '550 error on quit'
        except ftplib.error_perm as err:
            self.assertEqual(str(err), '550 error on quit')
        else:
            self.fail('Exception not raised')
        # needed to give the threaded server some time to set the attribute
        # which otherwise would still be == 'noop'
        time.sleep(0.1)
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

    def test_source_address(self):
        self.client.quit()
        port = support.find_unused_port()
        try:
            self.client.connect(self.server.host, self.server.port,
                                source_address=(HOST, port))
            self.assertEqual(self.client.sock.getsockname()[1], port)
            self.client.quit()
        except OSError as exc:
            # The "unused" port may have been taken in the meantime.
            if exc.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_source_address_passive_connection(self):
        port = support.find_unused_port()
        self.client.source_address = (HOST, port)
        try:
            with self.client.transfercmd('list') as sock:
                self.assertEqual(sock.getsockname()[1], port)
        except OSError as exc:
            # The "unused" port may have been taken in the meantime.
            if exc.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_parse257(self):
        parse257 = ftplib.parse257
        self.assertEqual(parse257('257 "/foo/bar"'), '/foo/bar')
        self.assertEqual(parse257('257 "/foo/bar" created'), '/foo/bar')
        self.assertEqual(parse257('257 ""'), '')
        self.assertEqual(parse257('257 "" created'), '')
        self.assertRaises(ftplib.error_reply, parse257, '250 "/foo/bar"')
        # The 257 response is supposed to include the directory
        # name and in case it contains embedded double-quotes
        # they must be doubled (see RFC-959, chapter 7, appendix 2).
        self.assertEqual(parse257('257 "/foo/b""ar"'), '/foo/b"ar')
        self.assertEqual(parse257('257 "/foo/b""ar" created'), '/foo/b"ar')

    def test_line_too_long(self):
        oversized = 'x' * self.client.maxline * 2
        self.assertRaises(ftplib.Error, self.client.sendcmd, oversized)

    def test_retrlines_too_long(self):
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        lines = []
        self.assertRaises(ftplib.Error,
                          self.client.retrlines, 'retr', lines.append)

    def test_storlines_too_long(self):
        oversized = io.BytesIO(b'x' * self.client.maxline * 2)
        self.assertRaises(ftplib.Error, self.client.storlines, 'stor',
                          oversized)
817
818
@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """Run the FTP client against a DummyFTPServer bound to HOSTv6."""

    def setUp(self):
        server_address = (HOSTv6, 0)
        self.server = DummyFTPServer(server_address, af=socket.AF_INET6)
        self.server.start()
        self.client = ftplib.FTP(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Drop the reference explicitly so that no dangling server thread
        # is kept alive through this test instance.
        self.server = None
        asyncore.close_all(ignore_all=True)

    def test_af(self):
        # The client must have recorded the IPv6 address family.
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        # While the socket from makeport() is open, the last command seen
        # by the server handler must be 'eprt'.
        with self.client.makeport():
            last_cmd = self.server.handler_instance.last_received_cmd
            self.assertEqual(last_cmd, 'eprt')

    def test_makepasv(self):
        # Connect once to the address returned by makepasv(), close that
        # connection, then check that the server handler saw 'epsv'.
        pasv_host, pasv_port = self.client.makepasv()
        socket.create_connection((pasv_host, pasv_port),
                                 timeout=TIMEOUT).close()
        last_cmd = self.server.handler_instance.last_received_cmd
        self.assertEqual(last_cmd, 'epsv')

    def test_transfer(self):
        def fetch_and_verify():
            # Download the dummy RETR payload and compare it to RETR_DATA.
            chunks = []
            self.client.retrbinary(
                'retr', lambda data: chunks.append(data.decode('ascii')))
            payload = ''.join(chunks)
            self.assertEqual(len(payload), len(RETR_DATA))
            self.assertEqual(payload, RETR_DATA)

        # Same download with set_pasv(True) first, then set_pasv(False).
        for passive in (True, False):
            self.client.set_pasv(passive)
            fetch_and_verify()
861
862
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Re-run the inherited TestFTPClass tests with TLS switched on.

    setUp() issues auth() and prot_p() on an FTP_TLS client connected to a
    DummyTLS_FTPServer before each inherited test runs.
    """

    def setUp(self):
        tls_server = DummyTLS_FTPServer((HOST, 0))
        self.server = tls_server
        tls_server.start()
        tls_client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client = tls_client
        tls_client.connect(tls_server.host, tls_server.port)
        # enable TLS: control connection first, then the data channel
        tls_client.auth()
        tls_client.prot_p()
877
878
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Tests for behaviour specific to ftplib.FTP_TLS."""

    def setUp(self):
        tls_server = DummyTLS_FTPServer((HOST, 0))
        self.server = tls_server
        tls_server.start()
        tls_client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client = tls_client
        tls_client.connect(tls_server.host, tls_server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Drop the reference explicitly so that no dangling server thread
        # is kept alive through this test instance.
        self.server = None
        asyncore.close_all(ignore_all=True)

    def test_control_connection(self):
        # The control socket is a plain socket until auth() is called.
        client = self.client
        self.assertNotIsInstance(client.sock, ssl.SSLSocket)
        client.auth()
        self.assertIsInstance(client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        expected_listing = LIST_DATA.encode('ascii')

        def check_listing(assert_socket_type):
            # Run one LIST transfer, check the data socket type with the
            # given assertion and verify payload and final reply.
            with self.client.transfercmd('list') as data_sock:
                assert_socket_type(data_sock, ssl.SSLSocket)
                # For an SSL socket, consuming the data also finalizes the
                # handshake and avoids
                # "SSLError [SSL] shutdown while in init"
                self.assertEqual(data_sock.recv(1024), expected_listing)
            self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # clear text
        check_listing(self.assertNotIsInstance)

        # secured, after PROT P
        self.client.prot_p()
        check_listing(self.assertIsInstance)

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        check_listing(self.assertNotIsInstance)

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        client = self.client
        self.assertNotIsInstance(client.sock, ssl.SSLSocket)
        client.login()
        self.assertIsInstance(client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        client.login()

    def test_auth_issued_twice(self):
        # A second auth() on the same connection must raise ValueError.
        self.client.auth()
        with self.assertRaises(ValueError):
            self.client.auth()

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        # Combining context with keyfile and/or certfile raises ValueError.
        conflicting_kwargs = (
            {'keyfile': CERTFILE},
            {'certfile': CERTFILE},
            {'certfile': CERTFILE, 'keyfile': CERTFILE},
        )
        for extra in conflicting_kwargs:
            with self.assertRaises(ValueError):
                ftplib.FTP_TLS(context=ctx, **extra)

        client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client = client
        client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(client.sock, ssl.SSLSocket)
        client.auth()
        # The secured control socket must use the supplied context.
        self.assertIs(client.sock.context, ctx)
        self.assertIsInstance(client.sock, ssl.SSLSocket)

        client.prot_p()
        with client.transfercmd('list') as data_sock:
            # ... and so must the secured data socket.
            self.assertIs(data_sock.context, ctx)
            self.assertIsInstance(data_sock, ssl.SSLSocket)

    def test_ccc(self):
        # ccc() raises ValueError while the control socket is not secured.
        with self.assertRaises(ValueError):
            self.client.ccc()
        self.client.login(secure=True)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.ccc()
        # After ccc() a further unwrap() of the control socket must fail.
        with self.assertRaises(ValueError):
            self.client.sock.unwrap()

    @skipUnless(False, "FIXME: bpo-32706")
    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        ctx.load_verify_locations(CAFILE)
        client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client = client
        server_host = self.server.host
        server_port = self.server.port

        # 127.0.0.1 doesn't match SAN
        client.connect(server_host, server_port)
        with self.assertRaises(ssl.CertificateError):
            client.auth()
        # exception quits connection

        client.connect(server_host, server_port)
        client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            with client.transfercmd("list"):
                pass
        client.quit()

        # Connecting by the name "localhost" must pass hostname checking,
        # for the control connection ...
        client.connect("localhost", server_port)
        client.auth()
        client.quit()

        # ... and for a protected data connection.
        client.connect("localhost", server_port)
        client.prot_p()
        with client.transfercmd("list"):
            pass
997
998
class TestTimeouts(TestCase):
    """Check which socket timeout an FTP connection ends up with.

    Every test talks to a one-shot server thread (see server()) listening
    on a port that setUp() temporarily installs as ftplib.FTP.port.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Bound the accept() in server() so the thread cannot block forever.
        self.sock.settimeout(20)
        self.port = support.bind_port(self.sock)
        self.server_thread = threading.Thread(target=self.server, daemon=True)
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # Make FTP() connect to our server by default; undone in tearDown().
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        ftplib.FTP.port = self.old_port
        self.server_thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.server_thread = None

    def server(self):
        # Thread target. This method sets the evt twice:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        # The listening socket is closed in every case before returning.
        self.sock.listen()
        # (1) Signal the caller that we are ready to accept the connection.
        self.evt.set()
        try:
            conn, _addr = self.sock.accept()
        except socket.timeout:
            # Nobody connected within the listening socket's timeout.
            pass
        else:
            conn.sendall(b"1 Hola mundo\n")
            conn.shutdown(socket.SHUT_WR)
            # (2) Signal the caller that it is safe to close the socket.
            self.evt.set()
            conn.close()
        finally:
            self.sock.close()

    def _check_timeout_and_close(self, ftp, expected):
        # Assert that the control socket of *ftp* has the *expected* timeout
        # (None meaning "no timeout"), then wait for the server's signal (2)
        # and close the client. The wait/close pair runs even when the
        # assertion fails, so a failing test does not leak the connection.
        try:
            actual = ftp.sock.gettimeout()
            if expected is None:
                self.assertIsNone(actual)
            else:
                self.assertEqual(actual, expected)
        finally:
            self.evt.wait()
            ftp.close()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST)
        finally:
            # Never leave the process-wide default modified.
            socket.setdefaulttimeout(None)
        self._check_timeout_and_close(ftp, 30)

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            # Never leave the process-wide default modified.
            socket.setdefaulttimeout(None)
        self._check_timeout_and_close(ftp, None)

    def testTimeoutValue(self):
        # a value passed to the constructor together with the host
        ftp = ftplib.FTP(HOST, timeout=30)
        self._check_timeout_and_close(ftp, 30)

    def testTimeoutConnect(self):
        # timeout passed to connect()
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self._check_timeout_and_close(ftp, 30)

    def testTimeoutDifferentOrder(self):
        # timeout passed to the constructor, host passed to connect()
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self._check_timeout_and_close(ftp, 30)

    def testTimeoutDirectAccess(self):
        # timeout set through the attribute before connecting
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self._check_timeout_and_close(ftp, 30)
1094
1095
class MiscTestCase(TestCase):
    """Checks on the ftplib module namespace."""

    def test__all__(self):
        # Names handed to check__all__() as its blacklist -- presumably
        # module-level names that are not expected in ftplib.__all__.
        not_exported = {
            'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF',
            'Error', 'parse150', 'parse227', 'parse229', 'parse257',
            'print_line', 'ftpcp', 'test',
        }
        support.check__all__(self, ftplib, blacklist=not_exported)
1102
1103
def test_main():
    """Run all test classes of this module.

    support.threading_setup() is called before the run and its result is
    handed to support.threading_cleanup() afterwards, even if the run
    raises.
    """
    test_classes = (
        TestFTPClass,
        TestTimeouts,
        TestIPv6Environment,
        TestTLS_FTPClassMixin,
        TestTLS_FTPClass,
        MiscTestCase,
    )

    saved_thread_info = support.threading_setup()
    try:
        support.run_unittest(*test_classes)
    finally:
        support.threading_cleanup(*saved_thread_info)
1115
1116
# Script entry point: when this file is executed directly, run the whole
# suite through test_main().
if __name__ == '__main__':
    test_main()
1119