1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
// This module contains shared utilities for the Parquet encryption tests:
// sample-data generators for each physical column type, plus FileEncryptor /
// FileDecryptor helpers that write encrypted files and verify their contents.
21 
22 #include <sstream>
23 
24 #include <arrow/io/file.h>
25 
26 #include "arrow/testing/future_util.h"
27 #include "parquet/encryption/test_encryption_util.h"
28 #include "parquet/file_reader.h"
29 #include "parquet/file_writer.h"
30 #include "parquet/test_util.h"
31 
32 using ::arrow::io::FileOutputStream;
33 
34 using parquet::ConvertedType;
35 using parquet::Repetition;
36 using parquet::Type;
37 using parquet::schema::GroupNode;
38 using parquet::schema::PrimitiveNode;
39 
40 namespace parquet {
41 namespace encryption {
42 namespace test {
43 
data_file(const char * file)44 std::string data_file(const char* file) {
45   std::string dir_string(parquet::test::get_data_dir());
46   std::stringstream ss;
47   ss << dir_string << "/" << file;
48   return ss.str();
49 }
50 
// Builds the key-id -> key-bytes map handed to the test crypto factory:
// one entry per encrypted column plus the footer key.
std::unordered_map<std::string, std::string> BuildKeyMap(const char* const* column_ids,
                                                         const char* const* column_keys,
                                                         const char* footer_id,
                                                         const char* footer_key) {
  // The test fixtures always use exactly six encrypted columns.
  constexpr int kNumColumnKeys = 6;
  std::unordered_map<std::string, std::string> key_map;
  for (int idx = 0; idx < kNumColumnKeys; ++idx) {
    key_map.emplace(column_ids[idx], column_keys[idx]);
  }
  // The footer key.
  key_map.emplace(footer_id, footer_key);
  return key_map;
}
65 
BuildColumnKeyMapping()66 std::string BuildColumnKeyMapping() {
67   std::ostringstream stream;
68   stream << kColumnMasterKeyIds[0] << ":" << kDoubleFieldName << ";"
69          << kColumnMasterKeyIds[1] << ":" << kFloatFieldName << ";"
70          << kColumnMasterKeyIds[2] << ":" << kBooleanFieldName << ";"
71          << kColumnMasterKeyIds[3] << ":" << kInt32FieldName << ";"
72          << kColumnMasterKeyIds[4] << ":" << kByteArrayFieldName << ";"
73          << kColumnMasterKeyIds[5] << ":" << kFixedLenByteArrayFieldName << ";";
74   return stream.str();
75 }
76 
// Bundle of values plus optional definition/repetition levels for one
// column of generated sample data, in the raw-pointer form the typed
// column writers and readers expect.
template <typename DType>
struct ColumnData {
  using T = typename DType::c_type;

  std::vector<T> values;
  std::vector<int16_t> definition_levels;
  std::vector<int16_t> repetition_levels;

  // Number of stored values (for repeated columns this is the number of
  // leaf values, not records).
  int64_t rows() const { return values.size(); }
  const T* raw_values() const { return values.data(); }
  // nullptr when the column carries no definition levels (required column).
  const int16_t* raw_definition_levels() const {
    return definition_levels.empty() ? nullptr : definition_levels.data();
  }
  // nullptr when the column carries no repetition levels (non-repeated column).
  const int16_t* raw_repetition_levels() const {
    return repetition_levels.empty() ? nullptr : repetition_levels.data();
  }
};
94 
95 template <typename DType>
GenerateSampleData(int rows)96 ColumnData<DType> GenerateSampleData(int rows) {
97   return ColumnData<DType>();
98 }
99 
100 template <>
GenerateSampleData(int rows)101 ColumnData<Int32Type> GenerateSampleData<Int32Type>(int rows) {
102   ColumnData<Int32Type> int32_col;
103   // Int32 column
104   for (int i = 0; i < rows; i++) {
105     int32_col.values.push_back(i);
106   }
107   return int32_col;
108 }
109 
110 template <>
GenerateSampleData(int rows)111 ColumnData<Int64Type> GenerateSampleData<Int64Type>(int rows) {
112   ColumnData<Int64Type> int64_col;
113   // The Int64 column. Each row has repeats twice.
114   for (int i = 0; i < 2 * rows; i++) {
115     int64_t value = i * 1000 * 1000;
116     value *= 1000 * 1000;
117     int16_t definition_level = 1;
118     int16_t repetition_level = 0;
119     if ((i % 2) == 0) {
120       repetition_level = 1;  // start of a new record
121     }
122     int64_col.values.push_back(value);
123     int64_col.definition_levels.push_back(definition_level);
124     int64_col.repetition_levels.push_back(repetition_level);
125   }
126   return int64_col;
127 }
128 
129 template <>
GenerateSampleData(int rows)130 ColumnData<Int96Type> GenerateSampleData<Int96Type>(int rows) {
131   ColumnData<Int96Type> int96_col;
132   for (int i = 0; i < rows; i++) {
133     parquet::Int96 value;
134     value.value[0] = i;
135     value.value[1] = i + 1;
136     value.value[2] = i + 2;
137     int96_col.values.push_back(value);
138   }
139   return int96_col;
140 }
141 
142 template <>
GenerateSampleData(int rows)143 ColumnData<FloatType> GenerateSampleData<FloatType>(int rows) {
144   ColumnData<FloatType> float_col;
145   for (int i = 0; i < rows; i++) {
146     float value = static_cast<float>(i) * 1.1f;
147     float_col.values.push_back(value);
148   }
149   return float_col;
150 }
151 
152 template <>
GenerateSampleData(int rows)153 ColumnData<DoubleType> GenerateSampleData<DoubleType>(int rows) {
154   ColumnData<DoubleType> double_col;
155   for (int i = 0; i < rows; i++) {
156     double value = i * 1.1111111;
157     double_col.values.push_back(value);
158   }
159   return double_col;
160 }
161 
162 template <typename DType, typename NextFunc>
WriteBatch(int rows,const NextFunc get_next_column)163 void WriteBatch(int rows, const NextFunc get_next_column) {
164   ColumnData<DType> column = GenerateSampleData<DType>(rows);
165   TypedColumnWriter<DType>* writer =
166       static_cast<TypedColumnWriter<DType>*>(get_next_column());
167   writer->WriteBatch(column.rows(), column.raw_definition_levels(),
168                      column.raw_repetition_levels(), column.raw_values());
169 }
170 
FileEncryptor()171 FileEncryptor::FileEncryptor() { schema_ = SetupEncryptionSchema(); }
172 
SetupEncryptionSchema()173 std::shared_ptr<GroupNode> FileEncryptor::SetupEncryptionSchema() {
174   parquet::schema::NodeVector fields;
175 
176   fields.push_back(PrimitiveNode::Make(kBooleanFieldName, Repetition::REQUIRED,
177                                        Type::BOOLEAN, ConvertedType::NONE));
178 
179   fields.push_back(PrimitiveNode::Make(kInt32FieldName, Repetition::REQUIRED, Type::INT32,
180                                        ConvertedType::TIME_MILLIS));
181 
182   fields.push_back(PrimitiveNode::Make(kInt64FieldName, Repetition::REPEATED, Type::INT64,
183                                        ConvertedType::NONE));
184 
185   fields.push_back(PrimitiveNode::Make(kInt96FieldName, Repetition::REQUIRED, Type::INT96,
186                                        ConvertedType::NONE));
187 
188   fields.push_back(PrimitiveNode::Make(kFloatFieldName, Repetition::REQUIRED, Type::FLOAT,
189                                        ConvertedType::NONE));
190 
191   fields.push_back(PrimitiveNode::Make(kDoubleFieldName, Repetition::REQUIRED,
192                                        Type::DOUBLE, ConvertedType::NONE));
193 
194   fields.push_back(PrimitiveNode::Make(kByteArrayFieldName, Repetition::OPTIONAL,
195                                        Type::BYTE_ARRAY, ConvertedType::NONE));
196 
197   fields.push_back(PrimitiveNode::Make(kFixedLenByteArrayFieldName, Repetition::REQUIRED,
198                                        Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE,
199                                        kFixedLength));
200 
201   return std::static_pointer_cast<GroupNode>(
202       GroupNode::Make("schema", Repetition::REQUIRED, fields));
203 }
204 
// Writes an encrypted Parquet file at `file`: num_rowgroups_ row groups of
// rows_per_rowgroup_ rows each, over the eight-column test schema, with
// SNAPPY compression and the supplied encryption configuration. Row groups
// alternate between the buffered and the serial writer so both code paths
// are exercised.
void FileEncryptor::EncryptFile(
    std::string file,
    std::shared_ptr<parquet::FileEncryptionProperties> encryption_configurations) {
  WriterProperties::Builder prop_builder;
  prop_builder.compression(parquet::Compression::SNAPPY);
  prop_builder.encryption(encryption_configurations);
  std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();

  PARQUET_ASSIGN_OR_THROW(auto out_file, FileOutputStream::Open(file));
  // Create a ParquetFileWriter instance
  std::shared_ptr<parquet::ParquetFileWriter> file_writer =
      parquet::ParquetFileWriter::Open(out_file, schema_, writer_properties);

  for (int r = 0; r < num_rowgroups_; r++) {
    // Even row groups use the buffered writer, odd ones the serial writer.
    bool buffered_mode = r % 2 == 0;
    auto row_group_writer = buffered_mode ? file_writer->AppendBufferedRowGroup()
                                          : file_writer->AppendRowGroup();

    int column_index = 0;
    // Returns the writer for the next column: by explicit index in buffered
    // mode (captures column_index by reference and post-increments it), or
    // via the serial NextColumn() call otherwise.
    auto get_next_column = [&]() {
      return buffered_mode ? row_group_writer->column(column_index++)
                           : row_group_writer->NextColumn();
    };

    // Write the Bool column: alternating true/false, one value at a time.
    parquet::BoolWriter* bool_writer =
        static_cast<parquet::BoolWriter*>(get_next_column());
    for (int i = 0; i < rows_per_rowgroup_; i++) {
      bool value = ((i % 2) == 0) ? true : false;
      bool_writer->WriteBatch(1, nullptr, nullptr, &value);
    }

    // Write the Int32 column
    WriteBatch<Int32Type>(rows_per_rowgroup_, get_next_column);

    // Write the Int64 column.
    WriteBatch<Int64Type>(rows_per_rowgroup_, get_next_column);

    // Write the INT96 column.
    WriteBatch<Int96Type>(rows_per_rowgroup_, get_next_column);

    // Write the Float column
    WriteBatch<FloatType>(rows_per_rowgroup_, get_next_column);

    // Write the Double column
    WriteBatch<DoubleType>(rows_per_rowgroup_, get_next_column);

    // Write the ByteArray column: "parquet" followed by the 3-digit row
    // index; every odd row is NULL (definition level 0).
    parquet::ByteArrayWriter* ba_writer =
        static_cast<parquet::ByteArrayWriter*>(get_next_column());
    for (int i = 0; i < rows_per_rowgroup_; i++) {
      parquet::ByteArray value;
      char hello[kFixedLength] = "parquet";
      // Overwrite the trailing NUL and padding with the zero-padded index.
      hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
      hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
      hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
      if (i % 2 == 0) {
        int16_t definition_level = 1;
        value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
        value.len = kFixedLength;
        ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
      } else {
        int16_t definition_level = 0;
        ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
      }
    }

    // Write the FixedLengthByteArray column: kFixedLength repetitions of the
    // row index truncated to a char.
    parquet::FixedLenByteArrayWriter* flba_writer =
        static_cast<parquet::FixedLenByteArrayWriter*>(get_next_column());
    for (int i = 0; i < rows_per_rowgroup_; i++) {
      parquet::FixedLenByteArray value;
      char v = static_cast<char>(i);
      char flba[kFixedLength] = {v, v, v, v, v, v, v, v, v, v};
      value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
      flba_writer->WriteBatch(1, nullptr, nullptr, &value);
    }
  }

  // Close the ParquetFileWriter
  file_writer->Close();
  PARQUET_THROW_NOT_OK(out_file->Close());

  return;
}
292 
293 template <typename DType, typename RowGroupReader, typename RowGroupMetadata>
ReadAndVerifyColumn(RowGroupReader * rg_reader,RowGroupMetadata * rg_md,int column_index,int rows)294 void ReadAndVerifyColumn(RowGroupReader* rg_reader, RowGroupMetadata* rg_md,
295                          int column_index, int rows) {
296   ColumnData<DType> expected_column_data = GenerateSampleData<DType>(rows);
297   std::shared_ptr<parquet::ColumnReader> column_reader = rg_reader->Column(column_index);
298   TypedColumnReader<DType>* reader =
299       static_cast<TypedColumnReader<DType>*>(column_reader.get());
300 
301   std::unique_ptr<ColumnChunkMetaData> col_md = rg_md->ColumnChunk(column_index);
302 
303   int64_t rows_should_read = expected_column_data.values.size();
304 
305   // Read all the rows in the column
306   ColumnData<DType> read_col_data;
307   read_col_data.values.resize(rows_should_read);
308   int64_t values_read;
309   int64_t rows_read;
310   if (expected_column_data.definition_levels.size() > 0 &&
311       expected_column_data.repetition_levels.size() > 0) {
312     std::vector<int16_t> definition_levels(rows_should_read);
313     std::vector<int16_t> repetition_levels(rows_should_read);
314     rows_read = reader->ReadBatch(rows_should_read, definition_levels.data(),
315                                   repetition_levels.data(), read_col_data.values.data(),
316                                   &values_read);
317     ASSERT_EQ(definition_levels, expected_column_data.definition_levels);
318     ASSERT_EQ(repetition_levels, expected_column_data.repetition_levels);
319   } else {
320     rows_read = reader->ReadBatch(rows_should_read, nullptr, nullptr,
321                                   read_col_data.values.data(), &values_read);
322   }
323   ASSERT_EQ(rows_read, rows_should_read);
324   ASSERT_EQ(values_read, rows_should_read);
325   ASSERT_EQ(read_col_data.values, expected_column_data.values);
326   // make sure we got the same number of values the metadata says
327   ASSERT_EQ(col_md->num_values(), rows_read);
328 }
329 
DecryptFile(std::string file,std::shared_ptr<FileDecryptionProperties> file_decryption_properties)330 void FileDecryptor::DecryptFile(
331     std::string file,
332     std::shared_ptr<FileDecryptionProperties> file_decryption_properties) {
333   std::string exception_msg;
334   parquet::ReaderProperties reader_properties = parquet::default_reader_properties();
335   if (file_decryption_properties) {
336     reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
337   }
338 
339   std::shared_ptr<::arrow::io::RandomAccessFile> source;
340   PARQUET_ASSIGN_OR_THROW(
341       source, ::arrow::io::ReadableFile::Open(file, reader_properties.memory_pool()));
342 
343   auto file_reader = parquet::ParquetFileReader::Open(source, reader_properties);
344   CheckFile(file_reader.get(), file_decryption_properties.get());
345 
346   if (file_decryption_properties) {
347     reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
348   }
349   auto fut = parquet::ParquetFileReader::OpenAsync(source, reader_properties);
350   ASSERT_FINISHES_OK(fut);
351   ASSERT_OK_AND_ASSIGN(file_reader, fut.MoveResult());
352   CheckFile(file_reader.get(), file_decryption_properties.get());
353 
354   file_reader->Close();
355   PARQUET_THROW_NOT_OK(source->Close());
356 }
357 
// Verifies that every row group of the opened file contains exactly the
// data FileEncryptor::EncryptFile wrote: checks the Bool, Int32, Int64,
// Int96, ByteArray and FixedLenByteArray columns value-by-value, and the
// Float/Double columns only when decryption properties are supplied.
void FileDecryptor::CheckFile(parquet::ParquetFileReader* file_reader,
                              FileDecryptionProperties* file_decryption_properties) {
  // Get the File MetaData
  std::shared_ptr<parquet::FileMetaData> file_metadata = file_reader->metadata();

  // Get the number of RowGroups
  int num_row_groups = file_metadata->num_row_groups();

  // The test schema always has exactly eight columns.
  int num_columns = file_metadata->num_columns();
  ASSERT_EQ(num_columns, 8);

  // Iterate over all the RowGroups in the file
  for (int r = 0; r < num_row_groups; ++r) {
    // Get the RowGroup Reader
    std::shared_ptr<parquet::RowGroupReader> row_group_reader = file_reader->RowGroup(r);

    // Get the RowGroupMetaData
    std::unique_ptr<RowGroupMetaData> rg_metadata = file_metadata->RowGroup(r);

    int rows_per_rowgroup = static_cast<int>(rg_metadata->num_rows());

    int64_t values_read = 0;
    int64_t rows_read = 0;
    int16_t definition_level;
    int i;
    std::shared_ptr<parquet::ColumnReader> column_reader;

    // Get the Column Reader for the boolean column
    column_reader = row_group_reader->Column(0);
    parquet::BoolReader* bool_reader =
        static_cast<parquet::BoolReader*>(column_reader.get());

    // Get the ColumnChunkMetaData for the boolean column
    std::unique_ptr<ColumnChunkMetaData> boolean_md = rg_metadata->ColumnChunk(0);

    // Read all the rows in the column
    i = 0;
    while (bool_reader->HasNext()) {
      bool value;
      // Read one value at a time. The number of rows read is returned. values_read
      // contains the number of non-null rows
      rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
      // Ensure only one value is read
      ASSERT_EQ(rows_read, 1);
      // There are no NULL values in the rows written
      ASSERT_EQ(values_read, 1);
      // Verify the value written: alternating true/false.
      bool expected_value = ((i % 2) == 0) ? true : false;
      ASSERT_EQ(value, expected_value);
      i++;
    }
    // make sure we got the same number of values the metadata says
    ASSERT_EQ(boolean_md->num_values(), i);

    // Columns 1-3 are verified against the shared sample-data generators.
    ReadAndVerifyColumn<Int32Type>(row_group_reader.get(), rg_metadata.get(), 1,
                                   rows_per_rowgroup);

    ReadAndVerifyColumn<Int64Type>(row_group_reader.get(), rg_metadata.get(), 2,
                                   rows_per_rowgroup);

    ReadAndVerifyColumn<Int96Type>(row_group_reader.get(), rg_metadata.get(), 3,
                                   rows_per_rowgroup);

    // Float/Double are only checked when decryption properties were given;
    // presumably these columns are unreadable without the column keys in the
    // no-decryption (plaintext-footer) scenario -- TODO confirm.
    if (file_decryption_properties) {
      ReadAndVerifyColumn<FloatType>(row_group_reader.get(), rg_metadata.get(), 4,
                                     rows_per_rowgroup);

      ReadAndVerifyColumn<DoubleType>(row_group_reader.get(), rg_metadata.get(), 5,
                                      rows_per_rowgroup);
    }

    // Get the Column Reader for the ByteArray column
    column_reader = row_group_reader->Column(6);
    parquet::ByteArrayReader* ba_reader =
        static_cast<parquet::ByteArrayReader*>(column_reader.get());

    // Get the ColumnChunkMetaData for the ByteArray column
    std::unique_ptr<ColumnChunkMetaData> ba_md = rg_metadata->ColumnChunk(6);

    // Read all the rows in the column
    i = 0;
    while (ba_reader->HasNext()) {
      parquet::ByteArray value;
      // Read one value at a time. The number of rows read is returned. values_read
      // contains the number of non-null rows
      rows_read =
          ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
      // Ensure only one value is read
      ASSERT_EQ(rows_read, 1);
      // Verify the value written: "parquet" plus the zero-padded row index,
      // as produced by FileEncryptor::EncryptFile.
      char expected_value[kFixedLength] = "parquet";
      expected_value[7] = static_cast<char>('0' + i / 100);
      expected_value[8] = static_cast<char>('0' + (i / 10) % 10);
      expected_value[9] = static_cast<char>('0' + i % 10);
      if (i % 2 == 0) {  // only alternate values exist
        // There are no NULL values in the rows written
        ASSERT_EQ(values_read, 1);
        ASSERT_EQ(value.len, kFixedLength);
        ASSERT_EQ(memcmp(value.ptr, &expected_value[0], kFixedLength), 0);
        ASSERT_EQ(definition_level, 1);
      } else {
        // There are NULL values in the rows written
        ASSERT_EQ(values_read, 0);
        ASSERT_EQ(definition_level, 0);
      }
      i++;
    }
    // make sure we got the same number of values the metadata says
    ASSERT_EQ(ba_md->num_values(), i);

    // Get the Column Reader for the FixedLengthByteArray column
    column_reader = row_group_reader->Column(7);
    parquet::FixedLenByteArrayReader* flba_reader =
        static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());

    // Get the ColumnChunkMetaData for the FixedLengthByteArray column
    std::unique_ptr<ColumnChunkMetaData> flba_md = rg_metadata->ColumnChunk(7);

    // Read all the rows in the column
    i = 0;
    while (flba_reader->HasNext()) {
      parquet::FixedLenByteArray value;
      // Read one value at a time. The number of rows read is returned. values_read
      // contains the number of non-null rows
      rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
      // Ensure only one value is read
      ASSERT_EQ(rows_read, 1);
      // There are no NULL values in the rows written
      ASSERT_EQ(values_read, 1);
      // Verify the value written: kFixedLength copies of the row index byte.
      char v = static_cast<char>(i);
      char expected_value[kFixedLength] = {v, v, v, v, v, v, v, v, v, v};
      ASSERT_EQ(memcmp(value.ptr, &expected_value[0], kFixedLength), 0);
      i++;
    }
    // make sure we got the same number of values the metadata says
    ASSERT_EQ(flba_md->num_values(), i);
  }
}
499 
500 }  // namespace test
501 }  // namespace encryption
502 }  // namespace parquet
503