]>
git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/server/TServerFramework.cpp
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
23 #include <thrift/server/TServerFramework.h>
29 using apache::thrift::concurrency::Synchronized
;
30 using apache::thrift::protocol::TProtocol
;
31 using apache::thrift::protocol::TProtocolFactory
;
33 using std::shared_ptr
;
34 using apache::thrift::transport::TServerTransport
;
35 using apache::thrift::transport::TTransport
;
36 using apache::thrift::transport::TTransportException
;
37 using apache::thrift::transport::TTransportFactory
;
40 TServerFramework::TServerFramework(const shared_ptr
<TProcessorFactory
>& processorFactory
,
41 const shared_ptr
<TServerTransport
>& serverTransport
,
42 const shared_ptr
<TTransportFactory
>& transportFactory
,
43 const shared_ptr
<TProtocolFactory
>& protocolFactory
)
44 : TServer(processorFactory
, serverTransport
, transportFactory
, protocolFactory
),
50 TServerFramework::TServerFramework(const shared_ptr
<TProcessor
>& processor
,
51 const shared_ptr
<TServerTransport
>& serverTransport
,
52 const shared_ptr
<TTransportFactory
>& transportFactory
,
53 const shared_ptr
<TProtocolFactory
>& protocolFactory
)
54 : TServer(processor
, serverTransport
, transportFactory
, protocolFactory
),
60 TServerFramework::TServerFramework(const shared_ptr
<TProcessorFactory
>& processorFactory
,
61 const shared_ptr
<TServerTransport
>& serverTransport
,
62 const shared_ptr
<TTransportFactory
>& inputTransportFactory
,
63 const shared_ptr
<TTransportFactory
>& outputTransportFactory
,
64 const shared_ptr
<TProtocolFactory
>& inputProtocolFactory
,
65 const shared_ptr
<TProtocolFactory
>& outputProtocolFactory
)
66 : TServer(processorFactory
,
68 inputTransportFactory
,
69 outputTransportFactory
,
71 outputProtocolFactory
),
77 TServerFramework::TServerFramework(const shared_ptr
<TProcessor
>& processor
,
78 const shared_ptr
<TServerTransport
>& serverTransport
,
79 const shared_ptr
<TTransportFactory
>& inputTransportFactory
,
80 const shared_ptr
<TTransportFactory
>& outputTransportFactory
,
81 const shared_ptr
<TProtocolFactory
>& inputProtocolFactory
,
82 const shared_ptr
<TProtocolFactory
>& outputProtocolFactory
)
85 inputTransportFactory
,
86 outputTransportFactory
,
88 outputProtocolFactory
),
94 TServerFramework::~TServerFramework() = default;
97 static void releaseOneDescriptor(const string
& name
, T
& pTransport
) {
101 } catch (const TTransportException
& ttx
) {
102 string errStr
= string("TServerFramework " + name
+ " close failed: ") + ttx
.what();
103 GlobalOutput(errStr
.c_str());
108 void TServerFramework::serve() {
109 shared_ptr
<TTransport
> client
;
110 shared_ptr
<TTransport
> inputTransport
;
111 shared_ptr
<TTransport
> outputTransport
;
112 shared_ptr
<TProtocol
> inputProtocol
;
113 shared_ptr
<TProtocol
> outputProtocol
;
115 // Start the server listening
116 serverTransport_
->listen();
118 // Run the preServe event to indicate server is now listening
119 // and that it is safe to connect.
121 eventHandler_
->preServe();
124 // Fetch client from server
127 // Dereference any resources from any previous client creation
128 // such that a blocking accept does not hold them indefinitely.
129 outputProtocol
.reset();
130 inputProtocol
.reset();
131 outputTransport
.reset();
132 inputTransport
.reset();
135 // If we have reached the limit on the number of concurrent
136 // clients allowed, wait for one or more clients to drain before
137 // accepting another.
139 Synchronized
sync(mon_
);
140 while (clients_
>= limit_
) {
145 client
= serverTransport_
->accept();
147 inputTransport
= inputTransportFactory_
->getTransport(client
);
148 outputTransport
= outputTransportFactory_
->getTransport(client
);
149 if (!outputProtocolFactory_
) {
150 inputProtocol
= inputProtocolFactory_
->getProtocol(inputTransport
, outputTransport
);
151 outputProtocol
= inputProtocol
;
153 inputProtocol
= inputProtocolFactory_
->getProtocol(inputTransport
);
154 outputProtocol
= outputProtocolFactory_
->getProtocol(outputTransport
);
157 newlyConnectedClient(shared_ptr
<TConnectedClient
>(
158 new TConnectedClient(getProcessor(inputProtocol
, outputProtocol
, client
),
163 bind(&TServerFramework::disposeConnectedClient
, this, std::placeholders::_1
)));
165 } catch (TTransportException
& ttx
) {
166 releaseOneDescriptor("inputTransport", inputTransport
);
167 releaseOneDescriptor("outputTransport", outputTransport
);
168 releaseOneDescriptor("client", client
);
169 if (ttx
.getType() == TTransportException::TIMED_OUT
) {
170 // Accept timeout - continue processing.
172 } else if (ttx
.getType() == TTransportException::END_OF_FILE
173 || ttx
.getType() == TTransportException::INTERRUPTED
) {
174 // Server was interrupted. This only happens when stopping.
177 // All other transport exceptions are logged.
178 // State of connection is unknown. Done.
179 string errStr
= string("TServerTransport died: ") + ttx
.what();
180 GlobalOutput(errStr
.c_str());
186 releaseOneDescriptor("serverTransport", serverTransport_
);
189 int64_t TServerFramework::getConcurrentClientLimit() const {
190 Synchronized
sync(mon_
);
194 int64_t TServerFramework::getConcurrentClientCount() const {
195 Synchronized
sync(mon_
);
199 int64_t TServerFramework::getConcurrentClientCountHWM() const {
200 Synchronized
sync(mon_
);
204 void TServerFramework::setConcurrentClientLimit(int64_t newLimit
) {
206 throw std::invalid_argument("newLimit must be greater than zero");
208 Synchronized
sync(mon_
);
210 if (limit_
- clients_
> 0) {
215 void TServerFramework::stop() {
216 // Order is important because serve() releases serverTransport_ when it is
217 // interrupted, which closes the socket that interruptChildren uses.
218 serverTransport_
->interruptChildren();
219 serverTransport_
->interrupt();
222 void TServerFramework::newlyConnectedClient(const shared_ptr
<TConnectedClient
>& pClient
) {
224 Synchronized
sync(mon_
);
226 hwm_
= (std::max
)(hwm_
, clients_
);
229 onClientConnected(pClient
);
232 void TServerFramework::disposeConnectedClient(TConnectedClient
* pClient
) {
233 onClientDisconnected(pClient
);
236 Synchronized
sync(mon_
);
237 if (limit_
- --clients_
> 0) {
244 } // apache::thrift::server