]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/LeaderWatcher.cc
update sources to 12.2.10
[ceph.git] / ceph / src / tools / rbd_mirror / LeaderWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "LeaderWatcher.h"
5#include "common/Timer.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
9#include "include/stringify.h"
10#include "librbd/Utils.h"
11#include "librbd/watcher/Types.h"
12#include "Threads.h"
13
14#define dout_context g_ceph_context
15#define dout_subsys ceph_subsys_rbd_mirror
16#undef dout_prefix
17#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
18 << this << " " << __func__ << ": "
19namespace rbd {
20namespace mirror {
21
22using namespace leader_watcher;
23
24using librbd::util::create_async_context_callback;
25using librbd::util::create_context_callback;
26using librbd::util::create_rados_callback;
27
/**
 * Construct the leader watcher for the RBD_MIRROR_LEADER object of the pool
 * behind @p io_ctx.
 *
 * @param threads  shared thread context; its work_queue is handed to the
 *                 base Watcher and its timer/timer_lock are used later
 * @param io_ctx   pool I/O context that holds the leader object
 * @param listener receives leader-state callbacks (not deleted by this class)
 */
template <typename I>
LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
                                Listener *listener)
  : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
    m_threads(threads), m_listener(listener),
    // the mutex name embeds the pool name so per-pool watchers are distinct
    m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
    // this client's librados instance id; compared against notifier ids to
    // recognise our own notifications
    m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
    // m_ioctx / m_work_queue / m_oid / m_cct are presumably set up by the
    // Watcher base constructed above -- confirm in librbd/Watcher.h.
    // Owned here and deleted in the destructor. The meaning of the `true`
    // flag is defined by LeaderLock (TODO confirm).
    m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
                                 m_cct->_conf->get_val<int64_t>(
                                   "rbd_blacklist_expire_seconds"))) {
}
39
/**
 * Destroy the watcher. The status watcher, the instances tracker and any
 * pending timer task must already be gone (presumably torn down by the
 * release / shut_down() paths); only the leader lock is freed here.
 */
template <typename I>
LeaderWatcher<I>::~LeaderWatcher() {
  assert(m_status_watcher == nullptr);
  assert(m_instances == nullptr);
  assert(m_timer_task == nullptr);

  // allocated with new in the constructor
  delete m_leader_lock;
}
48
31f18b77
FG
49template <typename I>
50std::string LeaderWatcher<I>::get_instance_id() {
51 return stringify(m_notifier_id);
52}
53
7c673cae
FG
54template <typename I>
55int LeaderWatcher<I>::init() {
56 C_SaferCond init_ctx;
57 init(&init_ctx);
58 return init_ctx.wait();
59}
60
61template <typename I>
62void LeaderWatcher<I>::init(Context *on_finish) {
63 dout(20) << "notifier_id=" << m_notifier_id << dendl;
64
65 Mutex::Locker locker(m_lock);
66
67 assert(m_on_finish == nullptr);
68 m_on_finish = on_finish;
69
70 create_leader_object();
71}
72
73template <typename I>
74void LeaderWatcher<I>::create_leader_object() {
75 dout(20) << dendl;
76
77 assert(m_lock.is_locked());
78
79 librados::ObjectWriteOperation op;
80 op.create(false);
81
82 librados::AioCompletion *aio_comp = create_rados_callback<
83 LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
84 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
85 assert(r == 0);
86 aio_comp->release();
87}
88
89template <typename I>
90void LeaderWatcher<I>::handle_create_leader_object(int r) {
91 dout(20) << "r=" << r << dendl;
92
93 Context *on_finish = nullptr;
94 {
95 Mutex::Locker locker(m_lock);
96
97 if (r == 0) {
98 register_watch();
99 return;
100 }
101
102 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
103 << dendl;
104
105 std::swap(on_finish, m_on_finish);
106 }
107 on_finish->complete(r);
108}
109
110template <typename I>
111void LeaderWatcher<I>::register_watch() {
112 dout(20) << dendl;
113
114 assert(m_lock.is_locked());
115
116 Context *ctx = create_async_context_callback(
117 m_work_queue, create_context_callback<
118 LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
119
120 librbd::Watcher::register_watch(ctx);
121}
122
123template <typename I>
124void LeaderWatcher<I>::handle_register_watch(int r) {
125 dout(20) << "r=" << r << dendl;
126
127 Context *on_finish = nullptr;
128 {
129 Mutex::Locker timer_locker(m_threads->timer_lock);
130 Mutex::Locker locker(m_lock);
131
132 if (r < 0) {
133 derr << "error registering leader watcher for " << m_oid << " object: "
134 << cpp_strerror(r) << dendl;
135 } else {
136 schedule_acquire_leader_lock(0);
137 }
138
139 std::swap(on_finish, m_on_finish);
140 }
141 on_finish->complete(r);
142}
143
144template <typename I>
145void LeaderWatcher<I>::shut_down() {
146 C_SaferCond shut_down_ctx;
147 shut_down(&shut_down_ctx);
148 int r = shut_down_ctx.wait();
149 assert(r == 0);
150}
151
152template <typename I>
153void LeaderWatcher<I>::shut_down(Context *on_finish) {
154 dout(20) << dendl;
155
156 Mutex::Locker timer_locker(m_threads->timer_lock);
157 Mutex::Locker locker(m_lock);
158
159 assert(m_on_shut_down_finish == nullptr);
160 m_on_shut_down_finish = on_finish;
161 cancel_timer_task();
162 shut_down_leader_lock();
163}
164
165template <typename I>
166void LeaderWatcher<I>::shut_down_leader_lock() {
167 dout(20) << dendl;
168
169 assert(m_lock.is_locked());
170
171 Context *ctx = create_async_context_callback(
172 m_work_queue, create_context_callback<
173 LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
174
175 m_leader_lock->shut_down(ctx);
176}
177
178template <typename I>
179void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
180 dout(20) << "r=" << r << dendl;
181
182 Mutex::Locker locker(m_lock);
183
184 if (r < 0) {
185 derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
186 }
187
188 unregister_watch();
189}
190
191template <typename I>
192void LeaderWatcher<I>::unregister_watch() {
193 dout(20) << dendl;
194
195 assert(m_lock.is_locked());
196
197 Context *ctx = create_async_context_callback(
198 m_work_queue, create_context_callback<
199 LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
200
201 librbd::Watcher::unregister_watch(ctx);
202}
203
204template <typename I>
205void LeaderWatcher<I>::handle_unregister_watch(int r) {
206 dout(20) << "r=" << r << dendl;
207
208 if (r < 0) {
209 derr << "error unregistering leader watcher for " << m_oid << " object: "
210 << cpp_strerror(r) << dendl;
211 }
212 wait_for_tasks();
213}
214
215template <typename I>
216void LeaderWatcher<I>::wait_for_tasks() {
217 dout(20) << dendl;
218
219 Mutex::Locker timer_locker(m_threads->timer_lock);
220 Mutex::Locker locker(m_lock);
221 schedule_timer_task("wait for tasks", 0, false,
222 &LeaderWatcher<I>::handle_wait_for_tasks, true);
223}
224
/**
 * Final step of shutdown. It runs as a timer task, so timer_lock and m_lock
 * are held. It finishes the timer op started by execute_timer_task(), then
 * completes the shut-down callback from the work queue.
 */
template <typename I>
void LeaderWatcher<I>::handle_wait_for_tasks() {
  dout(20) << dendl;

  assert(m_threads->timer_lock.is_locked());
  assert(m_lock.is_locked());
  assert(m_on_shut_down_finish != nullptr);

  // balance the start_op() performed by execute_timer_task()
  assert(!m_timer_op_tracker.empty());
  m_timer_op_tracker.finish_op();

  auto ctx = new FunctionContext([this](int r) {
      Context *on_finish;
      {
        // ensure lock isn't held when completing shut down
        Mutex::Locker locker(m_lock);
        assert(m_on_shut_down_finish != nullptr);
        // NOTE(review): m_on_shut_down_finish is left non-null; presumably
        // deliberate, so schedule_timer_task() keeps rejecting new tasks
        on_finish = m_on_shut_down_finish;
      }
      on_finish->complete(0);
    });
  // deferred to the work queue: the lambda takes m_lock itself, which is
  // currently held by this caller
  m_work_queue->queue(ctx, 0);
}
248
249template <typename I>
250bool LeaderWatcher<I>::is_leader() const {
251 Mutex::Locker locker(m_lock);
252
253 return is_leader(m_lock);
254}
255
256template <typename I>
257bool LeaderWatcher<I>::is_leader(Mutex &lock) const {
258 assert(m_lock.is_locked());
259
260 bool leader = m_leader_lock->is_leader();
261 dout(20) << leader << dendl;
262 return leader;
263}
264
265template <typename I>
266bool LeaderWatcher<I>::is_releasing_leader() const {
267 Mutex::Locker locker(m_lock);
268
269 return is_releasing_leader(m_lock);
270}
271
272template <typename I>
273bool LeaderWatcher<I>::is_releasing_leader(Mutex &lock) const {
274 assert(m_lock.is_locked());
275
276 bool releasing = m_leader_lock->is_releasing_leader();
277 dout(20) << releasing << dendl;
278 return releasing;
279}
280
281template <typename I>
282bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
283 dout(20) << dendl;
284
285 Mutex::Locker locker(m_lock);
286
287 if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
288 *instance_id = stringify(m_notifier_id);
289 return true;
290 }
291
292 if (!m_locker.cookie.empty()) {
293 *instance_id = stringify(m_locker.entity.num());
294 return true;
295 }
296
297 return false;
298}
299
300template <typename I>
301void LeaderWatcher<I>::release_leader() {
302 dout(20) << dendl;
303
304 Mutex::Locker locker(m_lock);
305 if (!is_leader(m_lock)) {
306 return;
307 }
308
309 release_leader_lock();
310}
311
312template <typename I>
313void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
314 dout(20) << dendl;
315
316 Mutex::Locker locker(m_lock);
317
318 instance_ids->clear();
319 if (m_instances != nullptr) {
320 m_instances->list(instance_ids);
321 }
322}
323
324template <typename I>
325void LeaderWatcher<I>::cancel_timer_task() {
326 assert(m_threads->timer_lock.is_locked());
327 assert(m_lock.is_locked());
328
329 if (m_timer_task == nullptr) {
330 return;
331 }
332
333 dout(20) << m_timer_task << dendl;
334 bool canceled = m_threads->timer->cancel_event(m_timer_task);
335 assert(canceled);
336 m_timer_task = nullptr;
337}
338
/**
 * Replace any pending timer task with one that invokes @p timer_callback after
 * delay_factor * rbd_mirror_leader_heartbeat_interval seconds.
 *
 * Requires timer_lock and m_lock. Once shutdown has begun
 * (m_on_shut_down_finish set), requests are ignored unless @p shutting_down
 * is true.
 *
 * @param name           label used only for logging
 * @param delay_factor   multiplier applied to the heartbeat interval
 * @param leader         execute_timer_task() only runs the callback if
 *                       is_leader() still equals this value when it fires
 * @param timer_callback member function to invoke
 * @param shutting_down  true only for the shutdown path's own task
 */
template <typename I>
void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
                                           int delay_factor, bool leader,
                                           TimerCallback timer_callback,
                                           bool shutting_down) {
  assert(m_threads->timer_lock.is_locked());
  assert(m_lock.is_locked());

  if (!shutting_down && m_on_shut_down_finish != nullptr) {
    return;
  }

  cancel_timer_task();

  m_timer_task = new FunctionContext(
    [this, leader, timer_callback](int r) {
      // fired by the timer with timer_lock held
      assert(m_threads->timer_lock.is_locked());
      m_timer_task = nullptr;

      if (m_timer_op_tracker.empty()) {
        Mutex::Locker locker(m_lock);
        execute_timer_task(leader, timer_callback);
        return;
      }

      // old timer task is still running -- do not start next
      // task until the previous task completes
      if (m_timer_gate == nullptr) {
        m_timer_gate = new C_TimerGate(this);
        m_timer_op_tracker.wait_for_ops(m_timer_gate);
      }
      // the gate keeps only the most recent request
      m_timer_gate->leader = leader;
      m_timer_gate->timer_callback = timer_callback;
    });

  // NOTE(review): the int64_t product is narrowed to int here
  int after = delay_factor * m_cct->_conf->get_val<int64_t>(
    "rbd_mirror_leader_heartbeat_interval");

  dout(20) << "scheduling " << name << " after " << after << " sec (task "
           << m_timer_task << ")" << dendl;
  m_threads->timer->add_event_after(after, m_timer_task);
}
381
382template <typename I>
383void LeaderWatcher<I>::execute_timer_task(bool leader,
384 TimerCallback timer_callback) {
385 dout(20) << dendl;
386
387 assert(m_threads->timer_lock.is_locked());
388 assert(m_lock.is_locked());
389 assert(m_timer_op_tracker.empty());
390
391 if (is_leader(m_lock) != leader) {
392 return;
393 }
394
395 m_timer_op_tracker.start_op();
396 (this->*timer_callback)();
397}
398
399template <typename I>
400void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
401 Context *on_finish) {
402 dout(20) << "r=" << r << dendl;
403
404 if (r < 0) {
405 if (r == -EAGAIN) {
406 dout(20) << "already locked" << dendl;
407 } else {
408 derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
409 }
410 on_finish->complete(r);
411 return;
412 }
413
414 Mutex::Locker locker(m_lock);
415 assert(m_on_finish == nullptr);
416 m_on_finish = on_finish;
417 m_ret_val = 0;
418
419 init_status_watcher();
420}
421
422template <typename I>
423void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
424 dout(20) << dendl;
425
426 Mutex::Locker locker(m_lock);
427 assert(m_on_finish == nullptr);
428 m_on_finish = on_finish;
429 m_ret_val = 0;
430
431 notify_listener();
432}
433
434template <typename I>
435void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
436 Context *on_finish) {
437 dout(20) << "r=" << r << dendl;
438
439 if (r < 0) {
440 on_finish->complete(r);
441 return;
442 }
443
444 Mutex::Locker locker(m_lock);
445 assert(m_on_finish == nullptr);
446 m_on_finish = on_finish;
447
448 notify_lock_released();
449}
450
451template <typename I>
452void LeaderWatcher<I>::break_leader_lock() {
453 dout(20) << dendl;
454
455 assert(m_threads->timer_lock.is_locked());
456 assert(m_lock.is_locked());
457 assert(!m_timer_op_tracker.empty());
458
459 if (m_locker.cookie.empty()) {
460 get_locker();
461 return;
462 }
463
464 Context *ctx = create_async_context_callback(
465 m_work_queue, create_context_callback<
466 LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
467
468 m_leader_lock->break_lock(m_locker, true, ctx);
469}
470
471template <typename I>
472void LeaderWatcher<I>::handle_break_leader_lock(int r) {
473 dout(20) << "r=" << r << dendl;
474
475 Mutex::Locker timer_locker(m_threads->timer_lock);
476 Mutex::Locker locker(m_lock);
477 assert(!m_timer_op_tracker.empty());
478
479 if (m_leader_lock->is_shutdown()) {
480 dout(20) << "canceling due to shutdown" << dendl;
481 m_timer_op_tracker.finish_op();
482 return;
483 }
484
485 if (r < 0 && r != -ENOENT) {
486 derr << "error beaking leader lock: " << cpp_strerror(r) << dendl;
487 schedule_acquire_leader_lock(1);
488 m_timer_op_tracker.finish_op();
489 return;
490 }
491
492 m_locker = {};
493 m_acquire_attempts = 0;
494 acquire_leader_lock();
495}
496
497template <typename I>
498void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
499 uint32_t delay_factor) {
500 dout(20) << dendl;
501
502 assert(m_threads->timer_lock.is_locked());
503 assert(m_lock.is_locked());
504
505 if (reset_leader) {
506 m_locker = {};
507 m_acquire_attempts = 0;
508 }
509
510 schedule_timer_task("get locker", delay_factor, false,
511 &LeaderWatcher<I>::get_locker, false);
512}
513
514template <typename I>
515void LeaderWatcher<I>::get_locker() {
516 dout(20) << dendl;
517
518 assert(m_threads->timer_lock.is_locked());
519 assert(m_lock.is_locked());
520 assert(!m_timer_op_tracker.empty());
521
522 C_GetLocker *get_locker_ctx = new C_GetLocker(this);
523 Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
524
525 m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
526}
527
/**
 * Completion of get_locker(). It runs on the work queue with a timer op
 * outstanding. Based on the reported owner it decides whether to acquire
 * now, break the lock, or retry later. Every path either finishes the timer
 * op or passes it on: to acquire_leader_lock(), to break_leader_lock(), or
 * to the listener-notification context queued at the end.
 *
 * @param r      lookup result; -ENOENT means nobody holds the lock
 * @param locker the lock owner reported by the leader lock
 */
template <typename I>
void LeaderWatcher<I>::handle_get_locker(int r,
                                         librbd::managed_lock::Locker& locker) {
  dout(20) << "r=" << r << dendl;

  Mutex::Locker timer_locker(m_threads->timer_lock);
  Mutex::Locker mutex_locker(m_lock);
  assert(!m_timer_op_tracker.empty());

  if (m_leader_lock->is_shutdown()) {
    dout(20) << "canceling due to shutdown" << dendl;
    m_timer_op_tracker.finish_op();
    return;
  }

  // we became leader in the meantime; no foreign owner to track
  if (is_leader(m_lock)) {
    m_locker = {};
    m_timer_op_tracker.finish_op();
    return;
  }

  if (r == -ENOENT) {
    // nobody holds the lock: try to take it right away
    m_locker = {};
    m_acquire_attempts = 0;
    acquire_leader_lock();
    return;
  } else if (r < 0) {
    derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
    schedule_get_locker(true, 1);
    m_timer_op_tracker.finish_op();
    return;
  }

  // NOTE: this local shadows the notify_listener() member function
  bool notify_listener = false;
  if (m_locker != locker) {
    m_locker = locker;
    notify_listener = true;
    if (m_acquire_attempts > 1) {
      dout(10) << "new lock owner detected -- resetting heartbeat counter"
               << dendl;
      m_acquire_attempts = 0;
    }
  }

  // the same owner has persisted across too many attempts (presumably a
  // stale/dead owner): break its lock
  if (m_acquire_attempts >= m_cct->_conf->get_val<int64_t>(
        "rbd_mirror_leader_max_acquire_attempts_before_break")) {
    dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
            << "failed attempts to acquire" << dendl;
    break_leader_lock();
    return;
  }

  schedule_acquire_leader_lock(1);

  if (!notify_listener) {
    m_timer_op_tracker.finish_op();
    return;
  }

  // notify the listener of the new leader from the work queue, because
  // get_leader_instance_id() takes m_lock itself; the timer op is finished
  // only after the listener has been updated
  auto ctx = new FunctionContext(
    [this](int r) {
      std::string instance_id;
      if (get_leader_instance_id(&instance_id)) {
        m_listener->update_leader_handler(instance_id);
      }
      Mutex::Locker timer_locker(m_threads->timer_lock);
      Mutex::Locker locker(m_lock);
      m_timer_op_tracker.finish_op();
    });
  m_work_queue->queue(ctx, 0);
}
599
600template <typename I>
601void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
602 dout(20) << dendl;
603
604 assert(m_threads->timer_lock.is_locked());
605 assert(m_lock.is_locked());
606
607 schedule_timer_task("acquire leader lock",
608 delay_factor *
181888fb 609 m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_missed_heartbeats"),
7c673cae
FG
610 false, &LeaderWatcher<I>::acquire_leader_lock, false);
611}
612
613template <typename I>
614void LeaderWatcher<I>::acquire_leader_lock() {
615 assert(m_threads->timer_lock.is_locked());
616 assert(m_lock.is_locked());
617 assert(!m_timer_op_tracker.empty());
618
619 ++m_acquire_attempts;
620 dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
621
622 Context *ctx = create_async_context_callback(
623 m_work_queue, create_context_callback<
624 LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
625 m_leader_lock->try_acquire_lock(ctx);
626}
627
628template <typename I>
629void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
630 dout(20) << "r=" << r << dendl;
631
632 Mutex::Locker timer_locker(m_threads->timer_lock);
633 Mutex::Locker locker(m_lock);
634 assert(!m_timer_op_tracker.empty());
635
636 if (m_leader_lock->is_shutdown()) {
637 dout(20) << "canceling due to shutdown" << dendl;
638 m_timer_op_tracker.finish_op();
639 return;
640 }
641
642 if (r < 0) {
643 if (r == -EAGAIN) {
644 dout(20) << "already locked" << dendl;
645 } else {
646 derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
647 }
648
649 get_locker();
650 return;
651 }
652
653 m_locker = {};
654 m_acquire_attempts = 0;
655
656 if (m_ret_val) {
657 dout(5) << "releasing due to error on notify" << dendl;
658 release_leader_lock();
659 m_timer_op_tracker.finish_op();
660 return;
661 }
662
663 notify_heartbeat();
664}
665
666template <typename I>
667void LeaderWatcher<I>::release_leader_lock() {
668 dout(20) << dendl;
669
670 assert(m_lock.is_locked());
671
672 Context *ctx = create_async_context_callback(
673 m_work_queue, create_context_callback<
674 LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
675
676 m_leader_lock->release_lock(ctx);
677}
678
679template <typename I>
680void LeaderWatcher<I>::handle_release_leader_lock(int r) {
681 dout(20) << "r=" << r << dendl;
682
683 Mutex::Locker timer_locker(m_threads->timer_lock);
684 Mutex::Locker locker(m_lock);
685
686 if (r < 0) {
687 derr << "error releasing lock: " << cpp_strerror(r) << dendl;
688 return;
689 }
690
691 schedule_acquire_leader_lock(1);
692}
693
694template <typename I>
695void LeaderWatcher<I>::init_status_watcher() {
696 dout(20) << dendl;
697
698 assert(m_lock.is_locked());
699 assert(m_status_watcher == nullptr);
700
701 m_status_watcher = MirrorStatusWatcher<I>::create(m_ioctx, m_work_queue);
702
703 Context *ctx = create_context_callback<
704 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_status_watcher>(this);
705
706 m_status_watcher->init(ctx);
707}
708
709template <typename I>
710void LeaderWatcher<I>::handle_init_status_watcher(int r) {
711 dout(20) << "r=" << r << dendl;
712
713 Context *on_finish = nullptr;
714 {
715 Mutex::Locker locker(m_lock);
716
717 if (r == 0) {
718 init_instances();
719 return;
720 }
721
722 derr << "error initializing mirror status watcher: " << cpp_strerror(r)
723 << dendl;
724 m_status_watcher->destroy();
725 m_status_watcher = nullptr;
726 assert(m_on_finish != nullptr);
727 std::swap(m_on_finish, on_finish);
728 }
729 on_finish->complete(r);
730}
731
732template <typename I>
733void LeaderWatcher<I>::shut_down_status_watcher() {
734 dout(20) << dendl;
735
736 assert(m_lock.is_locked());
737 assert(m_status_watcher != nullptr);
738
739 Context *ctx = create_async_context_callback(
740 m_work_queue, create_context_callback<LeaderWatcher<I>,
741 &LeaderWatcher<I>::handle_shut_down_status_watcher>(this));
742
743 m_status_watcher->shut_down(ctx);
744}
745
746template <typename I>
747void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
748 dout(20) << "r=" << r << dendl;
749
750 Context *on_finish = nullptr;
751 {
752 Mutex::Locker locker(m_lock);
753
754 m_status_watcher->destroy();
755 m_status_watcher = nullptr;
756
757 if (r < 0) {
758 derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
759 << dendl;
760 }
761
762 if (m_ret_val != 0) {
763 r = m_ret_val;
764 }
765
766 if (!is_leader(m_lock)) {
767 // ignore on releasing
768 r = 0;
769 }
770
771 assert(m_on_finish != nullptr);
772 std::swap(m_on_finish, on_finish);
773 }
774 on_finish->complete(r);
775}
776
777template <typename I>
778void LeaderWatcher<I>::init_instances() {
779 dout(20) << dendl;
780
781 assert(m_lock.is_locked());
782 assert(m_instances == nullptr);
783
784 m_instances = Instances<I>::create(m_threads, m_ioctx);
785
786 Context *ctx = create_context_callback<
787 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
788
789 m_instances->init(ctx);
790}
791
792template <typename I>
793void LeaderWatcher<I>::handle_init_instances(int r) {
794 dout(20) << "r=" << r << dendl;
795
796 Mutex::Locker locker(m_lock);
797
798 if (r < 0) {
799 derr << "error initializing instances: " << cpp_strerror(r) << dendl;
800 m_ret_val = r;
801 m_instances->destroy();
802 m_instances = nullptr;
803 shut_down_status_watcher();
804 return;
805 }
806
807 notify_listener();
808}
809
810template <typename I>
811void LeaderWatcher<I>::shut_down_instances() {
812 dout(20) << dendl;
813
814 assert(m_lock.is_locked());
815 assert(m_instances != nullptr);
816
817 Context *ctx = create_async_context_callback(
818 m_work_queue, create_context_callback<LeaderWatcher<I>,
819 &LeaderWatcher<I>::handle_shut_down_instances>(this));
820
821 m_instances->shut_down(ctx);
822}
823
824template <typename I>
825void LeaderWatcher<I>::handle_shut_down_instances(int r) {
826 dout(20) << "r=" << r << dendl;
827 assert(r == 0);
828
829 Mutex::Locker locker(m_lock);
830
831 m_instances->destroy();
832 m_instances = nullptr;
833
834 shut_down_status_watcher();
835}
836
837template <typename I>
838void LeaderWatcher<I>::notify_listener() {
839 dout(20) << dendl;
840
841 assert(m_lock.is_locked());
842
843 Context *ctx = create_async_context_callback(
844 m_work_queue, create_context_callback<
845 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
846
847 if (is_leader(m_lock)) {
848 ctx = new FunctionContext(
849 [this, ctx](int r) {
850 m_listener->post_acquire_handler(ctx);
851 });
852 } else {
853 ctx = new FunctionContext(
854 [this, ctx](int r) {
855 m_listener->pre_release_handler(ctx);
856 });
857 }
858 m_work_queue->queue(ctx, 0);
859}
860
861template <typename I>
862void LeaderWatcher<I>::handle_notify_listener(int r) {
863 dout(20) << "r=" << r << dendl;
864
865 Mutex::Locker locker(m_lock);
866
867 if (r < 0) {
868 derr << "error notifying listener: " << cpp_strerror(r) << dendl;
869 m_ret_val = r;
870 }
871
872 if (is_leader(m_lock)) {
873 notify_lock_acquired();
874 } else {
875 shut_down_instances();
876 }
877}
878
879template <typename I>
880void LeaderWatcher<I>::notify_lock_acquired() {
881 dout(20) << dendl;
882
883 assert(m_lock.is_locked());
884
885 Context *ctx = create_context_callback<
886 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
887
888 bufferlist bl;
889 ::encode(NotifyMessage{LockAcquiredPayload{}}, bl);
890
891 send_notify(bl, nullptr, ctx);
892}
893
894template <typename I>
895void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
896 dout(20) << "r=" << r << dendl;
897
898 Context *on_finish = nullptr;
899 {
900 Mutex::Locker locker(m_lock);
901 if (r < 0 && r != -ETIMEDOUT) {
902 derr << "error notifying leader lock acquired: " << cpp_strerror(r)
903 << dendl;
904 m_ret_val = r;
905 }
906
907 assert(m_on_finish != nullptr);
908 std::swap(m_on_finish, on_finish);
909 }
910 on_finish->complete(0);
911}
912
913template <typename I>
914void LeaderWatcher<I>::notify_lock_released() {
915 dout(20) << dendl;
916
917 assert(m_lock.is_locked());
918
919 Context *ctx = create_context_callback<
920 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
921
922 bufferlist bl;
923 ::encode(NotifyMessage{LockReleasedPayload{}}, bl);
924
925 send_notify(bl, nullptr, ctx);
926}
927
928template <typename I>
929void LeaderWatcher<I>::handle_notify_lock_released(int r) {
930 dout(20) << "r=" << r << dendl;
931
932 Context *on_finish = nullptr;
933 {
934 Mutex::Locker locker(m_lock);
935 if (r < 0 && r != -ETIMEDOUT) {
936 derr << "error notifying leader lock released: " << cpp_strerror(r)
937 << dendl;
938 }
939
940 assert(m_on_finish != nullptr);
941 std::swap(m_on_finish, on_finish);
942 }
943 on_finish->complete(r);
944}
945
946template <typename I>
947void LeaderWatcher<I>::notify_heartbeat() {
948 dout(20) << dendl;
949
950 assert(m_threads->timer_lock.is_locked());
951 assert(m_lock.is_locked());
952 assert(!m_timer_op_tracker.empty());
953
954 if (!is_leader(m_lock)) {
955 dout(5) << "not leader, canceling" << dendl;
956 m_timer_op_tracker.finish_op();
957 return;
958 }
959
960 Context *ctx = create_context_callback<
961 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
962
963 bufferlist bl;
964 ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
965
966 m_heartbeat_response.acks.clear();
967 send_notify(bl, &m_heartbeat_response, ctx);
968}
969
970template <typename I>
971void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
972 dout(20) << "r=" << r << dendl;
973
974 Mutex::Locker timer_locker(m_threads->timer_lock);
975 Mutex::Locker locker(m_lock);
976 assert(!m_timer_op_tracker.empty());
977
978 m_timer_op_tracker.finish_op();
979 if (m_leader_lock->is_shutdown()) {
980 dout(20) << "canceling due to shutdown" << dendl;
981 return;
982 } else if (!is_leader(m_lock)) {
983 return;
984 }
985
986 if (r < 0 && r != -ETIMEDOUT) {
987 derr << "error notifying hearbeat: " << cpp_strerror(r)
988 << ", releasing leader" << dendl;
989 release_leader_lock();
990 return;
991 }
992
993 dout(20) << m_heartbeat_response.acks.size() << " acks received, "
994 << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
995
996 for (auto &it: m_heartbeat_response.acks) {
997 uint64_t notifier_id = it.first.gid;
998 if (notifier_id == m_notifier_id) {
999 continue;
1000 }
1001
1002 std::string instance_id = stringify(notifier_id);
1003 m_instances->notify(instance_id);
1004 }
1005
1006 schedule_timer_task("heartbeat", 1, true,
1007 &LeaderWatcher<I>::notify_heartbeat, false);
1008}
1009
1010template <typename I>
1011void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
1012 dout(20) << dendl;
1013
1014 {
1015 Mutex::Locker timer_locker(m_threads->timer_lock);
1016 Mutex::Locker locker(m_lock);
1017 if (is_leader(m_lock)) {
1018 dout(5) << "got another leader heartbeat, ignoring" << dendl;
1019 } else {
1020 cancel_timer_task();
1021 m_acquire_attempts = 0;
1022 schedule_acquire_leader_lock(1);
1023 }
1024 }
1025
1026 on_notify_ack->complete(0);
1027}
1028
1029template <typename I>
1030void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
1031 dout(20) << dendl;
1032
1033 {
1034 Mutex::Locker timer_locker(m_threads->timer_lock);
1035 Mutex::Locker locker(m_lock);
1036 if (is_leader(m_lock)) {
1037 dout(5) << "got another leader lock_acquired, ignoring" << dendl;
1038 } else {
1039 cancel_timer_task();
1040 schedule_get_locker(true, 0);
1041 }
1042 }
1043
1044 on_notify_ack->complete(0);
1045}
1046
1047template <typename I>
1048void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
1049 dout(20) << dendl;
1050
1051 {
1052 Mutex::Locker timer_locker(m_threads->timer_lock);
1053 Mutex::Locker locker(m_lock);
1054 if (is_leader(m_lock)) {
1055 dout(5) << "got another leader lock_released, ignoring" << dendl;
1056 } else {
1057 cancel_timer_task();
1058 schedule_get_locker(true, 0);
1059 }
1060 }
1061
1062 on_notify_ack->complete(0);
1063}
1064
1065template <typename I>
1066void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1067 uint64_t notifier_id, bufferlist &bl) {
1068 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1069 << "notifier_id=" << notifier_id << dendl;
1070
1071 Context *ctx = new C_NotifyAck(this, notify_id, handle);
1072
1073 if (notifier_id == m_notifier_id) {
1074 dout(20) << "our own notification, ignoring" << dendl;
1075 ctx->complete(0);
1076 return;
1077 }
1078
1079 NotifyMessage notify_message;
1080 try {
1081 bufferlist::iterator iter = bl.begin();
1082 ::decode(notify_message, iter);
1083 } catch (const buffer::error &err) {
1084 derr << ": error decoding image notification: " << err.what() << dendl;
1085 ctx->complete(0);
1086 return;
1087 }
1088
1089 apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
1090}
1091
1092template <typename I>
91327a77
AA
1093void LeaderWatcher<I>::handle_rewatch_complete(int r) {
1094 dout(5) << "r=" << r << dendl;
1095
1096 m_leader_lock->reacquire_lock(nullptr);
1097}
1098
1099template <typename I>
7c673cae
FG
1100void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
1101 Context *on_notify_ack) {
1102 dout(20) << "heartbeat" << dendl;
1103
1104 handle_heartbeat(on_notify_ack);
1105}
1106
1107template <typename I>
1108void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
1109 Context *on_notify_ack) {
1110 dout(20) << "lock_acquired" << dendl;
1111
1112 handle_lock_acquired(on_notify_ack);
1113}
1114
1115template <typename I>
1116void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
1117 Context *on_notify_ack) {
1118 dout(20) << "lock_released" << dendl;
1119
1120 handle_lock_released(on_notify_ack);
1121}
1122
1123template <typename I>
1124void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
1125 Context *on_notify_ack) {
1126 dout(20) << "unknown" << dendl;
1127
1128 on_notify_ack->complete(0);
1129}
1130
1131} // namespace mirror
1132} // namespace rbd
1133
// Explicit instantiation for librbd::ImageCtx, the only instantiation
// emitted by this translation unit.
template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;