1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "LeaderWatcher.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "include/stringify.h"
11 #include "librbd/Utils.h"
12 #include "librbd/asio/ContextWQ.h"
13 #include "librbd/watcher/Types.h"
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
19 #define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
20 << this << " " << __func__ << ": "
24 using namespace leader_watcher
;
26 using librbd::util::create_async_context_callback
;
27 using librbd::util::create_context_callback
;
28 using librbd::util::create_rados_callback
;
31 LeaderWatcher
<I
>::LeaderWatcher(Threads
<I
> *threads
, librados::IoCtx
&io_ctx
,
32 leader_watcher::Listener
*listener
)
33 : Watcher(io_ctx
, threads
->work_queue
, RBD_MIRROR_LEADER
),
34 m_threads(threads
), m_listener(listener
), m_instances_listener(this),
35 m_lock(ceph::make_mutex("rbd::mirror::LeaderWatcher " +
36 io_ctx
.get_pool_name())),
37 m_notifier_id(librados::Rados(io_ctx
).get_instance_id()),
38 m_instance_id(stringify(m_notifier_id
)),
39 m_leader_lock(new LeaderLock(m_ioctx
, *m_threads
->asio_engine
, m_oid
, this,
40 true, m_cct
->_conf
.get_val
<uint64_t>(
41 "rbd_blocklist_expire_seconds"))) {
45 LeaderWatcher
<I
>::~LeaderWatcher() {
46 ceph_assert(m_instances
== nullptr);
47 ceph_assert(m_timer_task
== nullptr);
53 std::string LeaderWatcher
<I
>::get_instance_id() {
58 int LeaderWatcher
<I
>::init() {
61 return init_ctx
.wait();
65 void LeaderWatcher
<I
>::init(Context
*on_finish
) {
66 dout(10) << "notifier_id=" << m_notifier_id
<< dendl
;
68 std::lock_guard locker
{m_lock
};
70 ceph_assert(m_on_finish
== nullptr);
71 m_on_finish
= on_finish
;
73 create_leader_object();
77 void LeaderWatcher
<I
>::create_leader_object() {
80 ceph_assert(ceph_mutex_is_locked(m_lock
));
82 librados::ObjectWriteOperation op
;
85 librados::AioCompletion
*aio_comp
= create_rados_callback
<
86 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_create_leader_object
>(this);
87 int r
= m_ioctx
.aio_operate(m_oid
, aio_comp
, &op
);
93 void LeaderWatcher
<I
>::handle_create_leader_object(int r
) {
94 dout(10) << "r=" << r
<< dendl
;
96 Context
*on_finish
= nullptr;
98 std::lock_guard locker
{m_lock
};
105 derr
<< "error creating " << m_oid
<< " object: " << cpp_strerror(r
)
108 std::swap(on_finish
, m_on_finish
);
110 on_finish
->complete(r
);
113 template <typename I
>
114 void LeaderWatcher
<I
>::register_watch() {
117 ceph_assert(ceph_mutex_is_locked(m_lock
));
119 Context
*ctx
= create_async_context_callback(
120 m_work_queue
, create_context_callback
<
121 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_register_watch
>(this));
123 librbd::Watcher::register_watch(ctx
);
126 template <typename I
>
127 void LeaderWatcher
<I
>::handle_register_watch(int r
) {
128 dout(10) << "r=" << r
<< dendl
;
130 Context
*on_finish
= nullptr;
132 std::lock_guard
timer_locker(m_threads
->timer_lock
);
133 std::lock_guard locker
{m_lock
};
136 derr
<< "error registering leader watcher for " << m_oid
<< " object: "
137 << cpp_strerror(r
) << dendl
;
139 schedule_acquire_leader_lock(0);
142 ceph_assert(m_on_finish
!= nullptr);
143 std::swap(on_finish
, m_on_finish
);
146 on_finish
->complete(r
);
149 template <typename I
>
150 void LeaderWatcher
<I
>::shut_down() {
151 C_SaferCond shut_down_ctx
;
152 shut_down(&shut_down_ctx
);
153 int r
= shut_down_ctx
.wait();
157 template <typename I
>
158 void LeaderWatcher
<I
>::shut_down(Context
*on_finish
) {
161 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
163 ceph_assert(m_on_shut_down_finish
== nullptr);
164 m_on_shut_down_finish
= on_finish
;
166 shut_down_leader_lock();
169 template <typename I
>
170 void LeaderWatcher
<I
>::shut_down_leader_lock() {
173 ceph_assert(ceph_mutex_is_locked(m_lock
));
175 Context
*ctx
= create_async_context_callback(
176 m_work_queue
, create_context_callback
<
177 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_shut_down_leader_lock
>(this));
179 m_leader_lock
->shut_down(ctx
);
182 template <typename I
>
183 void LeaderWatcher
<I
>::handle_shut_down_leader_lock(int r
) {
184 dout(10) << "r=" << r
<< dendl
;
186 std::lock_guard locker
{m_lock
};
189 derr
<< "error shutting down leader lock: " << cpp_strerror(r
) << dendl
;
195 template <typename I
>
196 void LeaderWatcher
<I
>::unregister_watch() {
199 ceph_assert(ceph_mutex_is_locked(m_lock
));
201 Context
*ctx
= create_async_context_callback(
202 m_work_queue
, create_context_callback
<
203 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_unregister_watch
>(this));
205 librbd::Watcher::unregister_watch(ctx
);
208 template <typename I
>
209 void LeaderWatcher
<I
>::handle_unregister_watch(int r
) {
210 dout(10) << "r=" << r
<< dendl
;
213 derr
<< "error unregistering leader watcher for " << m_oid
<< " object: "
214 << cpp_strerror(r
) << dendl
;
219 template <typename I
>
220 void LeaderWatcher
<I
>::wait_for_tasks() {
223 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
224 schedule_timer_task("wait for tasks", 0, false,
225 &LeaderWatcher
<I
>::handle_wait_for_tasks
, true);
228 template <typename I
>
229 void LeaderWatcher
<I
>::handle_wait_for_tasks() {
232 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
233 ceph_assert(ceph_mutex_is_locked(m_lock
));
234 ceph_assert(m_on_shut_down_finish
!= nullptr);
236 ceph_assert(!m_timer_op_tracker
.empty());
237 m_timer_op_tracker
.finish_op();
239 auto ctx
= new LambdaContext([this](int r
) {
242 // ensure lock isn't held when completing shut down
243 std::lock_guard locker
{m_lock
};
244 ceph_assert(m_on_shut_down_finish
!= nullptr);
245 on_finish
= m_on_shut_down_finish
;
247 on_finish
->complete(0);
249 m_work_queue
->queue(ctx
, 0);
252 template <typename I
>
253 bool LeaderWatcher
<I
>::is_blocklisted() const {
254 std::lock_guard locker
{m_lock
};
255 return m_blocklisted
;
258 template <typename I
>
259 bool LeaderWatcher
<I
>::is_leader() const {
260 std::lock_guard locker
{m_lock
};
261 return is_leader(m_lock
);
264 template <typename I
>
265 bool LeaderWatcher
<I
>::is_leader(ceph::mutex
&lock
) const {
266 ceph_assert(ceph_mutex_is_locked(m_lock
));
268 bool leader
= m_leader_lock
->is_leader();
269 dout(10) << leader
<< dendl
;
273 template <typename I
>
274 bool LeaderWatcher
<I
>::is_releasing_leader() const {
275 std::lock_guard locker
{m_lock
};
276 return is_releasing_leader(m_lock
);
279 template <typename I
>
280 bool LeaderWatcher
<I
>::is_releasing_leader(ceph::mutex
&lock
) const {
281 ceph_assert(ceph_mutex_is_locked(m_lock
));
283 bool releasing
= m_leader_lock
->is_releasing_leader();
284 dout(10) << releasing
<< dendl
;
288 template <typename I
>
289 bool LeaderWatcher
<I
>::get_leader_instance_id(std::string
*instance_id
) const {
292 std::lock_guard locker
{m_lock
};
294 if (is_leader(m_lock
) || is_releasing_leader(m_lock
)) {
295 *instance_id
= m_instance_id
;
299 if (!m_locker
.cookie
.empty()) {
300 *instance_id
= stringify(m_locker
.entity
.num());
307 template <typename I
>
308 void LeaderWatcher
<I
>::release_leader() {
311 std::lock_guard locker
{m_lock
};
312 if (!is_leader(m_lock
)) {
316 release_leader_lock();
319 template <typename I
>
320 void LeaderWatcher
<I
>::list_instances(std::vector
<std::string
> *instance_ids
) {
323 std::lock_guard locker
{m_lock
};
325 instance_ids
->clear();
326 if (m_instances
!= nullptr) {
327 m_instances
->list(instance_ids
);
331 template <typename I
>
332 void LeaderWatcher
<I
>::cancel_timer_task() {
333 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
334 ceph_assert(ceph_mutex_is_locked(m_lock
));
336 if (m_timer_task
== nullptr) {
340 dout(10) << m_timer_task
<< dendl
;
341 bool canceled
= m_threads
->timer
->cancel_event(m_timer_task
);
342 ceph_assert(canceled
);
343 m_timer_task
= nullptr;
346 template <typename I
>
347 void LeaderWatcher
<I
>::schedule_timer_task(const std::string
&name
,
348 int delay_factor
, bool leader
,
349 TimerCallback timer_callback
,
350 bool shutting_down
) {
351 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
352 ceph_assert(ceph_mutex_is_locked(m_lock
));
354 if (!shutting_down
&& m_on_shut_down_finish
!= nullptr) {
360 m_timer_task
= new LambdaContext(
361 [this, leader
, timer_callback
](int r
) {
362 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
363 m_timer_task
= nullptr;
365 if (m_timer_op_tracker
.empty()) {
366 std::lock_guard locker
{m_lock
};
367 execute_timer_task(leader
, timer_callback
);
371 // old timer task is still running -- do not start next
372 // task until the previous task completes
373 if (m_timer_gate
== nullptr) {
374 m_timer_gate
= new C_TimerGate(this);
375 m_timer_op_tracker
.wait_for_ops(m_timer_gate
);
377 m_timer_gate
->leader
= leader
;
378 m_timer_gate
->timer_callback
= timer_callback
;
381 int after
= delay_factor
* m_cct
->_conf
.get_val
<uint64_t>(
382 "rbd_mirror_leader_heartbeat_interval");
384 dout(10) << "scheduling " << name
<< " after " << after
<< " sec (task "
385 << m_timer_task
<< ")" << dendl
;
386 m_threads
->timer
->add_event_after(after
, m_timer_task
);
389 template <typename I
>
390 void LeaderWatcher
<I
>::execute_timer_task(bool leader
,
391 TimerCallback timer_callback
) {
394 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
395 ceph_assert(ceph_mutex_is_locked(m_lock
));
396 ceph_assert(m_timer_op_tracker
.empty());
398 if (is_leader(m_lock
) != leader
) {
402 m_timer_op_tracker
.start_op();
403 (this->*timer_callback
)();
406 template <typename I
>
407 void LeaderWatcher
<I
>::handle_post_acquire_leader_lock(int r
,
408 Context
*on_finish
) {
409 dout(10) << "r=" << r
<< dendl
;
413 dout(10) << "already locked" << dendl
;
415 derr
<< "error acquiring leader lock: " << cpp_strerror(r
) << dendl
;
417 on_finish
->complete(r
);
421 std::lock_guard locker
{m_lock
};
422 ceph_assert(m_on_finish
== nullptr);
423 m_on_finish
= on_finish
;
429 template <typename I
>
430 void LeaderWatcher
<I
>::handle_pre_release_leader_lock(Context
*on_finish
) {
433 std::lock_guard locker
{m_lock
};
434 ceph_assert(m_on_finish
== nullptr);
435 m_on_finish
= on_finish
;
441 template <typename I
>
442 void LeaderWatcher
<I
>::handle_post_release_leader_lock(int r
,
443 Context
*on_finish
) {
444 dout(10) << "r=" << r
<< dendl
;
447 on_finish
->complete(r
);
451 std::lock_guard locker
{m_lock
};
452 ceph_assert(m_on_finish
== nullptr);
453 m_on_finish
= on_finish
;
455 notify_lock_released();
458 template <typename I
>
459 void LeaderWatcher
<I
>::break_leader_lock() {
462 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
463 ceph_assert(ceph_mutex_is_locked(m_lock
));
464 ceph_assert(!m_timer_op_tracker
.empty());
466 if (m_locker
.cookie
.empty()) {
471 Context
*ctx
= create_async_context_callback(
472 m_work_queue
, create_context_callback
<
473 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_break_leader_lock
>(this));
475 m_leader_lock
->break_lock(m_locker
, true, ctx
);
478 template <typename I
>
479 void LeaderWatcher
<I
>::handle_break_leader_lock(int r
) {
480 dout(10) << "r=" << r
<< dendl
;
482 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
483 ceph_assert(!m_timer_op_tracker
.empty());
485 if (m_leader_lock
->is_shutdown()) {
486 dout(10) << "canceling due to shutdown" << dendl
;
487 m_timer_op_tracker
.finish_op();
491 if (r
< 0 && r
!= -ENOENT
) {
492 derr
<< "error breaking leader lock: " << cpp_strerror(r
) << dendl
;
493 schedule_acquire_leader_lock(1);
494 m_timer_op_tracker
.finish_op();
499 m_acquire_attempts
= 0;
500 acquire_leader_lock();
503 template <typename I
>
504 void LeaderWatcher
<I
>::schedule_get_locker(bool reset_leader
,
505 uint32_t delay_factor
) {
508 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
509 ceph_assert(ceph_mutex_is_locked(m_lock
));
513 m_acquire_attempts
= 0;
516 schedule_timer_task("get locker", delay_factor
, false,
517 &LeaderWatcher
<I
>::get_locker
, false);
520 template <typename I
>
521 void LeaderWatcher
<I
>::get_locker() {
524 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
525 ceph_assert(ceph_mutex_is_locked(m_lock
));
526 ceph_assert(!m_timer_op_tracker
.empty());
528 C_GetLocker
*get_locker_ctx
= new C_GetLocker(this);
529 Context
*ctx
= create_async_context_callback(m_work_queue
, get_locker_ctx
);
531 m_leader_lock
->get_locker(&get_locker_ctx
->locker
, ctx
);
534 template <typename I
>
535 void LeaderWatcher
<I
>::handle_get_locker(int r
,
536 librbd::managed_lock::Locker
& locker
) {
537 dout(10) << "r=" << r
<< dendl
;
539 std::scoped_lock l
{m_threads
->timer_lock
, m_lock
};
540 ceph_assert(!m_timer_op_tracker
.empty());
542 if (m_leader_lock
->is_shutdown()) {
543 dout(10) << "canceling due to shutdown" << dendl
;
544 m_timer_op_tracker
.finish_op();
548 if (is_leader(m_lock
)) {
550 m_timer_op_tracker
.finish_op();
556 m_acquire_attempts
= 0;
557 acquire_leader_lock();
560 derr
<< "error retrieving leader locker: " << cpp_strerror(r
) << dendl
;
561 schedule_get_locker(true, 1);
562 m_timer_op_tracker
.finish_op();
566 bool notify_listener
= false;
567 if (m_locker
!= locker
) {
569 notify_listener
= true;
570 if (m_acquire_attempts
> 1) {
571 dout(10) << "new lock owner detected -- resetting heartbeat counter"
573 m_acquire_attempts
= 0;
577 if (m_acquire_attempts
>= m_cct
->_conf
.get_val
<uint64_t>(
578 "rbd_mirror_leader_max_acquire_attempts_before_break")) {
579 dout(0) << "breaking leader lock after " << m_acquire_attempts
<< " "
580 << "failed attempts to acquire" << dendl
;
585 schedule_acquire_leader_lock(1);
587 if (!notify_listener
) {
588 m_timer_op_tracker
.finish_op();
592 auto ctx
= new LambdaContext(
594 std::string instance_id
;
595 if (get_leader_instance_id(&instance_id
)) {
596 m_listener
->update_leader_handler(instance_id
);
598 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
599 m_timer_op_tracker
.finish_op();
601 m_work_queue
->queue(ctx
, 0);
604 template <typename I
>
605 void LeaderWatcher
<I
>::schedule_acquire_leader_lock(uint32_t delay_factor
) {
608 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
609 ceph_assert(ceph_mutex_is_locked(m_lock
));
611 schedule_timer_task("acquire leader lock",
613 m_cct
->_conf
.get_val
<uint64_t>("rbd_mirror_leader_max_missed_heartbeats"),
614 false, &LeaderWatcher
<I
>::acquire_leader_lock
, false);
617 template <typename I
>
618 void LeaderWatcher
<I
>::acquire_leader_lock() {
619 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
620 ceph_assert(ceph_mutex_is_locked(m_lock
));
621 ceph_assert(!m_timer_op_tracker
.empty());
623 ++m_acquire_attempts
;
624 dout(10) << "acquire_attempts=" << m_acquire_attempts
<< dendl
;
626 Context
*ctx
= create_async_context_callback(
627 m_work_queue
, create_context_callback
<
628 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_acquire_leader_lock
>(this));
629 m_leader_lock
->try_acquire_lock(ctx
);
632 template <typename I
>
633 void LeaderWatcher
<I
>::handle_acquire_leader_lock(int r
) {
634 dout(10) << "r=" << r
<< dendl
;
636 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
637 ceph_assert(!m_timer_op_tracker
.empty());
639 if (m_leader_lock
->is_shutdown()) {
640 dout(10) << "canceling due to shutdown" << dendl
;
641 m_timer_op_tracker
.finish_op();
647 dout(10) << "already locked" << dendl
;
649 derr
<< "error acquiring lock: " << cpp_strerror(r
) << dendl
;
657 m_acquire_attempts
= 0;
660 dout(5) << "releasing due to error on notify" << dendl
;
661 release_leader_lock();
662 m_timer_op_tracker
.finish_op();
669 template <typename I
>
670 void LeaderWatcher
<I
>::release_leader_lock() {
673 ceph_assert(ceph_mutex_is_locked(m_lock
));
675 Context
*ctx
= create_async_context_callback(
676 m_work_queue
, create_context_callback
<
677 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_release_leader_lock
>(this));
679 m_leader_lock
->release_lock(ctx
);
682 template <typename I
>
683 void LeaderWatcher
<I
>::handle_release_leader_lock(int r
) {
684 dout(10) << "r=" << r
<< dendl
;
686 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
689 derr
<< "error releasing lock: " << cpp_strerror(r
) << dendl
;
693 schedule_acquire_leader_lock(1);
696 template <typename I
>
697 void LeaderWatcher
<I
>::init_instances() {
700 ceph_assert(ceph_mutex_is_locked(m_lock
));
701 ceph_assert(m_instances
== nullptr);
703 m_instances
= Instances
<I
>::create(m_threads
, m_ioctx
, m_instance_id
,
704 m_instances_listener
);
706 Context
*ctx
= create_context_callback
<
707 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_init_instances
>(this);
709 m_instances
->init(ctx
);
712 template <typename I
>
713 void LeaderWatcher
<I
>::handle_init_instances(int r
) {
714 dout(10) << "r=" << r
<< dendl
;
716 Context
*on_finish
= nullptr;
718 std::lock_guard locker
{m_lock
};
719 derr
<< "error initializing instances: " << cpp_strerror(r
) << dendl
;
720 m_instances
->destroy();
721 m_instances
= nullptr;
723 ceph_assert(m_on_finish
!= nullptr);
724 std::swap(m_on_finish
, on_finish
);
726 std::lock_guard locker
{m_lock
};
731 on_finish
->complete(r
);
734 template <typename I
>
735 void LeaderWatcher
<I
>::shut_down_instances() {
738 ceph_assert(ceph_mutex_is_locked(m_lock
));
739 ceph_assert(m_instances
!= nullptr);
741 Context
*ctx
= create_async_context_callback(
742 m_work_queue
, create_context_callback
<LeaderWatcher
<I
>,
743 &LeaderWatcher
<I
>::handle_shut_down_instances
>(this));
745 m_instances
->shut_down(ctx
);
748 template <typename I
>
749 void LeaderWatcher
<I
>::handle_shut_down_instances(int r
) {
750 dout(10) << "r=" << r
<< dendl
;
753 Context
*on_finish
= nullptr;
755 std::lock_guard locker
{m_lock
};
757 m_instances
->destroy();
758 m_instances
= nullptr;
760 ceph_assert(m_on_finish
!= nullptr);
761 std::swap(m_on_finish
, on_finish
);
763 on_finish
->complete(r
);
766 template <typename I
>
767 void LeaderWatcher
<I
>::notify_listener() {
770 ceph_assert(ceph_mutex_is_locked(m_lock
));
772 Context
*ctx
= create_async_context_callback(
773 m_work_queue
, create_context_callback
<
774 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_notify_listener
>(this));
776 if (is_leader(m_lock
)) {
777 ctx
= new LambdaContext(
779 m_listener
->post_acquire_handler(ctx
);
782 ctx
= new LambdaContext(
784 m_listener
->pre_release_handler(ctx
);
787 m_work_queue
->queue(ctx
, 0);
790 template <typename I
>
791 void LeaderWatcher
<I
>::handle_notify_listener(int r
) {
792 dout(10) << "r=" << r
<< dendl
;
794 std::lock_guard locker
{m_lock
};
797 derr
<< "error notifying listener: " << cpp_strerror(r
) << dendl
;
801 if (is_leader(m_lock
)) {
802 notify_lock_acquired();
804 shut_down_instances();
808 template <typename I
>
809 void LeaderWatcher
<I
>::notify_lock_acquired() {
812 ceph_assert(ceph_mutex_is_locked(m_lock
));
814 Context
*ctx
= create_context_callback
<
815 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_notify_lock_acquired
>(this);
818 encode(NotifyMessage
{LockAcquiredPayload
{}}, bl
);
820 send_notify(bl
, nullptr, ctx
);
823 template <typename I
>
824 void LeaderWatcher
<I
>::handle_notify_lock_acquired(int r
) {
825 dout(10) << "r=" << r
<< dendl
;
827 Context
*on_finish
= nullptr;
829 std::lock_guard locker
{m_lock
};
830 if (r
< 0 && r
!= -ETIMEDOUT
) {
831 derr
<< "error notifying leader lock acquired: " << cpp_strerror(r
)
836 ceph_assert(m_on_finish
!= nullptr);
837 std::swap(m_on_finish
, on_finish
);
839 if (m_ret_val
== 0) {
840 // listener should be ready for instance add/remove events now
841 m_instances
->unblock_listener();
844 on_finish
->complete(0);
847 template <typename I
>
848 void LeaderWatcher
<I
>::notify_lock_released() {
851 ceph_assert(ceph_mutex_is_locked(m_lock
));
853 Context
*ctx
= create_context_callback
<
854 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_notify_lock_released
>(this);
857 encode(NotifyMessage
{LockReleasedPayload
{}}, bl
);
859 send_notify(bl
, nullptr, ctx
);
862 template <typename I
>
863 void LeaderWatcher
<I
>::handle_notify_lock_released(int r
) {
864 dout(10) << "r=" << r
<< dendl
;
866 Context
*on_finish
= nullptr;
868 std::lock_guard locker
{m_lock
};
869 if (r
< 0 && r
!= -ETIMEDOUT
) {
870 derr
<< "error notifying leader lock released: " << cpp_strerror(r
)
874 ceph_assert(m_on_finish
!= nullptr);
875 std::swap(m_on_finish
, on_finish
);
877 on_finish
->complete(r
);
880 template <typename I
>
881 void LeaderWatcher
<I
>::notify_heartbeat() {
884 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
885 ceph_assert(ceph_mutex_is_locked(m_lock
));
886 ceph_assert(!m_timer_op_tracker
.empty());
888 if (!is_leader(m_lock
)) {
889 dout(5) << "not leader, canceling" << dendl
;
890 m_timer_op_tracker
.finish_op();
894 Context
*ctx
= create_context_callback
<
895 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_notify_heartbeat
>(this);
898 encode(NotifyMessage
{HeartbeatPayload
{}}, bl
);
900 m_heartbeat_response
.acks
.clear();
901 send_notify(bl
, &m_heartbeat_response
, ctx
);
904 template <typename I
>
905 void LeaderWatcher
<I
>::handle_notify_heartbeat(int r
) {
906 dout(10) << "r=" << r
<< dendl
;
908 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
909 ceph_assert(!m_timer_op_tracker
.empty());
911 m_timer_op_tracker
.finish_op();
912 if (m_leader_lock
->is_shutdown()) {
913 dout(10) << "canceling due to shutdown" << dendl
;
915 } else if (!is_leader(m_lock
)) {
919 if (r
< 0 && r
!= -ETIMEDOUT
) {
920 derr
<< "error notifying heartbeat: " << cpp_strerror(r
)
921 << ", releasing leader" << dendl
;
922 release_leader_lock();
926 dout(10) << m_heartbeat_response
.acks
.size() << " acks received, "
927 << m_heartbeat_response
.timeouts
.size() << " timed out" << dendl
;
929 std::vector
<std::string
> instance_ids
;
930 for (auto &it
: m_heartbeat_response
.acks
) {
931 uint64_t notifier_id
= it
.first
.gid
;
932 instance_ids
.push_back(stringify(notifier_id
));
934 if (!instance_ids
.empty()) {
935 m_instances
->acked(instance_ids
);
938 schedule_timer_task("heartbeat", 1, true,
939 &LeaderWatcher
<I
>::notify_heartbeat
, false);
942 template <typename I
>
943 void LeaderWatcher
<I
>::handle_heartbeat(Context
*on_notify_ack
) {
947 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
948 if (is_leader(m_lock
)) {
949 dout(5) << "got another leader heartbeat, ignoring" << dendl
;
950 } else if (!m_locker
.cookie
.empty()) {
952 m_acquire_attempts
= 0;
953 schedule_acquire_leader_lock(1);
957 on_notify_ack
->complete(0);
960 template <typename I
>
961 void LeaderWatcher
<I
>::handle_lock_acquired(Context
*on_notify_ack
) {
965 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
966 if (is_leader(m_lock
)) {
967 dout(5) << "got another leader lock_acquired, ignoring" << dendl
;
970 schedule_get_locker(true, 0);
974 on_notify_ack
->complete(0);
977 template <typename I
>
978 void LeaderWatcher
<I
>::handle_lock_released(Context
*on_notify_ack
) {
982 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
983 if (is_leader(m_lock
)) {
984 dout(5) << "got another leader lock_released, ignoring" << dendl
;
987 schedule_get_locker(true, 0);
991 on_notify_ack
->complete(0);
994 template <typename I
>
995 void LeaderWatcher
<I
>::handle_notify(uint64_t notify_id
, uint64_t handle
,
996 uint64_t notifier_id
, bufferlist
&bl
) {
997 dout(10) << "notify_id=" << notify_id
<< ", handle=" << handle
<< ", "
998 << "notifier_id=" << notifier_id
<< dendl
;
1000 Context
*ctx
= new C_NotifyAck(this, notify_id
, handle
);
1002 if (notifier_id
== m_notifier_id
) {
1003 dout(10) << "our own notification, ignoring" << dendl
;
1008 NotifyMessage notify_message
;
1010 auto iter
= bl
.cbegin();
1011 decode(notify_message
, iter
);
1012 } catch (const buffer::error
&err
) {
1013 derr
<< "error decoding image notification: " << err
.what() << dendl
;
1018 apply_visitor(HandlePayloadVisitor(this, ctx
), notify_message
.payload
);
1021 template <typename I
>
1022 void LeaderWatcher
<I
>::handle_rewatch_complete(int r
) {
1023 dout(5) << "r=" << r
<< dendl
;
1025 if (r
== -EBLOCKLISTED
) {
1026 dout(1) << "blocklisted detected" << dendl
;
1027 m_blocklisted
= true;
1031 m_leader_lock
->reacquire_lock(nullptr);
1034 template <typename I
>
1035 void LeaderWatcher
<I
>::handle_payload(const HeartbeatPayload
&payload
,
1036 Context
*on_notify_ack
) {
1037 dout(10) << "heartbeat" << dendl
;
1039 handle_heartbeat(on_notify_ack
);
1042 template <typename I
>
1043 void LeaderWatcher
<I
>::handle_payload(const LockAcquiredPayload
&payload
,
1044 Context
*on_notify_ack
) {
1045 dout(10) << "lock_acquired" << dendl
;
1047 handle_lock_acquired(on_notify_ack
);
1050 template <typename I
>
1051 void LeaderWatcher
<I
>::handle_payload(const LockReleasedPayload
&payload
,
1052 Context
*on_notify_ack
) {
1053 dout(10) << "lock_released" << dendl
;
1055 handle_lock_released(on_notify_ack
);
1058 template <typename I
>
1059 void LeaderWatcher
<I
>::handle_payload(const UnknownPayload
&payload
,
1060 Context
*on_notify_ack
) {
1061 dout(10) << "unknown" << dendl
;
1063 on_notify_ack
->complete(0);
1066 } // namespace mirror
1069 template class rbd::mirror::LeaderWatcher
<librbd::ImageCtx
>;