1import io
2import os
3import pathlib
4
5import pytest
6from fsspec.utils import (
7    infer_storage_options,
8    read_block,
9    seek_delimiter,
10    stringify_path,
11)
12
13
def test_read_block():
    """Exercise ``read_block`` on a small in-memory buffer of three lines.

    Covers one plain byte-range read, a table of delimiter-aware reads, and
    two partitions of the buffer whose non-empty blocks must concatenate
    back to the original data.
    """
    newline = b"\n"
    payload = newline.join((b"123", b"456", b"789"))
    stream = io.BytesIO(payload)

    # No delimiter given: the expected result is the plain byte range.
    assert read_block(stream, 1, 2) == b"23"

    # (offset, length, expected block) for reads with a newline delimiter.
    delimited_cases = (
        (0, 1, b"123\n"),
        (0, 2, b"123\n"),
        (0, 3, b"123\n"),
        (0, 5, b"123\n456\n"),
        (0, 8, b"123\n456\n789"),
        (0, 100, b"123\n456\n789"),
        (1, 1, b""),
        (1, 5, b"456\n"),
        (1, 8, b"456\n789"),
    )
    for offset, length, expected in delimited_cases:
        assert read_block(stream, offset, length, delimiter=b"\n") == expected

    # Each partition lists consecutive (offset, length) windows over the
    # buffer; joining the non-empty blocks must reproduce the payload.
    partitions = (
        ((0, 3), (3, 3), (6, 3), (9, 2)),
        ((0, 4), (4, 4), (8, 4)),
    )
    for windows in partitions:
        pieces = []
        for offset, length in windows:
            block = read_block(stream, offset, length, b"\n")
            if block:
                pieces.append(block)
        assert b"".join(pieces) == payload
33
34
def test_seek_delimiter_endline():
    """Check the file position that ``seek_delimiter`` leaves behind."""
    stream = io.BytesIO(b"123\n456\n789")

    # A stream positioned at offset zero is expected to stay there.
    seek_delimiter(stream, b"\n", 5)
    assert stream.tell() == 0

    # Starting inside the first line, the position should end up at offset 4
    # (just past the first newline) for every blocksize tried.
    for blocksize in (1, 5, 100):
        stream.seek(1)
        seek_delimiter(stream, b"\n", blocksize=blocksize)
        assert stream.tell() == 4

    # Multi-byte delimiter, including blocksizes shorter than the delimiter.
    stream = io.BytesIO(b"123abc456abc789")
    for blocksize in (1, 2, 3, 4, 5, 6, 10):
        stream.seek(1)
        seek_delimiter(stream, b"abc", blocksize=blocksize)
        assert stream.tell() == 6

    # No delimiter remains after offset 5, so the position should finish at
    # the end of the data.
    stream = io.BytesIO(b"123\n456")
    stream.seek(5)
    seek_delimiter(stream, b"\n", 5)
    assert stream.tell() == 7
60
61
def test_infer_storage_options():
    """Check ``infer_storage_options`` on local paths, URLs and key collisions."""
    # A plain POSIX path yields exactly a protocol and a path.
    posix_opts = infer_storage_options("/mnt/datasets/test.csv")
    for key, value in (("protocol", "file"), ("path", "/mnt/datasets/test.csv")):
        assert posix_opts.pop(key) == value
    assert not posix_opts

    # Relative POSIX paths come back unchanged.
    for relative in ("./test.csv", "../test.csv"):
        assert infer_storage_options(relative)["path"] == relative

    # A Windows drive path also yields exactly a protocol and a path.
    windows_opts = infer_storage_options("C:\\test.csv")
    for key, value in (("protocol", "file"), ("path", "C:\\test.csv")):
        assert windows_opts.pop(key) == value
    assert not windows_opts

    # Other Windows-style and bare file names come back unchanged.
    for local_name in ("d:\\test.csv", "\\test.csv", ".\\test.csv", "test.csv"):
        assert infer_storage_options(local_name)["path"] == local_name

    # A fully populated URL is split into its components, and inherited
    # options are carried through alongside them.
    hdfs_opts = infer_storage_options(
        "hdfs://username:pwd@Node:123/mnt/datasets/test.csv?q=1#fragm",
        inherit_storage_options={"extra": "value"},
    )
    expected_components = (
        ("protocol", "hdfs"),
        ("username", "username"),
        ("password", "pwd"),
        ("host", "Node"),
        ("port", 123),
        ("path", "/mnt/datasets/test.csv#fragm"),
        ("url_query", "q=1"),
        ("url_fragment", "fragm"),
        ("extra", "value"),
    )
    for key, value in expected_components:
        assert hdfs_opts.pop(key) == value
    assert not hdfs_opts

    # Mixed case and dashes in the user name and host are kept as written.
    dashed_opts = infer_storage_options(
        "hdfs://User-name@Node-name.com/mnt/datasets/test.csv"
    )
    assert dashed_opts.pop("username") == "User-name"
    assert dashed_opts.pop("host") == "Node-name.com"

    # For http the whole URL is expected as the path.
    http_url = "http://127.0.0.1:8080/test.csv"
    assert infer_storage_options(http_url) == {"protocol": "http", "path": http_url}

    # For s3 and gcs the netloc is actually the bucket name, so it belongs
    # in the path. Verify that:
    # - parsing doesn't lowercase the bucket
    # - the bucket is included in the path
    for protocol in ("s3", "gcs", "gs"):
        bucket_url = "{}://Bucket-name.com/test.csv".format(protocol)
        bucket_opts = infer_storage_options(bucket_url)
        assert bucket_opts["path"] == "Bucket-name.com/test.csv"

    # Inherited options that clash with an inferred key must raise KeyError.
    collisions = (
        ("file:///bucket/file.csv", {"path": "collide"}),
        ("hdfs:///bucket/file.csv", {"protocol": "collide"}),
    )
    for url, inherited in collisions:
        with pytest.raises(KeyError):
            infer_storage_options(url, inherited)
115
116
@pytest.mark.parametrize(
    ("urlpath", "expected_path"),
    [
        (r"c:\foo\bar", r"c:\foo\bar"),
        (r"C:\\foo\bar", r"C:\\foo\bar"),
        (r"c:/foo/bar", r"c:/foo/bar"),
        (r"file:///c|\foo\bar", r"c:\foo\bar"),
        (r"file:///C|/foo/bar", r"C:/foo/bar"),
        (r"file:///C:/foo/bar", r"C:/foo/bar"),
    ],
)
def test_infer_storage_options_c(urlpath, expected_path):
    """Drive-letter paths and ``file:///`` URLs map to the ``file`` protocol.

    Each case pairs an input with the path expected back; the ``file:///``
    forms are expected to lose the scheme and have ``|`` turned into ``:``.
    """
    inferred = infer_storage_options(urlpath)
    protocol = inferred["protocol"]
    assert protocol == "file"
    resolved_path = inferred["path"]
    assert resolved_path == expected_path
132
133
def test_stringify_path():
    """Check ``stringify_path`` on path-like objects and on a non-path value."""

    class CustomFSPath:
        """Bare object whose only path behaviour is ``__fspath__``."""

        def __init__(self, path):
            self.path = path

        def __fspath__(self):
            return self.path

    expected = os.path.join("path", "to", "file.txt")

    # A pathlib.Path first, then an arbitrary object implementing the fspath
    # protocol: both should come back as the plain string path.
    for path_like in (pathlib.Path(expected), CustomFSPath(expected)):
        assert stringify_path(path_like) == expected

    # Anything that is not path-like must be handed back as the same object.
    not_a_path = (1, 2, 3)
    assert stringify_path(not_a_path) is not_a_path
157