// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/json/chunked_builder.h"

#include <cstring>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/json/converter.h"
#include "arrow/table.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"

using internal::checked_cast;
using internal::TaskGroup;
41 class NonNestedChunkedArrayBuilder
: public ChunkedArrayBuilder
{
43 NonNestedChunkedArrayBuilder(const std::shared_ptr
<TaskGroup
>& task_group
,
44 std::shared_ptr
<Converter
> converter
)
45 : ChunkedArrayBuilder(task_group
), converter_(std::move(converter
)) {}
47 Status
Finish(std::shared_ptr
<ChunkedArray
>* out
) override
{
48 RETURN_NOT_OK(task_group_
->Finish());
49 *out
= std::make_shared
<ChunkedArray
>(std::move(chunks_
), converter_
->out_type());
54 Status
ReplaceTaskGroup(const std::shared_ptr
<TaskGroup
>& task_group
) override
{
55 RETURN_NOT_OK(task_group_
->Finish());
56 task_group_
= task_group
;
63 std::shared_ptr
<Converter
> converter_
;
66 class TypedChunkedArrayBuilder
67 : public NonNestedChunkedArrayBuilder
,
68 public std::enable_shared_from_this
<TypedChunkedArrayBuilder
> {
70 using NonNestedChunkedArrayBuilder::NonNestedChunkedArrayBuilder
;
72 void Insert(int64_t block_index
, const std::shared_ptr
<Field
>&,
73 const std::shared_ptr
<Array
>& unconverted
) override
{
74 std::unique_lock
<std::mutex
> lock(mutex_
);
75 if (chunks_
.size() <= static_cast<size_t>(block_index
)) {
76 chunks_
.resize(static_cast<size_t>(block_index
) + 1, nullptr);
80 auto self
= shared_from_this();
82 task_group_
->Append([self
, block_index
, unconverted
] {
83 std::shared_ptr
<Array
> converted
;
84 RETURN_NOT_OK(self
->converter_
->Convert(unconverted
, &converted
));
85 std::unique_lock
<std::mutex
> lock(self
->mutex_
);
86 self
->chunks_
[block_index
] = std::move(converted
);
92 class InferringChunkedArrayBuilder
93 : public NonNestedChunkedArrayBuilder
,
94 public std::enable_shared_from_this
<InferringChunkedArrayBuilder
> {
96 InferringChunkedArrayBuilder(const std::shared_ptr
<TaskGroup
>& task_group
,
97 const PromotionGraph
* promotion_graph
,
98 std::shared_ptr
<Converter
> converter
)
99 : NonNestedChunkedArrayBuilder(task_group
, std::move(converter
)),
100 promotion_graph_(promotion_graph
) {}
102 void Insert(int64_t block_index
, const std::shared_ptr
<Field
>& unconverted_field
,
103 const std::shared_ptr
<Array
>& unconverted
) override
{
104 std::unique_lock
<std::mutex
> lock(mutex_
);
105 if (chunks_
.size() <= static_cast<size_t>(block_index
)) {
106 chunks_
.resize(static_cast<size_t>(block_index
) + 1, nullptr);
107 unconverted_
.resize(chunks_
.size(), nullptr);
108 unconverted_fields_
.resize(chunks_
.size(), nullptr);
110 unconverted_
[block_index
] = unconverted
;
111 unconverted_fields_
[block_index
] = unconverted_field
;
113 ScheduleConvertChunk(block_index
);
116 void ScheduleConvertChunk(int64_t block_index
) {
117 auto self
= shared_from_this();
118 task_group_
->Append([self
, block_index
] {
119 return self
->TryConvertChunk(static_cast<size_t>(block_index
));
123 Status
TryConvertChunk(size_t block_index
) {
124 std::unique_lock
<std::mutex
> lock(mutex_
);
125 auto converter
= converter_
;
126 auto unconverted
= unconverted_
[block_index
];
127 auto unconverted_field
= unconverted_fields_
[block_index
];
128 std::shared_ptr
<Array
> converted
;
131 Status st
= converter
->Convert(unconverted
, &converted
);
134 if (converter
!= converter_
) {
135 // another task promoted converter; reconvert
137 ScheduleConvertChunk(block_index
);
142 // conversion succeeded
143 chunks_
[block_index
] = std::move(converted
);
148 promotion_graph_
->Promote(converter_
->out_type(), unconverted_field
);
149 if (promoted_type
== nullptr) {
150 // converter failed, no promotion available
153 RETURN_NOT_OK(MakeConverter(promoted_type
, converter_
->pool(), &converter_
));
155 size_t nchunks
= chunks_
.size();
156 for (size_t i
= 0; i
< nchunks
; ++i
) {
157 if (i
!= block_index
&& chunks_
[i
]) {
158 // We're assuming the chunk was converted using the wrong type
159 // (which should be true unless the executor reorders tasks)
162 ScheduleConvertChunk(i
);
167 ScheduleConvertChunk(block_index
);
171 Status
Finish(std::shared_ptr
<ChunkedArray
>* out
) override
{
172 RETURN_NOT_OK(NonNestedChunkedArrayBuilder::Finish(out
));
173 unconverted_
.clear();
178 ArrayVector unconverted_
;
179 std::vector
<std::shared_ptr
<Field
>> unconverted_fields_
;
180 const PromotionGraph
* promotion_graph_
;
183 class ChunkedListArrayBuilder
: public ChunkedArrayBuilder
{
185 ChunkedListArrayBuilder(const std::shared_ptr
<TaskGroup
>& task_group
, MemoryPool
* pool
,
186 std::shared_ptr
<ChunkedArrayBuilder
> value_builder
,
187 const std::shared_ptr
<Field
>& value_field
)
188 : ChunkedArrayBuilder(task_group
),
190 value_builder_(std::move(value_builder
)),
191 value_field_(value_field
) {}
193 Status
ReplaceTaskGroup(const std::shared_ptr
<TaskGroup
>& task_group
) override
{
194 RETURN_NOT_OK(task_group_
->Finish());
195 RETURN_NOT_OK(value_builder_
->ReplaceTaskGroup(task_group
));
196 task_group_
= task_group
;
200 void Insert(int64_t block_index
, const std::shared_ptr
<Field
>&,
201 const std::shared_ptr
<Array
>& unconverted
) override
{
202 std::unique_lock
<std::mutex
> lock(mutex_
);
204 if (null_bitmap_chunks_
.size() <= static_cast<size_t>(block_index
)) {
205 null_bitmap_chunks_
.resize(static_cast<size_t>(block_index
) + 1, nullptr);
206 offset_chunks_
.resize(null_bitmap_chunks_
.size(), nullptr);
209 if (unconverted
->type_id() == Type::NA
) {
210 auto st
= InsertNull(block_index
, unconverted
->length());
212 task_group_
->Append([st
] { return st
; });
217 DCHECK_EQ(unconverted
->type_id(), Type::LIST
);
218 const auto& list_array
= checked_cast
<const ListArray
&>(*unconverted
);
220 null_bitmap_chunks_
[block_index
] = unconverted
->null_bitmap();
221 offset_chunks_
[block_index
] = list_array
.value_offsets();
223 value_builder_
->Insert(block_index
, list_array
.list_type()->value_field(),
224 list_array
.values());
227 Status
Finish(std::shared_ptr
<ChunkedArray
>* out
) override
{
228 RETURN_NOT_OK(task_group_
->Finish());
230 std::shared_ptr
<ChunkedArray
> value_array
;
231 RETURN_NOT_OK(value_builder_
->Finish(&value_array
));
233 auto type
= list(value_field_
->WithType(value_array
->type())->WithMetadata(nullptr));
234 ArrayVector
chunks(null_bitmap_chunks_
.size());
235 for (size_t i
= 0; i
< null_bitmap_chunks_
.size(); ++i
) {
236 auto value_chunk
= value_array
->chunk(static_cast<int>(i
));
237 auto length
= offset_chunks_
[i
]->size() / sizeof(int32_t) - 1;
238 chunks
[i
] = std::make_shared
<ListArray
>(type
, length
, offset_chunks_
[i
],
239 value_chunk
, null_bitmap_chunks_
[i
]);
242 *out
= std::make_shared
<ChunkedArray
>(std::move(chunks
), type
);
247 // call from Insert() only, with mutex_ locked
248 Status
InsertNull(int64_t block_index
, int64_t length
) {
249 value_builder_
->Insert(block_index
, value_field_
, std::make_shared
<NullArray
>(0));
251 ARROW_ASSIGN_OR_RAISE(null_bitmap_chunks_
[block_index
],
252 AllocateEmptyBitmap(length
, pool_
));
254 int64_t offsets_length
= (length
+ 1) * sizeof(int32_t);
255 ARROW_ASSIGN_OR_RAISE(offset_chunks_
[block_index
],
256 AllocateBuffer(offsets_length
, pool_
));
257 std::memset(offset_chunks_
[block_index
]->mutable_data(), 0, offsets_length
);
264 std::shared_ptr
<ChunkedArrayBuilder
> value_builder_
;
265 BufferVector offset_chunks_
, null_bitmap_chunks_
;
266 std::shared_ptr
<Field
> value_field_
;
269 class ChunkedStructArrayBuilder
: public ChunkedArrayBuilder
{
271 ChunkedStructArrayBuilder(
272 const std::shared_ptr
<TaskGroup
>& task_group
, MemoryPool
* pool
,
273 const PromotionGraph
* promotion_graph
,
274 std::vector
<std::pair
<std::string
, std::shared_ptr
<ChunkedArrayBuilder
>>>
276 : ChunkedArrayBuilder(task_group
), pool_(pool
), promotion_graph_(promotion_graph
) {
277 for (auto&& name_builder
: name_builders
) {
278 auto index
= static_cast<int>(name_to_index_
.size());
279 name_to_index_
.emplace(std::move(name_builder
.first
), index
);
280 child_builders_
.emplace_back(std::move(name_builder
.second
));
284 void Insert(int64_t block_index
, const std::shared_ptr
<Field
>&,
285 const std::shared_ptr
<Array
>& unconverted
) override
{
286 std::unique_lock
<std::mutex
> lock(mutex_
);
288 if (null_bitmap_chunks_
.size() <= static_cast<size_t>(block_index
)) {
289 null_bitmap_chunks_
.resize(static_cast<size_t>(block_index
) + 1, nullptr);
290 chunk_lengths_
.resize(null_bitmap_chunks_
.size(), -1);
291 child_absent_
.resize(null_bitmap_chunks_
.size(), std::vector
<bool>(0));
293 null_bitmap_chunks_
[block_index
] = unconverted
->null_bitmap();
294 chunk_lengths_
[block_index
] = unconverted
->length();
296 if (unconverted
->type_id() == Type::NA
) {
297 auto maybe_buffer
= AllocateBitmap(unconverted
->length(), pool_
);
298 if (maybe_buffer
.ok()) {
299 null_bitmap_chunks_
[block_index
] = *std::move(maybe_buffer
);
300 std::memset(null_bitmap_chunks_
[block_index
]->mutable_data(), 0,
301 null_bitmap_chunks_
[block_index
]->size());
303 Status st
= maybe_buffer
.status();
304 task_group_
->Append([st
] { return st
; });
307 // absent fields will be inserted at Finish
311 const auto& struct_array
= checked_cast
<const StructArray
&>(*unconverted
);
312 if (promotion_graph_
== nullptr) {
313 // If unexpected fields are ignored or result in an error then all parsers will emit
314 // columns exclusively in the ordering specified in ParseOptions::explicit_schema,
315 // so child_builders_ is immutable and no associative lookup is necessary.
316 for (int i
= 0; i
< unconverted
->num_fields(); ++i
) {
317 child_builders_
[i
]->Insert(block_index
, unconverted
->type()->field(i
),
318 struct_array
.field(i
));
321 auto st
= InsertChildren(block_index
, struct_array
);
323 return task_group_
->Append([st
] { return st
; });
328 Status
Finish(std::shared_ptr
<ChunkedArray
>* out
) override
{
329 RETURN_NOT_OK(task_group_
->Finish());
331 if (promotion_graph_
!= nullptr) {
332 // insert absent child chunks
333 for (auto&& name_index
: name_to_index_
) {
334 auto child_builder
= child_builders_
[name_index
.second
].get();
336 RETURN_NOT_OK(child_builder
->ReplaceTaskGroup(TaskGroup::MakeSerial()));
338 for (size_t i
= 0; i
< chunk_lengths_
.size(); ++i
) {
339 if (child_absent_
[i
].size() > static_cast<size_t>(name_index
.second
) &&
340 !child_absent_
[i
][name_index
.second
]) {
343 auto empty
= std::make_shared
<NullArray
>(chunk_lengths_
[i
]);
344 child_builder
->Insert(i
, promotion_graph_
->Null(name_index
.first
), empty
);
349 std::vector
<std::shared_ptr
<Field
>> fields(name_to_index_
.size());
350 std::vector
<std::shared_ptr
<ChunkedArray
>> child_arrays(name_to_index_
.size());
351 for (auto&& name_index
: name_to_index_
) {
352 auto child_builder
= child_builders_
[name_index
.second
].get();
354 std::shared_ptr
<ChunkedArray
> child_array
;
355 RETURN_NOT_OK(child_builder
->Finish(&child_array
));
357 child_arrays
[name_index
.second
] = child_array
;
358 fields
[name_index
.second
] = field(name_index
.first
, child_array
->type());
361 auto type
= struct_(std::move(fields
));
362 ArrayVector
chunks(null_bitmap_chunks_
.size());
363 for (size_t i
= 0; i
< null_bitmap_chunks_
.size(); ++i
) {
364 ArrayVector child_chunks
;
365 for (const auto& child_array
: child_arrays
) {
366 child_chunks
.push_back(child_array
->chunk(static_cast<int>(i
)));
368 chunks
[i
] = std::make_shared
<StructArray
>(type
, chunk_lengths_
[i
], child_chunks
,
369 null_bitmap_chunks_
[i
]);
372 *out
= std::make_shared
<ChunkedArray
>(std::move(chunks
), type
);
376 Status
ReplaceTaskGroup(const std::shared_ptr
<TaskGroup
>& task_group
) override
{
377 RETURN_NOT_OK(task_group_
->Finish());
378 for (auto&& child_builder
: child_builders_
) {
379 RETURN_NOT_OK(child_builder
->ReplaceTaskGroup(task_group
));
381 task_group_
= task_group
;
386 // Insert children associatively by name; the unconverted block may have unexpected or
387 // differently ordered fields
388 // call from Insert() only, with mutex_ locked
389 Status
InsertChildren(int64_t block_index
, const StructArray
& unconverted
) {
390 const auto& fields
= unconverted
.type()->fields();
392 for (int i
= 0; i
< unconverted
.num_fields(); ++i
) {
393 auto it
= name_to_index_
.find(fields
[i
]->name());
395 if (it
== name_to_index_
.end()) {
396 // add a new field to this builder
397 auto type
= promotion_graph_
->Infer(fields
[i
]);
398 DCHECK_NE(type
, nullptr)
399 << "invalid unconverted_field encountered in conversion: "
400 << fields
[i
]->name() << ":" << *fields
[i
]->type();
402 auto new_index
= static_cast<int>(name_to_index_
.size());
403 it
= name_to_index_
.emplace(fields
[i
]->name(), new_index
).first
;
405 std::shared_ptr
<ChunkedArrayBuilder
> child_builder
;
406 RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_
, pool_
, promotion_graph_
, type
,
408 child_builders_
.emplace_back(std::move(child_builder
));
411 auto unconverted_field
= unconverted
.type()->field(i
);
412 child_builders_
[it
->second
]->Insert(block_index
, unconverted_field
,
413 unconverted
.field(i
));
415 child_absent_
[block_index
].resize(child_builders_
.size(), true);
416 child_absent_
[block_index
][it
->second
] = false;
424 const PromotionGraph
* promotion_graph_
;
425 std::unordered_map
<std::string
, int> name_to_index_
;
426 std::vector
<std::shared_ptr
<ChunkedArrayBuilder
>> child_builders_
;
427 std::vector
<std::vector
<bool>> child_absent_
;
428 BufferVector null_bitmap_chunks_
;
429 std::vector
<int64_t> chunk_lengths_
;
432 Status
MakeChunkedArrayBuilder(const std::shared_ptr
<TaskGroup
>& task_group
,
433 MemoryPool
* pool
, const PromotionGraph
* promotion_graph
,
434 const std::shared_ptr
<DataType
>& type
,
435 std::shared_ptr
<ChunkedArrayBuilder
>* out
) {
436 if (type
->id() == Type::STRUCT
) {
437 std::vector
<std::pair
<std::string
, std::shared_ptr
<ChunkedArrayBuilder
>>>
439 for (const auto& f
: type
->fields()) {
440 std::shared_ptr
<ChunkedArrayBuilder
> child_builder
;
441 RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group
, pool
, promotion_graph
, f
->type(),
443 child_builders
.emplace_back(f
->name(), std::move(child_builder
));
445 *out
= std::make_shared
<ChunkedStructArrayBuilder
>(task_group
, pool
, promotion_graph
,
446 std::move(child_builders
));
449 if (type
->id() == Type::LIST
) {
450 const auto& list_type
= checked_cast
<const ListType
&>(*type
);
451 std::shared_ptr
<ChunkedArrayBuilder
> value_builder
;
452 RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group
, pool
, promotion_graph
,
453 list_type
.value_type(), &value_builder
));
454 *out
= std::make_shared
<ChunkedListArrayBuilder
>(
455 task_group
, pool
, std::move(value_builder
), list_type
.value_field());
458 std::shared_ptr
<Converter
> converter
;
459 RETURN_NOT_OK(MakeConverter(type
, pool
, &converter
));
460 if (promotion_graph
) {
461 *out
= std::make_shared
<InferringChunkedArrayBuilder
>(task_group
, promotion_graph
,
462 std::move(converter
));
464 *out
= std::make_shared
<TypedChunkedArrayBuilder
>(task_group
, std::move(converter
));