// Source: git.proxmox.com, ceph.git (import quincy 17.2.0)
// Path: ceph/src/arrow/cpp/src/arrow/json/chunked_builder.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/json/chunked_builder.h"
19
20 #include <mutex>
21 #include <string>
22 #include <unordered_map>
23 #include <utility>
24 #include <vector>
25
26 #include "arrow/array.h"
27 #include "arrow/buffer.h"
28 #include "arrow/json/converter.h"
29 #include "arrow/table.h"
30 #include "arrow/util/checked_cast.h"
31 #include "arrow/util/logging.h"
32 #include "arrow/util/task_group.h"
33
34 namespace arrow {
35
36 using internal::checked_cast;
37 using internal::TaskGroup;
38
39 namespace json {
40
41 class NonNestedChunkedArrayBuilder : public ChunkedArrayBuilder {
42 public:
43 NonNestedChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
44 std::shared_ptr<Converter> converter)
45 : ChunkedArrayBuilder(task_group), converter_(std::move(converter)) {}
46
47 Status Finish(std::shared_ptr<ChunkedArray>* out) override {
48 RETURN_NOT_OK(task_group_->Finish());
49 *out = std::make_shared<ChunkedArray>(std::move(chunks_), converter_->out_type());
50 chunks_.clear();
51 return Status::OK();
52 }
53
54 Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
55 RETURN_NOT_OK(task_group_->Finish());
56 task_group_ = task_group;
57 return Status::OK();
58 }
59
60 protected:
61 ArrayVector chunks_;
62 std::mutex mutex_;
63 std::shared_ptr<Converter> converter_;
64 };
65
66 class TypedChunkedArrayBuilder
67 : public NonNestedChunkedArrayBuilder,
68 public std::enable_shared_from_this<TypedChunkedArrayBuilder> {
69 public:
70 using NonNestedChunkedArrayBuilder::NonNestedChunkedArrayBuilder;
71
72 void Insert(int64_t block_index, const std::shared_ptr<Field>&,
73 const std::shared_ptr<Array>& unconverted) override {
74 std::unique_lock<std::mutex> lock(mutex_);
75 if (chunks_.size() <= static_cast<size_t>(block_index)) {
76 chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
77 }
78 lock.unlock();
79
80 auto self = shared_from_this();
81
82 task_group_->Append([self, block_index, unconverted] {
83 std::shared_ptr<Array> converted;
84 RETURN_NOT_OK(self->converter_->Convert(unconverted, &converted));
85 std::unique_lock<std::mutex> lock(self->mutex_);
86 self->chunks_[block_index] = std::move(converted);
87 return Status::OK();
88 });
89 }
90 };
91
92 class InferringChunkedArrayBuilder
93 : public NonNestedChunkedArrayBuilder,
94 public std::enable_shared_from_this<InferringChunkedArrayBuilder> {
95 public:
96 InferringChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
97 const PromotionGraph* promotion_graph,
98 std::shared_ptr<Converter> converter)
99 : NonNestedChunkedArrayBuilder(task_group, std::move(converter)),
100 promotion_graph_(promotion_graph) {}
101
102 void Insert(int64_t block_index, const std::shared_ptr<Field>& unconverted_field,
103 const std::shared_ptr<Array>& unconverted) override {
104 std::unique_lock<std::mutex> lock(mutex_);
105 if (chunks_.size() <= static_cast<size_t>(block_index)) {
106 chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
107 unconverted_.resize(chunks_.size(), nullptr);
108 unconverted_fields_.resize(chunks_.size(), nullptr);
109 }
110 unconverted_[block_index] = unconverted;
111 unconverted_fields_[block_index] = unconverted_field;
112 lock.unlock();
113 ScheduleConvertChunk(block_index);
114 }
115
116 void ScheduleConvertChunk(int64_t block_index) {
117 auto self = shared_from_this();
118 task_group_->Append([self, block_index] {
119 return self->TryConvertChunk(static_cast<size_t>(block_index));
120 });
121 }
122
123 Status TryConvertChunk(size_t block_index) {
124 std::unique_lock<std::mutex> lock(mutex_);
125 auto converter = converter_;
126 auto unconverted = unconverted_[block_index];
127 auto unconverted_field = unconverted_fields_[block_index];
128 std::shared_ptr<Array> converted;
129
130 lock.unlock();
131 Status st = converter->Convert(unconverted, &converted);
132 lock.lock();
133
134 if (converter != converter_) {
135 // another task promoted converter; reconvert
136 lock.unlock();
137 ScheduleConvertChunk(block_index);
138 return Status::OK();
139 }
140
141 if (st.ok()) {
142 // conversion succeeded
143 chunks_[block_index] = std::move(converted);
144 return Status::OK();
145 }
146
147 auto promoted_type =
148 promotion_graph_->Promote(converter_->out_type(), unconverted_field);
149 if (promoted_type == nullptr) {
150 // converter failed, no promotion available
151 return st;
152 }
153 RETURN_NOT_OK(MakeConverter(promoted_type, converter_->pool(), &converter_));
154
155 size_t nchunks = chunks_.size();
156 for (size_t i = 0; i < nchunks; ++i) {
157 if (i != block_index && chunks_[i]) {
158 // We're assuming the chunk was converted using the wrong type
159 // (which should be true unless the executor reorders tasks)
160 chunks_[i].reset();
161 lock.unlock();
162 ScheduleConvertChunk(i);
163 lock.lock();
164 }
165 }
166 lock.unlock();
167 ScheduleConvertChunk(block_index);
168 return Status::OK();
169 }
170
171 Status Finish(std::shared_ptr<ChunkedArray>* out) override {
172 RETURN_NOT_OK(NonNestedChunkedArrayBuilder::Finish(out));
173 unconverted_.clear();
174 return Status::OK();
175 }
176
177 private:
178 ArrayVector unconverted_;
179 std::vector<std::shared_ptr<Field>> unconverted_fields_;
180 const PromotionGraph* promotion_graph_;
181 };
182
183 class ChunkedListArrayBuilder : public ChunkedArrayBuilder {
184 public:
185 ChunkedListArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
186 std::shared_ptr<ChunkedArrayBuilder> value_builder,
187 const std::shared_ptr<Field>& value_field)
188 : ChunkedArrayBuilder(task_group),
189 pool_(pool),
190 value_builder_(std::move(value_builder)),
191 value_field_(value_field) {}
192
193 Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
194 RETURN_NOT_OK(task_group_->Finish());
195 RETURN_NOT_OK(value_builder_->ReplaceTaskGroup(task_group));
196 task_group_ = task_group;
197 return Status::OK();
198 }
199
200 void Insert(int64_t block_index, const std::shared_ptr<Field>&,
201 const std::shared_ptr<Array>& unconverted) override {
202 std::unique_lock<std::mutex> lock(mutex_);
203
204 if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
205 null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
206 offset_chunks_.resize(null_bitmap_chunks_.size(), nullptr);
207 }
208
209 if (unconverted->type_id() == Type::NA) {
210 auto st = InsertNull(block_index, unconverted->length());
211 if (!st.ok()) {
212 task_group_->Append([st] { return st; });
213 }
214 return;
215 }
216
217 DCHECK_EQ(unconverted->type_id(), Type::LIST);
218 const auto& list_array = checked_cast<const ListArray&>(*unconverted);
219
220 null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
221 offset_chunks_[block_index] = list_array.value_offsets();
222
223 value_builder_->Insert(block_index, list_array.list_type()->value_field(),
224 list_array.values());
225 }
226
227 Status Finish(std::shared_ptr<ChunkedArray>* out) override {
228 RETURN_NOT_OK(task_group_->Finish());
229
230 std::shared_ptr<ChunkedArray> value_array;
231 RETURN_NOT_OK(value_builder_->Finish(&value_array));
232
233 auto type = list(value_field_->WithType(value_array->type())->WithMetadata(nullptr));
234 ArrayVector chunks(null_bitmap_chunks_.size());
235 for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) {
236 auto value_chunk = value_array->chunk(static_cast<int>(i));
237 auto length = offset_chunks_[i]->size() / sizeof(int32_t) - 1;
238 chunks[i] = std::make_shared<ListArray>(type, length, offset_chunks_[i],
239 value_chunk, null_bitmap_chunks_[i]);
240 }
241
242 *out = std::make_shared<ChunkedArray>(std::move(chunks), type);
243 return Status::OK();
244 }
245
246 private:
247 // call from Insert() only, with mutex_ locked
248 Status InsertNull(int64_t block_index, int64_t length) {
249 value_builder_->Insert(block_index, value_field_, std::make_shared<NullArray>(0));
250
251 ARROW_ASSIGN_OR_RAISE(null_bitmap_chunks_[block_index],
252 AllocateEmptyBitmap(length, pool_));
253
254 int64_t offsets_length = (length + 1) * sizeof(int32_t);
255 ARROW_ASSIGN_OR_RAISE(offset_chunks_[block_index],
256 AllocateBuffer(offsets_length, pool_));
257 std::memset(offset_chunks_[block_index]->mutable_data(), 0, offsets_length);
258
259 return Status::OK();
260 }
261
262 std::mutex mutex_;
263 MemoryPool* pool_;
264 std::shared_ptr<ChunkedArrayBuilder> value_builder_;
265 BufferVector offset_chunks_, null_bitmap_chunks_;
266 std::shared_ptr<Field> value_field_;
267 };
268
269 class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
270 public:
271 ChunkedStructArrayBuilder(
272 const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
273 const PromotionGraph* promotion_graph,
274 std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
275 name_builders)
276 : ChunkedArrayBuilder(task_group), pool_(pool), promotion_graph_(promotion_graph) {
277 for (auto&& name_builder : name_builders) {
278 auto index = static_cast<int>(name_to_index_.size());
279 name_to_index_.emplace(std::move(name_builder.first), index);
280 child_builders_.emplace_back(std::move(name_builder.second));
281 }
282 }
283
284 void Insert(int64_t block_index, const std::shared_ptr<Field>&,
285 const std::shared_ptr<Array>& unconverted) override {
286 std::unique_lock<std::mutex> lock(mutex_);
287
288 if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
289 null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
290 chunk_lengths_.resize(null_bitmap_chunks_.size(), -1);
291 child_absent_.resize(null_bitmap_chunks_.size(), std::vector<bool>(0));
292 }
293 null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
294 chunk_lengths_[block_index] = unconverted->length();
295
296 if (unconverted->type_id() == Type::NA) {
297 auto maybe_buffer = AllocateBitmap(unconverted->length(), pool_);
298 if (maybe_buffer.ok()) {
299 null_bitmap_chunks_[block_index] = *std::move(maybe_buffer);
300 std::memset(null_bitmap_chunks_[block_index]->mutable_data(), 0,
301 null_bitmap_chunks_[block_index]->size());
302 } else {
303 Status st = maybe_buffer.status();
304 task_group_->Append([st] { return st; });
305 }
306
307 // absent fields will be inserted at Finish
308 return;
309 }
310
311 const auto& struct_array = checked_cast<const StructArray&>(*unconverted);
312 if (promotion_graph_ == nullptr) {
313 // If unexpected fields are ignored or result in an error then all parsers will emit
314 // columns exclusively in the ordering specified in ParseOptions::explicit_schema,
315 // so child_builders_ is immutable and no associative lookup is necessary.
316 for (int i = 0; i < unconverted->num_fields(); ++i) {
317 child_builders_[i]->Insert(block_index, unconverted->type()->field(i),
318 struct_array.field(i));
319 }
320 } else {
321 auto st = InsertChildren(block_index, struct_array);
322 if (!st.ok()) {
323 return task_group_->Append([st] { return st; });
324 }
325 }
326 }
327
328 Status Finish(std::shared_ptr<ChunkedArray>* out) override {
329 RETURN_NOT_OK(task_group_->Finish());
330
331 if (promotion_graph_ != nullptr) {
332 // insert absent child chunks
333 for (auto&& name_index : name_to_index_) {
334 auto child_builder = child_builders_[name_index.second].get();
335
336 RETURN_NOT_OK(child_builder->ReplaceTaskGroup(TaskGroup::MakeSerial()));
337
338 for (size_t i = 0; i < chunk_lengths_.size(); ++i) {
339 if (child_absent_[i].size() > static_cast<size_t>(name_index.second) &&
340 !child_absent_[i][name_index.second]) {
341 continue;
342 }
343 auto empty = std::make_shared<NullArray>(chunk_lengths_[i]);
344 child_builder->Insert(i, promotion_graph_->Null(name_index.first), empty);
345 }
346 }
347 }
348
349 std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
350 std::vector<std::shared_ptr<ChunkedArray>> child_arrays(name_to_index_.size());
351 for (auto&& name_index : name_to_index_) {
352 auto child_builder = child_builders_[name_index.second].get();
353
354 std::shared_ptr<ChunkedArray> child_array;
355 RETURN_NOT_OK(child_builder->Finish(&child_array));
356
357 child_arrays[name_index.second] = child_array;
358 fields[name_index.second] = field(name_index.first, child_array->type());
359 }
360
361 auto type = struct_(std::move(fields));
362 ArrayVector chunks(null_bitmap_chunks_.size());
363 for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) {
364 ArrayVector child_chunks;
365 for (const auto& child_array : child_arrays) {
366 child_chunks.push_back(child_array->chunk(static_cast<int>(i)));
367 }
368 chunks[i] = std::make_shared<StructArray>(type, chunk_lengths_[i], child_chunks,
369 null_bitmap_chunks_[i]);
370 }
371
372 *out = std::make_shared<ChunkedArray>(std::move(chunks), type);
373 return Status::OK();
374 }
375
376 Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
377 RETURN_NOT_OK(task_group_->Finish());
378 for (auto&& child_builder : child_builders_) {
379 RETURN_NOT_OK(child_builder->ReplaceTaskGroup(task_group));
380 }
381 task_group_ = task_group;
382 return Status::OK();
383 }
384
385 private:
386 // Insert children associatively by name; the unconverted block may have unexpected or
387 // differently ordered fields
388 // call from Insert() only, with mutex_ locked
389 Status InsertChildren(int64_t block_index, const StructArray& unconverted) {
390 const auto& fields = unconverted.type()->fields();
391
392 for (int i = 0; i < unconverted.num_fields(); ++i) {
393 auto it = name_to_index_.find(fields[i]->name());
394
395 if (it == name_to_index_.end()) {
396 // add a new field to this builder
397 auto type = promotion_graph_->Infer(fields[i]);
398 DCHECK_NE(type, nullptr)
399 << "invalid unconverted_field encountered in conversion: "
400 << fields[i]->name() << ":" << *fields[i]->type();
401
402 auto new_index = static_cast<int>(name_to_index_.size());
403 it = name_to_index_.emplace(fields[i]->name(), new_index).first;
404
405 std::shared_ptr<ChunkedArrayBuilder> child_builder;
406 RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph_, type,
407 &child_builder));
408 child_builders_.emplace_back(std::move(child_builder));
409 }
410
411 auto unconverted_field = unconverted.type()->field(i);
412 child_builders_[it->second]->Insert(block_index, unconverted_field,
413 unconverted.field(i));
414
415 child_absent_[block_index].resize(child_builders_.size(), true);
416 child_absent_[block_index][it->second] = false;
417 }
418
419 return Status::OK();
420 }
421
422 std::mutex mutex_;
423 MemoryPool* pool_;
424 const PromotionGraph* promotion_graph_;
425 std::unordered_map<std::string, int> name_to_index_;
426 std::vector<std::shared_ptr<ChunkedArrayBuilder>> child_builders_;
427 std::vector<std::vector<bool>> child_absent_;
428 BufferVector null_bitmap_chunks_;
429 std::vector<int64_t> chunk_lengths_;
430 };
431
432 Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
433 MemoryPool* pool, const PromotionGraph* promotion_graph,
434 const std::shared_ptr<DataType>& type,
435 std::shared_ptr<ChunkedArrayBuilder>* out) {
436 if (type->id() == Type::STRUCT) {
437 std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
438 child_builders;
439 for (const auto& f : type->fields()) {
440 std::shared_ptr<ChunkedArrayBuilder> child_builder;
441 RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, f->type(),
442 &child_builder));
443 child_builders.emplace_back(f->name(), std::move(child_builder));
444 }
445 *out = std::make_shared<ChunkedStructArrayBuilder>(task_group, pool, promotion_graph,
446 std::move(child_builders));
447 return Status::OK();
448 }
449 if (type->id() == Type::LIST) {
450 const auto& list_type = checked_cast<const ListType&>(*type);
451 std::shared_ptr<ChunkedArrayBuilder> value_builder;
452 RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph,
453 list_type.value_type(), &value_builder));
454 *out = std::make_shared<ChunkedListArrayBuilder>(
455 task_group, pool, std::move(value_builder), list_type.value_field());
456 return Status::OK();
457 }
458 std::shared_ptr<Converter> converter;
459 RETURN_NOT_OK(MakeConverter(type, pool, &converter));
460 if (promotion_graph) {
461 *out = std::make_shared<InferringChunkedArrayBuilder>(task_group, promotion_graph,
462 std::move(converter));
463 } else {
464 *out = std::make_shared<TypedChunkedArrayBuilder>(task_group, std::move(converter));
465 }
466 return Status::OK();
467 }
468
469 } // namespace json
470 } // namespace arrow