1 // Licensed to the Apache Software Foundation(ASF) under one
2 // or more contributor license agreements.See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
19 using System.Threading;
20 using System.Threading.Tasks;
21 using Microsoft.Extensions.Logging;
22 using Thrift.Protocol;
23 using Thrift.Processor;
24 using Thrift.Transport;
26 namespace Thrift.Server
28 //TODO: unhandled exceptions, etc.
30 // ReSharper disable once InconsistentNaming
31 public class TSimpleAsyncServer : TServer
33 private readonly int _clientWaitingDelay;
34 private volatile Task _serverTask;
36 public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
37 TServerTransport serverTransport,
38 TTransportFactory inputTransportFactory,
39 TTransportFactory outputTransportFactory,
40 TProtocolFactory inputProtocolFactory,
41 TProtocolFactory outputProtocolFactory,
43 int clientWaitingDelay = 10)
44 : base(itProcessorFactory,
46 inputTransportFactory,
47 outputTransportFactory,
49 outputProtocolFactory,
52 _clientWaitingDelay = clientWaitingDelay;
55 public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
56 TServerTransport serverTransport,
57 TTransportFactory inputTransportFactory,
58 TTransportFactory outputTransportFactory,
59 TProtocolFactory inputProtocolFactory,
60 TProtocolFactory outputProtocolFactory,
61 ILoggerFactory loggerFactory,
62 int clientWaitingDelay = 10)
63 : this(itProcessorFactory,
65 inputTransportFactory,
66 outputTransportFactory,
68 outputProtocolFactory,
69 loggerFactory.CreateLogger<TSimpleAsyncServer>())
73 public TSimpleAsyncServer(ITAsyncProcessor processor,
74 TServerTransport serverTransport,
75 TProtocolFactory inputProtocolFactory,
76 TProtocolFactory outputProtocolFactory,
77 ILoggerFactory loggerFactory,
78 int clientWaitingDelay = 10)
79 : this(new TSingletonProcessorFactory(processor),
81 null, // defaults to TTransportFactory()
82 null, // defaults to TTransportFactory()
84 outputProtocolFactory,
85 loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)),
90 public override async Task ServeAsync(CancellationToken cancellationToken)
95 _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
100 Logger.LogError(ex.ToString());
104 private async Task StartListening(CancellationToken cancellationToken)
106 ServerTransport.Listen();
108 Logger.LogTrace("Started listening at server");
110 if (ServerEventHandler != null)
112 await ServerEventHandler.PreServeAsync(cancellationToken);
115 while (!cancellationToken.IsCancellationRequested)
117 if (ServerTransport.IsClientPending())
119 Logger.LogTrace("Waiting for client connection");
123 var client = await ServerTransport.AcceptAsync(cancellationToken);
124 await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
126 catch (TTransportException ttx)
128 Logger.LogTrace($"Transport exception: {ttx}");
130 if (ttx.Type != TTransportException.ExceptionType.Interrupted)
132 Logger.LogError(ttx.ToString());
140 await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
142 catch (TaskCanceledException) { }
146 ServerTransport.Close();
148 Logger.LogTrace("Completed listening at server");
151 public override void Stop()
155 private async Task Execute(TTransport client, CancellationToken cancellationToken)
157 Logger.LogTrace("Started client request processing");
159 var processor = ProcessorFactory.GetAsyncProcessor(client, this);
161 TTransport inputTransport = null;
162 TTransport outputTransport = null;
163 TProtocol inputProtocol = null;
164 TProtocol outputProtocol = null;
165 object connectionContext = null;
171 inputTransport = InputTransportFactory.GetTransport(client);
172 outputTransport = OutputTransportFactory.GetTransport(client);
173 inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
174 outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
176 if (ServerEventHandler != null)
178 connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
181 while (!cancellationToken.IsCancellationRequested)
183 if (!await inputTransport.PeekAsync(cancellationToken))
188 if (ServerEventHandler != null)
190 await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
193 if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
199 catch (TTransportException ttx)
201 Logger.LogTrace($"Transport exception: {ttx}");
205 Logger.LogError($"Error: {x}");
208 if (ServerEventHandler != null)
210 await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
217 inputTransport?.Close();
218 outputTransport?.Close();
220 // disposable stuff should be disposed
221 inputProtocol?.Dispose();
222 outputProtocol?.Dispose();
223 inputTransport?.Dispose();
224 outputTransport?.Dispose();
227 Logger.LogTrace("Completed client request processing");