1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/table.h"
19 
20 #include <algorithm>
21 #include <cstdlib>
22 #include <limits>
23 #include <memory>
24 #include <sstream>
25 #include <utility>
26 
27 #include "arrow/array/array_base.h"
28 #include "arrow/array/array_binary.h"
29 #include "arrow/array/array_nested.h"
30 #include "arrow/array/concatenate.h"
31 #include "arrow/array/util.h"
32 #include "arrow/chunked_array.h"
33 #include "arrow/pretty_print.h"
34 #include "arrow/record_batch.h"
35 #include "arrow/result.h"
36 #include "arrow/status.h"
37 #include "arrow/type.h"
38 #include "arrow/type_fwd.h"
39 #include "arrow/type_traits.h"
40 #include "arrow/util/checked_cast.h"
41 #include "arrow/util/logging.h"
42 #include "arrow/util/vector.h"
43 
44 namespace arrow {
45 
46 using internal::checked_cast;
47 
48 class KeyValueMetadata;
49 class MemoryPool;
50 struct ArrayData;
51 
52 // ----------------------------------------------------------------------
53 // Table methods
54 
55 /// \class SimpleTable
56 /// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
57 class SimpleTable : public Table {
58  public:
SimpleTable(std::shared_ptr<Schema> schema,std::vector<std::shared_ptr<ChunkedArray>> columns,int64_t num_rows=-1)59   SimpleTable(std::shared_ptr<Schema> schema,
60               std::vector<std::shared_ptr<ChunkedArray>> columns, int64_t num_rows = -1)
61       : columns_(std::move(columns)) {
62     schema_ = std::move(schema);
63     if (num_rows < 0) {
64       if (columns_.size() == 0) {
65         num_rows_ = 0;
66       } else {
67         num_rows_ = columns_[0]->length();
68       }
69     } else {
70       num_rows_ = num_rows;
71     }
72   }
73 
SimpleTable(std::shared_ptr<Schema> schema,const std::vector<std::shared_ptr<Array>> & columns,int64_t num_rows=-1)74   SimpleTable(std::shared_ptr<Schema> schema,
75               const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) {
76     schema_ = std::move(schema);
77     if (num_rows < 0) {
78       if (columns.size() == 0) {
79         num_rows_ = 0;
80       } else {
81         num_rows_ = columns[0]->length();
82       }
83     } else {
84       num_rows_ = num_rows;
85     }
86 
87     columns_.resize(columns.size());
88     for (size_t i = 0; i < columns.size(); ++i) {
89       columns_[i] = std::make_shared<ChunkedArray>(columns[i]);
90     }
91   }
92 
column(int i) const93   std::shared_ptr<ChunkedArray> column(int i) const override { return columns_[i]; }
94 
columns() const95   const std::vector<std::shared_ptr<ChunkedArray>>& columns() const override {
96     return columns_;
97   }
98 
Slice(int64_t offset,int64_t length) const99   std::shared_ptr<Table> Slice(int64_t offset, int64_t length) const override {
100     auto sliced = columns_;
101     int64_t num_rows = length;
102     for (auto& column : sliced) {
103       column = column->Slice(offset, length);
104       num_rows = column->length();
105     }
106     return Table::Make(schema_, std::move(sliced), num_rows);
107   }
108 
RemoveColumn(int i) const109   Result<std::shared_ptr<Table>> RemoveColumn(int i) const override {
110     ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->RemoveField(i));
111 
112     return Table::Make(std::move(new_schema), internal::DeleteVectorElement(columns_, i),
113                        this->num_rows());
114   }
115 
AddColumn(int i,std::shared_ptr<Field> field_arg,std::shared_ptr<ChunkedArray> col) const116   Result<std::shared_ptr<Table>> AddColumn(
117       int i, std::shared_ptr<Field> field_arg,
118       std::shared_ptr<ChunkedArray> col) const override {
119     DCHECK(col != nullptr);
120 
121     if (col->length() != num_rows_) {
122       return Status::Invalid(
123           "Added column's length must match table's length. Expected length ", num_rows_,
124           " but got length ", col->length());
125     }
126 
127     if (!field_arg->type()->Equals(col->type())) {
128       return Status::Invalid("Field type did not match data type");
129     }
130 
131     ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->AddField(i, field_arg));
132     return Table::Make(std::move(new_schema),
133                        internal::AddVectorElement(columns_, i, std::move(col)));
134   }
135 
SetColumn(int i,std::shared_ptr<Field> field_arg,std::shared_ptr<ChunkedArray> col) const136   Result<std::shared_ptr<Table>> SetColumn(
137       int i, std::shared_ptr<Field> field_arg,
138       std::shared_ptr<ChunkedArray> col) const override {
139     DCHECK(col != nullptr);
140 
141     if (col->length() != num_rows_) {
142       return Status::Invalid(
143           "Added column's length must match table's length. Expected length ", num_rows_,
144           " but got length ", col->length());
145     }
146 
147     if (!field_arg->type()->Equals(col->type())) {
148       return Status::Invalid("Field type did not match data type");
149     }
150 
151     ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->SetField(i, field_arg));
152     return Table::Make(std::move(new_schema),
153                        internal::ReplaceVectorElement(columns_, i, std::move(col)));
154   }
155 
ReplaceSchemaMetadata(const std::shared_ptr<const KeyValueMetadata> & metadata) const156   std::shared_ptr<Table> ReplaceSchemaMetadata(
157       const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
158     auto new_schema = schema_->WithMetadata(metadata);
159     return Table::Make(std::move(new_schema), columns_);
160   }
161 
Flatten(MemoryPool * pool) const162   Result<std::shared_ptr<Table>> Flatten(MemoryPool* pool) const override {
163     std::vector<std::shared_ptr<Field>> flattened_fields;
164     std::vector<std::shared_ptr<ChunkedArray>> flattened_columns;
165     for (int i = 0; i < num_columns(); ++i) {
166       std::vector<std::shared_ptr<Field>> new_fields = field(i)->Flatten();
167       ARROW_ASSIGN_OR_RAISE(auto new_columns, column(i)->Flatten(pool));
168       DCHECK_EQ(new_columns.size(), new_fields.size());
169       for (size_t j = 0; j < new_columns.size(); ++j) {
170         flattened_fields.push_back(new_fields[j]);
171         flattened_columns.push_back(new_columns[j]);
172       }
173     }
174     auto flattened_schema =
175         std::make_shared<Schema>(std::move(flattened_fields), schema_->metadata());
176     return Table::Make(std::move(flattened_schema), std::move(flattened_columns));
177   }
178 
Validate() const179   Status Validate() const override {
180     RETURN_NOT_OK(ValidateMeta());
181     for (int i = 0; i < num_columns(); ++i) {
182       const ChunkedArray* col = columns_[i].get();
183       Status st = col->Validate();
184       if (!st.ok()) {
185         std::stringstream ss;
186         ss << "Column " << i << ": " << st.message();
187         return st.WithMessage(ss.str());
188       }
189     }
190     return Status::OK();
191   }
192 
ValidateFull() const193   Status ValidateFull() const override {
194     RETURN_NOT_OK(ValidateMeta());
195     for (int i = 0; i < num_columns(); ++i) {
196       const ChunkedArray* col = columns_[i].get();
197       Status st = col->ValidateFull();
198       if (!st.ok()) {
199         std::stringstream ss;
200         ss << "Column " << i << ": " << st.message();
201         return st.WithMessage(ss.str());
202       }
203     }
204     return Status::OK();
205   }
206 
207  protected:
ValidateMeta() const208   Status ValidateMeta() const {
209     // Make sure columns and schema are consistent
210     if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
211       return Status::Invalid("Number of columns did not match schema");
212     }
213     for (int i = 0; i < num_columns(); ++i) {
214       const ChunkedArray* col = columns_[i].get();
215       if (col == nullptr) {
216         return Status::Invalid("Column ", i, " was null");
217       }
218       if (!col->type()->Equals(*schema_->field(i)->type())) {
219         return Status::Invalid("Column data for field ", i, " with type ",
220                                col->type()->ToString(), " is inconsistent with schema ",
221                                schema_->field(i)->type()->ToString());
222       }
223     }
224 
225     // Make sure columns are all the same length, and validate them
226     for (int i = 0; i < num_columns(); ++i) {
227       const ChunkedArray* col = columns_[i].get();
228       if (col->length() != num_rows_) {
229         return Status::Invalid("Column ", i, " named ", field(i)->name(),
230                                " expected length ", num_rows_, " but got length ",
231                                col->length());
232       }
233       Status st = col->Validate();
234       if (!st.ok()) {
235         std::stringstream ss;
236         ss << "Column " << i << ": " << st.message();
237         return st.WithMessage(ss.str());
238       }
239     }
240     return Status::OK();
241   }
242 
243  private:
244   std::vector<std::shared_ptr<ChunkedArray>> columns_;
245 };
246 
Table()247 Table::Table() : num_rows_(0) {}
248 
fields() const249 std::vector<std::shared_ptr<Field>> Table::fields() const {
250   std::vector<std::shared_ptr<Field>> result;
251   for (int i = 0; i < this->num_columns(); ++i) {
252     result.emplace_back(this->field(i));
253   }
254   return result;
255 }
256 
Make(std::shared_ptr<Schema> schema,std::vector<std::shared_ptr<ChunkedArray>> columns,int64_t num_rows)257 std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema,
258                                    std::vector<std::shared_ptr<ChunkedArray>> columns,
259                                    int64_t num_rows) {
260   return std::make_shared<SimpleTable>(std::move(schema), std::move(columns), num_rows);
261 }
262 
Make(std::shared_ptr<Schema> schema,const std::vector<std::shared_ptr<Array>> & arrays,int64_t num_rows)263 std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema,
264                                    const std::vector<std::shared_ptr<Array>>& arrays,
265                                    int64_t num_rows) {
266   return std::make_shared<SimpleTable>(std::move(schema), arrays, num_rows);
267 }
268 
FromRecordBatchReader(RecordBatchReader * reader)269 Result<std::shared_ptr<Table>> Table::FromRecordBatchReader(RecordBatchReader* reader) {
270   std::shared_ptr<Table> table = nullptr;
271   RETURN_NOT_OK(reader->ReadAll(&table));
272   return table;
273 }
274 
FromRecordBatches(std::shared_ptr<Schema> schema,const std::vector<std::shared_ptr<RecordBatch>> & batches)275 Result<std::shared_ptr<Table>> Table::FromRecordBatches(
276     std::shared_ptr<Schema> schema,
277     const std::vector<std::shared_ptr<RecordBatch>>& batches) {
278   const int nbatches = static_cast<int>(batches.size());
279   const int ncolumns = static_cast<int>(schema->num_fields());
280 
281   int64_t num_rows = 0;
282   for (int i = 0; i < nbatches; ++i) {
283     if (!batches[i]->schema()->Equals(*schema, false)) {
284       return Status::Invalid("Schema at index ", static_cast<int>(i),
285                              " was different: \n", schema->ToString(), "\nvs\n",
286                              batches[i]->schema()->ToString());
287     }
288     num_rows += batches[i]->num_rows();
289   }
290 
291   std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns);
292   std::vector<std::shared_ptr<Array>> column_arrays(nbatches);
293 
294   for (int i = 0; i < ncolumns; ++i) {
295     for (int j = 0; j < nbatches; ++j) {
296       column_arrays[j] = batches[j]->column(i);
297     }
298     columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type());
299   }
300 
301   return Table::Make(std::move(schema), std::move(columns), num_rows);
302 }
303 
FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>> & batches)304 Result<std::shared_ptr<Table>> Table::FromRecordBatches(
305     const std::vector<std::shared_ptr<RecordBatch>>& batches) {
306   if (batches.size() == 0) {
307     return Status::Invalid("Must pass at least one record batch or an explicit Schema");
308   }
309 
310   return FromRecordBatches(batches[0]->schema(), batches);
311 }
312 
FromChunkedStructArray(const std::shared_ptr<ChunkedArray> & array)313 Result<std::shared_ptr<Table>> Table::FromChunkedStructArray(
314     const std::shared_ptr<ChunkedArray>& array) {
315   auto type = array->type();
316   if (type->id() != Type::STRUCT) {
317     return Status::Invalid("Expected a chunked struct array, got ", *type);
318   }
319   int num_columns = type->num_fields();
320   int num_chunks = array->num_chunks();
321 
322   const auto& struct_chunks = array->chunks();
323   std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns);
324   for (int i = 0; i < num_columns; ++i) {
325     ArrayVector chunks(num_chunks);
326     std::transform(struct_chunks.begin(), struct_chunks.end(), chunks.begin(),
327                    [i](const std::shared_ptr<Array>& struct_chunk) {
328                      return static_cast<const StructArray&>(*struct_chunk).field(i);
329                    });
330     columns[i] =
331         std::make_shared<ChunkedArray>(std::move(chunks), type->field(i)->type());
332   }
333 
334   return Table::Make(::arrow::schema(type->fields()), std::move(columns),
335                      array->length());
336 }
337 
ColumnNames() const338 std::vector<std::string> Table::ColumnNames() const {
339   std::vector<std::string> names(num_columns());
340   for (int i = 0; i < num_columns(); ++i) {
341     names[i] = field(i)->name();
342   }
343   return names;
344 }
345 
RenameColumns(const std::vector<std::string> & names) const346 Result<std::shared_ptr<Table>> Table::RenameColumns(
347     const std::vector<std::string>& names) const {
348   if (names.size() != static_cast<size_t>(num_columns())) {
349     return Status::Invalid("tried to rename a table of ", num_columns(),
350                            " columns but only ", names.size(), " names were provided");
351   }
352   std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns());
353   std::vector<std::shared_ptr<Field>> fields(num_columns());
354   for (int i = 0; i < num_columns(); ++i) {
355     columns[i] = column(i);
356     fields[i] = field(i)->WithName(names[i]);
357   }
358   return Table::Make(::arrow::schema(std::move(fields)), std::move(columns), num_rows());
359 }
360 
SelectColumns(const std::vector<int> & indices) const361 Result<std::shared_ptr<Table>> Table::SelectColumns(
362     const std::vector<int>& indices) const {
363   int n = static_cast<int>(indices.size());
364 
365   std::vector<std::shared_ptr<ChunkedArray>> columns(n);
366   std::vector<std::shared_ptr<Field>> fields(n);
367   for (int i = 0; i < n; i++) {
368     int pos = indices[i];
369     if (pos < 0 || pos > num_columns() - 1) {
370       return Status::Invalid("Invalid column index ", pos, " to select columns.");
371     }
372     columns[i] = column(pos);
373     fields[i] = field(pos);
374   }
375 
376   auto new_schema =
377       std::make_shared<arrow::Schema>(std::move(fields), schema()->metadata());
378   return Table::Make(std::move(new_schema), std::move(columns), num_rows());
379 }
380 
ToString() const381 std::string Table::ToString() const {
382   std::stringstream ss;
383   ARROW_CHECK_OK(PrettyPrint(*this, 0, &ss));
384   return ss.str();
385 }
386 
ConcatenateTables(const std::vector<std::shared_ptr<Table>> & tables,const ConcatenateTablesOptions options,MemoryPool * memory_pool)387 Result<std::shared_ptr<Table>> ConcatenateTables(
388     const std::vector<std::shared_ptr<Table>>& tables,
389     const ConcatenateTablesOptions options, MemoryPool* memory_pool) {
390   if (tables.size() == 0) {
391     return Status::Invalid("Must pass at least one table");
392   }
393 
394   std::vector<std::shared_ptr<Table>> promoted_tables;
395   const std::vector<std::shared_ptr<Table>>* tables_to_concat = &tables;
396   if (options.unify_schemas) {
397     std::vector<std::shared_ptr<Schema>> schemas;
398     schemas.reserve(tables.size());
399     for (const auto& t : tables) {
400       schemas.push_back(t->schema());
401     }
402 
403     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> unified_schema,
404                           UnifySchemas(schemas, options.field_merge_options));
405 
406     promoted_tables.reserve(tables.size());
407     for (const auto& t : tables) {
408       promoted_tables.emplace_back();
409       ARROW_ASSIGN_OR_RAISE(promoted_tables.back(),
410                             PromoteTableToSchema(t, unified_schema, memory_pool));
411     }
412     tables_to_concat = &promoted_tables;
413   } else {
414     auto first_schema = tables[0]->schema();
415     for (size_t i = 1; i < tables.size(); ++i) {
416       if (!tables[i]->schema()->Equals(*first_schema, false)) {
417         return Status::Invalid("Schema at index ", i, " was different: \n",
418                                first_schema->ToString(), "\nvs\n",
419                                tables[i]->schema()->ToString());
420       }
421     }
422   }
423 
424   std::shared_ptr<Schema> schema = tables_to_concat->front()->schema();
425 
426   const int ncolumns = schema->num_fields();
427 
428   std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns);
429   for (int i = 0; i < ncolumns; ++i) {
430     std::vector<std::shared_ptr<Array>> column_arrays;
431     for (const auto& table : *tables_to_concat) {
432       const std::vector<std::shared_ptr<Array>>& chunks = table->column(i)->chunks();
433       for (const auto& chunk : chunks) {
434         column_arrays.push_back(chunk);
435       }
436     }
437     columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type());
438   }
439   return Table::Make(std::move(schema), std::move(columns));
440 }
441 
// Rebuild `table` so that it conforms to `schema`: source columns are matched
// to target fields by name; target fields absent from the source are filled
// with nulls; null-typed source columns may be promoted to any target type.
// Fails if the source has duplicate field names, a nullable column targets a
// non-nullable field, types are otherwise incompatible, or a source field has
// no counterpart in `schema`.
Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>& table,
                                                    const std::shared_ptr<Schema>& schema,
                                                    MemoryPool* pool) {
  const std::shared_ptr<Schema> current_schema = table->schema();
  // Fast path: identical field layout — only the metadata needs replacing.
  if (current_schema->Equals(*schema, /*check_metadata=*/false)) {
    return table->ReplaceSchemaMetadata(schema->metadata());
  }

  // fields_seen[i] == true iff that field is also in `schema`.
  std::vector<bool> fields_seen(current_schema->num_fields(), false);

  std::vector<std::shared_ptr<ChunkedArray>> columns;
  columns.reserve(schema->num_fields());
  const int64_t num_rows = table->num_rows();
  // Appends a column of `num_rows` nulls of the given type to `columns`.
  auto AppendColumnOfNulls = [pool, &columns,
                              num_rows](const std::shared_ptr<DataType>& type) {
    // TODO(bkietz): share the zero-filled buffers as much as possible across
    // the null-filled arrays created here.
    ARROW_ASSIGN_OR_RAISE(auto array_of_nulls, MakeArrayOfNull(type, num_rows, pool));
    columns.push_back(std::make_shared<ChunkedArray>(array_of_nulls));
    return Status::OK();
  };

  for (const auto& field : schema->fields()) {
    const std::vector<int> field_indices =
        current_schema->GetAllFieldIndices(field->name());
    // No source counterpart: fill the target column with nulls.
    if (field_indices.empty()) {
      RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
      continue;
    }

    // Matching by name is ambiguous when the source has duplicate names.
    if (field_indices.size() > 1) {
      return Status::Invalid(
          "PromoteTableToSchema cannot handle schemas with duplicate fields: ",
          field->name());
    }

    const int field_index = field_indices[0];
    const auto& current_field = current_schema->field(field_index);
    // A nullable source column cannot be promoted into a non-nullable field.
    if (!field->nullable() && current_field->nullable()) {
      return Status::Invalid("Unable to promote field ", current_field->name(),
                             ": it was nullable but the target schema was not.");
    }

    fields_seen[field_index] = true;
    // Same type: reuse the existing column as-is (zero-copy).
    if (current_field->type()->Equals(field->type())) {
      columns.push_back(table->column(field_index));
      continue;
    }

    // A null-typed source column promotes to any target type as all-nulls.
    if (current_field->type()->id() == Type::NA) {
      RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
      continue;
    }

    return Status::Invalid("Unable to promote field ", field->name(),
                           ": incompatible types: ", field->type()->ToString(), " vs ",
                           current_field->type()->ToString());
  }

  // Every source field must have been consumed by the target schema.
  auto unseen_field_iter = std::find(fields_seen.begin(), fields_seen.end(), false);
  if (unseen_field_iter != fields_seen.end()) {
    const size_t unseen_field_index = unseen_field_iter - fields_seen.begin();
    return Status::Invalid(
        "Incompatible schemas: field ",
        current_schema->field(static_cast<int>(unseen_field_index))->name(),
        " did not exist in the new schema.");
  }

  return Table::Make(schema, std::move(columns));
}
513 
Equals(const Table & other,bool check_metadata) const514 bool Table::Equals(const Table& other, bool check_metadata) const {
515   if (this == &other) {
516     return true;
517   }
518   if (!schema_->Equals(*other.schema(), check_metadata)) {
519     return false;
520   }
521   if (this->num_columns() != other.num_columns()) {
522     return false;
523   }
524 
525   for (int i = 0; i < this->num_columns(); i++) {
526     if (!this->column(i)->Equals(other.column(i))) {
527       return false;
528     }
529   }
530   return true;
531 }
532 
CombineChunks(MemoryPool * pool) const533 Result<std::shared_ptr<Table>> Table::CombineChunks(MemoryPool* pool) const {
534   const int ncolumns = num_columns();
535   std::vector<std::shared_ptr<ChunkedArray>> compacted_columns(ncolumns);
536   for (int i = 0; i < ncolumns; ++i) {
537     const auto& col = column(i);
538     if (col->num_chunks() <= 1) {
539       compacted_columns[i] = col;
540       continue;
541     }
542 
543     if (is_binary_like(col->type()->id())) {
544       // ARROW-5744 Allow binary columns to be combined into multiple chunks to avoid
545       // buffer overflow
546       ArrayVector chunks;
547       int chunk_i = 0;
548       while (chunk_i < col->num_chunks()) {
549         ArrayVector safe_chunks;
550         int64_t data_length = 0;
551         for (; chunk_i < col->num_chunks(); ++chunk_i) {
552           const auto& chunk = col->chunk(chunk_i);
553           data_length += checked_cast<const BinaryArray&>(*chunk).total_values_length();
554           if (data_length >= kBinaryMemoryLimit) {
555             break;
556           }
557           safe_chunks.push_back(chunk);
558         }
559         chunks.emplace_back();
560         ARROW_ASSIGN_OR_RAISE(chunks.back(), Concatenate(safe_chunks, pool));
561       }
562       compacted_columns[i] = std::make_shared<ChunkedArray>(std::move(chunks));
563     } else {
564       ARROW_ASSIGN_OR_RAISE(auto compacted, Concatenate(col->chunks(), pool));
565       compacted_columns[i] = std::make_shared<ChunkedArray>(compacted);
566     }
567   }
568   return Table::Make(schema(), std::move(compacted_columns), num_rows_);
569 }
570 
571 // ----------------------------------------------------------------------
572 // Convert a table to a sequence of record batches
573 
TableBatchReader(const Table & table)574 TableBatchReader::TableBatchReader(const Table& table)
575     : table_(table),
576       column_data_(table.num_columns()),
577       chunk_numbers_(table.num_columns(), 0),
578       chunk_offsets_(table.num_columns(), 0),
579       absolute_row_position_(0),
580       max_chunksize_(std::numeric_limits<int64_t>::max()) {
581   for (int i = 0; i < table.num_columns(); ++i) {
582     column_data_[i] = table.column(i).get();
583   }
584 }
585 
schema() const586 std::shared_ptr<Schema> TableBatchReader::schema() const { return table_.schema(); }
587 
set_chunksize(int64_t chunksize)588 void TableBatchReader::set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; }
589 
// Emit the next record batch: the largest run of rows (capped by
// max_chunksize_) that does not cross a chunk boundary in any column, so
// every output column is a zero-copy view of a single chunk.  Sets *out to
// nullptr at end of stream.
Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
  // All rows consumed: signal end-of-stream.
  if (absolute_row_position_ == table_.num_rows()) {
    *out = nullptr;
    return Status::OK();
  }

  // Determine the minimum contiguous slice across all columns
  int64_t chunksize = std::min(table_.num_rows(), max_chunksize_);
  std::vector<const Array*> chunks(table_.num_columns());
  for (int i = 0; i < table_.num_columns(); ++i) {
    auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
    int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];

    // Shrink the batch so it ends where the earliest chunk runs out.
    if (chunk_remaining < chunksize) {
      chunksize = chunk_remaining;
    }

    chunks[i] = chunk;
  }

  // Slice chunks and advance chunk index as appropriate
  std::vector<std::shared_ptr<ArrayData>> batch_data(table_.num_columns());

  for (int i = 0; i < table_.num_columns(); ++i) {
    // Exhausted chunk
    const Array* chunk = chunks[i];
    const int64_t offset = chunk_offsets_[i];
    std::shared_ptr<ArrayData> slice_data;
    if ((chunk->length() - offset) == chunksize) {
      // This batch consumes the rest of the chunk: move to the next chunk
      // and reset the intra-chunk offset.
      ++chunk_numbers_[i];
      chunk_offsets_[i] = 0;
      if (offset > 0) {
        // Need to slice
        slice_data = chunk->Slice(offset, chunksize)->data();
      } else {
        // No slice
        slice_data = chunk->data();
      }
    } else {
      // Partial consumption: remember how far into the chunk we are.
      chunk_offsets_[i] += chunksize;
      slice_data = chunk->Slice(offset, chunksize)->data();
    }
    batch_data[i] = std::move(slice_data);
  }

  absolute_row_position_ += chunksize;
  *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));

  return Status::OK();
}
640 
641 }  // namespace arrow
642