]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | module thrift.server.threaded; | |
20 | ||
21 | import core.thread; | |
22 | import std.variant : Variant; | |
23 | import thrift.base; | |
24 | import thrift.protocol.base; | |
25 | import thrift.protocol.processor; | |
26 | import thrift.server.base; | |
27 | import thrift.server.transport.base; | |
28 | import thrift.transport.base; | |
29 | import thrift.util.cancellation; | |
30 | ||
31 | /** | |
32 | * A simple threaded server which spawns a new thread per connection. | |
33 | */ | |
34 | class TThreadedServer : TServer { | |
35 | /// | |
36 | this( | |
37 | TProcessor processor, | |
38 | TServerTransport serverTransport, | |
39 | TTransportFactory transportFactory, | |
40 | TProtocolFactory protocolFactory | |
41 | ) { | |
42 | super(processor, serverTransport, transportFactory, protocolFactory); | |
43 | } | |
44 | ||
45 | /// | |
46 | this( | |
47 | TProcessorFactory processorFactory, | |
48 | TServerTransport serverTransport, | |
49 | TTransportFactory transportFactory, | |
50 | TProtocolFactory protocolFactory | |
51 | ) { | |
52 | super(processorFactory, serverTransport, transportFactory, protocolFactory); | |
53 | } | |
54 | ||
55 | /// | |
56 | this( | |
57 | TProcessor processor, | |
58 | TServerTransport serverTransport, | |
59 | TTransportFactory inputTransportFactory, | |
60 | TTransportFactory outputTransportFactory, | |
61 | TProtocolFactory inputProtocolFactory, | |
62 | TProtocolFactory outputProtocolFactory | |
63 | ) { | |
64 | super(processor, serverTransport, inputTransportFactory, | |
65 | outputTransportFactory, inputProtocolFactory, outputProtocolFactory); | |
66 | } | |
67 | ||
68 | /// | |
69 | this( | |
70 | TProcessorFactory processorFactory, | |
71 | TServerTransport serverTransport, | |
72 | TTransportFactory inputTransportFactory, | |
73 | TTransportFactory outputTransportFactory, | |
74 | TProtocolFactory inputProtocolFactory, | |
75 | TProtocolFactory outputProtocolFactory | |
76 | ) { | |
77 | super(processorFactory, serverTransport, inputTransportFactory, | |
78 | outputTransportFactory, inputProtocolFactory, outputProtocolFactory); | |
79 | } | |
80 | ||
81 | override void serve(TCancellation cancellation = null) { | |
82 | try { | |
83 | // Start the server listening | |
84 | serverTransport_.listen(); | |
85 | } catch (TTransportException ttx) { | |
86 | logError("listen() failed: %s", ttx); | |
87 | return; | |
88 | } | |
89 | ||
90 | if (eventHandler) eventHandler.preServe(); | |
91 | ||
92 | auto workerThreads = new ThreadGroup(); | |
93 | ||
94 | while (true) { | |
95 | TTransport client; | |
96 | TTransport inputTransport; | |
97 | TTransport outputTransport; | |
98 | TProtocol inputProtocol; | |
99 | TProtocol outputProtocol; | |
100 | ||
101 | try { | |
102 | client = serverTransport_.accept(cancellation); | |
103 | scope(failure) client.close(); | |
104 | ||
105 | inputTransport = inputTransportFactory_.getTransport(client); | |
106 | scope(failure) inputTransport.close(); | |
107 | ||
108 | outputTransport = outputTransportFactory_.getTransport(client); | |
109 | scope(failure) outputTransport.close(); | |
110 | ||
111 | inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); | |
112 | outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); | |
113 | } catch (TCancelledException tce) { | |
114 | break; | |
115 | } catch (TTransportException ttx) { | |
116 | logError("TServerTransport failed on accept: %s", ttx); | |
117 | continue; | |
118 | } catch (TException tx) { | |
119 | logError("Caught TException on accept: %s", tx); | |
120 | continue; | |
121 | } | |
122 | ||
123 | auto info = TConnectionInfo(inputProtocol, outputProtocol, client); | |
124 | auto processor = processorFactory_.getProcessor(info); | |
125 | auto worker = new WorkerThread(client, inputProtocol, outputProtocol, | |
126 | processor, eventHandler); | |
127 | workerThreads.add(worker); | |
128 | worker.start(); | |
129 | } | |
130 | ||
131 | try { | |
132 | serverTransport_.close(); | |
133 | } catch (TServerTransportException e) { | |
134 | logError("Server transport failed to close: %s", e); | |
135 | } | |
136 | workerThreads.joinAll(); | |
137 | } | |
138 | } | |
139 | ||
140 | // The worker thread handling a client connection. | |
141 | private class WorkerThread : Thread { | |
142 | this(TTransport client, TProtocol inputProtocol, TProtocol outputProtocol, | |
143 | TProcessor processor, TServerEventHandler eventHandler) | |
144 | { | |
145 | client_ = client; | |
146 | inputProtocol_ = inputProtocol; | |
147 | outputProtocol_ = outputProtocol; | |
148 | processor_ = processor; | |
149 | eventHandler_ = eventHandler; | |
150 | ||
151 | super(&run); | |
152 | } | |
153 | ||
154 | void run() { | |
155 | Variant connectionContext; | |
156 | if (eventHandler_) { | |
157 | connectionContext = | |
158 | eventHandler_.createContext(inputProtocol_, outputProtocol_); | |
159 | } | |
160 | ||
161 | try { | |
162 | while (true) { | |
163 | if (eventHandler_) { | |
164 | eventHandler_.preProcess(connectionContext, client_); | |
165 | } | |
166 | ||
167 | if (!processor_.process(inputProtocol_, outputProtocol_, | |
168 | connectionContext) || !inputProtocol_.transport.peek() | |
169 | ) { | |
170 | // Something went fundamentlly wrong or there is nothing more to | |
171 | // process, close the connection. | |
172 | break; | |
173 | } | |
174 | } | |
175 | } catch (TTransportException ttx) { | |
176 | if (ttx.type() != TTransportException.Type.END_OF_FILE) { | |
177 | logError("Client died unexpectedly: %s", ttx); | |
178 | } | |
179 | } catch (Exception e) { | |
180 | logError("Uncaught exception: %s", e); | |
181 | } | |
182 | ||
183 | if (eventHandler_) { | |
184 | eventHandler_.deleteContext(connectionContext, inputProtocol_, | |
185 | outputProtocol_); | |
186 | } | |
187 | ||
188 | try { | |
189 | inputProtocol_.transport.close(); | |
190 | } catch (TTransportException ttx) { | |
191 | logError("Input close failed: %s", ttx); | |
192 | } | |
193 | try { | |
194 | outputProtocol_.transport.close(); | |
195 | } catch (TTransportException ttx) { | |
196 | logError("Output close failed: %s", ttx); | |
197 | } | |
198 | try { | |
199 | client_.close(); | |
200 | } catch (TTransportException ttx) { | |
201 | logError("Client close failed: %s", ttx); | |
202 | } | |
203 | } | |
204 | ||
205 | private: | |
206 | TTransport client_; | |
207 | TProtocol inputProtocol_; | |
208 | TProtocol outputProtocol_; | |
209 | TProcessor processor_; | |
210 | TServerEventHandler eventHandler_; | |
211 | } | |
212 | ||
213 | unittest { | |
214 | import thrift.internal.test.server; | |
215 | testServeCancel!TThreadedServer(); | |
216 | } | |
217 |