1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
20 "github.com/apache/arrow/go/v6/arrow/memory"
21 "github.com/apache/arrow/go/v6/parquet"
22 "github.com/apache/arrow/go/v6/parquet/internal/utils"
23 "golang.org/x/xerrors"
26 // DeltaByteArrayEncoder is an encoder for writing byte arrays using delta encoding,
27 // also known as incremental encoding or front compression. For each element
28 // in a sequence of strings, we store the length of the prefix it shares with the previous entry, plus the remaining suffix.
29 // See https://en.wikipedia.org/wiki/Incremental_encoding for a longer description.
31 // The output is a sequence of delta-encoded prefix lengths followed by the suffixes
32 // encoded as delta length byte arrays.
//
// NOTE(review): the embedded listing numbers jump from 33 to 36 and stop at 39,
// so the struct probably has members and a closing brace that are not shown.
// The methods read enc.encoding and enc.mem, which presumably come from such a
// member; confirm against the full file.
33 type DeltaByteArrayEncoder struct {
// prefixEncoder receives one int32 shared-prefix length per value written by Put.
36 prefixEncoder *DeltaBitPackInt32Encoder
// suffixEncoder receives the bytes of each value that follow the shared prefix.
37 suffixEncoder *DeltaLengthByteArrayEncoder
// lastVal is the previously written value that the next value is compared
// against; Put replaces it with a private copy before returning.
39 lastVal parquet.ByteArray
// initEncoders creates the prefix-length encoder and the suffix encoder.
// Every encoder base is built by newEncoderBase from enc.encoding and enc.mem,
// with nil as the second argument.
//
// NOTE(review): the closing brace of this function is not visible in this listing.
42 func (enc *DeltaByteArrayEncoder) initEncoders() {
// Encoder that will receive the shared-prefix lengths.
43 enc.prefixEncoder = &DeltaBitPackInt32Encoder{
44 deltaBitPackEncoder: &deltaBitPackEncoder{encoder: newEncoderBase(enc.encoding, nil, enc.mem)}}
// Encoder that will receive the suffix bytes. The literal is positional: its
// own base first, then a nested DeltaBitPackInt32Encoder (presumably used for
// the suffix lengths; verify against DeltaLengthByteArrayEncoder's definition).
45 enc.suffixEncoder = &DeltaLengthByteArrayEncoder{
46 newEncoderBase(enc.encoding, nil, enc.mem),
47 &DeltaBitPackInt32Encoder{
48 deltaBitPackEncoder: &deltaBitPackEncoder{encoder: newEncoderBase(enc.encoding, nil, enc.mem)}}}
51 // Type returns the physical type this encoder operates on, which is always parquet.Types.ByteArray.
52 func (DeltaByteArrayEncoder) Type() parquet.Type { return parquet.Types.ByteArray }
54 // Put writes a slice of ByteArrays to the encoder.
//
// Each value is emitted as two pieces: the number of leading bytes it shares
// with the previously written value goes to prefixEncoder, and bytes are
// handed to suffixEncoder through the local suf.
//
// NOTE(review): this listing is missing several lines of the function (the
// embedded numbering skips 56-59, 62, 64-65, 67-69, 74-75, 78-81, 83 and
// 85-87). The comments below describe only the statements that are visible.
55 func (enc *DeltaByteArrayEncoder) Put(in []parquet.ByteArray) {
// suf is the byte array passed to suffixEncoder for the current value.
60 var suf parquet.ByteArray
61 if enc.prefixEncoder == nil { // initialize our encoders if we haven't yet
// With no encoders set up yet, a prefix length of 0 is written first.
63 enc.prefixEncoder.Put([]int32{0})
// NOTE(review): the assignment to suf is not visible here; presumably it is
// the whole first value. Confirm against the full file.
66 enc.suffixEncoder.Put([]parquet.ByteArray{suf})
70 // for each value, figure out the common prefix with the previous value
71 // and then write the prefix length and the suffix.
72 for _, val := range in {
73 l1 := enc.lastVal.Len()
// Scan while both the previous value and val still have a byte at index j.
// NOTE(review): the declarations of l2 and j, and what happens after the
// mismatch test below, are on lines missing from this listing.
76 for j < l1 && j < l2 {
77 if enc.lastVal[j] != val[j] {
// j is recorded as this value's prefix length.
82 enc.prefixEncoder.Put([]int32{int32(j)})
// NOTE(review): suf is presumably set to the part of val after index j on
// the missing line 83; verify.
84 enc.suffixEncoder.Put([]parquet.ByteArray{suf})
88 // do the memcpy after the loops to keep a copy of the lastVal
89 // we do a copy here so that we only copy and keep a reference
90 // to the suffix, and aren't forcing the *entire* value to stay
91 // in memory while we have this reference to just the suffix.
// Appending onto an empty slice gives lastVal its own backing array.
92 enc.lastVal = append([]byte{}, enc.lastVal...)
95 // PutSpaced is like Put, but assumes the data is already spaced for nulls and uses the bitmap provided and offset
96 // to compress the data before writing it without the null slots.
//
// NOTE(review): listing lines 98 and 102-105 are missing, so there is
// presumably a condition around the statements below and an alternative
// branch. Confirm against the full file.
97 func (enc *DeltaByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
// Scratch slice with the same length as in.
99 data := make([]parquet.ByteArray, len(in))
// spacedCompress is given in, data and the bitmap; the count it returns is
// used below to trim data before writing.
100 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
101 enc.Put(data[:nvalid])
107 // FlushValues flushes any remaining data out and returns the finished encoded buffer,
108 // or returns nil and any error encountered during flushing.
//
// The returned buffer holds the flushed prefix-length bytes immediately
// followed by the flushed suffix bytes.
//
// NOTE(review): listing lines 111-112, 114-116 and 120-122 are missing;
// presumably they hold the body of the nil check and the err checks after each
// flush. Verify against the full file.
109 func (enc *DeltaByteArrayEncoder) FlushValues() (Buffer, error) {
// Handles the case where the encoders were never created.
110 if enc.prefixEncoder == nil {
113 prefixBuf, err := enc.prefixEncoder.FlushValues()
// The intermediate buffer is released when this function returns, after its
// bytes have been copied into ret.
117 defer prefixBuf.Release()
119 suffixBuf, err := enc.suffixEncoder.FlushValues()
123 defer suffixBuf.Release()
// Take a buffer from bufferPool and size it to hold both parts.
125 ret := bufferPool.Get().(*memory.Buffer)
126 ret.ResizeNoShrink(prefixBuf.Len() + suffixBuf.Len())
// Prefix lengths first, then the suffix data starting right after them.
127 copy(ret.Bytes(), prefixBuf.Bytes())
128 copy(ret.Bytes()[prefixBuf.Len():], suffixBuf.Bytes())
129 return poolBuffer{ret}, nil
132 // DeltaByteArrayDecoder is a decoder for a column of data encoded using incremental or prefix encoding.
//
// NOTE(review): the closing brace of the struct is not visible in this listing.
133 type DeltaByteArrayDecoder struct {
// Embedded decoder that Decode calls to read each value's suffix.
134 *DeltaLengthByteArrayDecoder
// prefixLengths is filled by SetData with nvalues entries and consumed from
// the front by Decode.
136 prefixLengths []int32
// lastVal is the most recently reconstructed value; Decode treats nil as
// "no value decoded yet".
137 lastVal parquet.ByteArray
140 // Type returns the physical type this decoder operates on, which is always parquet.Types.ByteArray.
141 func (DeltaByteArrayDecoder) Type() parquet.Type {
142 return parquet.Types.ByteArray
// Allocator returns the memory allocator stored in d.mem.
// NOTE(review): mem is not a field declared directly on DeltaByteArrayDecoder
// in this listing; presumably it is promoted from the embedded decoder.
145 func (d *DeltaByteArrayDecoder) Allocator() memory.Allocator { return d.mem }
147 // SetData expects the data passed in to be the prefix lengths, followed by the
148 // blocks of suffix data in order to initialize the decoder.
//
// It decodes nvalues prefix lengths into d.prefixLengths, then hands the rest
// of data to the embedded DeltaLengthByteArrayDecoder and returns that call's
// error.
//
// NOTE(review): listing lines 153-154 and 156-158 are missing; presumably they
// close the literal below and return the error from the first SetData.
149 func (d *DeltaByteArrayDecoder) SetData(nvalues int, data []byte) error {
// Local decoder used only to read the prefix lengths at the front of data.
150 prefixLenDec := DeltaBitPackInt32Decoder{
151 deltaBitPackDecoder: &deltaBitPackDecoder{
152 decoder: newDecoderBase(d.encoding, d.descr),
155 if err := prefixLenDec.SetData(nvalues, data); err != nil {
159 d.prefixLengths = make([]int32, nvalues)
160 // decode all the prefix lengths first so we know how many bytes it took to get the
161 // prefix lengths for nvalues
// NOTE(review): nothing returned by this Decode call is captured.
162 prefixLenDec.Decode(d.prefixLengths)
164 // now that we know how many bytes we needed for the prefix lengths, the rest are the
165 // delta length byte array encoding.
166 return d.DeltaLengthByteArrayDecoder.SetData(nvalues, data[int(prefixLenDec.bytesRead()):])
169 // Decode decodes byte arrays into the slice provided and returns the number of values actually decoded
//
// Each value is rebuilt from a prefix of the previously decoded value (its
// length taken from d.prefixLengths) and a suffix read through the embedded
// DeltaLengthByteArrayDecoder.
//
// NOTE(review): many lines of this function are missing from the listing (the
// embedded numbering skips 172-177, 180-184, 186-188, 190, 192, 195-198,
// 200-201, 205 and 207-209), including the loop header, the error handling and
// the return statements. The comments below cover only the visible statements.
170 func (d *DeltaByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
// Upper bound: the smaller of the output length and d.nvals.
171 max := utils.MinInt(len(out), d.nvals)
// Nothing decoded yet: the first value is read straight into out[:1] by the
// embedded decoder, and its prefix-length entry is dropped.
178 if d.lastVal == nil {
179 _, err = d.DeltaLengthByteArrayDecoder.Decode(out[:1])
185 d.prefixLengths = d.prefixLengths[1:]
// One-element scratch slice that receives each decoded suffix.
189 suffixHolder := make([]parquet.ByteArray, 1)
// Pop the next prefix length off the front of the queue.
191 prefixLen, d.prefixLengths = d.prefixLengths[0], d.prefixLengths[1:]
// Three-index slice: the prefix view has its capacity capped at prefixLen.
193 prefix := d.lastVal[:prefixLen:prefixLen]
194 _, err = d.DeltaLengthByteArrayDecoder.Decode(suffixHolder)
// NOTE(review): the body of this empty-suffix branch is not shown; presumably
// the prefix alone is used as the value. Verify.
199 if len(suffixHolder[0]) == 0 {
// Build the value in a freshly allocated slice: prefix bytes, then suffix bytes.
202 d.lastVal = make([]byte, int(prefixLen)+len(suffixHolder[0]))
203 copy(d.lastVal, prefix)
204 copy(d.lastVal[prefixLen:], suffixHolder[0])
// Store the value in the next output slot and advance the output window.
206 out[0], out = d.lastVal, out[1:]
211 // DecodeSpaced is like decode, but the result is spaced out based on the bitmap provided.
//
// It decodes len(out)-nullCount values into the front of out, returns an error
// if Decode produced a different count, and otherwise returns the result of
// spacedExpand with a nil error.
//
// NOTE(review): listing lines 215-217 and 220-221 are missing; presumably they
// hold the err check after Decode and the closing brace of the count check.
212 func (d *DeltaByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
// Number of slots that are not nulls.
213 toread := len(out) - nullCount
214 values, err := d.Decode(out[:toread])
218 if values != toread {
219 return values, xerrors.New("parquet: number of values / definition levels read did not match")
// spacedExpand receives the whole output slice plus the null bitmap
// information; presumably it moves the dense values into their non-null
// positions (verify against its definition).
222 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil