]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/array/util.h" | |
19 | ||
20 | #include <algorithm> | |
21 | #include <array> | |
22 | #include <cstdint> | |
23 | #include <cstring> | |
24 | #include <limits> | |
25 | #include <memory> | |
26 | #include <type_traits> | |
27 | #include <utility> | |
28 | #include <vector> | |
29 | ||
30 | #include "arrow/array/array_base.h" | |
31 | #include "arrow/array/array_dict.h" | |
32 | #include "arrow/array/array_primitive.h" | |
33 | #include "arrow/array/concatenate.h" | |
34 | #include "arrow/buffer.h" | |
35 | #include "arrow/buffer_builder.h" | |
36 | #include "arrow/extension_type.h" | |
37 | #include "arrow/result.h" | |
38 | #include "arrow/scalar.h" | |
39 | #include "arrow/status.h" | |
40 | #include "arrow/type.h" | |
41 | #include "arrow/type_traits.h" | |
42 | #include "arrow/util/bit_util.h" | |
43 | #include "arrow/util/checked_cast.h" | |
44 | #include "arrow/util/decimal.h" | |
45 | #include "arrow/util/endian.h" | |
46 | #include "arrow/util/logging.h" | |
47 | #include "arrow/visitor_inline.h" | |
48 | ||
49 | namespace arrow { | |
50 | ||
51 | using internal::checked_cast; | |
52 | ||
53 | // ---------------------------------------------------------------------- | |
54 | // Loading from ArrayData | |
55 | ||
56 | namespace { | |
57 | ||
58 | class ArrayDataWrapper { | |
59 | public: | |
60 | ArrayDataWrapper(const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out) | |
61 | : data_(data), out_(out) {} | |
62 | ||
63 | template <typename T> | |
64 | Status Visit(const T&) { | |
65 | using ArrayType = typename TypeTraits<T>::ArrayType; | |
66 | *out_ = std::make_shared<ArrayType>(data_); | |
67 | return Status::OK(); | |
68 | } | |
69 | ||
70 | Status Visit(const ExtensionType& type) { | |
71 | *out_ = type.MakeArray(data_); | |
72 | return Status::OK(); | |
73 | } | |
74 | ||
75 | const std::shared_ptr<ArrayData>& data_; | |
76 | std::shared_ptr<Array>* out_; | |
77 | }; | |
78 | ||
79 | class ArrayDataEndianSwapper { | |
80 | public: | |
81 | explicit ArrayDataEndianSwapper(const std::shared_ptr<ArrayData>& data) : data_(data) { | |
82 | out_ = data->Copy(); | |
83 | } | |
84 | ||
85 | // WARNING: this facility can be called on invalid Array data by the IPC reader. | |
86 | // Do not rely on the advertised ArrayData length, instead use the physical | |
87 | // buffer sizes to avoid accessing memory out of bounds. | |
88 | // | |
89 | // (If this guarantee turns out to be difficult to maintain, we should call | |
90 | // Validate() instead) | |
91 | Status SwapType(const DataType& type) { | |
92 | RETURN_NOT_OK(VisitTypeInline(type, this)); | |
93 | RETURN_NOT_OK(SwapChildren(type.fields())); | |
94 | if (internal::HasValidityBitmap(type.id())) { | |
95 | // Copy null bitmap | |
96 | out_->buffers[0] = data_->buffers[0]; | |
97 | } | |
98 | return Status::OK(); | |
99 | } | |
100 | ||
101 | Status SwapChildren(const FieldVector& child_fields) { | |
102 | for (size_t i = 0; i < child_fields.size(); i++) { | |
103 | ARROW_ASSIGN_OR_RAISE(out_->child_data[i], | |
104 | internal::SwapEndianArrayData(data_->child_data[i])); | |
105 | } | |
106 | return Status::OK(); | |
107 | } | |
108 | ||
109 | template <typename T> | |
110 | Result<std::shared_ptr<Buffer>> ByteSwapBuffer( | |
111 | const std::shared_ptr<Buffer>& in_buffer) { | |
112 | if (sizeof(T) == 1) { | |
113 | // if data size is 1, element is not swapped. We can use the original buffer | |
114 | return in_buffer; | |
115 | } | |
116 | auto in_data = reinterpret_cast<const T*>(in_buffer->data()); | |
117 | ARROW_ASSIGN_OR_RAISE(auto out_buffer, AllocateBuffer(in_buffer->size())); | |
118 | auto out_data = reinterpret_cast<T*>(out_buffer->mutable_data()); | |
119 | // NOTE: data_->length not trusted (see warning above) | |
120 | int64_t length = in_buffer->size() / sizeof(T); | |
121 | for (int64_t i = 0; i < length; i++) { | |
122 | out_data[i] = BitUtil::ByteSwap(in_data[i]); | |
123 | } | |
124 | return std::move(out_buffer); | |
125 | } | |
126 | ||
127 | template <typename VALUE_TYPE> | |
128 | Status SwapOffsets(int index) { | |
129 | if (data_->buffers[index] == nullptr || data_->buffers[index]->size() == 0) { | |
130 | out_->buffers[index] = data_->buffers[index]; | |
131 | return Status::OK(); | |
132 | } | |
133 | // Except union, offset has one more element rather than data->length | |
134 | ARROW_ASSIGN_OR_RAISE(out_->buffers[index], | |
135 | ByteSwapBuffer<VALUE_TYPE>(data_->buffers[index])); | |
136 | return Status::OK(); | |
137 | } | |
138 | ||
139 | template <typename T> | |
140 | enable_if_t<std::is_base_of<FixedWidthType, T>::value && | |
141 | !std::is_base_of<FixedSizeBinaryType, T>::value && | |
142 | !std::is_base_of<DictionaryType, T>::value, | |
143 | Status> | |
144 | Visit(const T& type) { | |
145 | using value_type = typename T::c_type; | |
146 | ARROW_ASSIGN_OR_RAISE(out_->buffers[1], | |
147 | ByteSwapBuffer<value_type>(data_->buffers[1])); | |
148 | return Status::OK(); | |
149 | } | |
150 | ||
151 | Status Visit(const Decimal128Type& type) { | |
152 | auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data()); | |
153 | ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size())); | |
154 | auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data()); | |
155 | // NOTE: data_->length not trusted (see warning above) | |
156 | const int64_t length = data_->buffers[1]->size() / Decimal128Type::kByteWidth; | |
157 | for (int64_t i = 0; i < length; i++) { | |
158 | uint64_t tmp; | |
159 | auto idx = i * 2; | |
160 | #if ARROW_LITTLE_ENDIAN | |
161 | tmp = BitUtil::FromBigEndian(data[idx]); | |
162 | new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]); | |
163 | new_data[idx + 1] = tmp; | |
164 | #else | |
165 | tmp = BitUtil::FromLittleEndian(data[idx]); | |
166 | new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]); | |
167 | new_data[idx + 1] = tmp; | |
168 | #endif | |
169 | } | |
170 | out_->buffers[1] = std::move(new_buffer); | |
171 | return Status::OK(); | |
172 | } | |
173 | ||
174 | Status Visit(const Decimal256Type& type) { | |
175 | auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data()); | |
176 | ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size())); | |
177 | auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data()); | |
178 | // NOTE: data_->length not trusted (see warning above) | |
179 | const int64_t length = data_->buffers[1]->size() / Decimal256Type::kByteWidth; | |
180 | for (int64_t i = 0; i < length; i++) { | |
181 | uint64_t tmp0, tmp1, tmp2; | |
182 | auto idx = i * 4; | |
183 | #if ARROW_LITTLE_ENDIAN | |
184 | tmp0 = BitUtil::FromBigEndian(data[idx]); | |
185 | tmp1 = BitUtil::FromBigEndian(data[idx + 1]); | |
186 | tmp2 = BitUtil::FromBigEndian(data[idx + 2]); | |
187 | new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]); | |
188 | new_data[idx + 1] = tmp2; | |
189 | new_data[idx + 2] = tmp1; | |
190 | new_data[idx + 3] = tmp0; | |
191 | #else | |
192 | tmp0 = BitUtil::FromLittleEndian(data[idx]); | |
193 | tmp1 = BitUtil::FromLittleEndian(data[idx + 1]); | |
194 | tmp2 = BitUtil::FromLittleEndian(data[idx + 2]); | |
195 | new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]); | |
196 | new_data[idx + 1] = tmp2; | |
197 | new_data[idx + 2] = tmp1; | |
198 | new_data[idx + 3] = tmp0; | |
199 | #endif | |
200 | } | |
201 | out_->buffers[1] = std::move(new_buffer); | |
202 | return Status::OK(); | |
203 | } | |
204 | ||
205 | Status Visit(const DayTimeIntervalType& type) { | |
206 | ARROW_ASSIGN_OR_RAISE(out_->buffers[1], ByteSwapBuffer<uint32_t>(data_->buffers[1])); | |
207 | return Status::OK(); | |
208 | } | |
209 | ||
210 | Status Visit(const MonthDayNanoIntervalType& type) { | |
211 | using MonthDayNanos = MonthDayNanoIntervalType::MonthDayNanos; | |
212 | auto data = reinterpret_cast<const MonthDayNanos*>(data_->buffers[1]->data()); | |
213 | ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size())); | |
214 | auto new_data = reinterpret_cast<MonthDayNanos*>(new_buffer->mutable_data()); | |
215 | // NOTE: data_->length not trusted (see warning above) | |
216 | const int64_t length = data_->buffers[1]->size() / sizeof(MonthDayNanos); | |
217 | for (int64_t i = 0; i < length; i++) { | |
218 | MonthDayNanos tmp = data[i]; | |
219 | #if ARROW_LITTLE_ENDIAN | |
220 | tmp.months = BitUtil::FromBigEndian(tmp.months); | |
221 | tmp.days = BitUtil::FromBigEndian(tmp.days); | |
222 | tmp.nanoseconds = BitUtil::FromBigEndian(tmp.nanoseconds); | |
223 | #else | |
224 | tmp.months = BitUtil::FromLittleEndian(tmp.months); | |
225 | tmp.days = BitUtil::FromLittleEndian(tmp.days); | |
226 | tmp.nanoseconds = BitUtil::FromLittleEndian(tmp.nanoseconds); | |
227 | #endif | |
228 | new_data[i] = tmp; | |
229 | } | |
230 | out_->buffers[1] = std::move(new_buffer); | |
231 | return Status::OK(); | |
232 | } | |
233 | ||
234 | Status Visit(const NullType& type) { return Status::OK(); } | |
235 | Status Visit(const BooleanType& type) { return Status::OK(); } | |
236 | Status Visit(const Int8Type& type) { return Status::OK(); } | |
237 | Status Visit(const UInt8Type& type) { return Status::OK(); } | |
238 | Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); } | |
239 | Status Visit(const FixedSizeListType& type) { return Status::OK(); } | |
240 | Status Visit(const StructType& type) { return Status::OK(); } | |
241 | Status Visit(const UnionType& type) { | |
242 | out_->buffers[1] = data_->buffers[1]; | |
243 | if (type.mode() == UnionMode::DENSE) { | |
244 | RETURN_NOT_OK(SwapOffsets<int32_t>(2)); | |
245 | } | |
246 | return Status::OK(); | |
247 | } | |
248 | ||
249 | template <typename T> | |
250 | enable_if_t<std::is_same<BinaryType, T>::value || std::is_same<StringType, T>::value, | |
251 | Status> | |
252 | Visit(const T& type) { | |
253 | RETURN_NOT_OK(SwapOffsets<int32_t>(1)); | |
254 | out_->buffers[2] = data_->buffers[2]; | |
255 | return Status::OK(); | |
256 | } | |
257 | ||
258 | template <typename T> | |
259 | enable_if_t<std::is_same<LargeBinaryType, T>::value || | |
260 | std::is_same<LargeStringType, T>::value, | |
261 | Status> | |
262 | Visit(const T& type) { | |
263 | RETURN_NOT_OK(SwapOffsets<int64_t>(1)); | |
264 | out_->buffers[2] = data_->buffers[2]; | |
265 | return Status::OK(); | |
266 | } | |
267 | ||
268 | Status Visit(const ListType& type) { | |
269 | RETURN_NOT_OK(SwapOffsets<int32_t>(1)); | |
270 | return Status::OK(); | |
271 | } | |
272 | Status Visit(const LargeListType& type) { | |
273 | RETURN_NOT_OK(SwapOffsets<int64_t>(1)); | |
274 | return Status::OK(); | |
275 | } | |
276 | ||
277 | Status Visit(const DictionaryType& type) { | |
278 | // dictionary was already swapped in ReadDictionary() in ipc/reader.cc | |
279 | RETURN_NOT_OK(SwapType(*type.index_type())); | |
280 | return Status::OK(); | |
281 | } | |
282 | ||
283 | Status Visit(const ExtensionType& type) { | |
284 | RETURN_NOT_OK(SwapType(*type.storage_type())); | |
285 | return Status::OK(); | |
286 | } | |
287 | ||
288 | const std::shared_ptr<ArrayData>& data_; | |
289 | std::shared_ptr<ArrayData> out_; | |
290 | }; | |
291 | ||
292 | } // namespace | |
293 | ||
294 | namespace internal { | |
295 | ||
296 | Result<std::shared_ptr<ArrayData>> SwapEndianArrayData( | |
297 | const std::shared_ptr<ArrayData>& data) { | |
298 | if (data->offset != 0) { | |
299 | return Status::Invalid("Unsupported data format: data.offset != 0"); | |
300 | } | |
301 | ArrayDataEndianSwapper swapper(data); | |
302 | RETURN_NOT_OK(swapper.SwapType(*data->type)); | |
303 | return std::move(swapper.out_); | |
304 | } | |
305 | ||
306 | } // namespace internal | |
307 | ||
308 | std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) { | |
309 | std::shared_ptr<Array> out; | |
310 | ArrayDataWrapper wrapper_visitor(data, &out); | |
311 | DCHECK_OK(VisitTypeInline(*data->type, &wrapper_visitor)); | |
312 | DCHECK(out); | |
313 | return out; | |
314 | } | |
315 | ||
316 | // ---------------------------------------------------------------------- | |
317 | // Misc APIs | |
318 | ||
319 | namespace { | |
320 | ||
321 | // get the maximum buffer length required, then allocate a single zeroed buffer | |
322 | // to use anywhere a buffer is required | |
323 | class NullArrayFactory { | |
324 | public: | |
325 | struct GetBufferLength { | |
326 | GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length) | |
327 | : type_(*type), length_(length), buffer_length_(BitUtil::BytesForBits(length)) {} | |
328 | ||
329 | Result<int64_t> Finish() && { | |
330 | RETURN_NOT_OK(VisitTypeInline(type_, this)); | |
331 | return buffer_length_; | |
332 | } | |
333 | ||
334 | template <typename T, typename = decltype(TypeTraits<T>::bytes_required(0))> | |
335 | Status Visit(const T&) { | |
336 | return MaxOf(TypeTraits<T>::bytes_required(length_)); | |
337 | } | |
338 | ||
339 | template <typename T> | |
340 | enable_if_var_size_list<T, Status> Visit(const T&) { | |
341 | // values array may be empty, but there must be at least one offset of 0 | |
342 | return MaxOf(sizeof(typename T::offset_type) * (length_ + 1)); | |
343 | } | |
344 | ||
345 | template <typename T> | |
346 | enable_if_base_binary<T, Status> Visit(const T&) { | |
347 | // values buffer may be empty, but there must be at least one offset of 0 | |
348 | return MaxOf(sizeof(typename T::offset_type) * (length_ + 1)); | |
349 | } | |
350 | ||
351 | Status Visit(const FixedSizeListType& type) { | |
352 | return MaxOf(GetBufferLength(type.value_type(), type.list_size() * length_)); | |
353 | } | |
354 | ||
355 | Status Visit(const FixedSizeBinaryType& type) { | |
356 | return MaxOf(type.byte_width() * length_); | |
357 | } | |
358 | ||
359 | Status Visit(const StructType& type) { | |
360 | for (const auto& child : type.fields()) { | |
361 | RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_))); | |
362 | } | |
363 | return Status::OK(); | |
364 | } | |
365 | ||
366 | Status Visit(const UnionType& type) { | |
367 | // type codes | |
368 | RETURN_NOT_OK(MaxOf(length_)); | |
369 | if (type.mode() == UnionMode::DENSE) { | |
370 | // offsets | |
371 | RETURN_NOT_OK(MaxOf(sizeof(int32_t) * length_)); | |
372 | } | |
373 | for (const auto& child : type.fields()) { | |
374 | RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_))); | |
375 | } | |
376 | return Status::OK(); | |
377 | } | |
378 | ||
379 | Status Visit(const DictionaryType& type) { | |
380 | RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_))); | |
381 | return MaxOf(GetBufferLength(type.index_type(), length_)); | |
382 | } | |
383 | ||
384 | Status Visit(const ExtensionType& type) { | |
385 | // XXX is an extension array's length always == storage length | |
386 | return MaxOf(GetBufferLength(type.storage_type(), length_)); | |
387 | } | |
388 | ||
389 | Status Visit(const DataType& type) { | |
390 | return Status::NotImplemented("construction of all-null ", type); | |
391 | } | |
392 | ||
393 | private: | |
394 | Status MaxOf(GetBufferLength&& other) { | |
395 | ARROW_ASSIGN_OR_RAISE(int64_t buffer_length, std::move(other).Finish()); | |
396 | return MaxOf(buffer_length); | |
397 | } | |
398 | ||
399 | Status MaxOf(int64_t buffer_length) { | |
400 | if (buffer_length > buffer_length_) { | |
401 | buffer_length_ = buffer_length; | |
402 | } | |
403 | return Status::OK(); | |
404 | } | |
405 | ||
406 | const DataType& type_; | |
407 | int64_t length_, buffer_length_; | |
408 | }; | |
409 | ||
410 | NullArrayFactory(MemoryPool* pool, const std::shared_ptr<DataType>& type, | |
411 | int64_t length) | |
412 | : pool_(pool), type_(type), length_(length) {} | |
413 | ||
414 | Status CreateBuffer() { | |
415 | ARROW_ASSIGN_OR_RAISE(int64_t buffer_length, | |
416 | GetBufferLength(type_, length_).Finish()); | |
417 | ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_)); | |
418 | std::memset(buffer_->mutable_data(), 0, buffer_->size()); | |
419 | return Status::OK(); | |
420 | } | |
421 | ||
422 | Result<std::shared_ptr<ArrayData>> Create() { | |
423 | if (buffer_ == nullptr) { | |
424 | RETURN_NOT_OK(CreateBuffer()); | |
425 | } | |
426 | std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields()); | |
427 | out_ = ArrayData::Make(type_, length_, {buffer_}, child_data, length_, 0); | |
428 | RETURN_NOT_OK(VisitTypeInline(*type_, this)); | |
429 | return out_; | |
430 | } | |
431 | ||
432 | Status Visit(const NullType&) { | |
433 | out_->buffers.resize(1, nullptr); | |
434 | return Status::OK(); | |
435 | } | |
436 | ||
437 | Status Visit(const FixedWidthType&) { | |
438 | out_->buffers.resize(2, buffer_); | |
439 | return Status::OK(); | |
440 | } | |
441 | ||
442 | template <typename T> | |
443 | enable_if_base_binary<T, Status> Visit(const T&) { | |
444 | out_->buffers.resize(3, buffer_); | |
445 | return Status::OK(); | |
446 | } | |
447 | ||
448 | template <typename T> | |
449 | enable_if_var_size_list<T, Status> Visit(const T& type) { | |
450 | out_->buffers.resize(2, buffer_); | |
451 | ARROW_ASSIGN_OR_RAISE(out_->child_data[0], CreateChild(0, /*length=*/0)); | |
452 | return Status::OK(); | |
453 | } | |
454 | ||
455 | Status Visit(const FixedSizeListType& type) { | |
456 | ARROW_ASSIGN_OR_RAISE(out_->child_data[0], | |
457 | CreateChild(0, length_ * type.list_size())); | |
458 | return Status::OK(); | |
459 | } | |
460 | ||
461 | Status Visit(const StructType& type) { | |
462 | for (int i = 0; i < type_->num_fields(); ++i) { | |
463 | ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(i, length_)); | |
464 | } | |
465 | return Status::OK(); | |
466 | } | |
467 | ||
468 | Status Visit(const UnionType& type) { | |
469 | out_->buffers.resize(2); | |
470 | ||
471 | // First buffer is always null | |
472 | out_->buffers[0] = nullptr; | |
473 | ||
474 | out_->buffers[1] = buffer_; | |
475 | // buffer_ is zeroed, but 0 may not be a valid type code | |
476 | if (type.type_codes()[0] != 0) { | |
477 | ARROW_ASSIGN_OR_RAISE(out_->buffers[1], AllocateBuffer(length_, pool_)); | |
478 | std::memset(out_->buffers[1]->mutable_data(), type.type_codes()[0], length_); | |
479 | } | |
480 | ||
481 | // For sparse unions, we now create children with the same length as the | |
482 | // parent | |
483 | int64_t child_length = length_; | |
484 | if (type.mode() == UnionMode::DENSE) { | |
485 | // For dense unions, we set the offsets to all zero and create children | |
486 | // with length 1 | |
487 | out_->buffers.resize(3); | |
488 | out_->buffers[2] = buffer_; | |
489 | ||
490 | child_length = 1; | |
491 | } | |
492 | for (int i = 0; i < type_->num_fields(); ++i) { | |
493 | ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(i, child_length)); | |
494 | } | |
495 | return Status::OK(); | |
496 | } | |
497 | ||
498 | Status Visit(const DictionaryType& type) { | |
499 | out_->buffers.resize(2, buffer_); | |
500 | ARROW_ASSIGN_OR_RAISE(auto typed_null_dict, MakeArrayOfNull(type.value_type(), 0)); | |
501 | out_->dictionary = typed_null_dict->data(); | |
502 | return Status::OK(); | |
503 | } | |
504 | ||
505 | Status Visit(const ExtensionType& type) { | |
506 | RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this)); | |
507 | return Status::OK(); | |
508 | } | |
509 | ||
510 | Status Visit(const DataType& type) { | |
511 | return Status::NotImplemented("construction of all-null ", type); | |
512 | } | |
513 | ||
514 | Result<std::shared_ptr<ArrayData>> CreateChild(int i, int64_t length) { | |
515 | NullArrayFactory child_factory(pool_, type_->field(i)->type(), length); | |
516 | child_factory.buffer_ = buffer_; | |
517 | return child_factory.Create(); | |
518 | } | |
519 | ||
520 | MemoryPool* pool_; | |
521 | std::shared_ptr<DataType> type_; | |
522 | int64_t length_; | |
523 | std::shared_ptr<ArrayData> out_; | |
524 | std::shared_ptr<Buffer> buffer_; | |
525 | }; | |
526 | ||
527 | class RepeatedArrayFactory { | |
528 | public: | |
529 | RepeatedArrayFactory(MemoryPool* pool, const Scalar& scalar, int64_t length) | |
530 | : pool_(pool), scalar_(scalar), length_(length) {} | |
531 | ||
532 | Result<std::shared_ptr<Array>> Create() { | |
533 | RETURN_NOT_OK(VisitTypeInline(*scalar_.type, this)); | |
534 | return out_; | |
535 | } | |
536 | ||
537 | Status Visit(const NullType& type) { | |
538 | DCHECK(false); // already forwarded to MakeArrayOfNull | |
539 | return Status::OK(); | |
540 | } | |
541 | ||
542 | Status Visit(const BooleanType&) { | |
543 | ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBitmap(length_, pool_)); | |
544 | BitUtil::SetBitsTo(buffer->mutable_data(), 0, length_, | |
545 | checked_cast<const BooleanScalar&>(scalar_).value); | |
546 | out_ = std::make_shared<BooleanArray>(length_, buffer); | |
547 | return Status::OK(); | |
548 | } | |
549 | ||
550 | template <typename T> | |
551 | enable_if_t<is_number_type<T>::value || is_temporal_type<T>::value, Status> Visit( | |
552 | const T&) { | |
553 | auto value = checked_cast<const typename TypeTraits<T>::ScalarType&>(scalar_).value; | |
554 | return FinishFixedWidth(&value, sizeof(value)); | |
555 | } | |
556 | ||
557 | Status Visit(const FixedSizeBinaryType& type) { | |
558 | auto value = checked_cast<const FixedSizeBinaryScalar&>(scalar_).value; | |
559 | return FinishFixedWidth(value->data(), type.byte_width()); | |
560 | } | |
561 | ||
562 | template <typename T> | |
563 | enable_if_decimal<T, Status> Visit(const T&) { | |
564 | using ScalarType = typename TypeTraits<T>::ScalarType; | |
565 | auto value = checked_cast<const ScalarType&>(scalar_).value.ToBytes(); | |
566 | return FinishFixedWidth(value.data(), value.size()); | |
567 | } | |
568 | ||
569 | Status Visit(const Decimal256Type&) { | |
570 | auto value = checked_cast<const Decimal256Scalar&>(scalar_).value.ToBytes(); | |
571 | return FinishFixedWidth(value.data(), value.size()); | |
572 | } | |
573 | ||
574 | template <typename T> | |
575 | enable_if_base_binary<T, Status> Visit(const T&) { | |
576 | std::shared_ptr<Buffer> value = | |
577 | checked_cast<const typename TypeTraits<T>::ScalarType&>(scalar_).value; | |
578 | std::shared_ptr<Buffer> values_buffer, offsets_buffer; | |
579 | RETURN_NOT_OK(CreateBufferOf(value->data(), value->size(), &values_buffer)); | |
580 | auto size = static_cast<typename T::offset_type>(value->size()); | |
581 | RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer)); | |
582 | out_ = std::make_shared<typename TypeTraits<T>::ArrayType>(length_, offsets_buffer, | |
583 | values_buffer); | |
584 | return Status::OK(); | |
585 | } | |
586 | ||
587 | template <typename T> | |
588 | enable_if_var_size_list<T, Status> Visit(const T& type) { | |
589 | using ScalarType = typename TypeTraits<T>::ScalarType; | |
590 | using ArrayType = typename TypeTraits<T>::ArrayType; | |
591 | ||
592 | auto value = checked_cast<const ScalarType&>(scalar_).value; | |
593 | ||
594 | ArrayVector values(length_, value); | |
595 | ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_)); | |
596 | ||
597 | std::shared_ptr<Buffer> offsets_buffer; | |
598 | auto size = static_cast<typename T::offset_type>(value->length()); | |
599 | RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer)); | |
600 | ||
601 | out_ = | |
602 | std::make_shared<ArrayType>(scalar_.type, length_, offsets_buffer, value_array); | |
603 | return Status::OK(); | |
604 | } | |
605 | ||
606 | Status Visit(const FixedSizeListType& type) { | |
607 | auto value = checked_cast<const FixedSizeListScalar&>(scalar_).value; | |
608 | ||
609 | ArrayVector values(length_, value); | |
610 | ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_)); | |
611 | ||
612 | out_ = std::make_shared<FixedSizeListArray>(scalar_.type, length_, value_array); | |
613 | return Status::OK(); | |
614 | } | |
615 | ||
616 | Status Visit(const MapType& type) { | |
617 | auto map_scalar = checked_cast<const MapScalar&>(scalar_); | |
618 | auto struct_array = checked_cast<const StructArray*>(map_scalar.value.get()); | |
619 | ||
620 | ArrayVector keys(length_, struct_array->field(0)); | |
621 | ArrayVector values(length_, struct_array->field(1)); | |
622 | ||
623 | ARROW_ASSIGN_OR_RAISE(auto key_array, Concatenate(keys, pool_)); | |
624 | ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_)); | |
625 | ||
626 | std::shared_ptr<Buffer> offsets_buffer; | |
627 | auto size = static_cast<typename MapType::offset_type>(struct_array->length()); | |
628 | RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer)); | |
629 | ||
630 | out_ = std::make_shared<MapArray>(scalar_.type, length_, std::move(offsets_buffer), | |
631 | std::move(key_array), std::move(value_array)); | |
632 | return Status::OK(); | |
633 | } | |
634 | ||
635 | Status Visit(const DictionaryType& type) { | |
636 | const auto& value = checked_cast<const DictionaryScalar&>(scalar_).value; | |
637 | ARROW_ASSIGN_OR_RAISE(auto indices, | |
638 | MakeArrayFromScalar(*value.index, length_, pool_)); | |
639 | out_ = std::make_shared<DictionaryArray>(scalar_.type, std::move(indices), | |
640 | value.dictionary); | |
641 | return Status::OK(); | |
642 | } | |
643 | ||
644 | Status Visit(const StructType& type) { | |
645 | ArrayVector fields; | |
646 | for (const auto& value : checked_cast<const StructScalar&>(scalar_).value) { | |
647 | fields.emplace_back(); | |
648 | ARROW_ASSIGN_OR_RAISE(fields.back(), MakeArrayFromScalar(*value, length_, pool_)); | |
649 | } | |
650 | out_ = std::make_shared<StructArray>(scalar_.type, length_, std::move(fields)); | |
651 | return Status::OK(); | |
652 | } | |
653 | ||
654 | Status Visit(const SparseUnionType& type) { | |
655 | const auto& union_scalar = checked_cast<const UnionScalar&>(scalar_); | |
656 | const auto& union_type = checked_cast<const UnionType&>(*scalar_.type); | |
657 | const auto scalar_type_code = union_scalar.type_code; | |
658 | const auto scalar_child_id = union_type.child_ids()[scalar_type_code]; | |
659 | ||
660 | // Create child arrays: most of them are all-null, except for the child array | |
661 | // for the given type code (if the scalar is valid). | |
662 | ArrayVector fields; | |
663 | for (int i = 0; i < type.num_fields(); ++i) { | |
664 | fields.emplace_back(); | |
665 | if (i == scalar_child_id && scalar_.is_valid) { | |
666 | ARROW_ASSIGN_OR_RAISE(fields.back(), | |
667 | MakeArrayFromScalar(*union_scalar.value, length_, pool_)); | |
668 | } else { | |
669 | ARROW_ASSIGN_OR_RAISE( | |
670 | fields.back(), MakeArrayOfNull(union_type.field(i)->type(), length_, pool_)); | |
671 | } | |
672 | } | |
673 | ||
674 | ARROW_ASSIGN_OR_RAISE(auto type_codes_buffer, CreateUnionTypeCodes(scalar_type_code)); | |
675 | ||
676 | out_ = std::make_shared<SparseUnionArray>(scalar_.type, length_, std::move(fields), | |
677 | std::move(type_codes_buffer)); | |
678 | return Status::OK(); | |
679 | } | |
680 | ||
681 | Status Visit(const DenseUnionType& type) { | |
682 | const auto& union_scalar = checked_cast<const UnionScalar&>(scalar_); | |
683 | const auto& union_type = checked_cast<const UnionType&>(*scalar_.type); | |
684 | const auto scalar_type_code = union_scalar.type_code; | |
685 | const auto scalar_child_id = union_type.child_ids()[scalar_type_code]; | |
686 | ||
687 | // Create child arrays: all of them are empty, except for the child array | |
688 | // for the given type code (if length > 0). | |
689 | ArrayVector fields; | |
690 | for (int i = 0; i < type.num_fields(); ++i) { | |
691 | fields.emplace_back(); | |
692 | if (i == scalar_child_id && length_ > 0) { | |
693 | if (scalar_.is_valid) { | |
694 | // One valid element (will be referenced by multiple offsets) | |
695 | ARROW_ASSIGN_OR_RAISE(fields.back(), | |
696 | MakeArrayFromScalar(*union_scalar.value, 1, pool_)); | |
697 | } else { | |
698 | // One null element (will be referenced by multiple offsets) | |
699 | ARROW_ASSIGN_OR_RAISE(fields.back(), | |
700 | MakeArrayOfNull(union_type.field(i)->type(), 1, pool_)); | |
701 | } | |
702 | } else { | |
703 | // Zero element (will not be referenced by any offset) | |
704 | ARROW_ASSIGN_OR_RAISE(fields.back(), | |
705 | MakeArrayOfNull(union_type.field(i)->type(), 0, pool_)); | |
706 | } | |
707 | } | |
708 | ||
709 | // Create an offsets buffer with all offsets equal to 0 | |
710 | ARROW_ASSIGN_OR_RAISE(auto offsets_buffer, | |
711 | AllocateBuffer(length_ * sizeof(int32_t), pool_)); | |
712 | memset(offsets_buffer->mutable_data(), 0, offsets_buffer->size()); | |
713 | ||
714 | ARROW_ASSIGN_OR_RAISE(auto type_codes_buffer, CreateUnionTypeCodes(scalar_type_code)); | |
715 | ||
716 | out_ = std::make_shared<DenseUnionArray>(scalar_.type, length_, std::move(fields), | |
717 | std::move(type_codes_buffer), | |
718 | std::move(offsets_buffer)); | |
719 | return Status::OK(); | |
720 | } | |
721 | ||
722 | Status Visit(const ExtensionType& type) { | |
723 | return Status::NotImplemented("construction from scalar of type ", *scalar_.type); | |
724 | } | |
725 | ||
726 | Result<std::shared_ptr<Buffer>> CreateUnionTypeCodes(int8_t type_code) { | |
727 | TypedBufferBuilder<int8_t> builder(pool_); | |
728 | RETURN_NOT_OK(builder.Resize(length_)); | |
729 | builder.UnsafeAppend(length_, type_code); | |
730 | return builder.Finish(); | |
731 | } | |
732 | ||
733 | template <typename OffsetType> | |
734 | Status CreateOffsetsBuffer(OffsetType value_length, std::shared_ptr<Buffer>* out) { | |
735 | TypedBufferBuilder<OffsetType> builder(pool_); | |
736 | RETURN_NOT_OK(builder.Resize(length_ + 1)); | |
737 | OffsetType offset = 0; | |
738 | for (int64_t i = 0; i < length_ + 1; ++i, offset += value_length) { | |
739 | builder.UnsafeAppend(offset); | |
740 | } | |
741 | return builder.Finish(out); | |
742 | } | |
743 | ||
744 | Status CreateBufferOf(const void* data, size_t data_length, | |
745 | std::shared_ptr<Buffer>* out) { | |
746 | BufferBuilder builder(pool_); | |
747 | RETURN_NOT_OK(builder.Resize(length_ * data_length)); | |
748 | for (int64_t i = 0; i < length_; ++i) { | |
749 | builder.UnsafeAppend(data, data_length); | |
750 | } | |
751 | return builder.Finish(out); | |
752 | } | |
753 | ||
754 | Status FinishFixedWidth(const void* data, size_t data_length) { | |
755 | std::shared_ptr<Buffer> buffer; | |
756 | RETURN_NOT_OK(CreateBufferOf(data, data_length, &buffer)); | |
757 | out_ = MakeArray( | |
758 | ArrayData::Make(scalar_.type, length_, {nullptr, std::move(buffer)}, 0)); | |
759 | return Status::OK(); | |
760 | } | |
761 | ||
762 | MemoryPool* pool_; | |
763 | const Scalar& scalar_; | |
764 | int64_t length_; | |
765 | std::shared_ptr<Array> out_; | |
766 | }; | |
767 | ||
768 | } // namespace | |
769 | ||
770 | Result<std::shared_ptr<Array>> MakeArrayOfNull(const std::shared_ptr<DataType>& type, | |
771 | int64_t length, MemoryPool* pool) { | |
772 | ARROW_ASSIGN_OR_RAISE(auto data, NullArrayFactory(pool, type, length).Create()); | |
773 | return MakeArray(data); | |
774 | } | |
775 | ||
776 | Result<std::shared_ptr<Array>> MakeArrayFromScalar(const Scalar& scalar, int64_t length, | |
777 | MemoryPool* pool) { | |
778 | // Null union scalars still have a type code associated | |
779 | if (!scalar.is_valid && !is_union(scalar.type->id())) { | |
780 | return MakeArrayOfNull(scalar.type, length, pool); | |
781 | } | |
782 | return RepeatedArrayFactory(pool, scalar, length).Create(); | |
783 | } | |
784 | ||
785 | namespace internal { | |
786 | ||
787 | std::vector<ArrayVector> RechunkArraysConsistently( | |
788 | const std::vector<ArrayVector>& groups) { | |
789 | if (groups.size() <= 1) { | |
790 | return groups; | |
791 | } | |
792 | int64_t total_length = 0; | |
793 | for (const auto& array : groups.front()) { | |
794 | total_length += array->length(); | |
795 | } | |
796 | #ifndef NDEBUG | |
797 | for (const auto& group : groups) { | |
798 | int64_t group_length = 0; | |
799 | for (const auto& array : group) { | |
800 | group_length += array->length(); | |
801 | } | |
802 | DCHECK_EQ(group_length, total_length) | |
803 | << "Array groups should have the same total number of elements"; | |
804 | } | |
805 | #endif | |
806 | if (total_length == 0) { | |
807 | return groups; | |
808 | } | |
809 | ||
810 | // Set up result vectors | |
811 | std::vector<ArrayVector> rechunked_groups(groups.size()); | |
812 | ||
813 | // Set up progress counters | |
814 | std::vector<ArrayVector::const_iterator> current_arrays; | |
815 | std::vector<int64_t> array_offsets; | |
816 | for (const auto& group : groups) { | |
817 | current_arrays.emplace_back(group.cbegin()); | |
818 | array_offsets.emplace_back(0); | |
819 | } | |
820 | ||
821 | // Scan all array vectors at once, rechunking along the way | |
822 | int64_t start = 0; | |
823 | while (start < total_length) { | |
824 | // First compute max possible length for next chunk | |
825 | int64_t chunk_length = std::numeric_limits<int64_t>::max(); | |
826 | for (size_t i = 0; i < groups.size(); i++) { | |
827 | auto& arr_it = current_arrays[i]; | |
828 | auto& offset = array_offsets[i]; | |
829 | // Skip any done arrays (including 0-length arrays) | |
830 | while (offset == (*arr_it)->length()) { | |
831 | ++arr_it; | |
832 | offset = 0; | |
833 | } | |
834 | const auto& array = *arr_it; | |
835 | DCHECK_GT(array->length(), offset); | |
836 | chunk_length = std::min(chunk_length, array->length() - offset); | |
837 | } | |
838 | DCHECK_GT(chunk_length, 0); | |
839 | ||
840 | // Then slice all arrays along this chunk size | |
841 | for (size_t i = 0; i < groups.size(); i++) { | |
842 | const auto& array = *current_arrays[i]; | |
843 | auto& offset = array_offsets[i]; | |
844 | if (offset == 0 && array->length() == chunk_length) { | |
845 | // Slice spans entire array | |
846 | rechunked_groups[i].emplace_back(array); | |
847 | } else { | |
848 | DCHECK_LT(chunk_length - offset, array->length()); | |
849 | rechunked_groups[i].emplace_back(array->Slice(offset, chunk_length)); | |
850 | } | |
851 | offset += chunk_length; | |
852 | } | |
853 | start += chunk_length; | |
854 | } | |
855 | ||
856 | return rechunked_groups; | |
857 | } | |
858 | ||
859 | } // namespace internal | |
860 | } // namespace arrow |