]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/server/TServerFramework.cpp
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / server / TServerFramework.cpp
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <algorithm>
21 #include <stdexcept>
22 #include <stdint.h>
23 #include <thrift/server/TServerFramework.h>
24
25 namespace apache {
26 namespace thrift {
27 namespace server {
28
29 using apache::thrift::concurrency::Synchronized;
30 using apache::thrift::protocol::TProtocol;
31 using apache::thrift::protocol::TProtocolFactory;
32 using std::bind;
33 using std::shared_ptr;
34 using apache::thrift::transport::TServerTransport;
35 using apache::thrift::transport::TTransport;
36 using apache::thrift::transport::TTransportException;
37 using apache::thrift::transport::TTransportFactory;
38 using std::string;
39
40 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
41 const shared_ptr<TServerTransport>& serverTransport,
42 const shared_ptr<TTransportFactory>& transportFactory,
43 const shared_ptr<TProtocolFactory>& protocolFactory)
44 : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
45 clients_(0),
46 hwm_(0),
47 limit_(INT64_MAX) {
48 }
49
50 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
51 const shared_ptr<TServerTransport>& serverTransport,
52 const shared_ptr<TTransportFactory>& transportFactory,
53 const shared_ptr<TProtocolFactory>& protocolFactory)
54 : TServer(processor, serverTransport, transportFactory, protocolFactory),
55 clients_(0),
56 hwm_(0),
57 limit_(INT64_MAX) {
58 }
59
60 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
61 const shared_ptr<TServerTransport>& serverTransport,
62 const shared_ptr<TTransportFactory>& inputTransportFactory,
63 const shared_ptr<TTransportFactory>& outputTransportFactory,
64 const shared_ptr<TProtocolFactory>& inputProtocolFactory,
65 const shared_ptr<TProtocolFactory>& outputProtocolFactory)
66 : TServer(processorFactory,
67 serverTransport,
68 inputTransportFactory,
69 outputTransportFactory,
70 inputProtocolFactory,
71 outputProtocolFactory),
72 clients_(0),
73 hwm_(0),
74 limit_(INT64_MAX) {
75 }
76
77 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
78 const shared_ptr<TServerTransport>& serverTransport,
79 const shared_ptr<TTransportFactory>& inputTransportFactory,
80 const shared_ptr<TTransportFactory>& outputTransportFactory,
81 const shared_ptr<TProtocolFactory>& inputProtocolFactory,
82 const shared_ptr<TProtocolFactory>& outputProtocolFactory)
83 : TServer(processor,
84 serverTransport,
85 inputTransportFactory,
86 outputTransportFactory,
87 inputProtocolFactory,
88 outputProtocolFactory),
89 clients_(0),
90 hwm_(0),
91 limit_(INT64_MAX) {
92 }
93
94 TServerFramework::~TServerFramework() = default;
95
96 template <typename T>
97 static void releaseOneDescriptor(const string& name, T& pTransport) {
98 if (pTransport) {
99 try {
100 pTransport->close();
101 } catch (const TTransportException& ttx) {
102 string errStr = string("TServerFramework " + name + " close failed: ") + ttx.what();
103 GlobalOutput(errStr.c_str());
104 }
105 }
106 }
107
108 void TServerFramework::serve() {
109 shared_ptr<TTransport> client;
110 shared_ptr<TTransport> inputTransport;
111 shared_ptr<TTransport> outputTransport;
112 shared_ptr<TProtocol> inputProtocol;
113 shared_ptr<TProtocol> outputProtocol;
114
115 // Start the server listening
116 serverTransport_->listen();
117
118 // Run the preServe event to indicate server is now listening
119 // and that it is safe to connect.
120 if (eventHandler_) {
121 eventHandler_->preServe();
122 }
123
124 // Fetch client from server
125 for (;;) {
126 try {
127 // Dereference any resources from any previous client creation
128 // such that a blocking accept does not hold them indefinitely.
129 outputProtocol.reset();
130 inputProtocol.reset();
131 outputTransport.reset();
132 inputTransport.reset();
133 client.reset();
134
135 // If we have reached the limit on the number of concurrent
136 // clients allowed, wait for one or more clients to drain before
137 // accepting another.
138 {
139 Synchronized sync(mon_);
140 while (clients_ >= limit_) {
141 mon_.wait();
142 }
143 }
144
145 client = serverTransport_->accept();
146
147 inputTransport = inputTransportFactory_->getTransport(client);
148 outputTransport = outputTransportFactory_->getTransport(client);
149 if (!outputProtocolFactory_) {
150 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport);
151 outputProtocol = inputProtocol;
152 } else {
153 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
154 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
155 }
156
157 newlyConnectedClient(shared_ptr<TConnectedClient>(
158 new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
159 inputProtocol,
160 outputProtocol,
161 eventHandler_,
162 client),
163 bind(&TServerFramework::disposeConnectedClient, this, std::placeholders::_1)));
164
165 } catch (TTransportException& ttx) {
166 releaseOneDescriptor("inputTransport", inputTransport);
167 releaseOneDescriptor("outputTransport", outputTransport);
168 releaseOneDescriptor("client", client);
169 if (ttx.getType() == TTransportException::TIMED_OUT) {
170 // Accept timeout - continue processing.
171 continue;
172 } else if (ttx.getType() == TTransportException::END_OF_FILE
173 || ttx.getType() == TTransportException::INTERRUPTED) {
174 // Server was interrupted. This only happens when stopping.
175 break;
176 } else {
177 // All other transport exceptions are logged.
178 // State of connection is unknown. Done.
179 string errStr = string("TServerTransport died: ") + ttx.what();
180 GlobalOutput(errStr.c_str());
181 break;
182 }
183 }
184 }
185
186 releaseOneDescriptor("serverTransport", serverTransport_);
187 }
188
189 int64_t TServerFramework::getConcurrentClientLimit() const {
190 Synchronized sync(mon_);
191 return limit_;
192 }
193
194 int64_t TServerFramework::getConcurrentClientCount() const {
195 Synchronized sync(mon_);
196 return clients_;
197 }
198
199 int64_t TServerFramework::getConcurrentClientCountHWM() const {
200 Synchronized sync(mon_);
201 return hwm_;
202 }
203
204 void TServerFramework::setConcurrentClientLimit(int64_t newLimit) {
205 if (newLimit < 1) {
206 throw std::invalid_argument("newLimit must be greater than zero");
207 }
208 Synchronized sync(mon_);
209 limit_ = newLimit;
210 if (limit_ - clients_ > 0) {
211 mon_.notify();
212 }
213 }
214
215 void TServerFramework::stop() {
216 // Order is important because serve() releases serverTransport_ when it is
217 // interrupted, which closes the socket that interruptChildren uses.
218 serverTransport_->interruptChildren();
219 serverTransport_->interrupt();
220 }
221
222 void TServerFramework::newlyConnectedClient(const shared_ptr<TConnectedClient>& pClient) {
223 {
224 Synchronized sync(mon_);
225 ++clients_;
226 hwm_ = (std::max)(hwm_, clients_);
227 }
228
229 onClientConnected(pClient);
230 }
231
232 void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
233 onClientDisconnected(pClient);
234 delete pClient;
235
236 Synchronized sync(mon_);
237 if (limit_ - --clients_ > 0) {
238 mon_.notify();
239 }
240 }
241
242 }
243 }
244 } // apache::thrift::server