]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, software | |
12 | // distributed under the License is distributed on an "AS IS" BASIS, | |
13 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | // See the License for the specific language governing permissions and | |
15 | // limitations under the License. | |
16 | ||
17 | package metadata | |
18 | ||
19 | import ( | |
20 | "github.com/apache/arrow/go/v6/parquet" | |
21 | "github.com/apache/arrow/go/v6/parquet/schema" | |
22 | "github.com/apache/arrow/go/v6/parquet/internal/utils" | |
23 | "github.com/apache/arrow/go/v6/parquet/internal/encoding" | |
24 | ) | |
25 | ||
26 | {{range .In}} | |
27 | type minmaxPair{{.Name}} [2]{{.name}} | |
28 | ||
29 | // {{.Name}}Statistics is the typed interface for managing stats for a column | |
30 | // of {{.Name}} type. | |
31 | type {{.Name}}Statistics struct { | |
32 | statistics | |
33 | min {{.name}} | |
34 | max {{.name}} | |
35 | ||
36 | bitSetReader utils.SetBitRunReader | |
37 | } | |
38 | ||
39 | // New{{.Name}}Statistics constructs an appropriate stat object type using the | |
40 | // given column descriptor and allocator. | |
41 | // | |
42 | // Panics if the physical type of descr is not parquet.Type.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} | |
43 | func New{{.Name}}Statistics(descr *schema.Column, mem memory.Allocator) *{{.Name}}Statistics { | |
44 | if descr.PhysicalType() != parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} { | |
45 | panic(xerrors.Errorf("parquet: invalid type %s for constructing a {{.Name}} stat object", descr.PhysicalType())) | |
46 | } | |
47 | ||
48 | return &{{.Name}}Statistics{ | |
49 | statistics: statistics{ | |
50 | descr: descr, | |
51 | hasNullCount: true, | |
52 | hasDistinctCount: true, | |
53 | order: descr.SortOrder(), | |
54 | encoder: encoding.NewEncoder(descr.PhysicalType(), parquet.Encodings.Plain, false, descr, mem), | |
55 | mem: mem, | |
56 | }, | |
57 | {{if eq .Name "ByteArray"}} | |
58 | min: make([]byte, 0), | |
59 | max: make([]byte, 0), | |
60 | {{end}} | |
61 | } | |
62 | } | |
63 | ||
64 | // New{{.Name}}StatisticsFromEncoded will construct a propertly typed statistics object | |
65 | // initializing it with the provided information. | |
66 | func New{{.Name}}StatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) *{{.Name}}Statistics { | |
67 | ret := New{{.Name}}Statistics(descr, mem) | |
68 | ret.nvalues += nvalues | |
69 | if encoded.IsSetNullCount() { | |
70 | ret.incNulls(encoded.GetNullCount()) | |
71 | } | |
72 | if encoded.IsSetDistinctCount() { | |
73 | ret.incDistinct(encoded.GetDistinctCount()) | |
74 | } | |
75 | ||
76 | encodedMin := encoded.GetMin() | |
77 | if encodedMin != nil && len(encodedMin) > 0 { | |
78 | ret.min = ret.plainDecode(encodedMin) | |
79 | } | |
80 | encodedMax := encoded.GetMax() | |
81 | if encodedMax != nil && len(encodedMax) > 0 { | |
82 | ret.max = ret.plainDecode(encodedMax) | |
83 | } | |
84 | ret.hasMinMax = encoded.IsSetMax() || encoded.IsSetMin() | |
85 | return ret | |
86 | } | |
87 | ||
88 | func (s *{{.Name}}Statistics) plainEncode(src {{.name}}) []byte { | |
89 | {{- if eq .Name "ByteArray"}} | |
90 | return src | |
91 | {{- else}} | |
92 | s.encoder.(encoding.{{.Name}}Encoder).Put([]{{.name}}{src}) | |
93 | buf, err := s.encoder.FlushValues() | |
94 | if err != nil { | |
95 | panic(err) // recovered by Encode | |
96 | } | |
97 | defer buf.Release() | |
98 | ||
99 | out := make([]byte, buf.Len()) | |
100 | copy(out, buf.Bytes()) | |
101 | return out | |
102 | {{- end}} | |
103 | } | |
104 | ||
105 | func (s *{{.Name}}Statistics) plainDecode(src []byte) {{.name}} { | |
106 | {{- if eq .Name "ByteArray"}} | |
107 | return src | |
108 | {{- else}} | |
109 | var buf [1]{{.name}} | |
110 | ||
111 | decoder := encoding.NewDecoder(s.descr.PhysicalType(), parquet.Encodings.Plain, s.descr, s.mem) | |
112 | decoder.SetData(1, src) | |
113 | decoder.(encoding.{{.Name}}Decoder).Decode(buf[:]) | |
114 | return buf[0] | |
115 | {{- end}} | |
116 | } | |
117 | ||
118 | {{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}} | |
119 | func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} { | |
120 | if s.less(a, b) { | |
121 | return a | |
122 | } | |
123 | return b | |
124 | } | |
125 | ||
126 | func (s *{{.Name}}Statistics) maxval(a, b {{.name}}) {{.name}} { | |
127 | if s.less(a, b) { | |
128 | return b | |
129 | } | |
130 | return a | |
131 | } | |
132 | {{else}} | |
133 | func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} { | |
134 | switch { | |
135 | case a == nil: | |
136 | return b | |
137 | case b == nil: | |
138 | return a | |
139 | case s.less(a, b): | |
140 | return a | |
141 | default: | |
142 | return b | |
143 | } | |
144 | } | |
145 | ||
146 | func (s *{{.Name}}Statistics) maxval(a, b {{.name}}) {{.name}} { | |
147 | switch { | |
148 | case a == nil: | |
149 | return b | |
150 | case b == nil: | |
151 | return a | |
152 | case s.less(a, b): | |
153 | return b | |
154 | default: | |
155 | return a | |
156 | } | |
157 | } | |
158 | {{end}} | |
159 | ||
160 | // MinMaxEqual returns true if both stat objects have the same Min and Max values | |
161 | func (s *{{.Name}}Statistics) MinMaxEqual(rhs *{{.Name}}Statistics) bool { | |
162 | return s.equal(s.min, rhs.min) && s.equal(s.max, rhs.max) | |
163 | } | |
164 | ||
165 | // Equals returns true only if both objects are the same type, have the same min and | |
166 | // max values, null count, distinct count and number of values. | |
167 | func (s *{{.Name}}Statistics) Equals(other TypedStatistics) bool { | |
168 | if s.Type() != other.Type() { | |
169 | return false | |
170 | } | |
171 | rhs, ok := other.(*{{.Name}}Statistics) | |
172 | if !ok { | |
173 | return false | |
174 | } | |
175 | ||
176 | if s.HasMinMax() != rhs.HasMinMax() { return false } | |
177 | return (s.hasMinMax && s.MinMaxEqual(rhs)) && | |
178 | s.NullCount() == rhs.NullCount() && | |
179 | s.DistinctCount() == rhs.DistinctCount() && | |
180 | s.NumValues() == rhs.NumValues() | |
181 | } | |
182 | ||
183 | {{if or (eq .name "float32") (eq .name "float64")}} | |
184 | func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} { | |
185 | if math.IsNaN(float64(val)) { | |
186 | return fallback | |
187 | } | |
188 | return val | |
189 | } | |
190 | {{end}} | |
191 | ||
192 | func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max {{.name}}) { | |
193 | {{- if or (eq .name "int32") (eq .name "int64")}} | |
194 | if s.order == schema.SortSIGNED { | |
195 | min, max = utils.GetMinMax{{.Name}}(values) | |
196 | } else { | |
197 | umin, umax := utils.GetMinMaxU{{.name}}(arrow.U{{.name}}Traits.CastFromBytes(arrow.{{.Name}}Traits.CastToBytes(values))) | |
198 | min, max = {{.name}}(umin), {{.name}}(umax) | |
199 | } | |
200 | {{- else}} | |
201 | defMin := s.defaultMin() | |
202 | defMax := s.defaultMax() | |
203 | ||
204 | min = defMin | |
205 | max = defMax | |
206 | ||
207 | for _, v := range values { | |
208 | {{- if or (eq .name "float32") (eq .name "float64") }} | |
209 | min = s.minval(min, s.coalesce(v, defMin)) | |
210 | max = s.maxval(max, s.coalesce(v, defMax)) | |
211 | {{- else}} | |
212 | min = s.minval(min, v) | |
213 | max = s.maxval(max, v) | |
214 | {{- end }} | |
215 | } | |
216 | {{- end}} | |
217 | return | |
218 | } | |
219 | ||
220 | func (s *{{.Name}}Statistics) getMinMaxSpaced(values []{{.name}}, validBits []byte, validBitsOffset int64) (min, max {{.name}}) { | |
221 | min = s.defaultMin() | |
222 | max = s.defaultMax() | |
223 | ||
224 | {{- if or (eq .name "int32") (eq .name "int64")}} | |
225 | var fn func([]{{.name}}) ({{.name}}, {{.name}}) | |
226 | if s.order == schema.SortSIGNED { | |
227 | fn = utils.GetMinMax{{.Name}} | |
228 | } else { | |
229 | fn = func(v []{{.name}}) ({{.name}}, {{.name}}) { | |
230 | umin, umax := utils.GetMinMaxU{{.name}}(arrow.U{{.name}}Traits.CastFromBytes(arrow.{{.Name}}Traits.CastToBytes(values))) | |
231 | return {{.name}}(umin), {{.name}}(umax) | |
232 | } | |
233 | } | |
234 | {{- end}} | |
235 | ||
236 | if s.bitSetReader == nil { | |
237 | s.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(values))) | |
238 | } else { | |
239 | s.bitSetReader.Reset(validBits, validBitsOffset, int64(len(values))) | |
240 | } | |
241 | ||
242 | for { | |
243 | run := s.bitSetReader.NextRun() | |
244 | if run.Length == 0 { | |
245 | break | |
246 | } | |
247 | {{- if or (eq .name "int32") (eq .name "int64")}} | |
248 | localMin, localMax := fn(values[int(run.Pos):int(run.Pos+run.Length)]) | |
249 | if min > localMin { | |
250 | min = localMin | |
251 | } | |
252 | if max < localMax { | |
253 | max = localMax | |
254 | } | |
255 | {{- else}} | |
256 | for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] { | |
257 | {{- if or (eq .name "float32") (eq .name "float64") }} | |
258 | min = s.minval(min, coalesce(v, s.defaultMin()).({{.name}})) | |
259 | max = s.maxval(max, coalesce(v, s.defaultMax()).({{.name}})) | |
260 | {{- else}} | |
261 | min = s.minval(min, v) | |
262 | max = s.maxval(max, v) | |
263 | {{- end }} | |
264 | } | |
265 | {{- end}} | |
266 | } | |
267 | return | |
268 | } | |
269 | ||
270 | func (s *{{.Name}}Statistics) Min() {{.name}} { return s.min } | |
271 | func (s *{{.Name}}Statistics) Max() {{.name}} { return s.max } | |
272 | ||
273 | // Merge merges the stats from other into this stat object, updating | |
274 | // the null count, distinct count, number of values and the min/max if | |
275 | // appropriate. | |
276 | func (s *{{.Name}}Statistics) Merge(other TypedStatistics) { | |
277 | rhs, ok := other.(*{{.Name}}Statistics) | |
278 | if !ok { | |
279 | panic("incompatible stat type merge") | |
280 | } | |
281 | ||
282 | s.statistics.merge(rhs) | |
283 | if rhs.HasMinMax() { | |
284 | s.SetMinMax(rhs.Min(), rhs.Max()) | |
285 | } | |
286 | } | |
287 | ||
288 | // Update is used to add more values to the current stat object, finding the | |
289 | // min and max values etc. | |
290 | func (s *{{.Name}}Statistics) Update(values []{{.name}}, numNull int64) { | |
291 | s.incNulls(numNull) | |
292 | s.nvalues += int64(len(values)) | |
293 | ||
294 | if len(values) == 0 { | |
295 | return | |
296 | } | |
297 | ||
298 | s.SetMinMax(s.getMinMax(values)) | |
299 | } | |
300 | ||
301 | // UpdateSpaced is just like Update, but for spaced values using validBits to determine | |
302 | // and skip null values. | |
303 | func (s *{{.Name}}Statistics) UpdateSpaced(values []{{.name}}, validBits []byte, validBitsOffset, numNull int64) { | |
304 | s.incNulls(numNull) | |
305 | notnull := int64(len(values)) - numNull | |
306 | s.nvalues += notnull | |
307 | ||
308 | if notnull == 0 { | |
309 | return | |
310 | } | |
311 | ||
312 | s.SetMinMax(s.getMinMaxSpaced(values, validBits, validBitsOffset)) | |
313 | } | |
314 | ||
315 | // SetMinMax updates the min and max values only if they are not currently set | |
316 | // or if argMin is less than the current min / argMax is greater than the current max | |
317 | func (s *{{.Name}}Statistics) SetMinMax(argMin, argMax {{.name}}) { | |
318 | maybeMinMax := s.cleanStat([2]{{.name}}{argMin, argMax}) | |
319 | if maybeMinMax == nil { | |
320 | return | |
321 | } | |
322 | ||
323 | min := (*maybeMinMax)[0] | |
324 | max := (*maybeMinMax)[1] | |
325 | ||
326 | if !s.hasMinMax { | |
327 | s.hasMinMax = true | |
328 | s.min = min | |
329 | s.max = max | |
330 | } else { | |
331 | if !s.less(s.min, min) { | |
332 | s.min = min | |
333 | } | |
334 | if s.less(s.max, max) { | |
335 | s.max = max | |
336 | } | |
337 | } | |
338 | } | |
339 | ||
340 | // EncodeMin returns the encoded min value with plain encoding. | |
341 | // | |
342 | // ByteArray stats do not include the length in the encoding. | |
343 | func (s *{{.Name}}Statistics) EncodeMin() []byte { | |
344 | if s.HasMinMax() { | |
345 | return s.plainEncode(s.min) | |
346 | } | |
347 | return nil | |
348 | } | |
349 | ||
350 | // EncodeMax returns the current encoded max value with plain encoding | |
351 | // | |
352 | // ByteArray stats do not include the length in the encoding | |
353 | func (s *{{.Name}}Statistics) EncodeMax() []byte{ | |
354 | if s.HasMinMax() { | |
355 | return s.plainEncode(s.max) | |
356 | } | |
357 | return nil | |
358 | } | |
359 | ||
360 | // Encode returns a populated EncodedStatistics object | |
361 | func (s *{{.Name}}Statistics) Encode() (enc EncodedStatistics, err error) { | |
362 | defer func() { | |
363 | if r := recover(); r != nil { | |
364 | switch r := r.(type) { | |
365 | case error: | |
366 | err = r | |
367 | case string: | |
368 | err = xerrors.New(r) | |
369 | default: | |
370 | err = xerrors.Errorf("unknown error type thrown from panic: %v", r) | |
371 | } | |
372 | } | |
373 | }() | |
374 | if s.HasMinMax() { | |
375 | enc.SetMax(s.EncodeMax()) | |
376 | enc.SetMin(s.EncodeMin()) | |
377 | } | |
378 | if s.HasNullCount() { | |
379 | enc.SetNullCount(s.NullCount()) | |
380 | } | |
381 | if s.HasDistinctCount() { | |
382 | enc.SetDistinctCount(s.DistinctCount()) | |
383 | } | |
384 | return | |
385 | } | |
386 | {{end}} | |
387 | ||
388 | // NewStatistics uses the type in the column descriptor to construct the appropriate | |
389 | // typed stats object. If mem is nil, then memory.DefaultAllocator will be used. | |
390 | func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { | |
391 | if mem == nil { | |
392 | mem = memory.DefaultAllocator | |
393 | } | |
394 | switch descr.PhysicalType() { | |
395 | {{- range .In}} | |
396 | case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: | |
397 | return New{{.Name}}Statistics(descr, mem) | |
398 | {{- end}} | |
399 | default: | |
400 | panic("not implemented") | |
401 | } | |
402 | } | |
403 | ||
404 | // NewStatisticsFromEncoded uses the provided information to initialize a typed stat object | |
405 | // by checking the type of the provided column descriptor. | |
406 | // | |
407 | // If mem is nil, then memory.DefaultAllocator is used. | |
408 | func NewStatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) TypedStatistics { | |
409 | if mem == nil { | |
410 | mem = memory.DefaultAllocator | |
411 | } | |
412 | switch descr.PhysicalType() { | |
413 | {{- range .In}} | |
414 | case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: | |
415 | return New{{.Name}}StatisticsFromEncoded(descr, mem, nvalues, encoded) | |
416 | {{- end}} | |
417 | default: | |
418 | panic("not implemented") | |
419 | } | |
420 | } |