]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, software | |
12 | // distributed under the License is distributed on an "AS IS" BASIS, | |
13 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | // See the License for the specific language governing permissions and | |
15 | // limitations under the License. | |
16 | ||
17 | package encoding | |
18 | ||
19 | import ( | |
20 | "bytes" | |
21 | "math" | |
22 | "math/bits" | |
23 | "reflect" | |
24 | ||
25 | "github.com/apache/arrow/go/v6/arrow" | |
26 | "github.com/apache/arrow/go/v6/arrow/memory" | |
27 | "github.com/apache/arrow/go/v6/parquet" | |
28 | "github.com/apache/arrow/go/v6/parquet/internal/utils" | |
29 | "golang.org/x/xerrors" | |
30 | ) | |
31 | ||
// deltaBitPackDecoder holds the decoding state shared by the int32 and int64
// delta bit-packing decoders. See the deltaBitPack encoder for a description
// of the encoding format that is used for delta-bitpacking.
type deltaBitPackDecoder struct {
	decoder

	// mem is the allocator used to create the deltaBitWidths buffer.
	mem memory.Allocator

	// usedFirst records whether Decode has already emitted the header's first
	// value (which SetData loads into lastVal).
	usedFirst bool
	// bitdecoder reads the page data handed to SetData.
	bitdecoder *utils.BitReader
	// blockSize is the number of values per block, read from the page header.
	blockSize uint64
	// currentBlockVals counts values still to be handed out from the current block.
	currentBlockVals uint32
	// miniBlocks is the number of miniblocks per block, read from the page header.
	miniBlocks uint64
	// valsPerMini is blockSize / miniBlocks.
	valsPerMini uint32
	// currentMiniBlockVals counts values still to be handed out from the current miniblock.
	currentMiniBlockVals uint32
	// minDelta is the current block's minimum delta, added to every unpacked delta.
	minDelta int64
	// miniBlockIdx is the index of the next miniblock to unpack in the current block.
	miniBlockIdx uint64

	// deltaBitWidths holds one bit-width byte per miniblock of the current block.
	deltaBitWidths *memory.Buffer
	// deltaBitWidth is the bit width of the most recently selected miniblock.
	deltaBitWidth byte

	// lastVal is the most recently decoded value; each delta is accumulated onto it.
	lastVal int64
}
54 | ||
55 | // returns the number of bytes read so far | |
56 | func (d *deltaBitPackDecoder) bytesRead() int64 { | |
57 | return d.bitdecoder.CurOffset() | |
58 | } | |
59 | ||
60 | func (d *deltaBitPackDecoder) Allocator() memory.Allocator { return d.mem } | |
61 | ||
62 | // SetData sets the bytes and the expected number of values to decode | |
63 | // into the decoder, updating the decoder and allowing it to be reused. | |
64 | func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error { | |
65 | // set our data into the underlying decoder for the type | |
66 | if err := d.decoder.SetData(nvalues, data); err != nil { | |
67 | return err | |
68 | } | |
69 | // create a bit reader for our decoder's values | |
70 | d.bitdecoder = utils.NewBitReader(bytes.NewReader(d.data)) | |
71 | d.currentBlockVals = 0 | |
72 | d.currentMiniBlockVals = 0 | |
73 | if d.deltaBitWidths == nil { | |
74 | d.deltaBitWidths = memory.NewResizableBuffer(d.mem) | |
75 | } | |
76 | ||
77 | var ok bool | |
78 | d.blockSize, ok = d.bitdecoder.GetVlqInt() | |
79 | if !ok { | |
80 | return xerrors.New("parquet: eof exception") | |
81 | } | |
82 | ||
83 | if d.miniBlocks, ok = d.bitdecoder.GetVlqInt(); !ok { | |
84 | return xerrors.New("parquet: eof exception") | |
85 | } | |
86 | ||
87 | var totalValues uint64 | |
88 | if totalValues, ok = d.bitdecoder.GetVlqInt(); !ok { | |
89 | return xerrors.New("parquet: eof exception") | |
90 | } | |
91 | ||
92 | if int(totalValues) != d.nvals { | |
93 | return xerrors.New("parquet: mismatch between number of values and count in data header") | |
94 | } | |
95 | ||
96 | if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok { | |
97 | return xerrors.New("parquet: eof exception") | |
98 | } | |
99 | ||
100 | if d.miniBlocks != 0 { | |
101 | d.valsPerMini = uint32(d.blockSize / d.miniBlocks) | |
102 | } | |
103 | return nil | |
104 | } | |
105 | ||
106 | // initialize a block to decode | |
107 | func (d *deltaBitPackDecoder) initBlock() error { | |
108 | // first we grab the min delta value that we'll start from | |
109 | var ok bool | |
110 | if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok { | |
111 | return xerrors.New("parquet: eof exception") | |
112 | } | |
113 | ||
114 | // ensure we have enough space for our miniblocks to decode the widths | |
115 | d.deltaBitWidths.Resize(int(d.miniBlocks)) | |
116 | ||
117 | var err error | |
118 | for i := uint64(0); i < d.miniBlocks; i++ { | |
119 | if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil { | |
120 | return err | |
121 | } | |
122 | } | |
123 | ||
124 | d.miniBlockIdx = 0 | |
125 | d.deltaBitWidth = d.deltaBitWidths.Bytes()[0] | |
126 | d.currentBlockVals = uint32(d.blockSize) | |
127 | return nil | |
128 | } | |
129 | ||
// DeltaBitPackInt32Decoder decodes Int32 values which are packed using the Delta BitPacking algorithm.
type DeltaBitPackInt32Decoder struct {
	*deltaBitPackDecoder

	// miniBlockValues holds the decoded values of the current miniblock; Decode
	// copies from it starting at index valsPerMini - currentMiniBlockVals.
	miniBlockValues []int32
}
136 | ||
137 | func (d *DeltaBitPackInt32Decoder) unpackNextMini() error { | |
138 | if d.miniBlockValues == nil { | |
139 | d.miniBlockValues = make([]int32, 0, int(d.valsPerMini)) | |
140 | } else { | |
141 | d.miniBlockValues = d.miniBlockValues[:0] | |
142 | } | |
143 | d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)] | |
144 | d.currentMiniBlockVals = d.valsPerMini | |
145 | ||
146 | for j := 0; j < int(d.valsPerMini); j++ { | |
147 | delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth)) | |
148 | if !ok { | |
149 | return xerrors.New("parquet: eof exception") | |
150 | } | |
151 | ||
152 | d.lastVal += int64(delta) + int64(d.minDelta) | |
153 | d.miniBlockValues = append(d.miniBlockValues, int32(d.lastVal)) | |
154 | } | |
155 | d.miniBlockIdx++ | |
156 | return nil | |
157 | } | |
158 | ||
159 | // Decode retrieves min(remaining values, len(out)) values from the data and returns the number | |
160 | // of values actually decoded and any errors encountered. | |
161 | func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) { | |
162 | max := utils.MinInt(len(out), d.nvals) | |
163 | if max == 0 { | |
164 | return 0, nil | |
165 | } | |
166 | ||
167 | out = out[:max] | |
168 | if !d.usedFirst { // starting value to calculate deltas against | |
169 | out[0] = int32(d.lastVal) | |
170 | out = out[1:] | |
171 | d.usedFirst = true | |
172 | } | |
173 | ||
174 | var err error | |
175 | for len(out) > 0 { // unpack mini blocks until we get all the values we need | |
176 | if d.currentBlockVals == 0 { | |
177 | err = d.initBlock() | |
178 | } | |
179 | if d.currentMiniBlockVals == 0 { | |
180 | err = d.unpackNextMini() | |
181 | } | |
182 | if err != nil { | |
183 | return 0, err | |
184 | } | |
185 | ||
186 | // copy as many values from our mini block as we can into out | |
187 | start := int(d.valsPerMini - d.currentMiniBlockVals) | |
188 | end := utils.MinInt(int(d.valsPerMini), len(out)) | |
189 | copy(out, d.miniBlockValues[start:end]) | |
190 | ||
191 | numCopied := end - start | |
192 | out = out[numCopied:] | |
193 | d.currentBlockVals -= uint32(numCopied) | |
194 | d.currentMiniBlockVals -= uint32(numCopied) | |
195 | } | |
196 | return max, nil | |
197 | } | |
198 | ||
199 | // DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap | |
200 | func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { | |
201 | toread := len(out) - nullCount | |
202 | values, err := d.Decode(out[:toread]) | |
203 | if err != nil { | |
204 | return values, err | |
205 | } | |
206 | if values != toread { | |
207 | return values, xerrors.New("parquet: number of values / definition levels read did not match") | |
208 | } | |
209 | ||
210 | return spacedExpand(out, nullCount, validBits, validBitsOffset), nil | |
211 | } | |
212 | ||
213 | // Type returns the physical parquet type that this decoder decodes, in this case Int32 | |
214 | func (DeltaBitPackInt32Decoder) Type() parquet.Type { | |
215 | return parquet.Types.Int32 | |
216 | } | |
217 | ||
// DeltaBitPackInt64Decoder decodes a delta bit packed int64 column of data.
type DeltaBitPackInt64Decoder struct {
	*deltaBitPackDecoder

	// miniBlockValues holds the decoded values of the current miniblock; Decode
	// copies from it starting at index valsPerMini - currentMiniBlockVals.
	miniBlockValues []int64
}
224 | ||
225 | func (d *DeltaBitPackInt64Decoder) unpackNextMini() error { | |
226 | if d.miniBlockValues == nil { | |
227 | d.miniBlockValues = make([]int64, 0, int(d.valsPerMini)) | |
228 | } else { | |
229 | d.miniBlockValues = d.miniBlockValues[:0] | |
230 | } | |
231 | ||
232 | d.deltaBitWidth = d.deltaBitWidths.Bytes()[int(d.miniBlockIdx)] | |
233 | d.currentMiniBlockVals = d.valsPerMini | |
234 | ||
235 | for j := 0; j < int(d.valsPerMini); j++ { | |
236 | delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth)) | |
237 | if !ok { | |
238 | return xerrors.New("parquet: eof exception") | |
239 | } | |
240 | ||
241 | d.lastVal += int64(delta) + int64(d.minDelta) | |
242 | d.miniBlockValues = append(d.miniBlockValues, d.lastVal) | |
243 | } | |
244 | d.miniBlockIdx++ | |
245 | return nil | |
246 | } | |
247 | ||
248 | // Decode retrieves min(remaining values, len(out)) values from the data and returns the number | |
249 | // of values actually decoded and any errors encountered. | |
250 | func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) { | |
251 | max := utils.MinInt(len(out), d.nvals) | |
252 | if max == 0 { | |
253 | return 0, nil | |
254 | } | |
255 | ||
256 | out = out[:max] | |
257 | if !d.usedFirst { | |
258 | out[0] = d.lastVal | |
259 | out = out[1:] | |
260 | d.usedFirst = true | |
261 | } | |
262 | ||
263 | var err error | |
264 | for len(out) > 0 { | |
265 | if d.currentBlockVals == 0 { | |
266 | err = d.initBlock() | |
267 | } | |
268 | if d.currentMiniBlockVals == 0 { | |
269 | err = d.unpackNextMini() | |
270 | } | |
271 | ||
272 | if err != nil { | |
273 | return 0, err | |
274 | } | |
275 | ||
276 | start := int(d.valsPerMini - d.currentMiniBlockVals) | |
277 | end := utils.MinInt(int(d.valsPerMini), len(out)) | |
278 | copy(out, d.miniBlockValues[start:end]) | |
279 | ||
280 | numCopied := end - start | |
281 | out = out[numCopied:] | |
282 | d.currentBlockVals -= uint32(numCopied) | |
283 | d.currentMiniBlockVals -= uint32(numCopied) | |
284 | } | |
285 | return max, nil | |
286 | } | |
287 | ||
288 | // Type returns the physical parquet type that this decoder decodes, in this case Int64 | |
289 | func (DeltaBitPackInt64Decoder) Type() parquet.Type { | |
290 | return parquet.Types.Int64 | |
291 | } | |
292 | ||
293 | // DecodeSpaced is like Decode, but the result is spaced out appropriately based on the passed in bitmap | |
294 | func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { | |
295 | toread := len(out) - nullCount | |
296 | values, err := d.Decode(out[:toread]) | |
297 | if err != nil { | |
298 | return values, err | |
299 | } | |
300 | if values != toread { | |
301 | return values, xerrors.New("parquet: number of values / definition levels read did not match") | |
302 | } | |
303 | ||
304 | return spacedExpand(out, nullCount, validBits, validBitsOffset), nil | |
305 | } | |
306 | ||
// Block layout defaults used by deltaBitPackEncoder.
const (
	// default number of values per block; block size must be a multiple of 128
	defaultBlockSize     = 128
	// default number of miniblocks per block (128 / 4 = 32 values each)
	defaultNumMiniBlocks = 4
	// block size / number of mini blocks must result in a multiple of 32
	defaultNumValuesPerMini = 32
	// max size in bytes of the header for the delta blocks; FlushValues
	// allocates this much scratch space for the four VLQ-encoded header fields
	maxHeaderWriterSize = 32
)
316 | ||
// deltaBitPackEncoder is an encoder for the DeltaBinary Packing format
// as per the parquet spec.
//
// The output consists of a header followed by blocks of delta-encoded values
// that are bit-packed.
//
// Format
//   [header] [block 1] [block 2] ... [block N]
//
// Header
//   [block size] [number of mini blocks per block] [total value count] [first value]
//
// Block
//   [min delta] [list of bitwidths of the miniblocks] [miniblocks...]
//
// Blocks are written to the sink by flushBlock as they fill up. The header is
// only built when FlushValues is called: it is encoded into a separate scratch
// buffer and the accumulated block data is appended after it.
type deltaBitPackEncoder struct {
	encoder

	// writes block data to the sink; (re)created by putInternal when totalVals is 0
	bitWriter  *utils.BitWriter
	// number of values Put since the last FlushValues, including firstVal
	totalVals  uint64
	// first value Put; it goes into the header rather than being stored as a delta
	firstVal   int64
	// most recent value Put, the base for the next delta
	currentVal int64

	// values per block
	blockSize     uint64
	// values per miniblock
	miniBlockSize uint64
	// miniblocks per block
	numMiniBlocks uint64
	// deltas buffered for the block currently being built
	deltas        []int64
}
346 | ||
// flushBlock flushes out a finished block for writing to the underlying encoder.
//
// A block is written in three parts:
//   - the zigzag-VLQ minimum delta;
//   - one bit-width byte per miniblock;
//   - for each miniblock, every (delta - minDelta) bit-packed at that
//     miniblock's width.
//
// A partially filled final miniblock is padded with zero values up to
// miniBlockSize. The buffered deltas are cleared afterwards. Nothing is written
// if no deltas are buffered.
func (enc *deltaBitPackEncoder) flushBlock() {
	if len(enc.deltas) == 0 {
		return
	}

	// determine the minimum delta value
	minDelta := int64(math.MaxInt64)
	for _, delta := range enc.deltas {
		if delta < minDelta {
			minDelta = delta
		}
	}

	enc.bitWriter.WriteZigZagVlqInt(minDelta)
	// Reserve one byte per miniblock for the bit widths and zero them all up
	// front. Miniblocks left unused in a short final block are never visited by
	// the loop below. The parquet spec says their widths should be 0, not
	// whatever bytes happen to sit in the sink at this offset.
	offset := enc.bitWriter.ReserveBytes(int(enc.numMiniBlocks))
	enc.bitWriter.WriteAt(make([]byte, enc.numMiniBlocks), int64(offset))

	valuesToWrite := int64(len(enc.deltas))
	for i := 0; i < int(enc.numMiniBlocks); i++ {
		n := utils.Min(int64(enc.miniBlockSize), valuesToWrite)
		if n == 0 {
			break
		}

		start := i * int(enc.miniBlockSize)
		mini := enc.deltas[start : start+int(n)]

		maxDelta := int64(math.MinInt64)
		for _, val := range mini {
			maxDelta = utils.Max(maxDelta, val)
		}

		// Compute the bit width needed to store (max_delta - min_delta). The
		// int64 subtraction may wrap, but its uint64 reinterpretation is still
		// the exact difference.
		width := uint(bits.Len64(uint64(maxDelta - minDelta)))
		// write out the bit width we used into the bytes we reserved earlier
		enc.bitWriter.WriteAt([]byte{byte(width)}, int64(offset+i))

		// write out our deltas
		for _, val := range mini {
			enc.bitWriter.WriteValue(uint64(val-minDelta), width)
		}

		valuesToWrite -= n

		// pad the final miniblock out to miniBlockSize values
		for ; n < int64(enc.miniBlockSize); n++ {
			enc.bitWriter.WriteValue(0, width)
		}
	}
	enc.deltas = enc.deltas[:0]
}
397 | ||
398 | // putInternal is the implementation for actually writing data which must be | |
399 | // integral data as int, int8, int32, or int64. | |
400 | func (enc *deltaBitPackEncoder) putInternal(data interface{}) { | |
401 | v := reflect.ValueOf(data) | |
402 | if v.Len() == 0 { | |
403 | return | |
404 | } | |
405 | ||
406 | idx := 0 | |
407 | if enc.totalVals == 0 { | |
408 | enc.blockSize = defaultBlockSize | |
409 | enc.numMiniBlocks = defaultNumMiniBlocks | |
410 | enc.miniBlockSize = defaultNumValuesPerMini | |
411 | ||
412 | enc.firstVal = v.Index(0).Int() | |
413 | enc.currentVal = enc.firstVal | |
414 | idx = 1 | |
415 | ||
416 | enc.bitWriter = utils.NewBitWriter(enc.sink) | |
417 | } | |
418 | ||
419 | enc.totalVals += uint64(v.Len()) | |
420 | for ; idx < v.Len(); idx++ { | |
421 | val := v.Index(idx).Int() | |
422 | enc.deltas = append(enc.deltas, val-enc.currentVal) | |
423 | enc.currentVal = val | |
424 | if len(enc.deltas) == int(enc.blockSize) { | |
425 | enc.flushBlock() | |
426 | } | |
427 | } | |
428 | } | |
429 | ||
430 | // FlushValues flushes any remaining data and returns the finished encoded buffer | |
431 | // or returns nil and any error encountered during flushing. | |
432 | func (enc *deltaBitPackEncoder) FlushValues() (Buffer, error) { | |
433 | if enc.bitWriter != nil { | |
434 | // write any remaining values | |
435 | enc.flushBlock() | |
436 | enc.bitWriter.Flush(true) | |
437 | } else { | |
438 | enc.blockSize = defaultBlockSize | |
439 | enc.numMiniBlocks = defaultNumMiniBlocks | |
440 | enc.miniBlockSize = defaultNumValuesPerMini | |
441 | } | |
442 | ||
443 | buffer := make([]byte, maxHeaderWriterSize) | |
444 | headerWriter := utils.NewBitWriter(utils.NewWriterAtBuffer(buffer)) | |
445 | ||
446 | headerWriter.WriteVlqInt(uint64(enc.blockSize)) | |
447 | headerWriter.WriteVlqInt(uint64(enc.numMiniBlocks)) | |
448 | headerWriter.WriteVlqInt(uint64(enc.totalVals)) | |
449 | headerWriter.WriteZigZagVlqInt(int64(enc.firstVal)) | |
450 | headerWriter.Flush(false) | |
451 | ||
452 | buffer = buffer[:headerWriter.Written()] | |
453 | enc.totalVals = 0 | |
454 | ||
455 | if enc.bitWriter != nil { | |
456 | flushed := enc.sink.Finish() | |
457 | defer flushed.Release() | |
458 | ||
459 | buffer = append(buffer, flushed.Buf()[:enc.bitWriter.Written()]...) | |
460 | } | |
461 | return poolBuffer{memory.NewBufferBytes(buffer)}, nil | |
462 | } | |
463 | ||
464 | // EstimatedDataEncodedSize returns the current amount of data actually flushed out and written | |
465 | func (enc *deltaBitPackEncoder) EstimatedDataEncodedSize() int64 { | |
466 | return int64(enc.bitWriter.Written()) | |
467 | } | |
468 | ||
// DeltaBitPackInt32Encoder is an encoder for the delta bitpacking encoding for int32 data.
// All encoding state lives in the embedded *deltaBitPackEncoder.
type DeltaBitPackInt32Encoder struct {
	*deltaBitPackEncoder
}
473 | ||
474 | // Put writes the values from the provided slice of int32 to the encoder | |
475 | func (enc DeltaBitPackInt32Encoder) Put(in []int32) { | |
476 | enc.putInternal(in) | |
477 | } | |
478 | ||
479 | // PutSpaced takes a slice of int32 along with a bitmap that describes the nulls and an offset into the bitmap | |
480 | // in order to write spaced data to the encoder. | |
481 | func (enc DeltaBitPackInt32Encoder) PutSpaced(in []int32, validBits []byte, validBitsOffset int64) { | |
482 | buffer := memory.NewResizableBuffer(enc.mem) | |
483 | buffer.Reserve(arrow.Int32Traits.BytesRequired(len(in))) | |
484 | defer buffer.Release() | |
485 | ||
486 | data := arrow.Int32Traits.CastFromBytes(buffer.Buf()) | |
487 | nvalid := spacedCompress(in, data, validBits, validBitsOffset) | |
488 | enc.Put(data[:nvalid]) | |
489 | } | |
490 | ||
491 | // Type returns the underlying physical type this encoder works with, in this case Int32 | |
492 | func (DeltaBitPackInt32Encoder) Type() parquet.Type { | |
493 | return parquet.Types.Int32 | |
494 | } | |
495 | ||
// DeltaBitPackInt64Encoder is an encoder for the delta bitpacking encoding for int64 data.
// All encoding state lives in the embedded *deltaBitPackEncoder.
type DeltaBitPackInt64Encoder struct {
	*deltaBitPackEncoder
}
500 | ||
501 | // Put writes the values from the provided slice of int64 to the encoder | |
502 | func (enc DeltaBitPackInt64Encoder) Put(in []int64) { | |
503 | enc.putInternal(in) | |
504 | } | |
505 | ||
506 | // PutSpaced takes a slice of int64 along with a bitmap that describes the nulls and an offset into the bitmap | |
507 | // in order to write spaced data to the encoder. | |
508 | func (enc DeltaBitPackInt64Encoder) PutSpaced(in []int64, validBits []byte, validBitsOffset int64) { | |
509 | buffer := memory.NewResizableBuffer(enc.mem) | |
510 | buffer.Reserve(arrow.Int64Traits.BytesRequired(len(in))) | |
511 | defer buffer.Release() | |
512 | ||
513 | data := arrow.Int64Traits.CastFromBytes(buffer.Buf()) | |
514 | nvalid := spacedCompress(in, data, validBits, validBitsOffset) | |
515 | enc.Put(data[:nvalid]) | |
516 | } | |
517 | ||
518 | // Type returns the underlying physical type this encoder works with, in this case Int64 | |
519 | func (DeltaBitPackInt64Encoder) Type() parquet.Type { | |
520 | return parquet.Types.Int64 | |
521 | } |