1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "include/random.h"
5 #include "include/Context.h"
6 #include "common/errno.h"
8 #include "svc_notify.h"
9 #include "svc_finisher.h"
11 #include "svc_rados.h"
13 #include "rgw/rgw_zone.h"
15 #define dout_subsys ceph_subsys_rgw
17 static string notify_oid_prefix
= "notify";
19 class RGWWatcher
: public librados::WatchCtx2
{
24 uint64_t watch_handle
;
26 librados::AioCompletion
*register_completion
{nullptr};
28 class C_ReinitWatch
: public Context
{
31 explicit C_ReinitWatch(RGWWatcher
*_watcher
) : watcher(_watcher
) {}
32 void finish(int r
) override
{
37 RGWWatcher(CephContext
*_cct
, RGWSI_Notify
*s
, int i
, RGWSI_RADOS::Obj
& o
) : cct(_cct
), svc(s
), index(i
), obj(o
), watch_handle(0) {}
38 void handle_notify(uint64_t notify_id
,
41 bufferlist
& bl
) override
{
42 ldout(cct
, 10) << "RGWWatcher::handle_notify() "
43 << " notify_id " << notify_id
44 << " cookie " << cookie
45 << " notifier " << notifier_id
46 << " bl.length()=" << bl
.length() << dendl
;
48 if (unlikely(svc
->inject_notify_timeout_probability
== 1) ||
49 (svc
->inject_notify_timeout_probability
> 0 &&
50 (svc
->inject_notify_timeout_probability
>
51 ceph::util::generate_random_number(0.0, 1.0)))) {
53 << "RGWWatcher::handle_notify() dropping notification! "
54 << "If this isn't what you want, set "
55 << "rgw_inject_notify_timeout_probability to zero!" << dendl
;
59 svc
->watch_cb(notify_id
, cookie
, notifier_id
, bl
);
61 bufferlist reply_bl
; // empty reply payload
62 obj
.notify_ack(notify_id
, cookie
, reply_bl
);
64 void handle_error(uint64_t cookie
, int err
) override
{
65 lderr(cct
) << "RGWWatcher::handle_error cookie " << cookie
66 << " err " << cpp_strerror(err
) << dendl
;
67 svc
->remove_watcher(index
);
68 svc
->schedule_context(new C_ReinitWatch(this));
72 int ret
= unregister_watch();
74 ldout(cct
, 0) << "ERROR: unregister_watch() returned ret=" << ret
<< dendl
;
77 ret
= register_watch();
79 ldout(cct
, 0) << "ERROR: register_watch() returned ret=" << ret
<< dendl
;
84 int unregister_watch() {
85 int r
= svc
->unwatch(obj
, watch_handle
);
89 svc
->remove_watcher(index
);
93 int register_watch_async() {
94 if (register_completion
) {
95 register_completion
->release();
96 register_completion
= nullptr;
98 register_completion
= librados::Rados::aio_create_completion(nullptr, nullptr);
99 register_ret
= obj
.aio_watch(register_completion
, &watch_handle
, this);
100 if (register_ret
< 0) {
101 register_completion
->release();
107 int register_watch_finish() {
108 if (register_ret
< 0) {
111 if (!register_completion
) {
114 register_completion
->wait_for_complete();
115 int r
= register_completion
->get_return_value();
116 register_completion
->release();
117 register_completion
= nullptr;
121 svc
->add_watcher(index
);
125 int register_watch() {
126 int r
= obj
.watch(&watch_handle
, this);
130 svc
->add_watcher(index
);
136 class RGWSI_Notify_ShutdownCB
: public RGWSI_Finisher::ShutdownCB
140 RGWSI_Notify_ShutdownCB(RGWSI_Notify
*_svc
) : svc(_svc
) {}
141 void call() override
{
146 string
RGWSI_Notify::get_control_oid(int i
)
148 char buf
[notify_oid_prefix
.size() + 16];
149 snprintf(buf
, sizeof(buf
), "%s.%d", notify_oid_prefix
.c_str(), i
);
154 RGWSI_RADOS::Obj
RGWSI_Notify::pick_control_obj(const string
& key
)
156 uint32_t r
= ceph_str_hash_linux(key
.c_str(), key
.size());
158 int i
= r
% num_watchers
;
159 return notify_objs
[i
];
162 int RGWSI_Notify::init_watch()
164 num_watchers
= cct
->_conf
->rgw_num_control_oids
;
166 bool compat_oid
= (num_watchers
== 0);
168 if (num_watchers
<= 0)
171 watchers
= new RGWWatcher
*[num_watchers
];
175 notify_objs
.resize(num_watchers
);
177 for (int i
=0; i
< num_watchers
; i
++) {
181 notify_oid
= get_control_oid(i
);
183 notify_oid
= notify_oid_prefix
;
186 notify_objs
[i
] = rados_svc
->handle().obj({control_pool
, notify_oid
});
187 auto& notify_obj
= notify_objs
[i
];
189 int r
= notify_obj
.open();
191 ldout(cct
, 0) << "ERROR: notify_obj.open() returned r=" << r
<< dendl
;
195 librados::ObjectWriteOperation op
;
197 r
= notify_obj
.operate(&op
, null_yield
);
198 if (r
< 0 && r
!= -EEXIST
) {
199 ldout(cct
, 0) << "ERROR: notify_obj.operate() returned r=" << r
<< dendl
;
203 RGWWatcher
*watcher
= new RGWWatcher(cct
, this, i
, notify_obj
);
204 watchers
[i
] = watcher
;
206 r
= watcher
->register_watch_async();
208 ldout(cct
, 0) << "WARNING: register_watch_aio() returned " << r
<< dendl
;
214 for (int i
= 0; i
< num_watchers
; ++i
) {
215 int r
= watchers
[i
]->register_watch_finish();
217 ldout(cct
, 0) << "WARNING: async watch returned " << r
<< dendl
;
229 void RGWSI_Notify::finalize_watch()
231 for (int i
= 0; i
< num_watchers
; i
++) {
232 RGWWatcher
*watcher
= watchers
[i
];
233 watcher
->unregister_watch();
240 int RGWSI_Notify::do_start()
242 int r
= zone_svc
->start();
247 assert(zone_svc
->is_started()); /* otherwise there's an ordering problem */
249 r
= rados_svc
->start();
253 r
= finisher_svc
->start();
258 control_pool
= zone_svc
->get_zone_params().control_pool
;
260 int ret
= init_watch();
262 lderr(cct
) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret
) << dendl
;
266 shutdown_cb
= new RGWSI_Notify_ShutdownCB(this);
268 finisher_svc
->register_caller(shutdown_cb
, &handle
);
269 finisher_handle
= handle
;
274 void RGWSI_Notify::shutdown()
280 if (finisher_handle
) {
281 finisher_svc
->unregister_caller(*finisher_handle
);
290 RGWSI_Notify::~RGWSI_Notify()
295 int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj
& obj
, uint64_t watch_handle
)
297 int r
= obj
.unwatch(watch_handle
);
299 ldout(cct
, 0) << "ERROR: rados->unwatch2() returned r=" << r
<< dendl
;
302 r
= rados_svc
->handle().watch_flush();
304 ldout(cct
, 0) << "ERROR: rados->watch_flush() returned r=" << r
<< dendl
;
310 void RGWSI_Notify::add_watcher(int i
)
312 ldout(cct
, 20) << "add_watcher() i=" << i
<< dendl
;
313 std::unique_lock l
{watchers_lock
};
314 watchers_set
.insert(i
);
315 if (watchers_set
.size() == (size_t)num_watchers
) {
316 ldout(cct
, 2) << "all " << num_watchers
<< " watchers are set, enabling cache" << dendl
;
321 void RGWSI_Notify::remove_watcher(int i
)
323 ldout(cct
, 20) << "remove_watcher() i=" << i
<< dendl
;
324 std::unique_lock l
{watchers_lock
};
325 size_t orig_size
= watchers_set
.size();
326 watchers_set
.erase(i
);
327 if (orig_size
== (size_t)num_watchers
&&
328 watchers_set
.size() < orig_size
) { /* actually removed */
329 ldout(cct
, 2) << "removed watcher, disabling cache" << dendl
;
334 int RGWSI_Notify::watch_cb(uint64_t notify_id
,
336 uint64_t notifier_id
,
339 std::shared_lock l
{watchers_lock
};
341 return cb
->watch_cb(notify_id
, cookie
, notifier_id
, bl
);
346 void RGWSI_Notify::set_enabled(bool status
)
348 std::unique_lock l
{watchers_lock
};
349 _set_enabled(status
);
352 void RGWSI_Notify::_set_enabled(bool status
)
356 cb
->set_enabled(status
);
360 int RGWSI_Notify::distribute(const string
& key
, bufferlist
& bl
,
363 RGWSI_RADOS::Obj notify_obj
= pick_control_obj(key
);
365 ldout(cct
, 10) << "distributing notification oid=" << notify_obj
.get_ref().obj
366 << " bl.length()=" << bl
.length() << dendl
;
367 return robust_notify(notify_obj
, bl
, y
);
370 int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj
& notify_obj
, bufferlist
& bl
,
373 // The reply of every machine that acks goes in here.
374 boost::container::flat_set
<std::pair
<uint64_t, uint64_t>> acks
;
377 // First, try to send, without being fancy about it.
378 auto r
= notify_obj
.notify(bl
, 0, &rbl
, y
);
380 // If that doesn't work, get serious.
382 ldout(cct
, 1) << "robust_notify: If at first you don't succeed: "
383 << cpp_strerror(-r
) << dendl
;
386 auto p
= rbl
.cbegin();
387 // Gather up the replies to the first attempt.
391 // Doing this ourselves since we don't care about the payload;
392 for (auto i
= 0u; i
< num_acks
; ++i
) {
393 std::pair
<uint64_t, uint64_t> id
;
396 ldout(cct
, 20) << "robust_notify: acked by " << id
<< dendl
;
401 } catch (const buffer::error
& e
) {
402 ldout(cct
, 0) << "robust_notify: notify response parse failed: "
403 << e
.what() << dendl
;
404 acks
.clear(); // Throw away junk on failed parse.
408 // Every machine that fails to reply and hasn't acked a previous
409 // attempt goes in here.
410 boost::container::flat_set
<std::pair
<uint64_t, uint64_t>> timeouts
;
413 while (r
< 0 && tries
< max_notify_retries
) {
416 // Reset the timeouts, we're only concerned with new ones.
418 r
= notify_obj
.notify(bl
, 0, &rbl
, y
);
420 ldout(cct
, 1) << "robust_notify: retry " << tries
<< " failed: "
421 << cpp_strerror(-r
) << dendl
;
426 // Not only do we not care about the payload, but we don't
427 // want to empty the container; we just want to augment it
428 // with any new members.
429 for (auto i
= 0u; i
< num_acks
; ++i
) {
430 std::pair
<uint64_t, uint64_t> id
;
432 auto ir
= acks
.insert(id
);
434 ldout(cct
, 20) << "robust_notify: acked by " << id
<< dendl
;
441 uint32_t num_timeouts
;
442 decode(num_timeouts
, p
);
443 for (auto i
= 0u; i
< num_timeouts
; ++i
) {
444 std::pair
<uint64_t, uint64_t> id
;
446 // Only track timeouts from hosts that haven't acked previously.
447 if (acks
.find(id
) != acks
.cend()) {
448 ldout(cct
, 20) << "robust_notify: " << id
<< " timed out."
453 } catch (const buffer::error
& e
) {
454 ldout(cct
, 0) << "robust_notify: notify response parse failed: "
455 << e
.what() << dendl
;
458 // If we got a good parse and timeouts is empty, that means
459 // everyone who timed out in one call received the update in a
461 if (timeouts
.empty()) {
470 void RGWSI_Notify::register_watch_cb(CB
*_cb
)
472 std::unique_lock l
{watchers_lock
};
474 _set_enabled(enabled
);
477 void RGWSI_Notify::schedule_context(Context
*c
)
479 finisher_svc
->schedule_context(c
);