1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
20 #include <array>
21 #include <chrono>
22 #include <cstdint>
23 #include <cstring>
24 #include <memory>
25 #include <string>
26 #include <vector>
27 
28 #include "arrow/util/optional.h"
29 #include "parquet/column_reader.h"
30 #include "parquet/file_reader.h"
31 #include "parquet/stream_writer.h"
32 
33 namespace parquet {
34 
35 /// \brief A class for reading Parquet files using an output stream type API.
36 ///
37 /// The values given must be of the correct type i.e. the type must
38 /// match the file schema exactly otherwise a ParquetException will be
39 /// thrown.
40 ///
41 /// The user must explicitly advance to the next row using the
42 /// EndRow() function or EndRow input manipulator.
43 ///
44 /// Required and optional fields are supported:
45 /// - Required fields are read using operator>>(T)
46 /// - Optional fields are read with
47 ///   operator>>(arrow::util::optional<T>)
48 ///
49 /// Note that operator>>(arrow::util::optional<T>) can be used to read
50 /// required fields.
51 ///
52 /// Similarly operator>>(T) can be used to read optional fields.
53 /// However, if the value is not present then a ParquetException will
54 /// be raised.
55 ///
56 /// Currently there is no support for repeated fields.
57 ///
58 class PARQUET_EXPORT StreamReader {
59  public:
60   template <typename T>
61   using optional = ::arrow::util::optional<T>;
62 
63   // N.B. Default constructed objects are not usable.  This
64   //      constructor is provided so that the object may be move
65   //      assigned afterwards.
66   StreamReader() = default;
67 
68   explicit StreamReader(std::unique_ptr<ParquetFileReader> reader);
69 
70   ~StreamReader() = default;
71 
eof()72   bool eof() const { return eof_; }
73 
current_column()74   int current_column() const { return column_index_; }
75 
current_row()76   int64_t current_row() const { return current_row_; }
77 
78   int num_columns() const;
79 
80   int64_t num_rows() const;
81 
82   // Moving is possible.
83   StreamReader(StreamReader&&) = default;
84   StreamReader& operator=(StreamReader&&) = default;
85 
86   // Copying is not allowed.
87   StreamReader(const StreamReader&) = delete;
88   StreamReader& operator=(const StreamReader&) = delete;
89 
90   StreamReader& operator>>(bool& v);
91 
92   StreamReader& operator>>(int8_t& v);
93 
94   StreamReader& operator>>(uint8_t& v);
95 
96   StreamReader& operator>>(int16_t& v);
97 
98   StreamReader& operator>>(uint16_t& v);
99 
100   StreamReader& operator>>(int32_t& v);
101 
102   StreamReader& operator>>(uint32_t& v);
103 
104   StreamReader& operator>>(int64_t& v);
105 
106   StreamReader& operator>>(uint64_t& v);
107 
108   StreamReader& operator>>(std::chrono::milliseconds& v);
109 
110   StreamReader& operator>>(std::chrono::microseconds& v);
111 
112   StreamReader& operator>>(float& v);
113 
114   StreamReader& operator>>(double& v);
115 
116   StreamReader& operator>>(char& v);
117 
118   template <int N>
119   StreamReader& operator>>(char (&v)[N]) {
120     ReadFixedLength(v, N);
121     return *this;
122   }
123 
124   template <std::size_t N>
125   StreamReader& operator>>(std::array<char, N>& v) {
126     ReadFixedLength(v.data(), static_cast<int>(N));
127     return *this;
128   }
129 
130   // N.B. Cannot allow for reading to a arbitrary char pointer as the
131   //      length cannot be verified.  Also it would overshadow the
132   //      char[N] input operator.
133   // StreamReader& operator>>(char * v);
134 
135   StreamReader& operator>>(std::string& v);
136 
137   // Input operators for optional fields.
138 
139   StreamReader& operator>>(optional<bool>& v);
140 
141   StreamReader& operator>>(optional<int8_t>& v);
142 
143   StreamReader& operator>>(optional<uint8_t>& v);
144 
145   StreamReader& operator>>(optional<int16_t>& v);
146 
147   StreamReader& operator>>(optional<uint16_t>& v);
148 
149   StreamReader& operator>>(optional<int32_t>& v);
150 
151   StreamReader& operator>>(optional<uint32_t>& v);
152 
153   StreamReader& operator>>(optional<int64_t>& v);
154 
155   StreamReader& operator>>(optional<uint64_t>& v);
156 
157   StreamReader& operator>>(optional<float>& v);
158 
159   StreamReader& operator>>(optional<double>& v);
160 
161   StreamReader& operator>>(optional<std::chrono::milliseconds>& v);
162 
163   StreamReader& operator>>(optional<std::chrono::microseconds>& v);
164 
165   StreamReader& operator>>(optional<char>& v);
166 
167   StreamReader& operator>>(optional<std::string>& v);
168 
169   template <std::size_t N>
170   StreamReader& operator>>(optional<std::array<char, N>>& v) {
171     CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N);
172     FixedLenByteArray flba;
173     if (ReadOptional(&flba)) {
174       v = std::array<char, N>{};
175       std::memcpy(v->data(), flba.ptr, N);
176     } else {
177       v.reset();
178     }
179     return *this;
180   }
181 
182   /// \brief Terminate current row and advance to next one.
183   /// \throws ParquetException if all columns in the row were not
184   /// read or skipped.
185   void EndRow();
186 
187   /// \brief Skip the data in the next columns.
188   /// If the number of columns exceeds the columns remaining on the
189   /// current row then skipping is terminated - it does _not_ continue
190   /// skipping columns on the next row.
191   /// Skipping of columns still requires the use 'EndRow' even if all
192   /// remaining columns were skipped.
193   /// \return Number of columns actually skipped.
194   int64_t SkipColumns(int64_t num_columns_to_skip);
195 
196   /// \brief Skip the data in the next rows.
197   /// Skipping of rows is not allowed if reading of data for the
198   /// current row is not finished.
199   /// Skipping of rows will be terminated if the end of file is
200   /// reached.
201   /// \return Number of rows actually skipped.
202   int64_t SkipRows(int64_t num_rows_to_skip);
203 
204  protected:
205   [[noreturn]] void ThrowReadFailedException(
206       const std::shared_ptr<schema::PrimitiveNode>& node);
207 
208   template <typename ReaderType, typename T>
Read(T * v)209   void Read(T* v) {
210     const auto& node = nodes_[column_index_];
211     auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
212     int16_t def_level;
213     int16_t rep_level;
214     int64_t values_read;
215 
216     reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, v, &values_read);
217 
218     if (values_read != 1) {
219       ThrowReadFailedException(node);
220     }
221   }
222 
223   template <typename ReaderType, typename ReadType, typename T>
Read(T * v)224   void Read(T* v) {
225     const auto& node = nodes_[column_index_];
226     auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
227     int16_t def_level;
228     int16_t rep_level;
229     ReadType tmp;
230     int64_t values_read;
231 
232     reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read);
233 
234     if (values_read == 1) {
235       *v = tmp;
236     } else {
237       ThrowReadFailedException(node);
238     }
239   }
240 
241   template <typename ReaderType, typename ReadType = typename ReaderType::T, typename T>
ReadOptional(optional<T> * v)242   void ReadOptional(optional<T>* v) {
243     const auto& node = nodes_[column_index_];
244     auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
245     int16_t def_level;
246     int16_t rep_level;
247     ReadType tmp;
248     int64_t values_read;
249 
250     reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read);
251 
252     if (values_read == 1) {
253       *v = T(tmp);
254     } else if ((values_read == 0) && (def_level == 0)) {
255       v->reset();
256     } else {
257       ThrowReadFailedException(node);
258     }
259   }
260 
261   void ReadFixedLength(char* ptr, int len);
262 
263   void Read(ByteArray* v);
264 
265   void Read(FixedLenByteArray* v);
266 
267   bool ReadOptional(ByteArray* v);
268 
269   bool ReadOptional(FixedLenByteArray* v);
270 
271   void NextRowGroup();
272 
273   void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
274                    int length = 0);
275 
276   void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip);
277 
278   void SetEof();
279 
280  private:
281   std::unique_ptr<ParquetFileReader> file_reader_;
282   std::shared_ptr<FileMetaData> file_metadata_;
283   std::shared_ptr<RowGroupReader> row_group_reader_;
284   std::vector<std::shared_ptr<ColumnReader>> column_readers_;
285   std::vector<std::shared_ptr<schema::PrimitiveNode>> nodes_;
286 
287   bool eof_{true};
288   int row_group_index_{0};
289   int column_index_{0};
290   int64_t current_row_{0};
291   int64_t row_group_row_offset_{0};
292 
293   static constexpr int64_t kBatchSizeOne = 1;
294 };  // namespace parquet
295 
/// \brief Stream extraction overload taking an EndRowType tag.
///
/// Presumably this is the "EndRow input manipulator" mentioned in the
/// StreamReader class documentation, i.e. the operator>> spelling of
/// StreamReader::EndRow() - confirm against the definition, which is
/// not in this header.  EndRowType is not declared here either
/// (presumably it comes from parquet/stream_writer.h, included above).
PARQUET_EXPORT
StreamReader& operator>>(StreamReader&, EndRowType);
298 
299 }  // namespace parquet
300