# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


#' @title RecordBatchReader classes
#' @description Apache Arrow defines two formats for [serializing data for interprocess
#' communication (IPC)](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc):
#' a "stream" format and a "file" format, known as Feather.
#' `RecordBatchStreamReader` and `RecordBatchFileReader` are
#' interfaces for accessing record batches from input sources in those formats,
#' respectively.
#'
#' For guidance on how to use these classes, see the examples section.
#'
#' @seealso [read_ipc_stream()] and [read_feather()] provide a much simpler interface
#' for reading data from these formats and are sufficient for many use cases.
#' @usage NULL
#' @format NULL
#' @docType class
#' @section Factory:
#'
#' The `RecordBatchFileReader$create()` and `RecordBatchStreamReader$create()`
#' factory methods instantiate the object and
#' take a single argument, named according to the class:
#'
#' - `file` A character file name, raw vector, or Arrow file connection object
#'    (e.g. [RandomAccessFile]).
#' - `stream` A raw vector, [Buffer], or [InputStream].
#'
#' @section Methods:
#'
#' - `$read_next_batch()`: Returns a `RecordBatch`, iterating through the
#'    Reader. If there are no further batches in the Reader, it returns `NULL`.
#' - `$schema`: Returns a [Schema] (active binding)
#' - `$batches()`: Returns a list of `RecordBatch`es
#' - `$read_table()`: Collects the reader's `RecordBatch`es into a [Table]
#' - `$get_batch(i)`: For `RecordBatchFileReader`, return a particular batch
#'    by an integer index.
#' - `$num_record_batches`: For `RecordBatchFileReader`, see how many batches
#'    are in the file (active binding).
#'
#' @rdname RecordBatchReader
#' @name RecordBatchReader
#' @include arrow-package.R
#' @examples
#' \donttest{
#' tf <- tempfile()
#' on.exit(unlink(tf))
#'
#' batch <- record_batch(chickwts)
#'
#' # This opens a connection to the file in Arrow
#' file_obj <- FileOutputStream$create(tf)
#' # Pass that to a RecordBatchWriter to write data conforming to a schema
#' writer <- RecordBatchFileWriter$create(file_obj, batch$schema)
#' writer$write(batch)
#' # You may write additional batches to the stream, provided that they have
#' # the same schema.
#' # Call "close" on the writer to indicate end-of-file/stream
#' writer$close()
#' # Then, close the connection--closing the IPC message does not close the file
#' file_obj$close()
#'
#' # Now, we have a file we can read from.
#' # Same pattern: open file connection,
#' # then pass it to a RecordBatchReader
#' read_file_obj <- ReadableFile$create(tf)
#' reader <- RecordBatchFileReader$create(read_file_obj)
#' # RecordBatchFileReader knows how many batches it has (StreamReader does not)
#' reader$num_record_batches
#' # We could consume the Reader by calling $read_next_batch() until all are
#' # consumed, or we can call $read_table() to pull them all into a Table
#' tab <- reader$read_table()
#' # Call as.data.frame to turn that Table into an R data.frame
#' df <- as.data.frame(tab)
#' # This should be the same data we sent
#' all.equal(df, chickwts, check.attributes = FALSE)
#' # Unlike the Writers, we don't have to close RecordBatchReaders,
#' # but we do still need to close the file connection
#' read_file_obj$close()
#' }
RecordBatchReader <- R6Class(
  "RecordBatchReader",
  inherit = ArrowObject,
  public = list(
    # Forwards to the RecordBatchReader__ReadNext binding for this object.
    read_next_batch = function() {
      RecordBatchReader__ReadNext(self)
    }
  ),
  active = list(
    # Read-only active binding: accessed as `reader$schema`, without parentheses.
    schema = function() {
      RecordBatchReader__schema(self)
    }
  )
)

#' @rdname RecordBatchReader
#' @usage NULL
#' @format NULL
#' @export
RecordBatchStreamReader <- R6Class(
  "RecordBatchStreamReader",
  inherit = RecordBatchReader,
  public = list(
    # Forwards to the stream reader's `batches` binding.
    batches = function() {
      ipc___RecordBatchStreamReader__batches(self)
    },
    # Forwards to Table__from_RecordBatchReader with this reader.
    read_table = function() {
      Table__from_RecordBatchReader(self)
    }
  )
)

# Factory for RecordBatchStreamReader.
#
# `stream` may be a raw vector or Buffer (wrapped in a BufferReader first) or
# an object that already inherits from "InputStream". The input is checked
# with assert_is() and then handed to ipc___RecordBatchStreamReader__Open().
RecordBatchStreamReader$create <- function(stream) {
  needs_wrapping <- inherits(stream, "raw") || inherits(stream, "Buffer")
  if (needs_wrapping) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    stream <- BufferReader$create(stream)
  }
  assert_is(stream, "InputStream")
  ipc___RecordBatchStreamReader__Open(stream)
}

#' @rdname RecordBatchReader
#' @usage NULL
#' @format NULL
#' @export
RecordBatchFileReader <- R6Class(
  "RecordBatchFileReader",
  inherit = ArrowObject,
  # Why doesn't this inherit from RecordBatchReader in C++?
  # Origin: https://github.com/apache/arrow/pull/679
  public = list(
    # `i` is passed through unchanged to the ReadRecordBatch binding.
    get_batch = function(i) {
      ipc___RecordBatchFileReader__ReadRecordBatch(self, i)
    },
    # Forwards to the file reader's `batches` binding.
    batches = function() {
      ipc___RecordBatchFileReader__batches(self)
    },
    # Forwards to Table__from_RecordBatchFileReader with this reader.
    read_table = function() {
      Table__from_RecordBatchFileReader(self)
    }
  ),
  active = list(
    # Read-only active bindings: accessed without parentheses.
    num_record_batches = function() {
      ipc___RecordBatchFileReader__num_record_batches(self)
    },
    schema = function() {
      ipc___RecordBatchFileReader__schema(self)
    }
  )
)

# Factory for RecordBatchFileReader.
#
# `file` may be a raw vector or Buffer (wrapped in a BufferReader first) or
# an object that already inherits from "InputStream". The input is checked
# with assert_is() and then handed to ipc___RecordBatchFileReader__Open().
RecordBatchFileReader$create <- function(file) {
  needs_wrapping <- inherits(file, "raw") || inherits(file, "Buffer")
  if (needs_wrapping) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    file <- BufferReader$create(file)
  }
  assert_is(file, "InputStream")
  ipc___RecordBatchFileReader__Open(file)
}