1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 //! A two-dimensional batch of column-oriented data with a defined 19 //! [schema](crate::datatypes::Schema). 20 21 use std::sync::Arc; 22 23 use crate::array::*; 24 use crate::datatypes::*; 25 use crate::error::{ArrowError, Result}; 26 27 /// A two-dimensional batch of column-oriented data with a defined 28 /// [schema](crate::datatypes::Schema). 29 /// 30 /// A `RecordBatch` is a two-dimensional dataset of a number of 31 /// contiguous arrays, each the same length. 32 /// A record batch has a schema which must match its arrays’ 33 /// datatypes. 34 /// 35 /// Record batches are a convenient unit of work for various 36 /// serialization and computation functions, possibly incremental. 37 /// See also [CSV reader](crate::csv::Reader) and 38 /// [JSON reader](crate::json::Reader). 39 #[derive(Clone, Debug)] 40 pub struct RecordBatch { 41 schema: SchemaRef, 42 columns: Vec<Arc<Array>>, 43 } 44 45 impl RecordBatch { 46 /// Creates a `RecordBatch` from a schema and columns. 47 /// 48 /// Expects the following: 49 /// * the vec of columns to not be empty 50 /// * the schema and column data types to have equal lengths 51 /// and match 52 /// * each array in columns to have the same length 53 /// 54 /// If the conditions are not met, an error is returned. 55 /// 56 /// # Example 57 /// 58 /// ``` 59 /// use std::sync::Arc; 60 /// use arrow::array::Int32Array; 61 /// use arrow::datatypes::{Schema, Field, DataType}; 62 /// use arrow::record_batch::RecordBatch; 63 /// 64 /// # fn main() -> arrow::error::Result<()> { 65 /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]); 66 /// let schema = Schema::new(vec![ 67 /// Field::new("id", DataType::Int32, false) 68 /// ]); 69 /// 70 /// let batch = RecordBatch::try_new( 71 /// Arc::new(schema), 72 /// vec![Arc::new(id_array)] 73 /// )?; 74 /// # Ok(()) 75 /// # } 76 /// ``` try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self>77 pub fn try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self> { 78 let options = RecordBatchOptions::default(); 79 Self::validate_new_batch(&schema, columns.as_slice(), &options)?; 80 Ok(RecordBatch { schema, columns }) 81 } 82 83 /// Creates a `RecordBatch` from a schema and columns, with additional options, 84 /// such as whether to strictly validate field names. 85 /// 86 /// See [`RecordBatch::try_new`] for the expected conditions. try_new_with_options( schema: SchemaRef, columns: Vec<ArrayRef>, options: &RecordBatchOptions, ) -> Result<Self>87 pub fn try_new_with_options( 88 schema: SchemaRef, 89 columns: Vec<ArrayRef>, 90 options: &RecordBatchOptions, 91 ) -> Result<Self> { 92 Self::validate_new_batch(&schema, columns.as_slice(), options)?; 93 Ok(RecordBatch { schema, columns }) 94 } 95 96 /// Creates a new empty [`RecordBatch`]. new_empty(schema: SchemaRef) -> Self97 pub fn new_empty(schema: SchemaRef) -> Self { 98 let columns = schema 99 .fields() 100 .iter() 101 .map(|field| new_empty_array(field.data_type())) 102 .collect(); 103 RecordBatch { schema, columns } 104 } 105 106 /// Validate the schema and columns using [`RecordBatchOptions`]. Returns an error 107 /// if any validation check fails. validate_new_batch( schema: &SchemaRef, columns: &[ArrayRef], options: &RecordBatchOptions, ) -> Result<()>108 fn validate_new_batch( 109 schema: &SchemaRef, 110 columns: &[ArrayRef], 111 options: &RecordBatchOptions, 112 ) -> Result<()> { 113 // check that there are some columns 114 if columns.is_empty() { 115 return Err(ArrowError::InvalidArgumentError( 116 "at least one column must be defined to create a record batch" 117 .to_string(), 118 )); 119 } 120 // check that number of fields in schema match column length 121 if schema.fields().len() != columns.len() { 122 return Err(ArrowError::InvalidArgumentError(format!( 123 "number of columns({}) must match number of fields({}) in schema", 124 columns.len(), 125 schema.fields().len(), 126 ))); 127 } 128 // check that all columns have the same row count, and match the schema 129 let len = columns[0].data().len(); 130 131 // This is a bit repetitive, but it is better to check the condition outside the loop 132 if options.match_field_names { 133 for (i, column) in columns.iter().enumerate() { 134 if column.len() != len { 135 return Err(ArrowError::InvalidArgumentError( 136 "all columns in a record batch must have the same length" 137 .to_string(), 138 )); 139 } 140 if column.data_type() != schema.field(i).data_type() { 141 return Err(ArrowError::InvalidArgumentError(format!( 142 "column types must match schema types, expected {:?} but found {:?} at column index {}", 143 schema.field(i).data_type(), 144 column.data_type(), 145 i))); 146 } 147 } 148 } else { 149 for (i, column) in columns.iter().enumerate() { 150 if column.len() != len { 151 return Err(ArrowError::InvalidArgumentError( 152 "all columns in a record batch must have the same length" 153 .to_string(), 154 )); 155 } 156 if !column 157 .data_type() 158 .equals_datatype(schema.field(i).data_type()) 159 { 160 return Err(ArrowError::InvalidArgumentError(format!( 161 "column types must match schema types, expected {:?} but found {:?} at column index {}", 162 schema.field(i).data_type(), 163 column.data_type(), 164 i))); 165 } 166 } 167 } 168 169 Ok(()) 170 } 171 172 /// Returns the [`Schema`](crate::datatypes::Schema) of the record batch. schema(&self) -> SchemaRef173 pub fn schema(&self) -> SchemaRef { 174 self.schema.clone() 175 } 176 177 /// Returns the number of columns in the record batch. 178 /// 179 /// # Example 180 /// 181 /// ``` 182 /// use std::sync::Arc; 183 /// use arrow::array::Int32Array; 184 /// use arrow::datatypes::{Schema, Field, DataType}; 185 /// use arrow::record_batch::RecordBatch; 186 /// 187 /// # fn main() -> arrow::error::Result<()> { 188 /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]); 189 /// let schema = Schema::new(vec![ 190 /// Field::new("id", DataType::Int32, false) 191 /// ]); 192 /// 193 /// let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)])?; 194 /// 195 /// assert_eq!(batch.num_columns(), 1); 196 /// # Ok(()) 197 /// # } 198 /// ``` num_columns(&self) -> usize199 pub fn num_columns(&self) -> usize { 200 self.columns.len() 201 } 202 203 /// Returns the number of rows in each column. 204 /// 205 /// # Panics 206 /// 207 /// Panics if the `RecordBatch` contains no columns. 208 /// 209 /// # Example 210 /// 211 /// ``` 212 /// use std::sync::Arc; 213 /// use arrow::array::Int32Array; 214 /// use arrow::datatypes::{Schema, Field, DataType}; 215 /// use arrow::record_batch::RecordBatch; 216 /// 217 /// # fn main() -> arrow::error::Result<()> { 218 /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]); 219 /// let schema = Schema::new(vec![ 220 /// Field::new("id", DataType::Int32, false) 221 /// ]); 222 /// 223 /// let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)])?; 224 /// 225 /// assert_eq!(batch.num_rows(), 5); 226 /// # Ok(()) 227 /// # } 228 /// ``` num_rows(&self) -> usize229 pub fn num_rows(&self) -> usize { 230 self.columns[0].data().len() 231 } 232 233 /// Get a reference to a column's array by index. 234 /// 235 /// # Panics 236 /// 237 /// Panics if `index` is outside of `0..num_columns`. column(&self, index: usize) -> &ArrayRef238 pub fn column(&self, index: usize) -> &ArrayRef { 239 &self.columns[index] 240 } 241 242 /// Get a reference to all columns in the record batch. columns(&self) -> &[ArrayRef]243 pub fn columns(&self) -> &[ArrayRef] { 244 &self.columns[..] 245 } 246 } 247 248 /// Options that control the behaviour used when creating a [`RecordBatch`]. 249 #[derive(Debug)] 250 pub struct RecordBatchOptions { 251 /// Match field names of structs and lists. If set to `true`, the names must match. 252 pub match_field_names: bool, 253 } 254 255 impl Default for RecordBatchOptions { default() -> Self256 fn default() -> Self { 257 Self { 258 match_field_names: true, 259 } 260 } 261 } 262 263 impl From<&StructArray> for RecordBatch { 264 /// Create a record batch from struct array. 265 /// 266 /// This currently does not flatten and nested struct types from(struct_array: &StructArray) -> Self267 fn from(struct_array: &StructArray) -> Self { 268 if let DataType::Struct(fields) = struct_array.data_type() { 269 let schema = Schema::new(fields.clone()); 270 let columns = struct_array.boxed_fields.clone(); 271 RecordBatch { 272 schema: Arc::new(schema), 273 columns, 274 } 275 } else { 276 unreachable!("unable to get datatype as struct") 277 } 278 } 279 } 280 281 impl From<RecordBatch> for StructArray { from(batch: RecordBatch) -> Self282 fn from(batch: RecordBatch) -> Self { 283 batch 284 .schema 285 .fields 286 .iter() 287 .zip(batch.columns.iter()) 288 .map(|t| (t.0.clone(), t.1.clone())) 289 .collect::<Vec<(Field, ArrayRef)>>() 290 .into() 291 } 292 } 293 294 /// Trait for types that can read `RecordBatch`'s. 295 pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch>> { 296 /// Returns the schema of this `RecordBatchReader`. 297 /// 298 /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this 299 /// reader should have the same schema as returned from this method. schema(&self) -> SchemaRef300 fn schema(&self) -> SchemaRef; 301 302 /// Reads the next `RecordBatch`. 303 #[deprecated( 304 since = "2.0.0", 305 note = "This method is deprecated in favour of `next` from the trait Iterator." 306 )] next_batch(&mut self) -> Result<Option<RecordBatch>>307 fn next_batch(&mut self) -> Result<Option<RecordBatch>> { 308 self.next().transpose() 309 } 310 } 311 312 #[cfg(test)] 313 mod tests { 314 use super::*; 315 316 use crate::buffer::Buffer; 317 318 #[test] create_record_batch()319 fn create_record_batch() { 320 let schema = Schema::new(vec![ 321 Field::new("a", DataType::Int32, false), 322 Field::new("b", DataType::Utf8, false), 323 ]); 324 325 let a = Int32Array::from(vec![1, 2, 3, 4, 5]); 326 let b = StringArray::from(vec!["a", "b", "c", "d", "e"]); 327 328 let record_batch = 329 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) 330 .unwrap(); 331 332 assert_eq!(5, record_batch.num_rows()); 333 assert_eq!(2, record_batch.num_columns()); 334 assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type()); 335 assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type()); 336 assert_eq!(5, record_batch.column(0).data().len()); 337 assert_eq!(5, record_batch.column(1).data().len()); 338 } 339 340 #[test] create_record_batch_schema_mismatch()341 fn create_record_batch_schema_mismatch() { 342 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); 343 344 let a = Int64Array::from(vec![1, 2, 3, 4, 5]); 345 346 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]); 347 assert!(!batch.is_ok()); 348 } 349 350 #[test] create_record_batch_field_name_mismatch()351 fn create_record_batch_field_name_mismatch() { 352 let struct_fields = vec![ 353 Field::new("a1", DataType::Int32, false), 354 Field::new( 355 "a2", 356 DataType::List(Box::new(Field::new("item", DataType::Int8, false))), 357 false, 358 ), 359 ]; 360 let struct_type = DataType::Struct(struct_fields); 361 let schema = Arc::new(Schema::new(vec![Field::new("a", struct_type, true)])); 362 363 let a1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); 364 let a2_child = Int8Array::from(vec![1, 2, 3, 4]); 365 let a2 = ArrayDataBuilder::new(DataType::List(Box::new(Field::new( 366 "array", 367 DataType::Int8, 368 false, 369 )))) 370 .add_child_data(a2_child.data()) 371 .len(2) 372 .add_buffer(Buffer::from(vec![0i32, 3, 4].to_byte_slice())) 373 .build(); 374 let a2: ArrayRef = Arc::new(ListArray::from(a2)); 375 let a = ArrayDataBuilder::new(DataType::Struct(vec![ 376 Field::new("aa1", DataType::Int32, false), 377 Field::new("a2", a2.data_type().clone(), false), 378 ])) 379 .add_child_data(a1.data()) 380 .add_child_data(a2.data()) 381 .len(2) 382 .build(); 383 let a: ArrayRef = Arc::new(StructArray::from(a)); 384 385 // creating the batch with field name validation should fail 386 let batch = RecordBatch::try_new(schema.clone(), vec![a.clone()]); 387 assert!(batch.is_err()); 388 389 // creating the batch without field name validation should pass 390 let options = RecordBatchOptions { 391 match_field_names: false, 392 }; 393 let batch = RecordBatch::try_new_with_options(schema, vec![a], &options); 394 assert!(batch.is_ok()); 395 } 396 397 #[test] create_record_batch_record_mismatch()398 fn create_record_batch_record_mismatch() { 399 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); 400 401 let a = Int32Array::from(vec![1, 2, 3, 4, 5]); 402 let b = Int32Array::from(vec![1, 2, 3, 4, 5]); 403 404 let batch = 405 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]); 406 assert!(!batch.is_ok()); 407 } 408 409 #[test] create_record_batch_from_struct_array()410 fn create_record_batch_from_struct_array() { 411 let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true])); 412 let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31])); 413 let struct_array = StructArray::from(vec![ 414 ( 415 Field::new("b", DataType::Boolean, false), 416 boolean.clone() as ArrayRef, 417 ), 418 ( 419 Field::new("c", DataType::Int32, false), 420 int.clone() as ArrayRef, 421 ), 422 ]); 423 424 let batch = RecordBatch::from(&struct_array); 425 assert_eq!(2, batch.num_columns()); 426 assert_eq!(4, batch.num_rows()); 427 assert_eq!( 428 struct_array.data_type(), 429 &DataType::Struct(batch.schema().fields().to_vec()) 430 ); 431 assert_eq!(batch.column(0).as_ref(), boolean.as_ref()); 432 assert_eq!(batch.column(1).as_ref(), int.as_ref()); 433 } 434 } 435