1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/table.h"
19 
20 #include <algorithm>
21 #include <cstdlib>
22 #include <limits>
23 #include <memory>
24 #include <sstream>
25 #include <utility>
26 
27 #include "arrow/array.h"
28 #include "arrow/array/concatenate.h"
29 #include "arrow/array/validate.h"
30 #include "arrow/pretty_print.h"
31 #include "arrow/record_batch.h"
32 #include "arrow/status.h"
33 #include "arrow/type.h"
34 #include "arrow/util/checked_cast.h"
35 #include "arrow/util/logging.h"
36 #include "arrow/util/vector.h"
37 
38 namespace arrow {
39 
40 using internal::checked_cast;
41 
42 // ----------------------------------------------------------------------
43 // ChunkedArray methods
44 
ChunkedArray(ArrayVector chunks)45 ChunkedArray::ChunkedArray(ArrayVector chunks) : chunks_(std::move(chunks)) {
46   length_ = 0;
47   null_count_ = 0;
48 
49   ARROW_CHECK_GT(chunks_.size(), 0)
50       << "cannot construct ChunkedArray from empty vector and omitted type";
51   type_ = chunks_[0]->type();
52   for (const std::shared_ptr<Array>& chunk : chunks_) {
53     length_ += chunk->length();
54     null_count_ += chunk->null_count();
55   }
56 }
57 
ChunkedArray(ArrayVector chunks,std::shared_ptr<DataType> type)58 ChunkedArray::ChunkedArray(ArrayVector chunks, std::shared_ptr<DataType> type)
59     : chunks_(std::move(chunks)), type_(std::move(type)) {
60   length_ = 0;
61   null_count_ = 0;
62   for (const std::shared_ptr<Array>& chunk : chunks_) {
63     length_ += chunk->length();
64     null_count_ += chunk->null_count();
65   }
66 }
67 
Equals(const ChunkedArray & other) const68 bool ChunkedArray::Equals(const ChunkedArray& other) const {
69   if (length_ != other.length()) {
70     return false;
71   }
72   if (null_count_ != other.null_count()) {
73     return false;
74   }
75   // We cannot toggle check_metadata here yet, so we don't check it
76   if (!type_->Equals(*other.type_, /*check_metadata=*/false)) {
77     return false;
78   }
79 
80   // Check contents of the underlying arrays. This checks for equality of
81   // the underlying data independently of the chunk size.
82   return internal::ApplyBinaryChunked(
83              *this, other,
84              [](const Array& left_piece, const Array& right_piece,
85                 int64_t ARROW_ARG_UNUSED(position)) {
86                if (!left_piece.Equals(right_piece)) {
87                  return Status::Invalid("Unequal piece");
88                }
89                return Status::OK();
90              })
91       .ok();
92 }
93 
Equals(const std::shared_ptr<ChunkedArray> & other) const94 bool ChunkedArray::Equals(const std::shared_ptr<ChunkedArray>& other) const {
95   if (this == other.get()) {
96     return true;
97   }
98   if (!other) {
99     return false;
100   }
101   return Equals(*other.get());
102 }
103 
Slice(int64_t offset,int64_t length) const104 std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset, int64_t length) const {
105   ARROW_CHECK_LE(offset, length_) << "Slice offset greater than array length";
106   bool offset_equals_length = offset == length_;
107   int curr_chunk = 0;
108   while (curr_chunk < num_chunks() && offset >= chunk(curr_chunk)->length()) {
109     offset -= chunk(curr_chunk)->length();
110     curr_chunk++;
111   }
112 
113   ArrayVector new_chunks;
114   if (num_chunks() > 0 && (offset_equals_length || length == 0)) {
115     // Special case the zero-length slice to make sure there is at least 1 Array
116     // in the result. When there are zero chunks we return zero chunks
117     new_chunks.push_back(chunk(std::min(curr_chunk, num_chunks() - 1))->Slice(0, 0));
118   } else {
119     while (curr_chunk < num_chunks() && length > 0) {
120       new_chunks.push_back(chunk(curr_chunk)->Slice(offset, length));
121       length -= chunk(curr_chunk)->length() - offset;
122       offset = 0;
123       curr_chunk++;
124     }
125   }
126 
127   return std::make_shared<ChunkedArray>(new_chunks, type_);
128 }
129 
Slice(int64_t offset) const130 std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset) const {
131   return Slice(offset, length_);
132 }
133 
Flatten(MemoryPool * pool,std::vector<std::shared_ptr<ChunkedArray>> * out) const134 Status ChunkedArray::Flatten(MemoryPool* pool,
135                              std::vector<std::shared_ptr<ChunkedArray>>* out) const {
136   return Flatten(pool).Value(out);
137 }
138 
Flatten(MemoryPool * pool) const139 Result<std::vector<std::shared_ptr<ChunkedArray>>> ChunkedArray::Flatten(
140     MemoryPool* pool) const {
141   if (type()->id() != Type::STRUCT) {
142     // Emulate nonexistent copy constructor
143     return std::vector<std::shared_ptr<ChunkedArray>>{
144         std::make_shared<ChunkedArray>(chunks_, type_)};
145   }
146 
147   std::vector<ArrayVector> flattened_chunks(type()->num_fields());
148   for (const auto& chunk : chunks_) {
149     ARROW_ASSIGN_OR_RAISE(auto arrays,
150                           checked_cast<const StructArray&>(*chunk).Flatten(pool));
151     DCHECK_EQ(arrays.size(), flattened_chunks.size());
152     for (size_t i = 0; i < arrays.size(); ++i) {
153       flattened_chunks[i].push_back(arrays[i]);
154     }
155   }
156 
157   std::vector<std::shared_ptr<ChunkedArray>> flattened(type()->num_fields());
158   for (size_t i = 0; i < flattened.size(); ++i) {
159     auto child_type = type()->field(static_cast<int>(i))->type();
160     flattened[i] =
161         std::make_shared<ChunkedArray>(std::move(flattened_chunks[i]), child_type);
162   }
163   return flattened;
164 }
165 
View(const std::shared_ptr<DataType> & type) const166 Result<std::shared_ptr<ChunkedArray>> ChunkedArray::View(
167     const std::shared_ptr<DataType>& type) const {
168   ArrayVector out_chunks(this->num_chunks());
169   for (int i = 0; i < this->num_chunks(); ++i) {
170     ARROW_ASSIGN_OR_RAISE(out_chunks[i], chunks_[i]->View(type));
171   }
172   return std::make_shared<ChunkedArray>(out_chunks, type);
173 }
174 
View(const std::shared_ptr<DataType> & type,std::shared_ptr<ChunkedArray> * out) const175 Status ChunkedArray::View(const std::shared_ptr<DataType>& type,
176                           std::shared_ptr<ChunkedArray>* out) const {
177   return View(type).Value(out);
178 }
179 
ToString() const180 std::string ChunkedArray::ToString() const {
181   std::stringstream ss;
182   ARROW_CHECK_OK(PrettyPrint(*this, 0, &ss));
183   return ss.str();
184 }
185 
Validate() const186 Status ChunkedArray::Validate() const {
187   if (chunks_.size() == 0) {
188     return Status::OK();
189   }
190 
191   const auto& type = *chunks_[0]->type();
192   // Make sure chunks all have the same type
193   for (size_t i = 1; i < chunks_.size(); ++i) {
194     const Array& chunk = *chunks_[i];
195     if (!chunk.type()->Equals(type)) {
196       return Status::Invalid("In chunk ", i, " expected type ", type.ToString(),
197                              " but saw ", chunk.type()->ToString());
198     }
199   }
200   // Validate the chunks themselves
201   for (size_t i = 0; i < chunks_.size(); ++i) {
202     const Array& chunk = *chunks_[i];
203     const Status st = internal::ValidateArray(chunk);
204     if (!st.ok()) {
205       return Status::Invalid("In chunk ", i, ": ", st.ToString());
206     }
207   }
208   return Status::OK();
209 }
210 
ValidateFull() const211 Status ChunkedArray::ValidateFull() const {
212   RETURN_NOT_OK(Validate());
213   for (size_t i = 0; i < chunks_.size(); ++i) {
214     const Array& chunk = *chunks_[i];
215     const Status st = internal::ValidateArrayData(chunk);
216     if (!st.ok()) {
217       return Status::Invalid("In chunk ", i, ": ", st.ToString());
218     }
219   }
220   return Status::OK();
221 }
222 
223 namespace internal {
224 
Next(std::shared_ptr<Array> * next_left,std::shared_ptr<Array> * next_right)225 bool MultipleChunkIterator::Next(std::shared_ptr<Array>* next_left,
226                                  std::shared_ptr<Array>* next_right) {
227   if (pos_ == length_) return false;
228 
229   // Find non-empty chunk
230   std::shared_ptr<Array> chunk_left, chunk_right;
231   while (true) {
232     chunk_left = left_.chunk(chunk_idx_left_);
233     chunk_right = right_.chunk(chunk_idx_right_);
234     if (chunk_pos_left_ == chunk_left->length()) {
235       chunk_pos_left_ = 0;
236       ++chunk_idx_left_;
237       continue;
238     }
239     if (chunk_pos_right_ == chunk_right->length()) {
240       chunk_pos_right_ = 0;
241       ++chunk_idx_right_;
242       continue;
243     }
244     break;
245   }
246   // Determine how big of a section to return
247   int64_t iteration_size = std::min(chunk_left->length() - chunk_pos_left_,
248                                     chunk_right->length() - chunk_pos_right_);
249 
250   *next_left = chunk_left->Slice(chunk_pos_left_, iteration_size);
251   *next_right = chunk_right->Slice(chunk_pos_right_, iteration_size);
252 
253   pos_ += iteration_size;
254   chunk_pos_left_ += iteration_size;
255   chunk_pos_right_ += iteration_size;
256   return true;
257 }
258 
259 }  // namespace internal
260 
261 // ----------------------------------------------------------------------
262 // Table methods
263 
264 /// \class SimpleTable
265 /// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
266 class SimpleTable : public Table {
267  public:
SimpleTable(std::shared_ptr<Schema> schema,std::vector<std::shared_ptr<ChunkedArray>> columns,int64_t num_rows=-1)268   SimpleTable(std::shared_ptr<Schema> schema,
269               std::vector<std::shared_ptr<ChunkedArray>> columns, int64_t num_rows = -1)
270       : columns_(std::move(columns)) {
271     schema_ = std::move(schema);
272     if (num_rows < 0) {
273       if (columns_.size() == 0) {
274         num_rows_ = 0;
275       } else {
276         num_rows_ = columns_[0]->length();
277       }
278     } else {
279       num_rows_ = num_rows;
280     }
281   }
282 
SimpleTable(std::shared_ptr<Schema> schema,const std::vector<std::shared_ptr<Array>> & columns,int64_t num_rows=-1)283   SimpleTable(std::shared_ptr<Schema> schema,
284               const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) {
285     schema_ = std::move(schema);
286     if (num_rows < 0) {
287       if (columns.size() == 0) {
288         num_rows_ = 0;
289       } else {
290         num_rows_ = columns[0]->length();
291       }
292     } else {
293       num_rows_ = num_rows;
294     }
295 
296     columns_.resize(columns.size());
297     for (size_t i = 0; i < columns.size(); ++i) {
298       columns_[i] = std::make_shared<ChunkedArray>(columns[i]);
299     }
300   }
301 
column(int i) const302   std::shared_ptr<ChunkedArray> column(int i) const override { return columns_[i]; }
303 
Slice(int64_t offset,int64_t length) const304   std::shared_ptr<Table> Slice(int64_t offset, int64_t length) const override {
305     auto sliced = columns_;
306     int64_t num_rows = length;
307     for (auto& column : sliced) {
308       column = column->Slice(offset, length);
309       num_rows = column->length();
310     }
311     return Table::Make(schema_, sliced, num_rows);
312   }
313 
RemoveColumn(int i) const314   Result<std::shared_ptr<Table>> RemoveColumn(int i) const override {
315     ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->RemoveField(i));
316 
317     return Table::Make(new_schema, internal::DeleteVectorElement(columns_, i),
318                        this->num_rows());
319   }
320 
AddColumn(int i,std::shared_ptr<Field> field_arg,std::shared_ptr<ChunkedArray> col) const321   Result<std::shared_ptr<Table>> AddColumn(
322       int i, std::shared_ptr<Field> field_arg,
323       std::shared_ptr<ChunkedArray> col) const override {
324     DCHECK(col != nullptr);
325 
326     if (col->length() != num_rows_) {
327       return Status::Invalid(
328           "Added column's length must match table's length. Expected length ", num_rows_,
329           " but got length ", col->length());
330     }
331 
332     if (!field_arg->type()->Equals(col->type())) {
333       return Status::Invalid("Field type did not match data type");
334     }
335 
336     ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->AddField(i, field_arg));
337 
338     return Table::Make(new_schema,
339                        internal::AddVectorElement(columns_, i, std::move(col)));
340   }
341 
SetColumn(int i,std::shared_ptr<Field> field_arg,std::shared_ptr<ChunkedArray> col) const342   Result<std::shared_ptr<Table>> SetColumn(
343       int i, std::shared_ptr<Field> field_arg,
344       std::shared_ptr<ChunkedArray> col) const override {
345     DCHECK(col != nullptr);
346 
347     if (col->length() != num_rows_) {
348       return Status::Invalid(
349           "Added column's length must match table's length. Expected length ", num_rows_,
350           " but got length ", col->length());
351     }
352 
353     if (!field_arg->type()->Equals(col->type())) {
354       return Status::Invalid("Field type did not match data type");
355     }
356 
357     ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->SetField(i, field_arg));
358     return Table::Make(new_schema,
359                        internal::ReplaceVectorElement(columns_, i, std::move(col)));
360   }
361 
ReplaceSchemaMetadata(const std::shared_ptr<const KeyValueMetadata> & metadata) const362   std::shared_ptr<Table> ReplaceSchemaMetadata(
363       const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
364     auto new_schema = schema_->WithMetadata(metadata);
365     return Table::Make(new_schema, columns_);
366   }
367 
Flatten(MemoryPool * pool) const368   Result<std::shared_ptr<Table>> Flatten(MemoryPool* pool) const override {
369     std::vector<std::shared_ptr<Field>> flattened_fields;
370     std::vector<std::shared_ptr<ChunkedArray>> flattened_columns;
371     for (int i = 0; i < num_columns(); ++i) {
372       std::vector<std::shared_ptr<Field>> new_fields = field(i)->Flatten();
373       ARROW_ASSIGN_OR_RAISE(auto new_columns, column(i)->Flatten(pool));
374       DCHECK_EQ(new_columns.size(), new_fields.size());
375       for (size_t j = 0; j < new_columns.size(); ++j) {
376         flattened_fields.push_back(new_fields[j]);
377         flattened_columns.push_back(new_columns[j]);
378       }
379     }
380     auto flattened_schema =
381         std::make_shared<Schema>(std::move(flattened_fields), schema_->metadata());
382     return Table::Make(std::move(flattened_schema), std::move(flattened_columns));
383   }
384 
Validate() const385   Status Validate() const override {
386     RETURN_NOT_OK(ValidateMeta());
387     for (int i = 0; i < num_columns(); ++i) {
388       const ChunkedArray* col = columns_[i].get();
389       Status st = col->Validate();
390       if (!st.ok()) {
391         std::stringstream ss;
392         ss << "Column " << i << ": " << st.message();
393         return st.WithMessage(ss.str());
394       }
395     }
396     return Status::OK();
397   }
398 
ValidateFull() const399   Status ValidateFull() const override {
400     RETURN_NOT_OK(ValidateMeta());
401     for (int i = 0; i < num_columns(); ++i) {
402       const ChunkedArray* col = columns_[i].get();
403       Status st = col->ValidateFull();
404       if (!st.ok()) {
405         std::stringstream ss;
406         ss << "Column " << i << ": " << st.message();
407         return st.WithMessage(ss.str());
408       }
409     }
410     return Status::OK();
411   }
412 
413  protected:
ValidateMeta() const414   Status ValidateMeta() const {
415     // Make sure columns and schema are consistent
416     if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
417       return Status::Invalid("Number of columns did not match schema");
418     }
419     for (int i = 0; i < num_columns(); ++i) {
420       const ChunkedArray* col = columns_[i].get();
421       if (col == nullptr) {
422         return Status::Invalid("Column ", i, " was null");
423       }
424       if (!col->type()->Equals(*schema_->field(i)->type())) {
425         return Status::Invalid("Column data for field ", i, " with type ",
426                                col->type()->ToString(), " is inconsistent with schema ",
427                                schema_->field(i)->type()->ToString());
428       }
429     }
430 
431     // Make sure columns are all the same length, and validate them
432     for (int i = 0; i < num_columns(); ++i) {
433       const ChunkedArray* col = columns_[i].get();
434       if (col->length() != num_rows_) {
435         return Status::Invalid("Column ", i, " named ", field(i)->name(),
436                                " expected length ", num_rows_, " but got length ",
437                                col->length());
438       }
439       Status st = col->Validate();
440       if (!st.ok()) {
441         std::stringstream ss;
442         ss << "Column " << i << ": " << st.message();
443         return st.WithMessage(ss.str());
444       }
445     }
446     return Status::OK();
447   }
448 
449  private:
450   std::vector<std::shared_ptr<ChunkedArray>> columns_;
451 };
452 
Table()453 Table::Table() : num_rows_(0) {}
454 
columns() const455 std::vector<std::shared_ptr<ChunkedArray>> Table::columns() const {
456   std::vector<std::shared_ptr<ChunkedArray>> result;
457   for (int i = 0; i < this->num_columns(); ++i) {
458     result.emplace_back(this->column(i));
459   }
460   return result;
461 }
462 
fields() const463 std::vector<std::shared_ptr<Field>> Table::fields() const {
464   std::vector<std::shared_ptr<Field>> result;
465   for (int i = 0; i < this->num_columns(); ++i) {
466     result.emplace_back(this->field(i));
467   }
468   return result;
469 }
470 
Make(std::shared_ptr<Schema> schema,std::vector<std::shared_ptr<ChunkedArray>> columns,int64_t num_rows)471 std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema,
472                                    std::vector<std::shared_ptr<ChunkedArray>> columns,
473                                    int64_t num_rows) {
474   return std::make_shared<SimpleTable>(std::move(schema), std::move(columns), num_rows);
475 }
476 
Make(std::shared_ptr<Schema> schema,const std::vector<std::shared_ptr<Array>> & arrays,int64_t num_rows)477 std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema,
478                                    const std::vector<std::shared_ptr<Array>>& arrays,
479                                    int64_t num_rows) {
480   return std::make_shared<SimpleTable>(std::move(schema), arrays, num_rows);
481 }
482 
FromRecordBatchReader(RecordBatchReader * reader)483 Result<std::shared_ptr<Table>> Table::FromRecordBatchReader(RecordBatchReader* reader) {
484   std::shared_ptr<Table> table = nullptr;
485   RETURN_NOT_OK(reader->ReadAll(&table));
486   return table;
487 }
488 
FromRecordBatches(std::shared_ptr<Schema> schema,const std::vector<std::shared_ptr<RecordBatch>> & batches)489 Result<std::shared_ptr<Table>> Table::FromRecordBatches(
490     std::shared_ptr<Schema> schema,
491     const std::vector<std::shared_ptr<RecordBatch>>& batches) {
492   const int nbatches = static_cast<int>(batches.size());
493   const int ncolumns = static_cast<int>(schema->num_fields());
494 
495   int64_t num_rows = 0;
496   for (int i = 0; i < nbatches; ++i) {
497     if (!batches[i]->schema()->Equals(*schema, false)) {
498       return Status::Invalid("Schema at index ", static_cast<int>(i),
499                              " was different: \n", schema->ToString(), "\nvs\n",
500                              batches[i]->schema()->ToString());
501     }
502     num_rows += batches[i]->num_rows();
503   }
504 
505   std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns);
506   std::vector<std::shared_ptr<Array>> column_arrays(nbatches);
507 
508   for (int i = 0; i < ncolumns; ++i) {
509     for (int j = 0; j < nbatches; ++j) {
510       column_arrays[j] = batches[j]->column(i);
511     }
512     columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type());
513   }
514 
515   return Table::Make(std::move(schema), std::move(columns), num_rows);
516 }
517 
FromRecordBatches(std::shared_ptr<Schema> schema,const std::vector<std::shared_ptr<RecordBatch>> & batches,std::shared_ptr<Table> * table)518 Status Table::FromRecordBatches(std::shared_ptr<Schema> schema,
519                                 const std::vector<std::shared_ptr<RecordBatch>>& batches,
520                                 std::shared_ptr<Table>* table) {
521   return FromRecordBatches(std::move(schema), batches).Value(table);
522 }
523 
FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>> & batches)524 Result<std::shared_ptr<Table>> Table::FromRecordBatches(
525     const std::vector<std::shared_ptr<RecordBatch>>& batches) {
526   if (batches.size() == 0) {
527     return Status::Invalid("Must pass at least one record batch or an explicit Schema");
528   }
529 
530   return FromRecordBatches(batches[0]->schema(), batches);
531 }
532 
FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>> & batches,std::shared_ptr<Table> * table)533 Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches,
534                                 std::shared_ptr<Table>* table) {
535   return FromRecordBatches(batches).Value(table);
536 }
537 
FromChunkedStructArray(const std::shared_ptr<ChunkedArray> & array)538 Result<std::shared_ptr<Table>> Table::FromChunkedStructArray(
539     const std::shared_ptr<ChunkedArray>& array) {
540   auto type = array->type();
541   if (type->id() != Type::STRUCT) {
542     return Status::Invalid("Expected a chunked struct array, got ", *type);
543   }
544   int num_columns = type->num_fields();
545   int num_chunks = array->num_chunks();
546 
547   const auto& struct_chunks = array->chunks();
548   std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns);
549   for (int i = 0; i < num_columns; ++i) {
550     ArrayVector chunks(num_chunks);
551     std::transform(struct_chunks.begin(), struct_chunks.end(), chunks.begin(),
552                    [i](const std::shared_ptr<Array>& struct_chunk) {
553                      return static_cast<const StructArray&>(*struct_chunk).field(i);
554                    });
555     columns[i] = std::make_shared<ChunkedArray>(std::move(chunks));
556   }
557 
558   return Table::Make(::arrow::schema(type->fields()), std::move(columns),
559                      array->length());
560 }
561 
FromChunkedStructArray(const std::shared_ptr<ChunkedArray> & array,std::shared_ptr<Table> * table)562 Status Table::FromChunkedStructArray(const std::shared_ptr<ChunkedArray>& array,
563                                      std::shared_ptr<Table>* table) {
564   return FromChunkedStructArray(array).Value(table);
565 }
566 
RemoveColumn(int i,std::shared_ptr<Table> * out) const567 Status Table::RemoveColumn(int i, std::shared_ptr<Table>* out) const {
568   return RemoveColumn(i).Value(out);
569 }
570 
AddColumn(int i,std::shared_ptr<Field> field_arg,std::shared_ptr<ChunkedArray> column,std::shared_ptr<Table> * out) const571 Status Table::AddColumn(int i, std::shared_ptr<Field> field_arg,
572                         std::shared_ptr<ChunkedArray> column,
573                         std::shared_ptr<Table>* out) const {
574   return AddColumn(i, std::move(field_arg), std::move(column)).Value(out);
575 }
576 
SetColumn(int i,std::shared_ptr<Field> field_arg,std::shared_ptr<ChunkedArray> column,std::shared_ptr<Table> * out) const577 Status Table::SetColumn(int i, std::shared_ptr<Field> field_arg,
578                         std::shared_ptr<ChunkedArray> column,
579                         std::shared_ptr<Table>* out) const {
580   return SetColumn(i, std::move(field_arg), std::move(column)).Value(out);
581 }
582 
ColumnNames() const583 std::vector<std::string> Table::ColumnNames() const {
584   std::vector<std::string> names(num_columns());
585   for (int i = 0; i < num_columns(); ++i) {
586     names[i] = field(i)->name();
587   }
588   return names;
589 }
590 
RenameColumns(const std::vector<std::string> & names) const591 Result<std::shared_ptr<Table>> Table::RenameColumns(
592     const std::vector<std::string>& names) const {
593   if (names.size() != static_cast<size_t>(num_columns())) {
594     return Status::Invalid("tried to rename a table of ", num_columns(),
595                            " columns but only ", names.size(), " names were provided");
596   }
597   std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns());
598   std::vector<std::shared_ptr<Field>> fields(num_columns());
599   for (int i = 0; i < num_columns(); ++i) {
600     columns[i] = column(i);
601     fields[i] = field(i)->WithName(names[i]);
602   }
603   return Table::Make(::arrow::schema(std::move(fields)), std::move(columns), num_rows());
604 }
605 
RenameColumns(const std::vector<std::string> & names,std::shared_ptr<Table> * out) const606 Status Table::RenameColumns(const std::vector<std::string>& names,
607                             std::shared_ptr<Table>* out) const {
608   return RenameColumns(names).Value(out);
609 }
610 
Flatten(MemoryPool * pool,std::shared_ptr<Table> * out) const611 Status Table::Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const {
612   return Flatten(pool).Value(out);
613 }
614 
ToString() const615 std::string Table::ToString() const {
616   std::stringstream ss;
617   ARROW_CHECK_OK(PrettyPrint(*this, 0, &ss));
618   return ss.str();
619 }
620 
ConcatenateTables(const std::vector<std::shared_ptr<Table>> & tables,const ConcatenateTablesOptions options,MemoryPool * memory_pool)621 Result<std::shared_ptr<Table>> ConcatenateTables(
622     const std::vector<std::shared_ptr<Table>>& tables,
623     const ConcatenateTablesOptions options, MemoryPool* memory_pool) {
624   if (tables.size() == 0) {
625     return Status::Invalid("Must pass at least one table");
626   }
627 
628   std::vector<std::shared_ptr<Table>> promoted_tables;
629   const std::vector<std::shared_ptr<Table>>* tables_to_concat = &tables;
630   if (options.unify_schemas) {
631     std::vector<std::shared_ptr<Schema>> schemas;
632     schemas.reserve(tables.size());
633     for (const auto& t : tables) {
634       schemas.push_back(t->schema());
635     }
636 
637     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> unified_schema,
638                           UnifySchemas(schemas, options.field_merge_options));
639 
640     promoted_tables.reserve(tables.size());
641     for (const auto& t : tables) {
642       promoted_tables.emplace_back();
643       ARROW_ASSIGN_OR_RAISE(promoted_tables.back(),
644                             PromoteTableToSchema(t, unified_schema, memory_pool));
645     }
646     tables_to_concat = &promoted_tables;
647   } else {
648     auto first_schema = tables[0]->schema();
649     for (size_t i = 1; i < tables.size(); ++i) {
650       if (!tables[i]->schema()->Equals(*first_schema, false)) {
651         return Status::Invalid("Schema at index ", i, " was different: \n",
652                                first_schema->ToString(), "\nvs\n",
653                                tables[i]->schema()->ToString());
654       }
655     }
656   }
657 
658   std::shared_ptr<Schema> schema = tables_to_concat->front()->schema();
659 
660   const int ncolumns = schema->num_fields();
661 
662   std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns);
663   for (int i = 0; i < ncolumns; ++i) {
664     std::vector<std::shared_ptr<Array>> column_arrays;
665     for (const auto& table : *tables_to_concat) {
666       const std::vector<std::shared_ptr<Array>>& chunks = table->column(i)->chunks();
667       for (const auto& chunk : chunks) {
668         column_arrays.push_back(chunk);
669       }
670     }
671     columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type());
672   }
673   return Table::Make(schema, columns);
674 }
675 
PromoteTableToSchema(const std::shared_ptr<Table> & table,const std::shared_ptr<Schema> & schema,MemoryPool * pool)676 Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>& table,
677                                                     const std::shared_ptr<Schema>& schema,
678                                                     MemoryPool* pool) {
679   const std::shared_ptr<Schema> current_schema = table->schema();
680   if (current_schema->Equals(*schema, /*check_metadata=*/false)) {
681     return table->ReplaceSchemaMetadata(schema->metadata());
682   }
683 
684   // fields_seen[i] == true iff that field is also in `schema`.
685   std::vector<bool> fields_seen(current_schema->num_fields(), false);
686 
687   std::vector<std::shared_ptr<ChunkedArray>> columns;
688   columns.reserve(schema->num_fields());
689   const int64_t num_rows = table->num_rows();
690   auto AppendColumnOfNulls = [pool, &columns,
691                               num_rows](const std::shared_ptr<DataType>& type) {
692     // TODO(bkietz): share the zero-filled buffers as much as possible across
693     // the null-filled arrays created here.
694     ARROW_ASSIGN_OR_RAISE(auto array_of_nulls, MakeArrayOfNull(type, num_rows, pool));
695     columns.push_back(std::make_shared<ChunkedArray>(array_of_nulls));
696     return Status::OK();
697   };
698 
699   for (const auto& field : schema->fields()) {
700     const std::vector<int> field_indices =
701         current_schema->GetAllFieldIndices(field->name());
702     if (field_indices.empty()) {
703       RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
704       continue;
705     }
706 
707     if (field_indices.size() > 1) {
708       return Status::Invalid(
709           "PromoteTableToSchema cannot handle schemas with duplicate fields: ",
710           field->name());
711     }
712 
713     const int field_index = field_indices[0];
714     const auto& current_field = current_schema->field(field_index);
715     if (!field->nullable() && current_field->nullable()) {
716       return Status::Invalid("Unable to promote field ", current_field->name(),
717                              ": it was nullable but the target schema was not.");
718     }
719 
720     fields_seen[field_index] = true;
721     if (current_field->type()->Equals(field->type())) {
722       columns.push_back(table->column(field_index));
723       continue;
724     }
725 
726     if (current_field->type()->id() == Type::NA) {
727       RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
728       continue;
729     }
730 
731     return Status::Invalid("Unable to promote field ", field->name(),
732                            ": incompatible types: ", field->type()->ToString(), " vs ",
733                            current_field->type()->ToString());
734   }
735 
736   auto unseen_field_iter = std::find(fields_seen.begin(), fields_seen.end(), false);
737   if (unseen_field_iter != fields_seen.end()) {
738     const size_t unseen_field_index = unseen_field_iter - fields_seen.begin();
739     return Status::Invalid(
740         "Incompatible schemas: field ",
741         current_schema->field(static_cast<int>(unseen_field_index))->name(),
742         " did not exist in the new schema.");
743   }
744 
745   return Table::Make(schema, std::move(columns));
746 }
747 
Equals(const Table & other,bool check_metadata) const748 bool Table::Equals(const Table& other, bool check_metadata) const {
749   if (this == &other) {
750     return true;
751   }
752   if (!schema_->Equals(*other.schema(), check_metadata)) {
753     return false;
754   }
755   if (this->num_columns() != other.num_columns()) {
756     return false;
757   }
758 
759   for (int i = 0; i < this->num_columns(); i++) {
760     if (!this->column(i)->Equals(other.column(i))) {
761       return false;
762     }
763   }
764   return true;
765 }
766 
CombineChunks(MemoryPool * pool) const767 Result<std::shared_ptr<Table>> Table::CombineChunks(MemoryPool* pool) const {
768   const int ncolumns = num_columns();
769   std::vector<std::shared_ptr<ChunkedArray>> compacted_columns(ncolumns);
770   for (int i = 0; i < ncolumns; ++i) {
771     auto col = column(i);
772     if (col->num_chunks() <= 1) {
773       compacted_columns[i] = col;
774     } else {
775       std::shared_ptr<Array> compacted;
776       RETURN_NOT_OK(Concatenate(col->chunks(), pool, &compacted));
777       compacted_columns[i] = std::make_shared<ChunkedArray>(compacted);
778     }
779   }
780   return Table::Make(schema(), std::move(compacted_columns));
781 }
782 
CombineChunks(MemoryPool * pool,std::shared_ptr<Table> * out) const783 Status Table::CombineChunks(MemoryPool* pool, std::shared_ptr<Table>* out) const {
784   return CombineChunks(pool).Value(out);
785 }
786 
787 // ----------------------------------------------------------------------
788 // Convert a table to a sequence of record batches
789 
TableBatchReader(const Table & table)790 TableBatchReader::TableBatchReader(const Table& table)
791     : table_(table),
792       column_data_(table.num_columns()),
793       chunk_numbers_(table.num_columns(), 0),
794       chunk_offsets_(table.num_columns(), 0),
795       absolute_row_position_(0),
796       max_chunksize_(std::numeric_limits<int64_t>::max()) {
797   for (int i = 0; i < table.num_columns(); ++i) {
798     column_data_[i] = table.column(i).get();
799   }
800 }
801 
schema() const802 std::shared_ptr<Schema> TableBatchReader::schema() const { return table_.schema(); }
803 
set_chunksize(int64_t chunksize)804 void TableBatchReader::set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; }
805 
ReadNext(std::shared_ptr<RecordBatch> * out)806 Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
807   if (absolute_row_position_ == table_.num_rows()) {
808     *out = nullptr;
809     return Status::OK();
810   }
811 
812   // Determine the minimum contiguous slice across all columns
813   int64_t chunksize = std::min(table_.num_rows(), max_chunksize_);
814   std::vector<const Array*> chunks(table_.num_columns());
815   for (int i = 0; i < table_.num_columns(); ++i) {
816     auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
817     int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];
818 
819     if (chunk_remaining < chunksize) {
820       chunksize = chunk_remaining;
821     }
822 
823     chunks[i] = chunk;
824   }
825 
826   // Slice chunks and advance chunk index as appropriate
827   std::vector<std::shared_ptr<ArrayData>> batch_data(table_.num_columns());
828 
829   for (int i = 0; i < table_.num_columns(); ++i) {
830     // Exhausted chunk
831     const Array* chunk = chunks[i];
832     const int64_t offset = chunk_offsets_[i];
833     std::shared_ptr<ArrayData> slice_data;
834     if ((chunk->length() - offset) == chunksize) {
835       ++chunk_numbers_[i];
836       chunk_offsets_[i] = 0;
837       if (offset > 0) {
838         // Need to slice
839         slice_data = chunk->Slice(offset, chunksize)->data();
840       } else {
841         // No slice
842         slice_data = chunk->data();
843       }
844     } else {
845       chunk_offsets_[i] += chunksize;
846       slice_data = chunk->Slice(offset, chunksize)->data();
847     }
848     batch_data[i] = std::move(slice_data);
849   }
850 
851   absolute_row_position_ += chunksize;
852   *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));
853 
854   return Status::OK();
855 }
856 
857 }  // namespace arrow
858