1"""Test script for ftplib module."""
2
3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4# environment
5
6import ftplib
7import asyncore
8import asynchat
9import socket
10import io
11import errno
12import os
13import threading
14import time
15try:
16    import ssl
17except ImportError:
18    ssl = None
19
20from unittest import TestCase, skipUnless
21from test import support
22from test.support import HOST, HOSTv6
23
# Timeout (seconds) handed to every socket and ftplib.FTP object created here.
TIMEOUT = 3

# Canned payloads the dummy server sends over the data channel when the
# RETR, LIST, NLST and MLSD commands are issued.
RETR_DATA = 1000 * 'abcde12345\r\n'
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'
# One MLSD entry per element; the facts deliberately contain "=" characters
# and the names contain spaces.
MLSD_DATA = ''.join([
    "type=cdir;perm=el;unique==keVO1+ZF4; test\r\n",
    "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n",
    "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n",
    "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n",
    "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n",
    "type=file;perm=awr;unique==keVO1+8G4; writable\r\n",
    "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n",
    "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n",
    "type=file;perm=r;unique==keVO1+EG4; two words\r\n",
    "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n",
    "type=file;perm=r;unique==keVO1+1G4; file1\r\n",
    "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n",
    "type=file;perm=r;unique==keVO1+1G4; file2\r\n",
    "type=file;perm=r;unique==keVO1+1G4; file3\r\n",
    "type=file;perm=r;unique==keVO1+1G4; file4\r\n",
])
45
46
class DummyDTPHandler(asynchat.async_chat):
    """Data-channel side of the dummy FTP server.

    Incoming bytes are appended (ASCII-decoded) to the control handler's
    ``last_received_data``; outgoing text is ASCII-encoded before being
    queued on the socket.
    """

    # Guards handle_close() against running its body more than once.
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        """Wrap *conn*; *baseclass* is the owning control-channel handler."""
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # Every new data connection starts with an empty receive buffer.
        self.baseclass.last_received_data = ''

    def handle_read(self):
        """Accumulate up to 1024 received bytes on the control handler."""
        chunk = self.recv(1024).decode('ascii')
        self.baseclass.last_received_data += chunk

    def handle_close(self):
        """Report the end of the transfer and close the data socket once."""
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if self.dtp_conn_closed:
            return
        self.baseclass.push('226 transfer complete')
        self.close()
        self.dtp_conn_closed = True

    def push(self, what):
        """Queue *what* for sending, unless the test scripted other data.

        When the control handler's ``next_data`` is set it replaces *what*
        and is consumed.  An empty payload only schedules the connection
        to be closed.
        """
        scripted = self.baseclass.next_data
        if scripted is not None:
            what = scripted
            self.baseclass.next_data = None
        if not what:
            return self.close_when_done()
        super().push(what.encode('ascii'))

    def handle_error(self):
        """Turn any asyncore-reported error into a plain exception."""
        raise Exception
77
78
class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler of the dummy FTP server.

    Every CRLF-terminated line received from the client is dispatched to
    the ``cmd_<verb>`` method named after its first word.  Tests inspect
    ``last_received_cmd``, ``last_received_data`` and ``rest`` and script
    upcoming behaviour through ``next_response``, ``next_data`` and
    ``next_retr_data``.
    """

    # Class used to wrap data-channel sockets; subclasses may override it.
    dtp_handler = DummyDTPHandler

    def __init__(self, conn):
        """Initialise the per-connection state and send the greeting."""
        asynchat.async_chat.__init__(self, conn)
        # tells the socket to handle urgent data inline (ABOR command)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        self.next_response = ''
        self.next_data = None
        self.rest = None
        self.next_retr_data = RETR_DATA
        self.push('220 welcome')
        # We use this as the string IPv4 address to direct the client
        # to in response to a PASV command.  To test security behavior.
        # https://bugs.python.org/issue43285/.
        self.fake_pasv_server_ip = '252.253.254.255'

    def collect_incoming_data(self, data):
        """Buffer a fragment of the command line being received."""
        self.in_buffer.append(data)

    def found_terminator(self):
        """Decode the buffered line and run the matching ``cmd_*`` method.

        A scripted ``next_response`` is sent (and cleared) first; the
        command itself is still dispatched afterwards.  Unknown verbs get
        a 550 reply.
        """
        line = b''.join(self.in_buffer).decode('ascii')
        self.in_buffer = []
        if self.next_response:
            self.push(self.next_response)
            self.next_response = ''
        # Split on the first space only: everything after it is the argument.
        verb, _, arg = line.partition(' ')
        cmd = verb.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is None:
            self.push('550 command "%s" not understood.' % cmd)
        else:
            method(arg)

    def handle_error(self):
        """Turn any asyncore-reported error into a plain exception."""
        raise Exception

    def push(self, data):
        """Send the text *data* as one ASCII-encoded, CRLF-terminated line."""
        asynchat.async_chat.push(self, data.encode('ascii') + b'\r\n')

    # --- data connection setup

    def cmd_port(self, arg):
        """Connect to the client address given as ``h1,h2,h3,h4,p1,p2``."""
        fields = [int(field) for field in arg.split(',')]
        ip = '%d.%d.%d.%d' % tuple(fields[:4])
        port = fields[4] * 256 + fields[5]
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        """Listen on an ephemeral port and accept one data connection.

        The 227 reply advertises ``fake_pasv_server_ip`` instead of the
        address actually bound, together with the real port.
        """
        with socket.socket() as sock:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen()
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            ip = self.fake_pasv_server_ip.replace('.', ',')
            # Integer split of the port into its high and low bytes.
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' % (ip, p1, p2))
            conn, _ = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        """Connect to the client address given as ``|af|ip|port|``."""
        # The first character of the argument is the field delimiter.
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        """Listen on an ephemeral IPv6 port and accept one data connection."""
        with socket.socket(socket.AF_INET6) as sock:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen()
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' % port)
            conn, _ = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    # --- commands answered with a fixed reply

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_noop(self, arg):
        self.push('200 noop ok')

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' % arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_abor(self, arg):
        self.push('226 abor ok')

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_opts(self, arg):
        self.push('200 opts ok')

    # --- transfers

    def cmd_rest(self, arg):
        """Remember the restart offset (kept as the received string)."""
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        """Send ``next_retr_data`` from the pending REST offset, if any."""
        self.push('125 retr ok')
        offset = 0 if self.rest is None else int(self.rest)
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        # A restart offset applies to a single transfer only.
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_mlsd(self, arg):
        self.push('125 mlsd ok')
        self.dtp.push(MLSD_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')
256
257
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    ``host`` and ``port`` expose the bound address; ``handler_instance``
    is the handler created for the most recently accepted connection.
    """

    # Class instantiated for every accepted control connection.
    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET):
        """Bind a listening socket of family *af* to *address*."""
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None

    def start(self):
        """Start the thread and return once run() has begun executing."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        """Poll asyncore until stop() is called or no socket is left."""
        self.active = True
        self.__flag.set()
        try:
            while self.active and asyncore.socket_map:
                # The context manager releases the lock even when a
                # handler raises out of asyncore.loop().
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            # Always drop the registered sockets, including on error.
            asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop to finish and wait for the thread to exit."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        """Create a handler for the new connection and remember it."""
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        """Close the listening socket (also used for read events)."""
        self.close()
    handle_read = handle_connect

    def writable(self):
        """The listening socket never has data to send."""
        return False

    def handle_error(self):
        """Turn any asyncore-reported error into a plain exception."""
        raise Exception
306
307
308if ssl is not None:
309
    # Both files are looked up in the directory containing this module.
    # CERTFILE is the certificate chain loaded by
    # SSLConnection.secure_connection().
    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    # NOTE(review): CAFILE is not referenced in the part of the file reviewed
    # here; presumably the CA bundle used by the TLS client tests -- confirm.
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
312
313    class SSLConnection(asyncore.dispatcher):
314        """An asyncore.dispatcher subclass supporting TLS/SSL."""
315
316        _ssl_accepting = False
317        _ssl_closing = False
318
319        def secure_connection(self):
320            context = ssl.SSLContext()
321            context.load_cert_chain(CERTFILE)
322            socket = context.wrap_socket(self.socket,
323                                         suppress_ragged_eofs=False,
324                                         server_side=True,
325                                         do_handshake_on_connect=False)
326            self.del_channel()
327            self.set_socket(socket)
328            self._ssl_accepting = True
329
330        def _do_ssl_handshake(self):
331            try:
332                self.socket.do_handshake()
333            except ssl.SSLError as err:
334                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
335                                   ssl.SSL_ERROR_WANT_WRITE):
336                    return
337                elif err.args[0] == ssl.SSL_ERROR_EOF:
338                    return self.handle_close()
339                # TODO: SSLError does not expose alert information
340                elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
341                    return self.handle_close()
342                raise
343            except OSError as err:
344                if err.args[0] == errno.ECONNABORTED:
345                    return self.handle_close()
346            else:
347                self._ssl_accepting = False
348
349        def _do_ssl_shutdown(self):
350            self._ssl_closing = True
351            try:
352                self.socket = self.socket.unwrap()
353            except ssl.SSLError as err:
354                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
355                                   ssl.SSL_ERROR_WANT_WRITE):
356                    return
357            except OSError as err:
358                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
359                # from OpenSSL's SSL_shutdown(), corresponding to a
360                # closed socket condition. See also:
361                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
362                pass
363            self._ssl_closing = False
364            if getattr(self, '_ccc', False) is False:
365                super(SSLConnection, self).close()
366            else:
367                pass
368
369        def handle_read_event(self):
370            if self._ssl_accepting:
371                self._do_ssl_handshake()
372            elif self._ssl_closing:
373                self._do_ssl_shutdown()
374            else:
375                super(SSLConnection, self).handle_read_event()
376
377        def handle_write_event(self):
378            if self._ssl_accepting:
379                self._do_ssl_handshake()
380            elif self._ssl_closing:
381                self._do_ssl_shutdown()
382            else:
383                super(SSLConnection, self).handle_write_event()
384
385        def send(self, data):
386            try:
387                return super(SSLConnection, self).send(data)
388            except ssl.SSLError as err:
389                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
390                                   ssl.SSL_ERROR_WANT_READ,
391                                   ssl.SSL_ERROR_WANT_WRITE):
392                    return 0
393                raise
394
395        def recv(self, buffer_size):
396            try:
397                return super(SSLConnection, self).recv(buffer_size)
398            except ssl.SSLError as err:
399                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
400                                   ssl.SSL_ERROR_WANT_WRITE):
401                    return b''
402                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
403                    self.handle_close()
404                    return b''
405                raise
406
407        def handle_error(self):
408            raise Exception
409
410        def close(self):
411            if (isinstance(self.socket, ssl.SSLSocket) and
412                    self.socket._sslobj is not None):
413                self._do_ssl_shutdown()
414            else:
415                super(SSLConnection, self).close()
416
417
418    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
419        """A DummyDTPHandler subclass supporting TLS/SSL."""
420
421        def __init__(self, conn, baseclass):
422            DummyDTPHandler.__init__(self, conn, baseclass)
423            if self.baseclass.secure_data_channel:
424                self.secure_connection()
425
426
427    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
428        """A DummyFTPHandler subclass supporting TLS/SSL."""
429
430        dtp_handler = DummyTLS_DTPHandler
431
432        def __init__(self, conn):
433            DummyFTPHandler.__init__(self, conn)
434            self.secure_data_channel = False
435            self._ccc = False
436
437        def cmd_auth(self, line):
438            """Set up secure control channel."""
439            self.push('234 AUTH TLS successful')
440            self.secure_connection()
441
442        def cmd_ccc(self, line):
443            self.push('220 Reverting back to clear-text')
444            self._ccc = True
445            self._do_ssl_shutdown()
446
447        def cmd_pbsz(self, line):
448            """Negotiate size of buffer for secure data transfer.
449            For TLS/SSL the only valid value for the parameter is '0'.
450            Any other value is accepted but ignored.
451            """
452            self.push('200 PBSZ=0 successful.')
453
454        def cmd_prot(self, line):
455            """Setup un/secure data channel."""
456            arg = line.upper()
457            if arg == 'C':
458                self.push('200 Protection set to Clear')
459                self.secure_data_channel = False
460            elif arg == 'P':
461                self.push('200 Protection set to Private')
462                self.secure_data_channel = True
463            else:
464                self.push("502 Unrecognized PROT type (use C or P).")
465
466
467    class DummyTLS_FTPServer(DummyFTPServer):
468        handler = DummyTLS_FTPHandler
469
470
class TestFTPClass(TestCase):
    # Exercises ftplib.FTP over IPv4 against an in-process DummyFTPServer.

    def setUp(self):
        # One server thread and one connected client per test.
        self.server = DummyFTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP(timeout=TIMEOUT)
        self.client.connect(host=self.server.host, port=self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def check_data(self, received, expected):
        # Lengths are compared before the (possibly very long) contents.
        self.assertEqual(len(received), len(expected))
        self.assertEqual(received, expected)

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        # (input, text whose repr() is expected back)
        cases = (
            ('foo', 'foo'),
            ('pass 12345', 'pass *****'),
            ('PASS 12345', 'PASS *****'),
        )
        for raw, masked in cases:
            self.assertEqual(self.client.sanitize(raw), repr(masked))

    def test_exceptions(self):
        # Commands containing CR or LF must be refused.
        for cmd in ('echo 40\r\n0', 'echo 40\n0', 'echo 40\r0'):
            with self.assertRaises(ValueError):
                self.client.sendcmd(cmd)
        # The server's "echo" command sends its argument back as the reply.
        by_reply = (
            (ftplib.error_temp, 'echo 400'),
            (ftplib.error_temp, 'echo 499'),
            (ftplib.error_perm, 'echo 500'),
            (ftplib.error_perm, 'echo 599'),
            (ftplib.error_proto, 'echo 999'),
        )
        for exc, cmd in by_reply:
            with self.assertRaises(exc):
                self.client.sendcmd(cmd)

    def test_all_errors(self):
        # Each of these must be caught by "except ftplib.all_errors".
        members = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                   ftplib.error_proto, ftplib.Error, OSError,
                   EOFError)
        for exc_type in members:
            try:
                raise exc_type('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        for accepted in ('echo 200', 'echo 299'):
            self.client.voidcmd(accepted)
        for rejected in ('echo 199', 'echo 300'):
            with self.assertRaises(ftplib.error_reply):
                self.client.voidcmd(rejected)

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        # Script an unexpected reply for the next command.
        self.server.handler_instance.next_response = '200'
        with self.assertRaises(ftplib.error_reply):
            self.client.rename('a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        # Script an unexpected reply for the next command.
        self.server.handler_instance.next_response = '199'
        with self.assertRaises(ftplib.error_reply):
            self.client.delete('foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        created = self.client.mkd('/foo')
        self.assertEqual(created, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        reply = self.client.cwd('/foo')
        self.assertEqual(reply, '250 cwd ok')

    def test_pwd(self):
        current = self.client.pwd()
        self.assertEqual(current, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertEqual(self.client.sock, None)

    def test_abort(self):
        self.client.abort()

    def test_retrbinary(self):
        chunks = []

        def collect(data):
            chunks.append(data.decode('ascii'))

        self.client.retrbinary('retr', collect)
        self.check_data(''.join(chunks), RETR_DATA)

    def test_retrbinary_rest(self):
        for offset in (0, 10, 20):
            chunks = []

            def collect(data, sink=chunks):
                sink.append(data.decode('ascii'))

            self.client.retrbinary('retr', collect, rest=offset)
            self.check_data(''.join(chunks), RETR_DATA[offset:])

    def test_retrlines(self):
        lines = []
        self.client.retrlines('retr', lines.append)
        self.check_data(''.join(lines), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        handler = self.server.handler_instance
        payload = io.BytesIO(RETR_DATA.encode('ascii'))
        self.client.storbinary('stor', payload)
        self.check_data(handler.last_received_data, RETR_DATA)
        # test new callback arg
        calls = []
        payload.seek(0)
        self.client.storbinary('stor', payload,
                               callback=lambda block: calls.append(None))
        self.assertTrue(calls)

    def test_storbinary_rest(self):
        payload = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
        # The offset may be passed as an int or as a string.
        for offset in (30, '30'):
            payload.seek(0)
            self.client.storbinary('stor', payload, rest=offset)
            self.assertEqual(self.server.handler_instance.rest, str(offset))

    def test_storlines(self):
        handler = self.server.handler_instance
        payload = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
        self.client.storlines('stor', payload)
        self.check_data(handler.last_received_data, RETR_DATA)
        # test new callback arg
        calls = []
        payload.seek(0)
        self.client.storlines('stor foo', payload,
                              callback=lambda line: calls.append(None))
        self.assertTrue(calls)

        text_file = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
        # storlines() expects a binary file, not a text file
        with support.check_warnings(('', BytesWarning), quiet=True):
            with self.assertRaises(TypeError):
                self.client.storlines('stor foo', text_file)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        lines = []
        self.client.dir(lines.append)
        self.assertEqual(''.join(lines), LIST_DATA.replace('\r\n', ''))

    def test_mlsd(self):
        # Every supported call form must be accepted.
        list(self.client.mlsd())
        list(self.client.mlsd(path='/'))
        list(self.client.mlsd(path='/', facts=['size', 'type']))

        entries = list(self.client.mlsd())
        for name, facts in entries:
            self.assertIsInstance(name, str)
            self.assertIsInstance(facts, dict)
            self.assertTrue(name)
            for fact in ('type', 'perm', 'unique'):
                self.assertIn(fact, facts)

        def set_data(data):
            # Script what the server sends for the next data transfer.
            self.server.handler_instance.next_data = data

        def check_entry(line, type_='type', perm='perm', unique='unique',
                        name='name'):
            # Serve *line* as the only entry and compare the parsed result.
            set_data(line)
            parsed_name, facts = next(self.client.mlsd())
            self.assertEqual(parsed_name, name)
            self.assertEqual(facts['type'], type_)
            self.assertEqual(facts['perm'], perm)
            self.assertEqual(facts['unique'], unique)

        # plain
        check_entry('type=type;perm=perm;unique=unique; name\r\n')
        # "=" in fact value
        check_entry('type=ty=pe;perm=perm;unique=unique; name\r\n',
                    type_="ty=pe")
        check_entry('type==type;perm=perm;unique=unique; name\r\n',
                    type_="=type")
        check_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n',
                    type_="t=y=pe")
        check_entry('type=====;perm=perm;unique=unique; name\r\n',
                    type_="====")
        # spaces in name
        check_entry('type=type;perm=perm;unique=unique; na me\r\n',
                    name="na me")
        check_entry('type=type;perm=perm;unique=unique; name \r\n',
                    name="name ")
        check_entry('type=type;perm=perm;unique=unique;  name\r\n',
                    name=" name")
        check_entry('type=type;perm=perm;unique=unique; n am  e\r\n',
                    name="n am  e")
        # ";" in name
        check_entry('type=type;perm=perm;unique=unique; na;me\r\n',
                    name="na;me")
        check_entry('type=type;perm=perm;unique=unique; ;name\r\n',
                    name=";name")
        check_entry('type=type;perm=perm;unique=unique; ;name;\r\n',
                    name=";name;")
        check_entry('type=type;perm=perm;unique=unique; ;;;;\r\n',
                    name=";;;;")
        # case sensitiveness
        set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
        _name, facts = next(self.client.mlsd())
        for fact in facts:
            self.assertTrue(fact.islower())
        # no data (directory empty)
        set_data('')
        with self.assertRaises(StopIteration):
            next(self.client.mlsd())
        set_data('')
        for x in self.client.mlsd():
            self.fail("unexpected data %s" % x)

    def test_makeport(self):
        with self.client.makeport():
            # IPv4 is in use, just make sure send_eprt has not been used
            last_cmd = self.server.handler_instance.last_received_cmd
            self.assertEqual(last_cmd, 'port')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        socket.create_connection((host, port), timeout=TIMEOUT).close()
        # IPv4 is in use, just make sure send_epsv has not been used
        last_cmd = self.server.handler_instance.last_received_cmd
        self.assertEqual(last_cmd, 'pasv')

    def test_makepasv_issue43285_security_disabled(self):
        """Test the opt-in to the old vulnerable behavior."""
        self.client.trust_server_pasv_ipv4_address = True
        bad_host, port = self.client.makepasv()
        advertised = self.server.handler_instance.fake_pasv_server_ip
        self.assertEqual(bad_host, advertised)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        real_host = self.client.sock.getpeername()[0]
        socket.create_connection((real_host, port),
                                 timeout=TIMEOUT).close()

    def test_makepasv_issue43285_security_enabled_default(self):
        self.assertFalse(self.client.trust_server_pasv_ipv4_address)
        trusted_host, port = self.client.makepasv()
        advertised = self.server.handler_instance.fake_pasv_server_ip
        self.assertNotEqual(trusted_host, advertised)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()

    def test_with_statement(self):
        # Drop the connection opened by setUp(); each scenario below
        # creates its own client.
        self.client.quit()

        def is_client_connected():
            if self.client.sock is None:
                return False
            try:
                self.client.sendcmd('noop')
            except (OSError, EOFError):
                return False
            return True

        def last_cmd():
            return self.server.handler_instance.last_received_cmd

        # base test
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.assertTrue(is_client_connected())
        self.assertEqual(last_cmd(), 'quit')
        self.assertFalse(is_client_connected())

        # QUIT sent inside the with block
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.client.quit()
        self.assertEqual(last_cmd(), 'quit')
        self.assertFalse(is_client_connected())

        # force a wrong response code to be sent on QUIT: error_perm
        # is expected and the connection is supposed to be closed
        try:
            with ftplib.FTP(timeout=TIMEOUT) as self.client:
                self.client.connect(self.server.host, self.server.port)
                self.client.sendcmd('noop')
                self.server.handler_instance.next_response = '550 error on quit'
        except ftplib.error_perm as err:
            self.assertEqual(str(err), '550 error on quit')
        else:
            self.fail('Exception not raised')
        # needed to give the threaded server some time to set the attribute
        # which otherwise would still be == 'noop'
        time.sleep(0.1)
        self.assertEqual(last_cmd(), 'quit')
        self.assertFalse(is_client_connected())

    def test_source_address(self):
        self.client.quit()
        port = support.find_unused_port()
        try:
            self.client.connect(self.server.host, self.server.port,
                                source_address=(HOST, port))
            bound_port = self.client.sock.getsockname()[1]
            self.assertEqual(bound_port, port)
            self.client.quit()
        except OSError as e:
            # The "unused" port may have been taken in the meantime.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_source_address_passive_connection(self):
        port = support.find_unused_port()
        self.client.source_address = (HOST, port)
        try:
            with self.client.transfercmd('list') as sock:
                bound_port = sock.getsockname()[1]
                self.assertEqual(bound_port, port)
        except OSError as e:
            # The "unused" port may have been taken in the meantime.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_parse257(self):
        parse = ftplib.parse257
        self.assertEqual(parse('257 "/foo/bar"'), '/foo/bar')
        self.assertEqual(parse('257 "/foo/bar" created'), '/foo/bar')
        self.assertEqual(parse('257 ""'), '')
        self.assertEqual(parse('257 "" created'), '')
        with self.assertRaises(ftplib.error_reply):
            parse('250 "/foo/bar"')
        # The 257 response is supposed to include the directory
        # name and in case it contains embedded double-quotes
        # they must be doubled (see RFC-959, chapter 7, appendix 2).
        self.assertEqual(parse('257 "/foo/b""ar"'), '/foo/b"ar')
        self.assertEqual(parse('257 "/foo/b""ar" created'), '/foo/b"ar')

    def test_line_too_long(self):
        oversized = 'x' * self.client.maxline * 2
        with self.assertRaises(ftplib.Error):
            self.client.sendcmd(oversized)

    def test_retrlines_too_long(self):
        # Make the next RETR return a single line twice the allowed size.
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        received = []
        with self.assertRaises(ftplib.Error):
            self.client.retrlines('retr', received.append)

    def test_storlines_too_long(self):
        oversized = io.BytesIO(b'x' * self.client.maxline * 2)
        with self.assertRaises(ftplib.Error):
            self.client.storlines('stor', oversized)
820
821
@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """Run the FTP client against the dummy server over an IPv6 socket."""

    def setUp(self):
        # Dummy server listening on the IPv6 test host, port 0.
        self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
        self.server.start()
        self.client = ftplib.FTP(timeout=TIMEOUT)
        self.client.connect(host=self.server.host, port=self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Drop the reference explicitly to prevent a dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def test_af(self):
        # The client must have recorded the IPv6 address family.
        expected_family = socket.AF_INET6
        self.assertEqual(self.client.af, expected_family)

    def test_makeport(self):
        # After makeport() the server must have seen an EPRT command.
        with self.client.makeport():
            handler = self.server.handler_instance
            self.assertEqual(handler.last_received_cmd, 'eprt')

    def test_makepasv(self):
        # makepasv() must hand back an address we can really connect to,
        # and the server must have seen an EPSV command.
        pasv_host, pasv_port = self.client.makepasv()
        socket.create_connection((pasv_host, pasv_port),
                                 timeout=TIMEOUT).close()
        handler = self.server.handler_instance
        self.assertEqual(handler.last_received_cmd, 'epsv')

    def test_transfer(self):
        def check_retr():
            # Download via RETR and compare with what the server sends.
            chunks = []
            self.client.retrbinary(
                'retr', lambda data: chunks.append(data.decode('ascii')))
            self.assertEqual(len(''.join(chunks)), len(RETR_DATA))
            self.assertEqual(''.join(chunks), RETR_DATA)

        # Same check in passive mode first, then in active mode.
        for passive in (True, False):
            self.client.set_pasv(passive)
            check_retr()
864
865
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Re-run every TestFTPClass test with TLS switched on beforehand
    for both the control and the data connection.
    """

    def setUp(self):
        # TLS-capable dummy server instead of the plain one.
        listen_address = (HOST, 0)
        self.server = DummyTLS_FTPServer(listen_address)
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(host=self.server.host, port=self.server.port)
        # enable TLS: auth() for the control channel, prot_p() for data
        self.client.auth()
        self.client.prot_p()
880
881
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Tests for behaviour specific to ftplib.FTP_TLS."""

    def setUp(self):
        # Connect without securing anything; each test enables TLS itself.
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(host=self.server.host, port=self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Drop the reference explicitly to prevent a dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def _check_list_transfer(self, secured):
        # Run one LIST transfer, check whether its data socket is an
        # SSLSocket (secured=True) or not, then verify payload and reply.
        if secured:
            check_socket_type = self.assertIsInstance
        else:
            check_socket_type = self.assertNotIsInstance
        with self.client.transfercmd('list') as data_sock:
            check_socket_type(data_sock, ssl.SSLSocket)
            # Reading from an SSL socket also finalizes the handshake and
            # avoids "SSLError [SSL] shutdown while in init"
            self.assertEqual(data_sock.recv(1024),
                             LIST_DATA.encode('ascii'))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_control_connection(self):
        # auth() must turn the control socket into an SSLSocket.
        client = self.client
        self.assertNotIsInstance(client.sock, ssl.SSLSocket)
        client.auth()
        self.assertIsInstance(client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text before any PROT command
        self._check_list_transfer(secured=False)

        # secured, after PROT P
        self.client.prot_p()
        self._check_list_transfer(secured=True)

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        self._check_list_transfer(secured=False)

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        client = self.client
        self.assertNotIsInstance(client.sock, ssl.SSLSocket)
        client.login()
        self.assertIsInstance(client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        client.login()

    def test_auth_issued_twice(self):
        # A second auth() on an already secured connection is an error.
        self.client.auth()
        with self.assertRaises(ValueError):
            self.client.auth()

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        # Passing context together with keyfile and/or certfile must be
        # refused with ValueError.
        conflicting_options = (
            {'keyfile': CERTFILE},
            {'certfile': CERTFILE},
            {'certfile': CERTFILE, 'keyfile': CERTFILE},
        )
        for options in conflicting_options:
            with self.assertRaises(ValueError):
                ftplib.FTP_TLS(context=ctx, **options)

        # The supplied context must be used for the control connection...
        client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client = client
        client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(client.sock, ssl.SSLSocket)
        client.auth()
        self.assertIs(client.sock.context, ctx)
        self.assertIsInstance(client.sock, ssl.SSLSocket)

        # ...and for the data connection once PROT P is active.
        client.prot_p()
        with client.transfercmd('list') as data_sock:
            self.assertIs(data_sock.context, ctx)
            self.assertIsInstance(data_sock, ssl.SSLSocket)

    def test_ccc(self):
        # ccc() is refused while the control connection is not secured.
        with self.assertRaises(ValueError):
            self.client.ccc()
        self.client.login(secure=True)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.ccc()
        # After ccc() there is no TLS layer left to unwrap.
        with self.assertRaises(ValueError):
            self.client.sock.unwrap()

    @skipUnless(False, "FIXME: bpo-32706")
    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # Defaults expected from a PROTOCOL_TLS_CLIENT context.
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        # Same mismatch, this time detected on the data connection.
        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            with self.client.transfercmd("list"):
                pass
        self.client.quit()

        # Connecting by the name "localhost" must pass the check.
        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        with self.client.transfercmd("list"):
            pass
1000
1001
class TestTimeouts(TestCase):
    """Check which timeout ends up on the FTP control socket for the
    different ways of specifying it.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(20)
        self.port = support.bind_port(self.sock)
        self.server_thread = threading.Thread(target=self.server,
                                              daemon=True)
        self.server_thread.start()
        # Block until the server thread is listening, then re-arm the
        # event for the "safe to close" signal.
        self.evt.wait()
        self.evt.clear()
        # Point the class-level FTP port at our server; restored in
        # tearDown().
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        ftplib.FTP.port = self.old_port
        self.server_thread.join()
        # Drop the reference explicitly to prevent a dangling thread
        self.server_thread = None

    def server(self):
        # Body of the server thread: serves at most one connection.
        # It sets self.evt twice:
        #  1) when the connection is ready to be accepted;
        #  2) when it is safe for the caller to close the connection.
        # The listening socket is closed on every path.
        listener = self.sock
        listener.listen()
        # (1) Tell the caller that we are ready to accept the connection.
        self.evt.set()
        try:
            conn, _ = listener.accept()
        except socket.timeout:
            # Nobody connected before the listening socket timed out.
            pass
        else:
            conn.sendall(b"1 Hola mundo\n")
            conn.shutdown(socket.SHUT_WR)
            # (2) Tell the caller that it is safe to close the socket.
            self.evt.set()
            conn.close()
        finally:
            listener.close()

    def _ftp_under_default_timeout(self, **ftp_kwargs):
        # Build a connected FTP client while the global default socket
        # timeout is 30; the global default is always reset to None.
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            return ftplib.FTP(HOST, **ftp_kwargs)
        finally:
            socket.setdefaulttimeout(None)

    def _wait_and_close(self, ftp):
        # Wait for the server's "safe to close" signal, then close.
        self.evt.wait()
        ftp.close()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        ftp = self._ftp_under_default_timeout()
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self._wait_and_close(ftp)

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        ftp = self._ftp_under_default_timeout(timeout=None)
        self.assertIsNone(ftp.sock.gettimeout())
        self._wait_and_close(ftp)

    def testTimeoutValue(self):
        # a value given to the constructor together with the host
        ftp = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self._wait_and_close(ftp)

    def testTimeoutConnect(self):
        # a value given to connect()
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self._wait_and_close(ftp)

    def testTimeoutDifferentOrder(self):
        # a value given to the constructor, host given to connect()
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self._wait_and_close(ftp)

    def testTimeoutDirectAccess(self):
        # a value assigned to the timeout attribute before connect()
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self._wait_and_close(ftp)
1097
1098
class MiscTestCase(TestCase):
    """Module-level checks that do not need a server."""

    def test__all__(self):
        # Names handed to support.check__all__() as its blacklist.
        not_exported = {
            'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF',
            'Error', 'parse150', 'parse227', 'parse229', 'parse257',
            'print_line', 'ftpcp', 'test',
        }
        support.check__all__(self, ftplib, blacklist=not_exported)
1105
1106
def test_main():
    """Run all test cases of this module through support.run_unittest(),
    bracketed by support.threading_setup() / threading_cleanup().
    """
    test_classes = (
        TestFTPClass,
        TestTimeouts,
        TestIPv6Environment,
        TestTLS_FTPClassMixin,
        TestTLS_FTPClass,
        MiscTestCase,
    )

    saved_threads = support.threading_setup()
    try:
        support.run_unittest(*test_classes)
    finally:
        # Always hand the saved state back, even if the run raised.
        support.threading_cleanup(*saved_threads)


# Script entry point: run the suite when this file is executed directly.
if __name__ == '__main__':
    test_main()
1122