1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 // Read Arrow files and streams
19 
20 #pragma once
21 
22 #include <cstddef>
23 #include <cstdint>
24 #include <memory>
25 #include <utility>
26 #include <vector>
27 
28 #include "arrow/io/caching.h"
29 #include "arrow/io/type_fwd.h"
30 #include "arrow/ipc/message.h"
31 #include "arrow/ipc/options.h"
32 #include "arrow/record_batch.h"
33 #include "arrow/result.h"
34 #include "arrow/type_fwd.h"
35 #include "arrow/util/async_generator.h"
36 #include "arrow/util/macros.h"
37 #include "arrow/util/visibility.h"
38 
39 namespace arrow {
40 namespace ipc {
41 
42 class DictionaryMemo;
43 struct IpcPayload;
44 
45 using RecordBatchReader = ::arrow::RecordBatchReader;
46 
/// \brief Counters reported by the stats() accessors declared in this header
///
/// All counters start at zero.
struct ReadStats {
  /// Number of IPC messages read.
  int64_t num_messages = 0;
  /// Number of record batches read.
  int64_t num_record_batches = 0;
  /// Number of dictionary batches read.
  ///
  /// Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries
  int64_t num_dictionary_batches = 0;

  /// Number of dictionary deltas read.
  int64_t num_dictionary_deltas = 0;
  /// Number of replaced dictionaries (i.e. where a dictionary batch replaces
  /// an existing dictionary with an unrelated new dictionary).
  int64_t num_replaced_dictionaries = 0;
};
63 
64 /// \brief Synchronous batch stream reader that reads from io::InputStream
65 ///
66 /// This class reads the schema (plus any dictionaries) as the first messages
67 /// in the stream, followed by record batches. For more granular zero-copy
68 /// reads see the ReadRecordBatch functions
69 class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
70  public:
71   /// Create batch reader from generic MessageReader.
72   /// This will take ownership of the given MessageReader.
73   ///
74   /// \param[in] message_reader a MessageReader implementation
75   /// \param[in] options any IPC reading options (optional)
76   /// \return the created batch reader
77   static Result<std::shared_ptr<RecordBatchStreamReader>> Open(
78       std::unique_ptr<MessageReader> message_reader,
79       const IpcReadOptions& options = IpcReadOptions::Defaults());
80 
81   /// \brief Record batch stream reader from InputStream
82   ///
83   /// \param[in] stream an input stream instance. Must stay alive throughout
84   /// lifetime of stream reader
85   /// \param[in] options any IPC reading options (optional)
86   /// \return the created batch reader
87   static Result<std::shared_ptr<RecordBatchStreamReader>> Open(
88       io::InputStream* stream,
89       const IpcReadOptions& options = IpcReadOptions::Defaults());
90 
91   /// \brief Open stream and retain ownership of stream object
92   /// \param[in] stream the input stream
93   /// \param[in] options any IPC reading options (optional)
94   /// \return the created batch reader
95   static Result<std::shared_ptr<RecordBatchStreamReader>> Open(
96       const std::shared_ptr<io::InputStream>& stream,
97       const IpcReadOptions& options = IpcReadOptions::Defaults());
98 
99   /// \brief Return current read statistics
100   virtual ReadStats stats() const = 0;
101 };
102 
103 /// \brief Reads the record batch file format
104 class ARROW_EXPORT RecordBatchFileReader
105     : public std::enable_shared_from_this<RecordBatchFileReader> {
106  public:
107   virtual ~RecordBatchFileReader() = default;
108 
109   /// \brief Open a RecordBatchFileReader
110   ///
111   /// Open a file-like object that is assumed to be self-contained; i.e., the
112   /// end of the file interface is the end of the Arrow file. Note that there
113   /// can be any amount of data preceding the Arrow-formatted data, because we
114   /// need only locate the end of the Arrow file stream to discover the metadata
115   /// and then proceed to read the data into memory.
116   static Result<std::shared_ptr<RecordBatchFileReader>> Open(
117       io::RandomAccessFile* file,
118       const IpcReadOptions& options = IpcReadOptions::Defaults());
119 
120   /// \brief Open a RecordBatchFileReader
121   /// If the file is embedded within some larger file or memory region, you can
122   /// pass the absolute memory offset to the end of the file (which contains the
123   /// metadata footer). The metadata must have been written with memory offsets
124   /// relative to the start of the containing file
125   ///
126   /// \param[in] file the data source
127   /// \param[in] footer_offset the position of the end of the Arrow file
128   /// \param[in] options options for IPC reading
129   /// \return the returned reader
130   static Result<std::shared_ptr<RecordBatchFileReader>> Open(
131       io::RandomAccessFile* file, int64_t footer_offset,
132       const IpcReadOptions& options = IpcReadOptions::Defaults());
133 
134   /// \brief Version of Open that retains ownership of file
135   ///
136   /// \param[in] file the data source
137   /// \param[in] options options for IPC reading
138   /// \return the returned reader
139   static Result<std::shared_ptr<RecordBatchFileReader>> Open(
140       const std::shared_ptr<io::RandomAccessFile>& file,
141       const IpcReadOptions& options = IpcReadOptions::Defaults());
142 
143   /// \brief Version of Open that retains ownership of file
144   ///
145   /// \param[in] file the data source
146   /// \param[in] footer_offset the position of the end of the Arrow file
147   /// \param[in] options options for IPC reading
148   /// \return the returned reader
149   static Result<std::shared_ptr<RecordBatchFileReader>> Open(
150       const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
151       const IpcReadOptions& options = IpcReadOptions::Defaults());
152 
153   /// \brief Open a file asynchronously (owns the file).
154   static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
155       const std::shared_ptr<io::RandomAccessFile>& file,
156       const IpcReadOptions& options = IpcReadOptions::Defaults());
157 
158   /// \brief Open a file asynchronously (borrows the file).
159   static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
160       io::RandomAccessFile* file,
161       const IpcReadOptions& options = IpcReadOptions::Defaults());
162 
163   /// \brief Open a file asynchronously (owns the file).
164   static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
165       const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
166       const IpcReadOptions& options = IpcReadOptions::Defaults());
167 
168   /// \brief Open a file asynchronously (borrows the file).
169   static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
170       io::RandomAccessFile* file, int64_t footer_offset,
171       const IpcReadOptions& options = IpcReadOptions::Defaults());
172 
173   /// \brief The schema read from the file
174   virtual std::shared_ptr<Schema> schema() const = 0;
175 
176   /// \brief Returns the number of record batches in the file
177   virtual int num_record_batches() const = 0;
178 
179   /// \brief Return the metadata version from the file metadata
180   virtual MetadataVersion version() const = 0;
181 
182   /// \brief Return the contents of the custom_metadata field from the file's
183   /// Footer
184   virtual std::shared_ptr<const KeyValueMetadata> metadata() const = 0;
185 
186   /// \brief Read a particular record batch from the file. Does not copy memory
187   /// if the input source supports zero-copy.
188   ///
189   /// \param[in] i the index of the record batch to return
190   /// \return the read batch
191   virtual Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) = 0;
192 
193   /// \brief Return current read statistics
194   virtual ReadStats stats() const = 0;
195 
196   /// \brief Computes the total number of rows in the file.
197   virtual Result<int64_t> CountRows() = 0;
198 
199   /// \brief Get a reentrant generator of record batches.
200   ///
201   /// \param[in] coalesce If true, enable I/O coalescing.
202   /// \param[in] io_context The IOContext to use (controls which thread pool
203   ///     is used for I/O).
204   /// \param[in] cache_options Options for coalescing (if enabled).
205   /// \param[in] executor Optionally, an executor to use for decoding record
206   ///     batches. This is generally only a benefit for very wide and/or
207   ///     compressed batches.
208   virtual Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
209       const bool coalesce = false,
210       const io::IOContext& io_context = io::default_io_context(),
211       const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(),
212       arrow::internal::Executor* executor = NULLPTR) = 0;
213 };
214 
215 /// \brief A general listener class to receive events.
216 ///
217 /// You must implement callback methods for interested events.
218 ///
219 /// This API is EXPERIMENTAL.
220 ///
221 /// \since 0.17.0
222 class ARROW_EXPORT Listener {
223  public:
224   virtual ~Listener() = default;
225 
226   /// \brief Called when end-of-stream is received.
227   ///
228   /// The default implementation just returns arrow::Status::OK().
229   ///
230   /// \return Status
231   ///
232   /// \see StreamDecoder
233   virtual Status OnEOS();
234 
235   /// \brief Called when a record batch is decoded.
236   ///
237   /// The default implementation just returns
238   /// arrow::Status::NotImplemented().
239   ///
240   /// \param[in] record_batch a record batch decoded
241   /// \return Status
242   ///
243   /// \see StreamDecoder
244   virtual Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch);
245 
246   /// \brief Called when a schema is decoded.
247   ///
248   /// The default implementation just returns arrow::Status::OK().
249   ///
250   /// \param[in] schema a schema decoded
251   /// \return Status
252   ///
253   /// \see StreamDecoder
254   virtual Status OnSchemaDecoded(std::shared_ptr<Schema> schema);
255 };
256 
257 /// \brief Collect schema and record batches decoded by StreamDecoder.
258 ///
259 /// This API is EXPERIMENTAL.
260 ///
261 /// \since 0.17.0
262 class ARROW_EXPORT CollectListener : public Listener {
263  public:
CollectListener()264   CollectListener() : schema_(), record_batches_() {}
265   virtual ~CollectListener() = default;
266 
OnSchemaDecoded(std::shared_ptr<Schema> schema)267   Status OnSchemaDecoded(std::shared_ptr<Schema> schema) override {
268     schema_ = std::move(schema);
269     return Status::OK();
270   }
271 
OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch)272   Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch) override {
273     record_batches_.push_back(std::move(record_batch));
274     return Status::OK();
275   }
276 
277   /// \return the decoded schema
schema()278   std::shared_ptr<Schema> schema() const { return schema_; }
279 
280   /// \return the all decoded record batches
record_batches()281   std::vector<std::shared_ptr<RecordBatch>> record_batches() const {
282     return record_batches_;
283   }
284 
285  private:
286   std::shared_ptr<Schema> schema_;
287   std::vector<std::shared_ptr<RecordBatch>> record_batches_;
288 };
289 
290 /// \brief Push style stream decoder that receives data from user.
291 ///
292 /// This class decodes the Apache Arrow IPC streaming format data.
293 ///
294 /// This API is EXPERIMENTAL.
295 ///
296 /// \see https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
297 ///
298 /// \since 0.17.0
299 class ARROW_EXPORT StreamDecoder {
300  public:
301   /// \brief Construct a stream decoder.
302   ///
303   /// \param[in] listener a Listener that must implement
304   /// Listener::OnRecordBatchDecoded() to receive decoded record batches
305   /// \param[in] options any IPC reading options (optional)
306   StreamDecoder(std::shared_ptr<Listener> listener,
307                 IpcReadOptions options = IpcReadOptions::Defaults());
308 
309   virtual ~StreamDecoder();
310 
311   /// \brief Feed data to the decoder as a raw data.
312   ///
313   /// If the decoder can read one or more record batches by the data,
314   /// the decoder calls listener->OnRecordBatchDecoded() with a
315   /// decoded record batch multiple times.
316   ///
317   /// \param[in] data a raw data to be processed. This data isn't
318   /// copied. The passed memory must be kept alive through record
319   /// batch processing.
320   /// \param[in] size raw data size.
321   /// \return Status
322   Status Consume(const uint8_t* data, int64_t size);
323 
324   /// \brief Feed data to the decoder as a Buffer.
325   ///
326   /// If the decoder can read one or more record batches by the
327   /// Buffer, the decoder calls listener->RecordBatchReceived() with a
328   /// decoded record batch multiple times.
329   ///
330   /// \param[in] buffer a Buffer to be processed.
331   /// \return Status
332   Status Consume(std::shared_ptr<Buffer> buffer);
333 
334   /// \return the shared schema of the record batches in the stream
335   std::shared_ptr<Schema> schema() const;
336 
337   /// \brief Return the number of bytes needed to advance the state of
338   /// the decoder.
339   ///
340   /// This method is provided for users who want to optimize performance.
341   /// Normal users don't need to use this method.
342   ///
343   /// Here is an example usage for normal users:
344   ///
345   /// ~~~{.cpp}
346   /// decoder.Consume(buffer1);
347   /// decoder.Consume(buffer2);
348   /// decoder.Consume(buffer3);
349   /// ~~~
350   ///
351   /// Decoder has internal buffer. If consumed data isn't enough to
352   /// advance the state of the decoder, consumed data is buffered to
353   /// the internal buffer. It causes performance overhead.
354   ///
355   /// If you pass next_required_size() size data to each Consume()
356   /// call, the decoder doesn't use its internal buffer. It improves
357   /// performance.
358   ///
359   /// Here is an example usage to avoid using internal buffer:
360   ///
361   /// ~~~{.cpp}
362   /// buffer1 = get_data(decoder.next_required_size());
363   /// decoder.Consume(buffer1);
364   /// buffer2 = get_data(decoder.next_required_size());
365   /// decoder.Consume(buffer2);
366   /// ~~~
367   ///
368   /// Users can use this method to avoid creating small chunks. Record
369   /// batch data must be contiguous data. If users pass small chunks
370   /// to the decoder, the decoder needs concatenate small chunks
371   /// internally. It causes performance overhead.
372   ///
373   /// Here is an example usage to reduce small chunks:
374   ///
375   /// ~~~{.cpp}
376   /// buffer = AllocateResizableBuffer();
377   /// while ((small_chunk = get_data(&small_chunk_size))) {
378   ///   auto current_buffer_size = buffer->size();
379   ///   buffer->Resize(current_buffer_size + small_chunk_size);
380   ///   memcpy(buffer->mutable_data() + current_buffer_size,
381   ///          small_chunk,
382   ///          small_chunk_size);
383   ///   if (buffer->size() < decoder.next_required_size()) {
384   ///     continue;
385   ///   }
386   ///   std::shared_ptr<arrow::Buffer> chunk(buffer.release());
387   ///   decoder.Consume(chunk);
388   ///   buffer = AllocateResizableBuffer();
389   /// }
390   /// if (buffer->size() > 0) {
391   ///   std::shared_ptr<arrow::Buffer> chunk(buffer.release());
392   ///   decoder.Consume(chunk);
393   /// }
394   /// ~~~
395   ///
396   /// \return the number of bytes needed to advance the state of the
397   /// decoder
398   int64_t next_required_size() const;
399 
400   /// \brief Return current read statistics
401   ReadStats stats() const;
402 
403  private:
404   class StreamDecoderImpl;
405   std::unique_ptr<StreamDecoderImpl> impl_;
406 
407   ARROW_DISALLOW_COPY_AND_ASSIGN(StreamDecoder);
408 };
409 
// Generic read functions; these do not copy data if the input supports zero-copy reads
411 
412 /// \brief Read Schema from stream serialized as a single IPC message
413 /// and populate any dictionary-encoded fields into a DictionaryMemo
414 ///
415 /// \param[in] stream an InputStream
416 /// \param[in] dictionary_memo for recording dictionary-encoded fields
417 /// \return the output Schema
418 ///
419 /// If record batches follow the schema, it is better to use
420 /// RecordBatchStreamReader
421 ARROW_EXPORT
422 Result<std::shared_ptr<Schema>> ReadSchema(io::InputStream* stream,
423                                            DictionaryMemo* dictionary_memo);
424 
425 /// \brief Read Schema from encapsulated Message
426 ///
427 /// \param[in] message the message containing the Schema IPC metadata
428 /// \param[in] dictionary_memo DictionaryMemo for recording dictionary-encoded
429 /// fields. Can be nullptr if you are sure there are no
430 /// dictionary-encoded fields
431 /// \return the resulting Schema
432 ARROW_EXPORT
433 Result<std::shared_ptr<Schema>> ReadSchema(const Message& message,
434                                            DictionaryMemo* dictionary_memo);
435 
436 /// Read record batch as encapsulated IPC message with metadata size prefix and
437 /// header
438 ///
439 /// \param[in] schema the record batch schema
440 /// \param[in] dictionary_memo DictionaryMemo which has any
441 /// dictionaries. Can be nullptr if you are sure there are no
442 /// dictionary-encoded fields
443 /// \param[in] options IPC options for reading
444 /// \param[in] stream the file where the batch is located
445 /// \return the read record batch
446 ARROW_EXPORT
447 Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
448     const std::shared_ptr<Schema>& schema, const DictionaryMemo* dictionary_memo,
449     const IpcReadOptions& options, io::InputStream* stream);
450 
451 /// \brief Read record batch from message
452 ///
453 /// \param[in] message a Message containing the record batch metadata
454 /// \param[in] schema the record batch schema
455 /// \param[in] dictionary_memo DictionaryMemo which has any
456 /// dictionaries. Can be nullptr if you are sure there are no
457 /// dictionary-encoded fields
458 /// \param[in] options IPC options for reading
459 /// \return the read record batch
460 ARROW_EXPORT
461 Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
462     const Message& message, const std::shared_ptr<Schema>& schema,
463     const DictionaryMemo* dictionary_memo, const IpcReadOptions& options);
464 
465 /// Read record batch from file given metadata and schema
466 ///
467 /// \param[in] metadata a Message containing the record batch metadata
468 /// \param[in] schema the record batch schema
469 /// \param[in] dictionary_memo DictionaryMemo which has any
470 /// dictionaries. Can be nullptr if you are sure there are no
471 /// dictionary-encoded fields
472 /// \param[in] file a random access file
473 /// \param[in] options options for deserialization
474 /// \return the read record batch
475 ARROW_EXPORT
476 Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
477     const Buffer& metadata, const std::shared_ptr<Schema>& schema,
478     const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
479     io::RandomAccessFile* file);
480 
481 /// \brief Read arrow::Tensor as encapsulated IPC message in file
482 ///
483 /// \param[in] file an InputStream pointed at the start of the message
484 /// \return the read tensor
485 ARROW_EXPORT
486 Result<std::shared_ptr<Tensor>> ReadTensor(io::InputStream* file);
487 
488 /// \brief EXPERIMENTAL: Read arrow::Tensor from IPC message
489 ///
490 /// \param[in] message a Message containing the tensor metadata and body
491 /// \return the read tensor
492 ARROW_EXPORT
493 Result<std::shared_ptr<Tensor>> ReadTensor(const Message& message);
494 
495 /// \brief EXPERIMENTAL: Read arrow::SparseTensor as encapsulated IPC message in file
496 ///
497 /// \param[in] file an InputStream pointed at the start of the message
498 /// \return the read sparse tensor
499 ARROW_EXPORT
500 Result<std::shared_ptr<SparseTensor>> ReadSparseTensor(io::InputStream* file);
501 
502 /// \brief EXPERIMENTAL: Read arrow::SparseTensor from IPC message
503 ///
504 /// \param[in] message a Message containing the tensor metadata and body
505 /// \return the read sparse tensor
506 ARROW_EXPORT
507 Result<std::shared_ptr<SparseTensor>> ReadSparseTensor(const Message& message);
508 
509 namespace internal {
510 
511 // These internal APIs may change without warning or deprecation
512 
513 /// \brief EXPERIMENTAL: Read arrow::SparseTensorFormat::type from a metadata
514 /// \param[in] metadata a Buffer containing the sparse tensor metadata
515 /// \return the count of the body buffers
516 ARROW_EXPORT
517 Result<size_t> ReadSparseTensorBodyBufferCount(const Buffer& metadata);
518 
519 /// \brief EXPERIMENTAL: Read arrow::SparseTensor from an IpcPayload
520 /// \param[in] payload a IpcPayload contains a serialized SparseTensor
521 /// \return the read sparse tensor
522 ARROW_EXPORT
523 Result<std::shared_ptr<SparseTensor>> ReadSparseTensorPayload(const IpcPayload& payload);
524 
525 // For fuzzing targets
526 ARROW_EXPORT
527 Status FuzzIpcStream(const uint8_t* data, int64_t size);
528 ARROW_EXPORT
529 Status FuzzIpcTensorStream(const uint8_t* data, int64_t size);
530 ARROW_EXPORT
531 Status FuzzIpcFile(const uint8_t* data, int64_t size);
532 
533 }  // namespace internal
534 
535 }  // namespace ipc
536 }  // namespace arrow
537