/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.server.taskpool;

import core.sync.condition;
import core.sync.mutex;
import std.exception : enforce;
import std.parallelism;
import std.variant : Variant;
import thrift.base;
import thrift.protocol.base;
import thrift.protocol.processor;
import thrift.server.base;
import thrift.server.transport.base;
import thrift.transport.base;
import thrift.util.cancellation;

/**
 * A server which dispatches client requests to a std.parallelism TaskPool.
 */
class TTaskPoolServer : TServer {
  ///
  this(
    TProcessor processor,
    TServerTransport serverTransport,
    TTransportFactory transportFactory,
    TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processor, serverTransport, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(
    TProcessorFactory processorFactory,
    TServerTransport serverTransport,
    TTransportFactory transportFactory,
    TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processorFactory, serverTransport, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(
    TProcessor processor,
    TServerTransport serverTransport,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    this(new TSingletonProcessorFactory(processor), serverTransport,
      inputTransportFactory, outputTransportFactory,
      inputProtocolFactory, outputProtocolFactory, taskPool);
  }

  ///
  this(
    TProcessorFactory processorFactory,
    TServerTransport serverTransport,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    super(processorFactory, serverTransport, inputTransportFactory,
      outputTransportFactory, inputProtocolFactory, outputProtocolFactory);

    if (taskPool) {
      this.taskPool = taskPool;
    } else {
      auto ptp = std.parallelism.taskPool;
      if (ptp.size > 0) {
        taskPool_ = ptp;
      } else {
        // If the global task pool is empty (the default on a single-core
        // machine), create a new one with a single worker thread. This avoids
        // the situation where an application that worked fine without an
        // explicitly set task pool on multi-core developer machines suddenly
        // fails on a single-core user machine.
        taskPool_ = new TaskPool(1);
        taskPool_.isDaemon = true;
      }
    }
  }

  override void serve(TCancellation cancellation = null) {
    serverTransport_.listen();

    if (eventHandler) eventHandler.preServe();

    auto queueState = QueueState();

    while (true) {
      // Check if we can still handle more connections.
      if (maxActiveConns) {
        synchronized (queueState.mutex) {
          while (queueState.activeConns >= maxActiveConns) {
            queueState.connClosed.wait();
          }
        }
      }

      TTransport client;
      TTransport inputTransport;
      TTransport outputTransport;
      TProtocol inputProtocol;
      TProtocol outputProtocol;

      try {
        client = serverTransport_.accept(cancellation);
        scope(failure) client.close();

        inputTransport = inputTransportFactory_.getTransport(client);
        scope(failure) inputTransport.close();

        outputTransport = outputTransportFactory_.getTransport(client);
        scope(failure) outputTransport.close();

        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
      } catch (TCancelledException tce) {
        break;
      } catch (TTransportException ttx) {
        logError("TServerTransport failed on accept: %s", ttx);
        continue;
      } catch (TException tx) {
        logError("Caught TException on accept: %s", tx);
        continue;
      }

      auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
      auto processor = processorFactory_.getProcessor(info);

      synchronized (queueState.mutex) {
        ++queueState.activeConns;
      }
      taskPool_.put(task!worker(queueState, client, inputProtocol,
        outputProtocol, processor, eventHandler));
    }

    // First, stop accepting new connections.
    try {
      serverTransport_.close();
    } catch (TServerTransportException e) {
      logError("Server transport failed to close: %s", e);
    }

    // Then, wait until all active connections are finished.
    synchronized (queueState.mutex) {
      while (queueState.activeConns > 0) {
        queueState.connClosed.wait();
      }
    }
  }

  /**
   * Sets the task pool to use.
   *
   * By default, the global std.parallelism taskPool instance is used, which
   * might not be appropriate for many applications, e.g. where tuning the
   * number of worker threads is desired. (On single-core systems, a private
   * task pool with a single thread is used by default, since the global
   * taskPool instance has no worker threads then.)
   *
   * Note: TTaskPoolServer expects that tasks are never dropped from the pool,
   * e.g. by calling TaskPool.close() while there are still tasks in the
   * queue. If this happens, serve() will never return.
   */
  void taskPool(TaskPool pool) @property {
    enforce(pool !is null, "Cannot use a null task pool.");
    enforce(pool.size > 0, "Cannot use a task pool with no worker threads.");
    taskPool_ = pool;
  }
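
  /*
   * A minimal sketch of swapping in a custom pool, not part of the original
   * module; it assumes `server` is an already constructed TTaskPoolServer.
   *
   *   auto pool = new TaskPool(8);  // eight dedicated worker threads
   *   pool.isDaemon = true;         // don't let the workers keep the process alive
   *   server.taskPool = pool;       // must be set before calling serve()
   */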

  /**
   * The maximum number of client connections open at the same time. Zero for
   * no limit, which is the default.
   *
   * If this limit is reached, no further clients are accept()ed from the
   * server transport until another connection has been closed.
   */
  size_t maxActiveConns;

protected:
  TaskPool taskPool_;
}
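
/*
 * A minimal usage sketch, not part of the original module. It assumes a
 * Thrift-generated service `MyService` and a matching handler class
 * `MyServiceHandler` (both hypothetical), together with the stock socket
 * server transport, buffered transport and binary protocol from the Thrift D
 * library; the port number is arbitrary.
 *
 *   import std.parallelism : TaskPool;
 *   import thrift.codegen.processor;
 *   import thrift.protocol.binary;
 *   import thrift.server.taskpool;
 *   import thrift.server.transport.socket;
 *   import thrift.transport.buffered;
 *   // (import of the hypothetical generated MyService module omitted)
 *
 *   void main() {
 *     auto processor = new TServiceProcessor!MyService(new MyServiceHandler);
 *     auto serverTransport = new TServerSocket(9090);
 *     auto transportFactory = new TBufferedTransportFactory;
 *     auto protocolFactory = new TBinaryProtocolFactory!();
 *
 *     // Dispatch request processing to a private pool with four worker
 *     // threads instead of the global std.parallelism taskPool.
 *     auto server = new TTaskPoolServer(processor, serverTransport,
 *       transportFactory, protocolFactory, new TaskPool(4));
 *
 *     // Optionally cap the number of concurrently served connections.
 *     server.maxActiveConns = 64;
 *
 *     server.serve();
 *   }
 */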

// Cannot be private as worker has to be passed as alias parameter to
// another module.
// private {
  /*
   * The state of the »connection queue«, used to keep track of how many
   * client connections are currently being processed.
   */
  struct QueueState {
    /// Protects the queue state.
    Mutex mutex;

    /// The number of active connections (from the time they are accept()ed
    /// until they are closed when the worker task finishes).
    size_t activeConns;

    /// Signals that the number of active connections has been decreased, i.e.
    /// that a connection has been closed.
    Condition connClosed;

    /// Returns an initialized instance.
    static QueueState opCall() {
      QueueState q;
      q.mutex = new Mutex;
      q.connClosed = new Condition(q.mutex);
      return q;
    }
  }

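  /*
   * Runs on a task pool worker thread: handles requests from a single client
   * connection until it is closed, then tears down the transports and signals
   * the connection queue that a slot has become free.
   */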
  void worker(ref QueueState queueState, TTransport client,
    TProtocol inputProtocol, TProtocol outputProtocol,
    TProcessor processor, TServerEventHandler eventHandler)
  {
    scope (exit) {
      synchronized (queueState.mutex) {
        assert(queueState.activeConns > 0);
        --queueState.activeConns;
        queueState.connClosed.notifyAll();
      }
    }

    Variant connectionContext;
    if (eventHandler) {
      connectionContext =
        eventHandler.createContext(inputProtocol, outputProtocol);
    }

    try {
      while (true) {
        if (eventHandler) {
          eventHandler.preProcess(connectionContext, client);
        }

        if (!processor.process(inputProtocol, outputProtocol,
          connectionContext) || !inputProtocol.transport.peek()
        ) {
          // Something went fundamentally wrong or there is nothing more to
          // process; close the connection.
          break;
        }
      }
    } catch (TTransportException ttx) {
      if (ttx.type() != TTransportException.Type.END_OF_FILE) {
        logError("Client died unexpectedly: %s", ttx);
      }
    } catch (Exception e) {
      logError("Uncaught exception: %s", e);
    }

    if (eventHandler) {
      eventHandler.deleteContext(connectionContext, inputProtocol,
        outputProtocol);
    }

    try {
      inputProtocol.transport.close();
    } catch (TTransportException ttx) {
      logError("Input close failed: %s", ttx);
    }
    try {
      outputProtocol.transport.close();
    } catch (TTransportException ttx) {
      logError("Output close failed: %s", ttx);
    }
    try {
      client.close();
    } catch (TTransportException ttx) {
      logError("Client close failed: %s", ttx);
    }
  }
// }

unittest {
  import thrift.internal.test.server;
  testServeCancel!TTaskPoolServer();
}