]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Code generated by plain_encoder_types.gen.go.tmpl. DO NOT EDIT. |
2 | ||
3 | // Licensed to the Apache Software Foundation (ASF) under one | |
4 | // or more contributor license agreements. See the NOTICE file | |
5 | // distributed with this work for additional information | |
6 | // regarding copyright ownership. The ASF licenses this file | |
7 | // to you under the Apache License, Version 2.0 (the | |
8 | // "License"); you may not use this file except in compliance | |
9 | // with the License. You may obtain a copy of the License at | |
10 | // | |
11 | // http://www.apache.org/licenses/LICENSE-2.0 | |
12 | // | |
13 | // Unless required by applicable law or agreed to in writing, software | |
14 | // distributed under the License is distributed on an "AS IS" BASIS, | |
15 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | // See the License for the specific language governing permissions and | |
17 | // limitations under the License. | |
18 | ||
19 | package encoding | |
20 | ||
21 | import ( | |
22 | "bytes" | |
23 | "encoding/binary" | |
24 | "math" | |
25 | ||
26 | "github.com/apache/arrow/go/v6/arrow" | |
27 | "github.com/apache/arrow/go/v6/arrow/endian" | |
28 | "github.com/apache/arrow/go/v6/parquet" | |
29 | "github.com/apache/arrow/go/v6/parquet/internal/utils" | |
30 | "golang.org/x/xerrors" | |
31 | ) | |
32 | ||
33 | var ( | |
34 | writeInt32LE func(*encoder, []int32) | |
35 | copyFromInt32LE func(dst []int32, src []byte) | |
36 | writeInt64LE func(*encoder, []int64) | |
37 | copyFromInt64LE func(dst []int64, src []byte) | |
38 | writeInt96LE func(*encoder, []parquet.Int96) | |
39 | copyFromInt96LE func(dst []parquet.Int96, src []byte) | |
40 | writeFloat32LE func(*encoder, []float32) | |
41 | copyFromFloat32LE func(dst []float32, src []byte) | |
42 | writeFloat64LE func(*encoder, []float64) | |
43 | copyFromFloat64LE func(dst []float64, src []byte) | |
44 | ) | |
45 | ||
46 | func init() { | |
47 | // int96 is already internally represented as little endian data | |
48 | // no need to have special behavior on big endian architectures | |
49 | // for read/write, consumers will need to be aware of the fact | |
50 | // that it is internally 12 bytes little endian when attempting | |
51 | // to utilize it. | |
52 | writeInt96LE = func(e *encoder, in []parquet.Int96) { | |
53 | e.append(parquet.Int96Traits.CastToBytes(in)) | |
54 | } | |
55 | copyFromInt96LE = func(dst []parquet.Int96, src []byte) { | |
56 | copy(parquet.Int96Traits.CastToBytes(dst), src) | |
57 | } | |
58 | ||
59 | if endian.IsBigEndian { | |
60 | writeInt32LE = func(e *encoder, in []int32) { | |
61 | binary.Write(e.sink, binary.LittleEndian, in) | |
62 | } | |
63 | copyFromInt32LE = func(dst []int32, src []byte) { | |
64 | r := bytes.NewReader(src) | |
65 | binary.Read(r, binary.LittleEndian, &dst) | |
66 | } | |
67 | writeInt64LE = func(e *encoder, in []int64) { | |
68 | binary.Write(e.sink, binary.LittleEndian, in) | |
69 | } | |
70 | copyFromInt64LE = func(dst []int64, src []byte) { | |
71 | r := bytes.NewReader(src) | |
72 | binary.Read(r, binary.LittleEndian, &dst) | |
73 | } | |
74 | writeFloat32LE = func(e *encoder, in []float32) { | |
75 | binary.Write(e.sink, binary.LittleEndian, in) | |
76 | } | |
77 | copyFromFloat32LE = func(dst []float32, src []byte) { | |
78 | r := bytes.NewReader(src) | |
79 | binary.Read(r, binary.LittleEndian, &dst) | |
80 | } | |
81 | writeFloat64LE = func(e *encoder, in []float64) { | |
82 | binary.Write(e.sink, binary.LittleEndian, in) | |
83 | } | |
84 | copyFromFloat64LE = func(dst []float64, src []byte) { | |
85 | r := bytes.NewReader(src) | |
86 | binary.Read(r, binary.LittleEndian, &dst) | |
87 | } | |
88 | } else { | |
89 | writeInt32LE = func(e *encoder, in []int32) { | |
90 | e.append(arrow.Int32Traits.CastToBytes(in)) | |
91 | } | |
92 | copyFromInt32LE = func(dst []int32, src []byte) { | |
93 | copy(arrow.Int32Traits.CastToBytes(dst), src) | |
94 | } | |
95 | writeInt64LE = func(e *encoder, in []int64) { | |
96 | e.append(arrow.Int64Traits.CastToBytes(in)) | |
97 | } | |
98 | copyFromInt64LE = func(dst []int64, src []byte) { | |
99 | copy(arrow.Int64Traits.CastToBytes(dst), src) | |
100 | } | |
101 | writeFloat32LE = func(e *encoder, in []float32) { | |
102 | e.append(arrow.Float32Traits.CastToBytes(in)) | |
103 | } | |
104 | copyFromFloat32LE = func(dst []float32, src []byte) { | |
105 | copy(arrow.Float32Traits.CastToBytes(dst), src) | |
106 | } | |
107 | writeFloat64LE = func(e *encoder, in []float64) { | |
108 | e.append(arrow.Float64Traits.CastToBytes(in)) | |
109 | } | |
110 | copyFromFloat64LE = func(dst []float64, src []byte) { | |
111 | copy(arrow.Float64Traits.CastToBytes(dst), src) | |
112 | } | |
113 | } | |
114 | } | |
115 | ||
116 | // PlainInt32Encoder is an encoder for int32 values using Plain Encoding | |
117 | // which in general is just storing the values as raw bytes of the appropriate size | |
118 | type PlainInt32Encoder struct { | |
119 | encoder | |
120 | ||
121 | bitSetReader utils.SetBitRunReader | |
122 | } | |
123 | ||
124 | // Put encodes a slice of values into the underlying buffer | |
125 | func (enc *PlainInt32Encoder) Put(in []int32) { | |
126 | writeInt32LE(&enc.encoder, in) | |
127 | } | |
128 | ||
129 | // PutSpaced encodes a slice of values into the underlying buffer which are spaced out | |
130 | // including null values defined by the validBits bitmap starting at a given bit offset. | |
131 | // the values are first compressed by having the null slots removed before writing to the buffer | |
132 | func (enc *PlainInt32Encoder) PutSpaced(in []int32, validBits []byte, validBitsOffset int64) { | |
133 | nbytes := arrow.Int32Traits.BytesRequired(len(in)) | |
134 | enc.ReserveForWrite(nbytes) | |
135 | ||
136 | if enc.bitSetReader == nil { | |
137 | enc.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(in))) | |
138 | } else { | |
139 | enc.bitSetReader.Reset(validBits, validBitsOffset, int64(len(in))) | |
140 | } | |
141 | ||
142 | for { | |
143 | run := enc.bitSetReader.NextRun() | |
144 | if run.Length == 0 { | |
145 | break | |
146 | } | |
147 | enc.Put(in[int(run.Pos):int(run.Pos+run.Length)]) | |
148 | } | |
149 | } | |
150 | ||
151 | // Type returns the underlying physical type this encoder is able to encode | |
152 | func (PlainInt32Encoder) Type() parquet.Type { | |
153 | return parquet.Types.Int32 | |
154 | } | |
155 | ||
156 | // PlainInt32Decoder is a decoder specifically for decoding Plain Encoding data | |
157 | // of int32 type. | |
158 | type PlainInt32Decoder struct { | |
159 | decoder | |
160 | ||
161 | bitSetReader utils.SetBitRunReader | |
162 | } | |
163 | ||
164 | // Type returns the physical type this decoder is able to decode for | |
165 | func (PlainInt32Decoder) Type() parquet.Type { | |
166 | return parquet.Types.Int32 | |
167 | } | |
168 | ||
169 | // Decode populates the given slice with values from the data to be decoded, | |
170 | // decoding the min(len(out), remaining values). | |
171 | // It returns the number of values actually decoded and any error encountered. | |
172 | func (dec *PlainInt32Decoder) Decode(out []int32) (int, error) { | |
173 | max := utils.MinInt(len(out), dec.nvals) | |
174 | nbytes := int64(max) * int64(arrow.Int32SizeBytes) | |
175 | if nbytes > int64(len(dec.data)) || nbytes > math.MaxInt32 { | |
176 | return 0, xerrors.Errorf("parquet: eof exception decode plain Int32, nvals: %d, nbytes: %d, datalen: %d", dec.nvals, nbytes, len(dec.data)) | |
177 | } | |
178 | ||
179 | copyFromInt32LE(out, dec.data[:nbytes]) | |
180 | dec.data = dec.data[nbytes:] | |
181 | dec.nvals -= max | |
182 | return max, nil | |
183 | } | |
184 | ||
185 | // DecodeSpaced is the same as decode, except it expands the data out to leave spaces for null values | |
186 | // as defined by the bitmap provided. | |
187 | func (dec *PlainInt32Decoder) DecodeSpaced(out []int32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { | |
188 | toread := len(out) - nullCount | |
189 | values, err := dec.Decode(out[:toread]) | |
190 | if err != nil { | |
191 | return 0, err | |
192 | } | |
193 | if values != toread { | |
194 | return 0, xerrors.New("parquet: number of values / definition levels read did not match") | |
195 | } | |
196 | ||
197 | nvalues := len(out) | |
198 | if nullCount == 0 { | |
199 | return nvalues, nil | |
200 | } | |
201 | ||
202 | idxDecode := nvalues - nullCount | |
203 | if dec.bitSetReader == nil { | |
204 | dec.bitSetReader = utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(nvalues)) | |
205 | } else { | |
206 | dec.bitSetReader.Reset(validBits, validBitsOffset, int64(nvalues)) | |
207 | } | |
208 | ||
209 | for { | |
210 | run := dec.bitSetReader.NextRun() | |
211 | if run.Length == 0 { | |
212 | break | |
213 | } | |
214 | ||
215 | idxDecode -= int(run.Length) | |
216 | copy(out[int(run.Pos):], out[idxDecode:idxDecode+int(run.Length)]) | |
217 | } | |
218 | return nvalues, nil | |
219 | } | |
220 | ||
221 | // PlainInt64Encoder is an encoder for int64 values using Plain Encoding | |
222 | // which in general is just storing the values as raw bytes of the appropriate size | |
223 | type PlainInt64Encoder struct { | |
224 | encoder | |
225 | ||
226 | bitSetReader utils.SetBitRunReader | |
227 | } | |
228 | ||
229 | // Put encodes a slice of values into the underlying buffer | |
230 | func (enc *PlainInt64Encoder) Put(in []int64) { | |
231 | writeInt64LE(&enc.encoder, in) | |
232 | } | |
233 | ||
234 | // PutSpaced encodes a slice of values into the underlying buffer which are spaced out | |
235 | // including null values defined by the validBits bitmap starting at a given bit offset. | |
236 | // the values are first compressed by having the null slots removed before writing to the buffer | |
237 | func (enc *PlainInt64Encoder) PutSpaced(in []int64, validBits []byte, validBitsOffset int64) { | |
238 | nbytes := arrow.Int64Traits.BytesRequired(len(in)) | |
239 | enc.ReserveForWrite(nbytes) | |
240 | ||
241 | if enc.bitSetReader == nil { | |
242 | enc.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(in))) | |
243 | } else { | |
244 | enc.bitSetReader.Reset(validBits, validBitsOffset, int64(len(in))) | |
245 | } | |
246 | ||
247 | for { | |
248 | run := enc.bitSetReader.NextRun() | |
249 | if run.Length == 0 { | |
250 | break | |
251 | } | |
252 | enc.Put(in[int(run.Pos):int(run.Pos+run.Length)]) | |
253 | } | |
254 | } | |
255 | ||
256 | // Type returns the underlying physical type this encoder is able to encode | |
257 | func (PlainInt64Encoder) Type() parquet.Type { | |
258 | return parquet.Types.Int64 | |
259 | } | |
260 | ||
261 | // PlainInt64Decoder is a decoder specifically for decoding Plain Encoding data | |
262 | // of int64 type. | |
263 | type PlainInt64Decoder struct { | |
264 | decoder | |
265 | ||
266 | bitSetReader utils.SetBitRunReader | |
267 | } | |
268 | ||
269 | // Type returns the physical type this decoder is able to decode for | |
270 | func (PlainInt64Decoder) Type() parquet.Type { | |
271 | return parquet.Types.Int64 | |
272 | } | |
273 | ||
274 | // Decode populates the given slice with values from the data to be decoded, | |
275 | // decoding the min(len(out), remaining values). | |
276 | // It returns the number of values actually decoded and any error encountered. | |
277 | func (dec *PlainInt64Decoder) Decode(out []int64) (int, error) { | |
278 | max := utils.MinInt(len(out), dec.nvals) | |
279 | nbytes := int64(max) * int64(arrow.Int64SizeBytes) | |
280 | if nbytes > int64(len(dec.data)) || nbytes > math.MaxInt32 { | |
281 | return 0, xerrors.Errorf("parquet: eof exception decode plain Int64, nvals: %d, nbytes: %d, datalen: %d", dec.nvals, nbytes, len(dec.data)) | |
282 | } | |
283 | ||
284 | copyFromInt64LE(out, dec.data[:nbytes]) | |
285 | dec.data = dec.data[nbytes:] | |
286 | dec.nvals -= max | |
287 | return max, nil | |
288 | } | |
289 | ||
290 | // DecodeSpaced is the same as decode, except it expands the data out to leave spaces for null values | |
291 | // as defined by the bitmap provided. | |
292 | func (dec *PlainInt64Decoder) DecodeSpaced(out []int64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { | |
293 | toread := len(out) - nullCount | |
294 | values, err := dec.Decode(out[:toread]) | |
295 | if err != nil { | |
296 | return 0, err | |
297 | } | |
298 | if values != toread { | |
299 | return 0, xerrors.New("parquet: number of values / definition levels read did not match") | |
300 | } | |
301 | ||
302 | nvalues := len(out) | |
303 | if nullCount == 0 { | |
304 | return nvalues, nil | |
305 | } | |
306 | ||
307 | idxDecode := nvalues - nullCount | |
308 | if dec.bitSetReader == nil { | |
309 | dec.bitSetReader = utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(nvalues)) | |
310 | } else { | |
311 | dec.bitSetReader.Reset(validBits, validBitsOffset, int64(nvalues)) | |
312 | } | |
313 | ||
314 | for { | |
315 | run := dec.bitSetReader.NextRun() | |
316 | if run.Length == 0 { | |
317 | break | |
318 | } | |
319 | ||
320 | idxDecode -= int(run.Length) | |
321 | copy(out[int(run.Pos):], out[idxDecode:idxDecode+int(run.Length)]) | |
322 | } | |
323 | return nvalues, nil | |
324 | } | |
325 | ||
326 | // PlainInt96Encoder is an encoder for parquet.Int96 values using Plain Encoding | |
327 | // which in general is just storing the values as raw bytes of the appropriate size | |
328 | type PlainInt96Encoder struct { | |
329 | encoder | |
330 | ||
331 | bitSetReader utils.SetBitRunReader | |
332 | } | |
333 | ||
334 | // Put encodes a slice of values into the underlying buffer | |
335 | func (enc *PlainInt96Encoder) Put(in []parquet.Int96) { | |
336 | writeInt96LE(&enc.encoder, in) | |
337 | } | |
338 | ||
339 | // PutSpaced encodes a slice of values into the underlying buffer which are spaced out | |
340 | // including null values defined by the validBits bitmap starting at a given bit offset. | |
341 | // the values are first compressed by having the null slots removed before writing to the buffer | |
342 | func (enc *PlainInt96Encoder) PutSpaced(in []parquet.Int96, validBits []byte, validBitsOffset int64) { | |
343 | nbytes := parquet.Int96Traits.BytesRequired(len(in)) | |
344 | enc.ReserveForWrite(nbytes) | |
345 | ||
346 | if enc.bitSetReader == nil { | |
347 | enc.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(in))) | |
348 | } else { | |
349 | enc.bitSetReader.Reset(validBits, validBitsOffset, int64(len(in))) | |
350 | } | |
351 | ||
352 | for { | |
353 | run := enc.bitSetReader.NextRun() | |
354 | if run.Length == 0 { | |
355 | break | |
356 | } | |
357 | enc.Put(in[int(run.Pos):int(run.Pos+run.Length)]) | |
358 | } | |
359 | } | |
360 | ||
361 | // Type returns the underlying physical type this encoder is able to encode | |
362 | func (PlainInt96Encoder) Type() parquet.Type { | |
363 | return parquet.Types.Int96 | |
364 | } | |
365 | ||
366 | // PlainInt96Decoder is a decoder specifically for decoding Plain Encoding data | |
367 | // of parquet.Int96 type. | |
368 | type PlainInt96Decoder struct { | |
369 | decoder | |
370 | ||
371 | bitSetReader utils.SetBitRunReader | |
372 | } | |
373 | ||
374 | // Type returns the physical type this decoder is able to decode for | |
375 | func (PlainInt96Decoder) Type() parquet.Type { | |
376 | return parquet.Types.Int96 | |
377 | } | |
378 | ||
379 | // Decode populates the given slice with values from the data to be decoded, | |
380 | // decoding the min(len(out), remaining values). | |
381 | // It returns the number of values actually decoded and any error encountered. | |
382 | func (dec *PlainInt96Decoder) Decode(out []parquet.Int96) (int, error) { | |
383 | max := utils.MinInt(len(out), dec.nvals) | |
384 | nbytes := int64(max) * int64(parquet.Int96SizeBytes) | |
385 | if nbytes > int64(len(dec.data)) || nbytes > math.MaxInt32 { | |
386 | return 0, xerrors.Errorf("parquet: eof exception decode plain Int96, nvals: %d, nbytes: %d, datalen: %d", dec.nvals, nbytes, len(dec.data)) | |
387 | } | |
388 | ||
389 | copyFromInt96LE(out, dec.data[:nbytes]) | |
390 | dec.data = dec.data[nbytes:] | |
391 | dec.nvals -= max | |
392 | return max, nil | |
393 | } | |
394 | ||
395 | // DecodeSpaced is the same as decode, except it expands the data out to leave spaces for null values | |
396 | // as defined by the bitmap provided. | |
397 | func (dec *PlainInt96Decoder) DecodeSpaced(out []parquet.Int96, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { | |
398 | toread := len(out) - nullCount | |
399 | values, err := dec.Decode(out[:toread]) | |
400 | if err != nil { | |
401 | return 0, err | |
402 | } | |
403 | if values != toread { | |
404 | return 0, xerrors.New("parquet: number of values / definition levels read did not match") | |
405 | } | |
406 | ||
407 | nvalues := len(out) | |
408 | if nullCount == 0 { | |
409 | return nvalues, nil | |
410 | } | |
411 | ||
412 | idxDecode := nvalues - nullCount | |
413 | if dec.bitSetReader == nil { | |
414 | dec.bitSetReader = utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(nvalues)) | |
415 | } else { | |
416 | dec.bitSetReader.Reset(validBits, validBitsOffset, int64(nvalues)) | |
417 | } | |
418 | ||
419 | for { | |
420 | run := dec.bitSetReader.NextRun() | |
421 | if run.Length == 0 { | |
422 | break | |
423 | } | |
424 | ||
425 | idxDecode -= int(run.Length) | |
426 | copy(out[int(run.Pos):], out[idxDecode:idxDecode+int(run.Length)]) | |
427 | } | |
428 | return nvalues, nil | |
429 | } | |
430 | ||
431 | // PlainFloat32Encoder is an encoder for float32 values using Plain Encoding | |
432 | // which in general is just storing the values as raw bytes of the appropriate size | |
433 | type PlainFloat32Encoder struct { | |
434 | encoder | |
435 | ||
436 | bitSetReader utils.SetBitRunReader | |
437 | } | |
438 | ||
439 | // Put encodes a slice of values into the underlying buffer | |
440 | func (enc *PlainFloat32Encoder) Put(in []float32) { | |
441 | writeFloat32LE(&enc.encoder, in) | |
442 | } | |
443 | ||
444 | // PutSpaced encodes a slice of values into the underlying buffer which are spaced out | |
445 | // including null values defined by the validBits bitmap starting at a given bit offset. | |
446 | // the values are first compressed by having the null slots removed before writing to the buffer | |
447 | func (enc *PlainFloat32Encoder) PutSpaced(in []float32, validBits []byte, validBitsOffset int64) { | |
448 | nbytes := arrow.Float32Traits.BytesRequired(len(in)) | |
449 | enc.ReserveForWrite(nbytes) | |
450 | ||
451 | if enc.bitSetReader == nil { | |
452 | enc.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(in))) | |
453 | } else { | |
454 | enc.bitSetReader.Reset(validBits, validBitsOffset, int64(len(in))) | |
455 | } | |
456 | ||
457 | for { | |
458 | run := enc.bitSetReader.NextRun() | |
459 | if run.Length == 0 { | |
460 | break | |
461 | } | |
462 | enc.Put(in[int(run.Pos):int(run.Pos+run.Length)]) | |
463 | } | |
464 | } | |
465 | ||
466 | // Type returns the underlying physical type this encoder is able to encode | |
467 | func (PlainFloat32Encoder) Type() parquet.Type { | |
468 | return parquet.Types.Float | |
469 | } | |
470 | ||
471 | // PlainFloat32Decoder is a decoder specifically for decoding Plain Encoding data | |
472 | // of float32 type. | |
473 | type PlainFloat32Decoder struct { | |
474 | decoder | |
475 | ||
476 | bitSetReader utils.SetBitRunReader | |
477 | } | |
478 | ||
479 | // Type returns the physical type this decoder is able to decode for | |
480 | func (PlainFloat32Decoder) Type() parquet.Type { | |
481 | return parquet.Types.Float | |
482 | } | |
483 | ||
484 | // Decode populates the given slice with values from the data to be decoded, | |
485 | // decoding the min(len(out), remaining values). | |
486 | // It returns the number of values actually decoded and any error encountered. | |
487 | func (dec *PlainFloat32Decoder) Decode(out []float32) (int, error) { | |
488 | max := utils.MinInt(len(out), dec.nvals) | |
489 | nbytes := int64(max) * int64(arrow.Float32SizeBytes) | |
490 | if nbytes > int64(len(dec.data)) || nbytes > math.MaxInt32 { | |
491 | return 0, xerrors.Errorf("parquet: eof exception decode plain Float32, nvals: %d, nbytes: %d, datalen: %d", dec.nvals, nbytes, len(dec.data)) | |
492 | } | |
493 | ||
494 | copyFromFloat32LE(out, dec.data[:nbytes]) | |
495 | dec.data = dec.data[nbytes:] | |
496 | dec.nvals -= max | |
497 | return max, nil | |
498 | } | |
499 | ||
500 | // DecodeSpaced is the same as decode, except it expands the data out to leave spaces for null values | |
501 | // as defined by the bitmap provided. | |
502 | func (dec *PlainFloat32Decoder) DecodeSpaced(out []float32, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { | |
503 | toread := len(out) - nullCount | |
504 | values, err := dec.Decode(out[:toread]) | |
505 | if err != nil { | |
506 | return 0, err | |
507 | } | |
508 | if values != toread { | |
509 | return 0, xerrors.New("parquet: number of values / definition levels read did not match") | |
510 | } | |
511 | ||
512 | nvalues := len(out) | |
513 | if nullCount == 0 { | |
514 | return nvalues, nil | |
515 | } | |
516 | ||
517 | idxDecode := nvalues - nullCount | |
518 | if dec.bitSetReader == nil { | |
519 | dec.bitSetReader = utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(nvalues)) | |
520 | } else { | |
521 | dec.bitSetReader.Reset(validBits, validBitsOffset, int64(nvalues)) | |
522 | } | |
523 | ||
524 | for { | |
525 | run := dec.bitSetReader.NextRun() | |
526 | if run.Length == 0 { | |
527 | break | |
528 | } | |
529 | ||
530 | idxDecode -= int(run.Length) | |
531 | copy(out[int(run.Pos):], out[idxDecode:idxDecode+int(run.Length)]) | |
532 | } | |
533 | return nvalues, nil | |
534 | } | |
535 | ||
536 | // PlainFloat64Encoder is an encoder for float64 values using Plain Encoding | |
537 | // which in general is just storing the values as raw bytes of the appropriate size | |
538 | type PlainFloat64Encoder struct { | |
539 | encoder | |
540 | ||
541 | bitSetReader utils.SetBitRunReader | |
542 | } | |
543 | ||
544 | // Put encodes a slice of values into the underlying buffer | |
545 | func (enc *PlainFloat64Encoder) Put(in []float64) { | |
546 | writeFloat64LE(&enc.encoder, in) | |
547 | } | |
548 | ||
549 | // PutSpaced encodes a slice of values into the underlying buffer which are spaced out | |
550 | // including null values defined by the validBits bitmap starting at a given bit offset. | |
551 | // the values are first compressed by having the null slots removed before writing to the buffer | |
552 | func (enc *PlainFloat64Encoder) PutSpaced(in []float64, validBits []byte, validBitsOffset int64) { | |
553 | nbytes := arrow.Float64Traits.BytesRequired(len(in)) | |
554 | enc.ReserveForWrite(nbytes) | |
555 | ||
556 | if enc.bitSetReader == nil { | |
557 | enc.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(in))) | |
558 | } else { | |
559 | enc.bitSetReader.Reset(validBits, validBitsOffset, int64(len(in))) | |
560 | } | |
561 | ||
562 | for { | |
563 | run := enc.bitSetReader.NextRun() | |
564 | if run.Length == 0 { | |
565 | break | |
566 | } | |
567 | enc.Put(in[int(run.Pos):int(run.Pos+run.Length)]) | |
568 | } | |
569 | } | |
570 | ||
571 | // Type returns the underlying physical type this encoder is able to encode | |
572 | func (PlainFloat64Encoder) Type() parquet.Type { | |
573 | return parquet.Types.Double | |
574 | } | |
575 | ||
576 | // PlainFloat64Decoder is a decoder specifically for decoding Plain Encoding data | |
577 | // of float64 type. | |
578 | type PlainFloat64Decoder struct { | |
579 | decoder | |
580 | ||
581 | bitSetReader utils.SetBitRunReader | |
582 | } | |
583 | ||
584 | // Type returns the physical type this decoder is able to decode for | |
585 | func (PlainFloat64Decoder) Type() parquet.Type { | |
586 | return parquet.Types.Double | |
587 | } | |
588 | ||
589 | // Decode populates the given slice with values from the data to be decoded, | |
590 | // decoding the min(len(out), remaining values). | |
591 | // It returns the number of values actually decoded and any error encountered. | |
592 | func (dec *PlainFloat64Decoder) Decode(out []float64) (int, error) { | |
593 | max := utils.MinInt(len(out), dec.nvals) | |
594 | nbytes := int64(max) * int64(arrow.Float64SizeBytes) | |
595 | if nbytes > int64(len(dec.data)) || nbytes > math.MaxInt32 { | |
596 | return 0, xerrors.Errorf("parquet: eof exception decode plain Float64, nvals: %d, nbytes: %d, datalen: %d", dec.nvals, nbytes, len(dec.data)) | |
597 | } | |
598 | ||
599 | copyFromFloat64LE(out, dec.data[:nbytes]) | |
600 | dec.data = dec.data[nbytes:] | |
601 | dec.nvals -= max | |
602 | return max, nil | |
603 | } | |
604 | ||
605 | // DecodeSpaced is the same as decode, except it expands the data out to leave spaces for null values | |
606 | // as defined by the bitmap provided. | |
607 | func (dec *PlainFloat64Decoder) DecodeSpaced(out []float64, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { | |
608 | toread := len(out) - nullCount | |
609 | values, err := dec.Decode(out[:toread]) | |
610 | if err != nil { | |
611 | return 0, err | |
612 | } | |
613 | if values != toread { | |
614 | return 0, xerrors.New("parquet: number of values / definition levels read did not match") | |
615 | } | |
616 | ||
617 | nvalues := len(out) | |
618 | if nullCount == 0 { | |
619 | return nvalues, nil | |
620 | } | |
621 | ||
622 | idxDecode := nvalues - nullCount | |
623 | if dec.bitSetReader == nil { | |
624 | dec.bitSetReader = utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(nvalues)) | |
625 | } else { | |
626 | dec.bitSetReader.Reset(validBits, validBitsOffset, int64(nvalues)) | |
627 | } | |
628 | ||
629 | for { | |
630 | run := dec.bitSetReader.NextRun() | |
631 | if run.Length == 0 { | |
632 | break | |
633 | } | |
634 | ||
635 | idxDecode -= int(run.Length) | |
636 | copy(out[int(run.Pos):], out[idxDecode:idxDecode+int(run.Length)]) | |
637 | } | |
638 | return nvalues, nil | |
639 | } |