1 // Licensed to the Apache Software Foundation (ASF) under one or more
2 // contributor license agreements. See the NOTICE file distributed with
3 // this work for additional information regarding copyright ownership.
4 // The ASF licenses this file to You under the Apache License, Version 2.0
5 // (the "License"); you may not use this file except in compliance with
6 // the License. You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 using System.Buffers.Binary;
19 using System.Collections.Generic;
20 using System.Diagnostics;
22 using System.Threading;
23 using System.Threading.Tasks;
24 using Apache.Arrow.Arrays;
25 using Apache.Arrow.Types;
28 namespace Apache.Arrow.Ipc
/// <summary>
/// Writes Arrow IPC messages to <see cref="BaseStream"/>: a Schema message, DictionaryBatch
/// messages (for the first record batch only), RecordBatch messages, and, from WriteEnd, an
/// IPC length prefix carrying length 0.
/// </summary>
30 public class ArrowStreamWriter : IDisposable
/// <summary>
/// Array visitor that walks a column (and, for list/struct arrays, its child arrays) and
/// records, in visit order, every buffer that belongs in a message body. Each buffer is
/// given a byte offset from the start of the body, and <see cref="TotalLength"/> accumulates
/// the buffer lengths rounded up to a multiple of 8.
/// </summary>
32 internal class ArrowRecordBatchFlatBufferBuilder :
33 IArrowArrayVisitor<Int8Array>,
34 IArrowArrayVisitor<Int16Array>,
35 IArrowArrayVisitor<Int32Array>,
36 IArrowArrayVisitor<Int64Array>,
37 IArrowArrayVisitor<UInt8Array>,
38 IArrowArrayVisitor<UInt16Array>,
39 IArrowArrayVisitor<UInt32Array>,
40 IArrowArrayVisitor<UInt64Array>,
41 IArrowArrayVisitor<FloatArray>,
42 IArrowArrayVisitor<DoubleArray>,
43 IArrowArrayVisitor<BooleanArray>,
44 IArrowArrayVisitor<TimestampArray>,
45 IArrowArrayVisitor<Date32Array>,
46 IArrowArrayVisitor<Date64Array>,
47 IArrowArrayVisitor<ListArray>,
48 IArrowArrayVisitor<StringArray>,
49 IArrowArrayVisitor<BinaryArray>,
50 IArrowArrayVisitor<FixedSizeBinaryArray>,
51 IArrowArrayVisitor<StructArray>,
52 IArrowArrayVisitor<Decimal128Array>,
53 IArrowArrayVisitor<Decimal256Array>,
54 IArrowArrayVisitor<DictionaryArray>
// A buffer to be written to the message body, paired with its byte offset from the
// start of the body (the offset is assigned in CreateBuffer).
56 public readonly struct Buffer
58 public readonly ArrowBuffer DataBuffer;
59 public readonly int Offset;
61 public Buffer(ArrowBuffer buffer, int offset)
// NOTE(review): the constructor body (original lines 62-66) is not visible in this
// listing; presumably it assigns DataBuffer and Offset.
// Buffers in visit order; WriteBufferData/WriteBufferDataAsync write them in this order.
68 private readonly List<Buffer> _buffers;
70 public IReadOnlyList<Buffer> Buffers => _buffers;
// Sum of the 8-byte-padded lengths of all buffers added so far. It is both the offset
// of the next buffer and the body length passed to WriteMessage.
72 public int TotalLength { get; private set; }
74 public ArrowRecordBatchFlatBufferBuilder()
76 _buffers = new List<Buffer>();
// Fixed-width primitive, temporal and boolean arrays: validity bitmap, then values.
80 public void Visit(Int8Array array) => CreateBuffers(array);
81 public void Visit(Int16Array array) => CreateBuffers(array);
82 public void Visit(Int32Array array) => CreateBuffers(array);
83 public void Visit(Int64Array array) => CreateBuffers(array);
84 public void Visit(UInt8Array array) => CreateBuffers(array);
85 public void Visit(UInt16Array array) => CreateBuffers(array);
86 public void Visit(UInt32Array array) => CreateBuffers(array);
87 public void Visit(UInt64Array array) => CreateBuffers(array);
88 public void Visit(FloatArray array) => CreateBuffers(array);
89 public void Visit(DoubleArray array) => CreateBuffers(array);
90 public void Visit(TimestampArray array) => CreateBuffers(array);
91 public void Visit(BooleanArray array) => CreateBuffers(array);
92 public void Visit(Date32Array array) => CreateBuffers(array);
93 public void Visit(Date64Array array) => CreateBuffers(array);
// List: validity bitmap and value offsets, followed by all buffers of the child values array.
95 public void Visit(ListArray array)
97 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
98 _buffers.Add(CreateBuffer(array.ValueOffsetsBuffer));
100 array.Values.Accept(this);
// Strings are serialized exactly like binary arrays (validity, offsets, data).
103 public void Visit(StringArray array) => Visit(array as BinaryArray);
105 public void Visit(BinaryArray array)
107 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
108 _buffers.Add(CreateBuffer(array.ValueOffsetsBuffer));
109 _buffers.Add(CreateBuffer(array.ValueBuffer));
// Fixed-size binary and decimal arrays: validity bitmap, then values (no offsets buffer).
112 public void Visit(FixedSizeBinaryArray array)
114 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
115 _buffers.Add(CreateBuffer(array.ValueBuffer));
118 public void Visit(Decimal128Array array)
120 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
121 _buffers.Add(CreateBuffer(array.ValueBuffer));
124 public void Visit(Decimal256Array array)
126 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
127 _buffers.Add(CreateBuffer(array.ValueBuffer));
// Struct: its own validity bitmap, then each child field's buffers in field order.
130 public void Visit(StructArray array)
132 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
134 for (int i = 0; i < array.Fields.Count; i++)
136 array.Fields[i].Accept(this);
140 public void Visit(DictionaryArray array)
142 // Dictionary is serialized separately in Dictionary serialization.
143 // We are only interested in indices at this context.
145 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
146 _buffers.Add(CreateBuffer(array.IndicesBuffer));
149 private void CreateBuffers(BooleanArray array)
151 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
152 _buffers.Add(CreateBuffer(array.ValueBuffer));
155 private void CreateBuffers<T>(PrimitiveArray<T> array)
158 _buffers.Add(CreateBuffer(array.NullBitmapBuffer));
159 _buffers.Add(CreateBuffer(array.ValueBuffer));
// Gives the buffer the current body offset and advances TotalLength by the buffer's length
// rounded up to a multiple of 8. It does not add the buffer to _buffers; callers do that.
// The cast of the padded length is checked (OverflowException if it exceeds int).
// NOTE(review): the running sum `TotalLength += paddedLength` is not explicitly checked and
// could wrap for bodies over 2 GB unless the project enables overflow checking; confirm.
162 private Buffer CreateBuffer(ArrowBuffer buffer)
164 int offset = TotalLength;
166 int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
167 TotalLength += paddedLength;
169 return new Buffer(buffer, offset);
// Untyped visit: always throws, so array types without a typed Visit overload above cannot
// be serialized. Presumably reached through Accept when no typed visitor matches.
172 public void Visit(IArrowArray array)
174 throw new NotImplementedException();
// Destination of every byte this writer produces.
178 protected Stream BaseStream { get; }
// Per-writer pool created in the constructor. WriteIpcMessageLength(Async) rents from it
// for the length-prefix scratch buffer.
180 protected ArrayPool<byte> Buffers { get; }
// Single FlatBufferBuilder (initial capacity 1024) used for the metadata of every message.
182 private protected FlatBufferBuilder Builder { get; }
184 protected bool HasWrittenSchema { get; set; }
// Set once dictionaries have been collected and written for the first record batch;
// dictionaries in later batches are not written again.
186 private bool HasWrittenDictionaryBatch { get; set; }
// Track whether WriteStart(Async) / WriteEnd(Async) have run.
188 private bool HasWrittenStart { get; set; }
190 private bool HasWrittenEnd { get; set; }
// Schema written in the Schema message.
192 protected Schema Schema { get; }
// Constructor flag controlling whether Dispose leaves BaseStream open. The guard that reads
// it is not visible in this listing.
194 private readonly bool _leaveOpen;
195 private readonly IpcOptions _options;
// Metadata version stamped into every Message header.
197 private protected const Flatbuf.MetadataVersion CurrentMetadataVersion = Flatbuf.MetadataVersion.V4;
// 64 zero bytes; WritePadding/WritePaddingAsync write at most this many per call.
199 private static readonly byte[] s_padding = new byte[64];
201 private readonly ArrowTypeFlatbufferBuilder _fieldTypeBuilder;
// Created lazily, either by this property or by DictionaryCollector.Collect through its
// ref parameter.
203 private DictionaryMemo _dictionaryMemo;
204 private DictionaryMemo DictionaryMemo => _dictionaryMemo ??= new DictionaryMemo();
/// <summary>Equivalent to <c>leaveOpen: false</c> with default IPC options.</summary>
206 public ArrowStreamWriter(Stream baseStream, Schema schema)
207 : this(baseStream, schema, leaveOpen: false)
/// <summary>Equivalent to passing <c>options: null</c>, i.e. <see cref="IpcOptions.Default"/>.</summary>
211 public ArrowStreamWriter(Stream baseStream, Schema schema, bool leaveOpen)
212 : this(baseStream, schema, leaveOpen, options: null)
/// <summary>Creates a writer for <paramref name="schema"/> over <paramref name="baseStream"/>.</summary>
/// <param name="baseStream">Destination stream; must not be null.</param>
/// <param name="schema">Schema written in the Schema message; must not be null. No check that
/// record batches match it is visible here (batches are serialized from their own schema).</param>
/// <param name="leaveOpen">Whether <see cref="Dispose"/> should leave <paramref name="baseStream"/> open.</param>
/// <param name="options">IPC options; null selects <see cref="IpcOptions.Default"/>.</param>
/// <exception cref="ArgumentNullException"><paramref name="baseStream"/> or <paramref name="schema"/> is null.</exception>
216 public ArrowStreamWriter(Stream baseStream, Schema schema, bool leaveOpen, IpcOptions options)
218 BaseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
219 Schema = schema ?? throw new ArgumentNullException(nameof(schema));
220 _leaveOpen = leaveOpen;
222 Buffers = ArrayPool<byte>.Create();
223 Builder = new FlatBufferBuilder(1024);
224 HasWrittenSchema = false;
226 _fieldTypeBuilder = new ArrowTypeFlatbufferBuilder(Builder);
227 _options = options ?? IpcOptions.Default;
/// <summary>
/// Emits one FieldNode (length, null count) for <paramref name="data"/> and, for nested types,
/// for every descendant, into the nodes vector currently open in <see cref="Builder"/>.
/// </summary>
// Children are emitted first and in reverse index order, and the node itself last. Because
// FlatBuffers vectors are built back-to-front, the finished vector reads self first, then
// children in order (pre-order).
231 private void CreateSelfAndChildrenFieldNodes(ArrayData data)
233 if (data.DataType is NestedType)
235 // flatbuffer struct vectors have to be created in reverse order
236 for (int i = data.Children.Length - 1; i >= 0; i--)
238 CreateSelfAndChildrenFieldNodes(data.Children[i]);
241 Flatbuf.FieldNode.CreateFieldNode(Builder, data.Length, data.NullCount);
/// <summary>
/// Computes how many FieldNodes <paramref name="fields"/> need. PreparingWritingRecordBatch
/// uses it to size the nodes vector, so it must agree with the number of nodes
/// CreateSelfAndChildrenFieldNodes emits.
/// </summary>
// NOTE(review): the counter initialisation and the return (original lines ~246/252) are not
// visible in this listing. This counts from schema types, while node emission walks
// ArrayData.Children; the two must have the same shape.
244 private static int CountAllNodes(IReadOnlyDictionary<string, Field> fields)
247 foreach (Field arrowArray in fields.Values)
249 CountSelfAndChildrenNodes(arrowArray.DataType, ref count);
/// <summary>Adds the node(s) for <paramref name="type"/>, recursing into child fields of nested types.</summary>
// NOTE(review): the increment for the type itself is not visible in this listing.
254 private static void CountSelfAndChildrenNodes(IArrowType type, ref int count)
256 if (type is NestedType nestedType)
258 foreach (Field childField in nestedType.Fields)
260 CountSelfAndChildrenNodes(childField.DataType, ref count);
/// <summary>
/// Writes one RecordBatch message (metadata, then body). If the schema has not been written,
/// it is written first. On the first call only, the dictionaries in
/// <paramref name="recordBatch"/> are collected into the memo and written as DictionaryBatch
/// messages; later batches do not re-send dictionaries.
/// </summary>
266 private protected void WriteRecordBatchInternal(RecordBatch recordBatch)
268 // TODO: Truncate buffers with extraneous padding / unused capacity
270 if (!HasWrittenSchema)
// NOTE(review): the synchronous schema-write call (original lines 271-272) is not visible
// in this listing; the async overload calls WriteSchemaAsync at this point.
273 HasWrittenSchema = true;
276 if (!HasWrittenDictionaryBatch)
278 DictionaryCollector.Collect(recordBatch, ref _dictionaryMemo);
279 WriteDictionaries(recordBatch);
280 HasWrittenDictionaryBatch = true;
283 (ArrowRecordBatchFlatBufferBuilder recordBatchBuilder, VectorOffset fieldNodesVectorOffset) =
284 PreparingWritingRecordBatch(recordBatch);
// PreparingWritingRecordBatch leaves the buffers vector open; close it here.
286 VectorOffset buffersVectorOffset = Builder.EndVector();
288 // Serialize record batch
// Virtual hook for derived writers, called before the RecordBatch table/message are written.
290 StartingWritingRecordBatch();
292 Offset<Flatbuf.RecordBatch> recordBatchOffset = Flatbuf.RecordBatch.CreateRecordBatch(Builder, recordBatch.Length,
293 fieldNodesVectorOffset,
294 buffersVectorOffset);
296 long metadataLength = WriteMessage(Flatbuf.MessageHeader.RecordBatch,
297 recordBatchOffset, recordBatchBuilder.TotalLength);
299 long bufferLength = WriteBufferData(recordBatchBuilder.Buffers);
301 FinishedWritingRecordBatch(bufferLength, metadataLength);
/// <summary>
/// Async counterpart of WriteRecordBatchInternal. It writes the schema if needed, writes the
/// dictionaries only for the first batch, then writes the RecordBatch message and body.
/// </summary>
/// <param name="cancellationToken">Forwarded to the schema, dictionary, message and buffer writes.</param>
304 private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBatch,
305 CancellationToken cancellationToken = default)
307 // TODO: Truncate buffers with extraneous padding / unused capacity
309 if (!HasWrittenSchema)
311 await WriteSchemaAsync(Schema, cancellationToken).ConfigureAwait(false);
312 HasWrittenSchema = true;
315 if (!HasWrittenDictionaryBatch)
317 DictionaryCollector.Collect(recordBatch, ref _dictionaryMemo);
318 await WriteDictionariesAsync(recordBatch, cancellationToken).ConfigureAwait(false);
319 HasWrittenDictionaryBatch = true;
322 (ArrowRecordBatchFlatBufferBuilder recordBatchBuilder, VectorOffset fieldNodesVectorOffset) =
323 PreparingWritingRecordBatch(recordBatch);
// Close the buffers vector left open by PreparingWritingRecordBatch.
325 VectorOffset buffersVectorOffset = Builder.EndVector();
327 // Serialize record batch
329 StartingWritingRecordBatch();
331 Offset<Flatbuf.RecordBatch> recordBatchOffset = Flatbuf.RecordBatch.CreateRecordBatch(Builder, recordBatch.Length,
332 fieldNodesVectorOffset,
333 buffersVectorOffset);
335 long metadataLength = await WriteMessageAsync(Flatbuf.MessageHeader.RecordBatch,
336 recordBatchOffset, recordBatchBuilder.TotalLength,
337 cancellationToken).ConfigureAwait(false);
339 long bufferLength = await WriteBufferDataAsync(recordBatchBuilder.Buffers, cancellationToken).ConfigureAwait(false);
341 FinishedWritingRecordBatch(bufferLength, metadataLength);
/// <summary>
/// Writes the message body. Each buffer is followed by zero padding up to a multiple of 8,
/// and the whole body is then padded to a multiple of 8.
/// </summary>
/// <returns>Sum of the padded buffer lengths plus the trailing body padding.</returns>
// NOTE(review): parts of the loop body (original lines 345-347 and 351-355) are not visible in
// this listing, including the buffer write itself; compare WriteBufferDataAsync. Each term
// added to bodyLength is already a multiple of 8, so bodyPaddingLength is presumably always 0.
344 private long WriteBufferData(IReadOnlyList<ArrowRecordBatchFlatBufferBuilder.Buffer> buffers)
348 for (int i = 0; i < buffers.Count; i++)
350 ArrowBuffer buffer = buffers[i].DataBuffer;
356 int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
357 int padding = paddedLength - buffer.Length;
360 WritePadding(padding);
363 bodyLength += paddedLength;
366 // Write padding so the record batch message body length is a multiple of 8 bytes
368 int bodyPaddingLength = CalculatePadding(bodyLength);
370 WritePadding(bodyPaddingLength);
372 return bodyLength + bodyPaddingLength;
/// <summary>
/// Async counterpart of WriteBufferData. It writes each buffer plus zero padding to a multiple
/// of 8, then pads the body to a multiple of 8.
/// </summary>
/// <returns>Sum of the padded buffer lengths plus the trailing body padding.</returns>
// NOTE(review): WritePaddingAsync takes no CancellationToken, so padding writes ignore
// cancellationToken. Lines between 381 and 385 are not visible in this listing.
375 private async ValueTask<long> WriteBufferDataAsync(IReadOnlyList<ArrowRecordBatchFlatBufferBuilder.Buffer> buffers, CancellationToken cancellationToken = default)
379 for (int i = 0; i < buffers.Count; i++)
381 ArrowBuffer buffer = buffers[i].DataBuffer;
385 await WriteBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
387 int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
388 int padding = paddedLength - buffer.Length;
391 await WritePaddingAsync(padding).ConfigureAwait(false);
394 bodyLength += paddedLength;
397 // Write padding so the record batch message body length is a multiple of 8 bytes
399 int bodyPaddingLength = CalculatePadding(bodyLength);
401 await WritePaddingAsync(bodyPaddingLength).ConfigureAwait(false);
403 return bodyLength + bodyPaddingLength;
/// <summary>Prepares the nodes/buffers vectors for <paramref name="recordBatch"/> using its own schema fields and columns.</summary>
406 private Tuple<ArrowRecordBatchFlatBufferBuilder, VectorOffset> PreparingWritingRecordBatch(RecordBatch recordBatch)
408 return PreparingWritingRecordBatch(recordBatch.Schema.Fields, recordBatch.ArrayList);
/// <summary>
/// Serializes the FieldNode vector for <paramref name="arrays"/> and closes it. It then visits
/// every array to collect its body buffers and writes one Buffer struct (offset, unpadded
/// length) per buffer into a vector that is started here but LEFT OPEN. The caller must call
/// Builder.EndVector() to obtain the buffers vector offset.
/// </summary>
/// <param name="fields">One field per column; only its count and (for node counting) its types are used.</param>
/// <param name="arrays">Columns in field order; assumed to have at least fields.Count entries.</param>
/// <returns>The buffer collector and the closed field-nodes vector offset.</returns>
411 private Tuple<ArrowRecordBatchFlatBufferBuilder, VectorOffset> PreparingWritingRecordBatch(IReadOnlyDictionary<string, Field> fields, IReadOnlyList<IArrowArray> arrays)
415 // Serialize field nodes
417 int fieldCount = fields.Count;
419 Flatbuf.RecordBatch.StartNodesVector(Builder, CountAllNodes(fields));
421 // flatbuffer struct vectors have to be created in reverse order
422 for (int i = fieldCount - 1; i >= 0; i--)
424 CreateSelfAndChildrenFieldNodes(arrays[i].Data);
427 VectorOffset fieldNodesVectorOffset = Builder.EndVector();
// Buffers are collected in forward column order; their offsets depend on this order.
431 var recordBatchBuilder = new ArrowRecordBatchFlatBufferBuilder();
432 for (int i = 0; i < fieldCount; i++)
434 IArrowArray fieldArray = arrays[i];
435 fieldArray.Accept(recordBatchBuilder);
438 IReadOnlyList<ArrowRecordBatchFlatBufferBuilder.Buffer> buffers = recordBatchBuilder.Buffers;
440 Flatbuf.RecordBatch.StartBuffersVector(Builder, buffers.Count);
442 // flatbuffer struct vectors have to be created in reverse order
443 for (int i = buffers.Count - 1; i >= 0; i--)
445 Flatbuf.Buffer.CreateBuffer(Builder,
446 buffers[i].Offset, buffers[i].DataBuffer.Length);
449 return Tuple.Create(recordBatchBuilder, fieldNodesVectorOffset);
/// <summary>Writes a DictionaryBatch for each dictionary-encoded field (top-level or nested) in <paramref name="recordBatch"/>'s schema.</summary>
453 private protected void WriteDictionaries(RecordBatch recordBatch)
455 foreach (Field field in recordBatch.Schema.Fields.Values)
457 WriteDictionary(field);
/// <summary>
/// If <paramref name="field"/> is dictionary-encoded, writes its DictionaryBatch message and
/// body. Otherwise it recurses into the child fields of nested types.
/// </summary>
// NOTE(review): the early return after the recursion (original lines ~470-474) is not visible
// in this listing. A dictionary field's value type is not searched for further dictionary
// fields here, although DictionaryCollector walks dictionary values (WalkChildren on
// dictionary.Data); confirm nested dictionaries are written. The returned message/body
// lengths are discarded and no hook is invoked for dictionary batches.
461 private protected void WriteDictionary(Field field)
463 if (field.DataType.TypeId != ArrowTypeId.Dictionary)
465 if (field.DataType is NestedType nestedType)
467 foreach (Field child in nestedType.Fields)
469 WriteDictionary(child);
475 (ArrowRecordBatchFlatBufferBuilder recordBatchBuilder, Offset<Flatbuf.DictionaryBatch> dictionaryBatchOffset) =
476 CreateDictionaryBatchOffset(field);
478 WriteMessage(Flatbuf.MessageHeader.DictionaryBatch,
479 dictionaryBatchOffset, recordBatchBuilder.TotalLength);
481 WriteBufferData(recordBatchBuilder.Buffers);
/// <summary>Async counterpart of WriteDictionaries.</summary>
484 private protected async Task WriteDictionariesAsync(RecordBatch recordBatch, CancellationToken cancellationToken)
486 foreach (Field field in recordBatch.Schema.Fields.Values)
488 await WriteDictionaryAsync(field, cancellationToken).ConfigureAwait(false);
/// <summary>Async counterpart of WriteDictionary; the same review notes apply.</summary>
492 private protected async Task WriteDictionaryAsync(Field field, CancellationToken cancellationToken)
494 if (field.DataType.TypeId != ArrowTypeId.Dictionary)
496 if (field.DataType is NestedType nestedType)
498 foreach (Field child in nestedType.Fields)
500 await WriteDictionaryAsync(child, cancellationToken).ConfigureAwait(false);
506 (ArrowRecordBatchFlatBufferBuilder recordBatchBuilder, Offset<Flatbuf.DictionaryBatch> dictionaryBatchOffset) =
507 CreateDictionaryBatchOffset(field);
509 await WriteMessageAsync(Flatbuf.MessageHeader.DictionaryBatch,
510 dictionaryBatchOffset, recordBatchBuilder.TotalLength, cancellationToken).ConfigureAwait(false);
512 await WriteBufferDataAsync(recordBatchBuilder.Buffers, cancellationToken).ConfigureAwait(false);
/// <summary>
/// Serializes the dictionary registered in the memo for <paramref name="field"/> as a
/// single-column RecordBatch and wraps it in a DictionaryBatch table carrying the memo id.
/// The column uses a placeholder non-nullable field named "dummy" of the dictionary's value
/// type.
/// </summary>
/// <returns>The collector holding the dictionary's body buffers, and the DictionaryBatch offset.</returns>
515 private Tuple<ArrowRecordBatchFlatBufferBuilder, Offset<Flatbuf.DictionaryBatch>> CreateDictionaryBatchOffset(Field field)
517 Field dictionaryField = new Field("dummy", ((DictionaryType)field.DataType).ValueType, false);
518 long id = DictionaryMemo.GetId(field);
519 IArrowArray dictionary = DictionaryMemo.GetDictionary(id);
521 var fieldsDictionary = new Dictionary<string, Field> {
522 { dictionaryField.Name, dictionaryField } };
524 var arrays = new List<IArrowArray> { dictionary };
526 (ArrowRecordBatchFlatBufferBuilder recordBatchBuilder, VectorOffset fieldNodesVectorOffset) =
527 PreparingWritingRecordBatch(fieldsDictionary, arrays);
// Close the buffers vector left open by PreparingWritingRecordBatch.
529 VectorOffset buffersVectorOffset = Builder.EndVector();
531 // Serialize record batch
532 Offset<Flatbuf.RecordBatch> recordBatchOffset = Flatbuf.RecordBatch.CreateRecordBatch(Builder, dictionary.Length,
533 fieldNodesVectorOffset,
534 buffersVectorOffset);
536 // TODO: Support delta.
// The trailing `false` is presumably the isDelta flag (see TODO above).
537 Offset<Flatbuf.DictionaryBatch> dictionaryBatchOffset = Flatbuf.DictionaryBatch.CreateDictionaryBatch(Builder, id, recordBatchOffset, false);
538 return Tuple.Create(recordBatchBuilder, dictionaryBatchOffset);
/// <summary>Writes the Schema message if it has not been written yet. Derived writers may override.</summary>
// NOTE(review): the schema-write call (original lines 544-545) is not visible in this listing;
// the async overload calls WriteSchemaAsync.
541 private protected virtual void WriteStartInternal()
543 if (!HasWrittenSchema)
546 HasWrittenSchema = true;
/// <summary>Async counterpart of WriteStartInternal.</summary>
550 private protected async virtual ValueTask WriteStartInternalAsync(CancellationToken cancellationToken)
552 if (!HasWrittenSchema)
554 await WriteSchemaAsync(Schema, cancellationToken).ConfigureAwait(false);
555 HasWrittenSchema = true;
/// <summary>
/// Writes an IPC length prefix with length 0 (preceded by the continuation token unless
/// WriteLegacyIpcFormat is set). Derived writers may override.
/// </summary>
559 private protected virtual void WriteEndInternal()
561 WriteIpcMessageLength(length: 0);
/// <summary>Async counterpart of WriteEndInternal.</summary>
564 private protected virtual ValueTask WriteEndInternalAsync(CancellationToken cancellationToken)
566 return WriteIpcMessageLengthAsync(length: 0, cancellationToken);
// Hook for derived writers. It is called after a batch's nodes/buffers vectors are built and
// before its RecordBatch table and message are written. Its body is not visible in this listing.
569 private protected virtual void StartingWritingRecordBatch()
// Hook for derived writers. It is called after a record batch's body is written, with the
// body length and the metadata length (prefix + metadata + padding). Its body is not
// visible in this listing.
573 private protected virtual void FinishedWritingRecordBatch(long bodyLength, long metadataLength)
/// <summary>
/// Writes <paramref name="recordBatch"/>. On first use it also writes the schema and the
/// batch's dictionaries.
/// </summary>
577 public virtual void WriteRecordBatch(RecordBatch recordBatch)
579 WriteRecordBatchInternal(recordBatch);
/// <summary>Asynchronously writes <paramref name="recordBatch"/>; see WriteRecordBatch.</summary>
/// <param name="cancellationToken">Forwarded to the underlying stream writes.</param>
582 public virtual Task WriteRecordBatchAsync(RecordBatch recordBatch, CancellationToken cancellationToken = default)
584 return WriteRecordBatchInternalAsync(recordBatch, cancellationToken);
/// <summary>
/// Runs WriteStartInternal (which writes the Schema message if needed) at most once per
/// writer; later calls do nothing.
/// </summary>
587 public void WriteStart()
589 if (!HasWrittenStart)
591 WriteStartInternal();
592 HasWrittenStart = true;
/// <summary>
/// Asynchronously runs WriteStartInternalAsync (which writes the Schema message if needed) at
/// most once per writer; later calls do nothing.
/// </summary>
/// <param name="cancellationToken">Forwarded to the underlying stream writes.</param>
596 public async Task WriteStartAsync(CancellationToken cancellationToken = default)
598 if (!HasWrittenStart)
// ConfigureAwait(false), as at every other await in this writer: library code should not
// resume on the caller's SynchronizationContext, because a blocking caller could deadlock.
600 await WriteStartInternalAsync(cancellationToken).ConfigureAwait(false);
601 HasWrittenStart = true;
/// <summary>Writes the end-of-stream length prefix via WriteEndInternal and records that the end was written.</summary>
// NOTE(review): the HasWrittenEnd guard and the WriteEndInternal() call (original lines
// 607-609) are not visible in this listing.
605 public void WriteEnd()
610 HasWrittenEnd = true;
/// <summary>
/// Asynchronously writes the end-of-stream length prefix via WriteEndInternalAsync and records
/// that the end was written.
/// </summary>
/// <param name="cancellationToken">Forwarded to the underlying stream write.</param>
// NOTE(review): the HasWrittenEnd guard (original line 616) is not visible in this listing.
614 public async Task WriteEndAsync(CancellationToken cancellationToken = default)
// ConfigureAwait(false), consistent with the rest of this writer's library-code awaits.
618 await WriteEndInternalAsync(cancellationToken).ConfigureAwait(false);
619 HasWrittenEnd = true;
/// <summary>Writes the buffer's bytes to <see cref="BaseStream"/> as-is (no padding).</summary>
623 private void WriteBuffer(ArrowBuffer arrowBuffer)
625 BaseStream.Write(arrowBuffer.Memory);
/// <summary>Async counterpart of WriteBuffer.</summary>
628 private ValueTask WriteBufferAsync(ArrowBuffer arrowBuffer, CancellationToken cancellationToken = default)
630 return BaseStream.WriteAsync(arrowBuffer.Memory, cancellationToken);
/// <summary>
/// Builds the FlatBuffers Schema table for <paramref name="schema"/>. It contains the custom
/// metadata, one Field table per top-level field (name, nullability, type, dictionary
/// encoding, children, metadata), and the endianness of the current machine.
/// </summary>
/// <returns>Offset of the Schema table in <see cref="Builder"/>.</returns>
// NOTE(review): the per-field serialization duplicates the child serialization in
// GetChildrenFieldOffset; a shared helper would keep them in sync.
633 private protected Offset<Flatbuf.Schema> SerializeSchema(Schema schema)
636 VectorOffset metadataVectorOffset = default;
637 if (schema.HasMetadata)
639 Offset<Flatbuf.KeyValue>[] metadataOffsets = GetMetadataOffsets(schema.Metadata);
640 metadataVectorOffset = Flatbuf.Schema.CreateCustomMetadataVector(Builder, metadataOffsets);
// All objects a Field table references are created before CreateField is called.
644 var fieldOffsets = new Offset<Flatbuf.Field>[schema.Fields.Count];
645 for (int i = 0; i < fieldOffsets.Length; i++)
647 Field field = schema.GetFieldByIndex(i);
648 StringOffset fieldNameOffset = Builder.CreateString(field.Name);
649 ArrowTypeFlatbufferBuilder.FieldType fieldType = _fieldTypeBuilder.BuildFieldType(field);
651 VectorOffset fieldChildrenVectorOffset = GetChildrenFieldOffset(field);
652 VectorOffset fieldMetadataVectorOffset = GetFieldMetadataOffset(field);
653 Offset<Flatbuf.DictionaryEncoding> dictionaryOffset = GetDictionaryOffset(field);
655 fieldOffsets[i] = Flatbuf.Field.CreateField(Builder,
656 fieldNameOffset, field.IsNullable, fieldType.Type, fieldType.Offset,
657 dictionaryOffset, fieldChildrenVectorOffset, fieldMetadataVectorOffset);
660 VectorOffset fieldsVectorOffset = Flatbuf.Schema.CreateFieldsVector(Builder, fieldOffsets);
// Endianness of the host process; buffers are written with no byte swapping.
664 Flatbuf.Endianness endianness = BitConverter.IsLittleEndian ? Flatbuf.Endianness.Little : Flatbuf.Endianness.Big;
666 return Flatbuf.Schema.CreateSchema(
667 Builder, endianness, fieldsVectorOffset, metadataVectorOffset);
/// <summary>
/// Recursively serializes the child fields of <paramref name="field"/>'s type and returns the
/// vector of child Field tables. For dictionary-encoded fields the dictionary value type is
/// used instead.
/// </summary>
// NOTE(review): the else-branch of the conditional (original line 674) and the early return
// for non-nested types (lines 677-680) are not visible in this listing.
670 private VectorOffset GetChildrenFieldOffset(Field field)
672 IArrowType targetDataType = field.DataType is DictionaryType dictionaryType ?
673 dictionaryType.ValueType :
676 if (!(targetDataType is NestedType type))
681 int childrenCount = type.Fields.Count;
682 var children = new Offset<Flatbuf.Field>[childrenCount];
684 for (int i = 0; i < childrenCount; i++)
686 Field childField = type.Fields[i];
687 StringOffset childFieldNameOffset = Builder.CreateString(childField.Name);
688 ArrowTypeFlatbufferBuilder.FieldType childFieldType = _fieldTypeBuilder.BuildFieldType(childField);
690 VectorOffset childFieldChildrenVectorOffset = GetChildrenFieldOffset(childField);
691 VectorOffset childFieldMetadataVectorOffset = GetFieldMetadataOffset(childField);
692 Offset<Flatbuf.DictionaryEncoding> dictionaryOffset = GetDictionaryOffset(childField);
694 children[i] = Flatbuf.Field.CreateField(Builder,
695 childFieldNameOffset, childField.IsNullable, childFieldType.Type, childFieldType.Offset,
696 dictionaryOffset, childFieldChildrenVectorOffset, childFieldMetadataVectorOffset);
699 return Builder.CreateVectorOfTables(children);
/// <summary>Serializes <paramref name="field"/>'s custom metadata as a KeyValue vector.</summary>
// NOTE(review): the branch taken when the field has no metadata (original lines 705-708) is
// not visible in this listing; presumably it returns a default offset.
702 private VectorOffset GetFieldMetadataOffset(Field field)
704 if (!field.HasMetadata)
709 Offset<Flatbuf.KeyValue>[] metadataOffsets = GetMetadataOffsets(field.Metadata);
710 return Flatbuf.Field.CreateCustomMetadataVector(Builder, metadataOffsets);
/// <summary>
/// For a dictionary-encoded <paramref name="field"/>, assigns (or reuses) its memo id and
/// serializes a DictionaryEncoding table with that id, the index integer type (bit width,
/// signedness) and the ordered flag.
/// </summary>
// NOTE(review): the non-dictionary branch (original lines 716-719) is not visible in this
// listing. The `as` casts below are not null-checked; this assumes IndexType is always a
// NumberType. Confirm that DictionaryType guarantees it.
713 private Offset<Flatbuf.DictionaryEncoding> GetDictionaryOffset(Field field)
715 if (field.DataType.TypeId != ArrowTypeId.Dictionary)
720 long id = DictionaryMemo.GetOrAssignId(field);
721 var dicType = field.DataType as DictionaryType;
722 var indexType = dicType.IndexType as NumberType;
724 Offset<Flatbuf.Int> indexOffset = Flatbuf.Int.CreateInt(Builder, indexType.BitWidth, indexType.IsSigned);
725 return Flatbuf.DictionaryEncoding.CreateDictionaryEncoding(Builder, id, indexOffset, dicType.Ordered);
/// <summary>
/// Creates one KeyValue table per metadata entry, in the dictionary's enumeration order.
/// Callers only pass non-empty metadata (asserted).
/// </summary>
// NOTE(review): the `index` counter declaration (original line ~732) is not visible in this listing.
728 private Offset<Flatbuf.KeyValue>[] GetMetadataOffsets(IReadOnlyDictionary<string, string> metadata)
730 Debug.Assert(metadata != null);
731 Debug.Assert(metadata.Count > 0);
733 Offset<Flatbuf.KeyValue>[] metadataOffsets = new Offset<Flatbuf.KeyValue>[metadata.Count];
735 foreach (KeyValuePair<string, string> metadatum in metadata)
737 StringOffset keyOffset = Builder.CreateString(metadatum.Key);
738 StringOffset valueOffset = Builder.CreateString(metadatum.Value);
740 metadataOffsets[index++] = Flatbuf.KeyValue.CreateKeyValue(Builder, keyOffset, valueOffset);
743 return metadataOffsets;
/// <summary>Serializes <paramref name="schema"/> and writes it as a Schema message with a body length of 0.</summary>
// NOTE(review): lines before serialization (original 747-751) and the return (757-760) are
// not visible in this listing.
746 private Offset<Flatbuf.Schema> WriteSchema(Schema schema)
752 Offset<Flatbuf.Schema> schemaOffset = SerializeSchema(schema);
756 WriteMessage(Flatbuf.MessageHeader.Schema, schemaOffset, 0);
/// <summary>Async counterpart of WriteSchema.</summary>
761 private async ValueTask<Offset<Flatbuf.Schema>> WriteSchemaAsync(Schema schema, CancellationToken cancellationToken)
767 Offset<Flatbuf.Schema> schemaOffset = SerializeSchema(schema);
771 await WriteMessageAsync(Flatbuf.MessageHeader.Schema, schemaOffset, 0, cancellationToken)
772 .ConfigureAwait(false);
778 /// Writes the message to the <see cref="BaseStream"/>.
781 /// The number of bytes written to the stream.
// Writes, in order:
// - the IPC length prefix (see WriteIpcMessageLength);
// - the finished Message flatbuffer;
// - zero padding, so that prefix + metadata ends on an 8-byte boundary.
// The body is not written here. bodyLength is presumably passed to CreateMessage on a line
// not visible in this listing (original 789-790).
// NOTE(review): WriteMessage is non-virtual while WriteMessageAsync is virtual, so an override
// of the async path does not affect synchronous writes.
783 private protected long WriteMessage<T>(
784 Flatbuf.MessageHeader headerType, Offset<T> headerOffset, int bodyLength)
787 Offset<Flatbuf.Message> messageOffset = Flatbuf.Message.CreateMessage(
788 Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
791 Builder.Finish(messageOffset.Value);
// The finished flatbuffer occupies DataBuffer from Position to the end; Builder.Offset is its size.
793 ReadOnlyMemory<byte> messageData = Builder.DataBuffer.ToReadOnlyMemory(Builder.DataBuffer.Position, Builder.Offset);
794 int messagePaddingLength = CalculatePadding(_options.SizeOfIpcLength + messageData.Length);
// The recorded length includes the padding that follows the metadata.
796 WriteIpcMessageLength(messageData.Length + messagePaddingLength);
798 BaseStream.Write(messageData);
799 WritePadding(messagePaddingLength);
803 return _options.SizeOfIpcLength + messageData.Length + messagePaddingLength;
808 /// Writes the message to the <see cref="BaseStream"/>.
811 /// The number of bytes written to the stream.
// Async counterpart of WriteMessage, with the same layout. The trailing padding write
// (WritePaddingAsync) does not observe cancellationToken.
813 private protected virtual async ValueTask<long> WriteMessageAsync<T>(
814 Flatbuf.MessageHeader headerType, Offset<T> headerOffset, int bodyLength,
815 CancellationToken cancellationToken)
818 Offset<Flatbuf.Message> messageOffset = Flatbuf.Message.CreateMessage(
819 Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
822 Builder.Finish(messageOffset.Value);
824 ReadOnlyMemory<byte> messageData = Builder.DataBuffer.ToReadOnlyMemory(Builder.DataBuffer.Position, Builder.Offset);
825 int messagePaddingLength = CalculatePadding(_options.SizeOfIpcLength + messageData.Length);
827 await WriteIpcMessageLengthAsync(messageData.Length + messagePaddingLength, cancellationToken)
828 .ConfigureAwait(false);
830 await BaseStream.WriteAsync(messageData, cancellationToken).ConfigureAwait(false);
831 await WritePaddingAsync(messagePaddingLength).ConfigureAwait(false);
835 return _options.SizeOfIpcLength + messageData.Length + messagePaddingLength;
/// <summary>Writes the builder's current finished bytes to <see cref="BaseStream"/> with no length prefix or padding.</summary>
839 private protected void WriteFlatBuffer()
841 ReadOnlyMemory<byte> segment = Builder.DataBuffer.ToReadOnlyMemory(Builder.DataBuffer.Position, Builder.Offset);
843 BaseStream.Write(segment);
/// <summary>Async counterpart of WriteFlatBuffer.</summary>
846 private protected async ValueTask WriteFlatBufferAsync(CancellationToken cancellationToken = default)
848 ReadOnlyMemory<byte> segment = Builder.DataBuffer.ToReadOnlyMemory(Builder.DataBuffer.Position, Builder.Offset);
850 await BaseStream.WriteAsync(segment, cancellationToken).ConfigureAwait(false);
/// <summary>
/// Writes the IPC length prefix into a pooled scratch buffer of SizeOfIpcLength bytes, then
/// writes that buffer to <see cref="BaseStream"/>. The prefix is the little-endian continuation
/// token (omitted when WriteLegacyIpcFormat is set) followed by <paramref name="length"/> as a
/// little-endian int32.
/// </summary>
// NOTE(review): the whole `buffer` is written, so this assumes RentReturn passes a Memory<byte>
// of exactly SizeOfIpcLength bytes rather than a larger pooled array. TODO confirm.
853 private void WriteIpcMessageLength(int length)
855 Buffers.RentReturn(_options.SizeOfIpcLength, (buffer) =>
857 Memory<byte> currentBufferPosition = buffer;
858 if (!_options.WriteLegacyIpcFormat)
860 BinaryPrimitives.WriteInt32LittleEndian(
861 currentBufferPosition.Span, MessageSerializer.IpcContinuationToken);
862 currentBufferPosition = currentBufferPosition.Slice(sizeof(int));
865 BinaryPrimitives.WriteInt32LittleEndian(currentBufferPosition.Span, length);
866 BaseStream.Write(buffer);
/// <summary>Async counterpart of WriteIpcMessageLength; the same buffer-size assumption applies.</summary>
870 private async ValueTask WriteIpcMessageLengthAsync(int length, CancellationToken cancellationToken)
872 await Buffers.RentReturnAsync(_options.SizeOfIpcLength, async (buffer) =>
874 Memory<byte> currentBufferPosition = buffer;
875 if (!_options.WriteLegacyIpcFormat)
877 BinaryPrimitives.WriteInt32LittleEndian(
878 currentBufferPosition.Span, MessageSerializer.IpcContinuationToken);
879 currentBufferPosition = currentBufferPosition.Slice(sizeof(int));
882 BinaryPrimitives.WriteInt32LittleEndian(currentBufferPosition.Span, length);
883 await BaseStream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
884 }).ConfigureAwait(false);
/// <summary>
/// Returns how many bytes must follow <paramref name="offset"/> to reach the next multiple of
/// <paramref name="alignment"/> (a power of two, default 8).
/// </summary>
// NOTE(review): the conversion/return of `result` (original lines 890-895) is not visible in
// this listing.
887 protected int CalculatePadding(long offset, int alignment = 8)
889 long result = BitUtility.RoundUpToMultiplePowerOfTwo(offset, alignment) - offset;
/// <summary>Writes <paramref name="length"/> zero bytes, capped at 64 (the size of s_padding).</summary>
// NOTE(review): a length above 64 would be silently truncated by Math.Min. Callers in this
// writer only pass padding to an 8-byte boundary (< 8). Any guard on length (original lines
// 897-899) is not visible in this listing.
896 private protected void WritePadding(int length)
900 BaseStream.Write(s_padding.AsMemory(0, Math.Min(s_padding.Length, length)));
/// <summary>Async counterpart of WritePadding; it takes no CancellationToken, so padding writes are not cancellable.</summary>
904 private protected ValueTask WritePaddingAsync(int length)
908 return BaseStream.WriteAsync(s_padding.AsMemory(0, Math.Min(s_padding.Length, length)));
/// <summary>Disposes <see cref="BaseStream"/>. It is expected to leave the stream open when the writer was created with leaveOpen: true.</summary>
// NOTE(review): the `_leaveOpen` guard (original line ~916) is not visible in this listing.
// Dispose does not write the end-of-stream marker; callers must call WriteEnd(Async) first.
914 public virtual void Dispose()
918 BaseStream.Dispose();
/// <summary>
/// Finds dictionary-encoded columns in a record batch, including those nested in the children
/// of nested types and inside dictionary values, and registers their dictionaries in a
/// <see cref="DictionaryMemo"/>.
/// </summary>
923 internal static class DictionaryCollector
/// <summary>Collects dictionaries from every column, pairing schema field i with column i.</summary>
/// <param name="dictionaryMemo">Memo to populate; created if null.</param>
925 internal static void Collect(RecordBatch recordBatch, ref DictionaryMemo dictionaryMemo)
927 Schema schema = recordBatch.Schema;
928 for (int i = 0; i < schema.Fields.Count; i++)
930 Field field = schema.GetFieldByIndex(i);
931 IArrowArray array = recordBatch.Column(i);
933 CollectDictionary(field, array.Data, ref dictionaryMemo);
/// <summary>
/// If <paramref name="field"/> is dictionary-encoded, this checks the dictionary data via
/// EnsureDataType against the value type and stores it under the field's id (replacing any
/// dictionary already stored under that id). It then walks the dictionary values for nested
/// dictionaries. It also walks <paramref name="arrayData"/>'s children, presumably only for
/// non-dictionary fields; the branch structure (original lines 954-956) is not visible here.
/// </summary>
/// <exception cref="ArgumentException">A dictionary-encoded field's array data has a null Dictionary.</exception>
937 private static void CollectDictionary(Field field, ArrayData arrayData, ref DictionaryMemo dictionaryMemo)
939 if (field.DataType is DictionaryType dictionaryType)
941 if (arrayData.Dictionary == null)
943 throw new ArgumentException($"{nameof(arrayData.Dictionary)} must not be null");
945 arrayData.Dictionary.EnsureDataType(dictionaryType.ValueType.TypeId);
947 IArrowArray dictionary = ArrowArrayFactory.BuildArray(arrayData.Dictionary);
949 dictionaryMemo ??= new DictionaryMemo();
950 long id = dictionaryMemo.GetOrAssignId(field);
952 dictionaryMemo.AddOrReplaceDictionary(id, dictionary);
953 WalkChildren(dictionary.Data, ref dictionaryMemo);
957 WalkChildren(arrayData, ref dictionaryMemo);
/// <summary>
/// For nested types, collects dictionaries from each child. It pairs nestedType.Fields[i]
/// with Children[i], so the two must line up.
/// </summary>
// NOTE(review): the body of the null-children check (original lines 966-969) is not visible
// in this listing; presumably it returns.
961 private static void WalkChildren(ArrayData arrayData, ref DictionaryMemo dictionaryMemo)
963 ArrayData[] children = arrayData.Children;
965 if (children == null)
970 if (arrayData.DataType is NestedType nestedType)
972 for (int i = 0; i < nestedType.Fields.Count; i++)
974 Field childField = nestedType.Fields[i];
975 ArrayData child = children[i];
977 CollectDictionary(childField, child, ref dictionaryMemo);