2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org
.apache
.arrow
.vector
.ipc
;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.arrow.flatbuf.Footer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.compression.CompressionCodec;
import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowFooter;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.validate.MetadataV4UnionChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 * An implementation of {@link ArrowReader} that reads the standard Arrow binary file format.
47 public class ArrowFileReader
extends ArrowReader
{
49 private static final Logger LOGGER
= LoggerFactory
.getLogger(ArrowFileReader
.class);
51 private SeekableReadChannel in
;
52 private ArrowFooter footer
;
53 private int currentDictionaryBatch
= 0;
54 private int currentRecordBatch
= 0;
/**
 * Creates a reader over {@code in}.
 *
 * @param in the channel the file is read from; retained for all subsequent reads
 * @param allocator allocator handed to the {@link ArrowReader} base class
 * @param compressionFactory codec factory handed to the {@link ArrowReader} base class
 */
public ArrowFileReader(
    SeekableReadChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) {
  super(allocator, compressionFactory);
  // Every footer, dictionary and record-batch read in this class goes through this field,
  // so it must be captured here; all other constructors funnel into this one.
  this.in = in;
}
/**
 * Convenience constructor accepting a plain {@link SeekableByteChannel}; the channel is wrapped
 * in a new {@link SeekableReadChannel} before delegating.
 *
 * @param in the byte channel to read the file from
 * @param allocator allocator for the reader
 * @param compressionFactory codec factory for the reader
 */
public ArrowFileReader(SeekableByteChannel in,
                       BufferAllocator allocator,
                       CompressionCodec.Factory compressionFactory) {
  this(new SeekableReadChannel(in), allocator, compressionFactory);
}
/**
 * Creates a reader over {@code in} that uses {@code NoCompressionCodec.Factory.INSTANCE} as its
 * codec factory.
 *
 * @param in the channel to read the file from
 * @param allocator allocator for the reader
 */
public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
  this(in, allocator,
      NoCompressionCodec.Factory.INSTANCE);
}
/**
 * Creates a reader over a plain {@link SeekableByteChannel} using
 * {@code NoCompressionCodec.Factory.INSTANCE} as the codec factory.
 *
 * @param in the byte channel to read the file from
 * @param allocator allocator for the reader
 */
public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) {
  // Route through the byte-channel overload, which performs the SeekableReadChannel wrapping.
  this(in, allocator, NoCompressionCodec.Factory.INSTANCE);
}
76 public long bytesRead() {
77 return in
.bytesRead();
81 protected void closeReadSource() throws IOException
{
86 protected Schema
readSchema() throws IOException
{
88 if (in
.size() <= (ArrowMagic
.MAGIC_LENGTH
* 2 + 4)) {
89 throw new InvalidArrowFileException("file too small: " + in
.size());
91 ByteBuffer buffer
= ByteBuffer
.allocate(4 + ArrowMagic
.MAGIC_LENGTH
);
92 long footerLengthOffset
= in
.size() - buffer
.remaining();
93 in
.setPosition(footerLengthOffset
);
96 byte[] array
= buffer
.array();
97 if (!ArrowMagic
.validateMagic(Arrays
.copyOfRange(array
, 4, array
.length
))) {
98 throw new InvalidArrowFileException("missing Magic number " + Arrays
.toString(buffer
.array()));
100 int footerLength
= MessageSerializer
.bytesToInt(array
);
101 if (footerLength
<= 0 || footerLength
+ ArrowMagic
.MAGIC_LENGTH
* 2 + 4 > in
.size()) {
102 throw new InvalidArrowFileException("invalid footer length: " + footerLength
);
104 long footerOffset
= footerLengthOffset
- footerLength
;
105 LOGGER
.debug("Footer starts at {}, length: {}", footerOffset
, footerLength
);
106 ByteBuffer footerBuffer
= ByteBuffer
.allocate(footerLength
);
107 in
.setPosition(footerOffset
);
108 in
.readFully(footerBuffer
);
110 Footer footerFB
= Footer
.getRootAsFooter(footerBuffer
);
111 this.footer
= new ArrowFooter(footerFB
);
113 MetadataV4UnionChecker
.checkRead(footer
.getSchema(), footer
.getMetadataVersion());
114 return footer
.getSchema();
118 public void initialize() throws IOException
{
121 // empty stream, has no dictionaries in IPC.
122 if (footer
.getRecordBatches().size() == 0) {
125 // Read and load all dictionaries from schema
126 for (int i
= 0; i
< dictionaries
.size(); i
++) {
127 ArrowDictionaryBatch dictionaryBatch
= readDictionary();
128 loadDictionary(dictionaryBatch
);
133 * Get custom metadata.
135 public Map
<String
, String
> getMetaData() {
136 if (footer
!= null) {
137 return footer
.getMetaData();
139 return new HashMap
<>();
143 * Read a dictionary batch from the source, will be invoked after the schema has been read and
144 * called N times, where N is the number of dictionaries indicated by the schema Fields.
146 * @return the read ArrowDictionaryBatch
147 * @throws IOException on error
149 public ArrowDictionaryBatch
readDictionary() throws IOException
{
150 if (currentDictionaryBatch
>= footer
.getDictionaries().size()) {
151 throw new IOException("Requested more dictionaries than defined in footer: " + currentDictionaryBatch
);
153 ArrowBlock block
= footer
.getDictionaries().get(currentDictionaryBatch
++);
154 return readDictionaryBatch(in
, block
, allocator
);
157 /** Returns true if a batch was read, false if no more batches. */
159 public boolean loadNextBatch() throws IOException
{
160 prepareLoadNextBatch();
162 if (currentRecordBatch
< footer
.getRecordBatches().size()) {
163 ArrowBlock block
= footer
.getRecordBatches().get(currentRecordBatch
++);
164 ArrowRecordBatch batch
= readRecordBatch(in
, block
, allocator
);
165 loadRecordBatch(batch
);
173 public List
<ArrowBlock
> getDictionaryBlocks() throws IOException
{
175 return footer
.getDictionaries();
179 * Returns the {@link ArrowBlock} metadata from the file.
181 public List
<ArrowBlock
> getRecordBlocks() throws IOException
{
183 return footer
.getRecordBatches();
187 * Loads record batch for the given block.
189 public boolean loadRecordBatch(ArrowBlock block
) throws IOException
{
191 int blockIndex
= footer
.getRecordBatches().indexOf(block
);
192 if (blockIndex
== -1) {
193 throw new IllegalArgumentException("Arrow block does not exist in record batches: " + block
);
195 currentRecordBatch
= blockIndex
;
196 return loadNextBatch();
200 ArrowFooter
getFooter() {
204 private ArrowDictionaryBatch
readDictionaryBatch(SeekableReadChannel in
,
206 BufferAllocator allocator
) throws IOException
{
207 LOGGER
.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
208 block
.getOffset(), block
.getMetadataLength(), block
.getBodyLength());
209 in
.setPosition(block
.getOffset());
210 ArrowDictionaryBatch batch
= MessageSerializer
.deserializeDictionaryBatch(in
, block
, allocator
);
212 throw new IOException("Invalid file. No batch at offset: " + block
.getOffset());
217 private ArrowRecordBatch
readRecordBatch(SeekableReadChannel in
,
219 BufferAllocator allocator
) throws IOException
{
220 LOGGER
.debug("RecordBatch at {}, metadata: {}, body: {}",
221 block
.getOffset(), block
.getMetadataLength(),
222 block
.getBodyLength());
223 in
.setPosition(block
.getOffset());
224 ArrowRecordBatch batch
= MessageSerializer
.deserializeRecordBatch(in
, block
, allocator
);
226 throw new IOException("Invalid file. No batch at offset: " + block
.getOffset());