1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
// This module implements helpers for the Parquet encryption tests: building
// key maps and column-key mappings, writing a sample Parquet file with given
// encryption properties (FileEncryptor), and reading such a file back while
// verifying its contents (FileDecryptor).
21
22 #include <sstream>
23
24 #include <arrow/io/file.h>
25
26 #include "arrow/testing/future_util.h"
27 #include "parquet/encryption/test_encryption_util.h"
28 #include "parquet/file_reader.h"
29 #include "parquet/file_writer.h"
30 #include "parquet/test_util.h"
31
32 using ::arrow::io::FileOutputStream;
33
34 using parquet::ConvertedType;
35 using parquet::Repetition;
36 using parquet::Type;
37 using parquet::schema::GroupNode;
38 using parquet::schema::PrimitiveNode;
39
40 namespace parquet {
41 namespace encryption {
42 namespace test {
43
data_file(const char * file)44 std::string data_file(const char* file) {
45 std::string dir_string(parquet::test::get_data_dir());
46 std::stringstream ss;
47 ss << dir_string << "/" << file;
48 return ss.str();
49 }
50
// Builds the key-id -> key lookup table used by the encryption tests.
//
// column_ids / column_keys are parallel arrays; exactly the first six entries
// of each are read, so both must hold at least six elements. The footer entry
// is added after the column entries.
//
// Entries are added without overwriting: when an id occurs more than once, the
// key registered first for that id is the one kept.
std::unordered_map<std::string, std::string> BuildKeyMap(const char* const* column_ids,
                                                         const char* const* column_keys,
                                                         const char* footer_id,
                                                         const char* footer_key) {
  constexpr int kNumColumnKeys = 6;

  std::unordered_map<std::string, std::string> keys_by_id;
  for (int idx = 0; idx < kNumColumnKeys; ++idx) {
    keys_by_id.emplace(column_ids[idx], column_keys[idx]);
  }
  keys_by_id.emplace(footer_id, footer_key);
  return keys_by_id;
}
65
BuildColumnKeyMapping()66 std::string BuildColumnKeyMapping() {
67 std::ostringstream stream;
68 stream << kColumnMasterKeyIds[0] << ":" << kDoubleFieldName << ";"
69 << kColumnMasterKeyIds[1] << ":" << kFloatFieldName << ";"
70 << kColumnMasterKeyIds[2] << ":" << kBooleanFieldName << ";"
71 << kColumnMasterKeyIds[3] << ":" << kInt32FieldName << ";"
72 << kColumnMasterKeyIds[4] << ":" << kByteArrayFieldName << ";"
73 << kColumnMasterKeyIds[5] << ":" << kFixedLenByteArrayFieldName << ";";
74 return stream.str();
75 }
76
// Values of one column together with its (optional) definition and repetition
// levels. DType must expose the value type as DType::c_type.
template <typename DType>
struct ColumnData {
  using T = typename DType::c_type;

  std::vector<T> values;
  std::vector<int16_t> definition_levels;
  std::vector<int16_t> repetition_levels;

  // Number of stored values.
  int64_t rows() const { return static_cast<int64_t>(values.size()); }

  // Pointer to the first stored value.
  const T* raw_values() const { return values.data(); }

  // Pointer to the definition levels, or nullptr when none were recorded.
  const int16_t* raw_definition_levels() const {
    if (definition_levels.empty()) {
      return nullptr;
    }
    return definition_levels.data();
  }

  // Pointer to the repetition levels, or nullptr when none were recorded.
  const int16_t* raw_repetition_levels() const {
    if (repetition_levels.empty()) {
      return nullptr;
    }
    return repetition_levels.data();
  }
};
94
95 template <typename DType>
GenerateSampleData(int rows)96 ColumnData<DType> GenerateSampleData(int rows) {
97 return ColumnData<DType>();
98 }
99
100 template <>
GenerateSampleData(int rows)101 ColumnData<Int32Type> GenerateSampleData<Int32Type>(int rows) {
102 ColumnData<Int32Type> int32_col;
103 // Int32 column
104 for (int i = 0; i < rows; i++) {
105 int32_col.values.push_back(i);
106 }
107 return int32_col;
108 }
109
110 template <>
GenerateSampleData(int rows)111 ColumnData<Int64Type> GenerateSampleData<Int64Type>(int rows) {
112 ColumnData<Int64Type> int64_col;
113 // The Int64 column. Each row has repeats twice.
114 for (int i = 0; i < 2 * rows; i++) {
115 int64_t value = i * 1000 * 1000;
116 value *= 1000 * 1000;
117 int16_t definition_level = 1;
118 int16_t repetition_level = 0;
119 if ((i % 2) == 0) {
120 repetition_level = 1; // start of a new record
121 }
122 int64_col.values.push_back(value);
123 int64_col.definition_levels.push_back(definition_level);
124 int64_col.repetition_levels.push_back(repetition_level);
125 }
126 return int64_col;
127 }
128
129 template <>
GenerateSampleData(int rows)130 ColumnData<Int96Type> GenerateSampleData<Int96Type>(int rows) {
131 ColumnData<Int96Type> int96_col;
132 for (int i = 0; i < rows; i++) {
133 parquet::Int96 value;
134 value.value[0] = i;
135 value.value[1] = i + 1;
136 value.value[2] = i + 2;
137 int96_col.values.push_back(value);
138 }
139 return int96_col;
140 }
141
142 template <>
GenerateSampleData(int rows)143 ColumnData<FloatType> GenerateSampleData<FloatType>(int rows) {
144 ColumnData<FloatType> float_col;
145 for (int i = 0; i < rows; i++) {
146 float value = static_cast<float>(i) * 1.1f;
147 float_col.values.push_back(value);
148 }
149 return float_col;
150 }
151
152 template <>
GenerateSampleData(int rows)153 ColumnData<DoubleType> GenerateSampleData<DoubleType>(int rows) {
154 ColumnData<DoubleType> double_col;
155 for (int i = 0; i < rows; i++) {
156 double value = i * 1.1111111;
157 double_col.values.push_back(value);
158 }
159 return double_col;
160 }
161
162 template <typename DType, typename NextFunc>
WriteBatch(int rows,const NextFunc get_next_column)163 void WriteBatch(int rows, const NextFunc get_next_column) {
164 ColumnData<DType> column = GenerateSampleData<DType>(rows);
165 TypedColumnWriter<DType>* writer =
166 static_cast<TypedColumnWriter<DType>*>(get_next_column());
167 writer->WriteBatch(column.rows(), column.raw_definition_levels(),
168 column.raw_repetition_levels(), column.raw_values());
169 }
170
FileEncryptor()171 FileEncryptor::FileEncryptor() { schema_ = SetupEncryptionSchema(); }
172
SetupEncryptionSchema()173 std::shared_ptr<GroupNode> FileEncryptor::SetupEncryptionSchema() {
174 parquet::schema::NodeVector fields;
175
176 fields.push_back(PrimitiveNode::Make(kBooleanFieldName, Repetition::REQUIRED,
177 Type::BOOLEAN, ConvertedType::NONE));
178
179 fields.push_back(PrimitiveNode::Make(kInt32FieldName, Repetition::REQUIRED, Type::INT32,
180 ConvertedType::TIME_MILLIS));
181
182 fields.push_back(PrimitiveNode::Make(kInt64FieldName, Repetition::REPEATED, Type::INT64,
183 ConvertedType::NONE));
184
185 fields.push_back(PrimitiveNode::Make(kInt96FieldName, Repetition::REQUIRED, Type::INT96,
186 ConvertedType::NONE));
187
188 fields.push_back(PrimitiveNode::Make(kFloatFieldName, Repetition::REQUIRED, Type::FLOAT,
189 ConvertedType::NONE));
190
191 fields.push_back(PrimitiveNode::Make(kDoubleFieldName, Repetition::REQUIRED,
192 Type::DOUBLE, ConvertedType::NONE));
193
194 fields.push_back(PrimitiveNode::Make(kByteArrayFieldName, Repetition::OPTIONAL,
195 Type::BYTE_ARRAY, ConvertedType::NONE));
196
197 fields.push_back(PrimitiveNode::Make(kFixedLenByteArrayFieldName, Repetition::REQUIRED,
198 Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE,
199 kFixedLength));
200
201 return std::static_pointer_cast<GroupNode>(
202 GroupNode::Make("schema", Repetition::REQUIRED, fields));
203 }
204
EncryptFile(std::string file,std::shared_ptr<parquet::FileEncryptionProperties> encryption_configurations)205 void FileEncryptor::EncryptFile(
206 std::string file,
207 std::shared_ptr<parquet::FileEncryptionProperties> encryption_configurations) {
208 WriterProperties::Builder prop_builder;
209 prop_builder.compression(parquet::Compression::SNAPPY);
210 prop_builder.encryption(encryption_configurations);
211 std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();
212
213 PARQUET_ASSIGN_OR_THROW(auto out_file, FileOutputStream::Open(file));
214 // Create a ParquetFileWriter instance
215 std::shared_ptr<parquet::ParquetFileWriter> file_writer =
216 parquet::ParquetFileWriter::Open(out_file, schema_, writer_properties);
217
218 for (int r = 0; r < num_rowgroups_; r++) {
219 bool buffered_mode = r % 2 == 0;
220 auto row_group_writer = buffered_mode ? file_writer->AppendBufferedRowGroup()
221 : file_writer->AppendRowGroup();
222
223 int column_index = 0;
224 // Captures i by reference; increments it by one
225 auto get_next_column = [&]() {
226 return buffered_mode ? row_group_writer->column(column_index++)
227 : row_group_writer->NextColumn();
228 };
229
230 // Write the Bool column
231 parquet::BoolWriter* bool_writer =
232 static_cast<parquet::BoolWriter*>(get_next_column());
233 for (int i = 0; i < rows_per_rowgroup_; i++) {
234 bool value = ((i % 2) == 0) ? true : false;
235 bool_writer->WriteBatch(1, nullptr, nullptr, &value);
236 }
237
238 // Write the Int32 column
239 WriteBatch<Int32Type>(rows_per_rowgroup_, get_next_column);
240
241 // Write the Int64 column.
242 WriteBatch<Int64Type>(rows_per_rowgroup_, get_next_column);
243
244 // Write the INT96 column.
245 WriteBatch<Int96Type>(rows_per_rowgroup_, get_next_column);
246
247 // Write the Float column
248 WriteBatch<FloatType>(rows_per_rowgroup_, get_next_column);
249
250 // Write the Double column
251 WriteBatch<DoubleType>(rows_per_rowgroup_, get_next_column);
252
253 // Write the ByteArray column. Make every alternate values NULL
254 // Write the ByteArray column. Make every alternate values NULL
255 parquet::ByteArrayWriter* ba_writer =
256 static_cast<parquet::ByteArrayWriter*>(get_next_column());
257 for (int i = 0; i < rows_per_rowgroup_; i++) {
258 parquet::ByteArray value;
259 char hello[kFixedLength] = "parquet";
260 hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
261 hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
262 hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
263 if (i % 2 == 0) {
264 int16_t definition_level = 1;
265 value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
266 value.len = kFixedLength;
267 ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
268 } else {
269 int16_t definition_level = 0;
270 ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
271 }
272 }
273
274 // Write the FixedLengthByteArray column
275 parquet::FixedLenByteArrayWriter* flba_writer =
276 static_cast<parquet::FixedLenByteArrayWriter*>(get_next_column());
277 for (int i = 0; i < rows_per_rowgroup_; i++) {
278 parquet::FixedLenByteArray value;
279 char v = static_cast<char>(i);
280 char flba[kFixedLength] = {v, v, v, v, v, v, v, v, v, v};
281 value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
282 flba_writer->WriteBatch(1, nullptr, nullptr, &value);
283 }
284 }
285
286 // Close the ParquetFileWriter
287 file_writer->Close();
288 PARQUET_THROW_NOT_OK(out_file->Close());
289
290 return;
291 } // namespace test
292
293 template <typename DType, typename RowGroupReader, typename RowGroupMetadata>
ReadAndVerifyColumn(RowGroupReader * rg_reader,RowGroupMetadata * rg_md,int column_index,int rows)294 void ReadAndVerifyColumn(RowGroupReader* rg_reader, RowGroupMetadata* rg_md,
295 int column_index, int rows) {
296 ColumnData<DType> expected_column_data = GenerateSampleData<DType>(rows);
297 std::shared_ptr<parquet::ColumnReader> column_reader = rg_reader->Column(column_index);
298 TypedColumnReader<DType>* reader =
299 static_cast<TypedColumnReader<DType>*>(column_reader.get());
300
301 std::unique_ptr<ColumnChunkMetaData> col_md = rg_md->ColumnChunk(column_index);
302
303 int64_t rows_should_read = expected_column_data.values.size();
304
305 // Read all the rows in the column
306 ColumnData<DType> read_col_data;
307 read_col_data.values.resize(rows_should_read);
308 int64_t values_read;
309 int64_t rows_read;
310 if (expected_column_data.definition_levels.size() > 0 &&
311 expected_column_data.repetition_levels.size() > 0) {
312 std::vector<int16_t> definition_levels(rows_should_read);
313 std::vector<int16_t> repetition_levels(rows_should_read);
314 rows_read = reader->ReadBatch(rows_should_read, definition_levels.data(),
315 repetition_levels.data(), read_col_data.values.data(),
316 &values_read);
317 ASSERT_EQ(definition_levels, expected_column_data.definition_levels);
318 ASSERT_EQ(repetition_levels, expected_column_data.repetition_levels);
319 } else {
320 rows_read = reader->ReadBatch(rows_should_read, nullptr, nullptr,
321 read_col_data.values.data(), &values_read);
322 }
323 ASSERT_EQ(rows_read, rows_should_read);
324 ASSERT_EQ(values_read, rows_should_read);
325 ASSERT_EQ(read_col_data.values, expected_column_data.values);
326 // make sure we got the same number of values the metadata says
327 ASSERT_EQ(col_md->num_values(), rows_read);
328 }
329
DecryptFile(std::string file,std::shared_ptr<FileDecryptionProperties> file_decryption_properties)330 void FileDecryptor::DecryptFile(
331 std::string file,
332 std::shared_ptr<FileDecryptionProperties> file_decryption_properties) {
333 std::string exception_msg;
334 parquet::ReaderProperties reader_properties = parquet::default_reader_properties();
335 if (file_decryption_properties) {
336 reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
337 }
338
339 std::shared_ptr<::arrow::io::RandomAccessFile> source;
340 PARQUET_ASSIGN_OR_THROW(
341 source, ::arrow::io::ReadableFile::Open(file, reader_properties.memory_pool()));
342
343 auto file_reader = parquet::ParquetFileReader::Open(source, reader_properties);
344 CheckFile(file_reader.get(), file_decryption_properties.get());
345
346 if (file_decryption_properties) {
347 reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
348 }
349 auto fut = parquet::ParquetFileReader::OpenAsync(source, reader_properties);
350 ASSERT_FINISHES_OK(fut);
351 ASSERT_OK_AND_ASSIGN(file_reader, fut.MoveResult());
352 CheckFile(file_reader.get(), file_decryption_properties.get());
353
354 file_reader->Close();
355 PARQUET_THROW_NOT_OK(source->Close());
356 }
357
CheckFile(parquet::ParquetFileReader * file_reader,FileDecryptionProperties * file_decryption_properties)358 void FileDecryptor::CheckFile(parquet::ParquetFileReader* file_reader,
359 FileDecryptionProperties* file_decryption_properties) {
360 // Get the File MetaData
361 std::shared_ptr<parquet::FileMetaData> file_metadata = file_reader->metadata();
362
363 // Get the number of RowGroups
364 int num_row_groups = file_metadata->num_row_groups();
365
366 // Get the number of Columns
367 int num_columns = file_metadata->num_columns();
368 ASSERT_EQ(num_columns, 8);
369
370 // Iterate over all the RowGroups in the file
371 for (int r = 0; r < num_row_groups; ++r) {
372 // Get the RowGroup Reader
373 std::shared_ptr<parquet::RowGroupReader> row_group_reader = file_reader->RowGroup(r);
374
375 // Get the RowGroupMetaData
376 std::unique_ptr<RowGroupMetaData> rg_metadata = file_metadata->RowGroup(r);
377
378 int rows_per_rowgroup = static_cast<int>(rg_metadata->num_rows());
379
380 int64_t values_read = 0;
381 int64_t rows_read = 0;
382 int16_t definition_level;
383 // int16_t repetition_level;
384 int i;
385 std::shared_ptr<parquet::ColumnReader> column_reader;
386
387 // Get the Column Reader for the boolean column
388 column_reader = row_group_reader->Column(0);
389 parquet::BoolReader* bool_reader =
390 static_cast<parquet::BoolReader*>(column_reader.get());
391
392 // Get the ColumnChunkMetaData for the boolean column
393 std::unique_ptr<ColumnChunkMetaData> boolean_md = rg_metadata->ColumnChunk(0);
394
395 // Read all the rows in the column
396 i = 0;
397 while (bool_reader->HasNext()) {
398 bool value;
399 // Read one value at a time. The number of rows read is returned. values_read
400 // contains the number of non-null rows
401 rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
402 // Ensure only one value is read
403 ASSERT_EQ(rows_read, 1);
404 // There are no NULL values in the rows written
405 ASSERT_EQ(values_read, 1);
406 // Verify the value written
407 bool expected_value = ((i % 2) == 0) ? true : false;
408 ASSERT_EQ(value, expected_value);
409 i++;
410 }
411 // make sure we got the same number of values the metadata says
412 ASSERT_EQ(boolean_md->num_values(), i);
413
414 ReadAndVerifyColumn<Int32Type>(row_group_reader.get(), rg_metadata.get(), 1,
415 rows_per_rowgroup);
416
417 ReadAndVerifyColumn<Int64Type>(row_group_reader.get(), rg_metadata.get(), 2,
418 rows_per_rowgroup);
419
420 ReadAndVerifyColumn<Int96Type>(row_group_reader.get(), rg_metadata.get(), 3,
421 rows_per_rowgroup);
422
423 if (file_decryption_properties) {
424 ReadAndVerifyColumn<FloatType>(row_group_reader.get(), rg_metadata.get(), 4,
425 rows_per_rowgroup);
426
427 ReadAndVerifyColumn<DoubleType>(row_group_reader.get(), rg_metadata.get(), 5,
428 rows_per_rowgroup);
429 }
430
431 // Get the Column Reader for the ByteArray column
432 column_reader = row_group_reader->Column(6);
433 parquet::ByteArrayReader* ba_reader =
434 static_cast<parquet::ByteArrayReader*>(column_reader.get());
435
436 // Get the ColumnChunkMetaData for the ByteArray column
437 std::unique_ptr<ColumnChunkMetaData> ba_md = rg_metadata->ColumnChunk(6);
438
439 // Read all the rows in the column
440 i = 0;
441 while (ba_reader->HasNext()) {
442 parquet::ByteArray value;
443 // Read one value at a time. The number of rows read is returned. values_read
444 // contains the number of non-null rows
445 rows_read =
446 ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
447 // Ensure only one value is read
448 ASSERT_EQ(rows_read, 1);
449 // Verify the value written
450 char expected_value[kFixedLength] = "parquet";
451 expected_value[7] = static_cast<char>('0' + i / 100);
452 expected_value[8] = static_cast<char>('0' + (i / 10) % 10);
453 expected_value[9] = static_cast<char>('0' + i % 10);
454 if (i % 2 == 0) { // only alternate values exist
455 // There are no NULL values in the rows written
456 ASSERT_EQ(values_read, 1);
457 ASSERT_EQ(value.len, kFixedLength);
458 ASSERT_EQ(memcmp(value.ptr, &expected_value[0], kFixedLength), 0);
459 ASSERT_EQ(definition_level, 1);
460 } else {
461 // There are NULL values in the rows written
462 ASSERT_EQ(values_read, 0);
463 ASSERT_EQ(definition_level, 0);
464 }
465 i++;
466 }
467 // make sure we got the same number of values the metadata says
468 ASSERT_EQ(ba_md->num_values(), i);
469
470 // Get the Column Reader for the FixedLengthByteArray column
471 column_reader = row_group_reader->Column(7);
472 parquet::FixedLenByteArrayReader* flba_reader =
473 static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
474
475 // Get the ColumnChunkMetaData for the FixedLengthByteArray column
476 std::unique_ptr<ColumnChunkMetaData> flba_md = rg_metadata->ColumnChunk(7);
477
478 // Read all the rows in the column
479 i = 0;
480 while (flba_reader->HasNext()) {
481 parquet::FixedLenByteArray value;
482 // Read one value at a time. The number of rows read is returned. values_read
483 // contains the number of non-null rows
484 rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
485 // Ensure only one value is read
486 ASSERT_EQ(rows_read, 1);
487 // There are no NULL values in the rows written
488 ASSERT_EQ(values_read, 1);
489 // Verify the value written
490 char v = static_cast<char>(i);
491 char expected_value[kFixedLength] = {v, v, v, v, v, v, v, v, v, v};
492 ASSERT_EQ(memcmp(value.ptr, &expected_value[0], kFixedLength), 0);
493 i++;
494 }
495 // make sure we got the same number of values the metadata says
496 ASSERT_EQ(flba_md->num_values(), i);
497 }
498 }
499
500 } // namespace test
501 } // namespace encryption
502 } // namespace parquet
503