]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | #include <thrift/thrift-config.h> | |
21 | ||
22 | #include <algorithm> | |
23 | #include <iostream> | |
24 | #include <memory> | |
25 | #if __cplusplus >= 201703L | |
26 | #include <random> | |
27 | #endif | |
28 | ||
29 | #include <thrift/transport/TSocketPool.h> | |
30 | ||
31 | using std::pair; | |
32 | using std::string; | |
33 | using std::vector; | |
34 | ||
35 | namespace apache { | |
36 | namespace thrift { | |
37 | namespace transport { | |
38 | ||
39 | using std::shared_ptr; | |
40 | ||
41 | /** | |
42 | * TSocketPoolServer implementation | |
43 | * | |
44 | */ | |
45 | TSocketPoolServer::TSocketPoolServer() | |
46 | : host_(""), port_(0), socket_(THRIFT_INVALID_SOCKET), lastFailTime_(0), consecutiveFailures_(0) { | |
47 | } | |
48 | ||
49 | /** | |
50 | * Constructor for TSocketPool server | |
51 | */ | |
52 | TSocketPoolServer::TSocketPoolServer(const string& host, int port) | |
53 | : host_(host), | |
54 | port_(port), | |
55 | socket_(THRIFT_INVALID_SOCKET), | |
56 | lastFailTime_(0), | |
57 | consecutiveFailures_(0) { | |
58 | } | |
59 | ||
60 | /** | |
61 | * TSocketPool implementation. | |
62 | * | |
63 | */ | |
64 | ||
65 | TSocketPool::TSocketPool() | |
66 | : TSocket(), | |
67 | numRetries_(1), | |
68 | retryInterval_(60), | |
69 | maxConsecutiveFailures_(1), | |
70 | randomize_(true), | |
71 | alwaysTryLast_(true) { | |
72 | } | |
73 | ||
74 | TSocketPool::TSocketPool(const vector<string>& hosts, const vector<int>& ports) | |
75 | : TSocket(), | |
76 | numRetries_(1), | |
77 | retryInterval_(60), | |
78 | maxConsecutiveFailures_(1), | |
79 | randomize_(true), | |
80 | alwaysTryLast_(true) { | |
81 | if (hosts.size() != ports.size()) { | |
82 | GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size"); | |
83 | throw TTransportException(TTransportException::BAD_ARGS); | |
84 | } | |
85 | ||
86 | for (unsigned int i = 0; i < hosts.size(); ++i) { | |
87 | addServer(hosts[i], ports[i]); | |
88 | } | |
89 | } | |
90 | ||
91 | TSocketPool::TSocketPool(const vector<pair<string, int> >& servers) | |
92 | : TSocket(), | |
93 | numRetries_(1), | |
94 | retryInterval_(60), | |
95 | maxConsecutiveFailures_(1), | |
96 | randomize_(true), | |
97 | alwaysTryLast_(true) { | |
98 | for (const auto & server : servers) { | |
99 | addServer(server.first, server.second); | |
100 | } | |
101 | } | |
102 | ||
103 | TSocketPool::TSocketPool(const vector<shared_ptr<TSocketPoolServer> >& servers) | |
104 | : TSocket(), | |
105 | servers_(servers), | |
106 | numRetries_(1), | |
107 | retryInterval_(60), | |
108 | maxConsecutiveFailures_(1), | |
109 | randomize_(true), | |
110 | alwaysTryLast_(true) { | |
111 | } | |
112 | ||
113 | TSocketPool::TSocketPool(const string& host, int port) | |
114 | : TSocket(), | |
115 | numRetries_(1), | |
116 | retryInterval_(60), | |
117 | maxConsecutiveFailures_(1), | |
118 | randomize_(true), | |
119 | alwaysTryLast_(true) { | |
120 | addServer(host, port); | |
121 | } | |
122 | ||
123 | TSocketPool::~TSocketPool() { | |
124 | vector<shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin(); | |
125 | vector<shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end(); | |
126 | for (; iter != iterEnd; ++iter) { | |
127 | setCurrentServer(*iter); | |
128 | TSocketPool::close(); | |
129 | } | |
130 | } | |
131 | ||
132 | void TSocketPool::addServer(const string& host, int port) { | |
133 | servers_.push_back(std::make_shared<TSocketPoolServer>(host, port)); | |
134 | } | |
135 | ||
136 | void TSocketPool::addServer(shared_ptr<TSocketPoolServer>& server) { | |
137 | if (server) { | |
138 | servers_.push_back(server); | |
139 | } | |
140 | } | |
141 | ||
142 | void TSocketPool::setServers(const vector<shared_ptr<TSocketPoolServer> >& servers) { | |
143 | servers_ = servers; | |
144 | } | |
145 | ||
146 | void TSocketPool::getServers(vector<shared_ptr<TSocketPoolServer> >& servers) { | |
147 | servers = servers_; | |
148 | } | |
149 | ||
150 | void TSocketPool::setNumRetries(int numRetries) { | |
151 | numRetries_ = numRetries; | |
152 | } | |
153 | ||
154 | void TSocketPool::setRetryInterval(int retryInterval) { | |
155 | retryInterval_ = retryInterval; | |
156 | } | |
157 | ||
158 | void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) { | |
159 | maxConsecutiveFailures_ = maxConsecutiveFailures; | |
160 | } | |
161 | ||
162 | void TSocketPool::setRandomize(bool randomize) { | |
163 | randomize_ = randomize; | |
164 | } | |
165 | ||
166 | void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) { | |
167 | alwaysTryLast_ = alwaysTryLast; | |
168 | } | |
169 | ||
170 | void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer>& server) { | |
171 | currentServer_ = server; | |
172 | host_ = server->host_; | |
173 | port_ = server->port_; | |
174 | socket_ = server->socket_; | |
175 | } | |
176 | ||
177 | /** | |
178 | * This function throws an exception if socket open fails. When socket | |
179 | * opens fails, the socket in the current server is reset. | |
180 | */ | |
181 | /* TODO: without apc we ignore a lot of functionality from the php version */ | |
182 | void TSocketPool::open() { | |
183 | ||
184 | size_t numServers = servers_.size(); | |
185 | if (numServers == 0) { | |
186 | socket_ = THRIFT_INVALID_SOCKET; | |
187 | throw TTransportException(TTransportException::NOT_OPEN); | |
188 | } | |
189 | ||
190 | if (isOpen()) { | |
191 | return; | |
192 | } | |
193 | ||
194 | if (randomize_ && numServers > 1) { | |
195 | #if __cplusplus >= 201703L | |
196 | std::random_device rng; | |
197 | std::mt19937 urng(rng()); | |
198 | std::shuffle(servers_.begin(), servers_.end(), urng); | |
199 | #else | |
200 | std::random_shuffle(servers_.begin(), servers_.end()); | |
201 | #endif | |
202 | } | |
203 | ||
204 | for (size_t i = 0; i < numServers; ++i) { | |
205 | ||
206 | shared_ptr<TSocketPoolServer>& server = servers_[i]; | |
207 | // Impersonate the server socket | |
208 | setCurrentServer(server); | |
209 | ||
210 | if (isOpen()) { | |
211 | // already open means we're done | |
212 | return; | |
213 | } | |
214 | ||
215 | bool retryIntervalPassed = (server->lastFailTime_ == 0); | |
216 | bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false; | |
217 | ||
218 | if (server->lastFailTime_ > 0) { | |
219 | // The server was marked as down, so check if enough time has elapsed to retry | |
220 | time_t elapsedTime = time(nullptr) - server->lastFailTime_; | |
221 | if (elapsedTime > retryInterval_) { | |
222 | retryIntervalPassed = true; | |
223 | } | |
224 | } | |
225 | ||
226 | if (retryIntervalPassed || isLastServer) { | |
227 | for (int j = 0; j < numRetries_; ++j) { | |
228 | try { | |
229 | TSocket::open(); | |
230 | } catch (const TException &e) { | |
231 | string errStr = "TSocketPool::open failed " + getSocketInfo() + ": " + e.what(); | |
232 | GlobalOutput(errStr.c_str()); | |
233 | socket_ = THRIFT_INVALID_SOCKET; | |
234 | continue; | |
235 | } | |
236 | ||
237 | // Copy over the opened socket so that we can keep it persistent | |
238 | server->socket_ = socket_; | |
239 | // reset lastFailTime_ is required | |
240 | server->lastFailTime_ = 0; | |
241 | // success | |
242 | return; | |
243 | } | |
244 | ||
245 | ++server->consecutiveFailures_; | |
246 | if (server->consecutiveFailures_ > maxConsecutiveFailures_) { | |
247 | // Mark server as down | |
248 | server->consecutiveFailures_ = 0; | |
249 | server->lastFailTime_ = time(nullptr); | |
250 | } | |
251 | } | |
252 | } | |
253 | ||
254 | GlobalOutput("TSocketPool::open: all connections failed"); | |
255 | throw TTransportException(TTransportException::NOT_OPEN); | |
256 | } | |
257 | ||
258 | void TSocketPool::close() { | |
259 | TSocket::close(); | |
260 | if (currentServer_) { | |
261 | currentServer_->socket_ = THRIFT_INVALID_SOCKET; | |
262 | } | |
263 | } | |
264 | } | |
265 | } | |
266 | } // apache::thrift::transport |