]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/compute/function_internal.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / compute / function_internal.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/compute/function_internal.h"
19
20#include "arrow/array/util.h"
21#include "arrow/compute/function.h"
22#include "arrow/compute/registry.h"
23#include "arrow/io/memory.h"
24#include "arrow/ipc/reader.h"
25#include "arrow/ipc/writer.h"
26#include "arrow/record_batch.h"
27#include "arrow/scalar.h"
28#include "arrow/util/checked_cast.h"
29
30namespace arrow {
31namespace compute {
32namespace internal {
33using ::arrow::internal::checked_cast;
34
// Name of the extra struct field that records an options object's type name
// alongside its regular fields in the StructScalar representation.
constexpr char kTypeNameField[] = "_type_name";
36
37Result<std::shared_ptr<StructScalar>> FunctionOptionsToStructScalar(
38 const FunctionOptions& options) {
39 std::vector<std::string> field_names;
40 std::vector<std::shared_ptr<Scalar>> values;
41 const auto* options_type =
42 dynamic_cast<const GenericOptionsType*>(options.options_type());
43 if (!options_type) {
44 return Status::NotImplemented("serializing ", options.type_name(),
45 " to StructScalar");
46 }
47 RETURN_NOT_OK(options_type->ToStructScalar(options, &field_names, &values));
48 field_names.push_back(kTypeNameField);
49 const char* options_name = options.type_name();
50 values.emplace_back(
51 new BinaryScalar(Buffer::Wrap(options_name, std::strlen(options_name))));
52 return StructScalar::Make(std::move(values), std::move(field_names));
53}
54
55Result<std::unique_ptr<FunctionOptions>> FunctionOptionsFromStructScalar(
56 const StructScalar& scalar) {
57 ARROW_ASSIGN_OR_RAISE(auto type_name_holder, scalar.field(kTypeNameField));
58 const std::string type_name =
59 checked_cast<const BinaryScalar&>(*type_name_holder).value->ToString();
60 ARROW_ASSIGN_OR_RAISE(auto raw_options_type,
61 GetFunctionRegistry()->GetFunctionOptionsType(type_name));
62 const auto* options_type = checked_cast<const GenericOptionsType*>(raw_options_type);
63 return options_type->FromStructScalar(scalar);
64}
65
66Result<std::shared_ptr<Buffer>> GenericOptionsType::Serialize(
67 const FunctionOptions& options) const {
68 ARROW_ASSIGN_OR_RAISE(auto scalar, FunctionOptionsToStructScalar(options));
69 ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*scalar, 1));
70 auto batch =
71 RecordBatch::Make(schema({field("", array->type())}), /*num_rows=*/1, {array});
72 ARROW_ASSIGN_OR_RAISE(auto stream, io::BufferOutputStream::Create());
73 ARROW_ASSIGN_OR_RAISE(auto writer, ipc::MakeFileWriter(stream, batch->schema()));
74 RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
75 RETURN_NOT_OK(writer->Close());
76 return stream->Finish();
77}
78
79Result<std::unique_ptr<FunctionOptions>> GenericOptionsType::Deserialize(
80 const Buffer& buffer) const {
81 return DeserializeFunctionOptions(buffer);
82}
83
84Result<std::unique_ptr<FunctionOptions>> DeserializeFunctionOptions(
85 const Buffer& buffer) {
86 io::BufferReader stream(buffer);
87 ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(&stream));
88 ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0));
89 if (batch->num_rows() != 1) {
90 return Status::Invalid(
91 "serialized FunctionOptions's batch repr was not a single row - had ",
92 batch->num_rows());
93 }
94 if (batch->num_columns() != 1) {
95 return Status::Invalid(
96 "serialized FunctionOptions's batch repr was not a single column - had ",
97 batch->num_columns());
98 }
99 auto column = batch->column(0);
100 if (column->type()->id() != Type::STRUCT) {
101 return Status::Invalid(
102 "serialized FunctionOptions's batch repr was not a struct column - was ",
103 column->type()->ToString());
104 }
105 ARROW_ASSIGN_OR_RAISE(auto raw_scalar,
106 checked_cast<const StructArray&>(*column).GetScalar(0));
107 auto scalar = checked_cast<const StructScalar&>(*raw_scalar);
108 return FunctionOptionsFromStructScalar(scalar);
109}
110
111} // namespace internal
112} // namespace compute
113} // namespace arrow