1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
25 "github.com/JohnCGriffin/overflow"
26 "github.com/apache/arrow/go/v6/arrow/bitutil"
27 "github.com/apache/arrow/go/v6/parquet"
28 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
29 "github.com/apache/arrow/go/v6/parquet/internal/utils"
30 "golang.org/x/xerrors"
33 // LevelEncoder is for handling the encoding of Definition and Repetition levels
35 type LevelEncoder struct {
38 encoding format.Encoding
43 // LevelEncodingMaxBufferSize estimates the max number of bytes needed to encode data with the
44 // specified encoding given the max level and number of buffered values provided.
45 func LevelEncodingMaxBufferSize(encoding parquet.Encoding, maxLvl int16, nbuffered int) int {
46 bitWidth := bits.Len64(uint64(maxLvl))
49 case parquet.Encodings.RLE:
50 nbytes = utils.MaxBufferSize(bitWidth, nbuffered) + utils.MinBufferSize(bitWidth)
51 case parquet.Encodings.BitPacked:
52 nbytes = int(bitutil.BytesForBits(int64(nbuffered * bitWidth)))
54 panic("parquet: unknown encoding type for levels")
59 // Reset resets the encoder allowing it to be reused and updating the maxlevel to the new
61 func (l *LevelEncoder) Reset(maxLvl int16) {
62 l.bitWidth = bits.Len64(uint64(maxLvl))
64 case format.Encoding_RLE:
66 l.rle.BitWidth = l.bitWidth
67 case format.Encoding_BIT_PACKED:
70 panic("parquet: unknown encoding type")
74 // Init is called to set up the desired encoding type, max level and underlying writer for a
75 // level encoder to control where the resulting encoded buffer will end up.
76 func (l *LevelEncoder) Init(encoding parquet.Encoding, maxLvl int16, w io.WriterAt) {
77 l.bitWidth = bits.Len64(uint64(maxLvl))
78 l.encoding = format.Encoding(encoding)
80 case format.Encoding_RLE:
81 l.rle = utils.NewRleEncoder(w, l.bitWidth)
82 case format.Encoding_BIT_PACKED:
83 l.bit = utils.NewBitWriter(w)
85 panic("parquet: unknown encoding type for levels")
89 // EncodeNoFlush encodes the provided levels in the encoder, appending the encoded
90 // values, but doesn't flush the buffer and return it yet. Returns the number
91 // of values encoded and any error encountered or nil. If err is not nil, nencoded
92 // will be the number of values encoded before the error was encountered.
93 func (l *LevelEncoder) EncodeNoFlush(lvls []int16) (nencoded int, err error) {
94 if l.rle == nil && l.bit == nil {
95 panic("parquet: level encoders are not initialized")
99 case format.Encoding_RLE:
100 for _, level := range lvls {
101 if err = l.rle.Put(uint64(level)); err != nil {
107 for _, level := range lvls {
108 if err = l.bit.WriteValue(uint64(level), uint(l.bitWidth)); err != nil {
117 // Flush flushes out any encoded data to the underlying writer.
118 func (l *LevelEncoder) Flush() {
119 if l.rle == nil && l.bit == nil {
120 panic("parquet: level encoders are not initialized")
124 case format.Encoding_RLE:
125 l.rleLen = l.rle.Flush()
131 // Encode encodes the slice of definition or repetition levels based on
132 // the currently configured encoding type and returns the number of
133 // values that were encoded.
134 func (l *LevelEncoder) Encode(lvls []int16) (nencoded int, err error) {
135 if l.rle == nil && l.bit == nil {
136 panic("parquet: level encoders are not initialized")
140 case format.Encoding_RLE:
141 defer func() { l.rleLen = l.rle.Flush() }()
142 for _, level := range lvls {
143 if err = l.rle.Put(uint64(level)); err != nil {
150 defer l.bit.Flush(false)
151 for _, level := range lvls {
152 if err = l.bit.WriteValue(uint64(level), uint(l.bitWidth)); err != nil {
161 // Len returns the number of bytes that were written as run length encoded
162 // levels. This is only valid for run length encoding and will panic if using
163 // the deprecated bit packed encoding.
164 func (l *LevelEncoder) Len() int {
165 if l.encoding != format.Encoding_RLE {
166 panic("parquet: level encoder, only implemented for RLE")
171 // LevelDecoder handles the decoding of repetition and definition levels from a
172 // parquet file supporting bit packed and run length encoded values.
173 type LevelDecoder struct {
175 remaining int // the number of values left to be decoded in the input data
177 encoding format.Encoding
178 // only one of the following should ever be set at a time based on the
180 rle *utils.RleDecoder
184 // SetData sets the data to be decoded by subsequent calls, specifying the encoding type,
185 // the maximum level (which is what determines the bit width), the number of values expected,
186 // and the raw bytes to decode. Returns the number of bytes expected to be decoded.
187 func (l *LevelDecoder) SetData(encoding parquet.Encoding, maxLvl int16, nbuffered int, data []byte) (int, error) {
189 l.encoding = format.Encoding(encoding)
190 l.remaining = nbuffered
191 l.bitWidth = bits.Len64(uint64(maxLvl))
194 case parquet.Encodings.RLE:
196 return 0, xerrors.New("parquet: received invalid levels (corrupt data page?)")
199 nbytes := int32(binary.LittleEndian.Uint32(data[:4]))
200 if nbytes < 0 || nbytes > int32(len(data)-4) {
201 return 0, xerrors.New("parquet: received invalid number of bytes (corrupt data page?)")
206 l.rle = utils.NewRleDecoder(bytes.NewReader(buf), l.bitWidth)
208 l.rle.Reset(bytes.NewReader(buf), l.bitWidth)
210 return int(nbytes) + 4, nil
211 case parquet.Encodings.BitPacked:
212 nbits, ok := overflow.Mul(nbuffered, l.bitWidth)
214 return 0, xerrors.New("parquet: number of buffered values too large (corrupt data page?)")
217 nbytes := bitutil.BytesForBits(int64(nbits))
218 if nbytes < 0 || nbytes > int64(len(data)) {
219 return 0, xerrors.New("parquet: recieved invalid number of bytes (corrupt data page?)")
222 l.bit = utils.NewBitReader(bytes.NewReader(data))
224 l.bit.Reset(bytes.NewReader(data))
226 return int(nbytes), nil
228 return 0, xerrors.Errorf("parquet: unknown encoding type for levels '%s'", encoding)
232 // SetDataV2 is the same as SetData but only for DataPageV2 pages and only supports
233 // run length encoding.
234 func (l *LevelDecoder) SetDataV2(nbytes int32, maxLvl int16, nbuffered int, data []byte) error {
236 return xerrors.New("parquet: invalid page header (corrupt data page?)")
240 l.encoding = format.Encoding_RLE
241 l.remaining = nbuffered
242 l.bitWidth = bits.Len64(uint64(maxLvl))
245 l.rle = utils.NewRleDecoder(bytes.NewReader(data), l.bitWidth)
247 l.rle.Reset(bytes.NewReader(data), l.bitWidth)
252 // Decode decodes the bytes that were set with SetData into the slice of levels
253 // returning the total number of levels that were decoded and the number of
254 // values which had a level equal to the max level, indicating how many physical
255 // values exist to be read.
256 func (l *LevelDecoder) Decode(levels []int16) (int, int64) {
264 n := utils.Min(int64(l.remaining), int64(len(levels)))
266 batch := utils.Min(1024, n)
268 case format.Encoding_RLE:
269 decoded = l.rle.GetBatch(buf[:batch])
270 case format.Encoding_BIT_PACKED:
271 decoded, _ = l.bit.GetBatch(uint(l.bitWidth), buf[:batch])
273 l.remaining -= decoded
274 totaldecoded += decoded
277 for idx, val := range buf[:decoded] {
284 levels = levels[decoded:]
287 return totaldecoded, valsToRead