# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from collections import OrderedDict
import io

import numpy as np
import pytest

import pyarrow as pa
from pyarrow import fs
from pyarrow.filesystem import LocalFileSystem, FileSystem
from pyarrow.tests import util
from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
                                          parametrize_legacy_dataset)

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.pandas_examples import dataframe_with_lists
    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet


def test_parquet_invalid_version(tempdir):
    table = pa.table({'a': [1, 2, 3]})
    with pytest.raises(ValueError, match="Unsupported Parquet format version"):
        _write_table(table, tempdir / 'test_version.parquet', version="2.2")
    with pytest.raises(ValueError, match="Unsupported Parquet data page " +
                       "version"):
        _write_table(table, tempdir / 'test_version.parquet',
                     data_page_version="2.2")


@parametrize_legacy_dataset
def test_set_data_page_size(use_legacy_dataset):
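    # data_page_size sets a target (approximate) size in bytes for encoded
    # data pages within each column chunk; smaller targets split the column
    # data across more pages.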
    arr = pa.array([1, 2, 3] * 100000)
    t = pa.Table.from_arrays([arr], names=['f0'])

    # 128K, 512K
    page_sizes = [2 << 16, 2 << 18]
    for target_page_size in page_sizes:
        _check_roundtrip(t, data_page_size=target_page_size,
                         use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_chunked_table_write(use_legacy_dataset):
    # ARROW-232
    tables = []
    batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10))
    tables.append(pa.Table.from_batches([batch] * 3))
    df, _ = dataframe_with_lists()
    batch = pa.RecordBatch.from_pandas(df)
    tables.append(pa.Table.from_batches([batch] * 3))

    for data_page_version in ['1.0', '2.0']:
        for use_dictionary in [True, False]:
            for table in tables:
                _check_roundtrip(
                    table, version='2.6',
                    use_legacy_dataset=use_legacy_dataset,
                    data_page_version=data_page_version,
                    use_dictionary=use_dictionary)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_memory_map(tempdir, use_legacy_dataset):
    df = alltypes_sample(size=10)

    table = pa.Table.from_pandas(df)
    _check_roundtrip(table, read_table_kwargs={'memory_map': True},
                     version='2.6', use_legacy_dataset=use_legacy_dataset)

    filename = str(tempdir / 'tmp_file')
    with open(filename, 'wb') as f:
        _write_table(table, f, version='2.6')
    table_read = pq.read_pandas(filename, memory_map=True,
                                use_legacy_dataset=use_legacy_dataset)
    assert table_read.equals(table)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_enable_buffered_stream(tempdir, use_legacy_dataset):
    df = alltypes_sample(size=10)

    table = pa.Table.from_pandas(df)
    _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
                     version='2.6', use_legacy_dataset=use_legacy_dataset)

    filename = str(tempdir / 'tmp_file')
    with open(filename, 'wb') as f:
        _write_table(table, f, version='2.6')
    table_read = pq.read_pandas(filename, buffer_size=4096,
                                use_legacy_dataset=use_legacy_dataset)
    assert table_read.equals(table)


@parametrize_legacy_dataset
def test_special_chars_filename(tempdir, use_legacy_dataset):
    table = pa.Table.from_arrays([pa.array([42])], ["ints"])
    filename = "foo # bar"
    path = tempdir / filename
    assert not path.exists()
    _write_table(table, str(path))
    assert path.exists()
    table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
    assert table_read.equals(table)


@parametrize_legacy_dataset
def test_invalid_source(use_legacy_dataset):
    # Test that we provide a helpful error message pointing out
    # that None wasn't expected when trying to open a Parquet file
    # from a None source.
    #
    # Depending on use_legacy_dataset the message changes slightly,
    # but in both cases it should point out that None wasn't expected.
    with pytest.raises(TypeError, match="None"):
        pq.read_table(None, use_legacy_dataset=use_legacy_dataset)

    with pytest.raises(TypeError, match="None"):
        pq.ParquetFile(None)


@pytest.mark.slow
def test_file_with_over_int16_max_row_groups():
    # PARQUET-1857: Parquet encryption support introduced an INT16_MAX upper
    # limit on the number of row groups, but this limit only impacts files
    # with encrypted row group metadata because of the int16 row group
    # ordinal used in the Parquet Thrift metadata. Unencrypted files are not
    # impacted, so this test checks that writing more row groups than that
    # still works (even if it isn't a good idea).
    t = pa.table([list(range(40000))], names=['f0'])
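    # row_group_size=1 yields 40000 row groups, well above INT16_MAX (32767)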
    _check_roundtrip(t, row_group_size=1)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_empty_table_roundtrip(use_legacy_dataset):
    df = alltypes_sample(size=10)

    # Create a non-empty table to infer the types correctly, then slice to 0
    table = pa.Table.from_pandas(df)
    table = pa.Table.from_arrays(
        [col.chunk(0)[:0] for col in table.itercolumns()],
        names=table.schema.names)

    assert table.schema.field('null').type == pa.null()
    assert table.schema.field('null_list').type == pa.list_(pa.null())
    _check_roundtrip(
        table, version='2.6', use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_empty_table_no_columns(use_legacy_dataset):
    df = pd.DataFrame()
    empty = pa.Table.from_pandas(df, preserve_index=False)
    _check_roundtrip(empty, use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_write_nested_zero_length_array_chunk_failure(use_legacy_dataset):
    # Bug report in ARROW-3792
    cols = OrderedDict(
        int32=pa.int32(),
        list_string=pa.list_(pa.string())
    )
    data = [[], [OrderedDict(int32=1, list_string=('G',)), ]]

    # This produces a table with a column like
    # <Column name='list_string' type=ListType(list<item: string>)>
    # [
    #   [],
    #   [
    #     [
    #       "G"
    #     ]
    #   ]
    # ]
    #
    # Each column is a ChunkedArray with 2 elements
    my_arrays = [pa.array(batch, type=pa.struct(cols)).flatten()
                 for batch in data]
    my_batches = [pa.RecordBatch.from_arrays(batch, schema=pa.schema(cols))
                  for batch in my_arrays]
    tbl = pa.Table.from_batches(my_batches, pa.schema(cols))
    _check_roundtrip(tbl, use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_multiple_path_types(tempdir, use_legacy_dataset):
    # Test compatibility with PEP 519 path-like objects
    path = tempdir / 'zzz.parquet'
    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
    _write_table(df, path)
    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)

    # Test compatibility with plain string paths
    path = str(tempdir) + 'zzz.parquet'
    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
    _write_table(df, path)
    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@parametrize_legacy_dataset
def test_fspath(tempdir, use_legacy_dataset):
    # ARROW-12472 support __fspath__ objects without using str()
    path = tempdir / "test.parquet"
    table = pa.table({"a": [1, 2, 3]})
    _write_table(table, path)

    fs_protocol_obj = util.FSProtocolClass(path)

    result = _read_table(
        fs_protocol_obj, use_legacy_dataset=use_legacy_dataset
    )
    assert result.equals(table)

    # combined with non-local filesystem raises
    with pytest.raises(TypeError):
        _read_table(fs_protocol_obj, filesystem=FileSystem())


@pytest.mark.dataset
@parametrize_legacy_dataset
@pytest.mark.parametrize("filesystem", [
    None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
])
def test_relative_paths(tempdir, use_legacy_dataset, filesystem):
    # reading and writing from relative paths
    table = pa.table({"a": [1, 2, 3]})

    # reading
    pq.write_table(table, str(tempdir / "data.parquet"))
    with util.change_cwd(tempdir):
        result = pq.read_table("data.parquet", filesystem=filesystem,
                               use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)

    # writing
    with util.change_cwd(tempdir):
        pq.write_table(table, "data2.parquet", filesystem=filesystem)
    result = pq.read_table(tempdir / "data2.parquet")
    assert result.equals(table)


def test_read_non_existing_file():
    # ensure we have a proper error message
    with pytest.raises(FileNotFoundError):
        pq.read_table('i-am-not-existing.parquet')


def test_file_error_python_exception():
    class BogusFile(io.BytesIO):
        def read(self, *args):
            raise ZeroDivisionError("zorglub")

        def seek(self, *args):
            raise ZeroDivisionError("zorglub")

    # ensure the Python exception is restored
    with pytest.raises(ZeroDivisionError, match="zorglub"):
        pq.read_table(BogusFile(b""))


@parametrize_legacy_dataset
def test_parquet_read_from_buffer(tempdir, use_legacy_dataset):
    # reading from a file object obtained from Python's open()
    table = pa.table({"a": [1, 2, 3]})
    pq.write_table(table, str(tempdir / "data.parquet"))

    with open(str(tempdir / "data.parquet"), "rb") as f:
        result = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)

    with open(str(tempdir / "data.parquet"), "rb") as f:
        result = pq.read_table(pa.PythonFile(f),
                               use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)


@parametrize_legacy_dataset
def test_byte_stream_split(use_legacy_dataset):
    # This is only a smoke test.
    arr_float = pa.array(list(map(float, range(100))))
    arr_int = pa.array(list(map(int, range(100))))
    data_float = [arr_float, arr_float]
    table = pa.Table.from_arrays(data_float, names=['a', 'b'])

    # Check with byte_stream_split for both columns.
    _check_roundtrip(table, expected=table, compression="gzip",
                     use_dictionary=False, use_byte_stream_split=True)

    # Check with byte_stream_split for column 'b' and dictionary
    # for column 'a'.
    _check_roundtrip(table, expected=table, compression="gzip",
                     use_dictionary=['a'],
                     use_byte_stream_split=['b'])

    # Check with a collision (both encodings requested) for both columns.
    _check_roundtrip(table, expected=table, compression="gzip",
                     use_dictionary=['a', 'b'],
                     use_byte_stream_split=['a', 'b'])

    # Check with mixed column types.
    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
                                       names=['a', 'b'])
    _check_roundtrip(mixed_table, expected=mixed_table,
                     use_dictionary=['b'],
                     use_byte_stream_split=['a'])

    # Try to use the wrong data type with the byte_stream_split encoding.
    # This should throw an exception.
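    # (At the time of writing, BYTE_STREAM_SPLIT is only defined for
    # floating-point columns, so requesting it for an int64 column fails.)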
    table = pa.Table.from_arrays([arr_int], names=['tmp'])
    with pytest.raises(IOError):
        _check_roundtrip(table, expected=table, use_byte_stream_split=True,
                         use_dictionary=False,
                         use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_compression_level(use_legacy_dataset):
    arr = pa.array(list(map(int, range(1000))))
    data = [arr, arr]
    table = pa.Table.from_arrays(data, names=['a', 'b'])

    # Check one compression level.
    _check_roundtrip(table, expected=table, compression="gzip",
                     compression_level=1,
                     use_legacy_dataset=use_legacy_dataset)

    # Check another one to make sure that compression_level=1 does not
    # coincide with the default one in Arrow.
    _check_roundtrip(table, expected=table, compression="gzip",
                     compression_level=5,
                     use_legacy_dataset=use_legacy_dataset)

    # Check that the user can provide a compression per column
    _check_roundtrip(table, expected=table,
                     compression={'a': "gzip", 'b': "snappy"},
                     use_legacy_dataset=use_legacy_dataset)

    # Check that the user can provide a compression level per column
    _check_roundtrip(table, expected=table, compression="gzip",
                     compression_level={'a': 2, 'b': 3},
                     use_legacy_dataset=use_legacy_dataset)

    # Check that specifying a compression level for a codec which does not
    # allow specifying one, or specifying an out-of-range level, results in
    # an error.
    # Uncompressed, snappy, lz4 and lzo do not support specifying a
    # compression level.
    # GZIP (zlib) allows specifying a compression level, but up to
    # version 1.2.11 the valid range is [-1, 9].
    invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
                            ("None", 444), ("lzo", 14)]
    buf = io.BytesIO()
    for (codec, level) in invalid_combinations:
        with pytest.raises((ValueError, OSError)):
            _write_table(table, buf, compression=codec,
                         compression_level=level)


def test_sanitized_spark_field_names():
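    # flavor='spark' sanitizes field names: characters that Spark does not
    # accept in column names (such as ' ', ',', ';', tab, '{', '}') are each
    # replaced with an underscore.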
    a0 = pa.array([0, 1, 2, 3, 4])
    name = 'prohib; ,\t{}'
    table = pa.Table.from_arrays([a0], [name])

    result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'})

    expected_name = 'prohib______'
    assert result.schema[0].name == expected_name


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_multithreaded_read(use_legacy_dataset):
    df = alltypes_sample(size=10000)

    table = pa.Table.from_pandas(df)

    buf = io.BytesIO()
    _write_table(table, buf, compression='SNAPPY', version='2.6')

    buf.seek(0)
    table1 = _read_table(
        buf, use_threads=True, use_legacy_dataset=use_legacy_dataset)

    buf.seek(0)
    table2 = _read_table(
        buf, use_threads=False, use_legacy_dataset=use_legacy_dataset)

    assert table1.equals(table2)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_min_chunksize(use_legacy_dataset):
    data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
    table = pa.Table.from_pandas(data.reset_index())

    buf = io.BytesIO()
    _write_table(table, buf, chunk_size=-1)

    buf.seek(0)
    result = _read_table(buf, use_legacy_dataset=use_legacy_dataset)

    assert result.equals(table)

    with pytest.raises(ValueError):
        _write_table(table, buf, chunk_size=0)


@pytest.mark.pandas
def test_write_error_deletes_incomplete_file(tempdir):
    # ARROW-1285
    df = pd.DataFrame({'a': list('abc'),
                       'b': list(range(1, 4)),
                       'c': np.arange(3, 6).astype('u1'),
                       'd': np.arange(4.0, 7.0, dtype='float64'),
                       'e': [True, False, True],
                       'f': pd.Categorical(list('abc')),
                       'g': pd.date_range('20130101', periods=3),
                       'h': pd.date_range('20130101', periods=3,
                                          tz='US/Eastern'),
                       'i': pd.date_range('20130101', periods=3, freq='ns')})

    pdf = pa.Table.from_pandas(df)

    filename = tempdir / 'tmp_file'
    try:
        _write_table(pdf, filename)
    except pa.ArrowException:
        pass

    assert not filename.exists()


@parametrize_legacy_dataset
def test_read_non_existent_file(tempdir, use_legacy_dataset):
    path = 'non-existent-file.parquet'
    try:
        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
    except Exception as e:
        assert path in e.args[0]


@parametrize_legacy_dataset
def test_read_table_doesnt_warn(datadir, use_legacy_dataset):
    with pytest.warns(None) as record:
        pq.read_table(datadir / 'v0.7.1.parquet',
                      use_legacy_dataset=use_legacy_dataset)

    assert len(record) == 0


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_zlib_compression_bug(use_legacy_dataset):
    # ARROW-3514: "zlib deflate failed, output buffer too small"
    table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
    f = io.BytesIO()
    pq.write_table(table, f, compression='gzip')

    f.seek(0)
    roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
    tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())


@parametrize_legacy_dataset
def test_parquet_file_too_small(tempdir, use_legacy_dataset):
    path = str(tempdir / "test.parquet")
    # TODO(dataset) with datasets API it raises OSError instead
    with pytest.raises((pa.ArrowInvalid, OSError),
                       match='size is 0 bytes'):
        with open(path, 'wb') as f:
            pass
        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)

    with pytest.raises((pa.ArrowInvalid, OSError),
                       match='size is 4 bytes'):
        with open(path, 'wb') as f:
            f.write(b'ffff')
        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
@pytest.mark.fastparquet
@pytest.mark.filterwarnings("ignore:RangeIndex:FutureWarning")
@pytest.mark.filterwarnings("ignore:tostring:DeprecationWarning:fastparquet")
def test_fastparquet_cross_compatibility(tempdir):
    fp = pytest.importorskip('fastparquet')

    df = pd.DataFrame(
        {
            "a": list("abc"),
            "b": list(range(1, 4)),
            "c": np.arange(4.0, 7.0, dtype="float64"),
            "d": [True, False, True],
            "e": pd.date_range("20130101", periods=3),
            "f": pd.Categorical(["a", "b", "a"]),
            # fastparquet writes list as BYTE_ARRAY JSON, so no roundtrip
            # "g": [[1, 2], None, [1, 2, 3]],
        }
    )
    table = pa.table(df)

    # Arrow -> fastparquet
    file_arrow = str(tempdir / "cross_compat_arrow.parquet")
    pq.write_table(table, file_arrow, compression=None)

    fp_file = fp.ParquetFile(file_arrow)
    df_fp = fp_file.to_pandas()
    tm.assert_frame_equal(df, df_fp)

    # Fastparquet -> arrow
    file_fastparquet = str(tempdir / "cross_compat_fastparquet.parquet")
    fp.write(file_fastparquet, df)

    table_fp = pq.read_pandas(file_fastparquet)
    # For a fastparquet-written file, categoricals come back as plain strings
    # (there is no arrow schema in the parquet metadata).
    df['f'] = df['f'].astype(object)
    tm.assert_frame_equal(table_fp.to_pandas(), df)


@parametrize_legacy_dataset
@pytest.mark.parametrize('array_factory', [
    lambda: pa.array([0, None] * 10),
    lambda: pa.array([0, None] * 10).dictionary_encode(),
    lambda: pa.array(["", None] * 10),
    lambda: pa.array(["", None] * 10).dictionary_encode(),
])
@pytest.mark.parametrize('use_dictionary', [False, True])
@pytest.mark.parametrize('read_dictionary', [False, True])
def test_buffer_contents(
        array_factory, use_dictionary, read_dictionary, use_legacy_dataset
):
    # Test that null values are deterministically initialized to zero
    # after a roundtrip through Parquet.
    # See ARROW-8006 and ARROW-8011.
    orig_table = pa.Table.from_pydict({"col": array_factory()})
    bio = io.BytesIO()
    pq.write_table(orig_table, bio, use_dictionary=True)
    bio.seek(0)
    read_dictionary = ['col'] if read_dictionary else None
    table = pq.read_table(bio, use_threads=False,
                          read_dictionary=read_dictionary,
                          use_legacy_dataset=use_legacy_dataset)

    for col in table.columns:
        [chunk] = col.chunks
        buf = chunk.buffers()[1]
        assert buf.to_pybytes() == buf.size * b"\0"


def test_parquet_compression_roundtrip(tempdir):
    # ARROW-10480: ensure even with nonstandard Parquet file naming
    # conventions, writing and then reading a file works. In
    # particular, ensure that we don't automatically double-compress
    # the stream due to auto-detecting the extension in the filename
    table = pa.table([pa.array(range(4))], names=["ints"])
    path = tempdir / "arrow-10480.pyarrow.gz"
    pq.write_table(table, path, compression="GZIP")
    result = pq.read_table(path)
    assert result.equals(table)


def test_empty_row_groups(tempdir):
    # ARROW-3020
    table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])

    path = tempdir / 'empty_row_groups.parquet'

    num_groups = 3
    with pq.ParquetWriter(path, table.schema) as writer:
        for i in range(num_groups):
            writer.write_table(table)

    reader = pq.ParquetFile(path)
    assert reader.metadata.num_row_groups == num_groups

    for i in range(num_groups):
        assert reader.read_row_group(i).equals(table)


def test_reads_over_batch(tempdir):
    data = [None] * (1 << 20)
    data.append([1])
    # Large list<int64> with mostly nulls and one final
    # value.  This should force batched reads when
    # reading back.
    table = pa.Table.from_arrays([data], ['column'])

    path = tempdir / 'arrow-11607.parquet'
    pq.write_table(table, path)
    table2 = pq.read_table(path)
    assert table == table2