1"""
2Tests for the pandas.io.common functionalities
3"""
import codecs
from io import BytesIO, StringIO
import mmap
import os
from pathlib import Path
import tempfile

import pytest

from pandas.compat import is_platform_windows
import pandas.util._test_decorators as td

import pandas as pd
import pandas._testing as tm

import pandas.io.common as icom
19
20
class CustomFSPath:
    """
    Minimal path-like object used to test fspath handling of unknown objects.

    The wrapped value is stored on ``path`` and handed back unchanged through
    the ``__fspath__`` protocol method.
    """

    def __init__(self, path: str) -> None:
        # kept as a public attribute; ``__fspath__`` reads it back verbatim
        self.path = path

    def __fspath__(self) -> str:
        """Return the wrapped path exactly as it was passed to the constructor."""
        return self.path
29
30
# Callables that consume a string path and return a string or path-like
# object; tests are parametrized over them
path_types = [str, CustomFSPath, Path]

# LocalPath is only added to the list when py.path can be imported
try:
    from py.path import local as LocalPath
except ImportError:
    pass
else:
    path_types.append(LocalPath)

# absolute path of the directory that contains this test module
HERE = os.path.abspath(os.path.dirname(__file__))
42
43
44# https://github.com/cython/cython/issues/1720
45@pytest.mark.filterwarnings("ignore:can't resolve package:ImportWarning")
46class TestCommonIOCapabilities:
47    data1 = """index,A,B,C,D
48foo,2,3,4,5
49bar,7,8,9,10
50baz,12,13,14,15
51qux,12,13,14,15
52foo2,12,13,14,15
53bar2,12,13,14,15
54"""
55
56    def test_expand_user(self):
57        filename = "~/sometest"
58        expanded_name = icom._expand_user(filename)
59
60        assert expanded_name != filename
61        assert os.path.isabs(expanded_name)
62        assert os.path.expanduser(filename) == expanded_name
63
64    def test_expand_user_normal_path(self):
65        filename = "/somefolder/sometest"
66        expanded_name = icom._expand_user(filename)
67
68        assert expanded_name == filename
69        assert os.path.expanduser(filename) == expanded_name
70
71    def test_stringify_path_pathlib(self):
72        rel_path = icom.stringify_path(Path("."))
73        assert rel_path == "."
74        redundant_path = icom.stringify_path(Path("foo//bar"))
75        assert redundant_path == os.path.join("foo", "bar")
76
77    @td.skip_if_no("py.path")
78    def test_stringify_path_localpath(self):
79        path = os.path.join("foo", "bar")
80        abs_path = os.path.abspath(path)
81        lpath = LocalPath(path)
82        assert icom.stringify_path(lpath) == abs_path
83
84    def test_stringify_path_fspath(self):
85        p = CustomFSPath("foo/bar.csv")
86        result = icom.stringify_path(p)
87        assert result == "foo/bar.csv"
88
89    def test_stringify_file_and_path_like(self):
90        # GH 38125: do not stringify file objects that are also path-like
91        fsspec = pytest.importorskip("fsspec")
92        with tm.ensure_clean() as path:
93            with fsspec.open(f"file://{path}", mode="wb") as fsspec_obj:
94                assert fsspec_obj == icom.stringify_path(fsspec_obj)
95
96    @pytest.mark.parametrize(
97        "extension,expected",
98        [
99            ("", None),
100            (".gz", "gzip"),
101            (".bz2", "bz2"),
102            (".zip", "zip"),
103            (".xz", "xz"),
104            (".GZ", "gzip"),
105            (".BZ2", "bz2"),
106            (".ZIP", "zip"),
107            (".XZ", "xz"),
108        ],
109    )
110    @pytest.mark.parametrize("path_type", path_types)
111    def test_infer_compression_from_path(self, extension, expected, path_type):
112        path = path_type("foo/bar.csv" + extension)
113        compression = icom.infer_compression(path, compression="infer")
114        assert compression == expected
115
116    @pytest.mark.parametrize("path_type", [str, CustomFSPath, Path])
117    def test_get_handle_with_path(self, path_type):
118        # ignore LocalPath: it creates strange paths: /absolute/~/sometest
119        filename = path_type("~/sometest")
120        with icom.get_handle(filename, "w") as handles:
121            assert os.path.isabs(handles.handle.name)
122            assert os.path.expanduser(filename) == handles.handle.name
123
124    def test_get_handle_with_buffer(self):
125        input_buffer = StringIO()
126        with icom.get_handle(input_buffer, "r") as handles:
127            assert handles.handle == input_buffer
128        assert not input_buffer.closed
129        input_buffer.close()
130
131    def test_iterator(self):
132        with pd.read_csv(StringIO(self.data1), chunksize=1) as reader:
133            result = pd.concat(reader, ignore_index=True)
134        expected = pd.read_csv(StringIO(self.data1))
135        tm.assert_frame_equal(result, expected)
136
137        # GH12153
138        with pd.read_csv(StringIO(self.data1), chunksize=1) as it:
139            first = next(it)
140            tm.assert_frame_equal(first, expected.iloc[[0]])
141            tm.assert_frame_equal(pd.concat(it), expected.iloc[1:])
142
143    @pytest.mark.parametrize(
144        "reader, module, error_class, fn_ext",
145        [
146            (pd.read_csv, "os", FileNotFoundError, "csv"),
147            (pd.read_fwf, "os", FileNotFoundError, "txt"),
148            (pd.read_excel, "xlrd", FileNotFoundError, "xlsx"),
149            (pd.read_feather, "pyarrow", IOError, "feather"),
150            (pd.read_hdf, "tables", FileNotFoundError, "h5"),
151            (pd.read_stata, "os", FileNotFoundError, "dta"),
152            (pd.read_sas, "os", FileNotFoundError, "sas7bdat"),
153            (pd.read_json, "os", ValueError, "json"),
154            (pd.read_pickle, "os", FileNotFoundError, "pickle"),
155        ],
156    )
157    def test_read_non_existent(self, reader, module, error_class, fn_ext):
158        pytest.importorskip(module)
159
160        path = os.path.join(HERE, "data", "does_not_exist." + fn_ext)
161        msg1 = fr"File (b')?.+does_not_exist\.{fn_ext}'? does not exist"
162        msg2 = fr"\[Errno 2\] No such file or directory: '.+does_not_exist\.{fn_ext}'"
163        msg3 = "Expected object or value"
164        msg4 = "path_or_buf needs to be a string file path or file-like"
165        msg5 = (
166            fr"\[Errno 2\] File .+does_not_exist\.{fn_ext} does not exist: "
167            fr"'.+does_not_exist\.{fn_ext}'"
168        )
169        msg6 = fr"\[Errno 2\] 没有那个文件或目录: '.+does_not_exist\.{fn_ext}'"
170        msg7 = (
171            fr"\[Errno 2\] File o directory non esistente: '.+does_not_exist\.{fn_ext}'"
172        )
173        msg8 = fr"Failed to open local file.+does_not_exist\.{fn_ext}"
174
175        with pytest.raises(
176            error_class,
177            match=fr"({msg1}|{msg2}|{msg3}|{msg4}|{msg5}|{msg6}|{msg7}|{msg8})",
178        ):
179            reader(path)
180
181    @pytest.mark.parametrize(
182        "reader, module, error_class, fn_ext",
183        [
184            (pd.read_csv, "os", FileNotFoundError, "csv"),
185            (pd.read_table, "os", FileNotFoundError, "csv"),
186            (pd.read_fwf, "os", FileNotFoundError, "txt"),
187            (pd.read_excel, "xlrd", FileNotFoundError, "xlsx"),
188            (pd.read_feather, "pyarrow", IOError, "feather"),
189            (pd.read_hdf, "tables", FileNotFoundError, "h5"),
190            (pd.read_stata, "os", FileNotFoundError, "dta"),
191            (pd.read_sas, "os", FileNotFoundError, "sas7bdat"),
192            (pd.read_json, "os", ValueError, "json"),
193            (pd.read_pickle, "os", FileNotFoundError, "pickle"),
194        ],
195    )
196    def test_read_expands_user_home_dir(
197        self, reader, module, error_class, fn_ext, monkeypatch
198    ):
199        pytest.importorskip(module)
200
201        path = os.path.join("~", "does_not_exist." + fn_ext)
202        monkeypatch.setattr(icom, "_expand_user", lambda x: os.path.join("foo", x))
203
204        msg1 = fr"File (b')?.+does_not_exist\.{fn_ext}'? does not exist"
205        msg2 = fr"\[Errno 2\] No such file or directory: '.+does_not_exist\.{fn_ext}'"
206        msg3 = "Unexpected character found when decoding 'false'"
207        msg4 = "path_or_buf needs to be a string file path or file-like"
208        msg5 = (
209            fr"\[Errno 2\] File .+does_not_exist\.{fn_ext} does not exist: "
210            fr"'.+does_not_exist\.{fn_ext}'"
211        )
212        msg6 = fr"\[Errno 2\] 没有那个文件或目录: '.+does_not_exist\.{fn_ext}'"
213        msg7 = (
214            fr"\[Errno 2\] File o directory non esistente: '.+does_not_exist\.{fn_ext}'"
215        )
216        msg8 = fr"Failed to open local file.+does_not_exist\.{fn_ext}"
217
218        with pytest.raises(
219            error_class,
220            match=fr"({msg1}|{msg2}|{msg3}|{msg4}|{msg5}|{msg6}|{msg7}|{msg8})",
221        ):
222            reader(path)
223
224    @pytest.mark.parametrize(
225        "reader, module, path",
226        [
227            (pd.read_csv, "os", ("io", "data", "csv", "iris.csv")),
228            (pd.read_table, "os", ("io", "data", "csv", "iris.csv")),
229            (
230                pd.read_fwf,
231                "os",
232                ("io", "data", "fixed_width", "fixed_width_format.txt"),
233            ),
234            (pd.read_excel, "xlrd", ("io", "data", "excel", "test1.xlsx")),
235            (
236                pd.read_feather,
237                "pyarrow",
238                ("io", "data", "feather", "feather-0_3_1.feather"),
239            ),
240            (
241                pd.read_hdf,
242                "tables",
243                ("io", "data", "legacy_hdf", "datetimetz_object.h5"),
244            ),
245            (pd.read_stata, "os", ("io", "data", "stata", "stata10_115.dta")),
246            (pd.read_sas, "os", ("io", "sas", "data", "test1.sas7bdat")),
247            (pd.read_json, "os", ("io", "json", "data", "tsframe_v012.json")),
248            (
249                pd.read_pickle,
250                "os",
251                ("io", "data", "pickle", "categorical.0.25.0.pickle"),
252            ),
253        ],
254    )
255    def test_read_fspath_all(self, reader, module, path, datapath):
256        pytest.importorskip(module)
257        path = datapath(*path)
258
259        mypath = CustomFSPath(path)
260        result = reader(mypath)
261        expected = reader(path)
262
263        if path.endswith(".pickle"):
264            # categorical
265            tm.assert_categorical_equal(result, expected)
266        else:
267            tm.assert_frame_equal(result, expected)
268
269    @pytest.mark.parametrize(
270        "writer_name, writer_kwargs, module",
271        [
272            ("to_csv", {}, "os"),
273            ("to_excel", {"engine": "xlwt"}, "xlwt"),
274            ("to_feather", {}, "pyarrow"),
275            ("to_html", {}, "os"),
276            ("to_json", {}, "os"),
277            ("to_latex", {}, "os"),
278            ("to_pickle", {}, "os"),
279            ("to_stata", {"time_stamp": pd.to_datetime("2019-01-01 00:00")}, "os"),
280        ],
281    )
282    def test_write_fspath_all(self, writer_name, writer_kwargs, module):
283        p1 = tm.ensure_clean("string")
284        p2 = tm.ensure_clean("fspath")
285        df = pd.DataFrame({"A": [1, 2]})
286
287        with p1 as string, p2 as fspath:
288            pytest.importorskip(module)
289            mypath = CustomFSPath(fspath)
290            writer = getattr(df, writer_name)
291
292            writer(string, **writer_kwargs)
293            with open(string, "rb") as f:
294                expected = f.read()
295
296            writer(mypath, **writer_kwargs)
297            with open(fspath, "rb") as f:
298                result = f.read()
299
300            assert result == expected
301
302    def test_write_fspath_hdf5(self):
303        # Same test as write_fspath_all, except HDF5 files aren't
304        # necessarily byte-for-byte identical for a given dataframe, so we'll
305        # have to read and compare equality
306        pytest.importorskip("tables")
307
308        df = pd.DataFrame({"A": [1, 2]})
309        p1 = tm.ensure_clean("string")
310        p2 = tm.ensure_clean("fspath")
311
312        with p1 as string, p2 as fspath:
313            mypath = CustomFSPath(fspath)
314            df.to_hdf(mypath, key="bar")
315            df.to_hdf(string, key="bar")
316
317            result = pd.read_hdf(fspath, key="bar")
318            expected = pd.read_hdf(string, key="bar")
319
320        tm.assert_frame_equal(result, expected)
321
322
@pytest.fixture
def mmap_file(datapath):
    # path of the CSV data file used by the memory-map wrapper tests
    relative_parts = ("io", "data", "csv", "test_mmap.csv")
    return datapath(*relative_parts)
326
327
328class TestMMapWrapper:
329    def test_constructor_bad_file(self, mmap_file):
330        non_file = StringIO("I am not a file")
331        non_file.fileno = lambda: -1
332
333        # the error raised is different on Windows
334        if is_platform_windows():
335            msg = "The parameter is incorrect"
336            err = OSError
337        else:
338            msg = "[Errno 22]"
339            err = mmap.error
340
341        with pytest.raises(err, match=msg):
342            icom._MMapWrapper(non_file)
343
344        target = open(mmap_file)
345        target.close()
346
347        msg = "I/O operation on closed file"
348        with pytest.raises(ValueError, match=msg):
349            icom._MMapWrapper(target)
350
351    def test_get_attr(self, mmap_file):
352        with open(mmap_file) as target:
353            wrapper = icom._MMapWrapper(target)
354
355        attrs = dir(wrapper.mmap)
356        attrs = [attr for attr in attrs if not attr.startswith("__")]
357        attrs.append("__next__")
358
359        for attr in attrs:
360            assert hasattr(wrapper, attr)
361
362        assert not hasattr(wrapper, "foo")
363
364    def test_next(self, mmap_file):
365        with open(mmap_file) as target:
366            wrapper = icom._MMapWrapper(target)
367            lines = target.readlines()
368
369        for line in lines:
370            next_line = next(wrapper)
371            assert next_line.strip() == line.strip()
372
373        with pytest.raises(StopIteration, match=r"^$"):
374            next(wrapper)
375
376    def test_unknown_engine(self):
377        with tm.ensure_clean() as path:
378            df = tm.makeDataFrame()
379            df.to_csv(path)
380            with pytest.raises(ValueError, match="Unknown engine"):
381                pd.read_csv(path, engine="pyt")
382
383    def test_binary_mode(self):
384        """
385        'encoding' shouldn't be passed to 'open' in binary mode.
386
387        GH 35058
388        """
389        with tm.ensure_clean() as path:
390            df = tm.makeDataFrame()
391            df.to_csv(path, mode="w+b")
392            tm.assert_frame_equal(df, pd.read_csv(path, index_col=0))
393
394    @pytest.mark.parametrize("encoding", ["utf-16", "utf-32"])
395    @pytest.mark.parametrize("compression_", ["bz2", "xz"])
396    def test_warning_missing_utf_bom(self, encoding, compression_):
397        """
398        bz2 and xz do not write the byte order mark (BOM) for utf-16/32.
399
400        https://stackoverflow.com/questions/55171439
401
402        GH 35681
403        """
404        df = tm.makeDataFrame()
405        with tm.ensure_clean() as path:
406            with tm.assert_produces_warning(UnicodeWarning):
407                df.to_csv(path, compression=compression_, encoding=encoding)
408
409            # reading should fail (otherwise we wouldn't need the warning)
410            with pytest.raises(Exception):
411                pd.read_csv(path, compression=compression_, encoding=encoding)
412
413
def test_is_fsspec_url():
    for fsspec_url in (
        "gcs://pandas/somethingelse.com",
        "gs://pandas/somethingelse.com",
    ):
        assert icom.is_fsspec_url(fsspec_url)

    for other in (
        # the following is the only remote URL that is handled without fsspec
        "http://pandas/somethingelse.com",
        "random:pandas/somethingelse.com",
        "/local/path",
        "relative/local/path",
    ):
        assert not icom.is_fsspec_url(other)
422
423
def test_default_errors():
    # GH 38989
    with tm.ensure_clean() as path:
        csv_file = Path(path)
        # the first line is a lone 0xE4 byte, which is not valid UTF-8;
        # that line is skipped when reading
        csv_file.write_bytes(b"\xe4\na\n1")
        result = pd.read_csv(csv_file, skiprows=[0])
        tm.assert_frame_equal(result, pd.DataFrame({"a": [1]}))
430
431
@pytest.mark.parametrize("encoding", [None, "utf-8"])
@pytest.mark.parametrize("format", ["csv", "json"])
def test_codecs_encoding(encoding, format):
    # GH39247
    expected = tm.makeDataFrame()
    with tm.ensure_clean() as path:
        # write through a handle opened with codecs.open ...
        with codecs.open(path, mode="w", encoding=encoding) as sink:
            write = getattr(expected, f"to_{format}")
            write(sink)
        # ... and read the frame back the same way
        with codecs.open(path, mode="r", encoding=encoding) as source:
            df = (
                pd.read_csv(source, index_col=0)
                if format == "csv"
                else pd.read_json(source)
            )
    tm.assert_frame_equal(expected, df)
446
447
def test_codecs_get_writer_reader():
    # GH39247
    expected = tm.makeDataFrame()
    utf8_writer = codecs.getwriter("utf-8")
    utf8_reader = codecs.getreader("utf-8")
    with tm.ensure_clean() as path:
        # binary handles wrapped in codecs stream writer / reader objects
        with open(path, "wb") as raw, utf8_writer(raw) as encoded:
            expected.to_csv(encoded)
        with open(path, "rb") as raw, utf8_reader(raw) as encoded:
            df = pd.read_csv(encoded, index_col=0)
    tm.assert_frame_equal(expected, df)
459
460
@pytest.mark.parametrize(
    "io_class,mode,msg",
    [
        (BytesIO, "t", "a bytes-like object is required, not 'str'"),
        (StringIO, "b", "string argument expected, got 'bytes'"),
    ],
)
def test_explicit_encoding(io_class, mode, msg):
    # GH39247; a user-provided mode="*t" or "*b" has to be used as given.
    # Every case here intentionally pairs a buffer with the wrong mode, so
    # using the requested mode shows up as a TypeError
    frame = tm.makeDataFrame()
    with io_class() as buffer, pytest.raises(TypeError, match=msg):
        frame.to_csv(buffer, mode=f"w{mode}")
476