git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/json/chunker.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / json / chunker.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/json/chunker.h"
19
20 #include <algorithm>
21 #include <utility>
22 #include <vector>
23
24 #include "arrow/json/rapidjson_defs.h"
25 #include "rapidjson/reader.h"
26
27 #include "arrow/buffer.h"
28 #include "arrow/json/options.h"
29 #include "arrow/util/logging.h"
30 #include "arrow/util/make_unique.h"
31 #include "arrow/util/string_view.h"
32
33 namespace arrow {
34
35 using internal::make_unique;
36 using util::string_view;
37
38 namespace json {
39
40 namespace rj = arrow::rapidjson;
41
/// Count the leading whitespace characters of `view`.
///
/// When RAPIDJSON_SIMD is defined the scan is delegated to rapidjson's
/// SkipWhitespace_SIMD. Otherwise whitespace means space, tab, carriage return
/// and line feed.
///
/// \param view the text to scan
/// \return offset of the first non-whitespace character, or `view.size()` when
///         the view consists only of whitespace (including the empty view)
static size_t ConsumeWhitespace(string_view view) {
#ifdef RAPIDJSON_SIMD
  const auto begin = view.data();
  const auto first_nonws = rj::SkipWhitespace_SIMD(begin, begin + view.size());
  return first_nonws - begin;
#else
  const auto first_nonws = view.find_first_not_of(" \t\r\n");
  // npos means "nothing but whitespace": the whole view is consumed.
  return first_nonws == string_view::npos ? view.size() : first_nonws;
#endif
}
56
57 /// RapidJson custom stream for reading JSON stored in multiple buffers
58 /// http://rapidjson.org/md_doc_stream.html#CustomStream
59 class MultiStringStream {
60 public:
61 using Ch = char;
62 explicit MultiStringStream(std::vector<string_view> strings)
63 : strings_(std::move(strings)) {
64 std::reverse(strings_.begin(), strings_.end());
65 }
66 explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) {
67 for (size_t i = 0; i < buffers.size(); ++i) {
68 strings_[i] = string_view(*buffers[i]);
69 }
70 std::reverse(strings_.begin(), strings_.end());
71 }
72 char Peek() const {
73 if (strings_.size() == 0) return '\0';
74 return strings_.back()[0];
75 }
76 char Take() {
77 if (strings_.size() == 0) return '\0';
78 char taken = strings_.back()[0];
79 if (strings_.back().size() == 1) {
80 strings_.pop_back();
81 } else {
82 strings_.back() = strings_.back().substr(1);
83 }
84 ++index_;
85 return taken;
86 }
87 size_t Tell() { return index_; }
88 void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
89 void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
90 char* PutBegin() {
91 ARROW_LOG(FATAL) << "not implemented";
92 return nullptr;
93 }
94 size_t PutEnd(char*) {
95 ARROW_LOG(FATAL) << "not implemented";
96 return 0;
97 }
98
99 private:
100 size_t index_ = 0;
101 std::vector<string_view> strings_;
102 };
103
104 template <typename Stream>
105 static size_t ConsumeWholeObject(Stream&& stream) {
106 static constexpr unsigned parse_flags = rj::kParseIterativeFlag |
107 rj::kParseStopWhenDoneFlag |
108 rj::kParseNumbersAsStringsFlag;
109 rj::BaseReaderHandler<rj::UTF8<>> handler;
110 rj::Reader reader;
111 // parse a single JSON object
112 switch (reader.Parse<parse_flags>(stream, handler).Code()) {
113 case rj::kParseErrorNone:
114 return stream.Tell();
115 case rj::kParseErrorDocumentEmpty:
116 return 0;
117 default:
118 // rapidjson emitted an error, the most recent object was partial
119 return string_view::npos;
120 }
121 }
122
123 namespace {
124
125 // A BoundaryFinder implementation that assumes JSON objects can contain raw newlines,
126 // and uses actual JSON parsing to delimit them.
127 class ParsingBoundaryFinder : public BoundaryFinder {
128 public:
129 Status FindFirst(string_view partial, string_view block, int64_t* out_pos) override {
130 // NOTE: We could bubble up JSON parse errors here, but the actual parsing
131 // step will detect them later anyway.
132 auto length = ConsumeWholeObject(MultiStringStream({partial, block}));
133 if (length == string_view::npos) {
134 *out_pos = -1;
135 } else {
136 DCHECK_GE(length, partial.size());
137 DCHECK_LE(length, partial.size() + block.size());
138 *out_pos = static_cast<int64_t>(length - partial.size());
139 }
140 return Status::OK();
141 }
142
143 Status FindLast(util::string_view block, int64_t* out_pos) override {
144 const size_t block_length = block.size();
145 size_t consumed_length = 0;
146 while (consumed_length < block_length) {
147 rj::MemoryStream ms(reinterpret_cast<const char*>(block.data()), block.size());
148 using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
149 auto length = ConsumeWholeObject(InputStream(ms));
150 if (length == string_view::npos || length == 0) {
151 // found incomplete object or block is empty
152 break;
153 }
154 consumed_length += length;
155 block = block.substr(length);
156 }
157 if (consumed_length == 0) {
158 *out_pos = -1;
159 } else {
160 consumed_length += ConsumeWhitespace(block);
161 DCHECK_LE(consumed_length, block_length);
162 *out_pos = static_cast<int64_t>(consumed_length);
163 }
164 return Status::OK();
165 }
166
167 Status FindNth(util::string_view partial, util::string_view block, int64_t count,
168 int64_t* out_pos, int64_t* num_found) override {
169 return Status::NotImplemented("ParsingBoundaryFinder::FindNth");
170 }
171 };
172
173 } // namespace
174
175 std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) {
176 std::shared_ptr<BoundaryFinder> delimiter;
177 if (options.newlines_in_values) {
178 delimiter = std::make_shared<ParsingBoundaryFinder>();
179 } else {
180 delimiter = MakeNewlineBoundaryFinder();
181 }
182 return std::unique_ptr<Chunker>(new Chunker(std::move(delimiter)));
183 }
184
185 } // namespace json
186 } // namespace arrow