2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 * Contains some contributions under the Thrift Software License.
20 * Please see doc/old-thrift-license.txt in the Thrift distribution for
25 using System.Threading;
26 using Thrift.Protocol;
27 using Thrift.Transport;
28 using Thrift.Processor;
29 using System.Threading.Tasks;
30 using Microsoft.Extensions.Logging;
32 namespace Thrift.Server
35 /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
/// Each client transport accepted in ServeAsync is queued to the ThreadPool and handled by Execute.
37 public class TThreadPoolAsyncServer : TServer
// Sentinel defaults: the primary constructor only applies thread counts that are greater than zero,
// so -1 leaves the corresponding ThreadPool setting as it is.
39 private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults
40 private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults
// Consulted by the accept-failure handler in ServeAsync to decide whether a failure is reported.
41 private volatile bool stop = false;
// Token handed to ServeAsync; Execute reads it so per-client work uses the same token.
// ServeAsync resets it to default(CancellationToken) when it finishes.
43 private CancellationToken ServerCancellationToken;
/// <summary>
/// Thread-count settings that the TThreadPoolAsyncServer constructor applies to the .NET ThreadPool.
/// A value that is not greater than zero leaves the corresponding ThreadPool setting unchanged.
/// </summary>
45 public struct Configuration
// Worker-thread limits (first argument of ThreadPool.SetMinThreads / SetMaxThreads).
47 public int MinWorkerThreads;
48 public int MaxWorkerThreads;
// I/O-thread limits (second argument of ThreadPool.SetMinThreads / SetMaxThreads).
49 public int MinIOThreads;
50 public int MaxIOThreads;
// Min/max form; both parameters default to the "use ThreadPool defaults" sentinel.
52 public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
54 MinWorkerThreads = min;
55 MaxWorkerThreads = max;
// Form taking separate worker and I/O limits.
60 public Configuration(int minWork, int maxWork, int minIO, int maxIO)
62 MinWorkerThreads = minWork;
63 MaxWorkerThreads = maxWork;
/// <summary>
/// Creates a server for a single processor, wrapping it in a TSingletonProcessorFactory.
/// Uses TBinaryProtocol for both input and output, passes null for both transport factories,
/// and uses a default-constructed Configuration.
/// </summary>
/// <param name="processor">Processor that handles every client's requests.</param>
/// <param name="serverTransport">Transport on which client connections are accepted.</param>
/// <param name="logger">Optional logger; may be null.</param>
69 public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null)
70 : this(new TSingletonProcessorFactory(processor), serverTransport,
71 null, null, // defaults to TTransportFactory()
72 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
73 new Configuration(), logger)
/// <summary>
/// Creates a server for a single processor, wrapping it in a TSingletonProcessorFactory.
/// The same transport factory and the same protocol factory are used for both input and output.
/// </summary>
/// <param name="processor">Processor that handles every client's requests.</param>
/// <param name="serverTransport">Transport on which client connections are accepted.</param>
/// <param name="transportFactory">Factory used for both the input and the output transport.</param>
/// <param name="protocolFactory">Factory used for both the input and the output protocol.</param>
77 public TThreadPoolAsyncServer(ITAsyncProcessor processor,
78 TServerTransport serverTransport,
79 TTransportFactory transportFactory,
80 TProtocolFactory protocolFactory)
81 : this(new TSingletonProcessorFactory(processor), serverTransport,
82 transportFactory, transportFactory,
83 protocolFactory, protocolFactory,
/// <summary>
/// Creates a server from a processor factory.
/// The same transport factory and the same protocol factory are used for both input and output.
/// </summary>
/// <param name="processorFactory">Factory that supplies the processor for each client.</param>
/// <param name="serverTransport">Transport on which client connections are accepted.</param>
/// <param name="transportFactory">Factory used for both the input and the output transport.</param>
/// <param name="protocolFactory">Factory used for both the input and the output protocol.</param>
88 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
89 TServerTransport serverTransport,
90 TTransportFactory transportFactory,
91 TProtocolFactory protocolFactory)
92 : this(processorFactory, serverTransport,
93 transportFactory, transportFactory,
94 protocolFactory, protocolFactory,
/// <summary>
/// Creates a server with separate input/output transport and protocol factories.
/// The two thread counts are wrapped in a Configuration(min, max) and passed to the primary constructor.
/// </summary>
/// <param name="processorFactory">Factory that supplies the processor for each client.</param>
/// <param name="serverTransport">Transport on which client connections are accepted.</param>
/// <param name="inputTransportFactory">Factory for the input transport.</param>
/// <param name="outputTransportFactory">Factory for the output transport.</param>
/// <param name="inputProtocolFactory">Factory for the input protocol.</param>
/// <param name="outputProtocolFactory">Factory for the output protocol.</param>
/// <param name="minThreadPoolThreads">Minimum thread count passed to Configuration.</param>
/// <param name="maxThreadPoolThreads">Maximum thread count passed to Configuration.</param>
/// <param name="logger">Optional logger; may be null.</param>
99 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
100 TServerTransport serverTransport,
101 TTransportFactory inputTransportFactory,
102 TTransportFactory outputTransportFactory,
103 TProtocolFactory inputProtocolFactory,
104 TProtocolFactory outputProtocolFactory,
105 int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
106 : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
107 inputProtocolFactory, outputProtocolFactory,
108 new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
/// <summary>
/// Primary constructor. Forwards the factories, transport and logger to the TServer base class,
/// then applies <paramref name="threadConfig"/> to the .NET ThreadPool.
/// Only values greater than zero are applied; every other setting keeps the value currently
/// reported by ThreadPool.GetMaxThreads / ThreadPool.GetMinThreads.
/// </summary>
/// <param name="processorFactory">Factory that supplies the processor for each client.</param>
/// <param name="serverTransport">Transport on which client connections are accepted.</param>
/// <param name="inputTransportFactory">Factory for the input transport.</param>
/// <param name="outputTransportFactory">Factory for the output transport.</param>
/// <param name="inputProtocolFactory">Factory for the input protocol.</param>
/// <param name="outputProtocolFactory">Factory for the output protocol.</param>
/// <param name="threadConfig">Worker and I/O thread limits to apply to the ThreadPool.</param>
/// <param name="logger">Optional logger; may be null.</param>
/// <exception cref="Exception">ThreadPool.SetMaxThreads or ThreadPool.SetMinThreads returned false.</exception>
113 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
114 TServerTransport serverTransport,
115 TTransportFactory inputTransportFactory,
116 TTransportFactory outputTransportFactory,
117 TProtocolFactory inputProtocolFactory,
118 TProtocolFactory outputProtocolFactory,
119 Configuration threadConfig,
120 ILogger logger = null)
121 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
122 inputProtocolFactory, outputProtocolFactory, logger)
// Serializes the get-then-set sequences below across all instances of this class.
// NOTE(review): a Type object can be locked by any other code as well; a private static
// lock object would avoid accidental contention. The base Exception thrown below is also
// less specific than e.g. InvalidOperationException, but callers may already catch it.
124 lock (typeof(TThreadPoolAsyncServer))
// Maximums are handled before minimums.
// NOTE(review): presumably so a raised minimum is checked against the new maximum - confirm.
126 if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
// Start from the current limits so that an unset (non-positive) value is written back unchanged.
129 ThreadPool.GetMaxThreads(out work, out comm);
130 if (threadConfig.MaxWorkerThreads > 0)
131 work = threadConfig.MaxWorkerThreads;
132 if (threadConfig.MaxIOThreads > 0)
133 comm = threadConfig.MaxIOThreads;
134 if (!ThreadPool.SetMaxThreads(work, comm))
135 throw new Exception("Error: could not SetMaxThreads in ThreadPool");
138 if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
// Same read-modify-write approach for the minimum thread counts.
141 ThreadPool.GetMinThreads(out work, out comm);
142 if (threadConfig.MinWorkerThreads > 0)
143 work = threadConfig.MinWorkerThreads;
144 if (threadConfig.MinIOThreads > 0)
145 comm = threadConfig.MinIOThreads;
146 if (!ThreadPool.SetMinThreads(work, comm))
147 throw new Exception("Error: could not SetMinThreads in ThreadPool");
154 /// Use new ThreadPool thread for each new client connection.
/// Starts listening on the server transport, fires the PreServe event, then accepts clients
/// and queues each one to the ThreadPool for handling by Execute.
156 public override async Task ServeAsync(CancellationToken cancellationToken)
// Publish the token so that Execute, running on pool threads, can pick it up.
158 ServerCancellationToken = cancellationToken;
163 ServerTransport.Listen();
165 catch (TTransportException ttx)
167 LogError("Error, could not listen on ServerTransport: " + ttx);
171 //Fire the preServe server event when server is up but before any client connections
172 if (ServerEventHandler != null)
173 await ServerEventHandler.PreServeAsync(cancellationToken);
177 int failureCount = 0;
// Accept a client asynchronously, then hand it to a pool thread; the accepted transport is
// passed as the state object and is cast back to TTransport inside Execute.
180 TTransport client = await ServerTransport.AcceptAsync(cancellationToken);
181 ThreadPool.QueueUserWorkItem(this.Execute, client);
183 catch (TTransportException ttx)
// An Interrupted exception while the stop flag is set is not treated as an error.
185 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
188 LogError(ttx.ToString());
// Close the listening transport; a failure to close is logged.
198 ServerTransport.Close();
200 catch (TTransportException ttx)
202 LogError("TServerTransport failed on close: " + ttx.Message);
// Drop the stored token once serving is over.
210 ServerCancellationToken = default(CancellationToken);
215 /// Loops on processing a client forever
216 /// threadContext will be a TTransport instance
/// Queued to the ThreadPool by ServeAsync, once per accepted client.
/// The client transport is disposed when this method finishes (it is wrapped in a using statement).
218 /// <param name="threadContext"></param>
219 private void Execute(object threadContext)
// Snapshot the server's token once for the lifetime of this client.
221 var cancellationToken = ServerCancellationToken;
223 using (TTransport client = (TTransport)threadContext)
225 ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this);
// Declared up front so the cleanup code at the end can reach them even if setup fails part-way.
226 TTransport inputTransport = null;
227 TTransport outputTransport = null;
228 TProtocol inputProtocol = null;
229 TProtocol outputProtocol = null;
230 object connectionContext = null;
// Wrap the raw client transport with the configured transport and protocol factories.
235 inputTransport = InputTransportFactory.GetTransport(client);
236 outputTransport = OutputTransportFactory.GetTransport(client);
237 inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
238 outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
240 //Recover event handler (if any) and fire createContext server event when a client connects
// NOTE(review): this method is synchronous, so the async calls below are blocked on with
// .Result / .Wait(); each one holds the pool thread until the task completes.
241 if (ServerEventHandler != null)
242 connectionContext = ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken).Result;
244 //Process client requests until client disconnects
247 if (! inputTransport.PeekAsync(cancellationToken).Result)
250 //Fire processContext server event
251 //N.B. This is the pattern implemented in C++ and the event fires provisionally.
252 //That is to say it may be many minutes between the event firing and the client request
253 //actually arriving or the client may hang up without ever making a request.
254 if (ServerEventHandler != null)
255 ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).Wait();
256 //Process client request (blocks until transport is readable)
257 if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).Result)
261 catch (TTransportException)
263 //Usually a client disconnect, expected
268 LogError("Error: " + x);
271 //Fire deleteContext server event after client disconnects
272 if (ServerEventHandler != null)
273 ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken).Wait();
// Null-conditional calls: any of these may still be null if setup did not get that far.
279 inputTransport?.Close();
280 outputTransport?.Close();
282 // disposable stuff should be disposed
283 inputProtocol?.Dispose();
284 outputProtocol?.Dispose();
285 inputTransport?.Dispose();
286 outputTransport?.Dispose();
/// <summary>
/// Stops the server by closing the server transport, if one is set.
/// </summary>
291 public override void Stop()
// Null-conditional: nothing happens when ServerTransport is null.
294 ServerTransport?.Close();