]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/csharp/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / csharp / src / Apache.Arrow.Flight / Server / Internal / FlightServerImplementation.cs
diff --git a/ceph/src/arrow/csharp/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs b/ceph/src/arrow/csharp/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
new file mode 100644 (file)
index 0000000..dcf6e57
--- /dev/null
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Internal;
+using Apache.Arrow.Flight.Protocol;
+using Apache.Arrow.Flight.Server;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Server.Internal
+{
+    /// <summary>
+    /// This class has to be internal, since the generated code from proto is set as internal.
+    /// </summary>
+    internal class FlightServerImplementation : FlightService.FlightServiceBase
+    {
+        private readonly FlightServer _flightServer;
+        public FlightServerImplementation(FlightServer flightServer)
+        {
+            _flightServer = flightServer;
+        }
+
+        public override async Task DoPut(IAsyncStreamReader<FlightData> requestStream, IServerStreamWriter<Protocol.PutResult> responseStream, ServerCallContext context)
+        {
+            var readStream = new FlightServerRecordBatchStreamReader(requestStream);
+            var writeStream = new StreamWriter<FlightPutResult, Protocol.PutResult>(responseStream, putResult => putResult.ToProtocol());
+            await _flightServer.DoPut(readStream, writeStream, context).ConfigureAwait(false);
+        }
+
+        public override Task DoGet(Protocol.Ticket request, IServerStreamWriter<FlightData> responseStream, ServerCallContext context)
+        {
+            return _flightServer.DoGet(new FlightTicket(request.Ticket_), new FlightServerRecordBatchStreamWriter(responseStream), context);
+        }
+
+        public override Task ListFlights(Protocol.Criteria request, IServerStreamWriter<Protocol.FlightInfo> responseStream, ServerCallContext context)
+        {
+            var writeStream = new StreamWriter<FlightInfo, Protocol.FlightInfo>(responseStream, flightInfo => flightInfo.ToProtocol());
+            return _flightServer.ListFlights(new FlightCriteria(request), writeStream, context);
+        }
+
+        public override Task DoAction(Protocol.Action request, IServerStreamWriter<Protocol.Result> responseStream, ServerCallContext context)
+        {
+            var action = new FlightAction(request);
+            var writeStream = new StreamWriter<FlightResult, Protocol.Result>(responseStream, result => result.ToProtocol());
+            return _flightServer.DoAction(action, writeStream, context);
+        }
+
+        public override async Task<SchemaResult> GetSchema(Protocol.FlightDescriptor request, ServerCallContext context)
+        {
+            var flightDescriptor = new FlightDescriptor(request);
+            var schema = await _flightServer.GetSchema(flightDescriptor, context).ConfigureAwait(false);
+
+            return new SchemaResult()
+            {
+                Schema = SchemaWriter.SerializeSchema(schema)
+            };
+        }
+
+        public override async Task<Protocol.FlightInfo> GetFlightInfo(Protocol.FlightDescriptor request, ServerCallContext context)
+        {
+            var flightDescriptor = new FlightDescriptor(request);
+            var flightInfo = await _flightServer.GetFlightInfo(flightDescriptor, context).ConfigureAwait(false);
+
+            return flightInfo.ToProtocol();
+        }
+
+        public override Task DoExchange(IAsyncStreamReader<FlightData> requestStream, IServerStreamWriter<FlightData> responseStream, ServerCallContext context)
+        {
+            //Exchange is not yet implemented
+            throw new NotImplementedException();
+        }
+
+        public override Task Handshake(IAsyncStreamReader<HandshakeRequest> requestStream, IServerStreamWriter<HandshakeResponse> responseStream, ServerCallContext context)
+        {
+            //Handshake is not yet implemented
+            throw new NotImplementedException();
+        }
+
+        public override Task ListActions(Empty request, IServerStreamWriter<Protocol.ActionType> responseStream, ServerCallContext context)
+        {
+            var writeStream = new StreamWriter<FlightActionType, Protocol.ActionType>(responseStream, (actionType) => actionType.ToProtocol());
+            return _flightServer.ListActions(writeStream, context);
+        }
+    }
+}