# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
from collections import OrderedDict

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet


@pytest.mark.pandas
def test_parquet_metadata_api():
    df = alltypes_sample(size=10000)
    df = df.reindex(columns=sorted(df.columns))
    df.index = np.random.randint(0, 1000000, size=len(df))

    fileh = make_sample_file(df)
    ncols = len(df.columns)

    # Series of sniff tests
    meta = fileh.metadata
    repr(meta)
    assert meta.num_rows == len(df)
    assert meta.num_columns == ncols + 1  # +1 for index
    assert meta.num_row_groups == 1
    assert meta.format_version == '2.6'
    assert 'parquet-cpp' in meta.created_by
    assert isinstance(meta.serialized_size, int)
    assert isinstance(meta.metadata, dict)

    # Schema
    schema = fileh.schema
    assert meta.schema is schema
    assert len(schema) == ncols + 1  # +1 for index
    repr(schema)

    col = schema[0]
    repr(col)
    assert col.name == df.columns[0]
    assert col.max_definition_level == 1
    assert col.max_repetition_level == 0

    assert col.physical_type == 'BOOLEAN'
    assert col.converted_type == 'NONE'

    with pytest.raises(IndexError):
        schema[ncols + 1]  # +1 for index

    with pytest.raises(IndexError):
        schema[-1]

    # Row group
    for rg in range(meta.num_row_groups):
        rg_meta = meta.row_group(rg)
        assert isinstance(rg_meta, pq.RowGroupMetaData)
        repr(rg_meta)

        for col in range(rg_meta.num_columns):
            col_meta = rg_meta.column(col)
            assert isinstance(col_meta, pq.ColumnChunkMetaData)
            repr(col_meta)

    with pytest.raises(IndexError):
        meta.row_group(-1)

    with pytest.raises(IndexError):
        meta.row_group(meta.num_row_groups + 1)

    rg_meta = meta.row_group(0)
    assert rg_meta.num_rows == len(df)
    assert rg_meta.num_columns == ncols + 1  # +1 for index
    assert rg_meta.total_byte_size > 0

    with pytest.raises(IndexError):
        col_meta = rg_meta.column(-1)

    with pytest.raises(IndexError):
        col_meta = rg_meta.column(ncols + 2)

    col_meta = rg_meta.column(0)
    assert col_meta.file_offset > 0
    assert col_meta.file_path == ''  # created from BytesIO
    assert col_meta.physical_type == 'BOOLEAN'
    assert col_meta.num_values == 10000
    assert col_meta.path_in_schema == 'bool'
    assert col_meta.is_stats_set is True
    assert isinstance(col_meta.statistics, pq.Statistics)
    assert col_meta.compression == 'SNAPPY'
    assert col_meta.encodings == ('PLAIN', 'RLE')
    assert col_meta.has_dictionary_page is False
    assert col_meta.dictionary_page_offset is None
    assert col_meta.data_page_offset > 0
    assert col_meta.total_compressed_size > 0
    assert col_meta.total_uncompressed_size > 0
    with pytest.raises(NotImplementedError):
        col_meta.has_index_page
    with pytest.raises(NotImplementedError):
        col_meta.index_page_offset

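# Note for the lifetime test below (ARROW-6642): a chained expression like
# metadata.row_group(0).column(0).statistics discards the intermediate
# RowGroupMetaData and ColumnChunkMetaData objects right away, so each
# child object must hold a reference to its parent; otherwise the final
# attribute access would touch freed memory and could segfault.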
def test_parquet_metadata_lifetime(tempdir):
    # ARROW-6642 - ensure that chained access keeps parent objects alive
    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(table, tempdir / 'test_metadata_segfault.parquet')
    parquet_file = pq.ParquetFile(tempdir / 'test_metadata_segfault.parquet')
    parquet_file.metadata.row_group(0).column(0).statistics


@pytest.mark.pandas
@pytest.mark.parametrize(
    (
        'data',
        'type',
        'physical_type',
        'min_value',
        'max_value',
        'null_count',
        'num_values',
        'distinct_count'
    ),
    [
        ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, 0),
        ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, 0),
        ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, 0),
        ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, 0),
        ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, 0),
        ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, 0),
        ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, 0),
        ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, 0),
        (
            [-1.1, 2.2, 2.3, None, 4.4], pa.float32(),
            'FLOAT', -1.1, 4.4, 1, 4, 0
        ),
        (
            [-1.1, 2.2, 2.3, None, 4.4], pa.float64(),
            'DOUBLE', -1.1, 4.4, 1, 4, 0
        ),
        (
            ['', 'b', chr(1000), None, 'aaa'], pa.binary(),
            'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, 0
        ),
        (
            [True, False, False, True, True], pa.bool_(),
            'BOOLEAN', False, True, 0, 5, 0
        ),
        (
            [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(),
            'BYTE_ARRAY', b'\x00', b'b', 1, 4, 0
        ),
    ]
)
def test_parquet_column_statistics_api(data, type, physical_type, min_value,
                                       max_value, null_count, num_values,
                                       distinct_count):
    df = pd.DataFrame({'data': data})
    schema = pa.schema([pa.field('data', type)])
    table = pa.Table.from_pandas(df, schema=schema, safe=False)
    fileh = make_sample_file(table)

    meta = fileh.metadata

    rg_meta = meta.row_group(0)
    col_meta = rg_meta.column(0)

    stat = col_meta.statistics
    assert stat.has_min_max
    assert _close(type, stat.min, min_value)
    assert _close(type, stat.max, max_value)
    assert stat.null_count == null_count
    assert stat.num_values == num_values
    # TODO(kszucs): as long as the parquet-cpp API does not expose a
    # HasDistinctCount method, a missing distinct_count is represented
    # as zero instead of None
    assert stat.distinct_count == distinct_count
    assert stat.physical_type == physical_type


def _close(type, left, right):
    if type == pa.float32():
        return abs(left - right) < 1E-7
    elif type == pa.float64():
        return abs(left - right) < 1E-13
    else:
        return left == right


# ARROW-6339
@pytest.mark.pandas
def test_parquet_raise_on_unset_statistics():
    df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")})
    meta = make_sample_file(pa.Table.from_pandas(df)).metadata

    assert not meta.row_group(0).column(0).statistics.has_min_max
    assert meta.row_group(0).column(0).statistics.max is None

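# The cases below check that min/max statistics are converted from the raw
# physical values stored in the file (e.g. integer milli-/microseconds for
# time and timestamp columns) back into logical Python values such as
# datetime.time and datetime.datetime when read through the metadata API.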
def test_statistics_convert_logical_types(tempdir):
    # ARROW-5166, ARROW-4139

    # (min, max, type)
    cases = [(10, 11164359321221007157, pa.uint64()),
             (10, 4294967295, pa.uint32()),
             ("ähnlich", "öffentlich", pa.utf8()),
             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
              pa.time32('ms')),
             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
              pa.time64('us')),
             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
              pa.timestamp('ms')),
             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
              pa.timestamp('us'))]

    for i, (min_val, max_val, typ) in enumerate(cases):
        t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)],
                                 ['col'])
        path = str(tempdir / ('example{}.parquet'.format(i)))
        pq.write_table(t, path, version='2.6')
        pf = pq.ParquetFile(path)
        stats = pf.metadata.row_group(0).column(0).statistics
        assert stats.min == min_val
        assert stats.max == max_val


def test_parquet_write_disable_statistics(tempdir):
    table = pa.Table.from_pydict(
        OrderedDict([
            ('a', pa.array([1, 2, 3])),
            ('b', pa.array(['a', 'b', 'c']))
        ])
    )
    _write_table(table, tempdir / 'data.parquet')
    meta = pq.read_metadata(tempdir / 'data.parquet')
    for col in [0, 1]:
        cc = meta.row_group(0).column(col)
        assert cc.is_stats_set is True
        assert cc.statistics is not None

    _write_table(table, tempdir / 'data2.parquet', write_statistics=False)
    meta = pq.read_metadata(tempdir / 'data2.parquet')
    for col in [0, 1]:
        cc = meta.row_group(0).column(col)
        assert cc.is_stats_set is False
        assert cc.statistics is None

    _write_table(table, tempdir / 'data3.parquet', write_statistics=['a'])
    meta = pq.read_metadata(tempdir / 'data3.parquet')
    cc_a = meta.row_group(0).column(0)
    cc_b = meta.row_group(0).column(1)
    assert cc_a.is_stats_set is True
    assert cc_b.is_stats_set is False
    assert cc_a.statistics is not None
    assert cc_b.statistics is None


def test_field_id_metadata():
    # ARROW-7080
    field_id = b'PARQUET:field_id'
    inner = pa.field('inner', pa.int32(), metadata={field_id: b'100'})
    middle = pa.field('middle', pa.struct(
        [inner]), metadata={field_id: b'101'})
    fields = [
        pa.field('basic', pa.int32(), metadata={
                 b'other': b'abc', field_id: b'1'}),
        pa.field(
            'list',
            pa.list_(pa.field('list-inner', pa.int32(),
                              metadata={field_id: b'10'})),
            metadata={field_id: b'11'}),
        pa.field('struct', pa.struct([middle]), metadata={field_id: b'102'}),
        pa.field('no-metadata', pa.int32()),
        pa.field('non-integral-field-id', pa.int32(),
                 metadata={field_id: b'xyz'}),
        pa.field('negative-field-id', pa.int32(),
                 metadata={field_id: b'-1000'})
    ]
    arrs = [[] for _ in fields]
    table = pa.table(arrs, schema=pa.schema(fields))

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()

    pf = pq.ParquetFile(pa.BufferReader(contents))
    schema = pf.schema_arrow

    assert schema[0].metadata[field_id] == b'1'
    assert schema[0].metadata[b'other'] == b'abc'

    list_field = schema[1]
    assert list_field.metadata[field_id] == b'11'

    list_item_field = list_field.type.value_field
    assert list_item_field.metadata[field_id] == b'10'

    struct_field = schema[2]
    assert struct_field.metadata[field_id] == b'102'

    struct_middle_field = struct_field.type[0]
    assert struct_middle_field.metadata[field_id] == b'101'

    struct_inner_field = struct_middle_field.type[0]
    assert struct_inner_field.metadata[field_id] == b'100'

    assert schema[3].metadata is None
    # Invalid input is passed through (ok) but does not
    # have field_id in parquet (not tested)
    assert schema[4].metadata[field_id] == b'xyz'
    assert schema[5].metadata[field_id] == b'-1000'

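# The next test exercises the `_metadata` sidecar pattern: per-file
# row-group metadata is merged with FileMetaData.append_row_groups() and
# written with write_metadata_file(); set_file_path() records each data
# file's path relative to the sidecar so readers can locate its row groups.
# A minimal sketch of that workflow (hypothetical file names):
#
#   collector = []
#   pq.write_table(table, 'part-0.parquet', metadata_collector=collector)
#   collector[-1].set_file_path('part-0.parquet')
#   with open('_metadata', 'wb') as sink:
#       collector[-1].write_metadata_file(sink)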
@pytest.mark.pandas
def test_multi_dataset_metadata(tempdir):
    filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
    metapath = str(tempdir / "_metadata")

    # create a test dataset
    df = pd.DataFrame({
        'one': [1, 2, 3],
        'two': [-1, -2, -3],
        'three': [[1, 2], [2, 3], [3, 4]],
    })
    table = pa.Table.from_pandas(df)

    # write dataset twice and collect/merge metadata
    _meta = None
    for filename in filenames:
        meta = []
        pq.write_table(table, str(tempdir / filename),
                       metadata_collector=meta)
        meta[0].set_file_path(filename)
        if _meta is None:
            _meta = meta[0]
        else:
            _meta.append_row_groups(meta[0])

    # Write merged metadata-only file
    with open(metapath, "wb") as f:
        _meta.write_metadata_file(f)

    # Read back the metadata
    meta = pq.read_metadata(metapath)
    md = meta.to_dict()
    _md = _meta.to_dict()
    for key in _md:
        if key != 'serialized_size':
            assert _md[key] == md[key]
    assert _md['num_columns'] == 3
    assert _md['num_rows'] == 6
    assert _md['num_row_groups'] == 2
    assert _md['serialized_size'] == 0
    assert md['serialized_size'] > 0


def test_write_metadata(tempdir):
    path = str(tempdir / "metadata")
    schema = pa.schema([("a", "int64"), ("b", "float64")])

    # write a pyarrow schema
    pq.write_metadata(schema, path)
    parquet_meta = pq.read_metadata(path)
    schema_as_arrow = parquet_meta.schema.to_arrow_schema()
    assert schema_as_arrow.equals(schema)

    # ARROW-8980: Check that the ARROW:schema metadata key was removed
    if schema_as_arrow.metadata:
        assert b'ARROW:schema' not in schema_as_arrow.metadata

    # pass through writer keyword arguments
    for version in ["1.0", "2.0", "2.4", "2.6"]:
        pq.write_metadata(schema, path, version=version)
        parquet_meta = pq.read_metadata(path)
        # The version is stored as a single integer in the Parquet metadata,
        # so it cannot correctly express dotted format versions
        expected_version = "1.0" if version == "1.0" else "2.6"
        assert parquet_meta.format_version == expected_version

    # metadata_collector: list of FileMetaData objects
    table = pa.table({'a': [1, 2], 'b': [.1, .2]}, schema=schema)
    pq.write_table(table, tempdir / "data.parquet")
    parquet_meta = pq.read_metadata(str(tempdir / "data.parquet"))
    pq.write_metadata(
        schema, path, metadata_collector=[parquet_meta, parquet_meta]
    )
    parquet_meta_mult = pq.read_metadata(path)
    assert parquet_meta_mult.num_row_groups == 2

    # append metadata with different schema raises an error
    with pytest.raises(RuntimeError, match="requires equal schemas"):
        pq.write_metadata(
            pa.schema([("a", "int32"), ("b", "null")]),
            path, metadata_collector=[parquet_meta, parquet_meta]
        )

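# The roundtrip below (ARROW-8694) stresses footer serialization by
# attaching roughly 10 MB of schema-level key-value metadata.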
def test_table_large_metadata():
    # ARROW-8694
    my_schema = pa.schema([pa.field('f0', 'double')],
                          metadata={'large': 'x' * 10000000})

    table = pa.table([np.arange(10)], schema=my_schema)
    _check_roundtrip(table)


@pytest.mark.pandas
def test_compare_schemas():
    df = alltypes_sample(size=10000)

    fileh = make_sample_file(df)
    fileh2 = make_sample_file(df)
    fileh3 = make_sample_file(df[df.columns[::2]])

    # ParquetSchema
    assert isinstance(fileh.schema, pq.ParquetSchema)
    assert fileh.schema.equals(fileh.schema)
    assert fileh.schema == fileh.schema
    assert fileh.schema.equals(fileh2.schema)
    assert fileh.schema == fileh2.schema
    assert fileh.schema != 'arbitrary object'
    assert not fileh.schema.equals(fileh3.schema)
    assert fileh.schema != fileh3.schema

    # ColumnSchema
    assert isinstance(fileh.schema[0], pq.ColumnSchema)
    assert fileh.schema[0].equals(fileh.schema[0])
    assert fileh.schema[0] == fileh.schema[0]
    assert not fileh.schema[0].equals(fileh.schema[1])
    assert fileh.schema[0] != fileh.schema[1]
    assert fileh.schema[0] != 'arbitrary object'


@pytest.mark.pandas
def test_read_schema(tempdir):
    N = 100
    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])

    data_path = tempdir / 'test.parquet'

    table = pa.Table.from_pandas(df)
    _write_table(table, data_path)

    read1 = pq.read_schema(data_path)
    read2 = pq.read_schema(data_path, memory_map=True)
    assert table.schema.equals(read1)
    assert table.schema.equals(read2)

    assert table.schema.metadata[b'pandas'] == read1.metadata[b'pandas']


def test_parquet_metadata_empty_to_dict(tempdir):
    # https://issues.apache.org/jira/browse/ARROW-10146
    table = pa.table({"a": pa.array([], type="int64")})
    pq.write_table(table, tempdir / "data.parquet")
    metadata = pq.read_metadata(tempdir / "data.parquet")
    # ensure this doesn't error / statistics set to None
    metadata_dict = metadata.to_dict()
    assert len(metadata_dict["row_groups"]) == 1
    assert len(metadata_dict["row_groups"][0]["columns"]) == 1
    assert metadata_dict["row_groups"][0]["columns"][0]["statistics"] is None


@pytest.mark.slow
@pytest.mark.large_memory
def test_metadata_exceeds_message_size():
    # ARROW-13655: Thrift may enforce a default message size that limits
    # the size of Parquet metadata that can be written.
    NCOLS = 1000
    NREPEATS = 4000

    table = pa.table({str(i): np.random.randn(10) for i in range(NCOLS)})

    with pa.BufferOutputStream() as out:
        pq.write_table(table, out)
        buf = out.getvalue()

    original_metadata = pq.read_metadata(pa.BufferReader(buf))
    metadata = pq.read_metadata(pa.BufferReader(buf))
    for i in range(NREPEATS):
        metadata.append_row_groups(original_metadata)

    with pa.BufferOutputStream() as out:
        metadata.write_metadata_file(out)
        buf = out.getvalue()

    metadata = pq.read_metadata(pa.BufferReader(buf))