git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/InstanceWatcher.cc
update sources to 12.2.7
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "InstanceWatcher.h"
5 #include "include/stringify.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "librbd/ManagedLock.h"
10 #include "librbd/Utils.h"
11 #include "InstanceReplayer.h"
12 #include "ImageSyncThrottler.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
16 #undef dout_prefix
17 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
18
19 namespace rbd {
20 namespace mirror {
21
22 using namespace instance_watcher;
23
24 using librbd::util::create_async_context_callback;
25 using librbd::util::create_context_callback;
26 using librbd::util::create_rados_callback;
27 using librbd::util::unique_lock_name;
28
29 namespace {
30
31 struct C_GetInstances : public Context {
32 std::vector<std::string> *instance_ids;
33 Context *on_finish;
34 bufferlist out_bl;
35
36 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
37 : instance_ids(instance_ids), on_finish(on_finish) {
38 }
39
40 void finish(int r) override {
41 dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
42 << dendl;
43
44 if (r == 0) {
45 bufferlist::iterator it = out_bl.begin();
46 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
47 } else if (r == -ENOENT) {
48 r = 0;
49 }
50 on_finish->complete(r);
51 }
52 };
53
54 template <typename I>
55 struct C_RemoveInstanceRequest : public Context {
56 InstanceWatcher<I> instance_watcher;
57 Context *on_finish;
58
59 C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
60 const std::string &instance_id, Context *on_finish)
61 : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
62 on_finish(on_finish) {
63 }
64
65 void send() {
66 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
67
68 instance_watcher.remove(this);
69 }
70
71 void finish(int r) override {
72 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
73 << r << dendl;
74 assert(r == 0);
75
76 on_finish->complete(r);
77 }
78 };
79
80 } // anonymous namespace
81
82 template <typename I>
83 struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
84 InstanceWatcher<I> *instance_watcher;
85 std::string instance_id;
86 uint64_t request_id;
87 bufferlist bl;
88 Context *on_finish;
89 bool send_to_leader;
90 std::unique_ptr<librbd::watcher::Notifier> notifier;
91 librbd::watcher::NotifyResponse response;
92 bool canceling = false;
93
94 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
95 const std::string &instance_id, uint64_t request_id,
96 bufferlist &&bl, Context *on_finish)
97 : instance_watcher(instance_watcher), instance_id(instance_id),
98 request_id(request_id), bl(bl), on_finish(on_finish),
99 send_to_leader(instance_id.empty()) {
100 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
101 << ": instance_watcher=" << instance_watcher << ", instance_id="
102 << instance_id << ", request_id=" << request_id << dendl;
103
104 assert(instance_watcher->m_lock.is_locked());
105
106 if (!send_to_leader) {
107 assert((!instance_id.empty()));
108 notifier.reset(new librbd::watcher::Notifier(
109 instance_watcher->m_work_queue,
110 instance_watcher->m_ioctx,
111 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
112 }
113
114 instance_watcher->m_notify_op_tracker.start_op();
115 auto result = instance_watcher->m_notify_ops.insert(
116 std::make_pair(instance_id, this)).second;
117 assert(result);
118 }
119
120 void send() {
121 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
122
123 assert(instance_watcher->m_lock.is_locked());
124
125 if (canceling) {
126 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
127 << ": canceling" << dendl;
128 instance_watcher->m_work_queue->queue(this, -ECANCELED);
129 return;
130 }
131
132 if (send_to_leader) {
133 if (instance_watcher->m_leader_instance_id.empty()) {
134 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
135 << ": suspending" << dendl;
136 instance_watcher->suspend_notify_request(this);
137 return;
138 }
139
140 if (instance_watcher->m_leader_instance_id != instance_id) {
141 auto count = instance_watcher->m_notify_ops.erase(
142 std::make_pair(instance_id, this));
143 assert(count > 0);
144
145 instance_id = instance_watcher->m_leader_instance_id;
146
147 auto result = instance_watcher->m_notify_ops.insert(
148 std::make_pair(instance_id, this)).second;
149 assert(result);
150
151 notifier.reset(new librbd::watcher::Notifier(
152 instance_watcher->m_work_queue,
153 instance_watcher->m_ioctx,
154 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
155 }
156 }
157
158 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
159 << ": sending to " << instance_id << dendl;
160 notifier->notify(bl, &response, this);
161 }
162
163 void cancel() {
164 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
165
166 assert(instance_watcher->m_lock.is_locked());
167
168 canceling = true;
169 instance_watcher->unsuspend_notify_request(this);
170 }
171
172 void finish(int r) override {
173 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
174 << r << dendl;
175
176 if (r == 0 || r == -ETIMEDOUT) {
177 bool found = false;
178 for (auto &it : response.acks) {
179 auto &bl = it.second;
180 if (it.second.length() == 0) {
181 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
182 << ": no payload in ack, ignoring" << dendl;
183 continue;
184 }
185 try {
186 auto iter = bl.begin();
187 NotifyAckPayload ack;
188 ::decode(ack, iter);
189 if (ack.instance_id != instance_watcher->get_instance_id()) {
190 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
191 << ": ack instance_id (" << ack.instance_id << ") "
192 << "does not match, ignoring" << dendl;
193 continue;
194 }
195 if (ack.request_id != request_id) {
196 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
197 << ": ack request_id (" << ack.request_id << ") "
198 << "does not match, ignoring" << dendl;
199 continue;
200 }
201 r = ack.ret_val;
202 found = true;
203 break;
204 } catch (const buffer::error &err) {
205 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
206 << ": failed to decode ack: " << err.what() << dendl;
207 continue;
208 }
209 }
210
211 if (!found) {
212 if (r == -ETIMEDOUT) {
213 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
214 << ": resending after timeout" << dendl;
215 Mutex::Locker locker(instance_watcher->m_lock);
216 send();
217 return;
218 } else {
219 r = -EINVAL;
220 }
221 } else {
222 if (r == -ESTALE && send_to_leader) {
223 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
224 << ": resending due to leader change" << dendl;
225 Mutex::Locker locker(instance_watcher->m_lock);
226 send();
227 return;
228 }
229 }
230 }
231
232 on_finish->complete(r);
233
234 {
235 Mutex::Locker locker(instance_watcher->m_lock);
236 auto result = instance_watcher->m_notify_ops.erase(
237 std::make_pair(instance_id, this));
238 assert(result > 0);
239 instance_watcher->m_notify_op_tracker.finish_op();
240 }
241
242 delete this;
243 }
244
245 void complete(int r) override {
246 finish(r);
247 }
248 };
249
250 template <typename I>
251 struct InstanceWatcher<I>::C_SyncRequest : public Context {
252 InstanceWatcher<I> *instance_watcher;
253 std::string sync_id;
254 Context *on_start;
255 Context *on_complete = nullptr;
256 C_NotifyInstanceRequest *req = nullptr;
257
258 C_SyncRequest(InstanceWatcher<I> *instance_watcher,
259 const std::string &sync_id, Context *on_start)
260 : instance_watcher(instance_watcher), sync_id(sync_id),
261 on_start(on_start) {
262 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
263 << sync_id << dendl;
264 }
265
266 void finish(int r) override {
267 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
268 << r << dendl;
269
270 if (on_start != nullptr) {
271 instance_watcher->handle_notify_sync_request(this, r);
272 } else {
273 instance_watcher->handle_notify_sync_complete(this, r);
274 delete this;
275 }
276 }
277
278 // called twice
279 void complete(int r) override {
280 finish(r);
281 }
282 };
283
284 #undef dout_prefix
285 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
286 << this << " " << __func__ << ": "
287 template <typename I>
288 void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
289 std::vector<std::string> *instance_ids,
290 Context *on_finish) {
291 librados::ObjectReadOperation op;
292 librbd::cls_client::mirror_instances_list_start(&op);
293 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
294 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
295
296 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
297 assert(r == 0);
298 aio_comp->release();
299 }
300
301 template <typename I>
302 void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
303 ContextWQ *work_queue,
304 const std::string &instance_id,
305 Context *on_finish) {
306 auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
307 on_finish);
308 req->send();
309 }
310
311 template <typename I>
312 InstanceWatcher<I> *InstanceWatcher<I>::create(
313 librados::IoCtx &io_ctx, ContextWQ *work_queue,
314 InstanceReplayer<I> *instance_replayer) {
315 return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
316 stringify(io_ctx.get_instance_id()));
317 }
318
319 template <typename I>
320 InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
321 ContextWQ *work_queue,
322 InstanceReplayer<I> *instance_replayer,
323 const std::string &instance_id)
324 : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
325 m_instance_replayer(instance_replayer), m_instance_id(instance_id),
326 m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
327 m_instance_lock(librbd::ManagedLock<I>::create(
328 m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
329 m_cct->_conf->get_val<int64_t>("rbd_blacklist_expire_seconds"))) {
330 }
331
332 template <typename I>
333 InstanceWatcher<I>::~InstanceWatcher() {
334 assert(m_notify_ops.empty());
335 assert(m_notify_op_tracker.empty());
336 assert(m_suspended_ops.empty());
337 assert(m_inflight_sync_reqs.empty());
338 assert(m_image_sync_throttler == nullptr);
339 m_instance_lock->destroy();
340 }
341
342 template <typename I>
343 int InstanceWatcher<I>::init() {
344 C_SaferCond init_ctx;
345 init(&init_ctx);
346 return init_ctx.wait();
347 }
348
349 template <typename I>
350 void InstanceWatcher<I>::init(Context *on_finish) {
351 dout(20) << "instance_id=" << m_instance_id << dendl;
352
353 Mutex::Locker locker(m_lock);
354
355 assert(m_on_finish == nullptr);
356 m_on_finish = on_finish;
357 m_ret_val = 0;
358
359 register_instance();
360 }
361
362 template <typename I>
363 void InstanceWatcher<I>::shut_down() {
364 C_SaferCond shut_down_ctx;
365 shut_down(&shut_down_ctx);
366 int r = shut_down_ctx.wait();
367 assert(r == 0);
368 }
369
370 template <typename I>
371 void InstanceWatcher<I>::shut_down(Context *on_finish) {
372 dout(20) << dendl;
373
374 Mutex::Locker locker(m_lock);
375
376 assert(m_on_finish == nullptr);
377 m_on_finish = on_finish;
378 m_ret_val = 0;
379
380 release_lock();
381 }
382
383 template <typename I>
384 void InstanceWatcher<I>::remove(Context *on_finish) {
385 dout(20) << dendl;
386
387 Mutex::Locker locker(m_lock);
388
389 assert(m_on_finish == nullptr);
390 m_on_finish = on_finish;
391 m_ret_val = 0;
392 m_removing = true;
393
394 get_instance_locker();
395 }
396
397 template <typename I>
398 void InstanceWatcher<I>::notify_image_acquire(
399 const std::string &instance_id, const std::string &global_image_id,
400 Context *on_notify_ack) {
401 dout(20) << "instance_id=" << instance_id << ", global_image_id="
402 << global_image_id << dendl;
403
404 Mutex::Locker locker(m_lock);
405
406 assert(m_on_finish == nullptr);
407
408 if (instance_id == m_instance_id) {
409 handle_image_acquire(global_image_id, on_notify_ack);
410 } else {
411 uint64_t request_id = ++m_request_seq;
412 bufferlist bl;
413 ::encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}},
414 bl);
415 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
416 std::move(bl), on_notify_ack);
417 req->send();
418 }
419 }
420
421 template <typename I>
422 void InstanceWatcher<I>::notify_image_release(
423 const std::string &instance_id, const std::string &global_image_id,
424 Context *on_notify_ack) {
425 dout(20) << "instance_id=" << instance_id << ", global_image_id="
426 << global_image_id << dendl;
427
428 Mutex::Locker locker(m_lock);
429
430 assert(m_on_finish == nullptr);
431
432 if (instance_id == m_instance_id) {
433 handle_image_release(global_image_id, on_notify_ack);
434 } else {
435 uint64_t request_id = ++m_request_seq;
436 bufferlist bl;
437 ::encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}},
438 bl);
439 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
440 std::move(bl), on_notify_ack);
441 req->send();
442 }
443 }
444
445 template <typename I>
446 void InstanceWatcher<I>::notify_peer_image_removed(
447 const std::string &instance_id, const std::string &global_image_id,
448 const std::string &peer_mirror_uuid, Context *on_notify_ack) {
449 dout(20) << "instance_id=" << instance_id << ", "
450 << "global_image_id=" << global_image_id << ", "
451 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
452
453 Mutex::Locker locker(m_lock);
454 assert(m_on_finish == nullptr);
455
456 if (instance_id == m_instance_id) {
457 handle_peer_image_removed(global_image_id, peer_mirror_uuid, on_notify_ack);
458 } else {
459 uint64_t request_id = ++m_request_seq;
460 bufferlist bl;
461 ::encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
462 peer_mirror_uuid}}, bl);
463 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
464 std::move(bl), on_notify_ack);
465 req->send();
466 }
467 }
468
469 template <typename I>
470 void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
471 Context *on_sync_start) {
472 dout(20) << "sync_id=" << sync_id << dendl;
473
474 Mutex::Locker locker(m_lock);
475
476 assert(m_inflight_sync_reqs.count(sync_id) == 0);
477
478 uint64_t request_id = ++m_request_seq;
479
480 bufferlist bl;
481 ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
482
483 auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
484 sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
485 std::move(bl), sync_ctx);
486
487 m_inflight_sync_reqs[sync_id] = sync_ctx;
488 sync_ctx->req->send();
489 }
490
491 template <typename I>
492 bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
493 dout(20) << "sync_id=" << sync_id << dendl;
494
495 Mutex::Locker locker(m_lock);
496
497 auto it = m_inflight_sync_reqs.find(sync_id);
498 if (it == m_inflight_sync_reqs.end()) {
499 return false;
500 }
501
502 auto sync_ctx = it->second;
503
504 if (sync_ctx->on_start == nullptr) {
505 return false;
506 }
507
508 assert(sync_ctx->req != nullptr);
509 sync_ctx->req->cancel();
510 return true;
511 }
512
513 template <typename I>
514 void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
515 const std::string &sync_id) {
516 dout(20) << "sync_id=" << sync_id << dendl;
517
518 Mutex::Locker locker(m_lock);
519
520 uint64_t request_id = ++m_request_seq;
521
522 bufferlist bl;
523 ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
524
525 auto ctx = new FunctionContext(
526 [this, sync_id] (int r) {
527 dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
528 Mutex::Locker locker(m_lock);
529 if (r != -ESTALE && m_image_sync_throttler != nullptr) {
530 m_image_sync_throttler->finish_op(sync_id);
531 }
532 });
533 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
534 std::move(bl), ctx);
535 req->send();
536 }
537
538 template <typename I>
539 void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
540 Mutex::Locker locker(m_lock);
541 notify_sync_complete(m_lock, sync_id);
542 }
543
544 template <typename I>
545 void InstanceWatcher<I>::notify_sync_complete(const Mutex&,
546 const std::string &sync_id) {
547 dout(10) << "sync_id=" << sync_id << dendl;
548 assert(m_lock.is_locked());
549
550 auto it = m_inflight_sync_reqs.find(sync_id);
551 assert(it != m_inflight_sync_reqs.end());
552
553 auto sync_ctx = it->second;
554 assert(sync_ctx->req == nullptr);
555
556 m_inflight_sync_reqs.erase(it);
557 m_work_queue->queue(sync_ctx, 0);
558 }
559
560 template <typename I>
561 void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
562 int r) {
563 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
564
565 Context *on_start = nullptr;
566 {
567 Mutex::Locker locker(m_lock);
568 assert(sync_ctx->req != nullptr);
569 assert(sync_ctx->on_start != nullptr);
570
571 if (sync_ctx->req->canceling) {
572 r = -ECANCELED;
573 }
574
575 std::swap(sync_ctx->on_start, on_start);
576 sync_ctx->req = nullptr;
577
578 if (r == -ECANCELED) {
579 notify_sync_complete(m_lock, sync_ctx->sync_id);
580 }
581 }
582
583 on_start->complete(r == -ECANCELED ? r : 0);
584 }
585
586 template <typename I>
587 void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
588 int r) {
589 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
590
591 if (sync_ctx->on_complete != nullptr) {
592 sync_ctx->on_complete->complete(r);
593 }
594 }
595
596 template <typename I>
597 void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
598 dout(20) << dendl;
599
600 Mutex::Locker locker(m_lock);
601 if (m_image_sync_throttler != nullptr) {
602 m_image_sync_throttler->print_status(f, ss);
603 }
604 }
605
606 template <typename I>
607 void InstanceWatcher<I>::handle_acquire_leader() {
608 dout(20) << dendl;
609
610 Mutex::Locker locker(m_lock);
611
612 assert(m_image_sync_throttler == nullptr);
613 m_image_sync_throttler = ImageSyncThrottler<I>::create();
614
615 m_leader_instance_id = m_instance_id;
616 unsuspend_notify_requests();
617 }
618
619 template <typename I>
620 void InstanceWatcher<I>::handle_release_leader() {
621 dout(20) << dendl;
622
623 Mutex::Locker locker(m_lock);
624
625 assert(m_image_sync_throttler != nullptr);
626
627 m_leader_instance_id.clear();
628
629 m_image_sync_throttler->drain(-ESTALE);
630 m_image_sync_throttler->destroy();
631 m_image_sync_throttler = nullptr;
632 }
633
634 template <typename I>
635 void InstanceWatcher<I>::handle_update_leader(
636 const std::string &leader_instance_id) {
637 dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
638
639 Mutex::Locker locker(m_lock);
640
641 m_leader_instance_id = leader_instance_id;
642
643 if (!m_leader_instance_id.empty()) {
644 unsuspend_notify_requests();
645 }
646 }
647
648 template <typename I>
649 void InstanceWatcher<I>::cancel_notify_requests(
650 const std::string &instance_id) {
651 dout(20) << "instance_id=" << instance_id << dendl;
652
653 Mutex::Locker locker(m_lock);
654
655 for (auto op : m_notify_ops) {
656 if (op.first == instance_id && !op.second->send_to_leader) {
657 op.second->cancel();
658 }
659 }
660 }
661
662 template <typename I>
663 void InstanceWatcher<I>::register_instance() {
664 assert(m_lock.is_locked());
665
666 dout(20) << dendl;
667
668 librados::ObjectWriteOperation op;
669 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
670 librados::AioCompletion *aio_comp = create_rados_callback<
671 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
672
673 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
674 assert(r == 0);
675 aio_comp->release();
676 }
677
678 template <typename I>
679 void InstanceWatcher<I>::handle_register_instance(int r) {
680 dout(20) << "r=" << r << dendl;
681
682 Context *on_finish = nullptr;
683 {
684 Mutex::Locker locker(m_lock);
685
686 if (r == 0) {
687 create_instance_object();
688 return;
689 }
690
691 derr << "error registering instance: " << cpp_strerror(r) << dendl;
692
693 std::swap(on_finish, m_on_finish);
694 }
695 on_finish->complete(r);
696 }
697
698
699 template <typename I>
700 void InstanceWatcher<I>::create_instance_object() {
701 dout(20) << dendl;
702
703 assert(m_lock.is_locked());
704
705 librados::ObjectWriteOperation op;
706 op.create(true);
707
708 librados::AioCompletion *aio_comp = create_rados_callback<
709 InstanceWatcher<I>,
710 &InstanceWatcher<I>::handle_create_instance_object>(this);
711 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
712 assert(r == 0);
713 aio_comp->release();
714 }
715
716 template <typename I>
717 void InstanceWatcher<I>::handle_create_instance_object(int r) {
718 dout(20) << "r=" << r << dendl;
719
720 Mutex::Locker locker(m_lock);
721
722 if (r < 0) {
723 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
724 << dendl;
725
726 m_ret_val = r;
727 unregister_instance();
728 return;
729 }
730
731 register_watch();
732 }
733
734 template <typename I>
735 void InstanceWatcher<I>::register_watch() {
736 dout(20) << dendl;
737
738 assert(m_lock.is_locked());
739
740 Context *ctx = create_async_context_callback(
741 m_work_queue, create_context_callback<
742 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
743
744 librbd::Watcher::register_watch(ctx);
745 }
746
747 template <typename I>
748 void InstanceWatcher<I>::handle_register_watch(int r) {
749 dout(20) << "r=" << r << dendl;
750
751 Mutex::Locker locker(m_lock);
752
753 if (r < 0) {
754 derr << "error registering instance watcher for " << m_oid << " object: "
755 << cpp_strerror(r) << dendl;
756
757 m_ret_val = r;
758 remove_instance_object();
759 return;
760 }
761
762 acquire_lock();
763 }
764
765 template <typename I>
766 void InstanceWatcher<I>::acquire_lock() {
767 dout(20) << dendl;
768
769 assert(m_lock.is_locked());
770
771 Context *ctx = create_async_context_callback(
772 m_work_queue, create_context_callback<
773 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
774
775 m_instance_lock->acquire_lock(ctx);
776 }
777
778 template <typename I>
779 void InstanceWatcher<I>::handle_acquire_lock(int r) {
780 dout(20) << "r=" << r << dendl;
781
782 Context *on_finish = nullptr;
783 {
784 Mutex::Locker locker(m_lock);
785
786 if (r < 0) {
787
788 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
789
790 m_ret_val = r;
791 unregister_watch();
792 return;
793 }
794
795 std::swap(on_finish, m_on_finish);
796 }
797
798 on_finish->complete(r);
799 }
800
801 template <typename I>
802 void InstanceWatcher<I>::release_lock() {
803 dout(20) << dendl;
804
805 assert(m_lock.is_locked());
806
807 Context *ctx = create_async_context_callback(
808 m_work_queue, create_context_callback<
809 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
810
811 m_instance_lock->shut_down(ctx);
812 }
813
814 template <typename I>
815 void InstanceWatcher<I>::handle_release_lock(int r) {
816 dout(20) << "r=" << r << dendl;
817
818 Mutex::Locker locker(m_lock);
819
820 if (r < 0) {
821 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
822 }
823
824 unregister_watch();
825 }
826
827 template <typename I>
828 void InstanceWatcher<I>::unregister_watch() {
829 dout(20) << dendl;
830
831 assert(m_lock.is_locked());
832
833 Context *ctx = create_async_context_callback(
834 m_work_queue, create_context_callback<
835 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
836
837 librbd::Watcher::unregister_watch(ctx);
838 }
839
840 template <typename I>
841 void InstanceWatcher<I>::handle_unregister_watch(int r) {
842 dout(20) << "r=" << r << dendl;
843
844 if (r < 0) {
845 derr << "error unregistering instance watcher for " << m_oid << " object: "
846 << cpp_strerror(r) << dendl;
847 }
848
849 Mutex::Locker locker(m_lock);
850 remove_instance_object();
851 }
852
853 template <typename I>
854 void InstanceWatcher<I>::remove_instance_object() {
855 assert(m_lock.is_locked());
856
857 dout(20) << dendl;
858
859 librados::ObjectWriteOperation op;
860 op.remove();
861
862 librados::AioCompletion *aio_comp = create_rados_callback<
863 InstanceWatcher<I>,
864 &InstanceWatcher<I>::handle_remove_instance_object>(this);
865 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
866 assert(r == 0);
867 aio_comp->release();
868 }
869
870 template <typename I>
871 void InstanceWatcher<I>::handle_remove_instance_object(int r) {
872 dout(20) << "r=" << r << dendl;
873
874 if (m_removing && r == -ENOENT) {
875 r = 0;
876 }
877
878 if (r < 0) {
879 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
880 << dendl;
881 }
882
883 Mutex::Locker locker(m_lock);
884 unregister_instance();
885 }
886
887 template <typename I>
888 void InstanceWatcher<I>::unregister_instance() {
889 dout(20) << dendl;
890
891 assert(m_lock.is_locked());
892
893 librados::ObjectWriteOperation op;
894 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
895 librados::AioCompletion *aio_comp = create_rados_callback<
896 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
897
898 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
899 assert(r == 0);
900 aio_comp->release();
901 }
902
903 template <typename I>
904 void InstanceWatcher<I>::handle_unregister_instance(int r) {
905 dout(20) << "r=" << r << dendl;
906
907 if (r < 0) {
908 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
909 }
910
911 Mutex::Locker locker(m_lock);
912 wait_for_notify_ops();
913 }
914
915 template <typename I>
916 void InstanceWatcher<I>::wait_for_notify_ops() {
917 dout(20) << dendl;
918
919 assert(m_lock.is_locked());
920
921 for (auto op : m_notify_ops) {
922 op.second->cancel();
923 }
924
925 Context *ctx = create_async_context_callback(
926 m_work_queue, create_context_callback<
927 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
928
929 m_notify_op_tracker.wait_for_ops(ctx);
930 }
931
932 template <typename I>
933 void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
934 dout(20) << "r=" << r << dendl;
935
936 assert(r == 0);
937
938 Context *on_finish = nullptr;
939 {
940 Mutex::Locker locker(m_lock);
941
942 assert(m_notify_ops.empty());
943
944 std::swap(on_finish, m_on_finish);
945 r = m_ret_val;
946
947 if (m_removing) {
948 m_removing = false;
949 }
950 }
951 on_finish->complete(r);
952 }
953
954 template <typename I>
955 void InstanceWatcher<I>::get_instance_locker() {
956 dout(20) << dendl;
957
958 assert(m_lock.is_locked());
959
960 Context *ctx = create_async_context_callback(
961 m_work_queue, create_context_callback<
962 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
963
964 m_instance_lock->get_locker(&m_instance_locker, ctx);
965 }
966
967 template <typename I>
968 void InstanceWatcher<I>::handle_get_instance_locker(int r) {
969 dout(20) << "r=" << r << dendl;
970
971 Mutex::Locker locker(m_lock);
972
973 if (r < 0) {
974 if (r != -ENOENT) {
975 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
976 }
977 remove_instance_object();
978 return;
979 }
980
981 break_instance_lock();
982 }
983
984 template <typename I>
985 void InstanceWatcher<I>::break_instance_lock() {
986 dout(20) << dendl;
987
988 assert(m_lock.is_locked());
989
990 Context *ctx = create_async_context_callback(
991 m_work_queue, create_context_callback<
992 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
993
994 m_instance_lock->break_lock(m_instance_locker, true, ctx);
995 }
996
997 template <typename I>
998 void InstanceWatcher<I>::handle_break_instance_lock(int r) {
999 dout(20) << "r=" << r << dendl;
1000
1001 Mutex::Locker locker(m_lock);
1002
1003 if (r < 0) {
1004 if (r != -ENOENT) {
1005 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
1006 }
1007 remove_instance_object();
1008 return;
1009 }
1010
1011 remove_instance_object();
1012 }
1013
1014 template <typename I>
1015 void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
1016 dout(20) << req << dendl;
1017
1018 assert(m_lock.is_locked());
1019
1020 auto result = m_suspended_ops.insert(req).second;
1021 assert(result);
1022 }
1023
1024 template <typename I>
1025 bool InstanceWatcher<I>::unsuspend_notify_request(
1026 C_NotifyInstanceRequest *req) {
1027 dout(20) << req << dendl;
1028
1029 assert(m_lock.is_locked());
1030
1031 auto result = m_suspended_ops.erase(req);
1032 if (result == 0) {
1033 return false;
1034 }
1035
1036 req->send();
1037 return true;
1038 }
1039
1040 template <typename I>
1041 void InstanceWatcher<I>::unsuspend_notify_requests() {
1042 dout(20) << dendl;
1043
1044 assert(m_lock.is_locked());
1045
1046 std::set<C_NotifyInstanceRequest *> suspended_ops;
1047 std::swap(m_suspended_ops, suspended_ops);
1048
1049 for (auto op : suspended_ops) {
1050 op->send();
1051 }
1052 }
1053
1054 template <typename I>
1055 Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1056 uint64_t request_id,
1057 C_NotifyAck *on_notify_ack) {
1058 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1059 << dendl;
1060
1061 Mutex::Locker locker(m_lock);
1062
1063 Context *ctx = nullptr;
1064 Request request(instance_id, request_id);
1065 auto it = m_requests.find(request);
1066
1067 if (it != m_requests.end()) {
1068 dout(20) << "duplicate for in-progress request" << dendl;
1069 delete it->on_notify_ack;
1070 m_requests.erase(it);
1071 } else {
1072 ctx = create_async_context_callback(
1073 m_work_queue, new FunctionContext(
1074 [this, instance_id, request_id] (int r) {
1075 complete_request(instance_id, request_id, r);
1076 }));
1077 }
1078
1079 request.on_notify_ack = on_notify_ack;
1080 m_requests.insert(request);
1081 return ctx;
1082 }
1083
1084 template <typename I>
1085 void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1086 uint64_t request_id, int r) {
1087 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1088 << dendl;
1089
1090 C_NotifyAck *on_notify_ack;
1091 {
1092 Mutex::Locker locker(m_lock);
1093 Request request(instance_id, request_id);
1094 auto it = m_requests.find(request);
1095 assert(it != m_requests.end());
1096 on_notify_ack = it->on_notify_ack;
1097 m_requests.erase(it);
1098 }
1099
1100 ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
1101 on_notify_ack->complete(0);
1102 }
1103
1104 template <typename I>
1105 void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1106 uint64_t notifier_id, bufferlist &bl) {
1107 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1108 << "notifier_id=" << notifier_id << dendl;
1109
1110 auto ctx = new C_NotifyAck(this, notify_id, handle);
1111
1112 NotifyMessage notify_message;
1113 try {
1114 bufferlist::iterator iter = bl.begin();
1115 ::decode(notify_message, iter);
1116 } catch (const buffer::error &err) {
1117 derr << "error decoding image notification: " << err.what() << dendl;
1118 ctx->complete(0);
1119 return;
1120 }
1121
1122 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1123 notify_message.payload);
1124 }
1125
1126 template <typename I>
1127 void InstanceWatcher<I>::handle_image_acquire(
1128 const std::string &global_image_id, Context *on_finish) {
1129 dout(20) << "global_image_id=" << global_image_id << dendl;
1130
1131 auto ctx = new FunctionContext(
1132 [this, global_image_id, on_finish] (int r) {
1133 m_instance_replayer->acquire_image(this, global_image_id, on_finish);
1134 m_notify_op_tracker.finish_op();
1135 });
1136
1137 m_notify_op_tracker.start_op();
1138 m_work_queue->queue(ctx, 0);
1139 }
1140
1141 template <typename I>
1142 void InstanceWatcher<I>::handle_image_release(
1143 const std::string &global_image_id, Context *on_finish) {
1144 dout(20) << "global_image_id=" << global_image_id << dendl;
1145
1146 auto ctx = new FunctionContext(
1147 [this, global_image_id, on_finish] (int r) {
1148 m_instance_replayer->release_image(global_image_id, on_finish);
1149 m_notify_op_tracker.finish_op();
1150 });
1151
1152 m_notify_op_tracker.start_op();
1153 m_work_queue->queue(ctx, 0);
1154 }
1155
1156 template <typename I>
1157 void InstanceWatcher<I>::handle_peer_image_removed(
1158 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1159 Context *on_finish) {
1160 dout(20) << "global_image_id=" << global_image_id << ", "
1161 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
1162
1163 auto ctx = new FunctionContext(
1164 [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
1165 m_instance_replayer->remove_peer_image(global_image_id,
1166 peer_mirror_uuid, on_finish);
1167 m_notify_op_tracker.finish_op();
1168 });
1169
1170 m_notify_op_tracker.start_op();
1171 m_work_queue->queue(ctx, 0);
1172 }
1173
1174 template <typename I>
1175 void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1176 const std::string &sync_id,
1177 Context *on_finish) {
1178 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1179
1180 Mutex::Locker locker(m_lock);
1181
1182 if (m_image_sync_throttler == nullptr) {
1183 dout(20) << "sync request for non-leader" << dendl;
1184 m_work_queue->queue(on_finish, -ESTALE);
1185 return;
1186 }
1187
1188 Context *on_start = create_async_context_callback(
1189 m_work_queue, new FunctionContext(
1190 [this, instance_id, sync_id, on_finish] (int r) {
1191 dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
1192 << ", sync_id=" << sync_id << ", r=" << r << dendl;
1193 if (r == 0) {
1194 notify_sync_start(instance_id, sync_id);
1195 }
1196 on_finish->complete(r);
1197 }));
1198 m_image_sync_throttler->start_op(sync_id, on_start);
1199 }
1200
1201 template <typename I>
1202 void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1203 const std::string &sync_id,
1204 Context *on_finish) {
1205 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1206
1207 Mutex::Locker locker(m_lock);
1208
1209 auto it = m_inflight_sync_reqs.find(sync_id);
1210 if (it == m_inflight_sync_reqs.end()) {
1211 dout(20) << "not found" << dendl;
1212 m_work_queue->queue(on_finish, 0);
1213 return;
1214 }
1215
1216 auto sync_ctx = it->second;
1217
1218 if (sync_ctx->on_complete != nullptr) {
1219 dout(20) << "duplicate request" << dendl;
1220 m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1221 }
1222
1223 sync_ctx->on_complete = on_finish;
1224 }
1225
1226 template <typename I>
1227 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1228 const ImageAcquirePayload &payload,
1229 C_NotifyAck *on_notify_ack) {
1230 dout(20) << "image_acquire: instance_id=" << instance_id << ", "
1231 << "request_id=" << payload.request_id << dendl;
1232
1233 auto on_finish = prepare_request(instance_id, payload.request_id,
1234 on_notify_ack);
1235 if (on_finish != nullptr) {
1236 handle_image_acquire(payload.global_image_id, on_finish);
1237 }
1238 }
1239
1240 template <typename I>
1241 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1242 const ImageReleasePayload &payload,
1243 C_NotifyAck *on_notify_ack) {
1244 dout(20) << "image_release: instance_id=" << instance_id << ", "
1245 << "request_id=" << payload.request_id << dendl;
1246
1247 auto on_finish = prepare_request(instance_id, payload.request_id,
1248 on_notify_ack);
1249 if (on_finish != nullptr) {
1250 handle_image_release(payload.global_image_id, on_finish);
1251 }
1252 }
1253
1254 template <typename I>
1255 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1256 const PeerImageRemovedPayload &payload,
1257 C_NotifyAck *on_notify_ack) {
1258 dout(20) << "remove_peer_image: instance_id=" << instance_id << ", "
1259 << "request_id=" << payload.request_id << dendl;
1260
1261 auto on_finish = prepare_request(instance_id, payload.request_id,
1262 on_notify_ack);
1263 if (on_finish != nullptr) {
1264 handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
1265 on_finish);
1266 }
1267 }
1268
1269 template <typename I>
1270 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1271 const SyncRequestPayload &payload,
1272 C_NotifyAck *on_notify_ack) {
1273 dout(20) << "sync_request: instance_id=" << instance_id << ", "
1274 << "request_id=" << payload.request_id << dendl;
1275
1276 auto on_finish = prepare_request(instance_id, payload.request_id,
1277 on_notify_ack);
1278 if (on_finish == nullptr) {
1279 return;
1280 }
1281
1282 handle_sync_request(instance_id, payload.sync_id, on_finish);
1283 }
1284
1285 template <typename I>
1286 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1287 const SyncStartPayload &payload,
1288 C_NotifyAck *on_notify_ack) {
1289 dout(20) << "sync_start: instance_id=" << instance_id << ", "
1290 << "request_id=" << payload.request_id << dendl;
1291
1292 auto on_finish = prepare_request(instance_id, payload.request_id,
1293 on_notify_ack);
1294 if (on_finish == nullptr) {
1295 return;
1296 }
1297
1298 handle_sync_start(instance_id, payload.sync_id, on_finish);
1299 }
1300
/**
 * Fallback for payloads this watcher does not recognize.
 *
 * The payload contents are ignored. The notification is logged and its ack
 * is completed immediately with 0 (nothing is encoded into the ack).
 */
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
                                        const UnknownPayload &payload,
                                        C_NotifyAck *on_notify_ack) {
  dout(20) << "unknown: instance_id=" << instance_id << dendl;

  on_notify_ack->complete(0);
}
1309
1310 } // namespace mirror
1311 } // namespace rbd
1312
// Explicit instantiation so the template member definitions in this .cc file
// are emitted for librbd::ImageCtx.
template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;