1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
20 #include <array>
21 #include <chrono>
22 #include <cstdint>
23 #include <memory>
24 #include <string>
25 #include <vector>
26 
27 #include "arrow/util/optional.h"
28 #include "arrow/util/string_view.h"
29 #include "parquet/column_writer.h"
30 #include "parquet/file_writer.h"
31 
32 namespace parquet {
33 
34 /// \brief A class for writing Parquet files using an output stream type API.
35 ///
36 /// The values given must be of the correct type i.e. the type must
37 /// match the file schema exactly otherwise a ParquetException will be
38 /// thrown.
39 ///
40 /// The user must explicitly indicate the end of the row using the
41 /// EndRow() function or EndRow output manipulator.
42 ///
43 /// A maximum row group size can be configured, the default size is
44 /// 512MB.  Alternatively the row group size can be set to zero and the
45 /// user can create new row groups by calling the EndRowGroup()
46 /// function or using the EndRowGroup output manipulator.
47 ///
48 /// Required and optional fields are supported:
49 /// - Required fields are written using operator<<(T)
50 /// - Optional fields are written using
51 ///   operator<<(arrow::util::optional<T>).
52 ///
53 /// Note that operator<<(T) can be used to write optional fields.
54 ///
55 /// Similarly, operator<<(arrow::util::optional<T>) can be used to
56 /// write required fields.  However if the optional parameter does not
57 /// have a value (i.e. it is nullopt) then a ParquetException will be
58 /// raised.
59 ///
60 /// Currently there is no support for repeated fields.
61 ///
62 class PARQUET_EXPORT StreamWriter {
63  public:
64   template <typename T>
65   using optional = ::arrow::util::optional<T>;
66 
67   // N.B. Default constructed objects are not usable.  This
68   //      constructor is provided so that the object may be move
69   //      assigned afterwards.
70   StreamWriter() = default;
71 
72   explicit StreamWriter(std::unique_ptr<ParquetFileWriter> writer);
73 
74   ~StreamWriter() = default;
75 
76   static void SetDefaultMaxRowGroupSize(int64_t max_size);
77 
78   void SetMaxRowGroupSize(int64_t max_size);
79 
current_column()80   int current_column() const { return column_index_; }
81 
current_row()82   int64_t current_row() const { return current_row_; }
83 
84   int num_columns() const;
85 
86   // Moving is possible.
87   StreamWriter(StreamWriter&&) = default;
88   StreamWriter& operator=(StreamWriter&&) = default;
89 
90   // Copying is not allowed.
91   StreamWriter(const StreamWriter&) = delete;
92   StreamWriter& operator=(const StreamWriter&) = delete;
93 
94   /// \brief Output operators for required fields.
95   /// These can also be used for optional fields when a value must be set.
96   StreamWriter& operator<<(bool v);
97 
98   StreamWriter& operator<<(int8_t v);
99 
100   StreamWriter& operator<<(uint8_t v);
101 
102   StreamWriter& operator<<(int16_t v);
103 
104   StreamWriter& operator<<(uint16_t v);
105 
106   StreamWriter& operator<<(int32_t v);
107 
108   StreamWriter& operator<<(uint32_t v);
109 
110   StreamWriter& operator<<(int64_t v);
111 
112   StreamWriter& operator<<(uint64_t v);
113 
114   StreamWriter& operator<<(const std::chrono::milliseconds& v);
115 
116   StreamWriter& operator<<(const std::chrono::microseconds& v);
117 
118   StreamWriter& operator<<(float v);
119 
120   StreamWriter& operator<<(double v);
121 
122   StreamWriter& operator<<(char v);
123 
124   /// \brief Helper class to write fixed length strings.
125   /// This is useful as the standard string view (such as
126   /// arrow::util::string_view) is for variable length data.
127   struct PARQUET_EXPORT FixedStringView {
128     FixedStringView() = default;
129 
130     explicit FixedStringView(const char* data_ptr);
131 
132     FixedStringView(const char* data_ptr, std::size_t data_len);
133 
134     const char* data{NULLPTR};
135     std::size_t size{0};
136   };
137 
138   /// \brief Output operators for fixed length strings.
139   template <int N>
140   StreamWriter& operator<<(const char (&v)[N]) {
141     return WriteFixedLength(v, N);
142   }
143   template <std::size_t N>
144   StreamWriter& operator<<(const std::array<char, N>& v) {
145     return WriteFixedLength(v.data(), N);
146   }
147   StreamWriter& operator<<(FixedStringView v);
148 
149   /// \brief Output operators for variable length strings.
150   StreamWriter& operator<<(const char* v);
151   StreamWriter& operator<<(const std::string& v);
152   StreamWriter& operator<<(::arrow::util::string_view v);
153 
154   /// \brief Output operator for optional fields.
155   template <typename T>
156   StreamWriter& operator<<(const optional<T>& v) {
157     if (v) {
158       return operator<<(*v);
159     }
160     SkipOptionalColumn();
161     return *this;
162   }
163 
164   /// \brief Skip the next N columns of optional data.  If there are
165   /// less than N columns remaining then the excess columns are
166   /// ignored.
167   /// \throws ParquetException if there is an attempt to skip any
168   /// required column.
169   /// \return Number of columns actually skipped.
170   int64_t SkipColumns(int num_columns_to_skip);
171 
172   /// \brief Terminate the current row and advance to next one.
173   /// \throws ParquetException if all columns in the row were not
174   /// written or skipped.
175   void EndRow();
176 
177   /// \brief Terminate the current row group and create new one.
178   void EndRowGroup();
179 
180  protected:
181   template <typename WriterType, typename T>
Write(const T v)182   StreamWriter& Write(const T v) {
183     auto writer = static_cast<WriterType*>(row_group_writer_->column(column_index_++));
184 
185     writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v);
186 
187     if (max_row_group_size_ > 0) {
188       row_group_size_ += writer->EstimatedBufferedValueBytes();
189     }
190     return *this;
191   }
192 
193   StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len);
194 
195   StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len);
196 
197   void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
198                    int length = -1);
199 
200   /// \brief Skip the next column which must be optional.
201   /// \throws ParquetException if the next column does not exist or is
202   /// not optional.
203   void SkipOptionalColumn();
204 
205   void WriteNullValue(ColumnWriter* writer);
206 
207  private:
208   using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>;
209 
210   struct null_deleter {
operatornull_deleter211     void operator()(void*) {}
212   };
213 
214   int32_t column_index_{0};
215   int64_t current_row_{0};
216   int64_t row_group_size_{0};
217   int64_t max_row_group_size_{default_row_group_size_};
218 
219   std::unique_ptr<ParquetFileWriter> file_writer_;
220   std::unique_ptr<RowGroupWriter, null_deleter> row_group_writer_;
221   std::vector<node_ptr_type> nodes_;
222 
223   static constexpr int16_t kDefLevelZero = 0;
224   static constexpr int16_t kDefLevelOne = 1;
225   static constexpr int16_t kRepLevelZero = 0;
226   static constexpr int64_t kBatchSizeOne = 1;
227 
228   static int64_t default_row_group_size_;
229 };
230 
/// \brief Empty tag type behind the EndRow output manipulator.
///
/// The EndRow constant is the value to stream into a StreamWriter
/// (`writer << EndRow`); it is picked up by the operator<< overload
/// that takes an EndRowType.
struct PARQUET_EXPORT EndRowType {};
constexpr EndRowType EndRow = {};
233 
/// \brief Empty tag type behind the EndRowGroup output manipulator.
///
/// The EndRowGroup constant is the value to stream into a
/// StreamWriter (`writer << EndRowGroup`); it is picked up by the
/// operator<< overload that takes an EndRowGroupType.
struct PARQUET_EXPORT EndRowGroupType {};
constexpr EndRowGroupType EndRowGroup = {};
236 
/// \brief Stream insertion overload for the EndRow manipulator.
///
/// Only declared here; the definition is not part of this header.
/// NOTE(review): presumably forwards to StreamWriter::EndRow() and
/// returns the same writer for chaining -- confirm against the
/// definition.
PARQUET_EXPORT
StreamWriter& operator<<(StreamWriter&, EndRowType);
239 
/// \brief Stream insertion overload for the EndRowGroup manipulator.
///
/// Only declared here; the definition is not part of this header.
/// NOTE(review): presumably forwards to StreamWriter::EndRowGroup()
/// and returns the same writer for chaining -- confirm against the
/// definition.
PARQUET_EXPORT
StreamWriter& operator<<(StreamWriter&, EndRowGroupType);
242 
243 }  // namespace parquet
244