]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / main / java / org / apache / arrow / vector / ipc / JsonFileReader.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.vector.ipc;
19
20 import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
21 import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
22 import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
23 import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
24 import static java.nio.charset.StandardCharsets.UTF_8;
25 import static org.apache.arrow.vector.BufferLayout.BufferType.DATA;
26 import static org.apache.arrow.vector.BufferLayout.BufferType.OFFSET;
27 import static org.apache.arrow.vector.BufferLayout.BufferType.TYPE;
28 import static org.apache.arrow.vector.BufferLayout.BufferType.VALIDITY;
29
30 import java.io.File;
31 import java.io.IOException;
32 import java.math.BigDecimal;
33 import java.math.BigInteger;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Objects;
41
42 import org.apache.arrow.memory.ArrowBuf;
43 import org.apache.arrow.memory.BufferAllocator;
44 import org.apache.arrow.util.Preconditions;
45 import org.apache.arrow.vector.BigIntVector;
46 import org.apache.arrow.vector.BitVectorHelper;
47 import org.apache.arrow.vector.BufferLayout.BufferType;
48 import org.apache.arrow.vector.Decimal256Vector;
49 import org.apache.arrow.vector.DecimalVector;
50 import org.apache.arrow.vector.FieldVector;
51 import org.apache.arrow.vector.Float4Vector;
52 import org.apache.arrow.vector.Float8Vector;
53 import org.apache.arrow.vector.IntVector;
54 import org.apache.arrow.vector.IntervalDayVector;
55 import org.apache.arrow.vector.IntervalMonthDayNanoVector;
56 import org.apache.arrow.vector.SmallIntVector;
57 import org.apache.arrow.vector.TinyIntVector;
58 import org.apache.arrow.vector.TypeLayout;
59 import org.apache.arrow.vector.VectorSchemaRoot;
60 import org.apache.arrow.vector.dictionary.Dictionary;
61 import org.apache.arrow.vector.dictionary.DictionaryProvider;
62 import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
63 import org.apache.arrow.vector.types.Types;
64 import org.apache.arrow.vector.types.pojo.ArrowType;
65 import org.apache.arrow.vector.types.pojo.Field;
66 import org.apache.arrow.vector.types.pojo.Schema;
67 import org.apache.arrow.vector.util.DecimalUtility;
68 import org.apache.arrow.vector.util.DictionaryUtility;
69 import org.apache.commons.codec.DecoderException;
70 import org.apache.commons.codec.binary.Hex;
71
72 import com.fasterxml.jackson.core.JsonParseException;
73 import com.fasterxml.jackson.core.JsonParser;
74 import com.fasterxml.jackson.core.JsonToken;
75 import com.fasterxml.jackson.databind.MapperFeature;
76 import com.fasterxml.jackson.databind.MappingJsonFactory;
77 import com.fasterxml.jackson.databind.ObjectMapper;
78
79 /**
80 * A reader for JSON files that translates them into vectors. This reader is used for integration tests.
81 *
82 * <p>This class uses a streaming parser API, method naming tends to reflect this implementation
83 * detail.
84 */
85 public class JsonFileReader implements AutoCloseable, DictionaryProvider {
86 private final JsonParser parser;
87 private final BufferAllocator allocator;
88 private Schema schema;
89 private Map<Long, Dictionary> dictionaries;
90 private Boolean started = false;
91
92 /**
93 * Constructs a new instance.
94 * @param inputFile The file to read.
95 * @param allocator The allocator to use for allocating buffers.
96 */
97 public JsonFileReader(File inputFile, BufferAllocator allocator) throws JsonParseException, IOException {
98 super();
99 this.allocator = allocator;
100 MappingJsonFactory jsonFactory = new MappingJsonFactory(new ObjectMapper()
101 //ignore case for enums
102 .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS, true)
103 );
104 this.parser = jsonFactory.createParser(inputFile);
105 // Allow reading NaN for floating point values
106 this.parser.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
107 }
108
109 @Override
110 public Dictionary lookup(long id) {
111 if (!started) {
112 throw new IllegalStateException("Unable to lookup until after read() has started");
113 }
114
115 return dictionaries.get(id);
116 }
117
118 /** Reads the beginning (schema section) of the json file and returns it. */
119 public Schema start() throws JsonParseException, IOException {
120 readToken(START_OBJECT);
121 {
122 Schema originalSchema = readNextField("schema", Schema.class);
123 List<Field> fields = new ArrayList<>();
124 dictionaries = new HashMap<>();
125
126 // Convert fields with dictionaries to have the index type
127 for (Field field : originalSchema.getFields()) {
128 fields.add(DictionaryUtility.toMemoryFormat(field, allocator, dictionaries));
129 }
130 this.schema = new Schema(fields, originalSchema.getCustomMetadata());
131
132 if (!dictionaries.isEmpty()) {
133 nextFieldIs("dictionaries");
134 readDictionaryBatches();
135 }
136
137 nextFieldIs("batches");
138 readToken(START_ARRAY);
139 started = true;
140 return this.schema;
141 }
142 }
143
144 private void readDictionaryBatches() throws JsonParseException, IOException {
145 readToken(START_ARRAY);
146 JsonToken token = parser.nextToken();
147 boolean haveDictionaryBatch = token == START_OBJECT;
148 while (haveDictionaryBatch) {
149
150 // Lookup what dictionary for the batch about to be read
151 long id = readNextField("id", Long.class);
152 Dictionary dict = dictionaries.get(id);
153 if (dict == null) {
154 throw new IllegalArgumentException("Dictionary with id: " + id + " missing encoding from schema Field");
155 }
156
157 // Read the dictionary record batch
158 nextFieldIs("data");
159 FieldVector vector = dict.getVector();
160 List<Field> fields = Collections.singletonList(vector.getField());
161 List<FieldVector> vectors = Collections.singletonList(vector);
162 VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, vector.getValueCount());
163 read(root);
164
165 readToken(END_OBJECT);
166 token = parser.nextToken();
167 haveDictionaryBatch = token == START_OBJECT;
168 }
169
170 if (token != END_ARRAY) {
171 throw new IllegalArgumentException("Invalid token: " + token + " expected end of array at " +
172 parser.getTokenLocation());
173 }
174 }
175
176 /**
177 * Reads the next record batch from the file into <code>root</code>.
178 */
179 public boolean read(VectorSchemaRoot root) throws IOException {
180 JsonToken t = parser.nextToken();
181 if (t == START_OBJECT) {
182 {
183 int count = readNextField("count", Integer.class);
184 nextFieldIs("columns");
185 readToken(START_ARRAY);
186 {
187 for (Field field : root.getSchema().getFields()) {
188 FieldVector vector = root.getVector(field);
189 readFromJsonIntoVector(field, vector);
190 }
191 }
192 readToken(END_ARRAY);
193 root.setRowCount(count);
194 }
195 readToken(END_OBJECT);
196 return true;
197 } else if (t == END_ARRAY) {
198 root.setRowCount(0);
199 return false;
200 } else {
201 throw new IllegalArgumentException("Invalid token: " + t);
202 }
203 }
204
205 /**
206 * Returns the next record batch from the file.
207 */
208 public VectorSchemaRoot read() throws IOException {
209 JsonToken t = parser.nextToken();
210 if (t == START_OBJECT) {
211 VectorSchemaRoot recordBatch = VectorSchemaRoot.create(schema, allocator);
212 {
213 int count = readNextField("count", Integer.class);
214 recordBatch.setRowCount(count);
215 nextFieldIs("columns");
216 readToken(START_ARRAY);
217 {
218 for (Field field : schema.getFields()) {
219 FieldVector vector = recordBatch.getVector(field);
220 readFromJsonIntoVector(field, vector);
221 }
222 }
223 readToken(END_ARRAY);
224 }
225 readToken(END_OBJECT);
226 return recordBatch;
227 } else if (t == END_ARRAY) {
228 return null;
229 } else {
230 throw new IllegalArgumentException("Invalid token: " + t);
231 }
232 }
233
234 private abstract class BufferReader {
235 protected abstract ArrowBuf read(BufferAllocator allocator, int count) throws IOException;
236
237 ArrowBuf readBuffer(BufferAllocator allocator, int count) throws IOException {
238 readToken(START_ARRAY);
239 ArrowBuf buf = read(allocator, count);
240 readToken(END_ARRAY);
241 return buf;
242 }
243 }
244
245 private class BufferHelper {
246 BufferReader BIT = new BufferReader() {
247 @Override
248 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
249 final int bufferSize = BitVectorHelper.getValidityBufferSize(count);
250 ArrowBuf buf = allocator.buffer(bufferSize);
251
252 // C++ integration test fails without this.
253 buf.setZero(0, bufferSize);
254
255 for (int i = 0; i < count; i++) {
256 parser.nextToken();
257 BitVectorHelper.setValidityBit(buf, i, parser.readValueAs(Boolean.class) ? 1 : 0);
258 }
259
260 buf.writerIndex(bufferSize);
261 return buf;
262 }
263 };
264
265 BufferReader DAY_MILLIS = new BufferReader() {
266 @Override
267 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
268 final long size = (long) count * IntervalDayVector.TYPE_WIDTH;
269 ArrowBuf buf = allocator.buffer(size);
270
271 for (int i = 0; i < count; i++) {
272 readToken(START_OBJECT);
273 buf.writeInt(readNextField("days", Integer.class));
274 buf.writeInt(readNextField("milliseconds", Integer.class));
275 readToken(END_OBJECT);
276 }
277
278 return buf;
279 }
280 };
281
282 BufferReader MONTH_DAY_NANOS = new BufferReader() {
283 @Override
284 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
285 final long size = (long) count * IntervalMonthDayNanoVector.TYPE_WIDTH;
286 ArrowBuf buf = allocator.buffer(size);
287
288 for (int i = 0; i < count; i++) {
289 readToken(START_OBJECT);
290 buf.writeInt(readNextField("months", Integer.class));
291 buf.writeInt(readNextField("days", Integer.class));
292 buf.writeLong(readNextField("nanoseconds", Long.class));
293 readToken(END_OBJECT);
294 }
295
296 return buf;
297 }
298 };
299
300
301 BufferReader INT1 = new BufferReader() {
302 @Override
303 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
304 final long size = (long) count * TinyIntVector.TYPE_WIDTH;
305 ArrowBuf buf = allocator.buffer(size);
306
307 for (int i = 0; i < count; i++) {
308 parser.nextToken();
309 buf.writeByte(parser.getByteValue());
310 }
311
312 return buf;
313 }
314 };
315
316 BufferReader INT2 = new BufferReader() {
317 @Override
318 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
319 final long size = (long) count * SmallIntVector.TYPE_WIDTH;
320 ArrowBuf buf = allocator.buffer(size);
321
322 for (int i = 0; i < count; i++) {
323 parser.nextToken();
324 buf.writeShort(parser.getShortValue());
325 }
326
327 return buf;
328 }
329 };
330
331 BufferReader INT4 = new BufferReader() {
332 @Override
333 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
334 final long size = (long) count * IntVector.TYPE_WIDTH;
335 ArrowBuf buf = allocator.buffer(size);
336
337 for (int i = 0; i < count; i++) {
338 parser.nextToken();
339 buf.writeInt(parser.getIntValue());
340 }
341
342 return buf;
343 }
344 };
345
346 BufferReader INT8 = new BufferReader() {
347 @Override
348 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
349 final long size = (long) count * BigIntVector.TYPE_WIDTH;
350 ArrowBuf buf = allocator.buffer(size);
351
352 for (int i = 0; i < count; i++) {
353 parser.nextToken();
354 String value = parser.getValueAsString();
355 buf.writeLong(Long.valueOf(value));
356 }
357
358 return buf;
359 }
360 };
361
362 BufferReader UINT1 = new BufferReader() {
363 @Override
364 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
365 final long size = (long) count * TinyIntVector.TYPE_WIDTH;
366 ArrowBuf buf = allocator.buffer(size);
367
368 for (int i = 0; i < count; i++) {
369 parser.nextToken();
370 buf.writeByte(parser.getShortValue() & 0xFF);
371 }
372
373 return buf;
374 }
375 };
376
377 BufferReader UINT2 = new BufferReader() {
378 @Override
379 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
380 final long size = (long) count * SmallIntVector.TYPE_WIDTH;
381 ArrowBuf buf = allocator.buffer(size);
382
383 for (int i = 0; i < count; i++) {
384 parser.nextToken();
385 buf.writeShort(parser.getIntValue() & 0xFFFF);
386 }
387
388 return buf;
389 }
390 };
391
392 BufferReader UINT4 = new BufferReader() {
393 @Override
394 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
395 final long size = (long) count * IntVector.TYPE_WIDTH;
396 ArrowBuf buf = allocator.buffer(size);
397
398 for (int i = 0; i < count; i++) {
399 parser.nextToken();
400 buf.writeInt((int) parser.getLongValue());
401 }
402
403 return buf;
404 }
405 };
406
407 BufferReader UINT8 = new BufferReader() {
408 @Override
409 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
410 final long size = (long) count * BigIntVector.TYPE_WIDTH;
411 ArrowBuf buf = allocator.buffer(size);
412
413 for (int i = 0; i < count; i++) {
414 parser.nextToken();
415 BigInteger value = new BigInteger(parser.getValueAsString());
416 buf.writeLong(value.longValue());
417 }
418
419 return buf;
420 }
421 };
422
423 BufferReader FLOAT4 = new BufferReader() {
424 @Override
425 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
426 final long size = (long) count * Float4Vector.TYPE_WIDTH;
427 ArrowBuf buf = allocator.buffer(size);
428
429 for (int i = 0; i < count; i++) {
430 parser.nextToken();
431 buf.writeFloat(parser.getFloatValue());
432 }
433
434 return buf;
435 }
436 };
437
438 BufferReader FLOAT8 = new BufferReader() {
439 @Override
440 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
441 final long size = (long) count * Float8Vector.TYPE_WIDTH;
442 ArrowBuf buf = allocator.buffer(size);
443
444 for (int i = 0; i < count; i++) {
445 parser.nextToken();
446 buf.writeDouble(parser.getDoubleValue());
447 }
448
449 return buf;
450 }
451 };
452
453 BufferReader DECIMAL = new BufferReader() {
454 @Override
455 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
456 final long size = (long) count * DecimalVector.TYPE_WIDTH;
457 ArrowBuf buf = allocator.buffer(size);
458
459 for (int i = 0; i < count; i++) {
460 parser.nextToken();
461 BigDecimal decimalValue = new BigDecimal(parser.readValueAs(String.class));
462 DecimalUtility.writeBigDecimalToArrowBuf(decimalValue, buf, i, DecimalVector.TYPE_WIDTH);
463 }
464
465 buf.writerIndex(size);
466 return buf;
467 }
468 };
469
470 BufferReader DECIMAL256 = new BufferReader() {
471 @Override
472 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
473 final long size = (long) count * Decimal256Vector.TYPE_WIDTH;
474 ArrowBuf buf = allocator.buffer(size);
475
476 for (int i = 0; i < count; i++) {
477 parser.nextToken();
478 BigDecimal decimalValue = new BigDecimal(parser.readValueAs(String.class));
479 DecimalUtility.writeBigDecimalToArrowBuf(decimalValue, buf, i, Decimal256Vector.TYPE_WIDTH);
480 }
481
482 buf.writerIndex(size);
483 return buf;
484 }
485 };
486
487 ArrowBuf readBinaryValues(
488 BufferAllocator allocator, int count) throws IOException {
489 ArrayList<byte[]> values = new ArrayList<>(count);
490 long bufferSize = 0L;
491 for (int i = 0; i < count; i++) {
492 parser.nextToken();
493 final byte[] value = decodeHexSafe(parser.readValueAs(String.class));
494 values.add(value);
495 bufferSize += value.length;
496 }
497
498 ArrowBuf buf = allocator.buffer(bufferSize);
499
500 for (byte[] value : values) {
501 buf.writeBytes(value);
502 }
503
504 return buf;
505 }
506
507 ArrowBuf readStringValues(
508 BufferAllocator allocator, int count) throws IOException {
509 ArrayList<byte[]> values = new ArrayList<>(count);
510 long bufferSize = 0L;
511 for (int i = 0; i < count; i++) {
512 parser.nextToken();
513 final byte[] value = parser.getValueAsString().getBytes(UTF_8);
514 values.add(value);
515 bufferSize += value.length;
516 }
517
518 ArrowBuf buf = allocator.buffer(bufferSize);
519
520 for (byte[] value : values) {
521 buf.writeBytes(value);
522 }
523
524 return buf;
525 }
526
527 BufferReader FIXEDSIZEBINARY = new BufferReader() {
528 @Override
529 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
530 return readBinaryValues(allocator, count);
531 }
532 };
533
534 BufferReader VARCHAR = new BufferReader() {
535 @Override
536 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
537 return readStringValues(allocator, count);
538 }
539 };
540
541 BufferReader LARGEVARCHAR = new BufferReader() {
542 @Override
543 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
544 return readStringValues(allocator, count);
545 }
546 };
547
548 BufferReader VARBINARY = new BufferReader() {
549 @Override
550 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
551 return readBinaryValues(allocator, count);
552 }
553 };
554
555 BufferReader LARGEVARBINARY = new BufferReader() {
556 @Override
557 protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
558 return readBinaryValues(allocator, count);
559 }
560 };
561 }
562
563 private ArrowBuf readIntoBuffer(BufferAllocator allocator, BufferType bufferType,
564 Types.MinorType type, int count) throws IOException {
565 ArrowBuf buf;
566
567 BufferHelper helper = new BufferHelper();
568
569 BufferReader reader = null;
570
571 if (bufferType.equals(VALIDITY)) {
572 reader = helper.BIT;
573 } else if (bufferType.equals(OFFSET)) {
574 if (type == Types.MinorType.LARGELIST ||
575 type == Types.MinorType.LARGEVARCHAR ||
576 type == Types.MinorType.LARGEVARBINARY) {
577 reader = helper.INT8;
578 } else {
579 reader = helper.INT4;
580 }
581 } else if (bufferType.equals(TYPE)) {
582 reader = helper.INT1;
583 } else if (bufferType.equals(DATA)) {
584 switch (type) {
585 case BIT:
586 reader = helper.BIT;
587 break;
588 case TINYINT:
589 reader = helper.INT1;
590 break;
591 case SMALLINT:
592 reader = helper.INT2;
593 break;
594 case INT:
595 reader = helper.INT4;
596 break;
597 case BIGINT:
598 reader = helper.INT8;
599 break;
600 case UINT1:
601 reader = helper.UINT1;
602 break;
603 case UINT2:
604 reader = helper.UINT2;
605 break;
606 case UINT4:
607 reader = helper.UINT4;
608 break;
609 case UINT8:
610 reader = helper.UINT8;
611 break;
612 case FLOAT4:
613 reader = helper.FLOAT4;
614 break;
615 case FLOAT8:
616 reader = helper.FLOAT8;
617 break;
618 case DECIMAL:
619 reader = helper.DECIMAL;
620 break;
621 case DECIMAL256:
622 reader = helper.DECIMAL256;
623 break;
624 case FIXEDSIZEBINARY:
625 reader = helper.FIXEDSIZEBINARY;
626 break;
627 case VARCHAR:
628 reader = helper.VARCHAR;
629 break;
630 case LARGEVARCHAR:
631 reader = helper.LARGEVARCHAR;
632 break;
633 case VARBINARY:
634 reader = helper.VARBINARY;
635 break;
636 case LARGEVARBINARY:
637 reader = helper.LARGEVARBINARY;
638 break;
639 case DATEDAY:
640 reader = helper.INT4;
641 break;
642 case DATEMILLI:
643 reader = helper.INT8;
644 break;
645 case TIMESEC:
646 case TIMEMILLI:
647 reader = helper.INT4;
648 break;
649 case TIMEMICRO:
650 case TIMENANO:
651 reader = helper.INT8;
652 break;
653 case TIMESTAMPNANO:
654 case TIMESTAMPMICRO:
655 case TIMESTAMPMILLI:
656 case TIMESTAMPSEC:
657 case TIMESTAMPNANOTZ:
658 case TIMESTAMPMICROTZ:
659 case TIMESTAMPMILLITZ:
660 case TIMESTAMPSECTZ:
661 reader = helper.INT8;
662 break;
663 case INTERVALYEAR:
664 reader = helper.INT4;
665 break;
666 case INTERVALDAY:
667 reader = helper.DAY_MILLIS;
668 break;
669 case INTERVALMONTHDAYNANO:
670 reader = helper.MONTH_DAY_NANOS;
671 break;
672 case DURATION:
673 reader = helper.INT8;
674 break;
675 default:
676 throw new UnsupportedOperationException("Cannot read array of type " + type);
677 }
678 } else {
679 throw new InvalidArrowFileException("Unrecognized buffer type " + bufferType);
680 }
681
682 buf = reader.readBuffer(allocator, count);
683
684 Preconditions.checkNotNull(buf);
685 return buf;
686 }
687
688 private void readFromJsonIntoVector(Field field, FieldVector vector) throws JsonParseException, IOException {
689 TypeLayout typeLayout = TypeLayout.getTypeLayout(field.getType());
690 List<BufferType> vectorTypes = typeLayout.getBufferTypes();
691 ArrowBuf[] vectorBuffers = new ArrowBuf[vectorTypes.size()];
692 /*
693 * The order of inner buffers is :
694 * Fixed width vector:
695 * -- validity buffer
696 * -- data buffer
697 * Variable width vector:
698 * -- validity buffer
699 * -- offset buffer
700 * -- data buffer
701 *
702 * This is similar to what getFieldInnerVectors() used to give but now that we don't have
703 * inner vectors anymore, we will work directly at the buffer level -- populate buffers
704 * locally as we read from Json parser and do loadFieldBuffers on the vector followed by
705 * releasing the local buffers.
706 */
707 readToken(START_OBJECT);
708 {
709 // If currently reading dictionaries, field name is not important so don't check
710 String name = readNextField("name", String.class);
711 if (started && !Objects.equals(field.getName(), name)) {
712 throw new IllegalArgumentException("Expected field " + field.getName() + " but got " + name);
713 }
714
715 /* Initialize the vector with required capacity but don't allocateNew since we would
716 * be doing loadFieldBuffers.
717 */
718 int valueCount = readNextField("count", Integer.class);
719 vector.setInitialCapacity(valueCount);
720
721 for (int v = 0; v < vectorTypes.size(); v++) {
722 BufferType bufferType = vectorTypes.get(v);
723 nextFieldIs(bufferType.getName());
724 int innerBufferValueCount = valueCount;
725 if (bufferType.equals(OFFSET) && !field.getType().getTypeID().equals(ArrowType.ArrowTypeID.Union)) {
726 /* offset buffer has 1 additional value capacity */
727 innerBufferValueCount = valueCount + 1;
728 }
729
730 vectorBuffers[v] = readIntoBuffer(allocator, bufferType, vector.getMinorType(), innerBufferValueCount);
731 }
732
733 if (vectorBuffers.length == 0) {
734 readToken(END_OBJECT);
735 return;
736 }
737
738 int nullCount = 0;
739 if (!(vector.getField().getFieldType().getType() instanceof ArrowType.Union)) {
740 nullCount = BitVectorHelper.getNullCount(vectorBuffers[0], valueCount);
741 }
742 final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount, nullCount);
743 vector.loadFieldBuffers(fieldNode, Arrays.asList(vectorBuffers));
744
745 /* read child vectors (if any) */
746 List<Field> fields = field.getChildren();
747 if (!fields.isEmpty()) {
748 List<FieldVector> vectorChildren = vector.getChildrenFromFields();
749 if (fields.size() != vectorChildren.size()) {
750 throw new IllegalArgumentException(
751 "fields and children are not the same size: " + fields.size() + " != " + vectorChildren.size());
752 }
753 nextFieldIs("children");
754 readToken(START_ARRAY);
755 for (int i = 0; i < fields.size(); i++) {
756 Field childField = fields.get(i);
757 FieldVector childVector = vectorChildren.get(i);
758 readFromJsonIntoVector(childField, childVector);
759 }
760 readToken(END_ARRAY);
761 }
762 }
763 readToken(END_OBJECT);
764
765 for (ArrowBuf buffer: vectorBuffers) {
766 buffer.getReferenceManager().release();
767 }
768 }
769
770 private byte[] decodeHexSafe(String hexString) throws IOException {
771 try {
772 return Hex.decodeHex(hexString.toCharArray());
773 } catch (DecoderException e) {
774 throw new IOException("Unable to decode hex string: " + hexString, e);
775 }
776 }
777
778 @Override
779 public void close() throws IOException {
780 parser.close();
781 for (Dictionary dictionary : dictionaries.values()) {
782 dictionary.getVector().close();
783 }
784 }
785
786 private <T> T readNextField(String expectedFieldName, Class<T> c) throws IOException, JsonParseException {
787 nextFieldIs(expectedFieldName);
788 parser.nextToken();
789 return parser.readValueAs(c);
790 }
791
792 private void nextFieldIs(String expectedFieldName) throws IOException, JsonParseException {
793 String name = parser.nextFieldName();
794 if (name == null || !name.equals(expectedFieldName)) {
795 throw new IllegalStateException("Expected " + expectedFieldName + " but got " + name);
796 }
797 }
798
799 private void readToken(JsonToken expected) throws JsonParseException, IOException {
800 JsonToken t = parser.nextToken();
801 if (t != expected) {
802 throw new IllegalStateException("Expected " + expected + " but got " + t);
803 }
804 }
805
806 }