]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/go/parquet/internal/utils/bit_reader.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / parquet / internal / utils / bit_reader.go
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 package utils
18
19 import (
20 "encoding/binary"
21 "errors"
22 "io"
23 "math"
24 "reflect"
25 "unsafe"
26
27 "github.com/apache/arrow/go/v6/arrow"
28 "github.com/apache/arrow/go/v6/arrow/bitutil"
29 "github.com/apache/arrow/go/v6/arrow/memory"
30 )
31
32 // masks for grabbing the trailing bits based on the number of trailing bits desired
33 var trailingMask [64]uint64
34
35 func init() {
36 // generate the masks at init so we don't have to hard code them.
37 for i := 0; i < 64; i++ {
38 trailingMask[i] = (math.MaxUint64 >> (64 - i))
39 }
40 }
41
42 // trailingBits returns a value constructed from the bits trailing bits of
43 // the value v that is passed in. If bits >= 64, then we just return v.
44 func trailingBits(v uint64, bits uint) uint64 {
45 if bits >= 64 {
46 return v
47 }
48 return v & trailingMask[bits]
49 }
50
51 // reader is a useful interface to define the functionality we need for implementation
52 type reader interface {
53 io.Reader
54 io.ReaderAt
55 io.Seeker
56 }
57
58 // default buffer length
59 const buflen = 1024
60
61 // BitReader implements functionality for reading bits or bytes buffering up to a uint64
62 // at a time from the reader in order to improve efficiency. It also provides
63 // methods to read multiple bytes in one read such as encoded ints/values.
64 //
65 // This BitReader is the basis for the other utility classes like RLE decoding
66 // and such, providing the necessary functions for interpreting the values.
67 type BitReader struct {
68 reader reader
69 buffer uint64
70 byteoffset int64
71 bitoffset uint
72 raw [8]byte
73
74 unpackBuf [buflen]uint32
75 }
76
77 // NewBitReader takes in a reader that implements io.Reader, io.ReaderAt and io.Seeker
78 // interfaces and returns a BitReader for use with various bit level manipulations.
79 func NewBitReader(r reader) *BitReader {
80 return &BitReader{reader: r}
81 }
82
83 // CurOffset returns the current Byte offset into the data that the reader is at.
84 func (b *BitReader) CurOffset() int64 {
85 return b.byteoffset + bitutil.BytesForBits(int64(b.bitoffset))
86 }
87
88 // Reset allows reusing a BitReader by setting a new reader and resetting the internal
89 // state back to zeros.
90 func (b *BitReader) Reset(r reader) {
91 b.reader = r
92 b.buffer = 0
93 b.byteoffset = 0
94 b.bitoffset = 0
95 }
96
97 // GetVlqInt reads a Vlq encoded int from the stream. The encoded value must start
98 // at the beginning of a byte and this returns false if there weren't enough bytes
99 // in the buffer or reader. This will call `ReadByte` which in turn retrieves byte
100 // aligned values from the reader
101 func (b *BitReader) GetVlqInt() (uint64, bool) {
102 tmp, err := binary.ReadUvarint(b)
103 if err != nil {
104 return 0, false
105 }
106 return tmp, true
107 }
108
109 // GetZigZagVlqInt reads a zigzag encoded integer, returning false if there weren't
110 // enough bytes remaining.
111 func (b *BitReader) GetZigZagVlqInt() (int64, bool) {
112 u, ok := b.GetVlqInt()
113 if !ok {
114 return 0, false
115 }
116
117 return int64(u>>1) ^ -int64(u&1), true
118 }
119
120 // ReadByte reads a single aligned byte from the underlying stream, or populating
121 // error if there aren't enough bytes left.
122 func (b *BitReader) ReadByte() (byte, error) {
123 var tmp byte
124 if ok := b.GetAligned(1, &tmp); !ok {
125 return 0, errors.New("failed to read byte")
126 }
127
128 return tmp, nil
129 }
130
131 // GetAligned reads nbytes from the underlying stream into the passed interface value.
132 // Returning false if there aren't enough bytes remaining in the stream or if an invalid
133 // type is passed. The bytes are read aligned to byte boundaries.
134 //
135 // v must be a pointer to a byte or sized uint type (*byte, *uint16, *uint32, *uint64).
136 // encoded values are assumed to be little endian.
137 func (b *BitReader) GetAligned(nbytes int, v interface{}) bool {
138 // figure out the number of bytes to represent v
139 typBytes := int(reflect.TypeOf(v).Elem().Size())
140 if nbytes > typBytes {
141 return false
142 }
143
144 bread := bitutil.BytesForBits(int64(b.bitoffset))
145
146 b.byteoffset += bread
147 n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
148 if err != nil && err != io.EOF {
149 return false
150 }
151 if n != nbytes {
152 return false
153 }
154 // zero pad the the bytes
155 memory.Set(b.raw[n:typBytes], 0)
156
157 switch v := v.(type) {
158 case *byte:
159 *v = b.raw[0]
160 case *uint64:
161 *v = binary.LittleEndian.Uint64(b.raw[:typBytes])
162 case *uint32:
163 *v = binary.LittleEndian.Uint32(b.raw[:typBytes])
164 case *uint16:
165 *v = binary.LittleEndian.Uint16(b.raw[:typBytes])
166 default:
167 return false
168 }
169
170 b.byteoffset += int64(nbytes)
171
172 b.bitoffset = 0
173 b.fillbuffer()
174 return true
175 }
176
177 // fillbuffer fills the uint64 buffer with bytes from the underlying stream
178 func (b *BitReader) fillbuffer() error {
179 n, err := b.reader.ReadAt(b.raw[:], b.byteoffset)
180 if err != nil && n == 0 && err != io.EOF {
181 return err
182 }
183 for i := n; i < 8; i++ {
184 b.raw[i] = 0
185 }
186 b.buffer = binary.LittleEndian.Uint64(b.raw[:])
187 return nil
188 }
189
190 // next reads an integral value from the next bits in the buffer
191 func (b *BitReader) next(bits uint) (v uint64, err error) {
192 v = trailingBits(b.buffer, b.bitoffset+bits) >> b.bitoffset
193 b.bitoffset += bits
194 // if we need more bits to get what was requested then refill the buffer
195 if b.bitoffset >= 64 {
196 b.byteoffset += 8
197 b.bitoffset -= 64
198 if err = b.fillbuffer(); err != nil {
199 return 0, err
200 }
201 v |= trailingBits(b.buffer, b.bitoffset) << (bits - b.bitoffset)
202 }
203 return
204 }
205
206 // GetBatchIndex is like GetBatch but for IndexType (used for dictionary decoding)
207 func (b *BitReader) GetBatchIndex(bits uint, out []IndexType) (i int, err error) {
208 // IndexType is a 32-bit value so bits must be less than 32 when unpacking
209 // values using the bitreader.
210 if bits > 32 {
211 return 0, errors.New("must be 32 bits or less per read")
212 }
213
214 var val uint64
215
216 length := len(out)
217 // if we're not currently byte-aligned, read bits until we are byte-aligned.
218 for ; i < length && b.bitoffset != 0; i++ {
219 val, err = b.next(bits)
220 out[i] = IndexType(val)
221 if err != nil {
222 return
223 }
224 }
225
226 b.reader.Seek(b.byteoffset, io.SeekStart)
227 // grab as many 32 byte chunks as possible in one shot
228 if i < length { // IndexType should be a 32 bit value so we can do quick unpacking right into the output
229 numUnpacked := unpack32(b.reader, (*(*[]uint32)(unsafe.Pointer(&out)))[i:], int(bits))
230 i += numUnpacked
231 b.byteoffset += int64(numUnpacked * int(bits) / 8)
232 }
233
234 // re-fill our buffer just in case.
235 b.fillbuffer()
236 // grab the remaining values that aren't 32 byte aligned
237 for ; i < length; i++ {
238 val, err = b.next(bits)
239 out[i] = IndexType(val)
240 if err != nil {
241 break
242 }
243 }
244 return
245 }
246
247 // GetBatchBools is like GetBatch but optimized for reading bits as boolean values
248 func (b *BitReader) GetBatchBools(out []bool) (int, error) {
249 bits := uint(1)
250 length := len(out)
251
252 i := 0
253 // read until we are byte-aligned
254 for ; i < length && b.bitoffset != 0; i++ {
255 val, err := b.next(bits)
256 out[i] = val != 0
257 if err != nil {
258 return i, err
259 }
260 }
261
262 b.reader.Seek(b.byteoffset, io.SeekStart)
263 buf := arrow.Uint32Traits.CastToBytes(b.unpackBuf[:])
264 blen := buflen * 8
265 for i < length {
266 // grab byte-aligned bits in a loop since it's more efficient than going
267 // bit by bit when you can grab 8 bools at a time.
268 unpackSize := MinInt(blen, length-i) / 8 * 8
269 n, err := b.reader.Read(buf[:bitutil.BytesForBits(int64(unpackSize))])
270 if err != nil {
271 return i, err
272 }
273 BytesToBools(buf[:n], out[i:])
274 i += unpackSize
275 b.byteoffset += int64(n)
276 }
277
278 b.fillbuffer()
279 // grab the trailing bits
280 for ; i < length; i++ {
281 val, err := b.next(bits)
282 out[i] = val != 0
283 if err != nil {
284 return i, err
285 }
286 }
287
288 return i, nil
289 }
290
291 // GetBatch fills out by decoding values repeated from the stream that are encoded
292 // using bits as the number of bits per value. The values are expected to be bit packed
293 // so we will unpack the values to populate.
294 func (b *BitReader) GetBatch(bits uint, out []uint64) (int, error) {
295 // since we're unpacking into uint64 values, we can't support bits being
296 // larger than 64 here as that's the largest size value we're reading
297 if bits > 64 {
298 return 0, errors.New("must be 64 bits or less per read")
299 }
300
301 length := len(out)
302
303 i := 0
304 // read until we are byte aligned
305 for ; i < length && b.bitoffset != 0; i++ {
306 val, err := b.next(bits)
307 out[i] = val
308 if err != nil {
309 return i, err
310 }
311 }
312
313 b.reader.Seek(b.byteoffset, io.SeekStart)
314 for i < length {
315 // unpack groups of 32 bytes at a time into a buffer since it's more efficient
316 unpackSize := MinInt(buflen, length-i)
317 numUnpacked := unpack32(b.reader, b.unpackBuf[:unpackSize], int(bits))
318 if numUnpacked == 0 {
319 break
320 }
321
322 for k := 0; k < numUnpacked; k++ {
323 out[i+k] = uint64(b.unpackBuf[k])
324 }
325 i += numUnpacked
326 b.byteoffset += int64(numUnpacked * int(bits) / 8)
327 }
328
329 b.fillbuffer()
330 // and then the remaining trailing values
331 for ; i < length; i++ {
332 val, err := b.next(bits)
333 out[i] = val
334 if err != nil {
335 return i, err
336 }
337 }
338
339 return i, nil
340 }
341
342 // GetValue returns a single value that is bit packed using width as the number of bits
343 // and returns false if there weren't enough bits remaining.
344 func (b *BitReader) GetValue(width int) (uint64, bool) {
345 v := make([]uint64, 1)
346 n, _ := b.GetBatch(uint(width), v)
347 return v[0], n == 1
348 }