1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/table_builder.h"
19
20 #include <memory>
21 #include <utility>
22
23 #include "arrow/array.h"
24 #include "arrow/builder.h"
25 #include "arrow/record_batch.h"
26 #include "arrow/status.h"
27 #include "arrow/type.h"
28 #include "arrow/util/logging.h"
29
30 namespace arrow {
31
32 // ----------------------------------------------------------------------
33 // RecordBatchBuilder
34
RecordBatchBuilder(const std::shared_ptr<Schema> & schema,MemoryPool * pool,int64_t initial_capacity)35 RecordBatchBuilder::RecordBatchBuilder(const std::shared_ptr<Schema>& schema,
36 MemoryPool* pool, int64_t initial_capacity)
37 : schema_(schema), initial_capacity_(initial_capacity), pool_(pool) {}
38
Make(const std::shared_ptr<Schema> & schema,MemoryPool * pool,std::unique_ptr<RecordBatchBuilder> * builder)39 Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
40 std::unique_ptr<RecordBatchBuilder>* builder) {
41 return Make(schema, pool, kMinBuilderCapacity, builder);
42 }
43
Make(const std::shared_ptr<Schema> & schema,MemoryPool * pool,int64_t initial_capacity,std::unique_ptr<RecordBatchBuilder> * builder)44 Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
45 int64_t initial_capacity,
46 std::unique_ptr<RecordBatchBuilder>* builder) {
47 builder->reset(new RecordBatchBuilder(schema, pool, initial_capacity));
48 RETURN_NOT_OK((*builder)->CreateBuilders());
49 return (*builder)->InitBuilders();
50 }
51
Flush(bool reset_builders,std::shared_ptr<RecordBatch> * batch)52 Status RecordBatchBuilder::Flush(bool reset_builders,
53 std::shared_ptr<RecordBatch>* batch) {
54 std::vector<std::shared_ptr<Array>> fields;
55 fields.resize(this->num_fields());
56
57 int64_t length = 0;
58 for (int i = 0; i < this->num_fields(); ++i) {
59 RETURN_NOT_OK(raw_field_builders_[i]->Finish(&fields[i]));
60 if (i > 0 && fields[i]->length() != length) {
61 return Status::Invalid("All fields must be same length when calling Flush");
62 }
63 length = fields[i]->length();
64 }
65 *batch = RecordBatch::Make(schema_, length, std::move(fields));
66 if (reset_builders) {
67 return InitBuilders();
68 } else {
69 return Status::OK();
70 }
71 }
72
Flush(std::shared_ptr<RecordBatch> * batch)73 Status RecordBatchBuilder::Flush(std::shared_ptr<RecordBatch>* batch) {
74 return Flush(true, batch);
75 }
76
SetInitialCapacity(int64_t capacity)77 void RecordBatchBuilder::SetInitialCapacity(int64_t capacity) {
78 ARROW_CHECK_GT(capacity, 0) << "Initial capacity must be positive";
79 initial_capacity_ = capacity;
80 }
81
CreateBuilders()82 Status RecordBatchBuilder::CreateBuilders() {
83 field_builders_.resize(this->num_fields());
84 raw_field_builders_.resize(this->num_fields());
85 for (int i = 0; i < this->num_fields(); ++i) {
86 RETURN_NOT_OK(MakeBuilder(pool_, schema_->field(i)->type(), &field_builders_[i]));
87 raw_field_builders_[i] = field_builders_[i].get();
88 }
89 return Status::OK();
90 }
91
InitBuilders()92 Status RecordBatchBuilder::InitBuilders() {
93 for (int i = 0; i < this->num_fields(); ++i) {
94 RETURN_NOT_OK(raw_field_builders_[i]->Reserve(initial_capacity_));
95 }
96 return Status::OK();
97 }
98
99 } // namespace arrow
100