2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 * Contains some contributions under the Thrift Software License.
20 * Please see doc/old-thrift-license.txt in the Thrift distribution for
25 using System.Threading;
26 using Thrift.Protocol;
27 using Thrift.Transport;
29 namespace Thrift.Server
32 /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
34 public class TThreadPoolServer : TServer
36 private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults
37 private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults
38 private volatile bool stop = false;
40 public struct Configuration
42 public int MinWorkerThreads;
43 public int MaxWorkerThreads;
44 public int MinIOThreads;
45 public int MaxIOThreads;
47 public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
49 MinWorkerThreads = min;
50 MaxWorkerThreads = max;
55 public Configuration(int minWork, int maxWork, int minIO, int maxIO)
57 MinWorkerThreads = minWork;
58 MaxWorkerThreads = maxWork;
64 public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport)
65 : this(new TSingletonProcessorFactory(processor), serverTransport,
66 new TTransportFactory(), new TTransportFactory(),
67 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
68 new Configuration(), DefaultLogDelegate)
72 public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
73 : this(new TSingletonProcessorFactory(processor), serverTransport,
74 new TTransportFactory(), new TTransportFactory(),
75 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
76 new Configuration(), logDelegate)
80 public TThreadPoolServer(TProcessor processor,
81 TServerTransport serverTransport,
82 TTransportFactory transportFactory,
83 TProtocolFactory protocolFactory)
84 : this(new TSingletonProcessorFactory(processor), serverTransport,
85 transportFactory, transportFactory,
86 protocolFactory, protocolFactory,
87 new Configuration(), DefaultLogDelegate)
91 public TThreadPoolServer(TProcessorFactory processorFactory,
92 TServerTransport serverTransport,
93 TTransportFactory transportFactory,
94 TProtocolFactory protocolFactory)
95 : this(processorFactory, serverTransport,
96 transportFactory, transportFactory,
97 protocolFactory, protocolFactory,
98 new Configuration(), DefaultLogDelegate)
102 public TThreadPoolServer(TProcessorFactory processorFactory,
103 TServerTransport serverTransport,
104 TTransportFactory inputTransportFactory,
105 TTransportFactory outputTransportFactory,
106 TProtocolFactory inputProtocolFactory,
107 TProtocolFactory outputProtocolFactory,
108 int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel)
109 : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
110 inputProtocolFactory, outputProtocolFactory,
111 new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
116 public TThreadPoolServer(TProcessorFactory processorFactory,
117 TServerTransport serverTransport,
118 TTransportFactory inputTransportFactory,
119 TTransportFactory outputTransportFactory,
120 TProtocolFactory inputProtocolFactory,
121 TProtocolFactory outputProtocolFactory,
122 Configuration threadConfig,
124 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
125 inputProtocolFactory, outputProtocolFactory, logDel)
127 lock (typeof(TThreadPoolServer))
129 if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
132 ThreadPool.GetMaxThreads(out work, out comm);
133 if (threadConfig.MaxWorkerThreads > 0)
134 work = threadConfig.MaxWorkerThreads;
135 if (threadConfig.MaxIOThreads > 0)
136 comm = threadConfig.MaxIOThreads;
137 if (!ThreadPool.SetMaxThreads(work, comm))
138 throw new Exception("Error: could not SetMaxThreads in ThreadPool");
141 if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
144 ThreadPool.GetMinThreads(out work, out comm);
145 if (threadConfig.MinWorkerThreads > 0)
146 work = threadConfig.MinWorkerThreads;
147 if (threadConfig.MinIOThreads > 0)
148 comm = threadConfig.MinIOThreads;
149 if (!ThreadPool.SetMinThreads(work, comm))
150 throw new Exception("Error: could not SetMinThreads in ThreadPool");
157 /// Use new ThreadPool thread for each new client connection.
159 public override void Serve()
163 serverTransport.Listen();
165 catch (TTransportException ttx)
167 logDelegate("Error, could not listen on ServerTransport: " + ttx);
171 //Fire the preServe server event when server is up but before any client connections
172 if (serverEventHandler != null)
173 serverEventHandler.preServe();
177 int failureCount = 0;
180 TTransport client = serverTransport.Accept();
181 ThreadPool.QueueUserWorkItem(this.Execute, client);
183 catch (TTransportException ttx)
185 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
188 logDelegate(ttx.ToString());
198 serverTransport.Close();
200 catch (TTransportException ttx)
202 logDelegate("TServerTransport failed on close: " + ttx.Message);
209 /// Loops on processing a client forever
210 /// threadContext will be a TTransport instance
212 /// <param name="threadContext"></param>
213 private void Execute(object threadContext)
215 using (TTransport client = (TTransport)threadContext)
217 TProcessor processor = processorFactory.GetProcessor(client, this);
218 TTransport inputTransport = null;
219 TTransport outputTransport = null;
220 TProtocol inputProtocol = null;
221 TProtocol outputProtocol = null;
222 object connectionContext = null;
227 inputTransport = inputTransportFactory.GetTransport(client);
228 outputTransport = outputTransportFactory.GetTransport(client);
229 inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
230 outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
232 //Recover event handler (if any) and fire createContext server event when a client connects
233 if (serverEventHandler != null)
234 connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
236 //Process client requests until client disconnects
239 if (!inputTransport.Peek())
242 //Fire processContext server event
243 //N.B. This is the pattern implemented in C++ and the event fires provisionally.
244 //That is to say it may be many minutes between the event firing and the client request
245 //actually arriving or the client may hang up without ever makeing a request.
246 if (serverEventHandler != null)
247 serverEventHandler.processContext(connectionContext, inputTransport);
248 //Process client request (blocks until transport is readable)
249 if (!processor.Process(inputProtocol, outputProtocol))
253 catch (TTransportException)
255 //Usually a client disconnect, expected
260 logDelegate("Error: " + x);
263 //Fire deleteContext server event after client disconnects
264 if (serverEventHandler != null)
265 serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
271 if (inputTransport != null)
272 inputTransport.Close();
273 if (outputTransport != null)
274 outputTransport.Close();
276 // disposable stuff should be disposed
277 if (inputProtocol != null)
278 inputProtocol.Dispose();
279 if (outputProtocol != null)
280 outputProtocol.Dispose();
281 if (inputTransport != null)
282 inputTransport.Dispose();
283 if (outputTransport != null)
284 outputTransport.Dispose();
289 public override void Stop()
292 serverTransport.Close();