1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/json/reader.h"
19 
20 #include <utility>
21 #include <vector>
22 
23 #include "arrow/array.h"
24 #include "arrow/buffer.h"
25 #include "arrow/io/interfaces.h"
26 #include "arrow/json/chunked_builder.h"
27 #include "arrow/json/chunker.h"
28 #include "arrow/json/converter.h"
29 #include "arrow/json/parser.h"
30 #include "arrow/record_batch.h"
31 #include "arrow/table.h"
32 #include "arrow/util/async_generator.h"
33 #include "arrow/util/iterator.h"
34 #include "arrow/util/logging.h"
35 #include "arrow/util/string_view.h"
36 #include "arrow/util/task_group.h"
37 #include "arrow/util/thread_pool.h"
38 
39 namespace arrow {
40 
41 using util::string_view;
42 
43 using internal::GetCpuThreadPool;
44 using internal::TaskGroup;
45 using internal::ThreadPool;
46 
47 namespace json {
48 
49 class TableReaderImpl : public TableReader,
50                         public std::enable_shared_from_this<TableReaderImpl> {
51  public:
TableReaderImpl(MemoryPool * pool,const ReadOptions & read_options,const ParseOptions & parse_options,std::shared_ptr<TaskGroup> task_group)52   TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
53                   const ParseOptions& parse_options,
54                   std::shared_ptr<TaskGroup> task_group)
55       : pool_(pool),
56         read_options_(read_options),
57         parse_options_(parse_options),
58         chunker_(MakeChunker(parse_options_)),
59         task_group_(std::move(task_group)) {}
60 
Init(std::shared_ptr<io::InputStream> input)61   Status Init(std::shared_ptr<io::InputStream> input) {
62     ARROW_ASSIGN_OR_RAISE(auto it,
63                           io::MakeInputStreamIterator(input, read_options_.block_size));
64     return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
65         .Value(&block_iterator_);
66   }
67 
Read()68   Result<std::shared_ptr<Table>> Read() override {
69     RETURN_NOT_OK(MakeBuilder());
70 
71     ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
72     if (block == nullptr) {
73       return Status::Invalid("Empty JSON file");
74     }
75 
76     auto self = shared_from_this();
77     auto empty = std::make_shared<Buffer>("");
78 
79     int64_t block_index = 0;
80     std::shared_ptr<Buffer> partial = empty;
81 
82     while (block != nullptr) {
83       std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
84 
85       ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
86 
87       if (next_block == nullptr) {
88         // End of file reached => compute completion from penultimate block
89         RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
90       } else {
91         std::shared_ptr<Buffer> starts_with_whole;
92         // Get completion of partial from previous block.
93         RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
94                                                    &starts_with_whole));
95 
96         // Get all whole objects entirely inside the current buffer
97         RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
98       }
99 
100       // Launch parse task
101       task_group_->Append([self, partial, completion, whole, block_index] {
102         return self->ParseAndInsert(partial, completion, whole, block_index);
103       });
104       block_index++;
105 
106       partial = next_partial;
107       block = next_block;
108     }
109 
110     std::shared_ptr<ChunkedArray> array;
111     RETURN_NOT_OK(builder_->Finish(&array));
112     return Table::FromChunkedStructArray(array);
113   }
114 
115  private:
MakeBuilder()116   Status MakeBuilder() {
117     auto type = parse_options_.explicit_schema
118                     ? struct_(parse_options_.explicit_schema->fields())
119                     : struct_({});
120 
121     auto promotion_graph =
122         parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
123             ? GetPromotionGraph()
124             : nullptr;
125 
126     return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
127   }
128 
ParseAndInsert(const std::shared_ptr<Buffer> & partial,const std::shared_ptr<Buffer> & completion,const std::shared_ptr<Buffer> & whole,int64_t block_index)129   Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
130                         const std::shared_ptr<Buffer>& completion,
131                         const std::shared_ptr<Buffer>& whole, int64_t block_index) {
132     std::unique_ptr<BlockParser> parser;
133     RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
134     RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
135                                                whole->size()));
136 
137     if (partial->size() != 0 || completion->size() != 0) {
138       std::shared_ptr<Buffer> straddling;
139       if (partial->size() == 0) {
140         straddling = completion;
141       } else if (completion->size() == 0) {
142         straddling = partial;
143       } else {
144         ARROW_ASSIGN_OR_RAISE(straddling,
145                               ConcatenateBuffers({partial, completion}, pool_));
146       }
147       RETURN_NOT_OK(parser->Parse(straddling));
148     }
149 
150     if (whole->size() != 0) {
151       RETURN_NOT_OK(parser->Parse(whole));
152     }
153 
154     std::shared_ptr<Array> parsed;
155     RETURN_NOT_OK(parser->Finish(&parsed));
156     builder_->Insert(block_index, field("", parsed->type()), parsed);
157     return Status::OK();
158   }
159 
160   MemoryPool* pool_;
161   ReadOptions read_options_;
162   ParseOptions parse_options_;
163   std::unique_ptr<Chunker> chunker_;
164   std::shared_ptr<TaskGroup> task_group_;
165   Iterator<std::shared_ptr<Buffer>> block_iterator_;
166   std::shared_ptr<ChunkedArrayBuilder> builder_;
167 };
168 
Read(std::shared_ptr<Table> * out)169 Status TableReader::Read(std::shared_ptr<Table>* out) { return Read().Value(out); }
170 
Make(MemoryPool * pool,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options)171 Result<std::shared_ptr<TableReader>> TableReader::Make(
172     MemoryPool* pool, std::shared_ptr<io::InputStream> input,
173     const ReadOptions& read_options, const ParseOptions& parse_options) {
174   std::shared_ptr<TableReaderImpl> ptr;
175   if (read_options.use_threads) {
176     ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
177                                             TaskGroup::MakeThreaded(GetCpuThreadPool()));
178   } else {
179     ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
180                                             TaskGroup::MakeSerial());
181   }
182   RETURN_NOT_OK(ptr->Init(input));
183   return ptr;
184 }
185 
Make(MemoryPool * pool,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options,std::shared_ptr<TableReader> * out)186 Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
187                          const ReadOptions& read_options,
188                          const ParseOptions& parse_options,
189                          std::shared_ptr<TableReader>* out) {
190   return TableReader::Make(pool, input, read_options, parse_options).Value(out);
191 }
192 
ParseOne(ParseOptions options,std::shared_ptr<Buffer> json)193 Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
194                                               std::shared_ptr<Buffer> json) {
195   std::unique_ptr<BlockParser> parser;
196   RETURN_NOT_OK(BlockParser::Make(options, &parser));
197   RETURN_NOT_OK(parser->Parse(json));
198   std::shared_ptr<Array> parsed;
199   RETURN_NOT_OK(parser->Finish(&parsed));
200 
201   auto type =
202       options.explicit_schema ? struct_(options.explicit_schema->fields()) : struct_({});
203   auto promotion_graph =
204       options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
205           ? GetPromotionGraph()
206           : nullptr;
207   std::shared_ptr<ChunkedArrayBuilder> builder;
208   RETURN_NOT_OK(MakeChunkedArrayBuilder(internal::TaskGroup::MakeSerial(),
209                                         default_memory_pool(), promotion_graph, type,
210                                         &builder));
211 
212   builder->Insert(0, field("", type), parsed);
213   std::shared_ptr<ChunkedArray> converted_chunked;
214   RETURN_NOT_OK(builder->Finish(&converted_chunked));
215   auto converted = static_cast<const StructArray*>(converted_chunked->chunk(0).get());
216 
217   std::vector<std::shared_ptr<Array>> columns(converted->num_fields());
218   for (int i = 0; i < converted->num_fields(); ++i) {
219     columns[i] = converted->field(i);
220   }
221   return RecordBatch::Make(schema(converted->type()->fields()), converted->length(),
222                            std::move(columns));
223 }
224 
225 }  // namespace json
226 }  // namespace arrow
227