]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / main / java / org / apache / arrow / vector / ipc / ArrowFileReader.java
CommitLineData
1d09f67e
TL
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
18package org.apache.arrow.vector.ipc;
19
20import java.io.IOException;
21import java.nio.ByteBuffer;
22import java.nio.channels.SeekableByteChannel;
23import java.util.Arrays;
24import java.util.HashMap;
25import java.util.List;
26import java.util.Map;
27
28import org.apache.arrow.flatbuf.Footer;
29import org.apache.arrow.memory.BufferAllocator;
30import org.apache.arrow.util.VisibleForTesting;
31import org.apache.arrow.vector.compression.CompressionCodec;
32import org.apache.arrow.vector.compression.NoCompressionCodec;
33import org.apache.arrow.vector.ipc.message.ArrowBlock;
34import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
35import org.apache.arrow.vector.ipc.message.ArrowFooter;
36import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
37import org.apache.arrow.vector.ipc.message.MessageSerializer;
38import org.apache.arrow.vector.types.pojo.Schema;
39import org.apache.arrow.vector.validate.MetadataV4UnionChecker;
40import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
43/**
44 * An implementation of {@link ArrowReader} that reads the standard arrow binary
45 * file format.
46 */
47public class ArrowFileReader extends ArrowReader {
48
49 private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFileReader.class);
50
51 private SeekableReadChannel in;
52 private ArrowFooter footer;
53 private int currentDictionaryBatch = 0;
54 private int currentRecordBatch = 0;
55
56 public ArrowFileReader(
57 SeekableReadChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) {
58 super(allocator, compressionFactory);
59 this.in = in;
60 }
61
62 public ArrowFileReader(
63 SeekableByteChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) {
64 this(new SeekableReadChannel(in), allocator, compressionFactory);
65 }
66
67 public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
68 this(in, allocator, NoCompressionCodec.Factory.INSTANCE);
69 }
70
71 public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) {
72 this(new SeekableReadChannel(in), allocator);
73 }
74
75 @Override
76 public long bytesRead() {
77 return in.bytesRead();
78 }
79
80 @Override
81 protected void closeReadSource() throws IOException {
82 in.close();
83 }
84
85 @Override
86 protected Schema readSchema() throws IOException {
87 if (footer == null) {
88 if (in.size() <= (ArrowMagic.MAGIC_LENGTH * 2 + 4)) {
89 throw new InvalidArrowFileException("file too small: " + in.size());
90 }
91 ByteBuffer buffer = ByteBuffer.allocate(4 + ArrowMagic.MAGIC_LENGTH);
92 long footerLengthOffset = in.size() - buffer.remaining();
93 in.setPosition(footerLengthOffset);
94 in.readFully(buffer);
95 buffer.flip();
96 byte[] array = buffer.array();
97 if (!ArrowMagic.validateMagic(Arrays.copyOfRange(array, 4, array.length))) {
98 throw new InvalidArrowFileException("missing Magic number " + Arrays.toString(buffer.array()));
99 }
100 int footerLength = MessageSerializer.bytesToInt(array);
101 if (footerLength <= 0 || footerLength + ArrowMagic.MAGIC_LENGTH * 2 + 4 > in.size()) {
102 throw new InvalidArrowFileException("invalid footer length: " + footerLength);
103 }
104 long footerOffset = footerLengthOffset - footerLength;
105 LOGGER.debug("Footer starts at {}, length: {}", footerOffset, footerLength);
106 ByteBuffer footerBuffer = ByteBuffer.allocate(footerLength);
107 in.setPosition(footerOffset);
108 in.readFully(footerBuffer);
109 footerBuffer.flip();
110 Footer footerFB = Footer.getRootAsFooter(footerBuffer);
111 this.footer = new ArrowFooter(footerFB);
112 }
113 MetadataV4UnionChecker.checkRead(footer.getSchema(), footer.getMetadataVersion());
114 return footer.getSchema();
115 }
116
117 @Override
118 public void initialize() throws IOException {
119 super.initialize();
120
121 // empty stream, has no dictionaries in IPC.
122 if (footer.getRecordBatches().size() == 0) {
123 return;
124 }
125 // Read and load all dictionaries from schema
126 for (int i = 0; i < dictionaries.size(); i++) {
127 ArrowDictionaryBatch dictionaryBatch = readDictionary();
128 loadDictionary(dictionaryBatch);
129 }
130 }
131
132 /**
133 * Get custom metadata.
134 */
135 public Map<String, String> getMetaData() {
136 if (footer != null) {
137 return footer.getMetaData();
138 }
139 return new HashMap<>();
140 }
141
142 /**
143 * Read a dictionary batch from the source, will be invoked after the schema has been read and
144 * called N times, where N is the number of dictionaries indicated by the schema Fields.
145 *
146 * @return the read ArrowDictionaryBatch
147 * @throws IOException on error
148 */
149 public ArrowDictionaryBatch readDictionary() throws IOException {
150 if (currentDictionaryBatch >= footer.getDictionaries().size()) {
151 throw new IOException("Requested more dictionaries than defined in footer: " + currentDictionaryBatch);
152 }
153 ArrowBlock block = footer.getDictionaries().get(currentDictionaryBatch++);
154 return readDictionaryBatch(in, block, allocator);
155 }
156
157 /** Returns true if a batch was read, false if no more batches. */
158 @Override
159 public boolean loadNextBatch() throws IOException {
160 prepareLoadNextBatch();
161
162 if (currentRecordBatch < footer.getRecordBatches().size()) {
163 ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++);
164 ArrowRecordBatch batch = readRecordBatch(in, block, allocator);
165 loadRecordBatch(batch);
166 return true;
167 } else {
168 return false;
169 }
170 }
171
172
173 public List<ArrowBlock> getDictionaryBlocks() throws IOException {
174 ensureInitialized();
175 return footer.getDictionaries();
176 }
177
178 /**
179 * Returns the {@link ArrowBlock} metadata from the file.
180 */
181 public List<ArrowBlock> getRecordBlocks() throws IOException {
182 ensureInitialized();
183 return footer.getRecordBatches();
184 }
185
186 /**
187 * Loads record batch for the given block.
188 */
189 public boolean loadRecordBatch(ArrowBlock block) throws IOException {
190 ensureInitialized();
191 int blockIndex = footer.getRecordBatches().indexOf(block);
192 if (blockIndex == -1) {
193 throw new IllegalArgumentException("Arrow block does not exist in record batches: " + block);
194 }
195 currentRecordBatch = blockIndex;
196 return loadNextBatch();
197 }
198
199 @VisibleForTesting
200 ArrowFooter getFooter() {
201 return footer;
202 }
203
204 private ArrowDictionaryBatch readDictionaryBatch(SeekableReadChannel in,
205 ArrowBlock block,
206 BufferAllocator allocator) throws IOException {
207 LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
208 block.getOffset(), block.getMetadataLength(), block.getBodyLength());
209 in.setPosition(block.getOffset());
210 ArrowDictionaryBatch batch = MessageSerializer.deserializeDictionaryBatch(in, block, allocator);
211 if (batch == null) {
212 throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
213 }
214 return batch;
215 }
216
217 private ArrowRecordBatch readRecordBatch(SeekableReadChannel in,
218 ArrowBlock block,
219 BufferAllocator allocator) throws IOException {
220 LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
221 block.getOffset(), block.getMetadataLength(),
222 block.getBodyLength());
223 in.setPosition(block.getOffset());
224 ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(in, block, allocator);
225 if (batch == null) {
226 throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
227 }
228 return batch;
229 }
230}