]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/go/parquet/internal/encoding/encoder.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / parquet / internal / encoding / encoder.go
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 package encoding
18
19 import (
20 "math/bits"
21 "reflect"
22
23 "github.com/apache/arrow/go/v6/arrow"
24 "github.com/apache/arrow/go/v6/arrow/bitutil"
25 "github.com/apache/arrow/go/v6/arrow/memory"
26 "github.com/apache/arrow/go/v6/parquet"
27 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
28 "github.com/apache/arrow/go/v6/parquet/internal/utils"
29 "github.com/apache/arrow/go/v6/parquet/schema"
30 )
31
32 //go:generate go run ../../../arrow/_tools/tmpl/main.go -i -data=physical_types.tmpldata plain_encoder_types.gen.go.tmpl typed_encoder.gen.go.tmpl
33
34 // EncoderTraits is an interface for the different types to make it more
35 // convenient to construct encoders for specific types.
36 type EncoderTraits interface {
37 Encoder(format.Encoding, bool, *schema.Column, memory.Allocator) TypedEncoder
38 }
39
40 // NewEncoder will return the appropriately typed encoder for the requested physical type
41 // and encoding.
42 //
43 // If mem is nil, memory.DefaultAllocator will be used.
44 func NewEncoder(t parquet.Type, e parquet.Encoding, useDict bool, descr *schema.Column, mem memory.Allocator) TypedEncoder {
45 traits := getEncodingTraits(t)
46 if traits == nil {
47 return nil
48 }
49
50 if mem == nil {
51 mem = memory.DefaultAllocator
52 }
53 return traits.Encoder(format.Encoding(e), useDict, descr, mem)
54 }
55
56 type encoder struct {
57 descr *schema.Column
58 encoding format.Encoding
59 typeLen int
60 mem memory.Allocator
61
62 sink *PooledBufferWriter
63 }
64
65 // newEncoderBase constructs a new base encoder for embedding on the typed encoders
66 // encapsulating the common functionality.
67 func newEncoderBase(e format.Encoding, descr *schema.Column, mem memory.Allocator) encoder {
68 typelen := -1
69 if descr != nil && descr.PhysicalType() == parquet.Types.FixedLenByteArray {
70 typelen = int(descr.TypeLength())
71 }
72 return encoder{
73 descr: descr,
74 encoding: e,
75 mem: mem,
76 typeLen: typelen,
77 sink: NewPooledBufferWriter(1024),
78 }
79 }
80
81 // ReserveForWrite allocates n bytes so that the next n bytes written do not require new allocations.
82 func (e *encoder) ReserveForWrite(n int) { e.sink.Reserve(n) }
83 func (e *encoder) EstimatedDataEncodedSize() int64 { return int64(e.sink.Len()) }
84 func (e *encoder) Encoding() parquet.Encoding { return parquet.Encoding(e.encoding) }
85 func (e *encoder) Allocator() memory.Allocator { return e.mem }
86 func (e *encoder) append(data []byte) { e.sink.Write(data) }
87
88 // FlushValues flushes any unwritten data to the buffer and returns the finished encoded buffer of data.
89 // This also clears the encoder, ownership of the data belongs to whomever called FlushValues, Release
90 // should be called on the resulting Buffer when done.
91 func (e *encoder) FlushValues() (Buffer, error) { return e.sink.Finish(), nil }
92
93 // Bytes returns the current bytes that have been written to the encoder's buffer but doesn't transfer ownership.
94 func (e *encoder) Bytes() []byte { return e.sink.Bytes() }
95
96 // Reset drops the data currently in the encoder and resets for new use.
97 func (e *encoder) Reset() { e.sink.Reset(0) }
98
99 type dictEncoder struct {
100 encoder
101
102 dictEncodedSize int
103 idxBuffer *memory.Buffer
104 idxValues []int32
105 memo MemoTable
106 }
107
108 // newDictEncoderBase constructs and returns a dictionary encoder for the appropriate type using the passed
109 // in memo table for constructing the index.
110 func newDictEncoderBase(descr *schema.Column, memo MemoTable, mem memory.Allocator) dictEncoder {
111 return dictEncoder{
112 encoder: newEncoderBase(format.Encoding_PLAIN_DICTIONARY, descr, mem),
113 idxBuffer: memory.NewResizableBuffer(mem),
114 memo: memo,
115 }
116 }
117
118 // Reset drops all the currently encoded values from the index and indexes from the data to allow
119 // restarting the encoding process.
120 func (d *dictEncoder) Reset() {
121 d.encoder.Reset()
122 d.dictEncodedSize = 0
123 d.idxValues = d.idxValues[:0]
124 d.idxBuffer.ResizeNoShrink(0)
125 d.memo.Reset()
126 }
127
128 // append the passed index to the indexbuffer
129 func (d *dictEncoder) addIndex(idx int) {
130 if len(d.idxValues) == cap(d.idxValues) {
131 curLen := len(d.idxValues)
132 d.idxBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(bitutil.NextPowerOf2(curLen + 1)))
133 d.idxValues = arrow.Int32Traits.CastFromBytes(d.idxBuffer.Buf())[: curLen : d.idxBuffer.Len()/arrow.Int32SizeBytes]
134 }
135 d.idxValues = append(d.idxValues, int32(idx))
136 }
137
138 // FlushValues dumps all the currently buffered indexes that would become the data page to a buffer and
139 // returns it or returns nil and any error encountered.
140 func (d *dictEncoder) FlushValues() (Buffer, error) {
141 buf := bufferPool.Get().(*memory.Buffer)
142 buf.Reserve(int(d.EstimatedDataEncodedSize()))
143 size, err := d.WriteIndices(buf.Buf())
144 if err != nil {
145 poolBuffer{buf}.Release()
146 return nil, err
147 }
148 buf.ResizeNoShrink(size)
149 return poolBuffer{buf}, nil
150 }
151
152 // EstimatedDataEncodedSize returns the maximum number of bytes needed to store the RLE encoded indexes, not including the
153 // dictionary index in the computation.
154 func (d *dictEncoder) EstimatedDataEncodedSize() int64 {
155 return 1 + int64(utils.MaxBufferSize(d.BitWidth(), len(d.idxValues))+utils.MinBufferSize(d.BitWidth()))
156 }
157
158 // NumEntries returns the number of entires in the dictionary index for this encoder.
159 func (d *dictEncoder) NumEntries() int {
160 return d.memo.Size()
161 }
162
163 // BitWidth returns the max bitwidth that would be necessary for encoding the index values currently
164 // in the dictionary based on the size of the dictionary index.
165 func (d *dictEncoder) BitWidth() int {
166 switch d.NumEntries() {
167 case 0:
168 return 0
169 case 1:
170 return 1
171 default:
172 return bits.Len32(uint32(d.NumEntries() - 1))
173 }
174 }
175
176 // WriteDict writes the dictionary index to the given byte slice.
177 func (d *dictEncoder) WriteDict(out []byte) {
178 d.memo.WriteOut(out)
179 }
180
181 // WriteIndices performs Run Length encoding on the indexes and the writes the encoded
182 // index value data to the provided byte slice, returning the number of bytes actually written.
183 // If any error is encountered, it will return -1 and the error.
184 func (d *dictEncoder) WriteIndices(out []byte) (int, error) {
185 out[0] = byte(d.BitWidth())
186
187 enc := utils.NewRleEncoder(utils.NewWriterAtBuffer(out[1:]), d.BitWidth())
188 for _, idx := range d.idxValues {
189 if err := enc.Put(uint64(idx)); err != nil {
190 return -1, err
191 }
192 }
193 nbytes := enc.Flush()
194
195 d.idxValues = d.idxValues[:0]
196 return nbytes + 1, nil
197 }
198
199 // Put adds a value to the dictionary data column, inserting the value if it
200 // didn't already exist in the dictionary.
201 func (d *dictEncoder) Put(v interface{}) {
202 memoIdx, found, err := d.memo.GetOrInsert(v)
203 if err != nil {
204 panic(err)
205 }
206 if !found {
207 d.dictEncodedSize += int(reflect.TypeOf(v).Size())
208 }
209 d.addIndex(memoIdx)
210 }
211
212 // DictEncodedSize returns the current size of the encoded dictionary
213 func (d *dictEncoder) DictEncodedSize() int {
214 return d.dictEncodedSize
215 }
216
217 // spacedCompress is a helper function for encoders to remove the slots in the slices passed in according
218 // to the bitmap which are null into an output slice that is no longer spaced out with slots for nulls.
219 func spacedCompress(src, out interface{}, validBits []byte, validBitsOffset int64) int {
220 nvalid := 0
221
222 // for efficiency we use a type switch because the copy runs significantly faster when typed
223 // than calling reflect.Copy
224 switch s := src.(type) {
225 case []int32:
226 o := out.([]int32)
227 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
228 for {
229 run := reader.NextRun()
230 if run.Length == 0 {
231 break
232 }
233 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
234 nvalid += int(run.Length)
235 }
236 case []int64:
237 o := out.([]int64)
238 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
239 for {
240 run := reader.NextRun()
241 if run.Length == 0 {
242 break
243 }
244 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
245 nvalid += int(run.Length)
246 }
247 case []float32:
248 o := out.([]float32)
249 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
250 for {
251 run := reader.NextRun()
252 if run.Length == 0 {
253 break
254 }
255 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
256 nvalid += int(run.Length)
257 }
258 case []float64:
259 o := out.([]float64)
260 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
261 for {
262 run := reader.NextRun()
263 if run.Length == 0 {
264 break
265 }
266 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
267 nvalid += int(run.Length)
268 }
269 case []parquet.ByteArray:
270 o := out.([]parquet.ByteArray)
271 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
272 for {
273 run := reader.NextRun()
274 if run.Length == 0 {
275 break
276 }
277 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
278 nvalid += int(run.Length)
279 }
280 case []parquet.FixedLenByteArray:
281 o := out.([]parquet.FixedLenByteArray)
282 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
283 for {
284 run := reader.NextRun()
285 if run.Length == 0 {
286 break
287 }
288 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
289 nvalid += int(run.Length)
290 }
291 case []bool:
292 o := out.([]bool)
293 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
294 for {
295 run := reader.NextRun()
296 if run.Length == 0 {
297 break
298 }
299 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
300 nvalid += int(run.Length)
301 }
302 }
303
304 return nvalid
305 }