]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/json/reader.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / json / reader.cc
diff --git a/ceph/src/arrow/cpp/src/arrow/json/reader.cc b/ceph/src/arrow/cpp/src/arrow/json/reader.cc
new file mode 100644 (file)
index 0000000..18aed02
--- /dev/null
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/reader.h"
+
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/json/chunked_builder.h"
+#include "arrow/json/chunker.h"
+#include "arrow/json/converter.h"
+#include "arrow/json/parser.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/task_group.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using util::string_view;
+
+using internal::checked_cast;
+using internal::GetCpuThreadPool;
+using internal::TaskGroup;
+using internal::ThreadPool;
+
+namespace json {
+
+/// Implementation of TableReader: reads an entire JSON input stream into a
+/// Table.  The stream is read in blocks of read_options.block_size bytes;
+/// the chunker re-partitions each block so it holds only complete JSON
+/// objects, and each block is then parsed as an independent task on
+/// task_group_ (serial or threaded, chosen by the factory).
+class TableReaderImpl : public TableReader,
+                        public std::enable_shared_from_this<TableReaderImpl> {
+ public:
+  TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
+                  const ParseOptions& parse_options,
+                  std::shared_ptr<TaskGroup> task_group)
+      : pool_(pool),
+        read_options_(read_options),
+        parse_options_(parse_options),
+        chunker_(MakeChunker(parse_options_)),
+        task_group_(std::move(task_group)) {}
+
+  /// Set up the block iterator over `input`, reading ahead by the task
+  /// group's parallelism so parse tasks are not starved of input.
+  Status Init(std::shared_ptr<io::InputStream> input) {
+    ARROW_ASSIGN_OR_RAISE(auto it,
+                          io::MakeInputStreamIterator(input, read_options_.block_size));
+    return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
+        .Value(&block_iterator_);
+  }
+
+  /// Read and parse the whole stream, returning the assembled Table.
+  /// Returns Status::Invalid if the stream yields no blocks at all.
+  Result<std::shared_ptr<Table>> Read() override {
+    RETURN_NOT_OK(MakeBuilder());
+
+    ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
+    if (block == nullptr) {
+      return Status::Invalid("Empty JSON file");
+    }
+
+    // Capture a self-reference in each parse task below so this reader
+    // stays alive until all queued tasks have run.
+    auto self = shared_from_this();
+    auto empty = std::make_shared<Buffer>("");
+
+    int64_t block_index = 0;
+    // Bytes of a JSON object that started in the previous block and
+    // continues into the current one (empty for the first block).
+    std::shared_ptr<Buffer> partial = empty;
+
+    while (block != nullptr) {
+      std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+
+      // Look ahead one block so we know whether `block` is the last one.
+      ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+
+      if (next_block == nullptr) {
+        // End of file reached => compute completion from penultimate block
+        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
+      } else {
+        std::shared_ptr<Buffer> starts_with_whole;
+        // Get completion of partial from previous block.
+        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
+                                                   &starts_with_whole));
+
+        // Get all whole objects entirely inside the current buffer
+        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+      }
+
+      // Launch parse task
+      task_group_->Append([self, partial, completion, whole, block_index] {
+        return self->ParseAndInsert(partial, completion, whole, block_index);
+      });
+      block_index++;
+
+      partial = next_partial;
+      block = next_block;
+    }
+
+    // NOTE(review): builder_ was created on task_group_, so Finish() is
+    // expected to wait for the queued parse tasks before assembling the
+    // chunked result — confirm against ChunkedArrayBuilder's contract.
+    std::shared_ptr<ChunkedArray> array;
+    RETURN_NOT_OK(builder_->Finish(&array));
+    return Table::FromChunkedStructArray(array);
+  }
+
+ private:
+  /// Create the chunked-array builder: seed it with the explicit schema
+  /// (if any, else an empty struct) and enable type promotion only when
+  /// unexpected fields should have their types inferred.
+  Status MakeBuilder() {
+    auto type = parse_options_.explicit_schema
+                    ? struct_(parse_options_.explicit_schema->fields())
+                    : struct_({});
+
+    auto promotion_graph =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+
+    return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
+  }
+
+  /// Parse one logical block — the straddling object formed by
+  /// partial + completion, then the whole-objects buffer — and insert the
+  /// parsed struct array into the builder at `block_index`.  Runs on the
+  /// task group, possibly concurrently with other blocks.
+  Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
+                        const std::shared_ptr<Buffer>& completion,
+                        const std::shared_ptr<Buffer>& whole, int64_t block_index) {
+    std::unique_ptr<BlockParser> parser;
+    RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
+    RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
+                                               whole->size()));
+
+    if (partial->size() != 0 || completion->size() != 0) {
+      std::shared_ptr<Buffer> straddling;
+      // Only concatenate when the straddling object truly spans both
+      // buffers; otherwise reuse the non-empty one without copying.
+      if (partial->size() == 0) {
+        straddling = completion;
+      } else if (completion->size() == 0) {
+        straddling = partial;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(straddling,
+                              ConcatenateBuffers({partial, completion}, pool_));
+      }
+      RETURN_NOT_OK(parser->Parse(straddling));
+    }
+
+    if (whole->size() != 0) {
+      RETURN_NOT_OK(parser->Parse(whole));
+    }
+
+    std::shared_ptr<Array> parsed;
+    RETURN_NOT_OK(parser->Finish(&parsed));
+    builder_->Insert(block_index, field("", parsed->type()), parsed);
+    return Status::OK();
+  }
+
+  MemoryPool* pool_;
+  ReadOptions read_options_;
+  ParseOptions parse_options_;
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<TaskGroup> task_group_;
+  // Readahead iterator over raw input blocks (see Init()).
+  Iterator<std::shared_ptr<Buffer>> block_iterator_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
+
+Result<std::shared_ptr<TableReader>> TableReader::Make(
+    MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+    const ReadOptions& read_options, const ParseOptions& parse_options) {
+  std::shared_ptr<TableReaderImpl> ptr;
+  if (read_options.use_threads) {
+    ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
+                                            TaskGroup::MakeThreaded(GetCpuThreadPool()));
+  } else {
+    ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
+                                            TaskGroup::MakeSerial());
+  }
+  RETURN_NOT_OK(ptr->Init(input));
+  return ptr;
+}
+
+/// Parse a single in-memory JSON buffer into one RecordBatch.  Runs
+/// serially on the calling thread and allocates from the default memory
+/// pool.  Schema handling (explicit schema vs. inference of unexpected
+/// fields) follows `options`, mirroring TableReaderImpl::MakeBuilder.
+Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
+                                              std::shared_ptr<Buffer> json) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(options, &parser));
+  RETURN_NOT_OK(parser->Parse(json));
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  // Build a conversion pipeline identical in configuration to the table
+  // reader's, but on a serial task group: conversion happens inline here.
+  auto type =
+      options.explicit_schema ? struct_(options.explicit_schema->fields()) : struct_({});
+  auto promotion_graph =
+      options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+          ? GetPromotionGraph()
+          : nullptr;
+  std::shared_ptr<ChunkedArrayBuilder> builder;
+  RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), default_memory_pool(),
+                                        promotion_graph, type, &builder));
+
+  // Exactly one block was inserted, so the converted data is chunk 0.
+  builder->Insert(0, field("", type), parsed);
+  std::shared_ptr<ChunkedArray> converted_chunked;
+  RETURN_NOT_OK(builder->Finish(&converted_chunked));
+  const auto& converted = checked_cast<const StructArray&>(*converted_chunked->chunk(0));
+
+  // Expose the struct's child arrays as the record batch's columns.
+  std::vector<std::shared_ptr<Array>> columns(converted.num_fields());
+  for (int i = 0; i < converted.num_fields(); ++i) {
+    columns[i] = converted.field(i);
+  }
+  return RecordBatch::Make(schema(converted.type()->fields()), converted.length(),
+                           std::move(columns));
+}
+
+}  // namespace json
+}  // namespace arrow