1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 //! A two-dimensional batch of column-oriented data with a defined
19 //! [schema](crate::datatypes::Schema).
20 
21 use std::sync::Arc;
22 
23 use crate::array::*;
24 use crate::datatypes::*;
25 use crate::error::{ArrowError, Result};
26 
27 /// A two-dimensional batch of column-oriented data with a defined
28 /// [schema](crate::datatypes::Schema).
29 ///
30 /// A `RecordBatch` is a two-dimensional dataset of a number of
31 /// contiguous arrays, each the same length.
32 /// A record batch has a schema which must match its arrays’
33 /// datatypes.
34 ///
35 /// Record batches are a convenient unit of work for various
36 /// serialization and computation functions, possibly incremental.
37 /// See also [CSV reader](crate::csv::Reader) and
38 /// [JSON reader](crate::json::Reader).
39 #[derive(Clone, Debug)]
40 pub struct RecordBatch {
41     schema: SchemaRef,
42     columns: Vec<Arc<Array>>,
43 }
44 
45 impl RecordBatch {
46     /// Creates a `RecordBatch` from a schema and columns.
47     ///
48     /// Expects the following:
49     ///  * the vec of columns to not be empty
50     ///  * the schema and column data types to have equal lengths
51     ///    and match
52     ///  * each array in columns to have the same length
53     ///
54     /// If the conditions are not met, an error is returned.
55     ///
56     /// # Example
57     ///
58     /// ```
59     /// use std::sync::Arc;
60     /// use arrow::array::Int32Array;
61     /// use arrow::datatypes::{Schema, Field, DataType};
62     /// use arrow::record_batch::RecordBatch;
63     ///
64     /// # fn main() -> arrow::error::Result<()> {
65     /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
66     /// let schema = Schema::new(vec![
67     ///     Field::new("id", DataType::Int32, false)
68     /// ]);
69     ///
70     /// let batch = RecordBatch::try_new(
71     ///     Arc::new(schema),
72     ///     vec![Arc::new(id_array)]
73     /// )?;
74     /// # Ok(())
75     /// # }
76     /// ```
try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self>77     pub fn try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self> {
78         let options = RecordBatchOptions::default();
79         Self::validate_new_batch(&schema, columns.as_slice(), &options)?;
80         Ok(RecordBatch { schema, columns })
81     }
82 
83     /// Creates a `RecordBatch` from a schema and columns, with additional options,
84     /// such as whether to strictly validate field names.
85     ///
86     /// See [`RecordBatch::try_new`] for the expected conditions.
try_new_with_options( schema: SchemaRef, columns: Vec<ArrayRef>, options: &RecordBatchOptions, ) -> Result<Self>87     pub fn try_new_with_options(
88         schema: SchemaRef,
89         columns: Vec<ArrayRef>,
90         options: &RecordBatchOptions,
91     ) -> Result<Self> {
92         Self::validate_new_batch(&schema, columns.as_slice(), options)?;
93         Ok(RecordBatch { schema, columns })
94     }
95 
96     /// Creates a new empty [`RecordBatch`].
new_empty(schema: SchemaRef) -> Self97     pub fn new_empty(schema: SchemaRef) -> Self {
98         let columns = schema
99             .fields()
100             .iter()
101             .map(|field| new_empty_array(field.data_type()))
102             .collect();
103         RecordBatch { schema, columns }
104     }
105 
106     /// Validate the schema and columns using [`RecordBatchOptions`]. Returns an error
107     /// if any validation check fails.
validate_new_batch( schema: &SchemaRef, columns: &[ArrayRef], options: &RecordBatchOptions, ) -> Result<()>108     fn validate_new_batch(
109         schema: &SchemaRef,
110         columns: &[ArrayRef],
111         options: &RecordBatchOptions,
112     ) -> Result<()> {
113         // check that there are some columns
114         if columns.is_empty() {
115             return Err(ArrowError::InvalidArgumentError(
116                 "at least one column must be defined to create a record batch"
117                     .to_string(),
118             ));
119         }
120         // check that number of fields in schema match column length
121         if schema.fields().len() != columns.len() {
122             return Err(ArrowError::InvalidArgumentError(format!(
123                 "number of columns({}) must match number of fields({}) in schema",
124                 columns.len(),
125                 schema.fields().len(),
126             )));
127         }
128         // check that all columns have the same row count, and match the schema
129         let len = columns[0].data().len();
130 
131         // This is a bit repetitive, but it is better to check the condition outside the loop
132         if options.match_field_names {
133             for (i, column) in columns.iter().enumerate() {
134                 if column.len() != len {
135                     return Err(ArrowError::InvalidArgumentError(
136                         "all columns in a record batch must have the same length"
137                             .to_string(),
138                     ));
139                 }
140                 if column.data_type() != schema.field(i).data_type() {
141                     return Err(ArrowError::InvalidArgumentError(format!(
142                         "column types must match schema types, expected {:?} but found {:?} at column index {}",
143                         schema.field(i).data_type(),
144                         column.data_type(),
145                         i)));
146                 }
147             }
148         } else {
149             for (i, column) in columns.iter().enumerate() {
150                 if column.len() != len {
151                     return Err(ArrowError::InvalidArgumentError(
152                         "all columns in a record batch must have the same length"
153                             .to_string(),
154                     ));
155                 }
156                 if !column
157                     .data_type()
158                     .equals_datatype(schema.field(i).data_type())
159                 {
160                     return Err(ArrowError::InvalidArgumentError(format!(
161                         "column types must match schema types, expected {:?} but found {:?} at column index {}",
162                         schema.field(i).data_type(),
163                         column.data_type(),
164                         i)));
165                 }
166             }
167         }
168 
169         Ok(())
170     }
171 
172     /// Returns the [`Schema`](crate::datatypes::Schema) of the record batch.
schema(&self) -> SchemaRef173     pub fn schema(&self) -> SchemaRef {
174         self.schema.clone()
175     }
176 
177     /// Returns the number of columns in the record batch.
178     ///
179     /// # Example
180     ///
181     /// ```
182     /// use std::sync::Arc;
183     /// use arrow::array::Int32Array;
184     /// use arrow::datatypes::{Schema, Field, DataType};
185     /// use arrow::record_batch::RecordBatch;
186     ///
187     /// # fn main() -> arrow::error::Result<()> {
188     /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
189     /// let schema = Schema::new(vec![
190     ///     Field::new("id", DataType::Int32, false)
191     /// ]);
192     ///
193     /// let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)])?;
194     ///
195     /// assert_eq!(batch.num_columns(), 1);
196     /// # Ok(())
197     /// # }
198     /// ```
num_columns(&self) -> usize199     pub fn num_columns(&self) -> usize {
200         self.columns.len()
201     }
202 
203     /// Returns the number of rows in each column.
204     ///
205     /// # Panics
206     ///
207     /// Panics if the `RecordBatch` contains no columns.
208     ///
209     /// # Example
210     ///
211     /// ```
212     /// use std::sync::Arc;
213     /// use arrow::array::Int32Array;
214     /// use arrow::datatypes::{Schema, Field, DataType};
215     /// use arrow::record_batch::RecordBatch;
216     ///
217     /// # fn main() -> arrow::error::Result<()> {
218     /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
219     /// let schema = Schema::new(vec![
220     ///     Field::new("id", DataType::Int32, false)
221     /// ]);
222     ///
223     /// let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)])?;
224     ///
225     /// assert_eq!(batch.num_rows(), 5);
226     /// # Ok(())
227     /// # }
228     /// ```
num_rows(&self) -> usize229     pub fn num_rows(&self) -> usize {
230         self.columns[0].data().len()
231     }
232 
233     /// Get a reference to a column's array by index.
234     ///
235     /// # Panics
236     ///
237     /// Panics if `index` is outside of `0..num_columns`.
column(&self, index: usize) -> &ArrayRef238     pub fn column(&self, index: usize) -> &ArrayRef {
239         &self.columns[index]
240     }
241 
242     /// Get a reference to all columns in the record batch.
columns(&self) -> &[ArrayRef]243     pub fn columns(&self) -> &[ArrayRef] {
244         &self.columns[..]
245     }
246 }
247 
/// Options that control the behaviour used when creating a [`RecordBatch`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RecordBatchOptions {
    /// Match field names of structs and lists. If set to `true`, the names must match.
    pub match_field_names: bool,
}

impl Default for RecordBatchOptions {
    /// Returns the strict options: `match_field_names` is `true`.
    fn default() -> Self {
        Self {
            match_field_names: true,
        }
    }
}
262 
263 impl From<&StructArray> for RecordBatch {
264     /// Create a record batch from struct array.
265     ///
266     /// This currently does not flatten and nested struct types
from(struct_array: &StructArray) -> Self267     fn from(struct_array: &StructArray) -> Self {
268         if let DataType::Struct(fields) = struct_array.data_type() {
269             let schema = Schema::new(fields.clone());
270             let columns = struct_array.boxed_fields.clone();
271             RecordBatch {
272                 schema: Arc::new(schema),
273                 columns,
274             }
275         } else {
276             unreachable!("unable to get datatype as struct")
277         }
278     }
279 }
280 
281 impl From<RecordBatch> for StructArray {
from(batch: RecordBatch) -> Self282     fn from(batch: RecordBatch) -> Self {
283         batch
284             .schema
285             .fields
286             .iter()
287             .zip(batch.columns.iter())
288             .map(|t| (t.0.clone(), t.1.clone()))
289             .collect::<Vec<(Field, ArrayRef)>>()
290             .into()
291     }
292 }
293 
294 /// Trait for types that can read `RecordBatch`'s.
295 pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch>> {
296     /// Returns the schema of this `RecordBatchReader`.
297     ///
298     /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
299     /// reader should have the same schema as returned from this method.
schema(&self) -> SchemaRef300     fn schema(&self) -> SchemaRef;
301 
302     /// Reads the next `RecordBatch`.
303     #[deprecated(
304         since = "2.0.0",
305         note = "This method is deprecated in favour of `next` from the trait Iterator."
306     )]
next_batch(&mut self) -> Result<Option<RecordBatch>>307     fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
308         self.next().transpose()
309     }
310 }
311 
#[cfg(test)]
mod tests {
    use super::*;

    use crate::buffer::Buffer;

    #[test]
    fn create_record_batch() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);

        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
                .unwrap();

        assert_eq!(5, record_batch.num_rows());
        assert_eq!(2, record_batch.num_columns());
        assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
        assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type());
        assert_eq!(5, record_batch.column(0).data().len());
        assert_eq!(5, record_batch.column(1).data().len());
    }

    #[test]
    fn create_record_batch_schema_mismatch() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int64Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]);
        assert!(batch.is_err());
    }

    #[test]
    fn create_record_batch_field_name_mismatch() {
        let struct_fields = vec![
            Field::new("a1", DataType::Int32, false),
            Field::new(
                "a2",
                DataType::List(Box::new(Field::new("item", DataType::Int8, false))),
                false,
            ),
        ];
        let struct_type = DataType::Struct(struct_fields);
        let schema = Arc::new(Schema::new(vec![Field::new("a", struct_type, true)]));

        let a1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
        let a2_child = Int8Array::from(vec![1, 2, 3, 4]);
        let a2 = ArrayDataBuilder::new(DataType::List(Box::new(Field::new(
            "array",
            DataType::Int8,
            false,
        ))))
        .add_child_data(a2_child.data())
        .len(2)
        .add_buffer(Buffer::from(vec![0i32, 3, 4].to_byte_slice()))
        .build();
        let a2: ArrayRef = Arc::new(ListArray::from(a2));
        let a = ArrayDataBuilder::new(DataType::Struct(vec![
            Field::new("aa1", DataType::Int32, false),
            Field::new("a2", a2.data_type().clone(), false),
        ]))
        .add_child_data(a1.data())
        .add_child_data(a2.data())
        .len(2)
        .build();
        let a: ArrayRef = Arc::new(StructArray::from(a));

        // creating the batch with field name validation should fail
        let batch = RecordBatch::try_new(schema.clone(), vec![a.clone()]);
        assert!(batch.is_err());

        // creating the batch without field name validation should pass
        let options = RecordBatchOptions {
            match_field_names: false,
        };
        let batch = RecordBatch::try_new_with_options(schema, vec![a], &options);
        assert!(batch.is_ok());
    }

    #[test]
    fn create_record_batch_record_mismatch() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]);
        assert!(batch.is_err());
    }

    #[test]
    fn create_record_batch_no_columns() {
        // `try_new` requires at least one column
        let schema = Schema::new(vec![]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![]);
        assert!(batch.is_err());
    }

    #[test]
    fn create_record_batch_column_length_mismatch() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let a = Int32Array::from(vec![1, 2, 3]);
        let b = Int32Array::from(vec![1, 2]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]);
        assert!(batch.is_err());
    }

    #[test]
    fn create_record_batch_from_struct_array() {
        let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
        let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
        let struct_array = StructArray::from(vec![
            (
                Field::new("b", DataType::Boolean, false),
                boolean.clone() as ArrayRef,
            ),
            (
                Field::new("c", DataType::Int32, false),
                int.clone() as ArrayRef,
            ),
        ]);

        let batch = RecordBatch::from(&struct_array);
        assert_eq!(2, batch.num_columns());
        assert_eq!(4, batch.num_rows());
        assert_eq!(
            struct_array.data_type(),
            &DataType::Struct(batch.schema().fields().to_vec())
        );
        assert_eq!(batch.column(0).as_ref(), boolean.as_ref());
        assert_eq!(batch.column(1).as_ref(), int.as_ref());
    }
}
435