1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
#include "parquet/stream_writer.h"

#include <cstring>
#include <limits>
#include <string>
#include <utility>
21 
22 namespace parquet {
23 
24 int64_t StreamWriter::default_row_group_size_{512 * 1024 * 1024};  // 512MB
25 
26 constexpr int16_t StreamWriter::kDefLevelZero;
27 constexpr int16_t StreamWriter::kDefLevelOne;
28 constexpr int16_t StreamWriter::kRepLevelZero;
29 constexpr int64_t StreamWriter::kBatchSizeOne;
30 
FixedStringView(const char * data_ptr)31 StreamWriter::FixedStringView::FixedStringView(const char* data_ptr)
32     : data{data_ptr}, size{std::strlen(data_ptr)} {}
33 
FixedStringView(const char * data_ptr,std::size_t data_len)34 StreamWriter::FixedStringView::FixedStringView(const char* data_ptr, std::size_t data_len)
35     : data{data_ptr}, size{data_len} {}
36 
StreamWriter(std::unique_ptr<ParquetFileWriter> writer)37 StreamWriter::StreamWriter(std::unique_ptr<ParquetFileWriter> writer)
38     : file_writer_{std::move(writer)},
39       row_group_writer_{file_writer_->AppendBufferedRowGroup()} {
40   auto schema = file_writer_->schema();
41   auto group_node = schema->group_node();
42 
43   nodes_.resize(schema->num_columns());
44 
45   for (auto i = 0; i < schema->num_columns(); ++i) {
46     nodes_[i] = std::static_pointer_cast<schema::PrimitiveNode>(group_node->field(i));
47   }
48 }
49 
SetDefaultMaxRowGroupSize(int64_t max_size)50 void StreamWriter::SetDefaultMaxRowGroupSize(int64_t max_size) {
51   default_row_group_size_ = max_size;
52 }
53 
SetMaxRowGroupSize(int64_t max_size)54 void StreamWriter::SetMaxRowGroupSize(int64_t max_size) {
55   max_row_group_size_ = max_size;
56 }
57 
num_columns() const58 int StreamWriter::num_columns() const { return static_cast<int>(nodes_.size()); }
59 
operator <<(bool v)60 StreamWriter& StreamWriter::operator<<(bool v) {
61   CheckColumn(Type::BOOLEAN, ConvertedType::NONE);
62   return Write<BoolWriter>(v);
63 }
64 
operator <<(int8_t v)65 StreamWriter& StreamWriter::operator<<(int8_t v) {
66   CheckColumn(Type::INT32, ConvertedType::INT_8);
67   return Write<Int32Writer>(static_cast<int32_t>(v));
68 }
69 
operator <<(uint8_t v)70 StreamWriter& StreamWriter::operator<<(uint8_t v) {
71   CheckColumn(Type::INT32, ConvertedType::UINT_8);
72   return Write<Int32Writer>(static_cast<int32_t>(v));
73 }
74 
operator <<(int16_t v)75 StreamWriter& StreamWriter::operator<<(int16_t v) {
76   CheckColumn(Type::INT32, ConvertedType::INT_16);
77   return Write<Int32Writer>(static_cast<int32_t>(v));
78 }
79 
operator <<(uint16_t v)80 StreamWriter& StreamWriter::operator<<(uint16_t v) {
81   CheckColumn(Type::INT32, ConvertedType::UINT_16);
82   return Write<Int32Writer>(static_cast<int32_t>(v));
83 }
84 
operator <<(int32_t v)85 StreamWriter& StreamWriter::operator<<(int32_t v) {
86   CheckColumn(Type::INT32, ConvertedType::INT_32);
87   return Write<Int32Writer>(v);
88 }
89 
operator <<(uint32_t v)90 StreamWriter& StreamWriter::operator<<(uint32_t v) {
91   CheckColumn(Type::INT32, ConvertedType::UINT_32);
92   return Write<Int32Writer>(static_cast<int32_t>(v));
93 }
94 
operator <<(int64_t v)95 StreamWriter& StreamWriter::operator<<(int64_t v) {
96   CheckColumn(Type::INT64, ConvertedType::INT_64);
97   return Write<Int64Writer>(v);
98 }
99 
operator <<(uint64_t v)100 StreamWriter& StreamWriter::operator<<(uint64_t v) {
101   CheckColumn(Type::INT64, ConvertedType::UINT_64);
102   return Write<Int64Writer>(static_cast<int64_t>(v));
103 }
104 
operator <<(const std::chrono::milliseconds & v)105 StreamWriter& StreamWriter::operator<<(const std::chrono::milliseconds& v) {
106   CheckColumn(Type::INT64, ConvertedType::TIMESTAMP_MILLIS);
107   return Write<Int64Writer>(static_cast<int64_t>(v.count()));
108 }
109 
operator <<(const std::chrono::microseconds & v)110 StreamWriter& StreamWriter::operator<<(const std::chrono::microseconds& v) {
111   CheckColumn(Type::INT64, ConvertedType::TIMESTAMP_MICROS);
112   return Write<Int64Writer>(static_cast<int64_t>(v.count()));
113 }
114 
operator <<(float v)115 StreamWriter& StreamWriter::operator<<(float v) {
116   CheckColumn(Type::FLOAT, ConvertedType::NONE);
117   return Write<FloatWriter>(v);
118 }
119 
operator <<(double v)120 StreamWriter& StreamWriter::operator<<(double v) {
121   CheckColumn(Type::DOUBLE, ConvertedType::NONE);
122   return Write<DoubleWriter>(v);
123 }
124 
operator <<(char v)125 StreamWriter& StreamWriter::operator<<(char v) { return WriteFixedLength(&v, 1); }
126 
operator <<(FixedStringView v)127 StreamWriter& StreamWriter::operator<<(FixedStringView v) {
128   return WriteFixedLength(v.data, v.size);
129 }
130 
operator <<(const char * v)131 StreamWriter& StreamWriter::operator<<(const char* v) {
132   return WriteVariableLength(v, std::strlen(v));
133 }
134 
operator <<(const std::string & v)135 StreamWriter& StreamWriter::operator<<(const std::string& v) {
136   return WriteVariableLength(v.data(), v.size());
137 }
138 
operator <<(::arrow::util::string_view v)139 StreamWriter& StreamWriter::operator<<(::arrow::util::string_view v) {
140   return WriteVariableLength(v.data(), v.size());
141 }
142 
WriteVariableLength(const char * data_ptr,std::size_t data_len)143 StreamWriter& StreamWriter::WriteVariableLength(const char* data_ptr,
144                                                 std::size_t data_len) {
145   CheckColumn(Type::BYTE_ARRAY, ConvertedType::UTF8);
146 
147   auto writer = static_cast<ByteArrayWriter*>(row_group_writer_->column(column_index_++));
148 
149   if (data_ptr != nullptr) {
150     ByteArray ba_value;
151 
152     ba_value.ptr = reinterpret_cast<const uint8_t*>(data_ptr);
153     ba_value.len = static_cast<uint32_t>(data_len);
154 
155     writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &ba_value);
156   } else {
157     writer->WriteBatch(kBatchSizeOne, &kDefLevelZero, &kRepLevelZero, nullptr);
158   }
159   if (max_row_group_size_ > 0) {
160     row_group_size_ += writer->EstimatedBufferedValueBytes();
161   }
162   return *this;
163 }
164 
WriteFixedLength(const char * data_ptr,std::size_t data_len)165 StreamWriter& StreamWriter::WriteFixedLength(const char* data_ptr, std::size_t data_len) {
166   CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE,
167               static_cast<int>(data_len));
168 
169   auto writer =
170       static_cast<FixedLenByteArrayWriter*>(row_group_writer_->column(column_index_++));
171 
172   if (data_ptr != nullptr) {
173     FixedLenByteArray flba_value;
174 
175     flba_value.ptr = reinterpret_cast<const uint8_t*>(data_ptr);
176     writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &flba_value);
177   } else {
178     writer->WriteBatch(kBatchSizeOne, &kDefLevelZero, &kRepLevelZero, nullptr);
179   }
180   if (max_row_group_size_ > 0) {
181     row_group_size_ += writer->EstimatedBufferedValueBytes();
182   }
183   return *this;
184 }
185 
CheckColumn(Type::type physical_type,ConvertedType::type converted_type,int length)186 void StreamWriter::CheckColumn(Type::type physical_type,
187                                ConvertedType::type converted_type, int length) {
188   if (static_cast<std::size_t>(column_index_) >= nodes_.size()) {
189     throw ParquetException("Column index out-of-bounds.  Index " +
190                            std::to_string(column_index_) + " is invalid for " +
191                            std::to_string(nodes_.size()) + " columns");
192   }
193   const auto& node = nodes_[column_index_];
194 
195   if (physical_type != node->physical_type()) {
196     throw ParquetException("Column physical type mismatch.  Column '" + node->name() +
197                            "' has physical type '" + TypeToString(node->physical_type()) +
198                            "' not '" + TypeToString(physical_type) + "'");
199   }
200   if (converted_type != node->converted_type()) {
201     throw ParquetException("Column converted type mismatch.  Column '" + node->name() +
202                            "' has converted type[" +
203                            ConvertedTypeToString(node->converted_type()) + "] not '" +
204                            ConvertedTypeToString(converted_type) + "'");
205   }
206   // Length must be exact.
207   // A shorter length fixed array is not acceptable as it would
208   // result in array bound read errors.
209   //
210   if (length != node->type_length()) {
211     throw ParquetException("Column length mismatch.  Column '" + node->name() +
212                            "' has length " + std::to_string(node->type_length()) +
213                            " not " + std::to_string(length));
214   }
215 }
216 
SkipColumns(int num_columns_to_skip)217 int64_t StreamWriter::SkipColumns(int num_columns_to_skip) {
218   int num_columns_skipped = 0;
219 
220   for (; (num_columns_to_skip > num_columns_skipped) &&
221          static_cast<std::size_t>(column_index_) < nodes_.size();
222        ++num_columns_skipped) {
223     const auto& node = nodes_[column_index_];
224 
225     if (node->is_required()) {
226       throw ParquetException("Cannot skip column '" + node->name() +
227                              "' as it is required.");
228     }
229     auto writer = row_group_writer_->column(column_index_++);
230 
231     WriteNullValue(writer);
232   }
233   return num_columns_skipped;
234 }
235 
WriteNullValue(ColumnWriter * writer)236 void StreamWriter::WriteNullValue(ColumnWriter* writer) {
237   switch (writer->type()) {
238     case Type::BOOLEAN:
239       static_cast<BoolWriter*>(writer)->WriteBatch(kBatchSizeOne, &kDefLevelZero,
240                                                    &kRepLevelZero, nullptr);
241       break;
242     case Type::INT32:
243       static_cast<Int32Writer*>(writer)->WriteBatch(kBatchSizeOne, &kDefLevelZero,
244                                                     &kRepLevelZero, nullptr);
245       break;
246     case Type::INT64:
247       static_cast<Int64Writer*>(writer)->WriteBatch(kBatchSizeOne, &kDefLevelZero,
248                                                     &kRepLevelZero, nullptr);
249       break;
250     case Type::BYTE_ARRAY:
251       static_cast<ByteArrayWriter*>(writer)->WriteBatch(kBatchSizeOne, &kDefLevelZero,
252                                                         &kRepLevelZero, nullptr);
253       break;
254     case Type::FIXED_LEN_BYTE_ARRAY:
255       static_cast<FixedLenByteArrayWriter*>(writer)->WriteBatch(
256           kBatchSizeOne, &kDefLevelZero, &kRepLevelZero, nullptr);
257       break;
258     case Type::FLOAT:
259       static_cast<FloatWriter*>(writer)->WriteBatch(kBatchSizeOne, &kDefLevelZero,
260                                                     &kRepLevelZero, nullptr);
261       break;
262     case Type::DOUBLE:
263       static_cast<DoubleWriter*>(writer)->WriteBatch(kBatchSizeOne, &kDefLevelZero,
264                                                      &kRepLevelZero, nullptr);
265       break;
266     case Type::INT96:
267     case Type::UNDEFINED:
268       throw ParquetException("Unexpected type: " + TypeToString(writer->type()));
269       break;
270   }
271 }
272 
SkipOptionalColumn()273 void StreamWriter::SkipOptionalColumn() {
274   if (SkipColumns(1) != 1) {
275     throw ParquetException("Failed to skip optional column at column index " +
276                            std::to_string(column_index_));
277   }
278 }
279 
EndRow()280 void StreamWriter::EndRow() {
281   if (!file_writer_) {
282     throw ParquetException("StreamWriter not initialized");
283   }
284   if (static_cast<std::size_t>(column_index_) < nodes_.size()) {
285     throw ParquetException("Cannot end row with " + std::to_string(column_index_) +
286                            " of " + std::to_string(nodes_.size()) + " columns written");
287   }
288   column_index_ = 0;
289   ++current_row_;
290 
291   if (max_row_group_size_ > 0) {
292     if (row_group_size_ > max_row_group_size_) {
293       EndRowGroup();
294     }
295     // Initialize for each row with size already written
296     // (compressed + uncompressed).
297     //
298     row_group_size_ = row_group_writer_->total_bytes_written() +
299                       row_group_writer_->total_compressed_bytes();
300   }
301 }
302 
EndRowGroup()303 void StreamWriter::EndRowGroup() {
304   if (!file_writer_) {
305     throw ParquetException("StreamWriter not initialized");
306   }
307   // Avoid creating empty row groups.
308   if (row_group_writer_->num_rows() > 0) {
309     row_group_writer_->Close();
310     row_group_writer_.reset(file_writer_->AppendBufferedRowGroup());
311   }
312 }
313 
operator <<(StreamWriter & os,EndRowType)314 StreamWriter& operator<<(StreamWriter& os, EndRowType) {
315   os.EndRow();
316   return os;
317 }
318 
operator <<(StreamWriter & os,EndRowGroupType)319 StreamWriter& operator<<(StreamWriter& os, EndRowGroupType) {
320   os.EndRowGroup();
321   return os;
322 }
323 
324 }  // namespace parquet
325