]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/LeaderWatcher.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / tools / rbd_mirror / LeaderWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "LeaderWatcher.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "include/stringify.h"
11 #include "librbd/Utils.h"
12 #include "librbd/asio/ContextWQ.h"
13 #include "librbd/watcher/Types.h"
14 #include "Threads.h"
15
// Logging setup for this translation unit: messages are emitted through the
// global Ceph context under the rbd_mirror subsystem.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
// Every log line is prefixed with the class tag, the address of the
// LeaderWatcher instance (`this`) and the name of the emitting function
// (`__func__`). Because of `__func__`, moving a dout/derr statement into a
// lambda or helper changes the prefix that is logged.
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
                           << this << " " << __func__ << ": "
21 namespace rbd {
22 namespace mirror {
23
24 using namespace leader_watcher;
25
26 using librbd::util::create_async_context_callback;
27 using librbd::util::create_context_callback;
28 using librbd::util::create_rados_callback;
29
// Constructs the leader watcher for the pool referenced by io_ctx.
//
// The base Watcher is bound to the RBD_MIRROR_LEADER object and uses the
// work queue owned by `threads`. The notifier id is the RADOS instance id
// obtained from io_ctx; its stringified form is stored as this instance's id.
// The leader lock is heap-allocated here (and released with `delete` in the
// destructor); it receives the value of the "rbd_blocklist_expire_seconds"
// configuration option.
//
// @param threads  shared thread/timer/work-queue bundle
// @param io_ctx   I/O context of the pool holding the leader object
// @param listener receiver of leader acquire/release/update callbacks
template <typename I>
LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
                                leader_watcher::Listener *listener)
  : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
    m_threads(threads), m_listener(listener), m_instances_listener(this),
    m_lock(ceph::make_mutex("rbd::mirror::LeaderWatcher " +
                            io_ctx.get_pool_name())),
    m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
    m_instance_id(stringify(m_notifier_id)),
    m_leader_lock(new LeaderLock(m_ioctx, *m_threads->asio_engine, m_oid, this,
                                 true, m_cct->_conf.get_val<uint64_t>(
                                   "rbd_blocklist_expire_seconds"))) {
}
43
44 template <typename I>
45 LeaderWatcher<I>::~LeaderWatcher() {
46 ceph_assert(m_instances == nullptr);
47 ceph_assert(m_timer_task == nullptr);
48
49 delete m_leader_lock;
50 }
51
52 template <typename I>
53 std::string LeaderWatcher<I>::get_instance_id() {
54 return m_instance_id;
55 }
56
57 template <typename I>
58 int LeaderWatcher<I>::init() {
59 C_SaferCond init_ctx;
60 init(&init_ctx);
61 return init_ctx.wait();
62 }
63
64 template <typename I>
65 void LeaderWatcher<I>::init(Context *on_finish) {
66 dout(10) << "notifier_id=" << m_notifier_id << dendl;
67
68 std::lock_guard locker{m_lock};
69
70 ceph_assert(m_on_finish == nullptr);
71 m_on_finish = on_finish;
72
73 create_leader_object();
74 }
75
76 template <typename I>
77 void LeaderWatcher<I>::create_leader_object() {
78 dout(10) << dendl;
79
80 ceph_assert(ceph_mutex_is_locked(m_lock));
81
82 librados::ObjectWriteOperation op;
83 op.create(false);
84
85 librados::AioCompletion *aio_comp = create_rados_callback<
86 LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
87 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
88 ceph_assert(r == 0);
89 aio_comp->release();
90 }
91
92 template <typename I>
93 void LeaderWatcher<I>::handle_create_leader_object(int r) {
94 dout(10) << "r=" << r << dendl;
95
96 Context *on_finish = nullptr;
97 {
98 std::lock_guard locker{m_lock};
99
100 if (r == 0) {
101 register_watch();
102 return;
103 }
104
105 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
106 << dendl;
107
108 std::swap(on_finish, m_on_finish);
109 }
110 on_finish->complete(r);
111 }
112
113 template <typename I>
114 void LeaderWatcher<I>::register_watch() {
115 dout(10) << dendl;
116
117 ceph_assert(ceph_mutex_is_locked(m_lock));
118
119 Context *ctx = create_async_context_callback(
120 m_work_queue, create_context_callback<
121 LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
122
123 librbd::Watcher::register_watch(ctx);
124 }
125
126 template <typename I>
127 void LeaderWatcher<I>::handle_register_watch(int r) {
128 dout(10) << "r=" << r << dendl;
129
130 Context *on_finish = nullptr;
131 {
132 std::lock_guard timer_locker(m_threads->timer_lock);
133 std::lock_guard locker{m_lock};
134
135 if (r < 0) {
136 derr << "error registering leader watcher for " << m_oid << " object: "
137 << cpp_strerror(r) << dendl;
138 } else {
139 schedule_acquire_leader_lock(0);
140 }
141
142 ceph_assert(m_on_finish != nullptr);
143 std::swap(on_finish, m_on_finish);
144 }
145
146 on_finish->complete(r);
147 }
148
149 template <typename I>
150 void LeaderWatcher<I>::shut_down() {
151 C_SaferCond shut_down_ctx;
152 shut_down(&shut_down_ctx);
153 int r = shut_down_ctx.wait();
154 ceph_assert(r == 0);
155 }
156
157 template <typename I>
158 void LeaderWatcher<I>::shut_down(Context *on_finish) {
159 dout(10) << dendl;
160
161 std::scoped_lock locker{m_threads->timer_lock, m_lock};
162
163 ceph_assert(m_on_shut_down_finish == nullptr);
164 m_on_shut_down_finish = on_finish;
165 cancel_timer_task();
166 shut_down_leader_lock();
167 }
168
169 template <typename I>
170 void LeaderWatcher<I>::shut_down_leader_lock() {
171 dout(10) << dendl;
172
173 ceph_assert(ceph_mutex_is_locked(m_lock));
174
175 Context *ctx = create_async_context_callback(
176 m_work_queue, create_context_callback<
177 LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
178
179 m_leader_lock->shut_down(ctx);
180 }
181
182 template <typename I>
183 void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
184 dout(10) << "r=" << r << dendl;
185
186 std::lock_guard locker{m_lock};
187
188 if (r < 0) {
189 derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
190 }
191
192 unregister_watch();
193 }
194
195 template <typename I>
196 void LeaderWatcher<I>::unregister_watch() {
197 dout(10) << dendl;
198
199 ceph_assert(ceph_mutex_is_locked(m_lock));
200
201 Context *ctx = create_async_context_callback(
202 m_work_queue, create_context_callback<
203 LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
204
205 librbd::Watcher::unregister_watch(ctx);
206 }
207
208 template <typename I>
209 void LeaderWatcher<I>::handle_unregister_watch(int r) {
210 dout(10) << "r=" << r << dendl;
211
212 if (r < 0) {
213 derr << "error unregistering leader watcher for " << m_oid << " object: "
214 << cpp_strerror(r) << dendl;
215 }
216 wait_for_tasks();
217 }
218
219 template <typename I>
220 void LeaderWatcher<I>::wait_for_tasks() {
221 dout(10) << dendl;
222
223 std::scoped_lock locker{m_threads->timer_lock, m_lock};
224 schedule_timer_task("wait for tasks", 0, false,
225 &LeaderWatcher<I>::handle_wait_for_tasks, true);
226 }
227
228 template <typename I>
229 void LeaderWatcher<I>::handle_wait_for_tasks() {
230 dout(10) << dendl;
231
232 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
233 ceph_assert(ceph_mutex_is_locked(m_lock));
234 ceph_assert(m_on_shut_down_finish != nullptr);
235
236 ceph_assert(!m_timer_op_tracker.empty());
237 m_timer_op_tracker.finish_op();
238
239 auto ctx = new LambdaContext([this](int r) {
240 Context *on_finish;
241 {
242 // ensure lock isn't held when completing shut down
243 std::lock_guard locker{m_lock};
244 ceph_assert(m_on_shut_down_finish != nullptr);
245 on_finish = m_on_shut_down_finish;
246 }
247 on_finish->complete(0);
248 });
249 m_work_queue->queue(ctx, 0);
250 }
251
252 template <typename I>
253 bool LeaderWatcher<I>::is_blocklisted() const {
254 std::lock_guard locker{m_lock};
255 return m_blocklisted;
256 }
257
258 template <typename I>
259 bool LeaderWatcher<I>::is_leader() const {
260 std::lock_guard locker{m_lock};
261 return is_leader(m_lock);
262 }
263
264 template <typename I>
265 bool LeaderWatcher<I>::is_leader(ceph::mutex &lock) const {
266 ceph_assert(ceph_mutex_is_locked(m_lock));
267
268 bool leader = m_leader_lock->is_leader();
269 dout(10) << leader << dendl;
270 return leader;
271 }
272
273 template <typename I>
274 bool LeaderWatcher<I>::is_releasing_leader() const {
275 std::lock_guard locker{m_lock};
276 return is_releasing_leader(m_lock);
277 }
278
279 template <typename I>
280 bool LeaderWatcher<I>::is_releasing_leader(ceph::mutex &lock) const {
281 ceph_assert(ceph_mutex_is_locked(m_lock));
282
283 bool releasing = m_leader_lock->is_releasing_leader();
284 dout(10) << releasing << dendl;
285 return releasing;
286 }
287
288 template <typename I>
289 bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
290 dout(10) << dendl;
291
292 std::lock_guard locker{m_lock};
293
294 if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
295 *instance_id = m_instance_id;
296 return true;
297 }
298
299 if (!m_locker.cookie.empty()) {
300 *instance_id = stringify(m_locker.entity.num());
301 return true;
302 }
303
304 return false;
305 }
306
307 template <typename I>
308 void LeaderWatcher<I>::release_leader() {
309 dout(10) << dendl;
310
311 std::lock_guard locker{m_lock};
312 if (!is_leader(m_lock)) {
313 return;
314 }
315
316 release_leader_lock();
317 }
318
319 template <typename I>
320 void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
321 dout(10) << dendl;
322
323 std::lock_guard locker{m_lock};
324
325 instance_ids->clear();
326 if (m_instances != nullptr) {
327 m_instances->list(instance_ids);
328 }
329 }
330
331 template <typename I>
332 void LeaderWatcher<I>::cancel_timer_task() {
333 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
334 ceph_assert(ceph_mutex_is_locked(m_lock));
335
336 if (m_timer_task == nullptr) {
337 return;
338 }
339
340 dout(10) << m_timer_task << dendl;
341 bool canceled = m_threads->timer->cancel_event(m_timer_task);
342 ceph_assert(canceled);
343 m_timer_task = nullptr;
344 }
345
346 template <typename I>
347 void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
348 int delay_factor, bool leader,
349 TimerCallback timer_callback,
350 bool shutting_down) {
351 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
352 ceph_assert(ceph_mutex_is_locked(m_lock));
353
354 if (!shutting_down && m_on_shut_down_finish != nullptr) {
355 return;
356 }
357
358 cancel_timer_task();
359
360 m_timer_task = new LambdaContext(
361 [this, leader, timer_callback](int r) {
362 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
363 m_timer_task = nullptr;
364
365 if (m_timer_op_tracker.empty()) {
366 std::lock_guard locker{m_lock};
367 execute_timer_task(leader, timer_callback);
368 return;
369 }
370
371 // old timer task is still running -- do not start next
372 // task until the previous task completes
373 if (m_timer_gate == nullptr) {
374 m_timer_gate = new C_TimerGate(this);
375 m_timer_op_tracker.wait_for_ops(m_timer_gate);
376 }
377 m_timer_gate->leader = leader;
378 m_timer_gate->timer_callback = timer_callback;
379 });
380
381 int after = delay_factor * m_cct->_conf.get_val<uint64_t>(
382 "rbd_mirror_leader_heartbeat_interval");
383
384 dout(10) << "scheduling " << name << " after " << after << " sec (task "
385 << m_timer_task << ")" << dendl;
386 m_threads->timer->add_event_after(after, m_timer_task);
387 }
388
389 template <typename I>
390 void LeaderWatcher<I>::execute_timer_task(bool leader,
391 TimerCallback timer_callback) {
392 dout(10) << dendl;
393
394 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
395 ceph_assert(ceph_mutex_is_locked(m_lock));
396 ceph_assert(m_timer_op_tracker.empty());
397
398 if (is_leader(m_lock) != leader) {
399 return;
400 }
401
402 m_timer_op_tracker.start_op();
403 (this->*timer_callback)();
404 }
405
406 template <typename I>
407 void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
408 Context *on_finish) {
409 dout(10) << "r=" << r << dendl;
410
411 if (r < 0) {
412 if (r == -EAGAIN) {
413 dout(10) << "already locked" << dendl;
414 } else {
415 derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
416 }
417 on_finish->complete(r);
418 return;
419 }
420
421 std::lock_guard locker{m_lock};
422 ceph_assert(m_on_finish == nullptr);
423 m_on_finish = on_finish;
424 m_ret_val = 0;
425
426 init_instances();
427 }
428
429 template <typename I>
430 void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
431 dout(10) << dendl;
432
433 std::lock_guard locker{m_lock};
434 ceph_assert(m_on_finish == nullptr);
435 m_on_finish = on_finish;
436 m_ret_val = 0;
437
438 notify_listener();
439 }
440
441 template <typename I>
442 void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
443 Context *on_finish) {
444 dout(10) << "r=" << r << dendl;
445
446 if (r < 0) {
447 on_finish->complete(r);
448 return;
449 }
450
451 std::lock_guard locker{m_lock};
452 ceph_assert(m_on_finish == nullptr);
453 m_on_finish = on_finish;
454
455 notify_lock_released();
456 }
457
458 template <typename I>
459 void LeaderWatcher<I>::break_leader_lock() {
460 dout(10) << dendl;
461
462 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
463 ceph_assert(ceph_mutex_is_locked(m_lock));
464 ceph_assert(!m_timer_op_tracker.empty());
465
466 if (m_locker.cookie.empty()) {
467 get_locker();
468 return;
469 }
470
471 Context *ctx = create_async_context_callback(
472 m_work_queue, create_context_callback<
473 LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
474
475 m_leader_lock->break_lock(m_locker, true, ctx);
476 }
477
478 template <typename I>
479 void LeaderWatcher<I>::handle_break_leader_lock(int r) {
480 dout(10) << "r=" << r << dendl;
481
482 std::scoped_lock locker{m_threads->timer_lock, m_lock};
483 ceph_assert(!m_timer_op_tracker.empty());
484
485 if (m_leader_lock->is_shutdown()) {
486 dout(10) << "canceling due to shutdown" << dendl;
487 m_timer_op_tracker.finish_op();
488 return;
489 }
490
491 if (r < 0 && r != -ENOENT) {
492 derr << "error breaking leader lock: " << cpp_strerror(r) << dendl;
493 schedule_acquire_leader_lock(1);
494 m_timer_op_tracker.finish_op();
495 return;
496 }
497
498 m_locker = {};
499 m_acquire_attempts = 0;
500 acquire_leader_lock();
501 }
502
503 template <typename I>
504 void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
505 uint32_t delay_factor) {
506 dout(10) << dendl;
507
508 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
509 ceph_assert(ceph_mutex_is_locked(m_lock));
510
511 if (reset_leader) {
512 m_locker = {};
513 m_acquire_attempts = 0;
514 }
515
516 schedule_timer_task("get locker", delay_factor, false,
517 &LeaderWatcher<I>::get_locker, false);
518 }
519
520 template <typename I>
521 void LeaderWatcher<I>::get_locker() {
522 dout(10) << dendl;
523
524 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
525 ceph_assert(ceph_mutex_is_locked(m_lock));
526 ceph_assert(!m_timer_op_tracker.empty());
527
528 C_GetLocker *get_locker_ctx = new C_GetLocker(this);
529 Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
530
531 m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
532 }
533
534 template <typename I>
535 void LeaderWatcher<I>::handle_get_locker(int r,
536 librbd::managed_lock::Locker& locker) {
537 dout(10) << "r=" << r << dendl;
538
539 std::scoped_lock l{m_threads->timer_lock, m_lock};
540 ceph_assert(!m_timer_op_tracker.empty());
541
542 if (m_leader_lock->is_shutdown()) {
543 dout(10) << "canceling due to shutdown" << dendl;
544 m_timer_op_tracker.finish_op();
545 return;
546 }
547
548 if (is_leader(m_lock)) {
549 m_locker = {};
550 m_timer_op_tracker.finish_op();
551 return;
552 }
553
554 if (r == -ENOENT) {
555 m_locker = {};
556 m_acquire_attempts = 0;
557 acquire_leader_lock();
558 return;
559 } else if (r < 0) {
560 derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
561 schedule_get_locker(true, 1);
562 m_timer_op_tracker.finish_op();
563 return;
564 }
565
566 bool notify_listener = false;
567 if (m_locker != locker) {
568 m_locker = locker;
569 notify_listener = true;
570 if (m_acquire_attempts > 1) {
571 dout(10) << "new lock owner detected -- resetting heartbeat counter"
572 << dendl;
573 m_acquire_attempts = 0;
574 }
575 }
576
577 if (m_acquire_attempts >= m_cct->_conf.get_val<uint64_t>(
578 "rbd_mirror_leader_max_acquire_attempts_before_break")) {
579 dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
580 << "failed attempts to acquire" << dendl;
581 break_leader_lock();
582 return;
583 }
584
585 schedule_acquire_leader_lock(1);
586
587 if (!notify_listener) {
588 m_timer_op_tracker.finish_op();
589 return;
590 }
591
592 auto ctx = new LambdaContext(
593 [this](int r) {
594 std::string instance_id;
595 if (get_leader_instance_id(&instance_id)) {
596 m_listener->update_leader_handler(instance_id);
597 }
598 std::scoped_lock locker{m_threads->timer_lock, m_lock};
599 m_timer_op_tracker.finish_op();
600 });
601 m_work_queue->queue(ctx, 0);
602 }
603
604 template <typename I>
605 void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
606 dout(10) << dendl;
607
608 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
609 ceph_assert(ceph_mutex_is_locked(m_lock));
610
611 schedule_timer_task("acquire leader lock",
612 delay_factor *
613 m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_missed_heartbeats"),
614 false, &LeaderWatcher<I>::acquire_leader_lock, false);
615 }
616
617 template <typename I>
618 void LeaderWatcher<I>::acquire_leader_lock() {
619 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
620 ceph_assert(ceph_mutex_is_locked(m_lock));
621 ceph_assert(!m_timer_op_tracker.empty());
622
623 ++m_acquire_attempts;
624 dout(10) << "acquire_attempts=" << m_acquire_attempts << dendl;
625
626 Context *ctx = create_async_context_callback(
627 m_work_queue, create_context_callback<
628 LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
629 m_leader_lock->try_acquire_lock(ctx);
630 }
631
632 template <typename I>
633 void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
634 dout(10) << "r=" << r << dendl;
635
636 std::scoped_lock locker{m_threads->timer_lock, m_lock};
637 ceph_assert(!m_timer_op_tracker.empty());
638
639 if (m_leader_lock->is_shutdown()) {
640 dout(10) << "canceling due to shutdown" << dendl;
641 m_timer_op_tracker.finish_op();
642 return;
643 }
644
645 if (r < 0) {
646 if (r == -EAGAIN) {
647 dout(10) << "already locked" << dendl;
648 } else {
649 derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
650 }
651
652 get_locker();
653 return;
654 }
655
656 m_locker = {};
657 m_acquire_attempts = 0;
658
659 if (m_ret_val) {
660 dout(5) << "releasing due to error on notify" << dendl;
661 release_leader_lock();
662 m_timer_op_tracker.finish_op();
663 return;
664 }
665
666 notify_heartbeat();
667 }
668
669 template <typename I>
670 void LeaderWatcher<I>::release_leader_lock() {
671 dout(10) << dendl;
672
673 ceph_assert(ceph_mutex_is_locked(m_lock));
674
675 Context *ctx = create_async_context_callback(
676 m_work_queue, create_context_callback<
677 LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
678
679 m_leader_lock->release_lock(ctx);
680 }
681
682 template <typename I>
683 void LeaderWatcher<I>::handle_release_leader_lock(int r) {
684 dout(10) << "r=" << r << dendl;
685
686 std::scoped_lock locker{m_threads->timer_lock, m_lock};
687
688 if (r < 0) {
689 derr << "error releasing lock: " << cpp_strerror(r) << dendl;
690 return;
691 }
692
693 schedule_acquire_leader_lock(1);
694 }
695
696 template <typename I>
697 void LeaderWatcher<I>::init_instances() {
698 dout(10) << dendl;
699
700 ceph_assert(ceph_mutex_is_locked(m_lock));
701 ceph_assert(m_instances == nullptr);
702
703 m_instances = Instances<I>::create(m_threads, m_ioctx, m_instance_id,
704 m_instances_listener);
705
706 Context *ctx = create_context_callback<
707 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
708
709 m_instances->init(ctx);
710 }
711
712 template <typename I>
713 void LeaderWatcher<I>::handle_init_instances(int r) {
714 dout(10) << "r=" << r << dendl;
715
716 Context *on_finish = nullptr;
717 if (r < 0) {
718 std::lock_guard locker{m_lock};
719 derr << "error initializing instances: " << cpp_strerror(r) << dendl;
720 m_instances->destroy();
721 m_instances = nullptr;
722
723 ceph_assert(m_on_finish != nullptr);
724 std::swap(m_on_finish, on_finish);
725 } else {
726 std::lock_guard locker{m_lock};
727 notify_listener();
728 return;
729 }
730
731 on_finish->complete(r);
732 }
733
734 template <typename I>
735 void LeaderWatcher<I>::shut_down_instances() {
736 dout(10) << dendl;
737
738 ceph_assert(ceph_mutex_is_locked(m_lock));
739 ceph_assert(m_instances != nullptr);
740
741 Context *ctx = create_async_context_callback(
742 m_work_queue, create_context_callback<LeaderWatcher<I>,
743 &LeaderWatcher<I>::handle_shut_down_instances>(this));
744
745 m_instances->shut_down(ctx);
746 }
747
748 template <typename I>
749 void LeaderWatcher<I>::handle_shut_down_instances(int r) {
750 dout(10) << "r=" << r << dendl;
751 ceph_assert(r == 0);
752
753 Context *on_finish = nullptr;
754 {
755 std::lock_guard locker{m_lock};
756
757 m_instances->destroy();
758 m_instances = nullptr;
759
760 ceph_assert(m_on_finish != nullptr);
761 std::swap(m_on_finish, on_finish);
762 }
763 on_finish->complete(r);
764 }
765
766 template <typename I>
767 void LeaderWatcher<I>::notify_listener() {
768 dout(10) << dendl;
769
770 ceph_assert(ceph_mutex_is_locked(m_lock));
771
772 Context *ctx = create_async_context_callback(
773 m_work_queue, create_context_callback<
774 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
775
776 if (is_leader(m_lock)) {
777 ctx = new LambdaContext(
778 [this, ctx](int r) {
779 m_listener->post_acquire_handler(ctx);
780 });
781 } else {
782 ctx = new LambdaContext(
783 [this, ctx](int r) {
784 m_listener->pre_release_handler(ctx);
785 });
786 }
787 m_work_queue->queue(ctx, 0);
788 }
789
790 template <typename I>
791 void LeaderWatcher<I>::handle_notify_listener(int r) {
792 dout(10) << "r=" << r << dendl;
793
794 std::lock_guard locker{m_lock};
795
796 if (r < 0) {
797 derr << "error notifying listener: " << cpp_strerror(r) << dendl;
798 m_ret_val = r;
799 }
800
801 if (is_leader(m_lock)) {
802 notify_lock_acquired();
803 } else {
804 shut_down_instances();
805 }
806 }
807
808 template <typename I>
809 void LeaderWatcher<I>::notify_lock_acquired() {
810 dout(10) << dendl;
811
812 ceph_assert(ceph_mutex_is_locked(m_lock));
813
814 Context *ctx = create_context_callback<
815 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
816
817 bufferlist bl;
818 encode(NotifyMessage{LockAcquiredPayload{}}, bl);
819
820 send_notify(bl, nullptr, ctx);
821 }
822
823 template <typename I>
824 void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
825 dout(10) << "r=" << r << dendl;
826
827 Context *on_finish = nullptr;
828 {
829 std::lock_guard locker{m_lock};
830 if (r < 0 && r != -ETIMEDOUT) {
831 derr << "error notifying leader lock acquired: " << cpp_strerror(r)
832 << dendl;
833 m_ret_val = r;
834 }
835
836 ceph_assert(m_on_finish != nullptr);
837 std::swap(m_on_finish, on_finish);
838
839 if (m_ret_val == 0) {
840 // listener should be ready for instance add/remove events now
841 m_instances->unblock_listener();
842 }
843 }
844 on_finish->complete(0);
845 }
846
847 template <typename I>
848 void LeaderWatcher<I>::notify_lock_released() {
849 dout(10) << dendl;
850
851 ceph_assert(ceph_mutex_is_locked(m_lock));
852
853 Context *ctx = create_context_callback<
854 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
855
856 bufferlist bl;
857 encode(NotifyMessage{LockReleasedPayload{}}, bl);
858
859 send_notify(bl, nullptr, ctx);
860 }
861
862 template <typename I>
863 void LeaderWatcher<I>::handle_notify_lock_released(int r) {
864 dout(10) << "r=" << r << dendl;
865
866 Context *on_finish = nullptr;
867 {
868 std::lock_guard locker{m_lock};
869 if (r < 0 && r != -ETIMEDOUT) {
870 derr << "error notifying leader lock released: " << cpp_strerror(r)
871 << dendl;
872 }
873
874 ceph_assert(m_on_finish != nullptr);
875 std::swap(m_on_finish, on_finish);
876 }
877 on_finish->complete(r);
878 }
879
880 template <typename I>
881 void LeaderWatcher<I>::notify_heartbeat() {
882 dout(10) << dendl;
883
884 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
885 ceph_assert(ceph_mutex_is_locked(m_lock));
886 ceph_assert(!m_timer_op_tracker.empty());
887
888 if (!is_leader(m_lock)) {
889 dout(5) << "not leader, canceling" << dendl;
890 m_timer_op_tracker.finish_op();
891 return;
892 }
893
894 Context *ctx = create_context_callback<
895 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
896
897 bufferlist bl;
898 encode(NotifyMessage{HeartbeatPayload{}}, bl);
899
900 m_heartbeat_response.acks.clear();
901 send_notify(bl, &m_heartbeat_response, ctx);
902 }
903
904 template <typename I>
905 void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
906 dout(10) << "r=" << r << dendl;
907
908 std::scoped_lock locker{m_threads->timer_lock, m_lock};
909 ceph_assert(!m_timer_op_tracker.empty());
910
911 m_timer_op_tracker.finish_op();
912 if (m_leader_lock->is_shutdown()) {
913 dout(10) << "canceling due to shutdown" << dendl;
914 return;
915 } else if (!is_leader(m_lock)) {
916 return;
917 }
918
919 if (r < 0 && r != -ETIMEDOUT) {
920 derr << "error notifying heartbeat: " << cpp_strerror(r)
921 << ", releasing leader" << dendl;
922 release_leader_lock();
923 return;
924 }
925
926 dout(10) << m_heartbeat_response.acks.size() << " acks received, "
927 << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
928
929 std::vector<std::string> instance_ids;
930 for (auto &it: m_heartbeat_response.acks) {
931 uint64_t notifier_id = it.first.gid;
932 instance_ids.push_back(stringify(notifier_id));
933 }
934 if (!instance_ids.empty()) {
935 m_instances->acked(instance_ids);
936 }
937
938 schedule_timer_task("heartbeat", 1, true,
939 &LeaderWatcher<I>::notify_heartbeat, false);
940 }
941
942 template <typename I>
943 void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
944 dout(10) << dendl;
945
946 {
947 std::scoped_lock locker{m_threads->timer_lock, m_lock};
948 if (is_leader(m_lock)) {
949 dout(5) << "got another leader heartbeat, ignoring" << dendl;
950 } else if (!m_locker.cookie.empty()) {
951 cancel_timer_task();
952 m_acquire_attempts = 0;
953 schedule_acquire_leader_lock(1);
954 }
955 }
956
957 on_notify_ack->complete(0);
958 }
959
960 template <typename I>
961 void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
962 dout(10) << dendl;
963
964 {
965 std::scoped_lock locker{m_threads->timer_lock, m_lock};
966 if (is_leader(m_lock)) {
967 dout(5) << "got another leader lock_acquired, ignoring" << dendl;
968 } else {
969 cancel_timer_task();
970 schedule_get_locker(true, 0);
971 }
972 }
973
974 on_notify_ack->complete(0);
975 }
976
977 template <typename I>
978 void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
979 dout(10) << dendl;
980
981 {
982 std::scoped_lock locker{m_threads->timer_lock, m_lock};
983 if (is_leader(m_lock)) {
984 dout(5) << "got another leader lock_released, ignoring" << dendl;
985 } else {
986 cancel_timer_task();
987 schedule_get_locker(true, 0);
988 }
989 }
990
991 on_notify_ack->complete(0);
992 }
993
994 template <typename I>
995 void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
996 uint64_t notifier_id, bufferlist &bl) {
997 dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", "
998 << "notifier_id=" << notifier_id << dendl;
999
1000 Context *ctx = new C_NotifyAck(this, notify_id, handle);
1001
1002 if (notifier_id == m_notifier_id) {
1003 dout(10) << "our own notification, ignoring" << dendl;
1004 ctx->complete(0);
1005 return;
1006 }
1007
1008 NotifyMessage notify_message;
1009 try {
1010 auto iter = bl.cbegin();
1011 decode(notify_message, iter);
1012 } catch (const buffer::error &err) {
1013 derr << "error decoding image notification: " << err.what() << dendl;
1014 ctx->complete(0);
1015 return;
1016 }
1017
1018 apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
1019 }
1020
1021 template <typename I>
1022 void LeaderWatcher<I>::handle_rewatch_complete(int r) {
1023 dout(5) << "r=" << r << dendl;
1024
1025 if (r == -EBLOCKLISTED) {
1026 dout(1) << "blocklisted detected" << dendl;
1027 m_blocklisted = true;
1028 return;
1029 }
1030
1031 m_leader_lock->reacquire_lock(nullptr);
1032 }
1033
1034 template <typename I>
1035 void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
1036 Context *on_notify_ack) {
1037 dout(10) << "heartbeat" << dendl;
1038
1039 handle_heartbeat(on_notify_ack);
1040 }
1041
1042 template <typename I>
1043 void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
1044 Context *on_notify_ack) {
1045 dout(10) << "lock_acquired" << dendl;
1046
1047 handle_lock_acquired(on_notify_ack);
1048 }
1049
1050 template <typename I>
1051 void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
1052 Context *on_notify_ack) {
1053 dout(10) << "lock_released" << dendl;
1054
1055 handle_lock_released(on_notify_ack);
1056 }
1057
1058 template <typename I>
1059 void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
1060 Context *on_notify_ack) {
1061 dout(10) << "unknown" << dendl;
1062
1063 on_notify_ack->complete(0);
1064 }
1065
1066 } // namespace mirror
1067 } // namespace rbd
1068
// Explicit instantiation for librbd::ImageCtx: the member templates are
// defined in this .cc file, so the specialization is instantiated here.
template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;