]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one or more |
2 | // contributor license agreements. See the NOTICE file distributed with | |
3 | // this work for additional information regarding copyright ownership. | |
4 | // The ASF licenses this file to You under the Apache License, Version 2.0 | |
5 | // (the "License"); you may not use this file except in compliance with | |
6 | // the License. You may obtain a copy of the License at | |
7 | // | |
8 | // http://www.apache.org/licenses/LICENSE-2.0 | |
9 | // | |
10 | // Unless required by applicable law or agreed to in writing, software | |
11 | // distributed under the License is distributed on an "AS IS" BASIS, | |
12 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | // See the License for the specific language governing permissions and | |
14 | // limitations under the License. | |
15 | ||
16 | using System; | |
17 | using System.Buffers.Binary; | |
18 | using System.Collections.Generic; | |
19 | using System.Diagnostics; | |
20 | using System.IO; | |
21 | using System.Threading; | |
22 | using System.Threading.Tasks; | |
23 | ||
24 | namespace Apache.Arrow.Ipc | |
25 | { | |
/// <summary>
/// Writes record batches to a writable, seekable <see cref="Stream"/>, framing the output
/// with the Arrow file magic at the start and at the end, and appending a footer that
/// contains the schema and one <see cref="Block"/> entry per record batch written.
/// </summary>
public class ArrowFileWriter : ArrowStreamWriter
{
    // Stream position at which the record batch currently being written started;
    // -1 while no record batch is in flight.
    private long _currentRecordBatchOffset = -1;

    // One entry per record batch written so far, in write order. Serialized into the footer.
    private List<Block> RecordBatchBlocks { get; }

    /// <summary>
    /// Creates a writer over <paramref name="stream"/>, passing <c>leaveOpen: false</c>.
    /// </summary>
    /// <param name="stream">Destination stream; must be writable and seekable.</param>
    /// <param name="schema">Schema forwarded to the base writer and serialized into the footer.</param>
    public ArrowFileWriter(Stream stream, Schema schema)
        : this(stream, schema, leaveOpen: false)
    {
    }

    /// <summary>
    /// Creates a writer over <paramref name="stream"/>, passing <c>options: null</c>.
    /// </summary>
    /// <param name="stream">Destination stream; must be writable and seekable.</param>
    /// <param name="schema">Schema forwarded to the base writer and serialized into the footer.</param>
    /// <param name="leaveOpen">Forwarded unchanged to the base writer.</param>
    public ArrowFileWriter(Stream stream, Schema schema, bool leaveOpen)
        : this(stream, schema, leaveOpen, options: null)
    {
    }

    /// <summary>
    /// Creates a writer over <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">Destination stream; must be writable and seekable.</param>
    /// <param name="schema">Schema forwarded to the base writer and serialized into the footer.</param>
    /// <param name="leaveOpen">Forwarded unchanged to the base writer.</param>
    /// <param name="options">Forwarded unchanged to the base writer; may be <c>null</c>.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="stream"/> is not writable or is not seekable.
    /// </exception>
    public ArrowFileWriter(Stream stream, Schema schema, bool leaveOpen, IpcOptions options)
        : base(stream, schema, leaveOpen, options)
    {
        if (!stream.CanWrite)
        {
            throw new ArgumentException("stream must be writable", nameof(stream));
        }

        // TODO: Remove seek requirement

        if (!stream.CanSeek)
        {
            throw new ArgumentException("stream must be seekable", nameof(stream));
        }

        RecordBatchBlocks = new List<Block>();
    }

    /// <summary>
    /// Calls <c>WriteStart</c> and then writes <paramref name="recordBatch"/> through
    /// <c>WriteRecordBatchInternal</c>.
    /// </summary>
    /// <param name="recordBatch">The record batch to write.</param>
    public override void WriteRecordBatch(RecordBatch recordBatch)
    {
        // TODO: Compare record batch schema

        WriteStart();

        WriteRecordBatchInternal(recordBatch);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="WriteRecordBatch(RecordBatch)"/>.
    /// </summary>
    /// <param name="recordBatch">The record batch to write.</param>
    /// <param name="cancellationToken">
    /// Token checked between the start write and the batch write, and passed to both.
    /// </param>
    public override async Task WriteRecordBatchAsync(RecordBatch recordBatch, CancellationToken cancellationToken = default)
    {
        // TODO: Compare record batch schema

        await WriteStartAsync(cancellationToken).ConfigureAwait(false);

        cancellationToken.ThrowIfCancellationRequested();

        await WriteRecordBatchInternalAsync(recordBatch, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Remembers the current stream position as the start offset of the record batch
    /// that is about to be written.
    /// </summary>
    private protected override void StartingWritingRecordBatch()
    {
        _currentRecordBatchOffset = BaseStream.Position;
    }

    /// <summary>
    /// Appends a <see cref="Block"/> describing the record batch that was just written
    /// and resets the in-flight offset.
    /// </summary>
    /// <param name="bodyLength">Length of the record batch body.</param>
    /// <param name="metadataLength">Length of the record batch metadata.</param>
    /// <exception cref="OverflowException">
    /// <paramref name="metadataLength"/> does not fit in an <see cref="int"/>.
    /// </exception>
    private protected override void FinishedWritingRecordBatch(long bodyLength, long metadataLength)
    {
        // Record batches only appear after a Schema is written, so the record batch offsets must
        // always be greater than 0.
        Debug.Assert(_currentRecordBatchOffset > 0, "_currentRecordBatchOffset must be positive.");

        int metadataLengthInt = checked((int)metadataLength);

        Debug.Assert(BitUtility.IsMultipleOf8(_currentRecordBatchOffset));
        Debug.Assert(BitUtility.IsMultipleOf8(metadataLengthInt));
        Debug.Assert(BitUtility.IsMultipleOf8(bodyLength));

        var block = new Block(
            offset: _currentRecordBatchOffset,
            length: bodyLength,
            metadataLength: metadataLengthInt);

        RecordBatchBlocks.Add(block);

        _currentRecordBatchOffset = -1;
    }

    /// <summary>
    /// Runs the base end-of-stream logic and then writes the file footer.
    /// </summary>
    private protected override void WriteEndInternal()
    {
        base.WriteEndInternal();

        WriteFooter(Schema);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="WriteEndInternal"/>.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the base call and the footer write.</param>
    private protected override async ValueTask WriteEndInternalAsync(CancellationToken cancellationToken)
    {
        // ConfigureAwait(false) matches every other await in this class: nothing here
        // needs to resume on a captured synchronization context.
        await base.WriteEndInternalAsync(cancellationToken).ConfigureAwait(false);

        await WriteFooterAsync(Schema, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Writes the leading magic number followed by padding.
    /// </summary>
    private protected override void WriteStartInternal()
    {
        // Write magic number and empty padding up to the 8-byte boundary

        WriteMagic();
        WritePadding(CalculatePadding(ArrowFileConstants.Magic.Length));
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="WriteStartInternal"/>.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the magic write.</param>
    private protected override async ValueTask WriteStartInternalAsync(CancellationToken cancellationToken)
    {
        // Write magic number and empty padding up to the 8-byte boundary

        await WriteMagicAsync(cancellationToken).ConfigureAwait(false);
        await WritePaddingAsync(CalculatePadding(ArrowFileConstants.Magic.Length))
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Resets <c>Builder</c> and fills it with a finished footer flatbuffer holding the
    /// schema, the record batch blocks recorded so far, and an empty dictionaries vector.
    /// Shared by <see cref="WriteFooter"/> and <see cref="WriteFooterAsync"/> so both
    /// paths always produce the same footer.
    /// </summary>
    /// <param name="schema">Schema to serialize into the footer.</param>
    private void BuildFooterFlatBuffer(Schema schema)
    {
        Builder.Clear();

        // Serialize the schema

        FlatBuffers.Offset<Flatbuf.Schema> schemaOffset = SerializeSchema(schema);

        // Serialize all record batches

        Flatbuf.Footer.StartRecordBatchesVector(Builder, RecordBatchBlocks.Count);

        // flatbuffer struct vectors have to be created in reverse order
        for (int i = RecordBatchBlocks.Count - 1; i >= 0; i--)
        {
            Block recordBatch = RecordBatchBlocks[i];
            Flatbuf.Block.CreateBlock(
                Builder, recordBatch.Offset, recordBatch.MetadataLength, recordBatch.BodyLength);
        }

        FlatBuffers.VectorOffset recordBatchesVectorOffset = Builder.EndVector();

        // Serialize all dictionaries
        // NOTE: Currently unsupported.

        Flatbuf.Footer.StartDictionariesVector(Builder, 0);

        FlatBuffers.VectorOffset dictionaryBatchesOffset = Builder.EndVector();

        // Serialize the footer flatbuffer

        FlatBuffers.Offset<Flatbuf.Footer> footerOffset = Flatbuf.Footer.CreateFooter(Builder, CurrentMetadataVersion,
            schemaOffset, dictionaryBatchesOffset, recordBatchesVectorOffset);

        Builder.Finish(footerOffset.Value);
    }

    /// <summary>
    /// Writes the footer flatbuffer, then its length as a little-endian 32-bit integer,
    /// then the trailing magic number.
    /// </summary>
    /// <param name="schema">Schema to serialize into the footer.</param>
    /// <exception cref="OverflowException">
    /// The number of bytes written for the footer does not fit in an <see cref="int"/>.
    /// </exception>
    private void WriteFooter(Schema schema)
    {
        // Captured before the flatbuffer is written so the footer length can be
        // derived from how far the stream position advanced.
        long offset = BaseStream.Position;

        BuildFooterFlatBuffer(schema);

        WriteFlatBuffer();

        // Write footer length

        Buffers.RentReturn(sizeof(int), (buffer) =>
        {
            int footerLength;
            checked
            {
                footerLength = (int)(BaseStream.Position - offset);
            }

            BinaryPrimitives.WriteInt32LittleEndian(buffer.Span, footerLength);

            BaseStream.Write(buffer);
        });

        // Write magic

        WriteMagic();
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="WriteFooter"/>.
    /// </summary>
    /// <param name="schema">Schema to serialize into the footer.</param>
    /// <param name="cancellationToken">
    /// Token checked before the flatbuffer write and before the length write, and passed
    /// to each stream write.
    /// </param>
    /// <exception cref="OverflowException">
    /// The number of bytes written for the footer does not fit in an <see cref="int"/>.
    /// </exception>
    private async Task WriteFooterAsync(Schema schema, CancellationToken cancellationToken)
    {
        // Captured before the flatbuffer is written so the footer length can be
        // derived from how far the stream position advanced.
        long offset = BaseStream.Position;

        BuildFooterFlatBuffer(schema);

        cancellationToken.ThrowIfCancellationRequested();

        await WriteFlatBufferAsync(cancellationToken).ConfigureAwait(false);

        // Write footer length

        cancellationToken.ThrowIfCancellationRequested();

        await Buffers.RentReturnAsync(sizeof(int), async (buffer) =>
        {
            int footerLength;
            checked
            {
                footerLength = (int)(BaseStream.Position - offset);
            }

            BinaryPrimitives.WriteInt32LittleEndian(buffer.Span, footerLength);

            await BaseStream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
        }).ConfigureAwait(false);

        // Write magic

        await WriteMagicAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Writes <c>ArrowFileConstants.Magic</c> to the underlying stream.
    /// </summary>
    private void WriteMagic()
    {
        BaseStream.Write(ArrowFileConstants.Magic);
    }

    /// <summary>
    /// Asynchronously writes <c>ArrowFileConstants.Magic</c> to the underlying stream.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the stream write.</param>
    private ValueTask WriteMagicAsync(CancellationToken cancellationToken)
    {
        return BaseStream.WriteAsync(ArrowFileConstants.Magic, cancellationToken);
    }
}
273 | } |