1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "parquet/metadata.h"
27 #include "arrow/io/memory.h"
28 #include "arrow/util/key_value_metadata.h"
29 #include "arrow/util/logging.h"
30 #include "arrow/util/string_view.h"
31 #include "parquet/encryption/encryption_internal.h"
32 #include "parquet/encryption/internal_file_decryptor.h"
33 #include "parquet/exception.h"
34 #include "parquet/schema.h"
35 #include "parquet/schema_internal.h"
36 #include "parquet/statistics.h"
37 #include "parquet/thrift_internal.h"
41 const ApplicationVersion
& ApplicationVersion::PARQUET_251_FIXED_VERSION() {
42 static ApplicationVersion
version("parquet-mr", 1, 8, 0);
46 const ApplicationVersion
& ApplicationVersion::PARQUET_816_FIXED_VERSION() {
47 static ApplicationVersion
version("parquet-mr", 1, 2, 9);
51 const ApplicationVersion
& ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION() {
52 static ApplicationVersion
version("parquet-cpp", 1, 3, 0);
56 const ApplicationVersion
& ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() {
57 static ApplicationVersion
version("parquet-mr", 1, 10, 0);
// Converts a ParquetVersion enumerator into a std::string by dispatching over
// the PARQUET_1_0, PARQUET_2_0, PARQUET_2_4 and PARQUET_2_6 cases.
// The PARQUET_2_0 label is bracketed by the ARROW_SUPPRESS_DEPRECATION_WARNING /
// ARROW_UNSUPPRESS_DEPRECATION_WARNING macros (presumably because that
// enumerator is deprecated -- confirm in the header that declares it).
61 std::string
ParquetVersionToString(ParquetVersion::type ver
) {
63 case ParquetVersion::PARQUET_1_0
:
65 ARROW_SUPPRESS_DEPRECATION_WARNING
66 case ParquetVersion::PARQUET_2_0
:
68 ARROW_UNSUPPRESS_DEPRECATION_WARNING
69 case ParquetVersion::PARQUET_2_4
:
71 case ParquetVersion::PARQUET_2_6
:
75 // This should be unreachable
79 template <typename DType
>
80 static std::shared_ptr
<Statistics
> MakeTypedColumnStats(
81 const format::ColumnMetaData
& metadata
, const ColumnDescriptor
* descr
) {
82 // If ColumnOrder is defined, return max_value and min_value
83 if (descr
->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER
) {
84 return MakeStatistics
<DType
>(
85 descr
, metadata
.statistics
.min_value
, metadata
.statistics
.max_value
,
86 metadata
.num_values
- metadata
.statistics
.null_count
,
87 metadata
.statistics
.null_count
, metadata
.statistics
.distinct_count
,
88 metadata
.statistics
.__isset
.max_value
|| metadata
.statistics
.__isset
.min_value
,
89 metadata
.statistics
.__isset
.null_count
,
90 metadata
.statistics
.__isset
.distinct_count
);
93 return MakeStatistics
<DType
>(
94 descr
, metadata
.statistics
.min
, metadata
.statistics
.max
,
95 metadata
.num_values
- metadata
.statistics
.null_count
,
96 metadata
.statistics
.null_count
, metadata
.statistics
.distinct_count
,
97 metadata
.statistics
.__isset
.max
|| metadata
.statistics
.__isset
.min
,
98 metadata
.statistics
.__isset
.null_count
, metadata
.statistics
.__isset
.distinct_count
);
101 std::shared_ptr
<Statistics
> MakeColumnStats(const format::ColumnMetaData
& meta_data
,
102 const ColumnDescriptor
* descr
) {
103 switch (static_cast<Type::type
>(meta_data
.type
)) {
105 return MakeTypedColumnStats
<BooleanType
>(meta_data
, descr
);
107 return MakeTypedColumnStats
<Int32Type
>(meta_data
, descr
);
109 return MakeTypedColumnStats
<Int64Type
>(meta_data
, descr
);
111 return MakeTypedColumnStats
<Int96Type
>(meta_data
, descr
);
113 return MakeTypedColumnStats
<DoubleType
>(meta_data
, descr
);
115 return MakeTypedColumnStats
<FloatType
>(meta_data
, descr
);
116 case Type::BYTE_ARRAY
:
117 return MakeTypedColumnStats
<ByteArrayType
>(meta_data
, descr
);
118 case Type::FIXED_LEN_BYTE_ARRAY
:
119 return MakeTypedColumnStats
<FLBAType
>(meta_data
, descr
);
120 case Type::UNDEFINED
:
123 throw ParquetException("Can't decode page statistics for selected column type");
128 // ColumnCryptoMetaData
129 class ColumnCryptoMetaData::ColumnCryptoMetaDataImpl
{
131 explicit ColumnCryptoMetaDataImpl(const format::ColumnCryptoMetaData
* crypto_metadata
)
132 : crypto_metadata_(crypto_metadata
) {}
134 bool encrypted_with_footer_key() const {
135 return crypto_metadata_
->__isset
.ENCRYPTION_WITH_FOOTER_KEY
;
137 bool encrypted_with_column_key() const {
138 return crypto_metadata_
->__isset
.ENCRYPTION_WITH_COLUMN_KEY
;
140 std::shared_ptr
<schema::ColumnPath
> path_in_schema() const {
141 return std::make_shared
<schema::ColumnPath
>(
142 crypto_metadata_
->ENCRYPTION_WITH_COLUMN_KEY
.path_in_schema
);
144 const std::string
& key_metadata() const {
145 return crypto_metadata_
->ENCRYPTION_WITH_COLUMN_KEY
.key_metadata
;
149 const format::ColumnCryptoMetaData
* crypto_metadata_
;
152 std::unique_ptr
<ColumnCryptoMetaData
> ColumnCryptoMetaData::Make(
153 const uint8_t* metadata
) {
154 return std::unique_ptr
<ColumnCryptoMetaData
>(new ColumnCryptoMetaData(metadata
));
157 ColumnCryptoMetaData::ColumnCryptoMetaData(const uint8_t* metadata
)
158 : impl_(new ColumnCryptoMetaDataImpl(
159 reinterpret_cast<const format::ColumnCryptoMetaData
*>(metadata
))) {}
161 ColumnCryptoMetaData::~ColumnCryptoMetaData() = default;
163 std::shared_ptr
<schema::ColumnPath
> ColumnCryptoMetaData::path_in_schema() const {
164 return impl_
->path_in_schema();
166 bool ColumnCryptoMetaData::encrypted_with_footer_key() const {
167 return impl_
->encrypted_with_footer_key();
169 const std::string
& ColumnCryptoMetaData::key_metadata() const {
170 return impl_
->key_metadata();
173 // ColumnChunk metadata
174 class ColumnChunkMetaData::ColumnChunkMetaDataImpl
{
176 explicit ColumnChunkMetaDataImpl(const format::ColumnChunk
* column
,
177 const ColumnDescriptor
* descr
,
178 int16_t row_group_ordinal
, int16_t column_ordinal
,
179 const ApplicationVersion
* writer_version
,
180 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
)
181 : column_(column
), descr_(descr
), writer_version_(writer_version
) {
182 column_metadata_
= &column
->meta_data
;
183 if (column
->__isset
.crypto_metadata
) { // column metadata is encrypted
184 format::ColumnCryptoMetaData ccmd
= column
->crypto_metadata
;
186 if (ccmd
.__isset
.ENCRYPTION_WITH_COLUMN_KEY
) {
187 if (file_decryptor
!= nullptr && file_decryptor
->properties() != nullptr) {
188 // should decrypt metadata
189 std::shared_ptr
<schema::ColumnPath
> path
= std::make_shared
<schema::ColumnPath
>(
190 ccmd
.ENCRYPTION_WITH_COLUMN_KEY
.path_in_schema
);
191 std::string key_metadata
= ccmd
.ENCRYPTION_WITH_COLUMN_KEY
.key_metadata
;
193 std::string aad_column_metadata
= encryption::CreateModuleAad(
194 file_decryptor
->file_aad(), encryption::kColumnMetaData
, row_group_ordinal
,
195 column_ordinal
, static_cast<int16_t>(-1));
196 auto decryptor
= file_decryptor
->GetColumnMetaDecryptor(
197 path
->ToDotString(), key_metadata
, aad_column_metadata
);
198 auto len
= static_cast<uint32_t>(column
->encrypted_column_metadata
.size());
199 DeserializeThriftMsg(
200 reinterpret_cast<const uint8_t*>(column
->encrypted_column_metadata
.c_str()),
201 &len
, &decrypted_metadata_
, decryptor
);
202 column_metadata_
= &decrypted_metadata_
;
204 throw ParquetException(
205 "Cannot decrypt ColumnMetadata."
206 " FileDecryption is not setup correctly");
210 for (const auto& encoding
: column_metadata_
->encodings
) {
211 encodings_
.push_back(LoadEnumSafe(&encoding
));
213 for (const auto& encoding_stats
: column_metadata_
->encoding_stats
) {
214 encoding_stats_
.push_back({LoadEnumSafe(&encoding_stats
.page_type
),
215 LoadEnumSafe(&encoding_stats
.encoding
),
216 encoding_stats
.count
});
218 possible_stats_
= nullptr;
221 bool Equals(const ColumnChunkMetaDataImpl
& other
) const {
222 return *column_metadata_
== *other
.column_metadata_
;
226 inline int64_t file_offset() const { return column_
->file_offset
; }
227 inline const std::string
& file_path() const { return column_
->file_path
; }
229 inline Type::type
type() const { return LoadEnumSafe(&column_metadata_
->type
); }
231 inline int64_t num_values() const { return column_metadata_
->num_values
; }
233 std::shared_ptr
<schema::ColumnPath
> path_in_schema() {
234 return std::make_shared
<schema::ColumnPath
>(column_metadata_
->path_in_schema
);
237 // Check if statistics are set and are valid
238 // 1) Must be set in the metadata
239 // 2) Statistics must not be corrupted
240 inline bool is_stats_set() const {
241 DCHECK(writer_version_
!= nullptr);
242 // If the column statistics don't exist or column sort order is unknown
243 // we cannot use the column stats
244 if (!column_metadata_
->__isset
.statistics
||
245 descr_
->sort_order() == SortOrder::UNKNOWN
) {
248 if (possible_stats_
== nullptr) {
249 possible_stats_
= MakeColumnStats(*column_metadata_
, descr_
);
251 EncodedStatistics encodedStatistics
= possible_stats_
->Encode();
252 return writer_version_
->HasCorrectStatistics(type(), encodedStatistics
,
253 descr_
->sort_order());
256 inline std::shared_ptr
<Statistics
> statistics() const {
257 return is_stats_set() ? possible_stats_
: nullptr;
260 inline Compression::type
compression() const {
261 return LoadEnumSafe(&column_metadata_
->codec
);
264 const std::vector
<Encoding::type
>& encodings() const { return encodings_
; }
266 const std::vector
<PageEncodingStats
>& encoding_stats() const { return encoding_stats_
; }
268 inline bool has_dictionary_page() const {
269 return column_metadata_
->__isset
.dictionary_page_offset
;
272 inline int64_t dictionary_page_offset() const {
273 return column_metadata_
->dictionary_page_offset
;
276 inline int64_t data_page_offset() const { return column_metadata_
->data_page_offset
; }
278 inline bool has_index_page() const {
279 return column_metadata_
->__isset
.index_page_offset
;
282 inline int64_t index_page_offset() const { return column_metadata_
->index_page_offset
; }
284 inline int64_t total_compressed_size() const {
285 return column_metadata_
->total_compressed_size
;
288 inline int64_t total_uncompressed_size() const {
289 return column_metadata_
->total_uncompressed_size
;
292 inline std::unique_ptr
<ColumnCryptoMetaData
> crypto_metadata() const {
293 if (column_
->__isset
.crypto_metadata
) {
294 return ColumnCryptoMetaData::Make(
295 reinterpret_cast<const uint8_t*>(&column_
->crypto_metadata
));
302 mutable std::shared_ptr
<Statistics
> possible_stats_
;
303 std::vector
<Encoding::type
> encodings_
;
304 std::vector
<PageEncodingStats
> encoding_stats_
;
305 const format::ColumnChunk
* column_
;
306 const format::ColumnMetaData
* column_metadata_
;
307 format::ColumnMetaData decrypted_metadata_
;
308 const ColumnDescriptor
* descr_
;
309 const ApplicationVersion
* writer_version_
;
312 std::unique_ptr
<ColumnChunkMetaData
> ColumnChunkMetaData::Make(
313 const void* metadata
, const ColumnDescriptor
* descr
,
314 const ApplicationVersion
* writer_version
, int16_t row_group_ordinal
,
315 int16_t column_ordinal
, std::shared_ptr
<InternalFileDecryptor
> file_decryptor
) {
316 return std::unique_ptr
<ColumnChunkMetaData
>(
317 new ColumnChunkMetaData(metadata
, descr
, row_group_ordinal
, column_ordinal
,
318 writer_version
, std::move(file_decryptor
)));
321 ColumnChunkMetaData::ColumnChunkMetaData(
322 const void* metadata
, const ColumnDescriptor
* descr
, int16_t row_group_ordinal
,
323 int16_t column_ordinal
, const ApplicationVersion
* writer_version
,
324 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
)
325 : impl_
{new ColumnChunkMetaDataImpl(
326 reinterpret_cast<const format::ColumnChunk
*>(metadata
), descr
,
327 row_group_ordinal
, column_ordinal
, writer_version
, std::move(file_decryptor
))} {
330 ColumnChunkMetaData::~ColumnChunkMetaData() = default;
333 int64_t ColumnChunkMetaData::file_offset() const { return impl_
->file_offset(); }
335 const std::string
& ColumnChunkMetaData::file_path() const { return impl_
->file_path(); }
337 Type::type
ColumnChunkMetaData::type() const { return impl_
->type(); }
339 int64_t ColumnChunkMetaData::num_values() const { return impl_
->num_values(); }
341 std::shared_ptr
<schema::ColumnPath
> ColumnChunkMetaData::path_in_schema() const {
342 return impl_
->path_in_schema();
345 std::shared_ptr
<Statistics
> ColumnChunkMetaData::statistics() const {
346 return impl_
->statistics();
349 bool ColumnChunkMetaData::is_stats_set() const { return impl_
->is_stats_set(); }
351 bool ColumnChunkMetaData::has_dictionary_page() const {
352 return impl_
->has_dictionary_page();
355 int64_t ColumnChunkMetaData::dictionary_page_offset() const {
356 return impl_
->dictionary_page_offset();
359 int64_t ColumnChunkMetaData::data_page_offset() const {
360 return impl_
->data_page_offset();
363 bool ColumnChunkMetaData::has_index_page() const { return impl_
->has_index_page(); }
365 int64_t ColumnChunkMetaData::index_page_offset() const {
366 return impl_
->index_page_offset();
369 Compression::type
ColumnChunkMetaData::compression() const {
370 return impl_
->compression();
373 bool ColumnChunkMetaData::can_decompress() const {
374 return ::arrow::util::Codec::IsAvailable(compression());
377 const std::vector
<Encoding::type
>& ColumnChunkMetaData::encodings() const {
378 return impl_
->encodings();
381 const std::vector
<PageEncodingStats
>& ColumnChunkMetaData::encoding_stats() const {
382 return impl_
->encoding_stats();
385 int64_t ColumnChunkMetaData::total_uncompressed_size() const {
386 return impl_
->total_uncompressed_size();
389 int64_t ColumnChunkMetaData::total_compressed_size() const {
390 return impl_
->total_compressed_size();
393 std::unique_ptr
<ColumnCryptoMetaData
> ColumnChunkMetaData::crypto_metadata() const {
394 return impl_
->crypto_metadata();
397 bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData
& other
) const {
398 return impl_
->Equals(*other
.impl_
);
401 // row-group metadata
402 class RowGroupMetaData::RowGroupMetaDataImpl
{
404 explicit RowGroupMetaDataImpl(const format::RowGroup
* row_group
,
405 const SchemaDescriptor
* schema
,
406 const ApplicationVersion
* writer_version
,
407 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
)
408 : row_group_(row_group
),
410 writer_version_(writer_version
),
411 file_decryptor_(std::move(file_decryptor
)) {}
413 bool Equals(const RowGroupMetaDataImpl
& other
) const {
414 return *row_group_
== *other
.row_group_
;
417 inline int num_columns() const { return static_cast<int>(row_group_
->columns
.size()); }
419 inline int64_t num_rows() const { return row_group_
->num_rows
; }
421 inline int64_t total_byte_size() const { return row_group_
->total_byte_size
; }
423 inline int64_t total_compressed_size() const {
424 return row_group_
->total_compressed_size
;
427 inline int64_t file_offset() const { return row_group_
->file_offset
; }
429 inline const SchemaDescriptor
* schema() const { return schema_
; }
431 std::unique_ptr
<ColumnChunkMetaData
> ColumnChunk(int i
) {
432 if (i
< num_columns()) {
433 return ColumnChunkMetaData::Make(&row_group_
->columns
[i
], schema_
->Column(i
),
434 writer_version_
, row_group_
->ordinal
,
435 static_cast<int16_t>(i
), file_decryptor_
);
437 throw ParquetException("The file only has ", num_columns(),
438 " columns, requested metadata for column: ", i
);
442 const format::RowGroup
* row_group_
;
443 const SchemaDescriptor
* schema_
;
444 const ApplicationVersion
* writer_version_
;
445 std::shared_ptr
<InternalFileDecryptor
> file_decryptor_
;
448 std::unique_ptr
<RowGroupMetaData
> RowGroupMetaData::Make(
449 const void* metadata
, const SchemaDescriptor
* schema
,
450 const ApplicationVersion
* writer_version
,
451 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
) {
452 return std::unique_ptr
<RowGroupMetaData
>(
453 new RowGroupMetaData(metadata
, schema
, writer_version
, std::move(file_decryptor
)));
456 RowGroupMetaData::RowGroupMetaData(const void* metadata
, const SchemaDescriptor
* schema
,
457 const ApplicationVersion
* writer_version
,
458 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
)
459 : impl_
{new RowGroupMetaDataImpl(reinterpret_cast<const format::RowGroup
*>(metadata
),
460 schema
, writer_version
, std::move(file_decryptor
))} {
463 RowGroupMetaData::~RowGroupMetaData() = default;
465 bool RowGroupMetaData::Equals(const RowGroupMetaData
& other
) const {
466 return impl_
->Equals(*other
.impl_
);
469 int RowGroupMetaData::num_columns() const { return impl_
->num_columns(); }
471 int64_t RowGroupMetaData::num_rows() const { return impl_
->num_rows(); }
473 int64_t RowGroupMetaData::total_byte_size() const { return impl_
->total_byte_size(); }
475 int64_t RowGroupMetaData::total_compressed_size() const {
476 return impl_
->total_compressed_size();
479 int64_t RowGroupMetaData::file_offset() const { return impl_
->file_offset(); }
481 const SchemaDescriptor
* RowGroupMetaData::schema() const { return impl_
->schema(); }
483 std::unique_ptr
<ColumnChunkMetaData
> RowGroupMetaData::ColumnChunk(int i
) const {
484 return impl_
->ColumnChunk(i
);
487 bool RowGroupMetaData::can_decompress() const {
488 int n_columns
= num_columns();
489 for (int i
= 0; i
< n_columns
; i
++) {
490 if (!ColumnChunk(i
)->can_decompress()) {
498 class FileMetaData::FileMetaDataImpl
{
500 FileMetaDataImpl() = default;
502 explicit FileMetaDataImpl(
503 const void* metadata
, uint32_t* metadata_len
,
504 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
= nullptr)
505 : file_decryptor_(file_decryptor
) {
506 metadata_
.reset(new format::FileMetaData
);
508 auto footer_decryptor
=
509 file_decryptor_
!= nullptr ? file_decryptor
->GetFooterDecryptor() : nullptr;
511 DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata
), metadata_len
,
512 metadata_
.get(), footer_decryptor
);
513 metadata_len_
= *metadata_len
;
515 if (metadata_
->__isset
.created_by
) {
516 writer_version_
= ApplicationVersion(metadata_
->created_by
);
518 writer_version_
= ApplicationVersion("unknown 0.0.0");
523 InitKeyValueMetadata();
526 bool VerifySignature(const void* signature
) {
527 // verify decryption properties are set
528 if (file_decryptor_
== nullptr) {
529 throw ParquetException("Decryption not set properly. cannot verify signature");
531 // serialize the footer
532 uint8_t* serialized_data
;
533 uint32_t serialized_len
= metadata_len_
;
534 ThriftSerializer serializer
;
535 serializer
.SerializeToBuffer(metadata_
.get(), &serialized_len
, &serialized_data
);
537 // encrypt with nonce
538 auto nonce
= const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(signature
));
539 auto tag
= const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(signature
)) +
540 encryption::kNonceLength
;
542 std::string key
= file_decryptor_
->GetFooterKey();
543 std::string aad
= encryption::CreateFooterAad(file_decryptor_
->file_aad());
545 auto aes_encryptor
= encryption::AesEncryptor::Make(
546 file_decryptor_
->algorithm(), static_cast<int>(key
.size()), true, nullptr);
548 std::shared_ptr
<Buffer
> encrypted_buffer
= std::static_pointer_cast
<ResizableBuffer
>(
549 AllocateBuffer(file_decryptor_
->pool(),
550 aes_encryptor
->CiphertextSizeDelta() + serialized_len
));
551 uint32_t encrypted_len
= aes_encryptor
->SignedFooterEncrypt(
552 serialized_data
, serialized_len
, str2bytes(key
), static_cast<int>(key
.size()),
553 str2bytes(aad
), static_cast<int>(aad
.size()), nonce
,
554 encrypted_buffer
->mutable_data());
555 // Delete AES encryptor object. It was created only to verify the footer signature.
556 aes_encryptor
->WipeOut();
557 delete aes_encryptor
;
559 memcmp(encrypted_buffer
->data() + encrypted_len
- encryption::kGcmTagLength
,
560 tag
, encryption::kGcmTagLength
);
563 inline uint32_t size() const { return metadata_len_
; }
564 inline int num_columns() const { return schema_
.num_columns(); }
565 inline int64_t num_rows() const { return metadata_
->num_rows
; }
566 inline int num_row_groups() const {
567 return static_cast<int>(metadata_
->row_groups
.size());
569 inline int32_t version() const { return metadata_
->version
; }
570 inline const std::string
& created_by() const { return metadata_
->created_by
; }
571 inline int num_schema_elements() const {
572 return static_cast<int>(metadata_
->schema
.size());
575 inline bool is_encryption_algorithm_set() const {
576 return metadata_
->__isset
.encryption_algorithm
;
578 inline EncryptionAlgorithm
encryption_algorithm() {
579 return FromThrift(metadata_
->encryption_algorithm
);
581 inline const std::string
& footer_signing_key_metadata() {
582 return metadata_
->footer_signing_key_metadata
;
585 const ApplicationVersion
& writer_version() const { return writer_version_
; }
587 void WriteTo(::arrow::io::OutputStream
* dst
,
588 const std::shared_ptr
<Encryptor
>& encryptor
) const {
589 ThriftSerializer serializer
;
590 // Only in encrypted files with plaintext footers the
591 // encryption_algorithm is set in footer
592 if (is_encryption_algorithm_set()) {
593 uint8_t* serialized_data
;
594 uint32_t serialized_len
;
595 serializer
.SerializeToBuffer(metadata_
.get(), &serialized_len
, &serialized_data
);
597 // encrypt the footer key
598 std::vector
<uint8_t> encrypted_data(encryptor
->CiphertextSizeDelta() +
600 unsigned encrypted_len
=
601 encryptor
->Encrypt(serialized_data
, serialized_len
, encrypted_data
.data());
603 // write unencrypted footer
604 PARQUET_THROW_NOT_OK(dst
->Write(serialized_data
, serialized_len
));
605 // Write signature (nonce and tag)
606 PARQUET_THROW_NOT_OK(
607 dst
->Write(encrypted_data
.data() + 4, encryption::kNonceLength
));
608 PARQUET_THROW_NOT_OK(
609 dst
->Write(encrypted_data
.data() + encrypted_len
- encryption::kGcmTagLength
,
610 encryption::kGcmTagLength
));
611 } else { // either plaintext file (when encryptor is null)
612 // or encrypted file with encrypted footer
613 serializer
.Serialize(metadata_
.get(), dst
, encryptor
);
617 std::unique_ptr
<RowGroupMetaData
> RowGroup(int i
) {
618 if (!(i
< num_row_groups())) {
619 std::stringstream ss
;
620 ss
<< "The file only has " << num_row_groups()
621 << " row groups, requested metadata for row group: " << i
;
622 throw ParquetException(ss
.str());
624 return RowGroupMetaData::Make(&metadata_
->row_groups
[i
], &schema_
, &writer_version_
,
628 bool Equals(const FileMetaDataImpl
& other
) const {
629 return *metadata_
== *other
.metadata_
;
632 const SchemaDescriptor
* schema() const { return &schema_
; }
634 const std::shared_ptr
<const KeyValueMetadata
>& key_value_metadata() const {
635 return key_value_metadata_
;
638 void set_file_path(const std::string
& path
) {
639 for (format::RowGroup
& row_group
: metadata_
->row_groups
) {
640 for (format::ColumnChunk
& chunk
: row_group
.columns
) {
641 chunk
.__set_file_path(path
);
646 format::RowGroup
& row_group(int i
) {
647 DCHECK_LT(i
, num_row_groups());
648 return metadata_
->row_groups
[i
];
651 void AppendRowGroups(const std::unique_ptr
<FileMetaDataImpl
>& other
) {
652 if (!schema()->Equals(*other
->schema())) {
653 throw ParquetException("AppendRowGroups requires equal schemas.");
656 // ARROW-13654: `other` may point to self, be careful not to enter an infinite loop
657 const int n
= other
->num_row_groups();
658 metadata_
->row_groups
.reserve(metadata_
->row_groups
.size() + n
);
659 for (int i
= 0; i
< n
; i
++) {
660 format::RowGroup other_rg
= other
->row_group(i
);
661 metadata_
->num_rows
+= other_rg
.num_rows
;
662 metadata_
->row_groups
.push_back(std::move(other_rg
));
666 std::shared_ptr
<FileMetaData
> Subset(const std::vector
<int>& row_groups
) {
667 for (int i
: row_groups
) {
668 if (i
< num_row_groups()) continue;
670 throw ParquetException(
671 "The file only has ", num_row_groups(),
672 " row groups, but requested a subset including row group: ", i
);
675 std::shared_ptr
<FileMetaData
> out(new FileMetaData());
676 out
->impl_
.reset(new FileMetaDataImpl());
677 out
->impl_
->metadata_
.reset(new format::FileMetaData());
679 auto metadata
= out
->impl_
->metadata_
.get();
680 metadata
->version
= metadata_
->version
;
681 metadata
->schema
= metadata_
->schema
;
683 metadata
->row_groups
.resize(row_groups
.size());
685 for (int selected_index
: row_groups
) {
686 metadata
->num_rows
+= row_group(selected_index
).num_rows
;
687 metadata
->row_groups
[i
++] = row_group(selected_index
);
690 metadata
->key_value_metadata
= metadata_
->key_value_metadata
;
691 metadata
->created_by
= metadata_
->created_by
;
692 metadata
->column_orders
= metadata_
->column_orders
;
693 metadata
->encryption_algorithm
= metadata_
->encryption_algorithm
;
694 metadata
->footer_signing_key_metadata
= metadata_
->footer_signing_key_metadata
;
695 metadata
->__isset
= metadata_
->__isset
;
697 out
->impl_
->schema_
= schema_
;
698 out
->impl_
->writer_version_
= writer_version_
;
699 out
->impl_
->key_value_metadata_
= key_value_metadata_
;
700 out
->impl_
->file_decryptor_
= file_decryptor_
;
705 void set_file_decryptor(std::shared_ptr
<InternalFileDecryptor
> file_decryptor
) {
706 file_decryptor_
= file_decryptor
;
710 friend FileMetaDataBuilder
;
711 uint32_t metadata_len_
= 0;
712 std::unique_ptr
<format::FileMetaData
> metadata_
;
713 SchemaDescriptor schema_
;
714 ApplicationVersion writer_version_
;
715 std::shared_ptr
<const KeyValueMetadata
> key_value_metadata_
;
716 std::shared_ptr
<InternalFileDecryptor
> file_decryptor_
;
719 if (metadata_
->schema
.empty()) {
720 throw ParquetException("Empty file schema (no root)");
722 schema_
.Init(schema::Unflatten(&metadata_
->schema
[0],
723 static_cast<int>(metadata_
->schema
.size())));
726 void InitColumnOrders() {
727 // update ColumnOrder
728 std::vector
<parquet::ColumnOrder
> column_orders
;
729 if (metadata_
->__isset
.column_orders
) {
730 for (auto column_order
: metadata_
->column_orders
) {
731 if (column_order
.__isset
.TYPE_ORDER
) {
732 column_orders
.push_back(ColumnOrder::type_defined_
);
734 column_orders
.push_back(ColumnOrder::undefined_
);
738 column_orders
.resize(schema_
.num_columns(), ColumnOrder::undefined_
);
741 schema_
.updateColumnOrders(column_orders
);
744 void InitKeyValueMetadata() {
745 std::shared_ptr
<KeyValueMetadata
> metadata
= nullptr;
746 if (metadata_
->__isset
.key_value_metadata
) {
747 metadata
= std::make_shared
<KeyValueMetadata
>();
748 for (const auto& it
: metadata_
->key_value_metadata
) {
749 metadata
->Append(it
.key
, it
.value
);
752 key_value_metadata_
= std::move(metadata
);
756 std::shared_ptr
<FileMetaData
> FileMetaData::Make(
757 const void* metadata
, uint32_t* metadata_len
,
758 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
) {
759 // This FileMetaData ctor is private, not compatible with std::make_shared
760 return std::shared_ptr
<FileMetaData
>(
761 new FileMetaData(metadata
, metadata_len
, file_decryptor
));
764 FileMetaData::FileMetaData(const void* metadata
, uint32_t* metadata_len
,
765 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
)
766 : impl_
{std::unique_ptr
<FileMetaDataImpl
>(
767 new FileMetaDataImpl(metadata
, metadata_len
, file_decryptor
))} {}
769 FileMetaData::FileMetaData()
770 : impl_
{std::unique_ptr
<FileMetaDataImpl
>(new FileMetaDataImpl())} {}
772 FileMetaData::~FileMetaData() = default;
774 bool FileMetaData::Equals(const FileMetaData
& other
) const {
775 return impl_
->Equals(*other
.impl_
);
778 std::unique_ptr
<RowGroupMetaData
> FileMetaData::RowGroup(int i
) const {
779 return impl_
->RowGroup(i
);
782 bool FileMetaData::VerifySignature(const void* signature
) {
783 return impl_
->VerifySignature(signature
);
786 uint32_t FileMetaData::size() const { return impl_
->size(); }
788 int FileMetaData::num_columns() const { return impl_
->num_columns(); }
790 int64_t FileMetaData::num_rows() const { return impl_
->num_rows(); }
792 int FileMetaData::num_row_groups() const { return impl_
->num_row_groups(); }
794 bool FileMetaData::can_decompress() const {
795 int n_row_groups
= num_row_groups();
796 for (int i
= 0; i
< n_row_groups
; i
++) {
797 if (!RowGroup(i
)->can_decompress()) {
804 bool FileMetaData::is_encryption_algorithm_set() const {
805 return impl_
->is_encryption_algorithm_set();
808 EncryptionAlgorithm
FileMetaData::encryption_algorithm() const {
809 return impl_
->encryption_algorithm();
812 const std::string
& FileMetaData::footer_signing_key_metadata() const {
813 return impl_
->footer_signing_key_metadata();
816 void FileMetaData::set_file_decryptor(
817 std::shared_ptr
<InternalFileDecryptor
> file_decryptor
) {
818 impl_
->set_file_decryptor(file_decryptor
);
// Translates the integer version held by the implementation object into a
// ParquetVersion value (PARQUET_1_0 or PARQUET_2_LATEST). As the comment below
// states, an improperly set version is treated as Parquet 1.0.
821 ParquetVersion::type
FileMetaData::version() const {
822 switch (impl_
->version()) {
824 return ParquetVersion::PARQUET_1_0
;
826 return ParquetVersion::PARQUET_2_LATEST
;
828 // Improperly set version, assuming Parquet 1.0
831 return ParquetVersion::PARQUET_1_0
;
834 const ApplicationVersion
& FileMetaData::writer_version() const {
835 return impl_
->writer_version();
838 const std::string
& FileMetaData::created_by() const { return impl_
->created_by(); }
840 int FileMetaData::num_schema_elements() const { return impl_
->num_schema_elements(); }
842 const SchemaDescriptor
* FileMetaData::schema() const { return impl_
->schema(); }
844 const std::shared_ptr
<const KeyValueMetadata
>& FileMetaData::key_value_metadata() const {
845 return impl_
->key_value_metadata();
848 void FileMetaData::set_file_path(const std::string
& path
) { impl_
->set_file_path(path
); }
850 void FileMetaData::AppendRowGroups(const FileMetaData
& other
) {
851 impl_
->AppendRowGroups(other
.impl_
);
854 std::shared_ptr
<FileMetaData
> FileMetaData::Subset(
855 const std::vector
<int>& row_groups
) const {
856 return impl_
->Subset(row_groups
);
859 void FileMetaData::WriteTo(::arrow::io::OutputStream
* dst
,
860 const std::shared_ptr
<Encryptor
>& encryptor
) const {
861 return impl_
->WriteTo(dst
, encryptor
);
864 class FileCryptoMetaData::FileCryptoMetaDataImpl
{
866 FileCryptoMetaDataImpl() = default;
868 explicit FileCryptoMetaDataImpl(const uint8_t* metadata
, uint32_t* metadata_len
) {
869 metadata_
.reset(new format::FileCryptoMetaData
);
870 DeserializeThriftMsg(metadata
, metadata_len
, metadata_
.get());
871 metadata_len_
= *metadata_len
;
874 EncryptionAlgorithm
encryption_algorithm() {
875 return FromThrift(metadata_
->encryption_algorithm
);
877 const std::string
& key_metadata() { return metadata_
->key_metadata
; }
878 void WriteTo(::arrow::io::OutputStream
* dst
) const {
879 ThriftSerializer serializer
;
880 serializer
.Serialize(metadata_
.get(), dst
);
884 friend FileMetaDataBuilder
;
885 std::unique_ptr
<format::FileCryptoMetaData
> metadata_
;
886 uint32_t metadata_len_
;
889 EncryptionAlgorithm
FileCryptoMetaData::encryption_algorithm() const {
890 return impl_
->encryption_algorithm();
893 const std::string
& FileCryptoMetaData::key_metadata() const {
894 return impl_
->key_metadata();
897 std::shared_ptr
<FileCryptoMetaData
> FileCryptoMetaData::Make(
898 const uint8_t* serialized_metadata
, uint32_t* metadata_len
) {
899 return std::shared_ptr
<FileCryptoMetaData
>(
900 new FileCryptoMetaData(serialized_metadata
, metadata_len
));
903 FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata
,
904 uint32_t* metadata_len
)
905 : impl_(new FileCryptoMetaDataImpl(serialized_metadata
, metadata_len
)) {}
907 FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {}
909 FileCryptoMetaData::~FileCryptoMetaData() = default;
911 void FileCryptoMetaData::WriteTo(::arrow::io::OutputStream
* dst
) const {
915 std::string
FileMetaData::SerializeToString() const {
916 // We need to pass in an initial size. Since it will automatically
917 // increase the buffer size to hold the metadata, we just leave it 0.
918 PARQUET_ASSIGN_OR_THROW(auto serializer
, ::arrow::io::BufferOutputStream::Create(0));
919 WriteTo(serializer
.get());
920 PARQUET_ASSIGN_OR_THROW(auto metadata_buffer
, serializer
->Finish());
921 return metadata_buffer
->ToString();
924 ApplicationVersion::ApplicationVersion(std::string application
, int major
, int minor
,
926 : application_(std::move(application
)), version
{major
, minor
, patch
, "", "", ""} {}
929 // Parse the application version format and set parsed values to
930 // ApplicationVersion.
// The application version format must be compatible with parquet-mr's
934 // * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
935 // * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
937 // The application version format:
938 // "${APPLICATION_NAME}"
939 // "${APPLICATION_NAME} version ${VERSION}"
940 // "${APPLICATION_NAME} version ${VERSION} (build ${BUILD_NAME})"
944 // parquet-cpp version 1.5.0ab-xyz5.5.0+cd
945 // parquet-cpp version 1.5.0ab-xyz5.5.0+cd (build abcd)
947 // The VERSION format:
949 // "${MAJOR}.${MINOR}"
950 // "${MAJOR}.${MINOR}.${PATCH}"
951 // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}"
952 // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}"
953 // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}+${BUILD_INFO}"
954 // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}+${BUILD_INFO}"
955 // "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}"
956 // "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}+${BUILD_INFO}"
957 // "${MAJOR}.${MINOR}.${PATCH}+${BUILD_INFO}"
965 // 1.5.0ab-cdh5.5.0+cd
970 class ApplicationVersionParser
{
972 ApplicationVersionParser(const std::string
& created_by
,
973 ApplicationVersion
& application_version
)
974 : created_by_(created_by
),
975 application_version_(application_version
),
976 spaces_(" \t\v\r\n\f"),
977 digits_("0123456789") {}
980 application_version_
.application_
= "unknown";
981 application_version_
.version
= {0, 0, 0, "", "", ""};
983 if (!ParseApplicationName()) {
986 if (!ParseVersion()) {
989 if (!ParseBuildName()) {
995 bool IsSpace(const std::string
& string
, const size_t& offset
) {
996 auto target
= ::arrow::util::string_view(string
).substr(offset
, 1);
997 return target
.find_first_of(spaces_
) != ::arrow::util::string_view::npos
;
1000 void RemovePrecedingSpaces(const std::string
& string
, size_t& start
,
1001 const size_t& end
) {
1002 while (start
< end
&& IsSpace(string
, start
)) {
1007 void RemoveTrailingSpaces(const std::string
& string
, const size_t& start
, size_t& end
) {
1008 while (start
< (end
- 1) && (end
- 1) < string
.size() && IsSpace(string
, end
- 1)) {
1013 bool ParseApplicationName() {
1014 std::string
version_mark(" version ");
1015 auto version_mark_position
= created_by_
.find(version_mark
);
1016 size_t application_name_end
;
1017 // No VERSION and BUILD_NAME.
1018 if (version_mark_position
== std::string::npos
) {
1019 version_start_
= std::string::npos
;
1020 application_name_end
= created_by_
.size();
1022 version_start_
= version_mark_position
+ version_mark
.size();
1023 application_name_end
= version_mark_position
;
1026 size_t application_name_start
= 0;
1027 RemovePrecedingSpaces(created_by_
, application_name_start
, application_name_end
);
1028 RemoveTrailingSpaces(created_by_
, application_name_start
, application_name_end
);
1029 application_version_
.application_
= created_by_
.substr(
1030 application_name_start
, application_name_end
- application_name_start
);
1035 bool ParseVersion() {
1037 if (version_start_
== std::string::npos
) {
1041 RemovePrecedingSpaces(created_by_
, version_start_
, created_by_
.size());
1042 version_end_
= created_by_
.find(" (", version_start_
);
1044 if (version_end_
== std::string::npos
) {
1045 version_end_
= created_by_
.size();
1047 RemoveTrailingSpaces(created_by_
, version_start_
, version_end_
);
1049 if (version_start_
== version_end_
) {
1052 version_string_
= created_by_
.substr(version_start_
, version_end_
- version_start_
);
1054 if (!ParseVersionMajor()) {
1057 if (!ParseVersionMinor()) {
1060 if (!ParseVersionPatch()) {
1063 if (!ParseVersionUnknown()) {
1066 if (!ParseVersionPreRelease()) {
1069 if (!ParseVersionBuildInfo()) {
1076 bool ParseVersionMajor() {
1077 size_t version_major_start
= 0;
1078 auto version_major_end
= version_string_
.find_first_not_of(digits_
);
1080 if (version_major_end
== std::string::npos
) {
1081 version_major_end
= version_string_
.size();
1082 version_parsing_position_
= version_major_end
;
1085 if (version_string_
[version_major_end
] != '.') {
1089 if (version_major_end
== version_major_start
) {
1092 version_parsing_position_
= version_major_end
+ 1; // +1 is for '.'.
1094 auto version_major_string
= version_string_
.substr(
1095 version_major_start
, version_major_end
- version_major_start
);
1096 application_version_
.version
.major
= atoi(version_major_string
.c_str());
1100 bool ParseVersionMinor() {
1101 auto version_minor_start
= version_parsing_position_
;
1102 auto version_minor_end
=
1103 version_string_
.find_first_not_of(digits_
, version_minor_start
);
1104 // MAJOR.MINOR only.
1105 if (version_minor_end
== std::string::npos
) {
1106 version_minor_end
= version_string_
.size();
1107 version_parsing_position_
= version_minor_end
;
1110 if (version_string_
[version_minor_end
] != '.') {
1114 if (version_minor_end
== version_minor_start
) {
1117 version_parsing_position_
= version_minor_end
+ 1; // +1 is for '.'.
1119 auto version_minor_string
= version_string_
.substr(
1120 version_minor_start
, version_minor_end
- version_minor_start
);
1121 application_version_
.version
.minor
= atoi(version_minor_string
.c_str());
1125 bool ParseVersionPatch() {
1126 auto version_patch_start
= version_parsing_position_
;
1127 auto version_patch_end
=
1128 version_string_
.find_first_not_of(digits_
, version_patch_start
);
1129 // No UNKNOWN, PRE_RELEASE and BUILD_INFO.
1130 if (version_patch_end
== std::string::npos
) {
1131 version_patch_end
= version_string_
.size();
1134 if (version_patch_end
== version_patch_start
) {
1137 auto version_patch_string
= version_string_
.substr(
1138 version_patch_start
, version_patch_end
- version_patch_start
);
1139 application_version_
.version
.patch
= atoi(version_patch_string
.c_str());
1140 version_parsing_position_
= version_patch_end
;
1144 bool ParseVersionUnknown() {
1146 if (version_parsing_position_
== version_string_
.size()) {
1149 auto version_unknown_start
= version_parsing_position_
;
1150 auto version_unknown_end
= version_string_
.find_first_of("-+", version_unknown_start
);
1151 // No PRE_RELEASE and BUILD_INFO
1152 if (version_unknown_end
== std::string::npos
) {
1153 version_unknown_end
= version_string_
.size();
1155 application_version_
.version
.unknown
= version_string_
.substr(
1156 version_unknown_start
, version_unknown_end
- version_unknown_start
);
1157 version_parsing_position_
= version_unknown_end
;
1161 bool ParseVersionPreRelease() {
1163 if (version_parsing_position_
== version_string_
.size() ||
1164 version_string_
[version_parsing_position_
] != '-') {
1168 auto version_pre_release_start
= version_parsing_position_
+ 1; // +1 is for '-'.
1169 auto version_pre_release_end
=
1170 version_string_
.find_first_of("+", version_pre_release_start
);
1172 if (version_pre_release_end
== std::string::npos
) {
1173 version_pre_release_end
= version_string_
.size();
1175 application_version_
.version
.pre_release
= version_string_
.substr(
1176 version_pre_release_start
, version_pre_release_end
- version_pre_release_start
);
1177 version_parsing_position_
= version_pre_release_end
;
1181 bool ParseVersionBuildInfo() {
1183 if (version_parsing_position_
== version_string_
.size() ||
1184 version_string_
[version_parsing_position_
] != '+') {
1188 auto version_build_info_start
= version_parsing_position_
+ 1; // +1 is for '+'.
1189 application_version_
.version
.build_info
=
1190 version_string_
.substr(version_build_info_start
);
1194 bool ParseBuildName() {
1195 std::string
build_mark(" (build ");
1196 auto build_mark_position
= created_by_
.find(build_mark
, version_end_
);
1198 if (build_mark_position
== std::string::npos
) {
1201 auto build_name_start
= build_mark_position
+ build_mark
.size();
1202 RemovePrecedingSpaces(created_by_
, build_name_start
, created_by_
.size());
1203 auto build_name_end
= created_by_
.find_first_of(")", build_name_start
);
1205 if (build_name_end
== std::string::npos
) {
1208 RemoveTrailingSpaces(created_by_
, build_name_start
, build_name_end
);
1209 application_version_
.build_
=
1210 created_by_
.substr(build_name_start
, build_name_end
- build_name_start
);
1215 const std::string
& created_by_
;
1216 ApplicationVersion
& application_version_
;
1219 std::string spaces_
;
1220 std::string digits_
;
1221 size_t version_parsing_position_
;
1222 size_t version_start_
;
1223 size_t version_end_
;
1224 std::string version_string_
;
1228 ApplicationVersion::ApplicationVersion(const std::string
& created_by
) {
1229 ApplicationVersionParser
parser(created_by
, *this);
1233 bool ApplicationVersion::VersionLt(const ApplicationVersion
& other_version
) const {
1234 if (application_
!= other_version
.application_
) return false;
1236 if (version
.major
< other_version
.version
.major
) return true;
1237 if (version
.major
> other_version
.version
.major
) return false;
1238 DCHECK_EQ(version
.major
, other_version
.version
.major
);
1239 if (version
.minor
< other_version
.version
.minor
) return true;
1240 if (version
.minor
> other_version
.version
.minor
) return false;
1241 DCHECK_EQ(version
.minor
, other_version
.version
.minor
);
1242 return version
.patch
< other_version
.version
.patch
;
1245 bool ApplicationVersion::VersionEq(const ApplicationVersion
& other_version
) const {
1246 return application_
== other_version
.application_
&&
1247 version
.major
== other_version
.version
.major
&&
1248 version
.minor
== other_version
.version
.minor
&&
1249 version
.patch
== other_version
.version
.patch
;
1253 // parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
1254 // PARQUET-686 has more discussion on statistics
1255 bool ApplicationVersion::HasCorrectStatistics(Type::type col_type
,
1256 EncodedStatistics
& statistics
,
1257 SortOrder::type sort_order
) const {
1258 // parquet-cpp version 1.3.0 and parquet-mr 1.10.0 onwards stats are computed
1259 // correctly for all types
1260 if ((application_
== "parquet-cpp" && VersionLt(PARQUET_CPP_FIXED_STATS_VERSION())) ||
1261 (application_
== "parquet-mr" && VersionLt(PARQUET_MR_FIXED_STATS_VERSION()))) {
1262 // Only SIGNED are valid unless max and min are the same
1263 // (in which case the sort order does not matter)
1264 bool max_equals_min
= statistics
.has_min
&& statistics
.has_max
1265 ? statistics
.min() == statistics
.max()
1267 if (SortOrder::SIGNED
!= sort_order
&& !max_equals_min
) {
1271 // Statistics of other types are OK
1272 if (col_type
!= Type::FIXED_LEN_BYTE_ARRAY
&& col_type
!= Type::BYTE_ARRAY
) {
1276 // created_by is not populated, which could have been caused by
1277 // parquet-mr during the same time as PARQUET-251, see PARQUET-297
1278 if (application_
== "unknown") {
1282 // Unknown sort order has incorrect stats
1283 if (SortOrder::UNKNOWN
== sort_order
) {
1288 if (VersionLt(PARQUET_251_FIXED_VERSION())) {
1295 // MetaData Builders
1296 // row-group metadata
1297 class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl
{
1299 explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr
<WriterProperties
> props
,
1300 const ColumnDescriptor
* column
)
1301 : owned_column_chunk_(new format::ColumnChunk
),
1302 properties_(std::move(props
)),
1304 Init(owned_column_chunk_
.get());
1307 explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr
<WriterProperties
> props
,
1308 const ColumnDescriptor
* column
,
1309 format::ColumnChunk
* column_chunk
)
1310 : properties_(std::move(props
)), column_(column
) {
1314 const void* contents() const { return column_chunk_
; }
1317 void set_file_path(const std::string
& val
) { column_chunk_
->__set_file_path(val
); }
1320 void SetStatistics(const EncodedStatistics
& val
) {
1321 column_chunk_
->meta_data
.__set_statistics(ToThrift(val
));
1324 void Finish(int64_t num_values
, int64_t dictionary_page_offset
,
1325 int64_t index_page_offset
, int64_t data_page_offset
,
1326 int64_t compressed_size
, int64_t uncompressed_size
, bool has_dictionary
,
1327 bool dictionary_fallback
,
1328 const std::map
<Encoding::type
, int32_t>& dict_encoding_stats
,
1329 const std::map
<Encoding::type
, int32_t>& data_encoding_stats
,
1330 const std::shared_ptr
<Encryptor
>& encryptor
) {
1331 if (dictionary_page_offset
> 0) {
1332 column_chunk_
->meta_data
.__set_dictionary_page_offset(dictionary_page_offset
);
1333 column_chunk_
->__set_file_offset(dictionary_page_offset
+ compressed_size
);
1335 column_chunk_
->__set_file_offset(data_page_offset
+ compressed_size
);
1337 column_chunk_
->__isset
.meta_data
= true;
1338 column_chunk_
->meta_data
.__set_num_values(num_values
);
1339 if (index_page_offset
>= 0) {
1340 column_chunk_
->meta_data
.__set_index_page_offset(index_page_offset
);
1342 column_chunk_
->meta_data
.__set_data_page_offset(data_page_offset
);
1343 column_chunk_
->meta_data
.__set_total_uncompressed_size(uncompressed_size
);
1344 column_chunk_
->meta_data
.__set_total_compressed_size(compressed_size
);
1346 std::vector
<format::Encoding::type
> thrift_encodings
;
1347 if (has_dictionary
) {
1348 thrift_encodings
.push_back(ToThrift(properties_
->dictionary_index_encoding()));
1349 if (properties_
->version() == ParquetVersion::PARQUET_1_0
) {
1350 thrift_encodings
.push_back(ToThrift(Encoding::PLAIN
));
1352 thrift_encodings
.push_back(ToThrift(properties_
->dictionary_page_encoding()));
1354 } else { // Dictionary not enabled
1355 thrift_encodings
.push_back(ToThrift(properties_
->encoding(column_
->path())));
1357 thrift_encodings
.push_back(ToThrift(Encoding::RLE
));
1358 // Only PLAIN encoding is supported for fallback in V1
1359 // TODO(majetideepak): Use user specified encoding for V2
1360 if (dictionary_fallback
) {
1361 thrift_encodings
.push_back(ToThrift(Encoding::PLAIN
));
1363 column_chunk_
->meta_data
.__set_encodings(thrift_encodings
);
1364 std::vector
<format::PageEncodingStats
> thrift_encoding_stats
;
1365 // Add dictionary page encoding stats
1366 for (const auto& entry
: dict_encoding_stats
) {
1367 format::PageEncodingStats dict_enc_stat
;
1368 dict_enc_stat
.__set_page_type(format::PageType::DICTIONARY_PAGE
);
1369 dict_enc_stat
.__set_encoding(ToThrift(entry
.first
));
1370 dict_enc_stat
.__set_count(entry
.second
);
1371 thrift_encoding_stats
.push_back(dict_enc_stat
);
1373 // Add data page encoding stats
1374 for (const auto& entry
: data_encoding_stats
) {
1375 format::PageEncodingStats data_enc_stat
;
1376 data_enc_stat
.__set_page_type(format::PageType::DATA_PAGE
);
1377 data_enc_stat
.__set_encoding(ToThrift(entry
.first
));
1378 data_enc_stat
.__set_count(entry
.second
);
1379 thrift_encoding_stats
.push_back(data_enc_stat
);
1381 column_chunk_
->meta_data
.__set_encoding_stats(thrift_encoding_stats
);
1383 const auto& encrypt_md
=
1384 properties_
->column_encryption_properties(column_
->path()->ToDotString());
1385 // column is encrypted
1386 if (encrypt_md
!= nullptr && encrypt_md
->is_encrypted()) {
1387 column_chunk_
->__isset
.crypto_metadata
= true;
1388 format::ColumnCryptoMetaData ccmd
;
1389 if (encrypt_md
->is_encrypted_with_footer_key()) {
1390 // encrypted with footer key
1391 ccmd
.__isset
.ENCRYPTION_WITH_FOOTER_KEY
= true;
1392 ccmd
.__set_ENCRYPTION_WITH_FOOTER_KEY(format::EncryptionWithFooterKey());
1393 } else { // encrypted with column key
1394 format::EncryptionWithColumnKey eck
;
1395 eck
.__set_key_metadata(encrypt_md
->key_metadata());
1396 eck
.__set_path_in_schema(column_
->path()->ToDotVector());
1397 ccmd
.__isset
.ENCRYPTION_WITH_COLUMN_KEY
= true;
1398 ccmd
.__set_ENCRYPTION_WITH_COLUMN_KEY(eck
);
1400 column_chunk_
->__set_crypto_metadata(ccmd
);
1402 bool encrypted_footer
=
1403 properties_
->file_encryption_properties()->encrypted_footer();
1404 bool encrypt_metadata
=
1405 !encrypted_footer
|| !encrypt_md
->is_encrypted_with_footer_key();
1406 if (encrypt_metadata
) {
1407 ThriftSerializer serializer
;
1408 // Serialize and encrypt ColumnMetadata separately
1409 // Thrift-serialize the ColumnMetaData structure,
1410 // encrypt it with the column key, and write to encrypted_column_metadata
1411 uint8_t* serialized_data
;
1412 uint32_t serialized_len
;
1414 serializer
.SerializeToBuffer(&column_chunk_
->meta_data
, &serialized_len
,
1417 std::vector
<uint8_t> encrypted_data(encryptor
->CiphertextSizeDelta() +
1419 unsigned encrypted_len
=
1420 encryptor
->Encrypt(serialized_data
, serialized_len
, encrypted_data
.data());
1423 const_cast<const char*>(reinterpret_cast<char*>(encrypted_data
.data()));
1424 std::string
encrypted_column_metadata(temp
, encrypted_len
);
1425 column_chunk_
->__set_encrypted_column_metadata(encrypted_column_metadata
);
1427 if (encrypted_footer
) {
1428 column_chunk_
->__isset
.meta_data
= false;
1430 // Keep redacted metadata version for old readers
1431 column_chunk_
->__isset
.meta_data
= true;
1432 column_chunk_
->meta_data
.__isset
.statistics
= false;
1433 column_chunk_
->meta_data
.__isset
.encoding_stats
= false;
1439 void WriteTo(::arrow::io::OutputStream
* sink
) {
1440 ThriftSerializer serializer
;
1441 serializer
.Serialize(column_chunk_
, sink
);
1444 const ColumnDescriptor
* descr() const { return column_
; }
1445 int64_t total_compressed_size() const {
1446 return column_chunk_
->meta_data
.total_compressed_size
;
1450 void Init(format::ColumnChunk
* column_chunk
) {
1451 column_chunk_
= column_chunk
;
1453 column_chunk_
->meta_data
.__set_type(ToThrift(column_
->physical_type()));
1454 column_chunk_
->meta_data
.__set_path_in_schema(column_
->path()->ToDotVector());
1455 column_chunk_
->meta_data
.__set_codec(
1456 ToThrift(properties_
->compression(column_
->path())));
1459 format::ColumnChunk
* column_chunk_
;
1460 std::unique_ptr
<format::ColumnChunk
> owned_column_chunk_
;
1461 const std::shared_ptr
<WriterProperties
> properties_
;
1462 const ColumnDescriptor
* column_
;
1465 std::unique_ptr
<ColumnChunkMetaDataBuilder
> ColumnChunkMetaDataBuilder::Make(
1466 std::shared_ptr
<WriterProperties
> props
, const ColumnDescriptor
* column
,
1468 return std::unique_ptr
<ColumnChunkMetaDataBuilder
>(
1469 new ColumnChunkMetaDataBuilder(std::move(props
), column
, contents
));
1472 std::unique_ptr
<ColumnChunkMetaDataBuilder
> ColumnChunkMetaDataBuilder::Make(
1473 std::shared_ptr
<WriterProperties
> props
, const ColumnDescriptor
* column
) {
1474 return std::unique_ptr
<ColumnChunkMetaDataBuilder
>(
1475 new ColumnChunkMetaDataBuilder(std::move(props
), column
));
1478 ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
1479 std::shared_ptr
<WriterProperties
> props
, const ColumnDescriptor
* column
)
1480 : impl_
{std::unique_ptr
<ColumnChunkMetaDataBuilderImpl
>(
1481 new ColumnChunkMetaDataBuilderImpl(std::move(props
), column
))} {}
1483 ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
1484 std::shared_ptr
<WriterProperties
> props
, const ColumnDescriptor
* column
,
1486 : impl_
{std::unique_ptr
<ColumnChunkMetaDataBuilderImpl
>(
1487 new ColumnChunkMetaDataBuilderImpl(
1488 std::move(props
), column
,
1489 reinterpret_cast<format::ColumnChunk
*>(contents
)))} {}
1491 ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() = default;
1493 const void* ColumnChunkMetaDataBuilder::contents() const { return impl_
->contents(); }
1495 void ColumnChunkMetaDataBuilder::set_file_path(const std::string
& path
) {
1496 impl_
->set_file_path(path
);
1499 void ColumnChunkMetaDataBuilder::Finish(
1500 int64_t num_values
, int64_t dictionary_page_offset
, int64_t index_page_offset
,
1501 int64_t data_page_offset
, int64_t compressed_size
, int64_t uncompressed_size
,
1502 bool has_dictionary
, bool dictionary_fallback
,
1503 const std::map
<Encoding::type
, int32_t>& dict_encoding_stats
,
1504 const std::map
<Encoding::type
, int32_t>& data_encoding_stats
,
1505 const std::shared_ptr
<Encryptor
>& encryptor
) {
1506 impl_
->Finish(num_values
, dictionary_page_offset
, index_page_offset
, data_page_offset
,
1507 compressed_size
, uncompressed_size
, has_dictionary
, dictionary_fallback
,
1508 dict_encoding_stats
, data_encoding_stats
, encryptor
);
1511 void ColumnChunkMetaDataBuilder::WriteTo(::arrow::io::OutputStream
* sink
) {
1512 impl_
->WriteTo(sink
);
1515 const ColumnDescriptor
* ColumnChunkMetaDataBuilder::descr() const {
1516 return impl_
->descr();
1519 void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics
& result
) {
1520 impl_
->SetStatistics(result
);
1523 int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const {
1524 return impl_
->total_compressed_size();
1527 class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl
{
1529 explicit RowGroupMetaDataBuilderImpl(std::shared_ptr
<WriterProperties
> props
,
1530 const SchemaDescriptor
* schema
, void* contents
)
1531 : properties_(std::move(props
)), schema_(schema
), next_column_(0) {
1532 row_group_
= reinterpret_cast<format::RowGroup
*>(contents
);
1533 InitializeColumns(schema
->num_columns());
1536 ColumnChunkMetaDataBuilder
* NextColumnChunk() {
1537 if (!(next_column_
< num_columns())) {
1538 std::stringstream ss
;
1539 ss
<< "The schema only has " << num_columns()
1540 << " columns, requested metadata for column: " << next_column_
;
1541 throw ParquetException(ss
.str());
1543 auto column
= schema_
->Column(next_column_
);
1544 auto column_builder
= ColumnChunkMetaDataBuilder::Make(
1545 properties_
, column
, &row_group_
->columns
[next_column_
++]);
1546 auto column_builder_ptr
= column_builder
.get();
1547 column_builders_
.push_back(std::move(column_builder
));
1548 return column_builder_ptr
;
1551 int current_column() { return next_column_
- 1; }
1553 void Finish(int64_t total_bytes_written
, int16_t row_group_ordinal
) {
1554 if (!(next_column_
== schema_
->num_columns())) {
1555 std::stringstream ss
;
1556 ss
<< "Only " << next_column_
- 1 << " out of " << schema_
->num_columns()
1557 << " columns are initialized";
1558 throw ParquetException(ss
.str());
1561 int64_t file_offset
= 0;
1562 int64_t total_compressed_size
= 0;
1563 for (int i
= 0; i
< schema_
->num_columns(); i
++) {
1564 if (!(row_group_
->columns
[i
].file_offset
>= 0)) {
1565 std::stringstream ss
;
1566 ss
<< "Column " << i
<< " is not complete.";
1567 throw ParquetException(ss
.str());
1570 const format::ColumnMetaData
& first_col
= row_group_
->columns
[0].meta_data
;
1571 // As per spec, file_offset for the row group points to the first
1572 // dictionary or data page of the column.
1573 if (first_col
.__isset
.dictionary_page_offset
&&
1574 first_col
.dictionary_page_offset
> 0) {
1575 file_offset
= first_col
.dictionary_page_offset
;
1577 file_offset
= first_col
.data_page_offset
;
1580 // sometimes column metadata is encrypted and not available to read,
1581 // so we must get total_compressed_size from column builder
1582 total_compressed_size
+= column_builders_
[i
]->total_compressed_size();
1585 row_group_
->__set_file_offset(file_offset
);
1586 row_group_
->__set_total_compressed_size(total_compressed_size
);
1587 row_group_
->__set_total_byte_size(total_bytes_written
);
1588 row_group_
->__set_ordinal(row_group_ordinal
);
1591 void set_num_rows(int64_t num_rows
) { row_group_
->num_rows
= num_rows
; }
1593 int num_columns() { return static_cast<int>(row_group_
->columns
.size()); }
1595 int64_t num_rows() { return row_group_
->num_rows
; }
1598 void InitializeColumns(int ncols
) { row_group_
->columns
.resize(ncols
); }
1600 format::RowGroup
* row_group_
;
1601 const std::shared_ptr
<WriterProperties
> properties_
;
1602 const SchemaDescriptor
* schema_
;
1603 std::vector
<std::unique_ptr
<ColumnChunkMetaDataBuilder
>> column_builders_
;
1607 std::unique_ptr
<RowGroupMetaDataBuilder
> RowGroupMetaDataBuilder::Make(
1608 std::shared_ptr
<WriterProperties
> props
, const SchemaDescriptor
* schema_
,
1610 return std::unique_ptr
<RowGroupMetaDataBuilder
>(
1611 new RowGroupMetaDataBuilder(std::move(props
), schema_
, contents
));
1614 RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(std::shared_ptr
<WriterProperties
> props
,
1615 const SchemaDescriptor
* schema_
,
1617 : impl_
{new RowGroupMetaDataBuilderImpl(std::move(props
), schema_
, contents
)} {}
1619 RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() = default;
1621 ColumnChunkMetaDataBuilder
* RowGroupMetaDataBuilder::NextColumnChunk() {
1622 return impl_
->NextColumnChunk();
1625 int RowGroupMetaDataBuilder::current_column() const { return impl_
->current_column(); }
1627 int RowGroupMetaDataBuilder::num_columns() { return impl_
->num_columns(); }
1629 int64_t RowGroupMetaDataBuilder::num_rows() { return impl_
->num_rows(); }
1631 void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows
) {
1632 impl_
->set_num_rows(num_rows
);
1635 void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written
,
1636 int16_t row_group_ordinal
) {
1637 impl_
->Finish(total_bytes_written
, row_group_ordinal
);
1641 // TODO(PARQUET-595) Support key_value_metadata
1642 class FileMetaDataBuilder::FileMetaDataBuilderImpl
{
1644 explicit FileMetaDataBuilderImpl(
1645 const SchemaDescriptor
* schema
, std::shared_ptr
<WriterProperties
> props
,
1646 std::shared_ptr
<const KeyValueMetadata
> key_value_metadata
)
1647 : metadata_(new format::FileMetaData()),
1648 properties_(std::move(props
)),
1650 key_value_metadata_(std::move(key_value_metadata
)) {
1651 if (properties_
->file_encryption_properties() != nullptr &&
1652 properties_
->file_encryption_properties()->encrypted_footer()) {
1653 crypto_metadata_
.reset(new format::FileCryptoMetaData());
1657 RowGroupMetaDataBuilder
* AppendRowGroup() {
1658 row_groups_
.emplace_back();
1659 current_row_group_builder_
=
1660 RowGroupMetaDataBuilder::Make(properties_
, schema_
, &row_groups_
.back());
1661 return current_row_group_builder_
.get();
1664 std::unique_ptr
<FileMetaData
> Finish() {
1665 int64_t total_rows
= 0;
1666 for (auto row_group
: row_groups_
) {
1667 total_rows
+= row_group
.num_rows
;
1669 metadata_
->__set_num_rows(total_rows
);
1670 metadata_
->__set_row_groups(row_groups_
);
1672 if (key_value_metadata_
) {
1673 metadata_
->key_value_metadata
.clear();
1674 metadata_
->key_value_metadata
.reserve(key_value_metadata_
->size());
1675 for (int64_t i
= 0; i
< key_value_metadata_
->size(); ++i
) {
1676 format::KeyValue kv_pair
;
1677 kv_pair
.__set_key(key_value_metadata_
->key(i
));
1678 kv_pair
.__set_value(key_value_metadata_
->value(i
));
1679 metadata_
->key_value_metadata
.push_back(kv_pair
);
1681 metadata_
->__isset
.key_value_metadata
= true;
1684 int32_t file_version
= 0;
1685 switch (properties_
->version()) {
1686 case ParquetVersion::PARQUET_1_0
:
1693 metadata_
->__set_version(file_version
);
1694 metadata_
->__set_created_by(properties_
->created_by());
1696 // Users cannot set the `ColumnOrder` since we donot not have user defined sort order
1698 // We always default to `TYPE_DEFINED_ORDER`. We can expose it in
1699 // the API once we have user defined sort orders in the Parquet format.
1700 // TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType
1701 format::TypeDefinedOrder type_defined_order
;
1702 format::ColumnOrder column_order
;
1703 column_order
.__set_TYPE_ORDER(type_defined_order
);
1704 column_order
.__isset
.TYPE_ORDER
= true;
1705 metadata_
->column_orders
.resize(schema_
->num_columns(), column_order
);
1706 metadata_
->__isset
.column_orders
= true;
1708 // if plaintext footer, set footer signing algorithm
1709 auto file_encryption_properties
= properties_
->file_encryption_properties();
1710 if (file_encryption_properties
&& !file_encryption_properties
->encrypted_footer()) {
1711 EncryptionAlgorithm signing_algorithm
;
1712 EncryptionAlgorithm algo
= file_encryption_properties
->algorithm();
1713 signing_algorithm
.aad
.aad_file_unique
= algo
.aad
.aad_file_unique
;
1714 signing_algorithm
.aad
.supply_aad_prefix
= algo
.aad
.supply_aad_prefix
;
1715 if (!algo
.aad
.supply_aad_prefix
) {
1716 signing_algorithm
.aad
.aad_prefix
= algo
.aad
.aad_prefix
;
1718 signing_algorithm
.algorithm
= ParquetCipher::AES_GCM_V1
;
1720 metadata_
->__set_encryption_algorithm(ToThrift(signing_algorithm
));
1721 const std::string
& footer_signing_key_metadata
=
1722 file_encryption_properties
->footer_key_metadata();
1723 if (footer_signing_key_metadata
.size() > 0) {
1724 metadata_
->__set_footer_signing_key_metadata(footer_signing_key_metadata
);
1728 ToParquet(static_cast<parquet::schema::GroupNode
*>(schema_
->schema_root().get()),
1729 &metadata_
->schema
);
1730 auto file_meta_data
= std::unique_ptr
<FileMetaData
>(new FileMetaData());
1731 file_meta_data
->impl_
->metadata_
= std::move(metadata_
);
1732 file_meta_data
->impl_
->InitSchema();
1733 file_meta_data
->impl_
->InitKeyValueMetadata();
1734 return file_meta_data
;
1737 std::unique_ptr
<FileCryptoMetaData
> BuildFileCryptoMetaData() {
1738 if (crypto_metadata_
== nullptr) {
1742 auto file_encryption_properties
= properties_
->file_encryption_properties();
1744 crypto_metadata_
->__set_encryption_algorithm(
1745 ToThrift(file_encryption_properties
->algorithm()));
1746 std::string key_metadata
= file_encryption_properties
->footer_key_metadata();
1748 if (!key_metadata
.empty()) {
1749 crypto_metadata_
->__set_key_metadata(key_metadata
);
1752 std::unique_ptr
<FileCryptoMetaData
> file_crypto_metadata
=
1753 std::unique_ptr
<FileCryptoMetaData
>(new FileCryptoMetaData());
1754 file_crypto_metadata
->impl_
->metadata_
= std::move(crypto_metadata_
);
1756 return file_crypto_metadata
;
1760 std::unique_ptr
<format::FileMetaData
> metadata_
;
1761 std::unique_ptr
<format::FileCryptoMetaData
> crypto_metadata_
;
1764 const std::shared_ptr
<WriterProperties
> properties_
;
1765 std::vector
<format::RowGroup
> row_groups_
;
1767 std::unique_ptr
<RowGroupMetaDataBuilder
> current_row_group_builder_
;
1768 const SchemaDescriptor
* schema_
;
1769 std::shared_ptr
<const KeyValueMetadata
> key_value_metadata_
;
1772 std::unique_ptr
<FileMetaDataBuilder
> FileMetaDataBuilder::Make(
1773 const SchemaDescriptor
* schema
, std::shared_ptr
<WriterProperties
> props
,
1774 std::shared_ptr
<const KeyValueMetadata
> key_value_metadata
) {
1775 return std::unique_ptr
<FileMetaDataBuilder
>(
1776 new FileMetaDataBuilder(schema
, std::move(props
), std::move(key_value_metadata
)));
1779 FileMetaDataBuilder::FileMetaDataBuilder(
1780 const SchemaDescriptor
* schema
, std::shared_ptr
<WriterProperties
> props
,
1781 std::shared_ptr
<const KeyValueMetadata
> key_value_metadata
)
1782 : impl_
{std::unique_ptr
<FileMetaDataBuilderImpl
>(new FileMetaDataBuilderImpl(
1783 schema
, std::move(props
), std::move(key_value_metadata
)))} {}
1785 FileMetaDataBuilder::~FileMetaDataBuilder() = default;
1787 RowGroupMetaDataBuilder
* FileMetaDataBuilder::AppendRowGroup() {
1788 return impl_
->AppendRowGroup();
1791 std::unique_ptr
<FileMetaData
> FileMetaDataBuilder::Finish() { return impl_
->Finish(); }
1793 std::unique_ptr
<FileCryptoMetaData
> FileMetaDataBuilder::GetCryptoMetaData() {
1794 return impl_
->BuildFileCryptoMetaData();
1797 } // namespace parquet