/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.vector.ipc;

import static org.apache.arrow.vector.BufferLayout.BufferType.*;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.BufferLayout.BufferType;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.DurationVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.IntervalDayVector;
import org.apache.arrow.vector.IntervalMonthDayNanoVector;
import org.apache.arrow.vector.IntervalYearVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.DecimalUtility;
import org.apache.arrow.vector.util.DictionaryUtility;
import org.apache.commons.codec.binary.Hex;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
import com.fasterxml.jackson.databind.MappingJsonFactory;
/**
 * A writer that converts binary Vectors into a JSON format suitable
 * for integration testing.
 */
89 public class JsonFileWriter
implements AutoCloseable
{
92 * Configuration POJO for writing JSON files.
94 public static final class JSONWriteConfig
{
95 private final boolean pretty
;
97 private JSONWriteConfig(boolean pretty
) {
101 private JSONWriteConfig() {
105 public JSONWriteConfig
pretty(boolean pretty
) {
106 return new JSONWriteConfig(pretty
);
110 public static JSONWriteConfig
config() {
111 return new JSONWriteConfig();
/** Jackson generator that all output goes through; created by the constructor. */
private final JsonGenerator generator;
/** Schema given to {@link #start}; every batch passed to {@link #write} must equal it. */
private Schema schema;

/**
 * Constructs a new writer that will output to <code>outputFile</code> using the default
 * configuration returned by {@link #config()}.
 *
 * @param outputFile the file to write JSON to
 * @throws IOException if the generator for {@code outputFile} cannot be created
 */
public JsonFileWriter(File outputFile) throws IOException {
  this(outputFile, config());
}

/**
 * Constructs a new writer that will output to <code>outputFile</code> with the given options.
 *
 * @param outputFile the file to write JSON to
 * @param config write options; a pretty printer is installed only when pretty output is requested
 * @throws IOException if the generator for {@code outputFile} cannot be created
 * @throws NullPointerException if {@code config} is null
 */
public JsonFileWriter(File outputFile, JSONWriteConfig config) throws IOException {
  // Validate before creating the generator so a bad argument does not leave an open file behind.
  Preconditions.checkNotNull(config, "config must not be null");
  MappingJsonFactory jsonFactory = new MappingJsonFactory();
  this.generator = jsonFactory.createGenerator(outputFile, JsonEncoding.UTF8);

  if (config.pretty) {
    DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
    // Arrays use a no-op indenter so their elements are not spread over separate lines.
    prettyPrinter.indentArraysWith(NopIndenter.instance);
    this.generator.setPrettyPrinter(prettyPrinter);
  }

  // Allow writing of floating point NaN values not as strings
  this.generator.configure(JsonGenerator.Feature.QUOTE_NON_NUMERIC_NUMBERS, false);
}
140 * Writes out the "header" of the file including the schema and any dictionaries required.
142 public void start(Schema schema
, DictionaryProvider provider
) throws IOException
{
143 List
<Field
> fields
= new ArrayList
<>(schema
.getFields().size());
144 Set
<Long
> dictionaryIdsUsed
= new HashSet
<>();
145 this.schema
= schema
; // Store original Schema to ensure batches written match
147 // Convert fields with dictionaries to have dictionary type
148 for (Field field
: schema
.getFields()) {
149 fields
.add(DictionaryUtility
.toMessageFormat(field
, provider
, dictionaryIdsUsed
));
151 Schema updatedSchema
= new Schema(fields
, schema
.getCustomMetadata());
153 generator
.writeStartObject();
154 generator
.writeObjectField("schema", updatedSchema
);
156 // Write all dictionaries that were used
157 if (!dictionaryIdsUsed
.isEmpty()) {
158 writeDictionaryBatches(generator
, dictionaryIdsUsed
, provider
);
161 // Start writing of record batches
162 generator
.writeArrayFieldStart("batches");
165 private void writeDictionaryBatches(JsonGenerator generator
, Set
<Long
> dictionaryIdsUsed
, DictionaryProvider provider
)
167 generator
.writeArrayFieldStart("dictionaries");
168 for (Long id
: dictionaryIdsUsed
) {
169 generator
.writeStartObject();
170 generator
.writeObjectField("id", id
);
172 generator
.writeFieldName("data");
173 Dictionary dictionary
= provider
.lookup(id
);
174 FieldVector vector
= dictionary
.getVector();
175 List
<Field
> fields
= Collections
.singletonList(vector
.getField());
176 List
<FieldVector
> vectors
= Collections
.singletonList(vector
);
177 VectorSchemaRoot root
= new VectorSchemaRoot(fields
, vectors
, vector
.getValueCount());
180 generator
.writeEndObject();
182 generator
.writeEndArray();
185 /** Writes the record batch to the JSON file. */
186 public void write(VectorSchemaRoot recordBatch
) throws IOException
{
187 if (!recordBatch
.getSchema().equals(schema
)) {
188 throw new IllegalArgumentException("record batches must have the same schema: " + schema
);
190 writeBatch(recordBatch
);
193 private void writeBatch(VectorSchemaRoot recordBatch
) throws IOException
{
194 generator
.writeStartObject();
196 generator
.writeObjectField("count", recordBatch
.getRowCount());
197 generator
.writeArrayFieldStart("columns");
198 for (Field field
: recordBatch
.getSchema().getFields()) {
199 FieldVector vector
= recordBatch
.getVector(field
);
200 writeFromVectorIntoJson(field
, vector
);
202 generator
.writeEndArray();
204 generator
.writeEndObject();
207 private void writeFromVectorIntoJson(Field field
, FieldVector vector
) throws IOException
{
208 List
<BufferType
> vectorTypes
= TypeLayout
.getTypeLayout(field
.getType()).getBufferTypes();
209 List
<ArrowBuf
> vectorBuffers
= vector
.getFieldBuffers();
210 if (vectorTypes
.size() != vectorBuffers
.size()) {
211 throw new IllegalArgumentException("vector types and inner vector buffers are not the same size: " +
212 vectorTypes
.size() + " != " + vectorBuffers
.size());
214 generator
.writeStartObject();
216 generator
.writeObjectField("name", field
.getName());
217 int valueCount
= vector
.getValueCount();
218 generator
.writeObjectField("count", valueCount
);
220 for (int v
= 0; v
< vectorTypes
.size(); v
++) {
221 BufferType bufferType
= vectorTypes
.get(v
);
222 ArrowBuf vectorBuffer
= vectorBuffers
.get(v
);
223 generator
.writeArrayFieldStart(bufferType
.getName());
224 final int bufferValueCount
= (bufferType
.equals(OFFSET
) && vector
.getMinorType() != MinorType
.DENSEUNION
) ?
225 valueCount
+ 1 : valueCount
;
226 for (int i
= 0; i
< bufferValueCount
; i
++) {
227 if (bufferType
.equals(DATA
) && (vector
.getMinorType() == MinorType
.VARCHAR
||
228 vector
.getMinorType() == MinorType
.VARBINARY
)) {
229 writeValueToGenerator(bufferType
, vectorBuffer
, vectorBuffers
.get(v
- 1), vector
, i
);
230 } else if (bufferType
.equals(OFFSET
) && vector
.getValueCount() == 0 &&
231 (vector
.getMinorType() == MinorType
.VARBINARY
|| vector
.getMinorType() == MinorType
.VARCHAR
)) {
232 ArrowBuf vectorBufferTmp
= vector
.getAllocator().buffer(4);
233 vectorBufferTmp
.setInt(0, 0);
234 writeValueToGenerator(bufferType
, vectorBufferTmp
, null, vector
, i
);
235 vectorBufferTmp
.close();
237 writeValueToGenerator(bufferType
, vectorBuffer
, null, vector
, i
);
240 generator
.writeEndArray();
242 List
<Field
> fields
= field
.getChildren();
243 List
<FieldVector
> children
= vector
.getChildrenFromFields();
244 if (fields
.size() != children
.size()) {
245 throw new IllegalArgumentException("fields and children are not the same size: " + fields
.size() + " != " +
248 if (fields
.size() > 0) {
249 generator
.writeArrayFieldStart("children");
250 for (int i
= 0; i
< fields
.size(); i
++) {
251 Field childField
= fields
.get(i
);
252 FieldVector childVector
= children
.get(i
);
253 writeFromVectorIntoJson(childField
, childVector
);
255 generator
.writeEndArray();
258 generator
.writeEndObject();
261 private void writeValueToGenerator(
262 BufferType bufferType
,
264 ArrowBuf offsetBuffer
,
266 final int index
) throws IOException
{
267 if (bufferType
.equals(TYPE
)) {
268 generator
.writeNumber(buffer
.getByte(index
* TinyIntVector
.TYPE_WIDTH
));
269 } else if (bufferType
.equals(OFFSET
)) {
270 generator
.writeNumber(buffer
.getInt(index
* BaseVariableWidthVector
.OFFSET_WIDTH
));
271 } else if (bufferType
.equals(VALIDITY
)) {
272 generator
.writeNumber(vector
.isNull(index
) ?
0 : 1);
273 } else if (bufferType
.equals(DATA
)) {
274 switch (vector
.getMinorType()) {
276 generator
.writeNumber(TinyIntVector
.get(buffer
, index
));
279 generator
.writeNumber(SmallIntVector
.get(buffer
, index
));
282 generator
.writeNumber(IntVector
.get(buffer
, index
));
285 generator
.writeString(String
.valueOf(BigIntVector
.get(buffer
, index
)));
288 generator
.writeNumber(UInt1Vector
.getNoOverflow(buffer
, index
));
291 generator
.writeNumber(UInt2Vector
.get(buffer
, index
));
294 generator
.writeNumber(UInt4Vector
.getNoOverflow(buffer
, index
));
297 generator
.writeString(UInt8Vector
.getNoOverflow(buffer
, index
).toString());
300 generator
.writeNumber(Float4Vector
.get(buffer
, index
));
303 generator
.writeNumber(Float8Vector
.get(buffer
, index
));
306 generator
.writeNumber(DateDayVector
.get(buffer
, index
));
309 generator
.writeNumber(DateMilliVector
.get(buffer
, index
));
312 generator
.writeNumber(TimeSecVector
.get(buffer
, index
));
315 generator
.writeNumber(TimeMilliVector
.get(buffer
, index
));
318 generator
.writeNumber(TimeMicroVector
.get(buffer
, index
));
321 generator
.writeNumber(TimeNanoVector
.get(buffer
, index
));
324 generator
.writeNumber(TimeStampSecVector
.get(buffer
, index
));
327 generator
.writeNumber(TimeStampMilliVector
.get(buffer
, index
));
330 generator
.writeNumber(TimeStampMicroVector
.get(buffer
, index
));
333 generator
.writeNumber(TimeStampNanoVector
.get(buffer
, index
));
336 generator
.writeNumber(TimeStampSecTZVector
.get(buffer
, index
));
338 case TIMESTAMPMILLITZ
:
339 generator
.writeNumber(TimeStampMilliTZVector
.get(buffer
, index
));
341 case TIMESTAMPMICROTZ
:
342 generator
.writeNumber(TimeStampMicroTZVector
.get(buffer
, index
));
344 case TIMESTAMPNANOTZ
:
345 generator
.writeNumber(TimeStampNanoTZVector
.get(buffer
, index
));
348 generator
.writeNumber(DurationVector
.get(buffer
, index
));
351 generator
.writeNumber(IntervalYearVector
.getTotalMonths(buffer
, index
));
354 generator
.writeStartObject();
355 generator
.writeObjectField("days", IntervalDayVector
.getDays(buffer
, index
));
356 generator
.writeObjectField("milliseconds", IntervalDayVector
.getMilliseconds(buffer
, index
));
357 generator
.writeEndObject();
359 case INTERVALMONTHDAYNANO
:
360 generator
.writeStartObject();
361 generator
.writeObjectField("months", IntervalMonthDayNanoVector
.getMonths(buffer
, index
));
362 generator
.writeObjectField("days", IntervalMonthDayNanoVector
.getDays(buffer
, index
));
363 generator
.writeObjectField("nanoseconds", IntervalMonthDayNanoVector
.getNanoseconds(buffer
, index
));
364 generator
.writeEndObject();
367 generator
.writeNumber(BitVectorHelper
.get(buffer
, index
));
370 Preconditions
.checkNotNull(offsetBuffer
);
371 String hexString
= Hex
.encodeHexString(BaseVariableWidthVector
.get(buffer
,
372 offsetBuffer
, index
));
373 generator
.writeObject(hexString
);
376 case FIXEDSIZEBINARY
:
377 int byteWidth
= ((FixedSizeBinaryVector
) vector
).getByteWidth();
378 String fixedSizeHexString
= Hex
.encodeHexString(FixedSizeBinaryVector
.get(buffer
, index
, byteWidth
));
379 generator
.writeObject(fixedSizeHexString
);
382 Preconditions
.checkNotNull(offsetBuffer
);
383 byte[] b
= (BaseVariableWidthVector
.get(buffer
, offsetBuffer
, index
));
384 generator
.writeString(new String(b
, "UTF-8"));
388 int scale
= ((DecimalVector
) vector
).getScale();
389 BigDecimal decimalValue
= DecimalUtility
.getBigDecimalFromArrowBuf(buffer
, index
, scale
,
390 DecimalVector
.TYPE_WIDTH
);
391 // We write the unscaled value, because the scale is stored in the type metadata.
392 generator
.writeString(decimalValue
.unscaledValue().toString());
396 int scale
= ((Decimal256Vector
) vector
).getScale();
397 BigDecimal decimalValue
= DecimalUtility
.getBigDecimalFromArrowBuf(buffer
, index
, scale
,
398 Decimal256Vector
.TYPE_WIDTH
);
399 // We write the unscaled value, because the scale is stored in the type metadata.
400 generator
.writeString(decimalValue
.unscaledValue().toString());
405 throw new UnsupportedOperationException("minor type: " + vector
.getMinorType());
411 public void close() throws IOException
{
412 generator
.writeEndArray();
413 generator
.writeEndObject();