// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
#include <reader_writer.h>

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
#include <vector>
24
/*
 * This example describes writing and reading Parquet Files in C++ and serves as a
 * reference to the API.
 * The file contains all the physical data types supported by Parquet.
 * This example uses the RowGroupWriter API that supports writing RowGroups based on a
 * certain size.
 **/
32
/* Parquet is a structured columnar file format
 * Parquet File = "Parquet data" + "Parquet Metadata"
 * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
 * columnar layout
 * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
 * Columns
 * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
 * complex (nested) type (internal nodes)
 * For specific details, please refer to the format specification here:
 * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
 **/
44
// Total number of rows written across all row groups.
constexpr int NUM_ROWS = 2500000;
// Flush the buffered row group once its estimated size exceeds this threshold.
constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024;  // 16 MB
// File produced by the writer example and consumed by the reader example.
const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet";
48
main(int argc,char ** argv)49 int main(int argc, char** argv) {
50 /**********************************************************************************
51 PARQUET WRITER EXAMPLE
52 **********************************************************************************/
53 // parquet::REQUIRED fields do not need definition and repetition level values
54 // parquet::OPTIONAL fields require only definition level values
55 // parquet::REPEATED fields require both definition and repetition level values
56 try {
57 // Create a local file output stream instance.
58 using FileClass = ::arrow::io::FileOutputStream;
59 std::shared_ptr<FileClass> out_file;
60 PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));
61
62 // Setup the parquet schema
63 std::shared_ptr<GroupNode> schema = SetupSchema();
64
65 // Add writer properties
66 parquet::WriterProperties::Builder builder;
67 builder.compression(parquet::Compression::SNAPPY);
68 std::shared_ptr<parquet::WriterProperties> props = builder.build();
69
70 // Create a ParquetFileWriter instance
71 std::shared_ptr<parquet::ParquetFileWriter> file_writer =
72 parquet::ParquetFileWriter::Open(out_file, schema, props);
73
74 // Append a BufferedRowGroup to keep the RowGroup open until a certain size
75 parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();
76
77 int num_columns = file_writer->num_columns();
78 std::vector<int64_t> buffered_values_estimate(num_columns, 0);
79 for (int i = 0; i < NUM_ROWS; i++) {
80 int64_t estimated_bytes = 0;
81 // Get the estimated size of the values that are not written to a page yet
82 for (int n = 0; n < num_columns; n++) {
83 estimated_bytes += buffered_values_estimate[n];
84 }
85
86 // We need to consider the compressed pages
87 // as well as the values that are not compressed yet
88 if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
89 estimated_bytes) > ROW_GROUP_SIZE) {
90 rg_writer->Close();
91 std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
92 rg_writer = file_writer->AppendBufferedRowGroup();
93 }
94
95 int col_id = 0;
96 // Write the Bool column
97 parquet::BoolWriter* bool_writer =
98 static_cast<parquet::BoolWriter*>(rg_writer->column(col_id));
99 bool bool_value = ((i % 2) == 0) ? true : false;
100 bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value);
101 buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes();
102
103 // Write the Int32 column
104 col_id++;
105 parquet::Int32Writer* int32_writer =
106 static_cast<parquet::Int32Writer*>(rg_writer->column(col_id));
107 int32_t int32_value = i;
108 int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value);
109 buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes();
110
111 // Write the Int64 column. Each row has repeats twice.
112 col_id++;
113 parquet::Int64Writer* int64_writer =
114 static_cast<parquet::Int64Writer*>(rg_writer->column(col_id));
115 int64_t int64_value1 = 2 * i;
116 int16_t definition_level = 1;
117 int16_t repetition_level = 0;
118 int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value1);
119 int64_t int64_value2 = (2 * i + 1);
120 repetition_level = 1; // start of a new record
121 int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2);
122 buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes();
123
124 // Write the INT96 column.
125 col_id++;
126 parquet::Int96Writer* int96_writer =
127 static_cast<parquet::Int96Writer*>(rg_writer->column(col_id));
128 parquet::Int96 int96_value;
129 int96_value.value[0] = i;
130 int96_value.value[1] = i + 1;
131 int96_value.value[2] = i + 2;
132 int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value);
133 buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes();
134
135 // Write the Float column
136 col_id++;
137 parquet::FloatWriter* float_writer =
138 static_cast<parquet::FloatWriter*>(rg_writer->column(col_id));
139 float float_value = static_cast<float>(i) * 1.1f;
140 float_writer->WriteBatch(1, nullptr, nullptr, &float_value);
141 buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes();
142
143 // Write the Double column
144 col_id++;
145 parquet::DoubleWriter* double_writer =
146 static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id));
147 double double_value = i * 1.1111111;
148 double_writer->WriteBatch(1, nullptr, nullptr, &double_value);
149 buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes();
150
151 // Write the ByteArray column. Make every alternate values NULL
152 col_id++;
153 parquet::ByteArrayWriter* ba_writer =
154 static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
155 parquet::ByteArray ba_value;
156 char hello[FIXED_LENGTH] = "parquet";
157 hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
158 hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
159 hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
160 if (i % 2 == 0) {
161 int16_t definition_level = 1;
162 ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
163 ba_value.len = FIXED_LENGTH;
164 ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
165 } else {
166 int16_t definition_level = 0;
167 ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
168 }
169 buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
170
171 // Write the FixedLengthByteArray column
172 col_id++;
173 parquet::FixedLenByteArrayWriter* flba_writer =
174 static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id));
175 parquet::FixedLenByteArray flba_value;
176 char v = static_cast<char>(i);
177 char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
178 flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
179
180 flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value);
181 buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes();
182 }
183
184 // Close the RowGroupWriter
185 rg_writer->Close();
186 // Close the ParquetFileWriter
187 file_writer->Close();
188
189 // Write the bytes to file
190 DCHECK(out_file->Close().ok());
191 } catch (const std::exception& e) {
192 std::cerr << "Parquet write error: " << e.what() << std::endl;
193 return -1;
194 }
195
196 /**********************************************************************************
197 PARQUET READER EXAMPLE
198 **********************************************************************************/
199
200 try {
201 // Create a ParquetReader instance
202 std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
203 parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);
204
205 // Get the File MetaData
206 std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
207
208 int num_row_groups = file_metadata->num_row_groups();
209
210 // Get the number of Columns
211 int num_columns = file_metadata->num_columns();
212 assert(num_columns == 8);
213
214 std::vector<int> col_row_counts(num_columns, 0);
215
216 // Iterate over all the RowGroups in the file
217 for (int r = 0; r < num_row_groups; ++r) {
218 // Get the RowGroup Reader
219 std::shared_ptr<parquet::RowGroupReader> row_group_reader =
220 parquet_reader->RowGroup(r);
221
222 assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE);
223
224 int64_t values_read = 0;
225 int64_t rows_read = 0;
226 int16_t definition_level;
227 int16_t repetition_level;
228 std::shared_ptr<parquet::ColumnReader> column_reader;
229 int col_id = 0;
230
231 ARROW_UNUSED(rows_read); // prevent warning in release build
232
233 // Get the Column Reader for the boolean column
234 column_reader = row_group_reader->Column(col_id);
235 parquet::BoolReader* bool_reader =
236 static_cast<parquet::BoolReader*>(column_reader.get());
237
238 // Read all the rows in the column
239 while (bool_reader->HasNext()) {
240 bool value;
241 // Read one value at a time. The number of rows read is returned. values_read
242 // contains the number of non-null rows
243 rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
244 // Ensure only one value is read
245 assert(rows_read == 1);
246 // There are no NULL values in the rows written
247 assert(values_read == 1);
248 // Verify the value written
249 bool expected_value = ((col_row_counts[col_id] % 2) == 0) ? true : false;
250 assert(value == expected_value);
251 col_row_counts[col_id]++;
252 }
253
254 // Get the Column Reader for the Int32 column
255 col_id++;
256 column_reader = row_group_reader->Column(col_id);
257 parquet::Int32Reader* int32_reader =
258 static_cast<parquet::Int32Reader*>(column_reader.get());
259 // Read all the rows in the column
260 while (int32_reader->HasNext()) {
261 int32_t value;
262 // Read one value at a time. The number of rows read is returned. values_read
263 // contains the number of non-null rows
264 rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
265 // Ensure only one value is read
266 assert(rows_read == 1);
267 // There are no NULL values in the rows written
268 assert(values_read == 1);
269 // Verify the value written
270 assert(value == col_row_counts[col_id]);
271 col_row_counts[col_id]++;
272 }
273
274 // Get the Column Reader for the Int64 column
275 col_id++;
276 column_reader = row_group_reader->Column(col_id);
277 parquet::Int64Reader* int64_reader =
278 static_cast<parquet::Int64Reader*>(column_reader.get());
279 // Read all the rows in the column
280 while (int64_reader->HasNext()) {
281 int64_t value;
282 // Read one value at a time. The number of rows read is returned. values_read
283 // contains the number of non-null rows
284 rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level,
285 &value, &values_read);
286 // Ensure only one value is read
287 assert(rows_read == 1);
288 // There are no NULL values in the rows written
289 assert(values_read == 1);
290 // Verify the value written
291 int64_t expected_value = col_row_counts[col_id];
292 assert(value == expected_value);
293 if ((col_row_counts[col_id] % 2) == 0) {
294 assert(repetition_level == 0);
295 } else {
296 assert(repetition_level == 1);
297 }
298 col_row_counts[col_id]++;
299 }
300
301 // Get the Column Reader for the Int96 column
302 col_id++;
303 column_reader = row_group_reader->Column(col_id);
304 parquet::Int96Reader* int96_reader =
305 static_cast<parquet::Int96Reader*>(column_reader.get());
306 // Read all the rows in the column
307 while (int96_reader->HasNext()) {
308 parquet::Int96 value;
309 // Read one value at a time. The number of rows read is returned. values_read
310 // contains the number of non-null rows
311 rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
312 // Ensure only one value is read
313 assert(rows_read == 1);
314 // There are no NULL values in the rows written
315 assert(values_read == 1);
316 // Verify the value written
317 parquet::Int96 expected_value;
318 ARROW_UNUSED(expected_value); // prevent warning in release build
319 expected_value.value[0] = col_row_counts[col_id];
320 expected_value.value[1] = col_row_counts[col_id] + 1;
321 expected_value.value[2] = col_row_counts[col_id] + 2;
322 for (int j = 0; j < 3; j++) {
323 assert(value.value[j] == expected_value.value[j]);
324 }
325 col_row_counts[col_id]++;
326 }
327
328 // Get the Column Reader for the Float column
329 col_id++;
330 column_reader = row_group_reader->Column(col_id);
331 parquet::FloatReader* float_reader =
332 static_cast<parquet::FloatReader*>(column_reader.get());
333 // Read all the rows in the column
334 while (float_reader->HasNext()) {
335 float value;
336 // Read one value at a time. The number of rows read is returned. values_read
337 // contains the number of non-null rows
338 rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
339 // Ensure only one value is read
340 assert(rows_read == 1);
341 // There are no NULL values in the rows written
342 assert(values_read == 1);
343 // Verify the value written
344 float expected_value = static_cast<float>(col_row_counts[col_id]) * 1.1f;
345 assert(value == expected_value);
346 col_row_counts[col_id]++;
347 }
348
349 // Get the Column Reader for the Double column
350 col_id++;
351 column_reader = row_group_reader->Column(col_id);
352 parquet::DoubleReader* double_reader =
353 static_cast<parquet::DoubleReader*>(column_reader.get());
354 // Read all the rows in the column
355 while (double_reader->HasNext()) {
356 double value;
357 // Read one value at a time. The number of rows read is returned. values_read
358 // contains the number of non-null rows
359 rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
360 // Ensure only one value is read
361 assert(rows_read == 1);
362 // There are no NULL values in the rows written
363 assert(values_read == 1);
364 // Verify the value written
365 double expected_value = col_row_counts[col_id] * 1.1111111;
366 assert(value == expected_value);
367 col_row_counts[col_id]++;
368 }
369
370 // Get the Column Reader for the ByteArray column
371 col_id++;
372 column_reader = row_group_reader->Column(col_id);
373 parquet::ByteArrayReader* ba_reader =
374 static_cast<parquet::ByteArrayReader*>(column_reader.get());
375 // Read all the rows in the column
376 while (ba_reader->HasNext()) {
377 parquet::ByteArray value;
378 // Read one value at a time. The number of rows read is returned. values_read
379 // contains the number of non-null rows
380 rows_read =
381 ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
382 // Ensure only one value is read
383 assert(rows_read == 1);
384 // Verify the value written
385 char expected_value[FIXED_LENGTH] = "parquet";
386 ARROW_UNUSED(expected_value); // prevent warning in release build
387 expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] / 100);
388 expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] / 10) % 10);
389 expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] % 10);
390 if (col_row_counts[col_id] % 2 == 0) { // only alternate values exist
391 // There are no NULL values in the rows written
392 assert(values_read == 1);
393 assert(value.len == FIXED_LENGTH);
394 assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
395 assert(definition_level == 1);
396 } else {
397 // There are NULL values in the rows written
398 assert(values_read == 0);
399 assert(definition_level == 0);
400 }
401 col_row_counts[col_id]++;
402 }
403
404 // Get the Column Reader for the FixedLengthByteArray column
405 col_id++;
406 column_reader = row_group_reader->Column(col_id);
407 parquet::FixedLenByteArrayReader* flba_reader =
408 static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
409 // Read all the rows in the column
410 while (flba_reader->HasNext()) {
411 parquet::FixedLenByteArray value;
412 // Read one value at a time. The number of rows read is returned. values_read
413 // contains the number of non-null rows
414 rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
415 // Ensure only one value is read
416 assert(rows_read == 1);
417 // There are no NULL values in the rows written
418 assert(values_read == 1);
419 // Verify the value written
420 char v = static_cast<char>(col_row_counts[col_id]);
421 char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
422 assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
423 col_row_counts[col_id]++;
424 }
425 }
426 } catch (const std::exception& e) {
427 std::cerr << "Parquet read error: " << e.what() << std::endl;
428 return -1;
429 }
430
431 std::cout << "Parquet Writing and Reading Complete" << std::endl;
432
433 return 0;
434 }
435