1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
#include <reader_writer.h>

#include <cassert>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
24 
25 /*
 * This example describes writing and reading Parquet Files in C++ and serves as a
 * reference for the API.
28  * The file contains all the physical data types supported by Parquet.
29  * This example uses the RowGroupWriter API that supports writing RowGroups optimized for
30  * memory consumption.
31  **/
32 
33 /* Parquet is a structured columnar file format
34  * Parquet File = "Parquet data" + "Parquet Metadata"
35  * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
36  * columnar layout
37  * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
38  * Columns
39  * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
40  * complex (nested) type (internal nodes)
 * For specific details, please refer to the format specification:
42  * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
43  **/
44 
// Number of records written to (and expected back from) the single row group.
constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
// Path of the file produced by the writer example and consumed by the reader example.
constexpr char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
47 
main(int argc,char ** argv)48 int main(int argc, char** argv) {
49   /**********************************************************************************
50                              PARQUET WRITER EXAMPLE
51   **********************************************************************************/
52   // parquet::REQUIRED fields do not need definition and repetition level values
53   // parquet::OPTIONAL fields require only definition level values
54   // parquet::REPEATED fields require both definition and repetition level values
55   try {
56     // Create a local file output stream instance.
57     using FileClass = ::arrow::io::FileOutputStream;
58     std::shared_ptr<FileClass> out_file;
59     PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));
60 
61     // Setup the parquet schema
62     std::shared_ptr<GroupNode> schema = SetupSchema();
63 
64     // Add writer properties
65     parquet::WriterProperties::Builder builder;
66     builder.compression(parquet::Compression::SNAPPY);
67     std::shared_ptr<parquet::WriterProperties> props = builder.build();
68 
69     // Create a ParquetFileWriter instance
70     std::shared_ptr<parquet::ParquetFileWriter> file_writer =
71         parquet::ParquetFileWriter::Open(out_file, schema, props);
72 
73     // Append a RowGroup with a specific number of rows.
74     parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
75 
76     // Write the Bool column
77     parquet::BoolWriter* bool_writer =
78         static_cast<parquet::BoolWriter*>(rg_writer->NextColumn());
79     for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
80       bool value = ((i % 2) == 0) ? true : false;
81       bool_writer->WriteBatch(1, nullptr, nullptr, &value);
82     }
83 
84     // Write the Int32 column
85     parquet::Int32Writer* int32_writer =
86         static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
87     for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
88       int32_t value = i;
89       int32_writer->WriteBatch(1, nullptr, nullptr, &value);
90     }
91 
92     // Write the Int64 column. Each row has repeats twice.
93     parquet::Int64Writer* int64_writer =
94         static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
95     for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) {
96       int64_t value = i * 1000 * 1000;
97       value *= 1000 * 1000;
98       int16_t definition_level = 1;
99       int16_t repetition_level = 0;
100       if ((i % 2) == 0) {
101         repetition_level = 1;  // start of a new record
102       }
103       int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value);
104     }
105 
106     // Write the INT96 column.
107     parquet::Int96Writer* int96_writer =
108         static_cast<parquet::Int96Writer*>(rg_writer->NextColumn());
109     for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
110       parquet::Int96 value;
111       value.value[0] = i;
112       value.value[1] = i + 1;
113       value.value[2] = i + 2;
114       int96_writer->WriteBatch(1, nullptr, nullptr, &value);
115     }
116 
117     // Write the Float column
118     parquet::FloatWriter* float_writer =
119         static_cast<parquet::FloatWriter*>(rg_writer->NextColumn());
120     for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
121       float value = static_cast<float>(i) * 1.1f;
122       float_writer->WriteBatch(1, nullptr, nullptr, &value);
123     }
124 
125     // Write the Double column
126     parquet::DoubleWriter* double_writer =
127         static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn());
128     for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
129       double value = i * 1.1111111;
130       double_writer->WriteBatch(1, nullptr, nullptr, &value);
131     }
132 
133     // Write the ByteArray column. Make every alternate values NULL
134     parquet::ByteArrayWriter* ba_writer =
135         static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
136     for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
137       parquet::ByteArray value;
138       char hello[FIXED_LENGTH] = "parquet";
139       hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
140       hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
141       hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
142       if (i % 2 == 0) {
143         int16_t definition_level = 1;
144         value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
145         value.len = FIXED_LENGTH;
146         ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
147       } else {
148         int16_t definition_level = 0;
149         ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
150       }
151     }
152 
153     // Write the FixedLengthByteArray column
154     parquet::FixedLenByteArrayWriter* flba_writer =
155         static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn());
156     for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
157       parquet::FixedLenByteArray value;
158       char v = static_cast<char>(i);
159       char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
160       value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
161 
162       flba_writer->WriteBatch(1, nullptr, nullptr, &value);
163     }
164 
165     // Close the ParquetFileWriter
166     file_writer->Close();
167 
168     // Write the bytes to file
169     DCHECK(out_file->Close().ok());
170   } catch (const std::exception& e) {
171     std::cerr << "Parquet write error: " << e.what() << std::endl;
172     return -1;
173   }
174 
175   /**********************************************************************************
176                              PARQUET READER EXAMPLE
177   **********************************************************************************/
178 
179   try {
180     // Create a ParquetReader instance
181     std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
182         parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);
183 
184     // Get the File MetaData
185     std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
186 
187     // Get the number of RowGroups
188     int num_row_groups = file_metadata->num_row_groups();
189     assert(num_row_groups == 1);
190 
191     // Get the number of Columns
192     int num_columns = file_metadata->num_columns();
193     assert(num_columns == 8);
194 
195     // Iterate over all the RowGroups in the file
196     for (int r = 0; r < num_row_groups; ++r) {
197       // Get the RowGroup Reader
198       std::shared_ptr<parquet::RowGroupReader> row_group_reader =
199           parquet_reader->RowGroup(r);
200 
201       int64_t values_read = 0;
202       int64_t rows_read = 0;
203       int16_t definition_level;
204       int16_t repetition_level;
205       int i;
206       std::shared_ptr<parquet::ColumnReader> column_reader;
207 
208       ARROW_UNUSED(rows_read);  // prevent warning in release build
209 
210       // Get the Column Reader for the boolean column
211       column_reader = row_group_reader->Column(0);
212       parquet::BoolReader* bool_reader =
213           static_cast<parquet::BoolReader*>(column_reader.get());
214 
215       // Read all the rows in the column
216       i = 0;
217       while (bool_reader->HasNext()) {
218         bool value;
219         // Read one value at a time. The number of rows read is returned. values_read
220         // contains the number of non-null rows
221         rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
222         // Ensure only one value is read
223         assert(rows_read == 1);
224         // There are no NULL values in the rows written
225         assert(values_read == 1);
226         // Verify the value written
227         bool expected_value = ((i % 2) == 0) ? true : false;
228         assert(value == expected_value);
229         i++;
230       }
231 
232       // Get the Column Reader for the Int32 column
233       column_reader = row_group_reader->Column(1);
234       parquet::Int32Reader* int32_reader =
235           static_cast<parquet::Int32Reader*>(column_reader.get());
236       // Read all the rows in the column
237       i = 0;
238       while (int32_reader->HasNext()) {
239         int32_t value;
240         // Read one value at a time. The number of rows read is returned. values_read
241         // contains the number of non-null rows
242         rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
243         // Ensure only one value is read
244         assert(rows_read == 1);
245         // There are no NULL values in the rows written
246         assert(values_read == 1);
247         // Verify the value written
248         assert(value == i);
249         i++;
250       }
251 
252       // Get the Column Reader for the Int64 column
253       column_reader = row_group_reader->Column(2);
254       parquet::Int64Reader* int64_reader =
255           static_cast<parquet::Int64Reader*>(column_reader.get());
256       // Read all the rows in the column
257       i = 0;
258       while (int64_reader->HasNext()) {
259         int64_t value;
260         // Read one value at a time. The number of rows read is returned. values_read
261         // contains the number of non-null rows
262         rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level,
263                                             &value, &values_read);
264         // Ensure only one value is read
265         assert(rows_read == 1);
266         // There are no NULL values in the rows written
267         assert(values_read == 1);
268         // Verify the value written
269         int64_t expected_value = i * 1000 * 1000;
270         expected_value *= 1000 * 1000;
271         assert(value == expected_value);
272         if ((i % 2) == 0) {
273           assert(repetition_level == 1);
274         } else {
275           assert(repetition_level == 0);
276         }
277         i++;
278       }
279 
280       // Get the Column Reader for the Int96 column
281       column_reader = row_group_reader->Column(3);
282       parquet::Int96Reader* int96_reader =
283           static_cast<parquet::Int96Reader*>(column_reader.get());
284       // Read all the rows in the column
285       i = 0;
286       while (int96_reader->HasNext()) {
287         parquet::Int96 value;
288         // Read one value at a time. The number of rows read is returned. values_read
289         // contains the number of non-null rows
290         rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
291         // Ensure only one value is read
292         assert(rows_read == 1);
293         // There are no NULL values in the rows written
294         assert(values_read == 1);
295         // Verify the value written
296         parquet::Int96 expected_value;
297         ARROW_UNUSED(expected_value);  // prevent warning in release build
298         expected_value.value[0] = i;
299         expected_value.value[1] = i + 1;
300         expected_value.value[2] = i + 2;
301         for (int j = 0; j < 3; j++) {
302           assert(value.value[j] == expected_value.value[j]);
303         }
304         i++;
305       }
306 
307       // Get the Column Reader for the Float column
308       column_reader = row_group_reader->Column(4);
309       parquet::FloatReader* float_reader =
310           static_cast<parquet::FloatReader*>(column_reader.get());
311       // Read all the rows in the column
312       i = 0;
313       while (float_reader->HasNext()) {
314         float value;
315         // Read one value at a time. The number of rows read is returned. values_read
316         // contains the number of non-null rows
317         rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
318         // Ensure only one value is read
319         assert(rows_read == 1);
320         // There are no NULL values in the rows written
321         assert(values_read == 1);
322         // Verify the value written
323         float expected_value = static_cast<float>(i) * 1.1f;
324         assert(value == expected_value);
325         i++;
326       }
327 
328       // Get the Column Reader for the Double column
329       column_reader = row_group_reader->Column(5);
330       parquet::DoubleReader* double_reader =
331           static_cast<parquet::DoubleReader*>(column_reader.get());
332       // Read all the rows in the column
333       i = 0;
334       while (double_reader->HasNext()) {
335         double value;
336         // Read one value at a time. The number of rows read is returned. values_read
337         // contains the number of non-null rows
338         rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
339         // Ensure only one value is read
340         assert(rows_read == 1);
341         // There are no NULL values in the rows written
342         assert(values_read == 1);
343         // Verify the value written
344         double expected_value = i * 1.1111111;
345         assert(value == expected_value);
346         i++;
347       }
348 
349       // Get the Column Reader for the ByteArray column
350       column_reader = row_group_reader->Column(6);
351       parquet::ByteArrayReader* ba_reader =
352           static_cast<parquet::ByteArrayReader*>(column_reader.get());
353       // Read all the rows in the column
354       i = 0;
355       while (ba_reader->HasNext()) {
356         parquet::ByteArray value;
357         // Read one value at a time. The number of rows read is returned. values_read
358         // contains the number of non-null rows
359         rows_read =
360             ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
361         // Ensure only one value is read
362         assert(rows_read == 1);
363         // Verify the value written
364         char expected_value[FIXED_LENGTH] = "parquet";
365         ARROW_UNUSED(expected_value);  // prevent warning in release build
366         expected_value[7] = static_cast<char>('0' + i / 100);
367         expected_value[8] = static_cast<char>('0' + (i / 10) % 10);
368         expected_value[9] = static_cast<char>('0' + i % 10);
369         if (i % 2 == 0) {  // only alternate values exist
370           // There are no NULL values in the rows written
371           assert(values_read == 1);
372           assert(value.len == FIXED_LENGTH);
373           assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
374           assert(definition_level == 1);
375         } else {
376           // There are NULL values in the rows written
377           assert(values_read == 0);
378           assert(definition_level == 0);
379         }
380         i++;
381       }
382 
383       // Get the Column Reader for the FixedLengthByteArray column
384       column_reader = row_group_reader->Column(7);
385       parquet::FixedLenByteArrayReader* flba_reader =
386           static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
387       // Read all the rows in the column
388       i = 0;
389       while (flba_reader->HasNext()) {
390         parquet::FixedLenByteArray value;
391         // Read one value at a time. The number of rows read is returned. values_read
392         // contains the number of non-null rows
393         rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
394         // Ensure only one value is read
395         assert(rows_read == 1);
396         // There are no NULL values in the rows written
397         assert(values_read == 1);
398         // Verify the value written
399         char v = static_cast<char>(i);
400         char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
401         assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
402         i++;
403       }
404     }
405   } catch (const std::exception& e) {
406     std::cerr << "Parquet read error: " << e.what() << std::endl;
407     return -1;
408   }
409 
410   std::cout << "Parquet Writing and Reading Complete" << std::endl;
411 
412   return 0;
413 }
414