1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 #pragma once 19 20 #include <array> 21 #include <chrono> 22 #include <cstdint> 23 #include <cstring> 24 #include <memory> 25 #include <string> 26 #include <vector> 27 28 #include "arrow/util/optional.h" 29 #include "parquet/column_reader.h" 30 #include "parquet/file_reader.h" 31 #include "parquet/stream_writer.h" 32 33 namespace parquet { 34 35 /// \brief A class for reading Parquet files using an output stream type API. 36 /// 37 /// The values given must be of the correct type i.e. the type must 38 /// match the file schema exactly otherwise a ParquetException will be 39 /// thrown. 40 /// 41 /// The user must explicitly advance to the next row using the 42 /// EndRow() function or EndRow input manipulator. 43 /// 44 /// Required and optional fields are supported: 45 /// - Required fields are read using operator>>(T) 46 /// - Optional fields are read with 47 /// operator>>(arrow::util::optional<T>) 48 /// 49 /// Note that operator>>(arrow::util::optional<T>) can be used to read 50 /// required fields. 51 /// 52 /// Similarly operator>>(T) can be used to read optional fields. 53 /// However, if the value is not present then a ParquetException will 54 /// be raised. 55 /// 56 /// Currently there is no support for repeated fields. 57 /// 58 class PARQUET_EXPORT StreamReader { 59 public: 60 template <typename T> 61 using optional = ::arrow::util::optional<T>; 62 63 // N.B. Default constructed objects are not usable. This 64 // constructor is provided so that the object may be move 65 // assigned afterwards. 66 StreamReader() = default; 67 68 explicit StreamReader(std::unique_ptr<ParquetFileReader> reader); 69 70 ~StreamReader() = default; 71 eof()72 bool eof() const { return eof_; } 73 current_column()74 int current_column() const { return column_index_; } 75 current_row()76 int64_t current_row() const { return current_row_; } 77 78 int num_columns() const; 79 80 int64_t num_rows() const; 81 82 // Moving is possible. 83 StreamReader(StreamReader&&) = default; 84 StreamReader& operator=(StreamReader&&) = default; 85 86 // Copying is not allowed. 87 StreamReader(const StreamReader&) = delete; 88 StreamReader& operator=(const StreamReader&) = delete; 89 90 StreamReader& operator>>(bool& v); 91 92 StreamReader& operator>>(int8_t& v); 93 94 StreamReader& operator>>(uint8_t& v); 95 96 StreamReader& operator>>(int16_t& v); 97 98 StreamReader& operator>>(uint16_t& v); 99 100 StreamReader& operator>>(int32_t& v); 101 102 StreamReader& operator>>(uint32_t& v); 103 104 StreamReader& operator>>(int64_t& v); 105 106 StreamReader& operator>>(uint64_t& v); 107 108 StreamReader& operator>>(std::chrono::milliseconds& v); 109 110 StreamReader& operator>>(std::chrono::microseconds& v); 111 112 StreamReader& operator>>(float& v); 113 114 StreamReader& operator>>(double& v); 115 116 StreamReader& operator>>(char& v); 117 118 template <int N> 119 StreamReader& operator>>(char (&v)[N]) { 120 ReadFixedLength(v, N); 121 return *this; 122 } 123 124 template <std::size_t N> 125 StreamReader& operator>>(std::array<char, N>& v) { 126 ReadFixedLength(v.data(), static_cast<int>(N)); 127 return *this; 128 } 129 130 // N.B. Cannot allow for reading to a arbitrary char pointer as the 131 // length cannot be verified. Also it would overshadow the 132 // char[N] input operator. 133 // StreamReader& operator>>(char * v); 134 135 StreamReader& operator>>(std::string& v); 136 137 // Input operators for optional fields. 138 139 StreamReader& operator>>(optional<bool>& v); 140 141 StreamReader& operator>>(optional<int8_t>& v); 142 143 StreamReader& operator>>(optional<uint8_t>& v); 144 145 StreamReader& operator>>(optional<int16_t>& v); 146 147 StreamReader& operator>>(optional<uint16_t>& v); 148 149 StreamReader& operator>>(optional<int32_t>& v); 150 151 StreamReader& operator>>(optional<uint32_t>& v); 152 153 StreamReader& operator>>(optional<int64_t>& v); 154 155 StreamReader& operator>>(optional<uint64_t>& v); 156 157 StreamReader& operator>>(optional<float>& v); 158 159 StreamReader& operator>>(optional<double>& v); 160 161 StreamReader& operator>>(optional<std::chrono::milliseconds>& v); 162 163 StreamReader& operator>>(optional<std::chrono::microseconds>& v); 164 165 StreamReader& operator>>(optional<char>& v); 166 167 StreamReader& operator>>(optional<std::string>& v); 168 169 template <std::size_t N> 170 StreamReader& operator>>(optional<std::array<char, N>>& v) { 171 CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N); 172 FixedLenByteArray flba; 173 if (ReadOptional(&flba)) { 174 v = std::array<char, N>{}; 175 std::memcpy(v->data(), flba.ptr, N); 176 } else { 177 v.reset(); 178 } 179 return *this; 180 } 181 182 /// \brief Terminate current row and advance to next one. 183 /// \throws ParquetException if all columns in the row were not 184 /// read or skipped. 185 void EndRow(); 186 187 /// \brief Skip the data in the next columns. 188 /// If the number of columns exceeds the columns remaining on the 189 /// current row then skipping is terminated - it does _not_ continue 190 /// skipping columns on the next row. 191 /// Skipping of columns still requires the use 'EndRow' even if all 192 /// remaining columns were skipped. 193 /// \return Number of columns actually skipped. 194 int64_t SkipColumns(int64_t num_columns_to_skip); 195 196 /// \brief Skip the data in the next rows. 197 /// Skipping of rows is not allowed if reading of data for the 198 /// current row is not finished. 199 /// Skipping of rows will be terminated if the end of file is 200 /// reached. 201 /// \return Number of rows actually skipped. 202 int64_t SkipRows(int64_t num_rows_to_skip); 203 204 protected: 205 [[noreturn]] void ThrowReadFailedException( 206 const std::shared_ptr<schema::PrimitiveNode>& node); 207 208 template <typename ReaderType, typename T> Read(T * v)209 void Read(T* v) { 210 const auto& node = nodes_[column_index_]; 211 auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get()); 212 int16_t def_level; 213 int16_t rep_level; 214 int64_t values_read; 215 216 reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, v, &values_read); 217 218 if (values_read != 1) { 219 ThrowReadFailedException(node); 220 } 221 } 222 223 template <typename ReaderType, typename ReadType, typename T> Read(T * v)224 void Read(T* v) { 225 const auto& node = nodes_[column_index_]; 226 auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get()); 227 int16_t def_level; 228 int16_t rep_level; 229 ReadType tmp; 230 int64_t values_read; 231 232 reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read); 233 234 if (values_read == 1) { 235 *v = tmp; 236 } else { 237 ThrowReadFailedException(node); 238 } 239 } 240 241 template <typename ReaderType, typename ReadType = typename ReaderType::T, typename T> ReadOptional(optional<T> * v)242 void ReadOptional(optional<T>* v) { 243 const auto& node = nodes_[column_index_]; 244 auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get()); 245 int16_t def_level; 246 int16_t rep_level; 247 ReadType tmp; 248 int64_t values_read; 249 250 reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read); 251 252 if (values_read == 1) { 253 *v = T(tmp); 254 } else if ((values_read == 0) && (def_level == 0)) { 255 v->reset(); 256 } else { 257 ThrowReadFailedException(node); 258 } 259 } 260 261 void ReadFixedLength(char* ptr, int len); 262 263 void Read(ByteArray* v); 264 265 void Read(FixedLenByteArray* v); 266 267 bool ReadOptional(ByteArray* v); 268 269 bool ReadOptional(FixedLenByteArray* v); 270 271 void NextRowGroup(); 272 273 void CheckColumn(Type::type physical_type, ConvertedType::type converted_type, 274 int length = 0); 275 276 void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip); 277 278 void SetEof(); 279 280 private: 281 std::unique_ptr<ParquetFileReader> file_reader_; 282 std::shared_ptr<FileMetaData> file_metadata_; 283 std::shared_ptr<RowGroupReader> row_group_reader_; 284 std::vector<std::shared_ptr<ColumnReader>> column_readers_; 285 std::vector<std::shared_ptr<schema::PrimitiveNode>> nodes_; 286 287 bool eof_{true}; 288 int row_group_index_{0}; 289 int column_index_{0}; 290 int64_t current_row_{0}; 291 int64_t row_group_row_offset_{0}; 292 293 static constexpr int64_t kBatchSizeOne = 1; 294 }; // namespace parquet 295 296 PARQUET_EXPORT 297 StreamReader& operator>>(StreamReader&, EndRowType); 298 299 } // namespace parquet 300