]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / adapter / avro / src / main / java / org / apache / arrow / AvroToArrowUtils.java
CommitLineData
1d09f67e
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.arrow;
19
import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;

import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.arrow.consumers.AvroArraysConsumer;
import org.apache.arrow.consumers.AvroBooleanConsumer;
import org.apache.arrow.consumers.AvroBytesConsumer;
import org.apache.arrow.consumers.AvroDoubleConsumer;
import org.apache.arrow.consumers.AvroEnumConsumer;
import org.apache.arrow.consumers.AvroFixedConsumer;
import org.apache.arrow.consumers.AvroFloatConsumer;
import org.apache.arrow.consumers.AvroIntConsumer;
import org.apache.arrow.consumers.AvroLongConsumer;
import org.apache.arrow.consumers.AvroMapConsumer;
import org.apache.arrow.consumers.AvroNullConsumer;
import org.apache.arrow.consumers.AvroStringConsumer;
import org.apache.arrow.consumers.AvroStructConsumer;
import org.apache.arrow.consumers.AvroUnionsConsumer;
import org.apache.arrow.consumers.CompositeAvroConsumer;
import org.apache.arrow.consumers.Consumer;
import org.apache.arrow.consumers.SkipConsumer;
import org.apache.arrow.consumers.SkipFunction;
import org.apache.arrow.consumers.logical.AvroDateConsumer;
import org.apache.arrow.consumers.logical.AvroDecimalConsumer;
import org.apache.arrow.consumers.logical.AvroTimeMicroConsumer;
import org.apache.arrow.consumers.logical.AvroTimeMillisConsumer;
import org.apache.arrow.consumers.logical.AvroTimestampMicrosConsumer;
import org.apache.arrow.consumers.logical.AvroTimestampMillisConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.UnionMode;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.ValueVectorUtility;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.Decoder;
100
101/**
102 * Class that does most of the work to convert Avro data into Arrow columnar format Vector objects.
103 */
104public class AvroToArrowUtils {
105
106 /**
107 * Creates a {@link Consumer} from the {@link Schema}
108 *
109 <p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types.
110 *
111 * <ul>
112 * <li>STRING --> ArrowType.Utf8</li>
113 * <li>INT --> ArrowType.Int(32, signed)</li>
114 * <li>LONG --> ArrowType.Int(64, signed)</li>
115 * <li>FLOAT --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li>
116 * <li>DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li>
117 * <li>BOOLEAN --> ArrowType.Bool</li>
118 * <li>BYTES --> ArrowType.Binary</li>
119 * <li>ARRAY --> ArrowType.List</li>
120 * <li>MAP --> ArrowType.Map</li>
121 * <li>FIXED --> ArrowType.FixedSizeBinary</li>
122 * <li>RECORD --> ArrowType.Struct</li>
123 * <li>UNION --> ArrowType.Union</li>
124 * <li>ENUM--> ArrowType.Int</li>
125 * <li>DECIMAL --> ArrowType.Decimal</li>
126 * <li>Date --> ArrowType.Date(DateUnit.DAY)</li>
127 * <li>TimeMillis --> ArrowType.Time(TimeUnit.MILLISECOND, 32)</li>
128 * <li>TimeMicros --> ArrowType.Time(TimeUnit.MICROSECOND, 64)</li>
129 * <li>TimestampMillis --> ArrowType.Timestamp(TimeUnit.MILLISECOND, null)</li>
130 * <li>TimestampMicros --> ArrowType.Timestamp(TimeUnit.MICROSECOND, null)</li>
131 * </ul>
132 */
133
134 private static Consumer createConsumer(Schema schema, String name, AvroToArrowConfig config) {
135 return createConsumer(schema, name, false, config, null);
136 }
137
138 private static Consumer createConsumer(Schema schema, String name, AvroToArrowConfig config, FieldVector vector) {
139 return createConsumer(schema, name, false, config, vector);
140 }
141
142 /**
143 * Create a consumer with the given Avro schema.
144 *
145 * @param schema avro schema
146 * @param name arrow field name
147 * @param consumerVector vector to keep in consumer, if v == null, will create a new vector via field.
148 * @return consumer
149 */
150 private static Consumer createConsumer(
151 Schema schema,
152 String name,
153 boolean nullable,
154 AvroToArrowConfig config,
155 FieldVector consumerVector) {
156
157 Preconditions.checkNotNull(schema, "Avro schema object can't be null");
158 Preconditions.checkNotNull(config, "Config can't be null");
159
160 final BufferAllocator allocator = config.getAllocator();
161
162 final Type type = schema.getType();
163 final LogicalType logicalType = schema.getLogicalType();
164
165 final ArrowType arrowType;
166 final FieldType fieldType;
167 final FieldVector vector;
168 final Consumer consumer;
169
170 switch (type) {
171 case UNION:
172 consumer = createUnionConsumer(schema, name, config, consumerVector);
173 break;
174 case ARRAY:
175 consumer = createArrayConsumer(schema, name, config, consumerVector);
176 break;
177 case MAP:
178 consumer = createMapConsumer(schema, name, config, consumerVector);
179 break;
180 case RECORD:
181 consumer = createStructConsumer(schema, name, config, consumerVector);
182 break;
183 case ENUM:
184 consumer = createEnumConsumer(schema, name, config, consumerVector);
185 break;
186 case STRING:
187 arrowType = new ArrowType.Utf8();
188 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
189 vector = createVector(consumerVector, fieldType, name, allocator);
190 consumer = new AvroStringConsumer((VarCharVector) vector);
191 break;
192 case FIXED:
193 Map<String, String> extProps = createExternalProps(schema);
194 if (logicalType instanceof LogicalTypes.Decimal) {
195 arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType);
196 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema, extProps));
197 vector = createVector(consumerVector, fieldType, name, allocator);
198 consumer = new AvroDecimalConsumer.FixedDecimalConsumer((DecimalVector) vector, schema.getFixedSize());
199 } else {
200 arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize());
201 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema, extProps));
202 vector = createVector(consumerVector, fieldType, name, allocator);
203 consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize());
204 }
205 break;
206 case INT:
207 if (logicalType instanceof LogicalTypes.Date) {
208 arrowType = new ArrowType.Date(DateUnit.DAY);
209 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
210 vector = createVector(consumerVector, fieldType, name, allocator);
211 consumer = new AvroDateConsumer((DateDayVector) vector);
212 } else if (logicalType instanceof LogicalTypes.TimeMillis) {
213 arrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
214 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
215 vector = createVector(consumerVector, fieldType, name, allocator);
216 consumer = new AvroTimeMillisConsumer((TimeMilliVector) vector);
217 } else {
218 arrowType = new ArrowType.Int(32, /*signed=*/true);
219 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
220 vector = createVector(consumerVector, fieldType, name, allocator);
221 consumer = new AvroIntConsumer((IntVector) vector);
222 }
223 break;
224 case BOOLEAN:
225 arrowType = new ArrowType.Bool();
226 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
227 vector = createVector(consumerVector, fieldType, name, allocator);
228 consumer = new AvroBooleanConsumer((BitVector) vector);
229 break;
230 case LONG:
231 if (logicalType instanceof LogicalTypes.TimeMicros) {
232 arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64);
233 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
234 vector = createVector(consumerVector, fieldType, name, allocator);
235 consumer = new AvroTimeMicroConsumer((TimeMicroVector) vector);
236 } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
237 arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
238 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
239 vector = createVector(consumerVector, fieldType, name, allocator);
240 consumer = new AvroTimestampMillisConsumer((TimeStampMilliVector) vector);
241 } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
242 arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
243 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
244 vector = createVector(consumerVector, fieldType, name, allocator);
245 consumer = new AvroTimestampMicrosConsumer((TimeStampMicroVector) vector);
246 } else {
247 arrowType = new ArrowType.Int(64, /*signed=*/true);
248 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
249 vector = createVector(consumerVector, fieldType, name, allocator);
250 consumer = new AvroLongConsumer((BigIntVector) vector);
251 }
252 break;
253 case FLOAT:
254 arrowType = new ArrowType.FloatingPoint(SINGLE);
255 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
256 vector = createVector(consumerVector, fieldType, name, allocator);
257 consumer = new AvroFloatConsumer((Float4Vector) vector);
258 break;
259 case DOUBLE:
260 arrowType = new ArrowType.FloatingPoint(DOUBLE);
261 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
262 vector = createVector(consumerVector, fieldType, name, allocator);
263 consumer = new AvroDoubleConsumer((Float8Vector) vector);
264 break;
265 case BYTES:
266 if (logicalType instanceof LogicalTypes.Decimal) {
267 arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType);
268 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
269 vector = createVector(consumerVector, fieldType, name, allocator);
270 consumer = new AvroDecimalConsumer.BytesDecimalConsumer((DecimalVector) vector);
271 } else {
272 arrowType = new ArrowType.Binary();
273 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
274 vector = createVector(consumerVector, fieldType, name, allocator);
275 consumer = new AvroBytesConsumer((VarBinaryVector) vector);
276 }
277 break;
278 case NULL:
279 arrowType = new ArrowType.Null();
280 fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
281 vector = fieldType.createNewSingleVector(name, allocator, /*schemaCallback=*/null);
282 consumer = new AvroNullConsumer((NullVector) vector);
283 break;
284 default:
285 // no-op, shouldn't get here
286 throw new UnsupportedOperationException("Can't convert avro type %s to arrow type." + type.getName());
287 }
288 return consumer;
289 }
290
291 private static ArrowType createDecimalArrowType(LogicalTypes.Decimal logicalType) {
292 final int scale = logicalType.getScale();
293 final int precision = logicalType.getPrecision();
294 Preconditions.checkArgument(precision > 0 && precision <= 38,
295 "Precision must be in range of 1 to 38");
296 Preconditions.checkArgument(scale >= 0 && scale <= 38,
297 "Scale must be in range of 0 to 38.");
298 Preconditions.checkArgument(scale <= precision,
299 "Invalid decimal scale: %s (greater than precision: %s)", scale, precision);
300
301 return new ArrowType.Decimal(precision, scale, 128);
302
303 }
304
305 private static Consumer createSkipConsumer(Schema schema) {
306
307 SkipFunction skipFunction;
308 Type type = schema.getType();
309
310 switch (type) {
311 case UNION:
312 List<Consumer> unionDelegates = schema.getTypes().stream().map(s ->
313 createSkipConsumer(s)).collect(Collectors.toList());
314 skipFunction = decoder -> unionDelegates.get(decoder.readInt()).consume(decoder);
315
316 break;
317 case ARRAY:
318 Consumer elementDelegate = createSkipConsumer(schema.getElementType());
319 skipFunction = decoder -> {
320 for (long i = decoder.skipArray(); i != 0; i = decoder.skipArray()) {
321 for (long j = 0; j < i; j++) {
322 elementDelegate.consume(decoder);
323 }
324 }
325 };
326 break;
327 case MAP:
328 Consumer valueDelegate = createSkipConsumer(schema.getValueType());
329 skipFunction = decoder -> {
330 for (long i = decoder.skipMap(); i != 0; i = decoder.skipMap()) {
331 for (long j = 0; j < i; j++) {
332 decoder.skipString(); // Discard key
333 valueDelegate.consume(decoder);
334 }
335 }
336 };
337 break;
338 case RECORD:
339 List<Consumer> delegates = schema.getFields().stream().map(field ->
340 createSkipConsumer(field.schema())).collect(Collectors.toList());
341
342 skipFunction = decoder -> {
343 for (Consumer consumer : delegates) {
344 consumer.consume(decoder);
345 }
346 };
347
348 break;
349 case ENUM:
350 skipFunction = decoder -> decoder.readEnum();
351 break;
352 case STRING:
353 skipFunction = decoder -> decoder.skipString();
354 break;
355 case FIXED:
356 skipFunction = decoder -> decoder.skipFixed(schema.getFixedSize());
357 break;
358 case INT:
359 skipFunction = decoder -> decoder.readInt();
360 break;
361 case BOOLEAN:
362 skipFunction = decoder -> decoder.skipFixed(1);
363 break;
364 case LONG:
365 skipFunction = decoder -> decoder.readLong();
366 break;
367 case FLOAT:
368 skipFunction = decoder -> decoder.readFloat();
369 break;
370 case DOUBLE:
371 skipFunction = decoder -> decoder.readDouble();
372 break;
373 case BYTES:
374 skipFunction = decoder -> decoder.skipBytes();
375 break;
376 case NULL:
377 skipFunction = decoder -> { };
378 break;
379 default:
380 // no-op, shouldn't get here
381 throw new UnsupportedOperationException("Invalid avro type: " + type.getName());
382 }
383
384 return new SkipConsumer(skipFunction);
385 }
386
387 static CompositeAvroConsumer createCompositeConsumer(
388 Schema schema, AvroToArrowConfig config) {
389
390 List<Consumer> consumers = new ArrayList<>();
391 final Set<String> skipFieldNames = config.getSkipFieldNames();
392
393 Schema.Type type = schema.getType();
394 if (type == Type.RECORD) {
395 for (Schema.Field field : schema.getFields()) {
396 if (skipFieldNames.contains(field.name())) {
397 consumers.add(createSkipConsumer(field.schema()));
398 } else {
399 Consumer consumer = createConsumer(field.schema(), field.name(), config);
400 consumers.add(consumer);
401 }
402
403 }
404 } else {
405 Consumer consumer = createConsumer(schema, "", config);
406 consumers.add(consumer);
407 }
408
409 return new CompositeAvroConsumer(consumers);
410 }
411
412 private static FieldVector createVector(FieldVector consumerVector, FieldType fieldType,
413 String name, BufferAllocator allocator) {
414 return consumerVector != null ? consumerVector : fieldType.createNewSingleVector(name, allocator, null);
415 }
416
417 private static String getDefaultFieldName(ArrowType type) {
418 Types.MinorType minorType = Types.getMinorTypeForArrowType(type);
419 return minorType.name().toLowerCase();
420 }
421
422 private static Field avroSchemaToField(Schema schema, String name, AvroToArrowConfig config) {
423 return avroSchemaToField(schema, name, config, null);
424 }
425
426 private static Field avroSchemaToField(
427 Schema schema,
428 String name,
429 AvroToArrowConfig config,
430 Map<String, String> externalProps) {
431
432 final Type type = schema.getType();
433 final LogicalType logicalType = schema.getLogicalType();
434 final List<Field> children = new ArrayList<>();
435 final FieldType fieldType;
436
437 switch (type) {
438 case UNION:
439 for (int i = 0; i < schema.getTypes().size(); i++) {
440 Schema childSchema = schema.getTypes().get(i);
441 // Union child vector should use default name
442 children.add(avroSchemaToField(childSchema, null, config));
443 }
444 fieldType = createFieldType(new ArrowType.Union(UnionMode.Sparse, null), schema, externalProps);
445 break;
446 case ARRAY:
447 Schema elementSchema = schema.getElementType();
448 children.add(avroSchemaToField(elementSchema, elementSchema.getName(), config));
449 fieldType = createFieldType(new ArrowType.List(), schema, externalProps);
450 break;
451 case MAP:
452 // MapVector internal struct field and key field should be non-nullable
453 FieldType keyFieldType = new FieldType(/*nullable=*/false, new ArrowType.Utf8(), /*dictionary=*/null);
454 Field keyField = new Field("key", keyFieldType, /*children=*/null);
455 Field valueField = avroSchemaToField(schema.getValueType(), "value", config);
456
457 FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), /*dictionary=*/null);
458 Field structField = new Field("internal", structFieldType, Arrays.asList(keyField, valueField));
459 children.add(structField);
460 fieldType = createFieldType(new ArrowType.Map(/*keySorted=*/false), schema, externalProps);
461 break;
462 case RECORD:
463 final Set<String> skipFieldNames = config.getSkipFieldNames();
464 for (int i = 0; i < schema.getFields().size(); i++) {
465 final Schema.Field field = schema.getFields().get(i);
466 Schema childSchema = field.schema();
467 String fullChildName = String.format("%s.%s", name, field.name());
468 if (!skipFieldNames.contains(fullChildName)) {
469 final Map<String, String> extProps = new HashMap<>();
470 String doc = field.doc();
471 Set<String> aliases = field.aliases();
472 if (doc != null) {
473 extProps.put("doc", doc);
474 }
475 if (aliases != null) {
476 extProps.put("aliases", convertAliases(aliases));
477 }
478 children.add(avroSchemaToField(childSchema, fullChildName, config, extProps));
479 }
480 }
481 fieldType = createFieldType(new ArrowType.Struct(), schema, externalProps);
482 break;
483 case ENUM:
484 DictionaryProvider.MapDictionaryProvider provider = config.getProvider();
485 int current = provider.getDictionaryIds().size();
486 int enumCount = schema.getEnumSymbols().size();
487 ArrowType.Int indexType = DictionaryEncoder.getIndexType(enumCount);
488
489 fieldType = createFieldType(indexType, schema, externalProps,
490 new DictionaryEncoding(current, /*ordered=*/false, /*indexType=*/indexType));
491 break;
492
493 case STRING:
494 fieldType = createFieldType(new ArrowType.Utf8(), schema, externalProps);
495 break;
496 case FIXED:
497 final ArrowType fixedArrowType;
498 if (logicalType instanceof LogicalTypes.Decimal) {
499 fixedArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType);
500 } else {
501 fixedArrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize());
502 }
503 fieldType = createFieldType(fixedArrowType, schema, externalProps);
504 break;
505 case INT:
506 final ArrowType intArrowType;
507 if (logicalType instanceof LogicalTypes.Date) {
508 intArrowType = new ArrowType.Date(DateUnit.DAY);
509 } else if (logicalType instanceof LogicalTypes.TimeMillis) {
510 intArrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
511 } else {
512 intArrowType = new ArrowType.Int(32, /*signed=*/true);
513 }
514 fieldType = createFieldType(intArrowType, schema, externalProps);
515 break;
516 case BOOLEAN:
517 fieldType = createFieldType(new ArrowType.Bool(), schema, externalProps);
518 break;
519 case LONG:
520 final ArrowType longArrowType;
521 if (logicalType instanceof LogicalTypes.TimeMicros) {
522 longArrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64);
523 } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
524 longArrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
525 } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
526 longArrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
527 } else {
528 longArrowType = new ArrowType.Int(64, /*signed=*/true);
529 }
530 fieldType = createFieldType(longArrowType, schema, externalProps);
531 break;
532 case FLOAT:
533 fieldType = createFieldType(new ArrowType.FloatingPoint(SINGLE), schema, externalProps);
534 break;
535 case DOUBLE:
536 fieldType = createFieldType(new ArrowType.FloatingPoint(DOUBLE), schema, externalProps);
537 break;
538 case BYTES:
539 final ArrowType bytesArrowType;
540 if (logicalType instanceof LogicalTypes.Decimal) {
541 bytesArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType);
542 } else {
543 bytesArrowType = new ArrowType.Binary();
544 }
545 fieldType = createFieldType(bytesArrowType, schema, externalProps);
546 break;
547 case NULL:
548 fieldType = createFieldType(ArrowType.Null.INSTANCE, schema, externalProps);
549 break;
550 default:
551 // no-op, shouldn't get here
552 throw new UnsupportedOperationException();
553 }
554
555 if (name == null) {
556 name = getDefaultFieldName(fieldType.getType());
557 }
558 return new Field(name, fieldType, children.size() == 0 ? null : children);
559 }
560
561 private static Consumer createArrayConsumer(Schema schema, String name, AvroToArrowConfig config,
562 FieldVector consumerVector) {
563
564 ListVector listVector;
565 if (consumerVector == null) {
566 final Field field = avroSchemaToField(schema, name, config);
567 listVector = (ListVector) field.createVector(config.getAllocator());
568 } else {
569 listVector = (ListVector) consumerVector;
570 }
571
572 FieldVector dataVector = listVector.getDataVector();
573
574 // create delegate
575 Schema childSchema = schema.getElementType();
576 Consumer delegate = createConsumer(childSchema, childSchema.getName(), config, dataVector);
577
578 return new AvroArraysConsumer(listVector, delegate);
579 }
580
581 private static Consumer createStructConsumer(Schema schema, String name, AvroToArrowConfig config,
582 FieldVector consumerVector) {
583
584 final Set<String> skipFieldNames = config.getSkipFieldNames();
585
586 StructVector structVector;
587 if (consumerVector == null) {
588 final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema));
589 structVector = (StructVector) field.createVector(config.getAllocator());
590 } else {
591 structVector = (StructVector) consumerVector;
592 }
593
594 Consumer[] delegates = new Consumer[schema.getFields().size()];
595 int vectorIndex = 0;
596 for (int i = 0; i < schema.getFields().size(); i++) {
597 Schema.Field childField = schema.getFields().get(i);
598 Consumer delegate;
599 // use full name to distinguish fields have same names between parent and child fields.
600 final String fullChildName = String.format("%s.%s", name, childField.name());
601 if (skipFieldNames.contains(fullChildName)) {
602 delegate = createSkipConsumer(childField.schema());
603 } else {
604 delegate = createConsumer(childField.schema(), fullChildName, config,
605 structVector.getChildrenFromFields().get(vectorIndex++));
606 }
607
608 delegates[i] = delegate;
609 }
610
611 return new AvroStructConsumer(structVector, delegates);
612
613 }
614
615 private static Consumer createEnumConsumer(Schema schema, String name, AvroToArrowConfig config,
616 FieldVector consumerVector) {
617
618 BaseIntVector indexVector;
619 if (consumerVector == null) {
620 final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema));
621 indexVector = (BaseIntVector) field.createVector(config.getAllocator());
622 } else {
623 indexVector = (BaseIntVector) consumerVector;
624 }
625
626 final int valueCount = schema.getEnumSymbols().size();
627 VarCharVector dictVector = new VarCharVector(name, config.getAllocator());
628 dictVector.allocateNewSafe();
629 dictVector.setValueCount(valueCount);
630 for (int i = 0; i < valueCount; i++) {
631 dictVector.set(i, schema.getEnumSymbols().get(i).getBytes(StandardCharsets.UTF_8));
632 }
633 Dictionary dictionary =
634 new Dictionary(dictVector, indexVector.getField().getDictionary());
635 config.getProvider().put(dictionary);
636
637 return new AvroEnumConsumer(indexVector);
638
639 }
640
641 private static Consumer createMapConsumer(Schema schema, String name, AvroToArrowConfig config,
642 FieldVector consumerVector) {
643
644 MapVector mapVector;
645 if (consumerVector == null) {
646 final Field field = avroSchemaToField(schema, name, config);
647 mapVector = (MapVector) field.createVector(config.getAllocator());
648 } else {
649 mapVector = (MapVector) consumerVector;
650 }
651
652 // create delegate struct consumer
653 StructVector structVector = (StructVector) mapVector.getDataVector();
654
655 // keys in avro map are always assumed to be strings.
656 Consumer keyConsumer = new AvroStringConsumer(
657 (VarCharVector) structVector.getChildrenFromFields().get(0));
658 Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(),
659 config, structVector.getChildrenFromFields().get(1));
660
661 AvroStructConsumer internalConsumer =
662 new AvroStructConsumer(structVector, new Consumer[] {keyConsumer, valueConsumer});
663
664 return new AvroMapConsumer(mapVector, internalConsumer);
665 }
666
667 private static Consumer createUnionConsumer(Schema schema, String name, AvroToArrowConfig config,
668 FieldVector consumerVector) {
669 final int size = schema.getTypes().size();
670
671 final boolean nullable = schema.getTypes().stream().anyMatch(t -> t.getType() == Type.NULL);
672
673 UnionVector unionVector;
674 if (consumerVector == null) {
675 final Field field = avroSchemaToField(schema, name, config);
676 unionVector = (UnionVector) field.createVector(config.getAllocator());
677 } else {
678 unionVector = (UnionVector) consumerVector;
679 }
680
681 List<FieldVector> childVectors = unionVector.getChildrenFromFields();
682
683 Consumer[] delegates = new Consumer[size];
684 Types.MinorType[] types = new Types.MinorType[size];
685
686 for (int i = 0; i < size; i++) {
687 FieldVector child = childVectors.get(i);
688 Schema subSchema = schema.getTypes().get(i);
689 Consumer delegate = createConsumer(subSchema, subSchema.getName(), nullable, config, child);
690 delegates[i] = delegate;
691 types[i] = child.getMinorType();
692 }
693 return new AvroUnionsConsumer(unionVector, delegates, types);
694 }
695
696 /**
697 * Read data from {@link Decoder} and generate a {@link VectorSchemaRoot}.
698 * @param schema avro schema
699 * @param decoder avro decoder to read data from
700 */
701 static VectorSchemaRoot avroToArrowVectors(
702 Schema schema,
703 Decoder decoder,
704 AvroToArrowConfig config)
705 throws IOException {
706
707 List<FieldVector> vectors = new ArrayList<>();
708 List<Consumer> consumers = new ArrayList<>();
709 final Set<String> skipFieldNames = config.getSkipFieldNames();
710
711 Schema.Type type = schema.getType();
712 if (type == Type.RECORD) {
713 for (Schema.Field field : schema.getFields()) {
714 if (skipFieldNames.contains(field.name())) {
715 consumers.add(createSkipConsumer(field.schema()));
716 } else {
717 Consumer consumer = createConsumer(field.schema(), field.name(), config);
718 consumers.add(consumer);
719 vectors.add(consumer.getVector());
720 }
721 }
722 } else {
723 Consumer consumer = createConsumer(schema, "", config);
724 consumers.add(consumer);
725 vectors.add(consumer.getVector());
726 }
727
728 long validConsumerCount = consumers.stream().filter(c -> !c.skippable()).count();
729 Preconditions.checkArgument(vectors.size() == validConsumerCount,
730 "vectors size not equals consumers size.");
731
732 List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList());
733
734 VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
735
736 CompositeAvroConsumer compositeConsumer = new CompositeAvroConsumer(consumers);
737
738 int valueCount = 0;
739 try {
740 while (true) {
741 ValueVectorUtility.ensureCapacity(root, valueCount + 1);
742 compositeConsumer.consume(decoder);
743 valueCount++;
744 }
745 } catch (EOFException eof) {
746 // reach the end of encoder stream.
747 root.setRowCount(valueCount);
748 } catch (Exception e) {
749 compositeConsumer.close();
750 throw new UnsupportedOperationException("Error occurs while consume process.", e);
751 }
752
753 return root;
754 }
755
756 private static Map<String, String> getMetaData(Schema schema) {
757 Map<String, String> metadata = new HashMap<>();
758 schema.getObjectProps().forEach((k, v) -> metadata.put(k, v.toString()));
759 return metadata;
760 }
761
762 private static Map<String, String> getMetaData(Schema schema, Map<String, String> externalProps) {
763 Map<String, String> metadata = getMetaData(schema);
764 if (externalProps != null) {
765 metadata.putAll(externalProps);
766 }
767 return metadata;
768 }
769
770 /**
771 * Parse avro attributes and convert them to metadata.
772 */
773 private static Map<String, String> createExternalProps(Schema schema) {
774 final Map<String, String> extProps = new HashMap<>();
775 String doc = schema.getDoc();
776 Set<String> aliases = schema.getAliases();
777 if (doc != null) {
778 extProps.put("doc", doc);
779 }
780 if (aliases != null) {
781 extProps.put("aliases", convertAliases(aliases));
782 }
783 return extProps;
784 }
785
786 private static FieldType createFieldType(ArrowType arrowType, Schema schema, Map<String, String> externalProps) {
787 return createFieldType(arrowType, schema, externalProps, /*dictionary=*/null);
788 }
789
790 private static FieldType createFieldType(
791 ArrowType arrowType,
792 Schema schema,
793 Map<String, String> externalProps,
794 DictionaryEncoding dictionary) {
795
796 return new FieldType(/*nullable=*/false, arrowType, dictionary,
797 getMetaData(schema, externalProps));
798 }
799
800 private static String convertAliases(Set<String> aliases) {
801 JsonStringArrayList jsonList = new JsonStringArrayList();
802 aliases.stream().forEach(a -> jsonList.add(a));
803 return jsonList.toString();
804 }
805}