]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.consumers; | |
19 | ||
20 | import java.io.IOException; | |
21 | ||
22 | import org.apache.arrow.util.AutoCloseables; | |
23 | import org.apache.arrow.vector.ValueVector; | |
24 | import org.apache.arrow.vector.complex.UnionVector; | |
25 | import org.apache.arrow.vector.types.Types; | |
26 | import org.apache.avro.io.Decoder; | |
27 | ||
28 | /** | |
29 | * Consumer which consume unions type values from avro decoder. | |
30 | * Write the data to {@link org.apache.arrow.vector.complex.UnionVector}. | |
31 | */ | |
32 | public class AvroUnionsConsumer extends BaseAvroConsumer<UnionVector> { | |
33 | ||
34 | private Consumer[] delegates; | |
35 | private Types.MinorType[] types; | |
36 | ||
37 | /** | |
38 | * Instantiate an AvroUnionConsumer. | |
39 | */ | |
40 | public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) { | |
41 | ||
42 | super(vector); | |
43 | this.delegates = delegates; | |
44 | this.types = types; | |
45 | } | |
46 | ||
47 | @Override | |
48 | public void consume(Decoder decoder) throws IOException { | |
49 | int fieldIndex = decoder.readInt(); | |
50 | ||
51 | ensureInnerVectorCapacity(currentIndex + 1, fieldIndex); | |
52 | Consumer delegate = delegates[fieldIndex]; | |
53 | ||
54 | vector.setType(currentIndex, types[fieldIndex]); | |
55 | // In UnionVector we need to set sub vector writer position before consume a value | |
56 | // because in the previous iterations we might not have written to the specific union sub vector. | |
57 | delegate.setPosition(currentIndex); | |
58 | delegate.consume(decoder); | |
59 | ||
60 | currentIndex++; | |
61 | } | |
62 | ||
63 | @Override | |
64 | public void close() throws Exception { | |
65 | super.close(); | |
66 | AutoCloseables.close(delegates); | |
67 | } | |
68 | ||
69 | @Override | |
70 | public boolean resetValueVector(UnionVector vector) { | |
71 | for (int i = 0; i < delegates.length; i++) { | |
72 | delegates[i].resetValueVector(vector.getChildrenFromFields().get(i)); | |
73 | } | |
74 | return super.resetValueVector(vector); | |
75 | } | |
76 | ||
77 | void ensureInnerVectorCapacity(long targetCapacity, int fieldIndex) { | |
78 | ValueVector fieldVector = vector.getChildrenFromFields().get(fieldIndex); | |
79 | if (fieldVector.getMinorType() == Types.MinorType.NULL) { | |
80 | return; | |
81 | } | |
82 | while (fieldVector.getValueCapacity() < targetCapacity) { | |
83 | fieldVector.reAlloc(); | |
84 | } | |
85 | } | |
86 | } |