]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / adapter / avro / src / main / java / org / apache / arrow / consumers / AvroUnionsConsumer.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.consumers;
19
20 import java.io.IOException;
21
22 import org.apache.arrow.util.AutoCloseables;
23 import org.apache.arrow.vector.ValueVector;
24 import org.apache.arrow.vector.complex.UnionVector;
25 import org.apache.arrow.vector.types.Types;
26 import org.apache.avro.io.Decoder;
27
28 /**
29 * Consumer which consume unions type values from avro decoder.
30 * Write the data to {@link org.apache.arrow.vector.complex.UnionVector}.
31 */
32 public class AvroUnionsConsumer extends BaseAvroConsumer<UnionVector> {
33
34 private Consumer[] delegates;
35 private Types.MinorType[] types;
36
37 /**
38 * Instantiate an AvroUnionConsumer.
39 */
40 public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) {
41
42 super(vector);
43 this.delegates = delegates;
44 this.types = types;
45 }
46
47 @Override
48 public void consume(Decoder decoder) throws IOException {
49 int fieldIndex = decoder.readInt();
50
51 ensureInnerVectorCapacity(currentIndex + 1, fieldIndex);
52 Consumer delegate = delegates[fieldIndex];
53
54 vector.setType(currentIndex, types[fieldIndex]);
55 // In UnionVector we need to set sub vector writer position before consume a value
56 // because in the previous iterations we might not have written to the specific union sub vector.
57 delegate.setPosition(currentIndex);
58 delegate.consume(decoder);
59
60 currentIndex++;
61 }
62
63 @Override
64 public void close() throws Exception {
65 super.close();
66 AutoCloseables.close(delegates);
67 }
68
69 @Override
70 public boolean resetValueVector(UnionVector vector) {
71 for (int i = 0; i < delegates.length; i++) {
72 delegates[i].resetValueVector(vector.getChildrenFromFields().get(i));
73 }
74 return super.resetValueVector(vector);
75 }
76
77 void ensureInnerVectorCapacity(long targetCapacity, int fieldIndex) {
78 ValueVector fieldVector = vector.getChildrenFromFields().get(fieldIndex);
79 if (fieldVector.getMinorType() == Types.MinorType.NULL) {
80 return;
81 }
82 while (fieldVector.getValueCapacity() < targetCapacity) {
83 fieldVector.reAlloc();
84 }
85 }
86 }