]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / main / java / org / apache / arrow / vector / ipc / message / MessageChannelReader.java
CommitLineData
1d09f67e
TL
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
18package org.apache.arrow.vector.ipc.message;
19
20import java.io.IOException;
21
22import org.apache.arrow.flatbuf.Message;
23import org.apache.arrow.memory.ArrowBuf;
24import org.apache.arrow.memory.BufferAllocator;
25import org.apache.arrow.vector.ipc.ReadChannel;
26
27/**
28 * Reads a sequence of messages using a ReadChannel.
29 */
30public class MessageChannelReader implements AutoCloseable {
31 protected ReadChannel in;
32 protected BufferAllocator allocator;
33
34 /**
35 * Construct a MessageReader to read streaming messages from an existing ReadChannel.
36 *
37 * @param in Channel to read messages from
38 * @param allocator BufferAllocator used to read Message body into an ArrowBuf.
39 */
40 public MessageChannelReader(ReadChannel in, BufferAllocator allocator) {
41 this.in = in;
42 this.allocator = allocator;
43 }
44
45 /**
46 * Read a message from the ReadChannel and return a MessageResult containing the Message
47 * metadata and optional message body data. Once the end-of-stream has been reached, a null
48 * value will be returned. If the message has no body, then MessageResult.getBodyBuffer()
49 * returns null.
50 *
51 * @return MessageResult or null if reached end-of-stream
52 * @throws IOException on error
53 */
54 public MessageResult readNext() throws IOException {
55
56 // Read the flatbuf message and check for end-of-stream
57 MessageMetadataResult result = MessageSerializer.readMessage(in);
58 if (result == null) {
59 return null;
60 }
61 Message message = result.getMessage();
62 ArrowBuf bodyBuffer = null;
63
64 // Read message body data if defined in message
65 if (result.messageHasBody()) {
66 long bodyLength = result.getMessageBodyLength();
67 bodyBuffer = MessageSerializer.readMessageBody(in, bodyLength, allocator);
68 }
69
70 return new MessageResult(message, bodyBuffer);
71 }
72
73 /**
74 * Get the number of bytes read from the ReadChannel.
75 *
76 * @return number of bytes
77 */
78 public long bytesRead() {
79 return in.bytesRead();
80 }
81
82 /**
83 * Close the ReadChannel.
84 *
85 * @throws IOException on error
86 */
87 @Override
88 public void close() throws IOException {
89 in.close();
90 }
91}