2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org
.apache
.arrow
.adapter
.jdbc
.consumer
;
20 import java
.io
.IOException
;
21 import java
.io
.InputStream
;
22 import java
.sql
.ResultSet
;
23 import java
.sql
.SQLException
;
25 import org
.apache
.arrow
.memory
.ArrowBuf
;
26 import org
.apache
.arrow
.vector
.BitVectorHelper
;
27 import org
.apache
.arrow
.vector
.VarBinaryVector
;
30 * Consumer which consume binary type values from {@link ResultSet}.
31 * Write the data to {@link org.apache.arrow.vector.VarBinaryVector}.
33 public abstract class BinaryConsumer
extends BaseConsumer
<VarBinaryVector
> {
36 * Creates a consumer for {@link VarBinaryVector}.
38 public static BinaryConsumer
createConsumer(VarBinaryVector vector
, int index
, boolean nullable
) {
40 return new NullableBinaryConsumer(vector
, index
);
42 return new NonNullableBinaryConsumer(vector
, index
);
46 private final byte[] reuseBytes
= new byte[1024];
49 * Instantiate a BinaryConsumer.
51 public BinaryConsumer(VarBinaryVector vector
, int index
) {
54 vector
.allocateNewSafe();
59 * consume a InputStream.
61 public void consume(InputStream is
) throws IOException
{
63 while (currentIndex
>= vector
.getValueCapacity()) {
64 vector
.reallocValidityAndOffsetBuffers();
66 final int startOffset
= vector
.getStartOffset(currentIndex
);
67 final ArrowBuf offsetBuffer
= vector
.getOffsetBuffer();
70 while ((read
= is
.read(reuseBytes
)) != -1) {
71 while (vector
.getDataBuffer().capacity() < (startOffset
+ dataLength
+ read
)) {
72 vector
.reallocDataBuffer();
74 vector
.getDataBuffer().setBytes(startOffset
+ dataLength
, reuseBytes
, 0, read
);
77 offsetBuffer
.setInt((currentIndex
+ 1) * VarBinaryVector
.OFFSET_WIDTH
, startOffset
+ dataLength
);
78 BitVectorHelper
.setBit(vector
.getValidityBuffer(), currentIndex
);
79 vector
.setLastSet(currentIndex
);
83 public void moveWriterPosition() {
88 public void resetValueVector(VarBinaryVector vector
) {
90 this.vector
.allocateNewSafe();
91 this.currentIndex
= 0;
95 * Consumer for nullable binary data.
97 static class NullableBinaryConsumer
extends BinaryConsumer
{
100 * Instantiate a BinaryConsumer.
102 public NullableBinaryConsumer(VarBinaryVector vector
, int index
) {
103 super(vector
, index
);
107 public void consume(ResultSet resultSet
) throws SQLException
, IOException
{
108 InputStream is
= resultSet
.getBinaryStream(columnIndexInResultSet
);
109 if (!resultSet
.wasNull()) {
112 moveWriterPosition();
117 * Consumer for non-nullable binary data.
119 static class NonNullableBinaryConsumer
extends BinaryConsumer
{
122 * Instantiate a BinaryConsumer.
124 public NonNullableBinaryConsumer(VarBinaryVector vector
, int index
) {
125 super(vector
, index
);
129 public void consume(ResultSet resultSet
) throws SQLException
, IOException
{
130 InputStream is
= resultSet
.getBinaryStream(columnIndexInResultSet
);
132 moveWriterPosition();