]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.vector.ipc; | |
19 | ||
20 | import java.io.IOException; | |
21 | import java.nio.ByteBuffer; | |
22 | import java.nio.channels.SeekableByteChannel; | |
23 | import java.util.Arrays; | |
24 | import java.util.HashMap; | |
25 | import java.util.List; | |
26 | import java.util.Map; | |
27 | ||
28 | import org.apache.arrow.flatbuf.Footer; | |
29 | import org.apache.arrow.memory.BufferAllocator; | |
30 | import org.apache.arrow.util.VisibleForTesting; | |
31 | import org.apache.arrow.vector.compression.CompressionCodec; | |
32 | import org.apache.arrow.vector.compression.NoCompressionCodec; | |
33 | import org.apache.arrow.vector.ipc.message.ArrowBlock; | |
34 | import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; | |
35 | import org.apache.arrow.vector.ipc.message.ArrowFooter; | |
36 | import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; | |
37 | import org.apache.arrow.vector.ipc.message.MessageSerializer; | |
38 | import org.apache.arrow.vector.types.pojo.Schema; | |
39 | import org.apache.arrow.vector.validate.MetadataV4UnionChecker; | |
40 | import org.slf4j.Logger; | |
41 | import org.slf4j.LoggerFactory; | |
42 | ||
43 | /** | |
44 | * An implementation of {@link ArrowReader} that reads the standard arrow binary | |
45 | * file format. | |
46 | */ | |
47 | public class ArrowFileReader extends ArrowReader { | |
48 | ||
49 | private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFileReader.class); | |
50 | ||
51 | private SeekableReadChannel in; | |
52 | private ArrowFooter footer; | |
53 | private int currentDictionaryBatch = 0; | |
54 | private int currentRecordBatch = 0; | |
55 | ||
56 | public ArrowFileReader( | |
57 | SeekableReadChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { | |
58 | super(allocator, compressionFactory); | |
59 | this.in = in; | |
60 | } | |
61 | ||
62 | public ArrowFileReader( | |
63 | SeekableByteChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { | |
64 | this(new SeekableReadChannel(in), allocator, compressionFactory); | |
65 | } | |
66 | ||
67 | public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) { | |
68 | this(in, allocator, NoCompressionCodec.Factory.INSTANCE); | |
69 | } | |
70 | ||
71 | public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) { | |
72 | this(new SeekableReadChannel(in), allocator); | |
73 | } | |
74 | ||
75 | @Override | |
76 | public long bytesRead() { | |
77 | return in.bytesRead(); | |
78 | } | |
79 | ||
80 | @Override | |
81 | protected void closeReadSource() throws IOException { | |
82 | in.close(); | |
83 | } | |
84 | ||
85 | @Override | |
86 | protected Schema readSchema() throws IOException { | |
87 | if (footer == null) { | |
88 | if (in.size() <= (ArrowMagic.MAGIC_LENGTH * 2 + 4)) { | |
89 | throw new InvalidArrowFileException("file too small: " + in.size()); | |
90 | } | |
91 | ByteBuffer buffer = ByteBuffer.allocate(4 + ArrowMagic.MAGIC_LENGTH); | |
92 | long footerLengthOffset = in.size() - buffer.remaining(); | |
93 | in.setPosition(footerLengthOffset); | |
94 | in.readFully(buffer); | |
95 | buffer.flip(); | |
96 | byte[] array = buffer.array(); | |
97 | if (!ArrowMagic.validateMagic(Arrays.copyOfRange(array, 4, array.length))) { | |
98 | throw new InvalidArrowFileException("missing Magic number " + Arrays.toString(buffer.array())); | |
99 | } | |
100 | int footerLength = MessageSerializer.bytesToInt(array); | |
101 | if (footerLength <= 0 || footerLength + ArrowMagic.MAGIC_LENGTH * 2 + 4 > in.size()) { | |
102 | throw new InvalidArrowFileException("invalid footer length: " + footerLength); | |
103 | } | |
104 | long footerOffset = footerLengthOffset - footerLength; | |
105 | LOGGER.debug("Footer starts at {}, length: {}", footerOffset, footerLength); | |
106 | ByteBuffer footerBuffer = ByteBuffer.allocate(footerLength); | |
107 | in.setPosition(footerOffset); | |
108 | in.readFully(footerBuffer); | |
109 | footerBuffer.flip(); | |
110 | Footer footerFB = Footer.getRootAsFooter(footerBuffer); | |
111 | this.footer = new ArrowFooter(footerFB); | |
112 | } | |
113 | MetadataV4UnionChecker.checkRead(footer.getSchema(), footer.getMetadataVersion()); | |
114 | return footer.getSchema(); | |
115 | } | |
116 | ||
117 | @Override | |
118 | public void initialize() throws IOException { | |
119 | super.initialize(); | |
120 | ||
121 | // empty stream, has no dictionaries in IPC. | |
122 | if (footer.getRecordBatches().size() == 0) { | |
123 | return; | |
124 | } | |
125 | // Read and load all dictionaries from schema | |
126 | for (int i = 0; i < dictionaries.size(); i++) { | |
127 | ArrowDictionaryBatch dictionaryBatch = readDictionary(); | |
128 | loadDictionary(dictionaryBatch); | |
129 | } | |
130 | } | |
131 | ||
132 | /** | |
133 | * Get custom metadata. | |
134 | */ | |
135 | public Map<String, String> getMetaData() { | |
136 | if (footer != null) { | |
137 | return footer.getMetaData(); | |
138 | } | |
139 | return new HashMap<>(); | |
140 | } | |
141 | ||
142 | /** | |
143 | * Read a dictionary batch from the source, will be invoked after the schema has been read and | |
144 | * called N times, where N is the number of dictionaries indicated by the schema Fields. | |
145 | * | |
146 | * @return the read ArrowDictionaryBatch | |
147 | * @throws IOException on error | |
148 | */ | |
149 | public ArrowDictionaryBatch readDictionary() throws IOException { | |
150 | if (currentDictionaryBatch >= footer.getDictionaries().size()) { | |
151 | throw new IOException("Requested more dictionaries than defined in footer: " + currentDictionaryBatch); | |
152 | } | |
153 | ArrowBlock block = footer.getDictionaries().get(currentDictionaryBatch++); | |
154 | return readDictionaryBatch(in, block, allocator); | |
155 | } | |
156 | ||
157 | /** Returns true if a batch was read, false if no more batches. */ | |
158 | @Override | |
159 | public boolean loadNextBatch() throws IOException { | |
160 | prepareLoadNextBatch(); | |
161 | ||
162 | if (currentRecordBatch < footer.getRecordBatches().size()) { | |
163 | ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++); | |
164 | ArrowRecordBatch batch = readRecordBatch(in, block, allocator); | |
165 | loadRecordBatch(batch); | |
166 | return true; | |
167 | } else { | |
168 | return false; | |
169 | } | |
170 | } | |
171 | ||
172 | ||
173 | public List<ArrowBlock> getDictionaryBlocks() throws IOException { | |
174 | ensureInitialized(); | |
175 | return footer.getDictionaries(); | |
176 | } | |
177 | ||
178 | /** | |
179 | * Returns the {@link ArrowBlock} metadata from the file. | |
180 | */ | |
181 | public List<ArrowBlock> getRecordBlocks() throws IOException { | |
182 | ensureInitialized(); | |
183 | return footer.getRecordBatches(); | |
184 | } | |
185 | ||
186 | /** | |
187 | * Loads record batch for the given block. | |
188 | */ | |
189 | public boolean loadRecordBatch(ArrowBlock block) throws IOException { | |
190 | ensureInitialized(); | |
191 | int blockIndex = footer.getRecordBatches().indexOf(block); | |
192 | if (blockIndex == -1) { | |
193 | throw new IllegalArgumentException("Arrow block does not exist in record batches: " + block); | |
194 | } | |
195 | currentRecordBatch = blockIndex; | |
196 | return loadNextBatch(); | |
197 | } | |
198 | ||
199 | @VisibleForTesting | |
200 | ArrowFooter getFooter() { | |
201 | return footer; | |
202 | } | |
203 | ||
204 | private ArrowDictionaryBatch readDictionaryBatch(SeekableReadChannel in, | |
205 | ArrowBlock block, | |
206 | BufferAllocator allocator) throws IOException { | |
207 | LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}", | |
208 | block.getOffset(), block.getMetadataLength(), block.getBodyLength()); | |
209 | in.setPosition(block.getOffset()); | |
210 | ArrowDictionaryBatch batch = MessageSerializer.deserializeDictionaryBatch(in, block, allocator); | |
211 | if (batch == null) { | |
212 | throw new IOException("Invalid file. No batch at offset: " + block.getOffset()); | |
213 | } | |
214 | return batch; | |
215 | } | |
216 | ||
217 | private ArrowRecordBatch readRecordBatch(SeekableReadChannel in, | |
218 | ArrowBlock block, | |
219 | BufferAllocator allocator) throws IOException { | |
220 | LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}", | |
221 | block.getOffset(), block.getMetadataLength(), | |
222 | block.getBodyLength()); | |
223 | in.setPosition(block.getOffset()); | |
224 | ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(in, block, allocator); | |
225 | if (batch == null) { | |
226 | throw new IOException("Invalid file. No batch at offset: " + block.getOffset()); | |
227 | } | |
228 | return batch; | |
229 | } | |
230 | } |