]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.vector.ipc; | |
19 | ||
20 | import java.io.IOException; | |
21 | import java.nio.channels.WritableByteChannel; | |
22 | import java.util.ArrayList; | |
23 | import java.util.Collections; | |
24 | import java.util.HashSet; | |
25 | import java.util.List; | |
26 | import java.util.Set; | |
27 | ||
28 | import org.apache.arrow.util.AutoCloseables; | |
29 | import org.apache.arrow.vector.FieldVector; | |
30 | import org.apache.arrow.vector.VectorSchemaRoot; | |
31 | import org.apache.arrow.vector.VectorUnloader; | |
32 | import org.apache.arrow.vector.dictionary.Dictionary; | |
33 | import org.apache.arrow.vector.dictionary.DictionaryProvider; | |
34 | import org.apache.arrow.vector.ipc.message.ArrowBlock; | |
35 | import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; | |
36 | import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; | |
37 | import org.apache.arrow.vector.ipc.message.IpcOption; | |
38 | import org.apache.arrow.vector.ipc.message.MessageSerializer; | |
39 | import org.apache.arrow.vector.types.pojo.Field; | |
40 | import org.apache.arrow.vector.types.pojo.Schema; | |
41 | import org.apache.arrow.vector.util.DictionaryUtility; | |
42 | import org.apache.arrow.vector.validate.MetadataV4UnionChecker; | |
43 | import org.slf4j.Logger; | |
44 | import org.slf4j.LoggerFactory; | |
45 | ||
46 | /** | |
47 | * Abstract base class for implementing Arrow writers for IPC over a WriteChannel. | |
48 | */ | |
49 | public abstract class ArrowWriter implements AutoCloseable { | |
50 | ||
51 | protected static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class); | |
52 | ||
53 | // schema with fields in message format, not memory format | |
54 | protected final Schema schema; | |
55 | protected final WriteChannel out; | |
56 | ||
57 | private final VectorUnloader unloader; | |
58 | private final List<ArrowDictionaryBatch> dictionaries; | |
59 | ||
60 | private boolean started = false; | |
61 | private boolean ended = false; | |
62 | ||
63 | private boolean dictWritten = false; | |
64 | ||
65 | protected IpcOption option; | |
66 | ||
67 | protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) { | |
68 | this (root, provider, out, IpcOption.DEFAULT); | |
69 | } | |
70 | ||
71 | /** | |
72 | * Note: fields are not closed when the writer is closed. | |
73 | * | |
74 | * @param root the vectors to write to the output | |
75 | * @param provider where to find the dictionaries | |
76 | * @param out the output where to write | |
77 | * @param option IPC write options | |
78 | */ | |
79 | protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) { | |
80 | this.unloader = new VectorUnloader(root); | |
81 | this.out = new WriteChannel(out); | |
82 | this.option = option; | |
83 | ||
84 | List<Field> fields = new ArrayList<>(root.getSchema().getFields().size()); | |
85 | Set<Long> dictionaryIdsUsed = new HashSet<>(); | |
86 | ||
87 | MetadataV4UnionChecker.checkForUnion(root.getSchema().getFields().iterator(), option.metadataVersion); | |
88 | // Convert fields with dictionaries to have dictionary type | |
89 | for (Field field : root.getSchema().getFields()) { | |
90 | fields.add(DictionaryUtility.toMessageFormat(field, provider, dictionaryIdsUsed)); | |
91 | } | |
92 | ||
93 | // Create a record batch for each dictionary | |
94 | this.dictionaries = new ArrayList<>(dictionaryIdsUsed.size()); | |
95 | for (long id : dictionaryIdsUsed) { | |
96 | Dictionary dictionary = provider.lookup(id); | |
97 | FieldVector vector = dictionary.getVector(); | |
98 | int count = vector.getValueCount(); | |
99 | VectorSchemaRoot dictRoot = new VectorSchemaRoot( | |
100 | Collections.singletonList(vector.getField()), | |
101 | Collections.singletonList(vector), | |
102 | count); | |
103 | VectorUnloader unloader = new VectorUnloader(dictRoot); | |
104 | ArrowRecordBatch batch = unloader.getRecordBatch(); | |
105 | this.dictionaries.add(new ArrowDictionaryBatch(id, batch)); | |
106 | } | |
107 | ||
108 | this.schema = new Schema(fields, root.getSchema().getCustomMetadata()); | |
109 | } | |
110 | ||
111 | public void start() throws IOException { | |
112 | ensureStarted(); | |
113 | } | |
114 | ||
115 | /** | |
116 | * Writes the record batch currently loaded in this instance's VectorSchemaRoot. | |
117 | */ | |
118 | public void writeBatch() throws IOException { | |
119 | ensureStarted(); | |
120 | ensureDictionariesWritten(); | |
121 | try (ArrowRecordBatch batch = unloader.getRecordBatch()) { | |
122 | writeRecordBatch(batch); | |
123 | } | |
124 | } | |
125 | ||
126 | protected ArrowBlock writeDictionaryBatch(ArrowDictionaryBatch batch) throws IOException { | |
127 | ArrowBlock block = MessageSerializer.serialize(out, batch, option); | |
128 | if (LOGGER.isDebugEnabled()) { | |
129 | LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}", | |
130 | block.getOffset(), block.getMetadataLength(), block.getBodyLength()); | |
131 | } | |
132 | return block; | |
133 | } | |
134 | ||
135 | protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException { | |
136 | ArrowBlock block = MessageSerializer.serialize(out, batch, option); | |
137 | if (LOGGER.isDebugEnabled()) { | |
138 | LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}", | |
139 | block.getOffset(), block.getMetadataLength(), block.getBodyLength()); | |
140 | } | |
141 | return block; | |
142 | } | |
143 | ||
144 | public void end() throws IOException { | |
145 | ensureStarted(); | |
146 | ensureEnded(); | |
147 | } | |
148 | ||
149 | public long bytesWritten() { | |
150 | return out.getCurrentPosition(); | |
151 | } | |
152 | ||
153 | private void ensureStarted() throws IOException { | |
154 | if (!started) { | |
155 | started = true; | |
156 | startInternal(out); | |
157 | // write the schema - for file formats this is duplicated in the footer, but matches | |
158 | // the streaming format | |
159 | MessageSerializer.serialize(out, schema, option); | |
160 | } | |
161 | } | |
162 | ||
163 | /** | |
164 | * Write dictionaries after schema and before recordBatches, dictionaries won't be | |
165 | * written if empty stream (only has schema data in IPC). | |
166 | */ | |
167 | private void ensureDictionariesWritten() throws IOException { | |
168 | if (!dictWritten) { | |
169 | dictWritten = true; | |
170 | // write out any dictionaries | |
171 | try { | |
172 | for (ArrowDictionaryBatch batch : dictionaries) { | |
173 | writeDictionaryBatch(batch); | |
174 | } | |
175 | } finally { | |
176 | try { | |
177 | AutoCloseables.close(dictionaries); | |
178 | } catch (Exception e) { | |
179 | throw new RuntimeException("Error occurred while closing dictionaries.", e); | |
180 | } | |
181 | } | |
182 | } | |
183 | } | |
184 | ||
185 | private void ensureEnded() throws IOException { | |
186 | if (!ended) { | |
187 | ended = true; | |
188 | endInternal(out); | |
189 | } | |
190 | } | |
191 | ||
192 | protected void startInternal(WriteChannel out) throws IOException { | |
193 | } | |
194 | ||
195 | protected void endInternal(WriteChannel out) throws IOException { | |
196 | } | |
197 | ||
198 | @Override | |
199 | public void close() { | |
200 | try { | |
201 | end(); | |
202 | out.close(); | |
203 | if (!dictWritten) { | |
204 | AutoCloseables.close(dictionaries); | |
205 | } | |
206 | } catch (Exception e) { | |
207 | throw new RuntimeException(e); | |
208 | } | |
209 | } | |
210 | } |