// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cassert>
#include <cstring>
#include <iostream>
#include <memory>
#include <vector>

#include "reader_writer.h"  // provides SetupSchema() and FIXED_LENGTH

/*
 * This example describes writing and reading Parquet Files in C++ and serves as a
 * reference to the API.
 * The file contains all the physical data types supported by Parquet.
 * This example uses the RowGroupWriter API, which supports writing RowGroups based on a
 * certain size.
 **/

/* Parquet is a structured columnar file format.
 * Parquet File = "Parquet data" + "Parquet Metadata"
 * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
 * columnar layout.
 * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and
 * their Columns.
 * The "file schema" is a tree where each node is either a primitive type (leaf node) or
 * a complex (nested) type (internal node).
 * For specific details, please refer to the format specification:
 * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
 **/

constexpr int NUM_ROWS = 2500000;
constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024;  // 16 MB
const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet";

int main(int argc, char** argv) {
  /**********************************************************************************
                             PARQUET WRITER EXAMPLE
  **********************************************************************************/
  // parquet::REQUIRED fields do not need definition and repetition level values
  // parquet::OPTIONAL fields require only definition level values
  // parquet::REPEATED fields require both definition and repetition level values
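  // For example, in the loop below the OPTIONAL ByteArray column writes a NULL row by
  // passing definition_level = 0 with no value, and a present row by passing
  // definition_level = 1 together with the value; the REPEATED Int64 column writes the
  // first value of a row with repetition_level = 0 and the second value of the same
  // row with repetition_level = 1.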
  try {
    // Create a local file output stream instance.
    using FileClass = ::arrow::io::FileOutputStream;
    std::shared_ptr<FileClass> out_file;
    PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));

    // Set up the Parquet schema.
    std::shared_ptr<parquet::schema::GroupNode> schema = SetupSchema();

    // Add writer properties.
    parquet::WriterProperties::Builder builder;
    builder.compression(parquet::Compression::SNAPPY);
    std::shared_ptr<parquet::WriterProperties> props = builder.build();

    // Create a ParquetFileWriter instance.
    std::shared_ptr<parquet::ParquetFileWriter> file_writer =
        parquet::ParquetFileWriter::Open(out_file, schema, props);

    // Append a buffered RowGroup to keep the RowGroup open until it reaches a certain size.
    parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();

    int num_columns = file_writer->num_columns();
    std::vector<int64_t> buffered_values_estimate(num_columns, 0);
    for (int i = 0; i < NUM_ROWS; i++) {
      int64_t estimated_bytes = 0;
      // Get the estimated size of the values that are not written to a page yet.
      for (int n = 0; n < num_columns; n++) {
        estimated_bytes += buffered_values_estimate[n];
      }

      // We need to consider the compressed pages
      // as well as the values that are not compressed yet.
      if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
           estimated_bytes) > ROW_GROUP_SIZE) {
        rg_writer->Close();
        std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
        rg_writer = file_writer->AppendBufferedRowGroup();
      }

      int col_id = 0;
      // Write the Bool column.
      parquet::BoolWriter* bool_writer =
          static_cast<parquet::BoolWriter*>(rg_writer->column(col_id));
      bool bool_value = ((i % 2) == 0) ? true : false;
      bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value);
      buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes();

      // Write the Int32 column.
      col_id++;
      parquet::Int32Writer* int32_writer =
          static_cast<parquet::Int32Writer*>(rg_writer->column(col_id));
      int32_t int32_value = i;
      int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value);
      buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes();

      // Write the Int64 column. Each row has two values (the column is REPEATED).
      col_id++;
      parquet::Int64Writer* int64_writer =
          static_cast<parquet::Int64Writer*>(rg_writer->column(col_id));
      int64_t int64_value1 = 2 * i;
      int16_t definition_level = 1;
      int16_t repetition_level = 0;  // first value of a new record
      int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value1);
      int64_t int64_value2 = (2 * i + 1);
      repetition_level = 1;  // second value of the same record
      int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2);
      buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes();

      // Write the INT96 column.
      col_id++;
      parquet::Int96Writer* int96_writer =
          static_cast<parquet::Int96Writer*>(rg_writer->column(col_id));
      parquet::Int96 int96_value;
      int96_value.value[0] = i;
      int96_value.value[1] = i + 1;
      int96_value.value[2] = i + 2;
      int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value);
      buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes();

      // Write the Float column.
      col_id++;
      parquet::FloatWriter* float_writer =
          static_cast<parquet::FloatWriter*>(rg_writer->column(col_id));
      float float_value = static_cast<float>(i) * 1.1f;
      float_writer->WriteBatch(1, nullptr, nullptr, &float_value);
      buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes();

      // Write the Double column.
      col_id++;
      parquet::DoubleWriter* double_writer =
          static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id));
      double double_value = i * 1.1111111;
      double_writer->WriteBatch(1, nullptr, nullptr, &double_value);
      buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes();
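      // Note on the next column: FIXED_LENGTH is 10, so the 7-character prefix
      // "parquet" plus a three-digit counter derived from the row index fills the
      // ByteArray payload exactly.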
      // Write the ByteArray column. Make every alternate value NULL.
      col_id++;
      parquet::ByteArrayWriter* ba_writer =
          static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
      parquet::ByteArray ba_value;
      char hello[FIXED_LENGTH] = "parquet";
      hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
      hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
      hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
      if (i % 2 == 0) {
        int16_t definition_level = 1;
        ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
        ba_value.len = FIXED_LENGTH;
        ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
      } else {
        int16_t definition_level = 0;
        ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
      }
      buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();

      // Write the FixedLengthByteArray column.
      col_id++;
      parquet::FixedLenByteArrayWriter* flba_writer =
          static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id));
      parquet::FixedLenByteArray flba_value;
      char v = static_cast<char>(i);
      char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
      flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);

      flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value);
      buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes();
    }

    // Close the RowGroupWriter.
    rg_writer->Close();
    // Close the ParquetFileWriter.
    file_writer->Close();

    // Write the bytes to file.
    DCHECK(out_file->Close().ok());
  } catch (const std::exception& e) {
    std::cerr << "Parquet write error: " << e.what() << std::endl;
    return -1;
  }

  /**********************************************************************************
                             PARQUET READER EXAMPLE
  **********************************************************************************/

  try {
    // Create a ParquetReader instance.
    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
        parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);

    // Get the File MetaData.
    std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();

    int num_row_groups = file_metadata->num_row_groups();

    // Get the number of Columns.
    int num_columns = file_metadata->num_columns();
    assert(num_columns == 8);

    std::vector<int> col_row_counts(num_columns, 0);

    // Iterate over all the RowGroups in the file.
    for (int r = 0; r < num_row_groups; ++r) {
      // Get the RowGroup Reader.
      std::shared_ptr<parquet::RowGroupReader> row_group_reader =
          parquet_reader->RowGroup(r);

      assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE);

      int64_t values_read = 0;
      int64_t rows_read = 0;
      int16_t definition_level;
      int16_t repetition_level;
      std::shared_ptr<parquet::ColumnReader> column_reader;
      int col_id = 0;

      ARROW_UNUSED(rows_read);  // prevent warning in release build

      // Get the Column Reader for the boolean column.
      column_reader = row_group_reader->Column(col_id);
      parquet::BoolReader* bool_reader =
          static_cast<parquet::BoolReader*>(column_reader.get());

      // Read all the rows in the column.
      while (bool_reader->HasNext()) {
        bool value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows.
        rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read.
        assert(rows_read == 1);
        // There are no NULL values in the rows written.
        assert(values_read == 1);
        // Verify the value written.
        bool expected_value = ((col_row_counts[col_id] % 2) == 0) ? true : false;
        assert(value == expected_value);
        col_row_counts[col_id]++;
      }
      // Get the Column Reader for the Int32 column.
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::Int32Reader* int32_reader =
          static_cast<parquet::Int32Reader*>(column_reader.get());
      // Read all the rows in the column.
      while (int32_reader->HasNext()) {
        int32_t value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows.
        rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read.
        assert(rows_read == 1);
        // There are no NULL values in the rows written.
        assert(values_read == 1);
        // Verify the value written.
        assert(value == col_row_counts[col_id]);
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the Int64 column.
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::Int64Reader* int64_reader =
          static_cast<parquet::Int64Reader*>(column_reader.get());
      // Read all the rows in the column.
      while (int64_reader->HasNext()) {
        int64_t value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows.
        rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level,
                                            &value, &values_read);
        // Ensure only one value is read.
        assert(rows_read == 1);
        // There are no NULL values in the rows written.
        assert(values_read == 1);
        // Verify the value written.
        int64_t expected_value = col_row_counts[col_id];
        assert(value == expected_value);
        if ((col_row_counts[col_id] % 2) == 0) {
          assert(repetition_level == 0);
        } else {
          assert(repetition_level == 1);
        }
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the Int96 column.
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::Int96Reader* int96_reader =
          static_cast<parquet::Int96Reader*>(column_reader.get());
      // Read all the rows in the column.
      while (int96_reader->HasNext()) {
        parquet::Int96 value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows.
        rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read.
        assert(rows_read == 1);
        // There are no NULL values in the rows written.
        assert(values_read == 1);
        // Verify the value written.
        parquet::Int96 expected_value;
        ARROW_UNUSED(expected_value);  // prevent warning in release build
        expected_value.value[0] = col_row_counts[col_id];
        expected_value.value[1] = col_row_counts[col_id] + 1;
        expected_value.value[2] = col_row_counts[col_id] + 2;
        for (int j = 0; j < 3; j++) {
          assert(value.value[j] == expected_value.value[j]);
        }
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the Float column.
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::FloatReader* float_reader =
          static_cast<parquet::FloatReader*>(column_reader.get());
      // Read all the rows in the column.
      while (float_reader->HasNext()) {
        float value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows.
        rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read.
        assert(rows_read == 1);
        // There are no NULL values in the rows written.
        assert(values_read == 1);
        // Verify the value written.
        float expected_value = static_cast<float>(col_row_counts[col_id]) * 1.1f;
        assert(value == expected_value);
        col_row_counts[col_id]++;
      }
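      // Note: the exact floating-point equality checks in the Float loop above and the
      // Double loop below are safe here because the reader recomputes the identical
      // expressions the writer evaluated, so the stored and expected values are
      // bit-identical.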
      // Get the Column Reader for the Double column.
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::DoubleReader* double_reader =
          static_cast<parquet::DoubleReader*>(column_reader.get());
      // Read all the rows in the column.
      while (double_reader->HasNext()) {
        double value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows.
        rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read.
        assert(rows_read == 1);
        // There are no NULL values in the rows written.
        assert(values_read == 1);
        // Verify the value written.
        double expected_value = col_row_counts[col_id] * 1.1111111;
        assert(value == expected_value);
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the ByteArray column.
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::ByteArrayReader* ba_reader =
          static_cast<parquet::ByteArrayReader*>(column_reader.get());
      // Read all the rows in the column.
      while (ba_reader->HasNext()) {
        parquet::ByteArray value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows.
        rows_read =
            ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
        // Ensure only one value is read.
        assert(rows_read == 1);
        // Verify the value written.
        char expected_value[FIXED_LENGTH] = "parquet";
        ARROW_UNUSED(expected_value);  // prevent warning in release build
        expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] / 100);
        expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] / 10) % 10);
        expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] % 10);
        if (col_row_counts[col_id] % 2 == 0) {  // only alternate values exist
          // There are no NULL values in the rows written.
          assert(values_read == 1);
          assert(value.len == FIXED_LENGTH);
          assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
          assert(definition_level == 1);
        } else {
          // There are NULL values in the rows written.
          assert(values_read == 0);
          assert(definition_level == 0);
        }
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the FixedLengthByteArray column.
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::FixedLenByteArrayReader* flba_reader =
          static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
      // Read all the rows in the column.
      while (flba_reader->HasNext()) {
        parquet::FixedLenByteArray value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows.
        rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read.
        assert(rows_read == 1);
        // There are no NULL values in the rows written.
        assert(values_read == 1);
        // Verify the value written.
        char v = static_cast<char>(col_row_counts[col_id]);
        char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
        assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
        col_row_counts[col_id]++;
      }
    }
  } catch (const std::exception& e) {
    std::cerr << "Parquet read error: " << e.what() << std::endl;
    return -1;
  }

  std::cout << "Parquet Writing and Reading Complete" << std::endl;

  return 0;
}
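/*
 * Note: SetupSchema() and FIXED_LENGTH come from the companion header
 * "reader_writer.h", which is shared by the low-level API examples and is not shown in
 * this file. For reference, a minimal sketch of what such a header needs to provide for
 * this example; the field names and converted types below are illustrative assumptions.
 * Only the column order, physical types, repetition kinds, and the fixed length of 10
 * matter to the code above.
 *
 *   #include <arrow/io/file.h>
 *   #include <arrow/util/logging.h>
 *   #include <parquet/api/reader.h>
 *   #include <parquet/api/writer.h>
 *
 *   constexpr int FIXED_LENGTH = 10;
 *
 *   static std::shared_ptr<parquet::schema::GroupNode> SetupSchema() {
 *     using parquet::ConvertedType;
 *     using parquet::Repetition;
 *     using parquet::Type;
 *     using parquet::schema::GroupNode;
 *     using parquet::schema::PrimitiveNode;
 *
 *     parquet::schema::NodeVector fields;
 *     // One leaf per physical type, in the order the writer loop above expects.
 *     fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
 *                                          Type::BOOLEAN, ConvertedType::NONE));
 *     fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED,
 *                                          Type::INT32, ConvertedType::NONE));
 *     fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED,
 *                                          Type::INT64, ConvertedType::NONE));
 *     fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED,
 *                                          Type::INT96, ConvertedType::NONE));
 *     fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED,
 *                                          Type::FLOAT, ConvertedType::NONE));
 *     fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED,
 *                                          Type::DOUBLE, ConvertedType::NONE));
 *     fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL,
 *                                          Type::BYTE_ARRAY, ConvertedType::NONE));
 *     fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
 *                                          Type::FIXED_LEN_BYTE_ARRAY,
 *                                          ConvertedType::NONE, FIXED_LENGTH));
 *
 *     // The root of the schema tree is a GroupNode containing the eight leaves.
 *     return std::static_pointer_cast<GroupNode>(
 *         GroupNode::Make("schema", Repetition::REQUIRED, fields));
 *   }
 */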