]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow; | |
19 | ||
20 | import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; | |
21 | import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; | |
22 | ||
23 | import java.io.EOFException; | |
24 | import java.io.IOException; | |
25 | import java.nio.charset.StandardCharsets; | |
26 | import java.util.ArrayList; | |
27 | import java.util.Arrays; | |
28 | import java.util.HashMap; | |
29 | import java.util.List; | |
30 | import java.util.Map; | |
31 | import java.util.Set; | |
32 | import java.util.stream.Collectors; | |
33 | ||
34 | import org.apache.arrow.consumers.AvroArraysConsumer; | |
35 | import org.apache.arrow.consumers.AvroBooleanConsumer; | |
36 | import org.apache.arrow.consumers.AvroBytesConsumer; | |
37 | import org.apache.arrow.consumers.AvroDoubleConsumer; | |
38 | import org.apache.arrow.consumers.AvroEnumConsumer; | |
39 | import org.apache.arrow.consumers.AvroFixedConsumer; | |
40 | import org.apache.arrow.consumers.AvroFloatConsumer; | |
41 | import org.apache.arrow.consumers.AvroIntConsumer; | |
42 | import org.apache.arrow.consumers.AvroLongConsumer; | |
43 | import org.apache.arrow.consumers.AvroMapConsumer; | |
44 | import org.apache.arrow.consumers.AvroNullConsumer; | |
45 | import org.apache.arrow.consumers.AvroStringConsumer; | |
46 | import org.apache.arrow.consumers.AvroStructConsumer; | |
47 | import org.apache.arrow.consumers.AvroUnionsConsumer; | |
48 | import org.apache.arrow.consumers.CompositeAvroConsumer; | |
49 | import org.apache.arrow.consumers.Consumer; | |
50 | import org.apache.arrow.consumers.SkipConsumer; | |
51 | import org.apache.arrow.consumers.SkipFunction; | |
52 | import org.apache.arrow.consumers.logical.AvroDateConsumer; | |
53 | import org.apache.arrow.consumers.logical.AvroDecimalConsumer; | |
54 | import org.apache.arrow.consumers.logical.AvroTimeMicroConsumer; | |
55 | import org.apache.arrow.consumers.logical.AvroTimeMillisConsumer; | |
56 | import org.apache.arrow.consumers.logical.AvroTimestampMicrosConsumer; | |
57 | import org.apache.arrow.consumers.logical.AvroTimestampMillisConsumer; | |
58 | import org.apache.arrow.memory.BufferAllocator; | |
59 | import org.apache.arrow.util.Preconditions; | |
60 | import org.apache.arrow.vector.BaseIntVector; | |
61 | import org.apache.arrow.vector.BigIntVector; | |
62 | import org.apache.arrow.vector.BitVector; | |
63 | import org.apache.arrow.vector.DateDayVector; | |
64 | import org.apache.arrow.vector.DecimalVector; | |
65 | import org.apache.arrow.vector.FieldVector; | |
66 | import org.apache.arrow.vector.FixedSizeBinaryVector; | |
67 | import org.apache.arrow.vector.Float4Vector; | |
68 | import org.apache.arrow.vector.Float8Vector; | |
69 | import org.apache.arrow.vector.IntVector; | |
70 | import org.apache.arrow.vector.NullVector; | |
71 | import org.apache.arrow.vector.TimeMicroVector; | |
72 | import org.apache.arrow.vector.TimeMilliVector; | |
73 | import org.apache.arrow.vector.TimeStampMicroVector; | |
74 | import org.apache.arrow.vector.TimeStampMilliVector; | |
75 | import org.apache.arrow.vector.VarBinaryVector; | |
76 | import org.apache.arrow.vector.VarCharVector; | |
77 | import org.apache.arrow.vector.VectorSchemaRoot; | |
78 | import org.apache.arrow.vector.complex.ListVector; | |
79 | import org.apache.arrow.vector.complex.MapVector; | |
80 | import org.apache.arrow.vector.complex.StructVector; | |
81 | import org.apache.arrow.vector.complex.UnionVector; | |
82 | import org.apache.arrow.vector.dictionary.Dictionary; | |
83 | import org.apache.arrow.vector.dictionary.DictionaryEncoder; | |
84 | import org.apache.arrow.vector.dictionary.DictionaryProvider; | |
85 | import org.apache.arrow.vector.types.DateUnit; | |
86 | import org.apache.arrow.vector.types.TimeUnit; | |
87 | import org.apache.arrow.vector.types.Types; | |
88 | import org.apache.arrow.vector.types.UnionMode; | |
89 | import org.apache.arrow.vector.types.pojo.ArrowType; | |
90 | import org.apache.arrow.vector.types.pojo.DictionaryEncoding; | |
91 | import org.apache.arrow.vector.types.pojo.Field; | |
92 | import org.apache.arrow.vector.types.pojo.FieldType; | |
93 | import org.apache.arrow.vector.util.JsonStringArrayList; | |
94 | import org.apache.arrow.vector.util.ValueVectorUtility; | |
95 | import org.apache.avro.LogicalType; | |
96 | import org.apache.avro.LogicalTypes; | |
97 | import org.apache.avro.Schema; | |
98 | import org.apache.avro.Schema.Type; | |
99 | import org.apache.avro.io.Decoder; | |
100 | ||
101 | /** | |
102 | * Class that does most of the work to convert Avro data into Arrow columnar format Vector objects. | |
103 | */ | |
104 | public class AvroToArrowUtils { | |
105 | ||
106 | /** | |
107 | * Creates a {@link Consumer} from the {@link Schema} | |
108 | * | |
109 | <p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types. | |
110 | * | |
111 | * <ul> | |
112 | * <li>STRING --> ArrowType.Utf8</li> | |
113 | * <li>INT --> ArrowType.Int(32, signed)</li> | |
114 | * <li>LONG --> ArrowType.Int(64, signed)</li> | |
115 | * <li>FLOAT --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li> | |
116 | * <li>DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li> | |
117 | * <li>BOOLEAN --> ArrowType.Bool</li> | |
118 | * <li>BYTES --> ArrowType.Binary</li> | |
119 | * <li>ARRAY --> ArrowType.List</li> | |
120 | * <li>MAP --> ArrowType.Map</li> | |
121 | * <li>FIXED --> ArrowType.FixedSizeBinary</li> | |
122 | * <li>RECORD --> ArrowType.Struct</li> | |
123 | * <li>UNION --> ArrowType.Union</li> | |
124 | * <li>ENUM--> ArrowType.Int</li> | |
125 | * <li>DECIMAL --> ArrowType.Decimal</li> | |
126 | * <li>Date --> ArrowType.Date(DateUnit.DAY)</li> | |
127 | * <li>TimeMillis --> ArrowType.Time(TimeUnit.MILLISECOND, 32)</li> | |
128 | * <li>TimeMicros --> ArrowType.Time(TimeUnit.MICROSECOND, 64)</li> | |
129 | * <li>TimestampMillis --> ArrowType.Timestamp(TimeUnit.MILLISECOND, null)</li> | |
130 | * <li>TimestampMicros --> ArrowType.Timestamp(TimeUnit.MICROSECOND, null)</li> | |
131 | * </ul> | |
132 | */ | |
133 | ||
134 | private static Consumer createConsumer(Schema schema, String name, AvroToArrowConfig config) { | |
135 | return createConsumer(schema, name, false, config, null); | |
136 | } | |
137 | ||
138 | private static Consumer createConsumer(Schema schema, String name, AvroToArrowConfig config, FieldVector vector) { | |
139 | return createConsumer(schema, name, false, config, vector); | |
140 | } | |
141 | ||
142 | /** | |
143 | * Create a consumer with the given Avro schema. | |
144 | * | |
145 | * @param schema avro schema | |
146 | * @param name arrow field name | |
147 | * @param consumerVector vector to keep in consumer, if v == null, will create a new vector via field. | |
148 | * @return consumer | |
149 | */ | |
150 | private static Consumer createConsumer( | |
151 | Schema schema, | |
152 | String name, | |
153 | boolean nullable, | |
154 | AvroToArrowConfig config, | |
155 | FieldVector consumerVector) { | |
156 | ||
157 | Preconditions.checkNotNull(schema, "Avro schema object can't be null"); | |
158 | Preconditions.checkNotNull(config, "Config can't be null"); | |
159 | ||
160 | final BufferAllocator allocator = config.getAllocator(); | |
161 | ||
162 | final Type type = schema.getType(); | |
163 | final LogicalType logicalType = schema.getLogicalType(); | |
164 | ||
165 | final ArrowType arrowType; | |
166 | final FieldType fieldType; | |
167 | final FieldVector vector; | |
168 | final Consumer consumer; | |
169 | ||
170 | switch (type) { | |
171 | case UNION: | |
172 | consumer = createUnionConsumer(schema, name, config, consumerVector); | |
173 | break; | |
174 | case ARRAY: | |
175 | consumer = createArrayConsumer(schema, name, config, consumerVector); | |
176 | break; | |
177 | case MAP: | |
178 | consumer = createMapConsumer(schema, name, config, consumerVector); | |
179 | break; | |
180 | case RECORD: | |
181 | consumer = createStructConsumer(schema, name, config, consumerVector); | |
182 | break; | |
183 | case ENUM: | |
184 | consumer = createEnumConsumer(schema, name, config, consumerVector); | |
185 | break; | |
186 | case STRING: | |
187 | arrowType = new ArrowType.Utf8(); | |
188 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
189 | vector = createVector(consumerVector, fieldType, name, allocator); | |
190 | consumer = new AvroStringConsumer((VarCharVector) vector); | |
191 | break; | |
192 | case FIXED: | |
193 | Map<String, String> extProps = createExternalProps(schema); | |
194 | if (logicalType instanceof LogicalTypes.Decimal) { | |
195 | arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); | |
196 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema, extProps)); | |
197 | vector = createVector(consumerVector, fieldType, name, allocator); | |
198 | consumer = new AvroDecimalConsumer.FixedDecimalConsumer((DecimalVector) vector, schema.getFixedSize()); | |
199 | } else { | |
200 | arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); | |
201 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema, extProps)); | |
202 | vector = createVector(consumerVector, fieldType, name, allocator); | |
203 | consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize()); | |
204 | } | |
205 | break; | |
206 | case INT: | |
207 | if (logicalType instanceof LogicalTypes.Date) { | |
208 | arrowType = new ArrowType.Date(DateUnit.DAY); | |
209 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
210 | vector = createVector(consumerVector, fieldType, name, allocator); | |
211 | consumer = new AvroDateConsumer((DateDayVector) vector); | |
212 | } else if (logicalType instanceof LogicalTypes.TimeMillis) { | |
213 | arrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32); | |
214 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
215 | vector = createVector(consumerVector, fieldType, name, allocator); | |
216 | consumer = new AvroTimeMillisConsumer((TimeMilliVector) vector); | |
217 | } else { | |
218 | arrowType = new ArrowType.Int(32, /*signed=*/true); | |
219 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
220 | vector = createVector(consumerVector, fieldType, name, allocator); | |
221 | consumer = new AvroIntConsumer((IntVector) vector); | |
222 | } | |
223 | break; | |
224 | case BOOLEAN: | |
225 | arrowType = new ArrowType.Bool(); | |
226 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
227 | vector = createVector(consumerVector, fieldType, name, allocator); | |
228 | consumer = new AvroBooleanConsumer((BitVector) vector); | |
229 | break; | |
230 | case LONG: | |
231 | if (logicalType instanceof LogicalTypes.TimeMicros) { | |
232 | arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64); | |
233 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
234 | vector = createVector(consumerVector, fieldType, name, allocator); | |
235 | consumer = new AvroTimeMicroConsumer((TimeMicroVector) vector); | |
236 | } else if (logicalType instanceof LogicalTypes.TimestampMillis) { | |
237 | arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); | |
238 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
239 | vector = createVector(consumerVector, fieldType, name, allocator); | |
240 | consumer = new AvroTimestampMillisConsumer((TimeStampMilliVector) vector); | |
241 | } else if (logicalType instanceof LogicalTypes.TimestampMicros) { | |
242 | arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); | |
243 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
244 | vector = createVector(consumerVector, fieldType, name, allocator); | |
245 | consumer = new AvroTimestampMicrosConsumer((TimeStampMicroVector) vector); | |
246 | } else { | |
247 | arrowType = new ArrowType.Int(64, /*signed=*/true); | |
248 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
249 | vector = createVector(consumerVector, fieldType, name, allocator); | |
250 | consumer = new AvroLongConsumer((BigIntVector) vector); | |
251 | } | |
252 | break; | |
253 | case FLOAT: | |
254 | arrowType = new ArrowType.FloatingPoint(SINGLE); | |
255 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
256 | vector = createVector(consumerVector, fieldType, name, allocator); | |
257 | consumer = new AvroFloatConsumer((Float4Vector) vector); | |
258 | break; | |
259 | case DOUBLE: | |
260 | arrowType = new ArrowType.FloatingPoint(DOUBLE); | |
261 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
262 | vector = createVector(consumerVector, fieldType, name, allocator); | |
263 | consumer = new AvroDoubleConsumer((Float8Vector) vector); | |
264 | break; | |
265 | case BYTES: | |
266 | if (logicalType instanceof LogicalTypes.Decimal) { | |
267 | arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); | |
268 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
269 | vector = createVector(consumerVector, fieldType, name, allocator); | |
270 | consumer = new AvroDecimalConsumer.BytesDecimalConsumer((DecimalVector) vector); | |
271 | } else { | |
272 | arrowType = new ArrowType.Binary(); | |
273 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
274 | vector = createVector(consumerVector, fieldType, name, allocator); | |
275 | consumer = new AvroBytesConsumer((VarBinaryVector) vector); | |
276 | } | |
277 | break; | |
278 | case NULL: | |
279 | arrowType = new ArrowType.Null(); | |
280 | fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); | |
281 | vector = fieldType.createNewSingleVector(name, allocator, /*schemaCallback=*/null); | |
282 | consumer = new AvroNullConsumer((NullVector) vector); | |
283 | break; | |
284 | default: | |
285 | // no-op, shouldn't get here | |
286 | throw new UnsupportedOperationException("Can't convert avro type %s to arrow type." + type.getName()); | |
287 | } | |
288 | return consumer; | |
289 | } | |
290 | ||
291 | private static ArrowType createDecimalArrowType(LogicalTypes.Decimal logicalType) { | |
292 | final int scale = logicalType.getScale(); | |
293 | final int precision = logicalType.getPrecision(); | |
294 | Preconditions.checkArgument(precision > 0 && precision <= 38, | |
295 | "Precision must be in range of 1 to 38"); | |
296 | Preconditions.checkArgument(scale >= 0 && scale <= 38, | |
297 | "Scale must be in range of 0 to 38."); | |
298 | Preconditions.checkArgument(scale <= precision, | |
299 | "Invalid decimal scale: %s (greater than precision: %s)", scale, precision); | |
300 | ||
301 | return new ArrowType.Decimal(precision, scale, 128); | |
302 | ||
303 | } | |
304 | ||
305 | private static Consumer createSkipConsumer(Schema schema) { | |
306 | ||
307 | SkipFunction skipFunction; | |
308 | Type type = schema.getType(); | |
309 | ||
310 | switch (type) { | |
311 | case UNION: | |
312 | List<Consumer> unionDelegates = schema.getTypes().stream().map(s -> | |
313 | createSkipConsumer(s)).collect(Collectors.toList()); | |
314 | skipFunction = decoder -> unionDelegates.get(decoder.readInt()).consume(decoder); | |
315 | ||
316 | break; | |
317 | case ARRAY: | |
318 | Consumer elementDelegate = createSkipConsumer(schema.getElementType()); | |
319 | skipFunction = decoder -> { | |
320 | for (long i = decoder.skipArray(); i != 0; i = decoder.skipArray()) { | |
321 | for (long j = 0; j < i; j++) { | |
322 | elementDelegate.consume(decoder); | |
323 | } | |
324 | } | |
325 | }; | |
326 | break; | |
327 | case MAP: | |
328 | Consumer valueDelegate = createSkipConsumer(schema.getValueType()); | |
329 | skipFunction = decoder -> { | |
330 | for (long i = decoder.skipMap(); i != 0; i = decoder.skipMap()) { | |
331 | for (long j = 0; j < i; j++) { | |
332 | decoder.skipString(); // Discard key | |
333 | valueDelegate.consume(decoder); | |
334 | } | |
335 | } | |
336 | }; | |
337 | break; | |
338 | case RECORD: | |
339 | List<Consumer> delegates = schema.getFields().stream().map(field -> | |
340 | createSkipConsumer(field.schema())).collect(Collectors.toList()); | |
341 | ||
342 | skipFunction = decoder -> { | |
343 | for (Consumer consumer : delegates) { | |
344 | consumer.consume(decoder); | |
345 | } | |
346 | }; | |
347 | ||
348 | break; | |
349 | case ENUM: | |
350 | skipFunction = decoder -> decoder.readEnum(); | |
351 | break; | |
352 | case STRING: | |
353 | skipFunction = decoder -> decoder.skipString(); | |
354 | break; | |
355 | case FIXED: | |
356 | skipFunction = decoder -> decoder.skipFixed(schema.getFixedSize()); | |
357 | break; | |
358 | case INT: | |
359 | skipFunction = decoder -> decoder.readInt(); | |
360 | break; | |
361 | case BOOLEAN: | |
362 | skipFunction = decoder -> decoder.skipFixed(1); | |
363 | break; | |
364 | case LONG: | |
365 | skipFunction = decoder -> decoder.readLong(); | |
366 | break; | |
367 | case FLOAT: | |
368 | skipFunction = decoder -> decoder.readFloat(); | |
369 | break; | |
370 | case DOUBLE: | |
371 | skipFunction = decoder -> decoder.readDouble(); | |
372 | break; | |
373 | case BYTES: | |
374 | skipFunction = decoder -> decoder.skipBytes(); | |
375 | break; | |
376 | case NULL: | |
377 | skipFunction = decoder -> { }; | |
378 | break; | |
379 | default: | |
380 | // no-op, shouldn't get here | |
381 | throw new UnsupportedOperationException("Invalid avro type: " + type.getName()); | |
382 | } | |
383 | ||
384 | return new SkipConsumer(skipFunction); | |
385 | } | |
386 | ||
387 | static CompositeAvroConsumer createCompositeConsumer( | |
388 | Schema schema, AvroToArrowConfig config) { | |
389 | ||
390 | List<Consumer> consumers = new ArrayList<>(); | |
391 | final Set<String> skipFieldNames = config.getSkipFieldNames(); | |
392 | ||
393 | Schema.Type type = schema.getType(); | |
394 | if (type == Type.RECORD) { | |
395 | for (Schema.Field field : schema.getFields()) { | |
396 | if (skipFieldNames.contains(field.name())) { | |
397 | consumers.add(createSkipConsumer(field.schema())); | |
398 | } else { | |
399 | Consumer consumer = createConsumer(field.schema(), field.name(), config); | |
400 | consumers.add(consumer); | |
401 | } | |
402 | ||
403 | } | |
404 | } else { | |
405 | Consumer consumer = createConsumer(schema, "", config); | |
406 | consumers.add(consumer); | |
407 | } | |
408 | ||
409 | return new CompositeAvroConsumer(consumers); | |
410 | } | |
411 | ||
412 | private static FieldVector createVector(FieldVector consumerVector, FieldType fieldType, | |
413 | String name, BufferAllocator allocator) { | |
414 | return consumerVector != null ? consumerVector : fieldType.createNewSingleVector(name, allocator, null); | |
415 | } | |
416 | ||
417 | private static String getDefaultFieldName(ArrowType type) { | |
418 | Types.MinorType minorType = Types.getMinorTypeForArrowType(type); | |
419 | return minorType.name().toLowerCase(); | |
420 | } | |
421 | ||
422 | private static Field avroSchemaToField(Schema schema, String name, AvroToArrowConfig config) { | |
423 | return avroSchemaToField(schema, name, config, null); | |
424 | } | |
425 | ||
426 | private static Field avroSchemaToField( | |
427 | Schema schema, | |
428 | String name, | |
429 | AvroToArrowConfig config, | |
430 | Map<String, String> externalProps) { | |
431 | ||
432 | final Type type = schema.getType(); | |
433 | final LogicalType logicalType = schema.getLogicalType(); | |
434 | final List<Field> children = new ArrayList<>(); | |
435 | final FieldType fieldType; | |
436 | ||
437 | switch (type) { | |
438 | case UNION: | |
439 | for (int i = 0; i < schema.getTypes().size(); i++) { | |
440 | Schema childSchema = schema.getTypes().get(i); | |
441 | // Union child vector should use default name | |
442 | children.add(avroSchemaToField(childSchema, null, config)); | |
443 | } | |
444 | fieldType = createFieldType(new ArrowType.Union(UnionMode.Sparse, null), schema, externalProps); | |
445 | break; | |
446 | case ARRAY: | |
447 | Schema elementSchema = schema.getElementType(); | |
448 | children.add(avroSchemaToField(elementSchema, elementSchema.getName(), config)); | |
449 | fieldType = createFieldType(new ArrowType.List(), schema, externalProps); | |
450 | break; | |
451 | case MAP: | |
452 | // MapVector internal struct field and key field should be non-nullable | |
453 | FieldType keyFieldType = new FieldType(/*nullable=*/false, new ArrowType.Utf8(), /*dictionary=*/null); | |
454 | Field keyField = new Field("key", keyFieldType, /*children=*/null); | |
455 | Field valueField = avroSchemaToField(schema.getValueType(), "value", config); | |
456 | ||
457 | FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), /*dictionary=*/null); | |
458 | Field structField = new Field("internal", structFieldType, Arrays.asList(keyField, valueField)); | |
459 | children.add(structField); | |
460 | fieldType = createFieldType(new ArrowType.Map(/*keySorted=*/false), schema, externalProps); | |
461 | break; | |
462 | case RECORD: | |
463 | final Set<String> skipFieldNames = config.getSkipFieldNames(); | |
464 | for (int i = 0; i < schema.getFields().size(); i++) { | |
465 | final Schema.Field field = schema.getFields().get(i); | |
466 | Schema childSchema = field.schema(); | |
467 | String fullChildName = String.format("%s.%s", name, field.name()); | |
468 | if (!skipFieldNames.contains(fullChildName)) { | |
469 | final Map<String, String> extProps = new HashMap<>(); | |
470 | String doc = field.doc(); | |
471 | Set<String> aliases = field.aliases(); | |
472 | if (doc != null) { | |
473 | extProps.put("doc", doc); | |
474 | } | |
475 | if (aliases != null) { | |
476 | extProps.put("aliases", convertAliases(aliases)); | |
477 | } | |
478 | children.add(avroSchemaToField(childSchema, fullChildName, config, extProps)); | |
479 | } | |
480 | } | |
481 | fieldType = createFieldType(new ArrowType.Struct(), schema, externalProps); | |
482 | break; | |
483 | case ENUM: | |
484 | DictionaryProvider.MapDictionaryProvider provider = config.getProvider(); | |
485 | int current = provider.getDictionaryIds().size(); | |
486 | int enumCount = schema.getEnumSymbols().size(); | |
487 | ArrowType.Int indexType = DictionaryEncoder.getIndexType(enumCount); | |
488 | ||
489 | fieldType = createFieldType(indexType, schema, externalProps, | |
490 | new DictionaryEncoding(current, /*ordered=*/false, /*indexType=*/indexType)); | |
491 | break; | |
492 | ||
493 | case STRING: | |
494 | fieldType = createFieldType(new ArrowType.Utf8(), schema, externalProps); | |
495 | break; | |
496 | case FIXED: | |
497 | final ArrowType fixedArrowType; | |
498 | if (logicalType instanceof LogicalTypes.Decimal) { | |
499 | fixedArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); | |
500 | } else { | |
501 | fixedArrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); | |
502 | } | |
503 | fieldType = createFieldType(fixedArrowType, schema, externalProps); | |
504 | break; | |
505 | case INT: | |
506 | final ArrowType intArrowType; | |
507 | if (logicalType instanceof LogicalTypes.Date) { | |
508 | intArrowType = new ArrowType.Date(DateUnit.DAY); | |
509 | } else if (logicalType instanceof LogicalTypes.TimeMillis) { | |
510 | intArrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32); | |
511 | } else { | |
512 | intArrowType = new ArrowType.Int(32, /*signed=*/true); | |
513 | } | |
514 | fieldType = createFieldType(intArrowType, schema, externalProps); | |
515 | break; | |
516 | case BOOLEAN: | |
517 | fieldType = createFieldType(new ArrowType.Bool(), schema, externalProps); | |
518 | break; | |
519 | case LONG: | |
520 | final ArrowType longArrowType; | |
521 | if (logicalType instanceof LogicalTypes.TimeMicros) { | |
522 | longArrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64); | |
523 | } else if (logicalType instanceof LogicalTypes.TimestampMillis) { | |
524 | longArrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); | |
525 | } else if (logicalType instanceof LogicalTypes.TimestampMicros) { | |
526 | longArrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); | |
527 | } else { | |
528 | longArrowType = new ArrowType.Int(64, /*signed=*/true); | |
529 | } | |
530 | fieldType = createFieldType(longArrowType, schema, externalProps); | |
531 | break; | |
532 | case FLOAT: | |
533 | fieldType = createFieldType(new ArrowType.FloatingPoint(SINGLE), schema, externalProps); | |
534 | break; | |
535 | case DOUBLE: | |
536 | fieldType = createFieldType(new ArrowType.FloatingPoint(DOUBLE), schema, externalProps); | |
537 | break; | |
538 | case BYTES: | |
539 | final ArrowType bytesArrowType; | |
540 | if (logicalType instanceof LogicalTypes.Decimal) { | |
541 | bytesArrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType); | |
542 | } else { | |
543 | bytesArrowType = new ArrowType.Binary(); | |
544 | } | |
545 | fieldType = createFieldType(bytesArrowType, schema, externalProps); | |
546 | break; | |
547 | case NULL: | |
548 | fieldType = createFieldType(ArrowType.Null.INSTANCE, schema, externalProps); | |
549 | break; | |
550 | default: | |
551 | // no-op, shouldn't get here | |
552 | throw new UnsupportedOperationException(); | |
553 | } | |
554 | ||
555 | if (name == null) { | |
556 | name = getDefaultFieldName(fieldType.getType()); | |
557 | } | |
558 | return new Field(name, fieldType, children.size() == 0 ? null : children); | |
559 | } | |
560 | ||
561 | private static Consumer createArrayConsumer(Schema schema, String name, AvroToArrowConfig config, | |
562 | FieldVector consumerVector) { | |
563 | ||
564 | ListVector listVector; | |
565 | if (consumerVector == null) { | |
566 | final Field field = avroSchemaToField(schema, name, config); | |
567 | listVector = (ListVector) field.createVector(config.getAllocator()); | |
568 | } else { | |
569 | listVector = (ListVector) consumerVector; | |
570 | } | |
571 | ||
572 | FieldVector dataVector = listVector.getDataVector(); | |
573 | ||
574 | // create delegate | |
575 | Schema childSchema = schema.getElementType(); | |
576 | Consumer delegate = createConsumer(childSchema, childSchema.getName(), config, dataVector); | |
577 | ||
578 | return new AvroArraysConsumer(listVector, delegate); | |
579 | } | |
580 | ||
581 | private static Consumer createStructConsumer(Schema schema, String name, AvroToArrowConfig config, | |
582 | FieldVector consumerVector) { | |
583 | ||
584 | final Set<String> skipFieldNames = config.getSkipFieldNames(); | |
585 | ||
586 | StructVector structVector; | |
587 | if (consumerVector == null) { | |
588 | final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema)); | |
589 | structVector = (StructVector) field.createVector(config.getAllocator()); | |
590 | } else { | |
591 | structVector = (StructVector) consumerVector; | |
592 | } | |
593 | ||
594 | Consumer[] delegates = new Consumer[schema.getFields().size()]; | |
595 | int vectorIndex = 0; | |
596 | for (int i = 0; i < schema.getFields().size(); i++) { | |
597 | Schema.Field childField = schema.getFields().get(i); | |
598 | Consumer delegate; | |
599 | // use full name to distinguish fields have same names between parent and child fields. | |
600 | final String fullChildName = String.format("%s.%s", name, childField.name()); | |
601 | if (skipFieldNames.contains(fullChildName)) { | |
602 | delegate = createSkipConsumer(childField.schema()); | |
603 | } else { | |
604 | delegate = createConsumer(childField.schema(), fullChildName, config, | |
605 | structVector.getChildrenFromFields().get(vectorIndex++)); | |
606 | } | |
607 | ||
608 | delegates[i] = delegate; | |
609 | } | |
610 | ||
611 | return new AvroStructConsumer(structVector, delegates); | |
612 | ||
613 | } | |
614 | ||
615 | private static Consumer createEnumConsumer(Schema schema, String name, AvroToArrowConfig config, | |
616 | FieldVector consumerVector) { | |
617 | ||
618 | BaseIntVector indexVector; | |
619 | if (consumerVector == null) { | |
620 | final Field field = avroSchemaToField(schema, name, config, createExternalProps(schema)); | |
621 | indexVector = (BaseIntVector) field.createVector(config.getAllocator()); | |
622 | } else { | |
623 | indexVector = (BaseIntVector) consumerVector; | |
624 | } | |
625 | ||
626 | final int valueCount = schema.getEnumSymbols().size(); | |
627 | VarCharVector dictVector = new VarCharVector(name, config.getAllocator()); | |
628 | dictVector.allocateNewSafe(); | |
629 | dictVector.setValueCount(valueCount); | |
630 | for (int i = 0; i < valueCount; i++) { | |
631 | dictVector.set(i, schema.getEnumSymbols().get(i).getBytes(StandardCharsets.UTF_8)); | |
632 | } | |
633 | Dictionary dictionary = | |
634 | new Dictionary(dictVector, indexVector.getField().getDictionary()); | |
635 | config.getProvider().put(dictionary); | |
636 | ||
637 | return new AvroEnumConsumer(indexVector); | |
638 | ||
639 | } | |
640 | ||
641 | private static Consumer createMapConsumer(Schema schema, String name, AvroToArrowConfig config, | |
642 | FieldVector consumerVector) { | |
643 | ||
644 | MapVector mapVector; | |
645 | if (consumerVector == null) { | |
646 | final Field field = avroSchemaToField(schema, name, config); | |
647 | mapVector = (MapVector) field.createVector(config.getAllocator()); | |
648 | } else { | |
649 | mapVector = (MapVector) consumerVector; | |
650 | } | |
651 | ||
652 | // create delegate struct consumer | |
653 | StructVector structVector = (StructVector) mapVector.getDataVector(); | |
654 | ||
655 | // keys in avro map are always assumed to be strings. | |
656 | Consumer keyConsumer = new AvroStringConsumer( | |
657 | (VarCharVector) structVector.getChildrenFromFields().get(0)); | |
658 | Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(), | |
659 | config, structVector.getChildrenFromFields().get(1)); | |
660 | ||
661 | AvroStructConsumer internalConsumer = | |
662 | new AvroStructConsumer(structVector, new Consumer[] {keyConsumer, valueConsumer}); | |
663 | ||
664 | return new AvroMapConsumer(mapVector, internalConsumer); | |
665 | } | |
666 | ||
667 | private static Consumer createUnionConsumer(Schema schema, String name, AvroToArrowConfig config, | |
668 | FieldVector consumerVector) { | |
669 | final int size = schema.getTypes().size(); | |
670 | ||
671 | final boolean nullable = schema.getTypes().stream().anyMatch(t -> t.getType() == Type.NULL); | |
672 | ||
673 | UnionVector unionVector; | |
674 | if (consumerVector == null) { | |
675 | final Field field = avroSchemaToField(schema, name, config); | |
676 | unionVector = (UnionVector) field.createVector(config.getAllocator()); | |
677 | } else { | |
678 | unionVector = (UnionVector) consumerVector; | |
679 | } | |
680 | ||
681 | List<FieldVector> childVectors = unionVector.getChildrenFromFields(); | |
682 | ||
683 | Consumer[] delegates = new Consumer[size]; | |
684 | Types.MinorType[] types = new Types.MinorType[size]; | |
685 | ||
686 | for (int i = 0; i < size; i++) { | |
687 | FieldVector child = childVectors.get(i); | |
688 | Schema subSchema = schema.getTypes().get(i); | |
689 | Consumer delegate = createConsumer(subSchema, subSchema.getName(), nullable, config, child); | |
690 | delegates[i] = delegate; | |
691 | types[i] = child.getMinorType(); | |
692 | } | |
693 | return new AvroUnionsConsumer(unionVector, delegates, types); | |
694 | } | |
695 | ||
696 | /** | |
697 | * Read data from {@link Decoder} and generate a {@link VectorSchemaRoot}. | |
698 | * @param schema avro schema | |
699 | * @param decoder avro decoder to read data from | |
700 | */ | |
701 | static VectorSchemaRoot avroToArrowVectors( | |
702 | Schema schema, | |
703 | Decoder decoder, | |
704 | AvroToArrowConfig config) | |
705 | throws IOException { | |
706 | ||
707 | List<FieldVector> vectors = new ArrayList<>(); | |
708 | List<Consumer> consumers = new ArrayList<>(); | |
709 | final Set<String> skipFieldNames = config.getSkipFieldNames(); | |
710 | ||
711 | Schema.Type type = schema.getType(); | |
712 | if (type == Type.RECORD) { | |
713 | for (Schema.Field field : schema.getFields()) { | |
714 | if (skipFieldNames.contains(field.name())) { | |
715 | consumers.add(createSkipConsumer(field.schema())); | |
716 | } else { | |
717 | Consumer consumer = createConsumer(field.schema(), field.name(), config); | |
718 | consumers.add(consumer); | |
719 | vectors.add(consumer.getVector()); | |
720 | } | |
721 | } | |
722 | } else { | |
723 | Consumer consumer = createConsumer(schema, "", config); | |
724 | consumers.add(consumer); | |
725 | vectors.add(consumer.getVector()); | |
726 | } | |
727 | ||
728 | long validConsumerCount = consumers.stream().filter(c -> !c.skippable()).count(); | |
729 | Preconditions.checkArgument(vectors.size() == validConsumerCount, | |
730 | "vectors size not equals consumers size."); | |
731 | ||
732 | List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList()); | |
733 | ||
734 | VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0); | |
735 | ||
736 | CompositeAvroConsumer compositeConsumer = new CompositeAvroConsumer(consumers); | |
737 | ||
738 | int valueCount = 0; | |
739 | try { | |
740 | while (true) { | |
741 | ValueVectorUtility.ensureCapacity(root, valueCount + 1); | |
742 | compositeConsumer.consume(decoder); | |
743 | valueCount++; | |
744 | } | |
745 | } catch (EOFException eof) { | |
746 | // reach the end of encoder stream. | |
747 | root.setRowCount(valueCount); | |
748 | } catch (Exception e) { | |
749 | compositeConsumer.close(); | |
750 | throw new UnsupportedOperationException("Error occurs while consume process.", e); | |
751 | } | |
752 | ||
753 | return root; | |
754 | } | |
755 | ||
756 | private static Map<String, String> getMetaData(Schema schema) { | |
757 | Map<String, String> metadata = new HashMap<>(); | |
758 | schema.getObjectProps().forEach((k, v) -> metadata.put(k, v.toString())); | |
759 | return metadata; | |
760 | } | |
761 | ||
762 | private static Map<String, String> getMetaData(Schema schema, Map<String, String> externalProps) { | |
763 | Map<String, String> metadata = getMetaData(schema); | |
764 | if (externalProps != null) { | |
765 | metadata.putAll(externalProps); | |
766 | } | |
767 | return metadata; | |
768 | } | |
769 | ||
770 | /** | |
771 | * Parse avro attributes and convert them to metadata. | |
772 | */ | |
773 | private static Map<String, String> createExternalProps(Schema schema) { | |
774 | final Map<String, String> extProps = new HashMap<>(); | |
775 | String doc = schema.getDoc(); | |
776 | Set<String> aliases = schema.getAliases(); | |
777 | if (doc != null) { | |
778 | extProps.put("doc", doc); | |
779 | } | |
780 | if (aliases != null) { | |
781 | extProps.put("aliases", convertAliases(aliases)); | |
782 | } | |
783 | return extProps; | |
784 | } | |
785 | ||
786 | private static FieldType createFieldType(ArrowType arrowType, Schema schema, Map<String, String> externalProps) { | |
787 | return createFieldType(arrowType, schema, externalProps, /*dictionary=*/null); | |
788 | } | |
789 | ||
790 | private static FieldType createFieldType( | |
791 | ArrowType arrowType, | |
792 | Schema schema, | |
793 | Map<String, String> externalProps, | |
794 | DictionaryEncoding dictionary) { | |
795 | ||
796 | return new FieldType(/*nullable=*/false, arrowType, dictionary, | |
797 | getMetaData(schema, externalProps)); | |
798 | } | |
799 | ||
800 | private static String convertAliases(Set<String> aliases) { | |
801 | JsonStringArrayList jsonList = new JsonStringArrayList(); | |
802 | aliases.stream().forEach(a -> jsonList.add(a)); | |
803 | return jsonList.toString(); | |
804 | } | |
805 | } |