1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/json/chunked_builder.h"
19 
20 #include <mutex>
21 #include <string>
22 #include <unordered_map>
23 #include <utility>
24 #include <vector>
25 
26 #include "arrow/array.h"
27 #include "arrow/buffer.h"
28 #include "arrow/json/converter.h"
29 #include "arrow/table.h"
30 #include "arrow/util/checked_cast.h"
31 #include "arrow/util/logging.h"
32 #include "arrow/util/task_group.h"
33 
34 namespace arrow {
35 
36 using internal::checked_cast;
37 using internal::TaskGroup;
38 
39 namespace json {
40 
41 class NonNestedChunkedArrayBuilder : public ChunkedArrayBuilder {
42  public:
NonNestedChunkedArrayBuilder(const std::shared_ptr<TaskGroup> & task_group,std::shared_ptr<Converter> converter)43   NonNestedChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
44                                std::shared_ptr<Converter> converter)
45       : ChunkedArrayBuilder(task_group), converter_(std::move(converter)) {}
46 
Finish(std::shared_ptr<ChunkedArray> * out)47   Status Finish(std::shared_ptr<ChunkedArray>* out) override {
48     RETURN_NOT_OK(task_group_->Finish());
49     *out = std::make_shared<ChunkedArray>(std::move(chunks_), converter_->out_type());
50     chunks_.clear();
51     return Status::OK();
52   }
53 
ReplaceTaskGroup(const std::shared_ptr<TaskGroup> & task_group)54   Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
55     RETURN_NOT_OK(task_group_->Finish());
56     task_group_ = task_group;
57     return Status::OK();
58   }
59 
60  protected:
61   ArrayVector chunks_;
62   std::mutex mutex_;
63   std::shared_ptr<Converter> converter_;
64 };
65 
66 class TypedChunkedArrayBuilder
67     : public NonNestedChunkedArrayBuilder,
68       public std::enable_shared_from_this<TypedChunkedArrayBuilder> {
69  public:
70   using NonNestedChunkedArrayBuilder::NonNestedChunkedArrayBuilder;
71 
Insert(int64_t block_index,const std::shared_ptr<Field> &,const std::shared_ptr<Array> & unconverted)72   void Insert(int64_t block_index, const std::shared_ptr<Field>&,
73               const std::shared_ptr<Array>& unconverted) override {
74     std::unique_lock<std::mutex> lock(mutex_);
75     if (chunks_.size() <= static_cast<size_t>(block_index)) {
76       chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
77     }
78     lock.unlock();
79 
80     auto self = shared_from_this();
81 
82     task_group_->Append([self, block_index, unconverted] {
83       std::shared_ptr<Array> converted;
84       RETURN_NOT_OK(self->converter_->Convert(unconverted, &converted));
85       std::unique_lock<std::mutex> lock(self->mutex_);
86       self->chunks_[block_index] = std::move(converted);
87       return Status::OK();
88     });
89   }
90 };
91 
92 class InferringChunkedArrayBuilder
93     : public NonNestedChunkedArrayBuilder,
94       public std::enable_shared_from_this<InferringChunkedArrayBuilder> {
95  public:
InferringChunkedArrayBuilder(const std::shared_ptr<TaskGroup> & task_group,const PromotionGraph * promotion_graph,std::shared_ptr<Converter> converter)96   InferringChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
97                                const PromotionGraph* promotion_graph,
98                                std::shared_ptr<Converter> converter)
99       : NonNestedChunkedArrayBuilder(task_group, std::move(converter)),
100         promotion_graph_(promotion_graph) {}
101 
Insert(int64_t block_index,const std::shared_ptr<Field> & unconverted_field,const std::shared_ptr<Array> & unconverted)102   void Insert(int64_t block_index, const std::shared_ptr<Field>& unconverted_field,
103               const std::shared_ptr<Array>& unconverted) override {
104     std::unique_lock<std::mutex> lock(mutex_);
105     if (chunks_.size() <= static_cast<size_t>(block_index)) {
106       chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
107       unconverted_.resize(chunks_.size(), nullptr);
108       unconverted_fields_.resize(chunks_.size(), nullptr);
109     }
110     unconverted_[block_index] = unconverted;
111     unconverted_fields_[block_index] = unconverted_field;
112     lock.unlock();
113     ScheduleConvertChunk(block_index);
114   }
115 
ScheduleConvertChunk(int64_t block_index)116   void ScheduleConvertChunk(int64_t block_index) {
117     auto self = shared_from_this();
118     task_group_->Append([self, block_index] {
119       return self->TryConvertChunk(static_cast<size_t>(block_index));
120     });
121   }
122 
TryConvertChunk(size_t block_index)123   Status TryConvertChunk(size_t block_index) {
124     std::unique_lock<std::mutex> lock(mutex_);
125     auto converter = converter_;
126     auto unconverted = unconverted_[block_index];
127     auto unconverted_field = unconverted_fields_[block_index];
128     std::shared_ptr<Array> converted;
129 
130     lock.unlock();
131     Status st = converter->Convert(unconverted, &converted);
132     lock.lock();
133 
134     if (converter != converter_) {
135       // another task promoted converter; reconvert
136       lock.unlock();
137       ScheduleConvertChunk(block_index);
138       return Status::OK();
139     }
140 
141     if (st.ok()) {
142       // conversion succeeded
143       chunks_[block_index] = std::move(converted);
144       return Status::OK();
145     }
146 
147     auto promoted_type =
148         promotion_graph_->Promote(converter_->out_type(), unconverted_field);
149     if (promoted_type == nullptr) {
150       // converter failed, no promotion available
151       return st;
152     }
153     RETURN_NOT_OK(MakeConverter(promoted_type, converter_->pool(), &converter_));
154 
155     size_t nchunks = chunks_.size();
156     for (size_t i = 0; i < nchunks; ++i) {
157       if (i != block_index && chunks_[i]) {
158         // We're assuming the chunk was converted using the wrong type
159         // (which should be true unless the executor reorders tasks)
160         chunks_[i].reset();
161         lock.unlock();
162         ScheduleConvertChunk(i);
163         lock.lock();
164       }
165     }
166     lock.unlock();
167     ScheduleConvertChunk(block_index);
168     return Status::OK();
169   }
170 
Finish(std::shared_ptr<ChunkedArray> * out)171   Status Finish(std::shared_ptr<ChunkedArray>* out) override {
172     RETURN_NOT_OK(NonNestedChunkedArrayBuilder::Finish(out));
173     unconverted_.clear();
174     return Status::OK();
175   }
176 
177  private:
178   ArrayVector unconverted_;
179   std::vector<std::shared_ptr<Field>> unconverted_fields_;
180   const PromotionGraph* promotion_graph_;
181 };
182 
183 class ChunkedListArrayBuilder : public ChunkedArrayBuilder {
184  public:
ChunkedListArrayBuilder(const std::shared_ptr<TaskGroup> & task_group,MemoryPool * pool,std::shared_ptr<ChunkedArrayBuilder> value_builder,const std::shared_ptr<Field> & value_field)185   ChunkedListArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
186                           std::shared_ptr<ChunkedArrayBuilder> value_builder,
187                           const std::shared_ptr<Field>& value_field)
188       : ChunkedArrayBuilder(task_group),
189         pool_(pool),
190         value_builder_(std::move(value_builder)),
191         value_field_(value_field) {}
192 
ReplaceTaskGroup(const std::shared_ptr<TaskGroup> & task_group)193   Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
194     RETURN_NOT_OK(task_group_->Finish());
195     RETURN_NOT_OK(value_builder_->ReplaceTaskGroup(task_group));
196     task_group_ = task_group;
197     return Status::OK();
198   }
199 
Insert(int64_t block_index,const std::shared_ptr<Field> &,const std::shared_ptr<Array> & unconverted)200   void Insert(int64_t block_index, const std::shared_ptr<Field>&,
201               const std::shared_ptr<Array>& unconverted) override {
202     std::unique_lock<std::mutex> lock(mutex_);
203 
204     if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
205       null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
206       offset_chunks_.resize(null_bitmap_chunks_.size(), nullptr);
207     }
208 
209     if (unconverted->type_id() == Type::NA) {
210       auto st = InsertNull(block_index, unconverted->length());
211       if (!st.ok()) {
212         task_group_->Append([st] { return st; });
213       }
214       return;
215     }
216 
217     DCHECK_EQ(unconverted->type_id(), Type::LIST);
218     const auto& list_array = checked_cast<const ListArray&>(*unconverted);
219 
220     null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
221     offset_chunks_[block_index] = list_array.value_offsets();
222 
223     value_builder_->Insert(block_index, list_array.list_type()->value_field(),
224                            list_array.values());
225   }
226 
Finish(std::shared_ptr<ChunkedArray> * out)227   Status Finish(std::shared_ptr<ChunkedArray>* out) override {
228     RETURN_NOT_OK(task_group_->Finish());
229 
230     std::shared_ptr<ChunkedArray> value_array;
231     RETURN_NOT_OK(value_builder_->Finish(&value_array));
232 
233     auto type = list(value_field_->WithType(value_array->type())->WithMetadata(nullptr));
234     ArrayVector chunks(null_bitmap_chunks_.size());
235     for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) {
236       auto value_chunk = value_array->chunk(static_cast<int>(i));
237       auto length = offset_chunks_[i]->size() / sizeof(int32_t) - 1;
238       chunks[i] = std::make_shared<ListArray>(type, length, offset_chunks_[i],
239                                               value_chunk, null_bitmap_chunks_[i]);
240     }
241 
242     *out = std::make_shared<ChunkedArray>(std::move(chunks), type);
243     return Status::OK();
244   }
245 
246  private:
247   // call from Insert() only, with mutex_ locked
InsertNull(int64_t block_index,int64_t length)248   Status InsertNull(int64_t block_index, int64_t length) {
249     value_builder_->Insert(block_index, value_field_, std::make_shared<NullArray>(0));
250 
251     ARROW_ASSIGN_OR_RAISE(null_bitmap_chunks_[block_index],
252                           AllocateEmptyBitmap(length, pool_));
253 
254     int64_t offsets_length = (length + 1) * sizeof(int32_t);
255     ARROW_ASSIGN_OR_RAISE(offset_chunks_[block_index],
256                           AllocateBuffer(offsets_length, pool_));
257     std::memset(offset_chunks_[block_index]->mutable_data(), 0, offsets_length);
258 
259     return Status::OK();
260   }
261 
262   std::mutex mutex_;
263   MemoryPool* pool_;
264   std::shared_ptr<ChunkedArrayBuilder> value_builder_;
265   BufferVector offset_chunks_, null_bitmap_chunks_;
266   std::shared_ptr<Field> value_field_;
267 };
268 
269 class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
270  public:
ChunkedStructArrayBuilder(const std::shared_ptr<TaskGroup> & task_group,MemoryPool * pool,const PromotionGraph * promotion_graph,std::vector<std::pair<std::string,std::shared_ptr<ChunkedArrayBuilder>>> name_builders)271   ChunkedStructArrayBuilder(
272       const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
273       const PromotionGraph* promotion_graph,
274       std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
275           name_builders)
276       : ChunkedArrayBuilder(task_group), pool_(pool), promotion_graph_(promotion_graph) {
277     for (auto&& name_builder : name_builders) {
278       auto index = static_cast<int>(name_to_index_.size());
279       name_to_index_.emplace(std::move(name_builder.first), index);
280       child_builders_.emplace_back(std::move(name_builder.second));
281     }
282   }
283 
Insert(int64_t block_index,const std::shared_ptr<Field> &,const std::shared_ptr<Array> & unconverted)284   void Insert(int64_t block_index, const std::shared_ptr<Field>&,
285               const std::shared_ptr<Array>& unconverted) override {
286     std::unique_lock<std::mutex> lock(mutex_);
287 
288     if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
289       null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
290       chunk_lengths_.resize(null_bitmap_chunks_.size(), -1);
291       child_absent_.resize(null_bitmap_chunks_.size(), std::vector<bool>(0));
292     }
293     null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
294     chunk_lengths_[block_index] = unconverted->length();
295 
296     if (unconverted->type_id() == Type::NA) {
297       auto maybe_buffer = AllocateBitmap(unconverted->length(), pool_);
298       if (maybe_buffer.ok()) {
299         null_bitmap_chunks_[block_index] = *std::move(maybe_buffer);
300         std::memset(null_bitmap_chunks_[block_index]->mutable_data(), 0,
301                     null_bitmap_chunks_[block_index]->size());
302       } else {
303         Status st = maybe_buffer.status();
304         task_group_->Append([st] { return st; });
305       }
306 
307       // absent fields will be inserted at Finish
308       return;
309     }
310 
311     const auto& struct_array = checked_cast<const StructArray&>(*unconverted);
312     if (promotion_graph_ == nullptr) {
313       // If unexpected fields are ignored or result in an error then all parsers will emit
314       // columns exclusively in the ordering specified in ParseOptions::explicit_schema,
315       // so child_builders_ is immutable and no associative lookup is necessary.
316       for (int i = 0; i < unconverted->num_fields(); ++i) {
317         child_builders_[i]->Insert(block_index, unconverted->type()->field(i),
318                                    struct_array.field(i));
319       }
320     } else {
321       auto st = InsertChildren(block_index, struct_array);
322       if (!st.ok()) {
323         return task_group_->Append([st] { return st; });
324       }
325     }
326   }
327 
Finish(std::shared_ptr<ChunkedArray> * out)328   Status Finish(std::shared_ptr<ChunkedArray>* out) override {
329     RETURN_NOT_OK(task_group_->Finish());
330 
331     if (promotion_graph_ != nullptr) {
332       // insert absent child chunks
333       for (auto&& name_index : name_to_index_) {
334         auto child_builder = child_builders_[name_index.second].get();
335 
336         RETURN_NOT_OK(child_builder->ReplaceTaskGroup(TaskGroup::MakeSerial()));
337 
338         for (size_t i = 0; i < chunk_lengths_.size(); ++i) {
339           if (child_absent_[i].size() > static_cast<size_t>(name_index.second) &&
340               !child_absent_[i][name_index.second]) {
341             continue;
342           }
343           auto empty = std::make_shared<NullArray>(chunk_lengths_[i]);
344           child_builder->Insert(i, promotion_graph_->Null(name_index.first), empty);
345         }
346       }
347     }
348 
349     std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
350     std::vector<std::shared_ptr<ChunkedArray>> child_arrays(name_to_index_.size());
351     for (auto&& name_index : name_to_index_) {
352       auto child_builder = child_builders_[name_index.second].get();
353 
354       std::shared_ptr<ChunkedArray> child_array;
355       RETURN_NOT_OK(child_builder->Finish(&child_array));
356 
357       child_arrays[name_index.second] = child_array;
358       fields[name_index.second] = field(name_index.first, child_array->type());
359     }
360 
361     auto type = struct_(std::move(fields));
362     ArrayVector chunks(null_bitmap_chunks_.size());
363     for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) {
364       ArrayVector child_chunks;
365       for (const auto& child_array : child_arrays) {
366         child_chunks.push_back(child_array->chunk(static_cast<int>(i)));
367       }
368       chunks[i] = std::make_shared<StructArray>(type, chunk_lengths_[i], child_chunks,
369                                                 null_bitmap_chunks_[i]);
370     }
371 
372     *out = std::make_shared<ChunkedArray>(std::move(chunks), type);
373     return Status::OK();
374   }
375 
ReplaceTaskGroup(const std::shared_ptr<TaskGroup> & task_group)376   Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
377     RETURN_NOT_OK(task_group_->Finish());
378     for (auto&& child_builder : child_builders_) {
379       RETURN_NOT_OK(child_builder->ReplaceTaskGroup(task_group));
380     }
381     task_group_ = task_group;
382     return Status::OK();
383   }
384 
385  private:
386   // Insert children associatively by name; the unconverted block may have unexpected or
387   // differently ordered fields
388   // call from Insert() only, with mutex_ locked
InsertChildren(int64_t block_index,const StructArray & unconverted)389   Status InsertChildren(int64_t block_index, const StructArray& unconverted) {
390     const auto& fields = unconverted.type()->fields();
391 
392     for (int i = 0; i < unconverted.num_fields(); ++i) {
393       auto it = name_to_index_.find(fields[i]->name());
394 
395       if (it == name_to_index_.end()) {
396         // add a new field to this builder
397         auto type = promotion_graph_->Infer(fields[i]);
398         DCHECK_NE(type, nullptr)
399             << "invalid unconverted_field encountered in conversion: "
400             << fields[i]->name() << ":" << *fields[i]->type();
401 
402         auto new_index = static_cast<int>(name_to_index_.size());
403         it = name_to_index_.emplace(fields[i]->name(), new_index).first;
404 
405         std::shared_ptr<ChunkedArrayBuilder> child_builder;
406         RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph_, type,
407                                               &child_builder));
408         child_builders_.emplace_back(std::move(child_builder));
409       }
410 
411       auto unconverted_field = unconverted.type()->field(i);
412       child_builders_[it->second]->Insert(block_index, unconverted_field,
413                                           unconverted.field(i));
414 
415       child_absent_[block_index].resize(child_builders_.size(), true);
416       child_absent_[block_index][it->second] = false;
417     }
418 
419     return Status::OK();
420   }
421 
422   std::mutex mutex_;
423   MemoryPool* pool_;
424   const PromotionGraph* promotion_graph_;
425   std::unordered_map<std::string, int> name_to_index_;
426   std::vector<std::shared_ptr<ChunkedArrayBuilder>> child_builders_;
427   std::vector<std::vector<bool>> child_absent_;
428   BufferVector null_bitmap_chunks_;
429   std::vector<int64_t> chunk_lengths_;
430 };
431 
MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup> & task_group,MemoryPool * pool,const PromotionGraph * promotion_graph,const std::shared_ptr<DataType> & type,std::shared_ptr<ChunkedArrayBuilder> * out)432 Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
433                                MemoryPool* pool, const PromotionGraph* promotion_graph,
434                                const std::shared_ptr<DataType>& type,
435                                std::shared_ptr<ChunkedArrayBuilder>* out) {
436   if (type->id() == Type::STRUCT) {
437     std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
438         child_builders;
439     for (const auto& f : type->fields()) {
440       std::shared_ptr<ChunkedArrayBuilder> child_builder;
441       RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, f->type(),
442                                             &child_builder));
443       child_builders.emplace_back(f->name(), std::move(child_builder));
444     }
445     *out = std::make_shared<ChunkedStructArrayBuilder>(task_group, pool, promotion_graph,
446                                                        std::move(child_builders));
447     return Status::OK();
448   }
449   if (type->id() == Type::LIST) {
450     const auto& list_type = checked_cast<const ListType&>(*type);
451     std::shared_ptr<ChunkedArrayBuilder> value_builder;
452     RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph,
453                                           list_type.value_type(), &value_builder));
454     *out = std::make_shared<ChunkedListArrayBuilder>(
455         task_group, pool, std::move(value_builder), list_type.value_field());
456     return Status::OK();
457   }
458   std::shared_ptr<Converter> converter;
459   RETURN_NOT_OK(MakeConverter(type, pool, &converter));
460   if (promotion_graph) {
461     *out = std::make_shared<InferringChunkedArrayBuilder>(task_group, promotion_graph,
462                                                           std::move(converter));
463   } else {
464     *out = std::make_shared<TypedChunkedArrayBuilder>(task_group, std::move(converter));
465   }
466   return Status::OK();
467 }
468 
469 }  // namespace json
470 }  // namespace arrow
471