// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/record_batch.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
31
32 namespace arrow {
33
34 /// \class ChunkedArray
35 /// \brief A data structure managing a list of primitive Arrow arrays logically
36 /// as one large array
37 class ARROW_EXPORT ChunkedArray {
38 public:
39 /// \brief Construct a chunked array from a vector of arrays
40 ///
41 /// The vector must be non-empty and all its elements must have the same
42 /// data type.
43 explicit ChunkedArray(ArrayVector chunks);
44
45 /// \brief Construct a chunked array from a single Array
ChunkedArray(std::shared_ptr<Array> chunk)46 explicit ChunkedArray(std::shared_ptr<Array> chunk)
47 : ChunkedArray(ArrayVector{std::move(chunk)}) {}
48
49 /// \brief Construct a chunked array from a vector of arrays and a data type
50 ///
51 /// As the data type is passed explicitly, the vector may be empty.
52 ChunkedArray(ArrayVector chunks, std::shared_ptr<DataType> type);
53
54 /// \return the total length of the chunked array; computed on construction
length()55 int64_t length() const { return length_; }
56
57 /// \return the total number of nulls among all chunks
null_count()58 int64_t null_count() const { return null_count_; }
59
num_chunks()60 int num_chunks() const { return static_cast<int>(chunks_.size()); }
61
62 /// \return chunk a particular chunk from the chunked array
chunk(int i)63 std::shared_ptr<Array> chunk(int i) const { return chunks_[i]; }
64
chunks()65 const ArrayVector& chunks() const { return chunks_; }
66
67 /// \brief Construct a zero-copy slice of the chunked array with the
68 /// indicated offset and length
69 ///
70 /// \param[in] offset the position of the first element in the constructed
71 /// slice
72 /// \param[in] length the length of the slice. If there are not enough
73 /// elements in the chunked array, the length will be adjusted accordingly
74 ///
75 /// \return a new object wrapped in std::shared_ptr<ChunkedArray>
76 std::shared_ptr<ChunkedArray> Slice(int64_t offset, int64_t length) const;
77
78 /// \brief Slice from offset until end of the chunked array
79 std::shared_ptr<ChunkedArray> Slice(int64_t offset) const;
80
81 /// \brief Flatten this chunked array as a vector of chunked arrays, one
82 /// for each struct field
83 ///
84 /// \param[in] pool The pool for buffer allocations, if any
85 Result<std::vector<std::shared_ptr<ChunkedArray>>> Flatten(
86 MemoryPool* pool = default_memory_pool()) const;
87
88 ARROW_DEPRECATED("Use Result-returning version")
89 Status Flatten(MemoryPool* pool, std::vector<std::shared_ptr<ChunkedArray>>* out) const;
90
91 /// Construct a zero-copy view of this chunked array with the given
92 /// type. Calls Array::View on each constituent chunk. Always succeeds if
93 /// there are zero chunks
94 Result<std::shared_ptr<ChunkedArray>> View(const std::shared_ptr<DataType>& type) const;
95
96 ARROW_DEPRECATED("Use Result-returning version")
97 Status View(const std::shared_ptr<DataType>& type,
98 std::shared_ptr<ChunkedArray>* out) const;
99
type()100 std::shared_ptr<DataType> type() const { return type_; }
101
102 /// \brief Determine if two chunked arrays are equal.
103 ///
104 /// Two chunked arrays can be equal only if they have equal datatypes.
105 /// However, they may be equal even if they have different chunkings.
106 bool Equals(const ChunkedArray& other) const;
107 /// \brief Determine if two chunked arrays are equal.
108 bool Equals(const std::shared_ptr<ChunkedArray>& other) const;
109
110 /// \return PrettyPrint representation suitable for debugging
111 std::string ToString() const;
112
113 /// \brief Perform cheap validation checks to determine obvious inconsistencies
114 /// within the chunk array's internal data.
115 ///
116 /// This is O(k*m) where k is the number of array descendents,
117 /// and m is the number of chunks.
118 ///
119 /// \return Status
120 Status Validate() const;
121
122 /// \brief Perform extensive validation checks to determine inconsistencies
123 /// within the chunk array's internal data.
124 ///
125 /// This is O(k*n) where k is the number of array descendents,
126 /// and n is the length in elements.
127 ///
128 /// \return Status
129 Status ValidateFull() const;
130
131 protected:
132 ArrayVector chunks_;
133 int64_t length_;
134 int64_t null_count_;
135 std::shared_ptr<DataType> type_;
136
137 private:
138 ARROW_DISALLOW_COPY_AND_ASSIGN(ChunkedArray);
139 };
140
141 namespace internal {
142
143 /// \brief EXPERIMENTAL: Utility for incremental iteration over contiguous
144 /// pieces of potentially differently-chunked ChunkedArray objects
145 class ARROW_EXPORT MultipleChunkIterator {
146 public:
MultipleChunkIterator(const ChunkedArray & left,const ChunkedArray & right)147 MultipleChunkIterator(const ChunkedArray& left, const ChunkedArray& right)
148 : left_(left),
149 right_(right),
150 pos_(0),
151 length_(left.length()),
152 chunk_idx_left_(0),
153 chunk_idx_right_(0),
154 chunk_pos_left_(0),
155 chunk_pos_right_(0) {}
156
157 bool Next(std::shared_ptr<Array>* next_left, std::shared_ptr<Array>* next_right);
158
position()159 int64_t position() const { return pos_; }
160
161 private:
162 const ChunkedArray& left_;
163 const ChunkedArray& right_;
164
165 // The amount of the entire ChunkedArray consumed
166 int64_t pos_;
167
168 // Length of the chunked array(s)
169 int64_t length_;
170
171 // Current left chunk
172 int chunk_idx_left_;
173
174 // Current right chunk
175 int chunk_idx_right_;
176
177 // Offset into the current left chunk
178 int64_t chunk_pos_left_;
179
180 // Offset into the current right chunk
181 int64_t chunk_pos_right_;
182 };
183
184 /// \brief Evaluate binary function on two ChunkedArray objects having possibly
185 /// different chunk layouts. The passed binary function / functor should have
186 /// the following signature.
187 ///
188 /// Status(const Array&, const Array&, int64_t)
189 ///
190 /// The third argument is the absolute position relative to the start of each
191 /// ChunkedArray. The function is executed against each contiguous pair of
192 /// array segments, slicing if necessary.
193 ///
194 /// For example, if two arrays have chunk sizes
195 ///
196 /// left: [10, 10, 20]
197 /// right: [15, 10, 15]
198 ///
199 /// Then the following invocations take place (pseudocode)
200 ///
201 /// func(left.chunk[0][0:10], right.chunk[0][0:10], 0)
202 /// func(left.chunk[1][0:5], right.chunk[0][10:15], 10)
203 /// func(left.chunk[1][5:10], right.chunk[1][0:5], 15)
204 /// func(left.chunk[2][0:5], right.chunk[1][5:10], 20)
205 /// func(left.chunk[2][5:20], right.chunk[2][:], 25)
206 template <typename Action>
ApplyBinaryChunked(const ChunkedArray & left,const ChunkedArray & right,Action && action)207 Status ApplyBinaryChunked(const ChunkedArray& left, const ChunkedArray& right,
208 Action&& action) {
209 MultipleChunkIterator iterator(left, right);
210 std::shared_ptr<Array> left_piece, right_piece;
211 while (iterator.Next(&left_piece, &right_piece)) {
212 ARROW_RETURN_NOT_OK(action(*left_piece, *right_piece, iterator.position()));
213 }
214 return Status::OK();
215 }
216
217 } // namespace internal
218
219 /// \class Table
220 /// \brief Logical table as sequence of chunked arrays
221 class ARROW_EXPORT Table {
222 public:
223 virtual ~Table() = default;
224
225 /// \brief Construct a Table from schema and columns
226 ///
227 /// If columns is zero-length, the table's number of rows is zero
228 ///
229 /// \param[in] schema The table schema (column types)
230 /// \param[in] columns The table's columns as chunked arrays
231 /// \param[in] num_rows number of rows in table, -1 (default) to infer from columns
232 static std::shared_ptr<Table> Make(std::shared_ptr<Schema> schema,
233 std::vector<std::shared_ptr<ChunkedArray>> columns,
234 int64_t num_rows = -1);
235
236 /// \brief Construct a Table from schema and arrays
237 ///
238 /// \param[in] schema The table schema (column types)
239 /// \param[in] arrays The table's columns as arrays
240 /// \param[in] num_rows number of rows in table, -1 (default) to infer from columns
241 static std::shared_ptr<Table> Make(std::shared_ptr<Schema> schema,
242 const std::vector<std::shared_ptr<Array>>& arrays,
243 int64_t num_rows = -1);
244
245 /// \brief Construct a Table from a RecordBatchReader.
246 ///
247 /// \param[in] reader the arrow::Schema for each batch
248 static Result<std::shared_ptr<Table>> FromRecordBatchReader(RecordBatchReader* reader);
249
250 /// \brief Construct a Table from RecordBatches, using schema supplied by the first
251 /// RecordBatch.
252 ///
253 /// \param[in] batches a std::vector of record batches
254 static Result<std::shared_ptr<Table>> FromRecordBatches(
255 const std::vector<std::shared_ptr<RecordBatch>>& batches);
256
257 ARROW_DEPRECATED("Use Result-returning version")
258 static Status FromRecordBatches(
259 const std::vector<std::shared_ptr<RecordBatch>>& batches,
260 std::shared_ptr<Table>* table);
261
262 /// \brief Construct a Table from RecordBatches, using supplied schema. There may be
263 /// zero record batches
264 ///
265 /// \param[in] schema the arrow::Schema for each batch
266 /// \param[in] batches a std::vector of record batches
267 static Result<std::shared_ptr<Table>> FromRecordBatches(
268 std::shared_ptr<Schema> schema,
269 const std::vector<std::shared_ptr<RecordBatch>>& batches);
270
271 ARROW_DEPRECATED("Use Result-returning version")
272 static Status FromRecordBatches(
273 std::shared_ptr<Schema> schema,
274 const std::vector<std::shared_ptr<RecordBatch>>& batches,
275 std::shared_ptr<Table>* table);
276
277 /// \brief Construct a Table from a chunked StructArray. One column will be produced
278 /// for each field of the StructArray.
279 ///
280 /// \param[in] array a chunked StructArray
281 static Result<std::shared_ptr<Table>> FromChunkedStructArray(
282 const std::shared_ptr<ChunkedArray>& array);
283
284 ARROW_DEPRECATED("Use Result-returning version")
285 static Status FromChunkedStructArray(const std::shared_ptr<ChunkedArray>& array,
286 std::shared_ptr<Table>* table);
287
288 /// \brief Return the table schema
schema()289 std::shared_ptr<Schema> schema() const { return schema_; }
290
291 /// \brief Return a column by index
292 virtual std::shared_ptr<ChunkedArray> column(int i) const = 0;
293
294 /// \brief Return vector of all columns for table
295 std::vector<std::shared_ptr<ChunkedArray>> columns() const;
296
297 /// Return a column's field by index
field(int i)298 std::shared_ptr<Field> field(int i) const { return schema_->field(i); }
299
300 /// \brief Return vector of all fields for table
301 std::vector<std::shared_ptr<Field>> fields() const;
302
303 /// \brief Construct a zero-copy slice of the table with the
304 /// indicated offset and length
305 ///
306 /// \param[in] offset the index of the first row in the constructed
307 /// slice
308 /// \param[in] length the number of rows of the slice. If there are not enough
309 /// rows in the table, the length will be adjusted accordingly
310 ///
311 /// \return a new object wrapped in std::shared_ptr<Table>
312 virtual std::shared_ptr<Table> Slice(int64_t offset, int64_t length) const = 0;
313
314 /// \brief Slice from first row at offset until end of the table
Slice(int64_t offset)315 std::shared_ptr<Table> Slice(int64_t offset) const { return Slice(offset, num_rows_); }
316
317 /// \brief Return a column by name
318 /// \param[in] name field name
319 /// \return an Array or null if no field was found
GetColumnByName(const std::string & name)320 std::shared_ptr<ChunkedArray> GetColumnByName(const std::string& name) const {
321 auto i = schema_->GetFieldIndex(name);
322 return i == -1 ? NULLPTR : column(i);
323 }
324
325 /// \brief Remove column from the table, producing a new Table
326 virtual Result<std::shared_ptr<Table>> RemoveColumn(int i) const = 0;
327
328 ARROW_DEPRECATED("Use Result-returning version")
329 Status RemoveColumn(int i, std::shared_ptr<Table>* out) const;
330
331 /// \brief Add column to the table, producing a new Table
332 virtual Result<std::shared_ptr<Table>> AddColumn(
333 int i, std::shared_ptr<Field> field_arg,
334 std::shared_ptr<ChunkedArray> column) const = 0;
335
336 ARROW_DEPRECATED("Use Result-returning version")
337 Status AddColumn(int i, std::shared_ptr<Field> field_arg,
338 std::shared_ptr<ChunkedArray> column,
339 std::shared_ptr<Table>* out) const;
340
341 /// \brief Replace a column in the table, producing a new Table
342 virtual Result<std::shared_ptr<Table>> SetColumn(
343 int i, std::shared_ptr<Field> field_arg,
344 std::shared_ptr<ChunkedArray> column) const = 0;
345
346 ARROW_DEPRECATED("Use Result-returning version")
347 Status SetColumn(int i, std::shared_ptr<Field> field_arg,
348 std::shared_ptr<ChunkedArray> column,
349 std::shared_ptr<Table>* out) const;
350
351 /// \brief Return names of all columns
352 std::vector<std::string> ColumnNames() const;
353
354 /// \brief Rename columns with provided names
355 Result<std::shared_ptr<Table>> RenameColumns(
356 const std::vector<std::string>& names) const;
357
358 ARROW_DEPRECATED("Use Result-returning version")
359 Status RenameColumns(const std::vector<std::string>& names,
360 std::shared_ptr<Table>* out) const;
361
362 /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL)
363 /// \since 0.5.0
364 ///
365 /// \param[in] metadata new KeyValueMetadata
366 /// \return new Table
367 virtual std::shared_ptr<Table> ReplaceSchemaMetadata(
368 const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
369
370 /// \brief Flatten the table, producing a new Table. Any column with a
371 /// struct type will be flattened into multiple columns
372 ///
373 /// \param[in] pool The pool for buffer allocations, if any
374 virtual Result<std::shared_ptr<Table>> Flatten(
375 MemoryPool* pool = default_memory_pool()) const = 0;
376
377 ARROW_DEPRECATED("Use Result-returning version")
378 Status Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const;
379
380 /// \return PrettyPrint representation suitable for debugging
381 std::string ToString() const;
382
383 /// \brief Perform cheap validation checks to determine obvious inconsistencies
384 /// within the table's schema and internal data.
385 ///
386 /// This is O(k*m) where k is the total number of field descendents,
387 /// and m is the number of chunks.
388 ///
389 /// \return Status
390 virtual Status Validate() const = 0;
391
392 /// \brief Perform extensive validation checks to determine inconsistencies
393 /// within the table's schema and internal data.
394 ///
395 /// This is O(k*n) where k is the total number of field descendents,
396 /// and n is the number of rows.
397 ///
398 /// \return Status
399 virtual Status ValidateFull() const = 0;
400
401 /// \brief Return the number of columns in the table
num_columns()402 int num_columns() const { return schema_->num_fields(); }
403
404 /// \brief Return the number of rows (equal to each column's logical length)
num_rows()405 int64_t num_rows() const { return num_rows_; }
406
407 /// \brief Determine if tables are equal
408 ///
409 /// Two tables can be equal only if they have equal schemas.
410 /// However, they may be equal even if they have different chunkings.
411 bool Equals(const Table& other, bool check_metadata = false) const;
412
413 /// \brief Make a new table by combining the chunks this table has.
414 ///
415 /// All the underlying chunks in the ChunkedArray of each column are
416 /// concatenated into zero or one chunk.
417 ///
418 /// \param[in] pool The pool for buffer allocations
419 Result<std::shared_ptr<Table>> CombineChunks(
420 MemoryPool* pool = default_memory_pool()) const;
421
422 ARROW_DEPRECATED("Use Result-returning version")
423 Status CombineChunks(MemoryPool* pool, std::shared_ptr<Table>* out) const;
424
425 protected:
426 Table();
427
428 std::shared_ptr<Schema> schema_;
429 int64_t num_rows_;
430
431 private:
432 ARROW_DISALLOW_COPY_AND_ASSIGN(Table);
433 };
434
435 /// \brief Compute a stream of record batches from a (possibly chunked) Table
436 ///
437 /// The conversion is zero-copy: each record batch is a view over a slice
438 /// of the table's columns.
439 class ARROW_EXPORT TableBatchReader : public RecordBatchReader {
440 public:
441 /// \brief Construct a TableBatchReader for the given table
442 explicit TableBatchReader(const Table& table);
443
444 std::shared_ptr<Schema> schema() const override;
445
446 Status ReadNext(std::shared_ptr<RecordBatch>* out) override;
447
448 /// \brief Set the desired maximum chunk size of record batches
449 ///
450 /// The actual chunk size of each record batch may be smaller, depending
451 /// on actual chunking characteristics of each table column.
452 void set_chunksize(int64_t chunksize);
453
454 private:
455 const Table& table_;
456 std::vector<ChunkedArray*> column_data_;
457 std::vector<int> chunk_numbers_;
458 std::vector<int64_t> chunk_offsets_;
459 int64_t absolute_row_position_;
460 int64_t max_chunksize_;
461 };
462
/// \defgroup concat-tables ConcatenateTables function.
///
/// ConcatenateTables function.
/// @{
467
468 /// \brief Controls the behavior of ConcatenateTables().
469 struct ARROW_EXPORT ConcatenateTablesOptions {
470 /// If true, the schemas of the tables will be first unified with fields of
471 /// the same name being merged, according to `field_merge_options`, then each
472 /// table will be promoted to the unified schema before being concatenated.
473 /// Otherwise, all tables should have the same schema. Each column in the output table
474 /// is the result of concatenating the corresponding columns in all input tables.
475 bool unify_schemas = false;
476
477 Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults();
478
DefaultsConcatenateTablesOptions479 static ConcatenateTablesOptions Defaults() { return ConcatenateTablesOptions(); }
480 };
481
482 /// \brief Construct table from multiple input tables.
483 ARROW_EXPORT
484 Result<std::shared_ptr<Table>> ConcatenateTables(
485 const std::vector<std::shared_ptr<Table>>& tables,
486 ConcatenateTablesOptions options = ConcatenateTablesOptions::Defaults(),
487 MemoryPool* memory_pool = default_memory_pool());
488
489 /// \brief Promotes a table to conform to the given schema.
490 ///
491 /// If a field in the schema does not have a corresponding column in the
492 /// table, a column of nulls will be added to the resulting table.
493 /// If the corresponding column is of type Null, it will be promoted to
494 /// the type specified by schema, with null values filled.
495 /// Returns an error:
496 /// - if the corresponding column's type is not compatible with the
497 /// schema.
498 /// - if there is a column in the table that does not exist in the schema.
499 ///
500 /// \param[in] table the input Table
501 /// \param[in] schema the target schema to promote to
502 /// \param[in] pool The memory pool to be used if null-filled arrays need to
503 /// be created.
504 ARROW_EXPORT
505 Result<std::shared_ptr<Table>> PromoteTableToSchema(
506 const std::shared_ptr<Table>& table, const std::shared_ptr<Schema>& schema,
507 MemoryPool* pool = default_memory_pool());
508
509 } // namespace arrow
510