# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from collections import OrderedDict
import io

import numpy as np
import pytest

import pyarrow as pa
from pyarrow import fs
from pyarrow.filesystem import LocalFileSystem, FileSystem
from pyarrow.tests import util
from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
                                          parametrize_legacy_dataset)

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.pandas_examples import dataframe_with_lists
    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet


def test_parquet_invalid_version(tempdir):
    table = pa.table({'a': [1, 2, 3]})
    with pytest.raises(ValueError, match="Unsupported Parquet format version"):
        _write_table(table, tempdir / 'test_version.parquet', version="2.2")
    with pytest.raises(ValueError, match="Unsupported Parquet data page " +
                       "version"):
        _write_table(table, tempdir / 'test_version.parquet',
                     data_page_version="2.2")


@parametrize_legacy_dataset
def test_set_data_page_size(use_legacy_dataset):
    arr = pa.array([1, 2, 3] * 100000)
    t = pa.Table.from_arrays([arr], names=['f0'])

    # 128K, 512K
    page_sizes = [2 << 16, 2 << 18]
    for target_page_size in page_sizes:
        _check_roundtrip(t, data_page_size=target_page_size,
                         use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_chunked_table_write(use_legacy_dataset):
    # ARROW-232
    tables = []
    batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10))
    tables.append(pa.Table.from_batches([batch] * 3))
    df, _ = dataframe_with_lists()
    batch = pa.RecordBatch.from_pandas(df)
    tables.append(pa.Table.from_batches([batch] * 3))

    for data_page_version in ['1.0', '2.0']:
        for use_dictionary in [True, False]:
            for table in tables:
                _check_roundtrip(
                    table, version='2.6',
                    use_legacy_dataset=use_legacy_dataset,
                    data_page_version=data_page_version,
                    use_dictionary=use_dictionary)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_memory_map(tempdir, use_legacy_dataset):
    df = alltypes_sample(size=10)

    table = pa.Table.from_pandas(df)
    _check_roundtrip(table, read_table_kwargs={'memory_map': True},
                     version='2.6', use_legacy_dataset=use_legacy_dataset)

    filename = str(tempdir / 'tmp_file')
    with open(filename, 'wb') as f:
        _write_table(table, f, version='2.6')
    table_read = pq.read_pandas(filename, memory_map=True,
                                use_legacy_dataset=use_legacy_dataset)
    assert table_read.equals(table)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_enable_buffered_stream(tempdir, use_legacy_dataset):
    df = alltypes_sample(size=10)

    table = pa.Table.from_pandas(df)
    _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025},
                     version='2.6', use_legacy_dataset=use_legacy_dataset)

    filename = str(tempdir / 'tmp_file')
    with open(filename, 'wb') as f:
        _write_table(table, f, version='2.6')
    table_read = pq.read_pandas(filename, buffer_size=4096,
                                use_legacy_dataset=use_legacy_dataset)
    assert table_read.equals(table)


@parametrize_legacy_dataset
def test_special_chars_filename(tempdir, use_legacy_dataset):
    table = pa.Table.from_arrays([pa.array([42])], ["ints"])
    filename = "foo # bar"
    path = tempdir / filename
    assert not path.exists()
    _write_table(table, str(path))
    assert path.exists()
    table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset)
    assert table_read.equals(table)


@parametrize_legacy_dataset
def test_invalid_source(use_legacy_dataset):
    # Test that we provide a helpful error message pointing out
    # that None wasn't expected when trying to open a Parquet None file.
    #
    # Depending on use_legacy_dataset the message changes slightly,
    # but in both cases it should point out that None wasn't expected.
    with pytest.raises(TypeError, match="None"):
        pq.read_table(None, use_legacy_dataset=use_legacy_dataset)

    with pytest.raises(TypeError, match="None"):
        pq.ParquetFile(None)


@pytest.mark.slow
def test_file_with_over_int16_max_row_groups():
    # PARQUET-1857: Parquet encryption support introduced an INT16_MAX upper
    # limit on the number of row groups, but this limit only impacts files
    # with encrypted row group metadata because of the int16 row group
    # ordinal used in the Parquet Thrift metadata.
    # Unencrypted files are not impacted, so this test checks that it works
    # (even if it isn't a good idea).
    t = pa.table([list(range(40000))], names=['f0'])
    _check_roundtrip(t, row_group_size=1)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_empty_table_roundtrip(use_legacy_dataset):
    df = alltypes_sample(size=10)

    # Create a non-empty table to infer the types correctly, then slice to 0
    table = pa.Table.from_pandas(df)
    table = pa.Table.from_arrays(
        [col.chunk(0)[:0] for col in table.itercolumns()],
        names=table.schema.names)

    assert table.schema.field('null').type == pa.null()
    assert table.schema.field('null_list').type == pa.list_(pa.null())
    _check_roundtrip(
        table, version='2.6', use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_empty_table_no_columns(use_legacy_dataset):
    df = pd.DataFrame()
    empty = pa.Table.from_pandas(df, preserve_index=False)
    _check_roundtrip(empty, use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_write_nested_zero_length_array_chunk_failure(use_legacy_dataset):
    # Bug report in ARROW-3792
    cols = OrderedDict(
        int32=pa.int32(),
        list_string=pa.list_(pa.string())
    )
    data = [[], [OrderedDict(int32=1, list_string=('G',)), ]]

    # This produces a table with a column like
    # <Column name='list_string' type=ListType(list<item: string>)>
    # [
    #   [],
    #   [
    #     [
    #       "G"
    #     ]
    #   ]
    # ]
    #
    # Each column is a ChunkedArray with 2 elements
    my_arrays = [pa.array(batch, type=pa.struct(cols)).flatten()
                 for batch in data]
    my_batches = [pa.RecordBatch.from_arrays(batch, schema=pa.schema(cols))
                  for batch in my_arrays]
    tbl = pa.Table.from_batches(my_batches, pa.schema(cols))
    _check_roundtrip(tbl, use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_multiple_path_types(tempdir, use_legacy_dataset):
    # Test compatibility with PEP 519 path-like objects
    path = tempdir / 'zzz.parquet'
    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
    _write_table(df, path)
    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)

    # Test compatibility with plain string paths
    path = str(tempdir) + 'zzz.parquet'
    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
    _write_table(df, path)
    table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@parametrize_legacy_dataset
def test_fspath(tempdir, use_legacy_dataset):
    # ARROW-12472 support __fspath__ objects without using str()
    path = tempdir / "test.parquet"
    table = pa.table({"a": [1, 2, 3]})
    _write_table(table, path)

    fs_protocol_obj = util.FSProtocolClass(path)

    result = _read_table(
        fs_protocol_obj, use_legacy_dataset=use_legacy_dataset
    )
    assert result.equals(table)

    # combined with non-local filesystem raises
    with pytest.raises(TypeError):
        _read_table(fs_protocol_obj, filesystem=FileSystem())


@pytest.mark.dataset
@parametrize_legacy_dataset
@pytest.mark.parametrize("filesystem", [
    None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
])
def test_relative_paths(tempdir, use_legacy_dataset, filesystem):
    # reading and writing from relative paths
    table = pa.table({"a": [1, 2, 3]})

    # reading
    pq.write_table(table, str(tempdir / "data.parquet"))
    with util.change_cwd(tempdir):
        result = pq.read_table("data.parquet", filesystem=filesystem,
                               use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)

    # writing
    with util.change_cwd(tempdir):
        pq.write_table(table, "data2.parquet", filesystem=filesystem)
    result = pq.read_table(tempdir / "data2.parquet")
    assert result.equals(table)


def test_read_non_existing_file():
    # ensure we have a proper error message
    with pytest.raises(FileNotFoundError):
        pq.read_table('i-am-not-existing.parquet')


def test_file_error_python_exception():
    class BogusFile(io.BytesIO):
        def read(self, *args):
            raise ZeroDivisionError("zorglub")

        def seek(self, *args):
            raise ZeroDivisionError("zorglub")

    # ensure the Python exception is restored
    with pytest.raises(ZeroDivisionError, match="zorglub"):
        pq.read_table(BogusFile(b""))


@parametrize_legacy_dataset
def test_parquet_read_from_buffer(tempdir, use_legacy_dataset):
    # reading from a buffer from python's open()
    table = pa.table({"a": [1, 2, 3]})
    pq.write_table(table, str(tempdir / "data.parquet"))

    with open(str(tempdir / "data.parquet"), "rb") as f:
        result = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)

    with open(str(tempdir / "data.parquet"), "rb") as f:
        result = pq.read_table(pa.PythonFile(f),
                               use_legacy_dataset=use_legacy_dataset)
    assert result.equals(table)


@parametrize_legacy_dataset
def test_byte_stream_split(use_legacy_dataset):
    # This is only a smoke test.
    arr_float = pa.array(list(map(float, range(100))))
    arr_int = pa.array(list(map(int, range(100))))
    data_float = [arr_float, arr_float]
    table = pa.Table.from_arrays(data_float, names=['a', 'b'])

    # Check with byte_stream_split for both columns.
    _check_roundtrip(table, expected=table, compression="gzip",
                     use_dictionary=False, use_byte_stream_split=True)

    # Check with byte_stream_split for column 'b' and dictionary
    # for column 'a'.
    _check_roundtrip(table, expected=table, compression="gzip",
                     use_dictionary=['a'],
                     use_byte_stream_split=['b'])

    # Check with a collision for both columns.
    _check_roundtrip(table, expected=table, compression="gzip",
                     use_dictionary=['a', 'b'],
                     use_byte_stream_split=['a', 'b'])

    # Check with mixed column types.
    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
                                       names=['a', 'b'])
    _check_roundtrip(mixed_table, expected=mixed_table,
                     use_dictionary=['b'],
                     use_byte_stream_split=['a'])

    # Try to use the wrong data type with the byte_stream_split encoding.
    # This should throw an exception.
    table = pa.Table.from_arrays([arr_int], names=['tmp'])
    with pytest.raises(IOError):
        _check_roundtrip(table, expected=table, use_byte_stream_split=True,
                         use_dictionary=False,
                         use_legacy_dataset=use_legacy_dataset)


@parametrize_legacy_dataset
def test_compression_level(use_legacy_dataset):
    arr = pa.array(list(map(int, range(1000))))
    data = [arr, arr]
    table = pa.Table.from_arrays(data, names=['a', 'b'])

    # Check one compression level.
    _check_roundtrip(table, expected=table, compression="gzip",
                     compression_level=1,
                     use_legacy_dataset=use_legacy_dataset)

    # Check another one to make sure that compression_level=1 does not
    # coincide with the default one in Arrow.
    _check_roundtrip(table, expected=table, compression="gzip",
                     compression_level=5,
                     use_legacy_dataset=use_legacy_dataset)

    # Check that the user can provide a compression per column
    _check_roundtrip(table, expected=table,
                     compression={'a': "gzip", 'b': "snappy"},
                     use_legacy_dataset=use_legacy_dataset)

    # Check that the user can provide a compression level per column
    _check_roundtrip(table, expected=table, compression="gzip",
                     compression_level={'a': 2, 'b': 3},
                     use_legacy_dataset=use_legacy_dataset)

    # Check that specifying a compression level for a codec which does not
    # allow specifying one results in an error.
    # Uncompressed, snappy, lz4 and lzo do not support specifying a
    # compression level.
    # GZIP (zlib) allows for specifying a compression level but as of up
    # to version 1.2.11 the valid range is [-1, 9].
    invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337),
                            ("None", 444), ("lzo", 14)]
    buf = io.BytesIO()
    for (codec, level) in invalid_combinations:
        with pytest.raises((ValueError, OSError)):
            _write_table(table, buf, compression=codec,
                         compression_level=level)


def test_sanitized_spark_field_names():
    a0 = pa.array([0, 1, 2, 3, 4])
    name = 'prohib; ,\t{}'
    table = pa.Table.from_arrays([a0], [name])

    result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'})

    expected_name = 'prohib______'
    assert result.schema[0].name == expected_name


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_multithreaded_read(use_legacy_dataset):
    df = alltypes_sample(size=10000)

    table = pa.Table.from_pandas(df)

    buf = io.BytesIO()
    _write_table(table, buf, compression='SNAPPY', version='2.6')

    buf.seek(0)
    table1 = _read_table(
        buf, use_threads=True, use_legacy_dataset=use_legacy_dataset)

    buf.seek(0)
    table2 = _read_table(
        buf, use_threads=False, use_legacy_dataset=use_legacy_dataset)

    assert table1.equals(table2)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_min_chunksize(use_legacy_dataset):
    data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
    table = pa.Table.from_pandas(data.reset_index())

    buf = io.BytesIO()
    _write_table(table, buf, chunk_size=-1)

    buf.seek(0)
    result = _read_table(buf, use_legacy_dataset=use_legacy_dataset)

    assert result.equals(table)

    with pytest.raises(ValueError):
        _write_table(table, buf, chunk_size=0)


@pytest.mark.pandas
def test_write_error_deletes_incomplete_file(tempdir):
    # ARROW-1285
    df = pd.DataFrame({'a': list('abc'),
                       'b': list(range(1, 4)),
                       'c': np.arange(3, 6).astype('u1'),
                       'd': np.arange(4.0, 7.0, dtype='float64'),
                       'e': [True, False, True],
                       'f': pd.Categorical(list('abc')),
                       'g': pd.date_range('20130101', periods=3),
                       'h': pd.date_range('20130101', periods=3,
                                          tz='US/Eastern'),
                       'i': pd.date_range('20130101', periods=3, freq='ns')})

    pdf = pa.Table.from_pandas(df)

    filename = tempdir / 'tmp_file'
    try:
        _write_table(pdf, filename)
    except pa.ArrowException:
        pass

    assert not filename.exists()


@parametrize_legacy_dataset
def test_read_non_existent_file(tempdir, use_legacy_dataset):
    path = 'non-existent-file.parquet'
    try:
        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)
    except Exception as e:
        assert path in e.args[0]


@parametrize_legacy_dataset
def test_read_table_doesnt_warn(datadir, use_legacy_dataset):
    with pytest.warns(None) as record:
        pq.read_table(datadir / 'v0.7.1.parquet',
                      use_legacy_dataset=use_legacy_dataset)

    assert len(record) == 0


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_zlib_compression_bug(use_legacy_dataset):
    # ARROW-3514: "zlib deflate failed, output buffer too small"
    table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
    f = io.BytesIO()
    pq.write_table(table, f, compression='gzip')

    f.seek(0)
    roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
    tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())


@parametrize_legacy_dataset
def test_parquet_file_too_small(tempdir, use_legacy_dataset):
    path = str(tempdir / "test.parquet")
    # TODO(dataset) with datasets API it raises OSError instead
    with pytest.raises((pa.ArrowInvalid, OSError),
                       match='size is 0 bytes'):
        with open(path, 'wb') as f:
            pass
        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)

    with pytest.raises((pa.ArrowInvalid, OSError),
                       match='size is 4 bytes'):
        with open(path, 'wb') as f:
            f.write(b'ffff')
        pq.read_table(path, use_legacy_dataset=use_legacy_dataset)


@pytest.mark.pandas
@pytest.mark.fastparquet
@pytest.mark.filterwarnings("ignore:RangeIndex:FutureWarning")
@pytest.mark.filterwarnings("ignore:tostring:DeprecationWarning:fastparquet")
def test_fastparquet_cross_compatibility(tempdir):
    fp = pytest.importorskip('fastparquet')

    df = pd.DataFrame(
        {
            "a": list("abc"),
            "b": list(range(1, 4)),
            "c": np.arange(4.0, 7.0, dtype="float64"),
            "d": [True, False, True],
            "e": pd.date_range("20130101", periods=3),
            "f": pd.Categorical(["a", "b", "a"]),
            # fastparquet writes list as BYTE_ARRAY JSON, so no roundtrip
            # "g": [[1, 2], None, [1, 2, 3]],
        }
    )
    table = pa.table(df)

    # Arrow -> fastparquet
    file_arrow = str(tempdir / "cross_compat_arrow.parquet")
    pq.write_table(table, file_arrow, compression=None)

    fp_file = fp.ParquetFile(file_arrow)
    df_fp = fp_file.to_pandas()
    tm.assert_frame_equal(df, df_fp)

    # Fastparquet -> arrow
    file_fastparquet = str(tempdir / "cross_compat_fastparquet.parquet")
    fp.write(file_fastparquet, df)

    table_fp = pq.read_pandas(file_fastparquet)
    # for a fastparquet-written file, categoricals come back as strings
    # (no arrow schema in parquet metadata)
    df['f'] = df['f'].astype(object)
    tm.assert_frame_equal(table_fp.to_pandas(), df)


@parametrize_legacy_dataset
@pytest.mark.parametrize('array_factory', [
    lambda: pa.array([0, None] * 10),
    lambda: pa.array([0, None] * 10).dictionary_encode(),
    lambda: pa.array(["", None] * 10),
    lambda: pa.array(["", None] * 10).dictionary_encode(),
])
@pytest.mark.parametrize('use_dictionary', [False, True])
@pytest.mark.parametrize('read_dictionary', [False, True])
def test_buffer_contents(
        array_factory, use_dictionary, read_dictionary, use_legacy_dataset
):
    # Test that null values are deterministically initialized to zero
    # after a roundtrip through Parquet.
    # See ARROW-8006 and ARROW-8011.
    orig_table = pa.Table.from_pydict({"col": array_factory()})
    bio = io.BytesIO()
    pq.write_table(orig_table, bio, use_dictionary=True)
    bio.seek(0)
    read_dictionary = ['col'] if read_dictionary else None
    table = pq.read_table(bio, use_threads=False,
                          read_dictionary=read_dictionary,
                          use_legacy_dataset=use_legacy_dataset)

    for col in table.columns:
        [chunk] = col.chunks
        buf = chunk.buffers()[1]
        assert buf.to_pybytes() == buf.size * b"\0"


def test_parquet_compression_roundtrip(tempdir):
    # ARROW-10480: ensure even with nonstandard Parquet file naming
    # conventions, writing and then reading a file works. In
    # particular, ensure that we don't automatically double-compress
    # the stream due to auto-detecting the extension in the filename
    table = pa.table([pa.array(range(4))], names=["ints"])
    path = tempdir / "arrow-10480.pyarrow.gz"
    pq.write_table(table, path, compression="GZIP")
    result = pq.read_table(path)
    assert result.equals(table)


def test_empty_row_groups(tempdir):
    # ARROW-3020
    table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])

    path = tempdir / 'empty_row_groups.parquet'

    num_groups = 3
    with pq.ParquetWriter(path, table.schema) as writer:
        for i in range(num_groups):
            writer.write_table(table)

    reader = pq.ParquetFile(path)
    assert reader.metadata.num_row_groups == num_groups

    for i in range(num_groups):
        assert reader.read_row_group(i).equals(table)


def test_reads_over_batch(tempdir):
    data = [None] * (1 << 20)
    data.append([1])
    # Large list<int64> with mostly nones and one final
    # value. This should force batched reads when
    # reading back.
    table = pa.Table.from_arrays([data], ['column'])

    path = tempdir / 'arrow-11607.parquet'
    pq.write_table(table, path)
    table2 = pq.read_table(path)
    assert table == table2