]>
Commit | Line | Data |
---|---|---|
1 | // Licensed to the Apache Software Foundation (ASF) under one | |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, software | |
12 | // distributed under the License is distributed on an "AS IS" BASIS, | |
13 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | // See the License for the specific language governing permissions and | |
15 | // limitations under the License. | |
16 | ||
17 | package schema | |
18 | ||
19 | import ( | |
20 | "reflect" | |
21 | "strconv" | |
22 | "strings" | |
23 | ||
24 | "github.com/apache/arrow/go/v6/parquet" | |
25 | format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet" | |
26 | "golang.org/x/xerrors" | |
27 | ) | |
28 | ||
29 | type taggedInfo struct { | |
30 | Name string | |
31 | ||
32 | Type parquet.Type | |
33 | KeyType parquet.Type | |
34 | ValueType parquet.Type | |
35 | ||
36 | Length int32 | |
37 | KeyLength int32 | |
38 | ValueLength int32 | |
39 | ||
40 | Scale int32 | |
41 | KeyScale int32 | |
42 | ValueScale int32 | |
43 | ||
44 | Precision int32 | |
45 | KeyPrecision int32 | |
46 | ValuePrecision int32 | |
47 | ||
48 | FieldID int32 | |
49 | KeyFieldID int32 | |
50 | ValueFieldID int32 | |
51 | ||
52 | RepetitionType parquet.Repetition | |
53 | ValueRepetition parquet.Repetition | |
54 | ||
55 | Converted ConvertedType | |
56 | KeyConverted ConvertedType | |
57 | ValueConverted ConvertedType | |
58 | ||
59 | LogicalFields map[string]string | |
60 | KeyLogicalFields map[string]string | |
61 | ValueLogicalFields map[string]string | |
62 | ||
63 | LogicalType LogicalType | |
64 | KeyLogicalType LogicalType | |
65 | ValueLogicalType LogicalType | |
66 | } | |
67 | ||
68 | func (t *taggedInfo) CopyForKey() (ret taggedInfo) { | |
69 | ret = *t | |
70 | ret.Type = t.KeyType | |
71 | ret.Length = t.KeyLength | |
72 | ret.Scale = t.KeyScale | |
73 | ret.Precision = t.KeyPrecision | |
74 | ret.FieldID = t.KeyFieldID | |
75 | ret.RepetitionType = parquet.Repetitions.Required | |
76 | ret.Converted = t.KeyConverted | |
77 | ret.LogicalType = t.KeyLogicalType | |
78 | return | |
79 | } | |
80 | ||
81 | func (t *taggedInfo) CopyForValue() (ret taggedInfo) { | |
82 | ret = *t | |
83 | ret.Type = t.ValueType | |
84 | ret.Length = t.ValueLength | |
85 | ret.Scale = t.ValueScale | |
86 | ret.Precision = t.ValuePrecision | |
87 | ret.FieldID = t.ValueFieldID | |
88 | ret.RepetitionType = t.ValueRepetition | |
89 | ret.Converted = t.ValueConverted | |
90 | ret.LogicalType = t.ValueLogicalType | |
91 | return | |
92 | } | |
93 | ||
94 | func (t *taggedInfo) UpdateLogicalTypes() { | |
95 | processLogicalType := func(fields map[string]string, precision, scale int32) LogicalType { | |
96 | t, ok := fields["type"] | |
97 | if !ok { | |
98 | return NoLogicalType{} | |
99 | } | |
100 | ||
101 | switch strings.ToLower(t) { | |
102 | case "string": | |
103 | return StringLogicalType{} | |
104 | case "map": | |
105 | return MapLogicalType{} | |
106 | case "list": | |
107 | return ListLogicalType{} | |
108 | case "enum": | |
109 | return EnumLogicalType{} | |
110 | case "decimal": | |
111 | if v, ok := fields["precision"]; ok { | |
112 | precision = int32FromType(v) | |
113 | } | |
114 | if v, ok := fields["scale"]; ok { | |
115 | scale = int32FromType(v) | |
116 | } | |
117 | return NewDecimalLogicalType(precision, scale) | |
118 | case "date": | |
119 | return DateLogicalType{} | |
120 | case "time": | |
121 | unit, ok := fields["unit"] | |
122 | if !ok { | |
123 | panic("must specify unit for time logical type") | |
124 | } | |
125 | adjustedToUtc, ok := fields["isadjustedutc"] | |
126 | if !ok { | |
127 | adjustedToUtc = "true" | |
128 | } | |
129 | return NewTimeLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(strings.ToLower(unit))) | |
130 | case "timestamp": | |
131 | unit, ok := fields["unit"] | |
132 | if !ok { | |
133 | panic("must specify unit for time logical type") | |
134 | } | |
135 | adjustedToUtc, ok := fields["isadjustedutc"] | |
136 | if !ok { | |
137 | adjustedToUtc = "true" | |
138 | } | |
139 | return NewTimestampLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(unit)) | |
140 | case "integer": | |
141 | width, ok := fields["bitwidth"] | |
142 | if !ok { | |
143 | panic("must specify bitwidth if explicitly setting integer logical type") | |
144 | } | |
145 | signed, ok := fields["signed"] | |
146 | if !ok { | |
147 | signed = "true" | |
148 | } | |
149 | ||
150 | return NewIntLogicalType(int8(int32FromType(width)), boolFromStr(signed)) | |
151 | case "null": | |
152 | return NullLogicalType{} | |
153 | case "json": | |
154 | return JSONLogicalType{} | |
155 | case "bson": | |
156 | return BSONLogicalType{} | |
157 | case "uuid": | |
158 | return UUIDLogicalType{} | |
159 | default: | |
160 | panic(xerrors.Errorf("invalid logical type specified: %s", t)) | |
161 | } | |
162 | } | |
163 | ||
164 | t.LogicalType = processLogicalType(t.LogicalFields, t.Precision, t.Scale) | |
165 | t.KeyLogicalType = processLogicalType(t.KeyLogicalFields, t.KeyPrecision, t.KeyScale) | |
166 | t.ValueLogicalType = processLogicalType(t.ValueLogicalFields, t.ValuePrecision, t.ValueScale) | |
167 | } | |
168 | ||
169 | func newTaggedInfo() taggedInfo { | |
170 | return taggedInfo{ | |
171 | Type: parquet.Types.Undefined, | |
172 | KeyType: parquet.Types.Undefined, | |
173 | ValueType: parquet.Types.Undefined, | |
174 | RepetitionType: parquet.Repetitions.Undefined, | |
175 | ValueRepetition: parquet.Repetitions.Undefined, | |
176 | Converted: ConvertedTypes.NA, | |
177 | KeyConverted: ConvertedTypes.NA, | |
178 | ValueConverted: ConvertedTypes.NA, | |
179 | FieldID: -1, | |
180 | KeyFieldID: -1, | |
181 | ValueFieldID: -1, | |
182 | LogicalFields: make(map[string]string), | |
183 | KeyLogicalFields: make(map[string]string), | |
184 | ValueLogicalFields: make(map[string]string), | |
185 | LogicalType: NoLogicalType{}, | |
186 | KeyLogicalType: NoLogicalType{}, | |
187 | ValueLogicalType: NoLogicalType{}, | |
188 | } | |
189 | } | |
190 | ||
// int32FromType parses v as a base-10 signed 32-bit integer.
//
// It panics if v is not a valid integer or does not fit in an int32. The
// previous Atoi-then-convert approach silently truncated out-of-range values.
// The panic is recovered by NewSchemaFromStruct and returned as an error.
var int32FromType = func(v string) int32 {
	val, err := strconv.ParseInt(v, 10, 32)
	if err != nil {
		panic(err)
	}
	return int32(val)
}
198 | ||
// boolFromStr parses v using strconv.ParseBool semantics. It accepts
// 1/t/T/TRUE/true/True and 0/f/F/FALSE/false/False, and panics on anything
// else. The panic is recovered by NewSchemaFromStruct.
var boolFromStr = func(v string) bool {
	b, err := strconv.ParseBool(v)
	if err == nil {
		return b
	}
	panic(err)
}
206 | ||
207 | func infoFromTags(f reflect.StructTag) *taggedInfo { | |
208 | typeFromStr := func(v string) parquet.Type { | |
209 | t, err := format.TypeFromString(strings.ToUpper(v)) | |
210 | if err != nil { | |
211 | panic(xerrors.Errorf("invalid type specified: %s", v)) | |
212 | } | |
213 | return parquet.Type(t) | |
214 | } | |
215 | ||
216 | repFromStr := func(v string) parquet.Repetition { | |
217 | r, err := format.FieldRepetitionTypeFromString(strings.ToUpper(v)) | |
218 | if err != nil { | |
219 | panic(err) | |
220 | } | |
221 | return parquet.Repetition(r) | |
222 | } | |
223 | ||
224 | convertedFromStr := func(v string) ConvertedType { | |
225 | c, err := format.ConvertedTypeFromString(strings.ToUpper(v)) | |
226 | if err != nil { | |
227 | panic(err) | |
228 | } | |
229 | return ConvertedType(c) | |
230 | } | |
231 | ||
232 | if ptags, ok := f.Lookup("parquet"); ok { | |
233 | info := newTaggedInfo() | |
234 | for _, tag := range strings.Split(strings.Replace(ptags, "\t", "", -1), ",") { | |
235 | tag = strings.TrimSpace(tag) | |
236 | kv := strings.SplitN(tag, "=", 2) | |
237 | key := strings.TrimSpace(strings.ToLower(kv[0])) | |
238 | value := strings.TrimSpace(kv[1]) | |
239 | ||
240 | switch key { | |
241 | case "name": | |
242 | info.Name = value | |
243 | case "type": | |
244 | info.Type = typeFromStr(value) | |
245 | case "keytype": | |
246 | info.KeyType = typeFromStr(value) | |
247 | case "valuetype": | |
248 | info.ValueType = typeFromStr(value) | |
249 | case "length": | |
250 | info.Length = int32FromType(value) | |
251 | case "keylength": | |
252 | info.KeyLength = int32FromType(value) | |
253 | case "valuelength": | |
254 | info.ValueLength = int32FromType(value) | |
255 | case "scale": | |
256 | info.Scale = int32FromType(value) | |
257 | case "keyscale": | |
258 | info.KeyScale = int32FromType(value) | |
259 | case "valuescale": | |
260 | info.ValueScale = int32FromType(value) | |
261 | case "precision": | |
262 | info.Precision = int32FromType(value) | |
263 | case "keyprecision": | |
264 | info.KeyPrecision = int32FromType(value) | |
265 | case "valueprecision": | |
266 | info.ValuePrecision = int32FromType(value) | |
267 | case "fieldid": | |
268 | info.FieldID = int32FromType(value) | |
269 | case "keyfieldid": | |
270 | info.KeyFieldID = int32FromType(value) | |
271 | case "valuefieldid": | |
272 | info.ValueFieldID = int32FromType(value) | |
273 | case "repetition": | |
274 | info.RepetitionType = repFromStr(value) | |
275 | case "valuerepetition": | |
276 | info.ValueRepetition = repFromStr(value) | |
277 | case "converted": | |
278 | info.Converted = convertedFromStr(value) | |
279 | case "keyconverted": | |
280 | info.KeyConverted = convertedFromStr(value) | |
281 | case "valueconverted": | |
282 | info.ValueConverted = convertedFromStr(value) | |
283 | case "logical": | |
284 | info.LogicalFields["type"] = value | |
285 | case "keylogical": | |
286 | info.KeyLogicalFields["type"] = value | |
287 | case "valuelogical": | |
288 | info.ValueLogicalFields["type"] = value | |
289 | default: | |
290 | switch { | |
291 | case strings.HasPrefix(key, "logical."): | |
292 | info.LogicalFields[strings.TrimPrefix(key, "logical.")] = value | |
293 | case strings.HasPrefix(key, "keylogical."): | |
294 | info.KeyLogicalFields[strings.TrimPrefix(key, "keylogical.")] = value | |
295 | case strings.HasPrefix(key, "valuelogical."): | |
296 | info.ValueLogicalFields[strings.TrimPrefix(key, "valuelogical.")] = value | |
297 | } | |
298 | } | |
299 | } | |
300 | info.UpdateLogicalTypes() | |
301 | return &info | |
302 | } | |
303 | return nil | |
304 | } | |
305 | ||
306 | // typeToNode recurseively converts a physical type and the tag info into parquet Nodes | |
307 | // | |
308 | // to avoid having to propagate errors up potentially high numbers of recursive calls | |
309 | // we use panics and then recover in the public function NewSchemaFromStruct so that a | |
310 | // failure very far down the stack quickly unwinds. | |
311 | func typeToNode(name string, typ reflect.Type, repType parquet.Repetition, info *taggedInfo) Node { | |
312 | // set up our default values for everything | |
313 | var ( | |
314 | converted = ConvertedTypes.None | |
315 | logical LogicalType = NoLogicalType{} | |
316 | fieldID = int32(-1) | |
317 | physical = parquet.Types.Undefined | |
318 | typeLen = 0 | |
319 | precision = 0 | |
320 | scale = 0 | |
321 | ) | |
322 | if info != nil { // we have struct tag info to process | |
323 | fieldID = info.FieldID | |
324 | if info.Converted != ConvertedTypes.NA { | |
325 | converted = info.Converted | |
326 | } | |
327 | logical = info.LogicalType | |
328 | physical = info.Type | |
329 | typeLen = int(info.Length) | |
330 | precision = int(info.Precision) | |
331 | scale = int(info.Scale) | |
332 | ||
333 | if info.Name != "" { | |
334 | name = info.Name | |
335 | } | |
336 | if info.RepetitionType != parquet.Repetitions.Undefined { | |
337 | repType = info.RepetitionType | |
338 | } | |
339 | } | |
340 | ||
341 | // simplify the logic by switching based on the reflection Kind | |
342 | switch typ.Kind() { | |
343 | case reflect.Map: | |
344 | // a map must have a logical type of MAP or have no tag for logical type in which case | |
345 | // we assume MAP logical type. | |
346 | if !logical.IsNone() && !logical.Equals(MapLogicalType{}) { | |
347 | panic("cannot set logical type to something other than map for a map") | |
348 | } | |
349 | ||
350 | infoCopy := newTaggedInfo() | |
351 | if info != nil { // populate any value specific tags to propagate for the value type | |
352 | infoCopy = info.CopyForValue() | |
353 | } | |
354 | ||
355 | // create the node for the value type of the map | |
356 | value := typeToNode("value", typ.Elem(), parquet.Repetitions.Required, &infoCopy) | |
357 | if info != nil { // change our copy to now use the key specific tags if they exist | |
358 | infoCopy = info.CopyForKey() | |
359 | } | |
360 | ||
361 | // create the node for the key type of the map | |
362 | key := typeToNode("key", typ.Key(), parquet.Repetitions.Required, &infoCopy) | |
363 | if key.RepetitionType() != parquet.Repetitions.Required { // key cannot be optional | |
364 | panic("key type of map must be Required") | |
365 | } | |
366 | return Must(MapOf(name, key, value, repType, fieldID)) | |
367 | case reflect.Struct: | |
368 | // structs are Group nodes | |
369 | fields := make(FieldList, 0) | |
370 | for i := 0; i < typ.NumField(); i++ { | |
371 | f := typ.Field(i) | |
372 | ||
373 | fields = append(fields, typeToNode(f.Name, f.Type, parquet.Repetitions.Required, infoFromTags(f.Tag))) | |
374 | } | |
375 | // group nodes don't have a physical type | |
376 | if physical != parquet.Types.Undefined { | |
377 | panic("cannot specify custom type on struct") | |
378 | } | |
379 | // group nodes don't have converted or logical types | |
380 | if converted != ConvertedTypes.None { | |
381 | panic("cannot specify converted types for a struct") | |
382 | } | |
383 | if !logical.IsNone() { | |
384 | panic("cannot specify logicaltype for a struct") | |
385 | } | |
386 | return Must(NewGroupNode(name, repType, fields, fieldID)) | |
387 | case reflect.Ptr: // if we encounter a pointer create a node for the type it points to, but mark it as optional | |
388 | return typeToNode(name, typ.Elem(), parquet.Repetitions.Optional, info) | |
389 | case reflect.Array: | |
390 | // arrays are repeated or fixed size | |
391 | if typ == reflect.TypeOf(parquet.Int96{}) { | |
392 | return NewInt96Node(name, repType, fieldID) | |
393 | } | |
394 | ||
395 | if typ.Elem() == reflect.TypeOf(byte(0)) { // something like [12]byte translates to FixedLenByteArray with length 12 | |
396 | if physical == parquet.Types.Undefined { | |
397 | physical = parquet.Types.FixedLenByteArray | |
398 | } | |
399 | if typeLen == 0 { // if there was no type length specified in the tag, use the length of the type. | |
400 | typeLen = typ.Len() | |
401 | } | |
402 | if !logical.IsNone() { | |
403 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, physical, typeLen, fieldID)) | |
404 | } | |
405 | return MustPrimitive(NewPrimitiveNodeConverted(name, repType, physical, converted, typeLen, precision, scale, fieldID)) | |
406 | } | |
407 | fallthrough // if it's not a fixed len byte array type, then just treat it like a slice | |
408 | case reflect.Slice: | |
409 | // for slices, we default to treating them as lists unless the repetition type is set to REPEATED or they are | |
410 | // a bytearray/fixedlenbytearray | |
411 | switch { | |
412 | case repType == parquet.Repetitions.Repeated: | |
413 | return typeToNode(name, typ.Elem(), parquet.Repetitions.Repeated, info) | |
414 | case physical == parquet.Types.FixedLenByteArray || physical == parquet.Types.ByteArray: | |
415 | if typ.Elem() != reflect.TypeOf(byte(0)) { | |
416 | panic("slice with physical type ByteArray or FixedLenByteArray must be []byte") | |
417 | } | |
418 | fallthrough | |
419 | case typ.Elem() == reflect.TypeOf(byte(0)): | |
420 | if physical == parquet.Types.Undefined { | |
421 | physical = parquet.Types.ByteArray | |
422 | } | |
423 | if !logical.IsNone() { | |
424 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, physical, typeLen, fieldID)) | |
425 | } | |
426 | return MustPrimitive(NewPrimitiveNodeConverted(name, repType, physical, converted, typeLen, precision, scale, fieldID)) | |
427 | default: | |
428 | var elemInfo *taggedInfo | |
429 | if info != nil { | |
430 | elemInfo = &taggedInfo{} | |
431 | *elemInfo = info.CopyForValue() | |
432 | } | |
433 | ||
434 | if !logical.IsNone() && !logical.Equals(ListLogicalType{}) { | |
435 | panic("slice must either be repeated or a List type") | |
436 | } | |
437 | if converted != ConvertedTypes.None && converted != ConvertedTypes.List { | |
438 | panic("slice must either be repeated or a List type") | |
439 | } | |
440 | return Must(ListOf(typeToNode(name, typ.Elem(), parquet.Repetitions.Required, elemInfo), repType, fieldID)) | |
441 | } | |
442 | case reflect.String: | |
443 | // strings are byte arrays or fixedlen byte array | |
444 | t := parquet.Types.ByteArray | |
445 | switch physical { | |
446 | case parquet.Types.Undefined, parquet.Types.ByteArray: | |
447 | case parquet.Types.FixedLenByteArray: | |
448 | t = parquet.Types.FixedLenByteArray | |
449 | default: | |
450 | panic("string fields should be of type bytearray or fixedlenbytearray only") | |
451 | } | |
452 | ||
453 | if !logical.IsNone() { | |
454 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, t, typeLen, fieldID)) | |
455 | } | |
456 | ||
457 | return MustPrimitive(NewPrimitiveNodeConverted(name, repType, t, converted, typeLen, precision, scale, fieldID)) | |
458 | case reflect.Int, reflect.Int32, reflect.Int8, reflect.Int16, reflect.Int64: | |
459 | // handle integer types, default to setting the corresponding logical type | |
460 | ptyp := parquet.Types.Int32 | |
461 | if typ.Bits() == 64 { | |
462 | ptyp = parquet.Types.Int64 | |
463 | } | |
464 | ||
465 | if physical != parquet.Types.Undefined { | |
466 | ptyp = physical | |
467 | } | |
468 | ||
469 | if !logical.IsNone() { | |
470 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, ptyp, typeLen, fieldID)) | |
471 | } | |
472 | ||
473 | bitwidth := int8(typ.Bits()) | |
474 | if physical != parquet.Types.Undefined { | |
475 | if ptyp == parquet.Types.Int32 { | |
476 | bitwidth = 32 | |
477 | } else if ptyp == parquet.Types.Int64 { | |
478 | bitwidth = 64 | |
479 | } | |
480 | } | |
481 | ||
482 | if converted != ConvertedTypes.None { | |
483 | return MustPrimitive(NewPrimitiveNodeConverted(name, repType, ptyp, converted, 0, precision, scale, fieldID)) | |
484 | } | |
485 | ||
486 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, NewIntLogicalType(bitwidth, true), ptyp, 0, fieldID)) | |
487 | case reflect.Uint, reflect.Uint32, reflect.Uint8, reflect.Uint16, reflect.Uint64: | |
488 | // handle unsigned integer types and default to the corresponding logical type for it. | |
489 | ptyp := parquet.Types.Int32 | |
490 | if typ.Bits() == 64 { | |
491 | ptyp = parquet.Types.Int64 | |
492 | } | |
493 | ||
494 | if physical != parquet.Types.Undefined { | |
495 | ptyp = physical | |
496 | } | |
497 | ||
498 | if !logical.IsNone() { | |
499 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, ptyp, typeLen, fieldID)) | |
500 | } | |
501 | ||
502 | bitwidth := int8(typ.Bits()) | |
503 | if physical != parquet.Types.Undefined { | |
504 | if ptyp == parquet.Types.Int32 { | |
505 | bitwidth = 32 | |
506 | } else if ptyp == parquet.Types.Int64 { | |
507 | bitwidth = 64 | |
508 | } | |
509 | } | |
510 | ||
511 | if converted != ConvertedTypes.None { | |
512 | return MustPrimitive(NewPrimitiveNodeConverted(name, repType, ptyp, converted, 0, precision, scale, fieldID)) | |
513 | } | |
514 | ||
515 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, NewIntLogicalType(bitwidth, false), ptyp, 0, fieldID)) | |
516 | case reflect.Bool: | |
517 | if !logical.IsNone() { | |
518 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Boolean, typeLen, fieldID)) | |
519 | } | |
520 | return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Boolean, converted, typeLen, precision, scale, fieldID)) | |
521 | case reflect.Float32: | |
522 | if !logical.IsNone() { | |
523 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Float, typeLen, fieldID)) | |
524 | } | |
525 | return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Float, converted, typeLen, precision, scale, fieldID)) | |
526 | case reflect.Float64: | |
527 | if !logical.IsNone() { | |
528 | return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Double, typeLen, fieldID)) | |
529 | } | |
530 | return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Double, converted, typeLen, precision, scale, fieldID)) | |
531 | } | |
532 | return nil | |
533 | } | |
534 | ||
535 | // NewSchemaFromStruct generates a schema from an object type via reflection of | |
536 | // the type and reading struct tags for "parquet". | |
537 | // | |
538 | // Rules | |
539 | // | |
540 | // Everything defaults to Required repetition, unless otherwise specified. | |
541 | // Pointer types become Optional repetition. | |
542 | // Arrays and Slices become logical List types unless using the tag `repetition=repeated`. | |
543 | // | |
544 | // A length specified byte field (like [5]byte) becomes a fixed_len_byte_array of that length | |
545 | // unless otherwise specified by tags. | |
546 | // | |
547 | // string and []byte both become ByteArray unless otherwise specified. | |
548 | // | |
549 | // Integer types will default to having a logical type of the appropriate bit width | |
550 | // and signedness rather than having no logical type, ie: an int8 will become an int32 | |
551 | // node with logical type Int(bitWidth=8, signed=true). | |
552 | // | |
553 | // Structs will become group nodes with the fields of the struct as the fields of the group, | |
554 | // recursively creating the nodes. | |
555 | // | |
556 | // maps will become appropriate Map structures in the schema of the defined key and values. | |
557 | // | |
558 | // Available Tags | |
559 | // | |
560 | // name: by default the node will have the same name as the field, this tag let's you specify a name | |
561 | // | |
562 | // type: Specify the physical type instead of using the field type | |
563 | // | |
564 | // length: specify the type length of the node, only relevant for fixed_len_byte_array | |
565 | // | |
566 | // scale: specify the scale for a decimal field | |
567 | // | |
568 | // precision: specify the precision for a decimal field | |
569 | // | |
570 | // fieldid: specify the field ID for that node, defaults to -1 which means it is not set in the parquet file. | |
571 | // | |
572 | // repetition: specify the repetition as something other than what is determined by the type | |
573 | // | |
574 | // converted: specify the Converted Type of the field | |
575 | // | |
576 | // logical: specify the logical type of the field, if using decimal then the scale and precision | |
577 | // will be determined by the precision and scale fields, or by the logical.precision / logical.scale fields | |
578 | // with the logical. prefixed versions taking precedence. For Time or Timestamp logical types, | |
579 | // use logical.unit=<millis|micros|nanos> and logical.isadjustedutc=<true|false> to set those. Unit is required | |
580 | // isadjustedutc defaults to true. For Integer logical type, use logical.bitwidth and logical.signed to specify | |
581 | // those values, with bitwidth being required, and signed defaulting to true. | |
582 | // | |
583 | // All tags other than name can use a prefix of "key<tagname>=<value>" to refer to the type of the key for a map | |
584 | // and "value<tagname>=<value>" to refer to the value type of a map or the element of a list (such as the type of a slice) | |
585 | func NewSchemaFromStruct(obj interface{}) (sc *Schema, err error) { | |
586 | ot := reflect.TypeOf(obj) | |
587 | if ot.Kind() == reflect.Ptr { | |
588 | ot = ot.Elem() | |
589 | } | |
590 | ||
591 | // typeToNode uses panics to fail fast / fail early instead of propagating | |
592 | // errors up recursive stacks. so we recover here and return it as an error | |
593 | defer func() { | |
594 | if r := recover(); r != nil { | |
595 | sc = nil | |
596 | switch x := r.(type) { | |
597 | case string: | |
598 | err = xerrors.New(x) | |
599 | case error: | |
600 | err = x | |
601 | default: | |
602 | err = xerrors.New("unknown panic") | |
603 | } | |
604 | } | |
605 | }() | |
606 | ||
607 | root := typeToNode(ot.Name(), ot, parquet.Repetitions.Repeated, nil) | |
608 | return NewSchema(root.(*GroupNode)), nil | |
609 | } | |
610 | ||
611 | var parquetTypeToReflect = map[parquet.Type]reflect.Type{ | |
612 | parquet.Types.Boolean: reflect.TypeOf(true), | |
613 | parquet.Types.Int32: reflect.TypeOf(int32(0)), | |
614 | parquet.Types.Int64: reflect.TypeOf(int64(0)), | |
615 | parquet.Types.Float: reflect.TypeOf(float32(0)), | |
616 | parquet.Types.Double: reflect.TypeOf(float64(0)), | |
617 | parquet.Types.Int96: reflect.TypeOf(parquet.Int96{}), | |
618 | parquet.Types.ByteArray: reflect.TypeOf(parquet.ByteArray{}), | |
619 | parquet.Types.FixedLenByteArray: reflect.TypeOf(parquet.FixedLenByteArray{}), | |
620 | } | |
621 | ||
// typeFromNode maps a schema node to the Go type used to hold its values; it
// is the recursive worker behind NewStructFromSchema.
//
// Primitive nodes:
//   - They map through parquetTypeToReflect by physical type.
//   - Nodes annotated with the String logical type or UTF8 converted type
//     become string instead.
//   - Optional nodes become pointers and repeated nodes become slices.
//
// Group nodes:
//   - Groups annotated LIST become slices, following the spec's
//     backward-compatibility rules.
//   - Groups annotated MAP or MAP_KEY_VALUE become maps.
//   - Any other group becomes a struct with one field per child, wrapped in a
//     slice or pointer for repeated/optional groups.
//
// Malformed LIST/MAP groups panic; NewStructFromSchema recovers the panic and
// returns it as an error.
func typeFromNode(n Node) reflect.Type {
	switch n.Type() {
	case Primitive:
		typ := parquetTypeToReflect[n.(*PrimitiveNode).PhysicalType()]
		// if a bytearray field is annotated as a String logical type or a UTF8 converted type
		// then use a string instead of parquet.ByteArray / parquet.FixedLenByteArray which are []byte
		if n.LogicalType().Equals(StringLogicalType{}) || n.ConvertedType() == ConvertedTypes.UTF8 {
			typ = reflect.TypeOf(string(""))
		}

		if n.RepetitionType() == parquet.Repetitions.Optional {
			typ = reflect.PtrTo(typ)
		} else if n.RepetitionType() == parquet.Repetitions.Repeated {
			typ = reflect.SliceOf(typ)
		}

		return typ
	case Group:
		gnode := n.(*GroupNode)
		switch gnode.ConvertedType() {
		case ConvertedTypes.List:
			// According to the Parquet Spec, a list should always be a 3-level structure
			//
			//	<list-repetition> group <name> (LIST) {
			//		repeated group list {
			//			<element-repetition> <element-type> element;
			//		}
			//	}
			//
			// Outer-most level must be a group annotated with LIST containing a single field named "list".
			// this level must be only optional (if the list is nullable) or required
			// Middle level, named list, must be repeated group with a single field named "element"
			// "element" field is the lists element type and repetition, which should be only required or optional

			if gnode.fields.Len() != 1 {
				panic("invalid list node, should have exactly 1 child.")
			}

			if gnode.fields[0].RepetitionType() != parquet.Repetitions.Repeated {
				panic("invalid list node, child should be repeated")
			}

			// it is required that the repeated group of elements is named "list" and its element
			// field is named "element", however existing data may not use this so readers shouldn't
			// enforce them as errors
			//
			// Rules for backward compatibility from the parquet spec:
			//
			// 1) if the repeated field is not a group, then its type is the element type and elements
			//    must be required.
			// 2) if the repeated field is a group with multiple fields, then its type is the element type
			//    and elements must be required.
			// 3) if the repeated field is a group with one field AND is named either "array" or uses the
			//    LIST-annotated group's name with "_tuple" suffix, then the repeated type is the element
			//    type and the elements must be required.
			// 4) otherwise, the repeated field's type is the element type with the repeated field's repetition

			elemMustBeRequired := false
			addSlice := false
			var elemType reflect.Type
			elemNode := gnode.fields[0]
			// case expressions are evaluated left to right and stop at the first
			// match, so the *GroupNode assertions only run for non-primitive nodes.
			switch {
			case elemNode.Type() == Primitive,
				elemNode.(*GroupNode).fields.Len() > 1,
				elemNode.(*GroupNode).fields.Len() == 1 && (elemNode.Name() == "array" || elemNode.Name() == gnode.Name()+"_tuple"):
				// rules 1-3: the repeated node itself is the element; since it is
				// repeated, typeFromNode wraps it in a slice for primitive and
				// plain-struct nodes.
				elemMustBeRequired = true
				elemType = typeFromNode(elemNode)
			default:
				// rule 4: the single child of the repeated group is the element
				addSlice = true
				elemType = typeFromNode(elemNode.(*GroupNode).fields[0])
			}

			if elemMustBeRequired && elemType.Kind() == reflect.Ptr {
				elemType = elemType.Elem()
			}
			if addSlice {
				elemType = reflect.SliceOf(elemType)
			}
			if gnode.RepetitionType() == parquet.Repetitions.Optional {
				elemType = reflect.PtrTo(elemType)
			}
			return elemType
		case ConvertedTypes.Map, ConvertedTypes.MapKeyValue:
			// According to the Parquet Spec, the outer-most level should be
			// a group containing a single field named "key_value" with repetition
			// either optional or required for whether or not the map is nullable.
			//
			// The key_value middle level *must* be a repeated group with a "key" field
			// and *optionally* a "value" field
			//
			// the "key" field *must* be required and must always exist
			//
			// the "value" field can be required or optional or omitted.
			//
			//	<map-repetition> group <name> (MAP) {
			//		repeated group key_value {
			//			required <key-type> key;
			//			<value-repetition> <value-type> value;
			//		}
			//	}

			if gnode.fields.Len() != 1 {
				panic("invalid map node, should have exactly 1 child")
			}

			if gnode.fields[0].Type() != Group {
				panic("invalid map node, child should be a group node")
			}

			// that said, this may not be used in existing data and should not be
			// enforced as errors when reading.
			//
			// some data may also incorrectly use MAP_KEY_VALUE instead of MAP
			//
			// so any group with MAP_KEY_VALUE that is not contained inside of a "MAP"
			// group, should be considered equivalent to being a MAP group itself.
			//
			// in addition, the fields may not be called "key" and "value" in existing
			// data, and as such should not be enforced as errors when reading.

			keyval := gnode.fields[0].(*GroupNode)

			keyIndex := keyval.FieldIndexByName("key")
			if keyIndex == -1 {
				keyIndex = 0 // use first child if there is no child named "key"
			}

			keyType := typeFromNode(keyval.fields[keyIndex])
			if keyType.Kind() == reflect.Ptr {
				keyType = keyType.Elem()
			}
			// can't use a []byte as a key for a map, so use string
			if keyType == reflect.TypeOf(parquet.ByteArray{}) || keyType == reflect.TypeOf(parquet.FixedLenByteArray{}) {
				keyType = reflect.TypeOf(string(""))
			}

			// if the value node is omitted, then consider this a "set" and make it a
			// map[key-type]bool
			valType := reflect.TypeOf(true)
			if keyval.fields.Len() > 1 {
				valIndex := keyval.FieldIndexByName("value")
				if valIndex == -1 {
					valIndex = 1 // use second child if there is no child named "value"
				}

				valType = typeFromNode(keyval.fields[valIndex])
			}

			mapType := reflect.MapOf(keyType, valType)
			if gnode.RepetitionType() == parquet.Repetitions.Optional {
				mapType = reflect.PtrTo(mapType)
			}
			return mapType
		default:
			// a plain group becomes a struct with one field per child, named after the
			// child; the non-empty PkgPath makes these fields unexported.
			fields := []reflect.StructField{}
			for _, f := range gnode.fields {
				fields = append(fields, reflect.StructField{
					Name:    f.Name(),
					Type:    typeFromNode(f),
					PkgPath: "parquet",
				})
			}

			structType := reflect.StructOf(fields)
			if gnode.RepetitionType() == parquet.Repetitions.Repeated {
				return reflect.SliceOf(structType)
			}
			if gnode.RepetitionType() == parquet.Repetitions.Optional {
				return reflect.PtrTo(structType)
			}
			return structType
		}
	}
	// only reached if n.Type() is neither Primitive nor Group
	panic("what happened?")
}
797 | ||
798 | // NewStructFromSchema generates a struct type as a reflect.Type from the schema | |
799 | // by using the appropriate physical types and making things either pointers or slices | |
800 | // based on whether they are repeated/optional/required. It does not use the logical | |
801 | // or converted types to change the physical storage so that it is more efficient to use | |
802 | // the resulting type for reading without having to do conversions. | |
803 | // | |
804 | // It will use maps for map types and slices for list types, but otherwise ignores the | |
805 | // converted and logical types of the nodes. Group nodes that are not List or Map will | |
806 | // be nested structs. | |
807 | func NewStructFromSchema(sc *Schema) (t reflect.Type, err error) { | |
808 | defer func() { | |
809 | if r := recover(); r != nil { | |
810 | t = nil | |
811 | switch x := r.(type) { | |
812 | case string: | |
813 | err = xerrors.New(x) | |
814 | case error: | |
815 | err = x | |
816 | default: | |
817 | err = xerrors.New("unknown panic") | |
818 | } | |
819 | } | |
820 | }() | |
821 | ||
822 | t = typeFromNode(sc.root) | |
823 | if t.Kind() == reflect.Slice || t.Kind() == reflect.Ptr { | |
824 | return t.Elem(), nil | |
825 | } | |
826 | return | |
827 | } |