]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/go/parquet/schema/schema.go
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / go / parquet / schema / schema.go
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 // Package schema provides types and functions for manipulating and building parquet
18 // file schemas.
19 //
20 // Some of the utilities provided include building a schema using Struct Tags
21 // on a struct type, getting Column Paths from a node, and dealing with the
22 // converted and logical types for Parquet.
23 //
24 // Logical types specify ways to interpret the primitive types allowing the
25 // number of primitive types to be smaller and reuse efficient encodings.
26 // For instance a "string" is just a ByteArray column with a UTF-8 annotation
27 // or "String Logical Type".
28 //
29 // For more information about Logical and Converted Types, check:
30 // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
31 package schema
32
33 import (
34 "fmt"
35 "io"
36 "strings"
37
38 "github.com/apache/arrow/go/v6/parquet"
39 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
40 "golang.org/x/xerrors"
41 )
42
43 // Schema is the container for the converted Parquet schema with a computed
44 // information from the schema analysis needed for file reading
45 //
46 // * Column index to Node
47 //
48 // * Max repetition / definition levels for each primitive node
49 //
50 // The ColumnDescriptor objects produced by this class can be used to assist in
51 // the reconstruction of fully materialized data structures from the
52 // repetition-definition level encoding of nested data
53 type Schema struct {
54 root Node
55
56 leaves []*Column
57 nodeToLeaf map[*PrimitiveNode]int
58 leafToBase map[int]Node
59 leafToIndex strIntMultimap
60 }
61
62 // FromParquet converts a slice of thrift Schema Elements to the correct node type
63 func FromParquet(elems []*format.SchemaElement) (Node, error) {
64 if len(elems) == 0 {
65 return nil, xerrors.New("parquet: empty schema (no root)")
66 }
67
68 if elems[0].GetNumChildren() == 0 {
69 if len(elems) > 1 {
70 return nil, xerrors.New("parquet: schema had multiple nodes but root had no children")
71 }
72 // parquet file with no columns
73 return GroupNodeFromThrift(elems[0], []Node{})
74 }
75
76 // We don't check that the root node is repeated since this is not
77 // consistently set by implementations
78 var (
79 pos = 0
80 nextNode func() (Node, error)
81 )
82
83 nextNode = func() (Node, error) {
84 if pos == len(elems) {
85 return nil, xerrors.New("parquet: malformed schema: not enough elements")
86 }
87
88 elem := elems[pos]
89 pos++
90
91 if elem.GetNumChildren() == 0 {
92 return PrimitiveNodeFromThrift(elem)
93 }
94
95 fields := make([]Node, 0, elem.GetNumChildren())
96 for i := 0; i < int(elem.GetNumChildren()); i++ {
97 n, err := nextNode()
98 if err != nil {
99 return nil, err
100 }
101 fields = append(fields, n)
102 }
103
104 return GroupNodeFromThrift(elem, fields)
105 }
106
107 return nextNode()
108 }
109
110 // Root returns the group node that is the root of this schema
111 func (s *Schema) Root() *GroupNode {
112 return s.root.(*GroupNode)
113 }
114
115 // NumColumns returns the number of leaf nodes that are the actual primitive
116 // columns in this schema.
117 func (s *Schema) NumColumns() int {
118 return len(s.leaves)
119 }
120
121 // Equals returns true as long as the leaf columns are equal, doesn't take
122 // into account the groups and only checks whether the schemas are compatible
123 // at the physical storage level.
124 func (s *Schema) Equals(rhs *Schema) bool {
125 if s.NumColumns() != rhs.NumColumns() {
126 return false
127 }
128
129 for idx, c := range s.leaves {
130 if !c.Equals(rhs.Column(idx)) {
131 return false
132 }
133 }
134 return true
135 }
136
137 func (s *Schema) buildTree(n Node, maxDefLvl, maxRepLvl int16, base Node) {
138 switch n.RepetitionType() {
139 case parquet.Repetitions.Repeated:
140 maxRepLvl++
141 fallthrough
142 case parquet.Repetitions.Optional:
143 maxDefLvl++
144 }
145
146 switch n := n.(type) {
147 case *GroupNode:
148 for _, f := range n.fields {
149 s.buildTree(f, maxDefLvl, maxRepLvl, base)
150 }
151 case *PrimitiveNode:
152 s.nodeToLeaf[n] = len(s.leaves)
153 s.leaves = append(s.leaves, NewColumn(n, maxDefLvl, maxRepLvl))
154 s.leafToBase[len(s.leaves)-1] = base
155 s.leafToIndex.Add(n.Path(), len(s.leaves)-1)
156 }
157 }
158
159 // Column returns the (0-indexed) column of the provided index.
160 func (s *Schema) Column(i int) *Column {
161 return s.leaves[i]
162 }
163
164 // ColumnIndexByName looks up the column by it's full dot separated
165 // node path. If there are multiple columns that match, it returns the first one.
166 //
167 // Returns -1 if not found.
168 func (s *Schema) ColumnIndexByName(nodePath string) int {
169 if search, ok := s.leafToIndex[nodePath]; ok {
170 return search[0]
171 }
172 return -1
173 }
174
175 // ColumnIndexByNode returns the index of the column represented by this node.
176 //
177 // Returns -1 if not found.
178 func (s *Schema) ColumnIndexByNode(n Node) int {
179 if search, ok := s.leafToIndex[n.Path()]; ok {
180 for _, idx := range search {
181 if n == s.Column(idx).SchemaNode() {
182 return idx
183 }
184 }
185 }
186 return -1
187 }
188
189 // ColumnRoot returns the root node of a given column if it is under a
190 // nested group node, providing that root group node.
191 func (s *Schema) ColumnRoot(i int) Node {
192 return s.leafToBase[i]
193 }
194
195 // HasRepeatedFields returns true if any node in the schema has a repeated field type.
196 func (s *Schema) HasRepeatedFields() bool {
197 return s.root.(*GroupNode).HasRepeatedFields()
198 }
199
200 // UpdateColumnOrders must get a slice that is the same length as the number of leaf columns
201 // and is used to update the schema metadata Column Orders. len(orders) must equal s.NumColumns()
202 func (s *Schema) UpdateColumnOrders(orders []parquet.ColumnOrder) error {
203 if len(orders) != s.NumColumns() {
204 return xerrors.New("parquet: malformed schema: not enough ColumnOrder values")
205 }
206
207 visitor := schemaColumnOrderUpdater{orders, 0}
208 s.root.Visit(&visitor)
209 return nil
210 }
211
212 // NewSchema constructs a new Schema object from a root group node.
213 //
214 // Any fields with a field-id of -1 will be given an appropriate field number based on their order.
215 func NewSchema(root *GroupNode) *Schema {
216 s := &Schema{
217 root,
218 make([]*Column, 0),
219 make(map[*PrimitiveNode]int),
220 make(map[int]Node),
221 make(strIntMultimap),
222 }
223
224 for _, f := range root.fields {
225 s.buildTree(f, 0, 0, f)
226 }
227 return s
228 }
229
230 type schemaColumnOrderUpdater struct {
231 colOrders []parquet.ColumnOrder
232 leafCount int
233 }
234
235 func (s *schemaColumnOrderUpdater) VisitPre(n Node) bool {
236 if n.Type() == Primitive {
237 leaf := n.(*PrimitiveNode)
238 leaf.ColumnOrder = s.colOrders[s.leafCount]
239 s.leafCount++
240 }
241 return true
242 }
243
244 func (s *schemaColumnOrderUpdater) VisitPost(Node) {}
245
246 type toThriftVisitor struct {
247 elements []*format.SchemaElement
248 }
249
250 func (t *toThriftVisitor) VisitPre(n Node) bool {
251 t.elements = append(t.elements, n.toThrift())
252 return true
253 }
254
255 func (t *toThriftVisitor) VisitPost(Node) {}
256
257 // ToThrift converts a GroupNode to a slice of SchemaElements which is used
258 // for thrift serialization.
259 func ToThrift(schema *GroupNode) []*format.SchemaElement {
260 t := &toThriftVisitor{make([]*format.SchemaElement, 0)}
261 schema.Visit(t)
262 return t.elements
263 }
264
// schemaPrinter is a node visitor that writes an indented textual rendering
// of the nodes it visits.
type schemaPrinter struct {
	// w receives the output.
	w io.Writer
	// indent is the number of spaces written before the current line.
	indent int
	// indentWidth is how much indent grows when a group is opened and
	// shrinks when it is closed.
	indentWidth int
}
270
271 func (s *schemaPrinter) VisitPre(n Node) bool {
272 fmt.Fprint(s.w, strings.Repeat(" ", s.indent))
273 if n.Type() == Group {
274 g := n.(*GroupNode)
275 fmt.Fprintf(s.w, "%s group field_id=%d %s", g.RepetitionType(), g.FieldID(), g.Name())
276 _, invalid := g.logicalType.(UnknownLogicalType)
277 _, none := g.logicalType.(NoLogicalType)
278
279 if g.logicalType != nil && !invalid && !none {
280 fmt.Fprintf(s.w, " (%s)", g.logicalType)
281 } else if g.convertedType != ConvertedTypes.None {
282 fmt.Fprintf(s.w, " (%s)", g.convertedType)
283 }
284
285 fmt.Fprintln(s.w, " {")
286 s.indent += s.indentWidth
287 } else {
288 p := n.(*PrimitiveNode)
289 fmt.Fprintf(s.w, "%s %s field_id=%d %s", p.RepetitionType(), strings.ToLower(p.PhysicalType().String()), p.FieldID(), p.Name())
290 _, invalid := p.logicalType.(UnknownLogicalType)
291 _, none := p.logicalType.(NoLogicalType)
292
293 if p.logicalType != nil && !invalid && !none {
294 fmt.Fprintf(s.w, " (%s)", p.logicalType)
295 } else if p.convertedType == ConvertedTypes.Decimal {
296 fmt.Fprintf(s.w, " (%s(%d,%d))", p.convertedType, p.DecimalMetadata().Precision, p.DecimalMetadata().Scale)
297 } else if p.convertedType != ConvertedTypes.None {
298 fmt.Fprintf(s.w, " (%s)", p.convertedType)
299 }
300 fmt.Fprintln(s.w, ";")
301 }
302 return true
303 }
304
305 func (s *schemaPrinter) VisitPost(n Node) {
306 if n.Type() == Group {
307 s.indent -= s.indentWidth
308 fmt.Fprint(s.w, strings.Repeat(" ", s.indent))
309 fmt.Fprintln(s.w, "}")
310 }
311 }
312
313 // PrintSchema writes a string representation of the tree to w using the indent
314 // width provided.
315 func PrintSchema(n Node, w io.Writer, indentWidth int) {
316 n.Visit(&schemaPrinter{w, 0, indentWidth})
317 }
318
// strIntMultimap associates a string key with every int added under it, in
// insertion order.
type strIntMultimap map[string][]int

// Add appends val to the values stored under key and reports whether key was
// already present before the call.
func (f strIntMultimap) Add(key string, val int) bool {
	existing, present := f[key]
	// Appending to the nil slice of a missing key creates the first entry.
	f[key] = append(existing, val)
	return present
}