]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/util/converter.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / converter.h
diff --git a/ceph/src/arrow/cpp/src/arrow/util/converter.h b/ceph/src/arrow/cpp/src/arrow/util/converter.h
new file mode 100644 (file)
index 0000000..0b29e0f
--- /dev/null
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+namespace internal {
+
+template <typename BaseConverter, template <typename...> class ConverterTrait>
+static Result<std::unique_ptr<BaseConverter>> MakeConverter(
+    std::shared_ptr<DataType> type, typename BaseConverter::OptionsType options,
+    MemoryPool* pool);
+
+template <typename Input, typename Options>
+class Converter {
+ public:
+  using Self = Converter<Input, Options>;
+  using InputType = Input;
+  using OptionsType = Options;
+
+  virtual ~Converter() = default;
+
+  Status Construct(std::shared_ptr<DataType> type, OptionsType options,
+                   MemoryPool* pool) {
+    type_ = std::move(type);
+    options_ = std::move(options);
+    return Init(pool);
+  }
+
+  virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }
+
+  virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) {
+    return Status::NotImplemented("Extend");
+  }
+
+  virtual Status ExtendMasked(InputType values, InputType mask, int64_t size,
+                              int64_t offset = 0) {
+    return Status::NotImplemented("ExtendMasked");
+  }
+
+  const std::shared_ptr<ArrayBuilder>& builder() const { return builder_; }
+
+  const std::shared_ptr<DataType>& type() const { return type_; }
+
+  OptionsType options() const { return options_; }
+
+  bool may_overflow() const { return may_overflow_; }
+
+  bool rewind_on_overflow() const { return rewind_on_overflow_; }
+
+  virtual Status Reserve(int64_t additional_capacity) {
+    return builder_->Reserve(additional_capacity);
+  }
+
+  Status AppendNull() { return builder_->AppendNull(); }
+
+  virtual Result<std::shared_ptr<Array>> ToArray() { return builder_->Finish(); }
+
+  virtual Result<std::shared_ptr<Array>> ToArray(int64_t length) {
+    ARROW_ASSIGN_OR_RAISE(auto arr, this->ToArray());
+    return arr->Slice(0, length);
+  }
+
+  virtual Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
+    ARROW_ASSIGN_OR_RAISE(auto array, ToArray());
+    std::vector<std::shared_ptr<Array>> chunks = {std::move(array)};
+    return std::make_shared<ChunkedArray>(chunks);
+  }
+
+ protected:
+  virtual Status Init(MemoryPool* pool) { return Status::OK(); }
+
+  std::shared_ptr<DataType> type_;
+  std::shared_ptr<ArrayBuilder> builder_;
+  OptionsType options_;
+  bool may_overflow_ = false;
+  bool rewind_on_overflow_ = false;
+};
+
+template <typename ArrowType, typename BaseConverter>
+class PrimitiveConverter : public BaseConverter {
+ public:
+  using BuilderType = typename TypeTraits<ArrowType>::BuilderType;
+
+ protected:
+  Status Init(MemoryPool* pool) override {
+    this->builder_ = std::make_shared<BuilderType>(this->type_, pool);
+    // Narrow variable-sized binary types may overflow
+    this->may_overflow_ = is_binary_like(this->type_->id());
+    primitive_type_ = checked_cast<const ArrowType*>(this->type_.get());
+    primitive_builder_ = checked_cast<BuilderType*>(this->builder_.get());
+    return Status::OK();
+  }
+
+  const ArrowType* primitive_type_;
+  BuilderType* primitive_builder_;
+};
+
+template <typename ArrowType, typename BaseConverter,
+          template <typename...> class ConverterTrait>
+class ListConverter : public BaseConverter {
+ public:
+  using BuilderType = typename TypeTraits<ArrowType>::BuilderType;
+  using ConverterType = typename ConverterTrait<ArrowType>::type;
+
+ protected:
+  Status Init(MemoryPool* pool) override {
+    list_type_ = checked_cast<const ArrowType*>(this->type_.get());
+    ARROW_ASSIGN_OR_RAISE(value_converter_,
+                          (MakeConverter<BaseConverter, ConverterTrait>(
+                              list_type_->value_type(), this->options_, pool)));
+    this->builder_ =
+        std::make_shared<BuilderType>(pool, value_converter_->builder(), this->type_);
+    list_builder_ = checked_cast<BuilderType*>(this->builder_.get());
+    // Narrow list types may overflow
+    this->may_overflow_ = this->rewind_on_overflow_ =
+        sizeof(typename ArrowType::offset_type) < sizeof(int64_t);
+    return Status::OK();
+  }
+
+  const ArrowType* list_type_;
+  BuilderType* list_builder_;
+  std::unique_ptr<BaseConverter> value_converter_;
+};
+
+template <typename BaseConverter, template <typename...> class ConverterTrait>
+class StructConverter : public BaseConverter {
+ public:
+  using ConverterType = typename ConverterTrait<StructType>::type;
+
+  Status Reserve(int64_t additional_capacity) override {
+    ARROW_RETURN_NOT_OK(this->builder_->Reserve(additional_capacity));
+    for (const auto& child : children_) {
+      ARROW_RETURN_NOT_OK(child->Reserve(additional_capacity));
+    }
+    return Status::OK();
+  }
+
+ protected:
+  Status Init(MemoryPool* pool) override {
+    std::unique_ptr<BaseConverter> child_converter;
+    std::vector<std::shared_ptr<ArrayBuilder>> child_builders;
+
+    struct_type_ = checked_cast<const StructType*>(this->type_.get());
+    for (const auto& field : struct_type_->fields()) {
+      ARROW_ASSIGN_OR_RAISE(child_converter,
+                            (MakeConverter<BaseConverter, ConverterTrait>(
+                                field->type(), this->options_, pool)));
+      this->may_overflow_ |= child_converter->may_overflow();
+      this->rewind_on_overflow_ = this->may_overflow_;
+      child_builders.push_back(child_converter->builder());
+      children_.push_back(std::move(child_converter));
+    }
+
+    this->builder_ =
+        std::make_shared<StructBuilder>(this->type_, pool, std::move(child_builders));
+    struct_builder_ = checked_cast<StructBuilder*>(this->builder_.get());
+
+    return Status::OK();
+  }
+
+  const StructType* struct_type_;
+  StructBuilder* struct_builder_;
+  std::vector<std::unique_ptr<BaseConverter>> children_;
+};
+
+template <typename ValueType, typename BaseConverter>
+class DictionaryConverter : public BaseConverter {
+ public:
+  using BuilderType = DictionaryBuilder<ValueType>;
+
+ protected:
+  Status Init(MemoryPool* pool) override {
+    std::unique_ptr<ArrayBuilder> builder;
+    ARROW_RETURN_NOT_OK(MakeDictionaryBuilder(pool, this->type_, NULLPTR, &builder));
+    this->builder_ = std::move(builder);
+    this->may_overflow_ = false;
+    dict_type_ = checked_cast<const DictionaryType*>(this->type_.get());
+    value_type_ = checked_cast<const ValueType*>(dict_type_->value_type().get());
+    value_builder_ = checked_cast<BuilderType*>(this->builder_.get());
+    return Status::OK();
+  }
+
+  const DictionaryType* dict_type_;
+  const ValueType* value_type_;
+  BuilderType* value_builder_;
+};
+
+template <typename BaseConverter, template <typename...> class ConverterTrait>
+struct MakeConverterImpl {
+  template <typename T, typename ConverterType = typename ConverterTrait<T>::type>
+  Status Visit(const T&) {
+    out.reset(new ConverterType());
+    return out->Construct(std::move(type), std::move(options), pool);
+  }
+
+  Status Visit(const DictionaryType& t) {
+    switch (t.value_type()->id()) {
+#define DICTIONARY_CASE(TYPE)                                                       \
+  case TYPE::type_id:                                                               \
+    out = internal::make_unique<                                                    \
+        typename ConverterTrait<DictionaryType>::template dictionary_type<TYPE>>(); \
+    break;
+      DICTIONARY_CASE(BooleanType);
+      DICTIONARY_CASE(Int8Type);
+      DICTIONARY_CASE(Int16Type);
+      DICTIONARY_CASE(Int32Type);
+      DICTIONARY_CASE(Int64Type);
+      DICTIONARY_CASE(UInt8Type);
+      DICTIONARY_CASE(UInt16Type);
+      DICTIONARY_CASE(UInt32Type);
+      DICTIONARY_CASE(UInt64Type);
+      DICTIONARY_CASE(FloatType);
+      DICTIONARY_CASE(DoubleType);
+      DICTIONARY_CASE(BinaryType);
+      DICTIONARY_CASE(StringType);
+      DICTIONARY_CASE(FixedSizeBinaryType);
+#undef DICTIONARY_CASE
+      default:
+        return Status::NotImplemented("DictionaryArray converter for type ", t.ToString(),
+                                      " not implemented");
+    }
+    return out->Construct(std::move(type), std::move(options), pool);
+  }
+
+  Status Visit(const DataType& t) { return Status::NotImplemented(t.name()); }
+
+  std::shared_ptr<DataType> type;
+  typename BaseConverter::OptionsType options;
+  MemoryPool* pool;
+  std::unique_ptr<BaseConverter> out;
+};
+
+template <typename BaseConverter, template <typename...> class ConverterTrait>
+static Result<std::unique_ptr<BaseConverter>> MakeConverter(
+    std::shared_ptr<DataType> type, typename BaseConverter::OptionsType options,
+    MemoryPool* pool) {
+  MakeConverterImpl<BaseConverter, ConverterTrait> visitor{
+      std::move(type), std::move(options), pool, NULLPTR};
+  ARROW_RETURN_NOT_OK(VisitTypeInline(*visitor.type, &visitor));
+  return std::move(visitor.out);
+}
+
+template <typename Converter>
+class Chunker {
+ public:
+  using InputType = typename Converter::InputType;
+
+  explicit Chunker(std::unique_ptr<Converter> converter)
+      : converter_(std::move(converter)) {}
+
+  Status Reserve(int64_t additional_capacity) {
+    ARROW_RETURN_NOT_OK(converter_->Reserve(additional_capacity));
+    reserved_ += additional_capacity;
+    return Status::OK();
+  }
+
+  Status AppendNull() {
+    auto status = converter_->AppendNull();
+    if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
+      if (converter_->builder()->length() == 0) {
+        // Builder length == 0 means the individual element is too large to append.
+        // In this case, no need to try again.
+        return status;
+      }
+      ARROW_RETURN_NOT_OK(FinishChunk());
+      return converter_->AppendNull();
+    }
+    ++length_;
+    return status;
+  }
+
+  Status Append(InputType value) {
+    auto status = converter_->Append(value);
+    if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
+      if (converter_->builder()->length() == 0) {
+        return status;
+      }
+      ARROW_RETURN_NOT_OK(FinishChunk());
+      return Append(value);
+    }
+    ++length_;
+    return status;
+  }
+
+  Status Extend(InputType values, int64_t size, int64_t offset = 0) {
+    while (offset < size) {
+      auto length_before = converter_->builder()->length();
+      auto status = converter_->Extend(values, size, offset);
+      auto length_after = converter_->builder()->length();
+      auto num_converted = length_after - length_before;
+
+      offset += num_converted;
+      length_ += num_converted;
+
+      if (status.IsCapacityError()) {
+        if (converter_->builder()->length() == 0) {
+          // Builder length == 0 means the individual element is too large to append.
+          // In this case, no need to try again.
+          return status;
+        } else if (converter_->rewind_on_overflow()) {
+          // The list-like and binary-like conversion paths may raise  a capacity error,
+          // we need to handle them differently. While the binary-like converters check
+          // the capacity before append/extend the list-like converters just check after
+          // append/extend. Thus depending on the implementation semantics we may need
+          // to rewind (slice) the output chunk by one.
+          length_ -= 1;
+          offset -= 1;
+        }
+        ARROW_RETURN_NOT_OK(FinishChunk());
+      } else if (!status.ok()) {
+        return status;
+      }
+    }
+    return Status::OK();
+  }
+
+  Status ExtendMasked(InputType values, InputType mask, int64_t size,
+                      int64_t offset = 0) {
+    while (offset < size) {
+      auto length_before = converter_->builder()->length();
+      auto status = converter_->ExtendMasked(values, mask, size, offset);
+      auto length_after = converter_->builder()->length();
+      auto num_converted = length_after - length_before;
+
+      offset += num_converted;
+      length_ += num_converted;
+
+      if (status.IsCapacityError()) {
+        if (converter_->builder()->length() == 0) {
+          // Builder length == 0 means the individual element is too large to append.
+          // In this case, no need to try again.
+          return status;
+        } else if (converter_->rewind_on_overflow()) {
+          // The list-like and binary-like conversion paths may raise  a capacity error,
+          // we need to handle them differently. While the binary-like converters check
+          // the capacity before append/extend the list-like converters just check after
+          // append/extend. Thus depending on the implementation semantics we may need
+          // to rewind (slice) the output chunk by one.
+          length_ -= 1;
+          offset -= 1;
+        }
+        ARROW_RETURN_NOT_OK(FinishChunk());
+      } else if (!status.ok()) {
+        return status;
+      }
+    }
+    return Status::OK();
+  }
+
+  Status FinishChunk() {
+    ARROW_ASSIGN_OR_RAISE(auto chunk, converter_->ToArray(length_));
+    chunks_.push_back(chunk);
+    // Reserve space for the remaining items.
+    // Besides being an optimization, it is also required if the converter's
+    // implementation relies on unsafe builder methods in converter->Append().
+    auto remaining = reserved_ - length_;
+    Reset();
+    return Reserve(remaining);
+  }
+
+  Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
+    ARROW_RETURN_NOT_OK(FinishChunk());
+    return std::make_shared<ChunkedArray>(chunks_);
+  }
+
+ protected:
+  void Reset() {
+    converter_->builder()->Reset();
+    length_ = 0;
+    reserved_ = 0;
+  }
+
+  int64_t length_ = 0;
+  int64_t reserved_ = 0;
+  std::unique_ptr<Converter> converter_;
+  std::vector<std::shared_ptr<Array>> chunks_;
+};
+
+template <typename T>
+static Result<std::unique_ptr<Chunker<T>>> MakeChunker(std::unique_ptr<T> converter) {
+  return internal::make_unique<Chunker<T>>(std::move(converter));
+}
+
+}  // namespace internal
+}  // namespace arrow