// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <reader_writer.h>

#include <cassert>
#include <fstream>
#include <iostream>
#include <memory>

/*
 * This example demonstrates writing and reading Parquet files in C++ and serves as a
 * reference to the API.
 * The written file contains all the physical data types supported by Parquet.
 * This example uses the buffered RowGroupWriter API, which allows a RowGroup to be
 * flushed once it reaches a target size.
 **/

/* Parquet is a structured columnar file format.
 * Parquet File = "Parquet data" + "Parquet Metadata"
 * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
 * columnar layout.
 * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and
 * their Columns.
 * The "file schema" is a tree where each node is either a primitive type (leaf nodes)
 * or a complex (nested) type (internal nodes).
 * For specific details, please refer to the format specification here:
 * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
 **/
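
/* The schema for this example is created by SetupSchema() (declared in reader_writer.h
 * and not shown here). Based on the writer code below, it is expected to contain eight
 * fields, roughly sketched as:
 *
 *   message schema {
 *     required boolean              <bool field>;
 *     required int32                <int32 field>;
 *     repeated int64                <int64 field>;
 *     required int96                <int96 field>;
 *     required float                <float field>;
 *     required double               <double field>;
 *     optional binary               <byte array field>;
 *     required fixed_len_byte_array <flba field>;
 *   }
 *
 * Only the physical types and repetition kinds are implied by this file; the actual
 * field names live in reader_writer.h.
 **/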

constexpr int NUM_ROWS = 2500000;
constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024;  // 16 MB
const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet";

int main(int argc, char** argv) {
  /**********************************************************************************
                             PARQUET WRITER EXAMPLE
  **********************************************************************************/
  // parquet::REQUIRED fields do not need definition and repetition level values
  // parquet::OPTIONAL fields require only definition level values
  // parquet::REPEATED fields require both definition and repetition level values
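  // In this example, only the Int64 column is REPEATED (both kinds of levels are passed
  // to WriteBatch), only the ByteArray column is OPTIONAL (definition levels only), and
  // every other column is REQUIRED (both level pointers may be nullptr).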
  try {
    // Create a local file output stream instance.
    using FileClass = ::arrow::io::FileOutputStream;
    std::shared_ptr<FileClass> out_file;
    PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));

    // Set up the Parquet schema
    std::shared_ptr<GroupNode> schema = SetupSchema();

    // Add writer properties
    parquet::WriterProperties::Builder builder;
    builder.compression(parquet::Compression::SNAPPY);
    std::shared_ptr<parquet::WriterProperties> props = builder.build();

    // Create a ParquetFileWriter instance
    std::shared_ptr<parquet::ParquetFileWriter> file_writer =
        parquet::ParquetFileWriter::Open(out_file, schema, props);

    // Append a buffered RowGroup, which stays open until it reaches the target size
    parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();
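    // A buffered RowGroup keeps all of its column writers in memory at the same time,
    // so values can be written row by row via rg_writer->column(i) instead of one whole
    // column at a time as with AppendRowGroup()/NextColumn().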

    int num_columns = file_writer->num_columns();
    std::vector<int64_t> buffered_values_estimate(num_columns, 0);
    for (int i = 0; i < NUM_ROWS; i++) {
      int64_t estimated_bytes = 0;
      // Get the estimated size of the values that are not written to a page yet
      for (int n = 0; n < num_columns; n++) {
        estimated_bytes += buffered_values_estimate[n];
      }

      // We need to consider the compressed pages
      // as well as the values that are not compressed yet
      if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
           estimated_bytes) > ROW_GROUP_SIZE) {
        rg_writer->Close();
        std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
        rg_writer = file_writer->AppendBufferedRowGroup();
      }

      int col_id = 0;
      // Write the Bool column
      parquet::BoolWriter* bool_writer =
          static_cast<parquet::BoolWriter*>(rg_writer->column(col_id));
      bool bool_value = ((i % 2) == 0);
      bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value);
      buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes();

      // Write the Int32 column
      col_id++;
      parquet::Int32Writer* int32_writer =
          static_cast<parquet::Int32Writer*>(rg_writer->column(col_id));
      int32_t int32_value = i;
      int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value);
      buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes();

      // Write the Int64 column. Each row contains two repeated values.
      col_id++;
      parquet::Int64Writer* int64_writer =
          static_cast<parquet::Int64Writer*>(rg_writer->column(col_id));
      int64_t int64_value1 = 2 * i;
      int16_t definition_level = 1;
      int16_t repetition_level = 0;  // first value starts a new record
      int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value1);
      int64_t int64_value2 = (2 * i + 1);
      repetition_level = 1;  // second value belongs to the same record
      int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2);
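      // Logically, row i of this column holds the two-element list [2*i, 2*i + 1]:
      // repetition level 0 starts a new record and repetition level 1 continues it.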
      buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes();

      // Write the INT96 column.
      col_id++;
      parquet::Int96Writer* int96_writer =
          static_cast<parquet::Int96Writer*>(rg_writer->column(col_id));
      parquet::Int96 int96_value;
      int96_value.value[0] = i;
      int96_value.value[1] = i + 1;
      int96_value.value[2] = i + 2;
      int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value);
      buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes();

      // Write the Float column
      col_id++;
      parquet::FloatWriter* float_writer =
          static_cast<parquet::FloatWriter*>(rg_writer->column(col_id));
      float float_value = static_cast<float>(i) * 1.1f;
      float_writer->WriteBatch(1, nullptr, nullptr, &float_value);
      buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes();

      // Write the Double column
      col_id++;
      parquet::DoubleWriter* double_writer =
          static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id));
      double double_value = i * 1.1111111;
      double_writer->WriteBatch(1, nullptr, nullptr, &double_value);
      buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes();

      // Write the ByteArray column. Make every other value NULL.
      col_id++;
      parquet::ByteArrayWriter* ba_writer =
          static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
      parquet::ByteArray ba_value;
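      // The 10-byte value written below is "parquet" followed by three bytes derived
      // from the row index i; the reader later recomputes the same bytes to verify it.
      // (This assumes FIXED_LENGTH, defined in reader_writer.h, is 10.)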
      char hello[FIXED_LENGTH] = "parquet";
      hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
      hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
      hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
      if (i % 2 == 0) {
        int16_t definition_level = 1;
        ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
        ba_value.len = FIXED_LENGTH;
        ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
      } else {
        int16_t definition_level = 0;
        ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
      }
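      // For this OPTIONAL (non-repeated) column the maximum definition level is 1:
      // definition_level == 1 marks a present value and 0 marks a NULL, and no
      // repetition levels are needed.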
      buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();

      // Write the FixedLengthByteArray column
      col_id++;
      parquet::FixedLenByteArrayWriter* flba_writer =
          static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id));
      parquet::FixedLenByteArray flba_value;
      char v = static_cast<char>(i);
      char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
      flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);

      flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value);
      buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes();
    }

    // Close the RowGroupWriter
    rg_writer->Close();
    // Close the ParquetFileWriter
    file_writer->Close();

    // Close the output file (this flushes the remaining bytes to disk)
    DCHECK(out_file->Close().ok());
  } catch (const std::exception& e) {
    std::cerr << "Parquet write error: " << e.what() << std::endl;
    return -1;
  }

  /**********************************************************************************
                             PARQUET READER EXAMPLE
  **********************************************************************************/

  try {
    // Create a ParquetFileReader instance
    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
        parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);

    // Get the File MetaData
    std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();

    int num_row_groups = file_metadata->num_row_groups();

    // Get the number of Columns
    int num_columns = file_metadata->num_columns();
    assert(num_columns == 8);

    std::vector<int> col_row_counts(num_columns, 0);
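    // col_row_counts[i] tracks how many values of column i have been read across all
    // RowGroups; the expected values asserted below are derived from these counters.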

    // Iterate over all the RowGroups in the file
    for (int r = 0; r < num_row_groups; ++r) {
      // Get the RowGroup Reader
      std::shared_ptr<parquet::RowGroupReader> row_group_reader =
          parquet_reader->RowGroup(r);

      assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE);

      int64_t values_read = 0;
      int64_t rows_read = 0;
      int16_t definition_level;
      int16_t repetition_level;
      std::shared_ptr<parquet::ColumnReader> column_reader;
      int col_id = 0;

      ARROW_UNUSED(rows_read);  // prevent warning in release build

      // Get the Column Reader for the boolean column
      column_reader = row_group_reader->Column(col_id);
      parquet::BoolReader* bool_reader =
          static_cast<parquet::BoolReader*>(column_reader.get());

      // Read all the rows in the column
      while (bool_reader->HasNext()) {
        bool value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows
        rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read
        assert(rows_read == 1);
        // There are no NULL values in the rows written
        assert(values_read == 1);
        // Verify the value written
        bool expected_value = ((col_row_counts[col_id] % 2) == 0);
        assert(value == expected_value);
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the Int32 column
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::Int32Reader* int32_reader =
          static_cast<parquet::Int32Reader*>(column_reader.get());
      // Read all the rows in the column
      while (int32_reader->HasNext()) {
        int32_t value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows
        rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read
        assert(rows_read == 1);
        // There are no NULL values in the rows written
        assert(values_read == 1);
        // Verify the value written
        assert(value == col_row_counts[col_id]);
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the Int64 column
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::Int64Reader* int64_reader =
          static_cast<parquet::Int64Reader*>(column_reader.get());
      // Read all the rows in the column
      while (int64_reader->HasNext()) {
        int64_t value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows
        rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level,
                                            &value, &values_read);
        // Ensure only one value is read
        assert(rows_read == 1);
        // There are no NULL values in the rows written
        assert(values_read == 1);
        // Verify the value written
        int64_t expected_value = col_row_counts[col_id];
        assert(value == expected_value);
        if ((col_row_counts[col_id] % 2) == 0) {
          assert(repetition_level == 0);
        } else {
          assert(repetition_level == 1);
        }
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the Int96 column
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::Int96Reader* int96_reader =
          static_cast<parquet::Int96Reader*>(column_reader.get());
      // Read all the rows in the column
      while (int96_reader->HasNext()) {
        parquet::Int96 value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows
        rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read
        assert(rows_read == 1);
        // There are no NULL values in the rows written
        assert(values_read == 1);
        // Verify the value written
        parquet::Int96 expected_value;
        ARROW_UNUSED(expected_value);  // prevent warning in release build
        expected_value.value[0] = col_row_counts[col_id];
        expected_value.value[1] = col_row_counts[col_id] + 1;
        expected_value.value[2] = col_row_counts[col_id] + 2;
        for (int j = 0; j < 3; j++) {
          assert(value.value[j] == expected_value.value[j]);
        }
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the Float column
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::FloatReader* float_reader =
          static_cast<parquet::FloatReader*>(column_reader.get());
      // Read all the rows in the column
      while (float_reader->HasNext()) {
        float value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows
        rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read
        assert(rows_read == 1);
        // There are no NULL values in the rows written
        assert(values_read == 1);
        // Verify the value written
        float expected_value = static_cast<float>(col_row_counts[col_id]) * 1.1f;
        assert(value == expected_value);
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the Double column
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::DoubleReader* double_reader =
          static_cast<parquet::DoubleReader*>(column_reader.get());
      // Read all the rows in the column
      while (double_reader->HasNext()) {
        double value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows
        rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read
        assert(rows_read == 1);
        // There are no NULL values in the rows written
        assert(values_read == 1);
        // Verify the value written
        double expected_value = col_row_counts[col_id] * 1.1111111;
        assert(value == expected_value);
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the ByteArray column
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::ByteArrayReader* ba_reader =
          static_cast<parquet::ByteArrayReader*>(column_reader.get());
      // Read all the rows in the column
      while (ba_reader->HasNext()) {
        parquet::ByteArray value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows
        rows_read =
            ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
        // Ensure only one value is read
        assert(rows_read == 1);
        // Verify the value written
        char expected_value[FIXED_LENGTH] = "parquet";
        ARROW_UNUSED(expected_value);  // prevent warning in release build
        expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] / 100);
        expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] / 10) % 10);
        expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] % 10);
        if (col_row_counts[col_id] % 2 == 0) {  // only every other value is non-NULL
          // The value is present (non-NULL)
          assert(values_read == 1);
          assert(value.len == FIXED_LENGTH);
          assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
          assert(definition_level == 1);
        } else {
          // The value is NULL
          assert(values_read == 0);
          assert(definition_level == 0);
        }
        col_row_counts[col_id]++;
      }

      // Get the Column Reader for the FixedLengthByteArray column
      col_id++;
      column_reader = row_group_reader->Column(col_id);
      parquet::FixedLenByteArrayReader* flba_reader =
          static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
      // Read all the rows in the column
      while (flba_reader->HasNext()) {
        parquet::FixedLenByteArray value;
        // Read one value at a time. The number of rows read is returned. values_read
        // contains the number of non-null rows
        rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
        // Ensure only one value is read
        assert(rows_read == 1);
        // There are no NULL values in the rows written
        assert(values_read == 1);
        // Verify the value written
        char v = static_cast<char>(col_row_counts[col_id]);
        char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
        assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
        col_row_counts[col_id]++;
      }
    }
  } catch (const std::exception& e) {
    std::cerr << "Parquet read error: " << e.what() << std::endl;
    return -1;
  }

  std::cout << "Parquet Writing and Reading Complete" << std::endl;

  return 0;
}