]>
git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TSocketPool.cpp
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
20 #include <thrift/thrift-config.h>
25 #if __cplusplus >= 201703L
29 #include <thrift/transport/TSocketPool.h>
39 using std::shared_ptr
;
42 * TSocketPoolServer implementation
45 TSocketPoolServer::TSocketPoolServer()
46 : host_(""), port_(0), socket_(THRIFT_INVALID_SOCKET
), lastFailTime_(0), consecutiveFailures_(0) {
50 * Constructor for TSocketPool server
52 TSocketPoolServer::TSocketPoolServer(const string
& host
, int port
)
55 socket_(THRIFT_INVALID_SOCKET
),
57 consecutiveFailures_(0) {
/**
 * TSocketPool implementation.
 */
65 TSocketPool::TSocketPool()
69 maxConsecutiveFailures_(1),
71 alwaysTryLast_(true) {
74 TSocketPool::TSocketPool(const vector
<string
>& hosts
, const vector
<int>& ports
)
78 maxConsecutiveFailures_(1),
80 alwaysTryLast_(true) {
81 if (hosts
.size() != ports
.size()) {
82 GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
83 throw TTransportException(TTransportException::BAD_ARGS
);
86 for (unsigned int i
= 0; i
< hosts
.size(); ++i
) {
87 addServer(hosts
[i
], ports
[i
]);
91 TSocketPool::TSocketPool(const vector
<pair
<string
, int> >& servers
)
95 maxConsecutiveFailures_(1),
97 alwaysTryLast_(true) {
98 for (const auto & server
: servers
) {
99 addServer(server
.first
, server
.second
);
103 TSocketPool::TSocketPool(const vector
<shared_ptr
<TSocketPoolServer
> >& servers
)
108 maxConsecutiveFailures_(1),
110 alwaysTryLast_(true) {
113 TSocketPool::TSocketPool(const string
& host
, int port
)
117 maxConsecutiveFailures_(1),
119 alwaysTryLast_(true) {
120 addServer(host
, port
);
123 TSocketPool::~TSocketPool() {
124 vector
<shared_ptr
<TSocketPoolServer
> >::const_iterator iter
= servers_
.begin();
125 vector
<shared_ptr
<TSocketPoolServer
> >::const_iterator iterEnd
= servers_
.end();
126 for (; iter
!= iterEnd
; ++iter
) {
127 setCurrentServer(*iter
);
128 TSocketPool::close();
132 void TSocketPool::addServer(const string
& host
, int port
) {
133 servers_
.push_back(std::make_shared
<TSocketPoolServer
>(host
, port
));
136 void TSocketPool::addServer(shared_ptr
<TSocketPoolServer
>& server
) {
138 servers_
.push_back(server
);
142 void TSocketPool::setServers(const vector
<shared_ptr
<TSocketPoolServer
> >& servers
) {
146 void TSocketPool::getServers(vector
<shared_ptr
<TSocketPoolServer
> >& servers
) {
150 void TSocketPool::setNumRetries(int numRetries
) {
151 numRetries_
= numRetries
;
154 void TSocketPool::setRetryInterval(int retryInterval
) {
155 retryInterval_
= retryInterval
;
158 void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures
) {
159 maxConsecutiveFailures_
= maxConsecutiveFailures
;
162 void TSocketPool::setRandomize(bool randomize
) {
163 randomize_
= randomize
;
166 void TSocketPool::setAlwaysTryLast(bool alwaysTryLast
) {
167 alwaysTryLast_
= alwaysTryLast
;
170 void TSocketPool::setCurrentServer(const shared_ptr
<TSocketPoolServer
>& server
) {
171 currentServer_
= server
;
172 host_
= server
->host_
;
173 port_
= server
->port_
;
174 socket_
= server
->socket_
;
/**
 * This function throws an exception if the socket open fails. When the socket
 * open fails, the socket in the current server is reset.
 */
/* TODO: without apc we ignore a lot of functionality from the php version */
182 void TSocketPool::open() {
184 size_t numServers
= servers_
.size();
185 if (numServers
== 0) {
186 socket_
= THRIFT_INVALID_SOCKET
;
187 throw TTransportException(TTransportException::NOT_OPEN
);
194 if (randomize_
&& numServers
> 1) {
195 #if __cplusplus >= 201703L
196 std::random_device rng
;
197 std::mt19937
urng(rng());
198 std::shuffle(servers_
.begin(), servers_
.end(), urng
);
200 std::random_shuffle(servers_
.begin(), servers_
.end());
204 for (size_t i
= 0; i
< numServers
; ++i
) {
206 shared_ptr
<TSocketPoolServer
>& server
= servers_
[i
];
207 // Impersonate the server socket
208 setCurrentServer(server
);
211 // already open means we're done
215 bool retryIntervalPassed
= (server
->lastFailTime_
== 0);
216 bool isLastServer
= alwaysTryLast_
? (i
== (numServers
- 1)) : false;
218 if (server
->lastFailTime_
> 0) {
219 // The server was marked as down, so check if enough time has elapsed to retry
220 time_t elapsedTime
= time(nullptr) - server
->lastFailTime_
;
221 if (elapsedTime
> retryInterval_
) {
222 retryIntervalPassed
= true;
226 if (retryIntervalPassed
|| isLastServer
) {
227 for (int j
= 0; j
< numRetries_
; ++j
) {
230 } catch (const TException
&e
) {
231 string errStr
= "TSocketPool::open failed " + getSocketInfo() + ": " + e
.what();
232 GlobalOutput(errStr
.c_str());
233 socket_
= THRIFT_INVALID_SOCKET
;
237 // Copy over the opened socket so that we can keep it persistent
238 server
->socket_
= socket_
;
239 // reset lastFailTime_ is required
240 server
->lastFailTime_
= 0;
245 ++server
->consecutiveFailures_
;
246 if (server
->consecutiveFailures_
> maxConsecutiveFailures_
) {
247 // Mark server as down
248 server
->consecutiveFailures_
= 0;
249 server
->lastFailTime_
= time(nullptr);
254 GlobalOutput("TSocketPool::open: all connections failed");
255 throw TTransportException(TTransportException::NOT_OPEN
);
258 void TSocketPool::close() {
260 if (currentServer_
) {
261 currentServer_
->socket_
= THRIFT_INVALID_SOCKET
;
266 } // apache::thrift::transport