1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/table_builder.h"
19 
20 #include <memory>
21 #include <utility>
22 
23 #include "arrow/array.h"
24 #include "arrow/builder.h"
25 #include "arrow/record_batch.h"
26 #include "arrow/status.h"
27 #include "arrow/type.h"
28 #include "arrow/util/logging.h"
29 
30 namespace arrow {
31 
32 // ----------------------------------------------------------------------
33 // RecordBatchBuilder
34 
RecordBatchBuilder(const std::shared_ptr<Schema> & schema,MemoryPool * pool,int64_t initial_capacity)35 RecordBatchBuilder::RecordBatchBuilder(const std::shared_ptr<Schema>& schema,
36                                        MemoryPool* pool, int64_t initial_capacity)
37     : schema_(schema), initial_capacity_(initial_capacity), pool_(pool) {}
38 
Make(const std::shared_ptr<Schema> & schema,MemoryPool * pool,std::unique_ptr<RecordBatchBuilder> * builder)39 Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
40                                 std::unique_ptr<RecordBatchBuilder>* builder) {
41   return Make(schema, pool, kMinBuilderCapacity, builder);
42 }
43 
Make(const std::shared_ptr<Schema> & schema,MemoryPool * pool,int64_t initial_capacity,std::unique_ptr<RecordBatchBuilder> * builder)44 Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
45                                 int64_t initial_capacity,
46                                 std::unique_ptr<RecordBatchBuilder>* builder) {
47   builder->reset(new RecordBatchBuilder(schema, pool, initial_capacity));
48   RETURN_NOT_OK((*builder)->CreateBuilders());
49   return (*builder)->InitBuilders();
50 }
51 
Flush(bool reset_builders,std::shared_ptr<RecordBatch> * batch)52 Status RecordBatchBuilder::Flush(bool reset_builders,
53                                  std::shared_ptr<RecordBatch>* batch) {
54   std::vector<std::shared_ptr<Array>> fields;
55   fields.resize(this->num_fields());
56 
57   int64_t length = 0;
58   for (int i = 0; i < this->num_fields(); ++i) {
59     RETURN_NOT_OK(raw_field_builders_[i]->Finish(&fields[i]));
60     if (i > 0 && fields[i]->length() != length) {
61       return Status::Invalid("All fields must be same length when calling Flush");
62     }
63     length = fields[i]->length();
64   }
65   *batch = RecordBatch::Make(schema_, length, std::move(fields));
66   if (reset_builders) {
67     return InitBuilders();
68   } else {
69     return Status::OK();
70   }
71 }
72 
Flush(std::shared_ptr<RecordBatch> * batch)73 Status RecordBatchBuilder::Flush(std::shared_ptr<RecordBatch>* batch) {
74   return Flush(true, batch);
75 }
76 
SetInitialCapacity(int64_t capacity)77 void RecordBatchBuilder::SetInitialCapacity(int64_t capacity) {
78   ARROW_CHECK_GT(capacity, 0) << "Initial capacity must be positive";
79   initial_capacity_ = capacity;
80 }
81 
CreateBuilders()82 Status RecordBatchBuilder::CreateBuilders() {
83   field_builders_.resize(this->num_fields());
84   raw_field_builders_.resize(this->num_fields());
85   for (int i = 0; i < this->num_fields(); ++i) {
86     RETURN_NOT_OK(MakeBuilder(pool_, schema_->field(i)->type(), &field_builders_[i]));
87     raw_field_builders_[i] = field_builders_[i].get();
88   }
89   return Status::OK();
90 }
91 
InitBuilders()92 Status RecordBatchBuilder::InitBuilders() {
93   for (int i = 0; i < this->num_fields(); ++i) {
94     RETURN_NOT_OK(raw_field_builders_[i]->Reserve(initial_capacity_));
95   }
96   return Status::OK();
97 }
98 
99 }  // namespace arrow
100