1#!/usr/bin/env python
2from fs.tests import FSTestCases, ThreadingTestCases
3
4import unittest
5
6import os
7import sys
8import shutil
9import tempfile
10import subprocess
11import time
12from os.path import abspath
13import urllib
14
15from six import PY3
16
17
18try:
19    from pyftpdlib.authorizers import DummyAuthorizer
20    from pyftpdlib.handlers import FTPHandler
21    from pyftpdlib.servers import FTPServer
22except ImportError:
23    if not PY3:
24        raise ImportError("Requires pyftpdlib <http://code.google.com/p/pyftpdlib/>")
25
26from fs.path import *
27
28from fs import ftpfs
29
# Last port handed to a test server.  setUp() bumps this before every test so
# each test's server listens on a fresh port.
ftp_port = 30000


class TestFTPFS(unittest.TestCase, FSTestCases, ThreadingTestCases):
    """Run the shared FS test suites against an FTPFS talking to a local server.

    Every test starts its own FTP server in a subprocess by re-running this
    file as a script (see the ``__main__`` section at the bottom), serving a
    freshly created temporary directory on a fresh port.
    """

    # Disabled on Python 3 (setUp relies on the Python 2 only urllib.urlopen).
    # NOTE(review): presumably the nose-style collection flag -- confirm.
    __test__ = not PY3

    def setUp(self):
        """Start an FTP server subprocess and connect ``self.fs`` to it.

        Raises:
            Exception: if the server process exits before announcing that it
                is serving, or if no connection can be made within 5 seconds.
        """
        global ftp_port
        ftp_port += 1
        use_port = str(ftp_port)
        self.temp_dir = tempfile.mkdtemp(u"ftpfstests")
        self.ftp_server = None

        # unittest does not call tearDown() when setUp() raises, so anything
        # started here has to be cleaned up here on failure.
        try:
            file_path = __file__
            # NOTE(review): the ':' test presumably detects a drive-qualified
            # (already absolute) Windows path -- confirm.
            if ':' not in file_path:
                file_path = abspath(file_path)
            # Apparently Windows requires values from default environment, so
            # copy the existing os.environ
            env = os.environ.copy()
            env['PYTHONPATH'] = os.getcwd() + os.pathsep + env.get('PYTHONPATH', '')
            self.ftp_server = subprocess.Popen([sys.executable,
                                                file_path,
                                                self.temp_dir,
                                                use_port],
                                               stdout=subprocess.PIPE,
                                               env=env)
            # Block until the server writes a line to stdout.  An empty read
            # means the pipe was closed without that line, i.e. the server
            # never got as far as serving.
            if not self.ftp_server.stdout.readline():
                raise Exception("FTP server process exited before it started serving")

            # Poll until a connection can be made
            deadline = time.time() + 5
            while time.time() < deadline:
                try:
                    ftpurl = urllib.urlopen('ftp://127.0.0.1:%s' % use_port)
                except IOError:
                    # Back off briefly instead of spinning on the CPU.
                    time.sleep(0.05)
                else:
                    ftpurl.read()
                    ftpurl.close()
                    break
            else:
                # Avoid a possible infinite loop
                raise Exception("Unable to connect to ftp server")

            self.fs = ftpfs.FTPFS('127.0.0.1', 'user', '12345', dircache=True, port=use_port, timeout=5.0)
            self.fs.cache_hint(True)
        except BaseException:
            self._stop_ftp_server()
            shutil.rmtree(self.temp_dir, ignore_errors=True)
            raise

    def _stop_ftp_server(self):
        """Kill the server subprocess (if any), reap it and close its pipe."""
        server = getattr(self, 'ftp_server', None)
        if server is None:
            return
        # Only signal the process if it has not already exited.
        if server.poll() is None:
            if sys.platform == 'win32':
                # close() waits for TASKKILL itself to finish.
                os.popen('TASKKILL /PID ' + str(server.pid) + ' /F').close()
            else:
                os.system('kill ' + str(server.pid))
            # Reap the child so it does not linger as a zombie, but never
            # hang the test run if it refuses to die.
            deadline = time.time() + 5
            while server.poll() is None and time.time() < deadline:
                time.sleep(0.05)
        server.stdout.close()

    def tearDown(self):
        """Stop the server, remove the served directory and close ``self.fs``.

        Each step runs even if an earlier one raised.
        """
        try:
            self._stop_ftp_server()
        finally:
            try:
                shutil.rmtree(self.temp_dir)
            finally:
                self.fs.close()

    def check(self, p):
        """Return whether ``p``, appended to the served directory, exists on local disk."""
        check_path = self.temp_dir.rstrip(os.sep) + os.sep + p
        return os.path.exists(check_path.encode('utf-8'))
88
89
if __name__ == "__main__":

    # Child-process entry point: serve the directory named by argv[1] over
    # FTP on 127.0.0.1, listening on the port named by argv[2].
    served_root = sys.argv[1]

    auth = DummyAuthorizer()
    auth.add_user("user", "12345", served_root, perm="elradfmw")
    auth.add_anonymous(served_root)

    # The authorizer is installed on the handler class itself.
    FTPHandler.authorizer = auth
    bind_address = ("127.0.0.1", int(sys.argv[2]))
    server = FTPServer(bind_address, FTPHandler)

    # Announce readiness on stdout; the test's setUp() blocks on reading a
    # line from this process before it starts polling the port.
    sys.stdout.write('serving\n')
    sys.stdout.flush()
    server.serve_forever()
113