// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include <map>
#include <thread>

#include "rgw_period_pusher.h"
#include "rgw_cr_rest.h"
#include "rgw_zone.h"
#include "rgw_sal.h"
#include "rgw_sal_rados.h"

#include "services/svc_zone.h"

#include "common/errno.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "rgw period pusher: ")

/// A coroutine to post the period over the given connection.
using PushCR = RGWPostRESTResourceCR<RGWPeriod, int>;
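// Note on the template parameters (inferred from the usage below, where the
// result pointer passed to PushCR is nullptr): the RGWPeriod argument is
// serialized as the POST request body, while the int response type goes
// unused, so the reply body is effectively discarded.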

/// A coroutine that calls PushCR, and retries with backoff until success.
class PushAndRetryCR : public RGWCoroutine {
  const std::string& zone;
  RGWRESTConn *const conn;
  RGWHTTPManager *const http;
  RGWPeriod& period;
  const std::string epoch; //< epoch string for params
  double timeout; //< current interval between retries
  const double timeout_max; //< maximum interval between retries
  uint32_t counter; //< number of failures since backoff increased

 public:
  PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn,
                 RGWHTTPManager* http, RGWPeriod& period)
    : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period),
      epoch(std::to_string(period.get_epoch())),
      timeout(cct->_conf->rgw_period_push_interval),
      timeout_max(cct->_conf->rgw_period_push_interval_max),
      counter(0)
  {}

  int operate() override;
};

int PushAndRetryCR::operate()
{
  reenter(this) {
    for (;;) {
      yield {
        ldout(cct, 10) << "pushing period " << period.get_id()
            << " to " << zone << dendl;
        // initialize the http params
        rgw_http_param_pair params[] = {
          { "period", period.get_id().c_str() },
          { "epoch", epoch.c_str() },
          { nullptr, nullptr }
        };
        call(new PushCR(cct, conn, http, "/admin/realm/period",
                        params, period, nullptr));
      }

      // stop on success
      if (get_ret_status() == 0) {
        ldout(cct, 10) << "push to " << zone << " succeeded" << dendl;
        return set_cr_done();
      }

      // try each endpoint in the connection before waiting
      if (++counter < conn->get_endpoint_count())
        continue;
      counter = 0;

      // wait with exponential backoff up to timeout_max
      yield {
        utime_t dur;
        dur.set_from_double(timeout);

        ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl;
        wait(dur);

        timeout *= 2;
        if (timeout > timeout_max)
          timeout = timeout_max;
      }
    }
  }
  return 0;
}
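// Backoff sketch for the loop above (assuming the default values of 2s for
// rgw_period_push_interval and 30s for rgw_period_push_interval_max): once
// every endpoint in the connection has failed a round, the waits between
// rounds run 2s, 4s, 8s, 16s, then clamp at 30s for each subsequent round
// until a push finally succeeds.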

/**
 * PushAllCR is a coroutine that sends the period over all of the given
 * connections, retrying until they are all marked as completed.
 */
class PushAllCR : public RGWCoroutine {
  RGWHTTPManager *const http;
  RGWPeriod period; //< period object to push
  std::map<std::string, RGWRESTConn> conns; //< zones that need the period

 public:
  PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period,
            std::map<std::string, RGWRESTConn>&& conns)
    : RGWCoroutine(cct), http(http),
      period(std::move(period)),
      conns(std::move(conns))
  {}

  int operate() override;
};
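// The period and connection map are moved in and stored by value because
// this coroutine runs on a background thread (see CRThread below) and must
// outlive the caller's stack frame.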

int PushAllCR::operate()
{
  reenter(this) {
    // spawn a coroutine to push the period over each connection
    yield {
      ldout(cct, 4) << "sending period to " << conns.size() << " zones" << dendl;
      for (auto& c : conns)
        spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false);
    }
    // wait for all to complete
    drain_all();
    return set_cr_done();
  }
  return 0;
}
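// Each PushAndRetryCR is spawned with wait=false, so the per-zone pushes run
// concurrently; drain_all() then blocks this coroutine until every spawned
// child has completed, i.e. every zone has acknowledged the period.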

/// A background thread to run the PushAllCR coroutine and exit.
class RGWPeriodPusher::CRThread {
  RGWCoroutinesManager coroutines;
  RGWHTTPManager http;
  boost::intrusive_ptr<PushAllCR> push_all;
  std::thread thread;

 public:
  CRThread(CephContext* cct, RGWPeriod&& period,
           std::map<std::string, RGWRESTConn>&& conns)
    : coroutines(cct, NULL),
      http(cct, coroutines.get_completion_mgr()),
      push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns)))
  {
    http.start();
    // must spawn the CR thread after start
    thread = std::thread([this] { coroutines.run(push_all.get()); });
  }
  ~CRThread()
  {
    // drop our reference to the coroutine, stop the managers, then join the
    // worker thread so coroutines.run() can unwind and return
    push_all.reset();
    coroutines.stop();
    http.stop();
    if (thread.joinable())
      thread.join();
  }
};


RGWPeriodPusher::RGWPeriodPusher(rgw::sal::RGWRadosStore* store,
                                 optional_yield y)
  : cct(store->ctx()), store(store)
{
  const auto& realm = store->svc()->zone->get_realm();
  auto& realm_id = realm.get_id();
  if (realm_id.empty()) // no realm configuration
    return;

  // always send out the current period on startup
  RGWPeriod period;
  int r = period.init(cct, store->svc()->sysobj, realm_id, y, realm.get_name());
  if (r < 0) {
    lderr(cct) << "failed to load period for realm " << realm_id << dendl;
    return;
  }

  std::lock_guard<std::mutex> lock(mutex);
  handle_notify(std::move(period));
}

// destructor is here because CRThread is incomplete in the header
RGWPeriodPusher::~RGWPeriodPusher() = default;

void RGWPeriodPusher::handle_notify(RGWRealmNotify type,
                                    bufferlist::const_iterator& p)
{
  // decode the period
  RGWZonesNeedPeriod info;
  try {
    decode(info, p);
  } catch (buffer::error& e) {
    lderr(cct) << "Failed to decode the period: " << e.what() << dendl;
    return;
  }

  std::lock_guard<std::mutex> lock(mutex);

  // we can't process this notification without access to our current realm
  // configuration. queue it until resume()
  if (store == nullptr) {
    pending_periods.emplace_back(std::move(info));
    return;
  }

  handle_notify(std::move(info));
}

// expects the caller to hold a lock on mutex
void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period)
{
  if (period.get_realm_epoch() < realm_epoch) {
    ldout(cct, 10) << "period's realm epoch " << period.get_realm_epoch()
        << " is not newer than current realm epoch " << realm_epoch
        << ", discarding update" << dendl;
    return;
  }
  if (period.get_realm_epoch() == realm_epoch &&
      period.get_epoch() <= period_epoch) {
    ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer "
        "than current epoch " << period_epoch << ", discarding update" << dendl;
    return;
  }
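  // Worked example of the checks above: with realm_epoch = 2 and
  // period_epoch = 5 cached, a notify carrying (realm epoch 2, period
  // epoch 5) is discarded as stale, (2, 6) is accepted as a newer period
  // epoch, and (3, 1) is accepted because a new realm epoch resets the
  // period epoch comparison.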

  // find our zonegroup in the new period
  auto& zonegroups = period.get_map().zonegroups;
  auto i = zonegroups.find(store->svc()->zone->get_zonegroup().get_id());
  if (i == zonegroups.end()) {
    lderr(cct) << "The new period does not contain my zonegroup!" << dendl;
    return;
  }
  auto& my_zonegroup = i->second;

  // if we're not a master zone, we're not responsible for pushing any updates
  if (my_zonegroup.master_zone != store->svc()->zone->get_zone_params().get_id())
    return;

  // construct a map of the zones that need this period. the map uses the same
  // keys/ordering as the zone[group] map, so we can use a hint for insertions
  std::map<std::string, RGWRESTConn> conns;
  auto hint = conns.end();

  // are we the master zonegroup in this period?
  if (period.get_map().master_zonegroup == store->svc()->zone->get_zonegroup().get_id()) {
    // update other zonegroup endpoints
    for (auto& zg : zonegroups) {
      auto& zonegroup = zg.second;
      if (zonegroup.get_id() == store->svc()->zone->get_zonegroup().get_id())
        continue;
      if (zonegroup.endpoints.empty())
        continue;

      hint = conns.emplace_hint(
          hint, std::piecewise_construct,
          std::forward_as_tuple(zonegroup.get_id()),
          std::forward_as_tuple(cct, store->svc()->zone, zonegroup.get_id(), zonegroup.endpoints));
    }
  }

  // update other zone endpoints
  for (auto& z : my_zonegroup.zones) {
    auto& zone = z.second;
    if (zone.id == store->svc()->zone->get_zone_params().get_id())
      continue;
    if (zone.endpoints.empty())
      continue;

    hint = conns.emplace_hint(
        hint, std::piecewise_construct,
        std::forward_as_tuple(zone.id),
        std::forward_as_tuple(cct, store->svc()->zone, zone.id, zone.endpoints));
  }

  if (conns.empty()) {
    ldout(cct, 4) << "No zones to update" << dendl;
    return;
  }

  realm_epoch = period.get_realm_epoch();
  period_epoch = period.get_epoch();

  ldout(cct, 4) << "Zone master pushing period " << period.get_id()
      << " epoch " << period_epoch << " to "
      << conns.size() << " other zones" << dendl;

  // spawn a new coroutine thread, destroying the previous one
  cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns)));
}

void RGWPeriodPusher::pause()
{
  ldout(cct, 4) << "paused for realm update" << dendl;
  std::lock_guard<std::mutex> lock(mutex);
  store = nullptr;
}

void RGWPeriodPusher::resume(rgw::sal::RGWRadosStore* store)
{
  std::lock_guard<std::mutex> lock(mutex);
  this->store = store;

  ldout(cct, 4) << "resume with " << pending_periods.size()
      << " periods pending" << dendl;

  // process notification queue
  for (auto& info : pending_periods) {
    handle_notify(std::move(info));
  }
  pending_periods.clear();
}
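// Note: pause() and resume() let the caller (the realm reloader, in RGW)
// safely swap out the store. While paused (store == nullptr),
// handle_notify() queues incoming periods in pending_periods, and resume()
// replays them once a valid store is available again.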