"""Pytest fixtures providing an in-memory filesystem and a local FTP server."""
import os
import shutil
import socket
import subprocess
import sys
import time

import pytest

import fsspec
from fsspec.implementations.cached import CachingFileSystem

# Address handed to tests by ``ftp_writable``.  The pyftpdlib command line
# below does not pass a port, so the server's own default is relied upon.
_FTP_HOST = "localhost"
_FTP_PORT = 2121


@pytest.fixture()
def m():
    """
    Fixture providing a memory filesystem.

    The filesystem's ``store`` and ``pseudo_dirs`` are emptied both before the
    test receives it and again after the test finishes (even on failure).
    """
    m = fsspec.filesystem("memory")
    m.store.clear()
    m.pseudo_dirs.clear()
    try:
        yield m
    finally:
        m.store.clear()
        m.pseudo_dirs.clear()


def _wait_for_server(proc, host, port, timeout=10.0, interval=0.05):
    """
    Wait until ``host:port`` accepts TCP connections.

    Polling stops as soon as a connection succeeds, the server process
    ``proc`` has exited, or ``timeout`` seconds have elapsed.

    Returns True if a connection succeeded, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while proc.poll() is None and time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Not listening yet; back off briefly before retrying.
            time.sleep(interval)
    return False


@pytest.fixture
def ftp_writable(tmpdir):
    """
    Fixture providing a writable FTP filesystem.

    Starts ``python -m pyftpdlib`` serving ``tmpdir`` (pre-populated with a
    file named ``out``) and yields ``(host, port, user, password)``.  On
    teardown the server process is stopped and the directory is removed on a
    best-effort basis.
    """
    pytest.importorskip("pyftpdlib")
    from fsspec.implementations.ftp import FTPFileSystem

    FTPFileSystem.clear_instance_cache()  # remove lingering connections
    CachingFileSystem.clear_instance_cache()
    d = str(tmpdir)
    with open(os.path.join(d, "out"), "wb") as f:
        f.write(b"hello" * 10000)
    P = subprocess.Popen(
        [sys.executable, "-m", "pyftpdlib", "-d", d, "-u", "user", "-P", "pass", "-w"]
    )
    try:
        # Poll for readiness instead of sleeping a fixed second: faster when
        # the server starts quickly, more tolerant when it starts slowly.  If
        # the server never comes up we still yield, so the test itself reports
        # the connection failure, as it did with the fixed sleep.
        _wait_for_server(P, _FTP_HOST, _FTP_PORT)
        yield _FTP_HOST, _FTP_PORT, "user", "pass"
    finally:
        P.terminate()
        try:
            P.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # The server ignored the terminate request; do not hang teardown.
            P.kill()
            P.wait()
        # Best-effort cleanup; pytest manages tmpdir's lifetime as well.
        shutil.rmtree(d, ignore_errors=True)