1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "InstanceWatcher.h"
5 #include "include/stringify.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "librbd/ManagedLock.h"
10 #include "librbd/Utils.h"
11 #include "InstanceReplayer.h"
12 #include "ImageSyncThrottler.h"
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
17 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
22 using namespace instance_watcher
;
24 using librbd::util::create_async_context_callback
;
25 using librbd::util::create_context_callback
;
26 using librbd::util::create_rados_callback
;
27 using librbd::util::unique_lock_name
;
31 struct C_GetInstances
: public Context
{
32 std::vector
<std::string
> *instance_ids
;
36 C_GetInstances(std::vector
<std::string
> *instance_ids
, Context
*on_finish
)
37 : instance_ids(instance_ids
), on_finish(on_finish
) {
40 void finish(int r
) override
{
41 dout(20) << "C_GetInstances: " << this << " " << __func__
<< ": r=" << r
45 bufferlist::iterator it
= out_bl
.begin();
46 r
= librbd::cls_client::mirror_instances_list_finish(&it
, instance_ids
);
47 } else if (r
== -ENOENT
) {
50 on_finish
->complete(r
);
55 struct C_RemoveInstanceRequest
: public Context
{
56 InstanceWatcher
<I
> instance_watcher
;
59 C_RemoveInstanceRequest(librados::IoCtx
&io_ctx
, ContextWQ
*work_queue
,
60 const std::string
&instance_id
, Context
*on_finish
)
61 : instance_watcher(io_ctx
, work_queue
, nullptr, instance_id
),
62 on_finish(on_finish
) {
66 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__
<< dendl
;
68 instance_watcher
.remove(this);
71 void finish(int r
) override
{
72 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__
<< ": r="
76 on_finish
->complete(r
);
80 } // anonymous namespace
83 struct InstanceWatcher
<I
>::C_NotifyInstanceRequest
: public Context
{
84 InstanceWatcher
<I
> *instance_watcher
;
85 std::string instance_id
;
90 std::unique_ptr
<librbd::watcher::Notifier
> notifier
;
91 librbd::watcher::NotifyResponse response
;
92 bool canceling
= false;
94 C_NotifyInstanceRequest(InstanceWatcher
<I
> *instance_watcher
,
95 const std::string
&instance_id
, uint64_t request_id
,
96 bufferlist
&&bl
, Context
*on_finish
)
97 : instance_watcher(instance_watcher
), instance_id(instance_id
),
98 request_id(request_id
), bl(bl
), on_finish(on_finish
),
99 send_to_leader(instance_id
.empty()) {
100 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
101 << ": instance_watcher=" << instance_watcher
<< ", instance_id="
102 << instance_id
<< ", request_id=" << request_id
<< dendl
;
104 assert(instance_watcher
->m_lock
.is_locked());
106 if (!send_to_leader
) {
107 assert((!instance_id
.empty()));
108 notifier
.reset(new librbd::watcher::Notifier(
109 instance_watcher
->m_work_queue
,
110 instance_watcher
->m_ioctx
,
111 RBD_MIRROR_INSTANCE_PREFIX
+ instance_id
));
114 instance_watcher
->m_notify_op_tracker
.start_op();
115 auto result
= instance_watcher
->m_notify_ops
.insert(
116 std::make_pair(instance_id
, this)).second
;
121 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
<< dendl
;
123 assert(instance_watcher
->m_lock
.is_locked());
126 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
127 << ": canceling" << dendl
;
128 instance_watcher
->m_work_queue
->queue(this, -ECANCELED
);
132 if (send_to_leader
) {
133 if (instance_watcher
->m_leader_instance_id
.empty()) {
134 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
135 << ": suspending" << dendl
;
136 instance_watcher
->suspend_notify_request(this);
140 if (instance_watcher
->m_leader_instance_id
!= instance_id
) {
141 auto count
= instance_watcher
->m_notify_ops
.erase(
142 std::make_pair(instance_id
, this));
145 instance_id
= instance_watcher
->m_leader_instance_id
;
147 auto result
= instance_watcher
->m_notify_ops
.insert(
148 std::make_pair(instance_id
, this)).second
;
151 notifier
.reset(new librbd::watcher::Notifier(
152 instance_watcher
->m_work_queue
,
153 instance_watcher
->m_ioctx
,
154 RBD_MIRROR_INSTANCE_PREFIX
+ instance_id
));
158 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
159 << ": sending to " << instance_id
<< dendl
;
160 notifier
->notify(bl
, &response
, this);
164 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
<< dendl
;
166 assert(instance_watcher
->m_lock
.is_locked());
169 instance_watcher
->unsuspend_notify_request(this);
172 void finish(int r
) override
{
173 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
<< ": r="
176 if (r
== 0 || r
== -ETIMEDOUT
) {
178 for (auto &it
: response
.acks
) {
179 auto &bl
= it
.second
;
180 if (it
.second
.length() == 0) {
181 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
182 << ": no payload in ack, ignoring" << dendl
;
186 auto iter
= bl
.begin();
187 NotifyAckPayload ack
;
189 if (ack
.instance_id
!= instance_watcher
->get_instance_id()) {
190 derr
<< "C_NotifyInstanceRequest: " << this << " " << __func__
191 << ": ack instance_id (" << ack
.instance_id
<< ") "
192 << "does not match, ignoring" << dendl
;
195 if (ack
.request_id
!= request_id
) {
196 derr
<< "C_NotifyInstanceRequest: " << this << " " << __func__
197 << ": ack request_id (" << ack
.request_id
<< ") "
198 << "does not match, ignoring" << dendl
;
204 } catch (const buffer::error
&err
) {
205 derr
<< "C_NotifyInstanceRequest: " << this << " " << __func__
206 << ": failed to decode ack: " << err
.what() << dendl
;
212 if (r
== -ETIMEDOUT
) {
213 derr
<< "C_NotifyInstanceRequest: " << this << " " << __func__
214 << ": resending after timeout" << dendl
;
215 Mutex::Locker
locker(instance_watcher
->m_lock
);
222 if (r
== -ESTALE
&& send_to_leader
) {
223 derr
<< "C_NotifyInstanceRequest: " << this << " " << __func__
224 << ": resending due to leader change" << dendl
;
225 Mutex::Locker
locker(instance_watcher
->m_lock
);
232 on_finish
->complete(r
);
235 Mutex::Locker
locker(instance_watcher
->m_lock
);
236 auto result
= instance_watcher
->m_notify_ops
.erase(
237 std::make_pair(instance_id
, this));
239 instance_watcher
->m_notify_op_tracker
.finish_op();
245 void complete(int r
) override
{
250 template <typename I
>
251 struct InstanceWatcher
<I
>::C_SyncRequest
: public Context
{
252 InstanceWatcher
<I
> *instance_watcher
;
255 Context
*on_complete
= nullptr;
256 C_NotifyInstanceRequest
*req
= nullptr;
258 C_SyncRequest(InstanceWatcher
<I
> *instance_watcher
,
259 const std::string
&sync_id
, Context
*on_start
)
260 : instance_watcher(instance_watcher
), sync_id(sync_id
),
262 dout(20) << "C_SyncRequest: " << this << " " << __func__
<< ": sync_id="
266 void finish(int r
) override
{
267 dout(20) << "C_SyncRequest: " << this << " " << __func__
<< ": r="
270 if (on_start
!= nullptr) {
271 instance_watcher
->handle_notify_sync_request(this, r
);
273 instance_watcher
->handle_notify_sync_complete(this, r
);
279 void complete(int r
) override
{
285 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
286 << this << " " << __func__ << ": "
287 template <typename I
>
288 void InstanceWatcher
<I
>::get_instances(librados::IoCtx
&io_ctx
,
289 std::vector
<std::string
> *instance_ids
,
290 Context
*on_finish
) {
291 librados::ObjectReadOperation op
;
292 librbd::cls_client::mirror_instances_list_start(&op
);
293 C_GetInstances
*ctx
= new C_GetInstances(instance_ids
, on_finish
);
294 librados::AioCompletion
*aio_comp
= create_rados_callback(ctx
);
296 int r
= io_ctx
.aio_operate(RBD_MIRROR_LEADER
, aio_comp
, &op
, &ctx
->out_bl
);
301 template <typename I
>
302 void InstanceWatcher
<I
>::remove_instance(librados::IoCtx
&io_ctx
,
303 ContextWQ
*work_queue
,
304 const std::string
&instance_id
,
305 Context
*on_finish
) {
306 auto req
= new C_RemoveInstanceRequest
<I
>(io_ctx
, work_queue
, instance_id
,
311 template <typename I
>
312 InstanceWatcher
<I
> *InstanceWatcher
<I
>::create(
313 librados::IoCtx
&io_ctx
, ContextWQ
*work_queue
,
314 InstanceReplayer
<I
> *instance_replayer
) {
315 return new InstanceWatcher
<I
>(io_ctx
, work_queue
, instance_replayer
,
316 stringify(io_ctx
.get_instance_id()));
319 template <typename I
>
320 InstanceWatcher
<I
>::InstanceWatcher(librados::IoCtx
&io_ctx
,
321 ContextWQ
*work_queue
,
322 InstanceReplayer
<I
> *instance_replayer
,
323 const std::string
&instance_id
)
324 : Watcher(io_ctx
, work_queue
, RBD_MIRROR_INSTANCE_PREFIX
+ instance_id
),
325 m_instance_replayer(instance_replayer
), m_instance_id(instance_id
),
326 m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
327 m_instance_lock(librbd::ManagedLock
<I
>::create(
328 m_ioctx
, m_work_queue
, m_oid
, this, librbd::managed_lock::EXCLUSIVE
, true,
329 m_cct
->_conf
->get_val
<int64_t>("rbd_blacklist_expire_seconds"))) {
332 template <typename I
>
333 InstanceWatcher
<I
>::~InstanceWatcher() {
334 assert(m_notify_ops
.empty());
335 assert(m_notify_op_tracker
.empty());
336 assert(m_suspended_ops
.empty());
337 assert(m_inflight_sync_reqs
.empty());
338 assert(m_image_sync_throttler
== nullptr);
339 m_instance_lock
->destroy();
342 template <typename I
>
343 int InstanceWatcher
<I
>::init() {
344 C_SaferCond init_ctx
;
346 return init_ctx
.wait();
349 template <typename I
>
350 void InstanceWatcher
<I
>::init(Context
*on_finish
) {
351 dout(20) << "instance_id=" << m_instance_id
<< dendl
;
353 Mutex::Locker
locker(m_lock
);
355 assert(m_on_finish
== nullptr);
356 m_on_finish
= on_finish
;
362 template <typename I
>
363 void InstanceWatcher
<I
>::shut_down() {
364 C_SaferCond shut_down_ctx
;
365 shut_down(&shut_down_ctx
);
366 int r
= shut_down_ctx
.wait();
370 template <typename I
>
371 void InstanceWatcher
<I
>::shut_down(Context
*on_finish
) {
374 Mutex::Locker
locker(m_lock
);
376 assert(m_on_finish
== nullptr);
377 m_on_finish
= on_finish
;
383 template <typename I
>
384 void InstanceWatcher
<I
>::remove(Context
*on_finish
) {
387 Mutex::Locker
locker(m_lock
);
389 assert(m_on_finish
== nullptr);
390 m_on_finish
= on_finish
;
394 get_instance_locker();
397 template <typename I
>
398 void InstanceWatcher
<I
>::notify_image_acquire(
399 const std::string
&instance_id
, const std::string
&global_image_id
,
400 Context
*on_notify_ack
) {
401 dout(20) << "instance_id=" << instance_id
<< ", global_image_id="
402 << global_image_id
<< dendl
;
404 Mutex::Locker
locker(m_lock
);
406 assert(m_on_finish
== nullptr);
408 if (instance_id
== m_instance_id
) {
409 handle_image_acquire(global_image_id
, on_notify_ack
);
411 uint64_t request_id
= ++m_request_seq
;
413 ::encode(NotifyMessage
{ImageAcquirePayload
{request_id
, global_image_id
}},
415 auto req
= new C_NotifyInstanceRequest(this, instance_id
, request_id
,
416 std::move(bl
), on_notify_ack
);
421 template <typename I
>
422 void InstanceWatcher
<I
>::notify_image_release(
423 const std::string
&instance_id
, const std::string
&global_image_id
,
424 Context
*on_notify_ack
) {
425 dout(20) << "instance_id=" << instance_id
<< ", global_image_id="
426 << global_image_id
<< dendl
;
428 Mutex::Locker
locker(m_lock
);
430 assert(m_on_finish
== nullptr);
432 if (instance_id
== m_instance_id
) {
433 handle_image_release(global_image_id
, on_notify_ack
);
435 uint64_t request_id
= ++m_request_seq
;
437 ::encode(NotifyMessage
{ImageReleasePayload
{request_id
, global_image_id
}},
439 auto req
= new C_NotifyInstanceRequest(this, instance_id
, request_id
,
440 std::move(bl
), on_notify_ack
);
445 template <typename I
>
446 void InstanceWatcher
<I
>::notify_peer_image_removed(
447 const std::string
&instance_id
, const std::string
&global_image_id
,
448 const std::string
&peer_mirror_uuid
, Context
*on_notify_ack
) {
449 dout(20) << "instance_id=" << instance_id
<< ", "
450 << "global_image_id=" << global_image_id
<< ", "
451 << "peer_mirror_uuid=" << peer_mirror_uuid
<< dendl
;
453 Mutex::Locker
locker(m_lock
);
454 assert(m_on_finish
== nullptr);
456 if (instance_id
== m_instance_id
) {
457 handle_peer_image_removed(global_image_id
, peer_mirror_uuid
, on_notify_ack
);
459 uint64_t request_id
= ++m_request_seq
;
461 ::encode(NotifyMessage
{PeerImageRemovedPayload
{request_id
, global_image_id
,
462 peer_mirror_uuid
}}, bl
);
463 auto req
= new C_NotifyInstanceRequest(this, instance_id
, request_id
,
464 std::move(bl
), on_notify_ack
);
469 template <typename I
>
470 void InstanceWatcher
<I
>::notify_sync_request(const std::string
&sync_id
,
471 Context
*on_sync_start
) {
472 dout(20) << "sync_id=" << sync_id
<< dendl
;
474 Mutex::Locker
locker(m_lock
);
476 assert(m_inflight_sync_reqs
.count(sync_id
) == 0);
478 uint64_t request_id
= ++m_request_seq
;
481 ::encode(NotifyMessage
{SyncRequestPayload
{request_id
, sync_id
}}, bl
);
483 auto sync_ctx
= new C_SyncRequest(this, sync_id
, on_sync_start
);
484 sync_ctx
->req
= new C_NotifyInstanceRequest(this, "", request_id
,
485 std::move(bl
), sync_ctx
);
487 m_inflight_sync_reqs
[sync_id
] = sync_ctx
;
488 sync_ctx
->req
->send();
491 template <typename I
>
492 bool InstanceWatcher
<I
>::cancel_sync_request(const std::string
&sync_id
) {
493 dout(20) << "sync_id=" << sync_id
<< dendl
;
495 Mutex::Locker
locker(m_lock
);
497 auto it
= m_inflight_sync_reqs
.find(sync_id
);
498 if (it
== m_inflight_sync_reqs
.end()) {
502 auto sync_ctx
= it
->second
;
504 if (sync_ctx
->on_start
== nullptr) {
508 assert(sync_ctx
->req
!= nullptr);
509 sync_ctx
->req
->cancel();
513 template <typename I
>
514 void InstanceWatcher
<I
>::notify_sync_start(const std::string
&instance_id
,
515 const std::string
&sync_id
) {
516 dout(20) << "sync_id=" << sync_id
<< dendl
;
518 Mutex::Locker
locker(m_lock
);
520 uint64_t request_id
= ++m_request_seq
;
523 ::encode(NotifyMessage
{SyncStartPayload
{request_id
, sync_id
}}, bl
);
525 auto ctx
= new FunctionContext(
526 [this, sync_id
] (int r
) {
527 dout(20) << "finish: sync_id=" << sync_id
<< ", r=" << r
<< dendl
;
528 Mutex::Locker
locker(m_lock
);
529 if (r
!= -ESTALE
&& m_image_sync_throttler
!= nullptr) {
530 m_image_sync_throttler
->finish_op(sync_id
);
533 auto req
= new C_NotifyInstanceRequest(this, instance_id
, request_id
,
538 template <typename I
>
539 void InstanceWatcher
<I
>::notify_sync_complete(const std::string
&sync_id
) {
540 Mutex::Locker
locker(m_lock
);
541 notify_sync_complete(m_lock
, sync_id
);
544 template <typename I
>
545 void InstanceWatcher
<I
>::notify_sync_complete(const Mutex
&,
546 const std::string
&sync_id
) {
547 dout(10) << "sync_id=" << sync_id
<< dendl
;
548 assert(m_lock
.is_locked());
550 auto it
= m_inflight_sync_reqs
.find(sync_id
);
551 assert(it
!= m_inflight_sync_reqs
.end());
553 auto sync_ctx
= it
->second
;
554 assert(sync_ctx
->req
== nullptr);
556 m_inflight_sync_reqs
.erase(it
);
557 m_work_queue
->queue(sync_ctx
, 0);
560 template <typename I
>
561 void InstanceWatcher
<I
>::handle_notify_sync_request(C_SyncRequest
*sync_ctx
,
563 dout(20) << "sync_id=" << sync_ctx
->sync_id
<< ", r=" << r
<< dendl
;
565 Context
*on_start
= nullptr;
567 Mutex::Locker
locker(m_lock
);
568 assert(sync_ctx
->req
!= nullptr);
569 assert(sync_ctx
->on_start
!= nullptr);
571 if (sync_ctx
->req
->canceling
) {
575 std::swap(sync_ctx
->on_start
, on_start
);
576 sync_ctx
->req
= nullptr;
578 if (r
== -ECANCELED
) {
579 notify_sync_complete(m_lock
, sync_ctx
->sync_id
);
583 on_start
->complete(r
== -ECANCELED
? r
: 0);
586 template <typename I
>
587 void InstanceWatcher
<I
>::handle_notify_sync_complete(C_SyncRequest
*sync_ctx
,
589 dout(20) << "sync_id=" << sync_ctx
->sync_id
<< ", r=" << r
<< dendl
;
591 if (sync_ctx
->on_complete
!= nullptr) {
592 sync_ctx
->on_complete
->complete(r
);
596 template <typename I
>
597 void InstanceWatcher
<I
>::print_sync_status(Formatter
*f
, stringstream
*ss
) {
600 Mutex::Locker
locker(m_lock
);
601 if (m_image_sync_throttler
!= nullptr) {
602 m_image_sync_throttler
->print_status(f
, ss
);
606 template <typename I
>
607 void InstanceWatcher
<I
>::handle_acquire_leader() {
610 Mutex::Locker
locker(m_lock
);
612 assert(m_image_sync_throttler
== nullptr);
613 m_image_sync_throttler
= ImageSyncThrottler
<I
>::create();
615 m_leader_instance_id
= m_instance_id
;
616 unsuspend_notify_requests();
619 template <typename I
>
620 void InstanceWatcher
<I
>::handle_release_leader() {
623 Mutex::Locker
locker(m_lock
);
625 assert(m_image_sync_throttler
!= nullptr);
627 m_leader_instance_id
.clear();
629 m_image_sync_throttler
->drain(-ESTALE
);
630 m_image_sync_throttler
->destroy();
631 m_image_sync_throttler
= nullptr;
634 template <typename I
>
635 void InstanceWatcher
<I
>::handle_update_leader(
636 const std::string
&leader_instance_id
) {
637 dout(20) << "leader_instance_id=" << leader_instance_id
<< dendl
;
639 Mutex::Locker
locker(m_lock
);
641 m_leader_instance_id
= leader_instance_id
;
643 if (!m_leader_instance_id
.empty()) {
644 unsuspend_notify_requests();
648 template <typename I
>
649 void InstanceWatcher
<I
>::cancel_notify_requests(
650 const std::string
&instance_id
) {
651 dout(20) << "instance_id=" << instance_id
<< dendl
;
653 Mutex::Locker
locker(m_lock
);
655 for (auto op
: m_notify_ops
) {
656 if (op
.first
== instance_id
&& !op
.second
->send_to_leader
) {
662 template <typename I
>
663 void InstanceWatcher
<I
>::register_instance() {
664 assert(m_lock
.is_locked());
668 librados::ObjectWriteOperation op
;
669 librbd::cls_client::mirror_instances_add(&op
, m_instance_id
);
670 librados::AioCompletion
*aio_comp
= create_rados_callback
<
671 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_register_instance
>(this);
673 int r
= m_ioctx
.aio_operate(RBD_MIRROR_LEADER
, aio_comp
, &op
);
678 template <typename I
>
679 void InstanceWatcher
<I
>::handle_register_instance(int r
) {
680 dout(20) << "r=" << r
<< dendl
;
682 Context
*on_finish
= nullptr;
684 Mutex::Locker
locker(m_lock
);
687 create_instance_object();
691 derr
<< "error registering instance: " << cpp_strerror(r
) << dendl
;
693 std::swap(on_finish
, m_on_finish
);
695 on_finish
->complete(r
);
699 template <typename I
>
700 void InstanceWatcher
<I
>::create_instance_object() {
703 assert(m_lock
.is_locked());
705 librados::ObjectWriteOperation op
;
708 librados::AioCompletion
*aio_comp
= create_rados_callback
<
710 &InstanceWatcher
<I
>::handle_create_instance_object
>(this);
711 int r
= m_ioctx
.aio_operate(m_oid
, aio_comp
, &op
);
716 template <typename I
>
717 void InstanceWatcher
<I
>::handle_create_instance_object(int r
) {
718 dout(20) << "r=" << r
<< dendl
;
720 Mutex::Locker
locker(m_lock
);
723 derr
<< "error creating " << m_oid
<< " object: " << cpp_strerror(r
)
727 unregister_instance();
734 template <typename I
>
735 void InstanceWatcher
<I
>::register_watch() {
738 assert(m_lock
.is_locked());
740 Context
*ctx
= create_async_context_callback(
741 m_work_queue
, create_context_callback
<
742 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_register_watch
>(this));
744 librbd::Watcher::register_watch(ctx
);
747 template <typename I
>
748 void InstanceWatcher
<I
>::handle_register_watch(int r
) {
749 dout(20) << "r=" << r
<< dendl
;
751 Mutex::Locker
locker(m_lock
);
754 derr
<< "error registering instance watcher for " << m_oid
<< " object: "
755 << cpp_strerror(r
) << dendl
;
758 remove_instance_object();
765 template <typename I
>
766 void InstanceWatcher
<I
>::acquire_lock() {
769 assert(m_lock
.is_locked());
771 Context
*ctx
= create_async_context_callback(
772 m_work_queue
, create_context_callback
<
773 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_acquire_lock
>(this));
775 m_instance_lock
->acquire_lock(ctx
);
778 template <typename I
>
779 void InstanceWatcher
<I
>::handle_acquire_lock(int r
) {
780 dout(20) << "r=" << r
<< dendl
;
782 Context
*on_finish
= nullptr;
784 Mutex::Locker
locker(m_lock
);
788 derr
<< "error acquiring instance lock: " << cpp_strerror(r
) << dendl
;
795 std::swap(on_finish
, m_on_finish
);
798 on_finish
->complete(r
);
801 template <typename I
>
802 void InstanceWatcher
<I
>::release_lock() {
805 assert(m_lock
.is_locked());
807 Context
*ctx
= create_async_context_callback(
808 m_work_queue
, create_context_callback
<
809 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_release_lock
>(this));
811 m_instance_lock
->shut_down(ctx
);
814 template <typename I
>
815 void InstanceWatcher
<I
>::handle_release_lock(int r
) {
816 dout(20) << "r=" << r
<< dendl
;
818 Mutex::Locker
locker(m_lock
);
821 derr
<< "error releasing instance lock: " << cpp_strerror(r
) << dendl
;
827 template <typename I
>
828 void InstanceWatcher
<I
>::unregister_watch() {
831 assert(m_lock
.is_locked());
833 Context
*ctx
= create_async_context_callback(
834 m_work_queue
, create_context_callback
<
835 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_unregister_watch
>(this));
837 librbd::Watcher::unregister_watch(ctx
);
840 template <typename I
>
841 void InstanceWatcher
<I
>::handle_unregister_watch(int r
) {
842 dout(20) << "r=" << r
<< dendl
;
845 derr
<< "error unregistering instance watcher for " << m_oid
<< " object: "
846 << cpp_strerror(r
) << dendl
;
849 Mutex::Locker
locker(m_lock
);
850 remove_instance_object();
853 template <typename I
>
854 void InstanceWatcher
<I
>::remove_instance_object() {
855 assert(m_lock
.is_locked());
859 librados::ObjectWriteOperation op
;
862 librados::AioCompletion
*aio_comp
= create_rados_callback
<
864 &InstanceWatcher
<I
>::handle_remove_instance_object
>(this);
865 int r
= m_ioctx
.aio_operate(m_oid
, aio_comp
, &op
);
870 template <typename I
>
871 void InstanceWatcher
<I
>::handle_remove_instance_object(int r
) {
872 dout(20) << "r=" << r
<< dendl
;
874 if (m_removing
&& r
== -ENOENT
) {
879 derr
<< "error removing " << m_oid
<< " object: " << cpp_strerror(r
)
883 Mutex::Locker
locker(m_lock
);
884 unregister_instance();
887 template <typename I
>
888 void InstanceWatcher
<I
>::unregister_instance() {
891 assert(m_lock
.is_locked());
893 librados::ObjectWriteOperation op
;
894 librbd::cls_client::mirror_instances_remove(&op
, m_instance_id
);
895 librados::AioCompletion
*aio_comp
= create_rados_callback
<
896 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_unregister_instance
>(this);
898 int r
= m_ioctx
.aio_operate(RBD_MIRROR_LEADER
, aio_comp
, &op
);
903 template <typename I
>
904 void InstanceWatcher
<I
>::handle_unregister_instance(int r
) {
905 dout(20) << "r=" << r
<< dendl
;
908 derr
<< "error unregistering instance: " << cpp_strerror(r
) << dendl
;
911 Mutex::Locker
locker(m_lock
);
912 wait_for_notify_ops();
915 template <typename I
>
916 void InstanceWatcher
<I
>::wait_for_notify_ops() {
919 assert(m_lock
.is_locked());
921 for (auto op
: m_notify_ops
) {
925 Context
*ctx
= create_async_context_callback(
926 m_work_queue
, create_context_callback
<
927 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_wait_for_notify_ops
>(this));
929 m_notify_op_tracker
.wait_for_ops(ctx
);
932 template <typename I
>
933 void InstanceWatcher
<I
>::handle_wait_for_notify_ops(int r
) {
934 dout(20) << "r=" << r
<< dendl
;
938 Context
*on_finish
= nullptr;
940 Mutex::Locker
locker(m_lock
);
942 assert(m_notify_ops
.empty());
944 std::swap(on_finish
, m_on_finish
);
951 on_finish
->complete(r
);
954 template <typename I
>
955 void InstanceWatcher
<I
>::get_instance_locker() {
958 assert(m_lock
.is_locked());
960 Context
*ctx
= create_async_context_callback(
961 m_work_queue
, create_context_callback
<
962 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_get_instance_locker
>(this));
964 m_instance_lock
->get_locker(&m_instance_locker
, ctx
);
967 template <typename I
>
968 void InstanceWatcher
<I
>::handle_get_instance_locker(int r
) {
969 dout(20) << "r=" << r
<< dendl
;
971 Mutex::Locker
locker(m_lock
);
975 derr
<< "error retrieving instance locker: " << cpp_strerror(r
) << dendl
;
977 remove_instance_object();
981 break_instance_lock();
984 template <typename I
>
985 void InstanceWatcher
<I
>::break_instance_lock() {
988 assert(m_lock
.is_locked());
990 Context
*ctx
= create_async_context_callback(
991 m_work_queue
, create_context_callback
<
992 InstanceWatcher
<I
>, &InstanceWatcher
<I
>::handle_break_instance_lock
>(this));
994 m_instance_lock
->break_lock(m_instance_locker
, true, ctx
);
997 template <typename I
>
998 void InstanceWatcher
<I
>::handle_break_instance_lock(int r
) {
999 dout(20) << "r=" << r
<< dendl
;
1001 Mutex::Locker
locker(m_lock
);
1005 derr
<< "error breaking instance lock: " << cpp_strerror(r
) << dendl
;
1007 remove_instance_object();
1011 remove_instance_object();
1014 template <typename I
>
1015 void InstanceWatcher
<I
>::suspend_notify_request(C_NotifyInstanceRequest
*req
) {
1016 dout(20) << req
<< dendl
;
1018 assert(m_lock
.is_locked());
1020 auto result
= m_suspended_ops
.insert(req
).second
;
1024 template <typename I
>
1025 bool InstanceWatcher
<I
>::unsuspend_notify_request(
1026 C_NotifyInstanceRequest
*req
) {
1027 dout(20) << req
<< dendl
;
1029 assert(m_lock
.is_locked());
1031 auto result
= m_suspended_ops
.erase(req
);
1040 template <typename I
>
1041 void InstanceWatcher
<I
>::unsuspend_notify_requests() {
1044 assert(m_lock
.is_locked());
1046 std::set
<C_NotifyInstanceRequest
*> suspended_ops
;
1047 std::swap(m_suspended_ops
, suspended_ops
);
1049 for (auto op
: suspended_ops
) {
1054 template <typename I
>
1055 Context
*InstanceWatcher
<I
>::prepare_request(const std::string
&instance_id
,
1056 uint64_t request_id
,
1057 C_NotifyAck
*on_notify_ack
) {
1058 dout(20) << "instance_id=" << instance_id
<< ", request_id=" << request_id
1061 Mutex::Locker
locker(m_lock
);
1063 Context
*ctx
= nullptr;
1064 Request
request(instance_id
, request_id
);
1065 auto it
= m_requests
.find(request
);
1067 if (it
!= m_requests
.end()) {
1068 dout(20) << "duplicate for in-progress request" << dendl
;
1069 delete it
->on_notify_ack
;
1070 m_requests
.erase(it
);
1072 ctx
= create_async_context_callback(
1073 m_work_queue
, new FunctionContext(
1074 [this, instance_id
, request_id
] (int r
) {
1075 complete_request(instance_id
, request_id
, r
);
1079 request
.on_notify_ack
= on_notify_ack
;
1080 m_requests
.insert(request
);
1084 template <typename I
>
1085 void InstanceWatcher
<I
>::complete_request(const std::string
&instance_id
,
1086 uint64_t request_id
, int r
) {
1087 dout(20) << "instance_id=" << instance_id
<< ", request_id=" << request_id
1090 C_NotifyAck
*on_notify_ack
;
1092 Mutex::Locker
locker(m_lock
);
1093 Request
request(instance_id
, request_id
);
1094 auto it
= m_requests
.find(request
);
1095 assert(it
!= m_requests
.end());
1096 on_notify_ack
= it
->on_notify_ack
;
1097 m_requests
.erase(it
);
1100 ::encode(NotifyAckPayload(instance_id
, request_id
, r
), on_notify_ack
->out
);
1101 on_notify_ack
->complete(0);
1104 template <typename I
>
1105 void InstanceWatcher
<I
>::handle_notify(uint64_t notify_id
, uint64_t handle
,
1106 uint64_t notifier_id
, bufferlist
&bl
) {
1107 dout(20) << "notify_id=" << notify_id
<< ", handle=" << handle
<< ", "
1108 << "notifier_id=" << notifier_id
<< dendl
;
1110 auto ctx
= new C_NotifyAck(this, notify_id
, handle
);
1112 NotifyMessage notify_message
;
1114 bufferlist::iterator iter
= bl
.begin();
1115 ::decode(notify_message
, iter
);
1116 } catch (const buffer::error
&err
) {
1117 derr
<< "error decoding image notification: " << err
.what() << dendl
;
1122 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id
), ctx
),
1123 notify_message
.payload
);
1126 template <typename I
>
1127 void InstanceWatcher
<I
>::handle_image_acquire(
1128 const std::string
&global_image_id
, Context
*on_finish
) {
1129 dout(20) << "global_image_id=" << global_image_id
<< dendl
;
1131 auto ctx
= new FunctionContext(
1132 [this, global_image_id
, on_finish
] (int r
) {
1133 m_instance_replayer
->acquire_image(this, global_image_id
, on_finish
);
1134 m_notify_op_tracker
.finish_op();
1137 m_notify_op_tracker
.start_op();
1138 m_work_queue
->queue(ctx
, 0);
1141 template <typename I
>
1142 void InstanceWatcher
<I
>::handle_image_release(
1143 const std::string
&global_image_id
, Context
*on_finish
) {
1144 dout(20) << "global_image_id=" << global_image_id
<< dendl
;
1146 auto ctx
= new FunctionContext(
1147 [this, global_image_id
, on_finish
] (int r
) {
1148 m_instance_replayer
->release_image(global_image_id
, on_finish
);
1149 m_notify_op_tracker
.finish_op();
1152 m_notify_op_tracker
.start_op();
1153 m_work_queue
->queue(ctx
, 0);
1156 template <typename I
>
1157 void InstanceWatcher
<I
>::handle_peer_image_removed(
1158 const std::string
&global_image_id
, const std::string
&peer_mirror_uuid
,
1159 Context
*on_finish
) {
1160 dout(20) << "global_image_id=" << global_image_id
<< ", "
1161 << "peer_mirror_uuid=" << peer_mirror_uuid
<< dendl
;
1163 auto ctx
= new FunctionContext(
1164 [this, peer_mirror_uuid
, global_image_id
, on_finish
] (int r
) {
1165 m_instance_replayer
->remove_peer_image(global_image_id
,
1166 peer_mirror_uuid
, on_finish
);
1167 m_notify_op_tracker
.finish_op();
1170 m_notify_op_tracker
.start_op();
1171 m_work_queue
->queue(ctx
, 0);
1174 template <typename I
>
1175 void InstanceWatcher
<I
>::handle_sync_request(const std::string
&instance_id
,
1176 const std::string
&sync_id
,
1177 Context
*on_finish
) {
1178 dout(20) << "instance_id=" << instance_id
<< ", sync_id=" << sync_id
<< dendl
;
1180 Mutex::Locker
locker(m_lock
);
1182 if (m_image_sync_throttler
== nullptr) {
1183 dout(20) << "sync request for non-leader" << dendl
;
1184 m_work_queue
->queue(on_finish
, -ESTALE
);
1188 Context
*on_start
= create_async_context_callback(
1189 m_work_queue
, new FunctionContext(
1190 [this, instance_id
, sync_id
, on_finish
] (int r
) {
1191 dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
1192 << ", sync_id=" << sync_id
<< ", r=" << r
<< dendl
;
1194 notify_sync_start(instance_id
, sync_id
);
1196 on_finish
->complete(r
);
1198 m_image_sync_throttler
->start_op(sync_id
, on_start
);
1201 template <typename I
>
1202 void InstanceWatcher
<I
>::handle_sync_start(const std::string
&instance_id
,
1203 const std::string
&sync_id
,
1204 Context
*on_finish
) {
1205 dout(20) << "instance_id=" << instance_id
<< ", sync_id=" << sync_id
<< dendl
;
1207 Mutex::Locker
locker(m_lock
);
1209 auto it
= m_inflight_sync_reqs
.find(sync_id
);
1210 if (it
== m_inflight_sync_reqs
.end()) {
1211 dout(20) << "not found" << dendl
;
1212 m_work_queue
->queue(on_finish
, 0);
1216 auto sync_ctx
= it
->second
;
1218 if (sync_ctx
->on_complete
!= nullptr) {
1219 dout(20) << "duplicate request" << dendl
;
1220 m_work_queue
->queue(sync_ctx
->on_complete
, -ESTALE
);
1223 sync_ctx
->on_complete
= on_finish
;
1226 template <typename I
>
1227 void InstanceWatcher
<I
>::handle_payload(const std::string
&instance_id
,
1228 const ImageAcquirePayload
&payload
,
1229 C_NotifyAck
*on_notify_ack
) {
1230 dout(20) << "image_acquire: instance_id=" << instance_id
<< ", "
1231 << "request_id=" << payload
.request_id
<< dendl
;
1233 auto on_finish
= prepare_request(instance_id
, payload
.request_id
,
1235 if (on_finish
!= nullptr) {
1236 handle_image_acquire(payload
.global_image_id
, on_finish
);
1240 template <typename I
>
1241 void InstanceWatcher
<I
>::handle_payload(const std::string
&instance_id
,
1242 const ImageReleasePayload
&payload
,
1243 C_NotifyAck
*on_notify_ack
) {
1244 dout(20) << "image_release: instance_id=" << instance_id
<< ", "
1245 << "request_id=" << payload
.request_id
<< dendl
;
1247 auto on_finish
= prepare_request(instance_id
, payload
.request_id
,
1249 if (on_finish
!= nullptr) {
1250 handle_image_release(payload
.global_image_id
, on_finish
);
1254 template <typename I
>
1255 void InstanceWatcher
<I
>::handle_payload(const std::string
&instance_id
,
1256 const PeerImageRemovedPayload
&payload
,
1257 C_NotifyAck
*on_notify_ack
) {
1258 dout(20) << "remove_peer_image: instance_id=" << instance_id
<< ", "
1259 << "request_id=" << payload
.request_id
<< dendl
;
1261 auto on_finish
= prepare_request(instance_id
, payload
.request_id
,
1263 if (on_finish
!= nullptr) {
1264 handle_peer_image_removed(payload
.global_image_id
, payload
.peer_mirror_uuid
,
1269 template <typename I
>
1270 void InstanceWatcher
<I
>::handle_payload(const std::string
&instance_id
,
1271 const SyncRequestPayload
&payload
,
1272 C_NotifyAck
*on_notify_ack
) {
1273 dout(20) << "sync_request: instance_id=" << instance_id
<< ", "
1274 << "request_id=" << payload
.request_id
<< dendl
;
1276 auto on_finish
= prepare_request(instance_id
, payload
.request_id
,
1278 if (on_finish
== nullptr) {
1282 handle_sync_request(instance_id
, payload
.sync_id
, on_finish
);
1285 template <typename I
>
1286 void InstanceWatcher
<I
>::handle_payload(const std::string
&instance_id
,
1287 const SyncStartPayload
&payload
,
1288 C_NotifyAck
*on_notify_ack
) {
1289 dout(20) << "sync_start: instance_id=" << instance_id
<< ", "
1290 << "request_id=" << payload
.request_id
<< dendl
;
1292 auto on_finish
= prepare_request(instance_id
, payload
.request_id
,
1294 if (on_finish
== nullptr) {
1298 handle_sync_start(instance_id
, payload
.sync_id
, on_finish
);
1301 template <typename I
>
1302 void InstanceWatcher
<I
>::handle_payload(const std::string
&instance_id
,
1303 const UnknownPayload
&payload
,
1304 C_NotifyAck
*on_notify_ack
) {
1305 dout(20) << "unknown: instance_id=" << instance_id
<< dendl
;
1307 on_notify_ack
->complete(0);
1310 } // namespace mirror
1313 template class rbd::mirror::InstanceWatcher
<librbd::ImageCtx
>;