]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / csharp / src / Apache.Arrow / Ipc / ArrowFileWriter.cs
CommitLineData
1d09f67e
TL
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
16using System;
17using System.Buffers.Binary;
18using System.Collections.Generic;
19using System.Diagnostics;
20using System.IO;
21using System.Threading;
22using System.Threading.Tasks;
23
namespace Apache.Arrow.Ipc
{
    /// <summary>
    /// An <see cref="ArrowStreamWriter"/> that wraps the base writer's output in a file
    /// envelope: a leading magic number padded to an 8-byte boundary, and, at the end, a
    /// footer flatbuffer followed by its 32-bit little-endian length and a trailing magic
    /// number. The footer lists the offset and sizes of every record batch written.
    /// </summary>
    public class ArrowFileWriter: ArrowStreamWriter
    {
        // Stream position captured when the current record batch started; -1 while no
        // record batch is being written.
        private long _currentRecordBatchOffset = -1;

        // One entry per record batch written so far, in write order; emitted in the footer.
        private List<Block> RecordBatchBlocks { get; }

        /// <summary>
        /// Creates a file writer over <paramref name="stream"/>, passing
        /// <c>leaveOpen: false</c> to the three-argument overload.
        /// </summary>
        /// <param name="stream">Destination stream; must be writable and seekable.</param>
        /// <param name="schema">Schema forwarded to the base writer.</param>
        public ArrowFileWriter(Stream stream, Schema schema)
            : this(stream, schema, leaveOpen: false)
        {
        }

        /// <summary>
        /// Creates a file writer over <paramref name="stream"/> with <c>options: null</c>.
        /// </summary>
        /// <param name="stream">Destination stream; must be writable and seekable.</param>
        /// <param name="schema">Schema forwarded to the base writer.</param>
        /// <param name="leaveOpen">Forwarded unchanged to the base writer.</param>
        public ArrowFileWriter(Stream stream, Schema schema, bool leaveOpen)
            : this(stream, schema, leaveOpen, options: null)
        {
        }

        /// <summary>
        /// Creates a file writer over <paramref name="stream"/>.
        /// </summary>
        /// <param name="stream">Destination stream; must be writable and seekable.</param>
        /// <param name="schema">Schema forwarded to the base writer.</param>
        /// <param name="leaveOpen">Forwarded unchanged to the base writer.</param>
        /// <param name="options">Forwarded unchanged to the base writer; may be null.</param>
        /// <exception cref="ArgumentException">
        /// <paramref name="stream"/> is not writable or not seekable.
        /// </exception>
        public ArrowFileWriter(Stream stream, Schema schema, bool leaveOpen, IpcOptions options)
            : base(stream, schema, leaveOpen, options)
        {
            if (!stream.CanWrite)
            {
                throw new ArgumentException("stream must be writable", nameof(stream));
            }

            // TODO: Remove seek requirement

            if (!stream.CanSeek)
            {
                throw new ArgumentException("stream must be seekable", nameof(stream));
            }

            RecordBatchBlocks = new List<Block>();
        }

        /// <summary>
        /// Calls <c>WriteStart</c> and then writes <paramref name="recordBatch"/>.
        /// </summary>
        /// <param name="recordBatch">The record batch to write.</param>
        public override void WriteRecordBatch(RecordBatch recordBatch)
        {
            // TODO: Compare record batch schema

            // NOTE(review): WriteStart is invoked for every batch; presumably the base
            // writer only emits the start once - confirm against ArrowStreamWriter.
            WriteStart();

            WriteRecordBatchInternal(recordBatch);
        }

        /// <summary>
        /// Asynchronous counterpart of <see cref="WriteRecordBatch(RecordBatch)"/>.
        /// </summary>
        /// <param name="recordBatch">The record batch to write.</param>
        /// <param name="cancellationToken">
        /// Passed to the underlying writes and also checked between the start and the batch.
        /// </param>
        public override async Task WriteRecordBatchAsync(RecordBatch recordBatch, CancellationToken cancellationToken = default)
        {
            // TODO: Compare record batch schema

            await WriteStartAsync(cancellationToken).ConfigureAwait(false);

            cancellationToken.ThrowIfCancellationRequested();

            await WriteRecordBatchInternalAsync(recordBatch, cancellationToken)
                .ConfigureAwait(false);
        }

        // Remembers where the record batch begins so the footer can point at it.
        private protected override void StartingWritingRecordBatch()
        {
            _currentRecordBatchOffset = BaseStream.Position;
        }

        // Turns the remembered start offset plus the reported lengths into a footer block
        // entry, then resets the offset to its "no batch in progress" sentinel.
        private protected override void FinishedWritingRecordBatch(long bodyLength, long metadataLength)
        {
            // Record batches only appear after a Schema is written, so the record batch offsets must
            // always be greater than 0.
            Debug.Assert(_currentRecordBatchOffset > 0, "_currentRecordBatchOffset must be positive.");

            // The block stores the metadata length as an int; fail loudly instead of truncating.
            int metadataLengthInt = checked((int)metadataLength);

            Debug.Assert(BitUtility.IsMultipleOf8(_currentRecordBatchOffset));
            Debug.Assert(BitUtility.IsMultipleOf8(metadataLengthInt));
            Debug.Assert(BitUtility.IsMultipleOf8(bodyLength));

            var block = new Block(
                offset: _currentRecordBatchOffset,
                length: bodyLength,
                metadataLength: metadataLengthInt);

            RecordBatchBlocks.Add(block);

            _currentRecordBatchOffset = -1;
        }

        // Lets the base writer finish first, then appends the file footer.
        private protected override void WriteEndInternal()
        {
            base.WriteEndInternal();

            WriteFooter(Schema);
        }

        // Asynchronous counterpart of WriteEndInternal.
        private protected override async ValueTask WriteEndInternalAsync(CancellationToken cancellationToken)
        {
            // ConfigureAwait(false) matches every other await in this writer: the
            // continuation does not need the caller's synchronization context.
            await base.WriteEndInternalAsync(cancellationToken).ConfigureAwait(false);

            await WriteFooterAsync(Schema, cancellationToken).ConfigureAwait(false);
        }

        private protected override void WriteStartInternal()
        {
            // Write magic number and empty padding up to the 8-byte boundary

            WriteMagic();
            WritePadding(CalculatePadding(ArrowFileConstants.Magic.Length));
        }

        private protected async override ValueTask WriteStartInternalAsync(CancellationToken cancellationToken)
        {
            // Write magic number and empty padding up to the 8-byte boundary

            await WriteMagicAsync(cancellationToken).ConfigureAwait(false);
            await WritePaddingAsync(CalculatePadding(ArrowFileConstants.Magic.Length))
                .ConfigureAwait(false);
        }

        // Writes the footer flatbuffer, then its length as a 4-byte little-endian integer,
        // then the trailing magic number.
        private void WriteFooter(Schema schema)
        {
            long offset = BaseStream.Position;

            BuildFooterFlatBuffer(schema);

            WriteFlatBuffer();

            // Write footer length

            Buffers.RentReturn(4, (buffer) =>
            {
                BinaryPrimitives.WriteInt32LittleEndian(buffer.Span, ComputeFooterLength(offset));

                BaseStream.Write(buffer);
            });

            // Write magic

            WriteMagic();
        }

        // Asynchronous counterpart of WriteFooter; cancellation is checked before each write.
        private async Task WriteFooterAsync(Schema schema, CancellationToken cancellationToken)
        {
            long offset = BaseStream.Position;

            BuildFooterFlatBuffer(schema);

            cancellationToken.ThrowIfCancellationRequested();

            await WriteFlatBufferAsync(cancellationToken).ConfigureAwait(false);

            // Write footer length

            cancellationToken.ThrowIfCancellationRequested();

            await Buffers.RentReturnAsync(4, async (buffer) =>
            {
                BinaryPrimitives.WriteInt32LittleEndian(buffer.Span, ComputeFooterLength(offset));

                await BaseStream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
            }).ConfigureAwait(false);

            // Write magic

            await WriteMagicAsync(cancellationToken).ConfigureAwait(false);
        }

        // Resets the shared flatbuffer builder and fills it with a finished Footer table
        // (schema, an empty dictionaries vector, and one block per recorded record batch).
        // Shared by the sync and async footer writers so the two cannot drift apart.
        private void BuildFooterFlatBuffer(Schema schema)
        {
            Builder.Clear();

            // Serialize the schema

            FlatBuffers.Offset<Flatbuf.Schema> schemaOffset = SerializeSchema(schema);

            // Serialize all record batches

            Flatbuf.Footer.StartRecordBatchesVector(Builder, RecordBatchBlocks.Count);

            // flatbuffer struct vectors have to be created in reverse order
            for (int i = RecordBatchBlocks.Count - 1; i >= 0; i--)
            {
                Block recordBatch = RecordBatchBlocks[i];
                Flatbuf.Block.CreateBlock(
                    Builder, recordBatch.Offset, recordBatch.MetadataLength, recordBatch.BodyLength);
            }

            FlatBuffers.VectorOffset recordBatchesVectorOffset = Builder.EndVector();

            // Serialize all dictionaries
            // NOTE: Currently unsupported.

            Flatbuf.Footer.StartDictionariesVector(Builder, 0);

            FlatBuffers.VectorOffset dictionaryBatchesOffset = Builder.EndVector();

            // Serialize the footer flatbuffer

            FlatBuffers.Offset<Flatbuf.Footer> footerOffset = Flatbuf.Footer.CreateFooter(Builder, CurrentMetadataVersion,
                schemaOffset, dictionaryBatchesOffset, recordBatchesVectorOffset);

            Builder.Finish(footerOffset.Value);
        }

        // Number of bytes written to the stream since footerStart; throws OverflowException
        // if that does not fit the 32-bit length field.
        private int ComputeFooterLength(long footerStart)
        {
            return checked((int)(BaseStream.Position - footerStart));
        }

        private void WriteMagic()
        {
            BaseStream.Write(ArrowFileConstants.Magic);
        }

        private ValueTask WriteMagicAsync(CancellationToken cancellationToken)
        {
            return BaseStream.WriteAsync(ArrowFileConstants.Magic, cancellationToken);
        }
    }
}