]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/go/parquet/internal/encoding/delta_byte_array.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / parquet / internal / encoding / delta_byte_array.go
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 package encoding
18
19 import (
20 "github.com/apache/arrow/go/v6/arrow/memory"
21 "github.com/apache/arrow/go/v6/parquet"
22 "github.com/apache/arrow/go/v6/parquet/internal/utils"
23 "golang.org/x/xerrors"
24 )
25
26 // DeltaByteArrayEncoder is an encoder for writing bytearrays which are delta encoded
27 // this is also known as incremental encoding or front compression. For each element
28 // in a sequence of strings, we store the prefix length of the previous entry plus the suffix
29 // see https://en.wikipedia.org/wiki/Incremental_encoding for a longer description.
30 //
31 // This is stored as a sequence of delta-encoded prefix lengths followed by the suffixes
32 // encoded as delta length byte arrays.
33 type DeltaByteArrayEncoder struct {
34 encoder
35
36 prefixEncoder *DeltaBitPackInt32Encoder
37 suffixEncoder *DeltaLengthByteArrayEncoder
38
39 lastVal parquet.ByteArray
40 }
41
42 func (enc *DeltaByteArrayEncoder) initEncoders() {
43 enc.prefixEncoder = &DeltaBitPackInt32Encoder{
44 deltaBitPackEncoder: &deltaBitPackEncoder{encoder: newEncoderBase(enc.encoding, nil, enc.mem)}}
45 enc.suffixEncoder = &DeltaLengthByteArrayEncoder{
46 newEncoderBase(enc.encoding, nil, enc.mem),
47 &DeltaBitPackInt32Encoder{
48 deltaBitPackEncoder: &deltaBitPackEncoder{encoder: newEncoderBase(enc.encoding, nil, enc.mem)}}}
49 }
50
51 // Type returns the underlying physical type this operates on, in this case ByteArrays only
52 func (DeltaByteArrayEncoder) Type() parquet.Type { return parquet.Types.ByteArray }
53
54 // Put writes a slice of ByteArrays to the encoder
55 func (enc *DeltaByteArrayEncoder) Put(in []parquet.ByteArray) {
56 if len(in) == 0 {
57 return
58 }
59
60 var suf parquet.ByteArray
61 if enc.prefixEncoder == nil { // initialize our encoders if we haven't yet
62 enc.initEncoders()
63 enc.prefixEncoder.Put([]int32{0})
64 suf = in[0]
65 enc.lastVal = in[0]
66 enc.suffixEncoder.Put([]parquet.ByteArray{suf})
67 in = in[1:]
68 }
69
70 // for each value, figure out the common prefix with the previous value
71 // and then write the prefix length and the suffix.
72 for _, val := range in {
73 l1 := enc.lastVal.Len()
74 l2 := val.Len()
75 j := 0
76 for j < l1 && j < l2 {
77 if enc.lastVal[j] != val[j] {
78 break
79 }
80 j++
81 }
82 enc.prefixEncoder.Put([]int32{int32(j)})
83 suf = val[j:]
84 enc.suffixEncoder.Put([]parquet.ByteArray{suf})
85 enc.lastVal = val
86 }
87
88 // do the memcpy after the loops to keep a copy of the lastVal
89 // we do a copy here so that we only copy and keep a reference
90 // to the suffix, and aren't forcing the *entire* value to stay
91 // in memory while we have this reference to just the suffix.
92 enc.lastVal = append([]byte{}, enc.lastVal...)
93 }
94
95 // PutSpaced is like Put, but assumes the data is already spaced for nulls and uses the bitmap provided and offset
96 // to compress the data before writing it without the null slots.
97 func (enc *DeltaByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
98 if validBits != nil {
99 data := make([]parquet.ByteArray, len(in))
100 nvalid := spacedCompress(in, data, validBits, validBitsOffset)
101 enc.Put(data[:nvalid])
102 } else {
103 enc.Put(in)
104 }
105 }
106
107 // Flush flushes any remaining data out and returns the finished encoded buffer.
108 // or returns nil and any error encountered during flushing.
109 func (enc *DeltaByteArrayEncoder) FlushValues() (Buffer, error) {
110 if enc.prefixEncoder == nil {
111 enc.initEncoders()
112 }
113 prefixBuf, err := enc.prefixEncoder.FlushValues()
114 if err != nil {
115 return nil, err
116 }
117 defer prefixBuf.Release()
118
119 suffixBuf, err := enc.suffixEncoder.FlushValues()
120 if err != nil {
121 return nil, err
122 }
123 defer suffixBuf.Release()
124
125 ret := bufferPool.Get().(*memory.Buffer)
126 ret.ResizeNoShrink(prefixBuf.Len() + suffixBuf.Len())
127 copy(ret.Bytes(), prefixBuf.Bytes())
128 copy(ret.Bytes()[prefixBuf.Len():], suffixBuf.Bytes())
129 return poolBuffer{ret}, nil
130 }
131
132 // DeltaByteArrayDecoder is a decoder for a column of data encoded using incremental or prefix encoding.
133 type DeltaByteArrayDecoder struct {
134 *DeltaLengthByteArrayDecoder
135
136 prefixLengths []int32
137 lastVal parquet.ByteArray
138 }
139
140 // Type returns the underlying physical type this decoder operates on, in this case ByteArrays only
141 func (DeltaByteArrayDecoder) Type() parquet.Type {
142 return parquet.Types.ByteArray
143 }
144
145 func (d *DeltaByteArrayDecoder) Allocator() memory.Allocator { return d.mem }
146
147 // SetData expects the data passed in to be the prefix lengths, followed by the
148 // blocks of suffix data in order to initialize the decoder.
149 func (d *DeltaByteArrayDecoder) SetData(nvalues int, data []byte) error {
150 prefixLenDec := DeltaBitPackInt32Decoder{
151 deltaBitPackDecoder: &deltaBitPackDecoder{
152 decoder: newDecoderBase(d.encoding, d.descr),
153 mem: d.mem}}
154
155 if err := prefixLenDec.SetData(nvalues, data); err != nil {
156 return err
157 }
158
159 d.prefixLengths = make([]int32, nvalues)
160 // decode all the prefix lengths first so we know how many bytes it took to get the
161 // prefix lengths for nvalues
162 prefixLenDec.Decode(d.prefixLengths)
163
164 // now that we know how many bytes we needed for the prefix lengths, the rest are the
165 // delta length byte array encoding.
166 return d.DeltaLengthByteArrayDecoder.SetData(nvalues, data[int(prefixLenDec.bytesRead()):])
167 }
168
169 // Decode decodes byte arrays into the slice provided and returns the number of values actually decoded
170 func (d *DeltaByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
171 max := utils.MinInt(len(out), d.nvals)
172 if max == 0 {
173 return 0, nil
174 }
175 out = out[:max]
176
177 var err error
178 if d.lastVal == nil {
179 _, err = d.DeltaLengthByteArrayDecoder.Decode(out[:1])
180 if err != nil {
181 return 0, err
182 }
183 d.lastVal = out[0]
184 out = out[1:]
185 d.prefixLengths = d.prefixLengths[1:]
186 }
187
188 var prefixLen int32
189 suffixHolder := make([]parquet.ByteArray, 1)
190 for len(out) > 0 {
191 prefixLen, d.prefixLengths = d.prefixLengths[0], d.prefixLengths[1:]
192
193 prefix := d.lastVal[:prefixLen:prefixLen]
194 _, err = d.DeltaLengthByteArrayDecoder.Decode(suffixHolder)
195 if err != nil {
196 return 0, err
197 }
198
199 if len(suffixHolder[0]) == 0 {
200 d.lastVal = prefix
201 } else {
202 d.lastVal = make([]byte, int(prefixLen)+len(suffixHolder[0]))
203 copy(d.lastVal, prefix)
204 copy(d.lastVal[prefixLen:], suffixHolder[0])
205 }
206 out[0], out = d.lastVal, out[1:]
207 }
208 return max, nil
209 }
210
211 // DecodeSpaced is like decode, but the result is spaced out based on the bitmap provided.
212 func (d *DeltaByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
213 toread := len(out) - nullCount
214 values, err := d.Decode(out[:toread])
215 if err != nil {
216 return values, err
217 }
218 if values != toread {
219 return values, xerrors.New("parquet: number of values / definition levels read did not match")
220 }
221
222 return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
223 }