1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 // Read Arrow files and streams 19 20 #pragma once 21 22 #include <cstddef> 23 #include <cstdint> 24 #include <memory> 25 #include <utility> 26 #include <vector> 27 28 #include "arrow/io/caching.h" 29 #include "arrow/io/type_fwd.h" 30 #include "arrow/ipc/message.h" 31 #include "arrow/ipc/options.h" 32 #include "arrow/record_batch.h" 33 #include "arrow/result.h" 34 #include "arrow/type_fwd.h" 35 #include "arrow/util/async_generator.h" 36 #include "arrow/util/macros.h" 37 #include "arrow/util/visibility.h" 38 39 namespace arrow { 40 namespace ipc { 41 42 class DictionaryMemo; 43 struct IpcPayload; 44 45 using RecordBatchReader = ::arrow::RecordBatchReader; 46 47 struct ReadStats { 48 /// Number of IPC messages read. 49 int64_t num_messages = 0; 50 /// Number of record batches read. 51 int64_t num_record_batches = 0; 52 /// Number of dictionary batches read. 53 /// 54 /// Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries 55 int64_t num_dictionary_batches = 0; 56 57 /// Number of dictionary deltas read. 58 int64_t num_dictionary_deltas = 0; 59 /// Number of replaced dictionaries (i.e. where a dictionary batch replaces 60 /// an existing dictionary with an unrelated new dictionary). 61 int64_t num_replaced_dictionaries = 0; 62 }; 63 64 /// \brief Synchronous batch stream reader that reads from io::InputStream 65 /// 66 /// This class reads the schema (plus any dictionaries) as the first messages 67 /// in the stream, followed by record batches. For more granular zero-copy 68 /// reads see the ReadRecordBatch functions 69 class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { 70 public: 71 /// Create batch reader from generic MessageReader. 72 /// This will take ownership of the given MessageReader. 73 /// 74 /// \param[in] message_reader a MessageReader implementation 75 /// \param[in] options any IPC reading options (optional) 76 /// \return the created batch reader 77 static Result<std::shared_ptr<RecordBatchStreamReader>> Open( 78 std::unique_ptr<MessageReader> message_reader, 79 const IpcReadOptions& options = IpcReadOptions::Defaults()); 80 81 /// \brief Record batch stream reader from InputStream 82 /// 83 /// \param[in] stream an input stream instance. Must stay alive throughout 84 /// lifetime of stream reader 85 /// \param[in] options any IPC reading options (optional) 86 /// \return the created batch reader 87 static Result<std::shared_ptr<RecordBatchStreamReader>> Open( 88 io::InputStream* stream, 89 const IpcReadOptions& options = IpcReadOptions::Defaults()); 90 91 /// \brief Open stream and retain ownership of stream object 92 /// \param[in] stream the input stream 93 /// \param[in] options any IPC reading options (optional) 94 /// \return the created batch reader 95 static Result<std::shared_ptr<RecordBatchStreamReader>> Open( 96 const std::shared_ptr<io::InputStream>& stream, 97 const IpcReadOptions& options = IpcReadOptions::Defaults()); 98 99 /// \brief Return current read statistics 100 virtual ReadStats stats() const = 0; 101 }; 102 103 /// \brief Reads the record batch file format 104 class ARROW_EXPORT RecordBatchFileReader 105 : public std::enable_shared_from_this<RecordBatchFileReader> { 106 public: 107 virtual ~RecordBatchFileReader() = default; 108 109 /// \brief Open a RecordBatchFileReader 110 /// 111 /// Open a file-like object that is assumed to be self-contained; i.e., the 112 /// end of the file interface is the end of the Arrow file. Note that there 113 /// can be any amount of data preceding the Arrow-formatted data, because we 114 /// need only locate the end of the Arrow file stream to discover the metadata 115 /// and then proceed to read the data into memory. 116 static Result<std::shared_ptr<RecordBatchFileReader>> Open( 117 io::RandomAccessFile* file, 118 const IpcReadOptions& options = IpcReadOptions::Defaults()); 119 120 /// \brief Open a RecordBatchFileReader 121 /// If the file is embedded within some larger file or memory region, you can 122 /// pass the absolute memory offset to the end of the file (which contains the 123 /// metadata footer). The metadata must have been written with memory offsets 124 /// relative to the start of the containing file 125 /// 126 /// \param[in] file the data source 127 /// \param[in] footer_offset the position of the end of the Arrow file 128 /// \param[in] options options for IPC reading 129 /// \return the returned reader 130 static Result<std::shared_ptr<RecordBatchFileReader>> Open( 131 io::RandomAccessFile* file, int64_t footer_offset, 132 const IpcReadOptions& options = IpcReadOptions::Defaults()); 133 134 /// \brief Version of Open that retains ownership of file 135 /// 136 /// \param[in] file the data source 137 /// \param[in] options options for IPC reading 138 /// \return the returned reader 139 static Result<std::shared_ptr<RecordBatchFileReader>> Open( 140 const std::shared_ptr<io::RandomAccessFile>& file, 141 const IpcReadOptions& options = IpcReadOptions::Defaults()); 142 143 /// \brief Version of Open that retains ownership of file 144 /// 145 /// \param[in] file the data source 146 /// \param[in] footer_offset the position of the end of the Arrow file 147 /// \param[in] options options for IPC reading 148 /// \return the returned reader 149 static Result<std::shared_ptr<RecordBatchFileReader>> Open( 150 const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset, 151 const IpcReadOptions& options = IpcReadOptions::Defaults()); 152 153 /// \brief Open a file asynchronously (owns the file). 154 static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync( 155 const std::shared_ptr<io::RandomAccessFile>& file, 156 const IpcReadOptions& options = IpcReadOptions::Defaults()); 157 158 /// \brief Open a file asynchronously (borrows the file). 159 static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync( 160 io::RandomAccessFile* file, 161 const IpcReadOptions& options = IpcReadOptions::Defaults()); 162 163 /// \brief Open a file asynchronously (owns the file). 164 static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync( 165 const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset, 166 const IpcReadOptions& options = IpcReadOptions::Defaults()); 167 168 /// \brief Open a file asynchronously (borrows the file). 169 static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync( 170 io::RandomAccessFile* file, int64_t footer_offset, 171 const IpcReadOptions& options = IpcReadOptions::Defaults()); 172 173 /// \brief The schema read from the file 174 virtual std::shared_ptr<Schema> schema() const = 0; 175 176 /// \brief Returns the number of record batches in the file 177 virtual int num_record_batches() const = 0; 178 179 /// \brief Return the metadata version from the file metadata 180 virtual MetadataVersion version() const = 0; 181 182 /// \brief Return the contents of the custom_metadata field from the file's 183 /// Footer 184 virtual std::shared_ptr<const KeyValueMetadata> metadata() const = 0; 185 186 /// \brief Read a particular record batch from the file. Does not copy memory 187 /// if the input source supports zero-copy. 188 /// 189 /// \param[in] i the index of the record batch to return 190 /// \return the read batch 191 virtual Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) = 0; 192 193 /// \brief Return current read statistics 194 virtual ReadStats stats() const = 0; 195 196 /// \brief Computes the total number of rows in the file. 197 virtual Result<int64_t> CountRows() = 0; 198 199 /// \brief Get a reentrant generator of record batches. 200 /// 201 /// \param[in] coalesce If true, enable I/O coalescing. 202 /// \param[in] io_context The IOContext to use (controls which thread pool 203 /// is used for I/O). 204 /// \param[in] cache_options Options for coalescing (if enabled). 205 /// \param[in] executor Optionally, an executor to use for decoding record 206 /// batches. This is generally only a benefit for very wide and/or 207 /// compressed batches. 208 virtual Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator( 209 const bool coalesce = false, 210 const io::IOContext& io_context = io::default_io_context(), 211 const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(), 212 arrow::internal::Executor* executor = NULLPTR) = 0; 213 }; 214 215 /// \brief A general listener class to receive events. 216 /// 217 /// You must implement callback methods for interested events. 218 /// 219 /// This API is EXPERIMENTAL. 220 /// 221 /// \since 0.17.0 222 class ARROW_EXPORT Listener { 223 public: 224 virtual ~Listener() = default; 225 226 /// \brief Called when end-of-stream is received. 227 /// 228 /// The default implementation just returns arrow::Status::OK(). 229 /// 230 /// \return Status 231 /// 232 /// \see StreamDecoder 233 virtual Status OnEOS(); 234 235 /// \brief Called when a record batch is decoded. 236 /// 237 /// The default implementation just returns 238 /// arrow::Status::NotImplemented(). 239 /// 240 /// \param[in] record_batch a record batch decoded 241 /// \return Status 242 /// 243 /// \see StreamDecoder 244 virtual Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch); 245 246 /// \brief Called when a schema is decoded. 247 /// 248 /// The default implementation just returns arrow::Status::OK(). 249 /// 250 /// \param[in] schema a schema decoded 251 /// \return Status 252 /// 253 /// \see StreamDecoder 254 virtual Status OnSchemaDecoded(std::shared_ptr<Schema> schema); 255 }; 256 257 /// \brief Collect schema and record batches decoded by StreamDecoder. 258 /// 259 /// This API is EXPERIMENTAL. 260 /// 261 /// \since 0.17.0 262 class ARROW_EXPORT CollectListener : public Listener { 263 public: CollectListener()264 CollectListener() : schema_(), record_batches_() {} 265 virtual ~CollectListener() = default; 266 OnSchemaDecoded(std::shared_ptr<Schema> schema)267 Status OnSchemaDecoded(std::shared_ptr<Schema> schema) override { 268 schema_ = std::move(schema); 269 return Status::OK(); 270 } 271 OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch)272 Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch) override { 273 record_batches_.push_back(std::move(record_batch)); 274 return Status::OK(); 275 } 276 277 /// \return the decoded schema schema()278 std::shared_ptr<Schema> schema() const { return schema_; } 279 280 /// \return the all decoded record batches record_batches()281 std::vector<std::shared_ptr<RecordBatch>> record_batches() const { 282 return record_batches_; 283 } 284 285 private: 286 std::shared_ptr<Schema> schema_; 287 std::vector<std::shared_ptr<RecordBatch>> record_batches_; 288 }; 289 290 /// \brief Push style stream decoder that receives data from user. 291 /// 292 /// This class decodes the Apache Arrow IPC streaming format data. 293 /// 294 /// This API is EXPERIMENTAL. 295 /// 296 /// \see https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format 297 /// 298 /// \since 0.17.0 299 class ARROW_EXPORT StreamDecoder { 300 public: 301 /// \brief Construct a stream decoder. 302 /// 303 /// \param[in] listener a Listener that must implement 304 /// Listener::OnRecordBatchDecoded() to receive decoded record batches 305 /// \param[in] options any IPC reading options (optional) 306 StreamDecoder(std::shared_ptr<Listener> listener, 307 IpcReadOptions options = IpcReadOptions::Defaults()); 308 309 virtual ~StreamDecoder(); 310 311 /// \brief Feed data to the decoder as a raw data. 312 /// 313 /// If the decoder can read one or more record batches by the data, 314 /// the decoder calls listener->OnRecordBatchDecoded() with a 315 /// decoded record batch multiple times. 316 /// 317 /// \param[in] data a raw data to be processed. This data isn't 318 /// copied. The passed memory must be kept alive through record 319 /// batch processing. 320 /// \param[in] size raw data size. 321 /// \return Status 322 Status Consume(const uint8_t* data, int64_t size); 323 324 /// \brief Feed data to the decoder as a Buffer. 325 /// 326 /// If the decoder can read one or more record batches by the 327 /// Buffer, the decoder calls listener->RecordBatchReceived() with a 328 /// decoded record batch multiple times. 329 /// 330 /// \param[in] buffer a Buffer to be processed. 331 /// \return Status 332 Status Consume(std::shared_ptr<Buffer> buffer); 333 334 /// \return the shared schema of the record batches in the stream 335 std::shared_ptr<Schema> schema() const; 336 337 /// \brief Return the number of bytes needed to advance the state of 338 /// the decoder. 339 /// 340 /// This method is provided for users who want to optimize performance. 341 /// Normal users don't need to use this method. 342 /// 343 /// Here is an example usage for normal users: 344 /// 345 /// ~~~{.cpp} 346 /// decoder.Consume(buffer1); 347 /// decoder.Consume(buffer2); 348 /// decoder.Consume(buffer3); 349 /// ~~~ 350 /// 351 /// Decoder has internal buffer. If consumed data isn't enough to 352 /// advance the state of the decoder, consumed data is buffered to 353 /// the internal buffer. It causes performance overhead. 354 /// 355 /// If you pass next_required_size() size data to each Consume() 356 /// call, the decoder doesn't use its internal buffer. It improves 357 /// performance. 358 /// 359 /// Here is an example usage to avoid using internal buffer: 360 /// 361 /// ~~~{.cpp} 362 /// buffer1 = get_data(decoder.next_required_size()); 363 /// decoder.Consume(buffer1); 364 /// buffer2 = get_data(decoder.next_required_size()); 365 /// decoder.Consume(buffer2); 366 /// ~~~ 367 /// 368 /// Users can use this method to avoid creating small chunks. Record 369 /// batch data must be contiguous data. If users pass small chunks 370 /// to the decoder, the decoder needs concatenate small chunks 371 /// internally. It causes performance overhead. 372 /// 373 /// Here is an example usage to reduce small chunks: 374 /// 375 /// ~~~{.cpp} 376 /// buffer = AllocateResizableBuffer(); 377 /// while ((small_chunk = get_data(&small_chunk_size))) { 378 /// auto current_buffer_size = buffer->size(); 379 /// buffer->Resize(current_buffer_size + small_chunk_size); 380 /// memcpy(buffer->mutable_data() + current_buffer_size, 381 /// small_chunk, 382 /// small_chunk_size); 383 /// if (buffer->size() < decoder.next_required_size()) { 384 /// continue; 385 /// } 386 /// std::shared_ptr<arrow::Buffer> chunk(buffer.release()); 387 /// decoder.Consume(chunk); 388 /// buffer = AllocateResizableBuffer(); 389 /// } 390 /// if (buffer->size() > 0) { 391 /// std::shared_ptr<arrow::Buffer> chunk(buffer.release()); 392 /// decoder.Consume(chunk); 393 /// } 394 /// ~~~ 395 /// 396 /// \return the number of bytes needed to advance the state of the 397 /// decoder 398 int64_t next_required_size() const; 399 400 /// \brief Return current read statistics 401 ReadStats stats() const; 402 403 private: 404 class StreamDecoderImpl; 405 std::unique_ptr<StreamDecoderImpl> impl_; 406 407 ARROW_DISALLOW_COPY_AND_ASSIGN(StreamDecoder); 408 }; 409 410 // Generic read functions; does not copy data if the input supports zero copy reads 411 412 /// \brief Read Schema from stream serialized as a single IPC message 413 /// and populate any dictionary-encoded fields into a DictionaryMemo 414 /// 415 /// \param[in] stream an InputStream 416 /// \param[in] dictionary_memo for recording dictionary-encoded fields 417 /// \return the output Schema 418 /// 419 /// If record batches follow the schema, it is better to use 420 /// RecordBatchStreamReader 421 ARROW_EXPORT 422 Result<std::shared_ptr<Schema>> ReadSchema(io::InputStream* stream, 423 DictionaryMemo* dictionary_memo); 424 425 /// \brief Read Schema from encapsulated Message 426 /// 427 /// \param[in] message the message containing the Schema IPC metadata 428 /// \param[in] dictionary_memo DictionaryMemo for recording dictionary-encoded 429 /// fields. Can be nullptr if you are sure there are no 430 /// dictionary-encoded fields 431 /// \return the resulting Schema 432 ARROW_EXPORT 433 Result<std::shared_ptr<Schema>> ReadSchema(const Message& message, 434 DictionaryMemo* dictionary_memo); 435 436 /// Read record batch as encapsulated IPC message with metadata size prefix and 437 /// header 438 /// 439 /// \param[in] schema the record batch schema 440 /// \param[in] dictionary_memo DictionaryMemo which has any 441 /// dictionaries. Can be nullptr if you are sure there are no 442 /// dictionary-encoded fields 443 /// \param[in] options IPC options for reading 444 /// \param[in] stream the file where the batch is located 445 /// \return the read record batch 446 ARROW_EXPORT 447 Result<std::shared_ptr<RecordBatch>> ReadRecordBatch( 448 const std::shared_ptr<Schema>& schema, const DictionaryMemo* dictionary_memo, 449 const IpcReadOptions& options, io::InputStream* stream); 450 451 /// \brief Read record batch from message 452 /// 453 /// \param[in] message a Message containing the record batch metadata 454 /// \param[in] schema the record batch schema 455 /// \param[in] dictionary_memo DictionaryMemo which has any 456 /// dictionaries. Can be nullptr if you are sure there are no 457 /// dictionary-encoded fields 458 /// \param[in] options IPC options for reading 459 /// \return the read record batch 460 ARROW_EXPORT 461 Result<std::shared_ptr<RecordBatch>> ReadRecordBatch( 462 const Message& message, const std::shared_ptr<Schema>& schema, 463 const DictionaryMemo* dictionary_memo, const IpcReadOptions& options); 464 465 /// Read record batch from file given metadata and schema 466 /// 467 /// \param[in] metadata a Message containing the record batch metadata 468 /// \param[in] schema the record batch schema 469 /// \param[in] dictionary_memo DictionaryMemo which has any 470 /// dictionaries. Can be nullptr if you are sure there are no 471 /// dictionary-encoded fields 472 /// \param[in] file a random access file 473 /// \param[in] options options for deserialization 474 /// \return the read record batch 475 ARROW_EXPORT 476 Result<std::shared_ptr<RecordBatch>> ReadRecordBatch( 477 const Buffer& metadata, const std::shared_ptr<Schema>& schema, 478 const DictionaryMemo* dictionary_memo, const IpcReadOptions& options, 479 io::RandomAccessFile* file); 480 481 /// \brief Read arrow::Tensor as encapsulated IPC message in file 482 /// 483 /// \param[in] file an InputStream pointed at the start of the message 484 /// \return the read tensor 485 ARROW_EXPORT 486 Result<std::shared_ptr<Tensor>> ReadTensor(io::InputStream* file); 487 488 /// \brief EXPERIMENTAL: Read arrow::Tensor from IPC message 489 /// 490 /// \param[in] message a Message containing the tensor metadata and body 491 /// \return the read tensor 492 ARROW_EXPORT 493 Result<std::shared_ptr<Tensor>> ReadTensor(const Message& message); 494 495 /// \brief EXPERIMENTAL: Read arrow::SparseTensor as encapsulated IPC message in file 496 /// 497 /// \param[in] file an InputStream pointed at the start of the message 498 /// \return the read sparse tensor 499 ARROW_EXPORT 500 Result<std::shared_ptr<SparseTensor>> ReadSparseTensor(io::InputStream* file); 501 502 /// \brief EXPERIMENTAL: Read arrow::SparseTensor from IPC message 503 /// 504 /// \param[in] message a Message containing the tensor metadata and body 505 /// \return the read sparse tensor 506 ARROW_EXPORT 507 Result<std::shared_ptr<SparseTensor>> ReadSparseTensor(const Message& message); 508 509 namespace internal { 510 511 // These internal APIs may change without warning or deprecation 512 513 /// \brief EXPERIMENTAL: Read arrow::SparseTensorFormat::type from a metadata 514 /// \param[in] metadata a Buffer containing the sparse tensor metadata 515 /// \return the count of the body buffers 516 ARROW_EXPORT 517 Result<size_t> ReadSparseTensorBodyBufferCount(const Buffer& metadata); 518 519 /// \brief EXPERIMENTAL: Read arrow::SparseTensor from an IpcPayload 520 /// \param[in] payload a IpcPayload contains a serialized SparseTensor 521 /// \return the read sparse tensor 522 ARROW_EXPORT 523 Result<std::shared_ptr<SparseTensor>> ReadSparseTensorPayload(const IpcPayload& payload); 524 525 // For fuzzing targets 526 ARROW_EXPORT 527 Status FuzzIpcStream(const uint8_t* data, int64_t size); 528 ARROW_EXPORT 529 Status FuzzIpcTensorStream(const uint8_t* data, int64_t size); 530 ARROW_EXPORT 531 Status FuzzIpcFile(const uint8_t* data, int64_t size); 532 533 } // namespace internal 534 535 } // namespace ipc 536 } // namespace arrow 537