1"""Test script for ftplib module."""
2
3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4# environment
5
6import ftplib
7import asyncore
8import asynchat
9import socket
10import StringIO
11import errno
12import os
13try:
14    import ssl
15except ImportError:
16    ssl = None
17
18from unittest import TestCase, SkipTest, skipUnless
19from test import test_support
20from test.test_support import HOST, HOSTv6
21threading = test_support.import_module('threading')
22
# timeout, in seconds, passed to the ftplib.FTP_TLS clients created by the
# TLS-specific tests in this module
TIMEOUT = 3
# the dummy data returned by server over the data channel when
# RETR, LIST and NLST commands are issued
RETR_DATA = 'abcde12345\r\n' * 1000
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'
29
30
class DummyDTPHandler(asynchat.async_chat):
    """Data-channel handler of the dummy FTP server.

    Whatever is read from the data connection is appended to the
    ``last_received_data`` attribute of the control-channel handler
    (``baseclass``), where the tests can inspect it.
    """

    # becomes True once the "226" reply for this connection has been pushed
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # each new data connection starts from an empty receive buffer
        baseclass.last_received_data = ''

    def handle_read(self):
        chunk = self.recv(1024)
        self.baseclass.last_received_data += chunk

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        # Only the first call may reply on the control channel and close.
        if self.dtp_conn_closed:
            return
        self.baseclass.push('226 transfer complete')
        self.close()
        self.dtp_conn_closed = True

    def handle_error(self):
        # re-raise the exception currently being handled instead of using
        # the inherited error handler
        raise
53
54
class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler of the dummy FTP server.

    Incoming data is split on CRLF; the first word of each line selects a
    ``cmd_<name>`` method which receives the rest of the line as ``arg``.
    Unknown commands are answered with a 550 reply.

    Attributes the tests rely on:
      last_received_cmd  -- lower-cased name of the last command received
      last_received_data -- data received over the last data connection
      next_response      -- if non-empty, pushed once ahead of the reply
                            to the next command, then reset
      rest               -- argument of the last REST command, or None
      next_retr_data     -- payload returned by the next RETR command
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator("\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        self.next_response = ''
        self.rest = None
        self.next_retr_data = RETR_DATA
        self.push('220 welcome')

    def collect_incoming_data(self, data):
        self.in_buffer.append(data)

    def found_terminator(self):
        """Dispatch one complete command line to its ``cmd_*`` method."""
        line = ''.join(self.in_buffer)
        self.in_buffer = []
        if self.next_response:
            self.push(self.next_response)
            self.next_response = ''
        # everything before the first space is the command, the remainder
        # (possibly empty) is its argument
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('550 command "%s" not understood.' %cmd)

    def handle_error(self):
        # re-raise the exception currently being handled instead of using
        # the inherited error handler
        raise

    def push(self, data):
        """Send a reply line; the CRLF terminator is appended here."""
        asynchat.async_chat.push(self, data + '\r\n')

    def cmd_port(self, arg):
        # arg is "h1,h2,h3,h4,p1,p2"; build a real list so it can be
        # sliced and indexed whatever map() would return
        addr = [int(field) for field in arg.split(',')]
        ip = '%d.%d.%d.%d' %tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        sock = socket.socket()
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            ip, port = sock.getsockname()[:2]
            ip = ip.replace('.', ',')
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
            conn, addr = sock.accept()
        finally:
            # the listening socket serves a single connection; close it
            # explicitly (also on timeout) rather than leaving it to
            # garbage collection
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        # arg is "<d>af<d>ip<d>port<d>" where <d> is the first character
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        sock = socket.socket(socket.AF_INET6)
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' %port)
            conn, addr = sock.accept()
        finally:
            # see cmd_pasv: do not leave the listening socket open
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' %arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        # remembered as received; consumed by the next RETR
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        self.push('125 retr ok')
        if self.rest is not None:
            offset = int(self.rest)
        else:
            offset = 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        # a REST offset applies to a single transfer only
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')
211
212
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    Every accepted connection is wrapped in ``handler``; the most recent
    handler is kept in ``handler_instance`` so tests can inspect it.
    ``host`` and ``port`` hold the address the server is bound to.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(af, socket.SOCK_STREAM)
        try:
            self.bind(address)
            self.listen(5)
            self.active = False
            self.active_lock = threading.Lock()
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
        except:
            # unregister the server on bind() error,
            # needed by TestIPv6Environment.setUpClass();
            # close() removes the channel and also closes the socket so
            # that it is not left open
            self.close()
            raise

    def start(self):
        """Start the server thread and return once its loop is running."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        try:
            while self.active and asyncore.socket_map:
                # the with-statement releases the lock even when a handler
                # raises out of asyncore.loop()
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            # always close the remaining channels, also on error
            asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop to finish and wait for the thread to exit."""
        assert self.active
        self.active = False
        self.join()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        # the listening socket never has anything to write
        return 0

    def handle_error(self):
        # re-raise the exception currently being handled instead of using
        # the inherited error handler
        raise
267
268
269if ssl is not None:
270
    # Certificate files used by the TLS dummy server and the TLS tests;
    # both are looked up in the directory containing this module.
    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
273
274    class SSLConnection(object, asyncore.dispatcher):
275        """An asyncore.dispatcher subclass supporting TLS/SSL."""
276
277        _ssl_accepting = False
278        _ssl_closing = False
279
280        def secure_connection(self):
281            socket = ssl.wrap_socket(self.socket, suppress_ragged_eofs=False,
282                                     certfile=CERTFILE, server_side=True,
283                                     do_handshake_on_connect=False,
284                                     ssl_version=ssl.PROTOCOL_SSLv23)
285            self.del_channel()
286            self.set_socket(socket)
287            self._ssl_accepting = True
288
289        def _do_ssl_handshake(self):
290            try:
291                self.socket.do_handshake()
292            except ssl.SSLError as err:
293                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
294                                   ssl.SSL_ERROR_WANT_WRITE):
295                    return
296                elif err.args[0] == ssl.SSL_ERROR_EOF:
297                    return self.handle_close()
298                raise
299            except socket.error as err:
300                if err.args[0] == errno.ECONNABORTED:
301                    return self.handle_close()
302            else:
303                self._ssl_accepting = False
304
305        def _do_ssl_shutdown(self):
306            self._ssl_closing = True
307            try:
308                self.socket = self.socket.unwrap()
309            except ssl.SSLError as err:
310                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
311                                   ssl.SSL_ERROR_WANT_WRITE):
312                    return
313            except socket.error as err:
314                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
315                # from OpenSSL's SSL_shutdown(), corresponding to a
316                # closed socket condition. See also:
317                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
318                pass
319            self._ssl_closing = False
320            if getattr(self, '_ccc', False) is False:
321                super(SSLConnection, self).close()
322            else:
323                pass
324
325        def handle_read_event(self):
326            if self._ssl_accepting:
327                self._do_ssl_handshake()
328            elif self._ssl_closing:
329                self._do_ssl_shutdown()
330            else:
331                super(SSLConnection, self).handle_read_event()
332
333        def handle_write_event(self):
334            if self._ssl_accepting:
335                self._do_ssl_handshake()
336            elif self._ssl_closing:
337                self._do_ssl_shutdown()
338            else:
339                super(SSLConnection, self).handle_write_event()
340
341        def send(self, data):
342            try:
343                return super(SSLConnection, self).send(data)
344            except ssl.SSLError as err:
345                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
346                                   ssl.SSL_ERROR_WANT_READ,
347                                   ssl.SSL_ERROR_WANT_WRITE):
348                    return 0
349                raise
350
351        def recv(self, buffer_size):
352            try:
353                return super(SSLConnection, self).recv(buffer_size)
354            except ssl.SSLError as err:
355                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
356                                   ssl.SSL_ERROR_WANT_WRITE):
357                    return b''
358                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
359                    self.handle_close()
360                    return b''
361                raise
362
363        def handle_error(self):
364            raise
365
366        def close(self):
367            if (isinstance(self.socket, ssl.SSLSocket) and
368                self.socket._sslobj is not None):
369                self._do_ssl_shutdown()
370            else:
371                super(SSLConnection, self).close()
372
373
374    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
375        """A DummyDTPHandler subclass supporting TLS/SSL."""
376
377        def __init__(self, conn, baseclass):
378            DummyDTPHandler.__init__(self, conn, baseclass)
379            if self.baseclass.secure_data_channel:
380                self.secure_connection()
381
382
383    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
384        """A DummyFTPHandler subclass supporting TLS/SSL."""
385
386        dtp_handler = DummyTLS_DTPHandler
387
388        def __init__(self, conn):
389            DummyFTPHandler.__init__(self, conn)
390            self.secure_data_channel = False
391
392        def cmd_auth(self, line):
393            """Set up secure control channel."""
394            self.push('234 AUTH TLS successful')
395            self.secure_connection()
396
397        def cmd_pbsz(self, line):
398            """Negotiate size of buffer for secure data transfer.
399            For TLS/SSL the only valid value for the parameter is '0'.
400            Any other value is accepted but ignored.
401            """
402            self.push('200 PBSZ=0 successful.')
403
404        def cmd_prot(self, line):
405            """Setup un/secure data channel."""
406            arg = line.upper()
407            if arg == 'C':
408                self.push('200 Protection set to Clear')
409                self.secure_data_channel = False
410            elif arg == 'P':
411                self.push('200 Protection set to Private')
412                self.secure_data_channel = True
413            else:
414                self.push("502 Unrecognized PROT type (use C or P).")
415
416
    class DummyTLS_FTPServer(DummyFTPServer):
        """A DummyFTPServer whose connections are handled by
        DummyTLS_FTPHandler."""
        handler = DummyTLS_FTPHandler
419
420
class TestFTPClass(TestCase):
    """Exercise ftplib.FTP against a DummyFTPServer running in a thread."""

    def setUp(self):
        self.server = DummyFTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP(timeout=10)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))

    def test_exceptions(self):
        # commands containing line breaks are rejected
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
        self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
        # the server echoes the argument back, so it acts as the reply code
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')

    def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                      ftplib.error_proto, ftplib.Error, IOError, EOFError)
        for x in exceptions:
            # an exception missing from all_errors escapes and fails the test
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        self.client.voidcmd('echo 200')
        self.client.voidcmd('echo 299')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        # make the server send an unexpected reply first
        self.server.handler_instance.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        # make the server send an unexpected reply first
        self.server.handler_instance.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        dirname = self.client.mkd('/foo')
        self.assertEqual(dirname, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        resp = self.client.cwd('/foo')
        self.assertEqual(resp, '250 cwd ok')

    def test_pwd(self):
        dirname = self.client.pwd()
        self.assertEqual(dirname, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertIsNone(self.client.sock)

    def test_retrbinary(self):
        received = []
        self.client.retrbinary('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA)

    def test_retrbinary_rest(self):
        for rest in (0, 10, 20):
            received = []
            self.client.retrbinary('retr', received.append, rest=rest)
            self.assertEqual(''.join(received), RETR_DATA[rest:],
                             msg='rest test case %d %d %d' % (rest,
                                                              len(''.join(received)),
                                                              len(RETR_DATA[rest:])))

    def test_retrlines(self):
        received = []
        self.client.retrlines('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        f = StringIO.StringIO(RETR_DATA)
        self.client.storbinary('stor', f)
        self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_storbinary_rest(self):
        f = StringIO.StringIO(RETR_DATA)
        for r in (30, '30'):
            f.seek(0)
            self.client.storbinary('stor', f, rest=r)
            self.assertEqual(self.server.handler_instance.rest, str(r))

    def test_storlines(self):
        f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
        self.client.storlines('stor', f)
        self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        l = []
        self.client.dir(lambda x: l.append(x))
        self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))

    def test_makeport(self):
        # makeport() returns the listening socket: close it explicitly
        # instead of leaving that to garbage collection
        sock = self.client.makeport()
        try:
            # IPv4 is in use, just make sure send_eprt has not been used
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                             'port')
        finally:
            sock.close()

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')

    def test_line_too_long(self):
        self.assertRaises(ftplib.Error, self.client.sendcmd,
                          'x' * self.client.maxline * 2)

    def test_retrlines_too_long(self):
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        received = []
        self.assertRaises(ftplib.Error,
                          self.client.retrlines, 'retr', received.append)

    def test_storlines_too_long(self):
        f = StringIO.StringIO('x' * self.client.maxline * 2)
        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
593
594
@skipUnless(socket.has_ipv6, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """Exercise ftplib.FTP against a dummy server bound to an IPv6 address."""

    @classmethod
    def setUpClass(cls):
        # Probe the very address setUp() binds to, so that the skip decision
        # matches what the tests actually need.
        try:
            probe = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
        except socket.error:
            raise SkipTest("IPv6 not enabled")
        # The probe server is never started: close it so that its socket does
        # not stay open and registered in asyncore's socket map.
        probe.close()

    def setUp(self):
        self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
        self.server.start()
        self.client = ftplib.FTP()
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_af(self):
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        # makeport() returns the listening socket: close it explicitly
        # instead of leaving that to garbage collection
        sock = self.client.makeport()
        try:
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                             'eprt')
        finally:
            sock.close()

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')

    def test_transfer(self):
        def retr():
            received = []
            self.client.retrbinary('retr', received.append)
            self.assertEqual(''.join(received), RETR_DATA)
        # the same download has to work in passive and in active mode
        self.client.set_pasv(True)
        retr()
        self.client.set_pasv(False)
        retr()
637
638
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Repeat TestFTPClass tests starting the TLS layer for both control
    and data connections first.
    """

    # Only setUp() is overridden; tearDown() and every test method are
    # inherited from TestFTPClass.
    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        # auth() secures the control connection, prot_p() the data connections
        self.client.auth()
        self.client.prot_p()
653
654
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests."""

    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def _check_list_transfer(self, secure):
        # Run one LIST transfer and check whether the data socket is an SSL
        # socket (secure=True) or a plain one (secure=False).
        sock = self.client.transfercmd('list')
        if secure:
            self.assertIsInstance(sock, ssl.SSLSocket)
        else:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
        # for the SSL socket, consuming the data also finalizes the handshake
        # and avoids "SSLError [SSL] shutdown while in init"
        self.assertEqual(sock.recv(1024), LIST_DATA.encode('ascii'))
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_control_connection(self):
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text
        self._check_list_transfer(secure=False)

        # secured, after PROT P
        self.client.prot_p()
        self._check_list_transfer(secure=True)

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        self._check_list_transfer(secure=False)

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_auth_ssl(self):
        try:
            self.client.ssl_version = ssl.PROTOCOL_SSLv23
            self.client.auth()
            self.assertRaises(ValueError, self.client.auth)
        finally:
            self.client.ssl_version = ssl.PROTOCOL_TLS

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        # a context cannot be combined with keyfile and/or certfile
        for conflicting in ({'keyfile': CERTFILE},
                            {'certfile': CERTFILE},
                            {'certfile': CERTFILE, 'keyfile': CERTFILE}):
            self.assertRaises(ValueError, ftplib.FTP_TLS, context=ctx,
                              **conflicting)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        data_sock = self.client.transfercmd('list')
        try:
            self.assertIs(data_sock.context, ctx)
            self.assertIsInstance(data_sock, ssl.SSLSocket)
        finally:
            data_sock.close()

    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        host, port = self.server.host, self.server.port

        # 127.0.0.1 doesn't match SAN
        self.client.connect(host, port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(host, port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            self.client.transfercmd("list").close()
        self.client.quit()

        # connecting by name is expected to pass the hostname check
        self.client.connect("localhost", port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", port)
        self.client.prot_p()
        self.client.transfercmd("list").close()
772
773
class TestTimeouts(TestCase):
    """Check how ftplib.FTP applies timeouts to its control socket.

    Each test talks to a minimal one-shot server running in a thread.
    ftplib.FTP.port is pointed at that server for the duration of the
    test and restored afterwards.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(10)
        self.port = test_support.bind_port(self.sock)
        self.server_thread = threading.Thread(target=self.server,
                                              args=(self.evt, self.sock))
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # ftplib.FTP.port is a class attribute: remember the previous value
        # so that tearDown() can undo the change
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        # do not leak the modified default port into other tests
        ftplib.FTP.port = self.old_port
        self.evt.wait()
        # the event is already set after step (2) of server(); joining the
        # thread guarantees that step (3), closing the socket, has happened
        self.server_thread.join()

    def server(self, evt, serv):
        # This method sets the evt 3 times:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        #  3) when we have closed the socket
        serv.listen(5)
        # (1) Signal the caller that we are ready to accept the connection.
        evt.set()
        try:
            conn, addr = serv.accept()
        except socket.timeout:
            pass
        else:
            conn.send("1 Hola mundo\n")
            # (2) Signal the caller that it is safe to close the socket.
            evt.set()
            conn.close()
        finally:
            serv.close()
            # (3) Signal the caller that we are done.
            evt.set()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(ftp.sock.gettimeout())
        self.evt.wait()
        ftp.close()

    def testTimeoutValue(self):
        # a value
        ftp = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutConnect(self):
        # timeout given to connect()
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDifferentOrder(self):
        # timeout given to the constructor, connection made afterwards
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDirectAccess(self):
        # timeout set through the attribute
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()
864
865
def test_main():
    """Run all test cases of this module.

    The run is bracketed by test_support.threading_setup() and
    test_support.threading_cleanup().
    """
    cases = (TestFTPClass, TestTimeouts,
             TestIPv6Environment,
             TestTLS_FTPClassMixin, TestTLS_FTPClass)
    saved = test_support.threading_setup()
    try:
        test_support.run_unittest(*cases)
    finally:
        # runs whether or not the tests raised
        test_support.threading_cleanup(*saved)
876
877
# Run the whole test suite when the module is executed as a script.
if __name__ == '__main__':
    test_main()
880