# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import decimal
import io

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.tests import util
from pyarrow.tests.parquet.common import (_check_roundtrip,
                                          parametrize_legacy_dataset)

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.pandas_examples import (dataframe_with_arrays,
                                               dataframe_with_lists)
    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet


# General roundtrip of data types
# -----------------------------------------------------------------------------


@pytest.mark.pandas
@parametrize_legacy_dataset
@pytest.mark.parametrize('chunk_size', [None, 1000])
def test_parquet_2_0_roundtrip(tempdir, chunk_size, use_legacy_dataset):
    df = alltypes_sample(size=10000, categorical=True)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)
    assert arrow_table.schema.pandas_metadata is not None

    _write_table(arrow_table, filename, version='2.6',
                 coerce_timestamps='ms', chunk_size=chunk_size)
    table_read = pq.read_pandas(
        filename, use_legacy_dataset=use_legacy_dataset)
    assert table_read.schema.pandas_metadata is not None

    read_metadata = table_read.schema.metadata
    assert arrow_table.schema.metadata == read_metadata

    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_parquet_1_0_roundtrip(tempdir, use_legacy_dataset):
    size = 10000
    np.random.seed(0)
    df = pd.DataFrame({
        'uint8': np.arange(size, dtype=np.uint8),
        'uint16': np.arange(size, dtype=np.uint16),
        'uint32': np.arange(size, dtype=np.uint32),
        'uint64': np.arange(size, dtype=np.uint64),
        'int8': np.arange(size, dtype=np.int16),
        'int16': np.arange(size, dtype=np.int16),
        'int32': np.arange(size, dtype=np.int32),
        'int64': np.arange(size, dtype=np.int64),
        'float32': np.arange(size, dtype=np.float32),
        'float64': np.arange(size, dtype=np.float64),
        'bool': np.random.randn(size) > 0,
        'str': [str(x) for x in range(size)],
        'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
        'empty_str': [''] * size
    })
    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)
    _write_table(arrow_table, filename, version='1.0')
    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
    df_read = table_read.to_pandas()

    # We pass uint32_t as int64_t if we write Parquet version 1.0
    df['uint32'] = df['uint32'].values.astype(np.int64)

    tm.assert_frame_equal(df, df_read)

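# Note on the uint32 adjustment above: the Parquet 1.0 format has no type
# annotation for unsigned 32-bit integers, so the writer widens uint32 to
# int64, the smallest signed type that holds every uint32 value. A
# hypothetical minimal reproduction (names here are illustrative, not part
# of this suite):
#
#   table = pa.table({'u32': pa.array([1, 2], type=pa.uint32())})
#   sink = pa.BufferOutputStream()
#   pq.write_table(table, sink, version='1.0')
#   pq.read_table(pa.BufferReader(sink.getvalue())).schema  # u32: int64
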
# Dictionary
# -----------------------------------------------------------------------------


def _simple_table_write_read(table, use_legacy_dataset):
    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()
    return pq.read_table(
        pa.BufferReader(contents), use_legacy_dataset=use_legacy_dataset
    )


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_direct_read_dictionary(use_legacy_dataset):
    # ARROW-3325
    repeats = 10
    nunique = 5

    data = [
        [util.rands(10) for i in range(nunique)] * repeats,
    ]
    table = pa.table(data, names=['f0'])

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()

    result = pq.read_table(pa.BufferReader(contents),
                           read_dictionary=['f0'],
                           use_legacy_dataset=use_legacy_dataset)

    # The expected column comes back dictionary-encoded
    expected = pa.table([table[0].dictionary_encode()], names=['f0'])
    assert result.equals(expected)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_direct_read_dictionary_subfield(use_legacy_dataset):
    repeats = 10
    nunique = 5

    data = [
        [[util.rands(10)] for i in range(nunique)] * repeats,
    ]
    table = pa.table(data, names=['f0'])

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()
    result = pq.read_table(pa.BufferReader(contents),
                           read_dictionary=['f0.list.item'],
                           use_legacy_dataset=use_legacy_dataset)

    arr = pa.array(data[0])
    values_as_dict = arr.values.dictionary_encode()

    inner_indices = values_as_dict.indices.cast('int32')
    new_values = pa.DictionaryArray.from_arrays(inner_indices,
                                                values_as_dict.dictionary)

    # 50 single-element lists -> 51 offsets
    offsets = pa.array(range(51), type='int32')
    expected_arr = pa.ListArray.from_arrays(offsets, new_values)
    expected = pa.table([expected_arr], names=['f0'])

    assert result.equals(expected)
    assert result[0].num_chunks == 1


@parametrize_legacy_dataset
def test_dictionary_array_automatically_read(use_legacy_dataset):
    # ARROW-3246

    # Make a large dictionary, a little over 4MB of data
    dict_length = 4000
    dict_values = pa.array([('x' * 1000 + '_{}'.format(i))
                            for i in range(dict_length)])

    num_chunks = 10
    chunk_size = 100
    chunks = []
    for i in range(num_chunks):
        indices = np.random.randint(0, dict_length,
                                    size=chunk_size).astype(np.int32)
        chunks.append(pa.DictionaryArray.from_arrays(pa.array(indices),
                                                     dict_values))

    table = pa.table([pa.chunked_array(chunks)], names=['f0'])
    result = _simple_table_write_read(table, use_legacy_dataset)

    assert result.equals(table)

    # The only key in the metadata was the Arrow schema key
    assert result.schema.metadata is None

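# Usage note for the dictionary tests above: `read_dictionary` takes Parquet
# column paths rather than Arrow field names. A flat column is addressed
# simply as 'f0', while the elements of a list column are addressed through
# the repeated group of Parquet's list encoding, hence 'f0.list.item'.
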
# Decimal
# -----------------------------------------------------------------------------


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_decimal_roundtrip(tempdir, use_legacy_dataset):
    num_values = 10

    columns = {}
    for precision in range(1, 39):
        for scale in range(0, precision + 1):
            with util.random_seed(0):
                random_decimal_values = [
                    util.randdecimal(precision, scale)
                    for _ in range(num_values)
                ]
            column_name = ('dec_precision_{:d}_scale_{:d}'
                           .format(precision, scale))
            columns[column_name] = random_decimal_values

    expected = pd.DataFrame(columns)
    filename = tempdir / 'decimals.parquet'
    string_filename = str(filename)
    table = pa.Table.from_pandas(expected)
    _write_table(table, string_filename)
    result_table = _read_table(
        string_filename, use_legacy_dataset=use_legacy_dataset)
    result = result_table.to_pandas()
    tm.assert_frame_equal(result, expected)


@pytest.mark.pandas
@pytest.mark.xfail(
    raises=OSError, reason='Parquet does not support negative scale'
)
def test_decimal_roundtrip_negative_scale(tempdir):
    expected = pd.DataFrame({'decimal_num': [decimal.Decimal('1.23E4')]})
    filename = tempdir / 'decimals.parquet'
    string_filename = str(filename)
    t = pa.Table.from_pandas(expected)
    _write_table(t, string_filename)
    result_table = _read_table(string_filename)
    result = result_table.to_pandas()
    tm.assert_frame_equal(result, expected)


# List types
# -----------------------------------------------------------------------------


@parametrize_legacy_dataset
@pytest.mark.parametrize('dtype', [int, float])
def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset):
    filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
    data = [pa.array(list(map(dtype, range(5))))]
    table = pa.Table.from_arrays(data, names=['a'])
    _write_table(table, filename)
    table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset)
    for i in range(table.num_columns):
        col_written = table[i]
        col_read = table_read[i]
        assert table.field(i).name == table_read.field(i).name
        assert col_read.num_chunks == 1
        data_written = col_written.chunk(0)
        data_read = col_read.chunk(0)
        assert data_written.equals(data_read)


@parametrize_legacy_dataset
def test_empty_lists_table_roundtrip(use_legacy_dataset):
    # ARROW-2744: Shouldn't crash when writing an array of empty lists
    arr = pa.array([[], []], type=pa.list_(pa.int32()))
    table = pa.Table.from_arrays([arr], ["A"])
    _check_roundtrip(table, use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
    # Reproduce failure in ARROW-5630
    typ = pa.list_(pa.field("item", pa.float32(), False))
    num_rows = 10000
    t = pa.table([
        pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] *
                  (num_rows // 10)), type=typ)
    ], ['a'])
    _check_roundtrip(
        t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_nested_list_struct_multiple_batches_roundtrip(
    tempdir, use_legacy_dataset
):
    # Reproduce failure in ARROW-11024
    data = [[{'x': 'abc', 'y': 'abc'}]]*100 + [[{'x': 'abc', 'y': 'gcb'}]]*100
    table = pa.table([pa.array(data)], names=['column'])
    _check_roundtrip(
        table, row_group_size=20, use_legacy_dataset=use_legacy_dataset)

    # Reproduce failure in ARROW-11069 (plain non-nested structs with strings)
    data = pa.array(
        [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}, {'a': '5', 'b': '6'}]*10
    )
    table = pa.table({'column': data})
    _check_roundtrip(
        table, row_group_size=10, use_legacy_dataset=use_legacy_dataset)


def test_writing_empty_lists():
    # ARROW-2591: [Python] Segmentation fault issue in pq.write_table
    arr1 = pa.array([[], []], pa.list_(pa.int32()))
    table = pa.Table.from_arrays([arr1], ['list(int32)'])
    _check_roundtrip(table)

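# Background for the empty-list tests above: Parquet encodes list<T> with a
# three-level structure (outer field, repeated group, element), which makes a
# null list and an empty list distinct cases; both must survive a roundtrip.
# For example (illustrative only):
#
#   pa.array([None, []], type=pa.list_(pa.int32()))  # null list vs. empty
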
@pytest.mark.pandas
def test_column_of_arrays(tempdir):
    df, schema = dataframe_with_arrays()

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)
    _write_table(arrow_table, filename, version='2.6', coerce_timestamps='ms')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_column_of_lists(tempdir):
    df, schema = dataframe_with_lists(parquet_compatible=True)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)
    _write_table(arrow_table, filename, version='2.6')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()

    tm.assert_frame_equal(df, df_read)


def test_large_list_records():
    # This was fixed in PARQUET-1100

    list_lengths = np.random.randint(0, 500, size=50)
    list_lengths[::10] = 0

    list_values = [list(map(int, np.random.randint(0, 100, size=x)))
                   if i % 8 else None
                   for i, x in enumerate(list_lengths)]

    a1 = pa.array(list_values)

    table = pa.Table.from_arrays([a1], ['int_lists'])
    _check_roundtrip(table)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
    # ARROW-1684
    df = pd.DataFrame({
        'a': [[1, 2, 3], None, [4, 5], []],
        'b': [[1.], None, None, [6., 7.]],
    })

    path = str(tempdir / 'nested_convenience.parquet')

    table = pa.Table.from_pandas(df, preserve_index=False)
    _write_table(table, path)

    read = pq.read_table(
        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
    tm.assert_frame_equal(read.to_pandas(), df[['a']])

    read = pq.read_table(
        path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
    tm.assert_frame_equal(read.to_pandas(), df)


# Binary
# -----------------------------------------------------------------------------


def test_fixed_size_binary():
    t0 = pa.binary(10)
    data = [b'fooooooooo', None, b'barooooooo', b'quxooooooo']
    a0 = pa.array(data, type=t0)

    table = pa.Table.from_arrays([a0],
                                 ['binary[10]'])
    _check_roundtrip(table)


# Large types
# -----------------------------------------------------------------------------


@pytest.mark.slow
@pytest.mark.large_memory
def test_large_table_int32_overflow():
    size = np.iinfo('int32').max + 1

    arr = np.ones(size, dtype='uint8')

    parr = pa.array(arr, type=pa.uint8())

    table = pa.Table.from_arrays([parr], names=['one'])
    f = io.BytesIO()
    _write_table(table, f)


def _simple_table_roundtrip(table, use_legacy_dataset=False, **write_kwargs):
    stream = pa.BufferOutputStream()
    _write_table(table, stream, **write_kwargs)
    buf = stream.getvalue()
    return _read_table(buf, use_legacy_dataset=use_legacy_dataset)


@pytest.mark.slow
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_byte_array_exactly_2gb(use_legacy_dataset):
    # Test edge case reported in ARROW-3762
    val = b'x' * (1 << 10)

    base = pa.array([val] * ((1 << 21) - 1))
    cases = [
        [b'x' * 1023],  # total 2^31 - 1 bytes
        [b'x' * 1024],  # total 2^31 bytes
        [b'x' * 1025]   # total 2^31 + 1 bytes
    ]
    for case in cases:
        values = pa.chunked_array([base, pa.array(case)])
        t = pa.table([values], names=['f0'])
        result = _simple_table_roundtrip(
            t, use_legacy_dataset=use_legacy_dataset, use_dictionary=False)
        assert t.equals(result)

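# Context for the ARROW-3762 tests around here: Arrow's default binary and
# string arrays use 32-bit value offsets, which caps a single array at about
# 2GB of value data. When a Parquet column read back exceeds that limit,
# pyarrow returns the column as a ChunkedArray with multiple chunks rather
# than one oversized array, as the next test checks.
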
@pytest.mark.slow
@pytest.mark.pandas
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_binary_array_overflow_to_chunked(use_legacy_dataset):
    # ARROW-3762

    # 2^31 + 1 bytes of string data in total
    values = [b'x'] + [
        b'x' * (1 << 20)
    ] * 2 * (1 << 10)
    df = pd.DataFrame({'byte_col': values})

    tbl = pa.Table.from_pandas(df, preserve_index=False)
    read_tbl = _simple_table_roundtrip(
        tbl, use_legacy_dataset=use_legacy_dataset)

    col0_data = read_tbl[0]
    assert isinstance(col0_data, pa.ChunkedArray)

    # Split up into chunks of at most 2GB each
    assert col0_data.num_chunks == 2

    assert tbl.equals(read_tbl)


@pytest.mark.slow
@pytest.mark.pandas
@pytest.mark.large_memory
@parametrize_legacy_dataset
def test_list_of_binary_large_cell(use_legacy_dataset):
    # ARROW-4688
    data = []

    # TODO(wesm): handle chunked children
    # 2^31 - 1 bytes in a single cell
    # data.append([b'x' * (1 << 20)] * 2047 + [b'x' * ((1 << 20) - 1)])

    # A little under 2GB (2^31 bytes) in total, in cells of approximately
    # 10MB each
    data.extend([[b'x' * 1000000] * 10] * 214)

    arr = pa.array(data)
    table = pa.Table.from_arrays([arr], ['chunky_cells'])
    read_table = _simple_table_roundtrip(
        table, use_legacy_dataset=use_legacy_dataset)
    assert table.equals(read_table)


def test_large_binary():
    data = [b'foo', b'bar'] * 50
    for type in [pa.large_binary(), pa.large_string()]:
        arr = pa.array(data, type=type)
        table = pa.Table.from_arrays([arr], names=['strs'])
        for use_dictionary in [False, True]:
            _check_roundtrip(table, use_dictionary=use_dictionary)


@pytest.mark.slow
@pytest.mark.large_memory
def test_large_binary_huge():
    s = b'xy' * 997
    data = [s] * ((1 << 33) // len(s))
    for type in [pa.large_binary(), pa.large_string()]:
        arr = pa.array(data, type=type)
        table = pa.Table.from_arrays([arr], names=['strs'])
        for use_dictionary in [False, True]:
            _check_roundtrip(table, use_dictionary=use_dictionary)
        del arr, table


@pytest.mark.large_memory
def test_large_binary_overflow():
    s = b'x' * (1 << 31)
    arr = pa.array([s], type=pa.large_binary())
    table = pa.Table.from_arrays([arr], names=['strs'])
    for use_dictionary in [False, True]:
        writer = pa.BufferOutputStream()
        with pytest.raises(
                pa.ArrowInvalid,
                match="Parquet cannot store strings with size 2GB or more"):
            _write_table(table, writer, use_dictionary=use_dictionary)

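# Note on test_large_binary_overflow: large_binary/large_string use 64-bit
# offsets, so the in-memory array can hold a single 2GB value, but Parquet
# itself encodes individual BYTE_ARRAY value lengths as 32-bit integers, so
# the writer rejects any single value of 2GB or more with the ArrowInvalid
# error asserted above.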