1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/json/reader.h"
19
20 #include <utility>
21 #include <vector>
22
23 #include "arrow/array.h"
24 #include "arrow/buffer.h"
25 #include "arrow/io/interfaces.h"
26 #include "arrow/json/chunked_builder.h"
27 #include "arrow/json/chunker.h"
28 #include "arrow/json/converter.h"
29 #include "arrow/json/parser.h"
30 #include "arrow/record_batch.h"
31 #include "arrow/table.h"
32 #include "arrow/util/async_generator.h"
33 #include "arrow/util/iterator.h"
34 #include "arrow/util/logging.h"
35 #include "arrow/util/string_view.h"
36 #include "arrow/util/task_group.h"
37 #include "arrow/util/thread_pool.h"
38
39 namespace arrow {
40
41 using util::string_view;
42
43 using internal::GetCpuThreadPool;
44 using internal::TaskGroup;
45 using internal::ThreadPool;
46
47 namespace json {
48
49 class TableReaderImpl : public TableReader,
50 public std::enable_shared_from_this<TableReaderImpl> {
51 public:
TableReaderImpl(MemoryPool * pool,const ReadOptions & read_options,const ParseOptions & parse_options,std::shared_ptr<TaskGroup> task_group)52 TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
53 const ParseOptions& parse_options,
54 std::shared_ptr<TaskGroup> task_group)
55 : pool_(pool),
56 read_options_(read_options),
57 parse_options_(parse_options),
58 chunker_(MakeChunker(parse_options_)),
59 task_group_(std::move(task_group)) {}
60
Init(std::shared_ptr<io::InputStream> input)61 Status Init(std::shared_ptr<io::InputStream> input) {
62 ARROW_ASSIGN_OR_RAISE(auto it,
63 io::MakeInputStreamIterator(input, read_options_.block_size));
64 return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
65 .Value(&block_iterator_);
66 }
67
Read()68 Result<std::shared_ptr<Table>> Read() override {
69 RETURN_NOT_OK(MakeBuilder());
70
71 ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
72 if (block == nullptr) {
73 return Status::Invalid("Empty JSON file");
74 }
75
76 auto self = shared_from_this();
77 auto empty = std::make_shared<Buffer>("");
78
79 int64_t block_index = 0;
80 std::shared_ptr<Buffer> partial = empty;
81
82 while (block != nullptr) {
83 std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
84
85 ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
86
87 if (next_block == nullptr) {
88 // End of file reached => compute completion from penultimate block
89 RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
90 } else {
91 std::shared_ptr<Buffer> starts_with_whole;
92 // Get completion of partial from previous block.
93 RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
94 &starts_with_whole));
95
96 // Get all whole objects entirely inside the current buffer
97 RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
98 }
99
100 // Launch parse task
101 task_group_->Append([self, partial, completion, whole, block_index] {
102 return self->ParseAndInsert(partial, completion, whole, block_index);
103 });
104 block_index++;
105
106 partial = next_partial;
107 block = next_block;
108 }
109
110 std::shared_ptr<ChunkedArray> array;
111 RETURN_NOT_OK(builder_->Finish(&array));
112 return Table::FromChunkedStructArray(array);
113 }
114
115 private:
MakeBuilder()116 Status MakeBuilder() {
117 auto type = parse_options_.explicit_schema
118 ? struct_(parse_options_.explicit_schema->fields())
119 : struct_({});
120
121 auto promotion_graph =
122 parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
123 ? GetPromotionGraph()
124 : nullptr;
125
126 return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
127 }
128
ParseAndInsert(const std::shared_ptr<Buffer> & partial,const std::shared_ptr<Buffer> & completion,const std::shared_ptr<Buffer> & whole,int64_t block_index)129 Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
130 const std::shared_ptr<Buffer>& completion,
131 const std::shared_ptr<Buffer>& whole, int64_t block_index) {
132 std::unique_ptr<BlockParser> parser;
133 RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
134 RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
135 whole->size()));
136
137 if (partial->size() != 0 || completion->size() != 0) {
138 std::shared_ptr<Buffer> straddling;
139 if (partial->size() == 0) {
140 straddling = completion;
141 } else if (completion->size() == 0) {
142 straddling = partial;
143 } else {
144 ARROW_ASSIGN_OR_RAISE(straddling,
145 ConcatenateBuffers({partial, completion}, pool_));
146 }
147 RETURN_NOT_OK(parser->Parse(straddling));
148 }
149
150 if (whole->size() != 0) {
151 RETURN_NOT_OK(parser->Parse(whole));
152 }
153
154 std::shared_ptr<Array> parsed;
155 RETURN_NOT_OK(parser->Finish(&parsed));
156 builder_->Insert(block_index, field("", parsed->type()), parsed);
157 return Status::OK();
158 }
159
160 MemoryPool* pool_;
161 ReadOptions read_options_;
162 ParseOptions parse_options_;
163 std::unique_ptr<Chunker> chunker_;
164 std::shared_ptr<TaskGroup> task_group_;
165 Iterator<std::shared_ptr<Buffer>> block_iterator_;
166 std::shared_ptr<ChunkedArrayBuilder> builder_;
167 };
168
Read(std::shared_ptr<Table> * out)169 Status TableReader::Read(std::shared_ptr<Table>* out) { return Read().Value(out); }
170
Make(MemoryPool * pool,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options)171 Result<std::shared_ptr<TableReader>> TableReader::Make(
172 MemoryPool* pool, std::shared_ptr<io::InputStream> input,
173 const ReadOptions& read_options, const ParseOptions& parse_options) {
174 std::shared_ptr<TableReaderImpl> ptr;
175 if (read_options.use_threads) {
176 ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
177 TaskGroup::MakeThreaded(GetCpuThreadPool()));
178 } else {
179 ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
180 TaskGroup::MakeSerial());
181 }
182 RETURN_NOT_OK(ptr->Init(input));
183 return ptr;
184 }
185
Make(MemoryPool * pool,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options,std::shared_ptr<TableReader> * out)186 Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
187 const ReadOptions& read_options,
188 const ParseOptions& parse_options,
189 std::shared_ptr<TableReader>* out) {
190 return TableReader::Make(pool, input, read_options, parse_options).Value(out);
191 }
192
ParseOne(ParseOptions options,std::shared_ptr<Buffer> json)193 Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
194 std::shared_ptr<Buffer> json) {
195 std::unique_ptr<BlockParser> parser;
196 RETURN_NOT_OK(BlockParser::Make(options, &parser));
197 RETURN_NOT_OK(parser->Parse(json));
198 std::shared_ptr<Array> parsed;
199 RETURN_NOT_OK(parser->Finish(&parsed));
200
201 auto type =
202 options.explicit_schema ? struct_(options.explicit_schema->fields()) : struct_({});
203 auto promotion_graph =
204 options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
205 ? GetPromotionGraph()
206 : nullptr;
207 std::shared_ptr<ChunkedArrayBuilder> builder;
208 RETURN_NOT_OK(MakeChunkedArrayBuilder(internal::TaskGroup::MakeSerial(),
209 default_memory_pool(), promotion_graph, type,
210 &builder));
211
212 builder->Insert(0, field("", type), parsed);
213 std::shared_ptr<ChunkedArray> converted_chunked;
214 RETURN_NOT_OK(builder->Finish(&converted_chunked));
215 auto converted = static_cast<const StructArray*>(converted_chunked->chunk(0).get());
216
217 std::vector<std::shared_ptr<Array>> columns(converted->num_fields());
218 for (int i = 0; i < converted->num_fields(); ++i) {
219 columns[i] = converted->field(i);
220 }
221 return RecordBatch::Make(schema(converted->type()->fields()), converted->length(),
222 std::move(columns));
223 }
224
225 } // namespace json
226 } // namespace arrow
227