]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/LeaderWatcher.cc
update sources to v12.1.2
[ceph.git] / ceph / src / tools / rbd_mirror / LeaderWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "LeaderWatcher.h"
5#include "common/Timer.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
9#include "include/stringify.h"
10#include "librbd/Utils.h"
11#include "librbd/watcher/Types.h"
12#include "Threads.h"
13
// Logging setup for this file: messages are emitted through the global
// context under the rbd_mirror subsystem, and every line is prefixed with the
// class name, the object address and the name of the calling function.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
                           << this << " " << __func__ << ": "
19namespace rbd {
20namespace mirror {
21
22using namespace leader_watcher;
23
24using librbd::util::create_async_context_callback;
25using librbd::util::create_context_callback;
26using librbd::util::create_rados_callback;
27
28template <typename I>
29LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
30 Listener *listener)
31 : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
32 m_threads(threads), m_listener(listener),
33 m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
34 m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
35 m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
36 m_cct->_conf->rbd_blacklist_expire_seconds)) {
37}
38
39template <typename I>
40LeaderWatcher<I>::~LeaderWatcher() {
41 assert(m_status_watcher == nullptr);
42 assert(m_instances == nullptr);
43 assert(m_timer_task == nullptr);
44
45 delete m_leader_lock;
46}
47
31f18b77
FG
48template <typename I>
49std::string LeaderWatcher<I>::get_instance_id() {
50 return stringify(m_notifier_id);
51}
52
7c673cae
FG
53template <typename I>
54int LeaderWatcher<I>::init() {
55 C_SaferCond init_ctx;
56 init(&init_ctx);
57 return init_ctx.wait();
58}
59
60template <typename I>
61void LeaderWatcher<I>::init(Context *on_finish) {
62 dout(20) << "notifier_id=" << m_notifier_id << dendl;
63
64 Mutex::Locker locker(m_lock);
65
66 assert(m_on_finish == nullptr);
67 m_on_finish = on_finish;
68
69 create_leader_object();
70}
71
72template <typename I>
73void LeaderWatcher<I>::create_leader_object() {
74 dout(20) << dendl;
75
76 assert(m_lock.is_locked());
77
78 librados::ObjectWriteOperation op;
79 op.create(false);
80
81 librados::AioCompletion *aio_comp = create_rados_callback<
82 LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
83 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
84 assert(r == 0);
85 aio_comp->release();
86}
87
88template <typename I>
89void LeaderWatcher<I>::handle_create_leader_object(int r) {
90 dout(20) << "r=" << r << dendl;
91
92 Context *on_finish = nullptr;
93 {
94 Mutex::Locker locker(m_lock);
95
96 if (r == 0) {
97 register_watch();
98 return;
99 }
100
101 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
102 << dendl;
103
104 std::swap(on_finish, m_on_finish);
105 }
106 on_finish->complete(r);
107}
108
109template <typename I>
110void LeaderWatcher<I>::register_watch() {
111 dout(20) << dendl;
112
113 assert(m_lock.is_locked());
114
115 Context *ctx = create_async_context_callback(
116 m_work_queue, create_context_callback<
117 LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
118
119 librbd::Watcher::register_watch(ctx);
120}
121
122template <typename I>
123void LeaderWatcher<I>::handle_register_watch(int r) {
124 dout(20) << "r=" << r << dendl;
125
126 Context *on_finish = nullptr;
127 {
128 Mutex::Locker timer_locker(m_threads->timer_lock);
129 Mutex::Locker locker(m_lock);
130
131 if (r < 0) {
132 derr << "error registering leader watcher for " << m_oid << " object: "
133 << cpp_strerror(r) << dendl;
134 } else {
135 schedule_acquire_leader_lock(0);
136 }
137
138 std::swap(on_finish, m_on_finish);
139 }
140 on_finish->complete(r);
141}
142
143template <typename I>
144void LeaderWatcher<I>::shut_down() {
145 C_SaferCond shut_down_ctx;
146 shut_down(&shut_down_ctx);
147 int r = shut_down_ctx.wait();
148 assert(r == 0);
149}
150
151template <typename I>
152void LeaderWatcher<I>::shut_down(Context *on_finish) {
153 dout(20) << dendl;
154
155 Mutex::Locker timer_locker(m_threads->timer_lock);
156 Mutex::Locker locker(m_lock);
157
158 assert(m_on_shut_down_finish == nullptr);
159 m_on_shut_down_finish = on_finish;
160 cancel_timer_task();
161 shut_down_leader_lock();
162}
163
164template <typename I>
165void LeaderWatcher<I>::shut_down_leader_lock() {
166 dout(20) << dendl;
167
168 assert(m_lock.is_locked());
169
170 Context *ctx = create_async_context_callback(
171 m_work_queue, create_context_callback<
172 LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
173
174 m_leader_lock->shut_down(ctx);
175}
176
177template <typename I>
178void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
179 dout(20) << "r=" << r << dendl;
180
181 Mutex::Locker locker(m_lock);
182
183 if (r < 0) {
184 derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
185 }
186
187 unregister_watch();
188}
189
190template <typename I>
191void LeaderWatcher<I>::unregister_watch() {
192 dout(20) << dendl;
193
194 assert(m_lock.is_locked());
195
196 Context *ctx = create_async_context_callback(
197 m_work_queue, create_context_callback<
198 LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
199
200 librbd::Watcher::unregister_watch(ctx);
201}
202
203template <typename I>
204void LeaderWatcher<I>::handle_unregister_watch(int r) {
205 dout(20) << "r=" << r << dendl;
206
207 if (r < 0) {
208 derr << "error unregistering leader watcher for " << m_oid << " object: "
209 << cpp_strerror(r) << dendl;
210 }
211 wait_for_tasks();
212}
213
214template <typename I>
215void LeaderWatcher<I>::wait_for_tasks() {
216 dout(20) << dendl;
217
218 Mutex::Locker timer_locker(m_threads->timer_lock);
219 Mutex::Locker locker(m_lock);
220 schedule_timer_task("wait for tasks", 0, false,
221 &LeaderWatcher<I>::handle_wait_for_tasks, true);
222}
223
224template <typename I>
225void LeaderWatcher<I>::handle_wait_for_tasks() {
226 dout(20) << dendl;
227
228 assert(m_threads->timer_lock.is_locked());
229 assert(m_lock.is_locked());
230 assert(m_on_shut_down_finish != nullptr);
231
232 assert(!m_timer_op_tracker.empty());
233 m_timer_op_tracker.finish_op();
234
235 auto ctx = new FunctionContext([this](int r) {
236 Context *on_finish;
237 {
238 // ensure lock isn't held when completing shut down
239 Mutex::Locker locker(m_lock);
240 assert(m_on_shut_down_finish != nullptr);
241 on_finish = m_on_shut_down_finish;
242 }
243 on_finish->complete(0);
244 });
245 m_work_queue->queue(ctx, 0);
246}
247
248template <typename I>
249bool LeaderWatcher<I>::is_leader() const {
250 Mutex::Locker locker(m_lock);
251
252 return is_leader(m_lock);
253}
254
255template <typename I>
256bool LeaderWatcher<I>::is_leader(Mutex &lock) const {
257 assert(m_lock.is_locked());
258
259 bool leader = m_leader_lock->is_leader();
260 dout(20) << leader << dendl;
261 return leader;
262}
263
264template <typename I>
265bool LeaderWatcher<I>::is_releasing_leader() const {
266 Mutex::Locker locker(m_lock);
267
268 return is_releasing_leader(m_lock);
269}
270
271template <typename I>
272bool LeaderWatcher<I>::is_releasing_leader(Mutex &lock) const {
273 assert(m_lock.is_locked());
274
275 bool releasing = m_leader_lock->is_releasing_leader();
276 dout(20) << releasing << dendl;
277 return releasing;
278}
279
280template <typename I>
281bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
282 dout(20) << dendl;
283
284 Mutex::Locker locker(m_lock);
285
286 if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
287 *instance_id = stringify(m_notifier_id);
288 return true;
289 }
290
291 if (!m_locker.cookie.empty()) {
292 *instance_id = stringify(m_locker.entity.num());
293 return true;
294 }
295
296 return false;
297}
298
299template <typename I>
300void LeaderWatcher<I>::release_leader() {
301 dout(20) << dendl;
302
303 Mutex::Locker locker(m_lock);
304 if (!is_leader(m_lock)) {
305 return;
306 }
307
308 release_leader_lock();
309}
310
311template <typename I>
312void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
313 dout(20) << dendl;
314
315 Mutex::Locker locker(m_lock);
316
317 instance_ids->clear();
318 if (m_instances != nullptr) {
319 m_instances->list(instance_ids);
320 }
321}
322
323template <typename I>
324void LeaderWatcher<I>::cancel_timer_task() {
325 assert(m_threads->timer_lock.is_locked());
326 assert(m_lock.is_locked());
327
328 if (m_timer_task == nullptr) {
329 return;
330 }
331
332 dout(20) << m_timer_task << dendl;
333 bool canceled = m_threads->timer->cancel_event(m_timer_task);
334 assert(canceled);
335 m_timer_task = nullptr;
336}
337
338template <typename I>
339void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
340 int delay_factor, bool leader,
341 TimerCallback timer_callback,
342 bool shutting_down) {
343 assert(m_threads->timer_lock.is_locked());
344 assert(m_lock.is_locked());
345
346 if (!shutting_down && m_on_shut_down_finish != nullptr) {
347 return;
348 }
349
350 cancel_timer_task();
351
352 m_timer_task = new FunctionContext(
353 [this, leader, timer_callback](int r) {
354 assert(m_threads->timer_lock.is_locked());
355 m_timer_task = nullptr;
356
357 if (m_timer_op_tracker.empty()) {
358 Mutex::Locker locker(m_lock);
359 execute_timer_task(leader, timer_callback);
360 return;
361 }
362
363 // old timer task is still running -- do not start next
364 // task until the previous task completes
365 if (m_timer_gate == nullptr) {
366 m_timer_gate = new C_TimerGate(this);
367 m_timer_op_tracker.wait_for_ops(m_timer_gate);
368 }
369 m_timer_gate->leader = leader;
370 m_timer_gate->timer_callback = timer_callback;
371 });
372
c07f9fc5 373 int after = delay_factor * m_cct->_conf->rbd_mirror_leader_heartbeat_interval;
7c673cae
FG
374
375 dout(20) << "scheduling " << name << " after " << after << " sec (task "
376 << m_timer_task << ")" << dendl;
377 m_threads->timer->add_event_after(after, m_timer_task);
378}
379
380template <typename I>
381void LeaderWatcher<I>::execute_timer_task(bool leader,
382 TimerCallback timer_callback) {
383 dout(20) << dendl;
384
385 assert(m_threads->timer_lock.is_locked());
386 assert(m_lock.is_locked());
387 assert(m_timer_op_tracker.empty());
388
389 if (is_leader(m_lock) != leader) {
390 return;
391 }
392
393 m_timer_op_tracker.start_op();
394 (this->*timer_callback)();
395}
396
397template <typename I>
398void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
399 Context *on_finish) {
400 dout(20) << "r=" << r << dendl;
401
402 if (r < 0) {
403 if (r == -EAGAIN) {
404 dout(20) << "already locked" << dendl;
405 } else {
406 derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
407 }
408 on_finish->complete(r);
409 return;
410 }
411
412 Mutex::Locker locker(m_lock);
413 assert(m_on_finish == nullptr);
414 m_on_finish = on_finish;
415 m_ret_val = 0;
416
417 init_status_watcher();
418}
419
420template <typename I>
421void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
422 dout(20) << dendl;
423
424 Mutex::Locker locker(m_lock);
425 assert(m_on_finish == nullptr);
426 m_on_finish = on_finish;
427 m_ret_val = 0;
428
429 notify_listener();
430}
431
432template <typename I>
433void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
434 Context *on_finish) {
435 dout(20) << "r=" << r << dendl;
436
437 if (r < 0) {
438 on_finish->complete(r);
439 return;
440 }
441
442 Mutex::Locker locker(m_lock);
443 assert(m_on_finish == nullptr);
444 m_on_finish = on_finish;
445
446 notify_lock_released();
447}
448
449template <typename I>
450void LeaderWatcher<I>::break_leader_lock() {
451 dout(20) << dendl;
452
453 assert(m_threads->timer_lock.is_locked());
454 assert(m_lock.is_locked());
455 assert(!m_timer_op_tracker.empty());
456
457 if (m_locker.cookie.empty()) {
458 get_locker();
459 return;
460 }
461
462 Context *ctx = create_async_context_callback(
463 m_work_queue, create_context_callback<
464 LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
465
466 m_leader_lock->break_lock(m_locker, true, ctx);
467}
468
469template <typename I>
470void LeaderWatcher<I>::handle_break_leader_lock(int r) {
471 dout(20) << "r=" << r << dendl;
472
473 Mutex::Locker timer_locker(m_threads->timer_lock);
474 Mutex::Locker locker(m_lock);
475 assert(!m_timer_op_tracker.empty());
476
477 if (m_leader_lock->is_shutdown()) {
478 dout(20) << "canceling due to shutdown" << dendl;
479 m_timer_op_tracker.finish_op();
480 return;
481 }
482
483 if (r < 0 && r != -ENOENT) {
484 derr << "error beaking leader lock: " << cpp_strerror(r) << dendl;
485 schedule_acquire_leader_lock(1);
486 m_timer_op_tracker.finish_op();
487 return;
488 }
489
490 m_locker = {};
491 m_acquire_attempts = 0;
492 acquire_leader_lock();
493}
494
495template <typename I>
496void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
497 uint32_t delay_factor) {
498 dout(20) << dendl;
499
500 assert(m_threads->timer_lock.is_locked());
501 assert(m_lock.is_locked());
502
503 if (reset_leader) {
504 m_locker = {};
505 m_acquire_attempts = 0;
506 }
507
508 schedule_timer_task("get locker", delay_factor, false,
509 &LeaderWatcher<I>::get_locker, false);
510}
511
512template <typename I>
513void LeaderWatcher<I>::get_locker() {
514 dout(20) << dendl;
515
516 assert(m_threads->timer_lock.is_locked());
517 assert(m_lock.is_locked());
518 assert(!m_timer_op_tracker.empty());
519
520 C_GetLocker *get_locker_ctx = new C_GetLocker(this);
521 Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
522
523 m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
524}
525
526template <typename I>
527void LeaderWatcher<I>::handle_get_locker(int r,
528 librbd::managed_lock::Locker& locker) {
529 dout(20) << "r=" << r << dendl;
530
531 Mutex::Locker timer_locker(m_threads->timer_lock);
532 Mutex::Locker mutex_locker(m_lock);
533 assert(!m_timer_op_tracker.empty());
534
535 if (m_leader_lock->is_shutdown()) {
536 dout(20) << "canceling due to shutdown" << dendl;
537 m_timer_op_tracker.finish_op();
538 return;
539 }
540
541 if (is_leader(m_lock)) {
542 m_locker = {};
543 m_timer_op_tracker.finish_op();
544 return;
545 }
546
547 if (r == -ENOENT) {
548 m_locker = {};
549 m_acquire_attempts = 0;
550 acquire_leader_lock();
551 return;
552 } else if (r < 0) {
553 derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
554 schedule_get_locker(true, 1);
555 m_timer_op_tracker.finish_op();
556 return;
557 }
558
31f18b77 559 bool notify_listener = false;
7c673cae
FG
560 if (m_locker != locker) {
561 m_locker = locker;
31f18b77 562 notify_listener = true;
7c673cae
FG
563 if (m_acquire_attempts > 1) {
564 dout(10) << "new lock owner detected -- resetting heartbeat counter"
565 << dendl;
566 m_acquire_attempts = 0;
567 }
568 }
569
570 if (m_acquire_attempts >=
571 m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) {
572 dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
573 << "failed attempts to acquire" << dendl;
574 break_leader_lock();
31f18b77
FG
575 return;
576 }
577
578 schedule_acquire_leader_lock(1);
579
580 if (!notify_listener) {
7c673cae 581 m_timer_op_tracker.finish_op();
31f18b77 582 return;
7c673cae 583 }
31f18b77
FG
584
585 auto ctx = new FunctionContext(
586 [this](int r) {
587 std::string instance_id;
588 if (get_leader_instance_id(&instance_id)) {
589 m_listener->update_leader_handler(instance_id);
590 }
591 Mutex::Locker timer_locker(m_threads->timer_lock);
592 Mutex::Locker locker(m_lock);
593 m_timer_op_tracker.finish_op();
594 });
595 m_work_queue->queue(ctx, 0);
7c673cae
FG
596}
597
598template <typename I>
599void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
600 dout(20) << dendl;
601
602 assert(m_threads->timer_lock.is_locked());
603 assert(m_lock.is_locked());
604
605 schedule_timer_task("acquire leader lock",
606 delay_factor *
607 m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats,
608 false, &LeaderWatcher<I>::acquire_leader_lock, false);
609}
610
611template <typename I>
612void LeaderWatcher<I>::acquire_leader_lock() {
613 assert(m_threads->timer_lock.is_locked());
614 assert(m_lock.is_locked());
615 assert(!m_timer_op_tracker.empty());
616
617 ++m_acquire_attempts;
618 dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
619
620 Context *ctx = create_async_context_callback(
621 m_work_queue, create_context_callback<
622 LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
623 m_leader_lock->try_acquire_lock(ctx);
624}
625
626template <typename I>
627void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
628 dout(20) << "r=" << r << dendl;
629
630 Mutex::Locker timer_locker(m_threads->timer_lock);
631 Mutex::Locker locker(m_lock);
632 assert(!m_timer_op_tracker.empty());
633
634 if (m_leader_lock->is_shutdown()) {
635 dout(20) << "canceling due to shutdown" << dendl;
636 m_timer_op_tracker.finish_op();
637 return;
638 }
639
640 if (r < 0) {
641 if (r == -EAGAIN) {
642 dout(20) << "already locked" << dendl;
643 } else {
644 derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
645 }
646
647 get_locker();
648 return;
649 }
650
651 m_locker = {};
652 m_acquire_attempts = 0;
653
654 if (m_ret_val) {
655 dout(5) << "releasing due to error on notify" << dendl;
656 release_leader_lock();
657 m_timer_op_tracker.finish_op();
658 return;
659 }
660
661 notify_heartbeat();
662}
663
664template <typename I>
665void LeaderWatcher<I>::release_leader_lock() {
666 dout(20) << dendl;
667
668 assert(m_lock.is_locked());
669
670 Context *ctx = create_async_context_callback(
671 m_work_queue, create_context_callback<
672 LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
673
674 m_leader_lock->release_lock(ctx);
675}
676
677template <typename I>
678void LeaderWatcher<I>::handle_release_leader_lock(int r) {
679 dout(20) << "r=" << r << dendl;
680
681 Mutex::Locker timer_locker(m_threads->timer_lock);
682 Mutex::Locker locker(m_lock);
683
684 if (r < 0) {
685 derr << "error releasing lock: " << cpp_strerror(r) << dendl;
686 return;
687 }
688
689 schedule_acquire_leader_lock(1);
690}
691
692template <typename I>
693void LeaderWatcher<I>::init_status_watcher() {
694 dout(20) << dendl;
695
696 assert(m_lock.is_locked());
697 assert(m_status_watcher == nullptr);
698
699 m_status_watcher = MirrorStatusWatcher<I>::create(m_ioctx, m_work_queue);
700
701 Context *ctx = create_context_callback<
702 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_status_watcher>(this);
703
704 m_status_watcher->init(ctx);
705}
706
707template <typename I>
708void LeaderWatcher<I>::handle_init_status_watcher(int r) {
709 dout(20) << "r=" << r << dendl;
710
711 Context *on_finish = nullptr;
712 {
713 Mutex::Locker locker(m_lock);
714
715 if (r == 0) {
716 init_instances();
717 return;
718 }
719
720 derr << "error initializing mirror status watcher: " << cpp_strerror(r)
721 << dendl;
722 m_status_watcher->destroy();
723 m_status_watcher = nullptr;
724 assert(m_on_finish != nullptr);
725 std::swap(m_on_finish, on_finish);
726 }
727 on_finish->complete(r);
728}
729
730template <typename I>
731void LeaderWatcher<I>::shut_down_status_watcher() {
732 dout(20) << dendl;
733
734 assert(m_lock.is_locked());
735 assert(m_status_watcher != nullptr);
736
737 Context *ctx = create_async_context_callback(
738 m_work_queue, create_context_callback<LeaderWatcher<I>,
739 &LeaderWatcher<I>::handle_shut_down_status_watcher>(this));
740
741 m_status_watcher->shut_down(ctx);
742}
743
744template <typename I>
745void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
746 dout(20) << "r=" << r << dendl;
747
748 Context *on_finish = nullptr;
749 {
750 Mutex::Locker locker(m_lock);
751
752 m_status_watcher->destroy();
753 m_status_watcher = nullptr;
754
755 if (r < 0) {
756 derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
757 << dendl;
758 }
759
760 if (m_ret_val != 0) {
761 r = m_ret_val;
762 }
763
764 if (!is_leader(m_lock)) {
765 // ignore on releasing
766 r = 0;
767 }
768
769 assert(m_on_finish != nullptr);
770 std::swap(m_on_finish, on_finish);
771 }
772 on_finish->complete(r);
773}
774
775template <typename I>
776void LeaderWatcher<I>::init_instances() {
777 dout(20) << dendl;
778
779 assert(m_lock.is_locked());
780 assert(m_instances == nullptr);
781
782 m_instances = Instances<I>::create(m_threads, m_ioctx);
783
784 Context *ctx = create_context_callback<
785 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
786
787 m_instances->init(ctx);
788}
789
790template <typename I>
791void LeaderWatcher<I>::handle_init_instances(int r) {
792 dout(20) << "r=" << r << dendl;
793
794 Mutex::Locker locker(m_lock);
795
796 if (r < 0) {
797 derr << "error initializing instances: " << cpp_strerror(r) << dendl;
798 m_ret_val = r;
799 m_instances->destroy();
800 m_instances = nullptr;
801 shut_down_status_watcher();
802 return;
803 }
804
805 notify_listener();
806}
807
808template <typename I>
809void LeaderWatcher<I>::shut_down_instances() {
810 dout(20) << dendl;
811
812 assert(m_lock.is_locked());
813 assert(m_instances != nullptr);
814
815 Context *ctx = create_async_context_callback(
816 m_work_queue, create_context_callback<LeaderWatcher<I>,
817 &LeaderWatcher<I>::handle_shut_down_instances>(this));
818
819 m_instances->shut_down(ctx);
820}
821
822template <typename I>
823void LeaderWatcher<I>::handle_shut_down_instances(int r) {
824 dout(20) << "r=" << r << dendl;
825 assert(r == 0);
826
827 Mutex::Locker locker(m_lock);
828
829 m_instances->destroy();
830 m_instances = nullptr;
831
832 shut_down_status_watcher();
833}
834
835template <typename I>
836void LeaderWatcher<I>::notify_listener() {
837 dout(20) << dendl;
838
839 assert(m_lock.is_locked());
840
841 Context *ctx = create_async_context_callback(
842 m_work_queue, create_context_callback<
843 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
844
845 if (is_leader(m_lock)) {
846 ctx = new FunctionContext(
847 [this, ctx](int r) {
848 m_listener->post_acquire_handler(ctx);
849 });
850 } else {
851 ctx = new FunctionContext(
852 [this, ctx](int r) {
853 m_listener->pre_release_handler(ctx);
854 });
855 }
856 m_work_queue->queue(ctx, 0);
857}
858
859template <typename I>
860void LeaderWatcher<I>::handle_notify_listener(int r) {
861 dout(20) << "r=" << r << dendl;
862
863 Mutex::Locker locker(m_lock);
864
865 if (r < 0) {
866 derr << "error notifying listener: " << cpp_strerror(r) << dendl;
867 m_ret_val = r;
868 }
869
870 if (is_leader(m_lock)) {
871 notify_lock_acquired();
872 } else {
873 shut_down_instances();
874 }
875}
876
877template <typename I>
878void LeaderWatcher<I>::notify_lock_acquired() {
879 dout(20) << dendl;
880
881 assert(m_lock.is_locked());
882
883 Context *ctx = create_context_callback<
884 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
885
886 bufferlist bl;
887 ::encode(NotifyMessage{LockAcquiredPayload{}}, bl);
888
889 send_notify(bl, nullptr, ctx);
890}
891
892template <typename I>
893void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
894 dout(20) << "r=" << r << dendl;
895
896 Context *on_finish = nullptr;
897 {
898 Mutex::Locker locker(m_lock);
899 if (r < 0 && r != -ETIMEDOUT) {
900 derr << "error notifying leader lock acquired: " << cpp_strerror(r)
901 << dendl;
902 m_ret_val = r;
903 }
904
905 assert(m_on_finish != nullptr);
906 std::swap(m_on_finish, on_finish);
907 }
908 on_finish->complete(0);
909}
910
911template <typename I>
912void LeaderWatcher<I>::notify_lock_released() {
913 dout(20) << dendl;
914
915 assert(m_lock.is_locked());
916
917 Context *ctx = create_context_callback<
918 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
919
920 bufferlist bl;
921 ::encode(NotifyMessage{LockReleasedPayload{}}, bl);
922
923 send_notify(bl, nullptr, ctx);
924}
925
926template <typename I>
927void LeaderWatcher<I>::handle_notify_lock_released(int r) {
928 dout(20) << "r=" << r << dendl;
929
930 Context *on_finish = nullptr;
931 {
932 Mutex::Locker locker(m_lock);
933 if (r < 0 && r != -ETIMEDOUT) {
934 derr << "error notifying leader lock released: " << cpp_strerror(r)
935 << dendl;
936 }
937
938 assert(m_on_finish != nullptr);
939 std::swap(m_on_finish, on_finish);
940 }
941 on_finish->complete(r);
942}
943
944template <typename I>
945void LeaderWatcher<I>::notify_heartbeat() {
946 dout(20) << dendl;
947
948 assert(m_threads->timer_lock.is_locked());
949 assert(m_lock.is_locked());
950 assert(!m_timer_op_tracker.empty());
951
952 if (!is_leader(m_lock)) {
953 dout(5) << "not leader, canceling" << dendl;
954 m_timer_op_tracker.finish_op();
955 return;
956 }
957
958 Context *ctx = create_context_callback<
959 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
960
961 bufferlist bl;
962 ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
963
964 m_heartbeat_response.acks.clear();
965 send_notify(bl, &m_heartbeat_response, ctx);
966}
967
968template <typename I>
969void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
970 dout(20) << "r=" << r << dendl;
971
972 Mutex::Locker timer_locker(m_threads->timer_lock);
973 Mutex::Locker locker(m_lock);
974 assert(!m_timer_op_tracker.empty());
975
976 m_timer_op_tracker.finish_op();
977 if (m_leader_lock->is_shutdown()) {
978 dout(20) << "canceling due to shutdown" << dendl;
979 return;
980 } else if (!is_leader(m_lock)) {
981 return;
982 }
983
984 if (r < 0 && r != -ETIMEDOUT) {
985 derr << "error notifying hearbeat: " << cpp_strerror(r)
986 << ", releasing leader" << dendl;
987 release_leader_lock();
988 return;
989 }
990
991 dout(20) << m_heartbeat_response.acks.size() << " acks received, "
992 << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
993
994 for (auto &it: m_heartbeat_response.acks) {
995 uint64_t notifier_id = it.first.gid;
996 if (notifier_id == m_notifier_id) {
997 continue;
998 }
999
1000 std::string instance_id = stringify(notifier_id);
1001 m_instances->notify(instance_id);
1002 }
1003
1004 schedule_timer_task("heartbeat", 1, true,
1005 &LeaderWatcher<I>::notify_heartbeat, false);
1006}
1007
1008template <typename I>
1009void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
1010 dout(20) << dendl;
1011
1012 {
1013 Mutex::Locker timer_locker(m_threads->timer_lock);
1014 Mutex::Locker locker(m_lock);
1015 if (is_leader(m_lock)) {
1016 dout(5) << "got another leader heartbeat, ignoring" << dendl;
1017 } else {
1018 cancel_timer_task();
1019 m_acquire_attempts = 0;
1020 schedule_acquire_leader_lock(1);
1021 }
1022 }
1023
1024 on_notify_ack->complete(0);
1025}
1026
1027template <typename I>
1028void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
1029 dout(20) << dendl;
1030
1031 {
1032 Mutex::Locker timer_locker(m_threads->timer_lock);
1033 Mutex::Locker locker(m_lock);
1034 if (is_leader(m_lock)) {
1035 dout(5) << "got another leader lock_acquired, ignoring" << dendl;
1036 } else {
1037 cancel_timer_task();
1038 schedule_get_locker(true, 0);
1039 }
1040 }
1041
1042 on_notify_ack->complete(0);
1043}
1044
1045template <typename I>
1046void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
1047 dout(20) << dendl;
1048
1049 {
1050 Mutex::Locker timer_locker(m_threads->timer_lock);
1051 Mutex::Locker locker(m_lock);
1052 if (is_leader(m_lock)) {
1053 dout(5) << "got another leader lock_released, ignoring" << dendl;
1054 } else {
1055 cancel_timer_task();
1056 schedule_get_locker(true, 0);
1057 }
1058 }
1059
1060 on_notify_ack->complete(0);
1061}
1062
1063template <typename I>
1064void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1065 uint64_t notifier_id, bufferlist &bl) {
1066 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1067 << "notifier_id=" << notifier_id << dendl;
1068
1069 Context *ctx = new C_NotifyAck(this, notify_id, handle);
1070
1071 if (notifier_id == m_notifier_id) {
1072 dout(20) << "our own notification, ignoring" << dendl;
1073 ctx->complete(0);
1074 return;
1075 }
1076
1077 NotifyMessage notify_message;
1078 try {
1079 bufferlist::iterator iter = bl.begin();
1080 ::decode(notify_message, iter);
1081 } catch (const buffer::error &err) {
1082 derr << ": error decoding image notification: " << err.what() << dendl;
1083 ctx->complete(0);
1084 return;
1085 }
1086
1087 apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
1088}
1089
1090template <typename I>
1091void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
1092 Context *on_notify_ack) {
1093 dout(20) << "heartbeat" << dendl;
1094
1095 handle_heartbeat(on_notify_ack);
1096}
1097
1098template <typename I>
1099void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
1100 Context *on_notify_ack) {
1101 dout(20) << "lock_acquired" << dendl;
1102
1103 handle_lock_acquired(on_notify_ack);
1104}
1105
1106template <typename I>
1107void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
1108 Context *on_notify_ack) {
1109 dout(20) << "lock_released" << dendl;
1110
1111 handle_lock_released(on_notify_ack);
1112}
1113
1114template <typename I>
1115void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
1116 Context *on_notify_ack) {
1117 dout(20) << "unknown" << dendl;
1118
1119 on_notify_ack->complete(0);
1120}
1121
1122} // namespace mirror
1123} // namespace rbd
1124
1125template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;