]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TSocketPool.cpp
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / transport / TSocketPool.cpp
CommitLineData
f67539c2
TL
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#include <thrift/thrift-config.h>
21
22#include <algorithm>
23#include <iostream>
24#include <memory>
25#if __cplusplus >= 201703L
26#include <random>
27#endif
28
29#include <thrift/transport/TSocketPool.h>
30
31using std::pair;
32using std::string;
33using std::vector;
34
35namespace apache {
36namespace thrift {
37namespace transport {
38
39using std::shared_ptr;
40
41/**
42 * TSocketPoolServer implementation
43 *
44 */
45TSocketPoolServer::TSocketPoolServer()
46 : host_(""), port_(0), socket_(THRIFT_INVALID_SOCKET), lastFailTime_(0), consecutiveFailures_(0) {
47}
48
49/**
50 * Constructor for TSocketPool server
51 */
52TSocketPoolServer::TSocketPoolServer(const string& host, int port)
53 : host_(host),
54 port_(port),
55 socket_(THRIFT_INVALID_SOCKET),
56 lastFailTime_(0),
57 consecutiveFailures_(0) {
58}
59
60/**
61 * TSocketPool implementation.
62 *
63 */
64
65TSocketPool::TSocketPool()
66 : TSocket(),
67 numRetries_(1),
68 retryInterval_(60),
69 maxConsecutiveFailures_(1),
70 randomize_(true),
71 alwaysTryLast_(true) {
72}
73
74TSocketPool::TSocketPool(const vector<string>& hosts, const vector<int>& ports)
75 : TSocket(),
76 numRetries_(1),
77 retryInterval_(60),
78 maxConsecutiveFailures_(1),
79 randomize_(true),
80 alwaysTryLast_(true) {
81 if (hosts.size() != ports.size()) {
82 GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
83 throw TTransportException(TTransportException::BAD_ARGS);
84 }
85
86 for (unsigned int i = 0; i < hosts.size(); ++i) {
87 addServer(hosts[i], ports[i]);
88 }
89}
90
91TSocketPool::TSocketPool(const vector<pair<string, int> >& servers)
92 : TSocket(),
93 numRetries_(1),
94 retryInterval_(60),
95 maxConsecutiveFailures_(1),
96 randomize_(true),
97 alwaysTryLast_(true) {
98 for (const auto & server : servers) {
99 addServer(server.first, server.second);
100 }
101}
102
103TSocketPool::TSocketPool(const vector<shared_ptr<TSocketPoolServer> >& servers)
104 : TSocket(),
105 servers_(servers),
106 numRetries_(1),
107 retryInterval_(60),
108 maxConsecutiveFailures_(1),
109 randomize_(true),
110 alwaysTryLast_(true) {
111}
112
113TSocketPool::TSocketPool(const string& host, int port)
114 : TSocket(),
115 numRetries_(1),
116 retryInterval_(60),
117 maxConsecutiveFailures_(1),
118 randomize_(true),
119 alwaysTryLast_(true) {
120 addServer(host, port);
121}
122
123TSocketPool::~TSocketPool() {
124 vector<shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
125 vector<shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
126 for (; iter != iterEnd; ++iter) {
127 setCurrentServer(*iter);
128 TSocketPool::close();
129 }
130}
131
132void TSocketPool::addServer(const string& host, int port) {
133 servers_.push_back(std::make_shared<TSocketPoolServer>(host, port));
134}
135
136void TSocketPool::addServer(shared_ptr<TSocketPoolServer>& server) {
137 if (server) {
138 servers_.push_back(server);
139 }
140}
141
142void TSocketPool::setServers(const vector<shared_ptr<TSocketPoolServer> >& servers) {
143 servers_ = servers;
144}
145
146void TSocketPool::getServers(vector<shared_ptr<TSocketPoolServer> >& servers) {
147 servers = servers_;
148}
149
150void TSocketPool::setNumRetries(int numRetries) {
151 numRetries_ = numRetries;
152}
153
154void TSocketPool::setRetryInterval(int retryInterval) {
155 retryInterval_ = retryInterval;
156}
157
158void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) {
159 maxConsecutiveFailures_ = maxConsecutiveFailures;
160}
161
162void TSocketPool::setRandomize(bool randomize) {
163 randomize_ = randomize;
164}
165
166void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) {
167 alwaysTryLast_ = alwaysTryLast;
168}
169
170void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer>& server) {
171 currentServer_ = server;
172 host_ = server->host_;
173 port_ = server->port_;
174 socket_ = server->socket_;
175}
176
177/**
178 * This function throws an exception if socket open fails. When socket
179 * opens fails, the socket in the current server is reset.
180 */
181/* TODO: without apc we ignore a lot of functionality from the php version */
182void TSocketPool::open() {
183
184 size_t numServers = servers_.size();
185 if (numServers == 0) {
186 socket_ = THRIFT_INVALID_SOCKET;
187 throw TTransportException(TTransportException::NOT_OPEN);
188 }
189
190 if (isOpen()) {
191 return;
192 }
193
194 if (randomize_ && numServers > 1) {
195#if __cplusplus >= 201703L
196 std::random_device rng;
197 std::mt19937 urng(rng());
198 std::shuffle(servers_.begin(), servers_.end(), urng);
199#else
200 std::random_shuffle(servers_.begin(), servers_.end());
201#endif
202 }
203
204 for (size_t i = 0; i < numServers; ++i) {
205
206 shared_ptr<TSocketPoolServer>& server = servers_[i];
207 // Impersonate the server socket
208 setCurrentServer(server);
209
210 if (isOpen()) {
211 // already open means we're done
212 return;
213 }
214
215 bool retryIntervalPassed = (server->lastFailTime_ == 0);
216 bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
217
218 if (server->lastFailTime_ > 0) {
219 // The server was marked as down, so check if enough time has elapsed to retry
220 time_t elapsedTime = time(nullptr) - server->lastFailTime_;
221 if (elapsedTime > retryInterval_) {
222 retryIntervalPassed = true;
223 }
224 }
225
226 if (retryIntervalPassed || isLastServer) {
227 for (int j = 0; j < numRetries_; ++j) {
228 try {
229 TSocket::open();
230 } catch (const TException &e) {
231 string errStr = "TSocketPool::open failed " + getSocketInfo() + ": " + e.what();
232 GlobalOutput(errStr.c_str());
233 socket_ = THRIFT_INVALID_SOCKET;
234 continue;
235 }
236
237 // Copy over the opened socket so that we can keep it persistent
238 server->socket_ = socket_;
239 // reset lastFailTime_ is required
240 server->lastFailTime_ = 0;
241 // success
242 return;
243 }
244
245 ++server->consecutiveFailures_;
246 if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
247 // Mark server as down
248 server->consecutiveFailures_ = 0;
249 server->lastFailTime_ = time(nullptr);
250 }
251 }
252 }
253
254 GlobalOutput("TSocketPool::open: all connections failed");
255 throw TTransportException(TTransportException::NOT_OPEN);
256}
257
258void TSocketPool::close() {
259 TSocket::close();
260 if (currentServer_) {
261 currentServer_->socket_ = THRIFT_INVALID_SOCKET;
262 }
263}
264}
265}
266} // apache::thrift::transport