]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/parquet/arrow/schema.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / parquet / arrow / schema.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/arrow/schema.h"
19
20#include <functional>
21#include <string>
22#include <vector>
23
24#include "arrow/extension_type.h"
25#include "arrow/io/memory.h"
26#include "arrow/ipc/api.h"
27#include "arrow/result_internal.h"
28#include "arrow/type.h"
29#include "arrow/util/base64.h"
30#include "arrow/util/checked_cast.h"
31#include "arrow/util/key_value_metadata.h"
32#include "arrow/util/logging.h"
33#include "arrow/util/value_parsing.h"
34
35#include "parquet/arrow/schema_internal.h"
36#include "parquet/exception.h"
37#include "parquet/metadata.h"
38#include "parquet/properties.h"
39#include "parquet/types.h"
40
41using arrow::DecimalType;
42using arrow::Field;
43using arrow::FieldVector;
44using arrow::KeyValueMetadata;
45using arrow::Status;
46using arrow::internal::checked_cast;
47
48using ArrowType = arrow::DataType;
49using ArrowTypeId = arrow::Type;
50
51using parquet::Repetition;
52using parquet::schema::GroupNode;
53using parquet::schema::Node;
54using parquet::schema::NodePtr;
55using parquet::schema::PrimitiveNode;
56
57using ParquetType = parquet::Type;
58using parquet::ConvertedType;
59using parquet::LogicalType;
60
61using parquet::internal::LevelInfo;
62
63namespace parquet {
64
65namespace arrow {
66
67// ----------------------------------------------------------------------
68// Parquet to Arrow schema conversion
69
70namespace {
71
72Repetition::type RepetitionFromNullable(bool is_nullable) {
73 return is_nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
74}
75
76Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
77 const WriterProperties& properties,
78 const ArrowWriterProperties& arrow_properties, NodePtr* out);
79
80Status ListToNode(const std::shared_ptr<::arrow::BaseListType>& type,
81 const std::string& name, bool nullable,
82 const WriterProperties& properties,
83 const ArrowWriterProperties& arrow_properties, NodePtr* out) {
84 NodePtr element;
85 std::string value_name =
86 arrow_properties.compliant_nested_types() ? "element" : type->value_field()->name();
87 RETURN_NOT_OK(FieldToNode(value_name, type->value_field(), properties, arrow_properties,
88 &element));
89
90 NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
91 *out = GroupNode::Make(name, RepetitionFromNullable(nullable), {list},
92 LogicalType::List());
93 return Status::OK();
94}
95
96Status MapToNode(const std::shared_ptr<::arrow::MapType>& type, const std::string& name,
97 bool nullable, const WriterProperties& properties,
98 const ArrowWriterProperties& arrow_properties, NodePtr* out) {
99 // TODO: Should we offer a non-compliant mode that forwards the type names?
100 NodePtr key_node;
101 RETURN_NOT_OK(
102 FieldToNode("key", type->key_field(), properties, arrow_properties, &key_node));
103
104 NodePtr value_node;
105 RETURN_NOT_OK(FieldToNode("value", type->item_field(), properties, arrow_properties,
106 &value_node));
107
108 NodePtr key_value =
109 GroupNode::Make("key_value", Repetition::REPEATED, {key_node, value_node});
110 *out = GroupNode::Make(name, RepetitionFromNullable(nullable), {key_value},
111 LogicalType::Map());
112 return Status::OK();
113}
114
115Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
116 const std::string& name, bool nullable,
117 const WriterProperties& properties,
118 const ArrowWriterProperties& arrow_properties, NodePtr* out) {
119 std::vector<NodePtr> children(type->num_fields());
120 if (type->num_fields() != 0) {
121 for (int i = 0; i < type->num_fields(); i++) {
122 RETURN_NOT_OK(FieldToNode(type->field(i)->name(), type->field(i), properties,
123 arrow_properties, &children[i]));
124 }
125 } else {
126 // XXX (ARROW-10928) We could add a dummy primitive node but that would
127 // require special handling when writing and reading, to avoid column index
128 // mismatches.
129 return Status::NotImplemented("Cannot write struct type '", name,
130 "' with no child field to Parquet. "
131 "Consider adding a dummy child field.");
132 }
133
134 *out = GroupNode::Make(name, RepetitionFromNullable(nullable), std::move(children));
135 return Status::OK();
136}
137
138static std::shared_ptr<const LogicalType> TimestampLogicalTypeFromArrowTimestamp(
139 const ::arrow::TimestampType& timestamp_type, ::arrow::TimeUnit::type time_unit) {
140 const bool utc = !(timestamp_type.timezone().empty());
141 // ARROW-5878(wesm): for forward compatibility reasons, and because
142 // there's no other way to signal to old readers that values are
143 // timestamps, we force the ConvertedType field to be set to the
144 // corresponding TIMESTAMP_* value. This does cause some ambiguity
145 // as Parquet readers have not been consistent about the
146 // interpretation of TIMESTAMP_* values as being UTC-normalized.
147 switch (time_unit) {
148 case ::arrow::TimeUnit::MILLI:
149 return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS,
150 /*is_from_converted_type=*/false,
151 /*force_set_converted_type=*/true);
152 case ::arrow::TimeUnit::MICRO:
153 return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS,
154 /*is_from_converted_type=*/false,
155 /*force_set_converted_type=*/true);
156 case ::arrow::TimeUnit::NANO:
157 return LogicalType::Timestamp(utc, LogicalType::TimeUnit::NANOS);
158 case ::arrow::TimeUnit::SECOND:
159 // No equivalent parquet logical type.
160 break;
161 }
162 return LogicalType::None();
163}
164
// Decide how an Arrow timestamp column is stored in Parquet: sets
// *physical_type (INT64, or INT96 for deprecated Impala support) and
// *logical_type, taking the format version and any user-requested unit
// coercion into account.
//
// Returns NotImplemented when a requested coercion unit cannot be
// represented in the chosen Parquet version.
static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
                                   const WriterProperties& properties,
                                   const ArrowWriterProperties& arrow_properties,
                                   ParquetType::type* physical_type,
                                   std::shared_ptr<const LogicalType>* logical_type) {
  const bool coerce = arrow_properties.coerce_timestamps_enabled();
  const auto target_unit =
      coerce ? arrow_properties.coerce_timestamps_unit() : type.unit();
  const auto version = properties.version();

  // The user is explicitly asking for Impala int96 encoding, there is no
  // logical type.
  if (arrow_properties.support_deprecated_int96_timestamps()) {
    *physical_type = ParquetType::INT96;
    return Status::OK();
  }

  *physical_type = ParquetType::INT64;
  *logical_type = TimestampLogicalTypeFromArrowTimestamp(type, target_unit);

  // The user is explicitly asking for timestamp data to be converted to the
  // specified units (target_unit).
  if (coerce) {
    if (version == ::parquet::ParquetVersion::PARQUET_1_0 ||
        version == ::parquet::ParquetVersion::PARQUET_2_4) {
      // Format versions <= 2.4 annotate via ConvertedType, which has no
      // nanosecond unit; seconds have no Parquet equivalent at all.
      switch (target_unit) {
        case ::arrow::TimeUnit::MILLI:
        case ::arrow::TimeUnit::MICRO:
          break;
        case ::arrow::TimeUnit::NANO:
        case ::arrow::TimeUnit::SECOND:
          return Status::NotImplemented("For Parquet version ",
                                        ::parquet::ParquetVersionToString(version),
                                        ", can only coerce Arrow timestamps to "
                                        "milliseconds or microseconds");
      }
    } else {
      // Newer versions support NANOS via LogicalType; only seconds remain
      // unrepresentable.
      switch (target_unit) {
        case ::arrow::TimeUnit::MILLI:
        case ::arrow::TimeUnit::MICRO:
        case ::arrow::TimeUnit::NANO:
          break;
        case ::arrow::TimeUnit::SECOND:
          return Status::NotImplemented("For Parquet version ",
                                        ::parquet::ParquetVersionToString(version),
                                        ", can only coerce Arrow timestamps to "
                                        "milliseconds, microseconds, or nanoseconds");
      }
    }
    return Status::OK();
  }

  // The user implicitly wants timestamp data to retain its original time units,
  // however the ConvertedType field used to indicate logical types for Parquet
  // version <= 2.4 fields does not allow for nanosecond time units and so nanoseconds
  // must be coerced to microseconds.
  if ((version == ::parquet::ParquetVersion::PARQUET_1_0 ||
       version == ::parquet::ParquetVersion::PARQUET_2_4) &&
      type.unit() == ::arrow::TimeUnit::NANO) {
    *logical_type =
        TimestampLogicalTypeFromArrowTimestamp(type, ::arrow::TimeUnit::MICRO);
    return Status::OK();
  }

  // The user implicitly wants timestamp data to retain its original time units,
  // however the Arrow seconds time unit can not be represented (annotated) in
  // any version of Parquet and so must be coerced to milliseconds.
  if (type.unit() == ::arrow::TimeUnit::SECOND) {
    *logical_type =
        TimestampLogicalTypeFromArrowTimestamp(type, ::arrow::TimeUnit::MILLI);
    return Status::OK();
  }

  return Status::OK();
}
240
241static constexpr char FIELD_ID_KEY[] = "PARQUET:field_id";
242
243std::shared_ptr<::arrow::KeyValueMetadata> FieldIdMetadata(int field_id) {
244 if (field_id >= 0) {
245 return ::arrow::key_value_metadata({FIELD_ID_KEY}, {std::to_string(field_id)});
246 } else {
247 return nullptr;
248 }
249}
250
251int FieldIdFromMetadata(
252 const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
253 if (!metadata) {
254 return -1;
255 }
256 int key = metadata->FindKey(FIELD_ID_KEY);
257 if (key < 0) {
258 return -1;
259 }
260 std::string field_id_str = metadata->value(key);
261 int field_id;
262 if (::arrow::internal::ParseValue<::arrow::Int32Type>(
263 field_id_str.c_str(), field_id_str.length(), &field_id)) {
264 if (field_id < 0) {
265 // Thrift should convert any negative value to null but normalize to -1 here in case
266 // we later check this in logic.
267 return -1;
268 }
269 return field_id;
270 } else {
271 return -1;
272 }
273}
274
// Convert a single Arrow field into a Parquet schema node.
//
// Nested and wrapper types (struct, list, map, dictionary, extension)
// delegate to the dedicated helpers and return early; every other type falls
// through the switch, which picks a physical type and (optionally) a logical
// type, and a single PrimitiveNode is built at the end.
//
// Returns NotImplemented for Arrow types with no Parquet mapping.
Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
                   const WriterProperties& properties,
                   const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  std::shared_ptr<const LogicalType> logical_type = LogicalType::None();
  ParquetType::type type;
  Repetition::type repetition = RepetitionFromNullable(field->nullable());

  // Only meaningful for FIXED_LEN_BYTE_ARRAY / decimal types; -1 means unset.
  int length = -1;
  int precision = -1;
  int scale = -1;

  switch (field->type()->id()) {
    case ArrowTypeId::NA: {
      // Null type: represented as an (all-null) INT32 column with a Null
      // logical type; it must therefore be nullable.
      type = ParquetType::INT32;
      logical_type = LogicalType::Null();
      if (repetition != Repetition::OPTIONAL) {
        return Status::Invalid("NullType Arrow field must be nullable");
      }
    } break;
    case ArrowTypeId::BOOL:
      type = ParquetType::BOOLEAN;
      break;
    // Integers narrower than 32 bits are widened to INT32, with an Int()
    // logical type preserving the original bit width and signedness.
    case ArrowTypeId::UINT8:
      type = ParquetType::INT32;
      logical_type = LogicalType::Int(8, false);
      break;
    case ArrowTypeId::INT8:
      type = ParquetType::INT32;
      logical_type = LogicalType::Int(8, true);
      break;
    case ArrowTypeId::UINT16:
      type = ParquetType::INT32;
      logical_type = LogicalType::Int(16, false);
      break;
    case ArrowTypeId::INT16:
      type = ParquetType::INT32;
      logical_type = LogicalType::Int(16, true);
      break;
    case ArrowTypeId::UINT32:
      if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
        // Parquet 1.0: store uint32 losslessly in a signed INT64 instead of
        // relying on an unsigned logical-type annotation.
        type = ParquetType::INT64;
      } else {
        type = ParquetType::INT32;
        logical_type = LogicalType::Int(32, false);
      }
      break;
    case ArrowTypeId::INT32:
      type = ParquetType::INT32;
      break;
    case ArrowTypeId::UINT64:
      type = ParquetType::INT64;
      logical_type = LogicalType::Int(64, false);
      break;
    case ArrowTypeId::INT64:
      type = ParquetType::INT64;
      break;
    case ArrowTypeId::FLOAT:
      type = ParquetType::FLOAT;
      break;
    case ArrowTypeId::DOUBLE:
      type = ParquetType::DOUBLE;
      break;
    case ArrowTypeId::LARGE_STRING:
    case ArrowTypeId::STRING:
      type = ParquetType::BYTE_ARRAY;
      logical_type = LogicalType::String();
      break;
    case ArrowTypeId::LARGE_BINARY:
    case ArrowTypeId::BINARY:
      type = ParquetType::BYTE_ARRAY;
      break;
    case ArrowTypeId::FIXED_SIZE_BINARY: {
      type = ParquetType::FIXED_LEN_BYTE_ARRAY;
      const auto& fixed_size_binary_type =
          static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type());
      length = fixed_size_binary_type.byte_width();
    } break;
    case ArrowTypeId::DECIMAL128:
    case ArrowTypeId::DECIMAL256: {
      type = ParquetType::FIXED_LEN_BYTE_ARRAY;
      const auto& decimal_type = static_cast<const ::arrow::DecimalType&>(*field->type());
      precision = decimal_type.precision();
      scale = decimal_type.scale();
      // Minimal FLBA byte width able to hold the requested precision.
      length = DecimalType::DecimalSize(precision);
      PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
    } break;
    case ArrowTypeId::DATE32:
      type = ParquetType::INT32;
      logical_type = LogicalType::Date();
      break;
    case ArrowTypeId::DATE64:
      // Stored as a DATE-annotated INT32 like DATE32; presumably the column
      // writer converts millisecond values to days — confirm against the
      // writer path.
      type = ParquetType::INT32;
      logical_type = LogicalType::Date();
      break;
    case ArrowTypeId::TIMESTAMP:
      // Physical/logical types depend on writer properties (int96 support,
      // coercion, format version); see GetTimestampMetadata.
      RETURN_NOT_OK(
          GetTimestampMetadata(static_cast<::arrow::TimestampType&>(*field->type()),
                               properties, arrow_properties, &type, &logical_type));
      break;
    case ArrowTypeId::TIME32:
      type = ParquetType::INT32;
      logical_type =
          LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::MILLIS);
      break;
    case ArrowTypeId::TIME64: {
      type = ParquetType::INT64;
      auto time_type = static_cast<::arrow::Time64Type*>(field->type().get());
      if (time_type->unit() == ::arrow::TimeUnit::NANO) {
        logical_type =
            LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::NANOS);
      } else {
        logical_type =
            LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::MICROS);
      }
    } break;
    case ArrowTypeId::STRUCT: {
      auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
      return StructToNode(struct_type, name, field->nullable(), properties,
                          arrow_properties, out);
    }
    case ArrowTypeId::FIXED_SIZE_LIST:
    case ArrowTypeId::LARGE_LIST:
    case ArrowTypeId::LIST: {
      auto list_type = std::static_pointer_cast<::arrow::BaseListType>(field->type());
      return ListToNode(list_type, name, field->nullable(), properties, arrow_properties,
                        out);
    }
    case ArrowTypeId::DICTIONARY: {
      // Parquet has no Dictionary type, dictionary-encoded is handled on
      // the encoding, not the schema level.
      const ::arrow::DictionaryType& dict_type =
          static_cast<const ::arrow::DictionaryType&>(*field->type());
      std::shared_ptr<::arrow::Field> unpacked_field = ::arrow::field(
          name, dict_type.value_type(), field->nullable(), field->metadata());
      return FieldToNode(name, unpacked_field, properties, arrow_properties, out);
    }
    case ArrowTypeId::EXTENSION: {
      // Extension types are written as their underlying storage type.
      auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(field->type());
      std::shared_ptr<::arrow::Field> storage_field = ::arrow::field(
          name, ext_type->storage_type(), field->nullable(), field->metadata());
      return FieldToNode(name, storage_field, properties, arrow_properties, out);
    }
    case ArrowTypeId::MAP: {
      auto map_type = std::static_pointer_cast<::arrow::MapType>(field->type());
      return MapToNode(map_type, name, field->nullable(), properties, arrow_properties,
                       out);
    }

    default: {
      // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR
      return Status::NotImplemented(
          "Unhandled type for Arrow to Parquet schema conversion: ",
          field->type()->ToString());
    }
  }

  // Non-nested types reach here: propagate any Arrow field id and build the
  // leaf node.
  int field_id = FieldIdFromMetadata(field->metadata());
  PARQUET_CATCH_NOT_OK(*out = PrimitiveNode::Make(name, repetition, logical_type, type,
                                                  length, field_id));

  return Status::OK();
}
437
// Shared state threaded through the Parquet-to-Arrow schema walk.
struct SchemaTreeContext {
  SchemaManifest* manifest;          // output being populated (not owned)
  ArrowReaderProperties properties;  // reader options (e.g. dictionary columns)
  const SchemaDescriptor* schema;    // Parquet schema being converted (not owned)

  // Record 'parent' as the parent of 'child' in the manifest index.
  void LinkParent(const SchemaField* child, const SchemaField* parent) {
    manifest->child_to_parent[child] = parent;
  }

  // Register a leaf field under its Parquet column index.
  void RecordLeaf(const SchemaField* leaf) {
    manifest->column_index_to_field[leaf->column_index] = leaf;
  }
};
451
452bool IsDictionaryReadSupported(const ArrowType& type) {
453 // Only supported currently for BYTE_ARRAY types
454 return type.id() == ::arrow::Type::BINARY || type.id() == ::arrow::Type::STRING;
455}
456
457// ----------------------------------------------------------------------
458// Schema logic
459
460::arrow::Result<std::shared_ptr<ArrowType>> GetTypeForNode(
461 int column_index, const schema::PrimitiveNode& primitive_node,
462 SchemaTreeContext* ctx) {
463 ASSIGN_OR_RAISE(
464 std::shared_ptr<ArrowType> storage_type,
465 GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit()));
466 if (ctx->properties.read_dictionary(column_index) &&
467 IsDictionaryReadSupported(*storage_type)) {
468 return ::arrow::dictionary(::arrow::int32(), storage_type);
469 }
470 return storage_type;
471}
472
473Status NodeToSchemaField(const Node& node, LevelInfo current_levels,
474 SchemaTreeContext* ctx, const SchemaField* parent,
475 SchemaField* out);
476
477Status GroupToSchemaField(const GroupNode& node, LevelInfo current_levels,
478 SchemaTreeContext* ctx, const SchemaField* parent,
479 SchemaField* out);
480
481Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
482 LevelInfo current_levels, SchemaTreeContext* ctx,
483 const SchemaField* parent, SchemaField* out) {
484 out->field = field;
485 out->column_index = column_index;
486 out->level_info = current_levels;
487 ctx->RecordLeaf(out);
488 ctx->LinkParent(out, parent);
489 return Status::OK();
490}
491
492// Special case mentioned in the format spec:
493// If the name is array or ends in _tuple, this should be a list of struct
494// even for single child elements.
495bool HasStructListName(const GroupNode& node) {
496 ::arrow::util::string_view name{node.name()};
497 return name == "array" || name.ends_with("_tuple");
498}
499
500Status GroupToStruct(const GroupNode& node, LevelInfo current_levels,
501 SchemaTreeContext* ctx, const SchemaField* parent,
502 SchemaField* out) {
503 std::vector<std::shared_ptr<Field>> arrow_fields;
504 out->children.resize(node.field_count());
505 // All level increments for the node are expected to happen by callers.
506 // This is required because repeated elements need to have there own
507 // SchemaField.
508
509 for (int i = 0; i < node.field_count(); i++) {
510 RETURN_NOT_OK(
511 NodeToSchemaField(*node.field(i), current_levels, ctx, out, &out->children[i]));
512 arrow_fields.push_back(out->children[i].field);
513 }
514 auto struct_type = ::arrow::struct_(arrow_fields);
515 out->field = ::arrow::field(node.name(), struct_type, node.is_optional(),
516 FieldIdMetadata(node.field_id()));
517 out->level_info = current_levels;
518 return Status::OK();
519}
520
521Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
522 SchemaTreeContext* ctx, const SchemaField* parent,
523 SchemaField* out);
524
// Convert a Parquet MAP-annotated group into an Arrow map field, validating
// the shape required by the format spec:
//
//   required/optional group name=whatever (MAP) {
//     repeated group name=key_value {
//       required TYPE key;
//       required/optional TYPE value;
//     }
//   }
//
// A single-child key_value group (a "set") is handled as a list instead.
Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels,
                        SchemaTreeContext* ctx, const SchemaField* parent,
                        SchemaField* out) {
  if (group.field_count() != 1) {
    return Status::Invalid("MAP-annotated groups must have a single child.");
  }
  if (group.is_repeated()) {
    return Status::Invalid("MAP-annotated groups must not be repeated.");
  }

  const Node& key_value_node = *group.field(0);

  if (!key_value_node.is_repeated()) {
    return Status::Invalid(
        "Non-repeated key value in a MAP-annotated group are not supported.");
  }

  if (!key_value_node.is_group()) {
    return Status::Invalid("Key-value node must be a group.");
  }

  const GroupNode& key_value = checked_cast<const GroupNode&>(key_value_node);
  if (key_value.field_count() != 1 && key_value.field_count() != 2) {
    return Status::Invalid("Key-value map node must have 1 or 2 child elements. Found: ",
                           key_value.field_count());
  }
  const Node& key_node = *key_value.field(0);
  if (!key_node.is_required()) {
    return Status::Invalid("Map keys must be annotated as required.");
  }
  // Arrow doesn't support 1 column maps (i.e. Sets). The options are to either
  // make the values column nullable, or process the map as a list. We choose the latter
  // as it is simpler.
  if (key_value.field_count() == 1) {
    return ListToSchemaField(group, current_levels, ctx, parent, out);
  }

  // Account for the outer (optional/required) group and the repeated
  // key_value group. IncrementRepeated returns the def level of the
  // previous repeated ancestor, which we restore at the end.
  current_levels.Increment(group);
  int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();

  out->children.resize(1);
  SchemaField* key_value_field = &out->children[0];

  key_value_field->children.resize(2);
  SchemaField* key_field = &key_value_field->children[0];
  SchemaField* value_field = &key_value_field->children[1];

  ctx->LinkParent(out, parent);
  ctx->LinkParent(key_value_field, out);
  ctx->LinkParent(key_field, key_value_field);
  ctx->LinkParent(value_field, key_value_field);

  // required/optional group name=whatever {
  //   repeated group name=key_values{
  //     required TYPE key;
  // required/optional TYPE value;
  //   }
  // }
  //

  RETURN_NOT_OK(NodeToSchemaField(*key_value.field(0), current_levels, ctx,
                                  key_value_field, key_field));
  RETURN_NOT_OK(NodeToSchemaField(*key_value.field(1), current_levels, ctx,
                                  key_value_field, value_field));

  key_value_field->field = ::arrow::field(
      group.name(), ::arrow::struct_({key_field->field, value_field->field}),
      /*nullable=*/false, FieldIdMetadata(key_value.field_id()));
  key_value_field->level_info = current_levels;

  out->field = ::arrow::field(group.name(),
                              ::arrow::map(key_field->field->type(), value_field->field),
                              group.is_optional(), FieldIdMetadata(group.field_id()));
  out->level_info = current_levels;
  // At this point current levels contains the def level for this list,
  // we need to reset to the prior parent.
  out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
  return Status::OK();
}
604
// Convert a Parquet LIST-annotated group into an Arrow list field, handling
// both the recommended 3-level encoding and the legacy 2-level encoding
// (repeated primitive directly inside the LIST group).
Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  if (group.field_count() != 1) {
    return Status::Invalid("LIST-annotated groups must have a single child.");
  }
  if (group.is_repeated()) {
    return Status::Invalid("LIST-annotated groups must not be repeated.");
  }
  current_levels.Increment(group);

  out->children.resize(group.field_count());
  SchemaField* child_field = &out->children[0];

  ctx->LinkParent(out, parent);
  ctx->LinkParent(child_field, out);

  const Node& list_node = *group.field(0);

  if (!list_node.is_repeated()) {
    return Status::Invalid(
        "Non-repeated nodes in a LIST-annotated group are not supported.");
  }

  // IncrementRepeated returns the def level of the previous repeated
  // ancestor, restored into out->level_info at the end.
  int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
  if (list_node.is_group()) {
    // Resolve 3-level encoding
    //
    // required/optional group name=whatever {
    //   repeated group name=list {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // yields list<item: TYPE ?nullable> ?nullable
    //
    // We distinguish the special case that we have
    //
    // required/optional group name=whatever {
    //   repeated group name=array or $SOMETHING_tuple {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // In this latter case, the inner type of the list should be a struct
    // rather than a primitive value
    //
    // yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
    const auto& list_group = static_cast<const GroupNode&>(list_node);
    // Special case mentioned in the format spec:
    //   If the name is array or ends in _tuple, this should be a list of struct
    //   even for single child elements.
    if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
      // List of primitive type
      RETURN_NOT_OK(
          NodeToSchemaField(*list_group.field(0), current_levels, ctx, out, child_field));
    } else {
      RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
    }
  } else {
    // Two-level list encoding
    //
    // required/optional group LIST {
    //   repeated TYPE;
    // }
    const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> type,
                    GetTypeForNode(column_index, primitive_node, ctx));
    auto item_field = ::arrow::field(list_node.name(), type, /*nullable=*/false,
                                     FieldIdMetadata(list_node.field_id()));
    RETURN_NOT_OK(
        PopulateLeaf(column_index, item_field, current_levels, ctx, out, child_field));
  }
  out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field),
                              group.is_optional(), FieldIdMetadata(group.field_id()));
  out->level_info = current_levels;
  // At this point current levels contains the def level for this list,
  // we need to reset to the prior parent.
  out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
  return Status::OK();
}
687
688Status GroupToSchemaField(const GroupNode& node, LevelInfo current_levels,
689 SchemaTreeContext* ctx, const SchemaField* parent,
690 SchemaField* out) {
691 if (node.logical_type()->is_list()) {
692 return ListToSchemaField(node, current_levels, ctx, parent, out);
693 } else if (node.logical_type()->is_map()) {
694 return MapToSchemaField(node, current_levels, ctx, parent, out);
695 }
696 std::shared_ptr<ArrowType> type;
697 if (node.is_repeated()) {
698 // Simple repeated struct
699 //
700 // repeated group $NAME {
701 // r/o TYPE[0] f0
702 // r/o TYPE[1] f1
703 // }
704 out->children.resize(1);
705
706 int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
707 RETURN_NOT_OK(GroupToStruct(node, current_levels, ctx, out, &out->children[0]));
708 out->field = ::arrow::field(node.name(), ::arrow::list(out->children[0].field),
709 /*nullable=*/false, FieldIdMetadata(node.field_id()));
710
711 ctx->LinkParent(&out->children[0], out);
712 out->level_info = current_levels;
713 // At this point current_levels contains this list as the def level, we need to
714 // use the previous ancenstor of thi slist.
715 out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
716 return Status::OK();
717 } else {
718 current_levels.Increment(node);
719 return GroupToStruct(node, current_levels, ctx, parent, out);
720 }
721}
722
Status NodeToSchemaField(const Node& node, LevelInfo current_levels,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  // Workhorse function for converting a Parquet schema node to an Arrow
  // type. Handles different conventions for nested data.

  ctx->LinkParent(out, parent);

  // Now, walk the schema and create a ColumnDescriptor for each leaf node
  if (node.is_group()) {
    // A nested field, but we don't know what kind yet
    return GroupToSchemaField(static_cast<const GroupNode&>(node), current_levels, ctx,
                              parent, out);
  } else {
    // Either a normal flat primitive type, or a list type encoded with 1-level
    // list encoding. Note that the 3-level encoding is the form recommended by
    // the parquet specification, but technically we can have either
    //
    // required/optional $TYPE $FIELD_NAME
    //
    // or
    //
    // repeated $TYPE $FIELD_NAME
    const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> type,
                    GetTypeForNode(column_index, primitive_node, ctx));
    if (node.is_repeated()) {
      // One-level list encoding, e.g.
      //   a: repeated int32;
      // The repeated leaf is wrapped in a synthetic non-nullable list field.
      int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
      out->children.resize(1);
      auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
      RETURN_NOT_OK(PopulateLeaf(column_index, child_field, current_levels, ctx, out,
                                 &out->children[0]));

      out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
                                  /*nullable=*/false, FieldIdMetadata(node.field_id()));
      out->level_info = current_levels;
      // At this point current_levels counts this list itself as the repeated
      // ancestor; restore the def level of the actual previous ancestor.
      out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
      return Status::OK();
    } else {
      current_levels.Increment(node);
      // A normal (required/optional) primitive node
      return PopulateLeaf(column_index,
                          ::arrow::field(node.name(), type, node.is_optional(),
                                         FieldIdMetadata(node.field_id())),
                          current_levels, ctx, parent, out);
    }
  }
}
776
// Get the original Arrow schema, as serialized in the Parquet metadata under
// the "ARROW:schema" key (a base64-encoded IPC schema message written by the
// store_schema option).
//
// On return, *out is the deserialized schema (nullptr if none was stored)
// and *clean_metadata is the input metadata minus the schema key (nullptr
// when no other keys remain).
Status GetOriginSchema(const std::shared_ptr<const KeyValueMetadata>& metadata,
                       std::shared_ptr<const KeyValueMetadata>* clean_metadata,
                       std::shared_ptr<::arrow::Schema>* out) {
  if (metadata == nullptr) {
    *out = nullptr;
    *clean_metadata = nullptr;
    return Status::OK();
  }

  static const std::string kArrowSchemaKey = "ARROW:schema";
  int schema_index = metadata->FindKey(kArrowSchemaKey);
  if (schema_index == -1) {
    *out = nullptr;
    *clean_metadata = metadata;
    return Status::OK();
  }

  // The original Arrow schema was serialized using the store_schema option.
  // We deserialize it here and use it to inform read options such as
  // dictionary-encoded fields.
  auto decoded = ::arrow::util::base64_decode(metadata->value(schema_index));
  auto schema_buf = std::make_shared<Buffer>(decoded);

  ::arrow::ipc::DictionaryMemo dict_memo;
  ::arrow::io::BufferReader input(schema_buf);

  ARROW_ASSIGN_OR_RAISE(*out, ::arrow::ipc::ReadSchema(&input, &dict_memo));

  if (metadata->size() > 1) {
    // Copy the metadata without the schema key
    auto new_metadata = ::arrow::key_value_metadata({}, {});
    new_metadata->reserve(metadata->size() - 1);
    for (int64_t i = 0; i < metadata->size(); ++i) {
      if (i == schema_index) continue;
      new_metadata->Append(metadata->key(i), metadata->value(i));
    }
    *clean_metadata = new_metadata;
  } else {
    // No other keys, let metadata be null
    *clean_metadata = nullptr;
  }
  return Status::OK();
}
821
822// Restore original Arrow field information that was serialized as Parquet metadata
823// but that is not necessarily present in the field reconstitued from Parquet data
824// (for example, Parquet timestamp types doesn't carry timezone information).
825
826Result<bool> ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred);
827
// Return a factory that rebuilds the inferred nested type (struct, or one of
// the list variants) from its possibly-modified child fields, but only when
// the type from the stored original schema is a compatible nested kind.
// Returns an empty std::function when the kinds don't match, in which case
// the caller keeps the inferred type unchanged.
std::function<std::shared_ptr<::arrow::DataType>(FieldVector)> GetNestedFactory(
    const ArrowType& origin_type, const ArrowType& inferred_type) {
  switch (inferred_type.id()) {
    case ::arrow::Type::STRUCT:
      if (origin_type.id() == ::arrow::Type::STRUCT) {
        return ::arrow::struct_;
      }
      break;
    case ::arrow::Type::LIST:
      // The stored schema may use large or fixed-size lists even though the
      // data reads back from Parquet as a plain list.
      if (origin_type.id() == ::arrow::Type::LIST) {
        return [](FieldVector fields) {
          DCHECK_EQ(fields.size(), 1);
          return ::arrow::list(std::move(fields[0]));
        };
      }
      if (origin_type.id() == ::arrow::Type::LARGE_LIST) {
        return [](FieldVector fields) {
          DCHECK_EQ(fields.size(), 1);
          return ::arrow::large_list(std::move(fields[0]));
        };
      }
      if (origin_type.id() == ::arrow::Type::FIXED_SIZE_LIST) {
        // Capture the fixed list size so the rebuilt type matches the origin.
        const auto list_size =
            checked_cast<const ::arrow::FixedSizeListType&>(origin_type).list_size();
        return [list_size](FieldVector fields) {
          DCHECK_EQ(fields.size(), 1);
          return ::arrow::fixed_size_list(std::move(fields[0]), list_size);
        };
      }
      break;
    default:
      break;
  }
  return {};
}
863
864Result<bool> ApplyOriginalStorageMetadata(const Field& origin_field,
865 SchemaField* inferred) {
866 bool modified = false;
867
868 auto origin_type = origin_field.type();
869 auto inferred_type = inferred->field->type();
870
871 const int num_children = inferred_type->num_fields();
872
873 if (num_children > 0 && origin_type->num_fields() == num_children) {
874 DCHECK_EQ(static_cast<int>(inferred->children.size()), num_children);
875 const auto factory = GetNestedFactory(*origin_type, *inferred_type);
876 if (factory) {
877 // The type may be modified (e.g. LargeList) while the children stay the same
878 modified |= origin_type->id() != inferred_type->id();
879
880 // Apply original metadata recursively to children
881 for (int i = 0; i < inferred_type->num_fields(); ++i) {
882 ARROW_ASSIGN_OR_RAISE(
883 const bool child_modified,
884 ApplyOriginalMetadata(*origin_type->field(i), &inferred->children[i]));
885 modified |= child_modified;
886 }
887 if (modified) {
888 // Recreate this field using the modified child fields
889 ::arrow::FieldVector modified_children(inferred_type->num_fields());
890 for (int i = 0; i < inferred_type->num_fields(); ++i) {
891 modified_children[i] = inferred->children[i].field;
892 }
893 inferred->field =
894 inferred->field->WithType(factory(std::move(modified_children)));
895 }
896 }
897 }
898
899 if (origin_type->id() == ::arrow::Type::TIMESTAMP &&
900 inferred_type->id() == ::arrow::Type::TIMESTAMP) {
901 // Restore time zone, if any
902 const auto& ts_type = checked_cast<const ::arrow::TimestampType&>(*inferred_type);
903 const auto& ts_origin_type =
904 checked_cast<const ::arrow::TimestampType&>(*origin_type);
905
906 // If the data is tz-aware, then set the original time zone, since Parquet
907 // has no native storage for timezones
908 if (ts_type.timezone() == "UTC" && ts_origin_type.timezone() != "") {
909 if (ts_type.unit() == ts_origin_type.unit()) {
910 inferred->field = inferred->field->WithType(origin_type);
911 } else {
912 auto ts_type_new = ::arrow::timestamp(ts_type.unit(), ts_origin_type.timezone());
913 inferred->field = inferred->field->WithType(ts_type_new);
914 }
915 }
916 modified = true;
917 }
918
919 if (origin_type->id() == ::arrow::Type::DICTIONARY &&
920 inferred_type->id() != ::arrow::Type::DICTIONARY &&
921 IsDictionaryReadSupported(*inferred_type)) {
922 // Direct dictionary reads are only suppored for a couple primitive types,
923 // so no need to recurse on value types.
924 const auto& dict_origin_type =
925 checked_cast<const ::arrow::DictionaryType&>(*origin_type);
926 inferred->field = inferred->field->WithType(
927 ::arrow::dictionary(::arrow::int32(), inferred_type, dict_origin_type.ordered()));
928 modified = true;
929 }
930
931 if ((origin_type->id() == ::arrow::Type::LARGE_BINARY &&
932 inferred_type->id() == ::arrow::Type::BINARY) ||
933 (origin_type->id() == ::arrow::Type::LARGE_STRING &&
934 inferred_type->id() == ::arrow::Type::STRING)) {
935 // Read back binary-like arrays with the intended offset width.
936 inferred->field = inferred->field->WithType(origin_type);
937 modified = true;
938 }
939
940 if (origin_type->id() == ::arrow::Type::DECIMAL256 &&
941 inferred_type->id() == ::arrow::Type::DECIMAL128) {
942 inferred->field = inferred->field->WithType(origin_type);
943 modified = true;
944 }
945
946 // Restore field metadata
947 std::shared_ptr<const KeyValueMetadata> field_metadata = origin_field.metadata();
948 if (field_metadata != nullptr) {
949 if (inferred->field->metadata()) {
950 // Prefer the metadata keys (like field_id) from the current metadata
951 field_metadata = field_metadata->Merge(*inferred->field->metadata());
952 }
953 inferred->field = inferred->field->WithMetadata(field_metadata);
954 modified = true;
955 }
956
957 return modified;
958}
959
960Result<bool> ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred) {
961 bool modified = false;
962
963 auto origin_type = origin_field.type();
964 auto inferred_type = inferred->field->type();
965
966 if (origin_type->id() == ::arrow::Type::EXTENSION) {
967 const auto& ex_type = checked_cast<const ::arrow::ExtensionType&>(*origin_type);
968 auto origin_storage_field = origin_field.WithType(ex_type.storage_type());
969
970 // Apply metadata recursively to storage type
971 RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred));
972
973 // Restore extension type, if the storage type is the same as inferred
974 // from the Parquet type
975 if (ex_type.storage_type()->Equals(*inferred->field->type())) {
976 inferred->field = inferred->field->WithType(origin_type);
977 }
978 modified = true;
979 } else {
980 ARROW_ASSIGN_OR_RAISE(modified, ApplyOriginalStorageMetadata(origin_field, inferred));
981 }
982
983 return modified;
984}
985
986} // namespace
987
988Status FieldToNode(const std::shared_ptr<Field>& field,
989 const WriterProperties& properties,
990 const ArrowWriterProperties& arrow_properties, NodePtr* out) {
991 return FieldToNode(field->name(), field, properties, arrow_properties, out);
992}
993
994Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
995 const WriterProperties& properties,
996 const ArrowWriterProperties& arrow_properties,
997 std::shared_ptr<SchemaDescriptor>* out) {
998 std::vector<NodePtr> nodes(arrow_schema->num_fields());
999 for (int i = 0; i < arrow_schema->num_fields(); i++) {
1000 RETURN_NOT_OK(
1001 FieldToNode(arrow_schema->field(i), properties, arrow_properties, &nodes[i]));
1002 }
1003
1004 NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes);
1005 *out = std::make_shared<::parquet::SchemaDescriptor>();
1006 PARQUET_CATCH_NOT_OK((*out)->Init(schema));
1007
1008 return Status::OK();
1009}
1010
// Convenience overload: convert an Arrow schema to a Parquet schema using
// the default Arrow writer properties.
Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                       const WriterProperties& properties,
                       std::shared_ptr<SchemaDescriptor>* out) {
  return ToParquetSchema(arrow_schema, properties, *default_arrow_writer_properties(),
                         out);
}
1017
1018Status FromParquetSchema(
1019 const SchemaDescriptor* schema, const ArrowReaderProperties& properties,
1020 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
1021 std::shared_ptr<::arrow::Schema>* out) {
1022 SchemaManifest manifest;
1023 RETURN_NOT_OK(SchemaManifest::Make(schema, key_value_metadata, properties, &manifest));
1024 std::vector<std::shared_ptr<Field>> fields(manifest.schema_fields.size());
1025
1026 for (int i = 0; i < static_cast<int>(fields.size()); i++) {
1027 const auto& schema_field = manifest.schema_fields[i];
1028 fields[i] = schema_field.field;
1029 }
1030 if (manifest.origin_schema) {
1031 // ARROW-8980: If the ARROW:schema was in the input metadata, then
1032 // manifest.origin_schema will have it scrubbed out
1033 *out = ::arrow::schema(fields, manifest.origin_schema->metadata());
1034 } else {
1035 *out = ::arrow::schema(fields, key_value_metadata);
1036 }
1037 return Status::OK();
1038}
1039
// Convenience overload: convert a Parquet schema to an Arrow schema with no
// file key-value metadata to consult.
Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
                         const ArrowReaderProperties& properties,
                         std::shared_ptr<::arrow::Schema>* out) {
  return FromParquetSchema(parquet_schema, properties, nullptr, out);
}
1045
1046Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
1047 std::shared_ptr<::arrow::Schema>* out) {
1048 ArrowReaderProperties properties;
1049 return FromParquetSchema(parquet_schema, properties, nullptr, out);
1050}
1051
// Build a SchemaManifest describing how the Parquet schema maps onto Arrow
// fields. If the file metadata carries a serialized original Arrow schema,
// it is deserialized and used to restore type information (timezones,
// dictionary encoding, extension types, ...) that Parquet cannot represent.
//
// @param schema     the Parquet schema being read
// @param metadata   the file-level key-value metadata (may be null)
// @param properties reader options (e.g. which columns to dictionary-decode)
// @param manifest   output manifest, populated in place
Status SchemaManifest::Make(const SchemaDescriptor* schema,
                            const std::shared_ptr<const KeyValueMetadata>& metadata,
                            const ArrowReaderProperties& properties,
                            SchemaManifest* manifest) {
  SchemaTreeContext ctx;
  ctx.manifest = manifest;
  ctx.properties = properties;
  ctx.schema = schema;
  const GroupNode& schema_node = *schema->group_node();
  manifest->descr = schema;
  manifest->schema_fields.resize(schema_node.field_count());

  // Try to deserialize original Arrow schema
  RETURN_NOT_OK(
      GetOriginSchema(metadata, &manifest->schema_metadata, &manifest->origin_schema));
  // Ignore original schema if it's not compatible with the Parquet schema
  // (a mismatched top-level field count means it describes different data).
  if (manifest->origin_schema != nullptr &&
      manifest->origin_schema->num_fields() != schema_node.field_count()) {
    manifest->origin_schema = nullptr;
  }

  for (int i = 0; i < static_cast<int>(schema_node.field_count()); ++i) {
    SchemaField* out_field = &manifest->schema_fields[i];
    // Recursively reconstruct the Arrow field from the Parquet node.
    RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), LevelInfo(), &ctx,
                                    /*parent=*/nullptr, out_field));

    // TODO(wesm): as follow up to ARROW-3246, we should really pass the origin
    // schema (if any) through all functions in the schema reconstruction, but
    // I'm being lazy and just setting dictionary fields at the top level for
    // now
    if (manifest->origin_schema == nullptr) {
      continue;
    }

    // Restore original Arrow type/metadata information on this field.
    auto origin_field = manifest->origin_schema->field(i);
    RETURN_NOT_OK(ApplyOriginalMetadata(*origin_field, out_field));
  }
  return Status::OK();
}
1091
1092} // namespace arrow
1093} // namespace parquet