]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae FG |
3 | |
4 | #include <map> | |
5 | #include <thread> | |
6 | ||
7 | #include "rgw_period_pusher.h" | |
8 | #include "rgw_cr_rest.h" | |
11fdf7f2 | 9 | #include "rgw_zone.h" |
9f95a23c | 10 | #include "rgw_sal.h" |
f67539c2 | 11 | #include "rgw_sal_rados.h" |
11fdf7f2 TL |
12 | |
13 | #include "services/svc_zone.h" | |
14 | ||
7c673cae | 15 | #include "common/errno.h" |
11fdf7f2 | 16 | |
31f18b77 | 17 | #include <boost/asio/yield.hpp> |
7c673cae FG |
18 | |
19 | #define dout_subsys ceph_subsys_rgw | |
20 | ||
21 | #undef dout_prefix | |
22 | #define dout_prefix (*_dout << "rgw period pusher: ") | |
23 | ||
24 | /// A coroutine to post the period over the given connection. | |
25 | using PushCR = RGWPostRESTResourceCR<RGWPeriod, int>; | |
26 | ||
27 | /// A coroutine that calls PushCR, and retries with backoff until success. | |
28 | class PushAndRetryCR : public RGWCoroutine { | |
29 | const std::string& zone; | |
30 | RGWRESTConn *const conn; | |
31 | RGWHTTPManager *const http; | |
32 | RGWPeriod& period; | |
33 | const std::string epoch; //< epoch string for params | |
34 | double timeout; //< current interval between retries | |
35 | const double timeout_max; //< maximum interval between retries | |
36 | uint32_t counter; //< number of failures since backoff increased | |
37 | ||
38 | public: | |
39 | PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn, | |
40 | RGWHTTPManager* http, RGWPeriod& period) | |
41 | : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period), | |
42 | epoch(std::to_string(period.get_epoch())), | |
43 | timeout(cct->_conf->rgw_period_push_interval), | |
44 | timeout_max(cct->_conf->rgw_period_push_interval_max), | |
45 | counter(0) | |
46 | {} | |
47 | ||
48 | int operate() override; | |
49 | }; | |
50 | ||
// Push the period to one zone, cycling through the connection's endpoints
// and backing off exponentially between full rounds of failures.
// NOTE: this is a Boost.Asio stackless coroutine; `yield` records the resume
// point, so statement placement relative to each yield is significant.
int PushAndRetryCR::operate()
{
  reenter(this) {
    for (;;) {
      yield {
        ldout(cct, 10) << "pushing period " << period.get_id()
            << " to " << zone << dendl;
        // initialize the http params
        rgw_http_param_pair params[] = {
          { "period", period.get_id().c_str() },
          { "epoch", epoch.c_str() },
          { nullptr, nullptr }
        };
        call(new PushCR(cct, conn, http, "/admin/realm/period",
                        params, period, nullptr));
      }

      // stop on success
      if (get_ret_status() == 0) {
        ldout(cct, 10) << "push to " << zone << " succeeded" << dendl;
        return set_cr_done();
      }

      // try each endpoint in the connection before waiting
      if (++counter < conn->get_endpoint_count())
        continue;
      counter = 0;

      // wait with exponential backoff up to timeout_max
      yield {
        utime_t dur;
        dur.set_from_double(timeout);

        ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl;
        wait(dur);

        // double the interval for the next round, capped at timeout_max
        timeout *= 2;
        if (timeout > timeout_max)
          timeout = timeout_max;
      }
    }
  }
  return 0;
}
95 | ||
96 | /** | |
97 | * PushAllCR is a coroutine that sends the period over all of the given | |
98 | * connections, retrying until they are all marked as completed. | |
99 | */ | |
100 | class PushAllCR : public RGWCoroutine { | |
101 | RGWHTTPManager *const http; | |
102 | RGWPeriod period; //< period object to push | |
103 | std::map<std::string, RGWRESTConn> conns; //< zones that need the period | |
104 | ||
105 | public: | |
106 | PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period, | |
107 | std::map<std::string, RGWRESTConn>&& conns) | |
108 | : RGWCoroutine(cct), http(http), | |
109 | period(std::move(period)), | |
110 | conns(std::move(conns)) | |
111 | {} | |
112 | ||
113 | int operate() override; | |
114 | }; | |
115 | ||
// Spawn one PushAndRetryCR per connection, then block until every child
// completes. The children hold references into `conns` and `period`, which
// stay alive as members of this coroutine for the duration of the push.
int PushAllCR::operate()
{
  reenter(this) {
    // spawn a coroutine to push the period over each connection
    yield {
      ldout(cct, 4) << "sending " << conns.size() << " periods" << dendl;
      for (auto& c : conns)
        spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false);
    }
    // wait for all to complete
    drain_all();
    return set_cr_done();
  }
  return 0;
}
131 | ||
/// A background thread to run the PushAllCR coroutine and exit.
class RGWPeriodPusher::CRThread {
  RGWCoroutinesManager coroutines;
  RGWHTTPManager http;
  boost::intrusive_ptr<PushAllCR> push_all;
  std::thread thread;

 public:
  CRThread(CephContext* cct, RGWPeriod&& period,
           std::map<std::string, RGWRESTConn>&& conns)
    : coroutines(cct, NULL),
      http(cct, coroutines.get_completion_mgr()),
      push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns)))
  {
    http.start();
    // must spawn the CR thread after start
    thread = std::thread([this] { coroutines.run(push_all.get()); });
  }
  // Teardown order matters: drop our ref to the coroutine, stop the
  // coroutine manager (which unblocks run()), stop http, then join.
  ~CRThread()
  {
    push_all.reset();
    coroutines.stop();
    http.stop();
    if (thread.joinable())
      thread.join();
  }
};
159 | ||
160 | ||
f67539c2 TL |
161 | RGWPeriodPusher::RGWPeriodPusher(rgw::sal::RGWRadosStore* store, |
162 | optional_yield y) | |
7c673cae FG |
163 | : cct(store->ctx()), store(store) |
164 | { | |
9f95a23c | 165 | const auto& realm = store->svc()->zone->get_realm(); |
7c673cae FG |
166 | auto& realm_id = realm.get_id(); |
167 | if (realm_id.empty()) // no realm configuration | |
168 | return; | |
169 | ||
170 | // always send out the current period on startup | |
171 | RGWPeriod period; | |
f67539c2 | 172 | int r = period.init(cct, store->svc()->sysobj, realm_id, y, realm.get_name()); |
7c673cae FG |
173 | if (r < 0) { |
174 | lderr(cct) << "failed to load period for realm " << realm_id << dendl; | |
175 | return; | |
176 | } | |
177 | ||
178 | std::lock_guard<std::mutex> lock(mutex); | |
179 | handle_notify(std::move(period)); | |
180 | } | |
181 | ||
// destructor is here (not in the header) because CRThread is an incomplete
// type in the header; unique_ptr<CRThread> needs the full type to destroy it
RGWPeriodPusher::~RGWPeriodPusher() = default;
184 | ||
185 | void RGWPeriodPusher::handle_notify(RGWRealmNotify type, | |
11fdf7f2 | 186 | bufferlist::const_iterator& p) |
7c673cae FG |
187 | { |
188 | // decode the period | |
189 | RGWZonesNeedPeriod info; | |
190 | try { | |
11fdf7f2 | 191 | decode(info, p); |
7c673cae FG |
192 | } catch (buffer::error& e) { |
193 | lderr(cct) << "Failed to decode the period: " << e.what() << dendl; | |
194 | return; | |
195 | } | |
196 | ||
197 | std::lock_guard<std::mutex> lock(mutex); | |
198 | ||
199 | // we can't process this notification without access to our current realm | |
200 | // configuration. queue it until resume() | |
201 | if (store == nullptr) { | |
202 | pending_periods.emplace_back(std::move(info)); | |
203 | return; | |
204 | } | |
205 | ||
206 | handle_notify(std::move(info)); | |
207 | } | |
208 | ||
209 | // expects the caller to hold a lock on mutex | |
210 | void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period) | |
211 | { | |
212 | if (period.get_realm_epoch() < realm_epoch) { | |
213 | ldout(cct, 10) << "period's realm epoch " << period.get_realm_epoch() | |
214 | << " is not newer than current realm epoch " << realm_epoch | |
215 | << ", discarding update" << dendl; | |
216 | return; | |
217 | } | |
218 | if (period.get_realm_epoch() == realm_epoch && | |
219 | period.get_epoch() <= period_epoch) { | |
220 | ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer " | |
221 | "than current epoch " << period_epoch << ", discarding update" << dendl; | |
222 | return; | |
223 | } | |
224 | ||
225 | // find our zonegroup in the new period | |
226 | auto& zonegroups = period.get_map().zonegroups; | |
9f95a23c | 227 | auto i = zonegroups.find(store->svc()->zone->get_zonegroup().get_id()); |
7c673cae FG |
228 | if (i == zonegroups.end()) { |
229 | lderr(cct) << "The new period does not contain my zonegroup!" << dendl; | |
230 | return; | |
231 | } | |
232 | auto& my_zonegroup = i->second; | |
233 | ||
234 | // if we're not a master zone, we're not responsible for pushing any updates | |
9f95a23c | 235 | if (my_zonegroup.master_zone != store->svc()->zone->get_zone_params().get_id()) |
7c673cae FG |
236 | return; |
237 | ||
238 | // construct a map of the zones that need this period. the map uses the same | |
239 | // keys/ordering as the zone[group] map, so we can use a hint for insertions | |
240 | std::map<std::string, RGWRESTConn> conns; | |
241 | auto hint = conns.end(); | |
242 | ||
243 | // are we the master zonegroup in this period? | |
9f95a23c | 244 | if (period.get_map().master_zonegroup == store->svc()->zone->get_zonegroup().get_id()) { |
7c673cae FG |
245 | // update other zonegroup endpoints |
246 | for (auto& zg : zonegroups) { | |
247 | auto& zonegroup = zg.second; | |
9f95a23c | 248 | if (zonegroup.get_id() == store->svc()->zone->get_zonegroup().get_id()) |
7c673cae FG |
249 | continue; |
250 | if (zonegroup.endpoints.empty()) | |
251 | continue; | |
252 | ||
253 | hint = conns.emplace_hint( | |
254 | hint, std::piecewise_construct, | |
255 | std::forward_as_tuple(zonegroup.get_id()), | |
9f95a23c | 256 | std::forward_as_tuple(cct, store->svc()->zone, zonegroup.get_id(), zonegroup.endpoints)); |
7c673cae FG |
257 | } |
258 | } | |
259 | ||
260 | // update other zone endpoints | |
261 | for (auto& z : my_zonegroup.zones) { | |
262 | auto& zone = z.second; | |
9f95a23c | 263 | if (zone.id == store->svc()->zone->get_zone_params().get_id()) |
7c673cae FG |
264 | continue; |
265 | if (zone.endpoints.empty()) | |
266 | continue; | |
267 | ||
268 | hint = conns.emplace_hint( | |
269 | hint, std::piecewise_construct, | |
270 | std::forward_as_tuple(zone.id), | |
9f95a23c | 271 | std::forward_as_tuple(cct, store->svc()->zone, zone.id, zone.endpoints)); |
7c673cae FG |
272 | } |
273 | ||
274 | if (conns.empty()) { | |
275 | ldout(cct, 4) << "No zones to update" << dendl; | |
276 | return; | |
277 | } | |
278 | ||
279 | realm_epoch = period.get_realm_epoch(); | |
280 | period_epoch = period.get_epoch(); | |
281 | ||
282 | ldout(cct, 4) << "Zone master pushing period " << period.get_id() | |
283 | << " epoch " << period_epoch << " to " | |
284 | << conns.size() << " other zones" << dendl; | |
285 | ||
286 | // spawn a new coroutine thread, destroying the previous one | |
287 | cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns))); | |
288 | } | |
289 | ||
290 | void RGWPeriodPusher::pause() | |
291 | { | |
292 | ldout(cct, 4) << "paused for realm update" << dendl; | |
293 | std::lock_guard<std::mutex> lock(mutex); | |
294 | store = nullptr; | |
295 | } | |
296 | ||
9f95a23c | 297 | void RGWPeriodPusher::resume(rgw::sal::RGWRadosStore* store) |
7c673cae FG |
298 | { |
299 | std::lock_guard<std::mutex> lock(mutex); | |
300 | this->store = store; | |
301 | ||
302 | ldout(cct, 4) << "resume with " << pending_periods.size() | |
303 | << " periods pending" << dendl; | |
304 | ||
305 | // process notification queue | |
306 | for (auto& info : pending_periods) { | |
307 | handle_notify(std::move(info)); | |
308 | } | |
309 | pending_periods.clear(); | |
310 | } |