]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/util/converter.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / converter.h
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <string>
19 #include <utility>
20 #include <vector>
21
22 #include "arrow/array.h"
23 #include "arrow/chunked_array.h"
24 #include "arrow/status.h"
25 #include "arrow/type.h"
26 #include "arrow/type_traits.h"
27 #include "arrow/util/checked_cast.h"
28 #include "arrow/util/make_unique.h"
29 #include "arrow/visitor_inline.h"
30
31 namespace arrow {
32 namespace internal {
33
/// \brief Forward declaration of the converter factory.
///
/// Declared before the converter class templates because the nested
/// converters below (ListConverter, StructConverter) call it from their
/// Init() to build converters for their child types. The definition appears
/// further down in this file.
template <typename BaseConverter, template <typename...> class ConverterTrait>
static Result<std::unique_ptr<BaseConverter>> MakeConverter(
    std::shared_ptr<DataType> type, typename BaseConverter::OptionsType options,
    MemoryPool* pool);
38
39 template <typename Input, typename Options>
40 class Converter {
41 public:
42 using Self = Converter<Input, Options>;
43 using InputType = Input;
44 using OptionsType = Options;
45
46 virtual ~Converter() = default;
47
48 Status Construct(std::shared_ptr<DataType> type, OptionsType options,
49 MemoryPool* pool) {
50 type_ = std::move(type);
51 options_ = std::move(options);
52 return Init(pool);
53 }
54
55 virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }
56
57 virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) {
58 return Status::NotImplemented("Extend");
59 }
60
61 virtual Status ExtendMasked(InputType values, InputType mask, int64_t size,
62 int64_t offset = 0) {
63 return Status::NotImplemented("ExtendMasked");
64 }
65
66 const std::shared_ptr<ArrayBuilder>& builder() const { return builder_; }
67
68 const std::shared_ptr<DataType>& type() const { return type_; }
69
70 OptionsType options() const { return options_; }
71
72 bool may_overflow() const { return may_overflow_; }
73
74 bool rewind_on_overflow() const { return rewind_on_overflow_; }
75
76 virtual Status Reserve(int64_t additional_capacity) {
77 return builder_->Reserve(additional_capacity);
78 }
79
80 Status AppendNull() { return builder_->AppendNull(); }
81
82 virtual Result<std::shared_ptr<Array>> ToArray() { return builder_->Finish(); }
83
84 virtual Result<std::shared_ptr<Array>> ToArray(int64_t length) {
85 ARROW_ASSIGN_OR_RAISE(auto arr, this->ToArray());
86 return arr->Slice(0, length);
87 }
88
89 virtual Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
90 ARROW_ASSIGN_OR_RAISE(auto array, ToArray());
91 std::vector<std::shared_ptr<Array>> chunks = {std::move(array)};
92 return std::make_shared<ChunkedArray>(chunks);
93 }
94
95 protected:
96 virtual Status Init(MemoryPool* pool) { return Status::OK(); }
97
98 std::shared_ptr<DataType> type_;
99 std::shared_ptr<ArrayBuilder> builder_;
100 OptionsType options_;
101 bool may_overflow_ = false;
102 bool rewind_on_overflow_ = false;
103 };
104
105 template <typename ArrowType, typename BaseConverter>
106 class PrimitiveConverter : public BaseConverter {
107 public:
108 using BuilderType = typename TypeTraits<ArrowType>::BuilderType;
109
110 protected:
111 Status Init(MemoryPool* pool) override {
112 this->builder_ = std::make_shared<BuilderType>(this->type_, pool);
113 // Narrow variable-sized binary types may overflow
114 this->may_overflow_ = is_binary_like(this->type_->id());
115 primitive_type_ = checked_cast<const ArrowType*>(this->type_.get());
116 primitive_builder_ = checked_cast<BuilderType*>(this->builder_.get());
117 return Status::OK();
118 }
119
120 const ArrowType* primitive_type_;
121 BuilderType* primitive_builder_;
122 };
123
124 template <typename ArrowType, typename BaseConverter,
125 template <typename...> class ConverterTrait>
126 class ListConverter : public BaseConverter {
127 public:
128 using BuilderType = typename TypeTraits<ArrowType>::BuilderType;
129 using ConverterType = typename ConverterTrait<ArrowType>::type;
130
131 protected:
132 Status Init(MemoryPool* pool) override {
133 list_type_ = checked_cast<const ArrowType*>(this->type_.get());
134 ARROW_ASSIGN_OR_RAISE(value_converter_,
135 (MakeConverter<BaseConverter, ConverterTrait>(
136 list_type_->value_type(), this->options_, pool)));
137 this->builder_ =
138 std::make_shared<BuilderType>(pool, value_converter_->builder(), this->type_);
139 list_builder_ = checked_cast<BuilderType*>(this->builder_.get());
140 // Narrow list types may overflow
141 this->may_overflow_ = this->rewind_on_overflow_ =
142 sizeof(typename ArrowType::offset_type) < sizeof(int64_t);
143 return Status::OK();
144 }
145
146 const ArrowType* list_type_;
147 BuilderType* list_builder_;
148 std::unique_ptr<BaseConverter> value_converter_;
149 };
150
151 template <typename BaseConverter, template <typename...> class ConverterTrait>
152 class StructConverter : public BaseConverter {
153 public:
154 using ConverterType = typename ConverterTrait<StructType>::type;
155
156 Status Reserve(int64_t additional_capacity) override {
157 ARROW_RETURN_NOT_OK(this->builder_->Reserve(additional_capacity));
158 for (const auto& child : children_) {
159 ARROW_RETURN_NOT_OK(child->Reserve(additional_capacity));
160 }
161 return Status::OK();
162 }
163
164 protected:
165 Status Init(MemoryPool* pool) override {
166 std::unique_ptr<BaseConverter> child_converter;
167 std::vector<std::shared_ptr<ArrayBuilder>> child_builders;
168
169 struct_type_ = checked_cast<const StructType*>(this->type_.get());
170 for (const auto& field : struct_type_->fields()) {
171 ARROW_ASSIGN_OR_RAISE(child_converter,
172 (MakeConverter<BaseConverter, ConverterTrait>(
173 field->type(), this->options_, pool)));
174 this->may_overflow_ |= child_converter->may_overflow();
175 this->rewind_on_overflow_ = this->may_overflow_;
176 child_builders.push_back(child_converter->builder());
177 children_.push_back(std::move(child_converter));
178 }
179
180 this->builder_ =
181 std::make_shared<StructBuilder>(this->type_, pool, std::move(child_builders));
182 struct_builder_ = checked_cast<StructBuilder*>(this->builder_.get());
183
184 return Status::OK();
185 }
186
187 const StructType* struct_type_;
188 StructBuilder* struct_builder_;
189 std::vector<std::unique_ptr<BaseConverter>> children_;
190 };
191
192 template <typename ValueType, typename BaseConverter>
193 class DictionaryConverter : public BaseConverter {
194 public:
195 using BuilderType = DictionaryBuilder<ValueType>;
196
197 protected:
198 Status Init(MemoryPool* pool) override {
199 std::unique_ptr<ArrayBuilder> builder;
200 ARROW_RETURN_NOT_OK(MakeDictionaryBuilder(pool, this->type_, NULLPTR, &builder));
201 this->builder_ = std::move(builder);
202 this->may_overflow_ = false;
203 dict_type_ = checked_cast<const DictionaryType*>(this->type_.get());
204 value_type_ = checked_cast<const ValueType*>(dict_type_->value_type().get());
205 value_builder_ = checked_cast<BuilderType*>(this->builder_.get());
206 return Status::OK();
207 }
208
209 const DictionaryType* dict_type_;
210 const ValueType* value_type_;
211 BuilderType* value_builder_;
212 };
213
214 template <typename BaseConverter, template <typename...> class ConverterTrait>
215 struct MakeConverterImpl {
216 template <typename T, typename ConverterType = typename ConverterTrait<T>::type>
217 Status Visit(const T&) {
218 out.reset(new ConverterType());
219 return out->Construct(std::move(type), std::move(options), pool);
220 }
221
222 Status Visit(const DictionaryType& t) {
223 switch (t.value_type()->id()) {
224 #define DICTIONARY_CASE(TYPE) \
225 case TYPE::type_id: \
226 out = internal::make_unique< \
227 typename ConverterTrait<DictionaryType>::template dictionary_type<TYPE>>(); \
228 break;
229 DICTIONARY_CASE(BooleanType);
230 DICTIONARY_CASE(Int8Type);
231 DICTIONARY_CASE(Int16Type);
232 DICTIONARY_CASE(Int32Type);
233 DICTIONARY_CASE(Int64Type);
234 DICTIONARY_CASE(UInt8Type);
235 DICTIONARY_CASE(UInt16Type);
236 DICTIONARY_CASE(UInt32Type);
237 DICTIONARY_CASE(UInt64Type);
238 DICTIONARY_CASE(FloatType);
239 DICTIONARY_CASE(DoubleType);
240 DICTIONARY_CASE(BinaryType);
241 DICTIONARY_CASE(StringType);
242 DICTIONARY_CASE(FixedSizeBinaryType);
243 #undef DICTIONARY_CASE
244 default:
245 return Status::NotImplemented("DictionaryArray converter for type ", t.ToString(),
246 " not implemented");
247 }
248 return out->Construct(std::move(type), std::move(options), pool);
249 }
250
251 Status Visit(const DataType& t) { return Status::NotImplemented(t.name()); }
252
253 std::shared_ptr<DataType> type;
254 typename BaseConverter::OptionsType options;
255 MemoryPool* pool;
256 std::unique_ptr<BaseConverter> out;
257 };
258
259 template <typename BaseConverter, template <typename...> class ConverterTrait>
260 static Result<std::unique_ptr<BaseConverter>> MakeConverter(
261 std::shared_ptr<DataType> type, typename BaseConverter::OptionsType options,
262 MemoryPool* pool) {
263 MakeConverterImpl<BaseConverter, ConverterTrait> visitor{
264 std::move(type), std::move(options), pool, NULLPTR};
265 ARROW_RETURN_NOT_OK(VisitTypeInline(*visitor.type, &visitor));
266 return std::move(visitor.out);
267 }
268
269 template <typename Converter>
270 class Chunker {
271 public:
272 using InputType = typename Converter::InputType;
273
274 explicit Chunker(std::unique_ptr<Converter> converter)
275 : converter_(std::move(converter)) {}
276
277 Status Reserve(int64_t additional_capacity) {
278 ARROW_RETURN_NOT_OK(converter_->Reserve(additional_capacity));
279 reserved_ += additional_capacity;
280 return Status::OK();
281 }
282
283 Status AppendNull() {
284 auto status = converter_->AppendNull();
285 if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
286 if (converter_->builder()->length() == 0) {
287 // Builder length == 0 means the individual element is too large to append.
288 // In this case, no need to try again.
289 return status;
290 }
291 ARROW_RETURN_NOT_OK(FinishChunk());
292 return converter_->AppendNull();
293 }
294 ++length_;
295 return status;
296 }
297
298 Status Append(InputType value) {
299 auto status = converter_->Append(value);
300 if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
301 if (converter_->builder()->length() == 0) {
302 return status;
303 }
304 ARROW_RETURN_NOT_OK(FinishChunk());
305 return Append(value);
306 }
307 ++length_;
308 return status;
309 }
310
311 Status Extend(InputType values, int64_t size, int64_t offset = 0) {
312 while (offset < size) {
313 auto length_before = converter_->builder()->length();
314 auto status = converter_->Extend(values, size, offset);
315 auto length_after = converter_->builder()->length();
316 auto num_converted = length_after - length_before;
317
318 offset += num_converted;
319 length_ += num_converted;
320
321 if (status.IsCapacityError()) {
322 if (converter_->builder()->length() == 0) {
323 // Builder length == 0 means the individual element is too large to append.
324 // In this case, no need to try again.
325 return status;
326 } else if (converter_->rewind_on_overflow()) {
327 // The list-like and binary-like conversion paths may raise a capacity error,
328 // we need to handle them differently. While the binary-like converters check
329 // the capacity before append/extend the list-like converters just check after
330 // append/extend. Thus depending on the implementation semantics we may need
331 // to rewind (slice) the output chunk by one.
332 length_ -= 1;
333 offset -= 1;
334 }
335 ARROW_RETURN_NOT_OK(FinishChunk());
336 } else if (!status.ok()) {
337 return status;
338 }
339 }
340 return Status::OK();
341 }
342
343 Status ExtendMasked(InputType values, InputType mask, int64_t size,
344 int64_t offset = 0) {
345 while (offset < size) {
346 auto length_before = converter_->builder()->length();
347 auto status = converter_->ExtendMasked(values, mask, size, offset);
348 auto length_after = converter_->builder()->length();
349 auto num_converted = length_after - length_before;
350
351 offset += num_converted;
352 length_ += num_converted;
353
354 if (status.IsCapacityError()) {
355 if (converter_->builder()->length() == 0) {
356 // Builder length == 0 means the individual element is too large to append.
357 // In this case, no need to try again.
358 return status;
359 } else if (converter_->rewind_on_overflow()) {
360 // The list-like and binary-like conversion paths may raise a capacity error,
361 // we need to handle them differently. While the binary-like converters check
362 // the capacity before append/extend the list-like converters just check after
363 // append/extend. Thus depending on the implementation semantics we may need
364 // to rewind (slice) the output chunk by one.
365 length_ -= 1;
366 offset -= 1;
367 }
368 ARROW_RETURN_NOT_OK(FinishChunk());
369 } else if (!status.ok()) {
370 return status;
371 }
372 }
373 return Status::OK();
374 }
375
376 Status FinishChunk() {
377 ARROW_ASSIGN_OR_RAISE(auto chunk, converter_->ToArray(length_));
378 chunks_.push_back(chunk);
379 // Reserve space for the remaining items.
380 // Besides being an optimization, it is also required if the converter's
381 // implementation relies on unsafe builder methods in converter->Append().
382 auto remaining = reserved_ - length_;
383 Reset();
384 return Reserve(remaining);
385 }
386
387 Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
388 ARROW_RETURN_NOT_OK(FinishChunk());
389 return std::make_shared<ChunkedArray>(chunks_);
390 }
391
392 protected:
393 void Reset() {
394 converter_->builder()->Reset();
395 length_ = 0;
396 reserved_ = 0;
397 }
398
399 int64_t length_ = 0;
400 int64_t reserved_ = 0;
401 std::unique_ptr<Converter> converter_;
402 std::vector<std::shared_ptr<Array>> chunks_;
403 };
404
405 template <typename T>
406 static Result<std::unique_ptr<Chunker<T>>> MakeChunker(std::unique_ptr<T> converter) {
407 return internal::make_unique<Chunker<T>>(std::move(converter));
408 }
409
410 } // namespace internal
411 } // namespace arrow