]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.vector.ipc; | |
19 | ||
20 | import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; | |
21 | import static com.fasterxml.jackson.core.JsonToken.END_OBJECT; | |
22 | import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; | |
23 | import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; | |
24 | import static java.nio.charset.StandardCharsets.UTF_8; | |
25 | import static org.apache.arrow.vector.BufferLayout.BufferType.DATA; | |
26 | import static org.apache.arrow.vector.BufferLayout.BufferType.OFFSET; | |
27 | import static org.apache.arrow.vector.BufferLayout.BufferType.TYPE; | |
28 | import static org.apache.arrow.vector.BufferLayout.BufferType.VALIDITY; | |
29 | ||
30 | import java.io.File; | |
31 | import java.io.IOException; | |
32 | import java.math.BigDecimal; | |
33 | import java.math.BigInteger; | |
34 | import java.util.ArrayList; | |
35 | import java.util.Arrays; | |
36 | import java.util.Collections; | |
37 | import java.util.HashMap; | |
38 | import java.util.List; | |
39 | import java.util.Map; | |
40 | import java.util.Objects; | |
41 | ||
42 | import org.apache.arrow.memory.ArrowBuf; | |
43 | import org.apache.arrow.memory.BufferAllocator; | |
44 | import org.apache.arrow.util.Preconditions; | |
45 | import org.apache.arrow.vector.BigIntVector; | |
46 | import org.apache.arrow.vector.BitVectorHelper; | |
47 | import org.apache.arrow.vector.BufferLayout.BufferType; | |
48 | import org.apache.arrow.vector.Decimal256Vector; | |
49 | import org.apache.arrow.vector.DecimalVector; | |
50 | import org.apache.arrow.vector.FieldVector; | |
51 | import org.apache.arrow.vector.Float4Vector; | |
52 | import org.apache.arrow.vector.Float8Vector; | |
53 | import org.apache.arrow.vector.IntVector; | |
54 | import org.apache.arrow.vector.IntervalDayVector; | |
55 | import org.apache.arrow.vector.IntervalMonthDayNanoVector; | |
56 | import org.apache.arrow.vector.SmallIntVector; | |
57 | import org.apache.arrow.vector.TinyIntVector; | |
58 | import org.apache.arrow.vector.TypeLayout; | |
59 | import org.apache.arrow.vector.VectorSchemaRoot; | |
60 | import org.apache.arrow.vector.dictionary.Dictionary; | |
61 | import org.apache.arrow.vector.dictionary.DictionaryProvider; | |
62 | import org.apache.arrow.vector.ipc.message.ArrowFieldNode; | |
63 | import org.apache.arrow.vector.types.Types; | |
64 | import org.apache.arrow.vector.types.pojo.ArrowType; | |
65 | import org.apache.arrow.vector.types.pojo.Field; | |
66 | import org.apache.arrow.vector.types.pojo.Schema; | |
67 | import org.apache.arrow.vector.util.DecimalUtility; | |
68 | import org.apache.arrow.vector.util.DictionaryUtility; | |
69 | import org.apache.commons.codec.DecoderException; | |
70 | import org.apache.commons.codec.binary.Hex; | |
71 | ||
72 | import com.fasterxml.jackson.core.JsonParseException; | |
73 | import com.fasterxml.jackson.core.JsonParser; | |
74 | import com.fasterxml.jackson.core.JsonToken; | |
75 | import com.fasterxml.jackson.databind.MapperFeature; | |
76 | import com.fasterxml.jackson.databind.MappingJsonFactory; | |
77 | import com.fasterxml.jackson.databind.ObjectMapper; | |
78 | ||
79 | /** | |
80 | * A reader for JSON files that translates them into vectors. This reader is used for integration tests. | |
81 | * | |
82 | * <p>This class uses a streaming parser API, method naming tends to reflect this implementation | |
83 | * detail. | |
84 | */ | |
85 | public class JsonFileReader implements AutoCloseable, DictionaryProvider { | |
/** Streaming JSON parser positioned over the input file; closed by {@link #close()}. */
private final JsonParser parser;
/** Allocator handed to every buffer and vector allocation made by this reader. */
private final BufferAllocator allocator;
/** Schema with dictionary-encoded fields converted to memory format; assigned by {@link #start()}. */
private Schema schema;
/** Dictionaries keyed by dictionary id; assigned by {@link #start()} and still null before that. */
private Map<Long, Dictionary> dictionaries;
/**
 * Set to true at the end of {@link #start()}, once the schema and any dictionaries have been consumed.
 * A primitive is used (rather than a boxed Boolean) because the flag is never meant to be null.
 */
private boolean started = false;

/**
 * Constructs a new instance.
 *
 * @param inputFile The file to read.
 * @param allocator The allocator to use for allocating buffers.
 * @throws JsonParseException if the parser cannot be created for the file content
 * @throws IOException if the file cannot be opened
 */
public JsonFileReader(File inputFile, BufferAllocator allocator) throws JsonParseException, IOException {
  this.allocator = allocator;
  MappingJsonFactory jsonFactory = new MappingJsonFactory(new ObjectMapper()
      // ignore case for enums
      .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS, true)
  );
  this.parser = jsonFactory.createParser(inputFile);
  // Allow reading NaN for floating point values
  this.parser.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
}
108 | ||
109 | @Override | |
110 | public Dictionary lookup(long id) { | |
111 | if (!started) { | |
112 | throw new IllegalStateException("Unable to lookup until after read() has started"); | |
113 | } | |
114 | ||
115 | return dictionaries.get(id); | |
116 | } | |
117 | ||
118 | /** Reads the beginning (schema section) of the json file and returns it. */ | |
119 | public Schema start() throws JsonParseException, IOException { | |
120 | readToken(START_OBJECT); | |
121 | { | |
122 | Schema originalSchema = readNextField("schema", Schema.class); | |
123 | List<Field> fields = new ArrayList<>(); | |
124 | dictionaries = new HashMap<>(); | |
125 | ||
126 | // Convert fields with dictionaries to have the index type | |
127 | for (Field field : originalSchema.getFields()) { | |
128 | fields.add(DictionaryUtility.toMemoryFormat(field, allocator, dictionaries)); | |
129 | } | |
130 | this.schema = new Schema(fields, originalSchema.getCustomMetadata()); | |
131 | ||
132 | if (!dictionaries.isEmpty()) { | |
133 | nextFieldIs("dictionaries"); | |
134 | readDictionaryBatches(); | |
135 | } | |
136 | ||
137 | nextFieldIs("batches"); | |
138 | readToken(START_ARRAY); | |
139 | started = true; | |
140 | return this.schema; | |
141 | } | |
142 | } | |
143 | ||
144 | private void readDictionaryBatches() throws JsonParseException, IOException { | |
145 | readToken(START_ARRAY); | |
146 | JsonToken token = parser.nextToken(); | |
147 | boolean haveDictionaryBatch = token == START_OBJECT; | |
148 | while (haveDictionaryBatch) { | |
149 | ||
150 | // Lookup what dictionary for the batch about to be read | |
151 | long id = readNextField("id", Long.class); | |
152 | Dictionary dict = dictionaries.get(id); | |
153 | if (dict == null) { | |
154 | throw new IllegalArgumentException("Dictionary with id: " + id + " missing encoding from schema Field"); | |
155 | } | |
156 | ||
157 | // Read the dictionary record batch | |
158 | nextFieldIs("data"); | |
159 | FieldVector vector = dict.getVector(); | |
160 | List<Field> fields = Collections.singletonList(vector.getField()); | |
161 | List<FieldVector> vectors = Collections.singletonList(vector); | |
162 | VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, vector.getValueCount()); | |
163 | read(root); | |
164 | ||
165 | readToken(END_OBJECT); | |
166 | token = parser.nextToken(); | |
167 | haveDictionaryBatch = token == START_OBJECT; | |
168 | } | |
169 | ||
170 | if (token != END_ARRAY) { | |
171 | throw new IllegalArgumentException("Invalid token: " + token + " expected end of array at " + | |
172 | parser.getTokenLocation()); | |
173 | } | |
174 | } | |
175 | ||
176 | /** | |
177 | * Reads the next record batch from the file into <code>root</code>. | |
178 | */ | |
179 | public boolean read(VectorSchemaRoot root) throws IOException { | |
180 | JsonToken t = parser.nextToken(); | |
181 | if (t == START_OBJECT) { | |
182 | { | |
183 | int count = readNextField("count", Integer.class); | |
184 | nextFieldIs("columns"); | |
185 | readToken(START_ARRAY); | |
186 | { | |
187 | for (Field field : root.getSchema().getFields()) { | |
188 | FieldVector vector = root.getVector(field); | |
189 | readFromJsonIntoVector(field, vector); | |
190 | } | |
191 | } | |
192 | readToken(END_ARRAY); | |
193 | root.setRowCount(count); | |
194 | } | |
195 | readToken(END_OBJECT); | |
196 | return true; | |
197 | } else if (t == END_ARRAY) { | |
198 | root.setRowCount(0); | |
199 | return false; | |
200 | } else { | |
201 | throw new IllegalArgumentException("Invalid token: " + t); | |
202 | } | |
203 | } | |
204 | ||
205 | /** | |
206 | * Returns the next record batch from the file. | |
207 | */ | |
208 | public VectorSchemaRoot read() throws IOException { | |
209 | JsonToken t = parser.nextToken(); | |
210 | if (t == START_OBJECT) { | |
211 | VectorSchemaRoot recordBatch = VectorSchemaRoot.create(schema, allocator); | |
212 | { | |
213 | int count = readNextField("count", Integer.class); | |
214 | recordBatch.setRowCount(count); | |
215 | nextFieldIs("columns"); | |
216 | readToken(START_ARRAY); | |
217 | { | |
218 | for (Field field : schema.getFields()) { | |
219 | FieldVector vector = recordBatch.getVector(field); | |
220 | readFromJsonIntoVector(field, vector); | |
221 | } | |
222 | } | |
223 | readToken(END_ARRAY); | |
224 | } | |
225 | readToken(END_OBJECT); | |
226 | return recordBatch; | |
227 | } else if (t == END_ARRAY) { | |
228 | return null; | |
229 | } else { | |
230 | throw new IllegalArgumentException("Invalid token: " + t); | |
231 | } | |
232 | } | |
233 | ||
234 | private abstract class BufferReader { | |
235 | protected abstract ArrowBuf read(BufferAllocator allocator, int count) throws IOException; | |
236 | ||
237 | ArrowBuf readBuffer(BufferAllocator allocator, int count) throws IOException { | |
238 | readToken(START_ARRAY); | |
239 | ArrowBuf buf = read(allocator, count); | |
240 | readToken(END_ARRAY); | |
241 | return buf; | |
242 | } | |
243 | } | |
244 | ||
245 | private class BufferHelper { | |
246 | BufferReader BIT = new BufferReader() { | |
247 | @Override | |
248 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
249 | final int bufferSize = BitVectorHelper.getValidityBufferSize(count); | |
250 | ArrowBuf buf = allocator.buffer(bufferSize); | |
251 | ||
252 | // C++ integration test fails without this. | |
253 | buf.setZero(0, bufferSize); | |
254 | ||
255 | for (int i = 0; i < count; i++) { | |
256 | parser.nextToken(); | |
257 | BitVectorHelper.setValidityBit(buf, i, parser.readValueAs(Boolean.class) ? 1 : 0); | |
258 | } | |
259 | ||
260 | buf.writerIndex(bufferSize); | |
261 | return buf; | |
262 | } | |
263 | }; | |
264 | ||
265 | BufferReader DAY_MILLIS = new BufferReader() { | |
266 | @Override | |
267 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
268 | final long size = (long) count * IntervalDayVector.TYPE_WIDTH; | |
269 | ArrowBuf buf = allocator.buffer(size); | |
270 | ||
271 | for (int i = 0; i < count; i++) { | |
272 | readToken(START_OBJECT); | |
273 | buf.writeInt(readNextField("days", Integer.class)); | |
274 | buf.writeInt(readNextField("milliseconds", Integer.class)); | |
275 | readToken(END_OBJECT); | |
276 | } | |
277 | ||
278 | return buf; | |
279 | } | |
280 | }; | |
281 | ||
282 | BufferReader MONTH_DAY_NANOS = new BufferReader() { | |
283 | @Override | |
284 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
285 | final long size = (long) count * IntervalMonthDayNanoVector.TYPE_WIDTH; | |
286 | ArrowBuf buf = allocator.buffer(size); | |
287 | ||
288 | for (int i = 0; i < count; i++) { | |
289 | readToken(START_OBJECT); | |
290 | buf.writeInt(readNextField("months", Integer.class)); | |
291 | buf.writeInt(readNextField("days", Integer.class)); | |
292 | buf.writeLong(readNextField("nanoseconds", Long.class)); | |
293 | readToken(END_OBJECT); | |
294 | } | |
295 | ||
296 | return buf; | |
297 | } | |
298 | }; | |
299 | ||
300 | ||
301 | BufferReader INT1 = new BufferReader() { | |
302 | @Override | |
303 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
304 | final long size = (long) count * TinyIntVector.TYPE_WIDTH; | |
305 | ArrowBuf buf = allocator.buffer(size); | |
306 | ||
307 | for (int i = 0; i < count; i++) { | |
308 | parser.nextToken(); | |
309 | buf.writeByte(parser.getByteValue()); | |
310 | } | |
311 | ||
312 | return buf; | |
313 | } | |
314 | }; | |
315 | ||
316 | BufferReader INT2 = new BufferReader() { | |
317 | @Override | |
318 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
319 | final long size = (long) count * SmallIntVector.TYPE_WIDTH; | |
320 | ArrowBuf buf = allocator.buffer(size); | |
321 | ||
322 | for (int i = 0; i < count; i++) { | |
323 | parser.nextToken(); | |
324 | buf.writeShort(parser.getShortValue()); | |
325 | } | |
326 | ||
327 | return buf; | |
328 | } | |
329 | }; | |
330 | ||
331 | BufferReader INT4 = new BufferReader() { | |
332 | @Override | |
333 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
334 | final long size = (long) count * IntVector.TYPE_WIDTH; | |
335 | ArrowBuf buf = allocator.buffer(size); | |
336 | ||
337 | for (int i = 0; i < count; i++) { | |
338 | parser.nextToken(); | |
339 | buf.writeInt(parser.getIntValue()); | |
340 | } | |
341 | ||
342 | return buf; | |
343 | } | |
344 | }; | |
345 | ||
346 | BufferReader INT8 = new BufferReader() { | |
347 | @Override | |
348 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
349 | final long size = (long) count * BigIntVector.TYPE_WIDTH; | |
350 | ArrowBuf buf = allocator.buffer(size); | |
351 | ||
352 | for (int i = 0; i < count; i++) { | |
353 | parser.nextToken(); | |
354 | String value = parser.getValueAsString(); | |
355 | buf.writeLong(Long.valueOf(value)); | |
356 | } | |
357 | ||
358 | return buf; | |
359 | } | |
360 | }; | |
361 | ||
362 | BufferReader UINT1 = new BufferReader() { | |
363 | @Override | |
364 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
365 | final long size = (long) count * TinyIntVector.TYPE_WIDTH; | |
366 | ArrowBuf buf = allocator.buffer(size); | |
367 | ||
368 | for (int i = 0; i < count; i++) { | |
369 | parser.nextToken(); | |
370 | buf.writeByte(parser.getShortValue() & 0xFF); | |
371 | } | |
372 | ||
373 | return buf; | |
374 | } | |
375 | }; | |
376 | ||
377 | BufferReader UINT2 = new BufferReader() { | |
378 | @Override | |
379 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
380 | final long size = (long) count * SmallIntVector.TYPE_WIDTH; | |
381 | ArrowBuf buf = allocator.buffer(size); | |
382 | ||
383 | for (int i = 0; i < count; i++) { | |
384 | parser.nextToken(); | |
385 | buf.writeShort(parser.getIntValue() & 0xFFFF); | |
386 | } | |
387 | ||
388 | return buf; | |
389 | } | |
390 | }; | |
391 | ||
392 | BufferReader UINT4 = new BufferReader() { | |
393 | @Override | |
394 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
395 | final long size = (long) count * IntVector.TYPE_WIDTH; | |
396 | ArrowBuf buf = allocator.buffer(size); | |
397 | ||
398 | for (int i = 0; i < count; i++) { | |
399 | parser.nextToken(); | |
400 | buf.writeInt((int) parser.getLongValue()); | |
401 | } | |
402 | ||
403 | return buf; | |
404 | } | |
405 | }; | |
406 | ||
407 | BufferReader UINT8 = new BufferReader() { | |
408 | @Override | |
409 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
410 | final long size = (long) count * BigIntVector.TYPE_WIDTH; | |
411 | ArrowBuf buf = allocator.buffer(size); | |
412 | ||
413 | for (int i = 0; i < count; i++) { | |
414 | parser.nextToken(); | |
415 | BigInteger value = new BigInteger(parser.getValueAsString()); | |
416 | buf.writeLong(value.longValue()); | |
417 | } | |
418 | ||
419 | return buf; | |
420 | } | |
421 | }; | |
422 | ||
423 | BufferReader FLOAT4 = new BufferReader() { | |
424 | @Override | |
425 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
426 | final long size = (long) count * Float4Vector.TYPE_WIDTH; | |
427 | ArrowBuf buf = allocator.buffer(size); | |
428 | ||
429 | for (int i = 0; i < count; i++) { | |
430 | parser.nextToken(); | |
431 | buf.writeFloat(parser.getFloatValue()); | |
432 | } | |
433 | ||
434 | return buf; | |
435 | } | |
436 | }; | |
437 | ||
438 | BufferReader FLOAT8 = new BufferReader() { | |
439 | @Override | |
440 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
441 | final long size = (long) count * Float8Vector.TYPE_WIDTH; | |
442 | ArrowBuf buf = allocator.buffer(size); | |
443 | ||
444 | for (int i = 0; i < count; i++) { | |
445 | parser.nextToken(); | |
446 | buf.writeDouble(parser.getDoubleValue()); | |
447 | } | |
448 | ||
449 | return buf; | |
450 | } | |
451 | }; | |
452 | ||
453 | BufferReader DECIMAL = new BufferReader() { | |
454 | @Override | |
455 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
456 | final long size = (long) count * DecimalVector.TYPE_WIDTH; | |
457 | ArrowBuf buf = allocator.buffer(size); | |
458 | ||
459 | for (int i = 0; i < count; i++) { | |
460 | parser.nextToken(); | |
461 | BigDecimal decimalValue = new BigDecimal(parser.readValueAs(String.class)); | |
462 | DecimalUtility.writeBigDecimalToArrowBuf(decimalValue, buf, i, DecimalVector.TYPE_WIDTH); | |
463 | } | |
464 | ||
465 | buf.writerIndex(size); | |
466 | return buf; | |
467 | } | |
468 | }; | |
469 | ||
470 | BufferReader DECIMAL256 = new BufferReader() { | |
471 | @Override | |
472 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
473 | final long size = (long) count * Decimal256Vector.TYPE_WIDTH; | |
474 | ArrowBuf buf = allocator.buffer(size); | |
475 | ||
476 | for (int i = 0; i < count; i++) { | |
477 | parser.nextToken(); | |
478 | BigDecimal decimalValue = new BigDecimal(parser.readValueAs(String.class)); | |
479 | DecimalUtility.writeBigDecimalToArrowBuf(decimalValue, buf, i, Decimal256Vector.TYPE_WIDTH); | |
480 | } | |
481 | ||
482 | buf.writerIndex(size); | |
483 | return buf; | |
484 | } | |
485 | }; | |
486 | ||
487 | ArrowBuf readBinaryValues( | |
488 | BufferAllocator allocator, int count) throws IOException { | |
489 | ArrayList<byte[]> values = new ArrayList<>(count); | |
490 | long bufferSize = 0L; | |
491 | for (int i = 0; i < count; i++) { | |
492 | parser.nextToken(); | |
493 | final byte[] value = decodeHexSafe(parser.readValueAs(String.class)); | |
494 | values.add(value); | |
495 | bufferSize += value.length; | |
496 | } | |
497 | ||
498 | ArrowBuf buf = allocator.buffer(bufferSize); | |
499 | ||
500 | for (byte[] value : values) { | |
501 | buf.writeBytes(value); | |
502 | } | |
503 | ||
504 | return buf; | |
505 | } | |
506 | ||
507 | ArrowBuf readStringValues( | |
508 | BufferAllocator allocator, int count) throws IOException { | |
509 | ArrayList<byte[]> values = new ArrayList<>(count); | |
510 | long bufferSize = 0L; | |
511 | for (int i = 0; i < count; i++) { | |
512 | parser.nextToken(); | |
513 | final byte[] value = parser.getValueAsString().getBytes(UTF_8); | |
514 | values.add(value); | |
515 | bufferSize += value.length; | |
516 | } | |
517 | ||
518 | ArrowBuf buf = allocator.buffer(bufferSize); | |
519 | ||
520 | for (byte[] value : values) { | |
521 | buf.writeBytes(value); | |
522 | } | |
523 | ||
524 | return buf; | |
525 | } | |
526 | ||
527 | BufferReader FIXEDSIZEBINARY = new BufferReader() { | |
528 | @Override | |
529 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
530 | return readBinaryValues(allocator, count); | |
531 | } | |
532 | }; | |
533 | ||
534 | BufferReader VARCHAR = new BufferReader() { | |
535 | @Override | |
536 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
537 | return readStringValues(allocator, count); | |
538 | } | |
539 | }; | |
540 | ||
541 | BufferReader LARGEVARCHAR = new BufferReader() { | |
542 | @Override | |
543 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
544 | return readStringValues(allocator, count); | |
545 | } | |
546 | }; | |
547 | ||
548 | BufferReader VARBINARY = new BufferReader() { | |
549 | @Override | |
550 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
551 | return readBinaryValues(allocator, count); | |
552 | } | |
553 | }; | |
554 | ||
555 | BufferReader LARGEVARBINARY = new BufferReader() { | |
556 | @Override | |
557 | protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { | |
558 | return readBinaryValues(allocator, count); | |
559 | } | |
560 | }; | |
561 | } | |
562 | ||
563 | private ArrowBuf readIntoBuffer(BufferAllocator allocator, BufferType bufferType, | |
564 | Types.MinorType type, int count) throws IOException { | |
565 | ArrowBuf buf; | |
566 | ||
567 | BufferHelper helper = new BufferHelper(); | |
568 | ||
569 | BufferReader reader = null; | |
570 | ||
571 | if (bufferType.equals(VALIDITY)) { | |
572 | reader = helper.BIT; | |
573 | } else if (bufferType.equals(OFFSET)) { | |
574 | if (type == Types.MinorType.LARGELIST || | |
575 | type == Types.MinorType.LARGEVARCHAR || | |
576 | type == Types.MinorType.LARGEVARBINARY) { | |
577 | reader = helper.INT8; | |
578 | } else { | |
579 | reader = helper.INT4; | |
580 | } | |
581 | } else if (bufferType.equals(TYPE)) { | |
582 | reader = helper.INT1; | |
583 | } else if (bufferType.equals(DATA)) { | |
584 | switch (type) { | |
585 | case BIT: | |
586 | reader = helper.BIT; | |
587 | break; | |
588 | case TINYINT: | |
589 | reader = helper.INT1; | |
590 | break; | |
591 | case SMALLINT: | |
592 | reader = helper.INT2; | |
593 | break; | |
594 | case INT: | |
595 | reader = helper.INT4; | |
596 | break; | |
597 | case BIGINT: | |
598 | reader = helper.INT8; | |
599 | break; | |
600 | case UINT1: | |
601 | reader = helper.UINT1; | |
602 | break; | |
603 | case UINT2: | |
604 | reader = helper.UINT2; | |
605 | break; | |
606 | case UINT4: | |
607 | reader = helper.UINT4; | |
608 | break; | |
609 | case UINT8: | |
610 | reader = helper.UINT8; | |
611 | break; | |
612 | case FLOAT4: | |
613 | reader = helper.FLOAT4; | |
614 | break; | |
615 | case FLOAT8: | |
616 | reader = helper.FLOAT8; | |
617 | break; | |
618 | case DECIMAL: | |
619 | reader = helper.DECIMAL; | |
620 | break; | |
621 | case DECIMAL256: | |
622 | reader = helper.DECIMAL256; | |
623 | break; | |
624 | case FIXEDSIZEBINARY: | |
625 | reader = helper.FIXEDSIZEBINARY; | |
626 | break; | |
627 | case VARCHAR: | |
628 | reader = helper.VARCHAR; | |
629 | break; | |
630 | case LARGEVARCHAR: | |
631 | reader = helper.LARGEVARCHAR; | |
632 | break; | |
633 | case VARBINARY: | |
634 | reader = helper.VARBINARY; | |
635 | break; | |
636 | case LARGEVARBINARY: | |
637 | reader = helper.LARGEVARBINARY; | |
638 | break; | |
639 | case DATEDAY: | |
640 | reader = helper.INT4; | |
641 | break; | |
642 | case DATEMILLI: | |
643 | reader = helper.INT8; | |
644 | break; | |
645 | case TIMESEC: | |
646 | case TIMEMILLI: | |
647 | reader = helper.INT4; | |
648 | break; | |
649 | case TIMEMICRO: | |
650 | case TIMENANO: | |
651 | reader = helper.INT8; | |
652 | break; | |
653 | case TIMESTAMPNANO: | |
654 | case TIMESTAMPMICRO: | |
655 | case TIMESTAMPMILLI: | |
656 | case TIMESTAMPSEC: | |
657 | case TIMESTAMPNANOTZ: | |
658 | case TIMESTAMPMICROTZ: | |
659 | case TIMESTAMPMILLITZ: | |
660 | case TIMESTAMPSECTZ: | |
661 | reader = helper.INT8; | |
662 | break; | |
663 | case INTERVALYEAR: | |
664 | reader = helper.INT4; | |
665 | break; | |
666 | case INTERVALDAY: | |
667 | reader = helper.DAY_MILLIS; | |
668 | break; | |
669 | case INTERVALMONTHDAYNANO: | |
670 | reader = helper.MONTH_DAY_NANOS; | |
671 | break; | |
672 | case DURATION: | |
673 | reader = helper.INT8; | |
674 | break; | |
675 | default: | |
676 | throw new UnsupportedOperationException("Cannot read array of type " + type); | |
677 | } | |
678 | } else { | |
679 | throw new InvalidArrowFileException("Unrecognized buffer type " + bufferType); | |
680 | } | |
681 | ||
682 | buf = reader.readBuffer(allocator, count); | |
683 | ||
684 | Preconditions.checkNotNull(buf); | |
685 | return buf; | |
686 | } | |
687 | ||
688 | private void readFromJsonIntoVector(Field field, FieldVector vector) throws JsonParseException, IOException { | |
689 | TypeLayout typeLayout = TypeLayout.getTypeLayout(field.getType()); | |
690 | List<BufferType> vectorTypes = typeLayout.getBufferTypes(); | |
691 | ArrowBuf[] vectorBuffers = new ArrowBuf[vectorTypes.size()]; | |
692 | /* | |
693 | * The order of inner buffers is : | |
694 | * Fixed width vector: | |
695 | * -- validity buffer | |
696 | * -- data buffer | |
697 | * Variable width vector: | |
698 | * -- validity buffer | |
699 | * -- offset buffer | |
700 | * -- data buffer | |
701 | * | |
702 | * This is similar to what getFieldInnerVectors() used to give but now that we don't have | |
703 | * inner vectors anymore, we will work directly at the buffer level -- populate buffers | |
704 | * locally as we read from Json parser and do loadFieldBuffers on the vector followed by | |
705 | * releasing the local buffers. | |
706 | */ | |
707 | readToken(START_OBJECT); | |
708 | { | |
709 | // If currently reading dictionaries, field name is not important so don't check | |
710 | String name = readNextField("name", String.class); | |
711 | if (started && !Objects.equals(field.getName(), name)) { | |
712 | throw new IllegalArgumentException("Expected field " + field.getName() + " but got " + name); | |
713 | } | |
714 | ||
715 | /* Initialize the vector with required capacity but don't allocateNew since we would | |
716 | * be doing loadFieldBuffers. | |
717 | */ | |
718 | int valueCount = readNextField("count", Integer.class); | |
719 | vector.setInitialCapacity(valueCount); | |
720 | ||
721 | for (int v = 0; v < vectorTypes.size(); v++) { | |
722 | BufferType bufferType = vectorTypes.get(v); | |
723 | nextFieldIs(bufferType.getName()); | |
724 | int innerBufferValueCount = valueCount; | |
725 | if (bufferType.equals(OFFSET) && !field.getType().getTypeID().equals(ArrowType.ArrowTypeID.Union)) { | |
726 | /* offset buffer has 1 additional value capacity */ | |
727 | innerBufferValueCount = valueCount + 1; | |
728 | } | |
729 | ||
730 | vectorBuffers[v] = readIntoBuffer(allocator, bufferType, vector.getMinorType(), innerBufferValueCount); | |
731 | } | |
732 | ||
733 | if (vectorBuffers.length == 0) { | |
734 | readToken(END_OBJECT); | |
735 | return; | |
736 | } | |
737 | ||
738 | int nullCount = 0; | |
739 | if (!(vector.getField().getFieldType().getType() instanceof ArrowType.Union)) { | |
740 | nullCount = BitVectorHelper.getNullCount(vectorBuffers[0], valueCount); | |
741 | } | |
742 | final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount, nullCount); | |
743 | vector.loadFieldBuffers(fieldNode, Arrays.asList(vectorBuffers)); | |
744 | ||
745 | /* read child vectors (if any) */ | |
746 | List<Field> fields = field.getChildren(); | |
747 | if (!fields.isEmpty()) { | |
748 | List<FieldVector> vectorChildren = vector.getChildrenFromFields(); | |
749 | if (fields.size() != vectorChildren.size()) { | |
750 | throw new IllegalArgumentException( | |
751 | "fields and children are not the same size: " + fields.size() + " != " + vectorChildren.size()); | |
752 | } | |
753 | nextFieldIs("children"); | |
754 | readToken(START_ARRAY); | |
755 | for (int i = 0; i < fields.size(); i++) { | |
756 | Field childField = fields.get(i); | |
757 | FieldVector childVector = vectorChildren.get(i); | |
758 | readFromJsonIntoVector(childField, childVector); | |
759 | } | |
760 | readToken(END_ARRAY); | |
761 | } | |
762 | } | |
763 | readToken(END_OBJECT); | |
764 | ||
765 | for (ArrowBuf buffer: vectorBuffers) { | |
766 | buffer.getReferenceManager().release(); | |
767 | } | |
768 | } | |
769 | ||
770 | private byte[] decodeHexSafe(String hexString) throws IOException { | |
771 | try { | |
772 | return Hex.decodeHex(hexString.toCharArray()); | |
773 | } catch (DecoderException e) { | |
774 | throw new IOException("Unable to decode hex string: " + hexString, e); | |
775 | } | |
776 | } | |
777 | ||
778 | @Override | |
779 | public void close() throws IOException { | |
780 | parser.close(); | |
781 | for (Dictionary dictionary : dictionaries.values()) { | |
782 | dictionary.getVector().close(); | |
783 | } | |
784 | } | |
785 | ||
786 | private <T> T readNextField(String expectedFieldName, Class<T> c) throws IOException, JsonParseException { | |
787 | nextFieldIs(expectedFieldName); | |
788 | parser.nextToken(); | |
789 | return parser.readValueAs(c); | |
790 | } | |
791 | ||
792 | private void nextFieldIs(String expectedFieldName) throws IOException, JsonParseException { | |
793 | String name = parser.nextFieldName(); | |
794 | if (name == null || !name.equals(expectedFieldName)) { | |
795 | throw new IllegalStateException("Expected " + expectedFieldName + " but got " + name); | |
796 | } | |
797 | } | |
798 | ||
799 | private void readToken(JsonToken expected) throws JsonParseException, IOException { | |
800 | JsonToken t = parser.nextToken(); | |
801 | if (t != expected) { | |
802 | throw new IllegalStateException("Expected " + expected + " but got " + t); | |
803 | } | |
804 | } | |
805 | ||
806 | } |