]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / main / java / org / apache / arrow / vector / ipc / ArrowWriter.java
CommitLineData
1d09f67e
TL
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
18package org.apache.arrow.vector.ipc;
19
20import java.io.IOException;
21import java.nio.channels.WritableByteChannel;
22import java.util.ArrayList;
23import java.util.Collections;
24import java.util.HashSet;
25import java.util.List;
26import java.util.Set;
27
28import org.apache.arrow.util.AutoCloseables;
29import org.apache.arrow.vector.FieldVector;
30import org.apache.arrow.vector.VectorSchemaRoot;
31import org.apache.arrow.vector.VectorUnloader;
32import org.apache.arrow.vector.dictionary.Dictionary;
33import org.apache.arrow.vector.dictionary.DictionaryProvider;
34import org.apache.arrow.vector.ipc.message.ArrowBlock;
35import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
36import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
37import org.apache.arrow.vector.ipc.message.IpcOption;
38import org.apache.arrow.vector.ipc.message.MessageSerializer;
39import org.apache.arrow.vector.types.pojo.Field;
40import org.apache.arrow.vector.types.pojo.Schema;
41import org.apache.arrow.vector.util.DictionaryUtility;
42import org.apache.arrow.vector.validate.MetadataV4UnionChecker;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
46/**
47 * Abstract base class for implementing Arrow writers for IPC over a WriteChannel.
48 */
49public abstract class ArrowWriter implements AutoCloseable {
50
51 protected static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);
52
53 // schema with fields in message format, not memory format
54 protected final Schema schema;
55 protected final WriteChannel out;
56
57 private final VectorUnloader unloader;
58 private final List<ArrowDictionaryBatch> dictionaries;
59
60 private boolean started = false;
61 private boolean ended = false;
62
63 private boolean dictWritten = false;
64
65 protected IpcOption option;
66
67 protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
68 this (root, provider, out, IpcOption.DEFAULT);
69 }
70
71 /**
72 * Note: fields are not closed when the writer is closed.
73 *
74 * @param root the vectors to write to the output
75 * @param provider where to find the dictionaries
76 * @param out the output where to write
77 * @param option IPC write options
78 */
79 protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
80 this.unloader = new VectorUnloader(root);
81 this.out = new WriteChannel(out);
82 this.option = option;
83
84 List<Field> fields = new ArrayList<>(root.getSchema().getFields().size());
85 Set<Long> dictionaryIdsUsed = new HashSet<>();
86
87 MetadataV4UnionChecker.checkForUnion(root.getSchema().getFields().iterator(), option.metadataVersion);
88 // Convert fields with dictionaries to have dictionary type
89 for (Field field : root.getSchema().getFields()) {
90 fields.add(DictionaryUtility.toMessageFormat(field, provider, dictionaryIdsUsed));
91 }
92
93 // Create a record batch for each dictionary
94 this.dictionaries = new ArrayList<>(dictionaryIdsUsed.size());
95 for (long id : dictionaryIdsUsed) {
96 Dictionary dictionary = provider.lookup(id);
97 FieldVector vector = dictionary.getVector();
98 int count = vector.getValueCount();
99 VectorSchemaRoot dictRoot = new VectorSchemaRoot(
100 Collections.singletonList(vector.getField()),
101 Collections.singletonList(vector),
102 count);
103 VectorUnloader unloader = new VectorUnloader(dictRoot);
104 ArrowRecordBatch batch = unloader.getRecordBatch();
105 this.dictionaries.add(new ArrowDictionaryBatch(id, batch));
106 }
107
108 this.schema = new Schema(fields, root.getSchema().getCustomMetadata());
109 }
110
111 public void start() throws IOException {
112 ensureStarted();
113 }
114
115 /**
116 * Writes the record batch currently loaded in this instance's VectorSchemaRoot.
117 */
118 public void writeBatch() throws IOException {
119 ensureStarted();
120 ensureDictionariesWritten();
121 try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
122 writeRecordBatch(batch);
123 }
124 }
125
126 protected ArrowBlock writeDictionaryBatch(ArrowDictionaryBatch batch) throws IOException {
127 ArrowBlock block = MessageSerializer.serialize(out, batch, option);
128 if (LOGGER.isDebugEnabled()) {
129 LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
130 block.getOffset(), block.getMetadataLength(), block.getBodyLength());
131 }
132 return block;
133 }
134
135 protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
136 ArrowBlock block = MessageSerializer.serialize(out, batch, option);
137 if (LOGGER.isDebugEnabled()) {
138 LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
139 block.getOffset(), block.getMetadataLength(), block.getBodyLength());
140 }
141 return block;
142 }
143
144 public void end() throws IOException {
145 ensureStarted();
146 ensureEnded();
147 }
148
149 public long bytesWritten() {
150 return out.getCurrentPosition();
151 }
152
153 private void ensureStarted() throws IOException {
154 if (!started) {
155 started = true;
156 startInternal(out);
157 // write the schema - for file formats this is duplicated in the footer, but matches
158 // the streaming format
159 MessageSerializer.serialize(out, schema, option);
160 }
161 }
162
163 /**
164 * Write dictionaries after schema and before recordBatches, dictionaries won't be
165 * written if empty stream (only has schema data in IPC).
166 */
167 private void ensureDictionariesWritten() throws IOException {
168 if (!dictWritten) {
169 dictWritten = true;
170 // write out any dictionaries
171 try {
172 for (ArrowDictionaryBatch batch : dictionaries) {
173 writeDictionaryBatch(batch);
174 }
175 } finally {
176 try {
177 AutoCloseables.close(dictionaries);
178 } catch (Exception e) {
179 throw new RuntimeException("Error occurred while closing dictionaries.", e);
180 }
181 }
182 }
183 }
184
185 private void ensureEnded() throws IOException {
186 if (!ended) {
187 ended = true;
188 endInternal(out);
189 }
190 }
191
192 protected void startInternal(WriteChannel out) throws IOException {
193 }
194
195 protected void endInternal(WriteChannel out) throws IOException {
196 }
197
198 @Override
199 public void close() {
200 try {
201 end();
202 out.close();
203 if (!dictWritten) {
204 AutoCloseables.close(dictionaries);
205 }
206 } catch (Exception e) {
207 throw new RuntimeException(e);
208 }
209 }
210}