]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/csharp/src/Apache.Arrow.Flight/Internal/FlightDataStream.cs
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / csharp / src / Apache.Arrow.Flight / Internal / FlightDataStream.cs
1 // Licensed to the Apache Software Foundation (ASF) under one or more
2 // contributor license agreements. See the NOTICE file distributed with
3 // this work for additional information regarding copyright ownership.
4 // The ASF licenses this file to You under the Apache License, Version 2.0
5 // (the "License"); you may not use this file except in compliance with
6 // the License. You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 using System;
17 using System.Collections.Generic;
18 using System.IO;
19 using System.Text;
20 using System.Threading;
21 using System.Threading.Tasks;
22 using Apache.Arrow.Flatbuf;
23 using Apache.Arrow.Flight.Protocol;
24 using Apache.Arrow.Ipc;
25 using FlatBuffers;
26 using Google.Protobuf;
27 using Grpc.Core;
28
29 namespace Apache.Arrow.Flight.Internal
30 {
/// <summary>
/// Handles writing record batches as flight data
/// </summary>
/// <remarks>
/// The base writer is constructed over a private <see cref="MemoryStream"/>.
/// For each record batch the stream is emptied, the batch is written into it
/// by the base class, and the resulting bytes become the <c>DataBody</c> of a
/// <see cref="Protocol.FlightData"/> message. The flatbuffer message header is
/// captured by the <c>WriteMessageAsync</c> override into <c>DataHeader</c>.
/// </remarks>
internal class FlightDataStream : ArrowStreamWriter
{
    private readonly FlightDescriptor _flightDescriptor;
    private readonly IAsyncStreamWriter<FlightData> _clientStreamWriter;

    // Message currently being assembled; WriteMessageAsync fills in its DataHeader.
    private Protocol.FlightData _currentFlightData;

    /// <summary>
    /// Creates a writer that emits flight data messages to <paramref name="clientStreamWriter"/>.
    /// </summary>
    /// <param name="clientStreamWriter">Destination for the produced messages.</param>
    /// <param name="flightDescriptor">
    /// Optional descriptor; when not null it is attached to the schema message.
    /// </param>
    /// <param name="schema">Schema passed to the base writer.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="clientStreamWriter"/> is null.
    /// </exception>
    public FlightDataStream(IAsyncStreamWriter<FlightData> clientStreamWriter, FlightDescriptor flightDescriptor, Schema schema)
        : base(new MemoryStream(), schema)
    {
        // Fail here rather than with a NullReferenceException on the first send.
        _clientStreamWriter = clientStreamWriter ?? throw new ArgumentNullException(nameof(clientStreamWriter));
        _flightDescriptor = flightDescriptor;
    }

    /// <summary>
    /// Sends the schema as its own flight data message (carrying the flight
    /// descriptor when one was supplied) and sets <c>HasWrittenSchema</c>.
    /// </summary>
    private async Task SendSchema()
    {
        _currentFlightData = new Protocol.FlightData();

        if (_flightDescriptor != null)
        {
            _currentFlightData.FlightDescriptor = _flightDescriptor.ToProtocol();
        }

        var offset = SerializeSchema(Schema);

        // The override below never observes the token, so a non-cancellable token
        // is sufficient; this avoids allocating a CancellationTokenSource that
        // would need to be disposed.
        await WriteMessageAsync(MessageHeader.Schema, offset, 0, CancellationToken.None).ConfigureAwait(false);
        await _clientStreamWriter.WriteAsync(_currentFlightData).ConfigureAwait(false);
        HasWrittenSchema = true;
    }

    /// <summary>
    /// Empties the base stream so the next record batch starts at offset zero.
    /// </summary>
    private void ResetStream()
    {
        this.BaseStream.Position = 0;
        this.BaseStream.SetLength(0);
    }

    /// <summary>
    /// Writes a record batch as one flight data message, sending the schema
    /// message first if it has not been sent yet.
    /// </summary>
    /// <param name="recordBatch">The record batch to send.</param>
    /// <param name="applicationMetadata">
    /// Optional application metadata; attached to the message when not null.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="recordBatch"/> is null.
    /// </exception>
    public async Task Write(RecordBatch recordBatch, ByteString applicationMetadata)
    {
        // Validate before anything is put on the wire, so a bad call does not
        // leave a schema message sent with no batch following it.
        if (recordBatch == null)
        {
            throw new ArgumentNullException(nameof(recordBatch));
        }

        if (!HasWrittenSchema)
        {
            await SendSchema().ConfigureAwait(false);
        }
        ResetStream();

        _currentFlightData = new Protocol.FlightData();

        if (applicationMetadata != null)
        {
            _currentFlightData.AppMetadata = applicationMetadata;
        }

        await WriteRecordBatchInternalAsync(recordBatch).ConfigureAwait(false);

        // Rewind so the bytes written for this batch can be read back as the body.
        this.BaseStream.Position = 0;
        var bodyData = await ByteString.FromStreamAsync(this.BaseStream).ConfigureAwait(false);

        _currentFlightData.DataBody = bodyData;
        await _clientStreamWriter.WriteAsync(_currentFlightData).ConfigureAwait(false);
    }

    /// <summary>
    /// Builds the flatbuffer message for the given header and stores its bytes
    /// in the <c>DataHeader</c> of the flight data message being assembled.
    /// Nothing is written to the base stream by this override.
    /// </summary>
    /// <returns>
    /// Always a completed task with value 0.
    /// NOTE(review): presumably the base class treats this as the number of
    /// bytes written to the stream - confirm against ArrowStreamWriter.
    /// </returns>
    private protected override ValueTask<long> WriteMessageAsync<T>(MessageHeader headerType, Offset<T> headerOffset, int bodyLength, CancellationToken cancellationToken)
    {
        Offset<Flatbuf.Message> messageOffset = Flatbuf.Message.CreateMessage(
            Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
            bodyLength);

        Builder.Finish(messageOffset.Value);

        ReadOnlyMemory<byte> messageData = Builder.DataBuffer.ToReadOnlyMemory(Builder.DataBuffer.Position, Builder.Offset);

        _currentFlightData.DataHeader = ByteString.CopyFrom(messageData.Span);

        return new ValueTask<long>(0);
    }
}
109 }