]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/go/parquet/internal/encoding/delta_bit_packing.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / parquet / internal / encoding / delta_bit_packing.go
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package encoding
18
19import (
20 "bytes"
21 "math"
22 "math/bits"
23 "reflect"
24
25 "github.com/apache/arrow/go/v6/arrow"
26 "github.com/apache/arrow/go/v6/arrow/memory"
27 "github.com/apache/arrow/go/v6/parquet"
28 "github.com/apache/arrow/go/v6/parquet/internal/utils"
29 "golang.org/x/xerrors"
30)
31
// deltaBitPackDecoder holds the state shared by the int32 and int64 delta
// bit-packed decoders.
//
// see the deltaBitPack encoder for a description of the encoding format that is
// used for delta-bitpacking.
type deltaBitPackDecoder struct {
	decoder

	// allocator used to create the deltaBitWidths buffer
	mem memory.Allocator

	// true once the first value from the header has been handed out by Decode
	usedFirst bool
	// bit reader over the data passed to SetData
	bitdecoder *utils.BitReader
	// block size read from the header
	blockSize uint64
	// set to blockSize by initBlock and decremented as values are copied out
	currentBlockVals uint32
	// number of miniblocks per block, read from the header
	miniBlocks uint64
	// blockSize / miniBlocks
	valsPerMini uint32
	// values of the current miniblock that have not been copied out yet
	currentMiniBlockVals uint32
	// min delta of the current block; added to every unpacked delta
	minDelta int64
	// index of the next miniblock to unpack within the current block
	miniBlockIdx uint64

	// one bit-width byte per miniblock of the current block
	deltaBitWidths *memory.Buffer
	// bit width of the most recently selected miniblock
	deltaBitWidth byte

	// running value: starts as the first value from the header and is
	// advanced by each unpacked delta
	lastVal int64
}
54
55// returns the number of bytes read so far
56func (d *deltaBitPackDecoder) bytesRead() int64 {
57 return d.bitdecoder.CurOffset()
58}
59
60func (d *deltaBitPackDecoder) Allocator() memory.Allocator { return d.mem }
61
62// SetData sets the bytes and the expected number of values to decode
63// into the decoder, updating the decoder and allowing it to be reused.
64func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error {
65 // set our data into the underlying decoder for the type
66 if err := d.decoder.SetData(nvalues, data); err != nil {
67 return err
68 }
69 // create a bit reader for our decoder's values
70 d.bitdecoder = utils.NewBitReader(bytes.NewReader(d.data))
71 d.currentBlockVals = 0
72 d.currentMiniBlockVals = 0
73 if d.deltaBitWidths == nil {
74 d.deltaBitWidths = memory.NewResizableBuffer(d.mem)
75 }
76
77 var ok bool
78 d.blockSize, ok = d.bitdecoder.GetVlqInt()
79 if !ok {
80 return xerrors.New("parquet: eof exception")
81 }
82
83 if d.miniBlocks, ok = d.bitdecoder.GetVlqInt(); !ok {
84 return xerrors.New("parquet: eof exception")
85 }
86
87 var totalValues uint64
88 if totalValues, ok = d.bitdecoder.GetVlqInt(); !ok {
89 return xerrors.New("parquet: eof exception")
90 }
91
92 if int(totalValues) != d.nvals {
93 return xerrors.New("parquet: mismatch between number of values and count in data header")
94 }
95
96 if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
97 return xerrors.New("parquet: eof exception")
98 }
99
100 if d.miniBlocks != 0 {
101 d.valsPerMini = uint32(d.blockSize / d.miniBlocks)
102 }
103 return nil
104}
105
106// initialize a block to decode
107func (d *deltaBitPackDecoder) initBlock() error {
108 // first we grab the min delta value that we'll start from
109 var ok bool
110 if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
111 return xerrors.New("parquet: eof exception")
112 }
113
114 // ensure we have enough space for our miniblocks to decode the widths
115 d.deltaBitWidths.Resize(int(d.miniBlocks))
116
117 var err error
118 for i := uint64(0); i < d.miniBlocks; i++ {
119 if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil {
120 return err
121 }
122 }
123
124 d.miniBlockIdx = 0
125 d.deltaBitWidth = d.deltaBitWidths.Bytes()[0]
126 d.currentBlockVals = uint32(d.blockSize)
127 return nil
128}
129
// DeltaBitPackInt32Decoder decodes Int32 values which are packed using the Delta BitPacking algorithm.
type DeltaBitPackInt32Decoder struct {
	*deltaBitPackDecoder

	// values of the most recently unpacked miniblock; Decode copies out of
	// this slice and unpackNextMini reuses its storage between miniblocks
	miniBlockValues []int32
}
136
137func (d *DeltaBitPackInt32Decoder) unpackNextMini() error {
138 if d.miniBlockValues == nil {
139 d.miniBlockValues = make([]int32, 0, int(d.valsPerMini))
140 } else {
141 d.miniBlockValues = d.miniBlockValues[:0]
142 }
143 d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
144 d.currentMiniBlockVals = d.valsPerMini
145
146 for j := 0; j < int(d.valsPerMini); j++ {
147 delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
148 if !ok {
149 return xerrors.New("parquet: eof exception")
150 }
151
152 d.lastVal += int64(delta) + int64(d.minDelta)
153 d.miniBlockValues = append(d.miniBlockValues, int32(d.lastVal))
154 }
155 d.miniBlockIdx++
156 return nil
157}
158
159// Decode retrieves min(remaining values, len(out)) values from the data and returns the number
160// of values actually decoded and any errors encountered.
161func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) {
162 max := utils.MinInt(len(out), d.nvals)
163 if max == 0 {
164 return 0, nil
165 }
166
167 out = out[:max]
168 if !d.usedFirst { // starting value to calculate deltas against
169 out[0] = int32(d.lastVal)
170 out = out[1:]
171 d.usedFirst = true
172 }
173
174 var err error
175 for len(out) > 0 { // unpack mini blocks until we get all the values we need
176 if d.currentBlockVals == 0 {
177 err = d.initBlock()
178 }
179 if d.currentMiniBlockVals == 0 {
180 err = d.unpackNextMini()
181 }
182 if err != nil {
183 return 0, err
184 }
185
186 // copy as many values from our mini block as we can into out
187 start := int(d.valsPerMini - d.currentMiniBlockVals)
188 end := utils.MinInt(int(d.valsPerMini), len(out))
189 copy(out, d.miniBlockValues[start:end])
190
191 numCopied := end - start
192 out = out[numCopied:]
193 d.currentBlockVals -= uint32(numCopied)
194 d.currentMiniBlockVals -= uint32(numCopied)
195 }
196 return max, nil
197}
198
199// DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap
200func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
201 toread := len(out) - nullCount
202 values, err := d.Decode(out[:toread])
203 if err != nil {
204 return values, err
205 }
206 if values != toread {
207 return values, xerrors.New("parquet: number of values / definition levels read did not match")
208 }
209
210 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
211}
212
213// Type returns the physical parquet type that this decoder decodes, in this case Int32
214func (DeltaBitPackInt32Decoder) Type() parquet.Type {
215 return parquet.Types.Int32
216}
217
// DeltaBitPackInt64Decoder decodes a delta bit packed int64 column of data.
type DeltaBitPackInt64Decoder struct {
	*deltaBitPackDecoder

	// values of the most recently unpacked miniblock; Decode copies out of
	// this slice and unpackNextMini reuses its storage between miniblocks
	miniBlockValues []int64
}
224
225func (d *DeltaBitPackInt64Decoder) unpackNextMini() error {
226 if d.miniBlockValues == nil {
227 d.miniBlockValues = make([]int64, 0, int(d.valsPerMini))
228 } else {
229 d.miniBlockValues = d.miniBlockValues[:0]
230 }
231
232 d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)]
233 d.currentMiniBlockVals = d.valsPerMini
234
235 for j := 0; j < int(d.valsPerMini); j++ {
236 delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
237 if !ok {
238 return xerrors.New("parquet: eof exception")
239 }
240
241 d.lastVal += int64(delta) + int64(d.minDelta)
242 d.miniBlockValues = append(d.miniBlockValues, d.lastVal)
243 }
244 d.miniBlockIdx++
245 return nil
246}
247
248// Decode retrieves min(remaining values, len(out)) values from the data and returns the number
249// of values actually decoded and any errors encountered.
250func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) {
251 max := utils.MinInt(len(out), d.nvals)
252 if max == 0 {
253 return 0, nil
254 }
255
256 out = out[:max]
257 if !d.usedFirst {
258 out[0] = d.lastVal
259 out = out[1:]
260 d.usedFirst = true
261 }
262
263 var err error
264 for len(out) > 0 {
265 if d.currentBlockVals == 0 {
266 err = d.initBlock()
267 }
268 if d.currentMiniBlockVals == 0 {
269 err = d.unpackNextMini()
270 }
271
272 if err != nil {
273 return 0, err
274 }
275
276 start := int(d.valsPerMini - d.currentMiniBlockVals)
277 end := utils.MinInt(int(d.valsPerMini), len(out))
278 copy(out, d.miniBlockValues[start:end])
279
280 numCopied := end - start
281 out = out[numCopied:]
282 d.currentBlockVals -= uint32(numCopied)
283 d.currentMiniBlockVals -= uint32(numCopied)
284 }
285 return max, nil
286}
287
288// Type returns the physical parquet type that this decoder decodes, in this case Int64
289func (DeltaBitPackInt64Decoder) Type() parquet.Type {
290 return parquet.Types.Int64
291}
292
293// DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap
294func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
295 toread := len(out) - nullCount
296 values, err := d.Decode(out[:toread])
297 if err != nil {
298 return values, err
299 }
300 if values != toread {
301 return values, xerrors.New("parquet: number of values / definition levels read did not match")
302 }
303
304 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
305}
306
// Default parameters the delta bit-pack encoder uses for the blocks it writes.
const (
	// block size must be a multiple of 128
	defaultBlockSize = 128
	// number of miniblocks each block is split into
	defaultNumMiniBlocks = 4
	// block size / number of mini blocks must result in a multiple of 32
	defaultNumValuesPerMini = 32
	// max size of the header for the delta blocks; FlushValues allocates a
	// buffer of this many bytes to write the header into
	maxHeaderWriterSize = 32
)
316
// deltaBitPackEncoder is an encoder for the DeltaBinary Packing format
// as per the parquet spec.
//
// Consists of a header followed by blocks of delta encoded values binary packed.
//
// Format
// [header] [block 1] [block 2] ... [block N]
//
// Header
// [block size] [number of mini blocks per block] [total value count] [first value]
//
// Block
// [min delta] [list of bitwidths of the miniblocks] [miniblocks...]
//
// The header is only produced when FlushValues is called: it is written into
// its own buffer and the flushed block data is appended after it before the
// result is returned.
type deltaBitPackEncoder struct {
	encoder

	// writer for the block data; created on the first put after a flush
	bitWriter *utils.BitWriter
	// number of values put since the last flush; written into the header
	totalVals uint64
	// first value put since the last flush; written into the header
	firstVal int64
	// most recently put value, which the next delta is computed against
	currentVal int64

	// number of deltas collected before a block is flushed
	blockSize uint64
	// number of values per miniblock
	miniBlockSize uint64
	// number of miniblocks per block
	numMiniBlocks uint64
	// deltas collected for the block currently being built
	deltas []int64
}
346
347// flushBlock flushes out a finished block for writing to the underlying encoder
348func (enc *deltaBitPackEncoder) flushBlock() {
349 if len(enc.deltas) == 0 {
350 return
351 }
352
353 // determine the minimum delta value
354 minDelta := int64(math.MaxInt64)
355 for _, delta := range enc.deltas {
356 if delta < minDelta {
357 minDelta = delta
358 }
359 }
360
361 enc.bitWriter.WriteZigZagVlqInt(minDelta)
362 // reserve enough bytes to write out our miniblock deltas
363 offset := enc.bitWriter.ReserveBytes(int(enc.numMiniBlocks))
364
365 valuesToWrite := int64(len(enc.deltas))
366 for i := 0; i < int(enc.numMiniBlocks); i++ {
367 n := utils.Min(int64(enc.miniBlockSize), valuesToWrite)
368 if n == 0 {
369 break
370 }
371
372 maxDelta := int64(math.MinInt64)
373 start := i * int(enc.miniBlockSize)
374 for _, val := range enc.deltas[start : start+int(n)] {
375 maxDelta = utils.Max(maxDelta, val)
376 }
377
378 // compute bit width to store (max_delta - min_delta)
379 width := uint(bits.Len64(uint64(maxDelta - minDelta)))
380 // write out the bit width we used into the bytes we reserved earlier
381 enc.bitWriter.WriteAt([]byte{byte(width)}, int64(offset+i))
382
383 // write out our deltas
384 for _, val := range enc.deltas[start : start+int(n)] {
385 enc.bitWriter.WriteValue(uint64(val-minDelta), width)
386 }
387
388 valuesToWrite -= n
389
390 // pad the last block if n < miniBlockSize
391 for ; n < int64(enc.miniBlockSize); n++ {
392 enc.bitWriter.WriteValue(0, width)
393 }
394 }
395 enc.deltas = enc.deltas[:0]
396}
397
398// putInternal is the implementation for actually writing data which must be
399// integral data as int, int8, int32, or int64.
400func (enc *deltaBitPackEncoder) putInternal(data interface{}) {
401 v := reflect.ValueOf(data)
402 if v.Len() == 0 {
403 return
404 }
405
406 idx := 0
407 if enc.totalVals == 0 {
408 enc.blockSize = defaultBlockSize
409 enc.numMiniBlocks = defaultNumMiniBlocks
410 enc.miniBlockSize = defaultNumValuesPerMini
411
412 enc.firstVal = v.Index(0).Int()
413 enc.currentVal = enc.firstVal
414 idx = 1
415
416 enc.bitWriter = utils.NewBitWriter(enc.sink)
417 }
418
419 enc.totalVals += uint64(v.Len())
420 for ; idx < v.Len(); idx++ {
421 val := v.Index(idx).Int()
422 enc.deltas = append(enc.deltas, val-enc.currentVal)
423 enc.currentVal = val
424 if len(enc.deltas) == int(enc.blockSize) {
425 enc.flushBlock()
426 }
427 }
428}
429
430// FlushValues flushes any remaining data and returns the finished encoded buffer
431// or returns nil and any error encountered during flushing.
432func (enc *deltaBitPackEncoder) FlushValues() (Buffer, error) {
433 if enc.bitWriter != nil {
434 // write any remaining values
435 enc.flushBlock()
436 enc.bitWriter.Flush(true)
437 } else {
438 enc.blockSize = defaultBlockSize
439 enc.numMiniBlocks = defaultNumMiniBlocks
440 enc.miniBlockSize = defaultNumValuesPerMini
441 }
442
443 buffer := make([]byte, maxHeaderWriterSize)
444 headerWriter := utils.NewBitWriter(utils.NewWriterAtBuffer(buffer))
445
446 headerWriter.WriteVlqInt(uint64(enc.blockSize))
447 headerWriter.WriteVlqInt(uint64(enc.numMiniBlocks))
448 headerWriter.WriteVlqInt(uint64(enc.totalVals))
449 headerWriter.WriteZigZagVlqInt(int64(enc.firstVal))
450 headerWriter.Flush(false)
451
452 buffer = buffer[:headerWriter.Written()]
453 enc.totalVals = 0
454
455 if enc.bitWriter != nil {
456 flushed := enc.sink.Finish()
457 defer flushed.Release()
458
459 buffer = append(buffer, flushed.Buf()[:enc.bitWriter.Written()]...)
460 }
461 return poolBuffer{memory.NewBufferBytes(buffer)}, nil
462}
463
464// EstimatedDataEncodedSize returns the current amount of data actually flushed out and written
465func (enc *deltaBitPackEncoder) EstimatedDataEncodedSize() int64 {
466 return int64(enc.bitWriter.Written())
467}
468
// DeltaBitPackInt32Encoder is an encoder for the delta bitpacking encoding for int32 data.
// It embeds the shared deltaBitPackEncoder, which holds all of the encoding state.
type DeltaBitPackInt32Encoder struct {
	*deltaBitPackEncoder
}
473
474// Put writes the values from the provided slice of int32 to the encoder
475func (enc DeltaBitPackInt32Encoder) Put(in []int32) {
476 enc.putInternal(in)
477}
478
479// PutSpaced takes a slice of int32 along with a bitmap that describes the nulls and an offset into the bitmap
480// in order to write spaced data to the encoder.
481func (enc DeltaBitPackInt32Encoder) PutSpaced(in []int32, validBits []byte, validBitsOffset int64) {
482 buffer := memory.NewResizableBuffer(enc.mem)
483 buffer.Reserve(arrow.Int32Traits.BytesRequired(len(in)))
484 defer buffer.Release()
485
486 data := arrow.Int32Traits.CastFromBytes(buffer.Buf())
487 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
488 enc.Put(data[:nvalid])
489}
490
491// Type returns the underlying physical type this encoder works with, in this case Int32
492func (DeltaBitPackInt32Encoder) Type() parquet.Type {
493 return parquet.Types.Int32
494}
495
// DeltaBitPackInt64Encoder is an encoder for the delta bitpacking encoding for int64 data.
// It embeds the shared deltaBitPackEncoder, which holds all of the encoding state.
type DeltaBitPackInt64Encoder struct {
	*deltaBitPackEncoder
}
500
501// Put writes the values from the provided slice of int64 to the encoder
502func (enc DeltaBitPackInt64Encoder) Put(in []int64) {
503 enc.putInternal(in)
504}
505
506// PutSpaced takes a slice of int64 along with a bitmap that describes the nulls and an offset into the bitmap
507// in order to write spaced data to the encoder.
508func (enc DeltaBitPackInt64Encoder) PutSpaced(in []int64, validBits []byte, validBitsOffset int64) {
509 buffer := memory.NewResizableBuffer(enc.mem)
510 buffer.Reserve(arrow.Int64Traits.BytesRequired(len(in)))
511 defer buffer.Release()
512
513 data := arrow.Int64Traits.CastFromBytes(buffer.Buf())
514 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
515 enc.Put(data[:nvalid])
516}
517
518// Type returns the underlying physical type this encoder works with, in this case Int64
519func (DeltaBitPackInt64Encoder) Type() parquet.Type {
520 return parquet.Types.Int64
521}