]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/csharp/src/Server/TThreadedServer.cs
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / csharp / src / Server / TThreadedServer.cs
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 using System;
21 using System.Collections.Generic;
22 using System.Threading;
23 using Thrift.Collections;
24 using Thrift.Protocol;
25 using Thrift.Transport;
26
27 namespace Thrift.Server
28 {
29 /// <summary>
30 /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests.
31 /// </summary>
32 public class TThreadedServer : TServer
33 {
34 private const int DEFAULT_MAX_THREADS = 100;
35 private volatile bool stop = false;
36 private readonly int maxThreads;
37
38 private Queue<TTransport> clientQueue;
39 private THashSet<Thread> clientThreads;
40 private object clientLock;
41 private Thread workerThread;
42
43 public int ClientThreadsCount
44 {
45 get { return clientThreads.Count; }
46 }
47
48 public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
49 : this(new TSingletonProcessorFactory(processor), serverTransport,
50 new TTransportFactory(), new TTransportFactory(),
51 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
52 DEFAULT_MAX_THREADS, DefaultLogDelegate)
53 {
54 }
55
56 public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
57 : this(new TSingletonProcessorFactory(processor), serverTransport,
58 new TTransportFactory(), new TTransportFactory(),
59 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
60 DEFAULT_MAX_THREADS, logDelegate)
61 {
62 }
63
64
65 public TThreadedServer(TProcessor processor,
66 TServerTransport serverTransport,
67 TTransportFactory transportFactory,
68 TProtocolFactory protocolFactory)
69 : this(new TSingletonProcessorFactory(processor), serverTransport,
70 transportFactory, transportFactory,
71 protocolFactory, protocolFactory,
72 DEFAULT_MAX_THREADS, DefaultLogDelegate)
73 {
74 }
75
76 public TThreadedServer(TProcessorFactory processorFactory,
77 TServerTransport serverTransport,
78 TTransportFactory transportFactory,
79 TProtocolFactory protocolFactory)
80 : this(processorFactory, serverTransport,
81 transportFactory, transportFactory,
82 protocolFactory, protocolFactory,
83 DEFAULT_MAX_THREADS, DefaultLogDelegate)
84 {
85 }
86 public TThreadedServer(TProcessorFactory processorFactory,
87 TServerTransport serverTransport,
88 TTransportFactory inputTransportFactory,
89 TTransportFactory outputTransportFactory,
90 TProtocolFactory inputProtocolFactory,
91 TProtocolFactory outputProtocolFactory,
92 int maxThreads, LogDelegate logDel)
93 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
94 inputProtocolFactory, outputProtocolFactory, logDel)
95 {
96 this.maxThreads = maxThreads;
97 clientQueue = new Queue<TTransport>();
98 clientLock = new object();
99 clientThreads = new THashSet<Thread>();
100 }
101
102 /// <summary>
103 /// Use new Thread for each new client connection. block until numConnections &lt; maxThreads.
104 /// </summary>
105 public override void Serve()
106 {
107 try
108 {
109 //start worker thread
110 workerThread = new Thread(new ThreadStart(Execute));
111 workerThread.Start();
112 serverTransport.Listen();
113 }
114 catch (TTransportException ttx)
115 {
116 logDelegate("Error, could not listen on ServerTransport: " + ttx);
117 return;
118 }
119
120 //Fire the preServe server event when server is up but before any client connections
121 if (serverEventHandler != null)
122 serverEventHandler.preServe();
123
124 while (!stop)
125 {
126 int failureCount = 0;
127 try
128 {
129 TTransport client = serverTransport.Accept();
130 lock (clientLock)
131 {
132 clientQueue.Enqueue(client);
133 Monitor.Pulse(clientLock);
134 }
135 }
136 catch (TTransportException ttx)
137 {
138 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
139 {
140 ++failureCount;
141 logDelegate(ttx.ToString());
142 }
143
144 }
145 }
146
147 if (stop)
148 {
149 try
150 {
151 serverTransport.Close();
152 }
153 catch (TTransportException ttx)
154 {
155 logDelegate("TServeTransport failed on close: " + ttx.Message);
156 }
157 stop = false;
158 }
159 }
160
161 /// <summary>
162 /// Loops on processing a client forever
163 /// </summary>
164 private void Execute()
165 {
166 while (!stop)
167 {
168 TTransport client;
169 Thread t;
170 lock (clientLock)
171 {
172 //don't dequeue if too many connections
173 while (clientThreads.Count >= maxThreads)
174 {
175 Monitor.Wait(clientLock);
176 }
177
178 while (clientQueue.Count == 0)
179 {
180 Monitor.Wait(clientLock);
181 }
182
183 client = clientQueue.Dequeue();
184 t = new Thread(new ParameterizedThreadStart(ClientWorker));
185 clientThreads.Add(t);
186 }
187 //start processing requests from client on new thread
188 t.Start(client);
189 }
190 }
191
192 private void ClientWorker(object context)
193 {
194 using (TTransport client = (TTransport)context)
195 {
196 TProcessor processor = processorFactory.GetProcessor(client);
197 TTransport inputTransport = null;
198 TTransport outputTransport = null;
199 TProtocol inputProtocol = null;
200 TProtocol outputProtocol = null;
201 object connectionContext = null;
202 try
203 {
204 try
205 {
206 inputTransport = inputTransportFactory.GetTransport(client);
207 outputTransport = outputTransportFactory.GetTransport(client);
208 inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
209 outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
210
211 //Recover event handler (if any) and fire createContext server event when a client connects
212 if (serverEventHandler != null)
213 connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
214
215 //Process client requests until client disconnects
216 while (!stop)
217 {
218 if (!inputTransport.Peek())
219 break;
220
221 //Fire processContext server event
222 //N.B. This is the pattern implemented in C++ and the event fires provisionally.
223 //That is to say it may be many minutes between the event firing and the client request
224 //actually arriving or the client may hang up without ever makeing a request.
225 if (serverEventHandler != null)
226 serverEventHandler.processContext(connectionContext, inputTransport);
227 //Process client request (blocks until transport is readable)
228 if (!processor.Process(inputProtocol, outputProtocol))
229 break;
230 }
231 }
232 catch (TTransportException)
233 {
234 //Usually a client disconnect, expected
235 }
236 catch (Exception x)
237 {
238 //Unexpected
239 logDelegate("Error: " + x);
240 }
241
242 //Fire deleteContext server event after client disconnects
243 if (serverEventHandler != null)
244 serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
245
246 lock (clientLock)
247 {
248 clientThreads.Remove(Thread.CurrentThread);
249 Monitor.Pulse(clientLock);
250 }
251
252 }
253 finally
254 {
255 //Close transports
256 if (inputTransport != null)
257 inputTransport.Close();
258 if (outputTransport != null)
259 outputTransport.Close();
260
261 // disposable stuff should be disposed
262 if (inputProtocol != null)
263 inputProtocol.Dispose();
264 if (outputProtocol != null)
265 outputProtocol.Dispose();
266 }
267 }
268 }
269
270 public override void Stop()
271 {
272 stop = true;
273 serverTransport.Close();
274 //clean up all the threads myself
275 workerThread.Abort();
276 foreach (Thread t in clientThreads)
277 {
278 t.Abort();
279 }
280 }
281 }
282 }