]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / parquet / internal / utils / typed_rle_dict.gen.go.tmpl
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package utils
18
19import (
20 "github.com/apache/arrow/go/v6/parquet"
21)
22
23{{range .In}}
24{{if ne .Name "Boolean"}}
25func (r *RleDecoder) GetBatchWithDictSpaced{{.Name}}(dc DictionaryConverter, vals []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
26 if nullCount == 0 {
27 return r.GetBatchWithDict{{.Name}}(dc, vals)
28 }
29
30 var (
31 blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
32 processed = 0
33 block BitBlockCount
34 )
35
36 for {
37 block = blockCounter.NextFourWords()
38 if block.Len == 0 {
39 break
40 }
41
42 switch {
43 case block.AllSet():
44 processed, err = r.GetBatchWithDict{{.Name}}(dc, vals[:block.Len])
45 case block.NoneSet():
46 dc.FillZero(vals[:block.Len])
47 processed = int(block.Len)
48 default:
49 processed, err = r.getspaced{{.Name}}(dc, vals, int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
50 }
51
52 if err != nil {
53 break
54 }
55
56 totalProcessed += processed
57 vals = vals[int(block.Len):]
58 validBitsOffset += int64(block.Len)
59 if processed != int(block.Len) {
60 break
61 }
62 }
63 return
64}
65
66func (r *RleDecoder) getspaced{{.Name}}(dc DictionaryConverter, vals []{{.name}}, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
67 if nullCount == batchSize {
68 dc.FillZero(vals[:batchSize])
69 return batchSize, nil
70 }
71
72 read := 0
73 remain := batchSize - nullCount
74
75 const bufferSize = 1024
76 var indexbuffer [bufferSize]IndexType
77
78 // assume no bits to start
79 bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
80 validRun := bitReader.NextRun()
81 for read < batchSize {
82 if validRun.Len == 0 {
83 validRun = bitReader.NextRun()
84 }
85
86 if !validRun.Set {
87 dc.FillZero(vals[:int(validRun.Len)])
88 vals = vals[int(validRun.Len):]
89 read += int(validRun.Len)
90 validRun.Len = 0
91 continue
92 }
93
94 if r.repCount == 0 && r.litCount == 0 {
95 if !r.Next() {
96 return read, nil
97 }
98 }
99
100 var batch int
101 switch {
102 case r.repCount > 0:
103 batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
104 current := IndexType(r.curVal)
105 if !dc.IsValid(current) {
106 return read, nil
107 }
108 dc.Fill(vals[:batch], current)
109 case r.litCount > 0:
110 var (
111 litread int
112 skipped int
113 err error
114 )
115 litread, skipped, validRun, err = r.consumeLiterals{{.Name}}(dc, vals, remain, indexbuffer[:], validRun, bitReader)
116 if err != nil {
117 return read, err
118 }
119 batch = litread + skipped
120 remain -= litread
121 }
122
123 vals = vals[batch:]
124 read += batch
125 }
126 return read, nil
127}
128
129func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals []{{.name}}, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
130 batch := MinInt(MinInt(remain, int(r.litCount)), len(buf))
131 buf = buf[:batch]
132
133 n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
134 if n != batch {
135 return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
136 }
137
138 if !dc.IsValid(buf...) {
139 return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
140 }
141
142 var (
143 read int
144 skipped int
145 )
146 for read < batch {
147 if run.Set {
148 updateSize := MinInt(batch-read, int(run.Len))
149 if err := dc.Copy(vals, buf[read:read+updateSize]); err != nil {
150 return 0, 0, run, err
151 }
152 read += updateSize
153 vals = vals[updateSize:]
154 run.Len -= int64(updateSize)
155 } else {
156 dc.FillZero(vals[:int(run.Len)])
157 vals = vals[int(run.Len):]
158 skipped += int(run.Len)
159 run.Len = 0
160 }
161 if run.Len == 0 {
162 run = bitRdr.NextRun()
163 }
164 }
165 r.litCount -= int32(batch)
166 return read, skipped, run, nil
167}
168
169func (r *RleDecoder) GetBatchWithDict{{.Name}}(dc DictionaryConverter, vals []{{.name}}) (int, error) {
170 var (
171 read = 0
172 size = len(vals)
173 indexbuffer [1024]IndexType
174 )
175
176 for read < size {
177 remain := size - read
178
179 switch {
180 case r.repCount > 0:
181 idx := IndexType(r.curVal)
182 if !dc.IsValid(idx) {
183 return read, nil
184 }
185 batch := MinInt(remain, int(r.repCount))
186 if err := dc.Fill(vals[:batch], idx); err != nil {
187 return read, err
188 }
189 r.repCount -= int32(batch)
190 read += batch
191 vals = vals[batch:]
192 case r.litCount > 0:
193 litbatch := MinInt(MinInt(remain, int(r.litCount)), 1024)
194 buf := indexbuffer[:litbatch]
195 n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
196 if n != litbatch {
197 return read, nil
198 }
199 if !dc.IsValid(buf...) {
200 return read, nil
201 }
202 if err := dc.Copy(vals, buf); err != nil {
203 return read, nil
204 }
205 r.litCount -= int32(litbatch)
206 read += litbatch
207 vals = vals[litbatch:]
208 default:
209 if !r.Next() {
210 return read, nil
211 }
212 }
213 }
214
215 return read, nil
216}
217{{end}}
218{{end}}