]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / adapter / jdbc / src / main / java / org / apache / arrow / adapter / jdbc / consumer / BinaryConsumer.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.adapter.jdbc.consumer;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.sql.ResultSet;
23 import java.sql.SQLException;
24
25 import org.apache.arrow.memory.ArrowBuf;
26 import org.apache.arrow.vector.BitVectorHelper;
27 import org.apache.arrow.vector.VarBinaryVector;
28
29 /**
30 * Consumer that consumes binary type values from a {@link ResultSet}
31 * and writes the data to a {@link org.apache.arrow.vector.VarBinaryVector}.
32 */
33 public abstract class BinaryConsumer extends BaseConsumer<VarBinaryVector> {
34
35 /**
36 * Creates a consumer for {@link VarBinaryVector}.
37 */
38 public static BinaryConsumer createConsumer(VarBinaryVector vector, int index, boolean nullable) {
39 if (nullable) {
40 return new NullableBinaryConsumer(vector, index);
41 } else {
42 return new NonNullableBinaryConsumer(vector, index);
43 }
44 }
45
46 private final byte[] reuseBytes = new byte[1024];
47
48 /**
49 * Instantiate a BinaryConsumer.
50 */
51 public BinaryConsumer(VarBinaryVector vector, int index) {
52 super(vector, index);
53 if (vector != null) {
54 vector.allocateNewSafe();
55 }
56 }
57
58 /**
59 * consume a InputStream.
60 */
61 public void consume(InputStream is) throws IOException {
62 if (is != null) {
63 while (currentIndex >= vector.getValueCapacity()) {
64 vector.reallocValidityAndOffsetBuffers();
65 }
66 final int startOffset = vector.getStartOffset(currentIndex);
67 final ArrowBuf offsetBuffer = vector.getOffsetBuffer();
68 int dataLength = 0;
69 int read;
70 while ((read = is.read(reuseBytes)) != -1) {
71 while (vector.getDataBuffer().capacity() < (startOffset + dataLength + read)) {
72 vector.reallocDataBuffer();
73 }
74 vector.getDataBuffer().setBytes(startOffset + dataLength, reuseBytes, 0, read);
75 dataLength += read;
76 }
77 offsetBuffer.setInt((currentIndex + 1) * VarBinaryVector.OFFSET_WIDTH, startOffset + dataLength);
78 BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex);
79 vector.setLastSet(currentIndex);
80 }
81 }
82
83 public void moveWriterPosition() {
84 currentIndex++;
85 }
86
87 @Override
88 public void resetValueVector(VarBinaryVector vector) {
89 this.vector = vector;
90 this.vector.allocateNewSafe();
91 this.currentIndex = 0;
92 }
93
94 /**
95 * Consumer for nullable binary data.
96 */
97 static class NullableBinaryConsumer extends BinaryConsumer {
98
99 /**
100 * Instantiate a BinaryConsumer.
101 */
102 public NullableBinaryConsumer(VarBinaryVector vector, int index) {
103 super(vector, index);
104 }
105
106 @Override
107 public void consume(ResultSet resultSet) throws SQLException, IOException {
108 InputStream is = resultSet.getBinaryStream(columnIndexInResultSet);
109 if (!resultSet.wasNull()) {
110 consume(is);
111 }
112 moveWriterPosition();
113 }
114 }
115
116 /**
117 * Consumer for non-nullable binary data.
118 */
119 static class NonNullableBinaryConsumer extends BinaryConsumer {
120
121 /**
122 * Instantiate a BinaryConsumer.
123 */
124 public NonNullableBinaryConsumer(VarBinaryVector vector, int index) {
125 super(vector, index);
126 }
127
128 @Override
129 public void consume(ResultSet resultSet) throws SQLException, IOException {
130 InputStream is = resultSet.getBinaryStream(columnIndexInResultSet);
131 consume(is);
132 moveWriterPosition();
133 }
134 }
135 }