]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/compute/kernels/scalar_nested.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / compute / kernels / scalar_nested.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 // Scalar kernels involving nested types
19
20 #include "arrow/array/array_base.h"
21 #include "arrow/compute/api_scalar.h"
22 #include "arrow/compute/kernels/common.h"
23 #include "arrow/result.h"
24 #include "arrow/util/bit_block_counter.h"
25
26 namespace arrow {
27 namespace compute {
28 namespace internal {
29 namespace {
30
31 template <typename Type, typename offset_type = typename Type::offset_type>
32 Status ListValueLength(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
33 using ScalarType = typename TypeTraits<Type>::ScalarType;
34 using OffsetScalarType = typename TypeTraits<Type>::OffsetScalarType;
35
36 if (batch[0].kind() == Datum::ARRAY) {
37 typename TypeTraits<Type>::ArrayType list(batch[0].array());
38 ArrayData* out_arr = out->mutable_array();
39 auto out_values = out_arr->GetMutableValues<offset_type>(1);
40 const offset_type* offsets = list.raw_value_offsets();
41 ::arrow::internal::VisitBitBlocksVoid(
42 list.data()->buffers[0], list.offset(), list.length(),
43 [&](int64_t position) {
44 *out_values++ = offsets[position + 1] - offsets[position];
45 },
46 [&]() { *out_values++ = 0; });
47 } else {
48 const auto& arg0 = batch[0].scalar_as<ScalarType>();
49 if (arg0.is_valid) {
50 checked_cast<OffsetScalarType*>(out->scalar().get())->value =
51 static_cast<offset_type>(arg0.value->length());
52 }
53 }
54
55 return Status::OK();
56 }
57
58 Status FixedSizeListValueLength(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
59 using offset_type = typename FixedSizeListType::offset_type;
60 auto width = checked_cast<const FixedSizeListType&>(*batch[0].type()).list_size();
61 if (batch[0].kind() == Datum::ARRAY) {
62 const auto& arr = *batch[0].array();
63 ArrayData* out_arr = out->mutable_array();
64 auto* out_values = out_arr->GetMutableValues<offset_type>(1);
65 std::fill(out_values, out_values + arr.length, width);
66 } else {
67 const auto& arg0 = batch[0].scalar_as<FixedSizeListScalar>();
68 if (arg0.is_valid) {
69 checked_cast<Int32Scalar*>(out->scalar().get())->value = width;
70 }
71 }
72
73 return Status::OK();
74 }
75
76 const FunctionDoc list_value_length_doc{
77 "Compute list lengths",
78 ("`lists` must have a list-like type.\n"
79 "For each non-null value in `lists`, its length is emitted.\n"
80 "Null values emit a null in the output."),
81 {"lists"}};
82
83 template <typename Type, typename IndexType>
84 struct ListElementArray {
85 static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
86 using ListArrayType = typename TypeTraits<Type>::ArrayType;
87 using IndexScalarType = typename TypeTraits<IndexType>::ScalarType;
88 const auto& index_scalar = batch[1].scalar_as<IndexScalarType>();
89 if (ARROW_PREDICT_FALSE(!index_scalar.is_valid)) {
90 return Status::Invalid("Index must not be null");
91 }
92 ListArrayType list_array(batch[0].array());
93 auto index = index_scalar.value;
94 if (ARROW_PREDICT_FALSE(index < 0)) {
95 return Status::Invalid("Index ", index,
96 " is out of bounds: should be greater than or equal to 0");
97 }
98 std::unique_ptr<ArrayBuilder> builder;
99 RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), list_array.value_type(), &builder));
100 RETURN_NOT_OK(builder->Reserve(list_array.length()));
101 for (int i = 0; i < list_array.length(); ++i) {
102 if (list_array.IsNull(i)) {
103 RETURN_NOT_OK(builder->AppendNull());
104 continue;
105 }
106 std::shared_ptr<arrow::Array> value_array = list_array.value_slice(i);
107 auto len = value_array->length();
108 if (ARROW_PREDICT_FALSE(index >= static_cast<typename IndexType::c_type>(len))) {
109 return Status::Invalid("Index ", index, " is out of bounds: should be in [0, ",
110 len, ")");
111 }
112 RETURN_NOT_OK(builder->AppendArraySlice(*value_array->data(), index, 1));
113 }
114 ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
115 out->value = result->data();
116 return Status::OK();
117 }
118 };
119
120 template <typename, typename IndexType>
121 struct ListElementScalar {
122 static Status Exec(KernelContext* /*ctx*/, const ExecBatch& batch, Datum* out) {
123 using IndexScalarType = typename TypeTraits<IndexType>::ScalarType;
124 const auto& index_scalar = batch[1].scalar_as<IndexScalarType>();
125 if (ARROW_PREDICT_FALSE(!index_scalar.is_valid)) {
126 return Status::Invalid("Index must not be null");
127 }
128 const auto& list_scalar = batch[0].scalar_as<BaseListScalar>();
129 if (ARROW_PREDICT_FALSE(!list_scalar.is_valid)) {
130 out->value = MakeNullScalar(
131 checked_cast<const BaseListType&>(*batch[0].type()).value_type());
132 return Status::OK();
133 }
134 auto list = list_scalar.value;
135 auto index = index_scalar.value;
136 auto len = list->length();
137 if (ARROW_PREDICT_FALSE(index < 0 ||
138 index >= static_cast<typename IndexType::c_type>(len))) {
139 return Status::Invalid("Index ", index, " is out of bounds: should be in [0, ", len,
140 ")");
141 }
142 ARROW_ASSIGN_OR_RAISE(out->value, list->GetScalar(index));
143 return Status::OK();
144 }
145 };
146
147 template <typename InListType>
148 void AddListElementArrayKernels(ScalarFunction* func) {
149 for (const auto& index_type : IntTypes()) {
150 auto inputs = {InputType::Array(InListType::type_id), InputType::Scalar(index_type)};
151 auto output = OutputType{ListValuesType};
152 auto sig = KernelSignature::Make(std::move(inputs), std::move(output),
153 /*is_varargs=*/false);
154 auto scalar_exec = GenerateInteger<ListElementArray, InListType>({index_type->id()});
155 ScalarKernel kernel{std::move(sig), std::move(scalar_exec)};
156 kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
157 kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
158 DCHECK_OK(func->AddKernel(std::move(kernel)));
159 }
160 }
161
162 void AddListElementArrayKernels(ScalarFunction* func) {
163 AddListElementArrayKernels<ListType>(func);
164 AddListElementArrayKernels<LargeListType>(func);
165 AddListElementArrayKernels<FixedSizeListType>(func);
166 }
167
168 void AddListElementScalarKernels(ScalarFunction* func) {
169 for (const auto list_type_id : {Type::LIST, Type::LARGE_LIST, Type::FIXED_SIZE_LIST}) {
170 for (const auto& index_type : IntTypes()) {
171 auto inputs = {InputType::Scalar(list_type_id), InputType::Scalar(index_type)};
172 auto output = OutputType{ListValuesType};
173 auto sig = KernelSignature::Make(std::move(inputs), std::move(output),
174 /*is_varargs=*/false);
175 auto scalar_exec = GenerateInteger<ListElementScalar, void>({index_type->id()});
176 ScalarKernel kernel{std::move(sig), std::move(scalar_exec)};
177 kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
178 kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
179 DCHECK_OK(func->AddKernel(std::move(kernel)));
180 }
181 }
182 }
183
184 const FunctionDoc list_element_doc(
185 "Compute elements using of nested list values using an index",
186 ("`lists` must have a list-like type.\n"
187 "For each value in each list of `lists`, the element at `index`\n"
188 "is emitted. Null values emit a null in the output."),
189 {"lists", "index"});
190
191 Result<ValueDescr> MakeStructResolve(KernelContext* ctx,
192 const std::vector<ValueDescr>& descrs) {
193 auto names = OptionsWrapper<MakeStructOptions>::Get(ctx).field_names;
194 auto nullable = OptionsWrapper<MakeStructOptions>::Get(ctx).field_nullability;
195 auto metadata = OptionsWrapper<MakeStructOptions>::Get(ctx).field_metadata;
196
197 if (names.size() == 0) {
198 names.resize(descrs.size());
199 nullable.resize(descrs.size(), true);
200 metadata.resize(descrs.size(), nullptr);
201 int i = 0;
202 for (auto& name : names) {
203 name = std::to_string(i++);
204 }
205 } else if (names.size() != descrs.size() || nullable.size() != descrs.size() ||
206 metadata.size() != descrs.size()) {
207 return Status::Invalid("make_struct() was passed ", descrs.size(), " arguments but ",
208 names.size(), " field names, ", nullable.size(),
209 " nullability bits, and ", metadata.size(),
210 " metadata dictionaries.");
211 }
212
213 size_t i = 0;
214 FieldVector fields(descrs.size());
215
216 ValueDescr::Shape shape = ValueDescr::SCALAR;
217 for (const ValueDescr& descr : descrs) {
218 if (descr.shape != ValueDescr::SCALAR) {
219 shape = ValueDescr::ARRAY;
220 } else {
221 switch (descr.type->id()) {
222 case Type::EXTENSION:
223 case Type::DENSE_UNION:
224 case Type::SPARSE_UNION:
225 return Status::NotImplemented("Broadcasting scalars of type ", *descr.type);
226 default:
227 break;
228 }
229 }
230
231 fields[i] =
232 field(std::move(names[i]), descr.type, nullable[i], std::move(metadata[i]));
233 ++i;
234 }
235
236 return ValueDescr{struct_(std::move(fields)), shape};
237 }
238
239 Status MakeStructExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
240 ARROW_ASSIGN_OR_RAISE(auto descr, MakeStructResolve(ctx, batch.GetDescriptors()));
241
242 for (int i = 0; i < batch.num_values(); ++i) {
243 const auto& field = checked_cast<const StructType&>(*descr.type).field(i);
244 if (batch[i].null_count() > 0 && !field->nullable()) {
245 return Status::Invalid("Output field ", field, " (#", i,
246 ") does not allow nulls but the corresponding "
247 "argument was not entirely valid.");
248 }
249 }
250
251 if (descr.shape == ValueDescr::SCALAR) {
252 ScalarVector scalars(batch.num_values());
253 for (int i = 0; i < batch.num_values(); ++i) {
254 scalars[i] = batch[i].scalar();
255 }
256
257 *out =
258 Datum(std::make_shared<StructScalar>(std::move(scalars), std::move(descr.type)));
259 return Status::OK();
260 }
261
262 ArrayVector arrays(batch.num_values());
263 for (int i = 0; i < batch.num_values(); ++i) {
264 if (batch[i].is_array()) {
265 arrays[i] = batch[i].make_array();
266 continue;
267 }
268
269 ARROW_ASSIGN_OR_RAISE(arrays[i], MakeArrayFromScalar(*batch[i].scalar(), batch.length,
270 ctx->memory_pool()));
271 }
272
273 *out = std::make_shared<StructArray>(descr.type, batch.length, std::move(arrays));
274 return Status::OK();
275 }
276
277 const FunctionDoc make_struct_doc{"Wrap Arrays into a StructArray",
278 ("Names of the StructArray's fields are\n"
279 "specified through MakeStructOptions."),
280 {"*args"},
281 "MakeStructOptions"};
282
283 } // namespace
284
285 void RegisterScalarNested(FunctionRegistry* registry) {
286 auto list_value_length = std::make_shared<ScalarFunction>(
287 "list_value_length", Arity::Unary(), &list_value_length_doc);
288 DCHECK_OK(list_value_length->AddKernel({InputType(Type::LIST)}, int32(),
289 ListValueLength<ListType>));
290 DCHECK_OK(list_value_length->AddKernel({InputType(Type::FIXED_SIZE_LIST)}, int32(),
291 FixedSizeListValueLength));
292 DCHECK_OK(list_value_length->AddKernel({InputType(Type::LARGE_LIST)}, int64(),
293 ListValueLength<LargeListType>));
294 DCHECK_OK(registry->AddFunction(std::move(list_value_length)));
295
296 auto list_element = std::make_shared<ScalarFunction>("list_element", Arity::Binary(),
297 &list_element_doc);
298 AddListElementArrayKernels(list_element.get());
299 AddListElementScalarKernels(list_element.get());
300 DCHECK_OK(registry->AddFunction(std::move(list_element)));
301
302 static MakeStructOptions kDefaultMakeStructOptions;
303 auto make_struct_function = std::make_shared<ScalarFunction>(
304 "make_struct", Arity::VarArgs(), &make_struct_doc, &kDefaultMakeStructOptions);
305
306 ScalarKernel kernel{KernelSignature::Make({InputType{}}, OutputType{MakeStructResolve},
307 /*is_varargs=*/true),
308 MakeStructExec, OptionsWrapper<MakeStructOptions>::Init};
309 kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
310 kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
311 DCHECK_OK(make_struct_function->AddKernel(std::move(kernel)));
312 DCHECK_OK(registry->AddFunction(std::move(make_struct_function)));
313 }
314
315 } // namespace internal
316 } // namespace compute
317 } // namespace arrow