# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import decimal
import io

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.tests import util
from pyarrow.tests.parquet.common import (_check_roundtrip,
                                          parametrize_legacy_dataset)

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
                                               dataframe_with_lists)
    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet


# General roundtrip of data types
# -----------------------------------------------------------------------------


@pytest.mark.pandas
@parametrize_legacy_dataset
@pytest.mark.parametrize('chunk_size', [None, 1000])
def test_parquet_2_0_roundtrip(tempdir, chunk_size, use_legacy_dataset):
    df = alltypes_sample(size=10000, categorical=True)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)
    assert arrow_table.schema.pandas_metadata is not None

    _write_table(arrow_table, filename, version='2.6',
                 coerce_timestamps='ms', chunk_size=chunk_size)
    table_read = pq.read_pandas(
        filename, use_legacy_dataset=use_legacy_dataset)
    assert table_read.schema.pandas_metadata is not None

    read_metadata = table_read.schema.metadata
    assert arrow_table.schema.metadata == read_metadata

    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_parquet_1_0_roundtrip(tempdir, use_legacy_dataset):
    size = 10000
    np.random.seed(0)
    df = pd.DataFrame({
        'uint8': np.arange(size, dtype=np.uint8),
        'uint16': np.arange(size, dtype=np.uint16),
        'uint32': np.arange(size, dtype=np.uint32),
        'uint64': np.arange(size, dtype=np.uint64),
        'int8': np.arange(size, dtype=np.int8),
        'int16': np.arange(size, dtype=np.int16),
        'int32': np.arange(size, dtype=np.int32),
        'int64': np.arange(size, dtype=np.int64),
        'float32': np.arange(size, dtype=np.float32),
        'float64': np.arange(size, dtype=np.float64),
        'bool': np.random.randn(size) > 0,
        'str': [str(x) for x in range(size)],
        'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
        'empty_str': [''] * size
    })
    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)
    _write_table(arrow_table, filename, version='1.0')
    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
    df_read = table_read.to_pandas()

    # We pass uint32_t as int64_t if we write Parquet version 1.0
    df['uint32'] = df['uint32'].values.astype(np.int64)

    tm.assert_frame_equal(df, df_read)


# Dictionary
# -----------------------------------------------------------------------------


def _simple_table_write_read(table, use_legacy_dataset):
    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()
    return pq.read_table(
        pa.BufferReader(contents), use_legacy_dataset=use_legacy_dataset
    )


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_direct_read_dictionary(use_legacy_dataset):
    # ARROW-3325
    repeats = 10
    nunique = 5

    data = [
        [util.rands(10) for i in range(nunique)] * repeats,
    ]
    table = pa.table(data, names=['f0'])

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()

    result = pq.read_table(pa.BufferReader(contents),
                           read_dictionary=['f0'],
                           use_legacy_dataset=use_legacy_dataset)

    # Compute the expected dictionary-encoded column
    expected = pa.table([table[0].dictionary_encode()], names=['f0'])
    assert result.equals(expected)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_direct_read_dictionary_subfield(use_legacy_dataset):
    repeats = 10
    nunique = 5

    data = [
        [[util.rands(10)] for i in range(nunique)] * repeats,
    ]
    table = pa.table(data, names=['f0'])

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()
    result = pq.read_table(pa.BufferReader(contents),
                           read_dictionary=['f0.list.item'],
                           use_legacy_dataset=use_legacy_dataset)

    arr = pa.array(data[0])
    values_as_dict = arr.values.dictionary_encode()

    inner_indices = values_as_dict.indices.cast('int32')
    new_values = pa.DictionaryArray.from_arrays(inner_indices,
                                                values_as_dict.dictionary)

    offsets = pa.array(range(51), type='int32')
    expected_arr = pa.ListArray.from_arrays(offsets, new_values)
    expected = pa.table([expected_arr], names=['f0'])

    assert result.equals(expected)
    assert result[0].num_chunks == 1


@parametrize_legacy_dataset
def test_dictionary_array_automatically_read(use_legacy_dataset):
    # ARROW-3246

    # Make a large dictionary, a little over 4MB of data
    dict_length = 4000
    dict_values = pa.array([('x' * 1000 + '_{}'.format(i))
                            for i in range(dict_length)])

    num_chunks = 10
    chunk_size = 100
    chunks = []
    for i in range(num_chunks):
        indices = np.random.randint(0, dict_length,
                                    size=chunk_size).astype(np.int32)
        chunks.append(pa.DictionaryArray.from_arrays(pa.array(indices),
                                                     dict_values))

    table = pa.table([pa.chunked_array(chunks)], names=['f0'])
    result = _simple_table_write_read(table, use_legacy_dataset)

    assert result.equals(table)

    # The only key in the metadata was the Arrow schema key
    assert result.schema.metadata is None


# Decimal
# -----------------------------------------------------------------------------


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_decimal_roundtrip(tempdir, use_legacy_dataset):
    num_values = 10

    columns = {}
    for precision in range(1, 39):
        for scale in range(0, precision + 1):
            with util.random_seed(0):
                random_decimal_values = [
                    util.randdecimal(precision, scale)
                    for _ in range(num_values)
                ]
            column_name = ('dec_precision_{:d}_scale_{:d}'
                           .format(precision, scale))
            columns[column_name] = random_decimal_values

    expected = pd.DataFrame(columns)
    filename = tempdir / 'decimals.parquet'
    string_filename = str(filename)
    table = pa.Table.from_pandas(expected)
    _write_table(table, string_filename)
    result_table = _read_table(
        string_filename, use_legacy_dataset=use_legacy_dataset)
    result = result_table.to_pandas()
    tm.assert_frame_equal(result, expected)


@pytest.mark.pandas
@pytest.mark.xfail(
    raises=OSError, reason='Parquet does not support negative scale'
)
def test_decimal_roundtrip_negative_scale(tempdir):
    expected = pd.DataFrame({'decimal_num': [decimal.Decimal('1.23E4')]})
    filename = tempdir / 'decimals.parquet'
    string_filename = str(filename)
    t = pa.Table.from_pandas(expected)
    _write_table(t, string_filename)
    result_table = _read_table(string_filename)
    result = result_table.to_pandas()
    tm.assert_frame_equal(result, expected)


# List types
# -----------------------------------------------------------------------------


@parametrize_legacy_dataset
@pytest.mark.parametrize('dtype', [int, float])
def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
    filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
    data = [pa.array(list(map(dtype, range(5))))]
    table = pa.Table.from_arrays(data, names=['a'])
    _write_table(table, filename)
    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
    for i in range(table.num_columns):
        col_written = table[i]
        col_read = table_read[i]
        assert table.field(i).name == table_read.field(i).name
        assert col_read.num_chunks == 1
        data_written = col_written.chunk(0)
        data_read = col_read.chunk(0)
        assert data_written.equals(data_read)


@parametrize_legacy_dataset
def test_empty_lists_table_roundtrip(use_legacy_dataset):
    # ARROW-2744: Shouldn't crash when writing an array of empty lists
    arr = pa.array([[], []], type=pa.list_(pa.int32()))
    table = pa.Table.from_arrays([arr], ["A"])
    _check_roundtrip(table, use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
    # Reproduce failure in ARROW-5630
    typ = pa.list_(pa.field("item", pa.float32(), False))
    num_rows = 10000
    t = pa.table([
        pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
                  (num_rows // 10)), type=typ)
    ], ['a'])
    _check_roundtrip(
        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_nested_list_struct_multiple_batches_roundtrip(
    tempdir, use_legacy_dataset
):
    # Reproduce failure in ARROW-11024
    data = [[{'x': 'abc', 'y': 'abc'}]]*100 + [[{'x': 'abc', 'y': 'gcb'}]]*100
    table = pa.table([pa.array(data)], names=['column'])
    _check_roundtrip(
        table, row_group_size=20, use_legacy_dataset=use_legacy_dataset)

    # Reproduce failure in ARROW-11069 (plain non-nested structs with strings)
    data = pa.array(
        [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}, {'a': '5', 'b': '6'}]*10
    )
    table = pa.table({'column': data})
    _check_roundtrip(
        table, row_group_size=10, use_legacy_dataset=use_legacy_dataset)


def test_writing_empty_lists():
    # ARROW-2591: [Python] Segmentation fault issue in pq.write_table
    arr1 = pa.array([[], []], pa.list_(pa.int32()))
    table = pa.Table.from_arrays([arr1], ['list(int32)'])
    _check_roundtrip(table)


@pytest.mark.pandas
def test_column_of_arrays(tempdir):
    df, schema = dataframe_with_arrays()

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)
    _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_column_of_lists(tempdir):
    df, schema = dataframe_with_lists(parquet_compatible=True)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)
    _write_table(arrow_table, filename, version='2.6')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()

    tm.assert_frame_equal(df, df_read)


def test_large_list_records():
    # This was fixed in PARQUET-1100
    list_lengths = np.random.randint(0, 500, size=50)
    list_lengths[::10] = 0

    list_values = [list(map(int, np.random.randint(0, 100, size=x)))
                   if i % 8 else None
                   for i, x in enumerate(list_lengths)]

    a1 = pa.array(list_values)

    table = pa.Table.from_arrays([a1], ['int_lists'])
    _check_roundtrip(table)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
    # ARROW-1684
    df = pd.DataFrame({
        'a': [[1, 2, 3], None, [4, 5], []],
        'b': [[1.], None, None, [6., 7.]],
    })

    path = str(tempdir / 'nested_convenience.parquet')

    table = pa.Table.from_pandas(df, preserve_index=False)
    _write_table(table, path)

    read = pq.read_table(
        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
    tm.assert_frame_equal(read.to_pandas(), df[['a']])

    read = pq.read_table(
        path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
    tm.assert_frame_equal(read.to_pandas(), df)


# Binary
# -----------------------------------------------------------------------------


def test_fixed_size_binary():
    t0 = pa.binary(10)
    data = [b'fooooooooo', None, b'barooooooo', b'quxooooooo']
    a0 = pa.array(data, type=t0)

    table = pa.Table.from_arrays([a0],
                                 ['binary[10]'])
    _check_roundtrip(table)


# Large types
# -----------------------------------------------------------------------------


@pytest.mark.slow
@pytest.mark.large_memory
def test_large_table_int32_overflow():
    size = np.iinfo('int32').max + 1

    arr = np.ones(size, dtype='uint8')

    parr = pa.array(arr, type=pa.uint8())

    table = pa.Table.from_arrays([parr], names=['one'])
    f = io.BytesIO()
    _write_table(table, f)


def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs):
    stream = pa.BufferOutputStream()
    _write_table(table, stream, **write_kwargs)
    buf = stream.getvalue()
    return _read_table(buf, use_legacy_dataset=use_legacy_dataset)


@pytest.mark.slow
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_byte_array_exactly_2gb(use_legacy_dataset):
    # Test edge case reported in ARROW-3762
    val = b'x' * (1 << 10)

    base = pa.array([val] * ((1 << 21) - 1))
    cases = [
        [b'x' * 1023],  # 2^31 - 1 bytes of binary data in total
        [b'x' * 1024],  # 2^31 bytes
        [b'x' * 1025]   # 2^31 + 1 bytes
    ]
    for case in cases:
        values = pa.chunked_array([base, pa.array(case)])
        t = pa.table([values], names=['f0'])
        result = _simple_table_roundtrip(
            t, use_legacy_dataset=use_legacy_dataset, use_dictionary=False)
        assert t.equals(result)


@pytest.mark.slow
@pytest.mark.pandas
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_binary_array_overflow_to_chunked(use_legacy_dataset):
    # ARROW-3762

    # 2^31 + 1 bytes
    values = [b'x'] + [
        b'x' * (1 << 20)
    ] * 2 * (1 << 10)
    df = pd.DataFrame({'byte_col': values})

    tbl = pa.Table.from_pandas(df, preserve_index=False)
    read_tbl = _simple_table_roundtrip(
        tbl, use_legacy_dataset=use_legacy_dataset)

    col0_data = read_tbl[0]
    assert isinstance(col0_data, pa.ChunkedArray)

    # Split up into 2GB chunks
    assert col0_data.num_chunks == 2

    assert tbl.equals(read_tbl)


@pytest.mark.slow
@pytest.mark.pandas
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_list_of_binary_large_cell(use_legacy_dataset):
    # ARROW-4688
    data = []

    # TODO(wesm): handle chunked children
    # 2^31 - 1 bytes in a single cell
    # data.append([b'x' * (1 << 20)] * 2047 + [b'x' * ((1 << 20) - 1)])

    # A little under 2GB in total, in cells of approximately 10MB each
    data.extend([[b'x' * 1000000] * 10] * 214)

    arr = pa.array(data)
    table = pa.Table.from_arrays([arr], ['chunky_cells'])
    read_table = _simple_table_roundtrip(
        table, use_legacy_dataset=use_legacy_dataset)
    assert table.equals(read_table)


def test_large_binary():
    data = [b'foo', b'bar'] * 50
    for type in [pa.large_binary(), pa.large_string()]:
        arr = pa.array(data, type=type)
        table = pa.Table.from_arrays([arr], names=['strs'])
        for use_dictionary in [False, True]:
            _check_roundtrip(table, use_dictionary=use_dictionary)


@pytest.mark.slow
@pytest.mark.large_memory
def test_large_binary_huge():
    s = b'xy' * 997
    data = [s] * ((1 << 33) // len(s))
    for type in [pa.large_binary(), pa.large_string()]:
        arr = pa.array(data, type=type)
        table = pa.Table.from_arrays([arr], names=['strs'])
        for use_dictionary in [False, True]:
            _check_roundtrip(table, use_dictionary=use_dictionary)
        del arr, table


@pytest.mark.large_memory
def test_large_binary_overflow():
    s = b'x' * (1 << 31)
    arr = pa.array([s], type=pa.large_binary())
    table = pa.Table.from_arrays([arr], names=['strs'])
    for use_dictionary in [False, True]:
        writer = pa.BufferOutputStream()
        with pytest.raises(
                pa.ArrowInvalid,
                match="Parquet cannot store strings with size 2GB or more"):
            _write_table(table, writer, use_dictionary=use_dictionary)