]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / adapter / avro / src / main / java / org / apache / arrow / AvroToArrowVectorIterator.java
CommitLineData
1d09f67e
TL
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
18package org.apache.arrow;
19
20import java.io.EOFException;
21import java.util.ArrayList;
22import java.util.Iterator;
23import java.util.List;
24import java.util.stream.Collectors;
25
26import org.apache.arrow.consumers.CompositeAvroConsumer;
27import org.apache.arrow.util.Preconditions;
28import org.apache.arrow.vector.FieldVector;
29import org.apache.arrow.vector.VectorSchemaRoot;
30import org.apache.arrow.vector.types.pojo.Field;
31import org.apache.arrow.vector.util.ValueVectorUtility;
32import org.apache.avro.Schema;
33import org.apache.avro.io.Decoder;
34
35/**
36 * VectorSchemaRoot iterator for partially converting avro data.
37 */
38public class AvroToArrowVectorIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {
39
40 public static final int NO_LIMIT_BATCH_SIZE = -1;
41 public static final int DEFAULT_BATCH_SIZE = 1024;
42
43 private final Decoder decoder;
44 private final Schema schema;
45
46 private final AvroToArrowConfig config;
47
48 private CompositeAvroConsumer compositeConsumer;
49
50 private org.apache.arrow.vector.types.pojo.Schema rootSchema;
51
52 private VectorSchemaRoot nextBatch;
53
54 private final int targetBatchSize;
55
56 /**
57 * Construct an instance.
58 */
59 private AvroToArrowVectorIterator(
60 Decoder decoder,
61 Schema schema,
62 AvroToArrowConfig config) {
63
64 this.decoder = decoder;
65 this.schema = schema;
66 this.config = config;
67 this.targetBatchSize = config.getTargetBatchSize();
68
69 }
70
71 /**
72 * Create a ArrowVectorIterator to partially convert data.
73 */
74 public static AvroToArrowVectorIterator create(
75 Decoder decoder,
76 Schema schema,
77 AvroToArrowConfig config) {
78
79 AvroToArrowVectorIterator iterator = new AvroToArrowVectorIterator(decoder, schema, config);
80 try {
81 iterator.initialize();
82 return iterator;
83 } catch (Exception e) {
84 iterator.close();
85 throw new RuntimeException("Error occurs while creating iterator.", e);
86 }
87 }
88
89 private void initialize() {
90 // create consumers
91 compositeConsumer = AvroToArrowUtils.createCompositeConsumer(schema, config);
92 List<FieldVector> vectors = new ArrayList<>();
93 compositeConsumer.getConsumers().forEach(c -> vectors.add(c.getVector()));
94 List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList());
95 VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
96 rootSchema = root.getSchema();
97
98 load(root);
99 }
100
101 private void consumeData(VectorSchemaRoot root) {
102 int readRowCount = 0;
103 try {
104 while ((targetBatchSize == NO_LIMIT_BATCH_SIZE || readRowCount < targetBatchSize)) {
105 compositeConsumer.consume(decoder);
106 readRowCount++;
107 }
108
109 if (targetBatchSize == NO_LIMIT_BATCH_SIZE) {
110 while (true) {
111 ValueVectorUtility.ensureCapacity(root, readRowCount + 1);
112 compositeConsumer.consume(decoder);
113 readRowCount++;
114 }
115 } else {
116 while (readRowCount < targetBatchSize) {
117 compositeConsumer.consume(decoder);
118 readRowCount++;
119 }
120 }
121
122 root.setRowCount(readRowCount);
123 } catch (EOFException eof) {
124 // reach the end of encoder stream.
125 root.setRowCount(readRowCount);
126 } catch (Exception e) {
127 compositeConsumer.close();
128 throw new RuntimeException("Error occurs while consuming data.", e);
129 }
130 }
131
132 // Loads the next schema root or null if no more rows are available.
133 private void load(VectorSchemaRoot root) {
134 final int targetBatchSize = config.getTargetBatchSize();
135 if (targetBatchSize != NO_LIMIT_BATCH_SIZE) {
136 ValueVectorUtility.preAllocate(root, targetBatchSize);
137 }
138
139 long validConsumerCount = compositeConsumer.getConsumers().stream().filter(c ->
140 !c.skippable()).count();
141 Preconditions.checkArgument(root.getFieldVectors().size() == validConsumerCount,
142 "Schema root vectors size not equals to consumers size.");
143
144 compositeConsumer.resetConsumerVectors(root);
145
146 // consume data
147 consumeData(root);
148
149 if (root.getRowCount() == 0) {
150 root.close();
151 nextBatch = null;
152 } else {
153 nextBatch = root;
154 }
155 }
156
157 @Override
158 public boolean hasNext() {
159 return nextBatch != null;
160 }
161
162 /**
163 * Gets the next vector. The user is responsible for freeing its resources.
164 */
165 public VectorSchemaRoot next() {
166 Preconditions.checkArgument(hasNext());
167 VectorSchemaRoot returned = nextBatch;
168 try {
169 load(VectorSchemaRoot.create(rootSchema, config.getAllocator()));
170 } catch (Exception e) {
171 returned.close();
172 throw new RuntimeException("Error occurs while getting next schema root.", e);
173 }
174 return returned;
175 }
176
177 /**
178 * Clean up resources.
179 */
180 public void close() {
181 if (nextBatch != null) {
182 nextBatch.close();
183 }
184 compositeConsumer.close();
185 }
186}