]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / main / java / org / apache / arrow / vector / ipc / message / MessageSerializer.java
CommitLineData
1d09f67e
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.arrow.vector.ipc.message;
19
20import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
21
22import java.io.IOException;
23import java.nio.ByteBuffer;
24import java.util.ArrayList;
25import java.util.List;
26
27import org.apache.arrow.flatbuf.Buffer;
28import org.apache.arrow.flatbuf.DictionaryBatch;
29import org.apache.arrow.flatbuf.FieldNode;
30import org.apache.arrow.flatbuf.Message;
31import org.apache.arrow.flatbuf.MessageHeader;
32import org.apache.arrow.flatbuf.MetadataVersion;
33import org.apache.arrow.flatbuf.RecordBatch;
34import org.apache.arrow.memory.ArrowBuf;
35import org.apache.arrow.memory.BufferAllocator;
36import org.apache.arrow.util.Preconditions;
37import org.apache.arrow.vector.compression.NoCompressionCodec;
38import org.apache.arrow.vector.ipc.ReadChannel;
39import org.apache.arrow.vector.ipc.WriteChannel;
40import org.apache.arrow.vector.types.pojo.Schema;
41
42import com.google.flatbuffers.FlatBufferBuilder;
43
44/**
45 * Utility class for serializing Messages. Messages are all serialized a similar way.
46 * 1. 4 byte little endian message header prefix
47 * 2. FB serialized Message: This includes it the body length, which is the serialized
48 * body and the type of the message.
49 * 3. Serialized message.
50 *
51 * <p>For schema messages, the serialization is simply the FB serialized Schema.
52 *
53 * <p>For RecordBatch messages the serialization is:
54 * 1. 4 byte little endian batch metadata header
55 * 2. FB serialized RowBatch
56 * 3. Padding to align to 8 byte boundary.
57 * 4. serialized RowBatch buffers.
58 */
59public class MessageSerializer {
60
61 // This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
62 public static final int IPC_CONTINUATION_TOKEN = -1;
63
64 /**
65 * Convert an array of 4 bytes in little-endian to an native-endian i32 value.
66 *
67 * @param bytes byte array with minimum length of 4 in little-endian
68 * @return converted an native-endian 32-bit integer
69 */
70 public static int bytesToInt(byte[] bytes) {
71 return ((bytes[3] & 255) << 24) +
72 ((bytes[2] & 255) << 16) +
73 ((bytes[1] & 255) << 8) +
74 ((bytes[0] & 255));
75 }
76
77 /**
78 * Convert an integer to a little endian 4 byte array.
79 *
80 * @param value integer value input
81 * @param bytes existing byte array with minimum length of 4 to contain the conversion output
82 */
83 public static void intToBytes(int value, byte[] bytes) {
84 bytes[3] = (byte) (value >>> 24);
85 bytes[2] = (byte) (value >>> 16);
86 bytes[1] = (byte) (value >>> 8);
87 bytes[0] = (byte) (value);
88 }
89
90 /**
91 * Convert a long to a little-endian 8 byte array.
92 *
93 * @param value long value input
94 * @param bytes existing byte array with minimum length of 8 to contain the conversion output
95 */
96 public static void longToBytes(long value, byte[] bytes) {
97 bytes[7] = (byte) (value >>> 56);
98 bytes[6] = (byte) (value >>> 48);
99 bytes[5] = (byte) (value >>> 40);
100 bytes[4] = (byte) (value >>> 32);
101 bytes[3] = (byte) (value >>> 24);
102 bytes[2] = (byte) (value >>> 16);
103 bytes[1] = (byte) (value >>> 8);
104 bytes[0] = (byte) (value);
105 }
106
107 public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
108 throws IOException {
109 return writeMessageBuffer(out, messageLength, messageBuffer, IpcOption.DEFAULT);
110 }
111
112 /**
113 * Write the serialized Message metadata, prefixed by the length, to the output Channel. This
114 * ensures that it aligns to an 8 byte boundary and will adjust the message length to include
115 * any padding used for alignment.
116 *
117 * @param out Output Channel
118 * @param messageLength Number of bytes in the message buffer, written as little Endian prefix
119 * @param messageBuffer Message metadata buffer to be written, this does not include any
120 * message body data which should be subsequently written to the Channel
121 * @param option IPC write options
122 * @return Number of bytes written
123 * @throws IOException on error
124 */
125 public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer, IpcOption option)
126 throws IOException {
127
128 // if write the pre-0.15.0 encapsulated IPC message format consisting of a 4-byte prefix instead of 8 byte
129 int prefixSize = option.write_legacy_ipc_format ? 4 : 8;
130
131 // ensure that message aligns to 8 byte padding - prefix_size bytes, then message body
132 if ((messageLength + prefixSize ) % 8 != 0) {
133 messageLength += 8 - (messageLength + prefixSize) % 8;
134 }
135 if (!option.write_legacy_ipc_format) {
136 out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
137 }
138 out.writeIntLittleEndian(messageLength);
139 out.write(messageBuffer);
140 out.align();
141
142 // any bytes written are already captured by our size modification above
143 return messageLength + prefixSize;
144 }
145
146 /**
147 * Serialize a schema object.
148 */
149 public static long serialize(WriteChannel out, Schema schema) throws IOException {
150 return serialize(out, schema, IpcOption.DEFAULT);
151 }
152
153 /**
154 * Serialize a schema object.
155 *
156 * @param out where to write the schema
157 * @param schema the object to serialize to out
158 * @return the number of bytes written
159 * @throws IOException if something went wrong
160 */
161 public static long serialize(WriteChannel out, Schema schema, IpcOption option) throws IOException {
162 long start = out.getCurrentPosition();
163 Preconditions.checkArgument(start % 8 == 0, "out is not aligned");
164
165 ByteBuffer serializedMessage = serializeMetadata(schema, option);
166
167 int messageLength = serializedMessage.remaining();
168
169 int bytesWritten = writeMessageBuffer(out, messageLength, serializedMessage, option);
170 Preconditions.checkArgument(bytesWritten % 8 == 0, "out is not aligned");
171 return bytesWritten;
172 }
173
174 /**
175 * Returns the serialized flatbuffer bytes of the schema wrapped in a message table.
176 */
177 @Deprecated
178 public static ByteBuffer serializeMetadata(Schema schema) {
179 return serializeMetadata(schema, IpcOption.DEFAULT);
180 }
181
182 /**
183 * Returns the serialized flatbuffer bytes of the schema wrapped in a message table.
184 */
185 public static ByteBuffer serializeMetadata(Schema schema, IpcOption writeOption) {
186 FlatBufferBuilder builder = new FlatBufferBuilder();
187 int schemaOffset = schema.getSchema(builder);
188 return MessageSerializer.serializeMessage(builder, org.apache.arrow.flatbuf.MessageHeader.Schema, schemaOffset, 0,
189 writeOption);
190 }
191
192 /**
193 * Deserializes an Arrow Schema object from a schema message. Format is from serialize().
194 *
195 * @param schemaMessage a Message of type MessageHeader.Schema
196 * @return the deserialized Arrow Schema
197 */
198 public static Schema deserializeSchema(Message schemaMessage) {
199 Preconditions.checkArgument(schemaMessage.headerType() == MessageHeader.Schema,
200 "Expected schema but result was: %s", schemaMessage.headerType());
201 return Schema.convertSchema((org.apache.arrow.flatbuf.Schema)
202 schemaMessage.header(new org.apache.arrow.flatbuf.Schema()));
203 }
204
205 /**
206 * Deserializes an Arrow Schema read from the input channel. Format is from serialize().
207 *
208 * @param in the channel to deserialize from
209 * @return the deserialized Arrow Schema
210 * @throws IOException if something went wrong
211 */
212 public static Schema deserializeSchema(ReadChannel in) throws IOException {
213 MessageMetadataResult result = readMessage(in);
214 if (result == null) {
215 throw new IOException("Unexpected end of input when reading Schema");
216 }
217 if (result.getMessage().headerType() != MessageHeader.Schema) {
218 throw new IOException("Expected schema but header was " + result.getMessage().headerType());
219 }
220 return deserializeSchema(result);
221 }
222
223 /**
224 * Deserializes an Arrow Schema object from a {@link MessageMetadataResult}. Format is from serialize().
225 *
226 * @param message a Message of type MessageHeader.Schema
227 * @return the deserialized Arrow Schema
228 */
229 public static Schema deserializeSchema(MessageMetadataResult message) {
230 return deserializeSchema(message.getMessage());
231 }
232
233 /**
234 * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
235 */
236 public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
237 return serialize(out, batch, IpcOption.DEFAULT);
238 }
239
240 /**
241 * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
242 *
243 * @param out where to write the batch
244 * @param batch the object to serialize to out
245 * @return the serialized block metadata
246 * @throws IOException if something went wrong
247 */
248 public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch, IpcOption option) throws IOException {
249
250 long start = out.getCurrentPosition();
251 long bodyLength = batch.computeBodyLength();
252 Preconditions.checkArgument(bodyLength % 8 == 0, "batch is not aligned");
253
254 ByteBuffer serializedMessage = serializeMetadata(batch, option);
255
256 int metadataLength = serializedMessage.remaining();
257
258 int prefixSize = 4;
259 if (!option.write_legacy_ipc_format) {
260 out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
261 prefixSize = 8;
262 }
263
264 // calculate alignment bytes so that metadata length points to the correct location after alignment
265 int padding = (int) ((start + metadataLength + prefixSize) % 8);
266 if (padding != 0) {
267 metadataLength += (8 - padding);
268 }
269
270 out.writeIntLittleEndian(metadataLength);
271 out.write(serializedMessage);
272
273 // Align the output to 8 byte boundary.
274 out.align();
275
276 long bufferLength = writeBatchBuffers(out, batch);
277 Preconditions.checkArgument(bufferLength % 8 == 0, "out is not aligned");
278
279 // Metadata size in the Block account for the size prefix
280 return new ArrowBlock(start, metadataLength + prefixSize, bufferLength);
281 }
282
283 /**
284 * Write the Arrow buffers of the record batch to the output channel.
285 *
286 * @param out the output channel to write the buffers to
287 * @param batch an ArrowRecordBatch containing buffers to be written
288 * @return the number of bytes written
289 * @throws IOException on error
290 */
291 public static long writeBatchBuffers(WriteChannel out, ArrowRecordBatch batch) throws IOException {
292 long bufferStart = out.getCurrentPosition();
293 List<ArrowBuf> buffers = batch.getBuffers();
294 List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
295
296 for (int i = 0; i < buffers.size(); i++) {
297 ArrowBuf buffer = buffers.get(i);
298 ArrowBuffer layout = buffersLayout.get(i);
299 long startPosition = bufferStart + layout.getOffset();
300 if (startPosition != out.getCurrentPosition()) {
301 out.writeZeros(startPosition - out.getCurrentPosition());
302 }
303 out.write(buffer);
304 if (out.getCurrentPosition() != startPosition + layout.getSize()) {
305 throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() +
306 " != " + startPosition + layout.getSize());
307 }
308 }
309 out.align();
310 return out.getCurrentPosition() - bufferStart;
311 }
312
313 /**
314 * Returns the serialized form of {@link RecordBatch} wrapped in a {@link org.apache.arrow.flatbuf.Message}.
315 */
316 @Deprecated
317 public static ByteBuffer serializeMetadata(ArrowMessage message) {
318 return serializeMetadata(message, IpcOption.DEFAULT);
319 }
320
321 /**
322 * Returns the serialized form of {@link RecordBatch} wrapped in a {@link org.apache.arrow.flatbuf.Message}.
323 */
324 public static ByteBuffer serializeMetadata(ArrowMessage message, IpcOption writeOption) {
325 FlatBufferBuilder builder = new FlatBufferBuilder();
326 int batchOffset = message.writeTo(builder);
327 return serializeMessage(builder, message.getMessageType(), batchOffset,
328 message.computeBodyLength(), writeOption);
329 }
330
331 /**
332 * Deserializes an ArrowRecordBatch from a record batch message and data in an ArrowBuf.
333 *
334 * @param recordBatchMessage a Message of type MessageHeader.RecordBatch
335 * @param bodyBuffer Arrow buffer containing the RecordBatch data
336 * @return the deserialized ArrowRecordBatch
337 * @throws IOException if something went wrong
338 */
339 public static ArrowRecordBatch deserializeRecordBatch(Message recordBatchMessage, ArrowBuf bodyBuffer)
340 throws IOException {
341 RecordBatch recordBatchFB = (RecordBatch) recordBatchMessage.header(new RecordBatch());
342 return deserializeRecordBatch(recordBatchFB, bodyBuffer);
343 }
344
345 /**
346 * Deserializes an ArrowRecordBatch read from the input channel. This uses the given allocator
347 * to create an ArrowBuf for the batch body data.
348 *
349 * @param in Channel to read a RecordBatch message and data from
350 * @param allocator BufferAllocator to allocate an Arrow buffer to read message body data
351 * @return the deserialized ArrowRecordBatch
352 * @throws IOException on error
353 */
354 public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, BufferAllocator allocator) throws IOException {
355 MessageMetadataResult result = readMessage(in);
356 if (result == null) {
357 throw new IOException("Unexpected end of input when reading a RecordBatch");
358 }
359 if (result.getMessage().headerType() != MessageHeader.RecordBatch) {
360 throw new IOException("Expected RecordBatch but header was " + result.getMessage().headerType());
361 }
362 long bodyLength = result.getMessageBodyLength();
363 ArrowBuf bodyBuffer = readMessageBody(in, bodyLength, allocator);
364 return deserializeRecordBatch(result.getMessage(), bodyBuffer);
365 }
366
367 /**
368 * Deserializes an ArrowRecordBatch knowing the size of the entire message up front. This
369 * minimizes the number of reads to the underlying stream.
370 *
371 * @param in the channel to deserialize from
372 * @param block the object to deserialize to
373 * @param alloc to allocate buffers
374 * @return the deserialized ArrowRecordBatch
375 * @throws IOException if something went wrong
376 */
377 public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block, BufferAllocator alloc)
378 throws IOException {
379 // Metadata length contains prefix_size bytes plus byte padding
380 long totalLen = block.getMetadataLength() + block.getBodyLength();
381
382 ArrowBuf buffer = alloc.buffer(totalLen);
383 if (in.readFully(buffer, totalLen) != totalLen) {
384 throw new IOException("Unexpected end of input trying to read batch.");
385 }
386
387 int prefixSize = buffer.getInt(0) == IPC_CONTINUATION_TOKEN ? 8 : 4;
388
389 ArrowBuf metadataBuffer = buffer.slice(prefixSize, block.getMetadataLength() - prefixSize);
390
391 Message messageFB =
392 Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
393
394 RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch());
395
396 // Now read the body
397 final ArrowBuf body = buffer.slice(block.getMetadataLength(),
398 totalLen - block.getMetadataLength());
399 return deserializeRecordBatch(recordBatchFB, body);
400 }
401
402 /**
403 * Deserializes an ArrowRecordBatch given the Flatbuffer metadata and in-memory body.
404 *
405 * @param recordBatchFB Deserialized FlatBuffer record batch
406 * @param body Read body of the record batch
407 * @return ArrowRecordBatch from metadata and in-memory body
408 * @throws IOException on error
409 */
410 public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, ArrowBuf body) throws IOException {
411 // Now read the body
412 int nodesLength = recordBatchFB.nodesLength();
413 List<ArrowFieldNode> nodes = new ArrayList<>();
414 for (int i = 0; i < nodesLength; ++i) {
415 FieldNode node = recordBatchFB.nodes(i);
416 if ((int) node.length() != node.length() ||
417 (int) node.nullCount() != node.nullCount()) {
418 throw new IOException("Cannot currently deserialize record batches with " +
419 "node length larger than INT_MAX records.");
420 }
421 nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
422 }
423 List<ArrowBuf> buffers = new ArrayList<>();
424 for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
425 Buffer bufferFB = recordBatchFB.buffers(i);
426 ArrowBuf vectorBuffer = body.slice(bufferFB.offset(), bufferFB.length());
427 buffers.add(vectorBuffer);
428 }
429
430 ArrowBodyCompression bodyCompression = recordBatchFB.compression() == null ?
431 NoCompressionCodec.DEFAULT_BODY_COMPRESSION
432 : new ArrowBodyCompression(recordBatchFB.compression().codec(), recordBatchFB.compression().method());
433
434 if ((int) recordBatchFB.length() != recordBatchFB.length()) {
435 throw new IOException("Cannot currently deserialize record batches with more than INT_MAX records.");
436 }
437 ArrowRecordBatch arrowRecordBatch =
438 new ArrowRecordBatch(checkedCastToInt(recordBatchFB.length()), nodes, buffers, bodyCompression);
439 body.getReferenceManager().release();
440 return arrowRecordBatch;
441 }
442
443 /**
444 * Reads a record batch based on the metadata in serializedMessage and the underlying data buffer.
445 */
446 public static ArrowRecordBatch deserializeRecordBatch(MessageMetadataResult serializedMessage,
447 ArrowBuf underlying) throws
448 IOException {
449 return deserializeRecordBatch(serializedMessage.getMessage(), underlying);
450 }
451
452 public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
453 return serialize(out, batch, IpcOption.DEFAULT);
454 }
455
456 /**
457 * Serializes a dictionary ArrowRecordBatch. Returns the offset and length of the written batch.
458 *
459 * @param out where to serialize
460 * @param batch the batch to serialize
461 * @param option options for IPC
462 * @return the metadata of the serialized block
463 * @throws IOException if something went wrong
464 */
465 public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch, IpcOption option)
466 throws IOException {
467 long start = out.getCurrentPosition();
468
469 long bodyLength = batch.computeBodyLength();
470 Preconditions.checkArgument(bodyLength % 8 == 0, "batch is not aligned");
471
472 ByteBuffer serializedMessage = serializeMetadata(batch, option);
473
474 int metadataLength = serializedMessage.remaining();
475
476 int prefixSize = 4;
477 if (!option.write_legacy_ipc_format) {
478 out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
479 prefixSize = 8;
480 }
481
482 // calculate alignment bytes so that metadata length points to the correct location after alignment
483 int padding = (int) ((start + metadataLength + prefixSize) % 8);
484 if (padding != 0) {
485 metadataLength += (8 - padding);
486 }
487
488 out.writeIntLittleEndian(metadataLength);
489 out.write(serializedMessage);
490
491 // Align the output to 8 byte boundary.
492 out.align();
493
494 // write the embedded record batch
495 long bufferLength = writeBatchBuffers(out, batch.getDictionary());
496 Preconditions.checkArgument(bufferLength % 8 == 0, "out is not aligned");
497
498 // Metadata size in the Block account for the size prefix
499 return new ArrowBlock(start, metadataLength + prefixSize, bufferLength);
500 }
501
502 /**
503 * Deserializes an ArrowDictionaryBatch from a dictionary batch Message and data in an ArrowBuf.
504 *
505 * @param message a message of type MessageHeader.DictionaryBatch
506 * @param bodyBuffer Arrow buffer containing the DictionaryBatch data
507 * of type MessageHeader.DictionaryBatch
508 * @return the deserialized ArrowDictionaryBatch
509 * @throws IOException if something went wrong
510 */
511 public static ArrowDictionaryBatch deserializeDictionaryBatch(Message message, ArrowBuf bodyBuffer)
512 throws IOException {
513 DictionaryBatch dictionaryBatchFB = (DictionaryBatch) message.header(new DictionaryBatch());
514 ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), bodyBuffer);
515 return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch, dictionaryBatchFB.isDelta());
516 }
517
518 /**
519 * Deserializes an ArrowDictionaryBatch from a dictionary batch Message and data in an ArrowBuf.
520 *
521 * @param message a message of type MessageHeader.DictionaryBatch
522 * @param bodyBuffer Arrow buffer containing the DictionaryBatch data
523 * of type MessageHeader.DictionaryBatch
524 * @return the deserialized ArrowDictionaryBatch
525 * @throws IOException if something went wrong
526 */
527 public static ArrowDictionaryBatch deserializeDictionaryBatch(MessageMetadataResult message, ArrowBuf bodyBuffer)
528 throws IOException {
529 return deserializeDictionaryBatch(message.getMessage(), bodyBuffer);
530 }
531
532 /**
533 * Deserializes an ArrowDictionaryBatch read from the input channel. This uses the given allocator
534 * to create an ArrowBuf for the batch body data.
535 *
536 * @param in Channel to read a DictionaryBatch message and data from
537 * @param allocator BufferAllocator to allocate an Arrow buffer to read message body data
538 * @return the deserialized ArrowDictionaryBatch
539 * @throws IOException on error
540 */
541 public static ArrowDictionaryBatch deserializeDictionaryBatch(ReadChannel in, BufferAllocator allocator)
542 throws IOException {
543 MessageMetadataResult result = readMessage(in);
544 if (result == null) {
545 throw new IOException("Unexpected end of input when reading a DictionaryBatch");
546 }
547 if (result.getMessage().headerType() != MessageHeader.DictionaryBatch) {
548 throw new IOException("Expected DictionaryBatch but header was " + result.getMessage().headerType());
549 }
550 long bodyLength = result.getMessageBodyLength();
551 ArrowBuf bodyBuffer = readMessageBody(in, bodyLength, allocator);
552 return deserializeDictionaryBatch(result.getMessage(), bodyBuffer);
553 }
554
555 /**
556 * Deserializes a DictionaryBatch knowing the size of the entire message up front. This
557 * minimizes the number of reads to the underlying stream.
558 *
559 * @param in where to read from
560 * @param block block metadata for deserializing
561 * @param alloc to allocate new buffers
562 * @return the deserialized ArrowDictionaryBatch
563 * @throws IOException if something went wrong
564 */
565 public static ArrowDictionaryBatch deserializeDictionaryBatch(
566 ReadChannel in,
567 ArrowBlock block,
568 BufferAllocator alloc) throws IOException {
569 // Metadata length contains integer prefix plus byte padding
570 long totalLen = block.getMetadataLength() + block.getBodyLength();
571
572 ArrowBuf buffer = alloc.buffer(totalLen);
573 if (in.readFully(buffer, totalLen) != totalLen) {
574 throw new IOException("Unexpected end of input trying to read batch.");
575 }
576
577 int prefixSize = buffer.getInt(0) == IPC_CONTINUATION_TOKEN ? 8 : 4;
578
579 ArrowBuf metadataBuffer = buffer.slice(prefixSize, block.getMetadataLength() - prefixSize);
580
581 Message messageFB =
582 Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
583
584 DictionaryBatch dictionaryBatchFB = (DictionaryBatch) messageFB.header(new DictionaryBatch());
585
586 // Now read the body
587 final ArrowBuf body = buffer.slice(block.getMetadataLength(),
588 totalLen - block.getMetadataLength());
589 ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), body);
590 return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch, dictionaryBatchFB.isDelta());
591 }
592
593 /**
594 * Deserialize a message that is either an ArrowDictionaryBatch or ArrowRecordBatch.
595 *
596 * @param reader MessageChannelReader to read a sequence of messages from a ReadChannel
597 * @return The deserialized record batch
598 * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch
599 */
600 public static ArrowMessage deserializeMessageBatch(MessageChannelReader reader) throws IOException {
601 MessageResult result = reader.readNext();
602 if (result == null) {
603 return null;
604 } else if (result.getMessage().bodyLength() > Integer.MAX_VALUE) {
605 throw new IOException("Cannot currently deserialize record batches over 2GB");
606 }
607
608 if (result.getMessage().version() != MetadataVersion.V4 &&
609 result.getMessage().version() != MetadataVersion.V5) {
610 throw new IOException("Received metadata with an incompatible version number: " + result.getMessage().version());
611 }
612
613 switch (result.getMessage().headerType()) {
614 case MessageHeader.RecordBatch:
615 return deserializeRecordBatch(result.getMessage(), result.getBodyBuffer());
616 case MessageHeader.DictionaryBatch:
617 return deserializeDictionaryBatch(result.getMessage(), result.getBodyBuffer());
618 default:
619 throw new IOException("Unexpected message header type " + result.getMessage().headerType());
620 }
621 }
622
623 /**
624 * Deserialize a message that is either an ArrowDictionaryBatch or ArrowRecordBatch.
625 *
626 * @param in ReadChannel to read messages from
627 * @param alloc Allocator for message data
628 * @return The deserialized record batch
629 * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch
630 */
631 public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocator alloc) throws IOException {
632 return deserializeMessageBatch(new MessageChannelReader(in, alloc));
633 }
634
635 @Deprecated
636 public static ByteBuffer serializeMessage(
637 FlatBufferBuilder builder,
638 byte headerType,
639 int headerOffset,
640 long bodyLength) {
641 return serializeMessage(builder, headerType, headerOffset, bodyLength, IpcOption.DEFAULT);
642 }
643
644 /**
645 * Serializes a message header.
646 *
647 * @param builder to write the flatbuf to
648 * @param headerType headerType field
649 * @param headerOffset header offset field
650 * @param bodyLength body length field
651 * @param writeOption IPC write options
652 * @return the corresponding ByteBuffer
653 */
654 public static ByteBuffer serializeMessage(
655 FlatBufferBuilder builder,
656 byte headerType,
657 int headerOffset,
658 long bodyLength,
659 IpcOption writeOption) {
660 Message.startMessage(builder);
661 Message.addHeaderType(builder, headerType);
662 Message.addHeader(builder, headerOffset);
663 Message.addVersion(builder, writeOption.metadataVersion.toFlatbufID());
664 Message.addBodyLength(builder, bodyLength);
665 builder.finish(Message.endMessage(builder));
666 return builder.dataBuffer();
667 }
668
669 /**
670 * Read a Message from the input channel and return a MessageMetadataResult that contains the
671 * Message metadata, buffer containing the serialized Message metadata as read, and length of the
672 * Message in bytes. Returns null if the end-of-stream has been reached.
673 *
674 * @param in ReadChannel to read messages from
675 * @return MessageMetadataResult with deserialized Message metadata and message information if
676 * a valid Message was read, or null if end-of-stream
677 * @throws IOException on error
678 */
679 public static MessageMetadataResult readMessage(ReadChannel in) throws IOException {
680
681 // Read the message size. There is an i32 little endian prefix.
682 ByteBuffer buffer = ByteBuffer.allocate(4);
683 if (in.readFully(buffer) == 4) {
684
685 int messageLength = MessageSerializer.bytesToInt(buffer.array());
686 if (messageLength == IPC_CONTINUATION_TOKEN) {
687 buffer.clear();
688 // ARROW-6313, if the first 4 bytes are continuation message, read the next 4 for the length
689 if (in.readFully(buffer) == 4) {
690 messageLength = MessageSerializer.bytesToInt(buffer.array());
691 }
692 }
693
694 // Length of 0 indicates end of stream
695 if (messageLength != 0) {
696
697 // Read the message into the buffer.
698 ByteBuffer messageBuffer = ByteBuffer.allocate(messageLength);
699 if (in.readFully(messageBuffer) != messageLength) {
700 throw new IOException(
701 "Unexpected end of stream trying to read message.");
702 }
703 messageBuffer.rewind();
704
705 // Load the message.
706 Message message = Message.getRootAsMessage(messageBuffer);
707
708 return new MessageMetadataResult(messageLength, messageBuffer, message);
709 }
710 }
711 return null;
712 }
713
714 /**
715 * Read a Message body from the in channel into an ArrowBuf.
716 *
717 * @param in ReadChannel to read message body from
718 * @param bodyLength Length in bytes of the message body to read
719 * @param allocator Allocate the ArrowBuf to contain message body data
720 * @return an ArrowBuf containing the message body data
721 * @throws IOException on error
722 */
723 public static ArrowBuf readMessageBody(ReadChannel in, long bodyLength,
724 BufferAllocator allocator) throws IOException {
725 ArrowBuf bodyBuffer = allocator.buffer(bodyLength);
726 try {
727 if (in.readFully(bodyBuffer, bodyLength) != bodyLength) {
728 throw new IOException("Unexpected end of input trying to read batch.");
729 }
730 } catch (RuntimeException | IOException e) {
731 bodyBuffer.close();
732 throw e;
733 }
734 return bodyBuffer;
735 }
736}