1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "arrow/json/chunker.h"
24 #include "arrow/json/rapidjson_defs.h"
25 #include "rapidjson/reader.h"
27 #include "arrow/buffer.h"
28 #include "arrow/json/options.h"
29 #include "arrow/util/logging.h"
30 #include "arrow/util/make_unique.h"
31 #include "arrow/util/string_view.h"
35 using internal::make_unique
;
36 using util::string_view
;
40 namespace rj
= arrow::rapidjson
;
/// Counts the leading whitespace characters (space, tab, CR, LF) of `view`.
///
/// Returns the index of the first non-whitespace character, or view.size()
/// when the view is empty or consists only of whitespace.
static size_t ConsumeWhitespace(string_view view) {
#if defined(RAPIDJSON_SIMD)
  // Vectorized path: rapidjson hands back a pointer to the first
  // non-whitespace character (or to the end of the range).
  const char* const first = view.data();
  const char* const stop = rj::SkipWhitespace_SIMD(first, first + view.size());
  return static_cast<size_t>(stop - first);
#else
  const size_t first_non_ws = view.find_first_not_of(" \t\r\n");
  return first_non_ws == string_view::npos ? view.size() : first_non_ws;
#endif
}
57 /// RapidJson custom stream for reading JSON stored in multiple buffers
58 /// http://rapidjson.org/md_doc_stream.html#CustomStream
59 class MultiStringStream
{
62 explicit MultiStringStream(std::vector
<string_view
> strings
)
63 : strings_(std::move(strings
)) {
64 std::reverse(strings_
.begin(), strings_
.end());
66 explicit MultiStringStream(const BufferVector
& buffers
) : strings_(buffers
.size()) {
67 for (size_t i
= 0; i
< buffers
.size(); ++i
) {
68 strings_
[i
] = string_view(*buffers
[i
]);
70 std::reverse(strings_
.begin(), strings_
.end());
73 if (strings_
.size() == 0) return '\0';
74 return strings_
.back()[0];
77 if (strings_
.size() == 0) return '\0';
78 char taken
= strings_
.back()[0];
79 if (strings_
.back().size() == 1) {
82 strings_
.back() = strings_
.back().substr(1);
87 size_t Tell() { return index_
; }
88 void Put(char) { ARROW_LOG(FATAL
) << "not implemented"; }
89 void Flush() { ARROW_LOG(FATAL
) << "not implemented"; }
91 ARROW_LOG(FATAL
) << "not implemented";
94 size_t PutEnd(char*) {
95 ARROW_LOG(FATAL
) << "not implemented";
101 std::vector
<string_view
> strings_
;
104 template <typename Stream
>
105 static size_t ConsumeWholeObject(Stream
&& stream
) {
106 static constexpr unsigned parse_flags
= rj::kParseIterativeFlag
|
107 rj::kParseStopWhenDoneFlag
|
108 rj::kParseNumbersAsStringsFlag
;
109 rj::BaseReaderHandler
<rj::UTF8
<>> handler
;
111 // parse a single JSON object
112 switch (reader
.Parse
<parse_flags
>(stream
, handler
).Code()) {
113 case rj::kParseErrorNone
:
114 return stream
.Tell();
115 case rj::kParseErrorDocumentEmpty
:
118 // rapidjson emitted an error, the most recent object was partial
119 return string_view::npos
;
125 // A BoundaryFinder implementation that assumes JSON objects can contain raw newlines,
126 // and uses actual JSON parsing to delimit them.
127 class ParsingBoundaryFinder
: public BoundaryFinder
{
129 Status
FindFirst(string_view partial
, string_view block
, int64_t* out_pos
) override
{
130 // NOTE: We could bubble up JSON parse errors here, but the actual parsing
131 // step will detect them later anyway.
132 auto length
= ConsumeWholeObject(MultiStringStream({partial
, block
}));
133 if (length
== string_view::npos
) {
136 DCHECK_GE(length
, partial
.size());
137 DCHECK_LE(length
, partial
.size() + block
.size());
138 *out_pos
= static_cast<int64_t>(length
- partial
.size());
143 Status
FindLast(util::string_view block
, int64_t* out_pos
) override
{
144 const size_t block_length
= block
.size();
145 size_t consumed_length
= 0;
146 while (consumed_length
< block_length
) {
147 rj::MemoryStream
ms(reinterpret_cast<const char*>(block
.data()), block
.size());
148 using InputStream
= rj::EncodedInputStream
<rj::UTF8
<>, rj::MemoryStream
>;
149 auto length
= ConsumeWholeObject(InputStream(ms
));
150 if (length
== string_view::npos
|| length
== 0) {
151 // found incomplete object or block is empty
154 consumed_length
+= length
;
155 block
= block
.substr(length
);
157 if (consumed_length
== 0) {
160 consumed_length
+= ConsumeWhitespace(block
);
161 DCHECK_LE(consumed_length
, block_length
);
162 *out_pos
= static_cast<int64_t>(consumed_length
);
167 Status
FindNth(util::string_view partial
, util::string_view block
, int64_t count
,
168 int64_t* out_pos
, int64_t* num_found
) override
{
169 return Status::NotImplemented("ParsingBoundaryFinder::FindNth");
175 std::unique_ptr
<Chunker
> MakeChunker(const ParseOptions
& options
) {
176 std::shared_ptr
<BoundaryFinder
> delimiter
;
177 if (options
.newlines_in_values
) {
178 delimiter
= std::make_shared
<ParsingBoundaryFinder
>();
180 delimiter
= MakeNewlineBoundaryFinder();
182 return std::unique_ptr
<Chunker
>(new Chunker(std::move(delimiter
)));