]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.vector.ipc; | |
19 | ||
20 | import java.io.IOException; | |
21 | import java.nio.ByteBuffer; | |
22 | import java.nio.channels.ReadableByteChannel; | |
23 | ||
24 | import org.apache.arrow.memory.ArrowBuf; | |
25 | import org.slf4j.Logger; | |
26 | import org.slf4j.LoggerFactory; | |
27 | ||
28 | /** | |
29 | * Adapter around {@link ReadableByteChannel} that reads into {@linkplain ArrowBuf}s. | |
30 | */ | |
31 | public class ReadChannel implements AutoCloseable { | |
32 | ||
33 | private static final Logger LOGGER = LoggerFactory.getLogger(ReadChannel.class); | |
34 | ||
35 | private ReadableByteChannel in; | |
36 | private long bytesRead = 0; | |
37 | ||
38 | public ReadChannel(ReadableByteChannel in) { | |
39 | this.in = in; | |
40 | } | |
41 | ||
42 | public long bytesRead() { | |
43 | return bytesRead; | |
44 | } | |
45 | ||
46 | /** | |
47 | * Reads bytes into buffer until it is full (buffer.remaining() == 0). Returns the | |
48 | * number of bytes read which can be less than full if there are no more. | |
49 | * | |
50 | * @param buffer The buffer to read to | |
51 | * @return the number of byte read | |
52 | * @throws IOException if nit enough bytes left to read | |
53 | */ | |
54 | public int readFully(ByteBuffer buffer) throws IOException { | |
55 | if (LOGGER.isDebugEnabled()) { | |
56 | LOGGER.debug("Reading buffer with size: {}", buffer.remaining()); | |
57 | } | |
58 | int totalRead = 0; | |
59 | while (buffer.remaining() != 0) { | |
60 | int read = in.read(buffer); | |
61 | if (read == -1) { | |
62 | this.bytesRead += totalRead; | |
63 | return totalRead; | |
64 | } | |
65 | totalRead += read; | |
66 | if (read == 0) { | |
67 | break; | |
68 | } | |
69 | } | |
70 | this.bytesRead += totalRead; | |
71 | return totalRead; | |
72 | } | |
73 | ||
74 | /** | |
75 | * Reads up to len into buffer. Returns bytes read. | |
76 | * | |
77 | * @param buffer the buffer to read to | |
78 | * @param length the amount of bytes to read | |
79 | * @return the number of bytes read | |
80 | * @throws IOException if nit enough bytes left to read | |
81 | */ | |
82 | public long readFully(ArrowBuf buffer, long length) throws IOException { | |
83 | boolean fullRead = true; | |
84 | long bytesLeft = length; | |
85 | while (fullRead && bytesLeft > 0) { | |
86 | int bytesToRead = (int) Math.min(bytesLeft, Integer.MAX_VALUE); | |
87 | int n = readFully(buffer.nioBuffer(buffer.writerIndex(), bytesToRead)); | |
88 | buffer.writerIndex(buffer.writerIndex() + n); | |
89 | fullRead = n == bytesToRead; | |
90 | bytesLeft -= n; | |
91 | } | |
92 | return length - bytesLeft; | |
93 | } | |
94 | ||
95 | @Override | |
96 | public void close() throws IOException { | |
97 | if (this.in != null) { | |
98 | in.close(); | |
99 | in = null; | |
100 | } | |
101 | } | |
102 | } |