2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
21 using System.Collections.Generic;
22 using System.Threading;
23 using Thrift.Collections;
24 using Thrift.Protocol;
25 using Thrift.Transport;
27 namespace Thrift.Server
30 /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests.
32 public class TThreadedServer : TServer
// Thread limit used by the constructor overloads that do not take a maxThreads argument.
34 private const int DEFAULT_MAX_THREADS = 100;
// Consulted by Serve() when a TTransportException is caught while accepting.
// NOTE(review): the places where this flag is assigned are not visible in this excerpt.
35 private volatile bool stop = false;
// Upper bound that Execute() compares against clientThreads.Count before dequeuing a client.
36 private readonly int maxThreads;
// Accepted client transports: enqueued by Serve(), dequeued by Execute().
38 private Queue<TTransport> clientQueue;
// Threads created by Execute() for clients; ClientWorker() removes its own thread when it finishes.
39 private THashSet<Thread> clientThreads;
// Object handed to Monitor.Wait / Monitor.Pulse by Serve(), Execute() and ClientWorker().
40 private object clientLock;
// Thread created and started by Serve(); it runs Execute().
41 private Thread workerThread;
/// <summary>
/// Number of entries currently held in clientThreads.
/// </summary>
// NOTE(review): the getter reads clientThreads.Count without taking clientLock, while other
// members add to and remove from the set - confirm callers only need an approximate value.
43 public int ClientThreadsCount
45 get { return clientThreads.Count; }
/// <summary>
/// Creates a server for a single processor using defaults for everything else:
/// the processor is wrapped in a TSingletonProcessorFactory, new TTransportFactory instances
/// are used for input and output, TBinaryProtocol.Factory for both protocols,
/// DEFAULT_MAX_THREADS as the thread limit and DefaultLogDelegate for logging.
/// </summary>
/// <param name="processor">Processor wrapped in a TSingletonProcessorFactory.</param>
/// <param name="serverTransport">Server transport forwarded to the full constructor.</param>
48 public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
49 : this(new TSingletonProcessorFactory(processor), serverTransport,
50 new TTransportFactory(), new TTransportFactory(),
51 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
52 DEFAULT_MAX_THREADS, DefaultLogDelegate)
/// <summary>
/// Creates a server for a single processor with a caller-supplied log delegate.
/// The processor is wrapped in a TSingletonProcessorFactory; new TTransportFactory instances,
/// TBinaryProtocol.Factory instances and DEFAULT_MAX_THREADS are used for the remaining arguments.
/// </summary>
/// <param name="processor">Processor wrapped in a TSingletonProcessorFactory.</param>
/// <param name="serverTransport">Server transport forwarded to the full constructor.</param>
/// <param name="logDelegate">Log delegate forwarded to the full constructor.</param>
56 public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
57 : this(new TSingletonProcessorFactory(processor), serverTransport,
58 new TTransportFactory(), new TTransportFactory(),
59 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
60 DEFAULT_MAX_THREADS, logDelegate)
/// <summary>
/// Creates a server for a single processor with caller-supplied transport and protocol factories.
/// The same transportFactory is passed for both input and output, and the same protocolFactory
/// for both protocols; DEFAULT_MAX_THREADS and DefaultLogDelegate fill the remaining arguments.
/// </summary>
/// <param name="processor">Processor wrapped in a TSingletonProcessorFactory.</param>
/// <param name="serverTransport">Server transport forwarded to the full constructor.</param>
/// <param name="transportFactory">Used as both the input and the output transport factory.</param>
/// <param name="protocolFactory">Used as both the input and the output protocol factory.</param>
65 public TThreadedServer(TProcessor processor,
66 TServerTransport serverTransport,
67 TTransportFactory transportFactory,
68 TProtocolFactory protocolFactory)
69 : this(new TSingletonProcessorFactory(processor), serverTransport,
70 transportFactory, transportFactory,
71 protocolFactory, protocolFactory,
72 DEFAULT_MAX_THREADS, DefaultLogDelegate)
/// <summary>
/// Creates a server from a processor factory with caller-supplied transport and protocol factories.
/// The same transportFactory is passed for both input and output, and the same protocolFactory
/// for both protocols; DEFAULT_MAX_THREADS and DefaultLogDelegate fill the remaining arguments.
/// </summary>
/// <param name="processorFactory">Processor factory forwarded unchanged to the full constructor.</param>
/// <param name="serverTransport">Server transport forwarded to the full constructor.</param>
/// <param name="transportFactory">Used as both the input and the output transport factory.</param>
/// <param name="protocolFactory">Used as both the input and the output protocol factory.</param>
76 public TThreadedServer(TProcessorFactory processorFactory,
77 TServerTransport serverTransport,
78 TTransportFactory transportFactory,
79 TProtocolFactory protocolFactory)
80 : this(processorFactory, serverTransport,
81 transportFactory, transportFactory,
82 protocolFactory, protocolFactory,
83 DEFAULT_MAX_THREADS, DefaultLogDelegate)
/// <summary>
/// Full constructor that the other overloads chain to. Forwards the factories, server transport
/// and log delegate to the TServer base constructor, stores the thread limit, and creates the
/// empty client queue, the lock object and the empty client-thread set.
/// </summary>
/// <param name="processorFactory">Forwarded to the base constructor.</param>
/// <param name="serverTransport">Forwarded to the base constructor.</param>
/// <param name="inputTransportFactory">Forwarded to the base constructor.</param>
/// <param name="outputTransportFactory">Forwarded to the base constructor.</param>
/// <param name="inputProtocolFactory">Forwarded to the base constructor.</param>
/// <param name="outputProtocolFactory">Forwarded to the base constructor.</param>
/// <param name="maxThreads">Stored in the maxThreads field, which Execute() compares against the client-thread count.</param>
/// <param name="logDel">Log delegate forwarded to the base constructor.</param>
// NOTE(review): maxThreads is stored as given; no range check is visible here.
86 public TThreadedServer(TProcessorFactory processorFactory,
87 TServerTransport serverTransport,
88 TTransportFactory inputTransportFactory,
89 TTransportFactory outputTransportFactory,
90 TProtocolFactory inputProtocolFactory,
91 TProtocolFactory outputProtocolFactory,
92 int maxThreads, LogDelegate logDel)
93 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
94 inputProtocolFactory, outputProtocolFactory, logDel)
96 this.maxThreads = maxThreads;
97 clientQueue = new Queue<TTransport>();
98 clientLock = new object();
99 clientThreads = new THashSet<Thread>();
103 /// Use new Thread for each new client connection. block until numConnections < maxThreads.
// Overview of the visible steps: start the worker thread that runs Execute(), start listening,
// fire preServe, accept clients and hand each one to the worker through clientQueue, and close
// the server transport at the end.
// NOTE(review): this excerpt is missing the brace, try, lock and loop-header lines, so the exact
// nesting of the statements below must be confirmed against the complete file.
105 public override void Serve()
109 //start worker thread
110 workerThread = new Thread(new ThreadStart(Execute));
111 workerThread.Start();
// NOTE(review): the worker thread is already running when Listen() is called; if Listen()
// throws, the handler below only logs - confirm the worker thread is cleaned up elsewhere.
112 serverTransport.Listen();
114 catch (TTransportException ttx)
116 logDelegate("Error, could not listen on ServerTransport: " + ttx);
120 //Fire the preServe server event when server is up but before any client connections
121 if (serverEventHandler != null)
122 serverEventHandler.preServe();
// NOTE(review): no visible line reads failureCount after it is declared here.
126 int failureCount = 0;
129 TTransport client = serverTransport.Accept();
// Queue the accepted client and pulse clientLock, which Execute() waits on while the queue is empty.
// Monitor.Pulse must be called while holding clientLock; the lock statement is presumably on a
// line missing from this excerpt - confirm.
132 clientQueue.Enqueue(client);
133 Monitor.Pulse(clientLock);
136 catch (TTransportException ttx)
// Log the failure unless the server is stopping and the exception type is Interrupted.
138 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
141 logDelegate(ttx.ToString());
151 serverTransport.Close();
153 catch (TTransportException ttx)
155 logDelegate("TServeTransport failed on close: " + ttx.Message);
162 /// Loops on processing a client forever
// Entry point of workerThread (see Serve()). For each client it waits until a thread slot is
// free and a client is queued, dequeues that client and creates a thread bound to ClientWorker.
// NOTE(review): the outer loop header, the lock statement, the declarations of "client" and "t",
// and the statement that starts the new thread are not visible in this excerpt.
164 private void Execute()
172 //don't dequeue if too many connections
// ClientWorker() pulses clientLock after removing its thread from clientThreads, which lets
// this wait re-check the condition.
173 while (clientThreads.Count >= maxThreads)
175 Monitor.Wait(clientLock);
// Serve() pulses clientLock after enqueuing an accepted client.
178 while (clientQueue.Count == 0)
180 Monitor.Wait(clientLock);
183 client = clientQueue.Dequeue();
184 t = new Thread(new ParameterizedThreadStart(ClientWorker));
// Registered before the thread runs so the maxThreads check above counts it.
185 clientThreads.Add(t);
187 //start processing requests from client on new thread
/// <summary>
/// Thread body for one client connection (bound to a new thread by Execute()). Builds the
/// transports and protocols for the client, fires the server event handler callbacks, processes
/// requests, then removes the current thread from clientThreads and closes/disposes what it created.
/// </summary>
/// <param name="context">The client transport; it is cast to TTransport.</param>
// NOTE(review): the try/catch/finally, loop and lock lines are missing from this excerpt, so the
// exact nesting of the statements below must be confirmed against the complete file.
192 private void ClientWorker(object context)
// The client transport is disposed when this using statement's scope ends.
194 using (TTransport client = (TTransport)context)
196 TProcessor processor = processorFactory.GetProcessor(client);
197 TTransport inputTransport = null;
198 TTransport outputTransport = null;
199 TProtocol inputProtocol = null;
200 TProtocol outputProtocol = null;
201 object connectionContext = null;
// Both the input and the output transport are obtained from the same client transport.
206 inputTransport = inputTransportFactory.GetTransport(client);
207 outputTransport = outputTransportFactory.GetTransport(client);
208 inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
209 outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
211 //Recover event handler (if any) and fire createContext server event when a client connects
212 if (serverEventHandler != null)
213 connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
215 //Process client requests until client disconnects
// NOTE(review): the statement executed when Peek() returns false is not visible here.
218 if (!inputTransport.Peek())
221 //Fire processContext server event
222 //N.B. This is the pattern implemented in C++ and the event fires provisionally.
223 //That is to say it may be many minutes between the event firing and the client request
224 //actually arriving or the client may hang up without ever making a request.
225 if (serverEventHandler != null)
226 serverEventHandler.processContext(connectionContext, inputTransport);
227 //Process client request (blocks until transport is readable)
// NOTE(review): the statement executed when Process() returns false is not visible here.
228 if (!processor.Process(inputProtocol, outputProtocol))
232 catch (TTransportException)
234 //Usually a client disconnect, expected
// NOTE(review): the catch clause that declares "x" is not visible in this excerpt.
239 logDelegate("Error: " + x);
242 //Fire deleteContext server event after client disconnects
243 if (serverEventHandler != null)
244 serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
// Remove this thread from the set and pulse clientLock; Execute() waits on clientLock while
// clientThreads.Count >= maxThreads. Monitor.Pulse must be called while holding clientLock;
// the lock statement is presumably on a line missing from this excerpt - confirm.
248 clientThreads.Remove(Thread.CurrentThread);
249 Monitor.Pulse(clientLock);
// The transports and protocols are null-checked because they start as null and are only
// assigned once the corresponding factory call has returned.
256 if (inputTransport != null)
257 inputTransport.Close();
258 if (outputTransport != null)
259 outputTransport.Close();
261 // disposable stuff should be disposed
262 if (inputProtocol != null)
263 inputProtocol.Dispose();
264 if (outputProtocol != null)
265 outputProtocol.Dispose();
270 public override void Stop()
273 serverTransport.Close();
274 //clean up all the threads myself
275 workerThread.Abort();
276 foreach (Thread t in clientThreads)