]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/go/parquet/metadata/column_chunk.go
c7cee110cfd5b3800533ae8b3b50468b35b642c1
[ceph.git] / ceph / src / arrow / go / parquet / metadata / column_chunk.go
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 package metadata
18
19 import (
20 "bytes"
21 "context"
22 "io"
23 "reflect"
24
25 "github.com/apache/arrow/go/v6/arrow/memory"
26 "github.com/apache/arrow/go/v6/parquet"
27 "github.com/apache/arrow/go/v6/parquet/compress"
28 "github.com/apache/arrow/go/v6/parquet/internal/encryption"
29 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
30 "github.com/apache/arrow/go/v6/parquet/internal/thrift"
31 "github.com/apache/arrow/go/v6/parquet/schema"
32 "golang.org/x/xerrors"
33 )
34
35 // PageEncodingStats is used for counting the number of pages of specific
36 // types with the given internal encoding.
// PageEncodingStats is used for counting the number of pages of specific
// types with the given internal encoding.
type PageEncodingStats struct {
	Encoding parquet.Encoding // the encoding used for the pages being counted
	PageType format.PageType  // the thrift page type (data page, dictionary page, ...)
}
41
// statvalues adapts format.Statistics so that the newer MinValue/MaxValue
// fields are exposed through the generic GetMin/GetMax/IsSetMin/IsSetMax
// accessors expected by NewStatisticsFromEncoded. The embedded type's own
// Min/Max accessors (the deprecated fields) are shadowed by these methods.
type statvalues struct {
	*format.Statistics
}

func (s *statvalues) GetMin() []byte { return s.GetMinValue() }
func (s *statvalues) GetMax() []byte { return s.GetMaxValue() }
func (s *statvalues) IsSetMin() bool { return s.IsSetMinValue() }
func (s *statvalues) IsSetMax() bool { return s.IsSetMaxValue() }
50
51 func makeColumnStats(metadata *format.ColumnMetaData, descr *schema.Column, mem memory.Allocator) TypedStatistics {
52 if descr.ColumnOrder() == parquet.ColumnOrders.TypeDefinedOrder {
53 return NewStatisticsFromEncoded(descr, mem,
54 metadata.NumValues-metadata.Statistics.GetNullCount(),
55 &statvalues{metadata.Statistics})
56 }
57 return NewStatisticsFromEncoded(descr, mem,
58 metadata.NumValues-metadata.Statistics.GetNullCount(),
59 metadata.Statistics)
60 }
61
// ColumnChunkMetaData is a proxy around format.ColumnChunkMetaData
// containing all of the information and metadata for a given column chunk
// and it's associated Column
type ColumnChunkMetaData struct {
	column        *format.ColumnChunk
	columnMeta    *format.ColumnMetaData // points at column.MetaData, or decryptedMeta if the chunk metadata was encrypted
	decryptedMeta format.ColumnMetaData  // storage for decrypted metadata when the column uses a column key
	descr         *schema.Column
	writerVersion *AppVersion // used to validate statistics written by known-buggy writers
	encodings     []parquet.Encoding
	encodingStats []format.PageEncodingStats
	possibleStats TypedStatistics // lazily built by StatsSet; nil until first requested
	mem           memory.Allocator
}
76
77 // NewColumnChunkMetaData creates an instance of the metadata from a column chunk and descriptor
78 //
79 // this is primarily used internally or between the subpackages. ColumnChunkMetaDataBuilder should
80 // be used by consumers instead of using this directly.
81 func NewColumnChunkMetaData(column *format.ColumnChunk, descr *schema.Column, writerVersion *AppVersion, rowGroupOrdinal, columnOrdinal int16, fileDecryptor encryption.FileDecryptor) (*ColumnChunkMetaData, error) {
82 c := &ColumnChunkMetaData{
83 column: column,
84 columnMeta: column.GetMetaData(),
85 descr: descr,
86 writerVersion: writerVersion,
87 mem: memory.DefaultAllocator,
88 }
89 if column.IsSetCryptoMetadata() {
90 ccmd := column.CryptoMetadata
91
92 if ccmd.IsSetENCRYPTION_WITH_COLUMN_KEY() {
93 if fileDecryptor != nil && fileDecryptor.Properties() != nil {
94 // should decrypt metadata
95 path := parquet.ColumnPath(ccmd.ENCRYPTION_WITH_COLUMN_KEY.GetPathInSchema())
96 keyMetadata := ccmd.ENCRYPTION_WITH_COLUMN_KEY.GetKeyMetadata()
97 aadColumnMetadata := encryption.CreateModuleAad(fileDecryptor.FileAad(), encryption.ColumnMetaModule, rowGroupOrdinal, columnOrdinal, -1)
98 decryptor := fileDecryptor.GetColumnMetaDecryptor(path.String(), string(keyMetadata), aadColumnMetadata)
99 thrift.DeserializeThrift(&c.decryptedMeta, decryptor.Decrypt(column.GetEncryptedColumnMetadata()))
100 c.columnMeta = &c.decryptedMeta
101 } else {
102 return nil, xerrors.New("cannot decrypt column metadata. file decryption not setup correctly")
103 }
104 }
105 }
106 for _, enc := range c.columnMeta.Encodings {
107 c.encodings = append(c.encodings, parquet.Encoding(enc))
108 }
109 for _, enc := range c.columnMeta.EncodingStats {
110 c.encodingStats = append(c.encodingStats, *enc)
111 }
112 return c, nil
113 }
114
// CryptoMetadata returns the cryptographic metadata for how this column was
// encrypted and how to decrypt it.
func (c *ColumnChunkMetaData) CryptoMetadata() *format.ColumnCryptoMetaData {
	return c.column.GetCryptoMetadata()
}

// FileOffset is the location in the file where the column data begins
func (c *ColumnChunkMetaData) FileOffset() int64 { return c.column.FileOffset }

// FilePath gives the name of the parquet file if provided in the metadata
func (c *ColumnChunkMetaData) FilePath() string { return c.column.GetFilePath() }

// Type is the physical storage type used in the parquet file for this column chunk.
func (c *ColumnChunkMetaData) Type() parquet.Type { return parquet.Type(c.columnMeta.Type) }

// NumValues is the number of values stored in just this chunk including nulls.
func (c *ColumnChunkMetaData) NumValues() int64 { return c.columnMeta.NumValues }

// PathInSchema is the full path to this column from the root of the schema including
// any nested columns
func (c *ColumnChunkMetaData) PathInSchema() parquet.ColumnPath {
	return c.columnMeta.GetPathInSchema()
}

// Compression provides the type of compression used for this particular chunk.
func (c *ColumnChunkMetaData) Compression() compress.Compression {
	return compress.Compression(c.columnMeta.Codec)
}

// Encodings returns the list of different encodings used in this chunk
func (c *ColumnChunkMetaData) Encodings() []parquet.Encoding { return c.encodings }
146
147 // EncodingStats connects the order of encodings based on the list of pages and types
148 func (c *ColumnChunkMetaData) EncodingStats() []PageEncodingStats {
149 ret := make([]PageEncodingStats, len(c.encodingStats))
150 for idx := range ret {
151 ret[idx].Encoding = parquet.Encoding(c.encodingStats[idx].Encoding)
152 ret[idx].PageType = c.encodingStats[idx].PageType
153 }
154 return ret
155 }
156
// HasDictionaryPage returns true if there is a dictionary page offset set in
// this metadata.
func (c *ColumnChunkMetaData) HasDictionaryPage() bool {
	return c.columnMeta.IsSetDictionaryPageOffset()
}

// DictionaryPageOffset returns the location in the file where the dictionary page starts
func (c *ColumnChunkMetaData) DictionaryPageOffset() int64 {
	return c.columnMeta.GetDictionaryPageOffset()
}

// DataPageOffset returns the location in the file where the data pages begin for this column
func (c *ColumnChunkMetaData) DataPageOffset() int64 { return c.columnMeta.GetDataPageOffset() }

// HasIndexPage returns true if the offset for the index page is set in the metadata
func (c *ColumnChunkMetaData) HasIndexPage() bool { return c.columnMeta.IsSetIndexPageOffset() }

// IndexPageOffset is the location in the file where the index page starts.
func (c *ColumnChunkMetaData) IndexPageOffset() int64 { return c.columnMeta.GetIndexPageOffset() }

// TotalCompressedSize will be equal to TotalUncompressedSize if the data is not compressed.
// Otherwise this will be the size of the actual data in the file.
func (c *ColumnChunkMetaData) TotalCompressedSize() int64 {
	return c.columnMeta.GetTotalCompressedSize()
}

// TotalUncompressedSize is the total size of the raw data after uncompressing the chunk
func (c *ColumnChunkMetaData) TotalUncompressedSize() int64 {
	return c.columnMeta.GetTotalUncompressedSize()
}

// BloomFilterOffset is the byte offset from the beginning of the file to the bloom
// filter data.
func (c *ColumnChunkMetaData) BloomFilterOffset() int64 {
	return c.columnMeta.GetBloomFilterOffset()
}
193
// StatsSet returns true only if there are statistics set in the metadata and the column
// descriptor has a sort order that is not SortUnknown
//
// It also checks the writer version to ensure that it was not written by a version
// of parquet which is known to have incorrect stat computations.
func (c *ColumnChunkMetaData) StatsSet() (bool, error) {
	if !c.columnMeta.IsSetStatistics() || c.descr.SortOrder() == schema.SortUNKNOWN {
		return false, nil
	}

	// lazily decode and cache the statistics; subsequent calls (and the
	// Statistics accessor) reuse the cached value.
	if c.possibleStats == nil {
		c.possibleStats = makeColumnStats(c.columnMeta, c.descr, c.mem)
	}

	encoded, err := c.possibleStats.Encode()
	if err != nil {
		return false, err
	}

	// reject statistics produced by writer versions with known stat bugs
	return c.writerVersion.HasCorrectStatistics(c.Type(), c.descr.LogicalType(), encoded, c.descr.SortOrder()), nil
}
215
// Equals returns true if the column metadata (not the enclosing chunk) of the
// two instances is deeply equal.
func (c *ColumnChunkMetaData) Equals(other *ColumnChunkMetaData) bool {
	return reflect.DeepEqual(c.columnMeta, other.columnMeta)
}
219
220 // Statistics can return nil if there are no stats in this metadata
221 func (c *ColumnChunkMetaData) Statistics() (TypedStatistics, error) {
222 ok, err := c.StatsSet()
223 if err != nil {
224 return nil, err
225 }
226
227 if ok {
228 return c.possibleStats, nil
229 }
230 return nil, nil
231 }
232
// ColumnChunkMetaDataBuilder is used during writing to construct metadata
// for a given column chunk while writing, providing a proxy around constructing
// the actual thrift object.
type ColumnChunkMetaDataBuilder struct {
	chunk  *format.ColumnChunk
	props  *parquet.WriterProperties
	column *schema.Column

	// compressedSize preserves the total compressed size after Finish nils
	// out the MetaData field for footer-key-encrypted columns.
	compressedSize int64
}
243
// NewColumnChunkMetaDataBuilder constructs a builder for the given writer
// properties and column, starting from an empty thrift ColumnChunk.
func NewColumnChunkMetaDataBuilder(props *parquet.WriterProperties, column *schema.Column) *ColumnChunkMetaDataBuilder {
	return NewColumnChunkMetaDataBuilderWithContents(props, column, format.NewColumnChunk())
}
247
// NewColumnChunkMetaDataBuilderWithContents will construct a builder and start it with the provided
// column chunk information rather than with an empty column chunk.
func NewColumnChunkMetaDataBuilderWithContents(props *parquet.WriterProperties, column *schema.Column, chunk *format.ColumnChunk) *ColumnChunkMetaDataBuilder {
	b := &ColumnChunkMetaDataBuilder{
		props:  props,
		column: column,
		chunk:  chunk,
	}
	// init ensures chunk.MetaData exists and seeds type/path/codec from the
	// column descriptor and writer properties.
	b.init(chunk)
	return b
}
259
// Contents returns the underlying thrift ColumnChunk object so that it can be used
// for constructing or duplicating column metadata
func (c *ColumnChunkMetaDataBuilder) Contents() *format.ColumnChunk { return c.chunk }
263
264 func (c *ColumnChunkMetaDataBuilder) init(chunk *format.ColumnChunk) {
265 c.chunk = chunk
266 if !c.chunk.IsSetMetaData() {
267 c.chunk.MetaData = format.NewColumnMetaData()
268 }
269 c.chunk.MetaData.Type = format.Type(c.column.PhysicalType())
270 c.chunk.MetaData.PathInSchema = schema.ColumnPathFromNode(c.column.SchemaNode())
271 c.chunk.MetaData.Codec = format.CompressionCodec(c.props.CompressionFor(c.column.Path()))
272 }
273
// SetFilePath sets the file path in the column chunk metadata, used when the
// column data lives in a separate file from the metadata.
func (c *ColumnChunkMetaDataBuilder) SetFilePath(val string) {
	c.chunk.FilePath = &val
}

// Descr returns the associated column descriptor for this column chunk
func (c *ColumnChunkMetaDataBuilder) Descr() *schema.Column { return c.column }
280
// TotalCompressedSize returns the total compressed size of this column chunk,
// even after Finish has redacted the metadata for an encrypted column.
func (c *ColumnChunkMetaDataBuilder) TotalCompressedSize() int64 {
	// if this column is encrypted, after Finish is called, the MetaData
	// field is set to nil and we store the compressed size so return that
	if c.chunk.MetaData == nil {
		return c.compressedSize
	}
	return c.chunk.MetaData.GetTotalCompressedSize()
}
289
// SetStats converts the encoded statistics to their thrift representation and
// stores them in the column chunk metadata.
func (c *ColumnChunkMetaDataBuilder) SetStats(val EncodedStatistics) {
	c.chunk.MetaData.Statistics = val.ToThrift()
}
293
// ChunkMetaInfo is a helper struct for passing the offset and size information
// for finishing the building of column chunk metadata
type ChunkMetaInfo struct {
	NumValues        int64
	DictPageOffset   int64 // <= 0 means no dictionary page was written
	IndexPageOffset  int64 // < 0 means no index page was written
	DataPageOffset   int64
	CompressedSize   int64
	UncompressedSize int64
}

// EncodingStats is a helper struct for passing the encoding stat information
// for finishing up metadata for a column chunk.
type EncodingStats struct {
	DictEncodingStats map[parquet.Encoding]int32 // page counts per encoding for dictionary pages
	DataEncodingStats map[parquet.Encoding]int32 // page counts per encoding for data pages
}
311
// Finish finalizes the metadata with the given offsets,
// flushes any compression that needs to be done, and performs
// any encryption if an encryptor is provided.
func (c *ColumnChunkMetaDataBuilder) Finish(info ChunkMetaInfo, hasDict, dictFallback bool, encStats EncodingStats, metaEncryptor encryption.Encryptor) error {
	// FileOffset points at the first page of the chunk: the dictionary page
	// when one exists, otherwise the first data page.
	if info.DictPageOffset > 0 {
		c.chunk.MetaData.DictionaryPageOffset = &info.DictPageOffset
		c.chunk.FileOffset = info.DictPageOffset + info.CompressedSize
	} else {
		c.chunk.FileOffset = info.DataPageOffset + info.CompressedSize
	}

	c.chunk.MetaData.NumValues = info.NumValues
	// a negative IndexPageOffset signals that no index page was written
	if info.IndexPageOffset >= 0 {
		c.chunk.MetaData.IndexPageOffset = &info.IndexPageOffset
	}

	c.chunk.MetaData.DataPageOffset = info.DataPageOffset
	c.chunk.MetaData.TotalUncompressedSize = info.UncompressedSize
	c.chunk.MetaData.TotalCompressedSize = info.CompressedSize

	// no matter the configuration, the maximum number of thrift encodings we'll
	// populate is going to be 3:
	//  1. potential dictionary index encoding
	//  2. page encoding
	//  3. RLE for repetition and definition levels
	// so let's preallocate a capacity of 3 but initialize the slice at 0 len
	const maxEncodings = 3

	thriftEncodings := make([]format.Encoding, 0, maxEncodings)
	if hasDict {
		thriftEncodings = append(thriftEncodings, format.Encoding(c.props.DictionaryIndexEncoding()))
		if c.props.Version() == parquet.V1_0 {
			// V1 files always record PLAIN for the dictionary page encoding
			thriftEncodings = append(thriftEncodings, format.Encoding_PLAIN)
		} else {
			thriftEncodings = append(thriftEncodings, format.Encoding(c.props.DictionaryPageEncoding()))
		}
	} else { // no dictionary
		thriftEncodings = append(thriftEncodings, format.Encoding(c.props.EncodingFor(c.column.Path())))
	}

	thriftEncodings = append(thriftEncodings, format.Encoding(parquet.Encodings.RLE))
	// Only PLAIN encoding is supported for fallback in V1
	// TODO(zeroshade): Use user specified encoding for V2
	if dictFallback {
		thriftEncodings = append(thriftEncodings, format.Encoding_PLAIN)
	}
	c.chunk.MetaData.Encodings = thriftEncodings

	// convert the per-encoding page counts into thrift PageEncodingStats entries
	thriftEncodingStats := make([]*format.PageEncodingStats, 0, len(encStats.DictEncodingStats)+len(encStats.DataEncodingStats))
	for k, v := range encStats.DictEncodingStats {
		thriftEncodingStats = append(thriftEncodingStats, &format.PageEncodingStats{
			PageType: format.PageType_DICTIONARY_PAGE,
			Encoding: format.Encoding(k),
			Count:    v,
		})
	}
	for k, v := range encStats.DataEncodingStats {
		thriftEncodingStats = append(thriftEncodingStats, &format.PageEncodingStats{
			PageType: format.PageType_DATA_PAGE,
			Encoding: format.Encoding(k),
			Count:    v,
		})
	}
	c.chunk.MetaData.EncodingStats = thriftEncodingStats

	encryptProps := c.props.ColumnEncryptionProperties(c.column.Path())
	if encryptProps != nil && encryptProps.IsEncrypted() {
		ccmd := format.NewColumnCryptoMetaData()
		if encryptProps.IsEncryptedWithFooterKey() {
			ccmd.ENCRYPTION_WITH_FOOTER_KEY = format.NewEncryptionWithFooterKey()
		} else {
			ccmd.ENCRYPTION_WITH_COLUMN_KEY = &format.EncryptionWithColumnKey{
				KeyMetadata:  []byte(encryptProps.KeyMetadata()),
				PathInSchema: c.column.ColumnPath(),
			}
		}
		c.chunk.CryptoMetadata = ccmd

		// metadata is encrypted separately unless the footer itself is
		// encrypted with the same (footer) key this column uses
		encryptedFooter := c.props.FileEncryptionProperties().EncryptedFooter()
		encryptMetadata := !encryptedFooter || !encryptProps.IsEncryptedWithFooterKey()
		if encryptMetadata {
			// Serialize and encrypt ColumnMetadata separately
			// Thrift-serialize the ColumnMetaData structure,
			// encrypt it with the column key, and write to encrypted_column_metadata
			serializer := thrift.NewThriftSerializer()
			data, err := serializer.Write(context.Background(), c.chunk.MetaData)
			if err != nil {
				return err
			}
			var buf bytes.Buffer
			metaEncryptor.Encrypt(&buf, data)
			c.chunk.EncryptedColumnMetadata = buf.Bytes()

			if encryptedFooter {
				// stash the compressed size before dropping the plaintext
				// metadata; TotalCompressedSize falls back to it afterwards
				c.compressedSize = c.chunk.MetaData.GetTotalCompressedSize()
				c.chunk.MetaData = nil
			} else {
				// Keep redacted metadata version for old readers
				c.chunk.MetaData.Statistics = nil
				c.chunk.MetaData.EncodingStats = nil
			}
		}
	}
	return nil
}
417
// WriteTo will always return 0 as the int64 since the thrift writer library
// does not return the number of bytes written, we only use the signature
// of (int64, error) in order to match the standard WriteTo interfaces.
func (c *ColumnChunkMetaDataBuilder) WriteTo(w io.Writer) (int64, error) {
	return 0, thrift.SerializeThriftStream(c.chunk, w)
}