1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
25 "github.com/apache/arrow/go/v6/arrow"
26 "github.com/apache/arrow/go/v6/arrow/memory"
27 "github.com/apache/arrow/go/v6/parquet"
28 "github.com/apache/arrow/go/v6/parquet/internal/utils"
29 "golang.org/x/xerrors"
32 // see the deltaBitPack encoder for a description of the encoding format that is
33 // used for delta-bitpacking.
//
// deltaBitPackDecoder carries the state the typed delta bit-pack decoders
// need while walking through the blocks and miniblocks of the encoded data.
34 type deltaBitPackDecoder struct {
// bitdecoder reads the encoded bytes; SetData replaces it every time new
// data is handed to the decoder.
40 bitdecoder *utils.BitReader
// currentBlockVals is the number of values of the current block that have not
// been handed out yet. initBlock sets it to the block size and Decode
// decrements it as values are copied out.
42 currentBlockVals uint32
// currentMiniBlockVals is the number of values of the current miniblock that
// have not been handed out yet. unpackNextMini sets it to valsPerMini and
// Decode decrements it as values are copied out.
45 currentMiniBlockVals uint32
// deltaBitWidths stores one bit-width byte per miniblock of the current block.
// It is created lazily by SetData and refilled by initBlock.
49 deltaBitWidths *memory.Buffer
55 // returns the number of bytes read so far
56 func (d *deltaBitPackDecoder) bytesRead() int64 {
57 return d.bitdecoder.CurOffset()
60 func (d *deltaBitPackDecoder) Allocator() memory.Allocator { return d.mem }
62 // SetData sets the bytes and the expected number of values to decode
63 // into the decoder, updating the decoder and allowing it to be reused.
64 func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error {
65 // set our data into the underlying decoder for the type
66 if err := d.decoder.SetData(nvalues, data); err != nil {
69 // create a bit reader for our decoder's values
70 d.bitdecoder = utils.NewBitReader(bytes.NewReader(d.data))
71 d.currentBlockVals = 0
72 d.currentMiniBlockVals = 0
73 if d.deltaBitWidths == nil {
74 d.deltaBitWidths = memory.NewResizableBuffer(d.mem)
78 d.blockSize, ok = d.bitdecoder.GetVlqInt()
80 return xerrors.New("parquet: eof exception")
83 if d.miniBlocks, ok = d.bitdecoder.GetVlqInt(); !ok {
84 return xerrors.New("parquet: eof exception")
87 var totalValues uint64
88 if totalValues, ok = d.bitdecoder.GetVlqInt(); !ok {
89 return xerrors.New("parquet: eof exception")
92 if int(totalValues) != d.nvals {
93 return xerrors.New("parquet: mismatch between number of values and count in data header")
96 if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
97 return xerrors.New("parquet: eof exception")
100 if d.miniBlocks != 0 {
101 d.valsPerMini = uint32(d.blockSize / d.miniBlocks)
106 // initialize a block to decode
//
// initBlock reads the header of the next block: the zigzag VLQ encoded minimum
// delta, followed by one bit-width byte per miniblock. Afterwards the decoder
// is positioned on the first miniblock with a full block of values pending.
// It returns an error when the minimum delta or a bit-width byte cannot be read.
107 func (d *deltaBitPackDecoder) initBlock() error {
108 // first we grab the min delta value that we'll start from
110 if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
111 return xerrors.New("parquet: eof exception")
114 // ensure we have enough space for our miniblocks to decode the widths
115 d.deltaBitWidths.Resize(int(d.miniBlocks))
// read the bit width of every miniblock, one byte each
118 for i := uint64(0); i < d.miniBlocks; i++ {
119 if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil {
// start out with the bit width of the first miniblock
// NOTE(review): indexing [0] panics when the header declared zero miniblocks per block — confirm the header is validated before this point.
125 d.deltaBitWidth = d.deltaBitWidths.Bytes()[0]
// a freshly initialized block still has blockSize values to hand out
126 d.currentBlockVals = uint32(d.blockSize)
130 // DeltaBitPackInt32Decoder decodes Int32 values which are packed using the Delta BitPacking algorithm.
131 type DeltaBitPackInt32Decoder struct {
// miniBlockValues holds the decoded values of the current miniblock. It is
// refilled by unpackNextMini and copied out of by Decode.
134 miniBlockValues []int32
// unpackNextMini decodes the next miniblock into miniBlockValues. Every stored
// delta is turned back into an absolute value by adding the block's minimum
// delta and the previously decoded value. It returns an error when the data
// ends before the miniblock is complete.
137 func (d *DeltaBitPackInt32Decoder) unpackNextMini() error {
// allocate the scratch slice on first use, afterwards reuse its storage
138 if d.miniBlockValues == nil {
139 d.miniBlockValues = make([]int32, 0, int(d.valsPerMini))
141 d.miniBlockValues = d.miniBlockValues[:0]
// pick the bit width that was read for this miniblock in initBlock
143 d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
144 d.currentMiniBlockVals = d.valsPerMini
// a miniblock always carries valsPerMini bit-packed deltas
146 for j := 0; j < int(d.valsPerMini); j++ {
147 delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
149 return xerrors.New("parquet: eof exception")
// stored deltas are relative to minDelta; lastVal keeps the running value in
// 64 bits and is narrowed to int32 only for the output
152 d.lastVal += int64(delta) + int64(d.minDelta)
153 d.miniBlockValues = append(d.miniBlockValues, int32(d.lastVal))
159 // Decode retrieves min(remaining values, len(out)) values from the data and returns the number
160 // of values actually decoded and any errors encountered.
161 func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) {
162 max := utils.MinInt(len(out), d.nvals)
168 if !d.usedFirst { // starting value to calculate deltas against
169 out[0] = int32(d.lastVal)
175 for len(out) > 0 { // unpack mini blocks until we get all the values we need
176 if d.currentBlockVals == 0 {
179 if d.currentMiniBlockVals == 0 {
180 err = d.unpackNextMini()
186 // copy as many values from our mini block as we can into out
187 start := int(d.valsPerMini - d.currentMiniBlockVals)
188 end := utils.MinInt(int(d.valsPerMini), len(out))
189 copy(out, d.miniBlockValues[start:end])
191 numCopied := end - start
192 out = out[numCopied:]
193 d.currentBlockVals -= uint32(numCopied)
194 d.currentMiniBlockVals -= uint32(numCopied)
199 // DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap
//
// out has to be sized for values and nulls together: the len(out)-nullCount
// non-null values are decoded into the front of out and then handed to
// spacedExpand together with the validity bitmap. An error is returned when
// fewer values than expected could be decoded.
// NOTE(review): a nullCount larger than len(out) makes the slice expression below panic — confirm callers guarantee nullCount <= len(out).
200 func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
// number of non-null values that have to come out of the encoded data
201 toread := len(out) - nullCount
202 values, err := d.Decode(out[:toread])
206 if values != toread {
207 return values, xerrors.New("parquet: number of values / definition levels read did not match")
210 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
213 // Type returns the physical parquet type that this decoder decodes, in this case Int32
214 func (DeltaBitPackInt32Decoder) Type() parquet.Type {
215 return parquet.Types.Int32
218 // DeltaBitPackInt64Decoder decodes a delta bit packed int64 column of data.
219 type DeltaBitPackInt64Decoder struct {
// miniBlockValues holds the decoded values of the current miniblock. It is
// refilled by unpackNextMini and copied out of by Decode.
222 miniBlockValues []int64
// unpackNextMini decodes the next miniblock into miniBlockValues. Every stored
// delta is turned back into an absolute value by adding the block's minimum
// delta and the previously decoded value. It returns an error when the data
// ends before the miniblock is complete.
225 func (d *DeltaBitPackInt64Decoder) unpackNextMini() error {
// allocate the scratch slice on first use, afterwards reuse its storage
226 if d.miniBlockValues == nil {
227 d.miniBlockValues = make([]int64, 0, int(d.valsPerMini))
229 d.miniBlockValues = d.miniBlockValues[:0]
// pick the bit width that was read for this miniblock in initBlock
232 d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
233 d.currentMiniBlockVals = d.valsPerMini
// a miniblock always carries valsPerMini bit-packed deltas
235 for j := 0; j < int(d.valsPerMini); j++ {
236 delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
238 return xerrors.New("parquet: eof exception")
// stored deltas are relative to minDelta; lastVal is the running value
241 d.lastVal += int64(delta) + int64(d.minDelta)
242 d.miniBlockValues = append(d.miniBlockValues, d.lastVal)
248 // Decode retrieves min(remaining values, len(out)) values from the data and returns the number
249 // of values actually decoded and any errors encountered.
250 func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) {
251 max := utils.MinInt(len(out), d.nvals)
265 if d.currentBlockVals == 0 {
268 if d.currentMiniBlockVals == 0 {
269 err = d.unpackNextMini()
276 start := int(d.valsPerMini - d.currentMiniBlockVals)
277 end := utils.MinInt(int(d.valsPerMini), len(out))
278 copy(out, d.miniBlockValues[start:end])
280 numCopied := end - start
281 out = out[numCopied:]
282 d.currentBlockVals -= uint32(numCopied)
283 d.currentMiniBlockVals -= uint32(numCopied)
288 // Type returns the physical parquet type that this decoder decodes, in this case Int64
289 func (DeltaBitPackInt64Decoder) Type() parquet.Type {
290 return parquet.Types.Int64
293 // DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap
294 func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
295 toread := len(out) - nullCount
296 values, err := d.Decode(out[:toread])
300 if values != toread {
301 return values, xerrors.New("parquet: number of values / definition levels read did not match")
304 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
// Defaults the delta bit-pack encoder applies when it starts writing values.
308 // block size must be a multiple of 128
309 defaultBlockSize = 128
// number of miniblocks every block is divided into
310 defaultNumMiniBlocks = 4
311 // block size / number of mini blocks must result in a multiple of 32
312 defaultNumValuesPerMini = 32
313 // max size of the header for the delta blocks
// in bytes; FlushValues allocates a scratch buffer of this size for the header
314 maxHeaderWriterSize = 32
317 // deltaBitPackEncoder is an encoder for the DeltaBinary Packing format
318 // as per the parquet spec.
320 // Consists of a header followed by blocks of delta encoded values binary packed.
// Overall layout:
323 // [header] [block 1] [block 2] ... [block N]
// Header layout:
326 // [block size] [number of mini blocks per block] [total value count] [first value]
// Block layout:
329 // [min delta] [list of bitwidths of the miniblocks] [miniblocks...]
331 // The header is not written while values are being added. FlushValues builds it
332 // in a separate buffer and puts it in front of the encoded blocks before returning.
333 type deltaBitPackEncoder struct {
// bitWriter receives the encoded blocks. It is created when the first values
// are written and is nil before that.
336 bitWriter *utils.BitWriter
347 // flushBlock flushes out a finished block for writing to the underlying encoder
//
// A block is written as the zigzag VLQ encoded minimum delta, one bit-width
// byte per miniblock and then the bit-packed miniblocks, each storing
// (delta - minDelta) for its values. The buffered deltas are cleared
// afterwards. Nothing is written when no deltas are buffered.
348 func (enc *deltaBitPackEncoder) flushBlock() {
349 if len(enc.deltas) == 0 {
353 // determine the minimum delta value
354 minDelta := int64(math.MaxInt64)
355 for _, delta := range enc.deltas {
356 if delta < minDelta {
361 enc.bitWriter.WriteZigZagVlqInt(minDelta)
362 // reserve enough bytes to write out our miniblock deltas
// (one byte per miniblock, filled in below once each bit width is known)
363 offset := enc.bitWriter.ReserveBytes(int(enc.numMiniBlocks))
365 valuesToWrite := int64(len(enc.deltas))
366 for i := 0; i < int(enc.numMiniBlocks); i++ {
// n is the number of buffered deltas that go into this miniblock
367 n := utils.Min(int64(enc.miniBlockSize), valuesToWrite)
// the largest delta of the miniblock determines its bit width
372 maxDelta := int64(math.MinInt64)
373 start := i * int(enc.miniBlockSize)
374 for _, val := range enc.deltas[start : start+int(n)] {
375 maxDelta = utils.Max(maxDelta, val)
378 // compute bit width to store (max_delta - min_delta)
// the int64 subtraction wraps modulo 2^64, so converting the result to uint64
// yields the true non-negative difference even for extreme deltas
379 width := uint(bits.Len64(uint64(maxDelta - minDelta)))
380 // write out the bit width we used into the bytes we reserved earlier
381 enc.bitWriter.WriteAt([]byte{byte(width)}, int64(offset+i))
383 // write out our deltas
384 for _, val := range enc.deltas[start : start+int(n)] {
385 enc.bitWriter.WriteValue(uint64(val-minDelta), width)
390 // pad the last block if n < miniBlockSize
// (with zero values, so that every written miniblock has miniBlockSize entries)
391 for ; n < int64(enc.miniBlockSize); n++ {
392 enc.bitWriter.WriteValue(0, width)
// keep the backing array of the delta buffer for the next block
395 enc.deltas = enc.deltas[:0]
398 // putInternal is the implementation for actually writing data which must be
399 // integral data as int, int8, int32, or int64.
//
// data is inspected through reflection and its elements are read with Int().
// The first value ever written becomes the first value of the header; later
// values are buffered as deltas against enc.currentVal.
400 func (enc *deltaBitPackEncoder) putInternal(data interface{}) {
401 v := reflect.ValueOf(data)
// nothing has been written yet: set up the defaults, remember the first value
// and create the bit writer on top of the sink
407 if enc.totalVals == 0 {
408 enc.blockSize = defaultBlockSize
409 enc.numMiniBlocks = defaultNumMiniBlocks
410 enc.miniBlockSize = defaultNumValuesPerMini
412 enc.firstVal = v.Index(0).Int()
413 enc.currentVal = enc.firstVal
416 enc.bitWriter = utils.NewBitWriter(enc.sink)
419 enc.totalVals += uint64(v.Len())
420 for ; idx < v.Len(); idx++ {
421 val := v.Index(idx).Int()
422 enc.deltas = append(enc.deltas, val-enc.currentVal)
// a full block worth of deltas has been buffered
424 if len(enc.deltas) == int(enc.blockSize) {
430 // FlushValues flushes any remaining data and returns the finished encoded buffer
431 // or returns nil and any error encountered during flushing.
//
// The returned buffer starts with the header (block size, number of miniblocks
// per block, total value count, first value), followed by the bytes written to
// the bit writer, if any values were written at all.
432 func (enc *deltaBitPackEncoder) FlushValues() (Buffer, error) {
// a nil bit writer means that no values have been written so far
433 if enc.bitWriter != nil {
434 // write any remaining values
436 enc.bitWriter.Flush(true)
438 enc.blockSize = defaultBlockSize
439 enc.numMiniBlocks = defaultNumMiniBlocks
440 enc.miniBlockSize = defaultNumValuesPerMini
// the header is built in its own scratch buffer of maxHeaderWriterSize bytes
443 buffer := make([]byte, maxHeaderWriterSize)
444 headerWriter := utils.NewBitWriter(utils.NewWriterAtBuffer(buffer))
446 headerWriter.WriteVlqInt(uint64(enc.blockSize))
447 headerWriter.WriteVlqInt(uint64(enc.numMiniBlocks))
448 headerWriter.WriteVlqInt(uint64(enc.totalVals))
449 headerWriter.WriteZigZagVlqInt(int64(enc.firstVal))
450 headerWriter.Flush(false)
// cut the scratch buffer down to the bytes the header actually occupies
452 buffer = buffer[:headerWriter.Written()]
455 if enc.bitWriter != nil {
456 flushed := enc.sink.Finish()
457 defer flushed.Release()
// append only the bytes the bit writer reports as written
459 buffer = append(buffer, flushed.Buf()[:enc.bitWriter.Written()]...)
461 return poolBuffer{memory.NewBufferBytes(buffer)}, nil
464 // EstimatedDataEncodedSize returns the current amount of data actually flushed out and written
465 func (enc *deltaBitPackEncoder) EstimatedDataEncodedSize() int64 {
466 return int64(enc.bitWriter.Written())
469 // DeltaBitPackInt32Encoder is an encoder for the delta bitpacking encoding for int32 data.
470 type DeltaBitPackInt32Encoder struct {
474 // Put writes the values from the provided slice of int32 to the encoder
475 func (enc DeltaBitPackInt32Encoder) Put(in []int32) {
479 // PutSpaced takes a slice of int32 along with a bitmap that describes the nulls and an offset into the bitmap
480 // in order to write spaced data to the encoder.
481 func (enc DeltaBitPackInt32Encoder) PutSpaced(in []int32, validBits []byte, validBitsOffset int64) {
482 buffer := memory.NewResizableBuffer(enc.mem)
483 buffer.Reserve(arrow.Int32Traits.BytesRequired(len(in)))
484 defer buffer.Release()
486 data := arrow.Int32Traits.CastFromBytes(buffer.Buf())
487 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
488 enc.Put(data[:nvalid])
491 // Type returns the underlying physical type this encoder works with, in this case Int32
492 func (DeltaBitPackInt32Encoder) Type() parquet.Type {
493 return parquet.Types.Int32
496 // DeltaBitPackInt64Encoder is an encoder for the delta bitpacking encoding for int64 data.
497 type DeltaBitPackInt64Encoder struct {
501 // Put writes the values from the provided slice of int64 to the encoder
502 func (enc DeltaBitPackInt64Encoder) Put(in []int64) {
506 // PutSpaced takes a slice of int64 along with a bitmap that describes the nulls and an offset into the bitmap
507 // in order to write spaced data to the encoder.
508 func (enc DeltaBitPackInt64Encoder) PutSpaced(in []int64, validBits []byte, validBitsOffset int64) {
509 buffer := memory.NewResizableBuffer(enc.mem)
510 buffer.Reserve(arrow.Int64Traits.BytesRequired(len(in)))
511 defer buffer.Release()
513 data := arrow.Int64Traits.CastFromBytes(buffer.Buf())
514 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
515 enc.Put(data[:nvalid])
518 // Type returns the underlying physical type this encoder works with, in this case Int64
519 func (DeltaBitPackInt64Encoder) Type() parquet.Type {
520 return parquet.Types.Int64