import io
import os
import pathlib

import pytest
from fsspec.utils import (
    infer_storage_options,
    read_block,
    seek_delimiter,
    stringify_path,
)


def test_read_block():
    """Exercise ``read_block`` on a small newline-separated buffer."""
    newline = b"\n"
    payload = newline.join([b"123", b"456", b"789"])
    stream = io.BytesIO(payload)

    # No delimiter given: the exact byte range comes back.
    assert read_block(stream, 1, 2) == b"23"

    # (offset, length, expected result) when splitting on newlines.
    delimited_cases = [
        (0, 1, b"123\n"),
        (0, 2, b"123\n"),
        (0, 3, b"123\n"),
        (0, 5, b"123\n456\n"),
        (0, 8, b"123\n456\n789"),
        (0, 100, b"123\n456\n789"),
        (1, 1, b""),
        (1, 5, b"456\n"),
        (1, 8, b"456\n789"),
    ]
    for offset, length, expected in delimited_cases:
        assert read_block(stream, offset, length, delimiter=b"\n") == expected

    # Consecutive windows covering the buffer must reassemble to the
    # original bytes once empty reads are dropped.
    window_layouts = [
        [(0, 3), (3, 3), (6, 3), (9, 2)],
        [(0, 4), (4, 4), (8, 4)],
    ]
    for windows in window_layouts:
        pieces = [
            read_block(stream, offset, length, b"\n")
            for offset, length in windows
        ]
        assert b"".join(filter(None, pieces)) == payload


def test_seek_delimiter_endline():
    """Exercise ``seek_delimiter`` for several block sizes and delimiters."""
    stream = io.BytesIO(b"123\n456\n789")

    # A stream positioned at zero is left at zero.
    seek_delimiter(stream, b"\n", 5)
    assert stream.tell() == 0

    # From offset 1 the position lands just past the first newline,
    # whatever the block size.
    for blocksize in (1, 5, 100):
        stream.seek(1)
        seek_delimiter(stream, b"\n", blocksize=blocksize)
        assert stream.tell() == 4

    # Multi-byte delimiters are found even when the block size is
    # shorter than the delimiter itself.
    stream = io.BytesIO(b"123abc456abc789")
    for blocksize in (1, 2, 3, 4, 5, 6, 10):
        stream.seek(1)
        seek_delimiter(stream, b"abc", blocksize=blocksize)
        assert stream.tell() == 6

    # With no delimiter ahead, the position ends up at end of stream.
    stream = io.BytesIO(b"123\n456")
    stream.seek(5)
    seek_delimiter(stream, b"\n", 5)
    assert stream.tell() == 7


def test_infer_storage_options():
    """Exercise ``infer_storage_options`` on local paths and URLs."""
    # A plain POSIX path yields exactly the protocol and the path.
    assert infer_storage_options("/mnt/datasets/test.csv") == {
        "protocol": "file",
        "path": "/mnt/datasets/test.csv",
    }

    # Relative POSIX paths are passed through untouched.
    for relative in ("./test.csv", "../test.csv"):
        assert infer_storage_options(relative)["path"] == relative

    # A Windows drive path yields exactly the protocol and the path.
    assert infer_storage_options("C:\\test.csv") == {
        "protocol": "file",
        "path": "C:\\test.csv",
    }

    # Other Windows-style and bare paths are passed through untouched.
    for local in ("d:\\test.csv", "\\test.csv", ".\\test.csv", "test.csv"):
        assert infer_storage_options(local)["path"] == local

    # A full URL is split into its components, and inherited options
    # are carried into the result.
    parsed = infer_storage_options(
        "hdfs://username:pwd@Node:123/mnt/datasets/test.csv?q=1#fragm",
        inherit_storage_options={"extra": "value"},
    )
    assert parsed == {
        "protocol": "hdfs",
        "username": "username",
        "password": "pwd",
        "host": "Node",
        "port": 123,
        "path": "/mnt/datasets/test.csv#fragm",
        "url_query": "q=1",
        "url_fragment": "fragm",
        "extra": "value",
    }

    # Case and punctuation in user and host names survive parsing.
    parsed = infer_storage_options(
        "hdfs://User-name@Node-name.com/mnt/datasets/test.csv"
    )
    assert parsed["username"] == "User-name"
    assert parsed["host"] == "Node-name.com"

    # For http the whole URL is kept as the path.
    u = "http://127.0.0.1:8080/test.csv"
    assert infer_storage_options(u) == {"protocol": "http", "path": u}

    # For s3 and gcs the netloc is actually the bucket name, so we want to
    # include it in the path. Test that:
    # - Parsing doesn't lowercase the bucket
    # - The bucket is included in path
    for protocol in ("s3", "gcs", "gs"):
        url = "%s://Bucket-name.com/test.csv" % protocol
        assert infer_storage_options(url)["path"] == "Bucket-name.com/test.csv"

    # Inherited options that clash with inferred keys raise KeyError.
    with pytest.raises(KeyError):
        infer_storage_options("file:///bucket/file.csv", {"path": "collide"})
    with pytest.raises(KeyError):
        infer_storage_options("hdfs:///bucket/file.csv", {"protocol": "collide"})


@pytest.mark.parametrize(
    "urlpath, expected_path",
    (
        (r"c:\foo\bar", r"c:\foo\bar"),
        (r"C:\\foo\bar", r"C:\\foo\bar"),
        (r"c:/foo/bar", r"c:/foo/bar"),
        (r"file:///c|\foo\bar", r"c:\foo\bar"),
        (r"file:///C|/foo/bar", r"C:/foo/bar"),
        (r"file:///C:/foo/bar", r"C:/foo/bar"),
    ),
)
def test_infer_storage_options_c(urlpath, expected_path):
    """Check Windows drive paths and ``file://`` URLs map to local files."""
    inferred = infer_storage_options(urlpath)
    assert inferred["protocol"] == "file"
    assert inferred["path"] == expected_path


def test_stringify_path():
    """Exercise ``stringify_path`` on path-like and non-path inputs."""
    expected = os.path.join("path", "to", "file.txt")

    # pathlib.Path instances are converted to their string form.
    assert stringify_path(pathlib.Path(expected)) == expected

    # Any object implementing the fspath protocol is converted as well.
    class CustomFSPath:
        """Minimal object that only implements ``__fspath__``."""

        def __init__(self, path):
            self.path = path

        def __fspath__(self):
            return self.path

    assert stringify_path(CustomFSPath(expected)) == expected

    # Input that is not path-like is returned as the very same object.
    not_a_path = (1, 2, 3)
    assert stringify_path(not_a_path) is not_a_path