1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
23 "github.com/apache/arrow/go/v6/arrow"
24 "github.com/apache/arrow/go/v6/arrow/bitutil"
25 "github.com/apache/arrow/go/v6/arrow/memory"
26 "github.com/apache/arrow/go/v6/parquet"
27 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
28 "github.com/apache/arrow/go/v6/parquet/internal/utils"
29 "github.com/apache/arrow/go/v6/parquet/schema"
32 //go:generate go run ../../../arrow/_tools/tmpl/main.go -i -data=physical_types.tmpldata plain_encoder_types.gen.go.tmpl typed_encoder.gen.go.tmpl
34 // EncoderTraits is an interface for the different types to make it more
35 // convenient to construct encoders for specific types.
36 type EncoderTraits interface {
37 Encoder(format.Encoding, bool, *schema.Column, memory.Allocator) TypedEncoder
40 // NewEncoder returns the appropriately typed encoder for the requested physical type t,
// built by the traits registered for t from encoding e, the useDict flag and the column
// descriptor descr.
43 // If mem is nil, memory.DefaultAllocator will be used.
44 func NewEncoder(t parquet.Type, e parquet.Encoding, useDict bool, descr *schema.Column, mem memory.Allocator) TypedEncoder {
45 traits := getEncodingTraits(t) // per-physical-type factory that knows how to build the concrete encoder
51 mem = memory.DefaultAllocator // fallback allocator described in the doc comment
53 return traits.Encoder(format.Encoding(e), useDict, descr, mem) // convert parquet.Encoding to the internal format.Encoding the traits take
58 encoding format.Encoding // encoding reported by Encoding(), kept in the internal format type
62 sink *PooledBufferWriter // accumulates encoded bytes: written by append, read by Bytes, handed off by FlushValues
65 // newEncoderBase constructs the base encoder that the typed encoders embed;
66 // it encapsulates the functionality they share.
67 func newEncoderBase(e format.Encoding, descr *schema.Column, mem memory.Allocator) encoder {
69 if descr != nil && descr.PhysicalType() == parquet.Types.FixedLenByteArray { // descr may be nil; the type length is only consulted for FixedLenByteArray columns
70 typelen = int(descr.TypeLength())
77 sink: NewPooledBufferWriter(1024), // NOTE(review): 1024 looks like an initial size hint for the pooled writer — confirm
81 // ReserveForWrite allocates n bytes so that the next n bytes written do not require new allocations.
82 func (e *encoder) ReserveForWrite(n int) { e.sink.Reserve(n) }
83 func (e *encoder) EstimatedDataEncodedSize() int64 { return int64(e.sink.Len()) }
84 func (e *encoder) Encoding() parquet.Encoding { return parquet.Encoding(e.encoding) }
85 func (e *encoder) Allocator() memory.Allocator { return e.mem }
86 func (e *encoder) append(data []byte) { e.sink.Write(data) }
88 // FlushValues flushes any unwritten data to the buffer and returns the finished encoded buffer of data.
89 // This also clears the encoder, ownership of the data belongs to whomever called FlushValues, Release
90 // should be called on the resulting Buffer when done.
91 func (e *encoder) FlushValues() (Buffer, error) { return e.sink.Finish(), nil }
93 // Bytes returns the current bytes that have been written to the encoder's buffer but doesn't transfer ownership.
94 func (e *encoder) Bytes() []byte { return e.sink.Bytes() }
96 // Reset drops the data currently in the encoder and resets for new use.
97 func (e *encoder) Reset() { e.sink.Reset(0) }
// dictEncoder holds the state shared by the dictionary encoders: the embedded
// base encoder, the memo table the dictionary is built from, and the buffered
// indices (idxValues) that become the data page.
99 type dictEncoder struct {
103 idxBuffer *memory.Buffer // backing memory that idxValues aliases; grown by addIndex
108 // newDictEncoderBase constructs and returns the base dictionary encoder for the appropriate type, using the passed-in
109 // memo table to build the dictionary index.
110 func newDictEncoderBase(descr *schema.Column, memo MemoTable, mem memory.Allocator) dictEncoder {
112 encoder: newEncoderBase(format.Encoding_PLAIN_DICTIONARY, descr, mem), // Encoding() will therefore report PLAIN_DICTIONARY
113 idxBuffer: memory.NewResizableBuffer(mem), // starts empty; addIndex grows it as indices are buffered
118 // Reset drops all the values currently encoded in the dictionary index, along with the indices buffered
119 // for the data, so that the encoding process can be restarted.
120 func (d *dictEncoder) Reset() {
122 d.dictEncodedSize = 0 // the running dictionary size accumulated by Put starts over
123 d.idxValues = d.idxValues[:0] // keep the slice's capacity for reuse
124 d.idxBuffer.ResizeNoShrink(0) // logical length back to zero without shrinking the allocation
128 // append the passed index to the indexbuffer
129 func (d *dictEncoder) addIndex(idx int) {
130 if len(d.idxValues) == cap(d.idxValues) {
131 curLen := len(d.idxValues)
132 d.idxBuffer.ResizeNoShrink(arrow.Int32Traits.BytesRequired(bitutil.NextPowerOf2(curLen + 1)))
133 d.idxValues = arrow.Int32Traits.CastFromBytes(d.idxBuffer.Buf())[: curLen : d.idxBuffer.Len()/arrow.Int32SizeBytes]
135 d.idxValues = append(d.idxValues, int32(idx))
138 // FlushValues writes all the currently buffered indexes, which make up the data page, to a buffer and
139 // returns it, or returns nil and any error encountered.
140 func (d *dictEncoder) FlushValues() (Buffer, error) {
141 buf := bufferPool.Get().(*memory.Buffer) // scratch buffer taken from the shared bufferPool
142 buf.Reserve(int(d.EstimatedDataEncodedSize())) // make room for the worst-case encoded size before writing
143 size, err := d.WriteIndices(buf.Buf()) // writes straight into the reserved storage; clears the buffered indices on success
145 poolBuffer{buf}.Release() // NOTE(review): appears to be the error path — presumably returns the buffer to bufferPool
148 buf.ResizeNoShrink(size) // logical length = bytes actually written
149 return poolBuffer{buf}, nil // presumably the caller must Release this, as with the base encoder's FlushValues
152 // EstimatedDataEncodedSize returns the maximum number of bytes needed to store the RLE encoded indexes, not including the
153 // dictionary index in the computation.
154 func (d *dictEncoder) EstimatedDataEncodedSize() int64 {
155 return 1 + int64(utils.MaxBufferSize(d.BitWidth(), len(d.idxValues))+utils.MinBufferSize(d.BitWidth()))
158 // NumEntries returns the number of entries in the dictionary index for this encoder.
159 func (d *dictEncoder) NumEntries() int {
163 // BitWidth returns the bit width needed to encode any index into the dictionary,
164 // derived from the current number of dictionary entries.
165 func (d *dictEncoder) BitWidth() int {
166 switch d.NumEntries() {
172 return bits.Len32(uint32(d.NumEntries() - 1)) // bits needed to represent the largest index, NumEntries()-1
176 // WriteDict writes the dictionary index to the given byte slice.
// NOTE(review): out presumably needs at least DictEncodedSize() bytes — confirm.
177 func (d *dictEncoder) WriteDict(out []byte) {
181 // WriteIndices performs Run Length encoding on the indexes and then writes the encoded
182 // index value data to the provided byte slice, returning the number of bytes actually written.
183 // If any error is encountered, it will return -1 and the error.
// out must be non-empty (out[0] receives the bit width); size it with EstimatedDataEncodedSize.
// On success the buffered indices are cleared, keeping their capacity.
184 func (d *dictEncoder) WriteIndices(out []byte) (int, error) {
185 out[0] = byte(d.BitWidth()) // leading byte records the bit width of the RLE data that follows
187 enc := utils.NewRleEncoder(utils.NewWriterAtBuffer(out[1:]), d.BitWidth())
188 for _, idx := range d.idxValues {
189 if err := enc.Put(uint64(idx)); err != nil {
193 nbytes := enc.Flush() // finish the RLE stream; nbytes is the encoded length written after out[0]
195 d.idxValues = d.idxValues[:0]
196 return nbytes + 1, nil // +1 for the bit-width byte at out[0]
199 // Put adds a value to the dictionary data column, inserting the value if it
200 // didn't already exist in the dictionary.
201 func (d *dictEncoder) Put(v interface{}) {
202 memoIdx, found, err := d.memo.GetOrInsert(v) // look v up in the memo table, inserting it if absent
207 d.dictEncodedSize += int(reflect.TypeOf(v).Size()) // NOTE(review): Size() is the size of v's dynamic type, not its payload; for slice-backed values only the slice header is counted — confirm typed encoders account for variable-length data
212 // DictEncodedSize returns the current size of the encoded dictionary
213 func (d *dictEncoder) DictEncodedSize() int {
214 return d.dictEncodedSize
217 // spacedCompress is a helper function for encoders to remove the slots in the slices passed in according
218 // to the bitmap which are null into an output slice that is no longer spaced out with slots for nulls.
219 func spacedCompress(src, out interface{}, validBits []byte, validBitsOffset int64) int {
222 // for efficiency we use a type switch because the copy runs significantly faster when typed
223 // than calling reflect.Copy
224 switch s := src.(type) {
227 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
229 run := reader.NextRun()
233 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
234 nvalid += int(run.Length)
238 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
240 run := reader.NextRun()
244 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
245 nvalid += int(run.Length)
249 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
251 run := reader.NextRun()
255 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
256 nvalid += int(run.Length)
260 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
262 run := reader.NextRun()
266 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
267 nvalid += int(run.Length)
269 case []parquet.ByteArray:
270 o := out.([]parquet.ByteArray)
271 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
273 run := reader.NextRun()
277 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
278 nvalid += int(run.Length)
280 case []parquet.FixedLenByteArray:
281 o := out.([]parquet.FixedLenByteArray)
282 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
284 run := reader.NextRun()
288 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
289 nvalid += int(run.Length)
293 reader := utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(s)))
295 run := reader.NextRun()
299 copy(o[nvalid:], s[int(run.Pos):int(run.Pos+run.Length)])
300 nvalid += int(run.Length)