1"""Test script for ftplib module."""
2
3# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4# environment
5
6import ftplib
7import asyncore
8import asynchat
9import socket
10import StringIO
11import errno
12import os
13try:
14    import ssl
15except ImportError:
16    ssl = None
17
18from unittest import TestCase
19from test import test_support
20from test.test_support import HOST
21threading = test_support.import_module('threading')
22
23
24# the dummy data returned by server over the data channel when
25# RETR, LIST and NLST commands are issued
26RETR_DATA = 'abcde12345\r\n' * 1000
27LIST_DATA = 'foo\r\nbar\r\n'
28NLST_DATA = 'foo\r\nbar\r\n'
29
30
31class DummyDTPHandler(asynchat.async_chat):
32    dtp_conn_closed = False
33
34    def __init__(self, conn, baseclass):
35        asynchat.async_chat.__init__(self, conn)
36        self.baseclass = baseclass
37        self.baseclass.last_received_data = ''
38
39    def handle_read(self):
40        self.baseclass.last_received_data += self.recv(1024)
41
42    def handle_close(self):
43        # XXX: this method can be called many times in a row for a single
44        # connection, including in clear-text (non-TLS) mode.
45        # (behaviour witnessed with test_data_connection)
46        if not self.dtp_conn_closed:
47            self.baseclass.push('226 transfer complete')
48            self.close()
49            self.dtp_conn_closed = True
50
51    def handle_error(self):
52        raise
53
54
55class DummyFTPHandler(asynchat.async_chat):
56
57    dtp_handler = DummyDTPHandler
58
59    def __init__(self, conn):
60        asynchat.async_chat.__init__(self, conn)
61        self.set_terminator("\r\n")
62        self.in_buffer = []
63        self.dtp = None
64        self.last_received_cmd = None
65        self.last_received_data = ''
66        self.next_response = ''
67        self.rest = None
68        self.push('220 welcome')
69
70    def collect_incoming_data(self, data):
71        self.in_buffer.append(data)
72
73    def found_terminator(self):
74        line = ''.join(self.in_buffer)
75        self.in_buffer = []
76        if self.next_response:
77            self.push(self.next_response)
78            self.next_response = ''
79        cmd = line.split(' ')[0].lower()
80        self.last_received_cmd = cmd
81        space = line.find(' ')
82        if space != -1:
83            arg = line[space + 1:]
84        else:
85            arg = ""
86        if hasattr(self, 'cmd_' + cmd):
87            method = getattr(self, 'cmd_' + cmd)
88            method(arg)
89        else:
90            self.push('550 command "%s" not understood.' %cmd)
91
92    def handle_error(self):
93        raise
94
95    def push(self, data):
96        asynchat.async_chat.push(self, data + '\r\n')
97
98    def cmd_port(self, arg):
99        addr = map(int, arg.split(','))
100        ip = '%d.%d.%d.%d' %tuple(addr[:4])
101        port = (addr[4] * 256) + addr[5]
102        s = socket.create_connection((ip, port), timeout=10)
103        self.dtp = self.dtp_handler(s, baseclass=self)
104        self.push('200 active data connection established')
105
106    def cmd_pasv(self, arg):
107        sock = socket.socket()
108        sock.bind((self.socket.getsockname()[0], 0))
109        sock.listen(5)
110        sock.settimeout(10)
111        ip, port = sock.getsockname()[:2]
112        ip = ip.replace('.', ',')
113        p1, p2 = divmod(port, 256)
114        self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
115        conn, addr = sock.accept()
116        self.dtp = self.dtp_handler(conn, baseclass=self)
117
118    def cmd_eprt(self, arg):
119        af, ip, port = arg.split(arg[0])[1:-1]
120        port = int(port)
121        s = socket.create_connection((ip, port), timeout=10)
122        self.dtp = self.dtp_handler(s, baseclass=self)
123        self.push('200 active data connection established')
124
125    def cmd_epsv(self, arg):
126        sock = socket.socket(socket.AF_INET6)
127        sock.bind((self.socket.getsockname()[0], 0))
128        sock.listen(5)
129        sock.settimeout(10)
130        port = sock.getsockname()[1]
131        self.push('229 entering extended passive mode (|||%d|)' %port)
132        conn, addr = sock.accept()
133        self.dtp = self.dtp_handler(conn, baseclass=self)
134
135    def cmd_echo(self, arg):
136        # sends back the received string (used by the test suite)
137        self.push(arg)
138
139    def cmd_user(self, arg):
140        self.push('331 username ok')
141
142    def cmd_pass(self, arg):
143        self.push('230 password ok')
144
145    def cmd_acct(self, arg):
146        self.push('230 acct ok')
147
148    def cmd_rnfr(self, arg):
149        self.push('350 rnfr ok')
150
151    def cmd_rnto(self, arg):
152        self.push('250 rnto ok')
153
154    def cmd_dele(self, arg):
155        self.push('250 dele ok')
156
157    def cmd_cwd(self, arg):
158        self.push('250 cwd ok')
159
160    def cmd_size(self, arg):
161        self.push('250 1000')
162
163    def cmd_mkd(self, arg):
164        self.push('257 "%s"' %arg)
165
166    def cmd_rmd(self, arg):
167        self.push('250 rmd ok')
168
169    def cmd_pwd(self, arg):
170        self.push('257 "pwd ok"')
171
172    def cmd_type(self, arg):
173        self.push('200 type ok')
174
175    def cmd_quit(self, arg):
176        self.push('221 quit ok')
177        self.close()
178
179    def cmd_stor(self, arg):
180        self.push('125 stor ok')
181
182    def cmd_rest(self, arg):
183        self.rest = arg
184        self.push('350 rest ok')
185
186    def cmd_retr(self, arg):
187        self.push('125 retr ok')
188        if self.rest is not None:
189            offset = int(self.rest)
190        else:
191            offset = 0
192        self.dtp.push(RETR_DATA[offset:])
193        self.dtp.close_when_done()
194        self.rest = None
195
196    def cmd_list(self, arg):
197        self.push('125 list ok')
198        self.dtp.push(LIST_DATA)
199        self.dtp.close_when_done()
200
201    def cmd_nlst(self, arg):
202        self.push('125 nlst ok')
203        self.dtp.push(NLST_DATA)
204        self.dtp.close_when_done()
205
206
207class DummyFTPServer(asyncore.dispatcher, threading.Thread):
208
209    handler = DummyFTPHandler
210
211    def __init__(self, address, af=socket.AF_INET):
212        threading.Thread.__init__(self)
213        asyncore.dispatcher.__init__(self)
214        self.create_socket(af, socket.SOCK_STREAM)
215        self.bind(address)
216        self.listen(5)
217        self.active = False
218        self.active_lock = threading.Lock()
219        self.host, self.port = self.socket.getsockname()[:2]
220
221    def start(self):
222        assert not self.active
223        self.__flag = threading.Event()
224        threading.Thread.start(self)
225        self.__flag.wait()
226
227    def run(self):
228        self.active = True
229        self.__flag.set()
230        while self.active and asyncore.socket_map:
231            self.active_lock.acquire()
232            asyncore.loop(timeout=0.1, count=1)
233            self.active_lock.release()
234        asyncore.close_all(ignore_all=True)
235
236    def stop(self):
237        assert self.active
238        self.active = False
239        self.join()
240
241    def handle_accept(self):
242        conn, addr = self.accept()
243        self.handler = self.handler(conn)
244        self.close()
245
246    def handle_connect(self):
247        self.close()
248    handle_read = handle_connect
249
250    def writable(self):
251        return 0
252
253    def handle_error(self):
254        raise
255
256
257if ssl is not None:
258
259    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert.pem")
260
261    class SSLConnection(object, asyncore.dispatcher):
262        """An asyncore.dispatcher subclass supporting TLS/SSL."""
263
264        _ssl_accepting = False
265        _ssl_closing = False
266
267        def secure_connection(self):
268            self.socket = ssl.wrap_socket(self.socket, suppress_ragged_eofs=False,
269                                          certfile=CERTFILE, server_side=True,
270                                          do_handshake_on_connect=False,
271                                          ssl_version=ssl.PROTOCOL_SSLv23)
272            self._ssl_accepting = True
273
274        def _do_ssl_handshake(self):
275            try:
276                self.socket.do_handshake()
277            except ssl.SSLError, err:
278                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
279                                   ssl.SSL_ERROR_WANT_WRITE):
280                    return
281                elif err.args[0] == ssl.SSL_ERROR_EOF:
282                    return self.handle_close()
283                raise
284            except socket.error, err:
285                if err.args[0] == errno.ECONNABORTED:
286                    return self.handle_close()
287            else:
288                self._ssl_accepting = False
289
290        def _do_ssl_shutdown(self):
291            self._ssl_closing = True
292            try:
293                self.socket = self.socket.unwrap()
294            except ssl.SSLError, err:
295                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
296                                   ssl.SSL_ERROR_WANT_WRITE):
297                    return
298            except socket.error, err:
299                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
300                # from OpenSSL's SSL_shutdown(), corresponding to a
301                # closed socket condition. See also:
302                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
303                pass
304            self._ssl_closing = False
305            super(SSLConnection, self).close()
306
307        def handle_read_event(self):
308            if self._ssl_accepting:
309                self._do_ssl_handshake()
310            elif self._ssl_closing:
311                self._do_ssl_shutdown()
312            else:
313                super(SSLConnection, self).handle_read_event()
314
315        def handle_write_event(self):
316            if self._ssl_accepting:
317                self._do_ssl_handshake()
318            elif self._ssl_closing:
319                self._do_ssl_shutdown()
320            else:
321                super(SSLConnection, self).handle_write_event()
322
323        def send(self, data):
324            try:
325                return super(SSLConnection, self).send(data)
326            except ssl.SSLError, err:
327                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
328                                   ssl.SSL_ERROR_WANT_READ,
329                                   ssl.SSL_ERROR_WANT_WRITE):
330                    return 0
331                raise
332
333        def recv(self, buffer_size):
334            try:
335                return super(SSLConnection, self).recv(buffer_size)
336            except ssl.SSLError, err:
337                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
338                                   ssl.SSL_ERROR_WANT_WRITE):
339                    return ''
340                if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
341                    self.handle_close()
342                    return ''
343                raise
344
345        def handle_error(self):
346            raise
347
348        def close(self):
349            if (isinstance(self.socket, ssl.SSLSocket) and
350                self.socket._sslobj is not None):
351                self._do_ssl_shutdown()
352
353
354    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
355        """A DummyDTPHandler subclass supporting TLS/SSL."""
356
357        def __init__(self, conn, baseclass):
358            DummyDTPHandler.__init__(self, conn, baseclass)
359            if self.baseclass.secure_data_channel:
360                self.secure_connection()
361
362
363    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
364        """A DummyFTPHandler subclass supporting TLS/SSL."""
365
366        dtp_handler = DummyTLS_DTPHandler
367
368        def __init__(self, conn):
369            DummyFTPHandler.__init__(self, conn)
370            self.secure_data_channel = False
371
372        def cmd_auth(self, line):
373            """Set up secure control channel."""
374            self.push('234 AUTH TLS successful')
375            self.secure_connection()
376
377        def cmd_pbsz(self, line):
378            """Negotiate size of buffer for secure data transfer.
379            For TLS/SSL the only valid value for the parameter is '0'.
380            Any other value is accepted but ignored.
381            """
382            self.push('200 PBSZ=0 successful.')
383
384        def cmd_prot(self, line):
385            """Setup un/secure data channel."""
386            arg = line.upper()
387            if arg == 'C':
388                self.push('200 Protection set to Clear')
389                self.secure_data_channel = False
390            elif arg == 'P':
391                self.push('200 Protection set to Private')
392                self.secure_data_channel = True
393            else:
394                self.push("502 Unrecognized PROT type (use C or P).")
395
396
397    class DummyTLS_FTPServer(DummyFTPServer):
398        handler = DummyTLS_FTPHandler
399
400
401class TestFTPClass(TestCase):
402
403    def setUp(self):
404        self.server = DummyFTPServer((HOST, 0))
405        self.server.start()
406        self.client = ftplib.FTP(timeout=10)
407        self.client.connect(self.server.host, self.server.port)
408
409    def tearDown(self):
410        self.client.close()
411        self.server.stop()
412
413    def test_getwelcome(self):
414        self.assertEqual(self.client.getwelcome(), '220 welcome')
415
416    def test_sanitize(self):
417        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
418        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
419        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
420
421    def test_exceptions(self):
422        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
423        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
424        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
425        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
426        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
427
428    def test_all_errors(self):
429        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
430                      ftplib.error_proto, ftplib.Error, IOError, EOFError)
431        for x in exceptions:
432            try:
433                raise x('exception not included in all_errors set')
434            except ftplib.all_errors:
435                pass
436
437    def test_set_pasv(self):
438        # passive mode is supposed to be enabled by default
439        self.assertTrue(self.client.passiveserver)
440        self.client.set_pasv(True)
441        self.assertTrue(self.client.passiveserver)
442        self.client.set_pasv(False)
443        self.assertFalse(self.client.passiveserver)
444
445    def test_voidcmd(self):
446        self.client.voidcmd('echo 200')
447        self.client.voidcmd('echo 299')
448        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
449        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
450
451    def test_login(self):
452        self.client.login()
453
454    def test_acct(self):
455        self.client.acct('passwd')
456
457    def test_rename(self):
458        self.client.rename('a', 'b')
459        self.server.handler.next_response = '200'
460        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
461
462    def test_delete(self):
463        self.client.delete('foo')
464        self.server.handler.next_response = '199'
465        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
466
467    def test_size(self):
468        self.client.size('foo')
469
470    def test_mkd(self):
471        dir = self.client.mkd('/foo')
472        self.assertEqual(dir, '/foo')
473
474    def test_rmd(self):
475        self.client.rmd('foo')
476
477    def test_pwd(self):
478        dir = self.client.pwd()
479        self.assertEqual(dir, 'pwd ok')
480
481    def test_quit(self):
482        self.assertEqual(self.client.quit(), '221 quit ok')
483        # Ensure the connection gets closed; sock attribute should be None
484        self.assertEqual(self.client.sock, None)
485
486    def test_retrbinary(self):
487        received = []
488        self.client.retrbinary('retr', received.append)
489        self.assertEqual(''.join(received), RETR_DATA)
490
491    def test_retrbinary_rest(self):
492        for rest in (0, 10, 20):
493            received = []
494            self.client.retrbinary('retr', received.append, rest=rest)
495            self.assertEqual(''.join(received), RETR_DATA[rest:],
496                             msg='rest test case %d %d %d' % (rest,
497                                                              len(''.join(received)),
498                                                              len(RETR_DATA[rest:])))
499
500    def test_retrlines(self):
501        received = []
502        self.client.retrlines('retr', received.append)
503        self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))
504
505    def test_storbinary(self):
506        f = StringIO.StringIO(RETR_DATA)
507        self.client.storbinary('stor', f)
508        self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
509        # test new callback arg
510        flag = []
511        f.seek(0)
512        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
513        self.assertTrue(flag)
514
515    def test_storbinary_rest(self):
516        f = StringIO.StringIO(RETR_DATA)
517        for r in (30, '30'):
518            f.seek(0)
519            self.client.storbinary('stor', f, rest=r)
520            self.assertEqual(self.server.handler.rest, str(r))
521
522    def test_storlines(self):
523        f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
524        self.client.storlines('stor', f)
525        self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
526        # test new callback arg
527        flag = []
528        f.seek(0)
529        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
530        self.assertTrue(flag)
531
532    def test_nlst(self):
533        self.client.nlst()
534        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
535
536    def test_dir(self):
537        l = []
538        self.client.dir(lambda x: l.append(x))
539        self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
540
541    def test_makeport(self):
542        self.client.makeport()
543        # IPv4 is in use, just make sure send_eprt has not been used
544        self.assertEqual(self.server.handler.last_received_cmd, 'port')
545
546    def test_makepasv(self):
547        host, port = self.client.makepasv()
548        conn = socket.create_connection((host, port), 10)
549        conn.close()
550        # IPv4 is in use, just make sure send_epsv has not been used
551        self.assertEqual(self.server.handler.last_received_cmd, 'pasv')
552
553
554class TestIPv6Environment(TestCase):
555
556    def setUp(self):
557        self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
558        self.server.start()
559        self.client = ftplib.FTP()
560        self.client.connect(self.server.host, self.server.port)
561
562    def tearDown(self):
563        self.client.close()
564        self.server.stop()
565
566    def test_af(self):
567        self.assertEqual(self.client.af, socket.AF_INET6)
568
569    def test_makeport(self):
570        self.client.makeport()
571        self.assertEqual(self.server.handler.last_received_cmd, 'eprt')
572
573    def test_makepasv(self):
574        host, port = self.client.makepasv()
575        conn = socket.create_connection((host, port), 10)
576        conn.close()
577        self.assertEqual(self.server.handler.last_received_cmd, 'epsv')
578
579    def test_transfer(self):
580        def retr():
581            received = []
582            self.client.retrbinary('retr', received.append)
583            self.assertEqual(''.join(received), RETR_DATA)
584        self.client.set_pasv(True)
585        retr()
586        self.client.set_pasv(False)
587        retr()
588
589
590class TestTLS_FTPClassMixin(TestFTPClass):
591    """Repeat TestFTPClass tests starting the TLS layer for both control
592    and data connections first.
593    """
594
595    def setUp(self):
596        self.server = DummyTLS_FTPServer((HOST, 0))
597        self.server.start()
598        self.client = ftplib.FTP_TLS(timeout=10)
599        self.client.connect(self.server.host, self.server.port)
600        # enable TLS
601        self.client.auth()
602        self.client.prot_p()
603
604
605class TestTLS_FTPClass(TestCase):
606    """Specific TLS_FTP class tests."""
607
608    def setUp(self):
609        self.server = DummyTLS_FTPServer((HOST, 0))
610        self.server.start()
611        self.client = ftplib.FTP_TLS(timeout=10)
612        self.client.connect(self.server.host, self.server.port)
613
614    def tearDown(self):
615        self.client.close()
616        self.server.stop()
617
618    def test_control_connection(self):
619        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
620        self.client.auth()
621        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
622
623    def test_data_connection(self):
624        # clear text
625        sock = self.client.transfercmd('list')
626        self.assertNotIsInstance(sock, ssl.SSLSocket)
627        sock.close()
628        self.assertEqual(self.client.voidresp(), "226 transfer complete")
629
630        # secured, after PROT P
631        self.client.prot_p()
632        sock = self.client.transfercmd('list')
633        self.assertIsInstance(sock, ssl.SSLSocket)
634        sock.close()
635        self.assertEqual(self.client.voidresp(), "226 transfer complete")
636
637        # PROT C is issued, the connection must be in cleartext again
638        self.client.prot_c()
639        sock = self.client.transfercmd('list')
640        self.assertNotIsInstance(sock, ssl.SSLSocket)
641        sock.close()
642        self.assertEqual(self.client.voidresp(), "226 transfer complete")
643
644    def test_login(self):
645        # login() is supposed to implicitly secure the control connection
646        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
647        self.client.login()
648        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
649        # make sure that AUTH TLS doesn't get issued again
650        self.client.login()
651
652    def test_auth_issued_twice(self):
653        self.client.auth()
654        self.assertRaises(ValueError, self.client.auth)
655
656    def test_auth_ssl(self):
657        try:
658            self.client.ssl_version = ssl.PROTOCOL_SSLv3
659            self.client.auth()
660            self.assertRaises(ValueError, self.client.auth)
661        finally:
662            self.client.ssl_version = ssl.PROTOCOL_TLSv1
663
664
665class TestTimeouts(TestCase):
666
667    def setUp(self):
668        self.evt = threading.Event()
669        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
670        self.sock.settimeout(3)
671        self.port = test_support.bind_port(self.sock)
672        threading.Thread(target=self.server, args=(self.evt,self.sock)).start()
673        # Wait for the server to be ready.
674        self.evt.wait()
675        self.evt.clear()
676        ftplib.FTP.port = self.port
677
678    def tearDown(self):
679        self.evt.wait()
680
681    def server(self, evt, serv):
682        # This method sets the evt 3 times:
683        #  1) when the connection is ready to be accepted.
684        #  2) when it is safe for the caller to close the connection
685        #  3) when we have closed the socket
686        serv.listen(5)
687        # (1) Signal the caller that we are ready to accept the connection.
688        evt.set()
689        try:
690            conn, addr = serv.accept()
691        except socket.timeout:
692            pass
693        else:
694            conn.send("1 Hola mundo\n")
695            # (2) Signal the caller that it is safe to close the socket.
696            evt.set()
697            conn.close()
698        finally:
699            serv.close()
700            # (3) Signal the caller that we are done.
701            evt.set()
702
703    def testTimeoutDefault(self):
704        # default -- use global socket timeout
705        self.assertTrue(socket.getdefaulttimeout() is None)
706        socket.setdefaulttimeout(30)
707        try:
708            ftp = ftplib.FTP("localhost")
709        finally:
710            socket.setdefaulttimeout(None)
711        self.assertEqual(ftp.sock.gettimeout(), 30)
712        self.evt.wait()
713        ftp.close()
714
715    def testTimeoutNone(self):
716        # no timeout -- do not use global socket timeout
717        self.assertTrue(socket.getdefaulttimeout() is None)
718        socket.setdefaulttimeout(30)
719        try:
720            ftp = ftplib.FTP("localhost", timeout=None)
721        finally:
722            socket.setdefaulttimeout(None)
723        self.assertTrue(ftp.sock.gettimeout() is None)
724        self.evt.wait()
725        ftp.close()
726
727    def testTimeoutValue(self):
728        # a value
729        ftp = ftplib.FTP(HOST, timeout=30)
730        self.assertEqual(ftp.sock.gettimeout(), 30)
731        self.evt.wait()
732        ftp.close()
733
734    def testTimeoutConnect(self):
735        ftp = ftplib.FTP()
736        ftp.connect(HOST, timeout=30)
737        self.assertEqual(ftp.sock.gettimeout(), 30)
738        self.evt.wait()
739        ftp.close()
740
741    def testTimeoutDifferentOrder(self):
742        ftp = ftplib.FTP(timeout=30)
743        ftp.connect(HOST)
744        self.assertEqual(ftp.sock.gettimeout(), 30)
745        self.evt.wait()
746        ftp.close()
747
748    def testTimeoutDirectAccess(self):
749        ftp = ftplib.FTP()
750        ftp.timeout = 30
751        ftp.connect(HOST)
752        self.assertEqual(ftp.sock.gettimeout(), 30)
753        self.evt.wait()
754        ftp.close()
755
756
757def test_main():
758    tests = [TestFTPClass, TestTimeouts]
759    if socket.has_ipv6:
760        try:
761            DummyFTPServer((HOST, 0), af=socket.AF_INET6)
762        except socket.error:
763            pass
764        else:
765            tests.append(TestIPv6Environment)
766
767    if ssl is not None:
768        tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])
769
770    thread_info = test_support.threading_setup()
771    try:
772        test_support.run_unittest(*tests)
773    finally:
774        test_support.threading_cleanup(*thread_info)
775
776
777if __name__ == '__main__':
778    test_main()
779