]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/d/src/thrift/server/threaded.d
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / d / src / thrift / server / threaded.d
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 module thrift.server.threaded;
20
21 import core.thread;
22 import std.variant : Variant;
23 import thrift.base;
24 import thrift.protocol.base;
25 import thrift.protocol.processor;
26 import thrift.server.base;
27 import thrift.server.transport.base;
28 import thrift.transport.base;
29 import thrift.util.cancellation;
30
31 /**
32 * A simple threaded server which spawns a new thread per connection.
33 */
34 class TThreadedServer : TServer {
35 ///
36 this(
37 TProcessor processor,
38 TServerTransport serverTransport,
39 TTransportFactory transportFactory,
40 TProtocolFactory protocolFactory
41 ) {
42 super(processor, serverTransport, transportFactory, protocolFactory);
43 }
44
45 ///
46 this(
47 TProcessorFactory processorFactory,
48 TServerTransport serverTransport,
49 TTransportFactory transportFactory,
50 TProtocolFactory protocolFactory
51 ) {
52 super(processorFactory, serverTransport, transportFactory, protocolFactory);
53 }
54
55 ///
56 this(
57 TProcessor processor,
58 TServerTransport serverTransport,
59 TTransportFactory inputTransportFactory,
60 TTransportFactory outputTransportFactory,
61 TProtocolFactory inputProtocolFactory,
62 TProtocolFactory outputProtocolFactory
63 ) {
64 super(processor, serverTransport, inputTransportFactory,
65 outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
66 }
67
68 ///
69 this(
70 TProcessorFactory processorFactory,
71 TServerTransport serverTransport,
72 TTransportFactory inputTransportFactory,
73 TTransportFactory outputTransportFactory,
74 TProtocolFactory inputProtocolFactory,
75 TProtocolFactory outputProtocolFactory
76 ) {
77 super(processorFactory, serverTransport, inputTransportFactory,
78 outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
79 }
80
81 override void serve(TCancellation cancellation = null) {
82 try {
83 // Start the server listening
84 serverTransport_.listen();
85 } catch (TTransportException ttx) {
86 logError("listen() failed: %s", ttx);
87 return;
88 }
89
90 if (eventHandler) eventHandler.preServe();
91
92 auto workerThreads = new ThreadGroup();
93
94 while (true) {
95 TTransport client;
96 TTransport inputTransport;
97 TTransport outputTransport;
98 TProtocol inputProtocol;
99 TProtocol outputProtocol;
100
101 try {
102 client = serverTransport_.accept(cancellation);
103 scope(failure) client.close();
104
105 inputTransport = inputTransportFactory_.getTransport(client);
106 scope(failure) inputTransport.close();
107
108 outputTransport = outputTransportFactory_.getTransport(client);
109 scope(failure) outputTransport.close();
110
111 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
112 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
113 } catch (TCancelledException tce) {
114 break;
115 } catch (TTransportException ttx) {
116 logError("TServerTransport failed on accept: %s", ttx);
117 continue;
118 } catch (TException tx) {
119 logError("Caught TException on accept: %s", tx);
120 continue;
121 }
122
123 auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
124 auto processor = processorFactory_.getProcessor(info);
125 auto worker = new WorkerThread(client, inputProtocol, outputProtocol,
126 processor, eventHandler);
127 workerThreads.add(worker);
128 worker.start();
129 }
130
131 try {
132 serverTransport_.close();
133 } catch (TServerTransportException e) {
134 logError("Server transport failed to close: %s", e);
135 }
136 workerThreads.joinAll();
137 }
138 }
139
140 // The worker thread handling a client connection.
141 private class WorkerThread : Thread {
142 this(TTransport client, TProtocol inputProtocol, TProtocol outputProtocol,
143 TProcessor processor, TServerEventHandler eventHandler)
144 {
145 client_ = client;
146 inputProtocol_ = inputProtocol;
147 outputProtocol_ = outputProtocol;
148 processor_ = processor;
149 eventHandler_ = eventHandler;
150
151 super(&run);
152 }
153
154 void run() {
155 Variant connectionContext;
156 if (eventHandler_) {
157 connectionContext =
158 eventHandler_.createContext(inputProtocol_, outputProtocol_);
159 }
160
161 try {
162 while (true) {
163 if (eventHandler_) {
164 eventHandler_.preProcess(connectionContext, client_);
165 }
166
167 if (!processor_.process(inputProtocol_, outputProtocol_,
168 connectionContext) || !inputProtocol_.transport.peek()
169 ) {
170 // Something went fundamentlly wrong or there is nothing more to
171 // process, close the connection.
172 break;
173 }
174 }
175 } catch (TTransportException ttx) {
176 if (ttx.type() != TTransportException.Type.END_OF_FILE) {
177 logError("Client died unexpectedly: %s", ttx);
178 }
179 } catch (Exception e) {
180 logError("Uncaught exception: %s", e);
181 }
182
183 if (eventHandler_) {
184 eventHandler_.deleteContext(connectionContext, inputProtocol_,
185 outputProtocol_);
186 }
187
188 try {
189 inputProtocol_.transport.close();
190 } catch (TTransportException ttx) {
191 logError("Input close failed: %s", ttx);
192 }
193 try {
194 outputProtocol_.transport.close();
195 } catch (TTransportException ttx) {
196 logError("Output close failed: %s", ttx);
197 }
198 try {
199 client_.close();
200 } catch (TTransportException ttx) {
201 logError("Client close failed: %s", ttx);
202 }
203 }
204
205 private:
206 TTransport client_;
207 TProtocol inputProtocol_;
208 TProtocol outputProtocol_;
209 TProcessor processor_;
210 TServerEventHandler eventHandler_;
211 }
212
213 unittest {
214 import thrift.internal.test.server;
215 testServeCancel!TThreadedServer();
216 }
217