1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
20 #include <cstdint>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
26 #include "arrow/record_batch.h"
27 #include "arrow/type.h"
28 #include "arrow/type_fwd.h"
29 #include "arrow/util/macros.h"
30 #include "arrow/util/visibility.h"
31 
32 namespace arrow {
33 
34 /// \class ChunkedArray
35 /// \brief A data structure managing a list of primitive Arrow arrays logically
36 /// as one large array
37 class ARROW_EXPORT ChunkedArray {
38  public:
39   /// \brief Construct a chunked array from a vector of arrays
40   ///
41   /// The vector must be non-empty and all its elements must have the same
42   /// data type.
43   explicit ChunkedArray(ArrayVector chunks);
44 
45   /// \brief Construct a chunked array from a single Array
ChunkedArray(std::shared_ptr<Array> chunk)46   explicit ChunkedArray(std::shared_ptr<Array> chunk)
47       : ChunkedArray(ArrayVector{std::move(chunk)}) {}
48 
49   /// \brief Construct a chunked array from a vector of arrays and a data type
50   ///
51   /// As the data type is passed explicitly, the vector may be empty.
52   ChunkedArray(ArrayVector chunks, std::shared_ptr<DataType> type);
53 
54   /// \return the total length of the chunked array; computed on construction
length()55   int64_t length() const { return length_; }
56 
57   /// \return the total number of nulls among all chunks
null_count()58   int64_t null_count() const { return null_count_; }
59 
num_chunks()60   int num_chunks() const { return static_cast<int>(chunks_.size()); }
61 
62   /// \return chunk a particular chunk from the chunked array
chunk(int i)63   std::shared_ptr<Array> chunk(int i) const { return chunks_[i]; }
64 
chunks()65   const ArrayVector& chunks() const { return chunks_; }
66 
67   /// \brief Construct a zero-copy slice of the chunked array with the
68   /// indicated offset and length
69   ///
70   /// \param[in] offset the position of the first element in the constructed
71   /// slice
72   /// \param[in] length the length of the slice. If there are not enough
73   /// elements in the chunked array, the length will be adjusted accordingly
74   ///
75   /// \return a new object wrapped in std::shared_ptr<ChunkedArray>
76   std::shared_ptr<ChunkedArray> Slice(int64_t offset, int64_t length) const;
77 
78   /// \brief Slice from offset until end of the chunked array
79   std::shared_ptr<ChunkedArray> Slice(int64_t offset) const;
80 
81   /// \brief Flatten this chunked array as a vector of chunked arrays, one
82   /// for each struct field
83   ///
84   /// \param[in] pool The pool for buffer allocations, if any
85   Result<std::vector<std::shared_ptr<ChunkedArray>>> Flatten(
86       MemoryPool* pool = default_memory_pool()) const;
87 
88   ARROW_DEPRECATED("Use Result-returning version")
89   Status Flatten(MemoryPool* pool, std::vector<std::shared_ptr<ChunkedArray>>* out) const;
90 
91   /// Construct a zero-copy view of this chunked array with the given
92   /// type. Calls Array::View on each constituent chunk. Always succeeds if
93   /// there are zero chunks
94   Result<std::shared_ptr<ChunkedArray>> View(const std::shared_ptr<DataType>& type) const;
95 
96   ARROW_DEPRECATED("Use Result-returning version")
97   Status View(const std::shared_ptr<DataType>& type,
98               std::shared_ptr<ChunkedArray>* out) const;
99 
type()100   std::shared_ptr<DataType> type() const { return type_; }
101 
102   /// \brief Determine if two chunked arrays are equal.
103   ///
104   /// Two chunked arrays can be equal only if they have equal datatypes.
105   /// However, they may be equal even if they have different chunkings.
106   bool Equals(const ChunkedArray& other) const;
107   /// \brief Determine if two chunked arrays are equal.
108   bool Equals(const std::shared_ptr<ChunkedArray>& other) const;
109 
110   /// \return PrettyPrint representation suitable for debugging
111   std::string ToString() const;
112 
113   /// \brief Perform cheap validation checks to determine obvious inconsistencies
114   /// within the chunk array's internal data.
115   ///
116   /// This is O(k*m) where k is the number of array descendents,
117   /// and m is the number of chunks.
118   ///
119   /// \return Status
120   Status Validate() const;
121 
122   /// \brief Perform extensive validation checks to determine inconsistencies
123   /// within the chunk array's internal data.
124   ///
125   /// This is O(k*n) where k is the number of array descendents,
126   /// and n is the length in elements.
127   ///
128   /// \return Status
129   Status ValidateFull() const;
130 
131  protected:
132   ArrayVector chunks_;
133   int64_t length_;
134   int64_t null_count_;
135   std::shared_ptr<DataType> type_;
136 
137  private:
138   ARROW_DISALLOW_COPY_AND_ASSIGN(ChunkedArray);
139 };
140 
141 namespace internal {
142 
143 /// \brief EXPERIMENTAL: Utility for incremental iteration over contiguous
144 /// pieces of potentially differently-chunked ChunkedArray objects
145 class ARROW_EXPORT MultipleChunkIterator {
146  public:
MultipleChunkIterator(const ChunkedArray & left,const ChunkedArray & right)147   MultipleChunkIterator(const ChunkedArray& left, const ChunkedArray& right)
148       : left_(left),
149         right_(right),
150         pos_(0),
151         length_(left.length()),
152         chunk_idx_left_(0),
153         chunk_idx_right_(0),
154         chunk_pos_left_(0),
155         chunk_pos_right_(0) {}
156 
157   bool Next(std::shared_ptr<Array>* next_left, std::shared_ptr<Array>* next_right);
158 
position()159   int64_t position() const { return pos_; }
160 
161  private:
162   const ChunkedArray& left_;
163   const ChunkedArray& right_;
164 
165   // The amount of the entire ChunkedArray consumed
166   int64_t pos_;
167 
168   // Length of the chunked array(s)
169   int64_t length_;
170 
171   // Current left chunk
172   int chunk_idx_left_;
173 
174   // Current right chunk
175   int chunk_idx_right_;
176 
177   // Offset into the current left chunk
178   int64_t chunk_pos_left_;
179 
180   // Offset into the current right chunk
181   int64_t chunk_pos_right_;
182 };
183 
184 /// \brief Evaluate binary function on two ChunkedArray objects having possibly
185 /// different chunk layouts. The passed binary function / functor should have
186 /// the following signature.
187 ///
188 ///    Status(const Array&, const Array&, int64_t)
189 ///
190 /// The third argument is the absolute position relative to the start of each
191 /// ChunkedArray. The function is executed against each contiguous pair of
192 /// array segments, slicing if necessary.
193 ///
194 /// For example, if two arrays have chunk sizes
195 ///
196 ///   left: [10, 10, 20]
197 ///   right: [15, 10, 15]
198 ///
199 /// Then the following invocations take place (pseudocode)
200 ///
201 ///   func(left.chunk[0][0:10], right.chunk[0][0:10], 0)
202 ///   func(left.chunk[1][0:5], right.chunk[0][10:15], 10)
203 ///   func(left.chunk[1][5:10], right.chunk[1][0:5], 15)
204 ///   func(left.chunk[2][0:5], right.chunk[1][5:10], 20)
205 ///   func(left.chunk[2][5:20], right.chunk[2][:], 25)
206 template <typename Action>
ApplyBinaryChunked(const ChunkedArray & left,const ChunkedArray & right,Action && action)207 Status ApplyBinaryChunked(const ChunkedArray& left, const ChunkedArray& right,
208                           Action&& action) {
209   MultipleChunkIterator iterator(left, right);
210   std::shared_ptr<Array> left_piece, right_piece;
211   while (iterator.Next(&left_piece, &right_piece)) {
212     ARROW_RETURN_NOT_OK(action(*left_piece, *right_piece, iterator.position()));
213   }
214   return Status::OK();
215 }
216 
217 }  // namespace internal
218 
219 /// \class Table
220 /// \brief Logical table as sequence of chunked arrays
221 class ARROW_EXPORT Table {
222  public:
223   virtual ~Table() = default;
224 
225   /// \brief Construct a Table from schema and columns
226   ///
227   /// If columns is zero-length, the table's number of rows is zero
228   ///
229   /// \param[in] schema The table schema (column types)
230   /// \param[in] columns The table's columns as chunked arrays
231   /// \param[in] num_rows number of rows in table, -1 (default) to infer from columns
232   static std::shared_ptr<Table> Make(std::shared_ptr<Schema> schema,
233                                      std::vector<std::shared_ptr<ChunkedArray>> columns,
234                                      int64_t num_rows = -1);
235 
236   /// \brief Construct a Table from schema and arrays
237   ///
238   /// \param[in] schema The table schema (column types)
239   /// \param[in] arrays The table's columns as arrays
240   /// \param[in] num_rows number of rows in table, -1 (default) to infer from columns
241   static std::shared_ptr<Table> Make(std::shared_ptr<Schema> schema,
242                                      const std::vector<std::shared_ptr<Array>>& arrays,
243                                      int64_t num_rows = -1);
244 
245   /// \brief Construct a Table from a RecordBatchReader.
246   ///
247   /// \param[in] reader the arrow::Schema for each batch
248   static Result<std::shared_ptr<Table>> FromRecordBatchReader(RecordBatchReader* reader);
249 
250   /// \brief Construct a Table from RecordBatches, using schema supplied by the first
251   /// RecordBatch.
252   ///
253   /// \param[in] batches a std::vector of record batches
254   static Result<std::shared_ptr<Table>> FromRecordBatches(
255       const std::vector<std::shared_ptr<RecordBatch>>& batches);
256 
257   ARROW_DEPRECATED("Use Result-returning version")
258   static Status FromRecordBatches(
259       const std::vector<std::shared_ptr<RecordBatch>>& batches,
260       std::shared_ptr<Table>* table);
261 
262   /// \brief Construct a Table from RecordBatches, using supplied schema. There may be
263   /// zero record batches
264   ///
265   /// \param[in] schema the arrow::Schema for each batch
266   /// \param[in] batches a std::vector of record batches
267   static Result<std::shared_ptr<Table>> FromRecordBatches(
268       std::shared_ptr<Schema> schema,
269       const std::vector<std::shared_ptr<RecordBatch>>& batches);
270 
271   ARROW_DEPRECATED("Use Result-returning version")
272   static Status FromRecordBatches(
273       std::shared_ptr<Schema> schema,
274       const std::vector<std::shared_ptr<RecordBatch>>& batches,
275       std::shared_ptr<Table>* table);
276 
277   /// \brief Construct a Table from a chunked StructArray. One column will be produced
278   /// for each field of the StructArray.
279   ///
280   /// \param[in] array a chunked StructArray
281   static Result<std::shared_ptr<Table>> FromChunkedStructArray(
282       const std::shared_ptr<ChunkedArray>& array);
283 
284   ARROW_DEPRECATED("Use Result-returning version")
285   static Status FromChunkedStructArray(const std::shared_ptr<ChunkedArray>& array,
286                                        std::shared_ptr<Table>* table);
287 
288   /// \brief Return the table schema
schema()289   std::shared_ptr<Schema> schema() const { return schema_; }
290 
291   /// \brief Return a column by index
292   virtual std::shared_ptr<ChunkedArray> column(int i) const = 0;
293 
294   /// \brief Return vector of all columns for table
295   std::vector<std::shared_ptr<ChunkedArray>> columns() const;
296 
297   /// Return a column's field by index
field(int i)298   std::shared_ptr<Field> field(int i) const { return schema_->field(i); }
299 
300   /// \brief Return vector of all fields for table
301   std::vector<std::shared_ptr<Field>> fields() const;
302 
303   /// \brief Construct a zero-copy slice of the table with the
304   /// indicated offset and length
305   ///
306   /// \param[in] offset the index of the first row in the constructed
307   /// slice
308   /// \param[in] length the number of rows of the slice. If there are not enough
309   /// rows in the table, the length will be adjusted accordingly
310   ///
311   /// \return a new object wrapped in std::shared_ptr<Table>
312   virtual std::shared_ptr<Table> Slice(int64_t offset, int64_t length) const = 0;
313 
314   /// \brief Slice from first row at offset until end of the table
Slice(int64_t offset)315   std::shared_ptr<Table> Slice(int64_t offset) const { return Slice(offset, num_rows_); }
316 
317   /// \brief Return a column by name
318   /// \param[in] name field name
319   /// \return an Array or null if no field was found
GetColumnByName(const std::string & name)320   std::shared_ptr<ChunkedArray> GetColumnByName(const std::string& name) const {
321     auto i = schema_->GetFieldIndex(name);
322     return i == -1 ? NULLPTR : column(i);
323   }
324 
325   /// \brief Remove column from the table, producing a new Table
326   virtual Result<std::shared_ptr<Table>> RemoveColumn(int i) const = 0;
327 
328   ARROW_DEPRECATED("Use Result-returning version")
329   Status RemoveColumn(int i, std::shared_ptr<Table>* out) const;
330 
331   /// \brief Add column to the table, producing a new Table
332   virtual Result<std::shared_ptr<Table>> AddColumn(
333       int i, std::shared_ptr<Field> field_arg,
334       std::shared_ptr<ChunkedArray> column) const = 0;
335 
336   ARROW_DEPRECATED("Use Result-returning version")
337   Status AddColumn(int i, std::shared_ptr<Field> field_arg,
338                    std::shared_ptr<ChunkedArray> column,
339                    std::shared_ptr<Table>* out) const;
340 
341   /// \brief Replace a column in the table, producing a new Table
342   virtual Result<std::shared_ptr<Table>> SetColumn(
343       int i, std::shared_ptr<Field> field_arg,
344       std::shared_ptr<ChunkedArray> column) const = 0;
345 
346   ARROW_DEPRECATED("Use Result-returning version")
347   Status SetColumn(int i, std::shared_ptr<Field> field_arg,
348                    std::shared_ptr<ChunkedArray> column,
349                    std::shared_ptr<Table>* out) const;
350 
351   /// \brief Return names of all columns
352   std::vector<std::string> ColumnNames() const;
353 
354   /// \brief Rename columns with provided names
355   Result<std::shared_ptr<Table>> RenameColumns(
356       const std::vector<std::string>& names) const;
357 
358   ARROW_DEPRECATED("Use Result-returning version")
359   Status RenameColumns(const std::vector<std::string>& names,
360                        std::shared_ptr<Table>* out) const;
361 
362   /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL)
363   /// \since 0.5.0
364   ///
365   /// \param[in] metadata new KeyValueMetadata
366   /// \return new Table
367   virtual std::shared_ptr<Table> ReplaceSchemaMetadata(
368       const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
369 
370   /// \brief Flatten the table, producing a new Table.  Any column with a
371   /// struct type will be flattened into multiple columns
372   ///
373   /// \param[in] pool The pool for buffer allocations, if any
374   virtual Result<std::shared_ptr<Table>> Flatten(
375       MemoryPool* pool = default_memory_pool()) const = 0;
376 
377   ARROW_DEPRECATED("Use Result-returning version")
378   Status Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const;
379 
380   /// \return PrettyPrint representation suitable for debugging
381   std::string ToString() const;
382 
383   /// \brief Perform cheap validation checks to determine obvious inconsistencies
384   /// within the table's schema and internal data.
385   ///
386   /// This is O(k*m) where k is the total number of field descendents,
387   /// and m is the number of chunks.
388   ///
389   /// \return Status
390   virtual Status Validate() const = 0;
391 
392   /// \brief Perform extensive validation checks to determine inconsistencies
393   /// within the table's schema and internal data.
394   ///
395   /// This is O(k*n) where k is the total number of field descendents,
396   /// and n is the number of rows.
397   ///
398   /// \return Status
399   virtual Status ValidateFull() const = 0;
400 
401   /// \brief Return the number of columns in the table
num_columns()402   int num_columns() const { return schema_->num_fields(); }
403 
404   /// \brief Return the number of rows (equal to each column's logical length)
num_rows()405   int64_t num_rows() const { return num_rows_; }
406 
407   /// \brief Determine if tables are equal
408   ///
409   /// Two tables can be equal only if they have equal schemas.
410   /// However, they may be equal even if they have different chunkings.
411   bool Equals(const Table& other, bool check_metadata = false) const;
412 
413   /// \brief Make a new table by combining the chunks this table has.
414   ///
415   /// All the underlying chunks in the ChunkedArray of each column are
416   /// concatenated into zero or one chunk.
417   ///
418   /// \param[in] pool The pool for buffer allocations
419   Result<std::shared_ptr<Table>> CombineChunks(
420       MemoryPool* pool = default_memory_pool()) const;
421 
422   ARROW_DEPRECATED("Use Result-returning version")
423   Status CombineChunks(MemoryPool* pool, std::shared_ptr<Table>* out) const;
424 
425  protected:
426   Table();
427 
428   std::shared_ptr<Schema> schema_;
429   int64_t num_rows_;
430 
431  private:
432   ARROW_DISALLOW_COPY_AND_ASSIGN(Table);
433 };
434 
435 /// \brief Compute a stream of record batches from a (possibly chunked) Table
436 ///
437 /// The conversion is zero-copy: each record batch is a view over a slice
438 /// of the table's columns.
439 class ARROW_EXPORT TableBatchReader : public RecordBatchReader {
440  public:
441   /// \brief Construct a TableBatchReader for the given table
442   explicit TableBatchReader(const Table& table);
443 
444   std::shared_ptr<Schema> schema() const override;
445 
446   Status ReadNext(std::shared_ptr<RecordBatch>* out) override;
447 
448   /// \brief Set the desired maximum chunk size of record batches
449   ///
450   /// The actual chunk size of each record batch may be smaller, depending
451   /// on actual chunking characteristics of each table column.
452   void set_chunksize(int64_t chunksize);
453 
454  private:
455   const Table& table_;
456   std::vector<ChunkedArray*> column_data_;
457   std::vector<int> chunk_numbers_;
458   std::vector<int64_t> chunk_offsets_;
459   int64_t absolute_row_position_;
460   int64_t max_chunksize_;
461 };
462 
/// \defgroup concat-tables Table concatenation
///
/// Functions and options for concatenating tables and for promoting a table
/// to a target schema.
/// @{
467 
468 /// \brief Controls the behavior of ConcatenateTables().
469 struct ARROW_EXPORT ConcatenateTablesOptions {
470   /// If true, the schemas of the tables will be first unified with fields of
471   /// the same name being merged, according to `field_merge_options`, then each
472   /// table will be promoted to the unified schema before being concatenated.
473   /// Otherwise, all tables should have the same schema. Each column in the output table
474   /// is the result of concatenating the corresponding columns in all input tables.
475   bool unify_schemas = false;
476 
477   Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults();
478 
DefaultsConcatenateTablesOptions479   static ConcatenateTablesOptions Defaults() { return ConcatenateTablesOptions(); }
480 };
481 
482 /// \brief Construct table from multiple input tables.
483 ARROW_EXPORT
484 Result<std::shared_ptr<Table>> ConcatenateTables(
485     const std::vector<std::shared_ptr<Table>>& tables,
486     ConcatenateTablesOptions options = ConcatenateTablesOptions::Defaults(),
487     MemoryPool* memory_pool = default_memory_pool());
488 
489 /// \brief Promotes a table to conform to the given schema.
490 ///
491 /// If a field in the schema does not have a corresponding column in the
492 /// table, a column of nulls will be added to the resulting table.
493 /// If the corresponding column is of type Null, it will be promoted to
494 /// the type specified by schema, with null values filled.
495 /// Returns an error:
496 /// - if the corresponding column's type is not compatible with the
497 ///   schema.
498 /// - if there is a column in the table that does not exist in the schema.
499 ///
500 /// \param[in] table the input Table
501 /// \param[in] schema the target schema to promote to
502 /// \param[in] pool The memory pool to be used if null-filled arrays need to
503 /// be created.
504 ARROW_EXPORT
505 Result<std::shared_ptr<Table>> PromoteTableToSchema(
506     const std::shared_ptr<Table>& table, const std::shared_ptr<Schema>& schema,
507     MemoryPool* pool = default_memory_pool());
508 
509 }  // namespace arrow
510