--- /dev/null
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/reader.h"
+
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/json/chunked_builder.h"
+#include "arrow/json/chunker.h"
+#include "arrow/json/converter.h"
+#include "arrow/json/parser.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/task_group.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using util::string_view;
+
+using internal::checked_cast;
+using internal::GetCpuThreadPool;
+using internal::TaskGroup;
+using internal::ThreadPool;
+
+namespace json {
+
+class TableReaderImpl : public TableReader,
+ public std::enable_shared_from_this<TableReaderImpl> {
+ public:
+ TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
+ const ParseOptions& parse_options,
+ std::shared_ptr<TaskGroup> task_group)
+ : pool_(pool),
+ read_options_(read_options),
+ parse_options_(parse_options),
+ chunker_(MakeChunker(parse_options_)),
+ task_group_(std::move(task_group)) {}
+
+ Status Init(std::shared_ptr<io::InputStream> input) {
+ ARROW_ASSIGN_OR_RAISE(auto it,
+ io::MakeInputStreamIterator(input, read_options_.block_size));
+ return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
+ .Value(&block_iterator_);
+ }
+
+ Result<std::shared_ptr<Table>> Read() override {
+ RETURN_NOT_OK(MakeBuilder());
+
+ ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
+ if (block == nullptr) {
+ return Status::Invalid("Empty JSON file");
+ }
+
+ auto self = shared_from_this();
+ auto empty = std::make_shared<Buffer>("");
+
+ int64_t block_index = 0;
+ std::shared_ptr<Buffer> partial = empty;
+
+ while (block != nullptr) {
+ std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+
+ ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+
+ if (next_block == nullptr) {
+ // End of file reached => compute completion from penultimate block
+ RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
+ } else {
+ std::shared_ptr<Buffer> starts_with_whole;
+ // Get completion of partial from previous block.
+ RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
+ &starts_with_whole));
+
+ // Get all whole objects entirely inside the current buffer
+ RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+ }
+
+ // Launch parse task
+ task_group_->Append([self, partial, completion, whole, block_index] {
+ return self->ParseAndInsert(partial, completion, whole, block_index);
+ });
+ block_index++;
+
+ partial = next_partial;
+ block = next_block;
+ }
+
+ std::shared_ptr<ChunkedArray> array;
+ RETURN_NOT_OK(builder_->Finish(&array));
+ return Table::FromChunkedStructArray(array);
+ }
+
+ private:
+ Status MakeBuilder() {
+ auto type = parse_options_.explicit_schema
+ ? struct_(parse_options_.explicit_schema->fields())
+ : struct_({});
+
+ auto promotion_graph =
+ parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+ ? GetPromotionGraph()
+ : nullptr;
+
+ return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
+ }
+
+ Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
+ const std::shared_ptr<Buffer>& completion,
+ const std::shared_ptr<Buffer>& whole, int64_t block_index) {
+ std::unique_ptr<BlockParser> parser;
+ RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
+ RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
+ whole->size()));
+
+ if (partial->size() != 0 || completion->size() != 0) {
+ std::shared_ptr<Buffer> straddling;
+ if (partial->size() == 0) {
+ straddling = completion;
+ } else if (completion->size() == 0) {
+ straddling = partial;
+ } else {
+ ARROW_ASSIGN_OR_RAISE(straddling,
+ ConcatenateBuffers({partial, completion}, pool_));
+ }
+ RETURN_NOT_OK(parser->Parse(straddling));
+ }
+
+ if (whole->size() != 0) {
+ RETURN_NOT_OK(parser->Parse(whole));
+ }
+
+ std::shared_ptr<Array> parsed;
+ RETURN_NOT_OK(parser->Finish(&parsed));
+ builder_->Insert(block_index, field("", parsed->type()), parsed);
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ ReadOptions read_options_;
+ ParseOptions parse_options_;
+ std::unique_ptr<Chunker> chunker_;
+ std::shared_ptr<TaskGroup> task_group_;
+ Iterator<std::shared_ptr<Buffer>> block_iterator_;
+ std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
+
+Result<std::shared_ptr<TableReader>> TableReader::Make(
+ MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+ const ReadOptions& read_options, const ParseOptions& parse_options) {
+ std::shared_ptr<TableReaderImpl> ptr;
+ if (read_options.use_threads) {
+ ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
+ TaskGroup::MakeThreaded(GetCpuThreadPool()));
+ } else {
+ ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
+ TaskGroup::MakeSerial());
+ }
+ RETURN_NOT_OK(ptr->Init(input));
+ return ptr;
+}
+
+Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
+ std::shared_ptr<Buffer> json) {
+ std::unique_ptr<BlockParser> parser;
+ RETURN_NOT_OK(BlockParser::Make(options, &parser));
+ RETURN_NOT_OK(parser->Parse(json));
+ std::shared_ptr<Array> parsed;
+ RETURN_NOT_OK(parser->Finish(&parsed));
+
+ auto type =
+ options.explicit_schema ? struct_(options.explicit_schema->fields()) : struct_({});
+ auto promotion_graph =
+ options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+ ? GetPromotionGraph()
+ : nullptr;
+ std::shared_ptr<ChunkedArrayBuilder> builder;
+ RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), default_memory_pool(),
+ promotion_graph, type, &builder));
+
+ builder->Insert(0, field("", type), parsed);
+ std::shared_ptr<ChunkedArray> converted_chunked;
+ RETURN_NOT_OK(builder->Finish(&converted_chunked));
+ const auto& converted = checked_cast<const StructArray&>(*converted_chunked->chunk(0));
+
+ std::vector<std::shared_ptr<Array>> columns(converted.num_fields());
+ for (int i = 0; i < converted.num_fields(); ++i) {
+ columns[i] = converted.field(i);
+ }
+ return RecordBatch::Make(schema(converted.type()->fields()), converted.length(),
+ std::move(columns));
+}
+
+} // namespace json
+} // namespace arrow