]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/go/parquet/internal/encoding/levels.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / parquet / internal / encoding / levels.go
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 package encoding
18
19 import (
20 "bytes"
21 "encoding/binary"
22 "io"
23 "math/bits"
24
25 "github.com/JohnCGriffin/overflow"
26 "github.com/apache/arrow/go/v6/arrow/bitutil"
27 "github.com/apache/arrow/go/v6/parquet"
28 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
29 "github.com/apache/arrow/go/v6/parquet/internal/utils"
30 "golang.org/x/xerrors"
31 )
32
33 // LevelEncoder is for handling the encoding of Definition and Repetition levels
34 // to parquet files.
35 type LevelEncoder struct {
36 bitWidth int
37 rleLen int
38 encoding format.Encoding
39 rle *utils.RleEncoder
40 bit *utils.BitWriter
41 }
42
// LevelEncodingMaxBufferSize estimates the maximum number of bytes needed to
// encode nbuffered levels whose values are at most maxLvl with the given
// encoding. The bit width per level is derived from maxLvl.
//
// For RLE the estimate is the RLE encoder's maximum buffer size plus its
// minimum buffer size. For bit-packed levels it is exactly the number of bytes
// needed to hold nbuffered*bitWidth bits.
//
// It panics if encoding is neither RLE nor BitPacked.
func LevelEncodingMaxBufferSize(encoding parquet.Encoding, maxLvl int16, nbuffered int) int {
	bitWidth := bits.Len64(uint64(maxLvl))
	switch encoding {
	case parquet.Encodings.RLE:
		return utils.MaxBufferSize(bitWidth, nbuffered) + utils.MinBufferSize(bitWidth)
	case parquet.Encodings.BitPacked:
		// Multiply in int64: on 32-bit platforms nbuffered*bitWidth can overflow
		// int before being converted to a bit count.
		return int(bitutil.BytesForBits(int64(nbuffered) * int64(bitWidth)))
	default:
		panic("parquet: unknown encoding type for levels")
	}
}
58
// Reset clears the encoder's buffered state so it can be reused, recomputing
// the bit width from the new maxLvl. The encoding chosen in Init is kept.
//
// Reset panics if the encoder's encoding is neither RLE nor BIT_PACKED.
func (l *LevelEncoder) Reset(maxLvl int16) {
	width := bits.Len64(uint64(maxLvl))
	l.bitWidth = width

	if l.encoding == format.Encoding_RLE {
		l.rle.Clear()
		l.rle.BitWidth = width
		return
	}
	if l.encoding == format.Encoding_BIT_PACKED {
		// the bit writer takes the width per call (see l.bitWidth), so only clear it
		l.bit.Clear()
		return
	}
	panic("parquet: unknown encoding type")
}
73
// Init configures the encoder with the level encoding and the maximum level
// (from which the bit width is derived), and creates the matching RLE encoder
// or bit writer on top of w, which receives the encoded bytes.
//
// Init panics if encoding is neither RLE nor BIT_PACKED.
func (l *LevelEncoder) Init(encoding parquet.Encoding, maxLvl int16, w io.WriterAt) {
	enc := format.Encoding(encoding)
	width := bits.Len64(uint64(maxLvl))
	l.bitWidth, l.encoding = width, enc

	switch {
	case enc == format.Encoding_RLE:
		l.rle = utils.NewRleEncoder(w, width)
	case enc == format.Encoding_BIT_PACKED:
		l.bit = utils.NewBitWriter(w)
	default:
		panic("parquet: unknown encoding type for levels")
	}
}
88
89 // EncodeNoFlush encodes the provided levels in the encoder, but doesn't flush
90 // the buffer and return it yet, appending these encoded values. Returns the number
91 // of values encoded and any error encountered or nil. If err is not nil, nencoded
92 // will be the number of values encoded before the error was encountered
93 func (l *LevelEncoder) EncodeNoFlush(lvls []int16) (nencoded int, err error) {
94 if l.rle == nil && l.bit == nil {
95 panic("parquet: level encoders are not initialized")
96 }
97
98 switch l.encoding {
99 case format.Encoding_RLE:
100 for _, level := range lvls {
101 if err = l.rle.Put(uint64(level)); err != nil {
102 return
103 }
104 nencoded++
105 }
106 default:
107 for _, level := range lvls {
108 if err = l.bit.WriteValue(uint64(level), uint(l.bitWidth)); err != nil {
109 return
110 }
111 nencoded++
112 }
113 }
114 return
115 }
116
117 // Flush flushes out any encoded data to the underlying writer.
118 func (l *LevelEncoder) Flush() {
119 if l.rle == nil && l.bit == nil {
120 panic("parquet: level encoders are not initialized")
121 }
122
123 switch l.encoding {
124 case format.Encoding_RLE:
125 l.rleLen = l.rle.Flush()
126 default:
127 l.bit.Flush(false)
128 }
129 }
130
131 // Encode encodes the slice of definition or repetition levels based on
132 // the currently configured encoding type and returns the number of
133 // values that were encoded.
134 func (l *LevelEncoder) Encode(lvls []int16) (nencoded int, err error) {
135 if l.rle == nil && l.bit == nil {
136 panic("parquet: level encoders are not initialized")
137 }
138
139 switch l.encoding {
140 case format.Encoding_RLE:
141 defer func() { l.rleLen = l.rle.Flush() }()
142 for _, level := range lvls {
143 if err = l.rle.Put(uint64(level)); err != nil {
144 return
145 }
146 nencoded++
147 }
148
149 default:
150 defer l.bit.Flush(false)
151 for _, level := range lvls {
152 if err = l.bit.WriteValue(uint64(level), uint(l.bitWidth)); err != nil {
153 return
154 }
155 nencoded++
156 }
157 }
158 return
159 }
160
161 // Len returns the number of bytes that were written as Run Length encoded
162 // levels, this is only valid for run length encoding and will panic if using
163 // deprecated bit packed encoding.
164 func (l *LevelEncoder) Len() int {
165 if l.encoding != format.Encoding_RLE {
166 panic("parquet: level encoder, only implemented for RLE")
167 }
168 return l.rleLen
169 }
170
171 // LevelDecoder handles the decoding of repetition and definition levels from a
172 // parquet file supporting bit packed and run length encoded values.
173 type LevelDecoder struct {
174 bitWidth int
175 remaining int // the number of values left to be decoded in the input data
176 maxLvl int16
177 encoding format.Encoding
178 // only one of the following should ever be set at a time based on the
179 // encoding format.
180 rle *utils.RleDecoder
181 bit *utils.BitReader
182 }
183
// SetData sets the data to be decoded by subsequent calls to Decode. It takes
// the level encoding, the maximum level (which determines the bit width), the
// number of values expected and the raw bytes. It returns the number of bytes
// of data occupied by the levels; for RLE this includes the 4-byte length
// prefix.
//
// For RLE, data must start with a little-endian int32 byte length followed by
// that many bytes of encoded levels. For bit-packed levels, the length is
// nbuffered*bitWidth bits rounded up to whole bytes. In both cases the decoder
// only sees those level bytes, never the bytes that follow them in data.
//
// An error is returned if data is too short, the encoded length is invalid,
// or the encoding is neither RLE nor BitPacked.
func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffered int, data []byte) (int, error) {
	l.maxLvl = maxLvl
	l.encoding = format.Encoding(encoding)
	l.remaining = nbuffered
	l.bitWidth = bits.Len64(uint64(maxLvl))

	switch encoding {
	case parquet.Encodings.RLE:
		if len(data) < 4 {
			return 0, xerrors.New("parquet: received invalid levels (corrupt data page?)")
		}

		// The prefix is a signed 32-bit length. Widen to int64 before comparing
		// so the bound check cannot wrap when len(data) exceeds math.MaxInt32.
		nbytes := int64(int32(binary.LittleEndian.Uint32(data[:4])))
		if nbytes < 0 || nbytes > int64(len(data)-4) {
			return 0, xerrors.New("parquet: received invalid number of bytes (corrupt data page?)")
		}

		// Hand the decoder only the level bytes, so corrupt runs cannot consume
		// whatever follows the levels in data.
		buf := data[4 : 4+nbytes]
		if l.rle == nil {
			l.rle = utils.NewRleDecoder(bytes.NewReader(buf), l.bitWidth)
		} else {
			l.rle.Reset(bytes.NewReader(buf), l.bitWidth)
		}
		return int(nbytes) + 4, nil
	case parquet.Encodings.BitPacked:
		nbits, ok := overflow.Mul(nbuffered, l.bitWidth)
		if !ok {
			return 0, xerrors.New("parquet: number of buffered values too large (corrupt data page?)")
		}

		nbytes := bitutil.BytesForBits(int64(nbits))
		if nbytes < 0 || nbytes > int64(len(data)) {
			return 0, xerrors.New("parquet: received invalid number of bytes (corrupt data page?)")
		}

		// As with RLE, restrict the reader to exactly the level bytes.
		buf := data[:nbytes]
		if l.bit == nil {
			l.bit = utils.NewBitReader(bytes.NewReader(buf))
		} else {
			l.bit.Reset(bytes.NewReader(buf))
		}
		return int(nbytes), nil
	default:
		return 0, xerrors.Errorf("parquet: unknown encoding type for levels '%s'", encoding)
	}
}
231
// SetDataV2 is the DataPageV2 counterpart of SetData. Levels are always RLE
// encoded here, and their byte length is passed in as nbytes rather than read
// from a prefix in data. Only the first nbytes bytes of data are given to the
// decoder, so data may safely extend past the levels.
//
// An error is returned if nbytes is negative or larger than len(data). In that
// case the decoder state is left unchanged.
func (l *LevelDecoder) SetDataV2(nbytes int32, maxLvl int16, nbuffered int, data []byte) error {
	if nbytes < 0 {
		return xerrors.New("parquet: invalid page header (corrupt data page?)")
	}
	if int64(nbytes) > int64(len(data)) {
		return xerrors.New("parquet: received invalid number of bytes (corrupt data page?)")
	}

	l.maxLvl = maxLvl
	l.encoding = format.Encoding_RLE
	l.remaining = nbuffered
	l.bitWidth = bits.Len64(uint64(maxLvl))

	// Bound the decoder to the level bytes in case data continues past them.
	buf := data[:nbytes]
	if l.rle == nil {
		l.rle = utils.NewRleDecoder(bytes.NewReader(buf), l.bitWidth)
	} else {
		l.rle.Reset(bytes.NewReader(buf), l.bitWidth)
	}
	return nil
}
251
252 // Decode decodes the bytes that were set with SetData into the slice of levels
253 // returning the total number of levels that were decoded and the number of
254 // values which had a level equal to the max level, indicating how many physical
255 // values exist to be read.
256 func (l *LevelDecoder) Decode(levels []int16) (int, int64) {
257 var (
258 buf [1024]uint64
259 totaldecoded int
260 decoded int
261 valsToRead int64
262 )
263
264 n := utils.Min(int64(l.remaining), int64(len(levels)))
265 for n > 0 {
266 batch := utils.Min(1024, n)
267 switch l.encoding {
268 case format.Encoding_RLE:
269 decoded = l.rle.GetBatch(buf[:batch])
270 case format.Encoding_BIT_PACKED:
271 decoded, _ = l.bit.GetBatch(uint(l.bitWidth), buf[:batch])
272 }
273 l.remaining -= decoded
274 totaldecoded += decoded
275 n -= batch
276
277 for idx, val := range buf[:decoded] {
278 lvl := int16(val)
279 levels[idx] = lvl
280 if lvl == l.maxLvl {
281 valsToRead++
282 }
283 }
284 levels = levels[decoded:]
285 }
286
287 return totaldecoded, valsToRead
288 }