// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include <map>
#include <thread>

#include "rgw_period_pusher.h"
#include "rgw_cr_rest.h"
#include "rgw_zone.h"
#include "rgw_sal.h"

#include "services/svc_zone.h"

#include "common/errno.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "rgw period pusher: ")

/// A coroutine to post the period over the given connection.
using PushCR = RGWPostRESTResourceCR<RGWPeriod, int>;

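// Note (a reading of the template parameters, not stated in this file):
// PushCR sends the RGWPeriod as the POST body, and the int names the reply
// type. The call site below passes nullptr for the result pointer, so any
// response body appears to be discarded.
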
/// A coroutine that calls PushCR, and retries with backoff until success.
class PushAndRetryCR : public RGWCoroutine {
  const std::string& zone;
  RGWRESTConn *const conn;
  RGWHTTPManager *const http;
  RGWPeriod& period;
  const std::string epoch; //< epoch string for params
  double timeout; //< current interval between retries
  const double timeout_max; //< maximum interval between retries
  uint32_t counter; //< number of failures since backoff increased

 public:
  PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn,
                 RGWHTTPManager* http, RGWPeriod& period)
    : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period),
      epoch(std::to_string(period.get_epoch())),
      timeout(cct->_conf->rgw_period_push_interval),
      timeout_max(cct->_conf->rgw_period_push_interval_max),
      counter(0)
  {}

  int operate() override;
};

int PushAndRetryCR::operate()
{
  reenter(this) {
    for (;;) {
      yield {
        ldout(cct, 10) << "pushing period " << period.get_id()
                       << " to " << zone << dendl;
        // initialize the http params
        rgw_http_param_pair params[] = {
          { "period", period.get_id().c_str() },
          { "epoch", epoch.c_str() },
          { nullptr, nullptr }
        };
        call(new PushCR(cct, conn, http, "/admin/realm/period",
                        params, period, nullptr));
      }

      // stop on success
      if (get_ret_status() == 0) {
        ldout(cct, 10) << "push to " << zone << " succeeded" << dendl;
        return set_cr_done();
      }

      // try each endpoint in the connection before waiting
      if (++counter < conn->get_endpoint_count())
        continue;
      counter = 0;

      // wait with exponential backoff up to timeout_max
      yield {
        utime_t dur;
        dur.set_from_double(timeout);

        ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl;
        wait(dur);

        timeout *= 2;
        if (timeout > timeout_max)
          timeout = timeout_max;
      }
    }
  }
  return 0;
}

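// Backoff sketch (illustrative numbers, not necessarily the configured
// defaults): with rgw_period_push_interval = 2 and
// rgw_period_push_interval_max = 30, successive waits are 2, 4, 8, 16,
// 30, 30, ... seconds. A wait is only taken once every endpoint of the
// connection has failed in the current round, since `continue` above
// retries immediately until counter reaches get_endpoint_count().
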
/**
 * PushAllCR is a coroutine that sends the period over all of the given
 * connections, retrying until they are all marked as completed.
 */
class PushAllCR : public RGWCoroutine {
  RGWHTTPManager *const http;
  RGWPeriod period; //< period object to push
  std::map<std::string, RGWRESTConn> conns; //< zones that need the period

 public:
  PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period,
            std::map<std::string, RGWRESTConn>&& conns)
    : RGWCoroutine(cct), http(http),
      period(std::move(period)),
      conns(std::move(conns))
  {}

  int operate() override;
};

int PushAllCR::operate()
{
  reenter(this) {
    // spawn a coroutine to push the period over each connection
    yield {
      ldout(cct, 4) << "sending period to " << conns.size() << " zones" << dendl;
      for (auto& c : conns)
        spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false);
    }
    // wait for all to complete
    drain_all();
    return set_cr_done();
  }
  return 0;
}

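// Note: spawn(cr, false) starts each child without blocking on it
// individually; drain_all() then suspends the parent until every spawned
// child stack has completed, whether or not its push succeeded.
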
/// A background thread to run the PushAllCR coroutine and exit.
class RGWPeriodPusher::CRThread {
  RGWCoroutinesManager coroutines;
  RGWHTTPManager http;
  boost::intrusive_ptr<PushAllCR> push_all;
  std::thread thread;

 public:
  CRThread(CephContext* cct, RGWPeriod&& period,
           std::map<std::string, RGWRESTConn>&& conns)
    : coroutines(cct, NULL),
      http(cct, coroutines.get_completion_mgr()),
      push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns)))
  {
    http.start();
    // must spawn the CR thread after start
    thread = std::thread([this] { coroutines.run(push_all.get()); });
  }
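  // Teardown order matters here: drop our reference to the coroutine, ask
  // the manager to stop so run() returns, stop the http manager, and only
  // then join the worker thread.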
  ~CRThread()
  {
    push_all.reset();
    coroutines.stop();
    http.stop();
    if (thread.joinable())
      thread.join();
  }
};


RGWPeriodPusher::RGWPeriodPusher(rgw::sal::RGWRadosStore* store)
  : cct(store->ctx()), store(store)
{
  const auto& realm = store->svc()->zone->get_realm();
  auto& realm_id = realm.get_id();
  if (realm_id.empty()) // no realm configuration
    return;

  // always send out the current period on startup
  RGWPeriod period;
  int r = period.init(cct, store->svc()->sysobj, realm_id, realm.get_name());
  if (r < 0) {
    lderr(cct) << "failed to load period for realm " << realm_id << dendl;
    return;
  }

  std::lock_guard<std::mutex> lock(mutex);
  handle_notify(std::move(period));
}

// destructor is here because CRThread is incomplete in the header
RGWPeriodPusher::~RGWPeriodPusher() = default;

void RGWPeriodPusher::handle_notify(RGWRealmNotify type,
                                    bufferlist::const_iterator& p)
{
  // decode the period
  RGWZonesNeedPeriod info;
  try {
    decode(info, p);
  } catch (buffer::error& e) {
    lderr(cct) << "Failed to decode the period: " << e.what() << dendl;
    return;
  }

  std::lock_guard<std::mutex> lock(mutex);

  // we can't process this notification without access to our current realm
  // configuration, so queue it until resume()
  if (store == nullptr) {
    pending_periods.emplace_back(std::move(info));
    return;
  }

  handle_notify(std::move(info));
}

// expects the caller to hold a lock on mutex
void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period)
{
  if (period.get_realm_epoch() < realm_epoch) {
    ldout(cct, 10) << "period's realm epoch " << period.get_realm_epoch()
                   << " is not newer than current realm epoch " << realm_epoch
                   << ", discarding update" << dendl;
    return;
  }
  if (period.get_realm_epoch() == realm_epoch &&
      period.get_epoch() <= period_epoch) {
    ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer "
        "than current epoch " << period_epoch << ", discarding update" << dendl;
    return;
  }

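  // Worked example of the two guards above: with realm_epoch = 2 and
  // period_epoch = 5 already seen, a notify for (realm 2, period 5) or
  // (2, 4) is discarded, while (2, 6) or (3, 1) is processed, since a
  // newer realm epoch resets the period-epoch comparison.
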
  // find our zonegroup in the new period
  auto& zonegroups = period.get_map().zonegroups;
  auto i = zonegroups.find(store->svc()->zone->get_zonegroup().get_id());
  if (i == zonegroups.end()) {
    lderr(cct) << "The new period does not contain my zonegroup!" << dendl;
    return;
  }
  auto& my_zonegroup = i->second;

  // if we're not a master zone, we're not responsible for pushing any updates
  if (my_zonegroup.master_zone != store->svc()->zone->get_zone_params().get_id())
    return;

  // construct a map of the zones that need this period. the map uses the same
  // keys/ordering as the zone[group] map, so we can use a hint for insertions
  std::map<std::string, RGWRESTConn> conns;
  auto hint = conns.end();

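  // The emplace_hint + piecewise_construct pattern below builds each
  // RGWRESTConn in place inside the map (presumably to avoid copying the
  // connection object), and because the source maps iterate in the same key
  // order, each hinted insertion lands after the previous one in amortized
  // constant time.
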
  // are we the master zonegroup in this period?
  if (period.get_map().master_zonegroup == store->svc()->zone->get_zonegroup().get_id()) {
    // update other zonegroup endpoints
    for (auto& zg : zonegroups) {
      auto& zonegroup = zg.second;
      if (zonegroup.get_id() == store->svc()->zone->get_zonegroup().get_id())
        continue;
      if (zonegroup.endpoints.empty())
        continue;

      hint = conns.emplace_hint(
          hint, std::piecewise_construct,
          std::forward_as_tuple(zonegroup.get_id()),
          std::forward_as_tuple(cct, store->svc()->zone, zonegroup.get_id(), zonegroup.endpoints));
    }
  }

  // update other zone endpoints
  for (auto& z : my_zonegroup.zones) {
    auto& zone = z.second;
    if (zone.id == store->svc()->zone->get_zone_params().get_id())
      continue;
    if (zone.endpoints.empty())
      continue;

    hint = conns.emplace_hint(
        hint, std::piecewise_construct,
        std::forward_as_tuple(zone.id),
        std::forward_as_tuple(cct, store->svc()->zone, zone.id, zone.endpoints));
  }

  if (conns.empty()) {
    ldout(cct, 4) << "No zones to update" << dendl;
    return;
  }

  realm_epoch = period.get_realm_epoch();
  period_epoch = period.get_epoch();

  ldout(cct, 4) << "Zone master pushing period " << period.get_id()
                << " epoch " << period_epoch << " to "
                << conns.size() << " other zones" << dendl;

  // spawn a new coroutine thread, destroying the previous one
  cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns)));
}

void RGWPeriodPusher::pause()
{
  ldout(cct, 4) << "paused for realm update" << dendl;
  std::lock_guard<std::mutex> lock(mutex);
  store = nullptr;
}

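// While paused (store == nullptr), handle_notify() queues incoming periods
// in pending_periods; resume() below reinstalls the store and replays them.
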
void RGWPeriodPusher::resume(rgw::sal::RGWRadosStore* store)
{
  std::lock_guard<std::mutex> lock(mutex);
  this->store = store;

  ldout(cct, 4) << "resume with " << pending_periods.size()
                << " periods pending" << dendl;

  // process the notification queue
  for (auto& info : pending_periods) {
    handle_notify(std::move(info));
  }
  pending_periods.clear();
}