1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <reader_writer.h>
19
20 #include <cassert>
21 #include <fstream>
22 #include <iostream>
23 #include <memory>
24
25 /*
26 * This example describes writing and reading Parquet Files in C++ and serves as a
27 * reference to the API.
28 * The file contains all the physical data types supported by Parquet.
29 * This example uses the RowGroupWriter API that supports writing RowGroups optimized for
30 * memory consumption.
31 **/
32
33 /* Parquet is a structured columnar file format
34 * Parquet File = "Parquet data" + "Parquet Metadata"
35 * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
36 * columnar layout
37 * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
38 * Columns
39 * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
40 * complex (nested) type (internal nodes)
 * For specific details, please refer to the format here:
42 * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
43 **/
44
45 constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
46 const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
47
main(int argc,char ** argv)48 int main(int argc, char** argv) {
49 /**********************************************************************************
50 PARQUET WRITER EXAMPLE
51 **********************************************************************************/
52 // parquet::REQUIRED fields do not need definition and repetition level values
53 // parquet::OPTIONAL fields require only definition level values
54 // parquet::REPEATED fields require both definition and repetition level values
55 try {
56 // Create a local file output stream instance.
57 using FileClass = ::arrow::io::FileOutputStream;
58 std::shared_ptr<FileClass> out_file;
59 PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));
60
61 // Setup the parquet schema
62 std::shared_ptr<GroupNode> schema = SetupSchema();
63
64 // Add writer properties
65 parquet::WriterProperties::Builder builder;
66 builder.compression(parquet::Compression::SNAPPY);
67 std::shared_ptr<parquet::WriterProperties> props = builder.build();
68
69 // Create a ParquetFileWriter instance
70 std::shared_ptr<parquet::ParquetFileWriter> file_writer =
71 parquet::ParquetFileWriter::Open(out_file, schema, props);
72
73 // Append a RowGroup with a specific number of rows.
74 parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
75
76 // Write the Bool column
77 parquet::BoolWriter* bool_writer =
78 static_cast<parquet::BoolWriter*>(rg_writer->NextColumn());
79 for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
80 bool value = ((i % 2) == 0) ? true : false;
81 bool_writer->WriteBatch(1, nullptr, nullptr, &value);
82 }
83
84 // Write the Int32 column
85 parquet::Int32Writer* int32_writer =
86 static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
87 for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
88 int32_t value = i;
89 int32_writer->WriteBatch(1, nullptr, nullptr, &value);
90 }
91
92 // Write the Int64 column. Each row has repeats twice.
93 parquet::Int64Writer* int64_writer =
94 static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
95 for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) {
96 int64_t value = i * 1000 * 1000;
97 value *= 1000 * 1000;
98 int16_t definition_level = 1;
99 int16_t repetition_level = 0;
100 if ((i % 2) == 0) {
101 repetition_level = 1; // start of a new record
102 }
103 int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value);
104 }
105
106 // Write the INT96 column.
107 parquet::Int96Writer* int96_writer =
108 static_cast<parquet::Int96Writer*>(rg_writer->NextColumn());
109 for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
110 parquet::Int96 value;
111 value.value[0] = i;
112 value.value[1] = i + 1;
113 value.value[2] = i + 2;
114 int96_writer->WriteBatch(1, nullptr, nullptr, &value);
115 }
116
117 // Write the Float column
118 parquet::FloatWriter* float_writer =
119 static_cast<parquet::FloatWriter*>(rg_writer->NextColumn());
120 for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
121 float value = static_cast<float>(i) * 1.1f;
122 float_writer->WriteBatch(1, nullptr, nullptr, &value);
123 }
124
125 // Write the Double column
126 parquet::DoubleWriter* double_writer =
127 static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn());
128 for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
129 double value = i * 1.1111111;
130 double_writer->WriteBatch(1, nullptr, nullptr, &value);
131 }
132
133 // Write the ByteArray column. Make every alternate values NULL
134 parquet::ByteArrayWriter* ba_writer =
135 static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
136 for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
137 parquet::ByteArray value;
138 char hello[FIXED_LENGTH] = "parquet";
139 hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
140 hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
141 hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
142 if (i % 2 == 0) {
143 int16_t definition_level = 1;
144 value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
145 value.len = FIXED_LENGTH;
146 ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
147 } else {
148 int16_t definition_level = 0;
149 ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
150 }
151 }
152
153 // Write the FixedLengthByteArray column
154 parquet::FixedLenByteArrayWriter* flba_writer =
155 static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn());
156 for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
157 parquet::FixedLenByteArray value;
158 char v = static_cast<char>(i);
159 char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
160 value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
161
162 flba_writer->WriteBatch(1, nullptr, nullptr, &value);
163 }
164
165 // Close the ParquetFileWriter
166 file_writer->Close();
167
168 // Write the bytes to file
169 DCHECK(out_file->Close().ok());
170 } catch (const std::exception& e) {
171 std::cerr << "Parquet write error: " << e.what() << std::endl;
172 return -1;
173 }
174
175 /**********************************************************************************
176 PARQUET READER EXAMPLE
177 **********************************************************************************/
178
179 try {
180 // Create a ParquetReader instance
181 std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
182 parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);
183
184 // Get the File MetaData
185 std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
186
187 // Get the number of RowGroups
188 int num_row_groups = file_metadata->num_row_groups();
189 assert(num_row_groups == 1);
190
191 // Get the number of Columns
192 int num_columns = file_metadata->num_columns();
193 assert(num_columns == 8);
194
195 // Iterate over all the RowGroups in the file
196 for (int r = 0; r < num_row_groups; ++r) {
197 // Get the RowGroup Reader
198 std::shared_ptr<parquet::RowGroupReader> row_group_reader =
199 parquet_reader->RowGroup(r);
200
201 int64_t values_read = 0;
202 int64_t rows_read = 0;
203 int16_t definition_level;
204 int16_t repetition_level;
205 int i;
206 std::shared_ptr<parquet::ColumnReader> column_reader;
207
208 ARROW_UNUSED(rows_read); // prevent warning in release build
209
210 // Get the Column Reader for the boolean column
211 column_reader = row_group_reader->Column(0);
212 parquet::BoolReader* bool_reader =
213 static_cast<parquet::BoolReader*>(column_reader.get());
214
215 // Read all the rows in the column
216 i = 0;
217 while (bool_reader->HasNext()) {
218 bool value;
219 // Read one value at a time. The number of rows read is returned. values_read
220 // contains the number of non-null rows
221 rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
222 // Ensure only one value is read
223 assert(rows_read == 1);
224 // There are no NULL values in the rows written
225 assert(values_read == 1);
226 // Verify the value written
227 bool expected_value = ((i % 2) == 0) ? true : false;
228 assert(value == expected_value);
229 i++;
230 }
231
232 // Get the Column Reader for the Int32 column
233 column_reader = row_group_reader->Column(1);
234 parquet::Int32Reader* int32_reader =
235 static_cast<parquet::Int32Reader*>(column_reader.get());
236 // Read all the rows in the column
237 i = 0;
238 while (int32_reader->HasNext()) {
239 int32_t value;
240 // Read one value at a time. The number of rows read is returned. values_read
241 // contains the number of non-null rows
242 rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
243 // Ensure only one value is read
244 assert(rows_read == 1);
245 // There are no NULL values in the rows written
246 assert(values_read == 1);
247 // Verify the value written
248 assert(value == i);
249 i++;
250 }
251
252 // Get the Column Reader for the Int64 column
253 column_reader = row_group_reader->Column(2);
254 parquet::Int64Reader* int64_reader =
255 static_cast<parquet::Int64Reader*>(column_reader.get());
256 // Read all the rows in the column
257 i = 0;
258 while (int64_reader->HasNext()) {
259 int64_t value;
260 // Read one value at a time. The number of rows read is returned. values_read
261 // contains the number of non-null rows
262 rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level,
263 &value, &values_read);
264 // Ensure only one value is read
265 assert(rows_read == 1);
266 // There are no NULL values in the rows written
267 assert(values_read == 1);
268 // Verify the value written
269 int64_t expected_value = i * 1000 * 1000;
270 expected_value *= 1000 * 1000;
271 assert(value == expected_value);
272 if ((i % 2) == 0) {
273 assert(repetition_level == 1);
274 } else {
275 assert(repetition_level == 0);
276 }
277 i++;
278 }
279
280 // Get the Column Reader for the Int96 column
281 column_reader = row_group_reader->Column(3);
282 parquet::Int96Reader* int96_reader =
283 static_cast<parquet::Int96Reader*>(column_reader.get());
284 // Read all the rows in the column
285 i = 0;
286 while (int96_reader->HasNext()) {
287 parquet::Int96 value;
288 // Read one value at a time. The number of rows read is returned. values_read
289 // contains the number of non-null rows
290 rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
291 // Ensure only one value is read
292 assert(rows_read == 1);
293 // There are no NULL values in the rows written
294 assert(values_read == 1);
295 // Verify the value written
296 parquet::Int96 expected_value;
297 ARROW_UNUSED(expected_value); // prevent warning in release build
298 expected_value.value[0] = i;
299 expected_value.value[1] = i + 1;
300 expected_value.value[2] = i + 2;
301 for (int j = 0; j < 3; j++) {
302 assert(value.value[j] == expected_value.value[j]);
303 }
304 i++;
305 }
306
307 // Get the Column Reader for the Float column
308 column_reader = row_group_reader->Column(4);
309 parquet::FloatReader* float_reader =
310 static_cast<parquet::FloatReader*>(column_reader.get());
311 // Read all the rows in the column
312 i = 0;
313 while (float_reader->HasNext()) {
314 float value;
315 // Read one value at a time. The number of rows read is returned. values_read
316 // contains the number of non-null rows
317 rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
318 // Ensure only one value is read
319 assert(rows_read == 1);
320 // There are no NULL values in the rows written
321 assert(values_read == 1);
322 // Verify the value written
323 float expected_value = static_cast<float>(i) * 1.1f;
324 assert(value == expected_value);
325 i++;
326 }
327
328 // Get the Column Reader for the Double column
329 column_reader = row_group_reader->Column(5);
330 parquet::DoubleReader* double_reader =
331 static_cast<parquet::DoubleReader*>(column_reader.get());
332 // Read all the rows in the column
333 i = 0;
334 while (double_reader->HasNext()) {
335 double value;
336 // Read one value at a time. The number of rows read is returned. values_read
337 // contains the number of non-null rows
338 rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
339 // Ensure only one value is read
340 assert(rows_read == 1);
341 // There are no NULL values in the rows written
342 assert(values_read == 1);
343 // Verify the value written
344 double expected_value = i * 1.1111111;
345 assert(value == expected_value);
346 i++;
347 }
348
349 // Get the Column Reader for the ByteArray column
350 column_reader = row_group_reader->Column(6);
351 parquet::ByteArrayReader* ba_reader =
352 static_cast<parquet::ByteArrayReader*>(column_reader.get());
353 // Read all the rows in the column
354 i = 0;
355 while (ba_reader->HasNext()) {
356 parquet::ByteArray value;
357 // Read one value at a time. The number of rows read is returned. values_read
358 // contains the number of non-null rows
359 rows_read =
360 ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
361 // Ensure only one value is read
362 assert(rows_read == 1);
363 // Verify the value written
364 char expected_value[FIXED_LENGTH] = "parquet";
365 ARROW_UNUSED(expected_value); // prevent warning in release build
366 expected_value[7] = static_cast<char>('0' + i / 100);
367 expected_value[8] = static_cast<char>('0' + (i / 10) % 10);
368 expected_value[9] = static_cast<char>('0' + i % 10);
369 if (i % 2 == 0) { // only alternate values exist
370 // There are no NULL values in the rows written
371 assert(values_read == 1);
372 assert(value.len == FIXED_LENGTH);
373 assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
374 assert(definition_level == 1);
375 } else {
376 // There are NULL values in the rows written
377 assert(values_read == 0);
378 assert(definition_level == 0);
379 }
380 i++;
381 }
382
383 // Get the Column Reader for the FixedLengthByteArray column
384 column_reader = row_group_reader->Column(7);
385 parquet::FixedLenByteArrayReader* flba_reader =
386 static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
387 // Read all the rows in the column
388 i = 0;
389 while (flba_reader->HasNext()) {
390 parquet::FixedLenByteArray value;
391 // Read one value at a time. The number of rows read is returned. values_read
392 // contains the number of non-null rows
393 rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
394 // Ensure only one value is read
395 assert(rows_read == 1);
396 // There are no NULL values in the rows written
397 assert(values_read == 1);
398 // Verify the value written
399 char v = static_cast<char>(i);
400 char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
401 assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
402 i++;
403 }
404 }
405 } catch (const std::exception& e) {
406 std::cerr << "Parquet read error: " << e.what() << std::endl;
407 return -1;
408 }
409
410 std::cout << "Parquet Writing and Reading Complete" << std::endl;
411
412 return 0;
413 }
414