git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / main / java / org / apache / arrow / vector / ipc / JsonFileWriter.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.vector.ipc;
19
20 import static org.apache.arrow.vector.BufferLayout.BufferType.*;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.math.BigDecimal;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Set;
30
31 import org.apache.arrow.memory.ArrowBuf;
32 import org.apache.arrow.util.Preconditions;
33 import org.apache.arrow.vector.BaseVariableWidthVector;
34 import org.apache.arrow.vector.BigIntVector;
35 import org.apache.arrow.vector.BitVectorHelper;
36 import org.apache.arrow.vector.BufferLayout.BufferType;
37 import org.apache.arrow.vector.DateDayVector;
38 import org.apache.arrow.vector.DateMilliVector;
39 import org.apache.arrow.vector.Decimal256Vector;
40 import org.apache.arrow.vector.DecimalVector;
41 import org.apache.arrow.vector.DurationVector;
42 import org.apache.arrow.vector.FieldVector;
43 import org.apache.arrow.vector.FixedSizeBinaryVector;
44 import org.apache.arrow.vector.Float4Vector;
45 import org.apache.arrow.vector.Float8Vector;
46 import org.apache.arrow.vector.IntVector;
47 import org.apache.arrow.vector.IntervalDayVector;
48 import org.apache.arrow.vector.IntervalMonthDayNanoVector;
49 import org.apache.arrow.vector.IntervalYearVector;
50 import org.apache.arrow.vector.SmallIntVector;
51 import org.apache.arrow.vector.TimeMicroVector;
52 import org.apache.arrow.vector.TimeMilliVector;
53 import org.apache.arrow.vector.TimeNanoVector;
54 import org.apache.arrow.vector.TimeSecVector;
55 import org.apache.arrow.vector.TimeStampMicroTZVector;
56 import org.apache.arrow.vector.TimeStampMicroVector;
57 import org.apache.arrow.vector.TimeStampMilliTZVector;
58 import org.apache.arrow.vector.TimeStampMilliVector;
59 import org.apache.arrow.vector.TimeStampNanoTZVector;
60 import org.apache.arrow.vector.TimeStampNanoVector;
61 import org.apache.arrow.vector.TimeStampSecTZVector;
62 import org.apache.arrow.vector.TimeStampSecVector;
63 import org.apache.arrow.vector.TinyIntVector;
64 import org.apache.arrow.vector.TypeLayout;
65 import org.apache.arrow.vector.UInt1Vector;
66 import org.apache.arrow.vector.UInt2Vector;
67 import org.apache.arrow.vector.UInt4Vector;
68 import org.apache.arrow.vector.UInt8Vector;
69 import org.apache.arrow.vector.VectorSchemaRoot;
70 import org.apache.arrow.vector.dictionary.Dictionary;
71 import org.apache.arrow.vector.dictionary.DictionaryProvider;
72 import org.apache.arrow.vector.types.Types.MinorType;
73 import org.apache.arrow.vector.types.pojo.Field;
74 import org.apache.arrow.vector.types.pojo.Schema;
75 import org.apache.arrow.vector.util.DecimalUtility;
76 import org.apache.arrow.vector.util.DictionaryUtility;
77 import org.apache.commons.codec.binary.Hex;
78
79 import com.fasterxml.jackson.core.JsonEncoding;
80 import com.fasterxml.jackson.core.JsonGenerator;
81 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
82 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
83 import com.fasterxml.jackson.databind.MappingJsonFactory;
84
85 /**
86 * A writer that converts binary Vectors into a JSON format suitable
87 * for integration testing.
88 */
89 public class JsonFileWriter implements AutoCloseable {
90
91 /**
92 * Configuration POJO for writing JSON files.
93 */
94 public static final class JSONWriteConfig {
95 private final boolean pretty;
96
97 private JSONWriteConfig(boolean pretty) {
98 this.pretty = pretty;
99 }
100
101 private JSONWriteConfig() {
102 this.pretty = false;
103 }
104
105 public JSONWriteConfig pretty(boolean pretty) {
106 return new JSONWriteConfig(pretty);
107 }
108 }
109
110 public static JSONWriteConfig config() {
111 return new JSONWriteConfig();
112 }
113
114 private final JsonGenerator generator;
115 private Schema schema;
116
117 /**
118 * Constructs a new writer that will output to <code>outputFile</code>.
119 */
120 public JsonFileWriter(File outputFile) throws IOException {
121 this(outputFile, config());
122 }
123
124 /**
125 * Constructs a new writer that will output to <code>outputFile</code> with the given options.
126 */
127 public JsonFileWriter(File outputFile, JSONWriteConfig config) throws IOException {
128 MappingJsonFactory jsonFactory = new MappingJsonFactory();
129 this.generator = jsonFactory.createGenerator(outputFile, JsonEncoding.UTF8);
130 if (config.pretty) {
131 DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
132 prettyPrinter.indentArraysWith(NopIndenter.instance);
133 this.generator.setPrettyPrinter(prettyPrinter);
134 }
135 // Allow writing of floating point NaN values not as strings
136 this.generator.configure(JsonGenerator.Feature.QUOTE_NON_NUMERIC_NUMBERS, false);
137 }
138
139 /**
140 * Writes out the "header" of the file including the schema and any dictionaries required.
141 */
142 public void start(Schema schema, DictionaryProvider provider) throws IOException {
143 List<Field> fields = new ArrayList<>(schema.getFields().size());
144 Set<Long> dictionaryIdsUsed = new HashSet<>();
145 this.schema = schema; // Store original Schema to ensure batches written match
146
147 // Convert fields with dictionaries to have dictionary type
148 for (Field field : schema.getFields()) {
149 fields.add(DictionaryUtility.toMessageFormat(field, provider, dictionaryIdsUsed));
150 }
151 Schema updatedSchema = new Schema(fields, schema.getCustomMetadata());
152
153 generator.writeStartObject();
154 generator.writeObjectField("schema", updatedSchema);
155
156 // Write all dictionaries that were used
157 if (!dictionaryIdsUsed.isEmpty()) {
158 writeDictionaryBatches(generator, dictionaryIdsUsed, provider);
159 }
160
161 // Start writing of record batches
162 generator.writeArrayFieldStart("batches");
163 }
164
165 private void writeDictionaryBatches(JsonGenerator generator, Set<Long> dictionaryIdsUsed, DictionaryProvider provider)
166 throws IOException {
167 generator.writeArrayFieldStart("dictionaries");
168 for (Long id : dictionaryIdsUsed) {
169 generator.writeStartObject();
170 generator.writeObjectField("id", id);
171
172 generator.writeFieldName("data");
173 Dictionary dictionary = provider.lookup(id);
174 FieldVector vector = dictionary.getVector();
175 List<Field> fields = Collections.singletonList(vector.getField());
176 List<FieldVector> vectors = Collections.singletonList(vector);
177 VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, vector.getValueCount());
178 writeBatch(root);
179
180 generator.writeEndObject();
181 }
182 generator.writeEndArray();
183 }
184
185 /** Writes the record batch to the JSON file. */
186 public void write(VectorSchemaRoot recordBatch) throws IOException {
187 if (!recordBatch.getSchema().equals(schema)) {
188 throw new IllegalArgumentException("record batches must have the same schema: " + schema);
189 }
190 writeBatch(recordBatch);
191 }
192
193 private void writeBatch(VectorSchemaRoot recordBatch) throws IOException {
194 generator.writeStartObject();
195 {
196 generator.writeObjectField("count", recordBatch.getRowCount());
197 generator.writeArrayFieldStart("columns");
198 for (Field field : recordBatch.getSchema().getFields()) {
199 FieldVector vector = recordBatch.getVector(field);
200 writeFromVectorIntoJson(field, vector);
201 }
202 generator.writeEndArray();
203 }
204 generator.writeEndObject();
205 }
206
207 private void writeFromVectorIntoJson(Field field, FieldVector vector) throws IOException {
208 List<BufferType> vectorTypes = TypeLayout.getTypeLayout(field.getType()).getBufferTypes();
209 List<ArrowBuf> vectorBuffers = vector.getFieldBuffers();
210 if (vectorTypes.size() != vectorBuffers.size()) {
211 throw new IllegalArgumentException("vector types and inner vector buffers are not the same size: " +
212 vectorTypes.size() + " != " + vectorBuffers.size());
213 }
214 generator.writeStartObject();
215 {
216 generator.writeObjectField("name", field.getName());
217 int valueCount = vector.getValueCount();
218 generator.writeObjectField("count", valueCount);
219
220 for (int v = 0; v < vectorTypes.size(); v++) {
221 BufferType bufferType = vectorTypes.get(v);
222 ArrowBuf vectorBuffer = vectorBuffers.get(v);
223 generator.writeArrayFieldStart(bufferType.getName());
224 final int bufferValueCount = (bufferType.equals(OFFSET) && vector.getMinorType() != MinorType.DENSEUNION) ?
225 valueCount + 1 : valueCount;
226 for (int i = 0; i < bufferValueCount; i++) {
227 if (bufferType.equals(DATA) && (vector.getMinorType() == MinorType.VARCHAR ||
228 vector.getMinorType() == MinorType.VARBINARY)) {
229 writeValueToGenerator(bufferType, vectorBuffer, vectorBuffers.get(v - 1), vector, i);
230 } else if (bufferType.equals(OFFSET) && vector.getValueCount() == 0 &&
231 (vector.getMinorType() == MinorType.VARBINARY || vector.getMinorType() == MinorType.VARCHAR)) {
232 ArrowBuf vectorBufferTmp = vector.getAllocator().buffer(4);
233 vectorBufferTmp.setInt(0, 0);
234 writeValueToGenerator(bufferType, vectorBufferTmp, null, vector, i);
235 vectorBufferTmp.close();
236 } else {
237 writeValueToGenerator(bufferType, vectorBuffer, null, vector, i);
238 }
239 }
240 generator.writeEndArray();
241 }
242 List<Field> fields = field.getChildren();
243 List<FieldVector> children = vector.getChildrenFromFields();
244 if (fields.size() != children.size()) {
245 throw new IllegalArgumentException("fields and children are not the same size: " + fields.size() + " != " +
246 children.size());
247 }
248 if (fields.size() > 0) {
249 generator.writeArrayFieldStart("children");
250 for (int i = 0; i < fields.size(); i++) {
251 Field childField = fields.get(i);
252 FieldVector childVector = children.get(i);
253 writeFromVectorIntoJson(childField, childVector);
254 }
255 generator.writeEndArray();
256 }
257 }
258 generator.writeEndObject();
259 }
260
261 private void writeValueToGenerator(
262 BufferType bufferType,
263 ArrowBuf buffer,
264 ArrowBuf offsetBuffer,
265 FieldVector vector,
266 final int index) throws IOException {
267 if (bufferType.equals(TYPE)) {
268 generator.writeNumber(buffer.getByte(index * TinyIntVector.TYPE_WIDTH));
269 } else if (bufferType.equals(OFFSET)) {
270 generator.writeNumber(buffer.getInt(index * BaseVariableWidthVector.OFFSET_WIDTH));
271 } else if (bufferType.equals(VALIDITY)) {
272 generator.writeNumber(vector.isNull(index) ? 0 : 1);
273 } else if (bufferType.equals(DATA)) {
274 switch (vector.getMinorType()) {
275 case TINYINT:
276 generator.writeNumber(TinyIntVector.get(buffer, index));
277 break;
278 case SMALLINT:
279 generator.writeNumber(SmallIntVector.get(buffer, index));
280 break;
281 case INT:
282 generator.writeNumber(IntVector.get(buffer, index));
283 break;
284 case BIGINT:
285 generator.writeString(String.valueOf(BigIntVector.get(buffer, index)));
286 break;
287 case UINT1:
288 generator.writeNumber(UInt1Vector.getNoOverflow(buffer, index));
289 break;
290 case UINT2:
291 generator.writeNumber(UInt2Vector.get(buffer, index));
292 break;
293 case UINT4:
294 generator.writeNumber(UInt4Vector.getNoOverflow(buffer, index));
295 break;
296 case UINT8:
297 generator.writeString(UInt8Vector.getNoOverflow(buffer, index).toString());
298 break;
299 case FLOAT4:
300 generator.writeNumber(Float4Vector.get(buffer, index));
301 break;
302 case FLOAT8:
303 generator.writeNumber(Float8Vector.get(buffer, index));
304 break;
305 case DATEDAY:
306 generator.writeNumber(DateDayVector.get(buffer, index));
307 break;
308 case DATEMILLI:
309 generator.writeNumber(DateMilliVector.get(buffer, index));
310 break;
311 case TIMESEC:
312 generator.writeNumber(TimeSecVector.get(buffer, index));
313 break;
314 case TIMEMILLI:
315 generator.writeNumber(TimeMilliVector.get(buffer, index));
316 break;
317 case TIMEMICRO:
318 generator.writeNumber(TimeMicroVector.get(buffer, index));
319 break;
320 case TIMENANO:
321 generator.writeNumber(TimeNanoVector.get(buffer, index));
322 break;
323 case TIMESTAMPSEC:
324 generator.writeNumber(TimeStampSecVector.get(buffer, index));
325 break;
326 case TIMESTAMPMILLI:
327 generator.writeNumber(TimeStampMilliVector.get(buffer, index));
328 break;
329 case TIMESTAMPMICRO:
330 generator.writeNumber(TimeStampMicroVector.get(buffer, index));
331 break;
332 case TIMESTAMPNANO:
333 generator.writeNumber(TimeStampNanoVector.get(buffer, index));
334 break;
335 case TIMESTAMPSECTZ:
336 generator.writeNumber(TimeStampSecTZVector.get(buffer, index));
337 break;
338 case TIMESTAMPMILLITZ:
339 generator.writeNumber(TimeStampMilliTZVector.get(buffer, index));
340 break;
341 case TIMESTAMPMICROTZ:
342 generator.writeNumber(TimeStampMicroTZVector.get(buffer, index));
343 break;
344 case TIMESTAMPNANOTZ:
345 generator.writeNumber(TimeStampNanoTZVector.get(buffer, index));
346 break;
347 case DURATION:
348 generator.writeNumber(DurationVector.get(buffer, index));
349 break;
350 case INTERVALYEAR:
351 generator.writeNumber(IntervalYearVector.getTotalMonths(buffer, index));
352 break;
353 case INTERVALDAY:
354 generator.writeStartObject();
355 generator.writeObjectField("days", IntervalDayVector.getDays(buffer, index));
356 generator.writeObjectField("milliseconds", IntervalDayVector.getMilliseconds(buffer, index));
357 generator.writeEndObject();
358 break;
359 case INTERVALMONTHDAYNANO:
360 generator.writeStartObject();
361 generator.writeObjectField("months", IntervalMonthDayNanoVector.getMonths(buffer, index));
362 generator.writeObjectField("days", IntervalMonthDayNanoVector.getDays(buffer, index));
363 generator.writeObjectField("nanoseconds", IntervalMonthDayNanoVector.getNanoseconds(buffer, index));
364 generator.writeEndObject();
365 break;
366 case BIT:
367 generator.writeNumber(BitVectorHelper.get(buffer, index));
368 break;
369 case VARBINARY: {
370 Preconditions.checkNotNull(offsetBuffer);
371 String hexString = Hex.encodeHexString(BaseVariableWidthVector.get(buffer,
372 offsetBuffer, index));
373 generator.writeObject(hexString);
374 break;
375 }
376 case FIXEDSIZEBINARY:
377 int byteWidth = ((FixedSizeBinaryVector) vector).getByteWidth();
378 String fixedSizeHexString = Hex.encodeHexString(FixedSizeBinaryVector.get(buffer, index, byteWidth));
379 generator.writeObject(fixedSizeHexString);
380 break;
381 case VARCHAR: {
382 Preconditions.checkNotNull(offsetBuffer);
383 byte[] b = (BaseVariableWidthVector.get(buffer, offsetBuffer, index));
384 generator.writeString(new String(b, "UTF-8"));
385 break;
386 }
387 case DECIMAL: {
388 int scale = ((DecimalVector) vector).getScale();
389 BigDecimal decimalValue = DecimalUtility.getBigDecimalFromArrowBuf(buffer, index, scale,
390 DecimalVector.TYPE_WIDTH);
391 // We write the unscaled value, because the scale is stored in the type metadata.
392 generator.writeString(decimalValue.unscaledValue().toString());
393 break;
394 }
395 case DECIMAL256: {
396 int scale = ((Decimal256Vector) vector).getScale();
397 BigDecimal decimalValue = DecimalUtility.getBigDecimalFromArrowBuf(buffer, index, scale,
398 Decimal256Vector.TYPE_WIDTH);
399 // We write the unscaled value, because the scale is stored in the type metadata.
400 generator.writeString(decimalValue.unscaledValue().toString());
401 break;
402 }
403
404 default:
405 throw new UnsupportedOperationException("minor type: " + vector.getMinorType());
406 }
407 }
408 }
409
410 @Override
411 public void close() throws IOException {
412 generator.writeEndArray();
413 generator.writeEndObject();
414 generator.close();
415 }
416
417 }