1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
17 // Package schema provides types and functions for manipulating and building parquet
20 // Some of the utilities provided include building a schema using Struct Tags
21 // on a struct type, getting Column Paths from a node, and dealing with the
22 // converted and logical types for Parquet.
24 // Logical types specify ways to interpret the primitive types allowing the
25 // number of primitive types to be smaller and reuse efficient encodings.
26 // For instance a "string" is just a ByteArray column with a UTF-8 annotation
27 // or "String Logical Type".
29 // For more information about Logical and Converted Types, check:
30 // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
38 "github.com/apache/arrow/go/v6/parquet"
39 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
40 "golang.org/x/xerrors"
43 // Schema is the container for the converted Parquet schema, together with
44 // information computed from the schema analysis that is needed for file reading
46 // * Column index to Node
48 // * Max repetition / definition levels for each primitive node
50 // The ColumnDescriptor objects produced by this class can be used to assist in
51 // the reconstruction of fully materialized data structures from the
52 // repetition-definition level encoding of nested data
57 nodeToLeaf map[*PrimitiveNode]int
58 leafToBase map[int]Node
59 leafToIndex strIntMultimap
62 // FromParquet converts a slice of thrift Schema Elements to the correct node type
63 func FromParquet(elems []*format.SchemaElement) (Node, error) {
65 return nil, xerrors.New("parquet: empty schema (no root)")
68 if elems[0].GetNumChildren() == 0 {
70 return nil, xerrors.New("parquet: schema had multiple nodes but root had no children")
72 // parquet file with no columns
73 return GroupNodeFromThrift(elems[0], []Node{})
76 // We don't check that the root node is repeated since this is not
77 // consistently set by implementations
80 nextNode func() (Node, error)
83 nextNode = func() (Node, error) {
84 if pos == len(elems) {
85 return nil, xerrors.New("parquet: malformed schema: not enough elements")
91 if elem.GetNumChildren() == 0 {
92 return PrimitiveNodeFromThrift(elem)
95 fields := make([]Node, 0, elem.GetNumChildren())
96 for i := 0; i < int(elem.GetNumChildren()); i++ {
101 fields = append(fields, n)
104 return GroupNodeFromThrift(elem, fields)
110 // Root returns the group node that is the root of this schema
111 func (s *Schema) Root() *GroupNode {
112 return s.root.(*GroupNode)
115 // NumColumns returns the number of leaf nodes that are the actual primitive
116 // columns in this schema.
117 func (s *Schema) NumColumns() int {
121 // Equals returns true as long as the leaf columns are equal, doesn't take
122 // into account the groups and only checks whether the schemas are compatible
123 // at the physical storage level.
124 func (s *Schema) Equals(rhs *Schema) bool {
125 if s.NumColumns() != rhs.NumColumns() {
129 for idx, c := range s.leaves {
130 if !c.Equals(rhs.Column(idx)) {
137 func (s *Schema) buildTree(n Node, maxDefLvl, maxRepLvl int16, base Node) {
138 switch n.RepetitionType() {
139 case parquet.Repetitions.Repeated:
142 case parquet.Repetitions.Optional:
146 switch n := n.(type) {
148 for _, f := range n.fields {
149 s.buildTree(f, maxDefLvl, maxRepLvl, base)
152 s.nodeToLeaf[n] = len(s.leaves)
153 s.leaves = append(s.leaves, NewColumn(n, maxDefLvl, maxRepLvl))
154 s.leafToBase[len(s.leaves)-1] = base
155 s.leafToIndex.Add(n.Path(), len(s.leaves)-1)
159 // Column returns the (0-indexed) column of the provided index.
160 func (s *Schema) Column(i int) *Column {
164 // ColumnIndexByName looks up the column by its full dot-separated
165 // node path. If there are multiple columns that match, it returns the first one.
167 // Returns -1 if not found.
168 func (s *Schema) ColumnIndexByName(nodePath string) int {
169 if search, ok := s.leafToIndex[nodePath]; ok {
175 // ColumnIndexByNode returns the index of the column represented by this node.
177 // Returns -1 if not found.
178 func (s *Schema) ColumnIndexByNode(n Node) int {
179 if search, ok := s.leafToIndex[n.Path()]; ok {
180 for _, idx := range search {
181 if n == s.Column(idx).SchemaNode() {
189 // ColumnRoot returns the top-level field of the schema (a direct child of the
190 // root group) under which the column at index i is located.
191 func (s *Schema) ColumnRoot(i int) Node {
192 return s.leafToBase[i]
195 // HasRepeatedFields returns true if any node in the schema has a repeated field type.
196 func (s *Schema) HasRepeatedFields() bool {
197 return s.root.(*GroupNode).HasRepeatedFields()
200 // UpdateColumnOrders updates the ColumnOrder of the schema's leaf columns using orders.
201 // len(orders) must equal s.NumColumns(); otherwise an error is returned.
202 func (s *Schema) UpdateColumnOrders(orders []parquet.ColumnOrder) error {
203 if len(orders) != s.NumColumns() {
204 return xerrors.New("parquet: malformed schema: not enough ColumnOrder values")
207 visitor := schemaColumnOrderUpdater{orders, 0}
208 s.root.Visit(&visitor)
212 // NewSchema constructs a new Schema object from a root group node.
214 // Any fields with a field-id of -1 will be given an appropriate field number based on their order.
215 func NewSchema(root *GroupNode) *Schema {
219 make(map[*PrimitiveNode]int),
221 make(strIntMultimap),
224 for _, f := range root.fields {
225 s.buildTree(f, 0, 0, f)
230 type schemaColumnOrderUpdater struct {
231 colOrders []parquet.ColumnOrder
235 func (s *schemaColumnOrderUpdater) VisitPre(n Node) bool {
236 if n.Type() == Primitive {
237 leaf := n.(*PrimitiveNode)
238 leaf.ColumnOrder = s.colOrders[s.leafCount]
// VisitPost is a no-op; the column order updater does nothing after a node
// has been visited.
244 func (s *schemaColumnOrderUpdater) VisitPost(Node) {}
246 type toThriftVisitor struct {
247 elements []*format.SchemaElement
250 func (t *toThriftVisitor) VisitPre(n Node) bool {
251 t.elements = append(t.elements, n.toThrift())
// VisitPost is a no-op; schema elements are collected in VisitPre, so nothing
// is done after a node has been visited.
255 func (t *toThriftVisitor) VisitPost(Node) {}
257 // ToThrift converts a GroupNode to a slice of SchemaElements which is used
258 // for thrift serialization.
259 func ToThrift(schema *GroupNode) []*format.SchemaElement {
260 t := &toThriftVisitor{make([]*format.SchemaElement, 0)}
265 type schemaPrinter struct {
271 func (s *schemaPrinter) VisitPre(n Node) bool {
272 fmt.Fprint(s.w, strings.Repeat(" ", s.indent))
273 if n.Type() == Group {
275 fmt.Fprintf(s.w, "%s group field_id=%d %s", g.RepetitionType(), g.FieldID(), g.Name())
276 _, invalid := g.logicalType.(UnknownLogicalType)
277 _, none := g.logicalType.(NoLogicalType)
279 if g.logicalType != nil && !invalid && !none {
280 fmt.Fprintf(s.w, " (%s)", g.logicalType)
281 } else if g.convertedType != ConvertedTypes.None {
282 fmt.Fprintf(s.w, " (%s)", g.convertedType)
285 fmt.Fprintln(s.w, " {")
286 s.indent += s.indentWidth
288 p := n.(*PrimitiveNode)
289 fmt.Fprintf(s.w, "%s %s field_id=%d %s", p.RepetitionType(), strings.ToLower(p.PhysicalType().String()), p.FieldID(), p.Name())
290 _, invalid := p.logicalType.(UnknownLogicalType)
291 _, none := p.logicalType.(NoLogicalType)
293 if p.logicalType != nil && !invalid && !none {
294 fmt.Fprintf(s.w, " (%s)", p.logicalType)
295 } else if p.convertedType == ConvertedTypes.Decimal {
296 fmt.Fprintf(s.w, " (%s(%d,%d))", p.convertedType, p.DecimalMetadata().Precision, p.DecimalMetadata().Scale)
297 } else if p.convertedType != ConvertedTypes.None {
298 fmt.Fprintf(s.w, " (%s)", p.convertedType)
300 fmt.Fprintln(s.w, ";")
305 func (s *schemaPrinter) VisitPost(n Node) {
306 if n.Type() == Group {
307 s.indent -= s.indentWidth
308 fmt.Fprint(s.w, strings.Repeat(" ", s.indent))
309 fmt.Fprintln(s.w, "}")
313 // PrintSchema writes a string representation of the tree to w using the indent
315 func PrintSchema(n Node, w io.Writer, indentWidth int) {
316 n.Visit(&schemaPrinter{w, 0, indentWidth})
// strIntMultimap associates each string key with a slice of int values, so a
// single key can hold several entries. Schema uses it to map a node path to
// the indexes of the leaf columns that have that path.
319 type strIntMultimap map[string][]int
321 func (f strIntMultimap) Add(key string, val int) bool {
322 if _, ok := f[key]; !ok {
326 f[key] = append(f[key], val)