1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 #pragma once 19 20 #include <array> 21 #include <chrono> 22 #include <cstdint> 23 #include <memory> 24 #include <string> 25 #include <vector> 26 27 #include "arrow/util/optional.h" 28 #include "arrow/util/string_view.h" 29 #include "parquet/column_writer.h" 30 #include "parquet/file_writer.h" 31 32 namespace parquet { 33 34 /// \brief A class for writing Parquet files using an output stream type API. 35 /// 36 /// The values given must be of the correct type i.e. the type must 37 /// match the file schema exactly otherwise a ParquetException will be 38 /// thrown. 39 /// 40 /// The user must explicitly indicate the end of the row using the 41 /// EndRow() function or EndRow output manipulator. 42 /// 43 /// A maximum row group size can be configured, the default size is 44 /// 512MB. Alternatively the row group size can be set to zero and the 45 /// user can create new row groups by calling the EndRowGroup() 46 /// function or using the EndRowGroup output manipulator. 47 /// 48 /// Required and optional fields are supported: 49 /// - Required fields are written using operator<<(T) 50 /// - Optional fields are written using 51 /// operator<<(arrow::util::optional<T>). 52 /// 53 /// Note that operator<<(T) can be used to write optional fields. 54 /// 55 /// Similarly, operator<<(arrow::util::optional<T>) can be used to 56 /// write required fields. However if the optional parameter does not 57 /// have a value (i.e. it is nullopt) then a ParquetException will be 58 /// raised. 59 /// 60 /// Currently there is no support for repeated fields. 61 /// 62 class PARQUET_EXPORT StreamWriter { 63 public: 64 template <typename T> 65 using optional = ::arrow::util::optional<T>; 66 67 // N.B. Default constructed objects are not usable. This 68 // constructor is provided so that the object may be move 69 // assigned afterwards. 70 StreamWriter() = default; 71 72 explicit StreamWriter(std::unique_ptr<ParquetFileWriter> writer); 73 74 ~StreamWriter() = default; 75 76 static void SetDefaultMaxRowGroupSize(int64_t max_size); 77 78 void SetMaxRowGroupSize(int64_t max_size); 79 current_column()80 int current_column() const { return column_index_; } 81 current_row()82 int64_t current_row() const { return current_row_; } 83 84 int num_columns() const; 85 86 // Moving is possible. 87 StreamWriter(StreamWriter&&) = default; 88 StreamWriter& operator=(StreamWriter&&) = default; 89 90 // Copying is not allowed. 91 StreamWriter(const StreamWriter&) = delete; 92 StreamWriter& operator=(const StreamWriter&) = delete; 93 94 /// \brief Output operators for required fields. 95 /// These can also be used for optional fields when a value must be set. 96 StreamWriter& operator<<(bool v); 97 98 StreamWriter& operator<<(int8_t v); 99 100 StreamWriter& operator<<(uint8_t v); 101 102 StreamWriter& operator<<(int16_t v); 103 104 StreamWriter& operator<<(uint16_t v); 105 106 StreamWriter& operator<<(int32_t v); 107 108 StreamWriter& operator<<(uint32_t v); 109 110 StreamWriter& operator<<(int64_t v); 111 112 StreamWriter& operator<<(uint64_t v); 113 114 StreamWriter& operator<<(const std::chrono::milliseconds& v); 115 116 StreamWriter& operator<<(const std::chrono::microseconds& v); 117 118 StreamWriter& operator<<(float v); 119 120 StreamWriter& operator<<(double v); 121 122 StreamWriter& operator<<(char v); 123 124 /// \brief Helper class to write fixed length strings. 125 /// This is useful as the standard string view (such as 126 /// arrow::util::string_view) is for variable length data. 127 struct PARQUET_EXPORT FixedStringView { 128 FixedStringView() = default; 129 130 explicit FixedStringView(const char* data_ptr); 131 132 FixedStringView(const char* data_ptr, std::size_t data_len); 133 134 const char* data{NULLPTR}; 135 std::size_t size{0}; 136 }; 137 138 /// \brief Output operators for fixed length strings. 139 template <int N> 140 StreamWriter& operator<<(const char (&v)[N]) { 141 return WriteFixedLength(v, N); 142 } 143 template <std::size_t N> 144 StreamWriter& operator<<(const std::array<char, N>& v) { 145 return WriteFixedLength(v.data(), N); 146 } 147 StreamWriter& operator<<(FixedStringView v); 148 149 /// \brief Output operators for variable length strings. 150 StreamWriter& operator<<(const char* v); 151 StreamWriter& operator<<(const std::string& v); 152 StreamWriter& operator<<(::arrow::util::string_view v); 153 154 /// \brief Output operator for optional fields. 155 template <typename T> 156 StreamWriter& operator<<(const optional<T>& v) { 157 if (v) { 158 return operator<<(*v); 159 } 160 SkipOptionalColumn(); 161 return *this; 162 } 163 164 /// \brief Skip the next N columns of optional data. If there are 165 /// less than N columns remaining then the excess columns are 166 /// ignored. 167 /// \throws ParquetException if there is an attempt to skip any 168 /// required column. 169 /// \return Number of columns actually skipped. 170 int64_t SkipColumns(int num_columns_to_skip); 171 172 /// \brief Terminate the current row and advance to next one. 173 /// \throws ParquetException if all columns in the row were not 174 /// written or skipped. 175 void EndRow(); 176 177 /// \brief Terminate the current row group and create new one. 178 void EndRowGroup(); 179 180 protected: 181 template <typename WriterType, typename T> Write(const T v)182 StreamWriter& Write(const T v) { 183 auto writer = static_cast<WriterType*>(row_group_writer_->column(column_index_++)); 184 185 writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v); 186 187 if (max_row_group_size_ > 0) { 188 row_group_size_ += writer->EstimatedBufferedValueBytes(); 189 } 190 return *this; 191 } 192 193 StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len); 194 195 StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len); 196 197 void CheckColumn(Type::type physical_type, ConvertedType::type converted_type, 198 int length = -1); 199 200 /// \brief Skip the next column which must be optional. 201 /// \throws ParquetException if the next column does not exist or is 202 /// not optional. 203 void SkipOptionalColumn(); 204 205 void WriteNullValue(ColumnWriter* writer); 206 207 private: 208 using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>; 209 210 struct null_deleter { operatornull_deleter211 void operator()(void*) {} 212 }; 213 214 int32_t column_index_{0}; 215 int64_t current_row_{0}; 216 int64_t row_group_size_{0}; 217 int64_t max_row_group_size_{default_row_group_size_}; 218 219 std::unique_ptr<ParquetFileWriter> file_writer_; 220 std::unique_ptr<RowGroupWriter, null_deleter> row_group_writer_; 221 std::vector<node_ptr_type> nodes_; 222 223 static constexpr int16_t kDefLevelZero = 0; 224 static constexpr int16_t kDefLevelOne = 1; 225 static constexpr int16_t kRepLevelZero = 0; 226 static constexpr int64_t kBatchSizeOne = 1; 227 228 static int64_t default_row_group_size_; 229 }; 230 231 struct PARQUET_EXPORT EndRowType {}; 232 constexpr EndRowType EndRow = {}; 233 234 struct PARQUET_EXPORT EndRowGroupType {}; 235 constexpr EndRowGroupType EndRowGroup = {}; 236 237 PARQUET_EXPORT 238 StreamWriter& operator<<(StreamWriter&, EndRowType); 239 240 PARQUET_EXPORT 241 StreamWriter& operator<<(StreamWriter&, EndRowGroupType); 242 243 } // namespace parquet 244