]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/LeaderWatcher.cc
update sources to v12.1.0
[ceph.git] / ceph / src / tools / rbd_mirror / LeaderWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "LeaderWatcher.h"
5#include "common/Timer.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
9#include "include/stringify.h"
10#include "librbd/Utils.h"
11#include "librbd/watcher/Types.h"
12#include "Threads.h"
13
14#define dout_context g_ceph_context
15#define dout_subsys ceph_subsys_rbd_mirror
16#undef dout_prefix
17#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
18 << this << " " << __func__ << ": "
19namespace rbd {
20namespace mirror {
21
22using namespace leader_watcher;
23
24using librbd::util::create_async_context_callback;
25using librbd::util::create_context_callback;
26using librbd::util::create_rados_callback;
27
28template <typename I>
29LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
30 Listener *listener)
31 : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
32 m_threads(threads), m_listener(listener),
33 m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
34 m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
35 m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
36 m_cct->_conf->rbd_blacklist_expire_seconds)) {
37}
38
39template <typename I>
40LeaderWatcher<I>::~LeaderWatcher() {
41 assert(m_status_watcher == nullptr);
42 assert(m_instances == nullptr);
43 assert(m_timer_task == nullptr);
44
45 delete m_leader_lock;
46}
47
31f18b77
FG
48template <typename I>
49std::string LeaderWatcher<I>::get_instance_id() {
50 return stringify(m_notifier_id);
51}
52
7c673cae
FG
53template <typename I>
54int LeaderWatcher<I>::init() {
55 C_SaferCond init_ctx;
56 init(&init_ctx);
57 return init_ctx.wait();
58}
59
60template <typename I>
61void LeaderWatcher<I>::init(Context *on_finish) {
62 dout(20) << "notifier_id=" << m_notifier_id << dendl;
63
64 Mutex::Locker locker(m_lock);
65
66 assert(m_on_finish == nullptr);
67 m_on_finish = on_finish;
68
69 create_leader_object();
70}
71
72template <typename I>
73void LeaderWatcher<I>::create_leader_object() {
74 dout(20) << dendl;
75
76 assert(m_lock.is_locked());
77
78 librados::ObjectWriteOperation op;
79 op.create(false);
80
81 librados::AioCompletion *aio_comp = create_rados_callback<
82 LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
83 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
84 assert(r == 0);
85 aio_comp->release();
86}
87
88template <typename I>
89void LeaderWatcher<I>::handle_create_leader_object(int r) {
90 dout(20) << "r=" << r << dendl;
91
92 Context *on_finish = nullptr;
93 {
94 Mutex::Locker locker(m_lock);
95
96 if (r == 0) {
97 register_watch();
98 return;
99 }
100
101 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
102 << dendl;
103
104 std::swap(on_finish, m_on_finish);
105 }
106 on_finish->complete(r);
107}
108
109template <typename I>
110void LeaderWatcher<I>::register_watch() {
111 dout(20) << dendl;
112
113 assert(m_lock.is_locked());
114
115 Context *ctx = create_async_context_callback(
116 m_work_queue, create_context_callback<
117 LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
118
119 librbd::Watcher::register_watch(ctx);
120}
121
122template <typename I>
123void LeaderWatcher<I>::handle_register_watch(int r) {
124 dout(20) << "r=" << r << dendl;
125
126 Context *on_finish = nullptr;
127 {
128 Mutex::Locker timer_locker(m_threads->timer_lock);
129 Mutex::Locker locker(m_lock);
130
131 if (r < 0) {
132 derr << "error registering leader watcher for " << m_oid << " object: "
133 << cpp_strerror(r) << dendl;
134 } else {
135 schedule_acquire_leader_lock(0);
136 }
137
138 std::swap(on_finish, m_on_finish);
139 }
140 on_finish->complete(r);
141}
142
143template <typename I>
144void LeaderWatcher<I>::shut_down() {
145 C_SaferCond shut_down_ctx;
146 shut_down(&shut_down_ctx);
147 int r = shut_down_ctx.wait();
148 assert(r == 0);
149}
150
151template <typename I>
152void LeaderWatcher<I>::shut_down(Context *on_finish) {
153 dout(20) << dendl;
154
155 Mutex::Locker timer_locker(m_threads->timer_lock);
156 Mutex::Locker locker(m_lock);
157
158 assert(m_on_shut_down_finish == nullptr);
159 m_on_shut_down_finish = on_finish;
160 cancel_timer_task();
161 shut_down_leader_lock();
162}
163
164template <typename I>
165void LeaderWatcher<I>::shut_down_leader_lock() {
166 dout(20) << dendl;
167
168 assert(m_lock.is_locked());
169
170 Context *ctx = create_async_context_callback(
171 m_work_queue, create_context_callback<
172 LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
173
174 m_leader_lock->shut_down(ctx);
175}
176
177template <typename I>
178void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
179 dout(20) << "r=" << r << dendl;
180
181 Mutex::Locker locker(m_lock);
182
183 if (r < 0) {
184 derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
185 }
186
187 unregister_watch();
188}
189
190template <typename I>
191void LeaderWatcher<I>::unregister_watch() {
192 dout(20) << dendl;
193
194 assert(m_lock.is_locked());
195
196 Context *ctx = create_async_context_callback(
197 m_work_queue, create_context_callback<
198 LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
199
200 librbd::Watcher::unregister_watch(ctx);
201}
202
203template <typename I>
204void LeaderWatcher<I>::handle_unregister_watch(int r) {
205 dout(20) << "r=" << r << dendl;
206
207 if (r < 0) {
208 derr << "error unregistering leader watcher for " << m_oid << " object: "
209 << cpp_strerror(r) << dendl;
210 }
211 wait_for_tasks();
212}
213
214template <typename I>
215void LeaderWatcher<I>::wait_for_tasks() {
216 dout(20) << dendl;
217
218 Mutex::Locker timer_locker(m_threads->timer_lock);
219 Mutex::Locker locker(m_lock);
220 schedule_timer_task("wait for tasks", 0, false,
221 &LeaderWatcher<I>::handle_wait_for_tasks, true);
222}
223
224template <typename I>
225void LeaderWatcher<I>::handle_wait_for_tasks() {
226 dout(20) << dendl;
227
228 assert(m_threads->timer_lock.is_locked());
229 assert(m_lock.is_locked());
230 assert(m_on_shut_down_finish != nullptr);
231
232 assert(!m_timer_op_tracker.empty());
233 m_timer_op_tracker.finish_op();
234
235 auto ctx = new FunctionContext([this](int r) {
236 Context *on_finish;
237 {
238 // ensure lock isn't held when completing shut down
239 Mutex::Locker locker(m_lock);
240 assert(m_on_shut_down_finish != nullptr);
241 on_finish = m_on_shut_down_finish;
242 }
243 on_finish->complete(0);
244 });
245 m_work_queue->queue(ctx, 0);
246}
247
248template <typename I>
249bool LeaderWatcher<I>::is_leader() const {
250 Mutex::Locker locker(m_lock);
251
252 return is_leader(m_lock);
253}
254
255template <typename I>
256bool LeaderWatcher<I>::is_leader(Mutex &lock) const {
257 assert(m_lock.is_locked());
258
259 bool leader = m_leader_lock->is_leader();
260 dout(20) << leader << dendl;
261 return leader;
262}
263
264template <typename I>
265bool LeaderWatcher<I>::is_releasing_leader() const {
266 Mutex::Locker locker(m_lock);
267
268 return is_releasing_leader(m_lock);
269}
270
271template <typename I>
272bool LeaderWatcher<I>::is_releasing_leader(Mutex &lock) const {
273 assert(m_lock.is_locked());
274
275 bool releasing = m_leader_lock->is_releasing_leader();
276 dout(20) << releasing << dendl;
277 return releasing;
278}
279
280template <typename I>
281bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
282 dout(20) << dendl;
283
284 Mutex::Locker locker(m_lock);
285
286 if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
287 *instance_id = stringify(m_notifier_id);
288 return true;
289 }
290
291 if (!m_locker.cookie.empty()) {
292 *instance_id = stringify(m_locker.entity.num());
293 return true;
294 }
295
296 return false;
297}
298
299template <typename I>
300void LeaderWatcher<I>::release_leader() {
301 dout(20) << dendl;
302
303 Mutex::Locker locker(m_lock);
304 if (!is_leader(m_lock)) {
305 return;
306 }
307
308 release_leader_lock();
309}
310
311template <typename I>
312void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
313 dout(20) << dendl;
314
315 Mutex::Locker locker(m_lock);
316
317 instance_ids->clear();
318 if (m_instances != nullptr) {
319 m_instances->list(instance_ids);
320 }
321}
322
323template <typename I>
324void LeaderWatcher<I>::cancel_timer_task() {
325 assert(m_threads->timer_lock.is_locked());
326 assert(m_lock.is_locked());
327
328 if (m_timer_task == nullptr) {
329 return;
330 }
331
332 dout(20) << m_timer_task << dendl;
333 bool canceled = m_threads->timer->cancel_event(m_timer_task);
334 assert(canceled);
335 m_timer_task = nullptr;
336}
337
338template <typename I>
339void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
340 int delay_factor, bool leader,
341 TimerCallback timer_callback,
342 bool shutting_down) {
343 assert(m_threads->timer_lock.is_locked());
344 assert(m_lock.is_locked());
345
346 if (!shutting_down && m_on_shut_down_finish != nullptr) {
347 return;
348 }
349
350 cancel_timer_task();
351
352 m_timer_task = new FunctionContext(
353 [this, leader, timer_callback](int r) {
354 assert(m_threads->timer_lock.is_locked());
355 m_timer_task = nullptr;
356
357 if (m_timer_op_tracker.empty()) {
358 Mutex::Locker locker(m_lock);
359 execute_timer_task(leader, timer_callback);
360 return;
361 }
362
363 // old timer task is still running -- do not start next
364 // task until the previous task completes
365 if (m_timer_gate == nullptr) {
366 m_timer_gate = new C_TimerGate(this);
367 m_timer_op_tracker.wait_for_ops(m_timer_gate);
368 }
369 m_timer_gate->leader = leader;
370 m_timer_gate->timer_callback = timer_callback;
371 });
372
373 int after = delay_factor *
374 max(1, m_cct->_conf->rbd_mirror_leader_heartbeat_interval);
375
376 dout(20) << "scheduling " << name << " after " << after << " sec (task "
377 << m_timer_task << ")" << dendl;
378 m_threads->timer->add_event_after(after, m_timer_task);
379}
380
381template <typename I>
382void LeaderWatcher<I>::execute_timer_task(bool leader,
383 TimerCallback timer_callback) {
384 dout(20) << dendl;
385
386 assert(m_threads->timer_lock.is_locked());
387 assert(m_lock.is_locked());
388 assert(m_timer_op_tracker.empty());
389
390 if (is_leader(m_lock) != leader) {
391 return;
392 }
393
394 m_timer_op_tracker.start_op();
395 (this->*timer_callback)();
396}
397
398template <typename I>
399void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
400 Context *on_finish) {
401 dout(20) << "r=" << r << dendl;
402
403 if (r < 0) {
404 if (r == -EAGAIN) {
405 dout(20) << "already locked" << dendl;
406 } else {
407 derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
408 }
409 on_finish->complete(r);
410 return;
411 }
412
413 Mutex::Locker locker(m_lock);
414 assert(m_on_finish == nullptr);
415 m_on_finish = on_finish;
416 m_ret_val = 0;
417
418 init_status_watcher();
419}
420
421template <typename I>
422void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
423 dout(20) << dendl;
424
425 Mutex::Locker locker(m_lock);
426 assert(m_on_finish == nullptr);
427 m_on_finish = on_finish;
428 m_ret_val = 0;
429
430 notify_listener();
431}
432
433template <typename I>
434void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
435 Context *on_finish) {
436 dout(20) << "r=" << r << dendl;
437
438 if (r < 0) {
439 on_finish->complete(r);
440 return;
441 }
442
443 Mutex::Locker locker(m_lock);
444 assert(m_on_finish == nullptr);
445 m_on_finish = on_finish;
446
447 notify_lock_released();
448}
449
450template <typename I>
451void LeaderWatcher<I>::break_leader_lock() {
452 dout(20) << dendl;
453
454 assert(m_threads->timer_lock.is_locked());
455 assert(m_lock.is_locked());
456 assert(!m_timer_op_tracker.empty());
457
458 if (m_locker.cookie.empty()) {
459 get_locker();
460 return;
461 }
462
463 Context *ctx = create_async_context_callback(
464 m_work_queue, create_context_callback<
465 LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
466
467 m_leader_lock->break_lock(m_locker, true, ctx);
468}
469
470template <typename I>
471void LeaderWatcher<I>::handle_break_leader_lock(int r) {
472 dout(20) << "r=" << r << dendl;
473
474 Mutex::Locker timer_locker(m_threads->timer_lock);
475 Mutex::Locker locker(m_lock);
476 assert(!m_timer_op_tracker.empty());
477
478 if (m_leader_lock->is_shutdown()) {
479 dout(20) << "canceling due to shutdown" << dendl;
480 m_timer_op_tracker.finish_op();
481 return;
482 }
483
484 if (r < 0 && r != -ENOENT) {
485 derr << "error beaking leader lock: " << cpp_strerror(r) << dendl;
486 schedule_acquire_leader_lock(1);
487 m_timer_op_tracker.finish_op();
488 return;
489 }
490
491 m_locker = {};
492 m_acquire_attempts = 0;
493 acquire_leader_lock();
494}
495
496template <typename I>
497void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
498 uint32_t delay_factor) {
499 dout(20) << dendl;
500
501 assert(m_threads->timer_lock.is_locked());
502 assert(m_lock.is_locked());
503
504 if (reset_leader) {
505 m_locker = {};
506 m_acquire_attempts = 0;
507 }
508
509 schedule_timer_task("get locker", delay_factor, false,
510 &LeaderWatcher<I>::get_locker, false);
511}
512
513template <typename I>
514void LeaderWatcher<I>::get_locker() {
515 dout(20) << dendl;
516
517 assert(m_threads->timer_lock.is_locked());
518 assert(m_lock.is_locked());
519 assert(!m_timer_op_tracker.empty());
520
521 C_GetLocker *get_locker_ctx = new C_GetLocker(this);
522 Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
523
524 m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
525}
526
527template <typename I>
528void LeaderWatcher<I>::handle_get_locker(int r,
529 librbd::managed_lock::Locker& locker) {
530 dout(20) << "r=" << r << dendl;
531
532 Mutex::Locker timer_locker(m_threads->timer_lock);
533 Mutex::Locker mutex_locker(m_lock);
534 assert(!m_timer_op_tracker.empty());
535
536 if (m_leader_lock->is_shutdown()) {
537 dout(20) << "canceling due to shutdown" << dendl;
538 m_timer_op_tracker.finish_op();
539 return;
540 }
541
542 if (is_leader(m_lock)) {
543 m_locker = {};
544 m_timer_op_tracker.finish_op();
545 return;
546 }
547
548 if (r == -ENOENT) {
549 m_locker = {};
550 m_acquire_attempts = 0;
551 acquire_leader_lock();
552 return;
553 } else if (r < 0) {
554 derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
555 schedule_get_locker(true, 1);
556 m_timer_op_tracker.finish_op();
557 return;
558 }
559
31f18b77 560 bool notify_listener = false;
7c673cae
FG
561 if (m_locker != locker) {
562 m_locker = locker;
31f18b77 563 notify_listener = true;
7c673cae
FG
564 if (m_acquire_attempts > 1) {
565 dout(10) << "new lock owner detected -- resetting heartbeat counter"
566 << dendl;
567 m_acquire_attempts = 0;
568 }
569 }
570
571 if (m_acquire_attempts >=
572 m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) {
573 dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
574 << "failed attempts to acquire" << dendl;
575 break_leader_lock();
31f18b77
FG
576 return;
577 }
578
579 schedule_acquire_leader_lock(1);
580
581 if (!notify_listener) {
7c673cae 582 m_timer_op_tracker.finish_op();
31f18b77 583 return;
7c673cae 584 }
31f18b77
FG
585
586 auto ctx = new FunctionContext(
587 [this](int r) {
588 std::string instance_id;
589 if (get_leader_instance_id(&instance_id)) {
590 m_listener->update_leader_handler(instance_id);
591 }
592 Mutex::Locker timer_locker(m_threads->timer_lock);
593 Mutex::Locker locker(m_lock);
594 m_timer_op_tracker.finish_op();
595 });
596 m_work_queue->queue(ctx, 0);
7c673cae
FG
597}
598
599template <typename I>
600void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
601 dout(20) << dendl;
602
603 assert(m_threads->timer_lock.is_locked());
604 assert(m_lock.is_locked());
605
606 schedule_timer_task("acquire leader lock",
607 delay_factor *
608 m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats,
609 false, &LeaderWatcher<I>::acquire_leader_lock, false);
610}
611
612template <typename I>
613void LeaderWatcher<I>::acquire_leader_lock() {
614 assert(m_threads->timer_lock.is_locked());
615 assert(m_lock.is_locked());
616 assert(!m_timer_op_tracker.empty());
617
618 ++m_acquire_attempts;
619 dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
620
621 Context *ctx = create_async_context_callback(
622 m_work_queue, create_context_callback<
623 LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
624 m_leader_lock->try_acquire_lock(ctx);
625}
626
627template <typename I>
628void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
629 dout(20) << "r=" << r << dendl;
630
631 Mutex::Locker timer_locker(m_threads->timer_lock);
632 Mutex::Locker locker(m_lock);
633 assert(!m_timer_op_tracker.empty());
634
635 if (m_leader_lock->is_shutdown()) {
636 dout(20) << "canceling due to shutdown" << dendl;
637 m_timer_op_tracker.finish_op();
638 return;
639 }
640
641 if (r < 0) {
642 if (r == -EAGAIN) {
643 dout(20) << "already locked" << dendl;
644 } else {
645 derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
646 }
647
648 get_locker();
649 return;
650 }
651
652 m_locker = {};
653 m_acquire_attempts = 0;
654
655 if (m_ret_val) {
656 dout(5) << "releasing due to error on notify" << dendl;
657 release_leader_lock();
658 m_timer_op_tracker.finish_op();
659 return;
660 }
661
662 notify_heartbeat();
663}
664
665template <typename I>
666void LeaderWatcher<I>::release_leader_lock() {
667 dout(20) << dendl;
668
669 assert(m_lock.is_locked());
670
671 Context *ctx = create_async_context_callback(
672 m_work_queue, create_context_callback<
673 LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
674
675 m_leader_lock->release_lock(ctx);
676}
677
678template <typename I>
679void LeaderWatcher<I>::handle_release_leader_lock(int r) {
680 dout(20) << "r=" << r << dendl;
681
682 Mutex::Locker timer_locker(m_threads->timer_lock);
683 Mutex::Locker locker(m_lock);
684
685 if (r < 0) {
686 derr << "error releasing lock: " << cpp_strerror(r) << dendl;
687 return;
688 }
689
690 schedule_acquire_leader_lock(1);
691}
692
693template <typename I>
694void LeaderWatcher<I>::init_status_watcher() {
695 dout(20) << dendl;
696
697 assert(m_lock.is_locked());
698 assert(m_status_watcher == nullptr);
699
700 m_status_watcher = MirrorStatusWatcher<I>::create(m_ioctx, m_work_queue);
701
702 Context *ctx = create_context_callback<
703 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_status_watcher>(this);
704
705 m_status_watcher->init(ctx);
706}
707
708template <typename I>
709void LeaderWatcher<I>::handle_init_status_watcher(int r) {
710 dout(20) << "r=" << r << dendl;
711
712 Context *on_finish = nullptr;
713 {
714 Mutex::Locker locker(m_lock);
715
716 if (r == 0) {
717 init_instances();
718 return;
719 }
720
721 derr << "error initializing mirror status watcher: " << cpp_strerror(r)
722 << dendl;
723 m_status_watcher->destroy();
724 m_status_watcher = nullptr;
725 assert(m_on_finish != nullptr);
726 std::swap(m_on_finish, on_finish);
727 }
728 on_finish->complete(r);
729}
730
731template <typename I>
732void LeaderWatcher<I>::shut_down_status_watcher() {
733 dout(20) << dendl;
734
735 assert(m_lock.is_locked());
736 assert(m_status_watcher != nullptr);
737
738 Context *ctx = create_async_context_callback(
739 m_work_queue, create_context_callback<LeaderWatcher<I>,
740 &LeaderWatcher<I>::handle_shut_down_status_watcher>(this));
741
742 m_status_watcher->shut_down(ctx);
743}
744
745template <typename I>
746void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
747 dout(20) << "r=" << r << dendl;
748
749 Context *on_finish = nullptr;
750 {
751 Mutex::Locker locker(m_lock);
752
753 m_status_watcher->destroy();
754 m_status_watcher = nullptr;
755
756 if (r < 0) {
757 derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
758 << dendl;
759 }
760
761 if (m_ret_val != 0) {
762 r = m_ret_val;
763 }
764
765 if (!is_leader(m_lock)) {
766 // ignore on releasing
767 r = 0;
768 }
769
770 assert(m_on_finish != nullptr);
771 std::swap(m_on_finish, on_finish);
772 }
773 on_finish->complete(r);
774}
775
776template <typename I>
777void LeaderWatcher<I>::init_instances() {
778 dout(20) << dendl;
779
780 assert(m_lock.is_locked());
781 assert(m_instances == nullptr);
782
783 m_instances = Instances<I>::create(m_threads, m_ioctx);
784
785 Context *ctx = create_context_callback<
786 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
787
788 m_instances->init(ctx);
789}
790
791template <typename I>
792void LeaderWatcher<I>::handle_init_instances(int r) {
793 dout(20) << "r=" << r << dendl;
794
795 Mutex::Locker locker(m_lock);
796
797 if (r < 0) {
798 derr << "error initializing instances: " << cpp_strerror(r) << dendl;
799 m_ret_val = r;
800 m_instances->destroy();
801 m_instances = nullptr;
802 shut_down_status_watcher();
803 return;
804 }
805
806 notify_listener();
807}
808
809template <typename I>
810void LeaderWatcher<I>::shut_down_instances() {
811 dout(20) << dendl;
812
813 assert(m_lock.is_locked());
814 assert(m_instances != nullptr);
815
816 Context *ctx = create_async_context_callback(
817 m_work_queue, create_context_callback<LeaderWatcher<I>,
818 &LeaderWatcher<I>::handle_shut_down_instances>(this));
819
820 m_instances->shut_down(ctx);
821}
822
823template <typename I>
824void LeaderWatcher<I>::handle_shut_down_instances(int r) {
825 dout(20) << "r=" << r << dendl;
826 assert(r == 0);
827
828 Mutex::Locker locker(m_lock);
829
830 m_instances->destroy();
831 m_instances = nullptr;
832
833 shut_down_status_watcher();
834}
835
836template <typename I>
837void LeaderWatcher<I>::notify_listener() {
838 dout(20) << dendl;
839
840 assert(m_lock.is_locked());
841
842 Context *ctx = create_async_context_callback(
843 m_work_queue, create_context_callback<
844 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
845
846 if (is_leader(m_lock)) {
847 ctx = new FunctionContext(
848 [this, ctx](int r) {
849 m_listener->post_acquire_handler(ctx);
850 });
851 } else {
852 ctx = new FunctionContext(
853 [this, ctx](int r) {
854 m_listener->pre_release_handler(ctx);
855 });
856 }
857 m_work_queue->queue(ctx, 0);
858}
859
860template <typename I>
861void LeaderWatcher<I>::handle_notify_listener(int r) {
862 dout(20) << "r=" << r << dendl;
863
864 Mutex::Locker locker(m_lock);
865
866 if (r < 0) {
867 derr << "error notifying listener: " << cpp_strerror(r) << dendl;
868 m_ret_val = r;
869 }
870
871 if (is_leader(m_lock)) {
872 notify_lock_acquired();
873 } else {
874 shut_down_instances();
875 }
876}
877
878template <typename I>
879void LeaderWatcher<I>::notify_lock_acquired() {
880 dout(20) << dendl;
881
882 assert(m_lock.is_locked());
883
884 Context *ctx = create_context_callback<
885 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
886
887 bufferlist bl;
888 ::encode(NotifyMessage{LockAcquiredPayload{}}, bl);
889
890 send_notify(bl, nullptr, ctx);
891}
892
893template <typename I>
894void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
895 dout(20) << "r=" << r << dendl;
896
897 Context *on_finish = nullptr;
898 {
899 Mutex::Locker locker(m_lock);
900 if (r < 0 && r != -ETIMEDOUT) {
901 derr << "error notifying leader lock acquired: " << cpp_strerror(r)
902 << dendl;
903 m_ret_val = r;
904 }
905
906 assert(m_on_finish != nullptr);
907 std::swap(m_on_finish, on_finish);
908 }
909 on_finish->complete(0);
910}
911
912template <typename I>
913void LeaderWatcher<I>::notify_lock_released() {
914 dout(20) << dendl;
915
916 assert(m_lock.is_locked());
917
918 Context *ctx = create_context_callback<
919 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
920
921 bufferlist bl;
922 ::encode(NotifyMessage{LockReleasedPayload{}}, bl);
923
924 send_notify(bl, nullptr, ctx);
925}
926
927template <typename I>
928void LeaderWatcher<I>::handle_notify_lock_released(int r) {
929 dout(20) << "r=" << r << dendl;
930
931 Context *on_finish = nullptr;
932 {
933 Mutex::Locker locker(m_lock);
934 if (r < 0 && r != -ETIMEDOUT) {
935 derr << "error notifying leader lock released: " << cpp_strerror(r)
936 << dendl;
937 }
938
939 assert(m_on_finish != nullptr);
940 std::swap(m_on_finish, on_finish);
941 }
942 on_finish->complete(r);
943}
944
945template <typename I>
946void LeaderWatcher<I>::notify_heartbeat() {
947 dout(20) << dendl;
948
949 assert(m_threads->timer_lock.is_locked());
950 assert(m_lock.is_locked());
951 assert(!m_timer_op_tracker.empty());
952
953 if (!is_leader(m_lock)) {
954 dout(5) << "not leader, canceling" << dendl;
955 m_timer_op_tracker.finish_op();
956 return;
957 }
958
959 Context *ctx = create_context_callback<
960 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
961
962 bufferlist bl;
963 ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
964
965 m_heartbeat_response.acks.clear();
966 send_notify(bl, &m_heartbeat_response, ctx);
967}
968
969template <typename I>
970void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
971 dout(20) << "r=" << r << dendl;
972
973 Mutex::Locker timer_locker(m_threads->timer_lock);
974 Mutex::Locker locker(m_lock);
975 assert(!m_timer_op_tracker.empty());
976
977 m_timer_op_tracker.finish_op();
978 if (m_leader_lock->is_shutdown()) {
979 dout(20) << "canceling due to shutdown" << dendl;
980 return;
981 } else if (!is_leader(m_lock)) {
982 return;
983 }
984
985 if (r < 0 && r != -ETIMEDOUT) {
986 derr << "error notifying hearbeat: " << cpp_strerror(r)
987 << ", releasing leader" << dendl;
988 release_leader_lock();
989 return;
990 }
991
992 dout(20) << m_heartbeat_response.acks.size() << " acks received, "
993 << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
994
995 for (auto &it: m_heartbeat_response.acks) {
996 uint64_t notifier_id = it.first.gid;
997 if (notifier_id == m_notifier_id) {
998 continue;
999 }
1000
1001 std::string instance_id = stringify(notifier_id);
1002 m_instances->notify(instance_id);
1003 }
1004
1005 schedule_timer_task("heartbeat", 1, true,
1006 &LeaderWatcher<I>::notify_heartbeat, false);
1007}
1008
1009template <typename I>
1010void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
1011 dout(20) << dendl;
1012
1013 {
1014 Mutex::Locker timer_locker(m_threads->timer_lock);
1015 Mutex::Locker locker(m_lock);
1016 if (is_leader(m_lock)) {
1017 dout(5) << "got another leader heartbeat, ignoring" << dendl;
1018 } else {
1019 cancel_timer_task();
1020 m_acquire_attempts = 0;
1021 schedule_acquire_leader_lock(1);
1022 }
1023 }
1024
1025 on_notify_ack->complete(0);
1026}
1027
1028template <typename I>
1029void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
1030 dout(20) << dendl;
1031
1032 {
1033 Mutex::Locker timer_locker(m_threads->timer_lock);
1034 Mutex::Locker locker(m_lock);
1035 if (is_leader(m_lock)) {
1036 dout(5) << "got another leader lock_acquired, ignoring" << dendl;
1037 } else {
1038 cancel_timer_task();
1039 schedule_get_locker(true, 0);
1040 }
1041 }
1042
1043 on_notify_ack->complete(0);
1044}
1045
1046template <typename I>
1047void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
1048 dout(20) << dendl;
1049
1050 {
1051 Mutex::Locker timer_locker(m_threads->timer_lock);
1052 Mutex::Locker locker(m_lock);
1053 if (is_leader(m_lock)) {
1054 dout(5) << "got another leader lock_released, ignoring" << dendl;
1055 } else {
1056 cancel_timer_task();
1057 schedule_get_locker(true, 0);
1058 }
1059 }
1060
1061 on_notify_ack->complete(0);
1062}
1063
1064template <typename I>
1065void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1066 uint64_t notifier_id, bufferlist &bl) {
1067 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1068 << "notifier_id=" << notifier_id << dendl;
1069
1070 Context *ctx = new C_NotifyAck(this, notify_id, handle);
1071
1072 if (notifier_id == m_notifier_id) {
1073 dout(20) << "our own notification, ignoring" << dendl;
1074 ctx->complete(0);
1075 return;
1076 }
1077
1078 NotifyMessage notify_message;
1079 try {
1080 bufferlist::iterator iter = bl.begin();
1081 ::decode(notify_message, iter);
1082 } catch (const buffer::error &err) {
1083 derr << ": error decoding image notification: " << err.what() << dendl;
1084 ctx->complete(0);
1085 return;
1086 }
1087
1088 apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
1089}
1090
1091template <typename I>
1092void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
1093 Context *on_notify_ack) {
1094 dout(20) << "heartbeat" << dendl;
1095
1096 handle_heartbeat(on_notify_ack);
1097}
1098
1099template <typename I>
1100void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
1101 Context *on_notify_ack) {
1102 dout(20) << "lock_acquired" << dendl;
1103
1104 handle_lock_acquired(on_notify_ack);
1105}
1106
1107template <typename I>
1108void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
1109 Context *on_notify_ack) {
1110 dout(20) << "lock_released" << dendl;
1111
1112 handle_lock_released(on_notify_ack);
1113}
1114
1115template <typename I>
1116void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
1117 Context *on_notify_ack) {
1118 dout(20) << "unknown" << dendl;
1119
1120 on_notify_ack->complete(0);
1121}
1122
1123} // namespace mirror
1124} // namespace rbd
1125
1126template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;