1import os
2import shutil
3import subprocess
4import sys
5import time
6
7import pytest
8
9import fsspec
10from fsspec.implementations.cached import CachingFileSystem
11
12
13@pytest.fixture()
14def m():
15    """
16    Fixture providing a memory filesystem.
17    """
18    m = fsspec.filesystem("memory")
19    m.store.clear()
20    m.pseudo_dirs.clear()
21    try:
22        yield m
23    finally:
24        m.store.clear()
25        m.pseudo_dirs.clear()
26
27
28@pytest.fixture
29def ftp_writable(tmpdir):
30    """
31    Fixture providing a writable FTP filesystem.
32    """
33    pytest.importorskip("pyftpdlib")
34    from fsspec.implementations.ftp import FTPFileSystem
35
36    FTPFileSystem.clear_instance_cache()  # remove lingering connections
37    CachingFileSystem.clear_instance_cache()
38    d = str(tmpdir)
39    with open(os.path.join(d, "out"), "wb") as f:
40        f.write(b"hello" * 10000)
41    P = subprocess.Popen(
42        [sys.executable, "-m", "pyftpdlib", "-d", d, "-u", "user", "-P", "pass", "-w"]
43    )
44    try:
45        time.sleep(1)
46        yield "localhost", 2121, "user", "pass"
47    finally:
48        P.terminate()
49        P.wait()
50        try:
51            shutil.rmtree(tmpdir)
52        except Exception:
53            pass
54