]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/InstanceWatcher.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "InstanceWatcher.h"
5 #include "include/atomic.h"
6 #include "include/stringify.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "librbd/ManagedLock.h"
11 #include "librbd/Utils.h"
12 #include "InstanceReplayer.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
16 #undef dout_prefix
17 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
18
19 namespace rbd {
20 namespace mirror {
21
22 using namespace instance_watcher;
23
24 using librbd::util::create_async_context_callback;
25 using librbd::util::create_context_callback;
26 using librbd::util::create_rados_callback;
27 using librbd::util::unique_lock_name;
28
29 namespace {
30
31 struct C_GetInstances : public Context {
32 std::vector<std::string> *instance_ids;
33 Context *on_finish;
34 bufferlist out_bl;
35
36 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
37 : instance_ids(instance_ids), on_finish(on_finish) {
38 }
39
40 void finish(int r) override {
41 dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
42 << dendl;
43
44 if (r == 0) {
45 bufferlist::iterator it = out_bl.begin();
46 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
47 } else if (r == -ENOENT) {
48 r = 0;
49 }
50 on_finish->complete(r);
51 }
52 };
53
54 template <typename I>
55 struct C_RemoveInstanceRequest : public Context {
56 InstanceWatcher<I> instance_watcher;
57 Context *on_finish;
58
59 C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
60 const std::string &instance_id, Context *on_finish)
61 : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
62 on_finish(on_finish) {
63 }
64
65 void send() {
66 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
67
68 instance_watcher.remove(this);
69 }
70
71 void finish(int r) override {
72 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
73 << r << dendl;
74 assert(r == 0);
75
76 on_finish->complete(r);
77 }
78 };
79
80 } // anonymous namespace
81
82 template <typename I>
83 struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
84 InstanceWatcher<I> *instance_watcher;
85 librbd::watcher::Notifier notifier;
86 std::string instance_id;
87 uint64_t request_id;
88 bufferlist bl;
89 Context *on_finish;
90 librbd::watcher::NotifyResponse response;
91 atomic_t canceling;
92
93 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
94 const std::string &instance_id, uint64_t request_id,
95 bufferlist &&bl, Context *on_finish)
96 : instance_watcher(instance_watcher),
97 notifier(instance_watcher->m_work_queue, instance_watcher->m_ioctx,
98 RBD_MIRROR_INSTANCE_PREFIX + instance_id),
99 instance_id(instance_id), request_id(request_id), bl(bl),
100 on_finish(on_finish) {
101 instance_watcher->m_notify_op_tracker.start_op();
102 assert(instance_watcher->m_lock.is_locked());
103 auto result = instance_watcher->m_notify_ops.insert(
104 std::make_pair(instance_id, this)).second;
105 assert(result);
106 }
107
108 void send() {
109 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
110
111 notifier.notify(bl, &response, this);
112 }
113
114 void cancel() {
115 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
116
117 canceling.set(1);
118 }
119
120 void finish(int r) override {
121 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
122 << r << dendl;
123
124 if (r == 0 || r == -ETIMEDOUT) {
125 bool found = false;
126 for (auto &it : response.acks) {
127 auto &bl = it.second;
128 if (it.second.length() == 0) {
129 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
130 << ": no payload in ack, ignoring" << dendl;
131 continue;
132 }
133 try {
134 auto iter = bl.begin();
135 NotifyAckPayload ack;
136 ::decode(ack, iter);
137 if (ack.instance_id != instance_watcher->get_instance_id()) {
138 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
139 << ": ack instance_id (" << ack.instance_id << ") "
140 << "does not match, ignoring" << dendl;
141 continue;
142 }
143 if (ack.request_id != request_id) {
144 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
145 << ": ack request_id (" << ack.request_id << ") "
146 << "does not match, ignoring" << dendl;
147 continue;
148 }
149 r = ack.ret_val;
150 found = true;
151 break;
152 } catch (const buffer::error &err) {
153 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
154 << ": failed to decode ack: " << err.what() << dendl;
155 continue;
156 }
157 }
158
159 if (!found) {
160 if (r == -ETIMEDOUT) {
161 if (canceling.read()) {
162 r = -ECANCELED;
163 } else {
164 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
165 << ": resending after timeout" << dendl;
166 send();
167 return;
168 }
169 } else {
170 r = -EINVAL;
171 }
172 }
173 }
174
175 instance_watcher->m_notify_op_tracker.finish_op();
176 on_finish->complete(r);
177
178 Mutex::Locker locker(instance_watcher->m_lock);
179 auto result = instance_watcher->m_notify_ops.erase(
180 std::make_pair(instance_id, this));
181 assert(result > 0);
182 delete this;
183 }
184
185 void complete(int r) override {
186 finish(r);
187 }
188 };
189
190 #undef dout_prefix
191 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
192 << this << " " << __func__ << ": "
193 template <typename I>
194 void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
195 std::vector<std::string> *instance_ids,
196 Context *on_finish) {
197 librados::ObjectReadOperation op;
198 librbd::cls_client::mirror_instances_list_start(&op);
199 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
200 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
201
202 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
203 assert(r == 0);
204 aio_comp->release();
205 }
206
207 template <typename I>
208 void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
209 ContextWQ *work_queue,
210 const std::string &instance_id,
211 Context *on_finish) {
212 auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
213 on_finish);
214 req->send();
215 }
216
217 template <typename I>
218 InstanceWatcher<I> *InstanceWatcher<I>::create(
219 librados::IoCtx &io_ctx, ContextWQ *work_queue,
220 InstanceReplayer<I> *instance_replayer) {
221 return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
222 stringify(io_ctx.get_instance_id()));
223 }
224
225 template <typename I>
226 InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
227 ContextWQ *work_queue,
228 InstanceReplayer<I> *instance_replayer,
229 const std::string &instance_id)
230 : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
231 m_instance_replayer(instance_replayer), m_instance_id(instance_id),
232 m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
233 m_instance_lock(librbd::ManagedLock<I>::create(
234 m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
235 m_cct->_conf->rbd_blacklist_expire_seconds)) {
236 }
237
238 template <typename I>
239 InstanceWatcher<I>::~InstanceWatcher() {
240 m_instance_lock->destroy();
241 }
242
243 template <typename I>
244 int InstanceWatcher<I>::init() {
245 C_SaferCond init_ctx;
246 init(&init_ctx);
247 return init_ctx.wait();
248 }
249
250 template <typename I>
251 void InstanceWatcher<I>::init(Context *on_finish) {
252 dout(20) << "instance_id=" << m_instance_id << dendl;
253
254 Mutex::Locker locker(m_lock);
255
256 assert(m_on_finish == nullptr);
257 m_on_finish = on_finish;
258 m_ret_val = 0;
259
260 register_instance();
261 }
262
263 template <typename I>
264 void InstanceWatcher<I>::shut_down() {
265 C_SaferCond shut_down_ctx;
266 shut_down(&shut_down_ctx);
267 int r = shut_down_ctx.wait();
268 assert(r == 0);
269 }
270
271 template <typename I>
272 void InstanceWatcher<I>::shut_down(Context *on_finish) {
273 dout(20) << dendl;
274
275 Mutex::Locker locker(m_lock);
276
277 assert(m_on_finish == nullptr);
278 m_on_finish = on_finish;
279 m_ret_val = 0;
280
281 release_lock();
282 }
283
284 template <typename I>
285 void InstanceWatcher<I>::remove(Context *on_finish) {
286 dout(20) << dendl;
287
288 Mutex::Locker locker(m_lock);
289
290 assert(m_on_finish == nullptr);
291 m_on_finish = on_finish;
292 m_ret_val = 0;
293 m_removing = true;
294
295 get_instance_locker();
296 }
297
298 template <typename I>
299 void InstanceWatcher<I>::notify_image_acquire(
300 const std::string &instance_id, const std::string &global_image_id,
301 const std::string &peer_mirror_uuid, const std::string &peer_image_id,
302 Context *on_notify_ack) {
303 dout(20) << "instance_id=" << instance_id << ", global_image_id="
304 << global_image_id << dendl;
305
306 Mutex::Locker locker(m_lock);
307
308 assert(m_on_finish == nullptr);
309
310 if (instance_id == m_instance_id) {
311 handle_image_acquire(global_image_id, peer_mirror_uuid, peer_image_id,
312 on_notify_ack);
313 } else {
314 uint64_t request_id = ++m_request_seq;
315 bufferlist bl;
316 ::encode(NotifyMessage{ImageAcquirePayload{
317 request_id, global_image_id, peer_mirror_uuid, peer_image_id}}, bl);
318 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
319 std::move(bl), on_notify_ack);
320 req->send();
321 }
322 }
323
324 template <typename I>
325 void InstanceWatcher<I>::notify_image_release(
326 const std::string &instance_id, const std::string &global_image_id,
327 const std::string &peer_mirror_uuid, const std::string &peer_image_id,
328 bool schedule_delete, Context *on_notify_ack) {
329 dout(20) << "instance_id=" << instance_id << ", global_image_id="
330 << global_image_id << dendl;
331
332 Mutex::Locker locker(m_lock);
333
334 assert(m_on_finish == nullptr);
335
336 if (instance_id == m_instance_id) {
337 handle_image_release(global_image_id, peer_mirror_uuid, peer_image_id,
338 schedule_delete, on_notify_ack);
339 } else {
340 uint64_t request_id = ++m_request_seq;
341 bufferlist bl;
342 ::encode(NotifyMessage{ImageReleasePayload{
343 request_id, global_image_id, peer_mirror_uuid, peer_image_id,
344 schedule_delete}}, bl);
345 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
346 std::move(bl), on_notify_ack);
347 req->send();
348 }
349 }
350
351 template <typename I>
352 void InstanceWatcher<I>::cancel_notify_requests(
353 const std::string &instance_id) {
354 dout(20) << "instance_id=" << instance_id << dendl;
355
356 Mutex::Locker locker(m_lock);
357
358 for (auto op : m_notify_ops) {
359 if (op.first == instance_id) {
360 op.second->cancel();
361 }
362 }
363 }
364
365
366 template <typename I>
367 void InstanceWatcher<I>::register_instance() {
368 assert(m_lock.is_locked());
369
370 dout(20) << dendl;
371
372 librados::ObjectWriteOperation op;
373 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
374 librados::AioCompletion *aio_comp = create_rados_callback<
375 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
376
377 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
378 assert(r == 0);
379 aio_comp->release();
380 }
381
382 template <typename I>
383 void InstanceWatcher<I>::handle_register_instance(int r) {
384 dout(20) << "r=" << r << dendl;
385
386 Context *on_finish = nullptr;
387 {
388 Mutex::Locker locker(m_lock);
389
390 if (r == 0) {
391 create_instance_object();
392 return;
393 }
394
395 derr << "error registering instance: " << cpp_strerror(r) << dendl;
396
397 std::swap(on_finish, m_on_finish);
398 }
399 on_finish->complete(r);
400 }
401
402
403 template <typename I>
404 void InstanceWatcher<I>::create_instance_object() {
405 dout(20) << dendl;
406
407 assert(m_lock.is_locked());
408
409 librados::ObjectWriteOperation op;
410 op.create(true);
411
412 librados::AioCompletion *aio_comp = create_rados_callback<
413 InstanceWatcher<I>,
414 &InstanceWatcher<I>::handle_create_instance_object>(this);
415 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
416 assert(r == 0);
417 aio_comp->release();
418 }
419
420 template <typename I>
421 void InstanceWatcher<I>::handle_create_instance_object(int r) {
422 dout(20) << "r=" << r << dendl;
423
424 Mutex::Locker locker(m_lock);
425
426 if (r < 0) {
427 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
428 << dendl;
429
430 m_ret_val = r;
431 unregister_instance();
432 return;
433 }
434
435 register_watch();
436 }
437
438 template <typename I>
439 void InstanceWatcher<I>::register_watch() {
440 dout(20) << dendl;
441
442 assert(m_lock.is_locked());
443
444 Context *ctx = create_async_context_callback(
445 m_work_queue, create_context_callback<
446 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
447
448 librbd::Watcher::register_watch(ctx);
449 }
450
451 template <typename I>
452 void InstanceWatcher<I>::handle_register_watch(int r) {
453 dout(20) << "r=" << r << dendl;
454
455 Mutex::Locker locker(m_lock);
456
457 if (r < 0) {
458 derr << "error registering instance watcher for " << m_oid << " object: "
459 << cpp_strerror(r) << dendl;
460
461 m_ret_val = r;
462 remove_instance_object();
463 return;
464 }
465
466 acquire_lock();
467 }
468
469 template <typename I>
470 void InstanceWatcher<I>::acquire_lock() {
471 dout(20) << dendl;
472
473 assert(m_lock.is_locked());
474
475 Context *ctx = create_async_context_callback(
476 m_work_queue, create_context_callback<
477 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
478
479 m_instance_lock->acquire_lock(ctx);
480 }
481
482 template <typename I>
483 void InstanceWatcher<I>::handle_acquire_lock(int r) {
484 dout(20) << "r=" << r << dendl;
485
486 Context *on_finish = nullptr;
487 {
488 Mutex::Locker locker(m_lock);
489
490 if (r < 0) {
491
492 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
493
494 m_ret_val = r;
495 unregister_watch();
496 return;
497 }
498
499 std::swap(on_finish, m_on_finish);
500 }
501
502 on_finish->complete(r);
503 }
504
505 template <typename I>
506 void InstanceWatcher<I>::release_lock() {
507 dout(20) << dendl;
508
509 assert(m_lock.is_locked());
510
511 Context *ctx = create_async_context_callback(
512 m_work_queue, create_context_callback<
513 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
514
515 m_instance_lock->shut_down(ctx);
516 }
517
518 template <typename I>
519 void InstanceWatcher<I>::handle_release_lock(int r) {
520 dout(20) << "r=" << r << dendl;
521
522 Mutex::Locker locker(m_lock);
523
524 if (r < 0) {
525 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
526 }
527
528 unregister_watch();
529 }
530
531 template <typename I>
532 void InstanceWatcher<I>::unregister_watch() {
533 dout(20) << dendl;
534
535 assert(m_lock.is_locked());
536
537 Context *ctx = create_async_context_callback(
538 m_work_queue, create_context_callback<
539 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
540
541 librbd::Watcher::unregister_watch(ctx);
542 }
543
544 template <typename I>
545 void InstanceWatcher<I>::handle_unregister_watch(int r) {
546 dout(20) << "r=" << r << dendl;
547
548 if (r < 0) {
549 derr << "error unregistering instance watcher for " << m_oid << " object: "
550 << cpp_strerror(r) << dendl;
551 }
552
553 Mutex::Locker locker(m_lock);
554 remove_instance_object();
555 }
556
557 template <typename I>
558 void InstanceWatcher<I>::remove_instance_object() {
559 assert(m_lock.is_locked());
560
561 dout(20) << dendl;
562
563 librados::ObjectWriteOperation op;
564 op.remove();
565
566 librados::AioCompletion *aio_comp = create_rados_callback<
567 InstanceWatcher<I>,
568 &InstanceWatcher<I>::handle_remove_instance_object>(this);
569 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
570 assert(r == 0);
571 aio_comp->release();
572 }
573
574 template <typename I>
575 void InstanceWatcher<I>::handle_remove_instance_object(int r) {
576 dout(20) << "r=" << r << dendl;
577
578 if (m_removing && r == -ENOENT) {
579 r = 0;
580 }
581
582 if (r < 0) {
583 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
584 << dendl;
585 }
586
587 Mutex::Locker locker(m_lock);
588 unregister_instance();
589 }
590
591 template <typename I>
592 void InstanceWatcher<I>::unregister_instance() {
593 dout(20) << dendl;
594
595 assert(m_lock.is_locked());
596
597 librados::ObjectWriteOperation op;
598 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
599 librados::AioCompletion *aio_comp = create_rados_callback<
600 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
601
602 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
603 assert(r == 0);
604 aio_comp->release();
605 }
606
607 template <typename I>
608 void InstanceWatcher<I>::handle_unregister_instance(int r) {
609 dout(20) << "r=" << r << dendl;
610
611 if (r < 0) {
612 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
613 }
614
615 Mutex::Locker locker(m_lock);
616 wait_for_notify_ops();
617 }
618
619 template <typename I>
620 void InstanceWatcher<I>::wait_for_notify_ops() {
621 dout(20) << dendl;
622
623 assert(m_lock.is_locked());
624
625 for (auto op : m_notify_ops) {
626 op.second->cancel();
627 }
628
629 Context *ctx = create_async_context_callback(
630 m_work_queue, create_context_callback<
631 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
632
633 m_notify_op_tracker.wait_for_ops(ctx);
634 }
635
636 template <typename I>
637 void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
638 dout(20) << "r=" << r << dendl;
639
640 assert(r == 0);
641
642 Context *on_finish = nullptr;
643 {
644 Mutex::Locker locker(m_lock);
645
646 assert(m_notify_ops.empty());
647
648 std::swap(on_finish, m_on_finish);
649 r = m_ret_val;
650
651 if (m_removing) {
652 m_removing = false;
653 }
654 }
655 on_finish->complete(r);
656 }
657
658 template <typename I>
659 void InstanceWatcher<I>::get_instance_locker() {
660 dout(20) << dendl;
661
662 assert(m_lock.is_locked());
663
664 Context *ctx = create_async_context_callback(
665 m_work_queue, create_context_callback<
666 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
667
668 m_instance_lock->get_locker(&m_instance_locker, ctx);
669 }
670
671 template <typename I>
672 void InstanceWatcher<I>::handle_get_instance_locker(int r) {
673 dout(20) << "r=" << r << dendl;
674
675 Mutex::Locker locker(m_lock);
676
677 if (r < 0) {
678 if (r != -ENOENT) {
679 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
680 }
681 remove_instance_object();
682 return;
683 }
684
685 break_instance_lock();
686 }
687
688 template <typename I>
689 void InstanceWatcher<I>::break_instance_lock() {
690 dout(20) << dendl;
691
692 assert(m_lock.is_locked());
693
694 Context *ctx = create_async_context_callback(
695 m_work_queue, create_context_callback<
696 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
697
698 m_instance_lock->break_lock(m_instance_locker, true, ctx);
699 }
700
701 template <typename I>
702 void InstanceWatcher<I>::handle_break_instance_lock(int r) {
703 dout(20) << "r=" << r << dendl;
704
705 Mutex::Locker locker(m_lock);
706
707 if (r < 0) {
708 if (r != -ENOENT) {
709 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
710 }
711 remove_instance_object();
712 return;
713 }
714
715 remove_instance_object();
716 }
717
718 template <typename I>
719 Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
720 uint64_t request_id,
721 C_NotifyAck *on_notify_ack) {
722 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
723 << dendl;
724
725 Mutex::Locker locker(m_lock);
726
727 Context *ctx = nullptr;
728 Request request(instance_id, request_id);
729 auto it = m_requests.find(request);
730
731 if (it != m_requests.end()) {
732 dout(20) << "duplicate for in-progress request" << dendl;
733 delete it->on_notify_ack;
734 m_requests.erase(it);
735 } else {
736 ctx = new FunctionContext(
737 [this, instance_id, request_id] (int r) {
738 C_NotifyAck *on_notify_ack = nullptr;
739 {
740 // update request state in the requests list
741 Mutex::Locker locker(m_lock);
742 Request request(instance_id, request_id);
743 auto it = m_requests.find(request);
744 assert(it != m_requests.end());
745 on_notify_ack = it->on_notify_ack;
746 m_requests.erase(it);
747 }
748
749 ::encode(NotifyAckPayload(instance_id, request_id, r),
750 on_notify_ack->out);
751 on_notify_ack->complete(0);
752 });
753 }
754
755 request.on_notify_ack = on_notify_ack;
756 m_requests.insert(request);
757 return ctx;
758 }
759
760 template <typename I>
761 void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
762 uint64_t notifier_id, bufferlist &bl) {
763 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
764 << "notifier_id=" << notifier_id << dendl;
765
766 auto ctx = new C_NotifyAck(this, notify_id, handle);
767
768 NotifyMessage notify_message;
769 try {
770 bufferlist::iterator iter = bl.begin();
771 ::decode(notify_message, iter);
772 } catch (const buffer::error &err) {
773 derr << "error decoding image notification: " << err.what() << dendl;
774 ctx->complete(0);
775 return;
776 }
777
778 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
779 notify_message.payload);
780 }
781
782 template <typename I>
783 void InstanceWatcher<I>::handle_image_acquire(
784 const std::string &global_image_id, const std::string &peer_mirror_uuid,
785 const std::string &peer_image_id, Context *on_finish) {
786 dout(20) << "global_image_id=" << global_image_id << dendl;
787
788 m_instance_replayer->acquire_image(global_image_id, peer_mirror_uuid,
789 peer_image_id, on_finish);
790 }
791
792 template <typename I>
793 void InstanceWatcher<I>::handle_image_release(
794 const std::string &global_image_id, const std::string &peer_mirror_uuid,
795 const std::string &peer_image_id, bool schedule_delete, Context *on_finish) {
796 dout(20) << "global_image_id=" << global_image_id << dendl;
797
798 m_instance_replayer->release_image(global_image_id, peer_mirror_uuid,
799 peer_image_id, schedule_delete, on_finish);
800 }
801
802 template <typename I>
803 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
804 const ImageAcquirePayload &payload,
805 C_NotifyAck *on_notify_ack) {
806 dout(20) << "image_acquire: instance_id=" << instance_id << ", "
807 << "request_id=" << payload.request_id << dendl;
808
809 auto on_finish = prepare_request(instance_id, payload.request_id,
810 on_notify_ack);
811 if (on_finish != nullptr) {
812 handle_image_acquire(payload.global_image_id, payload.peer_mirror_uuid,
813 payload.peer_image_id, on_finish);
814 }
815 }
816
817 template <typename I>
818 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
819 const ImageReleasePayload &payload,
820 C_NotifyAck *on_notify_ack) {
821 dout(20) << "image_release: instance_id=" << instance_id << ", "
822 << "request_id=" << payload.request_id << dendl;
823
824 auto on_finish = prepare_request(instance_id, payload.request_id,
825 on_notify_ack);
826 if (on_finish != nullptr) {
827 handle_image_release(payload.global_image_id, payload.peer_mirror_uuid,
828 payload.peer_image_id, payload.schedule_delete,
829 on_finish);
830 }
831 }
832
833 template <typename I>
834 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
835 const UnknownPayload &payload,
836 C_NotifyAck *on_notify_ack) {
837 dout(20) << "unknown: instance_id=" << instance_id << dendl;
838
839 on_notify_ack->complete(0);
840 }
841
842 } // namespace mirror
843 } // namespace rbd
844
845 template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;