#!/usr/bin/env python
"""Tests for fs.ftpfs, run against a throwaway local pyftpdlib server.

When executed as a script (``python <this file> <root_dir> <port>``) this
module serves ``root_dir`` over FTP on 127.0.0.1:``port``.  The test case
below launches this same file that way, in a subprocess, from ``setUp``.
"""
from fs.tests import FSTestCases, ThreadingTestCases

import unittest

import os
import sys
import shutil
import tempfile
import subprocess
import time
from os.path import abspath
import urllib

from six import PY3


try:
    from pyftpdlib.authorizers import DummyAuthorizer
    from pyftpdlib.handlers import FTPHandler
    from pyftpdlib.servers import FTPServer
except ImportError:
    # pyftpdlib is only mandatory on Python 2; the test case below is
    # flagged off (``__test__``) on Python 3.
    if not PY3:
        raise ImportError("Requires pyftpdlib <http://code.google.com/p/pyftpdlib/>")

from fs.path import *

from fs import ftpfs

# Last port handed to a server; bumped by one in every setUp so each test
# gets its own port.
ftp_port = 30000


class TestFTPFS(unittest.TestCase, FSTestCases, ThreadingTestCases):
    """Run the shared FS test suites against an FTPFS backed by a local server."""

    # NOTE(review): presumably read by nose/pytest-style collectors to skip
    # this case on Python 3 -- plain unittest ignores it.
    __test__ = not PY3

    def setUp(self):
        """Start an FTP server on a fresh port and connect ``self.fs`` to it.

        Sets ``self.temp_dir`` (the served directory), ``self.ftp_server``
        (the server subprocess) and ``self.fs`` (the FTPFS under test).

        Raises:
            Exception: if no FTP connection can be made within 5 seconds.
        """
        global ftp_port
        ftp_port += 1
        use_port = str(ftp_port)
        self.temp_dir = tempfile.mkdtemp(u"ftpfstests")

        file_path = __file__
        if ':' not in file_path:
            file_path = abspath(file_path)
        # Apparently Windows requires values from default environment, so
        # copy the existing os.environ
        env = os.environ.copy()
        env['PYTHONPATH'] = os.getcwd() + os.pathsep + env.get('PYTHONPATH', '')
        self.ftp_server = subprocess.Popen([sys.executable,
                                            file_path,
                                            self.temp_dir,
                                            use_port],
                                           stdout=subprocess.PIPE,
                                           env=env)
        # Block until the server writes a line to stdout
        self.ftp_server.stdout.readline()

        if not self._wait_for_server(use_port):
            # unittest does not run tearDown when setUp raises, so release
            # the server process and the temp dir here before failing.
            try:
                self._stop_server()
            finally:
                shutil.rmtree(self.temp_dir, ignore_errors=True)
            raise Exception("Unable to connect to ftp server")

        self.fs = ftpfs.FTPFS('127.0.0.1', 'user', '12345', dircache=True,
                              port=use_port, timeout=5.0)
        self.fs.cache_hint(True)

    def _wait_for_server(self, port, timeout=5):
        """Poll until an FTP connection to 127.0.0.1:``port`` succeeds.

        Returns:
            True as soon as a connection was opened and read, False if
            ``timeout`` seconds elapsed without one (bounded so a dead
            server cannot hang the test run).
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                ftpurl = urllib.urlopen('ftp://127.0.0.1:%s' % port)
            except IOError:
                # Back off briefly instead of spinning on the socket.
                time.sleep(0.05)
            else:
                ftpurl.read()
                ftpurl.close()
                return True
        return False

    def _stop_server(self, timeout=5):
        """Kill the server subprocess, reap it and close its stdout pipe.

        Waits at most ``timeout`` seconds for the process to exit; if it
        has not exited by then the wait is abandoned (best effort).
        """
        pid = str(self.ftp_server.pid)
        if sys.platform == 'win32':
            os.popen('TASKKILL /PID ' + pid + ' /F')
        else:
            os.system('kill ' + pid)
        # poll() collects the exit status, so the child does not linger as
        # a zombie; the deadline keeps a failed kill from hanging the run.
        deadline = time.time() + timeout
        while self.ftp_server.poll() is None and time.time() < deadline:
            time.sleep(0.05)
        self.ftp_server.stdout.close()

    def tearDown(self):
        """Stop the server, delete the served directory and close ``self.fs``.

        Each step runs even if an earlier one raises.
        """
        try:
            self._stop_server()
        finally:
            try:
                shutil.rmtree(self.temp_dir)
            finally:
                self.fs.close()

    def check(self, p):
        """Return True if ``p`` exists on disk inside the served temp dir."""
        check_path = self.temp_dir.rstrip(os.sep) + os.sep + p
        return os.path.exists(check_path.encode('utf-8'))


if __name__ == "__main__":

    # Run an ftp server that exposes a given directory:
    #   argv[1] -- directory to serve, argv[2] -- port to listen on.
    authorizer = DummyAuthorizer()
    authorizer.add_user("user", "12345", sys.argv[1], perm="elradfmw")
    authorizer.add_anonymous(sys.argv[1])

    handler = FTPHandler
    handler.authorizer = authorizer
    address = ("127.0.0.1", int(sys.argv[2]))

    ftpd = FTPServer(address, handler)

    # The parent test process blocks on this line before connecting.
    sys.stdout.write('serving\n')
    sys.stdout.flush()
    ftpd.serve_forever()