]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/InstanceWatcher.cc
update sources to 12.2.7
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "InstanceWatcher.h"
7c673cae
FG
5#include "include/stringify.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
9#include "librbd/ManagedLock.h"
10#include "librbd/Utils.h"
11#include "InstanceReplayer.h"
31f18b77 12#include "ImageSyncThrottler.h"
7c673cae
FG
13
14#define dout_context g_ceph_context
15#define dout_subsys ceph_subsys_rbd_mirror
16#undef dout_prefix
17#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
18
19namespace rbd {
20namespace mirror {
21
22using namespace instance_watcher;
23
24using librbd::util::create_async_context_callback;
25using librbd::util::create_context_callback;
26using librbd::util::create_rados_callback;
27using librbd::util::unique_lock_name;
28
29namespace {
30
31struct C_GetInstances : public Context {
32 std::vector<std::string> *instance_ids;
33 Context *on_finish;
34 bufferlist out_bl;
35
36 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
37 : instance_ids(instance_ids), on_finish(on_finish) {
38 }
39
40 void finish(int r) override {
41 dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
42 << dendl;
43
44 if (r == 0) {
45 bufferlist::iterator it = out_bl.begin();
46 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
47 } else if (r == -ENOENT) {
48 r = 0;
49 }
50 on_finish->complete(r);
51 }
52};
53
54template <typename I>
55struct C_RemoveInstanceRequest : public Context {
56 InstanceWatcher<I> instance_watcher;
57 Context *on_finish;
58
59 C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
60 const std::string &instance_id, Context *on_finish)
61 : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
62 on_finish(on_finish) {
63 }
64
65 void send() {
66 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
67
68 instance_watcher.remove(this);
69 }
70
71 void finish(int r) override {
72 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
73 << r << dendl;
74 assert(r == 0);
75
76 on_finish->complete(r);
77 }
78};
79
80} // anonymous namespace
81
82template <typename I>
83struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
84 InstanceWatcher<I> *instance_watcher;
7c673cae
FG
85 std::string instance_id;
86 uint64_t request_id;
87 bufferlist bl;
88 Context *on_finish;
31f18b77
FG
89 bool send_to_leader;
90 std::unique_ptr<librbd::watcher::Notifier> notifier;
7c673cae 91 librbd::watcher::NotifyResponse response;
31f18b77 92 bool canceling = false;
7c673cae
FG
93
94 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
95 const std::string &instance_id, uint64_t request_id,
96 bufferlist &&bl, Context *on_finish)
31f18b77
FG
97 : instance_watcher(instance_watcher), instance_id(instance_id),
98 request_id(request_id), bl(bl), on_finish(on_finish),
99 send_to_leader(instance_id.empty()) {
100 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
101 << ": instance_watcher=" << instance_watcher << ", instance_id="
102 << instance_id << ", request_id=" << request_id << dendl;
103
7c673cae 104 assert(instance_watcher->m_lock.is_locked());
31f18b77
FG
105
106 if (!send_to_leader) {
107 assert((!instance_id.empty()));
108 notifier.reset(new librbd::watcher::Notifier(
109 instance_watcher->m_work_queue,
110 instance_watcher->m_ioctx,
111 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
112 }
113
114 instance_watcher->m_notify_op_tracker.start_op();
7c673cae
FG
115 auto result = instance_watcher->m_notify_ops.insert(
116 std::make_pair(instance_id, this)).second;
117 assert(result);
118 }
119
120 void send() {
121 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
122
31f18b77
FG
123 assert(instance_watcher->m_lock.is_locked());
124
125 if (canceling) {
126 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
127 << ": canceling" << dendl;
128 instance_watcher->m_work_queue->queue(this, -ECANCELED);
129 return;
130 }
131
132 if (send_to_leader) {
133 if (instance_watcher->m_leader_instance_id.empty()) {
134 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
135 << ": suspending" << dendl;
136 instance_watcher->suspend_notify_request(this);
137 return;
138 }
139
140 if (instance_watcher->m_leader_instance_id != instance_id) {
141 auto count = instance_watcher->m_notify_ops.erase(
142 std::make_pair(instance_id, this));
143 assert(count > 0);
144
145 instance_id = instance_watcher->m_leader_instance_id;
146
147 auto result = instance_watcher->m_notify_ops.insert(
148 std::make_pair(instance_id, this)).second;
149 assert(result);
150
151 notifier.reset(new librbd::watcher::Notifier(
152 instance_watcher->m_work_queue,
153 instance_watcher->m_ioctx,
154 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
155 }
156 }
157
158 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
d2e6a577 159 << ": sending to " << instance_id << dendl;
31f18b77 160 notifier->notify(bl, &response, this);
7c673cae
FG
161 }
162
163 void cancel() {
164 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
165
31f18b77
FG
166 assert(instance_watcher->m_lock.is_locked());
167
168 canceling = true;
169 instance_watcher->unsuspend_notify_request(this);
7c673cae
FG
170 }
171
172 void finish(int r) override {
173 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
174 << r << dendl;
175
176 if (r == 0 || r == -ETIMEDOUT) {
177 bool found = false;
178 for (auto &it : response.acks) {
179 auto &bl = it.second;
180 if (it.second.length() == 0) {
181 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
182 << ": no payload in ack, ignoring" << dendl;
183 continue;
184 }
185 try {
186 auto iter = bl.begin();
187 NotifyAckPayload ack;
188 ::decode(ack, iter);
189 if (ack.instance_id != instance_watcher->get_instance_id()) {
190 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
191 << ": ack instance_id (" << ack.instance_id << ") "
192 << "does not match, ignoring" << dendl;
193 continue;
194 }
195 if (ack.request_id != request_id) {
196 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
197 << ": ack request_id (" << ack.request_id << ") "
198 << "does not match, ignoring" << dendl;
199 continue;
200 }
201 r = ack.ret_val;
202 found = true;
203 break;
204 } catch (const buffer::error &err) {
205 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
206 << ": failed to decode ack: " << err.what() << dendl;
207 continue;
208 }
209 }
210
211 if (!found) {
212 if (r == -ETIMEDOUT) {
31f18b77
FG
213 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
214 << ": resending after timeout" << dendl;
215 Mutex::Locker locker(instance_watcher->m_lock);
216 send();
217 return;
7c673cae
FG
218 } else {
219 r = -EINVAL;
220 }
31f18b77
FG
221 } else {
222 if (r == -ESTALE && send_to_leader) {
223 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
224 << ": resending due to leader change" << dendl;
225 Mutex::Locker locker(instance_watcher->m_lock);
226 send();
227 return;
228 }
7c673cae
FG
229 }
230 }
231
7c673cae
FG
232 on_finish->complete(r);
233
31f18b77
FG
234 {
235 Mutex::Locker locker(instance_watcher->m_lock);
236 auto result = instance_watcher->m_notify_ops.erase(
7c673cae 237 std::make_pair(instance_id, this));
31f18b77
FG
238 assert(result > 0);
239 instance_watcher->m_notify_op_tracker.finish_op();
240 }
241
7c673cae
FG
242 delete this;
243 }
244
245 void complete(int r) override {
246 finish(r);
247 }
248};
249
31f18b77
FG
250template <typename I>
251struct InstanceWatcher<I>::C_SyncRequest : public Context {
252 InstanceWatcher<I> *instance_watcher;
253 std::string sync_id;
254 Context *on_start;
255 Context *on_complete = nullptr;
256 C_NotifyInstanceRequest *req = nullptr;
257
258 C_SyncRequest(InstanceWatcher<I> *instance_watcher,
259 const std::string &sync_id, Context *on_start)
260 : instance_watcher(instance_watcher), sync_id(sync_id),
261 on_start(on_start) {
262 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
263 << sync_id << dendl;
264 }
265
266 void finish(int r) override {
267 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
268 << r << dendl;
269
270 if (on_start != nullptr) {
271 instance_watcher->handle_notify_sync_request(this, r);
272 } else {
273 instance_watcher->handle_notify_sync_complete(this, r);
274 delete this;
275 }
276 }
277
278 // called twice
279 void complete(int r) override {
280 finish(r);
281 }
282};
283
7c673cae
FG
284#undef dout_prefix
285#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
286 << this << " " << __func__ << ": "
287template <typename I>
288void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
289 std::vector<std::string> *instance_ids,
290 Context *on_finish) {
291 librados::ObjectReadOperation op;
292 librbd::cls_client::mirror_instances_list_start(&op);
293 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
294 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
295
296 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
297 assert(r == 0);
298 aio_comp->release();
299}
300
301template <typename I>
302void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
303 ContextWQ *work_queue,
304 const std::string &instance_id,
305 Context *on_finish) {
306 auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
307 on_finish);
308 req->send();
309}
310
311template <typename I>
312InstanceWatcher<I> *InstanceWatcher<I>::create(
313 librados::IoCtx &io_ctx, ContextWQ *work_queue,
314 InstanceReplayer<I> *instance_replayer) {
315 return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
316 stringify(io_ctx.get_instance_id()));
317}
318
319template <typename I>
320InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
321 ContextWQ *work_queue,
322 InstanceReplayer<I> *instance_replayer,
323 const std::string &instance_id)
324 : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
325 m_instance_replayer(instance_replayer), m_instance_id(instance_id),
326 m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
327 m_instance_lock(librbd::ManagedLock<I>::create(
328 m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
181888fb 329 m_cct->_conf->get_val<int64_t>("rbd_blacklist_expire_seconds"))) {
7c673cae
FG
330}
331
332template <typename I>
333InstanceWatcher<I>::~InstanceWatcher() {
31f18b77
FG
334 assert(m_notify_ops.empty());
335 assert(m_notify_op_tracker.empty());
336 assert(m_suspended_ops.empty());
337 assert(m_inflight_sync_reqs.empty());
338 assert(m_image_sync_throttler == nullptr);
7c673cae
FG
339 m_instance_lock->destroy();
340}
341
342template <typename I>
343int InstanceWatcher<I>::init() {
344 C_SaferCond init_ctx;
345 init(&init_ctx);
346 return init_ctx.wait();
347}
348
349template <typename I>
350void InstanceWatcher<I>::init(Context *on_finish) {
351 dout(20) << "instance_id=" << m_instance_id << dendl;
352
353 Mutex::Locker locker(m_lock);
354
355 assert(m_on_finish == nullptr);
356 m_on_finish = on_finish;
357 m_ret_val = 0;
358
359 register_instance();
360}
361
362template <typename I>
363void InstanceWatcher<I>::shut_down() {
364 C_SaferCond shut_down_ctx;
365 shut_down(&shut_down_ctx);
366 int r = shut_down_ctx.wait();
367 assert(r == 0);
368}
369
370template <typename I>
371void InstanceWatcher<I>::shut_down(Context *on_finish) {
372 dout(20) << dendl;
373
374 Mutex::Locker locker(m_lock);
375
376 assert(m_on_finish == nullptr);
377 m_on_finish = on_finish;
378 m_ret_val = 0;
379
380 release_lock();
381}
382
383template <typename I>
384void InstanceWatcher<I>::remove(Context *on_finish) {
385 dout(20) << dendl;
386
387 Mutex::Locker locker(m_lock);
388
389 assert(m_on_finish == nullptr);
390 m_on_finish = on_finish;
391 m_ret_val = 0;
392 m_removing = true;
393
394 get_instance_locker();
395}
396
397template <typename I>
398void InstanceWatcher<I>::notify_image_acquire(
399 const std::string &instance_id, const std::string &global_image_id,
d2e6a577 400 Context *on_notify_ack) {
7c673cae
FG
401 dout(20) << "instance_id=" << instance_id << ", global_image_id="
402 << global_image_id << dendl;
403
404 Mutex::Locker locker(m_lock);
405
406 assert(m_on_finish == nullptr);
407
408 if (instance_id == m_instance_id) {
d2e6a577 409 handle_image_acquire(global_image_id, on_notify_ack);
7c673cae
FG
410 } else {
411 uint64_t request_id = ++m_request_seq;
412 bufferlist bl;
d2e6a577
FG
413 ::encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}},
414 bl);
7c673cae
FG
415 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
416 std::move(bl), on_notify_ack);
417 req->send();
418 }
419}
420
421template <typename I>
422void InstanceWatcher<I>::notify_image_release(
d2e6a577
FG
423 const std::string &instance_id, const std::string &global_image_id,
424 Context *on_notify_ack) {
7c673cae
FG
425 dout(20) << "instance_id=" << instance_id << ", global_image_id="
426 << global_image_id << dendl;
427
428 Mutex::Locker locker(m_lock);
429
430 assert(m_on_finish == nullptr);
431
432 if (instance_id == m_instance_id) {
d2e6a577 433 handle_image_release(global_image_id, on_notify_ack);
7c673cae
FG
434 } else {
435 uint64_t request_id = ++m_request_seq;
436 bufferlist bl;
d2e6a577
FG
437 ::encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}},
438 bl);
439 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
440 std::move(bl), on_notify_ack);
441 req->send();
442 }
443}
444
445template <typename I>
446void InstanceWatcher<I>::notify_peer_image_removed(
447 const std::string &instance_id, const std::string &global_image_id,
448 const std::string &peer_mirror_uuid, Context *on_notify_ack) {
449 dout(20) << "instance_id=" << instance_id << ", "
450 << "global_image_id=" << global_image_id << ", "
451 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
452
453 Mutex::Locker locker(m_lock);
454 assert(m_on_finish == nullptr);
455
456 if (instance_id == m_instance_id) {
457 handle_peer_image_removed(global_image_id, peer_mirror_uuid, on_notify_ack);
458 } else {
459 uint64_t request_id = ++m_request_seq;
460 bufferlist bl;
461 ::encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
462 peer_mirror_uuid}}, bl);
7c673cae
FG
463 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
464 std::move(bl), on_notify_ack);
465 req->send();
466 }
467}
468
31f18b77
FG
469template <typename I>
470void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
471 Context *on_sync_start) {
472 dout(20) << "sync_id=" << sync_id << dendl;
473
474 Mutex::Locker locker(m_lock);
475
476 assert(m_inflight_sync_reqs.count(sync_id) == 0);
477
478 uint64_t request_id = ++m_request_seq;
479
480 bufferlist bl;
481 ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
482
483 auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
484 sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
485 std::move(bl), sync_ctx);
486
487 m_inflight_sync_reqs[sync_id] = sync_ctx;
488 sync_ctx->req->send();
489}
490
491template <typename I>
492bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
493 dout(20) << "sync_id=" << sync_id << dendl;
494
495 Mutex::Locker locker(m_lock);
496
497 auto it = m_inflight_sync_reqs.find(sync_id);
498 if (it == m_inflight_sync_reqs.end()) {
499 return false;
500 }
501
502 auto sync_ctx = it->second;
503
504 if (sync_ctx->on_start == nullptr) {
505 return false;
506 }
507
508 assert(sync_ctx->req != nullptr);
509 sync_ctx->req->cancel();
510 return true;
511}
512
513template <typename I>
514void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
515 const std::string &sync_id) {
516 dout(20) << "sync_id=" << sync_id << dendl;
517
518 Mutex::Locker locker(m_lock);
519
520 uint64_t request_id = ++m_request_seq;
521
522 bufferlist bl;
523 ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
524
525 auto ctx = new FunctionContext(
526 [this, sync_id] (int r) {
527 dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
528 Mutex::Locker locker(m_lock);
529 if (r != -ESTALE && m_image_sync_throttler != nullptr) {
530 m_image_sync_throttler->finish_op(sync_id);
531 }
532 });
533 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
534 std::move(bl), ctx);
535 req->send();
536}
537
538template <typename I>
539void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
31f18b77 540 Mutex::Locker locker(m_lock);
28e407b8
AA
541 notify_sync_complete(m_lock, sync_id);
542}
543
544template <typename I>
545void InstanceWatcher<I>::notify_sync_complete(const Mutex&,
546 const std::string &sync_id) {
547 dout(10) << "sync_id=" << sync_id << dendl;
548 assert(m_lock.is_locked());
31f18b77
FG
549
550 auto it = m_inflight_sync_reqs.find(sync_id);
551 assert(it != m_inflight_sync_reqs.end());
552
553 auto sync_ctx = it->second;
554 assert(sync_ctx->req == nullptr);
555
556 m_inflight_sync_reqs.erase(it);
557 m_work_queue->queue(sync_ctx, 0);
558}
559
560template <typename I>
561void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
562 int r) {
563 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
564
565 Context *on_start = nullptr;
566 {
567 Mutex::Locker locker(m_lock);
31f18b77
FG
568 assert(sync_ctx->req != nullptr);
569 assert(sync_ctx->on_start != nullptr);
570
571 if (sync_ctx->req->canceling) {
572 r = -ECANCELED;
573 }
574
575 std::swap(sync_ctx->on_start, on_start);
576 sync_ctx->req = nullptr;
28e407b8
AA
577
578 if (r == -ECANCELED) {
579 notify_sync_complete(m_lock, sync_ctx->sync_id);
580 }
31f18b77
FG
581 }
582
583 on_start->complete(r == -ECANCELED ? r : 0);
31f18b77
FG
584}
585
586template <typename I>
587void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
588 int r) {
589 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
590
591 if (sync_ctx->on_complete != nullptr) {
592 sync_ctx->on_complete->complete(r);
593 }
594}
595
596template <typename I>
597void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
598 dout(20) << dendl;
599
600 Mutex::Locker locker(m_lock);
601 if (m_image_sync_throttler != nullptr) {
602 m_image_sync_throttler->print_status(f, ss);
603 }
604}
605
606template <typename I>
607void InstanceWatcher<I>::handle_acquire_leader() {
608 dout(20) << dendl;
609
610 Mutex::Locker locker(m_lock);
611
612 assert(m_image_sync_throttler == nullptr);
613 m_image_sync_throttler = ImageSyncThrottler<I>::create();
614
615 m_leader_instance_id = m_instance_id;
616 unsuspend_notify_requests();
617}
618
619template <typename I>
620void InstanceWatcher<I>::handle_release_leader() {
621 dout(20) << dendl;
622
623 Mutex::Locker locker(m_lock);
624
625 assert(m_image_sync_throttler != nullptr);
626
627 m_leader_instance_id.clear();
628
629 m_image_sync_throttler->drain(-ESTALE);
630 m_image_sync_throttler->destroy();
631 m_image_sync_throttler = nullptr;
632}
633
634template <typename I>
635void InstanceWatcher<I>::handle_update_leader(
636 const std::string &leader_instance_id) {
637 dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
638
639 Mutex::Locker locker(m_lock);
640
641 m_leader_instance_id = leader_instance_id;
642
643 if (!m_leader_instance_id.empty()) {
644 unsuspend_notify_requests();
645 }
646}
647
7c673cae
FG
648template <typename I>
649void InstanceWatcher<I>::cancel_notify_requests(
650 const std::string &instance_id) {
651 dout(20) << "instance_id=" << instance_id << dendl;
652
653 Mutex::Locker locker(m_lock);
654
655 for (auto op : m_notify_ops) {
31f18b77 656 if (op.first == instance_id && !op.second->send_to_leader) {
7c673cae
FG
657 op.second->cancel();
658 }
659 }
660}
661
7c673cae
FG
662template <typename I>
663void InstanceWatcher<I>::register_instance() {
664 assert(m_lock.is_locked());
665
666 dout(20) << dendl;
667
668 librados::ObjectWriteOperation op;
669 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
670 librados::AioCompletion *aio_comp = create_rados_callback<
671 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
672
673 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
674 assert(r == 0);
675 aio_comp->release();
676}
677
678template <typename I>
679void InstanceWatcher<I>::handle_register_instance(int r) {
680 dout(20) << "r=" << r << dendl;
681
682 Context *on_finish = nullptr;
683 {
684 Mutex::Locker locker(m_lock);
685
686 if (r == 0) {
687 create_instance_object();
688 return;
689 }
690
691 derr << "error registering instance: " << cpp_strerror(r) << dendl;
692
693 std::swap(on_finish, m_on_finish);
694 }
695 on_finish->complete(r);
696}
697
698
699template <typename I>
700void InstanceWatcher<I>::create_instance_object() {
701 dout(20) << dendl;
702
703 assert(m_lock.is_locked());
704
705 librados::ObjectWriteOperation op;
706 op.create(true);
707
708 librados::AioCompletion *aio_comp = create_rados_callback<
709 InstanceWatcher<I>,
710 &InstanceWatcher<I>::handle_create_instance_object>(this);
711 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
712 assert(r == 0);
713 aio_comp->release();
714}
715
716template <typename I>
717void InstanceWatcher<I>::handle_create_instance_object(int r) {
718 dout(20) << "r=" << r << dendl;
719
720 Mutex::Locker locker(m_lock);
721
722 if (r < 0) {
723 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
724 << dendl;
725
726 m_ret_val = r;
727 unregister_instance();
728 return;
729 }
730
731 register_watch();
732}
733
734template <typename I>
735void InstanceWatcher<I>::register_watch() {
736 dout(20) << dendl;
737
738 assert(m_lock.is_locked());
739
740 Context *ctx = create_async_context_callback(
741 m_work_queue, create_context_callback<
742 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
743
744 librbd::Watcher::register_watch(ctx);
745}
746
747template <typename I>
748void InstanceWatcher<I>::handle_register_watch(int r) {
749 dout(20) << "r=" << r << dendl;
750
751 Mutex::Locker locker(m_lock);
752
753 if (r < 0) {
754 derr << "error registering instance watcher for " << m_oid << " object: "
755 << cpp_strerror(r) << dendl;
756
757 m_ret_val = r;
758 remove_instance_object();
759 return;
760 }
761
762 acquire_lock();
763}
764
765template <typename I>
766void InstanceWatcher<I>::acquire_lock() {
767 dout(20) << dendl;
768
769 assert(m_lock.is_locked());
770
771 Context *ctx = create_async_context_callback(
772 m_work_queue, create_context_callback<
773 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
774
775 m_instance_lock->acquire_lock(ctx);
776}
777
778template <typename I>
779void InstanceWatcher<I>::handle_acquire_lock(int r) {
780 dout(20) << "r=" << r << dendl;
781
782 Context *on_finish = nullptr;
783 {
784 Mutex::Locker locker(m_lock);
785
786 if (r < 0) {
787
788 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
789
790 m_ret_val = r;
791 unregister_watch();
792 return;
793 }
794
795 std::swap(on_finish, m_on_finish);
796 }
797
798 on_finish->complete(r);
799}
800
801template <typename I>
802void InstanceWatcher<I>::release_lock() {
803 dout(20) << dendl;
804
805 assert(m_lock.is_locked());
806
807 Context *ctx = create_async_context_callback(
808 m_work_queue, create_context_callback<
809 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
810
811 m_instance_lock->shut_down(ctx);
812}
813
814template <typename I>
815void InstanceWatcher<I>::handle_release_lock(int r) {
816 dout(20) << "r=" << r << dendl;
817
818 Mutex::Locker locker(m_lock);
819
820 if (r < 0) {
821 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
822 }
823
824 unregister_watch();
825}
826
827template <typename I>
828void InstanceWatcher<I>::unregister_watch() {
829 dout(20) << dendl;
830
831 assert(m_lock.is_locked());
832
833 Context *ctx = create_async_context_callback(
834 m_work_queue, create_context_callback<
835 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
836
837 librbd::Watcher::unregister_watch(ctx);
838}
839
840template <typename I>
841void InstanceWatcher<I>::handle_unregister_watch(int r) {
842 dout(20) << "r=" << r << dendl;
843
844 if (r < 0) {
845 derr << "error unregistering instance watcher for " << m_oid << " object: "
846 << cpp_strerror(r) << dendl;
847 }
848
849 Mutex::Locker locker(m_lock);
850 remove_instance_object();
851}
852
853template <typename I>
854void InstanceWatcher<I>::remove_instance_object() {
855 assert(m_lock.is_locked());
856
857 dout(20) << dendl;
858
859 librados::ObjectWriteOperation op;
860 op.remove();
861
862 librados::AioCompletion *aio_comp = create_rados_callback<
863 InstanceWatcher<I>,
864 &InstanceWatcher<I>::handle_remove_instance_object>(this);
865 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
866 assert(r == 0);
867 aio_comp->release();
868}
869
870template <typename I>
871void InstanceWatcher<I>::handle_remove_instance_object(int r) {
872 dout(20) << "r=" << r << dendl;
873
874 if (m_removing && r == -ENOENT) {
875 r = 0;
876 }
877
878 if (r < 0) {
879 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
880 << dendl;
881 }
882
883 Mutex::Locker locker(m_lock);
884 unregister_instance();
885}
886
887template <typename I>
888void InstanceWatcher<I>::unregister_instance() {
889 dout(20) << dendl;
890
891 assert(m_lock.is_locked());
892
893 librados::ObjectWriteOperation op;
894 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
895 librados::AioCompletion *aio_comp = create_rados_callback<
896 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
897
898 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
899 assert(r == 0);
900 aio_comp->release();
901}
902
903template <typename I>
904void InstanceWatcher<I>::handle_unregister_instance(int r) {
905 dout(20) << "r=" << r << dendl;
906
907 if (r < 0) {
908 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
909 }
910
911 Mutex::Locker locker(m_lock);
912 wait_for_notify_ops();
913}
914
915template <typename I>
916void InstanceWatcher<I>::wait_for_notify_ops() {
917 dout(20) << dendl;
918
919 assert(m_lock.is_locked());
920
921 for (auto op : m_notify_ops) {
922 op.second->cancel();
923 }
924
925 Context *ctx = create_async_context_callback(
926 m_work_queue, create_context_callback<
927 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
928
929 m_notify_op_tracker.wait_for_ops(ctx);
930}
931
932template <typename I>
933void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
934 dout(20) << "r=" << r << dendl;
935
936 assert(r == 0);
937
938 Context *on_finish = nullptr;
939 {
940 Mutex::Locker locker(m_lock);
941
942 assert(m_notify_ops.empty());
943
944 std::swap(on_finish, m_on_finish);
945 r = m_ret_val;
946
947 if (m_removing) {
948 m_removing = false;
949 }
950 }
951 on_finish->complete(r);
952}
953
954template <typename I>
955void InstanceWatcher<I>::get_instance_locker() {
956 dout(20) << dendl;
957
958 assert(m_lock.is_locked());
959
960 Context *ctx = create_async_context_callback(
961 m_work_queue, create_context_callback<
962 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
963
964 m_instance_lock->get_locker(&m_instance_locker, ctx);
965}
966
967template <typename I>
968void InstanceWatcher<I>::handle_get_instance_locker(int r) {
969 dout(20) << "r=" << r << dendl;
970
971 Mutex::Locker locker(m_lock);
972
973 if (r < 0) {
974 if (r != -ENOENT) {
975 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
976 }
977 remove_instance_object();
978 return;
979 }
980
981 break_instance_lock();
982}
983
984template <typename I>
985void InstanceWatcher<I>::break_instance_lock() {
986 dout(20) << dendl;
987
988 assert(m_lock.is_locked());
989
990 Context *ctx = create_async_context_callback(
991 m_work_queue, create_context_callback<
992 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
993
994 m_instance_lock->break_lock(m_instance_locker, true, ctx);
995}
996
997template <typename I>
998void InstanceWatcher<I>::handle_break_instance_lock(int r) {
999 dout(20) << "r=" << r << dendl;
1000
1001 Mutex::Locker locker(m_lock);
1002
1003 if (r < 0) {
1004 if (r != -ENOENT) {
1005 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
1006 }
1007 remove_instance_object();
1008 return;
1009 }
1010
1011 remove_instance_object();
1012}
1013
31f18b77
FG
1014template <typename I>
1015void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
1016 dout(20) << req << dendl;
1017
1018 assert(m_lock.is_locked());
1019
1020 auto result = m_suspended_ops.insert(req).second;
1021 assert(result);
1022}
1023
1024template <typename I>
1025bool InstanceWatcher<I>::unsuspend_notify_request(
1026 C_NotifyInstanceRequest *req) {
1027 dout(20) << req << dendl;
1028
1029 assert(m_lock.is_locked());
1030
1031 auto result = m_suspended_ops.erase(req);
1032 if (result == 0) {
1033 return false;
1034 }
1035
1036 req->send();
1037 return true;
1038}
1039
1040template <typename I>
1041void InstanceWatcher<I>::unsuspend_notify_requests() {
1042 dout(20) << dendl;
1043
1044 assert(m_lock.is_locked());
1045
1046 std::set<C_NotifyInstanceRequest *> suspended_ops;
1047 std::swap(m_suspended_ops, suspended_ops);
1048
1049 for (auto op : suspended_ops) {
1050 op->send();
1051 }
1052}
1053
7c673cae
FG
1054template <typename I>
1055Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1056 uint64_t request_id,
1057 C_NotifyAck *on_notify_ack) {
1058 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1059 << dendl;
1060
1061 Mutex::Locker locker(m_lock);
1062
1063 Context *ctx = nullptr;
1064 Request request(instance_id, request_id);
1065 auto it = m_requests.find(request);
1066
1067 if (it != m_requests.end()) {
1068 dout(20) << "duplicate for in-progress request" << dendl;
1069 delete it->on_notify_ack;
1070 m_requests.erase(it);
1071 } else {
31f18b77
FG
1072 ctx = create_async_context_callback(
1073 m_work_queue, new FunctionContext(
1074 [this, instance_id, request_id] (int r) {
1075 complete_request(instance_id, request_id, r);
1076 }));
7c673cae
FG
1077 }
1078
1079 request.on_notify_ack = on_notify_ack;
1080 m_requests.insert(request);
1081 return ctx;
1082}
1083
31f18b77
FG
1084template <typename I>
1085void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1086 uint64_t request_id, int r) {
1087 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1088 << dendl;
1089
1090 C_NotifyAck *on_notify_ack;
1091 {
1092 Mutex::Locker locker(m_lock);
1093 Request request(instance_id, request_id);
1094 auto it = m_requests.find(request);
1095 assert(it != m_requests.end());
1096 on_notify_ack = it->on_notify_ack;
1097 m_requests.erase(it);
1098 }
1099
1100 ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
1101 on_notify_ack->complete(0);
1102}
1103
7c673cae
FG
1104template <typename I>
1105void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1106 uint64_t notifier_id, bufferlist &bl) {
1107 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1108 << "notifier_id=" << notifier_id << dendl;
1109
1110 auto ctx = new C_NotifyAck(this, notify_id, handle);
1111
1112 NotifyMessage notify_message;
1113 try {
1114 bufferlist::iterator iter = bl.begin();
1115 ::decode(notify_message, iter);
1116 } catch (const buffer::error &err) {
1117 derr << "error decoding image notification: " << err.what() << dendl;
1118 ctx->complete(0);
1119 return;
1120 }
1121
1122 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1123 notify_message.payload);
1124}
1125
1126template <typename I>
1127void InstanceWatcher<I>::handle_image_acquire(
d2e6a577 1128 const std::string &global_image_id, Context *on_finish) {
7c673cae
FG
1129 dout(20) << "global_image_id=" << global_image_id << dendl;
1130
31f18b77 1131 auto ctx = new FunctionContext(
d2e6a577
FG
1132 [this, global_image_id, on_finish] (int r) {
1133 m_instance_replayer->acquire_image(this, global_image_id, on_finish);
31f18b77
FG
1134 m_notify_op_tracker.finish_op();
1135 });
1136
1137 m_notify_op_tracker.start_op();
1138 m_work_queue->queue(ctx, 0);
7c673cae
FG
1139}
1140
1141template <typename I>
1142void InstanceWatcher<I>::handle_image_release(
d2e6a577 1143 const std::string &global_image_id, Context *on_finish) {
7c673cae
FG
1144 dout(20) << "global_image_id=" << global_image_id << dendl;
1145
31f18b77 1146 auto ctx = new FunctionContext(
d2e6a577
FG
1147 [this, global_image_id, on_finish] (int r) {
1148 m_instance_replayer->release_image(global_image_id, on_finish);
1149 m_notify_op_tracker.finish_op();
1150 });
1151
1152 m_notify_op_tracker.start_op();
1153 m_work_queue->queue(ctx, 0);
1154}
1155
1156template <typename I>
1157void InstanceWatcher<I>::handle_peer_image_removed(
1158 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1159 Context *on_finish) {
1160 dout(20) << "global_image_id=" << global_image_id << ", "
1161 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
1162
1163 auto ctx = new FunctionContext(
1164 [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
1165 m_instance_replayer->remove_peer_image(global_image_id,
1166 peer_mirror_uuid, on_finish);
31f18b77
FG
1167 m_notify_op_tracker.finish_op();
1168 });
1169
1170 m_notify_op_tracker.start_op();
1171 m_work_queue->queue(ctx, 0);
1172}
1173
1174template <typename I>
1175void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1176 const std::string &sync_id,
1177 Context *on_finish) {
1178 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1179
1180 Mutex::Locker locker(m_lock);
1181
1182 if (m_image_sync_throttler == nullptr) {
1183 dout(20) << "sync request for non-leader" << dendl;
1184 m_work_queue->queue(on_finish, -ESTALE);
1185 return;
1186 }
1187
1188 Context *on_start = create_async_context_callback(
1189 m_work_queue, new FunctionContext(
1190 [this, instance_id, sync_id, on_finish] (int r) {
1191 dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
1192 << ", sync_id=" << sync_id << ", r=" << r << dendl;
1193 if (r == 0) {
1194 notify_sync_start(instance_id, sync_id);
1195 }
1196 on_finish->complete(r);
1197 }));
1198 m_image_sync_throttler->start_op(sync_id, on_start);
1199}
1200
1201template <typename I>
1202void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1203 const std::string &sync_id,
1204 Context *on_finish) {
1205 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1206
1207 Mutex::Locker locker(m_lock);
1208
1209 auto it = m_inflight_sync_reqs.find(sync_id);
1210 if (it == m_inflight_sync_reqs.end()) {
1211 dout(20) << "not found" << dendl;
1212 m_work_queue->queue(on_finish, 0);
1213 return;
1214 }
1215
1216 auto sync_ctx = it->second;
1217
1218 if (sync_ctx->on_complete != nullptr) {
1219 dout(20) << "duplicate request" << dendl;
1220 m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1221 }
1222
1223 sync_ctx->on_complete = on_finish;
7c673cae
FG
1224}
1225
1226template <typename I>
1227void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1228 const ImageAcquirePayload &payload,
1229 C_NotifyAck *on_notify_ack) {
1230 dout(20) << "image_acquire: instance_id=" << instance_id << ", "
1231 << "request_id=" << payload.request_id << dendl;
1232
1233 auto on_finish = prepare_request(instance_id, payload.request_id,
1234 on_notify_ack);
1235 if (on_finish != nullptr) {
d2e6a577 1236 handle_image_acquire(payload.global_image_id, on_finish);
7c673cae
FG
1237 }
1238}
1239
1240template <typename I>
1241void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1242 const ImageReleasePayload &payload,
1243 C_NotifyAck *on_notify_ack) {
1244 dout(20) << "image_release: instance_id=" << instance_id << ", "
1245 << "request_id=" << payload.request_id << dendl;
1246
1247 auto on_finish = prepare_request(instance_id, payload.request_id,
1248 on_notify_ack);
1249 if (on_finish != nullptr) {
d2e6a577
FG
1250 handle_image_release(payload.global_image_id, on_finish);
1251 }
1252}
1253
1254template <typename I>
1255void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1256 const PeerImageRemovedPayload &payload,
1257 C_NotifyAck *on_notify_ack) {
1258 dout(20) << "remove_peer_image: instance_id=" << instance_id << ", "
1259 << "request_id=" << payload.request_id << dendl;
1260
1261 auto on_finish = prepare_request(instance_id, payload.request_id,
1262 on_notify_ack);
1263 if (on_finish != nullptr) {
1264 handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
1265 on_finish);
7c673cae
FG
1266 }
1267}
1268
31f18b77
FG
1269template <typename I>
1270void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1271 const SyncRequestPayload &payload,
1272 C_NotifyAck *on_notify_ack) {
1273 dout(20) << "sync_request: instance_id=" << instance_id << ", "
1274 << "request_id=" << payload.request_id << dendl;
1275
1276 auto on_finish = prepare_request(instance_id, payload.request_id,
1277 on_notify_ack);
1278 if (on_finish == nullptr) {
1279 return;
1280 }
1281
1282 handle_sync_request(instance_id, payload.sync_id, on_finish);
1283}
1284
1285template <typename I>
1286void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1287 const SyncStartPayload &payload,
1288 C_NotifyAck *on_notify_ack) {
1289 dout(20) << "sync_start: instance_id=" << instance_id << ", "
1290 << "request_id=" << payload.request_id << dendl;
1291
1292 auto on_finish = prepare_request(instance_id, payload.request_id,
1293 on_notify_ack);
1294 if (on_finish == nullptr) {
1295 return;
1296 }
1297
1298 handle_sync_start(instance_id, payload.sync_id, on_finish);
1299}
1300
7c673cae
FG
1301template <typename I>
1302void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1303 const UnknownPayload &payload,
1304 C_NotifyAck *on_notify_ack) {
1305 dout(20) << "unknown: instance_id=" << instance_id << dendl;
1306
1307 on_notify_ack->complete(0);
1308}
1309
1310} // namespace mirror
1311} // namespace rbd
1312
1313template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;