]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae FG |
3 | |
4 | #include <map> | |
5 | #include <thread> | |
6 | ||
7 | #include "rgw_period_pusher.h" | |
8 | #include "rgw_cr_rest.h" | |
11fdf7f2 | 9 | #include "rgw_zone.h" |
9f95a23c | 10 | #include "rgw_sal.h" |
11fdf7f2 TL |
11 | |
12 | #include "services/svc_zone.h" | |
13 | ||
7c673cae | 14 | #include "common/errno.h" |
11fdf7f2 | 15 | |
31f18b77 | 16 | #include <boost/asio/yield.hpp> |
7c673cae FG |
17 | |
18 | #define dout_subsys ceph_subsys_rgw | |
19 | ||
20 | #undef dout_prefix | |
21 | #define dout_prefix (*_dout << "rgw period pusher: ") | |
22 | ||
/// A coroutine to post the period over the given connection.
using PushCR = RGWPostRESTResourceCR<RGWPeriod, int>;

/// A coroutine that calls PushCR, and retries with backoff until success.
class PushAndRetryCR : public RGWCoroutine {
  const std::string& zone; ///< name of the destination zone (owned by caller)
  RGWRESTConn *const conn; ///< rest connection to the destination zone
  RGWHTTPManager *const http; ///< http manager that drives the rest calls
  RGWPeriod& period; ///< the period object being pushed
  const std::string epoch; ///< epoch string for params
  double timeout; ///< current interval between retries
  const double timeout_max; ///< maximum interval between retries
  uint32_t counter; ///< number of failures since backoff increased

 public:
  /// Capture the push target and seed the retry backoff from the
  /// rgw_period_push_interval[_max] config options. The zone, conn, http
  /// and period references must outlive this coroutine.
  PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn,
                 RGWHTTPManager* http, RGWPeriod& period)
    : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period),
      epoch(std::to_string(period.get_epoch())),
      timeout(cct->_conf->rgw_period_push_interval),
      timeout_max(cct->_conf->rgw_period_push_interval_max),
      counter(0)
  {}

  int operate() override;
};
49 | ||
int PushAndRetryCR::operate()
{
  // stackless coroutine: locals declared inside a yield block do not
  // survive suspension, so params is rebuilt on every retry
  reenter(this) {
    for (;;) {
      yield {
        ldout(cct, 10) << "pushing period " << period.get_id()
            << " to " << zone << dendl;
        // initialize the http params
        rgw_http_param_pair params[] = {
          { "period", period.get_id().c_str() },
          { "epoch", epoch.c_str() },
          { nullptr, nullptr }
        };
        call(new PushCR(cct, conn, http, "/admin/realm/period",
                        params, period, nullptr));
      }

      // stop on success
      if (get_ret_status() == 0) {
        ldout(cct, 10) << "push to " << zone << " succeeded" << dendl;
        return set_cr_done();
      }

      // try each endpoint in the connection before waiting
      if (++counter < conn->get_endpoint_count())
        continue;
      counter = 0;

      // wait with exponential backoff up to timeout_max
      yield {
        utime_t dur;
        dur.set_from_double(timeout);

        ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl;
        wait(dur);

        // double the delay for the next round, clamped to the maximum
        timeout *= 2;
        if (timeout > timeout_max)
          timeout = timeout_max;
      }
    }
  }
  return 0;
}
94 | ||
/**
 * PushAllCR is a coroutine that sends the period over all of the given
 * connections, retrying until they are all marked as completed.
 */
class PushAllCR : public RGWCoroutine {
  RGWHTTPManager *const http;
  RGWPeriod period; ///< period object to push (owned; children hold refs)
  std::map<std::string, RGWRESTConn> conns; ///< zones that need the period

 public:
  /// Take ownership of the period and connection map so they stay alive
  /// for the duration of the child PushAndRetryCR coroutines.
  PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period,
            std::map<std::string, RGWRESTConn>&& conns)
    : RGWCoroutine(cct), http(http),
      period(std::move(period)),
      conns(std::move(conns))
  {}

  int operate() override;
};
114 | ||
int PushAllCR::operate()
{
  reenter(this) {
    // spawn a coroutine to push the period over each connection
    yield {
      ldout(cct, 4) << "sending " << conns.size() << " periods" << dendl;
      for (auto& c : conns)
        // c.first is the zone id, c.second its rest connection; the children
        // borrow references to this->period and the conns map entries
        spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false);
    }
    // wait for all to complete
    drain_all();
    return set_cr_done();
  }
  return 0;
}
130 | ||
/// A background thread to run the PushAllCR coroutine and exit.
class RGWPeriodPusher::CRThread {
  // declaration order matters: http depends on coroutines' completion
  // manager, and push_all captures &http
  RGWCoroutinesManager coroutines;
  RGWHTTPManager http;
  boost::intrusive_ptr<PushAllCR> push_all;
  std::thread thread;

 public:
  CRThread(CephContext* cct, RGWPeriod&& period,
           std::map<std::string, RGWRESTConn>&& conns)
    : coroutines(cct, NULL),
      http(cct, coroutines.get_completion_mgr()),
      push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns)))
  {
    http.start();
    // must spawn the CR thread after start
    thread = std::thread([this] { coroutines.run(push_all.get()); });
  }
  ~CRThread()
  {
    // drop our ref, ask the coroutine manager to stop, then shut down http
    // before joining so the run() call in the thread can return
    push_all.reset();
    coroutines.stop();
    http.stop();
    if (thread.joinable())
      thread.join();
  }
};
158 | ||
159 | ||
9f95a23c | 160 | RGWPeriodPusher::RGWPeriodPusher(rgw::sal::RGWRadosStore* store) |
7c673cae FG |
161 | : cct(store->ctx()), store(store) |
162 | { | |
9f95a23c | 163 | const auto& realm = store->svc()->zone->get_realm(); |
7c673cae FG |
164 | auto& realm_id = realm.get_id(); |
165 | if (realm_id.empty()) // no realm configuration | |
166 | return; | |
167 | ||
168 | // always send out the current period on startup | |
169 | RGWPeriod period; | |
9f95a23c | 170 | int r = period.init(cct, store->svc()->sysobj, realm_id, realm.get_name()); |
7c673cae FG |
171 | if (r < 0) { |
172 | lderr(cct) << "failed to load period for realm " << realm_id << dendl; | |
173 | return; | |
174 | } | |
175 | ||
176 | std::lock_guard<std::mutex> lock(mutex); | |
177 | handle_notify(std::move(period)); | |
178 | } | |
179 | ||
// destructor is here (not in the header) because CRThread is an incomplete
// type at the point of the class definition
RGWPeriodPusher::~RGWPeriodPusher() = default;
182 | ||
183 | void RGWPeriodPusher::handle_notify(RGWRealmNotify type, | |
11fdf7f2 | 184 | bufferlist::const_iterator& p) |
7c673cae FG |
185 | { |
186 | // decode the period | |
187 | RGWZonesNeedPeriod info; | |
188 | try { | |
11fdf7f2 | 189 | decode(info, p); |
7c673cae FG |
190 | } catch (buffer::error& e) { |
191 | lderr(cct) << "Failed to decode the period: " << e.what() << dendl; | |
192 | return; | |
193 | } | |
194 | ||
195 | std::lock_guard<std::mutex> lock(mutex); | |
196 | ||
197 | // we can't process this notification without access to our current realm | |
198 | // configuration. queue it until resume() | |
199 | if (store == nullptr) { | |
200 | pending_periods.emplace_back(std::move(info)); | |
201 | return; | |
202 | } | |
203 | ||
204 | handle_notify(std::move(info)); | |
205 | } | |
206 | ||
// expects the caller to hold a lock on mutex
void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period)
{
  // discard stale updates: anything from an older realm epoch, or from the
  // same realm epoch with a period epoch we've already seen
  if (period.get_realm_epoch() < realm_epoch) {
    ldout(cct, 10) << "period's realm epoch " << period.get_realm_epoch()
        << " is not newer than current realm epoch " << realm_epoch
        << ", discarding update" << dendl;
    return;
  }
  if (period.get_realm_epoch() == realm_epoch &&
      period.get_epoch() <= period_epoch) {
    ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer "
        "than current epoch " << period_epoch << ", discarding update" << dendl;
    return;
  }

  // find our zonegroup in the new period
  auto& zonegroups = period.get_map().zonegroups;
  auto i = zonegroups.find(store->svc()->zone->get_zonegroup().get_id());
  if (i == zonegroups.end()) {
    lderr(cct) << "The new period does not contain my zonegroup!" << dendl;
    return;
  }
  auto& my_zonegroup = i->second;

  // if we're not a master zone, we're not responsible for pushing any updates
  if (my_zonegroup.master_zone != store->svc()->zone->get_zone_params().get_id())
    return;

  // construct a map of the zones that need this period. the map uses the same
  // keys/ordering as the zone[group] map, so we can use a hint for insertions
  std::map<std::string, RGWRESTConn> conns;
  auto hint = conns.end();

  // are we the master zonegroup in this period?
  if (period.get_map().master_zonegroup == store->svc()->zone->get_zonegroup().get_id()) {
    // update other zonegroup endpoints
    for (auto& zg : zonegroups) {
      auto& zonegroup = zg.second;
      // skip ourselves and zonegroups with no endpoints to contact
      if (zonegroup.get_id() == store->svc()->zone->get_zonegroup().get_id())
        continue;
      if (zonegroup.endpoints.empty())
        continue;

      // piecewise_construct builds the RGWRESTConn in place (it need not be
      // copyable/movable)
      hint = conns.emplace_hint(
          hint, std::piecewise_construct,
          std::forward_as_tuple(zonegroup.get_id()),
          std::forward_as_tuple(cct, store->svc()->zone, zonegroup.get_id(), zonegroup.endpoints));
    }
  }

  // update other zone endpoints
  for (auto& z : my_zonegroup.zones) {
    auto& zone = z.second;
    // skip ourselves and zones with no endpoints to contact
    if (zone.id == store->svc()->zone->get_zone_params().get_id())
      continue;
    if (zone.endpoints.empty())
      continue;

    hint = conns.emplace_hint(
        hint, std::piecewise_construct,
        std::forward_as_tuple(zone.id),
        std::forward_as_tuple(cct, store->svc()->zone, zone.id, zone.endpoints));
  }

  if (conns.empty()) {
    ldout(cct, 4) << "No zones to update" << dendl;
    return;
  }

  // only advance our epochs once we know we'll actually push
  realm_epoch = period.get_realm_epoch();
  period_epoch = period.get_epoch();

  ldout(cct, 4) << "Zone master pushing period " << period.get_id()
      << " epoch " << period_epoch << " to "
      << conns.size() << " other zones" << dendl;

  // spawn a new coroutine thread, destroying the previous one
  cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns)));
}
287 | ||
288 | void RGWPeriodPusher::pause() | |
289 | { | |
290 | ldout(cct, 4) << "paused for realm update" << dendl; | |
291 | std::lock_guard<std::mutex> lock(mutex); | |
292 | store = nullptr; | |
293 | } | |
294 | ||
9f95a23c | 295 | void RGWPeriodPusher::resume(rgw::sal::RGWRadosStore* store) |
7c673cae FG |
296 | { |
297 | std::lock_guard<std::mutex> lock(mutex); | |
298 | this->store = store; | |
299 | ||
300 | ldout(cct, 4) << "resume with " << pending_periods.size() | |
301 | << " periods pending" << dendl; | |
302 | ||
303 | // process notification queue | |
304 | for (auto& info : pending_periods) { | |
305 | handle_notify(std::move(info)); | |
306 | } | |
307 | pending_periods.clear(); | |
308 | } |