]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.vector.ipc; | |
19 | ||
20 | import static org.apache.arrow.vector.BufferLayout.BufferType.*; | |
21 | ||
22 | import java.io.File; | |
23 | import java.io.IOException; | |
24 | import java.math.BigDecimal; | |
25 | import java.util.ArrayList; | |
26 | import java.util.Collections; | |
27 | import java.util.HashSet; | |
28 | import java.util.List; | |
29 | import java.util.Set; | |
30 | ||
31 | import org.apache.arrow.memory.ArrowBuf; | |
32 | import org.apache.arrow.util.Preconditions; | |
33 | import org.apache.arrow.vector.BaseVariableWidthVector; | |
34 | import org.apache.arrow.vector.BigIntVector; | |
35 | import org.apache.arrow.vector.BitVectorHelper; | |
36 | import org.apache.arrow.vector.BufferLayout.BufferType; | |
37 | import org.apache.arrow.vector.DateDayVector; | |
38 | import org.apache.arrow.vector.DateMilliVector; | |
39 | import org.apache.arrow.vector.Decimal256Vector; | |
40 | import org.apache.arrow.vector.DecimalVector; | |
41 | import org.apache.arrow.vector.DurationVector; | |
42 | import org.apache.arrow.vector.FieldVector; | |
43 | import org.apache.arrow.vector.FixedSizeBinaryVector; | |
44 | import org.apache.arrow.vector.Float4Vector; | |
45 | import org.apache.arrow.vector.Float8Vector; | |
46 | import org.apache.arrow.vector.IntVector; | |
47 | import org.apache.arrow.vector.IntervalDayVector; | |
48 | import org.apache.arrow.vector.IntervalMonthDayNanoVector; | |
49 | import org.apache.arrow.vector.IntervalYearVector; | |
50 | import org.apache.arrow.vector.SmallIntVector; | |
51 | import org.apache.arrow.vector.TimeMicroVector; | |
52 | import org.apache.arrow.vector.TimeMilliVector; | |
53 | import org.apache.arrow.vector.TimeNanoVector; | |
54 | import org.apache.arrow.vector.TimeSecVector; | |
55 | import org.apache.arrow.vector.TimeStampMicroTZVector; | |
56 | import org.apache.arrow.vector.TimeStampMicroVector; | |
57 | import org.apache.arrow.vector.TimeStampMilliTZVector; | |
58 | import org.apache.arrow.vector.TimeStampMilliVector; | |
59 | import org.apache.arrow.vector.TimeStampNanoTZVector; | |
60 | import org.apache.arrow.vector.TimeStampNanoVector; | |
61 | import org.apache.arrow.vector.TimeStampSecTZVector; | |
62 | import org.apache.arrow.vector.TimeStampSecVector; | |
63 | import org.apache.arrow.vector.TinyIntVector; | |
64 | import org.apache.arrow.vector.TypeLayout; | |
65 | import org.apache.arrow.vector.UInt1Vector; | |
66 | import org.apache.arrow.vector.UInt2Vector; | |
67 | import org.apache.arrow.vector.UInt4Vector; | |
68 | import org.apache.arrow.vector.UInt8Vector; | |
69 | import org.apache.arrow.vector.VectorSchemaRoot; | |
70 | import org.apache.arrow.vector.dictionary.Dictionary; | |
71 | import org.apache.arrow.vector.dictionary.DictionaryProvider; | |
72 | import org.apache.arrow.vector.types.Types.MinorType; | |
73 | import org.apache.arrow.vector.types.pojo.Field; | |
74 | import org.apache.arrow.vector.types.pojo.Schema; | |
75 | import org.apache.arrow.vector.util.DecimalUtility; | |
76 | import org.apache.arrow.vector.util.DictionaryUtility; | |
77 | import org.apache.commons.codec.binary.Hex; | |
78 | ||
79 | import com.fasterxml.jackson.core.JsonEncoding; | |
80 | import com.fasterxml.jackson.core.JsonGenerator; | |
81 | import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; | |
82 | import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter; | |
83 | import com.fasterxml.jackson.databind.MappingJsonFactory; | |
84 | ||
85 | /** | |
86 | * A writer that converts binary Vectors into a JSON format suitable | |
87 | * for integration testing. | |
88 | */ | |
89 | public class JsonFileWriter implements AutoCloseable { | |
90 | ||
91 | /** | |
92 | * Configuration POJO for writing JSON files. | |
93 | */ | |
94 | public static final class JSONWriteConfig { | |
95 | private final boolean pretty; | |
96 | ||
97 | private JSONWriteConfig(boolean pretty) { | |
98 | this.pretty = pretty; | |
99 | } | |
100 | ||
101 | private JSONWriteConfig() { | |
102 | this.pretty = false; | |
103 | } | |
104 | ||
105 | public JSONWriteConfig pretty(boolean pretty) { | |
106 | return new JSONWriteConfig(pretty); | |
107 | } | |
108 | } | |
109 | ||
110 | public static JSONWriteConfig config() { | |
111 | return new JSONWriteConfig(); | |
112 | } | |
113 | ||
114 | private final JsonGenerator generator; | |
115 | private Schema schema; | |
116 | ||
/**
 * Creates a writer that emits JSON to <code>outputFile</code> using the
 * default configuration returned by {@link #config()}.
 *
 * @param outputFile the file that receives the JSON output
 * @throws IOException propagated from the two-argument constructor
 */
public JsonFileWriter(File outputFile) throws IOException {
  this(outputFile, JsonFileWriter.config());
}
123 | ||
124 | /** | |
125 | * Constructs a new writer that will output to <code>outputFile</code> with the given options. | |
126 | */ | |
127 | public JsonFileWriter(File outputFile, JSONWriteConfig config) throws IOException { | |
128 | MappingJsonFactory jsonFactory = new MappingJsonFactory(); | |
129 | this.generator = jsonFactory.createGenerator(outputFile, JsonEncoding.UTF8); | |
130 | if (config.pretty) { | |
131 | DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter(); | |
132 | prettyPrinter.indentArraysWith(NopIndenter.instance); | |
133 | this.generator.setPrettyPrinter(prettyPrinter); | |
134 | } | |
135 | // Allow writing of floating point NaN values not as strings | |
136 | this.generator.configure(JsonGenerator.Feature.QUOTE_NON_NUMERIC_NUMBERS, false); | |
137 | } | |
138 | ||
139 | /** | |
140 | * Writes out the "header" of the file including the schema and any dictionaries required. | |
141 | */ | |
142 | public void start(Schema schema, DictionaryProvider provider) throws IOException { | |
143 | List<Field> fields = new ArrayList<>(schema.getFields().size()); | |
144 | Set<Long> dictionaryIdsUsed = new HashSet<>(); | |
145 | this.schema = schema; // Store original Schema to ensure batches written match | |
146 | ||
147 | // Convert fields with dictionaries to have dictionary type | |
148 | for (Field field : schema.getFields()) { | |
149 | fields.add(DictionaryUtility.toMessageFormat(field, provider, dictionaryIdsUsed)); | |
150 | } | |
151 | Schema updatedSchema = new Schema(fields, schema.getCustomMetadata()); | |
152 | ||
153 | generator.writeStartObject(); | |
154 | generator.writeObjectField("schema", updatedSchema); | |
155 | ||
156 | // Write all dictionaries that were used | |
157 | if (!dictionaryIdsUsed.isEmpty()) { | |
158 | writeDictionaryBatches(generator, dictionaryIdsUsed, provider); | |
159 | } | |
160 | ||
161 | // Start writing of record batches | |
162 | generator.writeArrayFieldStart("batches"); | |
163 | } | |
164 | ||
165 | private void writeDictionaryBatches(JsonGenerator generator, Set<Long> dictionaryIdsUsed, DictionaryProvider provider) | |
166 | throws IOException { | |
167 | generator.writeArrayFieldStart("dictionaries"); | |
168 | for (Long id : dictionaryIdsUsed) { | |
169 | generator.writeStartObject(); | |
170 | generator.writeObjectField("id", id); | |
171 | ||
172 | generator.writeFieldName("data"); | |
173 | Dictionary dictionary = provider.lookup(id); | |
174 | FieldVector vector = dictionary.getVector(); | |
175 | List<Field> fields = Collections.singletonList(vector.getField()); | |
176 | List<FieldVector> vectors = Collections.singletonList(vector); | |
177 | VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, vector.getValueCount()); | |
178 | writeBatch(root); | |
179 | ||
180 | generator.writeEndObject(); | |
181 | } | |
182 | generator.writeEndArray(); | |
183 | } | |
184 | ||
185 | /** Writes the record batch to the JSON file. */ | |
186 | public void write(VectorSchemaRoot recordBatch) throws IOException { | |
187 | if (!recordBatch.getSchema().equals(schema)) { | |
188 | throw new IllegalArgumentException("record batches must have the same schema: " + schema); | |
189 | } | |
190 | writeBatch(recordBatch); | |
191 | } | |
192 | ||
193 | private void writeBatch(VectorSchemaRoot recordBatch) throws IOException { | |
194 | generator.writeStartObject(); | |
195 | { | |
196 | generator.writeObjectField("count", recordBatch.getRowCount()); | |
197 | generator.writeArrayFieldStart("columns"); | |
198 | for (Field field : recordBatch.getSchema().getFields()) { | |
199 | FieldVector vector = recordBatch.getVector(field); | |
200 | writeFromVectorIntoJson(field, vector); | |
201 | } | |
202 | generator.writeEndArray(); | |
203 | } | |
204 | generator.writeEndObject(); | |
205 | } | |
206 | ||
207 | private void writeFromVectorIntoJson(Field field, FieldVector vector) throws IOException { | |
208 | List<BufferType> vectorTypes = TypeLayout.getTypeLayout(field.getType()).getBufferTypes(); | |
209 | List<ArrowBuf> vectorBuffers = vector.getFieldBuffers(); | |
210 | if (vectorTypes.size() != vectorBuffers.size()) { | |
211 | throw new IllegalArgumentException("vector types and inner vector buffers are not the same size: " + | |
212 | vectorTypes.size() + " != " + vectorBuffers.size()); | |
213 | } | |
214 | generator.writeStartObject(); | |
215 | { | |
216 | generator.writeObjectField("name", field.getName()); | |
217 | int valueCount = vector.getValueCount(); | |
218 | generator.writeObjectField("count", valueCount); | |
219 | ||
220 | for (int v = 0; v < vectorTypes.size(); v++) { | |
221 | BufferType bufferType = vectorTypes.get(v); | |
222 | ArrowBuf vectorBuffer = vectorBuffers.get(v); | |
223 | generator.writeArrayFieldStart(bufferType.getName()); | |
224 | final int bufferValueCount = (bufferType.equals(OFFSET) && vector.getMinorType() != MinorType.DENSEUNION) ? | |
225 | valueCount + 1 : valueCount; | |
226 | for (int i = 0; i < bufferValueCount; i++) { | |
227 | if (bufferType.equals(DATA) && (vector.getMinorType() == MinorType.VARCHAR || | |
228 | vector.getMinorType() == MinorType.VARBINARY)) { | |
229 | writeValueToGenerator(bufferType, vectorBuffer, vectorBuffers.get(v - 1), vector, i); | |
230 | } else if (bufferType.equals(OFFSET) && vector.getValueCount() == 0 && | |
231 | (vector.getMinorType() == MinorType.VARBINARY || vector.getMinorType() == MinorType.VARCHAR)) { | |
232 | ArrowBuf vectorBufferTmp = vector.getAllocator().buffer(4); | |
233 | vectorBufferTmp.setInt(0, 0); | |
234 | writeValueToGenerator(bufferType, vectorBufferTmp, null, vector, i); | |
235 | vectorBufferTmp.close(); | |
236 | } else { | |
237 | writeValueToGenerator(bufferType, vectorBuffer, null, vector, i); | |
238 | } | |
239 | } | |
240 | generator.writeEndArray(); | |
241 | } | |
242 | List<Field> fields = field.getChildren(); | |
243 | List<FieldVector> children = vector.getChildrenFromFields(); | |
244 | if (fields.size() != children.size()) { | |
245 | throw new IllegalArgumentException("fields and children are not the same size: " + fields.size() + " != " + | |
246 | children.size()); | |
247 | } | |
248 | if (fields.size() > 0) { | |
249 | generator.writeArrayFieldStart("children"); | |
250 | for (int i = 0; i < fields.size(); i++) { | |
251 | Field childField = fields.get(i); | |
252 | FieldVector childVector = children.get(i); | |
253 | writeFromVectorIntoJson(childField, childVector); | |
254 | } | |
255 | generator.writeEndArray(); | |
256 | } | |
257 | } | |
258 | generator.writeEndObject(); | |
259 | } | |
260 | ||
261 | private void writeValueToGenerator( | |
262 | BufferType bufferType, | |
263 | ArrowBuf buffer, | |
264 | ArrowBuf offsetBuffer, | |
265 | FieldVector vector, | |
266 | final int index) throws IOException { | |
267 | if (bufferType.equals(TYPE)) { | |
268 | generator.writeNumber(buffer.getByte(index * TinyIntVector.TYPE_WIDTH)); | |
269 | } else if (bufferType.equals(OFFSET)) { | |
270 | generator.writeNumber(buffer.getInt(index * BaseVariableWidthVector.OFFSET_WIDTH)); | |
271 | } else if (bufferType.equals(VALIDITY)) { | |
272 | generator.writeNumber(vector.isNull(index) ? 0 : 1); | |
273 | } else if (bufferType.equals(DATA)) { | |
274 | switch (vector.getMinorType()) { | |
275 | case TINYINT: | |
276 | generator.writeNumber(TinyIntVector.get(buffer, index)); | |
277 | break; | |
278 | case SMALLINT: | |
279 | generator.writeNumber(SmallIntVector.get(buffer, index)); | |
280 | break; | |
281 | case INT: | |
282 | generator.writeNumber(IntVector.get(buffer, index)); | |
283 | break; | |
284 | case BIGINT: | |
285 | generator.writeString(String.valueOf(BigIntVector.get(buffer, index))); | |
286 | break; | |
287 | case UINT1: | |
288 | generator.writeNumber(UInt1Vector.getNoOverflow(buffer, index)); | |
289 | break; | |
290 | case UINT2: | |
291 | generator.writeNumber(UInt2Vector.get(buffer, index)); | |
292 | break; | |
293 | case UINT4: | |
294 | generator.writeNumber(UInt4Vector.getNoOverflow(buffer, index)); | |
295 | break; | |
296 | case UINT8: | |
297 | generator.writeString(UInt8Vector.getNoOverflow(buffer, index).toString()); | |
298 | break; | |
299 | case FLOAT4: | |
300 | generator.writeNumber(Float4Vector.get(buffer, index)); | |
301 | break; | |
302 | case FLOAT8: | |
303 | generator.writeNumber(Float8Vector.get(buffer, index)); | |
304 | break; | |
305 | case DATEDAY: | |
306 | generator.writeNumber(DateDayVector.get(buffer, index)); | |
307 | break; | |
308 | case DATEMILLI: | |
309 | generator.writeNumber(DateMilliVector.get(buffer, index)); | |
310 | break; | |
311 | case TIMESEC: | |
312 | generator.writeNumber(TimeSecVector.get(buffer, index)); | |
313 | break; | |
314 | case TIMEMILLI: | |
315 | generator.writeNumber(TimeMilliVector.get(buffer, index)); | |
316 | break; | |
317 | case TIMEMICRO: | |
318 | generator.writeNumber(TimeMicroVector.get(buffer, index)); | |
319 | break; | |
320 | case TIMENANO: | |
321 | generator.writeNumber(TimeNanoVector.get(buffer, index)); | |
322 | break; | |
323 | case TIMESTAMPSEC: | |
324 | generator.writeNumber(TimeStampSecVector.get(buffer, index)); | |
325 | break; | |
326 | case TIMESTAMPMILLI: | |
327 | generator.writeNumber(TimeStampMilliVector.get(buffer, index)); | |
328 | break; | |
329 | case TIMESTAMPMICRO: | |
330 | generator.writeNumber(TimeStampMicroVector.get(buffer, index)); | |
331 | break; | |
332 | case TIMESTAMPNANO: | |
333 | generator.writeNumber(TimeStampNanoVector.get(buffer, index)); | |
334 | break; | |
335 | case TIMESTAMPSECTZ: | |
336 | generator.writeNumber(TimeStampSecTZVector.get(buffer, index)); | |
337 | break; | |
338 | case TIMESTAMPMILLITZ: | |
339 | generator.writeNumber(TimeStampMilliTZVector.get(buffer, index)); | |
340 | break; | |
341 | case TIMESTAMPMICROTZ: | |
342 | generator.writeNumber(TimeStampMicroTZVector.get(buffer, index)); | |
343 | break; | |
344 | case TIMESTAMPNANOTZ: | |
345 | generator.writeNumber(TimeStampNanoTZVector.get(buffer, index)); | |
346 | break; | |
347 | case DURATION: | |
348 | generator.writeNumber(DurationVector.get(buffer, index)); | |
349 | break; | |
350 | case INTERVALYEAR: | |
351 | generator.writeNumber(IntervalYearVector.getTotalMonths(buffer, index)); | |
352 | break; | |
353 | case INTERVALDAY: | |
354 | generator.writeStartObject(); | |
355 | generator.writeObjectField("days", IntervalDayVector.getDays(buffer, index)); | |
356 | generator.writeObjectField("milliseconds", IntervalDayVector.getMilliseconds(buffer, index)); | |
357 | generator.writeEndObject(); | |
358 | break; | |
359 | case INTERVALMONTHDAYNANO: | |
360 | generator.writeStartObject(); | |
361 | generator.writeObjectField("months", IntervalMonthDayNanoVector.getMonths(buffer, index)); | |
362 | generator.writeObjectField("days", IntervalMonthDayNanoVector.getDays(buffer, index)); | |
363 | generator.writeObjectField("nanoseconds", IntervalMonthDayNanoVector.getNanoseconds(buffer, index)); | |
364 | generator.writeEndObject(); | |
365 | break; | |
366 | case BIT: | |
367 | generator.writeNumber(BitVectorHelper.get(buffer, index)); | |
368 | break; | |
369 | case VARBINARY: { | |
370 | Preconditions.checkNotNull(offsetBuffer); | |
371 | String hexString = Hex.encodeHexString(BaseVariableWidthVector.get(buffer, | |
372 | offsetBuffer, index)); | |
373 | generator.writeObject(hexString); | |
374 | break; | |
375 | } | |
376 | case FIXEDSIZEBINARY: | |
377 | int byteWidth = ((FixedSizeBinaryVector) vector).getByteWidth(); | |
378 | String fixedSizeHexString = Hex.encodeHexString(FixedSizeBinaryVector.get(buffer, index, byteWidth)); | |
379 | generator.writeObject(fixedSizeHexString); | |
380 | break; | |
381 | case VARCHAR: { | |
382 | Preconditions.checkNotNull(offsetBuffer); | |
383 | byte[] b = (BaseVariableWidthVector.get(buffer, offsetBuffer, index)); | |
384 | generator.writeString(new String(b, "UTF-8")); | |
385 | break; | |
386 | } | |
387 | case DECIMAL: { | |
388 | int scale = ((DecimalVector) vector).getScale(); | |
389 | BigDecimal decimalValue = DecimalUtility.getBigDecimalFromArrowBuf(buffer, index, scale, | |
390 | DecimalVector.TYPE_WIDTH); | |
391 | // We write the unscaled value, because the scale is stored in the type metadata. | |
392 | generator.writeString(decimalValue.unscaledValue().toString()); | |
393 | break; | |
394 | } | |
395 | case DECIMAL256: { | |
396 | int scale = ((Decimal256Vector) vector).getScale(); | |
397 | BigDecimal decimalValue = DecimalUtility.getBigDecimalFromArrowBuf(buffer, index, scale, | |
398 | Decimal256Vector.TYPE_WIDTH); | |
399 | // We write the unscaled value, because the scale is stored in the type metadata. | |
400 | generator.writeString(decimalValue.unscaledValue().toString()); | |
401 | break; | |
402 | } | |
403 | ||
404 | default: | |
405 | throw new UnsupportedOperationException("minor type: " + vector.getMinorType()); | |
406 | } | |
407 | } | |
408 | } | |
409 | ||
410 | @Override | |
411 | public void close() throws IOException { | |
412 | generator.writeEndArray(); | |
413 | generator.writeEndObject(); | |
414 | generator.close(); | |
415 | } | |
416 | ||
417 | } |