1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/table.h"
19
20 #include <algorithm>
21 #include <cstdlib>
22 #include <limits>
23 #include <memory>
24 #include <sstream>
25 #include <utility>
26
27 #include "arrow/array.h"
28 #include "arrow/array/concatenate.h"
29 #include "arrow/array/validate.h"
30 #include "arrow/pretty_print.h"
31 #include "arrow/record_batch.h"
32 #include "arrow/status.h"
33 #include "arrow/type.h"
34 #include "arrow/util/checked_cast.h"
35 #include "arrow/util/logging.h"
36 #include "arrow/util/vector.h"
37
38 namespace arrow {
39
40 using internal::checked_cast;
41
42 // ----------------------------------------------------------------------
43 // ChunkedArray methods
44
ChunkedArray(ArrayVector chunks)45 ChunkedArray::ChunkedArray(ArrayVector chunks) : chunks_(std::move(chunks)) {
46 length_ = 0;
47 null_count_ = 0;
48
49 ARROW_CHECK_GT(chunks_.size(), 0)
50 << "cannot construct ChunkedArray from empty vector and omitted type";
51 type_ = chunks_[0]->type();
52 for (const std::shared_ptr<Array>& chunk : chunks_) {
53 length_ += chunk->length();
54 null_count_ += chunk->null_count();
55 }
56 }
57
ChunkedArray(ArrayVector chunks,std::shared_ptr<DataType> type)58 ChunkedArray::ChunkedArray(ArrayVector chunks, std::shared_ptr<DataType> type)
59 : chunks_(std::move(chunks)), type_(std::move(type)) {
60 length_ = 0;
61 null_count_ = 0;
62 for (const std::shared_ptr<Array>& chunk : chunks_) {
63 length_ += chunk->length();
64 null_count_ += chunk->null_count();
65 }
66 }
67
// Deep equality: two chunked arrays are equal when they contain the same
// logical values, independently of how those values are split into chunks.
bool ChunkedArray::Equals(const ChunkedArray& other) const {
  // Cheap rejections first: total length, null count, then type.
  if (length_ != other.length()) {
    return false;
  }
  if (null_count_ != other.null_count()) {
    return false;
  }
  // We cannot toggle check_metadata here yet, so we don't check it
  if (!type_->Equals(*other.type_, /*check_metadata=*/false)) {
    return false;
  }

  // Check contents of the underlying arrays. This checks for equality of
  // the underlying data independently of the chunk size.
  // ApplyBinaryChunked hands out slices aligned across the two chunk
  // layouts; the first unequal slice aborts the walk with a non-OK status,
  // which .ok() then maps to `false`.
  return internal::ApplyBinaryChunked(
             *this, other,
             [](const Array& left_piece, const Array& right_piece,
                int64_t ARROW_ARG_UNUSED(position)) {
               if (!left_piece.Equals(right_piece)) {
                 return Status::Invalid("Unequal piece");
               }
               return Status::OK();
             })
      .ok();
}
93
Equals(const std::shared_ptr<ChunkedArray> & other) const94 bool ChunkedArray::Equals(const std::shared_ptr<ChunkedArray>& other) const {
95 if (this == other.get()) {
96 return true;
97 }
98 if (!other) {
99 return false;
100 }
101 return Equals(*other.get());
102 }
103
// Zero-copy slice of [offset, offset + length): each overlapping chunk
// contributes an Array slice. `length` may extend past the end of the data;
// the result is clamped to what actually exists. An offset beyond the total
// length is a fatal error.
std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset, int64_t length) const {
  ARROW_CHECK_LE(offset, length_) << "Slice offset greater than array length";
  bool offset_equals_length = offset == length_;
  // Advance to the chunk containing `offset`, rebasing `offset` so that it
  // becomes relative to that chunk.
  int curr_chunk = 0;
  while (curr_chunk < num_chunks() && offset >= chunk(curr_chunk)->length()) {
    offset -= chunk(curr_chunk)->length();
    curr_chunk++;
  }

  ArrayVector new_chunks;
  if (num_chunks() > 0 && (offset_equals_length || length == 0)) {
    // Special case the zero-length slice to make sure there is at least 1 Array
    // in the result. When there are zero chunks we return zero chunks
    new_chunks.push_back(chunk(std::min(curr_chunk, num_chunks() - 1))->Slice(0, 0));
  } else {
    // Take pieces from consecutive chunks until `length` is exhausted.
    // After the first piece the intra-chunk offset is always zero.
    while (curr_chunk < num_chunks() && length > 0) {
      new_chunks.push_back(chunk(curr_chunk)->Slice(offset, length));
      length -= chunk(curr_chunk)->length() - offset;
      offset = 0;
      curr_chunk++;
    }
  }

  return std::make_shared<ChunkedArray>(new_chunks, type_);
}
129
// Slice from `offset` to the end. Passing the full length as the requested
// slice length is safe: the two-argument Slice clamps it to what remains.
std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset) const {
  return Slice(offset, length_);
}
133
// Out-parameter variant of Flatten(pool), kept for backward compatibility.
Status ChunkedArray::Flatten(MemoryPool* pool,
                             std::vector<std::shared_ptr<ChunkedArray>>* out) const {
  return Flatten(pool).Value(out);
}
138
// Flatten a struct-typed chunked array into one ChunkedArray per child
// field. A non-struct array is returned unchanged as a single-element
// vector. `pool` is used by StructArray::Flatten when a chunk's validity
// bitmap must be merged into its children.
Result<std::vector<std::shared_ptr<ChunkedArray>>> ChunkedArray::Flatten(
    MemoryPool* pool) const {
  if (type()->id() != Type::STRUCT) {
    // Emulate nonexistent copy constructor
    return std::vector<std::shared_ptr<ChunkedArray>>{
        std::make_shared<ChunkedArray>(chunks_, type_)};
  }

  // flattened_chunks[i] collects the i-th child of every chunk, in order.
  std::vector<ArrayVector> flattened_chunks(type()->num_fields());
  for (const auto& chunk : chunks_) {
    ARROW_ASSIGN_OR_RAISE(auto arrays,
                          checked_cast<const StructArray&>(*chunk).Flatten(pool));
    DCHECK_EQ(arrays.size(), flattened_chunks.size());
    for (size_t i = 0; i < arrays.size(); ++i) {
      flattened_chunks[i].push_back(arrays[i]);
    }
  }

  // Wrap each per-field chunk list in a ChunkedArray of the child's type.
  std::vector<std::shared_ptr<ChunkedArray>> flattened(type()->num_fields());
  for (size_t i = 0; i < flattened.size(); ++i) {
    auto child_type = type()->field(static_cast<int>(i))->type();
    flattened[i] =
        std::make_shared<ChunkedArray>(std::move(flattened_chunks[i]), child_type);
  }
  return flattened;
}
165
View(const std::shared_ptr<DataType> & type) const166 Result<std::shared_ptr<ChunkedArray>> ChunkedArray::View(
167 const std::shared_ptr<DataType>& type) const {
168 ArrayVector out_chunks(this->num_chunks());
169 for (int i = 0; i < this->num_chunks(); ++i) {
170 ARROW_ASSIGN_OR_RAISE(out_chunks[i], chunks_[i]->View(type));
171 }
172 return std::make_shared<ChunkedArray>(out_chunks, type);
173 }
174
// Out-parameter variant of View(type), kept for backward compatibility.
Status ChunkedArray::View(const std::shared_ptr<DataType>& type,
                          std::shared_ptr<ChunkedArray>* out) const {
  return View(type).Value(out);
}
179
ToString() const180 std::string ChunkedArray::ToString() const {
181 std::stringstream ss;
182 ARROW_CHECK_OK(PrettyPrint(*this, 0, &ss));
183 return ss.str();
184 }
185
// Cheap structural validation: all chunks must share a single type, and each
// chunk must pass Array-level metadata validation. An empty chunk list is
// trivially valid. Data buffers are not inspected (see ValidateFull).
Status ChunkedArray::Validate() const {
  if (chunks_.size() == 0) {
    return Status::OK();
  }

  const auto& type = *chunks_[0]->type();
  // Make sure chunks all have the same type
  for (size_t i = 1; i < chunks_.size(); ++i) {
    const Array& chunk = *chunks_[i];
    if (!chunk.type()->Equals(type)) {
      return Status::Invalid("In chunk ", i, " expected type ", type.ToString(),
                             " but saw ", chunk.type()->ToString());
    }
  }
  // Validate the chunks themselves
  for (size_t i = 0; i < chunks_.size(); ++i) {
    const Array& chunk = *chunks_[i];
    const Status st = internal::ValidateArray(chunk);
    if (!st.ok()) {
      // Prefix the failing chunk's index so callers can locate it.
      return Status::Invalid("In chunk ", i, ": ", st.ToString());
    }
  }
  return Status::OK();
}
210
// Expensive validation: run Validate() first, then check each chunk's actual
// data buffers via ValidateArrayData.
Status ChunkedArray::ValidateFull() const {
  RETURN_NOT_OK(Validate());
  for (size_t i = 0; i < chunks_.size(); ++i) {
    const Array& chunk = *chunks_[i];
    const Status st = internal::ValidateArrayData(chunk);
    if (!st.ok()) {
      // Prefix the failing chunk's index so callers can locate it.
      return Status::Invalid("In chunk ", i, ": ", st.ToString());
    }
  }
  return Status::OK();
}
222
223 namespace internal {
224
// Produce the next pair of equal-length Array slices from the two chunked
// arrays being walked in lockstep; returns false once the shared logical
// length is exhausted.
// NOTE(review): this assumes both inputs have the same total length —
// confirm against the constructor/callers; empty chunks anywhere are
// tolerated by the skip loop below.
bool MultipleChunkIterator::Next(std::shared_ptr<Array>* next_left,
                                 std::shared_ptr<Array>* next_right) {
  if (pos_ == length_) return false;

  // Find non-empty chunk
  std::shared_ptr<Array> chunk_left, chunk_right;
  while (true) {
    chunk_left = left_.chunk(chunk_idx_left_);
    chunk_right = right_.chunk(chunk_idx_right_);
    if (chunk_pos_left_ == chunk_left->length()) {
      // Left chunk exhausted (or empty): move the left cursor forward.
      chunk_pos_left_ = 0;
      ++chunk_idx_left_;
      continue;
    }
    if (chunk_pos_right_ == chunk_right->length()) {
      // Right chunk exhausted (or empty): move the right cursor forward.
      chunk_pos_right_ = 0;
      ++chunk_idx_right_;
      continue;
    }
    break;
  }
  // Determine how big of a section to return: the longest run that stays
  // within the current chunk on both sides.
  int64_t iteration_size = std::min(chunk_left->length() - chunk_pos_left_,
                                    chunk_right->length() - chunk_pos_right_);

  *next_left = chunk_left->Slice(chunk_pos_left_, iteration_size);
  *next_right = chunk_right->Slice(chunk_pos_right_, iteration_size);

  pos_ += iteration_size;
  chunk_pos_left_ += iteration_size;
  chunk_pos_right_ += iteration_size;
  return true;
}
258
259 } // namespace internal
260
261 // ----------------------------------------------------------------------
262 // Table methods
263
264 /// \class SimpleTable
265 /// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
266 class SimpleTable : public Table {
267 public:
SimpleTable(std::shared_ptr<Schema> schema,std::vector<std::shared_ptr<ChunkedArray>> columns,int64_t num_rows=-1)268 SimpleTable(std::shared_ptr<Schema> schema,
269 std::vector<std::shared_ptr<ChunkedArray>> columns, int64_t num_rows = -1)
270 : columns_(std::move(columns)) {
271 schema_ = std::move(schema);
272 if (num_rows < 0) {
273 if (columns_.size() == 0) {
274 num_rows_ = 0;
275 } else {
276 num_rows_ = columns_[0]->length();
277 }
278 } else {
279 num_rows_ = num_rows;
280 }
281 }
282
SimpleTable(std::shared_ptr<Schema> schema,const std::vector<std::shared_ptr<Array>> & columns,int64_t num_rows=-1)283 SimpleTable(std::shared_ptr<Schema> schema,
284 const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) {
285 schema_ = std::move(schema);
286 if (num_rows < 0) {
287 if (columns.size() == 0) {
288 num_rows_ = 0;
289 } else {
290 num_rows_ = columns[0]->length();
291 }
292 } else {
293 num_rows_ = num_rows;
294 }
295
296 columns_.resize(columns.size());
297 for (size_t i = 0; i < columns.size(); ++i) {
298 columns_[i] = std::make_shared<ChunkedArray>(columns[i]);
299 }
300 }
301
column(int i) const302 std::shared_ptr<ChunkedArray> column(int i) const override { return columns_[i]; }
303
Slice(int64_t offset,int64_t length) const304 std::shared_ptr<Table> Slice(int64_t offset, int64_t length) const override {
305 auto sliced = columns_;
306 int64_t num_rows = length;
307 for (auto& column : sliced) {
308 column = column->Slice(offset, length);
309 num_rows = column->length();
310 }
311 return Table::Make(schema_, sliced, num_rows);
312 }
313
RemoveColumn(int i) const314 Result<std::shared_ptr<Table>> RemoveColumn(int i) const override {
315 ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->RemoveField(i));
316
317 return Table::Make(new_schema, internal::DeleteVectorElement(columns_, i),
318 this->num_rows());
319 }
320
AddColumn(int i,std::shared_ptr<Field> field_arg,std::shared_ptr<ChunkedArray> col) const321 Result<std::shared_ptr<Table>> AddColumn(
322 int i, std::shared_ptr<Field> field_arg,
323 std::shared_ptr<ChunkedArray> col) const override {
324 DCHECK(col != nullptr);
325
326 if (col->length() != num_rows_) {
327 return Status::Invalid(
328 "Added column's length must match table's length. Expected length ", num_rows_,
329 " but got length ", col->length());
330 }
331
332 if (!field_arg->type()->Equals(col->type())) {
333 return Status::Invalid("Field type did not match data type");
334 }
335
336 ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->AddField(i, field_arg));
337
338 return Table::Make(new_schema,
339 internal::AddVectorElement(columns_, i, std::move(col)));
340 }
341
SetColumn(int i,std::shared_ptr<Field> field_arg,std::shared_ptr<ChunkedArray> col) const342 Result<std::shared_ptr<Table>> SetColumn(
343 int i, std::shared_ptr<Field> field_arg,
344 std::shared_ptr<ChunkedArray> col) const override {
345 DCHECK(col != nullptr);
346
347 if (col->length() != num_rows_) {
348 return Status::Invalid(
349 "Added column's length must match table's length. Expected length ", num_rows_,
350 " but got length ", col->length());
351 }
352
353 if (!field_arg->type()->Equals(col->type())) {
354 return Status::Invalid("Field type did not match data type");
355 }
356
357 ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->SetField(i, field_arg));
358 return Table::Make(new_schema,
359 internal::ReplaceVectorElement(columns_, i, std::move(col)));
360 }
361
ReplaceSchemaMetadata(const std::shared_ptr<const KeyValueMetadata> & metadata) const362 std::shared_ptr<Table> ReplaceSchemaMetadata(
363 const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
364 auto new_schema = schema_->WithMetadata(metadata);
365 return Table::Make(new_schema, columns_);
366 }
367
Flatten(MemoryPool * pool) const368 Result<std::shared_ptr<Table>> Flatten(MemoryPool* pool) const override {
369 std::vector<std::shared_ptr<Field>> flattened_fields;
370 std::vector<std::shared_ptr<ChunkedArray>> flattened_columns;
371 for (int i = 0; i < num_columns(); ++i) {
372 std::vector<std::shared_ptr<Field>> new_fields = field(i)->Flatten();
373 ARROW_ASSIGN_OR_RAISE(auto new_columns, column(i)->Flatten(pool));
374 DCHECK_EQ(new_columns.size(), new_fields.size());
375 for (size_t j = 0; j < new_columns.size(); ++j) {
376 flattened_fields.push_back(new_fields[j]);
377 flattened_columns.push_back(new_columns[j]);
378 }
379 }
380 auto flattened_schema =
381 std::make_shared<Schema>(std::move(flattened_fields), schema_->metadata());
382 return Table::Make(std::move(flattened_schema), std::move(flattened_columns));
383 }
384
Validate() const385 Status Validate() const override {
386 RETURN_NOT_OK(ValidateMeta());
387 for (int i = 0; i < num_columns(); ++i) {
388 const ChunkedArray* col = columns_[i].get();
389 Status st = col->Validate();
390 if (!st.ok()) {
391 std::stringstream ss;
392 ss << "Column " << i << ": " << st.message();
393 return st.WithMessage(ss.str());
394 }
395 }
396 return Status::OK();
397 }
398
ValidateFull() const399 Status ValidateFull() const override {
400 RETURN_NOT_OK(ValidateMeta());
401 for (int i = 0; i < num_columns(); ++i) {
402 const ChunkedArray* col = columns_[i].get();
403 Status st = col->ValidateFull();
404 if (!st.ok()) {
405 std::stringstream ss;
406 ss << "Column " << i << ": " << st.message();
407 return st.WithMessage(ss.str());
408 }
409 }
410 return Status::OK();
411 }
412
413 protected:
ValidateMeta() const414 Status ValidateMeta() const {
415 // Make sure columns and schema are consistent
416 if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
417 return Status::Invalid("Number of columns did not match schema");
418 }
419 for (int i = 0; i < num_columns(); ++i) {
420 const ChunkedArray* col = columns_[i].get();
421 if (col == nullptr) {
422 return Status::Invalid("Column ", i, " was null");
423 }
424 if (!col->type()->Equals(*schema_->field(i)->type())) {
425 return Status::Invalid("Column data for field ", i, " with type ",
426 col->type()->ToString(), " is inconsistent with schema ",
427 schema_->field(i)->type()->ToString());
428 }
429 }
430
431 // Make sure columns are all the same length, and validate them
432 for (int i = 0; i < num_columns(); ++i) {
433 const ChunkedArray* col = columns_[i].get();
434 if (col->length() != num_rows_) {
435 return Status::Invalid("Column ", i, " named ", field(i)->name(),
436 " expected length ", num_rows_, " but got length ",
437 col->length());
438 }
439 Status st = col->Validate();
440 if (!st.ok()) {
441 std::stringstream ss;
442 ss << "Column " << i << ": " << st.message();
443 return st.WithMessage(ss.str());
444 }
445 }
446 return Status::OK();
447 }
448
449 private:
450 std::vector<std::shared_ptr<ChunkedArray>> columns_;
451 };
452
Table()453 Table::Table() : num_rows_(0) {}
454
columns() const455 std::vector<std::shared_ptr<ChunkedArray>> Table::columns() const {
456 std::vector<std::shared_ptr<ChunkedArray>> result;
457 for (int i = 0; i < this->num_columns(); ++i) {
458 result.emplace_back(this->column(i));
459 }
460 return result;
461 }
462
fields() const463 std::vector<std::shared_ptr<Field>> Table::fields() const {
464 std::vector<std::shared_ptr<Field>> result;
465 for (int i = 0; i < this->num_columns(); ++i) {
466 result.emplace_back(this->field(i));
467 }
468 return result;
469 }
470
// Factory: build a SimpleTable from pre-chunked columns. A negative
// num_rows is inferred from the columns (see SimpleTable's constructor).
std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema,
                                   std::vector<std::shared_ptr<ChunkedArray>> columns,
                                   int64_t num_rows) {
  return std::make_shared<SimpleTable>(std::move(schema), std::move(columns), num_rows);
}
476
// Factory: build a SimpleTable from single-chunk columns. A negative
// num_rows is inferred from the columns (see SimpleTable's constructor).
std::shared_ptr<Table> Table::Make(std::shared_ptr<Schema> schema,
                                   const std::vector<std::shared_ptr<Array>>& arrays,
                                   int64_t num_rows) {
  return std::make_shared<SimpleTable>(std::move(schema), arrays, num_rows);
}
482
// Drain a RecordBatchReader completely into a single Table.
Result<std::shared_ptr<Table>> Table::FromRecordBatchReader(RecordBatchReader* reader) {
  std::shared_ptr<Table> table = nullptr;
  RETURN_NOT_OK(reader->ReadAll(&table));
  return table;
}
488
// Build a table from record batches that all conform to `schema` (metadata
// differences are ignored). Column i of the result is a ChunkedArray whose
// chunks are column i of each batch, in order; no data is copied.
Result<std::shared_ptr<Table>> Table::FromRecordBatches(
    std::shared_ptr<Schema> schema,
    const std::vector<std::shared_ptr<RecordBatch>>& batches) {
  const int nbatches = static_cast<int>(batches.size());
  const int ncolumns = static_cast<int>(schema->num_fields());

  // Verify all schemas up front and accumulate the total row count.
  int64_t num_rows = 0;
  for (int i = 0; i < nbatches; ++i) {
    if (!batches[i]->schema()->Equals(*schema, false)) {
      return Status::Invalid("Schema at index ", static_cast<int>(i),
                             " was different: \n", schema->ToString(), "\nvs\n",
                             batches[i]->schema()->ToString());
    }
    num_rows += batches[i]->num_rows();
  }

  std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns);
  // Scratch vector, reused for each column's chunk list (ChunkedArray's
  // constructor copies it).
  std::vector<std::shared_ptr<Array>> column_arrays(nbatches);

  for (int i = 0; i < ncolumns; ++i) {
    for (int j = 0; j < nbatches; ++j) {
      column_arrays[j] = batches[j]->column(i);
    }
    columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type());
  }

  return Table::Make(std::move(schema), std::move(columns), num_rows);
}
517
// Out-parameter variant, kept for backward compatibility.
Status Table::FromRecordBatches(std::shared_ptr<Schema> schema,
                                const std::vector<std::shared_ptr<RecordBatch>>& batches,
                                std::shared_ptr<Table>* table) {
  return FromRecordBatches(std::move(schema), batches).Value(table);
}
523
FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>> & batches)524 Result<std::shared_ptr<Table>> Table::FromRecordBatches(
525 const std::vector<std::shared_ptr<RecordBatch>>& batches) {
526 if (batches.size() == 0) {
527 return Status::Invalid("Must pass at least one record batch or an explicit Schema");
528 }
529
530 return FromRecordBatches(batches[0]->schema(), batches);
531 }
532
// Out-parameter variant, kept for backward compatibility.
Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches,
                                std::shared_ptr<Table>* table) {
  return FromRecordBatches(batches).Value(table);
}
537
FromChunkedStructArray(const std::shared_ptr<ChunkedArray> & array)538 Result<std::shared_ptr<Table>> Table::FromChunkedStructArray(
539 const std::shared_ptr<ChunkedArray>& array) {
540 auto type = array->type();
541 if (type->id() != Type::STRUCT) {
542 return Status::Invalid("Expected a chunked struct array, got ", *type);
543 }
544 int num_columns = type->num_fields();
545 int num_chunks = array->num_chunks();
546
547 const auto& struct_chunks = array->chunks();
548 std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns);
549 for (int i = 0; i < num_columns; ++i) {
550 ArrayVector chunks(num_chunks);
551 std::transform(struct_chunks.begin(), struct_chunks.end(), chunks.begin(),
552 [i](const std::shared_ptr<Array>& struct_chunk) {
553 return static_cast<const StructArray&>(*struct_chunk).field(i);
554 });
555 columns[i] = std::make_shared<ChunkedArray>(std::move(chunks));
556 }
557
558 return Table::Make(::arrow::schema(type->fields()), std::move(columns),
559 array->length());
560 }
561
// Out-parameter variant, kept for backward compatibility.
Status Table::FromChunkedStructArray(const std::shared_ptr<ChunkedArray>& array,
                                     std::shared_ptr<Table>* table) {
  return FromChunkedStructArray(array).Value(table);
}
566
// Out-parameter variant, kept for backward compatibility.
Status Table::RemoveColumn(int i, std::shared_ptr<Table>* out) const {
  return RemoveColumn(i).Value(out);
}
570
// Out-parameter variant, kept for backward compatibility.
Status Table::AddColumn(int i, std::shared_ptr<Field> field_arg,
                        std::shared_ptr<ChunkedArray> column,
                        std::shared_ptr<Table>* out) const {
  return AddColumn(i, std::move(field_arg), std::move(column)).Value(out);
}
576
// Out-parameter variant, kept for backward compatibility.
Status Table::SetColumn(int i, std::shared_ptr<Field> field_arg,
                        std::shared_ptr<ChunkedArray> column,
                        std::shared_ptr<Table>* out) const {
  return SetColumn(i, std::move(field_arg), std::move(column)).Value(out);
}
582
ColumnNames() const583 std::vector<std::string> Table::ColumnNames() const {
584 std::vector<std::string> names(num_columns());
585 for (int i = 0; i < num_columns(); ++i) {
586 names[i] = field(i)->name();
587 }
588 return names;
589 }
590
RenameColumns(const std::vector<std::string> & names) const591 Result<std::shared_ptr<Table>> Table::RenameColumns(
592 const std::vector<std::string>& names) const {
593 if (names.size() != static_cast<size_t>(num_columns())) {
594 return Status::Invalid("tried to rename a table of ", num_columns(),
595 " columns but only ", names.size(), " names were provided");
596 }
597 std::vector<std::shared_ptr<ChunkedArray>> columns(num_columns());
598 std::vector<std::shared_ptr<Field>> fields(num_columns());
599 for (int i = 0; i < num_columns(); ++i) {
600 columns[i] = column(i);
601 fields[i] = field(i)->WithName(names[i]);
602 }
603 return Table::Make(::arrow::schema(std::move(fields)), std::move(columns), num_rows());
604 }
605
// Out-parameter variant, kept for backward compatibility.
Status Table::RenameColumns(const std::vector<std::string>& names,
                            std::shared_ptr<Table>* out) const {
  return RenameColumns(names).Value(out);
}
610
// Out-parameter variant, kept for backward compatibility.
Status Table::Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const {
  return Flatten(pool).Value(out);
}
614
ToString() const615 std::string Table::ToString() const {
616 std::stringstream ss;
617 ARROW_CHECK_OK(PrettyPrint(*this, 0, &ss));
618 return ss.str();
619 }
620
// Concatenate tables vertically. If options.unify_schemas is set, every
// input is first promoted to a unified schema (missing columns become
// nulls); otherwise all inputs must match the first table's schema exactly
// (metadata excluded). Column data is gathered by reference, not copied.
Result<std::shared_ptr<Table>> ConcatenateTables(
    const std::vector<std::shared_ptr<Table>>& tables,
    const ConcatenateTablesOptions options, MemoryPool* memory_pool) {
  if (tables.size() == 0) {
    return Status::Invalid("Must pass at least one table");
  }

  // tables_to_concat points at `tables` directly, or at the promoted copies
  // when schema unification is requested.
  std::vector<std::shared_ptr<Table>> promoted_tables;
  const std::vector<std::shared_ptr<Table>>* tables_to_concat = &tables;
  if (options.unify_schemas) {
    std::vector<std::shared_ptr<Schema>> schemas;
    schemas.reserve(tables.size());
    for (const auto& t : tables) {
      schemas.push_back(t->schema());
    }

    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> unified_schema,
                          UnifySchemas(schemas, options.field_merge_options));

    promoted_tables.reserve(tables.size());
    for (const auto& t : tables) {
      promoted_tables.emplace_back();
      ARROW_ASSIGN_OR_RAISE(promoted_tables.back(),
                            PromoteTableToSchema(t, unified_schema, memory_pool));
    }
    tables_to_concat = &promoted_tables;
  } else {
    // No unification: require every schema to equal the first one.
    auto first_schema = tables[0]->schema();
    for (size_t i = 1; i < tables.size(); ++i) {
      if (!tables[i]->schema()->Equals(*first_schema, false)) {
        return Status::Invalid("Schema at index ", i, " was different: \n",
                               first_schema->ToString(), "\nvs\n",
                               tables[i]->schema()->ToString());
      }
    }
  }

  std::shared_ptr<Schema> schema = tables_to_concat->front()->schema();

  const int ncolumns = schema->num_fields();

  // Column i of the result gathers the chunks of column i from every input
  // table, preserving order.
  std::vector<std::shared_ptr<ChunkedArray>> columns(ncolumns);
  for (int i = 0; i < ncolumns; ++i) {
    std::vector<std::shared_ptr<Array>> column_arrays;
    for (const auto& table : *tables_to_concat) {
      const std::vector<std::shared_ptr<Array>>& chunks = table->column(i)->chunks();
      for (const auto& chunk : chunks) {
        column_arrays.push_back(chunk);
      }
    }
    columns[i] = std::make_shared<ChunkedArray>(column_arrays, schema->field(i)->type());
  }
  return Table::Make(schema, columns);
}
675
// Adapt `table` to `schema`, matching columns by name: target fields absent
// from the table are filled with null columns, NA-typed table columns are
// promoted (as nulls) to the target type, and identical-type columns pass
// through unchanged. Fails on duplicate field names, on a nullable column
// mapped to a non-nullable target field, on any other type mismatch, or on
// table columns that have no counterpart in the target schema.
Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>& table,
                                                    const std::shared_ptr<Schema>& schema,
                                                    MemoryPool* pool) {
  const std::shared_ptr<Schema> current_schema = table->schema();
  if (current_schema->Equals(*schema, /*check_metadata=*/false)) {
    // Fast path: only the schema metadata needs replacing.
    return table->ReplaceSchemaMetadata(schema->metadata());
  }

  // fields_seen[i] == true iff that field is also in `schema`.
  std::vector<bool> fields_seen(current_schema->num_fields(), false);

  std::vector<std::shared_ptr<ChunkedArray>> columns;
  columns.reserve(schema->num_fields());
  const int64_t num_rows = table->num_rows();
  auto AppendColumnOfNulls = [pool, &columns,
                              num_rows](const std::shared_ptr<DataType>& type) {
    // TODO(bkietz): share the zero-filled buffers as much as possible across
    // the null-filled arrays created here.
    ARROW_ASSIGN_OR_RAISE(auto array_of_nulls, MakeArrayOfNull(type, num_rows, pool));
    columns.push_back(std::make_shared<ChunkedArray>(array_of_nulls));
    return Status::OK();
  };

  for (const auto& field : schema->fields()) {
    const std::vector<int> field_indices =
        current_schema->GetAllFieldIndices(field->name());
    if (field_indices.empty()) {
      // Target field has no counterpart in the table: fill with nulls.
      RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
      continue;
    }

    if (field_indices.size() > 1) {
      return Status::Invalid(
          "PromoteTableToSchema cannot handle schemas with duplicate fields: ",
          field->name());
    }

    const int field_index = field_indices[0];
    const auto& current_field = current_schema->field(field_index);
    if (!field->nullable() && current_field->nullable()) {
      return Status::Invalid("Unable to promote field ", current_field->name(),
                             ": it was nullable but the target schema was not.");
    }

    fields_seen[field_index] = true;
    if (current_field->type()->Equals(field->type())) {
      // Same type: reuse the existing column as-is.
      columns.push_back(table->column(field_index));
      continue;
    }

    if (current_field->type()->id() == Type::NA) {
      // An all-null (NA) column can be promoted to any target type.
      RETURN_NOT_OK(AppendColumnOfNulls(field->type()));
      continue;
    }

    return Status::Invalid("Unable to promote field ", field->name(),
                           ": incompatible types: ", field->type()->ToString(), " vs ",
                           current_field->type()->ToString());
  }

  // Every table column must have been consumed by the target schema.
  auto unseen_field_iter = std::find(fields_seen.begin(), fields_seen.end(), false);
  if (unseen_field_iter != fields_seen.end()) {
    const size_t unseen_field_index = unseen_field_iter - fields_seen.begin();
    return Status::Invalid(
        "Incompatible schemas: field ",
        current_schema->field(static_cast<int>(unseen_field_index))->name(),
        " did not exist in the new schema.");
  }

  return Table::Make(schema, std::move(columns));
}
747
Equals(const Table & other,bool check_metadata) const748 bool Table::Equals(const Table& other, bool check_metadata) const {
749 if (this == &other) {
750 return true;
751 }
752 if (!schema_->Equals(*other.schema(), check_metadata)) {
753 return false;
754 }
755 if (this->num_columns() != other.num_columns()) {
756 return false;
757 }
758
759 for (int i = 0; i < this->num_columns(); i++) {
760 if (!this->column(i)->Equals(other.column(i))) {
761 return false;
762 }
763 }
764 return true;
765 }
766
CombineChunks(MemoryPool * pool) const767 Result<std::shared_ptr<Table>> Table::CombineChunks(MemoryPool* pool) const {
768 const int ncolumns = num_columns();
769 std::vector<std::shared_ptr<ChunkedArray>> compacted_columns(ncolumns);
770 for (int i = 0; i < ncolumns; ++i) {
771 auto col = column(i);
772 if (col->num_chunks() <= 1) {
773 compacted_columns[i] = col;
774 } else {
775 std::shared_ptr<Array> compacted;
776 RETURN_NOT_OK(Concatenate(col->chunks(), pool, &compacted));
777 compacted_columns[i] = std::make_shared<ChunkedArray>(compacted);
778 }
779 }
780 return Table::Make(schema(), std::move(compacted_columns));
781 }
782
// Out-parameter variant, kept for backward compatibility.
Status Table::CombineChunks(MemoryPool* pool, std::shared_ptr<Table>* out) const {
  return CombineChunks(pool).Value(out);
}
786
787 // ----------------------------------------------------------------------
788 // Convert a table to a sequence of record batches
789
// Set up one read cursor per column (chunk index plus offset within that
// chunk), starting at row 0 with no chunk-size cap. Raw ChunkedArray
// pointers are cached, so `table` must outlive this reader.
TableBatchReader::TableBatchReader(const Table& table)
    : table_(table),
      column_data_(table.num_columns()),
      chunk_numbers_(table.num_columns(), 0),
      chunk_offsets_(table.num_columns(), 0),
      absolute_row_position_(0),
      max_chunksize_(std::numeric_limits<int64_t>::max()) {
  for (int i = 0; i < table.num_columns(); ++i) {
    column_data_[i] = table.column(i).get();
  }
}
801
schema() const802 std::shared_ptr<Schema> TableBatchReader::schema() const { return table_.schema(); }
803
set_chunksize(int64_t chunksize)804 void TableBatchReader::set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; }
805
// Emit the next RecordBatch of at most max_chunksize_ rows. The batch size
// is further limited by the smallest contiguous chunk remainder across all
// columns, so every output column is backed by exactly one (possibly
// sliced) chunk. Sets *out to nullptr once the table is exhausted.
Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
  if (absolute_row_position_ == table_.num_rows()) {
    *out = nullptr;
    return Status::OK();
  }

  // Determine the minimum contiguous slice across all columns
  int64_t chunksize = std::min(table_.num_rows(), max_chunksize_);
  std::vector<const Array*> chunks(table_.num_columns());
  for (int i = 0; i < table_.num_columns(); ++i) {
    auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
    int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];

    if (chunk_remaining < chunksize) {
      chunksize = chunk_remaining;
    }

    chunks[i] = chunk;
  }

  // Slice chunks and advance chunk index as appropriate
  std::vector<std::shared_ptr<ArrayData>> batch_data(table_.num_columns());

  for (int i = 0; i < table_.num_columns(); ++i) {
    // Exhausted chunk
    const Array* chunk = chunks[i];
    const int64_t offset = chunk_offsets_[i];
    std::shared_ptr<ArrayData> slice_data;
    if ((chunk->length() - offset) == chunksize) {
      // This batch consumes the remainder of the current chunk: advance the
      // cursor to the next chunk for the following call.
      ++chunk_numbers_[i];
      chunk_offsets_[i] = 0;
      if (offset > 0) {
        // Need to slice
        slice_data = chunk->Slice(offset, chunksize)->data();
      } else {
        // No slice
        slice_data = chunk->data();
      }
    } else {
      // Chunk not exhausted: only move the intra-chunk offset forward.
      chunk_offsets_[i] += chunksize;
      slice_data = chunk->Slice(offset, chunksize)->data();
    }
    batch_data[i] = std::move(slice_data);
  }

  absolute_row_position_ += chunksize;
  *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));

  return Status::OK();
}
856
857 } // namespace arrow
858