]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.adapter.jdbc.consumer; | |
19 | ||
20 | import java.io.IOException; | |
21 | import java.sql.Array; | |
22 | import java.sql.ResultSet; | |
23 | import java.sql.SQLException; | |
24 | ||
25 | import org.apache.arrow.vector.ValueVector; | |
26 | import org.apache.arrow.vector.complex.ListVector; | |
27 | ||
28 | /** | |
29 | * Consumer which consume array type values from {@link ResultSet}. | |
30 | * Write the data to {@link org.apache.arrow.vector.complex.ListVector}. | |
31 | */ | |
32 | public abstract class ArrayConsumer extends BaseConsumer<ListVector> { | |
33 | ||
34 | /** | |
35 | * Creates a consumer for {@link ListVector}. | |
36 | */ | |
37 | public static ArrayConsumer createConsumer( | |
38 | ListVector vector, JdbcConsumer delegate, int index, boolean nullable) { | |
39 | if (nullable) { | |
40 | return new ArrayConsumer.NullableArrayConsumer(vector, delegate, index); | |
41 | } else { | |
42 | return new ArrayConsumer.NonNullableArrayConsumer(vector, delegate, index); | |
43 | } | |
44 | } | |
45 | ||
46 | protected final JdbcConsumer delegate; | |
47 | ||
48 | private final ValueVector innerVector; | |
49 | ||
50 | protected int innerVectorIndex = 0; | |
51 | ||
52 | /** | |
53 | * Instantiate a ArrayConsumer. | |
54 | */ | |
55 | public ArrayConsumer(ListVector vector, JdbcConsumer delegate, int index) { | |
56 | super(vector, index); | |
57 | this.delegate = delegate; | |
58 | this.innerVector = vector.getDataVector(); | |
59 | } | |
60 | ||
61 | @Override | |
62 | public void close() throws Exception { | |
63 | this.vector.close(); | |
64 | this.delegate.close(); | |
65 | } | |
66 | ||
67 | void ensureInnerVectorCapacity(int targetCapacity) { | |
68 | while (innerVector.getValueCapacity() < targetCapacity) { | |
69 | innerVector.reAlloc(); | |
70 | } | |
71 | } | |
72 | ||
73 | /** | |
74 | * Nullable consumer for {@link ListVector}. | |
75 | */ | |
76 | static class NullableArrayConsumer extends ArrayConsumer { | |
77 | ||
78 | /** | |
79 | * Instantiate a nullable array consumer. | |
80 | */ | |
81 | public NullableArrayConsumer(ListVector vector, JdbcConsumer delegate, int index) { | |
82 | super(vector, delegate, index); | |
83 | } | |
84 | ||
85 | @Override | |
86 | public void consume(ResultSet resultSet) throws SQLException, IOException { | |
87 | final Array array = resultSet.getArray(columnIndexInResultSet); | |
88 | if (!resultSet.wasNull()) { | |
89 | vector.startNewValue(currentIndex); | |
90 | int count = 0; | |
91 | try (ResultSet rs = array.getResultSet()) { | |
92 | while (rs.next()) { | |
93 | ensureInnerVectorCapacity(innerVectorIndex + count + 1); | |
94 | delegate.consume(rs); | |
95 | count++; | |
96 | } | |
97 | } | |
98 | vector.endValue(currentIndex, count); | |
99 | innerVectorIndex += count; | |
100 | } | |
101 | currentIndex++; | |
102 | } | |
103 | } | |
104 | ||
105 | /** | |
106 | * Non-nullable consumer for {@link ListVector}. | |
107 | */ | |
108 | static class NonNullableArrayConsumer extends ArrayConsumer { | |
109 | ||
110 | /** | |
111 | * Instantiate a nullable array consumer. | |
112 | */ | |
113 | public NonNullableArrayConsumer(ListVector vector, JdbcConsumer delegate, int index) { | |
114 | super(vector, delegate, index); | |
115 | } | |
116 | ||
117 | @Override | |
118 | public void consume(ResultSet resultSet) throws SQLException, IOException { | |
119 | final Array array = resultSet.getArray(columnIndexInResultSet); | |
120 | vector.startNewValue(currentIndex); | |
121 | int count = 0; | |
122 | try (ResultSet rs = array.getResultSet()) { | |
123 | while (rs.next()) { | |
124 | ensureInnerVectorCapacity(innerVectorIndex + count + 1); | |
125 | delegate.consume(rs); | |
126 | count++; | |
127 | } | |
128 | } | |
129 | vector.endValue(currentIndex, count); | |
130 | innerVectorIndex += count; | |
131 | currentIndex++; | |
132 | } | |
133 | } | |
134 | } |