]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/array/util.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / array / util.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/array/util.h"
19
20#include <algorithm>
21#include <array>
22#include <cstdint>
23#include <cstring>
24#include <limits>
25#include <memory>
26#include <type_traits>
27#include <utility>
28#include <vector>
29
30#include "arrow/array/array_base.h"
31#include "arrow/array/array_dict.h"
32#include "arrow/array/array_primitive.h"
33#include "arrow/array/concatenate.h"
34#include "arrow/buffer.h"
35#include "arrow/buffer_builder.h"
36#include "arrow/extension_type.h"
37#include "arrow/result.h"
38#include "arrow/scalar.h"
39#include "arrow/status.h"
40#include "arrow/type.h"
41#include "arrow/type_traits.h"
42#include "arrow/util/bit_util.h"
43#include "arrow/util/checked_cast.h"
44#include "arrow/util/decimal.h"
45#include "arrow/util/endian.h"
46#include "arrow/util/logging.h"
47#include "arrow/visitor_inline.h"
48
49namespace arrow {
50
51using internal::checked_cast;
52
53// ----------------------------------------------------------------------
54// Loading from ArrayData
55
56namespace {
57
58class ArrayDataWrapper {
59 public:
60 ArrayDataWrapper(const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out)
61 : data_(data), out_(out) {}
62
63 template <typename T>
64 Status Visit(const T&) {
65 using ArrayType = typename TypeTraits<T>::ArrayType;
66 *out_ = std::make_shared<ArrayType>(data_);
67 return Status::OK();
68 }
69
70 Status Visit(const ExtensionType& type) {
71 *out_ = type.MakeArray(data_);
72 return Status::OK();
73 }
74
75 const std::shared_ptr<ArrayData>& data_;
76 std::shared_ptr<Array>* out_;
77};
78
// Type visitor that produces a byte-order-swapped copy of an ArrayData tree.
// The input `data_` is left untouched; the swapped result accumulates in
// `out_` (initialized as a shallow copy whose buffers are then replaced).
// Buffers whose elements are single bytes (validity bitmaps, type codes,
// raw binary data) are shared with the input rather than copied.
class ArrayDataEndianSwapper {
 public:
  explicit ArrayDataEndianSwapper(const std::shared_ptr<ArrayData>& data) : data_(data) {
    out_ = data->Copy();
  }

  // WARNING: this facility can be called on invalid Array data by the IPC reader.
  // Do not rely on the advertised ArrayData length, instead use the physical
  // buffer sizes to avoid accessing memory out of bounds.
  //
  // (If this guarantee turns out to be difficult to maintain, we should call
  // Validate() instead)
  //
  // Entry point: swaps this node's buffers via VisitTypeInline, then recurses
  // into children and re-attaches the (endianness-neutral) validity bitmap.
  Status SwapType(const DataType& type) {
    RETURN_NOT_OK(VisitTypeInline(type, this));
    RETURN_NOT_OK(SwapChildren(type.fields()));
    if (internal::HasValidityBitmap(type.id())) {
      // Copy null bitmap
      out_->buffers[0] = data_->buffers[0];
    }
    return Status::OK();
  }

  // Recursively swap each child ArrayData.
  Status SwapChildren(const FieldVector& child_fields) {
    for (size_t i = 0; i < child_fields.size(); i++) {
      ARROW_ASSIGN_OR_RAISE(out_->child_data[i],
                            internal::SwapEndianArrayData(data_->child_data[i]));
    }
    return Status::OK();
  }

  // Allocate a new buffer whose T-sized elements are the byte-swapped
  // elements of `in_buffer`. For 1-byte T the input buffer is shared as-is.
  template <typename T>
  Result<std::shared_ptr<Buffer>> ByteSwapBuffer(
      const std::shared_ptr<Buffer>& in_buffer) {
    if (sizeof(T) == 1) {
      // if data size is 1, element is not swapped. We can use the original buffer
      return in_buffer;
    }
    auto in_data = reinterpret_cast<const T*>(in_buffer->data());
    ARROW_ASSIGN_OR_RAISE(auto out_buffer, AllocateBuffer(in_buffer->size()));
    auto out_data = reinterpret_cast<T*>(out_buffer->mutable_data());
    // NOTE: data_->length not trusted (see warning above)
    int64_t length = in_buffer->size() / sizeof(T);
    for (int64_t i = 0; i < length; i++) {
      out_data[i] = BitUtil::ByteSwap(in_data[i]);
    }
    return std::move(out_buffer);
  }

  // Swap an offsets buffer at `buffers[index]`; a null/empty buffer is
  // shared unchanged (e.g. for a zero-length array with no offsets).
  template <typename VALUE_TYPE>
  Status SwapOffsets(int index) {
    if (data_->buffers[index] == nullptr || data_->buffers[index]->size() == 0) {
      out_->buffers[index] = data_->buffers[index];
      return Status::OK();
    }
    // Except union, offset has one more element rather than data->length
    ARROW_ASSIGN_OR_RAISE(out_->buffers[index],
                          ByteSwapBuffer<VALUE_TYPE>(data_->buffers[index]));
    return Status::OK();
  }

  // Generic fixed-width types (ints, floats, temporal types, ...): swap the
  // values buffer element-wise. Excludes fixed-size binary (no c_type) and
  // dictionary (handled below).
  template <typename T>
  enable_if_t<std::is_base_of<FixedWidthType, T>::value &&
                  !std::is_base_of<FixedSizeBinaryType, T>::value &&
                  !std::is_base_of<DictionaryType, T>::value,
              Status>
  Visit(const T& type) {
    using value_type = typename T::c_type;
    ARROW_ASSIGN_OR_RAISE(out_->buffers[1],
                          ByteSwapBuffer<value_type>(data_->buffers[1]));
    return Status::OK();
  }

  // Decimal128: each value is two 64-bit words; swap each word's bytes and
  // reverse the word order (hi/lo halves trade places across endianness).
  Status Visit(const Decimal128Type& type) {
    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
    // NOTE: data_->length not trusted (see warning above)
    const int64_t length = data_->buffers[1]->size() / Decimal128Type::kByteWidth;
    for (int64_t i = 0; i < length; i++) {
      uint64_t tmp;
      auto idx = i * 2;
#if ARROW_LITTLE_ENDIAN
      tmp = BitUtil::FromBigEndian(data[idx]);
      new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]);
      new_data[idx + 1] = tmp;
#else
      tmp = BitUtil::FromLittleEndian(data[idx]);
      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]);
      new_data[idx + 1] = tmp;
#endif
    }
    out_->buffers[1] = std::move(new_buffer);
    return Status::OK();
  }

  // Decimal256: same scheme as Decimal128, but over four 64-bit words —
  // byte-swap each word and reverse the order of the four words.
  Status Visit(const Decimal256Type& type) {
    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
    // NOTE: data_->length not trusted (see warning above)
    const int64_t length = data_->buffers[1]->size() / Decimal256Type::kByteWidth;
    for (int64_t i = 0; i < length; i++) {
      uint64_t tmp0, tmp1, tmp2;
      auto idx = i * 4;
#if ARROW_LITTLE_ENDIAN
      tmp0 = BitUtil::FromBigEndian(data[idx]);
      tmp1 = BitUtil::FromBigEndian(data[idx + 1]);
      tmp2 = BitUtil::FromBigEndian(data[idx + 2]);
      new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]);
      new_data[idx + 1] = tmp2;
      new_data[idx + 2] = tmp1;
      new_data[idx + 3] = tmp0;
#else
      tmp0 = BitUtil::FromLittleEndian(data[idx]);
      tmp1 = BitUtil::FromLittleEndian(data[idx + 1]);
      tmp2 = BitUtil::FromLittleEndian(data[idx + 2]);
      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]);
      new_data[idx + 1] = tmp2;
      new_data[idx + 2] = tmp1;
      new_data[idx + 3] = tmp0;
#endif
    }
    out_->buffers[1] = std::move(new_buffer);
    return Status::OK();
  }

  // DayTimeInterval stores two int32 fields per value; swapping the buffer
  // as uint32 elements swaps each field without reordering them.
  Status Visit(const DayTimeIntervalType& type) {
    ARROW_ASSIGN_OR_RAISE(out_->buffers[1], ByteSwapBuffer<uint32_t>(data_->buffers[1]));
    return Status::OK();
  }

  // MonthDayNano intervals mix 32- and 64-bit fields, so each struct field
  // is swapped individually.
  Status Visit(const MonthDayNanoIntervalType& type) {
    using MonthDayNanos = MonthDayNanoIntervalType::MonthDayNanos;
    auto data = reinterpret_cast<const MonthDayNanos*>(data_->buffers[1]->data());
    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
    auto new_data = reinterpret_cast<MonthDayNanos*>(new_buffer->mutable_data());
    // NOTE: data_->length not trusted (see warning above)
    const int64_t length = data_->buffers[1]->size() / sizeof(MonthDayNanos);
    for (int64_t i = 0; i < length; i++) {
      MonthDayNanos tmp = data[i];
#if ARROW_LITTLE_ENDIAN
      tmp.months = BitUtil::FromBigEndian(tmp.months);
      tmp.days = BitUtil::FromBigEndian(tmp.days);
      tmp.nanoseconds = BitUtil::FromBigEndian(tmp.nanoseconds);
#else
      tmp.months = BitUtil::FromLittleEndian(tmp.months);
      tmp.days = BitUtil::FromLittleEndian(tmp.days);
      tmp.nanoseconds = BitUtil::FromLittleEndian(tmp.nanoseconds);
#endif
      new_data[i] = tmp;
    }
    out_->buffers[1] = std::move(new_buffer);
    return Status::OK();
  }

  // Types whose buffers contain no multi-byte values need no swapping.
  Status Visit(const NullType& type) { return Status::OK(); }
  Status Visit(const BooleanType& type) { return Status::OK(); }
  Status Visit(const Int8Type& type) { return Status::OK(); }
  Status Visit(const UInt8Type& type) { return Status::OK(); }
  Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); }
  Status Visit(const FixedSizeListType& type) { return Status::OK(); }
  Status Visit(const StructType& type) { return Status::OK(); }
  // Unions: type codes (buffer 1) are int8, shared as-is; dense unions also
  // carry an int32 offsets buffer that must be swapped.
  Status Visit(const UnionType& type) {
    out_->buffers[1] = data_->buffers[1];
    if (type.mode() == UnionMode::DENSE) {
      RETURN_NOT_OK(SwapOffsets<int32_t>(2));
    }
    return Status::OK();
  }

  // Binary/string: swap the int32 offsets; the raw data bytes are shared.
  template <typename T>
  enable_if_t<std::is_same<BinaryType, T>::value || std::is_same<StringType, T>::value,
              Status>
  Visit(const T& type) {
    RETURN_NOT_OK(SwapOffsets<int32_t>(1));
    out_->buffers[2] = data_->buffers[2];
    return Status::OK();
  }

  // Large binary/string: same as above but with int64 offsets.
  template <typename T>
  enable_if_t<std::is_same<LargeBinaryType, T>::value ||
                  std::is_same<LargeStringType, T>::value,
              Status>
  Visit(const T& type) {
    RETURN_NOT_OK(SwapOffsets<int64_t>(1));
    out_->buffers[2] = data_->buffers[2];
    return Status::OK();
  }

  // Lists only carry an offsets buffer here; values live in child_data and
  // are handled by SwapChildren.
  Status Visit(const ListType& type) {
    RETURN_NOT_OK(SwapOffsets<int32_t>(1));
    return Status::OK();
  }
  Status Visit(const LargeListType& type) {
    RETURN_NOT_OK(SwapOffsets<int64_t>(1));
    return Status::OK();
  }

  Status Visit(const DictionaryType& type) {
    // dictionary was already swapped in ReadDictionary() in ipc/reader.cc
    // so only the indices need swapping here.
    RETURN_NOT_OK(SwapType(*type.index_type()));
    return Status::OK();
  }

  // Extension arrays are swapped according to their storage representation.
  Status Visit(const ExtensionType& type) {
    RETURN_NOT_OK(SwapType(*type.storage_type()));
    return Status::OK();
  }

  const std::shared_ptr<ArrayData>& data_;
  std::shared_ptr<ArrayData> out_;
};
291
292} // namespace
293
294namespace internal {
295
296Result<std::shared_ptr<ArrayData>> SwapEndianArrayData(
297 const std::shared_ptr<ArrayData>& data) {
298 if (data->offset != 0) {
299 return Status::Invalid("Unsupported data format: data.offset != 0");
300 }
301 ArrayDataEndianSwapper swapper(data);
302 RETURN_NOT_OK(swapper.SwapType(*data->type));
303 return std::move(swapper.out_);
304}
305
306} // namespace internal
307
308std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) {
309 std::shared_ptr<Array> out;
310 ArrayDataWrapper wrapper_visitor(data, &out);
311 DCHECK_OK(VisitTypeInline(*data->type, &wrapper_visitor));
312 DCHECK(out);
313 return out;
314}
315
316// ----------------------------------------------------------------------
317// Misc APIs
318
319namespace {
320
321// get the maximum buffer length required, then allocate a single zeroed buffer
322// to use anywhere a buffer is required
// get the maximum buffer length required, then allocate a single zeroed buffer
// to use anywhere a buffer is required
//
// Builds an ArrayData of the given type and length whose elements are all
// null. All buffer slots (validity, offsets, values, ...) point at one
// shared zeroed buffer sized to the largest requirement anywhere in the
// type tree, so the whole null array costs a single allocation.
class NullArrayFactory {
 public:
  // Visitor computing the maximum byte length any single buffer of `type`
  // (or its descendants) would need for `length` elements.
  struct GetBufferLength {
    GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length)
        : type_(*type), length_(length), buffer_length_(BitUtil::BytesForBits(length)) {}

    Result<int64_t> Finish() && {
      RETURN_NOT_OK(VisitTypeInline(type_, this));
      return buffer_length_;
    }

    // Fixed-width types that expose bytes_required().
    template <typename T, typename = decltype(TypeTraits<T>::bytes_required(0))>
    Status Visit(const T&) {
      return MaxOf(TypeTraits<T>::bytes_required(length_));
    }

    template <typename T>
    enable_if_var_size_list<T, Status> Visit(const T&) {
      // values array may be empty, but there must be at least one offset of 0
      return MaxOf(sizeof(typename T::offset_type) * (length_ + 1));
    }

    template <typename T>
    enable_if_base_binary<T, Status> Visit(const T&) {
      // values buffer may be empty, but there must be at least one offset of 0
      return MaxOf(sizeof(typename T::offset_type) * (length_ + 1));
    }

    // Fixed-size lists have no buffers of their own; recurse into the child
    // with the multiplied element count.
    Status Visit(const FixedSizeListType& type) {
      return MaxOf(GetBufferLength(type.value_type(), type.list_size() * length_));
    }

    Status Visit(const FixedSizeBinaryType& type) {
      return MaxOf(type.byte_width() * length_);
    }

    // Structs: the max over all children at the same length.
    Status Visit(const StructType& type) {
      for (const auto& child : type.fields()) {
        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
      }
      return Status::OK();
    }

    Status Visit(const UnionType& type) {
      // type codes
      RETURN_NOT_OK(MaxOf(length_));
      if (type.mode() == UnionMode::DENSE) {
        // offsets
        RETURN_NOT_OK(MaxOf(sizeof(int32_t) * length_));
      }
      for (const auto& child : type.fields()) {
        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
      }
      return Status::OK();
    }

    Status Visit(const DictionaryType& type) {
      RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
      return MaxOf(GetBufferLength(type.index_type(), length_));
    }

    Status Visit(const ExtensionType& type) {
      // XXX is an extension array's length always == storage length
      return MaxOf(GetBufferLength(type.storage_type(), length_));
    }

    // Fallback for types without an all-null construction path.
    Status Visit(const DataType& type) {
      return Status::NotImplemented("construction of all-null ", type);
    }

   private:
    // Fold in the requirement of a nested type computation.
    Status MaxOf(GetBufferLength&& other) {
      ARROW_ASSIGN_OR_RAISE(int64_t buffer_length, std::move(other).Finish());
      return MaxOf(buffer_length);
    }

    Status MaxOf(int64_t buffer_length) {
      if (buffer_length > buffer_length_) {
        buffer_length_ = buffer_length;
      }
      return Status::OK();
    }

    const DataType& type_;
    int64_t length_, buffer_length_;
  };

  NullArrayFactory(MemoryPool* pool, const std::shared_ptr<DataType>& type,
                   int64_t length)
      : pool_(pool), type_(type), length_(length) {}

  // Allocate the single shared buffer, zeroed so that validity bitmaps read
  // as all-null and offsets read as all-zero.
  Status CreateBuffer() {
    ARROW_ASSIGN_OR_RAISE(int64_t buffer_length,
                          GetBufferLength(type_, length_).Finish());
    ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_));
    std::memset(buffer_->mutable_data(), 0, buffer_->size());
    return Status::OK();
  }

  // Build the ArrayData tree; each per-type Visit below wires up the buffer
  // slots and child data. null_count is set to length_ (everything null).
  Result<std::shared_ptr<ArrayData>> Create() {
    if (buffer_ == nullptr) {
      RETURN_NOT_OK(CreateBuffer());
    }
    std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields());
    out_ = ArrayData::Make(type_, length_, {buffer_}, child_data, length_, 0);
    RETURN_NOT_OK(VisitTypeInline(*type_, this));
    return out_;
  }

  // Null arrays have no buffers at all (slot kept for the validity bitmap).
  Status Visit(const NullType&) {
    out_->buffers.resize(1, nullptr);
    return Status::OK();
  }

  // Fixed-width: validity + values, both pointing at the shared buffer.
  Status Visit(const FixedWidthType&) {
    out_->buffers.resize(2, buffer_);
    return Status::OK();
  }

  // Binary-like: validity + offsets + data, all the shared zeroed buffer
  // (zero offsets describe empty values).
  template <typename T>
  enable_if_base_binary<T, Status> Visit(const T&) {
    out_->buffers.resize(3, buffer_);
    return Status::OK();
  }

  // Var-size lists: zero offsets, so the child array can be empty.
  template <typename T>
  enable_if_var_size_list<T, Status> Visit(const T& type) {
    out_->buffers.resize(2, buffer_);
    ARROW_ASSIGN_OR_RAISE(out_->child_data[0], CreateChild(0, /*length=*/0));
    return Status::OK();
  }

  // Fixed-size lists require a child sized length * list_size.
  Status Visit(const FixedSizeListType& type) {
    ARROW_ASSIGN_OR_RAISE(out_->child_data[0],
                          CreateChild(0, length_ * type.list_size()));
    return Status::OK();
  }

  Status Visit(const StructType& type) {
    for (int i = 0; i < type_->num_fields(); ++i) {
      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(i, length_));
    }
    return Status::OK();
  }

  Status Visit(const UnionType& type) {
    out_->buffers.resize(2);

    // First buffer is always null
    out_->buffers[0] = nullptr;

    out_->buffers[1] = buffer_;
    // buffer_ is zeroed, but 0 may not be a valid type code
    if (type.type_codes()[0] != 0) {
      ARROW_ASSIGN_OR_RAISE(out_->buffers[1], AllocateBuffer(length_, pool_));
      std::memset(out_->buffers[1]->mutable_data(), type.type_codes()[0], length_);
    }

    // For sparse unions, we now create children with the same length as the
    // parent
    int64_t child_length = length_;
    if (type.mode() == UnionMode::DENSE) {
      // For dense unions, we set the offsets to all zero and create children
      // with length 1
      out_->buffers.resize(3);
      out_->buffers[2] = buffer_;

      child_length = 1;
    }
    for (int i = 0; i < type_->num_fields(); ++i) {
      ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(i, child_length));
    }
    return Status::OK();
  }

  // Dictionary: null indices plus an empty (but correctly typed) dictionary.
  Status Visit(const DictionaryType& type) {
    out_->buffers.resize(2, buffer_);
    ARROW_ASSIGN_OR_RAISE(auto typed_null_dict, MakeArrayOfNull(type.value_type(), 0));
    out_->dictionary = typed_null_dict->data();
    return Status::OK();
  }

  // Extension arrays reuse the storage type's layout.
  Status Visit(const ExtensionType& type) {
    RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this));
    return Status::OK();
  }

  Status Visit(const DataType& type) {
    return Status::NotImplemented("construction of all-null ", type);
  }

  // Build child i's ArrayData, reusing the parent's shared zeroed buffer.
  Result<std::shared_ptr<ArrayData>> CreateChild(int i, int64_t length) {
    NullArrayFactory child_factory(pool_, type_->field(i)->type(), length);
    child_factory.buffer_ = buffer_;
    return child_factory.Create();
  }

  MemoryPool* pool_;
  std::shared_ptr<DataType> type_;
  int64_t length_;
  std::shared_ptr<ArrayData> out_;
  std::shared_ptr<Buffer> buffer_;
};
526
// Builds an array of `length` elements, each equal to a (valid) scalar.
// Null scalars are routed to MakeArrayOfNull beforehand, except union
// scalars which carry a type code even when null.
class RepeatedArrayFactory {
 public:
  RepeatedArrayFactory(MemoryPool* pool, const Scalar& scalar, int64_t length)
      : pool_(pool), scalar_(scalar), length_(length) {}

  Result<std::shared_ptr<Array>> Create() {
    RETURN_NOT_OK(VisitTypeInline(*scalar_.type, this));
    return out_;
  }

  Status Visit(const NullType& type) {
    DCHECK(false);  // already forwarded to MakeArrayOfNull
    return Status::OK();
  }

  // Booleans: fill a bitmap with the scalar's value.
  Status Visit(const BooleanType&) {
    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBitmap(length_, pool_));
    BitUtil::SetBitsTo(buffer->mutable_data(), 0, length_,
                       checked_cast<const BooleanScalar&>(scalar_).value);
    out_ = std::make_shared<BooleanArray>(length_, buffer);
    return Status::OK();
  }

  // Numeric and temporal types: repeat the fixed-width C value.
  template <typename T>
  enable_if_t<is_number_type<T>::value || is_temporal_type<T>::value, Status> Visit(
      const T&) {
    auto value = checked_cast<const typename TypeTraits<T>::ScalarType&>(scalar_).value;
    return FinishFixedWidth(&value, sizeof(value));
  }

  Status Visit(const FixedSizeBinaryType& type) {
    auto value = checked_cast<const FixedSizeBinaryScalar&>(scalar_).value;
    return FinishFixedWidth(value->data(), type.byte_width());
  }

  // Decimals: repeat the little/big-endian byte representation.
  template <typename T>
  enable_if_decimal<T, Status> Visit(const T&) {
    using ScalarType = typename TypeTraits<T>::ScalarType;
    auto value = checked_cast<const ScalarType&>(scalar_).value.ToBytes();
    return FinishFixedWidth(value.data(), value.size());
  }

  // NOTE(review): this exact-match overload shadows the enable_if_decimal
  // template for Decimal256; both appear to do the same thing — presumably
  // kept for overload-resolution clarity, confirm before removing.
  Status Visit(const Decimal256Type&) {
    auto value = checked_cast<const Decimal256Scalar&>(scalar_).value.ToBytes();
    return FinishFixedWidth(value.data(), value.size());
  }

  // Binary/string: one values buffer of length_ copies, offsets stepping by
  // the value's size.
  template <typename T>
  enable_if_base_binary<T, Status> Visit(const T&) {
    std::shared_ptr<Buffer> value =
        checked_cast<const typename TypeTraits<T>::ScalarType&>(scalar_).value;
    std::shared_ptr<Buffer> values_buffer, offsets_buffer;
    RETURN_NOT_OK(CreateBufferOf(value->data(), value->size(), &values_buffer));
    auto size = static_cast<typename T::offset_type>(value->size());
    RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer));
    out_ = std::make_shared<typename TypeTraits<T>::ArrayType>(length_, offsets_buffer,
                                                               values_buffer);
    return Status::OK();
  }

  // Var-size lists: concatenate length_ copies of the scalar's value array
  // and synthesize matching offsets.
  template <typename T>
  enable_if_var_size_list<T, Status> Visit(const T& type) {
    using ScalarType = typename TypeTraits<T>::ScalarType;
    using ArrayType = typename TypeTraits<T>::ArrayType;

    auto value = checked_cast<const ScalarType&>(scalar_).value;

    ArrayVector values(length_, value);
    ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_));

    std::shared_ptr<Buffer> offsets_buffer;
    auto size = static_cast<typename T::offset_type>(value->length());
    RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer));

    out_ =
        std::make_shared<ArrayType>(scalar_.type, length_, offsets_buffer, value_array);
    return Status::OK();
  }

  // Fixed-size lists need no offsets; just repeat the child values.
  Status Visit(const FixedSizeListType& type) {
    auto value = checked_cast<const FixedSizeListScalar&>(scalar_).value;

    ArrayVector values(length_, value);
    ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_));

    out_ = std::make_shared<FixedSizeListArray>(scalar_.type, length_, value_array);
    return Status::OK();
  }

  // Maps: repeat the key and item children of the scalar's struct value and
  // build offsets stepping by the entry count.
  Status Visit(const MapType& type) {
    auto map_scalar = checked_cast<const MapScalar&>(scalar_);
    auto struct_array = checked_cast<const StructArray*>(map_scalar.value.get());

    ArrayVector keys(length_, struct_array->field(0));
    ArrayVector values(length_, struct_array->field(1));

    ARROW_ASSIGN_OR_RAISE(auto key_array, Concatenate(keys, pool_));
    ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_));

    std::shared_ptr<Buffer> offsets_buffer;
    auto size = static_cast<typename MapType::offset_type>(struct_array->length());
    RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer));

    out_ = std::make_shared<MapArray>(scalar_.type, length_, std::move(offsets_buffer),
                                      std::move(key_array), std::move(value_array));
    return Status::OK();
  }

  // Dictionary: repeat the index scalar; the dictionary itself is shared.
  Status Visit(const DictionaryType& type) {
    const auto& value = checked_cast<const DictionaryScalar&>(scalar_).value;
    ARROW_ASSIGN_OR_RAISE(auto indices,
                          MakeArrayFromScalar(*value.index, length_, pool_));
    out_ = std::make_shared<DictionaryArray>(scalar_.type, std::move(indices),
                                             value.dictionary);
    return Status::OK();
  }

  // Structs: build one repeated array per field scalar.
  Status Visit(const StructType& type) {
    ArrayVector fields;
    for (const auto& value : checked_cast<const StructScalar&>(scalar_).value) {
      fields.emplace_back();
      ARROW_ASSIGN_OR_RAISE(fields.back(), MakeArrayFromScalar(*value, length_, pool_));
    }
    out_ = std::make_shared<StructArray>(scalar_.type, length_, std::move(fields));
    return Status::OK();
  }

  Status Visit(const SparseUnionType& type) {
    const auto& union_scalar = checked_cast<const UnionScalar&>(scalar_);
    const auto& union_type = checked_cast<const UnionType&>(*scalar_.type);
    const auto scalar_type_code = union_scalar.type_code;
    const auto scalar_child_id = union_type.child_ids()[scalar_type_code];

    // Create child arrays: most of them are all-null, except for the child array
    // for the given type code (if the scalar is valid).
    ArrayVector fields;
    for (int i = 0; i < type.num_fields(); ++i) {
      fields.emplace_back();
      if (i == scalar_child_id && scalar_.is_valid) {
        ARROW_ASSIGN_OR_RAISE(fields.back(),
                              MakeArrayFromScalar(*union_scalar.value, length_, pool_));
      } else {
        ARROW_ASSIGN_OR_RAISE(
            fields.back(), MakeArrayOfNull(union_type.field(i)->type(), length_, pool_));
      }
    }

    ARROW_ASSIGN_OR_RAISE(auto type_codes_buffer, CreateUnionTypeCodes(scalar_type_code));

    out_ = std::make_shared<SparseUnionArray>(scalar_.type, length_, std::move(fields),
                                              std::move(type_codes_buffer));
    return Status::OK();
  }

  Status Visit(const DenseUnionType& type) {
    const auto& union_scalar = checked_cast<const UnionScalar&>(scalar_);
    const auto& union_type = checked_cast<const UnionType&>(*scalar_.type);
    const auto scalar_type_code = union_scalar.type_code;
    const auto scalar_child_id = union_type.child_ids()[scalar_type_code];

    // Create child arrays: all of them are empty, except for the child array
    // for the given type code (if length > 0).
    ArrayVector fields;
    for (int i = 0; i < type.num_fields(); ++i) {
      fields.emplace_back();
      if (i == scalar_child_id && length_ > 0) {
        if (scalar_.is_valid) {
          // One valid element (will be referenced by multiple offsets)
          ARROW_ASSIGN_OR_RAISE(fields.back(),
                                MakeArrayFromScalar(*union_scalar.value, 1, pool_));
        } else {
          // One null element (will be referenced by multiple offsets)
          ARROW_ASSIGN_OR_RAISE(fields.back(),
                                MakeArrayOfNull(union_type.field(i)->type(), 1, pool_));
        }
      } else {
        // Zero element (will not be referenced by any offset)
        ARROW_ASSIGN_OR_RAISE(fields.back(),
                              MakeArrayOfNull(union_type.field(i)->type(), 0, pool_));
      }
    }

    // Create an offsets buffer with all offsets equal to 0
    ARROW_ASSIGN_OR_RAISE(auto offsets_buffer,
                          AllocateBuffer(length_ * sizeof(int32_t), pool_));
    memset(offsets_buffer->mutable_data(), 0, offsets_buffer->size());

    ARROW_ASSIGN_OR_RAISE(auto type_codes_buffer, CreateUnionTypeCodes(scalar_type_code));

    out_ = std::make_shared<DenseUnionArray>(scalar_.type, length_, std::move(fields),
                                             std::move(type_codes_buffer),
                                             std::move(offsets_buffer));
    return Status::OK();
  }

  Status Visit(const ExtensionType& type) {
    return Status::NotImplemented("construction from scalar of type ", *scalar_.type);
  }

  // Buffer of length_ identical int8 type codes.
  Result<std::shared_ptr<Buffer>> CreateUnionTypeCodes(int8_t type_code) {
    TypedBufferBuilder<int8_t> builder(pool_);
    RETURN_NOT_OK(builder.Resize(length_));
    builder.UnsafeAppend(length_, type_code);
    return builder.Finish();
  }

  // Offsets 0, v, 2v, ... for length_ slots plus the trailing end offset.
  template <typename OffsetType>
  Status CreateOffsetsBuffer(OffsetType value_length, std::shared_ptr<Buffer>* out) {
    TypedBufferBuilder<OffsetType> builder(pool_);
    RETURN_NOT_OK(builder.Resize(length_ + 1));
    OffsetType offset = 0;
    for (int64_t i = 0; i < length_ + 1; ++i, offset += value_length) {
      builder.UnsafeAppend(offset);
    }
    return builder.Finish(out);
  }

  // Buffer holding length_ back-to-back copies of `data`.
  Status CreateBufferOf(const void* data, size_t data_length,
                        std::shared_ptr<Buffer>* out) {
    BufferBuilder builder(pool_);
    RETURN_NOT_OK(builder.Resize(length_ * data_length));
    for (int64_t i = 0; i < length_; ++i) {
      builder.UnsafeAppend(data, data_length);
    }
    return builder.Finish(out);
  }

  // Assemble a fixed-width array: no validity bitmap (all valid), repeated
  // values buffer, null_count 0.
  Status FinishFixedWidth(const void* data, size_t data_length) {
    std::shared_ptr<Buffer> buffer;
    RETURN_NOT_OK(CreateBufferOf(data, data_length, &buffer));
    out_ = MakeArray(
        ArrayData::Make(scalar_.type, length_, {nullptr, std::move(buffer)}, 0));
    return Status::OK();
  }

  MemoryPool* pool_;
  const Scalar& scalar_;
  int64_t length_;
  std::shared_ptr<Array> out_;
};
767
768} // namespace
769
770Result<std::shared_ptr<Array>> MakeArrayOfNull(const std::shared_ptr<DataType>& type,
771 int64_t length, MemoryPool* pool) {
772 ARROW_ASSIGN_OR_RAISE(auto data, NullArrayFactory(pool, type, length).Create());
773 return MakeArray(data);
774}
775
776Result<std::shared_ptr<Array>> MakeArrayFromScalar(const Scalar& scalar, int64_t length,
777 MemoryPool* pool) {
778 // Null union scalars still have a type code associated
779 if (!scalar.is_valid && !is_union(scalar.type->id())) {
780 return MakeArrayOfNull(scalar.type, length, pool);
781 }
782 return RepeatedArrayFactory(pool, scalar, length).Create();
783}
784
785namespace internal {
786
// Re-slices several parallel chunked-array groups so that all groups share
// identical chunk boundaries. Each group must contain the same total number
// of elements; chunk boundaries in the output are the union of the input
// boundaries (each output chunk is as long as possible).
std::vector<ArrayVector> RechunkArraysConsistently(
    const std::vector<ArrayVector>& groups) {
  // With zero or one group there is nothing to align.
  if (groups.size() <= 1) {
    return groups;
  }
  int64_t total_length = 0;
  for (const auto& array : groups.front()) {
    total_length += array->length();
  }
#ifndef NDEBUG
  // Debug-only invariant: every group carries the same element count.
  for (const auto& group : groups) {
    int64_t group_length = 0;
    for (const auto& array : group) {
      group_length += array->length();
    }
    DCHECK_EQ(group_length, total_length)
        << "Array groups should have the same total number of elements";
  }
#endif
  if (total_length == 0) {
    return groups;
  }

  // Set up result vectors
  std::vector<ArrayVector> rechunked_groups(groups.size());

  // Set up progress counters: per group, an iterator to the current chunk
  // and the element offset already consumed within that chunk.
  std::vector<ArrayVector::const_iterator> current_arrays;
  std::vector<int64_t> array_offsets;
  for (const auto& group : groups) {
    current_arrays.emplace_back(group.cbegin());
    array_offsets.emplace_back(0);
  }

  // Scan all array vectors at once, rechunking along the way
  int64_t start = 0;
  while (start < total_length) {
    // First compute max possible length for next chunk: the smallest
    // remaining run in any group's current chunk.
    int64_t chunk_length = std::numeric_limits<int64_t>::max();
    for (size_t i = 0; i < groups.size(); i++) {
      auto& arr_it = current_arrays[i];
      auto& offset = array_offsets[i];
      // Skip any done arrays (including 0-length arrays)
      while (offset == (*arr_it)->length()) {
        ++arr_it;
        offset = 0;
      }
      const auto& array = *arr_it;
      DCHECK_GT(array->length(), offset);
      chunk_length = std::min(chunk_length, array->length() - offset);
    }
    DCHECK_GT(chunk_length, 0);

    // Then slice all arrays along this chunk size
    for (size_t i = 0; i < groups.size(); i++) {
      const auto& array = *current_arrays[i];
      auto& offset = array_offsets[i];
      if (offset == 0 && array->length() == chunk_length) {
        // Slice spans entire array
        rechunked_groups[i].emplace_back(array);
      } else {
        DCHECK_LT(chunk_length - offset, array->length());
        rechunked_groups[i].emplace_back(array->Slice(offset, chunk_length));
      }
      offset += chunk_length;
    }
    start += chunk_length;
  }

  return rechunked_groups;
}
858
859} // namespace internal
860} // namespace arrow