1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "LeaderWatcher.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "include/stringify.h"
10 #include "librbd/Utils.h"
11 #include "librbd/watcher/Types.h"
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
17 #define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
18 << this << " " << __func__ << ": "
22 using namespace leader_watcher
;
24 using librbd::util::create_async_context_callback
;
25 using librbd::util::create_context_callback
;
26 using librbd::util::create_rados_callback
;
// Constructor. Initializes the Watcher base with the pool IoCtx, the threads'
// work queue and the RBD_MIRROR_LEADER object name; stores threads and
// listener; names m_lock after the pool; takes m_notifier_id from the rados
// instance id; and allocates the LeaderLock, passing it
// rbd_blacklist_expire_seconds from the configuration.
29 LeaderWatcher
<I
>::LeaderWatcher(Threads
<I
> *threads
, librados::IoCtx
&io_ctx
,
31 : Watcher(io_ctx
, threads
->work_queue
, RBD_MIRROR_LEADER
),
32 m_threads(threads
), m_listener(listener
),
33 m_lock("rbd::mirror::LeaderWatcher " + io_ctx
.get_pool_name()),
34 m_notifier_id(librados::Rados(io_ctx
).get_instance_id()),
35 m_leader_lock(new LeaderLock(m_ioctx
, m_work_queue
, m_oid
, this, true,
36 m_cct
->_conf
->rbd_blacklist_expire_seconds
)) {
// Destructor. Asserts that m_status_watcher, m_instances and m_timer_task
// are all nullptr by the time the object is destroyed.
40 LeaderWatcher
<I
>::~LeaderWatcher() {
41 assert(m_status_watcher
== nullptr);
42 assert(m_instances
== nullptr);
43 assert(m_timer_task
== nullptr);
// Returns m_notifier_id rendered as a string via stringify().
49 std::string LeaderWatcher
<I
>::get_instance_id() {
50 return stringify(m_notifier_id
);
// Blocking init: returns the result of init_ctx.wait().
// NOTE(review): the declaration of init_ctx and the call that starts the
// asynchronous init are not visible in this extract.
54 int LeaderWatcher
<I
>::init() {
57 return init_ctx
.wait();
// Asynchronous init. Logs the notifier id, then under m_lock records
// on_finish in m_on_finish (asserted to be unset) and calls
// create_leader_object().
61 void LeaderWatcher
<I
>::init(Context
*on_finish
) {
62 dout(20) << "notifier_id=" << m_notifier_id
<< dendl
;
64 Mutex::Locker
locker(m_lock
);
66 assert(m_on_finish
== nullptr);
67 m_on_finish
= on_finish
;
69 create_leader_object();
// Asserts m_lock is held, then submits an asynchronous ObjectWriteOperation
// against m_oid; the rados completion is routed to
// handle_create_leader_object().
// NOTE(review): the operations added to `op` are not visible in this extract.
73 void LeaderWatcher
<I
>::create_leader_object() {
76 assert(m_lock
.is_locked());
78 librados::ObjectWriteOperation op
;
81 librados::AioCompletion
*aio_comp
= create_rados_callback
<
82 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_create_leader_object
>(this);
83 int r
= m_ioctx
.aio_operate(m_oid
, aio_comp
, &op
);
// Completion for create_leader_object(). Logs r, takes m_lock, logs an error
// naming m_oid, swaps the pending m_on_finish into a local and completes it
// with r.
// NOTE(review): the conditions guarding the error path are not visible in
// this extract.
89 void LeaderWatcher
<I
>::handle_create_leader_object(int r
) {
90 dout(20) << "r=" << r
<< dendl
;
92 Context
*on_finish
= nullptr;
94 Mutex::Locker
locker(m_lock
);
101 derr
<< "error creating " << m_oid
<< " object: " << cpp_strerror(r
)
104 std::swap(on_finish
, m_on_finish
);
106 on_finish
->complete(r
);
// Asserts m_lock is held, then registers the watch through
// librbd::Watcher::register_watch(). The completion is wrapped so that
// handle_register_watch() is invoked via m_work_queue.
109 template <typename I
>
110 void LeaderWatcher
<I
>::register_watch() {
113 assert(m_lock
.is_locked());
115 Context
*ctx
= create_async_context_callback(
116 m_work_queue
, create_context_callback
<
117 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_register_watch
>(this));
119 librbd::Watcher::register_watch(ctx
);
// Completion for register_watch(). Takes the timer lock and then m_lock.
// Visible steps: log a registration error for m_oid, schedule a leader lock
// acquire with delay factor 0, swap m_on_finish into a local and complete it
// with r.
// NOTE(review): the conditions selecting between these steps are not visible
// in this extract.
122 template <typename I
>
123 void LeaderWatcher
<I
>::handle_register_watch(int r
) {
124 dout(20) << "r=" << r
<< dendl
;
126 Context
*on_finish
= nullptr;
128 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
129 Mutex::Locker
locker(m_lock
);
132 derr
<< "error registering leader watcher for " << m_oid
<< " object: "
133 << cpp_strerror(r
) << dendl
;
135 schedule_acquire_leader_lock(0);
138 std::swap(on_finish
, m_on_finish
);
140 on_finish
->complete(r
);
// Blocking shut down: runs the asynchronous shut_down(Context*) with a
// C_SaferCond and waits on it, storing the result in r.
143 template <typename I
>
144 void LeaderWatcher
<I
>::shut_down() {
145 C_SaferCond shut_down_ctx
;
146 shut_down(&shut_down_ctx
);
147 int r
= shut_down_ctx
.wait();
// Asynchronous shut down. With the timer lock and m_lock held, records
// on_finish in m_on_shut_down_finish (asserted to be unset) and starts
// shut_down_leader_lock().
151 template <typename I
>
152 void LeaderWatcher
<I
>::shut_down(Context
*on_finish
) {
155 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
156 Mutex::Locker
locker(m_lock
);
158 assert(m_on_shut_down_finish
== nullptr);
159 m_on_shut_down_finish
= on_finish
;
161 shut_down_leader_lock();
// Asserts m_lock is held and shuts down m_leader_lock. The completion is
// wrapped so that handle_shut_down_leader_lock() is invoked via m_work_queue.
164 template <typename I
>
165 void LeaderWatcher
<I
>::shut_down_leader_lock() {
168 assert(m_lock
.is_locked());
170 Context
*ctx
= create_async_context_callback(
171 m_work_queue
, create_context_callback
<
172 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_shut_down_leader_lock
>(this));
174 m_leader_lock
->shut_down(ctx
);
// Completion for shut_down_leader_lock(). Logs r, takes m_lock and logs an
// error describing r.
// NOTE(review): the error condition and the follow-up step are not visible
// in this extract.
177 template <typename I
>
178 void LeaderWatcher
<I
>::handle_shut_down_leader_lock(int r
) {
179 dout(20) << "r=" << r
<< dendl
;
181 Mutex::Locker
locker(m_lock
);
184 derr
<< "error shutting down leader lock: " << cpp_strerror(r
) << dendl
;
// Asserts m_lock is held, then unregisters the watch through
// librbd::Watcher::unregister_watch(). The completion is wrapped so that
// handle_unregister_watch() is invoked via m_work_queue.
190 template <typename I
>
191 void LeaderWatcher
<I
>::unregister_watch() {
194 assert(m_lock
.is_locked());
196 Context
*ctx
= create_async_context_callback(
197 m_work_queue
, create_context_callback
<
198 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_unregister_watch
>(this));
200 librbd::Watcher::unregister_watch(ctx
);
// Completion for unregister_watch(). Logs r and logs an unregistration error
// naming m_oid.
// NOTE(review): the error condition and any follow-up step are not visible
// in this extract.
203 template <typename I
>
204 void LeaderWatcher
<I
>::handle_unregister_watch(int r
) {
205 dout(20) << "r=" << r
<< dendl
;
208 derr
<< "error unregistering leader watcher for " << m_oid
<< " object: "
209 << cpp_strerror(r
) << dendl
;
// With the timer lock and m_lock held, schedules handle_wait_for_tasks() as
// the "wait for tasks" timer task with delay factor 0, leader=false and
// shutting_down=true.
214 template <typename I
>
215 void LeaderWatcher
<I
>::wait_for_tasks() {
218 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
219 Mutex::Locker
locker(m_lock
);
220 schedule_timer_task("wait for tasks", 0, false,
221 &LeaderWatcher
<I
>::handle_wait_for_tasks
, true);
// Timer callback scheduled by wait_for_tasks(). Asserts both locks are held,
// that a shut down is pending and that a timer op is in flight, then
// finishes that op. It queues a FunctionContext on m_work_queue which takes
// m_lock, reads m_on_shut_down_finish into on_finish and completes it with 0.
// NOTE(review): the declaration of on_finish and the scope in which m_lock
// is released are not visible in this extract.
224 template <typename I
>
225 void LeaderWatcher
<I
>::handle_wait_for_tasks() {
228 assert(m_threads
->timer_lock
.is_locked());
229 assert(m_lock
.is_locked());
230 assert(m_on_shut_down_finish
!= nullptr);
232 assert(!m_timer_op_tracker
.empty());
233 m_timer_op_tracker
.finish_op();
235 auto ctx
= new FunctionContext([this](int r
) {
238 // ensure lock isn't held when completing shut down
239 Mutex::Locker
locker(m_lock
);
240 assert(m_on_shut_down_finish
!= nullptr);
241 on_finish
= m_on_shut_down_finish
;
243 on_finish
->complete(0);
245 m_work_queue
->queue(ctx
, 0);
// Locking wrapper: takes m_lock and delegates to is_leader(Mutex&).
248 template <typename I
>
249 bool LeaderWatcher
<I
>::is_leader() const {
250 Mutex::Locker
locker(m_lock
);
252 return is_leader(m_lock
);
// Variant for callers that already hold the lock. Asserts m_lock is held,
// queries m_leader_lock->is_leader() and logs the result at level 20.
// The `lock` parameter is not referenced by the visible lines; the assert
// checks m_lock directly.
255 template <typename I
>
256 bool LeaderWatcher
<I
>::is_leader(Mutex
&lock
) const {
257 assert(m_lock
.is_locked());
259 bool leader
= m_leader_lock
->is_leader();
260 dout(20) << leader
<< dendl
;
// Locking wrapper: takes m_lock and delegates to is_releasing_leader(Mutex&).
264 template <typename I
>
265 bool LeaderWatcher
<I
>::is_releasing_leader() const {
266 Mutex::Locker
locker(m_lock
);
268 return is_releasing_leader(m_lock
);
// Variant for callers that already hold the lock. Asserts m_lock is held,
// queries m_leader_lock->is_releasing_leader() and logs the result at
// level 20. The `lock` parameter is not referenced by the visible lines.
271 template <typename I
>
272 bool LeaderWatcher
<I
>::is_releasing_leader(Mutex
&lock
) const {
273 assert(m_lock
.is_locked());
275 bool releasing
= m_leader_lock
->is_releasing_leader();
276 dout(20) << releasing
<< dendl
;
// Under m_lock: when this instance is the leader or is releasing leadership,
// writes our own notifier id (as a string) to *instance_id; when m_locker
// has a non-empty cookie, writes the locker entity's number instead.
// NOTE(review): the return statements are not visible in this extract.
280 template <typename I
>
281 bool LeaderWatcher
<I
>::get_leader_instance_id(std::string
*instance_id
) const {
284 Mutex::Locker
locker(m_lock
);
286 if (is_leader(m_lock
) || is_releasing_leader(m_lock
)) {
287 *instance_id
= stringify(m_notifier_id
);
291 if (!m_locker
.cookie
.empty()) {
292 *instance_id
= stringify(m_locker
.entity
.num());
// Under m_lock, checks is_leader() and calls release_leader_lock().
// NOTE(review): the body of the `!is_leader` branch is not visible in this
// extract.
299 template <typename I
>
300 void LeaderWatcher
<I
>::release_leader() {
303 Mutex::Locker
locker(m_lock
);
304 if (!is_leader(m_lock
)) {
308 release_leader_lock();
// Under m_lock, clears *instance_ids and, when m_instances is non-null,
// fills it via m_instances->list().
311 template <typename I
>
312 void LeaderWatcher
<I
>::list_instances(std::vector
<std::string
> *instance_ids
) {
315 Mutex::Locker
locker(m_lock
);
317 instance_ids
->clear();
318 if (m_instances
!= nullptr) {
319 m_instances
->list(instance_ids
);
// Asserts the timer lock and m_lock are held. Checks m_timer_task against
// nullptr, then logs the task pointer, cancels it on m_threads->timer and
// resets m_timer_task to nullptr.
// NOTE(review): the body of the nullptr check and any use of `canceled` are
// not visible in this extract.
323 template <typename I
>
324 void LeaderWatcher
<I
>::cancel_timer_task() {
325 assert(m_threads
->timer_lock
.is_locked());
326 assert(m_lock
.is_locked());
328 if (m_timer_task
== nullptr) {
332 dout(20) << m_timer_task
<< dendl
;
333 bool canceled
= m_threads
->timer
->cancel_event(m_timer_task
);
335 m_timer_task
= nullptr;
// Schedules `timer_callback` on the shared timer. Asserts the timer lock and
// m_lock are held.
//
// The new m_timer_task is a FunctionContext that, when fired with the timer
// lock held, clears m_timer_task and then:
//   - if no timer op is in flight, takes m_lock and calls
//     execute_timer_task(leader, timer_callback);
//   - otherwise (see the in-line comment) records leader/timer_callback in
//     m_timer_gate, creating the gate and registering it with
//     m_timer_op_tracker.wait_for_ops() when it does not exist yet.
//
// The delay is delay_factor * max(1, rbd_mirror_leader_heartbeat_interval)
// seconds.
// NOTE(review): the body of the `!shutting_down && m_on_shut_down_finish`
// check is not visible in this extract.
338 template <typename I
>
339 void LeaderWatcher
<I
>::schedule_timer_task(const std::string
&name
,
340 int delay_factor
, bool leader
,
341 TimerCallback timer_callback
,
342 bool shutting_down
) {
343 assert(m_threads
->timer_lock
.is_locked());
344 assert(m_lock
.is_locked());
346 if (!shutting_down
&& m_on_shut_down_finish
!= nullptr) {
352 m_timer_task
= new FunctionContext(
353 [this, leader
, timer_callback
](int r
) {
354 assert(m_threads
->timer_lock
.is_locked());
355 m_timer_task
= nullptr;
357 if (m_timer_op_tracker
.empty()) {
358 Mutex::Locker
locker(m_lock
);
359 execute_timer_task(leader
, timer_callback
);
363 // old timer task is still running -- do not start next
364 // task until the previous task completes
365 if (m_timer_gate
== nullptr) {
366 m_timer_gate
= new C_TimerGate(this);
367 m_timer_op_tracker
.wait_for_ops(m_timer_gate
);
369 m_timer_gate
->leader
= leader
;
370 m_timer_gate
->timer_callback
= timer_callback
;
373 int after
= delay_factor
*
374 max(1, m_cct
->_conf
->rbd_mirror_leader_heartbeat_interval
);
376 dout(20) << "scheduling " << name
<< " after " << after
<< " sec (task "
377 << m_timer_task
<< ")" << dendl
;
378 m_threads
->timer
->add_event_after(after
, m_timer_task
);
// Runs a timer callback. Asserts both locks are held and that no timer op is
// in flight, compares is_leader() with the expected `leader` flag, then
// starts a timer op and invokes the member-function callback on this object.
// NOTE(review): the body of the leader-mismatch branch is not visible in
// this extract.
381 template <typename I
>
382 void LeaderWatcher
<I
>::execute_timer_task(bool leader
,
383 TimerCallback timer_callback
) {
386 assert(m_threads
->timer_lock
.is_locked());
387 assert(m_lock
.is_locked());
388 assert(m_timer_op_tracker
.empty());
390 if (is_leader(m_lock
) != leader
) {
394 m_timer_op_tracker
.start_op();
395 (this->*timer_callback
)();
// Handler invoked with the result r of a leader lock acquire attempt.
// Visible paths: log "already locked" or an acquire error and complete
// on_finish with r; or, under m_lock, store on_finish in m_on_finish
// (asserted unset) and start init_status_watcher().
// NOTE(review): the branch conditions on r are not visible in this extract.
398 template <typename I
>
399 void LeaderWatcher
<I
>::handle_post_acquire_leader_lock(int r
,
400 Context
*on_finish
) {
401 dout(20) << "r=" << r
<< dendl
;
405 dout(20) << "already locked" << dendl
;
407 derr
<< "error acquiring leader lock: " << cpp_strerror(r
) << dendl
;
409 on_finish
->complete(r
);
413 Mutex::Locker
locker(m_lock
);
414 assert(m_on_finish
== nullptr);
415 m_on_finish
= on_finish
;
418 init_status_watcher();
// Handler invoked before the leader lock is released. Under m_lock, stores
// on_finish in m_on_finish (asserted unset).
// NOTE(review): the step started after storing the context is not visible
// in this extract.
421 template <typename I
>
422 void LeaderWatcher
<I
>::handle_pre_release_leader_lock(Context
*on_finish
) {
425 Mutex::Locker
locker(m_lock
);
426 assert(m_on_finish
== nullptr);
427 m_on_finish
= on_finish
;
// Handler invoked with the result r of a leader lock release. One visible
// path completes on_finish with r directly; the other stores on_finish in
// m_on_finish under m_lock and calls notify_lock_released().
// NOTE(review): the condition selecting the direct-completion path is not
// visible in this extract.
433 template <typename I
>
434 void LeaderWatcher
<I
>::handle_post_release_leader_lock(int r
,
435 Context
*on_finish
) {
436 dout(20) << "r=" << r
<< dendl
;
439 on_finish
->complete(r
);
443 Mutex::Locker
locker(m_lock
);
444 assert(m_on_finish
== nullptr);
445 m_on_finish
= on_finish
;
447 notify_lock_released();
// Asserts both locks are held and a timer op is in flight. Checks whether
// m_locker.cookie is empty, then asks m_leader_lock to break the lock held
// by m_locker (second argument `true`); the completion is delivered via
// m_work_queue to handle_break_leader_lock().
// NOTE(review): the body of the empty-cookie branch is not visible in this
// extract.
450 template <typename I
>
451 void LeaderWatcher
<I
>::break_leader_lock() {
454 assert(m_threads
->timer_lock
.is_locked());
455 assert(m_lock
.is_locked());
456 assert(!m_timer_op_tracker
.empty());
458 if (m_locker
.cookie
.empty()) {
463 Context
*ctx
= create_async_context_callback(
464 m_work_queue
, create_context_callback
<
465 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_break_leader_lock
>(this));
467 m_leader_lock
->break_lock(m_locker
, true, ctx
);
// Completion for break_leader_lock(). Takes the timer lock and m_lock and
// asserts a timer op is in flight. Visible steps:
//   - if the leader lock is shut down: log and finish the timer op;
//   - if r is an error other than -ENOENT: log it, schedule another acquire
//     with delay factor 1 and finish the timer op;
//   - reset m_acquire_attempts to 0 and call acquire_leader_lock().
// The error message spelling was corrected ("beaking" -> "breaking") so the
// log line can be found by searching for the obvious text.
470 template <typename I
>
471 void LeaderWatcher
<I
>::handle_break_leader_lock(int r
) {
472 dout(20) << "r=" << r
<< dendl
;
474 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
475 Mutex::Locker
locker(m_lock
);
476 assert(!m_timer_op_tracker
.empty());
478 if (m_leader_lock
->is_shutdown()) {
479 dout(20) << "canceling due to shutdown" << dendl
;
480 m_timer_op_tracker
.finish_op();
484 if (r
< 0 && r
!= -ENOENT
) {
485 derr
<< "error breaking leader lock: " << cpp_strerror(r
) << dendl
;
486 schedule_acquire_leader_lock(1);
487 m_timer_op_tracker
.finish_op();
492 m_acquire_attempts
= 0;
493 acquire_leader_lock();
// Asserts both locks are held, resets m_acquire_attempts to 0 and schedules
// get_locker() as the "get locker" timer task with the given delay factor
// (leader=false, shutting_down=false).
// NOTE(review): the reset is presumably guarded by reset_leader; the guard
// is not visible in this extract.
496 template <typename I
>
497 void LeaderWatcher
<I
>::schedule_get_locker(bool reset_leader
,
498 uint32_t delay_factor
) {
501 assert(m_threads
->timer_lock
.is_locked());
502 assert(m_lock
.is_locked());
506 m_acquire_attempts
= 0;
509 schedule_timer_task("get locker", delay_factor
, false,
510 &LeaderWatcher
<I
>::get_locker
, false);
// Timer callback. Asserts both locks are held and a timer op is in flight.
// Allocates a C_GetLocker, wraps it so it completes via m_work_queue, and
// asks m_leader_lock to fill get_locker_ctx->locker with the current locker.
513 template <typename I
>
514 void LeaderWatcher
<I
>::get_locker() {
517 assert(m_threads
->timer_lock
.is_locked());
518 assert(m_lock
.is_locked());
519 assert(!m_timer_op_tracker
.empty());
521 C_GetLocker
*get_locker_ctx
= new C_GetLocker(this);
522 Context
*ctx
= create_async_context_callback(m_work_queue
, get_locker_ctx
);
524 m_leader_lock
->get_locker(&get_locker_ctx
->locker
, ctx
);
// Handles the result of a get_locker request. Takes the timer lock and
// m_lock and asserts a timer op is in flight. Visible steps:
//   - leader lock shut down, or we are already leader: finish the timer op;
//   - reset m_acquire_attempts and call acquire_leader_lock();
//   - on a retrieval error: log it, reschedule get_locker (delay factor 1)
//     and finish the timer op;
//   - if the reported locker differs from m_locker: flag the listener for
//     notification and, when m_acquire_attempts > 1, reset the counter;
//   - if m_acquire_attempts has reached
//     rbd_mirror_leader_max_acquire_attempts_before_break: log that the lock
//     is being broken;
//   - schedule an acquire with delay factor 1;
//   - without a listener notification: finish the timer op; otherwise queue
//     a context that reports the leader instance id to
//     m_listener->update_leader_handler() and then finishes the timer op
//     under both locks.
// NOTE(review): several branch conditions and bodies (including the update
// of m_locker) are not visible in this extract.
527 template <typename I
>
528 void LeaderWatcher
<I
>::handle_get_locker(int r
,
529 librbd::managed_lock::Locker
& locker
) {
530 dout(20) << "r=" << r
<< dendl
;
532 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
533 Mutex::Locker
mutex_locker(m_lock
);
534 assert(!m_timer_op_tracker
.empty());
536 if (m_leader_lock
->is_shutdown()) {
537 dout(20) << "canceling due to shutdown" << dendl
;
538 m_timer_op_tracker
.finish_op();
542 if (is_leader(m_lock
)) {
544 m_timer_op_tracker
.finish_op();
550 m_acquire_attempts
= 0;
551 acquire_leader_lock();
554 derr
<< "error retrieving leader locker: " << cpp_strerror(r
) << dendl
;
555 schedule_get_locker(true, 1);
556 m_timer_op_tracker
.finish_op();
560 bool notify_listener
= false;
561 if (m_locker
!= locker
) {
563 notify_listener
= true;
564 if (m_acquire_attempts
> 1) {
565 dout(10) << "new lock owner detected -- resetting heartbeat counter"
567 m_acquire_attempts
= 0;
571 if (m_acquire_attempts
>=
572 m_cct
->_conf
->rbd_mirror_leader_max_acquire_attempts_before_break
) {
573 dout(0) << "breaking leader lock after " << m_acquire_attempts
<< " "
574 << "failed attempts to acquire" << dendl
;
579 schedule_acquire_leader_lock(1);
581 if (!notify_listener
) {
582 m_timer_op_tracker
.finish_op();
586 auto ctx
= new FunctionContext(
588 std::string instance_id
;
589 if (get_leader_instance_id(&instance_id
)) {
590 m_listener
->update_leader_handler(instance_id
);
592 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
593 Mutex::Locker
locker(m_lock
);
594 m_timer_op_tracker
.finish_op();
596 m_work_queue
->queue(ctx
, 0);
// Asserts both locks are held and schedules acquire_leader_lock() as the
// "acquire leader lock" timer task (leader=false, shutting_down=false). The
// delay-factor argument involves rbd_mirror_leader_max_missed_heartbeats.
// NOTE(review): the line combining that setting with delay_factor is not
// visible in this extract.
599 template <typename I
>
600 void LeaderWatcher
<I
>::schedule_acquire_leader_lock(uint32_t delay_factor
) {
603 assert(m_threads
->timer_lock
.is_locked());
604 assert(m_lock
.is_locked());
606 schedule_timer_task("acquire leader lock",
608 m_cct
->_conf
->rbd_mirror_leader_max_missed_heartbeats
,
609 false, &LeaderWatcher
<I
>::acquire_leader_lock
, false);
// Timer callback. Asserts both locks are held and a timer op is in flight,
// increments and logs m_acquire_attempts, then calls
// m_leader_lock->try_acquire_lock(); the completion is delivered via
// m_work_queue to handle_acquire_leader_lock().
612 template <typename I
>
613 void LeaderWatcher
<I
>::acquire_leader_lock() {
614 assert(m_threads
->timer_lock
.is_locked());
615 assert(m_lock
.is_locked());
616 assert(!m_timer_op_tracker
.empty());
618 ++m_acquire_attempts
;
619 dout(20) << "acquire_attempts=" << m_acquire_attempts
<< dendl
;
621 Context
*ctx
= create_async_context_callback(
622 m_work_queue
, create_context_callback
<
623 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_acquire_leader_lock
>(this));
624 m_leader_lock
->try_acquire_lock(ctx
);
// Completion for acquire_leader_lock(). Takes the timer lock and m_lock and
// asserts a timer op is in flight. Visible steps: on shutdown, log and
// finish the timer op; log "already locked" or an acquire error; reset
// m_acquire_attempts to 0; log "releasing due to error on notify" and call
// release_leader_lock(); finish the timer op.
// NOTE(review): the branch conditions and several intermediate statements
// are not visible in this extract.
627 template <typename I
>
628 void LeaderWatcher
<I
>::handle_acquire_leader_lock(int r
) {
629 dout(20) << "r=" << r
<< dendl
;
631 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
632 Mutex::Locker
locker(m_lock
);
633 assert(!m_timer_op_tracker
.empty());
635 if (m_leader_lock
->is_shutdown()) {
636 dout(20) << "canceling due to shutdown" << dendl
;
637 m_timer_op_tracker
.finish_op();
643 dout(20) << "already locked" << dendl
;
645 derr
<< "error acquiring lock: " << cpp_strerror(r
) << dendl
;
653 m_acquire_attempts
= 0;
656 dout(5) << "releasing due to error on notify" << dendl
;
657 release_leader_lock();
658 m_timer_op_tracker
.finish_op();
// Asserts m_lock is held and calls m_leader_lock->release_lock(); the
// completion is delivered via m_work_queue to handle_release_leader_lock().
665 template <typename I
>
666 void LeaderWatcher
<I
>::release_leader_lock() {
669 assert(m_lock
.is_locked());
671 Context
*ctx
= create_async_context_callback(
672 m_work_queue
, create_context_callback
<
673 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_release_leader_lock
>(this));
675 m_leader_lock
->release_lock(ctx
);
// Completion for release_leader_lock(). Takes the timer lock and m_lock,
// logs a release error and schedules a new acquire with delay factor 1.
// NOTE(review): the error condition (and whether the error path returns
// before rescheduling) is not visible in this extract.
678 template <typename I
>
679 void LeaderWatcher
<I
>::handle_release_leader_lock(int r
) {
680 dout(20) << "r=" << r
<< dendl
;
682 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
683 Mutex::Locker
locker(m_lock
);
686 derr
<< "error releasing lock: " << cpp_strerror(r
) << dendl
;
690 schedule_acquire_leader_lock(1);
// Asserts m_lock is held and that no status watcher exists, creates a
// MirrorStatusWatcher on m_ioctx / m_work_queue and starts its init();
// completion goes to handle_init_status_watcher().
693 template <typename I
>
694 void LeaderWatcher
<I
>::init_status_watcher() {
697 assert(m_lock
.is_locked());
698 assert(m_status_watcher
== nullptr);
700 m_status_watcher
= MirrorStatusWatcher
<I
>::create(m_ioctx
, m_work_queue
);
702 Context
*ctx
= create_context_callback
<
703 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_init_status_watcher
>(this);
705 m_status_watcher
->init(ctx
);
// Completion for init_status_watcher(). Under m_lock, the visible path logs
// an initialization error, destroys the status watcher and clears its
// pointer, then swaps m_on_finish (asserted set) into a local and completes
// it with r.
// NOTE(review): the success path and the branch condition are not visible
// in this extract.
708 template <typename I
>
709 void LeaderWatcher
<I
>::handle_init_status_watcher(int r
) {
710 dout(20) << "r=" << r
<< dendl
;
712 Context
*on_finish
= nullptr;
714 Mutex::Locker
locker(m_lock
);
721 derr
<< "error initializing mirror status watcher: " << cpp_strerror(r
)
723 m_status_watcher
->destroy();
724 m_status_watcher
= nullptr;
725 assert(m_on_finish
!= nullptr);
726 std::swap(m_on_finish
, on_finish
);
728 on_finish
->complete(r
);
// Asserts m_lock is held and that a status watcher exists, then shuts it
// down; the completion is delivered via m_work_queue to
// handle_shut_down_status_watcher().
731 template <typename I
>
732 void LeaderWatcher
<I
>::shut_down_status_watcher() {
735 assert(m_lock
.is_locked());
736 assert(m_status_watcher
!= nullptr);
738 Context
*ctx
= create_async_context_callback(
739 m_work_queue
, create_context_callback
<LeaderWatcher
<I
>,
740 &LeaderWatcher
<I
>::handle_shut_down_status_watcher
>(this));
742 m_status_watcher
->shut_down(ctx
);
// Completion for shut_down_status_watcher(). Under m_lock, destroys the
// status watcher and clears its pointer, logs a shut down error, consults
// m_ret_val and is_leader(), then swaps m_on_finish (asserted set) into a
// local and completes it with r.
// NOTE(review): the bodies of the m_ret_val / is_leader branches, which may
// adjust r, are not visible in this extract.
745 template <typename I
>
746 void LeaderWatcher
<I
>::handle_shut_down_status_watcher(int r
) {
747 dout(20) << "r=" << r
<< dendl
;
749 Context
*on_finish
= nullptr;
751 Mutex::Locker
locker(m_lock
);
753 m_status_watcher
->destroy();
754 m_status_watcher
= nullptr;
757 derr
<< "error shutting mirror status watcher down: " << cpp_strerror(r
)
761 if (m_ret_val
!= 0) {
765 if (!is_leader(m_lock
)) {
766 // ignore on releasing
770 assert(m_on_finish
!= nullptr);
771 std::swap(m_on_finish
, on_finish
);
773 on_finish
->complete(r
);
// Asserts m_lock is held and that no Instances object exists, creates one
// from m_threads / m_ioctx and starts its init(); completion goes to
// handle_init_instances().
776 template <typename I
>
777 void LeaderWatcher
<I
>::init_instances() {
780 assert(m_lock
.is_locked());
781 assert(m_instances
== nullptr);
783 m_instances
= Instances
<I
>::create(m_threads
, m_ioctx
);
785 Context
*ctx
= create_context_callback
<
786 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_init_instances
>(this);
788 m_instances
->init(ctx
);
// Completion for init_instances(). Under m_lock, the visible path logs an
// initialization error, destroys m_instances, clears its pointer and starts
// shut_down_status_watcher().
// NOTE(review): the branch condition and the success path are not visible
// in this extract.
791 template <typename I
>
792 void LeaderWatcher
<I
>::handle_init_instances(int r
) {
793 dout(20) << "r=" << r
<< dendl
;
795 Mutex::Locker
locker(m_lock
);
798 derr
<< "error initializing instances: " << cpp_strerror(r
) << dendl
;
800 m_instances
->destroy();
801 m_instances
= nullptr;
802 shut_down_status_watcher();
// Asserts m_lock is held and that m_instances exists, then shuts it down;
// the completion is delivered via m_work_queue to
// handle_shut_down_instances().
809 template <typename I
>
810 void LeaderWatcher
<I
>::shut_down_instances() {
813 assert(m_lock
.is_locked());
814 assert(m_instances
!= nullptr);
816 Context
*ctx
= create_async_context_callback(
817 m_work_queue
, create_context_callback
<LeaderWatcher
<I
>,
818 &LeaderWatcher
<I
>::handle_shut_down_instances
>(this));
820 m_instances
->shut_down(ctx
);
// Completion for shut_down_instances(). Under m_lock, destroys m_instances,
// clears its pointer and continues with shut_down_status_watcher().
823 template <typename I
>
824 void LeaderWatcher
<I
>::handle_shut_down_instances(int r
) {
825 dout(20) << "r=" << r
<< dendl
;
828 Mutex::Locker
locker(m_lock
);
830 m_instances
->destroy();
831 m_instances
= nullptr;
833 shut_down_status_watcher();
// Asserts m_lock is held. Builds a completion that reaches
// handle_notify_listener() via m_work_queue, then wraps it in a
// FunctionContext that calls m_listener->post_acquire_handler() when we are
// the leader, or m_listener->pre_release_handler() in the other visible
// branch. The wrapper is queued on m_work_queue rather than run inline.
// NOTE(review): the lambda headers and the else keyword are not visible in
// this extract.
836 template <typename I
>
837 void LeaderWatcher
<I
>::notify_listener() {
840 assert(m_lock
.is_locked());
842 Context
*ctx
= create_async_context_callback(
843 m_work_queue
, create_context_callback
<
844 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_notify_listener
>(this));
846 if (is_leader(m_lock
)) {
847 ctx
= new FunctionContext(
849 m_listener
->post_acquire_handler(ctx
);
852 ctx
= new FunctionContext(
854 m_listener
->pre_release_handler(ctx
);
857 m_work_queue
->queue(ctx
, 0);
// Completion for notify_listener(). Under m_lock, logs a listener
// notification error; when we are the leader calls notify_lock_acquired(),
// and in the other visible branch calls shut_down_instances().
// NOTE(review): the error condition and any error bookkeeping are not
// visible in this extract.
860 template <typename I
>
861 void LeaderWatcher
<I
>::handle_notify_listener(int r
) {
862 dout(20) << "r=" << r
<< dendl
;
864 Mutex::Locker
locker(m_lock
);
867 derr
<< "error notifying listener: " << cpp_strerror(r
) << dendl
;
871 if (is_leader(m_lock
)) {
872 notify_lock_acquired();
874 shut_down_instances();
// Asserts m_lock is held, encodes a NotifyMessage carrying a
// LockAcquiredPayload into bl and sends it with send_notify(); completion
// goes to handle_notify_lock_acquired().
// NOTE(review): the declaration of bl is not visible in this extract.
878 template <typename I
>
879 void LeaderWatcher
<I
>::notify_lock_acquired() {
882 assert(m_lock
.is_locked());
884 Context
*ctx
= create_context_callback
<
885 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_notify_lock_acquired
>(this);
888 ::encode(NotifyMessage
{LockAcquiredPayload
{}}, bl
);
890 send_notify(bl
, nullptr, ctx
);
// Completion for notify_lock_acquired(). Under m_lock, errors other than
// -ETIMEDOUT are logged; m_on_finish (asserted set) is swapped into a local
// and completed with 0.
// NOTE(review): the remainder of the error branch is not visible in this
// extract.
893 template <typename I
>
894 void LeaderWatcher
<I
>::handle_notify_lock_acquired(int r
) {
895 dout(20) << "r=" << r
<< dendl
;
897 Context
*on_finish
= nullptr;
899 Mutex::Locker
locker(m_lock
);
900 if (r
< 0 && r
!= -ETIMEDOUT
) {
901 derr
<< "error notifying leader lock acquired: " << cpp_strerror(r
)
906 assert(m_on_finish
!= nullptr);
907 std::swap(m_on_finish
, on_finish
);
909 on_finish
->complete(0);
// Asserts m_lock is held, encodes a NotifyMessage carrying a
// LockReleasedPayload into bl and sends it with send_notify(); completion
// goes to handle_notify_lock_released().
// NOTE(review): the declaration of bl is not visible in this extract.
912 template <typename I
>
913 void LeaderWatcher
<I
>::notify_lock_released() {
916 assert(m_lock
.is_locked());
918 Context
*ctx
= create_context_callback
<
919 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_notify_lock_released
>(this);
922 ::encode(NotifyMessage
{LockReleasedPayload
{}}, bl
);
924 send_notify(bl
, nullptr, ctx
);
// Completion for notify_lock_released(). Under m_lock, errors other than
// -ETIMEDOUT are logged; m_on_finish (asserted set) is swapped into a local
// and completed with r.
// NOTE(review): the remainder of the error branch is not visible in this
// extract.
927 template <typename I
>
928 void LeaderWatcher
<I
>::handle_notify_lock_released(int r
) {
929 dout(20) << "r=" << r
<< dendl
;
931 Context
*on_finish
= nullptr;
933 Mutex::Locker
locker(m_lock
);
934 if (r
< 0 && r
!= -ETIMEDOUT
) {
935 derr
<< "error notifying leader lock released: " << cpp_strerror(r
)
939 assert(m_on_finish
!= nullptr);
940 std::swap(m_on_finish
, on_finish
);
942 on_finish
->complete(r
);
// Timer callback. Asserts both locks are held and a timer op is in flight.
// If we are not the leader, logs and finishes the timer op. The remaining
// visible lines encode a NotifyMessage carrying a HeartbeatPayload, clear
// the previous acks in m_heartbeat_response and send the notification with
// that response object; completion goes to handle_notify_heartbeat().
// NOTE(review): the declaration of bl and the end of the not-leader branch
// are not visible in this extract.
945 template <typename I
>
946 void LeaderWatcher
<I
>::notify_heartbeat() {
949 assert(m_threads
->timer_lock
.is_locked());
950 assert(m_lock
.is_locked());
951 assert(!m_timer_op_tracker
.empty());
953 if (!is_leader(m_lock
)) {
954 dout(5) << "not leader, canceling" << dendl
;
955 m_timer_op_tracker
.finish_op();
959 Context
*ctx
= create_context_callback
<
960 LeaderWatcher
<I
>, &LeaderWatcher
<I
>::handle_notify_heartbeat
>(this);
963 ::encode(NotifyMessage
{HeartbeatPayload
{}}, bl
);
965 m_heartbeat_response
.acks
.clear();
966 send_notify(bl
, &m_heartbeat_response
, ctx
);
// Completion for notify_heartbeat(). Takes the timer lock and m_lock,
// asserts a timer op is in flight and finishes it. Visible steps:
//   - leader lock shut down: log; the next branch tests !is_leader();
//   - error other than -ETIMEDOUT: log it and release the leader lock;
//   - log the number of acks and timeouts in m_heartbeat_response;
//   - for each ack, compare the acking gid with our own m_notifier_id and
//     report the instance id to m_instances->notify();
//   - schedule the next "heartbeat" timer task (delay factor 1, leader=true).
// The error message spelling was corrected ("hearbeat" -> "heartbeat") so the
// log line can be found by searching for the obvious text.
969 template <typename I
>
970 void LeaderWatcher
<I
>::handle_notify_heartbeat(int r
) {
971 dout(20) << "r=" << r
<< dendl
;
973 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
974 Mutex::Locker
locker(m_lock
);
975 assert(!m_timer_op_tracker
.empty());
977 m_timer_op_tracker
.finish_op();
978 if (m_leader_lock
->is_shutdown()) {
979 dout(20) << "canceling due to shutdown" << dendl
;
981 } else if (!is_leader(m_lock
)) {
985 if (r
< 0 && r
!= -ETIMEDOUT
) {
986 derr
<< "error notifying heartbeat: " << cpp_strerror(r
)
987 << ", releasing leader" << dendl
;
988 release_leader_lock();
992 dout(20) << m_heartbeat_response
.acks
.size() << " acks received, "
993 << m_heartbeat_response
.timeouts
.size() << " timed out" << dendl
;
995 for (auto &it
: m_heartbeat_response
.acks
) {
996 uint64_t notifier_id
= it
.first
.gid
;
997 if (notifier_id
== m_notifier_id
) {
1001 std::string instance_id
= stringify(notifier_id
);
1002 m_instances
->notify(instance_id
);
1005 schedule_timer_task("heartbeat", 1, true,
1006 &LeaderWatcher
<I
>::notify_heartbeat
, false);
// Handles a heartbeat notification. With the timer lock and m_lock held:
// when we are the leader, logs that the heartbeat is ignored; the other
// visible branch cancels the pending timer task, resets m_acquire_attempts
// and schedules an acquire with delay factor 1. Finally acks the
// notification by completing on_notify_ack with 0.
// NOTE(review): the else keyword and lock scope are not visible in this
// extract.
1009 template <typename I
>
1010 void LeaderWatcher
<I
>::handle_heartbeat(Context
*on_notify_ack
) {
1014 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1015 Mutex::Locker
locker(m_lock
);
1016 if (is_leader(m_lock
)) {
1017 dout(5) << "got another leader heartbeat, ignoring" << dendl
;
1019 cancel_timer_task();
1020 m_acquire_attempts
= 0;
1021 schedule_acquire_leader_lock(1);
1025 on_notify_ack
->complete(0);
// Handles a lock-acquired notification. With the timer lock and m_lock
// held: when we are the leader, logs that the message is ignored; the other
// visible branch cancels the pending timer task and schedules get_locker
// immediately (reset_leader=true, delay factor 0). Finally acks the
// notification by completing on_notify_ack with 0.
// NOTE(review): the else keyword and lock scope are not visible in this
// extract.
1028 template <typename I
>
1029 void LeaderWatcher
<I
>::handle_lock_acquired(Context
*on_notify_ack
) {
1033 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1034 Mutex::Locker
locker(m_lock
);
1035 if (is_leader(m_lock
)) {
1036 dout(5) << "got another leader lock_acquired, ignoring" << dendl
;
1038 cancel_timer_task();
1039 schedule_get_locker(true, 0);
1043 on_notify_ack
->complete(0);
// Handles a lock-released notification. With the timer lock and m_lock
// held: when we are the leader, logs that the message is ignored; the other
// visible branch cancels the pending timer task and schedules get_locker
// immediately (reset_leader=true, delay factor 0). Finally acks the
// notification by completing on_notify_ack with 0.
// NOTE(review): the else keyword and lock scope are not visible in this
// extract.
1046 template <typename I
>
1047 void LeaderWatcher
<I
>::handle_lock_released(Context
*on_notify_ack
) {
1051 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1052 Mutex::Locker
locker(m_lock
);
1053 if (is_leader(m_lock
)) {
1054 dout(5) << "got another leader lock_released, ignoring" << dendl
;
1056 cancel_timer_task();
1057 schedule_get_locker(true, 0);
1061 on_notify_ack
->complete(0);
// Watch notification entry point. Logs the ids, allocates a C_NotifyAck for
// (notify_id, handle), and recognizes notifications sent by this instance
// (notifier_id == m_notifier_id), which are logged as ignored. Otherwise the
// payload in bl is decoded into a NotifyMessage; a buffer::error during
// decoding is caught and logged. The decoded payload is dispatched through
// HandlePayloadVisitor to the matching handle_payload() overload.
// NOTE(review): the error text says "image notification" although a leader
// NotifyMessage is being decoded -- confirm the wording. The completion of
// ctx on the early-exit paths is not visible in this extract.
1064 template <typename I
>
1065 void LeaderWatcher
<I
>::handle_notify(uint64_t notify_id
, uint64_t handle
,
1066 uint64_t notifier_id
, bufferlist
&bl
) {
1067 dout(20) << "notify_id=" << notify_id
<< ", handle=" << handle
<< ", "
1068 << "notifier_id=" << notifier_id
<< dendl
;
1070 Context
*ctx
= new C_NotifyAck(this, notify_id
, handle
);
1072 if (notifier_id
== m_notifier_id
) {
1073 dout(20) << "our own notification, ignoring" << dendl
;
1078 NotifyMessage notify_message
;
1080 bufferlist::iterator iter
= bl
.begin();
1081 ::decode(notify_message
, iter
);
1082 } catch (const buffer::error
&err
) {
1083 derr
<< ": error decoding image notification: " << err
.what() << dendl
;
1088 apply_visitor(HandlePayloadVisitor(this, ctx
), notify_message
.payload
);
// Payload dispatch for HeartbeatPayload: logs and forwards to
// handle_heartbeat(). The payload itself is not read.
1091 template <typename I
>
1092 void LeaderWatcher
<I
>::handle_payload(const HeartbeatPayload
&payload
,
1093 Context
*on_notify_ack
) {
1094 dout(20) << "heartbeat" << dendl
;
1096 handle_heartbeat(on_notify_ack
);
// Payload dispatch for LockAcquiredPayload: logs and forwards to
// handle_lock_acquired(). The payload itself is not read.
1099 template <typename I
>
1100 void LeaderWatcher
<I
>::handle_payload(const LockAcquiredPayload
&payload
,
1101 Context
*on_notify_ack
) {
1102 dout(20) << "lock_acquired" << dendl
;
1104 handle_lock_acquired(on_notify_ack
);
// Payload dispatch for LockReleasedPayload: logs and forwards to
// handle_lock_released(). The payload itself is not read.
1107 template <typename I
>
1108 void LeaderWatcher
<I
>::handle_payload(const LockReleasedPayload
&payload
,
1109 Context
*on_notify_ack
) {
1110 dout(20) << "lock_released" << dendl
;
1112 handle_lock_released(on_notify_ack
);
// Payload dispatch for UnknownPayload: logs and acks the notification
// immediately by completing on_notify_ack with 0.
1115 template <typename I
>
1116 void LeaderWatcher
<I
>::handle_payload(const UnknownPayload
&payload
,
1117 Context
*on_notify_ack
) {
1118 dout(20) << "unknown" << dendl
;
1120 on_notify_ack
->complete(0);
1123 } // namespace mirror
1126 template class rbd::mirror::LeaderWatcher
<librbd::ImageCtx
>;