1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18
19#' @title RecordBatchReader classes
20#' @description Apache Arrow defines two formats for [serializing data for interprocess
21#' communication (IPC)](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc):
22#' a "stream" format and a "file" format, known as Feather.
23#' `RecordBatchStreamReader` and `RecordBatchFileReader` are
#' interfaces for accessing record batches from input sources in those formats,
25#' respectively.
26#'
27#' For guidance on how to use these classes, see the examples section.
28#'
29#' @seealso [read_ipc_stream()] and [read_feather()] provide a much simpler interface
30#' for reading data from these formats and are sufficient for many use cases.
31#' @usage NULL
32#' @format NULL
33#' @docType class
34#' @section Factory:
35#'
36#' The `RecordBatchFileReader$create()` and `RecordBatchStreamReader$create()`
37#' factory methods instantiate the object and
38#' take a single argument, named according to the class:
39#'
#' - `file` A raw vector, [Buffer], or Arrow file connection object
#'    (e.g. [RandomAccessFile]).
42#' - `stream` A raw vector, [Buffer], or [InputStream].
43#'
44#' @section Methods:
45#'
46#' - `$read_next_batch()`: Returns a `RecordBatch`, iterating through the
47#'   Reader. If there are no further batches in the Reader, it returns `NULL`.
48#' - `$schema`: Returns a [Schema] (active binding)
49#' - `$batches()`: Returns a list of `RecordBatch`es
50#' - `$read_table()`: Collects the reader's `RecordBatch`es into a [Table]
51#' - `$get_batch(i)`: For `RecordBatchFileReader`, return a particular batch
52#'    by an integer index.
#' - `$num_record_batches`: For `RecordBatchFileReader`, the number of batches
#'    in the file (active binding)
55#'
56#' @rdname RecordBatchReader
57#' @name RecordBatchReader
58#' @include arrow-package.R
59#' @examples
60#' \donttest{
61#' tf <- tempfile()
62#' on.exit(unlink(tf))
63#'
64#' batch <- record_batch(chickwts)
65#'
66#' # This opens a connection to the file in Arrow
67#' file_obj <- FileOutputStream$create(tf)
68#' # Pass that to a RecordBatchWriter to write data conforming to a schema
69#' writer <- RecordBatchFileWriter$create(file_obj, batch$schema)
70#' writer$write(batch)
71#' # You may write additional batches to the stream, provided that they have
72#' # the same schema.
73#' # Call "close" on the writer to indicate end-of-file/stream
74#' writer$close()
75#' # Then, close the connection--closing the IPC message does not close the file
76#' file_obj$close()
77#'
78#' # Now, we have a file we can read from. Same pattern: open file connection,
79#' # then pass it to a RecordBatchReader
80#' read_file_obj <- ReadableFile$create(tf)
81#' reader <- RecordBatchFileReader$create(read_file_obj)
82#' # RecordBatchFileReader knows how many batches it has (StreamReader does not)
83#' reader$num_record_batches
#' # We could consume the Reader by calling $read_next_batch() until all are
#' # consumed, or we can call $read_table() to pull them all into a Table
86#' tab <- reader$read_table()
87#' # Call as.data.frame to turn that Table into an R data.frame
88#' df <- as.data.frame(tab)
89#' # This should be the same data we sent
90#' all.equal(df, chickwts, check.attributes = FALSE)
91#' # Unlike the Writers, we don't have to close RecordBatchReaders,
92#' # but we do still need to close the file connection
93#' read_file_obj$close()
94#' }
RecordBatchReader <- R6Class(
  "RecordBatchReader",
  inherit = ArrowObject,
  public = list(
    # Hands this reader to the RecordBatchReader__ReadNext binding and returns
    # its result.
    read_next_batch = function() RecordBatchReader__ReadNext(self)
  ),
  active = list(
    # Active binding: evaluated on every access via RecordBatchReader__schema.
    schema = function() {
      RecordBatchReader__schema(self)
    }
  )
)
105
106#' @rdname RecordBatchReader
107#' @usage NULL
108#' @format NULL
109#' @export
RecordBatchStreamReader <- R6Class(
  "RecordBatchStreamReader",
  inherit = RecordBatchReader,
  public = list(
    # Delegates to the stream-reader `batches` binding for this reader.
    batches = function() {
      ipc___RecordBatchStreamReader__batches(self)
    },
    # Delegates to Table__from_RecordBatchReader for this reader.
    read_table = function() {
      Table__from_RecordBatchReader(self)
    }
  )
)
# Factory for RecordBatchStreamReader.
#
# `stream` may be a raw vector or a Buffer, in which case it is first wrapped
# in a BufferReader; any other input must already inherit from "InputStream".
# Returns the result of ipc___RecordBatchStreamReader__Open().
RecordBatchStreamReader$create <- function(stream) {
  needs_buffer_reader <- inherits(stream, "raw") || inherits(stream, "Buffer")
  if (needs_buffer_reader) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    stream <- BufferReader$create(stream)
  }

  # Anything that reaches this point must be an InputStream.
  assert_is(stream, "InputStream")
  ipc___RecordBatchStreamReader__Open(stream)
}
125
126#' @rdname RecordBatchReader
127#' @usage NULL
128#' @format NULL
129#' @export
RecordBatchFileReader <- R6Class(
  "RecordBatchFileReader",
  inherit = ArrowObject,
  # Why doesn't this inherit from RecordBatchReader in C++?
  # Origin: https://github.com/apache/arrow/pull/679
  public = list(
    # Fetches a single batch; `i` is forwarded to the binding unchanged.
    # NOTE(review): confirm which index base the binding expects for `i`.
    get_batch = function(i) {
      ipc___RecordBatchFileReader__ReadRecordBatch(self, i)
    },
    # Delegates to the file-reader `batches` binding for this reader.
    batches = function() ipc___RecordBatchFileReader__batches(self),
    # Delegates to Table__from_RecordBatchFileReader for this reader.
    read_table = function() {
      Table__from_RecordBatchFileReader(self)
    }
  ),
  active = list(
    # Active bindings: both are evaluated on every access, without parentheses.
    num_record_batches = function() {
      ipc___RecordBatchFileReader__num_record_batches(self)
    },
    schema = function() {
      ipc___RecordBatchFileReader__schema(self)
    }
  )
)
# Factory for RecordBatchFileReader.
#
# `file` may be a raw vector or a Buffer, in which case it is first wrapped
# in a BufferReader; any other input must already inherit from "InputStream".
# Returns the result of ipc___RecordBatchFileReader__Open().
RecordBatchFileReader$create <- function(file) {
  needs_buffer_reader <- inherits(file, "raw") || inherits(file, "Buffer")
  if (needs_buffer_reader) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    file <- BufferReader$create(file)
  }

  # Anything that reaches this point must be an InputStream.
  assert_is(file, "InputStream")
  ipc___RecordBatchFileReader__Open(file)
}
156