# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

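"""Tests for the pyarrow.parquet metadata APIs: FileMetaData,
RowGroupMetaData, ColumnChunkMetaData, Statistics and Parquet schemas."""
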
import datetime
from collections import OrderedDict

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet


@pytest.mark.pandas
def test_parquet_metadata_api():
    df = alltypes_sample(size=10000)
    df = df.reindex(columns=sorted(df.columns))
    df.index = np.random.randint(0, 1000000, size=len(df))

    fileh = make_sample_file(df)
    ncols = len(df.columns)

    # Series of sniff tests
    meta = fileh.metadata
    repr(meta)
    assert meta.num_rows == len(df)
    assert meta.num_columns == ncols + 1  # +1 for index
    assert meta.num_row_groups == 1
    assert meta.format_version == '2.6'
    assert 'parquet-cpp' in meta.created_by
    assert isinstance(meta.serialized_size, int)
    assert isinstance(meta.metadata, dict)

    # Schema
    schema = fileh.schema
    assert meta.schema is schema
    assert len(schema) == ncols + 1  # +1 for index
    repr(schema)

    col = schema[0]
    repr(col)
    assert col.name == df.columns[0]
    assert col.max_definition_level == 1
    assert col.max_repetition_level == 0

    assert col.physical_type == 'BOOLEAN'
    assert col.converted_type == 'NONE'

    with pytest.raises(IndexError):
        schema[ncols + 1]  # +1 for index

    with pytest.raises(IndexError):
        schema[-1]

    # Row group
    for rg in range(meta.num_row_groups):
        rg_meta = meta.row_group(rg)
        assert isinstance(rg_meta, pq.RowGroupMetaData)
        repr(rg_meta)

        for col in range(rg_meta.num_columns):
            col_meta = rg_meta.column(col)
            assert isinstance(col_meta, pq.ColumnChunkMetaData)
            repr(col_meta)

    with pytest.raises(IndexError):
        meta.row_group(-1)

    with pytest.raises(IndexError):
        meta.row_group(meta.num_row_groups + 1)

    rg_meta = meta.row_group(0)
    assert rg_meta.num_rows == len(df)
    assert rg_meta.num_columns == ncols + 1  # +1 for index
    assert rg_meta.total_byte_size > 0

    with pytest.raises(IndexError):
        col_meta = rg_meta.column(-1)

    with pytest.raises(IndexError):
        col_meta = rg_meta.column(ncols + 2)

    col_meta = rg_meta.column(0)
    assert col_meta.file_offset > 0
    assert col_meta.file_path == ''  # created from BytesIO
    assert col_meta.physical_type == 'BOOLEAN'
    assert col_meta.num_values == 10000
    assert col_meta.path_in_schema == 'bool'
    assert col_meta.is_stats_set is True
    assert isinstance(col_meta.statistics, pq.Statistics)
    assert col_meta.compression == 'SNAPPY'
    assert col_meta.encodings == ('PLAIN', 'RLE')
    assert col_meta.has_dictionary_page is False
    assert col_meta.dictionary_page_offset is None
    assert col_meta.data_page_offset > 0
    assert col_meta.total_compressed_size > 0
    assert col_meta.total_uncompressed_size > 0
    with pytest.raises(NotImplementedError):
        col_meta.has_index_page
    with pytest.raises(NotImplementedError):
        col_meta.index_page_offset


def test_parquet_metadata_lifetime(tempdir):
    # ARROW-6642 - ensure that chained access keeps parent objects alive
    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(table, tempdir / 'test_metadata_segfault.parquet')
    parquet_file = pq.ParquetFile(tempdir / 'test_metadata_segfault.parquet')
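    # The intermediate RowGroupMetaData and ColumnChunkMetaData objects are
    # temporaries; they must keep the parent FileMetaData alive so the
    # chained access below does not crash.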
    parquet_file.metadata.row_group(0).column(0).statistics


@pytest.mark.pandas
@pytest.mark.parametrize(
    (
        'data',
        'type',
        'physical_type',
        'min_value',
        'max_value',
        'null_count',
        'num_values',
        'distinct_count'
    ),
    [
        ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, 0),
        ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, 0),
        ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, 0),
        ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, 0),
        ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, 0),
        ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, 0),
        ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, 0),
        ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, 0),
        (
            [-1.1, 2.2, 2.3, None, 4.4], pa.float32(),
            'FLOAT', -1.1, 4.4, 1, 4, 0
        ),
        (
            [-1.1, 2.2, 2.3, None, 4.4], pa.float64(),
            'DOUBLE', -1.1, 4.4, 1, 4, 0
        ),
        (
            ['', 'b', chr(1000), None, 'aaa'], pa.binary(),
            'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, 0
        ),
        (
            [True, False, False, True, True], pa.bool_(),
            'BOOLEAN', False, True, 0, 5, 0
        ),
        (
            [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(),
            'BYTE_ARRAY', b'\x00', b'b', 1, 4, 0
        ),
    ]
)
def test_parquet_column_statistics_api(data, type, physical_type, min_value,
                                       max_value, null_count, num_values,
                                       distinct_count):
    df = pd.DataFrame({'data': data})
    schema = pa.schema([pa.field('data', type)])
    table = pa.Table.from_pandas(df, schema=schema, safe=False)
    fileh = make_sample_file(table)

    meta = fileh.metadata

    rg_meta = meta.row_group(0)
    col_meta = rg_meta.column(0)

    stat = col_meta.statistics
    assert stat.has_min_max
    assert _close(type, stat.min, min_value)
    assert _close(type, stat.max, max_value)
    assert stat.null_count == null_count
    assert stat.num_values == num_values
    # TODO(kszucs): until the parquet-cpp API exposes the HasDistinctCount
    # method, a missing distinct_count is reported as zero instead of None
    assert stat.distinct_count == distinct_count
    assert stat.physical_type == physical_type


def _close(type, left, right):
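    """Compare a statistic to its expected value, with a tolerance for
    floating point types."""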
    if type == pa.float32():
        return abs(left - right) < 1E-7
    elif type == pa.float64():
        return abs(left - right) < 1E-13
    else:
        return left == right


# ARROW-6339
@pytest.mark.pandas
def test_parquet_raise_on_unset_statistics():
    df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")})
    meta = make_sample_file(pa.Table.from_pandas(df)).metadata

    assert not meta.row_group(0).column(0).statistics.has_min_max
    assert meta.row_group(0).column(0).statistics.max is None


def test_statistics_convert_logical_types(tempdir):
    # ARROW-5166, ARROW-4139
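    # Statistics for these logical types should be converted back to the
    # corresponding Python values on read, not left as raw physical values.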

    # (min, max, type)
    cases = [(10, 11164359321221007157, pa.uint64()),
             (10, 4294967295, pa.uint32()),
             ("ähnlich", "öffentlich", pa.utf8()),
             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
              pa.time32('ms')),
             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
              pa.time64('us')),
             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
              pa.timestamp('ms')),
             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
              pa.timestamp('us'))]

    for i, (min_val, max_val, typ) in enumerate(cases):
        t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)],
                                 ['col'])
        path = str(tempdir / ('example{}.parquet'.format(i)))
        pq.write_table(t, path, version='2.6')
        pf = pq.ParquetFile(path)
        stats = pf.metadata.row_group(0).column(0).statistics
        assert stats.min == min_val
        assert stats.max == max_val


def test_parquet_write_disable_statistics(tempdir):
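    # Statistics are written by default; write_statistics=False disables them
    # for all columns, and a list of column names enables them selectively.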
    table = pa.Table.from_pydict(
        OrderedDict([
            ('a', pa.array([1, 2, 3])),
            ('b', pa.array(['a', 'b', 'c']))
        ])
    )
    _write_table(table, tempdir / 'data.parquet')
    meta = pq.read_metadata(tempdir / 'data.parquet')
    for col in [0, 1]:
        cc = meta.row_group(0).column(col)
        assert cc.is_stats_set is True
        assert cc.statistics is not None

    _write_table(table, tempdir / 'data2.parquet', write_statistics=False)
    meta = pq.read_metadata(tempdir / 'data2.parquet')
    for col in [0, 1]:
        cc = meta.row_group(0).column(col)
        assert cc.is_stats_set is False
        assert cc.statistics is None

    _write_table(table, tempdir / 'data3.parquet', write_statistics=['a'])
    meta = pq.read_metadata(tempdir / 'data3.parquet')
    cc_a = meta.row_group(0).column(0)
    cc_b = meta.row_group(0).column(1)
    assert cc_a.is_stats_set is True
    assert cc_b.is_stats_set is False
    assert cc_a.statistics is not None
    assert cc_b.statistics is None


def test_field_id_metadata():
    # ARROW-7080
    field_id = b'PARQUET:field_id'
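    # Fields carrying the b'PARQUET:field_id' metadata key should have that
    # id written to the Parquet schema and surfaced again on read through
    # schema_arrow, alongside any other field metadata.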
    inner = pa.field('inner', pa.int32(), metadata={field_id: b'100'})
    middle = pa.field('middle', pa.struct(
        [inner]), metadata={field_id: b'101'})
    fields = [
        pa.field('basic', pa.int32(), metadata={
                 b'other': b'abc', field_id: b'1'}),
        pa.field(
            'list',
            pa.list_(pa.field('list-inner', pa.int32(),
                              metadata={field_id: b'10'})),
            metadata={field_id: b'11'}),
        pa.field('struct', pa.struct([middle]), metadata={field_id: b'102'}),
        pa.field('no-metadata', pa.int32()),
        pa.field('non-integral-field-id', pa.int32(),
                 metadata={field_id: b'xyz'}),
        pa.field('negative-field-id', pa.int32(),
                 metadata={field_id: b'-1000'})
    ]
    arrs = [[] for _ in fields]
    table = pa.table(arrs, schema=pa.schema(fields))

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()

    pf = pq.ParquetFile(pa.BufferReader(contents))
    schema = pf.schema_arrow

    assert schema[0].metadata[field_id] == b'1'
    assert schema[0].metadata[b'other'] == b'abc'

    list_field = schema[1]
    assert list_field.metadata[field_id] == b'11'

    list_item_field = list_field.type.value_field
    assert list_item_field.metadata[field_id] == b'10'

    struct_field = schema[2]
    assert struct_field.metadata[field_id] == b'102'

    struct_middle_field = struct_field.type[0]
    assert struct_middle_field.metadata[field_id] == b'101'

    struct_inner_field = struct_middle_field.type[0]
    assert struct_inner_field.metadata[field_id] == b'100'

    assert schema[3].metadata is None
    # An invalid field_id value is passed through to the Arrow schema (ok),
    # but is not written as a field_id in the Parquet schema (not tested here)
    assert schema[4].metadata[field_id] == b'xyz'
    assert schema[5].metadata[field_id] == b'-1000'


@pytest.mark.pandas
def test_multi_dataset_metadata(tempdir):
    filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
    metapath = str(tempdir / "_metadata")

    # create a test dataset
    df = pd.DataFrame({
        'one': [1, 2, 3],
        'two': [-1, -2, -3],
        'three': [[1, 2], [2, 3], [3, 4]],
    })
    table = pa.Table.from_pandas(df)

    # write dataset twice and collect/merge metadata
    _meta = None
    for filename in filenames:
        meta = []
        pq.write_table(table, str(tempdir / filename),
                       metadata_collector=meta)
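        # metadata_collector receives the FileMetaData of the file just
        # written; record the data file's path in it so the merged
        # _metadata file can reference the file.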
        meta[0].set_file_path(filename)
        if _meta is None:
            _meta = meta[0]
        else:
            _meta.append_row_groups(meta[0])

    # Write merged metadata-only file
    with open(metapath, "wb") as f:
        _meta.write_metadata_file(f)

    # Read back the metadata
    meta = pq.read_metadata(metapath)
    md = meta.to_dict()
    _md = _meta.to_dict()
    for key in _md:
        if key != 'serialized_size':
            assert _md[key] == md[key]
    assert _md['num_columns'] == 3
    assert _md['num_rows'] == 6
    assert _md['num_row_groups'] == 2
    assert _md['serialized_size'] == 0
    assert md['serialized_size'] > 0


def test_write_metadata(tempdir):
    path = str(tempdir / "metadata")
    schema = pa.schema([("a", "int64"), ("b", "float64")])

    # write a pyarrow schema
    pq.write_metadata(schema, path)
    parquet_meta = pq.read_metadata(path)
    schema_as_arrow = parquet_meta.schema.to_arrow_schema()
    assert schema_as_arrow.equals(schema)

    # ARROW-8980: Check that the ARROW:schema metadata key was removed
    if schema_as_arrow.metadata:
        assert b'ARROW:schema' not in schema_as_arrow.metadata

    # pass through writer keyword arguments
    for version in ["1.0", "2.0", "2.4", "2.6"]:
        pq.write_metadata(schema, path, version=version)
        parquet_meta = pq.read_metadata(path)
        # The version is stored as a single integer in the Parquet metadata,
        # so it cannot correctly express dotted format versions
        expected_version = "1.0" if version == "1.0" else "2.6"
        assert parquet_meta.format_version == expected_version

    # metadata_collector: list of FileMetaData objects
    table = pa.table({'a': [1, 2], 'b': [.1, .2]}, schema=schema)
    pq.write_table(table, tempdir / "data.parquet")
    parquet_meta = pq.read_metadata(str(tempdir / "data.parquet"))
    pq.write_metadata(
        schema, path, metadata_collector=[parquet_meta, parquet_meta]
    )
    parquet_meta_mult = pq.read_metadata(path)
    assert parquet_meta_mult.num_row_groups == 2

    # append metadata with different schema raises an error
    with pytest.raises(RuntimeError, match="requires equal schemas"):
        pq.write_metadata(
            pa.schema([("a", "int32"), ("b", "null")]),
            path, metadata_collector=[parquet_meta, parquet_meta]
        )


def test_table_large_metadata():
    # ARROW-8694
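    # A ~10 MB schema metadata value must survive the write/read round trip.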
    my_schema = pa.schema([pa.field('f0', 'double')],
                          metadata={'large': 'x' * 10000000})

    table = pa.table([np.arange(10)], schema=my_schema)
    _check_roundtrip(table)


@pytest.mark.pandas
def test_compare_schemas():
    df = alltypes_sample(size=10000)

    fileh = make_sample_file(df)
    fileh2 = make_sample_file(df)
    fileh3 = make_sample_file(df[df.columns[::2]])

    # ParquetSchema
    assert isinstance(fileh.schema, pq.ParquetSchema)
    assert fileh.schema.equals(fileh.schema)
    assert fileh.schema == fileh.schema
    assert fileh.schema.equals(fileh2.schema)
    assert fileh.schema == fileh2.schema
    assert fileh.schema != 'arbitrary object'
    assert not fileh.schema.equals(fileh3.schema)
    assert fileh.schema != fileh3.schema

    # ColumnSchema
    assert isinstance(fileh.schema[0], pq.ColumnSchema)
    assert fileh.schema[0].equals(fileh.schema[0])
    assert fileh.schema[0] == fileh.schema[0]
    assert not fileh.schema[0].equals(fileh.schema[1])
    assert fileh.schema[0] != fileh.schema[1]
    assert fileh.schema[0] != 'arbitrary object'


@pytest.mark.pandas
def test_read_schema(tempdir):
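    # read_schema reads only the schema from the file footer; memory_map=True
    # exercises the memory-mapped read path.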
    N = 100
    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])

    data_path = tempdir / 'test.parquet'

    table = pa.Table.from_pandas(df)
    _write_table(table, data_path)

    read1 = pq.read_schema(data_path)
    read2 = pq.read_schema(data_path, memory_map=True)
    assert table.schema.equals(read1)
    assert table.schema.equals(read2)

    assert table.schema.metadata[b'pandas'] == read1.metadata[b'pandas']


def test_parquet_metadata_empty_to_dict(tempdir):
    # https://issues.apache.org/jira/browse/ARROW-10146
    table = pa.table({"a": pa.array([], type="int64")})
    pq.write_table(table, tempdir / "data.parquet")
    metadata = pq.read_metadata(tempdir / "data.parquet")
    # ensure this doesn't error / statistics set to None
    metadata_dict = metadata.to_dict()
    assert len(metadata_dict["row_groups"]) == 1
    assert len(metadata_dict["row_groups"][0]["columns"]) == 1
    assert metadata_dict["row_groups"][0]["columns"][0]["statistics"] is None


@pytest.mark.slow
@pytest.mark.large_memory
def test_metadata_exceeds_message_size():
    # ARROW-13655: Thrift may impose a default message size limit that
    # restricts the size of Parquet metadata that can be written.
    NCOLS = 1000
    NREPEATS = 4000

    table = pa.table({str(i): np.random.randn(10) for i in range(NCOLS)})

    with pa.BufferOutputStream() as out:
        pq.write_table(table, out)
        buf = out.getvalue()

    original_metadata = pq.read_metadata(pa.BufferReader(buf))
    metadata = pq.read_metadata(pa.BufferReader(buf))
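    # Appending the same row groups NREPEATS times makes the serialized
    # footer large enough to exceed a default Thrift message size limit.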
    for i in range(NREPEATS):
        metadata.append_row_groups(original_metadata)

    with pa.BufferOutputStream() as out:
        metadata.write_metadata_file(out)
        buf = out.getvalue()

    metadata = pq.read_metadata(pa.BufferReader(buf))