1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/json/chunker.h"
19 
20 #include <algorithm>
21 #include <utility>
22 #include <vector>
23 
24 #include "arrow/json/rapidjson_defs.h"
25 #include "rapidjson/reader.h"
26 
27 #include "arrow/buffer.h"
28 #include "arrow/json/options.h"
29 #include "arrow/util/logging.h"
30 #include "arrow/util/make_unique.h"
31 #include "arrow/util/string_view.h"
32 
33 namespace arrow {
34 
35 using internal::make_unique;
36 using util::string_view;
37 
38 namespace json {
39 
40 namespace rj = arrow::rapidjson;
41 
// Count the whitespace characters (space, tab, CR, LF) at the front of `view`.
// Returns view.size() when `view` is entirely whitespace (or empty).
static size_t ConsumeWhitespace(string_view view) {
#ifdef RAPIDJSON_SIMD
  const char* const begin = view.data();
  const char* const first_non_ws = rj::SkipWhitespace_SIMD(begin, begin + view.size());
  return static_cast<size_t>(first_non_ws - begin);
#else
  const size_t first_non_ws = view.find_first_not_of(" \t\r\n");
  return first_non_ws == string_view::npos ? view.size() : first_non_ws;
#endif
}
56 
57 /// RapidJson custom stream for reading JSON stored in multiple buffers
58 /// http://rapidjson.org/md_doc_stream.html#CustomStream
59 class MultiStringStream {
60  public:
61   using Ch = char;
MultiStringStream(std::vector<string_view> strings)62   explicit MultiStringStream(std::vector<string_view> strings)
63       : strings_(std::move(strings)) {
64     std::reverse(strings_.begin(), strings_.end());
65   }
MultiStringStream(const BufferVector & buffers)66   explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) {
67     for (size_t i = 0; i < buffers.size(); ++i) {
68       strings_[i] = string_view(*buffers[i]);
69     }
70     std::reverse(strings_.begin(), strings_.end());
71   }
Peek() const72   char Peek() const {
73     if (strings_.size() == 0) return '\0';
74     return strings_.back()[0];
75   }
Take()76   char Take() {
77     if (strings_.size() == 0) return '\0';
78     char taken = strings_.back()[0];
79     if (strings_.back().size() == 1) {
80       strings_.pop_back();
81     } else {
82       strings_.back() = strings_.back().substr(1);
83     }
84     ++index_;
85     return taken;
86   }
Tell()87   size_t Tell() { return index_; }
Put(char)88   void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
Flush()89   void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
PutBegin()90   char* PutBegin() {
91     ARROW_LOG(FATAL) << "not implemented";
92     return nullptr;
93   }
PutEnd(char *)94   size_t PutEnd(char*) {
95     ARROW_LOG(FATAL) << "not implemented";
96     return 0;
97   }
98 
99  private:
100   size_t index_ = 0;
101   std::vector<string_view> strings_;
102 };
103 
104 template <typename Stream>
ConsumeWholeObject(Stream && stream)105 static size_t ConsumeWholeObject(Stream&& stream) {
106   static constexpr unsigned parse_flags = rj::kParseIterativeFlag |
107                                           rj::kParseStopWhenDoneFlag |
108                                           rj::kParseNumbersAsStringsFlag;
109   rj::BaseReaderHandler<rj::UTF8<>> handler;
110   rj::Reader reader;
111   // parse a single JSON object
112   switch (reader.Parse<parse_flags>(stream, handler).Code()) {
113     case rj::kParseErrorNone:
114       return stream.Tell();
115     case rj::kParseErrorDocumentEmpty:
116       return 0;
117     default:
118       // rapidjson emitted an error, the most recent object was partial
119       return string_view::npos;
120   }
121 }
122 
123 namespace {
124 
125 // A BoundaryFinder implementation that assumes JSON objects can contain raw newlines,
126 // and uses actual JSON parsing to delimit them.
127 class ParsingBoundaryFinder : public BoundaryFinder {
128  public:
FindFirst(string_view partial,string_view block,int64_t * out_pos)129   Status FindFirst(string_view partial, string_view block, int64_t* out_pos) override {
130     // NOTE: We could bubble up JSON parse errors here, but the actual parsing
131     // step will detect them later anyway.
132     auto length = ConsumeWholeObject(MultiStringStream({partial, block}));
133     if (length == string_view::npos) {
134       *out_pos = -1;
135     } else {
136       DCHECK_GE(length, partial.size());
137       DCHECK_LE(length, partial.size() + block.size());
138       *out_pos = static_cast<int64_t>(length - partial.size());
139     }
140     return Status::OK();
141   }
142 
FindLast(util::string_view block,int64_t * out_pos)143   Status FindLast(util::string_view block, int64_t* out_pos) override {
144     const size_t block_length = block.size();
145     size_t consumed_length = 0;
146     while (consumed_length < block_length) {
147       rj::MemoryStream ms(reinterpret_cast<const char*>(block.data()), block.size());
148       using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
149       auto length = ConsumeWholeObject(InputStream(ms));
150       if (length == string_view::npos || length == 0) {
151         // found incomplete object or block is empty
152         break;
153       }
154       consumed_length += length;
155       block = block.substr(length);
156     }
157     if (consumed_length == 0) {
158       *out_pos = -1;
159     } else {
160       consumed_length += ConsumeWhitespace(block);
161       DCHECK_LE(consumed_length, block_length);
162       *out_pos = static_cast<int64_t>(consumed_length);
163     }
164     return Status::OK();
165   }
166 
FindNth(util::string_view partial,util::string_view block,int64_t count,int64_t * out_pos,int64_t * num_found)167   Status FindNth(util::string_view partial, util::string_view block, int64_t count,
168                  int64_t* out_pos, int64_t* num_found) override {
169     return Status::NotImplemented("ParsingBoundaryFinder::FindNth");
170   }
171 };
172 
173 }  // namespace
174 
MakeChunker(const ParseOptions & options)175 std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) {
176   std::shared_ptr<BoundaryFinder> delimiter;
177   if (options.newlines_in_values) {
178     delimiter = std::make_shared<ParsingBoundaryFinder>();
179   } else {
180     delimiter = MakeNewlineBoundaryFinder();
181   }
182   return std::unique_ptr<Chunker>(new Chunker(std::move(delimiter)));
183 }
184 
185 }  // namespace json
186 }  // namespace arrow
187