]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/InstanceWatcher.cc
04e0a5e342d40721510649fd806f6363cc482096
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "InstanceWatcher.h"
5 #include "include/stringify.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "librbd/ManagedLock.h"
10 #include "librbd/Utils.h"
11 #include "InstanceReplayer.h"
12 #include "ImageSyncThrottler.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
16 #undef dout_prefix
17 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
18
19 namespace rbd {
20 namespace mirror {
21
22 using namespace instance_watcher;
23
24 using librbd::util::create_async_context_callback;
25 using librbd::util::create_context_callback;
26 using librbd::util::create_rados_callback;
27 using librbd::util::unique_lock_name;
28
29 namespace {
30
31 struct C_GetInstances : public Context {
32 std::vector<std::string> *instance_ids;
33 Context *on_finish;
34 bufferlist out_bl;
35
36 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
37 : instance_ids(instance_ids), on_finish(on_finish) {
38 }
39
40 void finish(int r) override {
41 dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
42 << dendl;
43
44 if (r == 0) {
45 bufferlist::iterator it = out_bl.begin();
46 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
47 } else if (r == -ENOENT) {
48 r = 0;
49 }
50 on_finish->complete(r);
51 }
52 };
53
54 template <typename I>
55 struct C_RemoveInstanceRequest : public Context {
56 InstanceWatcher<I> instance_watcher;
57 Context *on_finish;
58
59 C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
60 const std::string &instance_id, Context *on_finish)
61 : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
62 on_finish(on_finish) {
63 }
64
65 void send() {
66 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
67
68 instance_watcher.remove(this);
69 }
70
71 void finish(int r) override {
72 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
73 << r << dendl;
74 assert(r == 0);
75
76 on_finish->complete(r);
77 }
78 };
79
80 } // anonymous namespace
81
82 template <typename I>
83 struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
84 InstanceWatcher<I> *instance_watcher;
85 std::string instance_id;
86 uint64_t request_id;
87 bufferlist bl;
88 Context *on_finish;
89 bool send_to_leader;
90 std::unique_ptr<librbd::watcher::Notifier> notifier;
91 librbd::watcher::NotifyResponse response;
92 bool canceling = false;
93
94 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
95 const std::string &instance_id, uint64_t request_id,
96 bufferlist &&bl, Context *on_finish)
97 : instance_watcher(instance_watcher), instance_id(instance_id),
98 request_id(request_id), bl(bl), on_finish(on_finish),
99 send_to_leader(instance_id.empty()) {
100 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
101 << ": instance_watcher=" << instance_watcher << ", instance_id="
102 << instance_id << ", request_id=" << request_id << dendl;
103
104 assert(instance_watcher->m_lock.is_locked());
105
106 if (!send_to_leader) {
107 assert((!instance_id.empty()));
108 notifier.reset(new librbd::watcher::Notifier(
109 instance_watcher->m_work_queue,
110 instance_watcher->m_ioctx,
111 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
112 }
113
114 instance_watcher->m_notify_op_tracker.start_op();
115 auto result = instance_watcher->m_notify_ops.insert(
116 std::make_pair(instance_id, this)).second;
117 assert(result);
118 }
119
120 void send() {
121 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
122
123 assert(instance_watcher->m_lock.is_locked());
124
125 if (canceling) {
126 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
127 << ": canceling" << dendl;
128 instance_watcher->m_work_queue->queue(this, -ECANCELED);
129 return;
130 }
131
132 if (send_to_leader) {
133 if (instance_watcher->m_leader_instance_id.empty()) {
134 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
135 << ": suspending" << dendl;
136 instance_watcher->suspend_notify_request(this);
137 return;
138 }
139
140 if (instance_watcher->m_leader_instance_id != instance_id) {
141 auto count = instance_watcher->m_notify_ops.erase(
142 std::make_pair(instance_id, this));
143 assert(count > 0);
144
145 instance_id = instance_watcher->m_leader_instance_id;
146
147 auto result = instance_watcher->m_notify_ops.insert(
148 std::make_pair(instance_id, this)).second;
149 assert(result);
150
151 notifier.reset(new librbd::watcher::Notifier(
152 instance_watcher->m_work_queue,
153 instance_watcher->m_ioctx,
154 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
155 }
156 }
157
158 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
159 << ": sendding to " << instance_id << dendl;
160 notifier->notify(bl, &response, this);
161 }
162
163 void cancel() {
164 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
165
166 assert(instance_watcher->m_lock.is_locked());
167
168 canceling = true;
169 instance_watcher->unsuspend_notify_request(this);
170 }
171
172 void finish(int r) override {
173 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
174 << r << dendl;
175
176 if (r == 0 || r == -ETIMEDOUT) {
177 bool found = false;
178 for (auto &it : response.acks) {
179 auto &bl = it.second;
180 if (it.second.length() == 0) {
181 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
182 << ": no payload in ack, ignoring" << dendl;
183 continue;
184 }
185 try {
186 auto iter = bl.begin();
187 NotifyAckPayload ack;
188 ::decode(ack, iter);
189 if (ack.instance_id != instance_watcher->get_instance_id()) {
190 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
191 << ": ack instance_id (" << ack.instance_id << ") "
192 << "does not match, ignoring" << dendl;
193 continue;
194 }
195 if (ack.request_id != request_id) {
196 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
197 << ": ack request_id (" << ack.request_id << ") "
198 << "does not match, ignoring" << dendl;
199 continue;
200 }
201 r = ack.ret_val;
202 found = true;
203 break;
204 } catch (const buffer::error &err) {
205 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
206 << ": failed to decode ack: " << err.what() << dendl;
207 continue;
208 }
209 }
210
211 if (!found) {
212 if (r == -ETIMEDOUT) {
213 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
214 << ": resending after timeout" << dendl;
215 Mutex::Locker locker(instance_watcher->m_lock);
216 send();
217 return;
218 } else {
219 r = -EINVAL;
220 }
221 } else {
222 if (r == -ESTALE && send_to_leader) {
223 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
224 << ": resending due to leader change" << dendl;
225 Mutex::Locker locker(instance_watcher->m_lock);
226 send();
227 return;
228 }
229 }
230 }
231
232 on_finish->complete(r);
233
234 {
235 Mutex::Locker locker(instance_watcher->m_lock);
236 auto result = instance_watcher->m_notify_ops.erase(
237 std::make_pair(instance_id, this));
238 assert(result > 0);
239 instance_watcher->m_notify_op_tracker.finish_op();
240 }
241
242 delete this;
243 }
244
245 void complete(int r) override {
246 finish(r);
247 }
248 };
249
250 template <typename I>
251 struct InstanceWatcher<I>::C_SyncRequest : public Context {
252 InstanceWatcher<I> *instance_watcher;
253 std::string sync_id;
254 Context *on_start;
255 Context *on_complete = nullptr;
256 C_NotifyInstanceRequest *req = nullptr;
257
258 C_SyncRequest(InstanceWatcher<I> *instance_watcher,
259 const std::string &sync_id, Context *on_start)
260 : instance_watcher(instance_watcher), sync_id(sync_id),
261 on_start(on_start) {
262 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
263 << sync_id << dendl;
264 }
265
266 void finish(int r) override {
267 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
268 << r << dendl;
269
270 if (on_start != nullptr) {
271 instance_watcher->handle_notify_sync_request(this, r);
272 } else {
273 instance_watcher->handle_notify_sync_complete(this, r);
274 delete this;
275 }
276 }
277
278 // called twice
279 void complete(int r) override {
280 finish(r);
281 }
282 };
283
284 #undef dout_prefix
285 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
286 << this << " " << __func__ << ": "
287 template <typename I>
288 void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
289 std::vector<std::string> *instance_ids,
290 Context *on_finish) {
291 librados::ObjectReadOperation op;
292 librbd::cls_client::mirror_instances_list_start(&op);
293 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
294 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
295
296 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
297 assert(r == 0);
298 aio_comp->release();
299 }
300
301 template <typename I>
302 void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
303 ContextWQ *work_queue,
304 const std::string &instance_id,
305 Context *on_finish) {
306 auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
307 on_finish);
308 req->send();
309 }
310
311 template <typename I>
312 InstanceWatcher<I> *InstanceWatcher<I>::create(
313 librados::IoCtx &io_ctx, ContextWQ *work_queue,
314 InstanceReplayer<I> *instance_replayer) {
315 return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
316 stringify(io_ctx.get_instance_id()));
317 }
318
319 template <typename I>
320 InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
321 ContextWQ *work_queue,
322 InstanceReplayer<I> *instance_replayer,
323 const std::string &instance_id)
324 : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
325 m_instance_replayer(instance_replayer), m_instance_id(instance_id),
326 m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
327 m_instance_lock(librbd::ManagedLock<I>::create(
328 m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
329 m_cct->_conf->rbd_blacklist_expire_seconds)) {
330 }
331
332 template <typename I>
333 InstanceWatcher<I>::~InstanceWatcher() {
334 assert(m_notify_ops.empty());
335 assert(m_notify_op_tracker.empty());
336 assert(m_suspended_ops.empty());
337 assert(m_inflight_sync_reqs.empty());
338 assert(m_image_sync_throttler == nullptr);
339 m_instance_lock->destroy();
340 }
341
342 template <typename I>
343 int InstanceWatcher<I>::init() {
344 C_SaferCond init_ctx;
345 init(&init_ctx);
346 return init_ctx.wait();
347 }
348
349 template <typename I>
350 void InstanceWatcher<I>::init(Context *on_finish) {
351 dout(20) << "instance_id=" << m_instance_id << dendl;
352
353 Mutex::Locker locker(m_lock);
354
355 assert(m_on_finish == nullptr);
356 m_on_finish = on_finish;
357 m_ret_val = 0;
358
359 register_instance();
360 }
361
362 template <typename I>
363 void InstanceWatcher<I>::shut_down() {
364 C_SaferCond shut_down_ctx;
365 shut_down(&shut_down_ctx);
366 int r = shut_down_ctx.wait();
367 assert(r == 0);
368 }
369
370 template <typename I>
371 void InstanceWatcher<I>::shut_down(Context *on_finish) {
372 dout(20) << dendl;
373
374 Mutex::Locker locker(m_lock);
375
376 assert(m_on_finish == nullptr);
377 m_on_finish = on_finish;
378 m_ret_val = 0;
379
380 release_lock();
381 }
382
383 template <typename I>
384 void InstanceWatcher<I>::remove(Context *on_finish) {
385 dout(20) << dendl;
386
387 Mutex::Locker locker(m_lock);
388
389 assert(m_on_finish == nullptr);
390 m_on_finish = on_finish;
391 m_ret_val = 0;
392 m_removing = true;
393
394 get_instance_locker();
395 }
396
397 template <typename I>
398 void InstanceWatcher<I>::notify_image_acquire(
399 const std::string &instance_id, const std::string &global_image_id,
400 const std::string &peer_mirror_uuid, const std::string &peer_image_id,
401 Context *on_notify_ack) {
402 dout(20) << "instance_id=" << instance_id << ", global_image_id="
403 << global_image_id << dendl;
404
405 Mutex::Locker locker(m_lock);
406
407 assert(m_on_finish == nullptr);
408
409 if (instance_id == m_instance_id) {
410 handle_image_acquire(global_image_id, peer_mirror_uuid, peer_image_id,
411 on_notify_ack);
412 } else {
413 uint64_t request_id = ++m_request_seq;
414 bufferlist bl;
415 ::encode(NotifyMessage{ImageAcquirePayload{
416 request_id, global_image_id, peer_mirror_uuid, peer_image_id}}, bl);
417 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
418 std::move(bl), on_notify_ack);
419 req->send();
420 }
421 }
422
423 template <typename I>
424 void InstanceWatcher<I>::notify_image_release(
425 const std::string &instance_id, const std::string &global_image_id,
426 const std::string &peer_mirror_uuid, const std::string &peer_image_id,
427 bool schedule_delete, Context *on_notify_ack) {
428 dout(20) << "instance_id=" << instance_id << ", global_image_id="
429 << global_image_id << dendl;
430
431 Mutex::Locker locker(m_lock);
432
433 assert(m_on_finish == nullptr);
434
435 if (instance_id == m_instance_id) {
436 handle_image_release(global_image_id, peer_mirror_uuid, peer_image_id,
437 schedule_delete, on_notify_ack);
438 } else {
439 uint64_t request_id = ++m_request_seq;
440 bufferlist bl;
441 ::encode(NotifyMessage{ImageReleasePayload{
442 request_id, global_image_id, peer_mirror_uuid, peer_image_id,
443 schedule_delete}}, bl);
444 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
445 std::move(bl), on_notify_ack);
446 req->send();
447 }
448 }
449
450 template <typename I>
451 void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
452 Context *on_sync_start) {
453 dout(20) << "sync_id=" << sync_id << dendl;
454
455 Mutex::Locker locker(m_lock);
456
457 assert(m_inflight_sync_reqs.count(sync_id) == 0);
458
459 uint64_t request_id = ++m_request_seq;
460
461 bufferlist bl;
462 ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
463
464 auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
465 sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
466 std::move(bl), sync_ctx);
467
468 m_inflight_sync_reqs[sync_id] = sync_ctx;
469 sync_ctx->req->send();
470 }
471
472 template <typename I>
473 bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
474 dout(20) << "sync_id=" << sync_id << dendl;
475
476 Mutex::Locker locker(m_lock);
477
478 auto it = m_inflight_sync_reqs.find(sync_id);
479 if (it == m_inflight_sync_reqs.end()) {
480 return false;
481 }
482
483 auto sync_ctx = it->second;
484
485 if (sync_ctx->on_start == nullptr) {
486 return false;
487 }
488
489 assert(sync_ctx->req != nullptr);
490 sync_ctx->req->cancel();
491 return true;
492 }
493
494 template <typename I>
495 void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
496 const std::string &sync_id) {
497 dout(20) << "sync_id=" << sync_id << dendl;
498
499 Mutex::Locker locker(m_lock);
500
501 uint64_t request_id = ++m_request_seq;
502
503 bufferlist bl;
504 ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
505
506 auto ctx = new FunctionContext(
507 [this, sync_id] (int r) {
508 dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
509 Mutex::Locker locker(m_lock);
510 if (r != -ESTALE && m_image_sync_throttler != nullptr) {
511 m_image_sync_throttler->finish_op(sync_id);
512 }
513 });
514 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
515 std::move(bl), ctx);
516 req->send();
517 }
518
519 template <typename I>
520 void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
521 dout(20) << "sync_id=" << sync_id << dendl;
522
523 Mutex::Locker locker(m_lock);
524
525 auto it = m_inflight_sync_reqs.find(sync_id);
526 assert(it != m_inflight_sync_reqs.end());
527
528 auto sync_ctx = it->second;
529 assert(sync_ctx->req == nullptr);
530
531 m_inflight_sync_reqs.erase(it);
532 m_work_queue->queue(sync_ctx, 0);
533 }
534
535 template <typename I>
536 void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
537 int r) {
538 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
539
540 Context *on_start = nullptr;
541 {
542 Mutex::Locker locker(m_lock);
543
544 assert(sync_ctx->req != nullptr);
545 assert(sync_ctx->on_start != nullptr);
546
547 if (sync_ctx->req->canceling) {
548 r = -ECANCELED;
549 }
550
551 std::swap(sync_ctx->on_start, on_start);
552 sync_ctx->req = nullptr;
553 }
554
555 on_start->complete(r == -ECANCELED ? r : 0);
556
557 if (r == -ECANCELED) {
558 notify_sync_complete(sync_ctx->sync_id);
559 }
560 }
561
562 template <typename I>
563 void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
564 int r) {
565 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
566
567 if (sync_ctx->on_complete != nullptr) {
568 sync_ctx->on_complete->complete(r);
569 }
570 }
571
572 template <typename I>
573 void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
574 dout(20) << dendl;
575
576 Mutex::Locker locker(m_lock);
577 if (m_image_sync_throttler != nullptr) {
578 m_image_sync_throttler->print_status(f, ss);
579 }
580 }
581
582 template <typename I>
583 void InstanceWatcher<I>::handle_acquire_leader() {
584 dout(20) << dendl;
585
586 Mutex::Locker locker(m_lock);
587
588 assert(m_image_sync_throttler == nullptr);
589 m_image_sync_throttler = ImageSyncThrottler<I>::create();
590
591 m_leader_instance_id = m_instance_id;
592 unsuspend_notify_requests();
593 }
594
595 template <typename I>
596 void InstanceWatcher<I>::handle_release_leader() {
597 dout(20) << dendl;
598
599 Mutex::Locker locker(m_lock);
600
601 assert(m_image_sync_throttler != nullptr);
602
603 m_leader_instance_id.clear();
604
605 m_image_sync_throttler->drain(-ESTALE);
606 m_image_sync_throttler->destroy();
607 m_image_sync_throttler = nullptr;
608 }
609
610 template <typename I>
611 void InstanceWatcher<I>::handle_update_leader(
612 const std::string &leader_instance_id) {
613 dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
614
615 Mutex::Locker locker(m_lock);
616
617 m_leader_instance_id = leader_instance_id;
618
619 if (!m_leader_instance_id.empty()) {
620 unsuspend_notify_requests();
621 }
622 }
623
624 template <typename I>
625 void InstanceWatcher<I>::cancel_notify_requests(
626 const std::string &instance_id) {
627 dout(20) << "instance_id=" << instance_id << dendl;
628
629 Mutex::Locker locker(m_lock);
630
631 for (auto op : m_notify_ops) {
632 if (op.first == instance_id && !op.second->send_to_leader) {
633 op.second->cancel();
634 }
635 }
636 }
637
638 template <typename I>
639 void InstanceWatcher<I>::register_instance() {
640 assert(m_lock.is_locked());
641
642 dout(20) << dendl;
643
644 librados::ObjectWriteOperation op;
645 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
646 librados::AioCompletion *aio_comp = create_rados_callback<
647 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
648
649 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
650 assert(r == 0);
651 aio_comp->release();
652 }
653
654 template <typename I>
655 void InstanceWatcher<I>::handle_register_instance(int r) {
656 dout(20) << "r=" << r << dendl;
657
658 Context *on_finish = nullptr;
659 {
660 Mutex::Locker locker(m_lock);
661
662 if (r == 0) {
663 create_instance_object();
664 return;
665 }
666
667 derr << "error registering instance: " << cpp_strerror(r) << dendl;
668
669 std::swap(on_finish, m_on_finish);
670 }
671 on_finish->complete(r);
672 }
673
674
675 template <typename I>
676 void InstanceWatcher<I>::create_instance_object() {
677 dout(20) << dendl;
678
679 assert(m_lock.is_locked());
680
681 librados::ObjectWriteOperation op;
682 op.create(true);
683
684 librados::AioCompletion *aio_comp = create_rados_callback<
685 InstanceWatcher<I>,
686 &InstanceWatcher<I>::handle_create_instance_object>(this);
687 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
688 assert(r == 0);
689 aio_comp->release();
690 }
691
692 template <typename I>
693 void InstanceWatcher<I>::handle_create_instance_object(int r) {
694 dout(20) << "r=" << r << dendl;
695
696 Mutex::Locker locker(m_lock);
697
698 if (r < 0) {
699 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
700 << dendl;
701
702 m_ret_val = r;
703 unregister_instance();
704 return;
705 }
706
707 register_watch();
708 }
709
710 template <typename I>
711 void InstanceWatcher<I>::register_watch() {
712 dout(20) << dendl;
713
714 assert(m_lock.is_locked());
715
716 Context *ctx = create_async_context_callback(
717 m_work_queue, create_context_callback<
718 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
719
720 librbd::Watcher::register_watch(ctx);
721 }
722
723 template <typename I>
724 void InstanceWatcher<I>::handle_register_watch(int r) {
725 dout(20) << "r=" << r << dendl;
726
727 Mutex::Locker locker(m_lock);
728
729 if (r < 0) {
730 derr << "error registering instance watcher for " << m_oid << " object: "
731 << cpp_strerror(r) << dendl;
732
733 m_ret_val = r;
734 remove_instance_object();
735 return;
736 }
737
738 acquire_lock();
739 }
740
741 template <typename I>
742 void InstanceWatcher<I>::acquire_lock() {
743 dout(20) << dendl;
744
745 assert(m_lock.is_locked());
746
747 Context *ctx = create_async_context_callback(
748 m_work_queue, create_context_callback<
749 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
750
751 m_instance_lock->acquire_lock(ctx);
752 }
753
754 template <typename I>
755 void InstanceWatcher<I>::handle_acquire_lock(int r) {
756 dout(20) << "r=" << r << dendl;
757
758 Context *on_finish = nullptr;
759 {
760 Mutex::Locker locker(m_lock);
761
762 if (r < 0) {
763
764 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
765
766 m_ret_val = r;
767 unregister_watch();
768 return;
769 }
770
771 std::swap(on_finish, m_on_finish);
772 }
773
774 on_finish->complete(r);
775 }
776
777 template <typename I>
778 void InstanceWatcher<I>::release_lock() {
779 dout(20) << dendl;
780
781 assert(m_lock.is_locked());
782
783 Context *ctx = create_async_context_callback(
784 m_work_queue, create_context_callback<
785 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
786
787 m_instance_lock->shut_down(ctx);
788 }
789
790 template <typename I>
791 void InstanceWatcher<I>::handle_release_lock(int r) {
792 dout(20) << "r=" << r << dendl;
793
794 Mutex::Locker locker(m_lock);
795
796 if (r < 0) {
797 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
798 }
799
800 unregister_watch();
801 }
802
803 template <typename I>
804 void InstanceWatcher<I>::unregister_watch() {
805 dout(20) << dendl;
806
807 assert(m_lock.is_locked());
808
809 Context *ctx = create_async_context_callback(
810 m_work_queue, create_context_callback<
811 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
812
813 librbd::Watcher::unregister_watch(ctx);
814 }
815
816 template <typename I>
817 void InstanceWatcher<I>::handle_unregister_watch(int r) {
818 dout(20) << "r=" << r << dendl;
819
820 if (r < 0) {
821 derr << "error unregistering instance watcher for " << m_oid << " object: "
822 << cpp_strerror(r) << dendl;
823 }
824
825 Mutex::Locker locker(m_lock);
826 remove_instance_object();
827 }
828
829 template <typename I>
830 void InstanceWatcher<I>::remove_instance_object() {
831 assert(m_lock.is_locked());
832
833 dout(20) << dendl;
834
835 librados::ObjectWriteOperation op;
836 op.remove();
837
838 librados::AioCompletion *aio_comp = create_rados_callback<
839 InstanceWatcher<I>,
840 &InstanceWatcher<I>::handle_remove_instance_object>(this);
841 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
842 assert(r == 0);
843 aio_comp->release();
844 }
845
846 template <typename I>
847 void InstanceWatcher<I>::handle_remove_instance_object(int r) {
848 dout(20) << "r=" << r << dendl;
849
850 if (m_removing && r == -ENOENT) {
851 r = 0;
852 }
853
854 if (r < 0) {
855 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
856 << dendl;
857 }
858
859 Mutex::Locker locker(m_lock);
860 unregister_instance();
861 }
862
863 template <typename I>
864 void InstanceWatcher<I>::unregister_instance() {
865 dout(20) << dendl;
866
867 assert(m_lock.is_locked());
868
869 librados::ObjectWriteOperation op;
870 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
871 librados::AioCompletion *aio_comp = create_rados_callback<
872 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
873
874 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
875 assert(r == 0);
876 aio_comp->release();
877 }
878
879 template <typename I>
880 void InstanceWatcher<I>::handle_unregister_instance(int r) {
881 dout(20) << "r=" << r << dendl;
882
883 if (r < 0) {
884 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
885 }
886
887 Mutex::Locker locker(m_lock);
888 wait_for_notify_ops();
889 }
890
891 template <typename I>
892 void InstanceWatcher<I>::wait_for_notify_ops() {
893 dout(20) << dendl;
894
895 assert(m_lock.is_locked());
896
897 for (auto op : m_notify_ops) {
898 op.second->cancel();
899 }
900
901 Context *ctx = create_async_context_callback(
902 m_work_queue, create_context_callback<
903 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
904
905 m_notify_op_tracker.wait_for_ops(ctx);
906 }
907
908 template <typename I>
909 void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
910 dout(20) << "r=" << r << dendl;
911
912 assert(r == 0);
913
914 Context *on_finish = nullptr;
915 {
916 Mutex::Locker locker(m_lock);
917
918 assert(m_notify_ops.empty());
919
920 std::swap(on_finish, m_on_finish);
921 r = m_ret_val;
922
923 if (m_removing) {
924 m_removing = false;
925 }
926 }
927 on_finish->complete(r);
928 }
929
930 template <typename I>
931 void InstanceWatcher<I>::get_instance_locker() {
932 dout(20) << dendl;
933
934 assert(m_lock.is_locked());
935
936 Context *ctx = create_async_context_callback(
937 m_work_queue, create_context_callback<
938 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
939
940 m_instance_lock->get_locker(&m_instance_locker, ctx);
941 }
942
943 template <typename I>
944 void InstanceWatcher<I>::handle_get_instance_locker(int r) {
945 dout(20) << "r=" << r << dendl;
946
947 Mutex::Locker locker(m_lock);
948
949 if (r < 0) {
950 if (r != -ENOENT) {
951 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
952 }
953 remove_instance_object();
954 return;
955 }
956
957 break_instance_lock();
958 }
959
960 template <typename I>
961 void InstanceWatcher<I>::break_instance_lock() {
962 dout(20) << dendl;
963
964 assert(m_lock.is_locked());
965
966 Context *ctx = create_async_context_callback(
967 m_work_queue, create_context_callback<
968 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
969
970 m_instance_lock->break_lock(m_instance_locker, true, ctx);
971 }
972
973 template <typename I>
974 void InstanceWatcher<I>::handle_break_instance_lock(int r) {
975 dout(20) << "r=" << r << dendl;
976
977 Mutex::Locker locker(m_lock);
978
979 if (r < 0) {
980 if (r != -ENOENT) {
981 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
982 }
983 remove_instance_object();
984 return;
985 }
986
987 remove_instance_object();
988 }
989
990 template <typename I>
991 void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
992 dout(20) << req << dendl;
993
994 assert(m_lock.is_locked());
995
996 auto result = m_suspended_ops.insert(req).second;
997 assert(result);
998 }
999
1000 template <typename I>
1001 bool InstanceWatcher<I>::unsuspend_notify_request(
1002 C_NotifyInstanceRequest *req) {
1003 dout(20) << req << dendl;
1004
1005 assert(m_lock.is_locked());
1006
1007 auto result = m_suspended_ops.erase(req);
1008 if (result == 0) {
1009 return false;
1010 }
1011
1012 req->send();
1013 return true;
1014 }
1015
1016 template <typename I>
1017 void InstanceWatcher<I>::unsuspend_notify_requests() {
1018 dout(20) << dendl;
1019
1020 assert(m_lock.is_locked());
1021
1022 std::set<C_NotifyInstanceRequest *> suspended_ops;
1023 std::swap(m_suspended_ops, suspended_ops);
1024
1025 for (auto op : suspended_ops) {
1026 op->send();
1027 }
1028 }
1029
1030 template <typename I>
1031 Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1032 uint64_t request_id,
1033 C_NotifyAck *on_notify_ack) {
1034 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1035 << dendl;
1036
1037 Mutex::Locker locker(m_lock);
1038
1039 Context *ctx = nullptr;
1040 Request request(instance_id, request_id);
1041 auto it = m_requests.find(request);
1042
1043 if (it != m_requests.end()) {
1044 dout(20) << "duplicate for in-progress request" << dendl;
1045 delete it->on_notify_ack;
1046 m_requests.erase(it);
1047 } else {
1048 ctx = create_async_context_callback(
1049 m_work_queue, new FunctionContext(
1050 [this, instance_id, request_id] (int r) {
1051 complete_request(instance_id, request_id, r);
1052 }));
1053 }
1054
1055 request.on_notify_ack = on_notify_ack;
1056 m_requests.insert(request);
1057 return ctx;
1058 }
1059
1060 template <typename I>
1061 void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1062 uint64_t request_id, int r) {
1063 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1064 << dendl;
1065
1066 C_NotifyAck *on_notify_ack;
1067 {
1068 Mutex::Locker locker(m_lock);
1069 Request request(instance_id, request_id);
1070 auto it = m_requests.find(request);
1071 assert(it != m_requests.end());
1072 on_notify_ack = it->on_notify_ack;
1073 m_requests.erase(it);
1074 }
1075
1076 ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
1077 on_notify_ack->complete(0);
1078 }
1079
1080 template <typename I>
1081 void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1082 uint64_t notifier_id, bufferlist &bl) {
1083 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1084 << "notifier_id=" << notifier_id << dendl;
1085
1086 auto ctx = new C_NotifyAck(this, notify_id, handle);
1087
1088 NotifyMessage notify_message;
1089 try {
1090 bufferlist::iterator iter = bl.begin();
1091 ::decode(notify_message, iter);
1092 } catch (const buffer::error &err) {
1093 derr << "error decoding image notification: " << err.what() << dendl;
1094 ctx->complete(0);
1095 return;
1096 }
1097
1098 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1099 notify_message.payload);
1100 }
1101
1102 template <typename I>
1103 void InstanceWatcher<I>::handle_image_acquire(
1104 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1105 const std::string &peer_image_id, Context *on_finish) {
1106 dout(20) << "global_image_id=" << global_image_id << dendl;
1107
1108 auto ctx = new FunctionContext(
1109 [this, global_image_id, peer_mirror_uuid, peer_image_id,
1110 on_finish] (int r) {
1111 m_instance_replayer->acquire_image(this, global_image_id,
1112 peer_mirror_uuid, peer_image_id,
1113 on_finish);
1114 m_notify_op_tracker.finish_op();
1115 });
1116
1117 m_notify_op_tracker.start_op();
1118 m_work_queue->queue(ctx, 0);
1119 }
1120
1121 template <typename I>
1122 void InstanceWatcher<I>::handle_image_release(
1123 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1124 const std::string &peer_image_id, bool schedule_delete, Context *on_finish) {
1125 dout(20) << "global_image_id=" << global_image_id << dendl;
1126
1127 auto ctx = new FunctionContext(
1128 [this, global_image_id, peer_mirror_uuid, peer_image_id, schedule_delete,
1129 on_finish] (int r) {
1130 m_instance_replayer->release_image(global_image_id, peer_mirror_uuid,
1131 peer_image_id, schedule_delete,
1132 on_finish);
1133 m_notify_op_tracker.finish_op();
1134 });
1135
1136 m_notify_op_tracker.start_op();
1137 m_work_queue->queue(ctx, 0);
1138 }
1139
1140 template <typename I>
1141 void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1142 const std::string &sync_id,
1143 Context *on_finish) {
1144 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1145
1146 Mutex::Locker locker(m_lock);
1147
1148 if (m_image_sync_throttler == nullptr) {
1149 dout(20) << "sync request for non-leader" << dendl;
1150 m_work_queue->queue(on_finish, -ESTALE);
1151 return;
1152 }
1153
1154 Context *on_start = create_async_context_callback(
1155 m_work_queue, new FunctionContext(
1156 [this, instance_id, sync_id, on_finish] (int r) {
1157 dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
1158 << ", sync_id=" << sync_id << ", r=" << r << dendl;
1159 if (r == 0) {
1160 notify_sync_start(instance_id, sync_id);
1161 }
1162 on_finish->complete(r);
1163 }));
1164 m_image_sync_throttler->start_op(sync_id, on_start);
1165 }
1166
1167 template <typename I>
1168 void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1169 const std::string &sync_id,
1170 Context *on_finish) {
1171 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1172
1173 Mutex::Locker locker(m_lock);
1174
1175 auto it = m_inflight_sync_reqs.find(sync_id);
1176 if (it == m_inflight_sync_reqs.end()) {
1177 dout(20) << "not found" << dendl;
1178 m_work_queue->queue(on_finish, 0);
1179 return;
1180 }
1181
1182 auto sync_ctx = it->second;
1183
1184 if (sync_ctx->on_complete != nullptr) {
1185 dout(20) << "duplicate request" << dendl;
1186 m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1187 }
1188
1189 sync_ctx->on_complete = on_finish;
1190 }
1191
1192 template <typename I>
1193 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1194 const ImageAcquirePayload &payload,
1195 C_NotifyAck *on_notify_ack) {
1196 dout(20) << "image_acquire: instance_id=" << instance_id << ", "
1197 << "request_id=" << payload.request_id << dendl;
1198
1199 auto on_finish = prepare_request(instance_id, payload.request_id,
1200 on_notify_ack);
1201 if (on_finish != nullptr) {
1202 handle_image_acquire(payload.global_image_id, payload.peer_mirror_uuid,
1203 payload.peer_image_id, on_finish);
1204 }
1205 }
1206
1207 template <typename I>
1208 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1209 const ImageReleasePayload &payload,
1210 C_NotifyAck *on_notify_ack) {
1211 dout(20) << "image_release: instance_id=" << instance_id << ", "
1212 << "request_id=" << payload.request_id << dendl;
1213
1214 auto on_finish = prepare_request(instance_id, payload.request_id,
1215 on_notify_ack);
1216 if (on_finish != nullptr) {
1217 handle_image_release(payload.global_image_id, payload.peer_mirror_uuid,
1218 payload.peer_image_id, payload.schedule_delete,
1219 on_finish);
1220 }
1221 }
1222
1223 template <typename I>
1224 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1225 const SyncRequestPayload &payload,
1226 C_NotifyAck *on_notify_ack) {
1227 dout(20) << "sync_request: instance_id=" << instance_id << ", "
1228 << "request_id=" << payload.request_id << dendl;
1229
1230 auto on_finish = prepare_request(instance_id, payload.request_id,
1231 on_notify_ack);
1232 if (on_finish == nullptr) {
1233 return;
1234 }
1235
1236 handle_sync_request(instance_id, payload.sync_id, on_finish);
1237 }
1238
1239 template <typename I>
1240 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1241 const SyncStartPayload &payload,
1242 C_NotifyAck *on_notify_ack) {
1243 dout(20) << "sync_start: instance_id=" << instance_id << ", "
1244 << "request_id=" << payload.request_id << dendl;
1245
1246 auto on_finish = prepare_request(instance_id, payload.request_id,
1247 on_notify_ack);
1248 if (on_finish == nullptr) {
1249 return;
1250 }
1251
1252 handle_sync_start(instance_id, payload.sync_id, on_finish);
1253 }
1254
1255 template <typename I>
1256 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1257 const UnknownPayload &payload,
1258 C_NotifyAck *on_notify_ack) {
1259 dout(20) << "unknown: instance_id=" << instance_id << dendl;
1260
1261 on_notify_ack->complete(0);
1262 }
1263
1264 } // namespace mirror
1265 } // namespace rbd
1266
1267 template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;