2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 module thrift.server.threaded;
22 import std.variant : Variant;
24 import thrift.protocol.base;
25 import thrift.protocol.processor;
26 import thrift.server.base;
27 import thrift.server.transport.base;
28 import thrift.transport.base;
29 import thrift.util.cancellation;
32 * A simple threaded server which spawns a new thread per connection.
34 class TThreadedServer : TServer {
38 TServerTransport serverTransport,
39 TTransportFactory transportFactory,
40 TProtocolFactory protocolFactory
42 super(processor, serverTransport, transportFactory, protocolFactory);
47 TProcessorFactory processorFactory,
48 TServerTransport serverTransport,
49 TTransportFactory transportFactory,
50 TProtocolFactory protocolFactory
52 super(processorFactory, serverTransport, transportFactory, protocolFactory);
58 TServerTransport serverTransport,
59 TTransportFactory inputTransportFactory,
60 TTransportFactory outputTransportFactory,
61 TProtocolFactory inputProtocolFactory,
62 TProtocolFactory outputProtocolFactory
64 super(processor, serverTransport, inputTransportFactory,
65 outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
70 TProcessorFactory processorFactory,
71 TServerTransport serverTransport,
72 TTransportFactory inputTransportFactory,
73 TTransportFactory outputTransportFactory,
74 TProtocolFactory inputProtocolFactory,
75 TProtocolFactory outputProtocolFactory
77 super(processorFactory, serverTransport, inputTransportFactory,
78 outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
81 override void serve(TCancellation cancellation = null) {
83 // Start the server listening
84 serverTransport_.listen();
85 } catch (TTransportException ttx) {
86 logError("listen() failed: %s", ttx);
90 if (eventHandler) eventHandler.preServe();
92 auto workerThreads = new ThreadGroup();
96 TTransport inputTransport;
97 TTransport outputTransport;
98 TProtocol inputProtocol;
99 TProtocol outputProtocol;
102 client = serverTransport_.accept(cancellation);
103 scope(failure) client.close();
105 inputTransport = inputTransportFactory_.getTransport(client);
106 scope(failure) inputTransport.close();
108 outputTransport = outputTransportFactory_.getTransport(client);
109 scope(failure) outputTransport.close();
111 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
112 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
113 } catch (TCancelledException tce) {
115 } catch (TTransportException ttx) {
116 logError("TServerTransport failed on accept: %s", ttx);
118 } catch (TException tx) {
119 logError("Caught TException on accept: %s", tx);
123 auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
124 auto processor = processorFactory_.getProcessor(info);
125 auto worker = new WorkerThread(client, inputProtocol, outputProtocol,
126 processor, eventHandler);
127 workerThreads.add(worker);
132 serverTransport_.close();
133 } catch (TServerTransportException e) {
134 logError("Server transport failed to close: %s", e);
136 workerThreads.joinAll();
140 // The worker thread handling a client connection.
141 private class WorkerThread : Thread {
142 this(TTransport client, TProtocol inputProtocol, TProtocol outputProtocol,
143 TProcessor processor, TServerEventHandler eventHandler)
146 inputProtocol_ = inputProtocol;
147 outputProtocol_ = outputProtocol;
148 processor_ = processor;
149 eventHandler_ = eventHandler;
155 Variant connectionContext;
158 eventHandler_.createContext(inputProtocol_, outputProtocol_);
164 eventHandler_.preProcess(connectionContext, client_);
167 if (!processor_.process(inputProtocol_, outputProtocol_,
168 connectionContext) || !inputProtocol_.transport.peek()
170 // Something went fundamentlly wrong or there is nothing more to
171 // process, close the connection.
175 } catch (TTransportException ttx) {
176 if (ttx.type() != TTransportException.Type.END_OF_FILE) {
177 logError("Client died unexpectedly: %s", ttx);
179 } catch (Exception e) {
180 logError("Uncaught exception: %s", e);
184 eventHandler_.deleteContext(connectionContext, inputProtocol_,
189 inputProtocol_.transport.close();
190 } catch (TTransportException ttx) {
191 logError("Input close failed: %s", ttx);
194 outputProtocol_.transport.close();
195 } catch (TTransportException ttx) {
196 logError("Output close failed: %s", ttx);
200 } catch (TTransportException ttx) {
201 logError("Client close failed: %s", ttx);
207 TProtocol inputProtocol_;
208 TProtocol outputProtocol_;
209 TProcessor processor_;
210 TServerEventHandler eventHandler_;
214 import thrift.internal.test.server;
215 testServeCancel!TThreadedServer();