1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
7 #include "rgw_period_pusher.h"
8 #include "rgw_cr_rest.h"
11 #include "rgw_sal_rados.h"
13 #include "services/svc_zone.h"
15 #include "common/errno.h"
17 #include <boost/asio/yield.hpp>
19 #define dout_subsys ceph_subsys_rgw
22 #define dout_prefix (*_dout << "rgw period pusher: ")
24 /// A coroutine to post the period over the given connection.
// PushCR is RGWPostRESTResourceCR instantiated with RGWPeriod and int.
// PushAndRetryCR::operate() creates one PushCR per push attempt.
25 using PushCR
= RGWPostRESTResourceCR
<RGWPeriod
, int>;
27 /// A coroutine that calls PushCR, and retries with backoff until success.
// State kept per target: the zone name, the connection and HTTP manager used
// for the request, the epoch string sent as a request parameter, and the
// retry bookkeeping (timeout, timeout_max, counter).
28 class PushAndRetryCR
: public RGWCoroutine
{
// zone is held by reference rather than copied; it is bound from the
// constructor's `zone` argument.
29 const std::string
& zone
;
// conn and http are const pointers (the pointers themselves are not reseated).
30 RGWRESTConn
*const conn
;
31 RGWHTTPManager
*const http
;
33 const std::string epoch
; //< epoch string for params
34 double timeout
; //< current interval between retries
35 const double timeout_max
; //< maximum interval between retries
36 uint32_t counter
; //< number of failures since backoff increased
// Binds the zone/conn/http/period arguments to the members of the same name.
// epoch is period.get_epoch() formatted with std::to_string; timeout and
// timeout_max are read from the rgw_period_push_interval and
// rgw_period_push_interval_max config values on cct->_conf.
39 PushAndRetryCR(CephContext
* cct
, const std::string
& zone
, RGWRESTConn
* conn
,
40 RGWHTTPManager
* http
, RGWPeriod
& period
)
41 : RGWCoroutine(cct
), zone(zone
), conn(conn
), http(http
), period(period
),
42 epoch(std::to_string(period
.get_epoch())),
43 timeout(cct
->_conf
->rgw_period_push_interval
),
44 timeout_max(cct
->_conf
->rgw_period_push_interval_max
),
// Coroutine body; the definition follows the class (PushAndRetryCR::operate).
48 int operate(const DoutPrefixProvider
*dpp
) override
;
// Coroutine body for PushAndRetryCR: posts the period to the target zone and,
// when the push does not report success, sets up a delayed retry whose
// interval is capped at timeout_max.
51 int PushAndRetryCR::operate(const DoutPrefixProvider
*dpp
)
56 ldpp_dout(dpp
, 10) << "pushing period " << period
.get_id()
57 << " to " << zone
<< dendl
;
58 // initialize the http params
// "period" carries the period id and "epoch" the pre-formatted epoch string.
59 rgw_http_param_pair params
[] = {
60 { "period", period
.get_id().c_str() },
61 { "epoch", epoch
.c_str() },
// Start a PushCR against the /admin/realm/period resource with the params
// above and the period as the payload; the final PushCR argument is nullptr.
64 call(new PushCR(cct
, conn
, http
, "/admin/realm/period",
65 params
, period
, nullptr));
// A return status of 0 from the call is treated as success.
69 if (get_ret_status() == 0) {
70 ldpp_dout(dpp
, 10) << "push to " << zone
<< " succeeded" << dendl
;
74 // try each endpoint in the connection before waiting
// counter is pre-incremented on each failed attempt and compared against the
// number of endpoints reported by the connection.
75 if (++counter
< conn
->get_endpoint_count())
79 // wait with exponential backoff up to timeout_max
// dur is filled from the current timeout (a double) and is the value logged
// as the wait time below.
82 dur
.set_from_double(timeout
);
84 ldpp_dout(dpp
, 10) << "waiting " << dur
<< "s for retry.." << dendl
;
// Clamp the retry interval so it never exceeds timeout_max.
88 if (timeout
> timeout_max
)
89 timeout
= timeout_max
;
97 * PushAllCR is a coroutine that sends the period over all of the given
98 * connections, retrying until they are all marked as completed.
// The coroutine stores the period and the connection map by value; both are
// moved in from the constructor's rvalue-reference arguments. http is a
// non-reseatable pointer to the HTTP manager supplied by the caller.
100 class PushAllCR
: public RGWCoroutine
{
101 RGWHTTPManager
*const http
;
102 RGWPeriod period
; //< period object to push
103 std::map
<std::string
, RGWRESTConn
> conns
; //< zones that need the period
// Takes ownership of `period` and `conns` via std::move; `http` is stored as
// given.
106 PushAllCR(CephContext
* cct
, RGWHTTPManager
* http
, RGWPeriod
&& period
,
107 std::map
<std::string
, RGWRESTConn
>&& conns
)
108 : RGWCoroutine(cct
), http(http
),
109 period(std::move(period
)),
110 conns(std::move(conns
))
// Coroutine body; the definition follows the class (PushAllCR::operate).
113 int operate(const DoutPrefixProvider
*dpp
) override
;
// Coroutine body for PushAllCR: logs how many connections will be used, then
// spawns one PushAndRetryCR per entry in conns and finally marks itself done.
116 int PushAllCR::operate(const DoutPrefixProvider
*dpp
)
119 // spawn a coroutine to push the period over each connection
121 ldpp_dout(dpp
, 4) << "sending " << conns
.size() << " periods" << dendl
;
// For each map entry, the key (c.first) is passed as the zone name and the
// address of the mapped RGWRESTConn (&c.second) as the connection. Both refer
// to elements of this object's conns map. Every child receives the same http
// manager and period. spawn() is called with false as its second argument.
122 for (auto& c
: conns
)
123 spawn(new PushAndRetryCR(cct
, c
.first
, &c
.second
, http
, period
), false);
125 // wait for all to complete
127 return set_cr_done();
132 /// A background thread to run the PushAllCR coroutine and exit.
// CRThread also implements DoutPrefixProvider (get_cct, get_subsys,
// gen_prefix below) and passes itself as the first argument to
// coroutines.run().
133 class RGWPeriodPusher::CRThread
: public DoutPrefixProvider
{
135 RGWCoroutinesManager coroutines
;
// push_all is held through boost::intrusive_ptr.
137 boost::intrusive_ptr
<PushAllCR
> push_all
;
// Builds the coroutine manager, an http member wired to the manager's
// completion manager, and the PushAllCR that takes over `period` and `conns`
// via std::move.
141 CRThread(CephContext
* cct
, RGWPeriod
&& period
,
142 std::map
<std::string
, RGWRESTConn
>&& conns
)
143 : cct(cct
), coroutines(cct
, NULL
),
144 http(cct
, coroutines
.get_completion_mgr()),
145 push_all(new PushAllCR(cct
, &http
, std::move(period
), std::move(conns
)))
148 // must spawn the CR thread after start
// The thread body captures `this` and runs the push_all coroutine on this
// object's coroutines manager.
149 thread
= std::thread([this]() noexcept
{ coroutines
.run(this, push_all
.get()); });
156 if (thread
.joinable())
// DoutPrefixProvider interface: expose the stored CephContext, the rgw log
// subsystem (dout_subsys) and a fixed log prefix for this thread.
160 CephContext
*get_cct() const override
{ return cct
; }
161 unsigned get_subsys() const override
{ return dout_subsys
; }
162 std::ostream
& gen_prefix(std::ostream
& out
) const override
{ return out
<< "rgw period pusher CR thread: "; }
// Constructor: records the CephContext and store, looks up the realm of the
// store's zone, loads a period for that realm and hands it to
// handle_notify() while holding the mutex.
166 RGWPeriodPusher::RGWPeriodPusher(const DoutPrefixProvider
*dpp
, rgw::sal::Store
* store
,
168 : cct(store
->ctx()), store(store
)
170 const auto& realm
= store
->get_zone()->get_realm();
171 auto& realm_id
= realm
.get_id();
172 if (realm_id
.empty()) // no realm configuration
175 // always send out the current period on startup
// period.init() is given the sysobj service obtained by downcasting store to
// rgw::sal::RadosStore with static_cast, so this path assumes the store is a
// RadosStore.
178 int r
= period
.init(dpp
, cct
, static_cast<rgw::sal::RadosStore
* >(store
)->svc()->sysobj
, realm_id
, y
, realm
.get_name());
// Failure to load the period is logged at level -1.
180 ldpp_dout(dpp
, -1) << "failed to load period for realm " << realm_id
<< dendl
;
// handle_notify(RGWZonesNeedPeriod&&) expects its caller to hold the mutex.
184 std::lock_guard
<std::mutex
> lock(mutex
);
185 handle_notify(std::move(period
));
188 // destructor is here because CRThread is incomplete in the header
// Defaulted destructor, defined in this file where CRThread is a complete type.
189 RGWPeriodPusher::~RGWPeriodPusher() = default;
// Notification entry point: takes an encoded period from the buffer iterator
// `p`, then either queues it (while no store is available) or processes it
// immediately via handle_notify(RGWZonesNeedPeriod&&).
191 void RGWPeriodPusher::handle_notify(RGWRealmNotify type
,
192 bufferlist::const_iterator
& p
)
195 RGWZonesNeedPeriod info
;
// A buffer::error raised while decoding is caught and logged as an error.
198 } catch (buffer::error
& e
) {
199 lderr(cct
) << "Failed to decode the period: " << e
.what() << dendl
;
// The mutex is held for the store check, the queueing and the call below.
203 std::lock_guard
<std::mutex
> lock(mutex
);
205 // we can't process this notification without access to our current realm
206 // configuration. queue it until resume()
207 if (store
== nullptr) {
208 pending_periods
.emplace_back(std::move(info
));
212 handle_notify(std::move(info
));
215 // expects the caller to hold a lock on mutex
// Processes a new period: checks it against the current realm/period epochs,
// locates this zone's zonegroup, builds a map of connections to the other
// zonegroups/zones that have endpoints, records the new epochs and replaces
// cr_thread with a new CRThread that pushes the period.
216 void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod
&& period
)
// Staleness check 1: the period's realm epoch is older than the current one.
218 if (period
.get_realm_epoch() < realm_epoch
) {
219 ldout(cct
, 10) << "period's realm epoch " << period
.get_realm_epoch()
220 << " is not newer than current realm epoch " << realm_epoch
221 << ", discarding update" << dendl
;
// Staleness check 2: same realm epoch, and the period epoch is not greater
// than the one already recorded.
224 if (period
.get_realm_epoch() == realm_epoch
&&
225 period
.get_epoch() <= period_epoch
) {
226 ldout(cct
, 10) << "period epoch " << period
.get_epoch() << " is not newer "
227 "than current epoch " << period_epoch
<< ", discarding update" << dendl
;
231 // find our zonegroup in the new period
// The lookup key is the id of the zonegroup reported by the store's zone.
232 auto& zonegroups
= period
.get_map().zonegroups
;
233 auto i
= zonegroups
.find(store
->get_zone()->get_zonegroup().get_id());
234 if (i
== zonegroups
.end()) {
235 lderr(cct
) << "The new period does not contain my zonegroup!" << dendl
;
238 auto& my_zonegroup
= i
->second
;
240 // if we're not a master zone, we're not responsible for pushing any updates
// Compares the zonegroup's master_zone with the id from this zone's params.
241 if (my_zonegroup
.master_zone
!= store
->get_zone()->get_params().get_id())
244 // construct a map of the zones that need this period. the map uses the same
245 // keys/ordering as the zone[group] map, so we can use a hint for insertions
246 std::map
<std::string
, RGWRESTConn
> conns
;
247 auto hint
= conns
.end();
249 // are we the master zonegroup in this period?
250 if (period
.get_map().master_zonegroup
== store
->get_zone()->get_zonegroup().get_id()) {
251 // update other zonegroup endpoints
252 for (auto& zg
: zonegroups
) {
253 auto& zonegroup
= zg
.second
;
// The two checks below test for our own zonegroup and for a zonegroup
// without endpoints.
254 if (zonegroup
.get_id() == store
->get_zone()->get_zonegroup().get_id())
256 if (zonegroup
.endpoints
.empty())
// Construct the RGWRESTConn in place, keyed by zonegroup id, from
// (cct, store, zonegroup id, endpoints, api_name); the returned iterator
// becomes the hint for the next insertion.
259 hint
= conns
.emplace_hint(
260 hint
, std::piecewise_construct
,
261 std::forward_as_tuple(zonegroup
.get_id()),
262 std::forward_as_tuple(cct
, store
, zonegroup
.get_id(), zonegroup
.endpoints
, zonegroup
.api_name
));
266 // update other zone endpoints
267 for (auto& z
: my_zonegroup
.zones
) {
268 auto& zone
= z
.second
;
// The two checks below test for our own zone and for a zone without
// endpoints.
269 if (zone
.id
== store
->get_zone()->get_params().get_id())
271 if (zone
.endpoints
.empty())
// Same in-place construction as above, keyed by zone id; note the api_name
// passed here is my_zonegroup's.
274 hint
= conns
.emplace_hint(
275 hint
, std::piecewise_construct
,
276 std::forward_as_tuple(zone
.id
),
277 std::forward_as_tuple(cct
, store
, zone
.id
, zone
.endpoints
, my_zonegroup
.api_name
));
281 ldout(cct
, 4) << "No zones to update" << dendl
;
// Record the epochs of the period being pushed; they are what the staleness
// checks at the top of this function compare against.
285 realm_epoch
= period
.get_realm_epoch();
286 period_epoch
= period
.get_epoch();
288 ldout(cct
, 4) << "Zone master pushing period " << period
.get_id()
289 << " epoch " << period_epoch
<< " to "
290 << conns
.size() << " other zones" << dendl
;
292 // spawn a new coroutine thread, destroying the previous one
// Both period and conns are moved into the new CRThread.
293 cr_thread
.reset(new CRThread(cct
, std::move(period
), std::move(conns
)));
// pause(): logs at level 4 that the pusher is paused for a realm update, then
// takes the mutex.
296 void RGWPeriodPusher::pause()
298 ldout(cct
, 4) << "paused for realm update" << dendl
;
299 std::lock_guard
<std::mutex
> lock(mutex
);
// resume(store): under the mutex, logs how many periods are pending, replays
// each queued period through handle_notify() and then empties the queue.
303 void RGWPeriodPusher::resume(rgw::sal::Store
* store
)
305 std::lock_guard
<std::mutex
> lock(mutex
);
308 ldout(cct
, 4) << "resume with " << pending_periods
.size()
309 << " periods pending" << dendl
;
311 // process notification queue
// Each queued entry is moved into handle_notify(); the moved-from entries are
// removed by the clear() that follows.
312 for (auto& info
: pending_periods
) {
313 handle_notify(std::move(info
));
315 pending_periods
.clear();