]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ArrayConsumer.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / adapter / jdbc / src / main / java / org / apache / arrow / adapter / jdbc / consumer / ArrayConsumer.java
CommitLineData
1d09f67e
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.arrow.adapter.jdbc.consumer;
19
20import java.io.IOException;
21import java.sql.Array;
22import java.sql.ResultSet;
23import java.sql.SQLException;
24
25import org.apache.arrow.vector.ValueVector;
26import org.apache.arrow.vector.complex.ListVector;
27
28/**
29 * Consumer which consume array type values from {@link ResultSet}.
30 * Write the data to {@link org.apache.arrow.vector.complex.ListVector}.
31 */
32public abstract class ArrayConsumer extends BaseConsumer<ListVector> {
33
34 /**
35 * Creates a consumer for {@link ListVector}.
36 */
37 public static ArrayConsumer createConsumer(
38 ListVector vector, JdbcConsumer delegate, int index, boolean nullable) {
39 if (nullable) {
40 return new ArrayConsumer.NullableArrayConsumer(vector, delegate, index);
41 } else {
42 return new ArrayConsumer.NonNullableArrayConsumer(vector, delegate, index);
43 }
44 }
45
46 protected final JdbcConsumer delegate;
47
48 private final ValueVector innerVector;
49
50 protected int innerVectorIndex = 0;
51
52 /**
53 * Instantiate a ArrayConsumer.
54 */
55 public ArrayConsumer(ListVector vector, JdbcConsumer delegate, int index) {
56 super(vector, index);
57 this.delegate = delegate;
58 this.innerVector = vector.getDataVector();
59 }
60
61 @Override
62 public void close() throws Exception {
63 this.vector.close();
64 this.delegate.close();
65 }
66
67 void ensureInnerVectorCapacity(int targetCapacity) {
68 while (innerVector.getValueCapacity() < targetCapacity) {
69 innerVector.reAlloc();
70 }
71 }
72
73 /**
74 * Nullable consumer for {@link ListVector}.
75 */
76 static class NullableArrayConsumer extends ArrayConsumer {
77
78 /**
79 * Instantiate a nullable array consumer.
80 */
81 public NullableArrayConsumer(ListVector vector, JdbcConsumer delegate, int index) {
82 super(vector, delegate, index);
83 }
84
85 @Override
86 public void consume(ResultSet resultSet) throws SQLException, IOException {
87 final Array array = resultSet.getArray(columnIndexInResultSet);
88 if (!resultSet.wasNull()) {
89 vector.startNewValue(currentIndex);
90 int count = 0;
91 try (ResultSet rs = array.getResultSet()) {
92 while (rs.next()) {
93 ensureInnerVectorCapacity(innerVectorIndex + count + 1);
94 delegate.consume(rs);
95 count++;
96 }
97 }
98 vector.endValue(currentIndex, count);
99 innerVectorIndex += count;
100 }
101 currentIndex++;
102 }
103 }
104
105 /**
106 * Non-nullable consumer for {@link ListVector}.
107 */
108 static class NonNullableArrayConsumer extends ArrayConsumer {
109
110 /**
111 * Instantiate a nullable array consumer.
112 */
113 public NonNullableArrayConsumer(ListVector vector, JdbcConsumer delegate, int index) {
114 super(vector, delegate, index);
115 }
116
117 @Override
118 public void consume(ResultSet resultSet) throws SQLException, IOException {
119 final Array array = resultSet.getArray(columnIndexInResultSet);
120 vector.startNewValue(currentIndex);
121 int count = 0;
122 try (ResultSet rs = array.getResultSet()) {
123 while (rs.next()) {
124 ensureInnerVectorCapacity(innerVectorIndex + count + 1);
125 delegate.consume(rs);
126 count++;
127 }
128 }
129 vector.endValue(currentIndex, count);
130 innerVectorIndex += count;
131 currentIndex++;
132 }
133 }
134}