]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/json/chunker.h" | |
19 | ||
20 | #include <algorithm> | |
21 | #include <utility> | |
22 | #include <vector> | |
23 | ||
24 | #include "arrow/json/rapidjson_defs.h" | |
25 | #include "rapidjson/reader.h" | |
26 | ||
27 | #include "arrow/buffer.h" | |
28 | #include "arrow/json/options.h" | |
29 | #include "arrow/util/logging.h" | |
30 | #include "arrow/util/make_unique.h" | |
31 | #include "arrow/util/string_view.h" | |
32 | ||
33 | namespace arrow { | |
34 | ||
35 | using internal::make_unique; | |
36 | using util::string_view; | |
37 | ||
38 | namespace json { | |
39 | ||
40 | namespace rj = arrow::rapidjson; | |
41 | ||
// Count the leading JSON whitespace characters (space, tab, CR, LF) in `view`.
// Returns view.size() when the view is entirely whitespace (or empty).
static size_t ConsumeWhitespace(string_view view) {
#ifdef RAPIDJSON_SIMD
  // Let rapidjson's vectorized scanner locate the first non-whitespace byte.
  const char* const begin = view.data();
  const char* const first_non_ws = rj::SkipWhitespace_SIMD(begin, begin + view.size());
  return static_cast<size_t>(first_non_ws - begin);
#else
  const size_t first_non_ws = view.find_first_not_of(" \t\r\n");
  return first_non_ws == string_view::npos ? view.size() : first_non_ws;
#endif
}
56 | ||
57 | /// RapidJson custom stream for reading JSON stored in multiple buffers | |
58 | /// http://rapidjson.org/md_doc_stream.html#CustomStream | |
59 | class MultiStringStream { | |
60 | public: | |
61 | using Ch = char; | |
62 | explicit MultiStringStream(std::vector<string_view> strings) | |
63 | : strings_(std::move(strings)) { | |
64 | std::reverse(strings_.begin(), strings_.end()); | |
65 | } | |
66 | explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) { | |
67 | for (size_t i = 0; i < buffers.size(); ++i) { | |
68 | strings_[i] = string_view(*buffers[i]); | |
69 | } | |
70 | std::reverse(strings_.begin(), strings_.end()); | |
71 | } | |
72 | char Peek() const { | |
73 | if (strings_.size() == 0) return '\0'; | |
74 | return strings_.back()[0]; | |
75 | } | |
76 | char Take() { | |
77 | if (strings_.size() == 0) return '\0'; | |
78 | char taken = strings_.back()[0]; | |
79 | if (strings_.back().size() == 1) { | |
80 | strings_.pop_back(); | |
81 | } else { | |
82 | strings_.back() = strings_.back().substr(1); | |
83 | } | |
84 | ++index_; | |
85 | return taken; | |
86 | } | |
87 | size_t Tell() { return index_; } | |
88 | void Put(char) { ARROW_LOG(FATAL) << "not implemented"; } | |
89 | void Flush() { ARROW_LOG(FATAL) << "not implemented"; } | |
90 | char* PutBegin() { | |
91 | ARROW_LOG(FATAL) << "not implemented"; | |
92 | return nullptr; | |
93 | } | |
94 | size_t PutEnd(char*) { | |
95 | ARROW_LOG(FATAL) << "not implemented"; | |
96 | return 0; | |
97 | } | |
98 | ||
99 | private: | |
100 | size_t index_ = 0; | |
101 | std::vector<string_view> strings_; | |
102 | }; | |
103 | ||
104 | template <typename Stream> | |
105 | static size_t ConsumeWholeObject(Stream&& stream) { | |
106 | static constexpr unsigned parse_flags = rj::kParseIterativeFlag | | |
107 | rj::kParseStopWhenDoneFlag | | |
108 | rj::kParseNumbersAsStringsFlag; | |
109 | rj::BaseReaderHandler<rj::UTF8<>> handler; | |
110 | rj::Reader reader; | |
111 | // parse a single JSON object | |
112 | switch (reader.Parse<parse_flags>(stream, handler).Code()) { | |
113 | case rj::kParseErrorNone: | |
114 | return stream.Tell(); | |
115 | case rj::kParseErrorDocumentEmpty: | |
116 | return 0; | |
117 | default: | |
118 | // rapidjson emitted an error, the most recent object was partial | |
119 | return string_view::npos; | |
120 | } | |
121 | } | |
122 | ||
123 | namespace { | |
124 | ||
125 | // A BoundaryFinder implementation that assumes JSON objects can contain raw newlines, | |
126 | // and uses actual JSON parsing to delimit them. | |
127 | class ParsingBoundaryFinder : public BoundaryFinder { | |
128 | public: | |
129 | Status FindFirst(string_view partial, string_view block, int64_t* out_pos) override { | |
130 | // NOTE: We could bubble up JSON parse errors here, but the actual parsing | |
131 | // step will detect them later anyway. | |
132 | auto length = ConsumeWholeObject(MultiStringStream({partial, block})); | |
133 | if (length == string_view::npos) { | |
134 | *out_pos = -1; | |
135 | } else { | |
136 | DCHECK_GE(length, partial.size()); | |
137 | DCHECK_LE(length, partial.size() + block.size()); | |
138 | *out_pos = static_cast<int64_t>(length - partial.size()); | |
139 | } | |
140 | return Status::OK(); | |
141 | } | |
142 | ||
143 | Status FindLast(util::string_view block, int64_t* out_pos) override { | |
144 | const size_t block_length = block.size(); | |
145 | size_t consumed_length = 0; | |
146 | while (consumed_length < block_length) { | |
147 | rj::MemoryStream ms(reinterpret_cast<const char*>(block.data()), block.size()); | |
148 | using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>; | |
149 | auto length = ConsumeWholeObject(InputStream(ms)); | |
150 | if (length == string_view::npos || length == 0) { | |
151 | // found incomplete object or block is empty | |
152 | break; | |
153 | } | |
154 | consumed_length += length; | |
155 | block = block.substr(length); | |
156 | } | |
157 | if (consumed_length == 0) { | |
158 | *out_pos = -1; | |
159 | } else { | |
160 | consumed_length += ConsumeWhitespace(block); | |
161 | DCHECK_LE(consumed_length, block_length); | |
162 | *out_pos = static_cast<int64_t>(consumed_length); | |
163 | } | |
164 | return Status::OK(); | |
165 | } | |
166 | ||
167 | Status FindNth(util::string_view partial, util::string_view block, int64_t count, | |
168 | int64_t* out_pos, int64_t* num_found) override { | |
169 | return Status::NotImplemented("ParsingBoundaryFinder::FindNth"); | |
170 | } | |
171 | }; | |
172 | ||
173 | } // namespace | |
174 | ||
175 | std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) { | |
176 | std::shared_ptr<BoundaryFinder> delimiter; | |
177 | if (options.newlines_in_values) { | |
178 | delimiter = std::make_shared<ParsingBoundaryFinder>(); | |
179 | } else { | |
180 | delimiter = MakeNewlineBoundaryFinder(); | |
181 | } | |
182 | return std::unique_ptr<Chunker>(new Chunker(std::move(delimiter))); | |
183 | } | |
184 | ||
185 | } // namespace json | |
186 | } // namespace arrow |