]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/compute/function_internal.h" | |
19 | ||
20 | #include "arrow/array/util.h" | |
21 | #include "arrow/compute/function.h" | |
22 | #include "arrow/compute/registry.h" | |
23 | #include "arrow/io/memory.h" | |
24 | #include "arrow/ipc/reader.h" | |
25 | #include "arrow/ipc/writer.h" | |
26 | #include "arrow/record_batch.h" | |
27 | #include "arrow/scalar.h" | |
28 | #include "arrow/util/checked_cast.h" | |
29 | ||
30 | namespace arrow { | |
31 | namespace compute { | |
32 | namespace internal { | |
33 | using ::arrow::internal::checked_cast; | |
34 | ||
// Name of the extra struct field that carries FunctionOptions::type_name() in
// the StructScalar representation; it is appended on serialization and read
// back to look up the options type in the function registry.
static constexpr char kTypeNameField[] = "_type_name";
36 | ||
37 | Result<std::shared_ptr<StructScalar>> FunctionOptionsToStructScalar( | |
38 | const FunctionOptions& options) { | |
39 | std::vector<std::string> field_names; | |
40 | std::vector<std::shared_ptr<Scalar>> values; | |
41 | const auto* options_type = | |
42 | dynamic_cast<const GenericOptionsType*>(options.options_type()); | |
43 | if (!options_type) { | |
44 | return Status::NotImplemented("serializing ", options.type_name(), | |
45 | " to StructScalar"); | |
46 | } | |
47 | RETURN_NOT_OK(options_type->ToStructScalar(options, &field_names, &values)); | |
48 | field_names.push_back(kTypeNameField); | |
49 | const char* options_name = options.type_name(); | |
50 | values.emplace_back( | |
51 | new BinaryScalar(Buffer::Wrap(options_name, std::strlen(options_name)))); | |
52 | return StructScalar::Make(std::move(values), std::move(field_names)); | |
53 | } | |
54 | ||
55 | Result<std::unique_ptr<FunctionOptions>> FunctionOptionsFromStructScalar( | |
56 | const StructScalar& scalar) { | |
57 | ARROW_ASSIGN_OR_RAISE(auto type_name_holder, scalar.field(kTypeNameField)); | |
58 | const std::string type_name = | |
59 | checked_cast<const BinaryScalar&>(*type_name_holder).value->ToString(); | |
60 | ARROW_ASSIGN_OR_RAISE(auto raw_options_type, | |
61 | GetFunctionRegistry()->GetFunctionOptionsType(type_name)); | |
62 | const auto* options_type = checked_cast<const GenericOptionsType*>(raw_options_type); | |
63 | return options_type->FromStructScalar(scalar); | |
64 | } | |
65 | ||
66 | Result<std::shared_ptr<Buffer>> GenericOptionsType::Serialize( | |
67 | const FunctionOptions& options) const { | |
68 | ARROW_ASSIGN_OR_RAISE(auto scalar, FunctionOptionsToStructScalar(options)); | |
69 | ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*scalar, 1)); | |
70 | auto batch = | |
71 | RecordBatch::Make(schema({field("", array->type())}), /*num_rows=*/1, {array}); | |
72 | ARROW_ASSIGN_OR_RAISE(auto stream, io::BufferOutputStream::Create()); | |
73 | ARROW_ASSIGN_OR_RAISE(auto writer, ipc::MakeFileWriter(stream, batch->schema())); | |
74 | RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); | |
75 | RETURN_NOT_OK(writer->Close()); | |
76 | return stream->Finish(); | |
77 | } | |
78 | ||
79 | Result<std::unique_ptr<FunctionOptions>> GenericOptionsType::Deserialize( | |
80 | const Buffer& buffer) const { | |
81 | return DeserializeFunctionOptions(buffer); | |
82 | } | |
83 | ||
84 | Result<std::unique_ptr<FunctionOptions>> DeserializeFunctionOptions( | |
85 | const Buffer& buffer) { | |
86 | io::BufferReader stream(buffer); | |
87 | ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(&stream)); | |
88 | ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0)); | |
89 | if (batch->num_rows() != 1) { | |
90 | return Status::Invalid( | |
91 | "serialized FunctionOptions's batch repr was not a single row - had ", | |
92 | batch->num_rows()); | |
93 | } | |
94 | if (batch->num_columns() != 1) { | |
95 | return Status::Invalid( | |
96 | "serialized FunctionOptions's batch repr was not a single column - had ", | |
97 | batch->num_columns()); | |
98 | } | |
99 | auto column = batch->column(0); | |
100 | if (column->type()->id() != Type::STRUCT) { | |
101 | return Status::Invalid( | |
102 | "serialized FunctionOptions's batch repr was not a struct column - was ", | |
103 | column->type()->ToString()); | |
104 | } | |
105 | ARROW_ASSIGN_OR_RAISE(auto raw_scalar, | |
106 | checked_cast<const StructArray&>(*column).GetScalar(0)); | |
107 | auto scalar = checked_cast<const StructScalar&>(*raw_scalar); | |
108 | return FunctionOptionsFromStructScalar(scalar); | |
109 | } | |
110 | ||
111 | } // namespace internal | |
112 | } // namespace compute | |
113 | } // namespace arrow |