1 // Licensed to the Apache Software Foundation (ASF) under one or more
2 // contributor license agreements. See the NOTICE file distributed with
3 // this work for additional information regarding copyright ownership.
4 // The ASF licenses this file to You under the Apache License, Version 2.0
5 // (the "License"); you may not use this file except in compliance with
6 // the License. You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 using System.Collections.Generic;
20 using System.Threading;
21 using System.Threading.Tasks;
22 using Apache.Arrow.Flatbuf;
23 using Apache.Arrow.Flight.Protocol;
24 using Apache.Arrow.Ipc;
26 using Google.Protobuf;
29 namespace Apache.Arrow.Flight.Internal
/// <summary>
/// Handles writing record batches as flight data
/// </summary>
/// <remarks>
/// The base <see cref="ArrowStreamWriter"/> is pointed at an in-memory stream that
/// only serves as scratch space for the body of the message currently being
/// built. Message headers are captured by the <c>WriteMessageAsync</c> override
/// instead of being written to that stream, and each completed
/// <see cref="Protocol.FlightData"/> is forwarded to the supplied stream writer.
/// </remarks>
internal class FlightDataStream : ArrowStreamWriter
{
    private readonly FlightDescriptor _flightDescriptor;
    private readonly IAsyncStreamWriter<FlightData> _clientStreamWriter;

    // The message currently being assembled; replaced before every send.
    private Protocol.FlightData _currentFlightData;

    /// <summary>
    /// Creates a writer that forwards flight data messages to <paramref name="clientStreamWriter"/>.
    /// </summary>
    /// <param name="clientStreamWriter">Destination for every assembled flight data message.</param>
    /// <param name="flightDescriptor">Optional descriptor; when not null it is attached to the schema message.</param>
    /// <param name="schema">Schema handed to the base writer.</param>
    public FlightDataStream(IAsyncStreamWriter<FlightData> clientStreamWriter, FlightDescriptor flightDescriptor, Schema schema)
        : base(new MemoryStream(), schema)
    {
        _clientStreamWriter = clientStreamWriter;
        _flightDescriptor = flightDescriptor;
    }

    /// <summary>
    /// Builds the schema message (with the flight descriptor, when one was supplied),
    /// sends it, and marks the schema as written.
    /// </summary>
    private async Task SendSchema()
    {
        _currentFlightData = new Protocol.FlightData();

        if (_flightDescriptor != null)
        {
            _currentFlightData.FlightDescriptor = _flightDescriptor.ToProtocol();
        }

        var offset = SerializeSchema(Schema);

        // No cancellation is wired through this path, so use the shared "none" token
        // rather than allocating a CancellationTokenSource that is never disposed.
        await WriteMessageAsync(MessageHeader.Schema, offset, 0, CancellationToken.None).ConfigureAwait(false);
        await _clientStreamWriter.WriteAsync(_currentFlightData).ConfigureAwait(false);
        HasWrittenSchema = true;
    }

    /// <summary>
    /// Rewinds and truncates the scratch stream so that it is empty.
    /// </summary>
    private void ResetStream()
    {
        BaseStream.Position = 0;
        BaseStream.SetLength(0);
    }

    /// <summary>
    /// Sends <paramref name="recordBatch"/> as a single flight data message, sending the
    /// schema message first if it has not been written yet.
    /// </summary>
    /// <param name="recordBatch">The batch to serialize and send.</param>
    /// <param name="applicationMetadata">Optional application metadata; attached to the message when not null.</param>
    public async Task Write(RecordBatch recordBatch, ByteString applicationMetadata)
    {
        if (!HasWrittenSchema)
        {
            await SendSchema().ConfigureAwait(false);
        }

        // Start from an empty scratch stream. Without this, the stream keeps the
        // bytes of every batch written so far, and reading it from position 0
        // below would re-send all earlier batch bodies along with the current one.
        ResetStream();

        _currentFlightData = new Protocol.FlightData();

        if (applicationMetadata != null)
        {
            _currentFlightData.AppMetadata = applicationMetadata;
        }

        // Presumably the base writer calls WriteMessageAsync while serializing,
        // which is where DataHeader gets populated - confirm against ArrowStreamWriter.
        await WriteRecordBatchInternalAsync(recordBatch).ConfigureAwait(false);

        // Rewind so the whole body that was just written is read back.
        BaseStream.Position = 0;
        var bodyData = await ByteString.FromStreamAsync(BaseStream).ConfigureAwait(false);

        _currentFlightData.DataBody = bodyData;
        await _clientStreamWriter.WriteAsync(_currentFlightData).ConfigureAwait(false);
    }

    /// <summary>
    /// Finishes the flatbuffer message for the given header and stores its bytes as the
    /// data header of the current flight data message. Nothing is written to the base stream.
    /// </summary>
    /// <param name="headerType">Kind of header contained in the message.</param>
    /// <param name="headerOffset">Offset of the already-serialized header in the builder.</param>
    /// <param name="bodyLength">Body length recorded in the message.</param>
    /// <param name="cancellationToken">Not used by this override.</param>
    /// <returns>A completed task whose result is always 0.</returns>
    private protected override ValueTask<long> WriteMessageAsync<T>(MessageHeader headerType, Offset<T> headerOffset, int bodyLength, CancellationToken cancellationToken)
    {
        Offset<Flatbuf.Message> messageOffset = Flatbuf.Message.CreateMessage(
            Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
            bodyLength);

        Builder.Finish(messageOffset.Value);

        ReadOnlyMemory<byte> messageData = Builder.DataBuffer.ToReadOnlyMemory(Builder.DataBuffer.Position, Builder.Offset);

        // Copy the bytes out of the builder's buffer rather than keeping a view onto it.
        _currentFlightData.DataHeader = ByteString.CopyFrom(messageData.Span);

        return new ValueTask<long>(0);
    }
}