// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/arrow/schema.h"

#include <functional>
#include <string>
#include <vector>

#include "arrow/extension_type.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/result_internal.h"
#include "arrow/type.h"
#include "arrow/util/base64.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/value_parsing.h"

#include "parquet/arrow/schema_internal.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/types.h"

using arrow::DecimalType;
using arrow::Field;
using arrow::FieldVector;
using arrow::KeyValueMetadata;
using arrow::Status;
using arrow::internal::checked_cast;

using ArrowType = arrow::DataType;
using ArrowTypeId = arrow::Type;

using parquet::Repetition;
using parquet::schema::GroupNode;
using parquet::schema::Node;
using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;

using ParquetType = parquet::Type;
using parquet::ConvertedType;
using parquet::LogicalType;

using parquet::internal::LevelInfo;

namespace parquet {

namespace arrow {

// ----------------------------------------------------------------------
// Parquet to Arrow schema conversion

namespace {

Repetition::type RepetitionFromNullable(bool is_nullable) {
  return is_nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
}

Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
                   const WriterProperties& properties,
                   const ArrowWriterProperties& arrow_properties, NodePtr* out);

Status ListToNode(const std::shared_ptr<::arrow::BaseListType>& type,
                  const std::string& name, bool nullable,
                  const WriterProperties& properties,
                  const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  NodePtr element;
  std::string value_name =
      arrow_properties.compliant_nested_types() ? "element" : type->value_field()->name();
  RETURN_NOT_OK(FieldToNode(value_name, type->value_field(), properties, arrow_properties,
                            &element));

  NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
  *out = GroupNode::Make(name, RepetitionFromNullable(nullable), {list},
                         LogicalType::List());
  return Status::OK();
}
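
// Illustrative example of the shape produced by ListToNode: a nullable Arrow
// field `ints: list<int32>` maps to the 3-level Parquet form
//
//   optional group ints (LIST) {
//     repeated group list {
//       optional int32 element;
//     }
//   }
//
// The inner field is named "element" only when compliant_nested_types() is
// enabled; otherwise the original Arrow value field name is forwarded.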

Status MapToNode(const std::shared_ptr<::arrow::MapType>& type, const std::string& name,
                 bool nullable, const WriterProperties& properties,
                 const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  // TODO: Should we offer a non-compliant mode that forwards the type names?
  NodePtr key_node;
  RETURN_NOT_OK(
      FieldToNode("key", type->key_field(), properties, arrow_properties, &key_node));

  NodePtr value_node;
  RETURN_NOT_OK(FieldToNode("value", type->item_field(), properties, arrow_properties,
                            &value_node));

  NodePtr key_value =
      GroupNode::Make("key_value", Repetition::REPEATED, {key_node, value_node});
  *out = GroupNode::Make(name, RepetitionFromNullable(nullable), {key_value},
                         LogicalType::Map());
  return Status::OK();
}
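
// Illustrative example of the shape produced by MapToNode: a nullable Arrow
// field `m: map<string, int32>` maps to
//
//   optional group m (MAP) {
//     repeated group key_value {
//       required binary key (STRING);
//       optional int32 value;
//     }
//   }
//
// The key is required because Arrow map keys are non-nullable; the value's
// repetition follows the nullability of the Arrow item field.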

Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
                    const std::string& name, bool nullable,
                    const WriterProperties& properties,
                    const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  std::vector<NodePtr> children(type->num_fields());
  if (type->num_fields() != 0) {
    for (int i = 0; i < type->num_fields(); i++) {
      RETURN_NOT_OK(FieldToNode(type->field(i)->name(), type->field(i), properties,
                                arrow_properties, &children[i]));
    }
  } else {
    // XXX (ARROW-10928) We could add a dummy primitive node but that would
    // require special handling when writing and reading, to avoid column index
    // mismatches.
    return Status::NotImplemented("Cannot write struct type '", name,
                                  "' with no child field to Parquet. "
                                  "Consider adding a dummy child field.");
  }

  *out = GroupNode::Make(name, RepetitionFromNullable(nullable), std::move(children));
  return Status::OK();
}

static std::shared_ptr<const LogicalType> TimestampLogicalTypeFromArrowTimestamp(
    const ::arrow::TimestampType& timestamp_type, ::arrow::TimeUnit::type time_unit) {
  const bool utc = !(timestamp_type.timezone().empty());
  // ARROW-5878(wesm): for forward compatibility reasons, and because
  // there's no other way to signal to old readers that values are
  // timestamps, we force the ConvertedType field to be set to the
  // corresponding TIMESTAMP_* value. This does cause some ambiguity
  // as Parquet readers have not been consistent about the
  // interpretation of TIMESTAMP_* values as being UTC-normalized.
  switch (time_unit) {
    case ::arrow::TimeUnit::MILLI:
      return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS,
                                    /*is_from_converted_type=*/false,
                                    /*force_set_converted_type=*/true);
    case ::arrow::TimeUnit::MICRO:
      return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS,
                                    /*is_from_converted_type=*/false,
                                    /*force_set_converted_type=*/true);
    case ::arrow::TimeUnit::NANO:
      return LogicalType::Timestamp(utc, LogicalType::TimeUnit::NANOS);
    case ::arrow::TimeUnit::SECOND:
      // No equivalent parquet logical type.
      break;
  }
  return LogicalType::None();
}

static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
                                   const WriterProperties& properties,
                                   const ArrowWriterProperties& arrow_properties,
                                   ParquetType::type* physical_type,
                                   std::shared_ptr<const LogicalType>* logical_type) {
  const bool coerce = arrow_properties.coerce_timestamps_enabled();
  const auto target_unit =
      coerce ? arrow_properties.coerce_timestamps_unit() : type.unit();
  const auto version = properties.version();

  // The user is explicitly asking for Impala int96 encoding; in that case there
  // is no logical type.
  if (arrow_properties.support_deprecated_int96_timestamps()) {
    *physical_type = ParquetType::INT96;
    return Status::OK();
  }

  *physical_type = ParquetType::INT64;
  *logical_type = TimestampLogicalTypeFromArrowTimestamp(type, target_unit);

  // The user is explicitly asking for timestamp data to be converted to the
  // specified units (target_unit).
  if (coerce) {
    if (version == ::parquet::ParquetVersion::PARQUET_1_0 ||
        version == ::parquet::ParquetVersion::PARQUET_2_4) {
      switch (target_unit) {
        case ::arrow::TimeUnit::MILLI:
        case ::arrow::TimeUnit::MICRO:
          break;
        case ::arrow::TimeUnit::NANO:
        case ::arrow::TimeUnit::SECOND:
          return Status::NotImplemented("For Parquet version ",
                                        ::parquet::ParquetVersionToString(version),
                                        ", can only coerce Arrow timestamps to "
                                        "milliseconds or microseconds");
      }
    } else {
      switch (target_unit) {
        case ::arrow::TimeUnit::MILLI:
        case ::arrow::TimeUnit::MICRO:
        case ::arrow::TimeUnit::NANO:
          break;
        case ::arrow::TimeUnit::SECOND:
          return Status::NotImplemented("For Parquet version ",
                                        ::parquet::ParquetVersionToString(version),
                                        ", can only coerce Arrow timestamps to "
                                        "milliseconds, microseconds, or nanoseconds");
      }
    }
    return Status::OK();
  }

  // The user implicitly wants timestamp data to retain its original time units.
  // However, the ConvertedType field used to indicate logical types for Parquet
  // version <= 2.4 fields does not allow for nanosecond time units, so nanoseconds
  // must be coerced to microseconds.
  if ((version == ::parquet::ParquetVersion::PARQUET_1_0 ||
       version == ::parquet::ParquetVersion::PARQUET_2_4) &&
      type.unit() == ::arrow::TimeUnit::NANO) {
    *logical_type =
        TimestampLogicalTypeFromArrowTimestamp(type, ::arrow::TimeUnit::MICRO);
    return Status::OK();
  }

  // The user implicitly wants timestamp data to retain its original time units.
  // However, the Arrow seconds time unit cannot be represented (annotated) in
  // any version of Parquet, so it must be coerced to milliseconds.
  if (type.unit() == ::arrow::TimeUnit::SECOND) {
    *logical_type =
        TimestampLogicalTypeFromArrowTimestamp(type, ::arrow::TimeUnit::MILLI);
    return Status::OK();
  }

  return Status::OK();
}
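
// GetTimestampMetadata outcomes, summarized (illustrative, not exhaustive):
//   - deprecated int96 requested -> INT96 physical type, no logical type
//   - coerce_timestamps enabled  -> INT64 annotated with the requested unit
//                                   (validated against the Parquet version)
//   - seconds                    -> INT64 TIMESTAMP(MILLIS)
//   - nanos with Parquet <= 2.4  -> INT64 TIMESTAMP(MICROS)
//   - otherwise                  -> INT64 annotated with the original unit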

static constexpr char FIELD_ID_KEY[] = "PARQUET:field_id";

std::shared_ptr<::arrow::KeyValueMetadata> FieldIdMetadata(int field_id) {
  if (field_id >= 0) {
    return ::arrow::key_value_metadata({FIELD_ID_KEY}, {std::to_string(field_id)});
  } else {
    return nullptr;
  }
}

int FieldIdFromMetadata(
    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
  if (!metadata) {
    return -1;
  }
  int key = metadata->FindKey(FIELD_ID_KEY);
  if (key < 0) {
    return -1;
  }
  std::string field_id_str = metadata->value(key);
  int field_id;
  if (::arrow::internal::ParseValue<::arrow::Int32Type>(
          field_id_str.c_str(), field_id_str.length(), &field_id)) {
    if (field_id < 0) {
      // Thrift should convert any negative value to null, but normalize to -1
      // here in case we later check this in logic.
      return -1;
    }
    return field_id;
  } else {
    return -1;
  }
}
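
// Illustrative example: an Arrow field whose metadata contains
// {"PARQUET:field_id": "7"} round-trips as Parquet field_id 7, while a
// missing, negative, or unparsable value is normalized to -1 ("no field id").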

Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
                   const WriterProperties& properties,
                   const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  std::shared_ptr<const LogicalType> logical_type = LogicalType::None();
  ParquetType::type type;
  Repetition::type repetition = RepetitionFromNullable(field->nullable());

  int length = -1;
  int precision = -1;
  int scale = -1;

  switch (field->type()->id()) {
    case ArrowTypeId::NA: {
      type = ParquetType::INT32;
      logical_type = LogicalType::Null();
      if (repetition != Repetition::OPTIONAL) {
        return Status::Invalid("NullType Arrow field must be nullable");
      }
    } break;
    case ArrowTypeId::BOOL:
      type = ParquetType::BOOLEAN;
      break;
    case ArrowTypeId::UINT8:
      type = ParquetType::INT32;
      logical_type = LogicalType::Int(8, false);
      break;
    case ArrowTypeId::INT8:
      type = ParquetType::INT32;
      logical_type = LogicalType::Int(8, true);
      break;
    case ArrowTypeId::UINT16:
      type = ParquetType::INT32;
      logical_type = LogicalType::Int(16, false);
      break;
    case ArrowTypeId::INT16:
      type = ParquetType::INT32;
      logical_type = LogicalType::Int(16, true);
      break;
    case ArrowTypeId::UINT32:
      if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
        type = ParquetType::INT64;
      } else {
        type = ParquetType::INT32;
        logical_type = LogicalType::Int(32, false);
      }
      break;
    case ArrowTypeId::INT32:
      type = ParquetType::INT32;
      break;
    case ArrowTypeId::UINT64:
      type = ParquetType::INT64;
      logical_type = LogicalType::Int(64, false);
      break;
    case ArrowTypeId::INT64:
      type = ParquetType::INT64;
      break;
    case ArrowTypeId::FLOAT:
      type = ParquetType::FLOAT;
      break;
    case ArrowTypeId::DOUBLE:
      type = ParquetType::DOUBLE;
      break;
    case ArrowTypeId::LARGE_STRING:
    case ArrowTypeId::STRING:
      type = ParquetType::BYTE_ARRAY;
      logical_type = LogicalType::String();
      break;
    case ArrowTypeId::LARGE_BINARY:
    case ArrowTypeId::BINARY:
      type = ParquetType::BYTE_ARRAY;
      break;
    case ArrowTypeId::FIXED_SIZE_BINARY: {
      type = ParquetType::FIXED_LEN_BYTE_ARRAY;
      const auto& fixed_size_binary_type =
          static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type());
      length = fixed_size_binary_type.byte_width();
    } break;
    case ArrowTypeId::DECIMAL128:
    case ArrowTypeId::DECIMAL256: {
      type = ParquetType::FIXED_LEN_BYTE_ARRAY;
      const auto& decimal_type = static_cast<const ::arrow::DecimalType&>(*field->type());
      precision = decimal_type.precision();
      scale = decimal_type.scale();
      length = DecimalType::DecimalSize(precision);
      PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
    } break;
    case ArrowTypeId::DATE32:
      type = ParquetType::INT32;
      logical_type = LogicalType::Date();
      break;
    case ArrowTypeId::DATE64:
      type = ParquetType::INT32;
      logical_type = LogicalType::Date();
      break;
    case ArrowTypeId::TIMESTAMP:
      RETURN_NOT_OK(
          GetTimestampMetadata(static_cast<::arrow::TimestampType&>(*field->type()),
                               properties, arrow_properties, &type, &logical_type));
      break;
    case ArrowTypeId::TIME32:
      type = ParquetType::INT32;
      logical_type =
          LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::MILLIS);
      break;
    case ArrowTypeId::TIME64: {
      type = ParquetType::INT64;
      auto time_type = static_cast<::arrow::Time64Type*>(field->type().get());
      if (time_type->unit() == ::arrow::TimeUnit::NANO) {
        logical_type =
            LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::NANOS);
      } else {
        logical_type =
            LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::MICROS);
      }
    } break;
    case ArrowTypeId::STRUCT: {
      auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
      return StructToNode(struct_type, name, field->nullable(), properties,
                          arrow_properties, out);
    }
    case ArrowTypeId::FIXED_SIZE_LIST:
    case ArrowTypeId::LARGE_LIST:
    case ArrowTypeId::LIST: {
      auto list_type = std::static_pointer_cast<::arrow::BaseListType>(field->type());
      return ListToNode(list_type, name, field->nullable(), properties, arrow_properties,
                        out);
    }
    case ArrowTypeId::DICTIONARY: {
      // Parquet has no Dictionary type; dictionary encoding is handled at the
      // encoding level, not the schema level.
      const ::arrow::DictionaryType& dict_type =
          static_cast<const ::arrow::DictionaryType&>(*field->type());
      std::shared_ptr<::arrow::Field> unpacked_field = ::arrow::field(
          name, dict_type.value_type(), field->nullable(), field->metadata());
      return FieldToNode(name, unpacked_field, properties, arrow_properties, out);
    }
    case ArrowTypeId::EXTENSION: {
      auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(field->type());
      std::shared_ptr<::arrow::Field> storage_field = ::arrow::field(
          name, ext_type->storage_type(), field->nullable(), field->metadata());
      return FieldToNode(name, storage_field, properties, arrow_properties, out);
    }
    case ArrowTypeId::MAP: {
      auto map_type = std::static_pointer_cast<::arrow::MapType>(field->type());
      return MapToNode(map_type, name, field->nullable(), properties, arrow_properties,
                       out);
    }

    default: {
      // TODO: DENSE_UNION, SPARSE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR
      return Status::NotImplemented(
          "Unhandled type for Arrow to Parquet schema conversion: ",
          field->type()->ToString());
    }
  }

  int field_id = FieldIdFromMetadata(field->metadata());
  PARQUET_CATCH_NOT_OK(*out = PrimitiveNode::Make(name, repetition, logical_type, type,
                                                  length, field_id));

  return Status::OK();
}
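
// A few example mappings produced by FieldToNode (illustrative):
//   arrow::utf8()            -> optional binary (STRING)
//   arrow::uint8()           -> optional int32 (INT(8, false))
//   arrow::date64()          -> optional int32 (DATE)
//   arrow::decimal128(10, 2) -> optional fixed_len_byte_array(5) (DECIMAL(10, 2)),
//                               where the length comes from DecimalType::DecimalSize.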

struct SchemaTreeContext {
  SchemaManifest* manifest;
  ArrowReaderProperties properties;
  const SchemaDescriptor* schema;

  void LinkParent(const SchemaField* child, const SchemaField* parent) {
    manifest->child_to_parent[child] = parent;
  }

  void RecordLeaf(const SchemaField* leaf) {
    manifest->column_index_to_field[leaf->column_index] = leaf;
  }
};

bool IsDictionaryReadSupported(const ArrowType& type) {
  // Only supported currently for BYTE_ARRAY types
  return type.id() == ::arrow::Type::BINARY || type.id() == ::arrow::Type::STRING;
}

// ----------------------------------------------------------------------
// Schema logic

::arrow::Result<std::shared_ptr<ArrowType>> GetTypeForNode(
    int column_index, const schema::PrimitiveNode& primitive_node,
    SchemaTreeContext* ctx) {
  ASSIGN_OR_RAISE(
      std::shared_ptr<ArrowType> storage_type,
      GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit()));
  if (ctx->properties.read_dictionary(column_index) &&
      IsDictionaryReadSupported(*storage_type)) {
    return ::arrow::dictionary(::arrow::int32(), storage_type);
  }
  return storage_type;
}

Status NodeToSchemaField(const Node& node, LevelInfo current_levels,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out);

Status GroupToSchemaField(const GroupNode& node, LevelInfo current_levels,
                          SchemaTreeContext* ctx, const SchemaField* parent,
                          SchemaField* out);

Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
                    LevelInfo current_levels, SchemaTreeContext* ctx,
                    const SchemaField* parent, SchemaField* out) {
  out->field = field;
  out->column_index = column_index;
  out->level_info = current_levels;
  ctx->RecordLeaf(out);
  ctx->LinkParent(out, parent);
  return Status::OK();
}

// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
bool HasStructListName(const GroupNode& node) {
  ::arrow::util::string_view name{node.name()};
  return name == "array" || name.ends_with("_tuple");
}

Status GroupToStruct(const GroupNode& node, LevelInfo current_levels,
                     SchemaTreeContext* ctx, const SchemaField* parent,
                     SchemaField* out) {
  std::vector<std::shared_ptr<Field>> arrow_fields;
  out->children.resize(node.field_count());
  // All level increments for the node are expected to happen by callers.
  // This is required because repeated elements need to have their own
  // SchemaField.

  for (int i = 0; i < node.field_count(); i++) {
    RETURN_NOT_OK(
        NodeToSchemaField(*node.field(i), current_levels, ctx, out, &out->children[i]));
    arrow_fields.push_back(out->children[i].field);
  }
  auto struct_type = ::arrow::struct_(arrow_fields);
  out->field = ::arrow::field(node.name(), struct_type, node.is_optional(),
                              FieldIdMetadata(node.field_id()));
  out->level_info = current_levels;
  return Status::OK();
}

Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out);

Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels,
                        SchemaTreeContext* ctx, const SchemaField* parent,
                        SchemaField* out) {
  if (group.field_count() != 1) {
    return Status::Invalid("MAP-annotated groups must have a single child.");
  }
  if (group.is_repeated()) {
    return Status::Invalid("MAP-annotated groups must not be repeated.");
  }

  const Node& key_value_node = *group.field(0);

  if (!key_value_node.is_repeated()) {
    return Status::Invalid(
        "Non-repeated key value nodes in a MAP-annotated group are not supported.");
  }

  if (!key_value_node.is_group()) {
    return Status::Invalid("Key-value node must be a group.");
  }

  const GroupNode& key_value = checked_cast<const GroupNode&>(key_value_node);
  if (key_value.field_count() != 1 && key_value.field_count() != 2) {
    return Status::Invalid("Key-value map node must have 1 or 2 child elements. Found: ",
                           key_value.field_count());
  }
  const Node& key_node = *key_value.field(0);
  if (!key_node.is_required()) {
    return Status::Invalid("Map keys must be annotated as required.");
  }
  // Arrow doesn't support 1-column maps (i.e. sets). The options are to either
  // make the values column nullable, or process the map as a list. We choose the
  // latter as it is simpler.
  if (key_value.field_count() == 1) {
    return ListToSchemaField(group, current_levels, ctx, parent, out);
  }

  current_levels.Increment(group);
  int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();

  out->children.resize(1);
  SchemaField* key_value_field = &out->children[0];

  key_value_field->children.resize(2);
  SchemaField* key_field = &key_value_field->children[0];
  SchemaField* value_field = &key_value_field->children[1];

  ctx->LinkParent(out, parent);
  ctx->LinkParent(key_value_field, out);
  ctx->LinkParent(key_field, key_value_field);
  ctx->LinkParent(value_field, key_value_field);

  // required/optional group name=whatever {
  //   repeated group name=key_values {
  //     required TYPE key;
  //     required/optional TYPE value;
  //   }
  // }
  //

  RETURN_NOT_OK(NodeToSchemaField(*key_value.field(0), current_levels, ctx,
                                  key_value_field, key_field));
  RETURN_NOT_OK(NodeToSchemaField(*key_value.field(1), current_levels, ctx,
                                  key_value_field, value_field));

  key_value_field->field = ::arrow::field(
      group.name(), ::arrow::struct_({key_field->field, value_field->field}),
      /*nullable=*/false, FieldIdMetadata(key_value.field_id()));
  key_value_field->level_info = current_levels;

  out->field = ::arrow::field(group.name(),
                              ::arrow::map(key_field->field->type(), value_field->field),
                              group.is_optional(), FieldIdMetadata(group.field_id()));
  out->level_info = current_levels;
  // At this point current_levels contains the def level for this list;
  // we need to reset to the prior parent.
  out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
  return Status::OK();
}

Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  if (group.field_count() != 1) {
    return Status::Invalid("LIST-annotated groups must have a single child.");
  }
  if (group.is_repeated()) {
    return Status::Invalid("LIST-annotated groups must not be repeated.");
  }
  current_levels.Increment(group);

  out->children.resize(group.field_count());
  SchemaField* child_field = &out->children[0];

  ctx->LinkParent(out, parent);
  ctx->LinkParent(child_field, out);

  const Node& list_node = *group.field(0);

  if (!list_node.is_repeated()) {
    return Status::Invalid(
        "Non-repeated nodes in a LIST-annotated group are not supported.");
  }

  int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
  if (list_node.is_group()) {
    // Resolve 3-level encoding
    //
    // required/optional group name=whatever {
    //   repeated group name=list {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // yields list<item: TYPE ?nullable> ?nullable
    //
    // We distinguish the special case that we have
    //
    // required/optional group name=whatever {
    //   repeated group name=array or $SOMETHING_tuple {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // In this latter case, the inner type of the list should be a struct
    // rather than a primitive value
    //
    // yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
    const auto& list_group = static_cast<const GroupNode&>(list_node);
    // Special case mentioned in the format spec:
    // If the name is array or ends in _tuple, this should be a list of struct
    // even for single child elements.
    if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
      // List of primitive type
      RETURN_NOT_OK(
          NodeToSchemaField(*list_group.field(0), current_levels, ctx, out, child_field));
    } else {
      RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
    }
  } else {
    // Two-level list encoding
    //
    // required/optional group LIST {
    //   repeated TYPE;
    // }
    const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> type,
                    GetTypeForNode(column_index, primitive_node, ctx));
    auto item_field = ::arrow::field(list_node.name(), type, /*nullable=*/false,
                                     FieldIdMetadata(list_node.field_id()));
    RETURN_NOT_OK(
        PopulateLeaf(column_index, item_field, current_levels, ctx, out, child_field));
  }
  out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field),
                              group.is_optional(), FieldIdMetadata(group.field_id()));
  out->level_info = current_levels;
  // At this point current_levels contains the def level for this list;
  // we need to reset to the prior parent.
  out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
  return Status::OK();
}
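
// Illustrative example: ListToSchemaField reconstructs the 3-level group
//
//   optional group ints (LIST) {
//     repeated group list {
//       optional int32 element;
//     }
//   }
//
// as the nullable Arrow field `ints: list<element: int32>`.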

Status GroupToSchemaField(const GroupNode& node, LevelInfo current_levels,
                          SchemaTreeContext* ctx, const SchemaField* parent,
                          SchemaField* out) {
  if (node.logical_type()->is_list()) {
    return ListToSchemaField(node, current_levels, ctx, parent, out);
  } else if (node.logical_type()->is_map()) {
    return MapToSchemaField(node, current_levels, ctx, parent, out);
  }
  std::shared_ptr<ArrowType> type;
  if (node.is_repeated()) {
    // Simple repeated struct
    //
    // repeated group $NAME {
    //   r/o TYPE[0] f0
    //   r/o TYPE[1] f1
    // }
    out->children.resize(1);

    int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
    RETURN_NOT_OK(GroupToStruct(node, current_levels, ctx, out, &out->children[0]));
    out->field = ::arrow::field(node.name(), ::arrow::list(out->children[0].field),
                                /*nullable=*/false, FieldIdMetadata(node.field_id()));

    ctx->LinkParent(&out->children[0], out);
    out->level_info = current_levels;
    // At this point current_levels contains this list as the def level; we need to
    // use the previous ancestor of this list.
    out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
    return Status::OK();
  } else {
    current_levels.Increment(node);
    return GroupToStruct(node, current_levels, ctx, parent, out);
  }
}

Status NodeToSchemaField(const Node& node, LevelInfo current_levels,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  // Workhorse function for converting a Parquet schema node to an Arrow
  // type. Handles different conventions for nested data.

  ctx->LinkParent(out, parent);

  // Now, walk the schema and create a ColumnDescriptor for each leaf node
  if (node.is_group()) {
    // A nested field, but we don't know what kind yet
    return GroupToSchemaField(static_cast<const GroupNode&>(node), current_levels, ctx,
                              parent, out);
  } else {
    // Either a normal flat primitive type, or a list type encoded with 1-level
    // list encoding. Note that the 3-level encoding is the form recommended by
    // the parquet specification, but technically we can have either
    //
    // required/optional $TYPE $FIELD_NAME
    //
    // or
    //
    // repeated $TYPE $FIELD_NAME
    const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> type,
                    GetTypeForNode(column_index, primitive_node, ctx));
    if (node.is_repeated()) {
      // One-level list encoding, e.g.
      //   a: repeated int32;
      int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
      out->children.resize(1);
      auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
      RETURN_NOT_OK(PopulateLeaf(column_index, child_field, current_levels, ctx, out,
                                 &out->children[0]));

      out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
                                  /*nullable=*/false, FieldIdMetadata(node.field_id()));
      out->level_info = current_levels;
      // At this point current_levels has considered this list the ancestor, so
      // restore the actual ancestor.
      out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
      return Status::OK();
    } else {
      current_levels.Increment(node);
      // A normal (required/optional) primitive node
      return PopulateLeaf(column_index,
                          ::arrow::field(node.name(), type, node.is_optional(),
                                         FieldIdMetadata(node.field_id())),
                          current_levels, ctx, parent, out);
    }
  }
}

// Get the original Arrow schema, as serialized in the Parquet metadata
Status GetOriginSchema(const std::shared_ptr<const KeyValueMetadata>& metadata,
                       std::shared_ptr<const KeyValueMetadata>* clean_metadata,
                       std::shared_ptr<::arrow::Schema>* out) {
  if (metadata == nullptr) {
    *out = nullptr;
    *clean_metadata = nullptr;
    return Status::OK();
  }

  static const std::string kArrowSchemaKey = "ARROW:schema";
  int schema_index = metadata->FindKey(kArrowSchemaKey);
  if (schema_index == -1) {
    *out = nullptr;
    *clean_metadata = metadata;
    return Status::OK();
  }

  // The original Arrow schema was serialized using the store_schema option.
  // We deserialize it here and use it to inform read options such as
  // dictionary-encoded fields.
  auto decoded = ::arrow::util::base64_decode(metadata->value(schema_index));
  auto schema_buf = std::make_shared<Buffer>(decoded);

  ::arrow::ipc::DictionaryMemo dict_memo;
  ::arrow::io::BufferReader input(schema_buf);

  ARROW_ASSIGN_OR_RAISE(*out, ::arrow::ipc::ReadSchema(&input, &dict_memo));

  if (metadata->size() > 1) {
    // Copy the metadata without the schema key
    auto new_metadata = ::arrow::key_value_metadata({}, {});
    new_metadata->reserve(metadata->size() - 1);
    for (int64_t i = 0; i < metadata->size(); ++i) {
      if (i == schema_index) continue;
      new_metadata->Append(metadata->key(i), metadata->value(i));
    }
    *clean_metadata = new_metadata;
  } else {
    // No other keys, let metadata be null
    *clean_metadata = nullptr;
  }
  return Status::OK();
}

// Restore original Arrow field information that was serialized as Parquet metadata
// but that is not necessarily present in the field reconstituted from Parquet data
// (for example, Parquet timestamp types don't carry timezone information).

Result<bool> ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred);

std::function<std::shared_ptr<::arrow::DataType>(FieldVector)> GetNestedFactory(
    const ArrowType& origin_type, const ArrowType& inferred_type) {
  switch (inferred_type.id()) {
    case ::arrow::Type::STRUCT:
      if (origin_type.id() == ::arrow::Type::STRUCT) {
        return ::arrow::struct_;
      }
      break;
    case ::arrow::Type::LIST:
      if (origin_type.id() == ::arrow::Type::LIST) {
        return [](FieldVector fields) {
          DCHECK_EQ(fields.size(), 1);
          return ::arrow::list(std::move(fields[0]));
        };
      }
      if (origin_type.id() == ::arrow::Type::LARGE_LIST) {
        return [](FieldVector fields) {
          DCHECK_EQ(fields.size(), 1);
          return ::arrow::large_list(std::move(fields[0]));
        };
      }
      if (origin_type.id() == ::arrow::Type::FIXED_SIZE_LIST) {
        const auto list_size =
            checked_cast<const ::arrow::FixedSizeListType&>(origin_type).list_size();
        return [list_size](FieldVector fields) {
          DCHECK_EQ(fields.size(), 1);
          return ::arrow::fixed_size_list(std::move(fields[0]), list_size);
        };
      }
      break;
    default:
      break;
  }
  return {};
}

Result<bool> ApplyOriginalStorageMetadata(const Field& origin_field,
                                          SchemaField* inferred) {
  bool modified = false;

  auto origin_type = origin_field.type();
  auto inferred_type = inferred->field->type();

  const int num_children = inferred_type->num_fields();

  if (num_children > 0 && origin_type->num_fields() == num_children) {
    DCHECK_EQ(static_cast<int>(inferred->children.size()), num_children);
    const auto factory = GetNestedFactory(*origin_type, *inferred_type);
    if (factory) {
      // The type may be modified (e.g. LargeList) while the children stay the same
      modified |= origin_type->id() != inferred_type->id();

      // Apply original metadata recursively to children
      for (int i = 0; i < inferred_type->num_fields(); ++i) {
        ARROW_ASSIGN_OR_RAISE(
            const bool child_modified,
            ApplyOriginalMetadata(*origin_type->field(i), &inferred->children[i]));
        modified |= child_modified;
      }
      if (modified) {
        // Recreate this field using the modified child fields
        ::arrow::FieldVector modified_children(inferred_type->num_fields());
        for (int i = 0; i < inferred_type->num_fields(); ++i) {
          modified_children[i] = inferred->children[i].field;
        }
        inferred->field =
            inferred->field->WithType(factory(std::move(modified_children)));
      }
    }
  }

  if (origin_type->id() == ::arrow::Type::TIMESTAMP &&
      inferred_type->id() == ::arrow::Type::TIMESTAMP) {
    // Restore time zone, if any
    const auto& ts_type = checked_cast<const ::arrow::TimestampType&>(*inferred_type);
    const auto& ts_origin_type =
        checked_cast<const ::arrow::TimestampType&>(*origin_type);

    // If the data is tz-aware, then set the original time zone, since Parquet
    // has no native storage for timezones
    if (ts_type.timezone() == "UTC" && ts_origin_type.timezone() != "") {
      if (ts_type.unit() == ts_origin_type.unit()) {
        inferred->field = inferred->field->WithType(origin_type);
      } else {
        auto ts_type_new = ::arrow::timestamp(ts_type.unit(), ts_origin_type.timezone());
        inferred->field = inferred->field->WithType(ts_type_new);
      }
    }
    modified = true;
  }

  if (origin_type->id() == ::arrow::Type::DICTIONARY &&
      inferred_type->id() != ::arrow::Type::DICTIONARY &&
      IsDictionaryReadSupported(*inferred_type)) {
    // Direct dictionary reads are only supported for a couple of primitive types,
    // so there is no need to recurse on value types.
    const auto& dict_origin_type =
        checked_cast<const ::arrow::DictionaryType&>(*origin_type);
    inferred->field = inferred->field->WithType(
        ::arrow::dictionary(::arrow::int32(), inferred_type, dict_origin_type.ordered()));
    modified = true;
  }

  if ((origin_type->id() == ::arrow::Type::LARGE_BINARY &&
       inferred_type->id() == ::arrow::Type::BINARY) ||
      (origin_type->id() == ::arrow::Type::LARGE_STRING &&
       inferred_type->id() == ::arrow::Type::STRING)) {
    // Read back binary-like arrays with the intended offset width.
    inferred->field = inferred->field->WithType(origin_type);
    modified = true;
  }

  if (origin_type->id() == ::arrow::Type::DECIMAL256 &&
      inferred_type->id() == ::arrow::Type::DECIMAL128) {
    inferred->field = inferred->field->WithType(origin_type);
    modified = true;
  }

  // Restore field metadata
  std::shared_ptr<const KeyValueMetadata> field_metadata = origin_field.metadata();
  if (field_metadata != nullptr) {
    if (inferred->field->metadata()) {
      // Prefer the metadata keys (like field_id) from the current metadata
      field_metadata = field_metadata->Merge(*inferred->field->metadata());
    }
    inferred->field = inferred->field->WithMetadata(field_metadata);
    modified = true;
  }

  return modified;
}

Result<bool> ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred) {
  bool modified = false;

  auto origin_type = origin_field.type();
  auto inferred_type = inferred->field->type();

  if (origin_type->id() == ::arrow::Type::EXTENSION) {
    const auto& ex_type = checked_cast<const ::arrow::ExtensionType&>(*origin_type);
    auto origin_storage_field = origin_field.WithType(ex_type.storage_type());

    // Apply metadata recursively to storage type
    RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred));

    // Restore extension type, if the storage type is the same as inferred
    // from the Parquet type
    if (ex_type.storage_type()->Equals(*inferred->field->type())) {
      inferred->field = inferred->field->WithType(origin_type);
    }
    modified = true;
  } else {
    ARROW_ASSIGN_OR_RAISE(modified, ApplyOriginalStorageMetadata(origin_field, inferred));
  }

  return modified;
}

} // namespace

Status FieldToNode(const std::shared_ptr<Field>& field,
                   const WriterProperties& properties,
                   const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  return FieldToNode(field->name(), field, properties, arrow_properties, out);
}

Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                       const WriterProperties& properties,
                       const ArrowWriterProperties& arrow_properties,
                       std::shared_ptr<SchemaDescriptor>* out) {
  std::vector<NodePtr> nodes(arrow_schema->num_fields());
  for (int i = 0; i < arrow_schema->num_fields(); i++) {
    RETURN_NOT_OK(
        FieldToNode(arrow_schema->field(i), properties, arrow_properties, &nodes[i]));
  }

  NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes);
  *out = std::make_shared<::parquet::SchemaDescriptor>();
  PARQUET_CATCH_NOT_OK((*out)->Init(schema));

  return Status::OK();
}
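
// Example usage sketch (hypothetical caller code; `arrow_schema` is assumed to
// be a std::shared_ptr<::arrow::Schema> owned by the caller):
//
//   std::shared_ptr<parquet::SchemaDescriptor> descr;
//   PARQUET_THROW_NOT_OK(parquet::arrow::ToParquetSchema(
//       arrow_schema.get(), *parquet::default_writer_properties(),
//       *parquet::arrow::default_arrow_writer_properties(), &descr));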

Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                       const WriterProperties& properties,
                       std::shared_ptr<SchemaDescriptor>* out) {
  return ToParquetSchema(arrow_schema, properties, *default_arrow_writer_properties(),
                         out);
}

Status FromParquetSchema(
    const SchemaDescriptor* schema, const ArrowReaderProperties& properties,
    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
    std::shared_ptr<::arrow::Schema>* out) {
  SchemaManifest manifest;
  RETURN_NOT_OK(SchemaManifest::Make(schema, key_value_metadata, properties, &manifest));
  std::vector<std::shared_ptr<Field>> fields(manifest.schema_fields.size());

  for (int i = 0; i < static_cast<int>(fields.size()); i++) {
    const auto& schema_field = manifest.schema_fields[i];
    fields[i] = schema_field.field;
  }
  if (manifest.origin_schema) {
    // ARROW-8980: If the ARROW:schema was in the input metadata, then
    // manifest.origin_schema will have it scrubbed out
    *out = ::arrow::schema(fields, manifest.origin_schema->metadata());
  } else {
    *out = ::arrow::schema(fields, key_value_metadata);
  }
  return Status::OK();
}

Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
                         const ArrowReaderProperties& properties,
                         std::shared_ptr<::arrow::Schema>* out) {
  return FromParquetSchema(parquet_schema, properties, nullptr, out);
}

Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
                         std::shared_ptr<::arrow::Schema>* out) {
  ArrowReaderProperties properties;
  return FromParquetSchema(parquet_schema, properties, nullptr, out);
}
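
// Example usage sketch (hypothetical caller code; `parquet_schema_descr` is an
// assumed pointer to a parquet::SchemaDescriptor):
//
//   std::shared_ptr<::arrow::Schema> arrow_schema;
//   PARQUET_THROW_NOT_OK(
//       parquet::arrow::FromParquetSchema(parquet_schema_descr, &arrow_schema));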

Status SchemaManifest::Make(const SchemaDescriptor* schema,
                            const std::shared_ptr<const KeyValueMetadata>& metadata,
                            const ArrowReaderProperties& properties,
                            SchemaManifest* manifest) {
  SchemaTreeContext ctx;
  ctx.manifest = manifest;
  ctx.properties = properties;
  ctx.schema = schema;
  const GroupNode& schema_node = *schema->group_node();
  manifest->descr = schema;
  manifest->schema_fields.resize(schema_node.field_count());

  // Try to deserialize original Arrow schema
  RETURN_NOT_OK(
      GetOriginSchema(metadata, &manifest->schema_metadata, &manifest->origin_schema));
  // Ignore original schema if it's not compatible with the Parquet schema
  if (manifest->origin_schema != nullptr &&
      manifest->origin_schema->num_fields() != schema_node.field_count()) {
    manifest->origin_schema = nullptr;
  }

  for (int i = 0; i < static_cast<int>(schema_node.field_count()); ++i) {
    SchemaField* out_field = &manifest->schema_fields[i];
    RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), LevelInfo(), &ctx,
                                    /*parent=*/nullptr, out_field));

    // TODO(wesm): as follow-up to ARROW-3246, we should really pass the origin
    // schema (if any) through all functions in the schema reconstruction, but
    // I'm being lazy and just setting dictionary fields at the top level for
    // now
    if (manifest->origin_schema == nullptr) {
      continue;
    }

    auto origin_field = manifest->origin_schema->field(i);
    RETURN_NOT_OK(ApplyOriginalMetadata(*origin_field, out_field));
  }
  return Status::OK();
}

} // namespace arrow
} // namespace parquet