]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / dataset / src / test / java / org / apache / arrow / dataset / ParquetWriteSupport.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.dataset;
19
20 import java.io.File;
21 import java.nio.file.Path;
22 import java.nio.file.Paths;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26
27 import org.apache.arrow.util.Preconditions;
28 import org.apache.avro.Schema;
29 import org.apache.avro.generic.GenericData;
30 import org.apache.avro.generic.GenericRecord;
31 import org.apache.parquet.avro.AvroParquetWriter;
32 import org.apache.parquet.hadoop.ParquetWriter;
33
34 /**
35 * Utility class for writing Parquet files using Avro based tools.
36 */
37 public class ParquetWriteSupport implements AutoCloseable {
38
39 private final String path;
40 private final String uri;
41 private final ParquetWriter<GenericRecord> writer;
42 private final Schema avroSchema;
43 private final List<GenericRecord> writtenRecords = new ArrayList<>();
44 private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder();
45
46
47 public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception {
48 avroSchema = readSchemaFromFile(schemaName);
49 path = outputFolder.getPath() + File.separator + "generated.parquet";
50 uri = "file://" + path;
51 writer = AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(path))
52 .withSchema(avroSchema)
53 .build();
54 }
55
56 private static Schema readSchemaFromFile(String schemaName) throws Exception {
57 Path schemaPath = Paths.get(ParquetWriteSupport.class.getResource("/").getPath(),
58 "avroschema", schemaName);
59 return new org.apache.avro.Schema.Parser().parse(schemaPath.toFile());
60 }
61
62 public static ParquetWriteSupport writeTempFile(String schemaName, File outputFolder,
63 Object... values) throws Exception {
64 try (final ParquetWriteSupport writeSupport = new ParquetWriteSupport(schemaName, outputFolder)) {
65 writeSupport.writeRecords(values);
66 return writeSupport;
67 }
68 }
69
70 public void writeRecords(Object... values) throws Exception {
71 final List<GenericRecord> valueList = getRecordListBuilder().createRecordList(values);
72 writeRecords(valueList);
73 }
74
75 public void writeRecords(List<GenericRecord> records) throws Exception {
76 for (GenericRecord record : records) {
77 writeRecord(record);
78 }
79 }
80
81 public void writeRecord(GenericRecord record) throws Exception {
82 writtenRecords.add(record);
83 writer.write(record);
84 }
85
86 public String getOutputURI() {
87 return uri;
88 }
89
90 public Schema getAvroSchema() {
91 return avroSchema;
92 }
93
94 public GenericRecordListBuilder getRecordListBuilder() {
95 return recordListBuilder;
96 }
97
98 public List<GenericRecord> getWrittenRecords() {
99 return Collections.unmodifiableList(writtenRecords);
100 }
101
102 @Override
103 public void close() throws Exception {
104 writer.close();
105 }
106
107 public class GenericRecordListBuilder {
108 public final List<GenericRecord> createRecordList(Object... values) {
109 final int fieldCount = avroSchema.getFields().size();
110 Preconditions.checkArgument(values.length % fieldCount == 0,
111 "arg count of values should be divide by field number");
112 final List<GenericRecord> recordList = new ArrayList<>();
113 for (int i = 0; i < values.length / fieldCount; i++) {
114 final GenericRecord record = new GenericData.Record(avroSchema);
115 for (int j = 0; j < fieldCount; j++) {
116 record.put(j, values[i * fieldCount + j]);
117 }
118 recordList.add(record);
119 }
120 return Collections.unmodifiableList(recordList);
121 }
122 }
123 }