// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metadata

import (
	"github.com/apache/arrow/go/v6/parquet"
	"github.com/apache/arrow/go/v6/parquet/schema"
	"github.com/apache/arrow/go/v6/parquet/internal/utils"
	"github.com/apache/arrow/go/v6/parquet/internal/encoding"
)

{{range .In}}
// minmaxPair{{.Name}} is a two-element array of {{.name}} values.
//
// NOTE(review): presumably ordered {min, max}, matching how SetMinMax indexes
// the pair it gets back from cleanStat — confirm against cleanStat.
type minmaxPair{{.Name}} [2]{{.name}}

// {{.Name}}Statistics is the typed interface for managing stats for a column
// of {{.Name}} type.
type {{.Name}}Statistics struct {
	// statistics carries the type-independent state (descriptor, counts,
	// sort order, encoder and allocator) populated by the constructor.
	statistics
	// min and max hold the current extremes. They are only assigned through
	// SetMinMax or when decoding existing stats, so check HasMinMax before
	// relying on them.
	min {{.name}}
	max {{.name}}

	// bitSetReader is created on the first getMinMaxSpaced call and Reset on
	// later calls so the reader is reused rather than reallocated.
	bitSetReader utils.SetBitRunReader
}

// New{{.Name}}Statistics constructs an appropriate stat object type using the
// given column descriptor and allocator.
// // Panics if the physical type of descr is not parquet.Type.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} func New{{.Name}}Statistics(descr *schema.Column, mem memory.Allocator) *{{.Name}}Statistics { if descr.PhysicalType() != parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} { panic(xerrors.Errorf("parquet: invalid type %s for constructing a {{.Name}} stat object", descr.PhysicalType())) } return &{{.Name}}Statistics{ statistics: statistics{ descr: descr, hasNullCount: true, hasDistinctCount: true, order: descr.SortOrder(), encoder: encoding.NewEncoder(descr.PhysicalType(), parquet.Encodings.Plain, false, descr, mem), mem: mem, }, {{if eq .Name "ByteArray"}} min: make([]byte, 0), max: make([]byte, 0), {{end}} } } // New{{.Name}}StatisticsFromEncoded will construct a propertly typed statistics object // initializing it with the provided information. func New{{.Name}}StatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) *{{.Name}}Statistics { ret := New{{.Name}}Statistics(descr, mem) ret.nvalues += nvalues if encoded.IsSetNullCount() { ret.incNulls(encoded.GetNullCount()) } if encoded.IsSetDistinctCount() { ret.incDistinct(encoded.GetDistinctCount()) } encodedMin := encoded.GetMin() if encodedMin != nil && len(encodedMin) > 0 { ret.min = ret.plainDecode(encodedMin) } encodedMax := encoded.GetMax() if encodedMax != nil && len(encodedMax) > 0 { ret.max = ret.plainDecode(encodedMax) } ret.hasMinMax = encoded.IsSetMax() || encoded.IsSetMin() return ret } func (s *{{.Name}}Statistics) plainEncode(src {{.name}}) []byte { {{- if eq .Name "ByteArray"}} return src {{- else}} s.encoder.(encoding.{{.Name}}Encoder).Put([]{{.name}}{src}) buf, err := s.encoder.FlushValues() if err != nil { panic(err) // recovered by Encode } defer buf.Release() out := make([]byte, buf.Len()) copy(out, buf.Bytes()) return out {{- end}} } func (s *{{.Name}}Statistics) plainDecode(src []byte) {{.name}} { {{- if eq .Name 
"ByteArray"}} return src {{- else}} var buf [1]{{.name}} decoder := encoding.NewDecoder(s.descr.PhysicalType(), parquet.Encodings.Plain, s.descr, s.mem) decoder.SetData(1, src) decoder.(encoding.{{.Name}}Decoder).Decode(buf[:]) return buf[0] {{- end}} } {{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}} func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} { if s.less(a, b) { return a } return b } func (s *{{.Name}}Statistics) maxval(a, b {{.name}}) {{.name}} { if s.less(a, b) { return b } return a } {{else}} func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} { switch { case a == nil: return b case b == nil: return a case s.less(a, b): return a default: return b } } func (s *{{.Name}}Statistics) maxval(a, b {{.name}}) {{.name}} { switch { case a == nil: return b case b == nil: return a case s.less(a, b): return b default: return a } } {{end}} // MinMaxEqual returns true if both stat objects have the same Min and Max values func (s *{{.Name}}Statistics) MinMaxEqual(rhs *{{.Name}}Statistics) bool { return s.equal(s.min, rhs.min) && s.equal(s.max, rhs.max) } // Equals returns true only if both objects are the same type, have the same min and // max values, null count, distinct count and number of values. 
func (s *{{.Name}}Statistics) Equals(other TypedStatistics) bool { if s.Type() != other.Type() { return false } rhs, ok := other.(*{{.Name}}Statistics) if !ok { return false } if s.HasMinMax() != rhs.HasMinMax() { return false } return (s.hasMinMax && s.MinMaxEqual(rhs)) && s.NullCount() == rhs.NullCount() && s.DistinctCount() == rhs.DistinctCount() && s.NumValues() == rhs.NumValues() } {{if or (eq .name "float32") (eq .name "float64")}} func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} { if math.IsNaN(float64(val)) { return fallback } return val } {{end}} func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max {{.name}}) { {{- if or (eq .name "int32") (eq .name "int64")}} if s.order == schema.SortSIGNED { min, max = utils.GetMinMax{{.Name}}(values) } else { umin, umax := utils.GetMinMaxU{{.name}}(arrow.U{{.name}}Traits.CastFromBytes(arrow.{{.Name}}Traits.CastToBytes(values))) min, max = {{.name}}(umin), {{.name}}(umax) } {{- else}} defMin := s.defaultMin() defMax := s.defaultMax() min = defMin max = defMax for _, v := range values { {{- if or (eq .name "float32") (eq .name "float64") }} min = s.minval(min, s.coalesce(v, defMin)) max = s.maxval(max, s.coalesce(v, defMax)) {{- else}} min = s.minval(min, v) max = s.maxval(max, v) {{- end }} } {{- end}} return } func (s *{{.Name}}Statistics) getMinMaxSpaced(values []{{.name}}, validBits []byte, validBitsOffset int64) (min, max {{.name}}) { min = s.defaultMin() max = s.defaultMax() {{- if or (eq .name "int32") (eq .name "int64")}} var fn func([]{{.name}}) ({{.name}}, {{.name}}) if s.order == schema.SortSIGNED { fn = utils.GetMinMax{{.Name}} } else { fn = func(v []{{.name}}) ({{.name}}, {{.name}}) { umin, umax := utils.GetMinMaxU{{.name}}(arrow.U{{.name}}Traits.CastFromBytes(arrow.{{.Name}}Traits.CastToBytes(values))) return {{.name}}(umin), {{.name}}(umax) } } {{- end}} if s.bitSetReader == nil { s.bitSetReader = utils.NewSetBitRunReader(validBits, validBitsOffset, 
int64(len(values))) } else { s.bitSetReader.Reset(validBits, validBitsOffset, int64(len(values))) } for { run := s.bitSetReader.NextRun() if run.Length == 0 { break } {{- if or (eq .name "int32") (eq .name "int64")}} localMin, localMax := fn(values[int(run.Pos):int(run.Pos+run.Length)]) if min > localMin { min = localMin } if max < localMax { max = localMax } {{- else}} for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] { {{- if or (eq .name "float32") (eq .name "float64") }} min = s.minval(min, coalesce(v, s.defaultMin()).({{.name}})) max = s.maxval(max, coalesce(v, s.defaultMax()).({{.name}})) {{- else}} min = s.minval(min, v) max = s.maxval(max, v) {{- end }} } {{- end}} } return } func (s *{{.Name}}Statistics) Min() {{.name}} { return s.min } func (s *{{.Name}}Statistics) Max() {{.name}} { return s.max } // Merge merges the stats from other into this stat object, updating // the null count, distinct count, number of values and the min/max if // appropriate. func (s *{{.Name}}Statistics) Merge(other TypedStatistics) { rhs, ok := other.(*{{.Name}}Statistics) if !ok { panic("incompatible stat type merge") } s.statistics.merge(rhs) if rhs.HasMinMax() { s.SetMinMax(rhs.Min(), rhs.Max()) } } // Update is used to add more values to the current stat object, finding the // min and max values etc. func (s *{{.Name}}Statistics) Update(values []{{.name}}, numNull int64) { s.incNulls(numNull) s.nvalues += int64(len(values)) if len(values) == 0 { return } s.SetMinMax(s.getMinMax(values)) } // UpdateSpaced is just like Update, but for spaced values using validBits to determine // and skip null values. 
func (s *{{.Name}}Statistics) UpdateSpaced(values []{{.name}}, validBits []byte, validBitsOffset, numNull int64) { s.incNulls(numNull) notnull := int64(len(values)) - numNull s.nvalues += notnull if notnull == 0 { return } s.SetMinMax(s.getMinMaxSpaced(values, validBits, validBitsOffset)) } // SetMinMax updates the min and max values only if they are not currently set // or if argMin is less than the current min / argMax is greater than the current max func (s *{{.Name}}Statistics) SetMinMax(argMin, argMax {{.name}}) { maybeMinMax := s.cleanStat([2]{{.name}}{argMin, argMax}) if maybeMinMax == nil { return } min := (*maybeMinMax)[0] max := (*maybeMinMax)[1] if !s.hasMinMax { s.hasMinMax = true s.min = min s.max = max } else { if !s.less(s.min, min) { s.min = min } if s.less(s.max, max) { s.max = max } } } // EncodeMin returns the encoded min value with plain encoding. // // ByteArray stats do not include the length in the encoding. func (s *{{.Name}}Statistics) EncodeMin() []byte { if s.HasMinMax() { return s.plainEncode(s.min) } return nil } // EncodeMax returns the current encoded max value with plain encoding // // ByteArray stats do not include the length in the encoding func (s *{{.Name}}Statistics) EncodeMax() []byte{ if s.HasMinMax() { return s.plainEncode(s.max) } return nil } // Encode returns a populated EncodedStatistics object func (s *{{.Name}}Statistics) Encode() (enc EncodedStatistics, err error) { defer func() { if r := recover(); r != nil { switch r := r.(type) { case error: err = r case string: err = xerrors.New(r) default: err = xerrors.Errorf("unknown error type thrown from panic: %v", r) } } }() if s.HasMinMax() { enc.SetMax(s.EncodeMax()) enc.SetMin(s.EncodeMin()) } if s.HasNullCount() { enc.SetNullCount(s.NullCount()) } if s.HasDistinctCount() { enc.SetDistinctCount(s.DistinctCount()) } return } {{end}} // NewStatistics uses the type in the column descriptor to construct the appropriate // typed stats object. 
If mem is nil, then memory.DefaultAllocator will be used. func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { if mem == nil { mem = memory.DefaultAllocator } switch descr.PhysicalType() { {{- range .In}} case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: return New{{.Name}}Statistics(descr, mem) {{- end}} default: panic("not implemented") } } // NewStatisticsFromEncoded uses the provided information to initialize a typed stat object // by checking the type of the provided column descriptor. // // If mem is nil, then memory.DefaultAllocator is used. func NewStatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) TypedStatistics { if mem == nil { mem = memory.DefaultAllocator } switch descr.PhysicalType() { {{- range .In}} case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: return New{{.Name}}StatisticsFromEncoded(descr, mem, nvalues, encoded) {{- end}} default: panic("not implemented") } }