]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/InstanceWatcher.cc
update sources to v12.1.3
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "InstanceWatcher.h"
7c673cae
FG
5#include "include/stringify.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
9#include "librbd/ManagedLock.h"
10#include "librbd/Utils.h"
11#include "InstanceReplayer.h"
31f18b77 12#include "ImageSyncThrottler.h"
7c673cae
FG
13
14#define dout_context g_ceph_context
15#define dout_subsys ceph_subsys_rbd_mirror
16#undef dout_prefix
17#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
18
19namespace rbd {
20namespace mirror {
21
22using namespace instance_watcher;
23
24using librbd::util::create_async_context_callback;
25using librbd::util::create_context_callback;
26using librbd::util::create_rados_callback;
27using librbd::util::unique_lock_name;
28
29namespace {
30
31struct C_GetInstances : public Context {
32 std::vector<std::string> *instance_ids;
33 Context *on_finish;
34 bufferlist out_bl;
35
36 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
37 : instance_ids(instance_ids), on_finish(on_finish) {
38 }
39
40 void finish(int r) override {
41 dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
42 << dendl;
43
44 if (r == 0) {
45 bufferlist::iterator it = out_bl.begin();
46 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
47 } else if (r == -ENOENT) {
48 r = 0;
49 }
50 on_finish->complete(r);
51 }
52};
53
54template <typename I>
55struct C_RemoveInstanceRequest : public Context {
56 InstanceWatcher<I> instance_watcher;
57 Context *on_finish;
58
59 C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
60 const std::string &instance_id, Context *on_finish)
61 : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
62 on_finish(on_finish) {
63 }
64
65 void send() {
66 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
67
68 instance_watcher.remove(this);
69 }
70
71 void finish(int r) override {
72 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
73 << r << dendl;
74 assert(r == 0);
75
76 on_finish->complete(r);
77 }
78};
79
80} // anonymous namespace
81
82template <typename I>
83struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
84 InstanceWatcher<I> *instance_watcher;
7c673cae
FG
85 std::string instance_id;
86 uint64_t request_id;
87 bufferlist bl;
88 Context *on_finish;
31f18b77
FG
89 bool send_to_leader;
90 std::unique_ptr<librbd::watcher::Notifier> notifier;
7c673cae 91 librbd::watcher::NotifyResponse response;
31f18b77 92 bool canceling = false;
7c673cae
FG
93
94 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
95 const std::string &instance_id, uint64_t request_id,
96 bufferlist &&bl, Context *on_finish)
31f18b77
FG
97 : instance_watcher(instance_watcher), instance_id(instance_id),
98 request_id(request_id), bl(bl), on_finish(on_finish),
99 send_to_leader(instance_id.empty()) {
100 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
101 << ": instance_watcher=" << instance_watcher << ", instance_id="
102 << instance_id << ", request_id=" << request_id << dendl;
103
7c673cae 104 assert(instance_watcher->m_lock.is_locked());
31f18b77
FG
105
106 if (!send_to_leader) {
107 assert((!instance_id.empty()));
108 notifier.reset(new librbd::watcher::Notifier(
109 instance_watcher->m_work_queue,
110 instance_watcher->m_ioctx,
111 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
112 }
113
114 instance_watcher->m_notify_op_tracker.start_op();
7c673cae
FG
115 auto result = instance_watcher->m_notify_ops.insert(
116 std::make_pair(instance_id, this)).second;
117 assert(result);
118 }
119
120 void send() {
121 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
122
31f18b77
FG
123 assert(instance_watcher->m_lock.is_locked());
124
125 if (canceling) {
126 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
127 << ": canceling" << dendl;
128 instance_watcher->m_work_queue->queue(this, -ECANCELED);
129 return;
130 }
131
132 if (send_to_leader) {
133 if (instance_watcher->m_leader_instance_id.empty()) {
134 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
135 << ": suspending" << dendl;
136 instance_watcher->suspend_notify_request(this);
137 return;
138 }
139
140 if (instance_watcher->m_leader_instance_id != instance_id) {
141 auto count = instance_watcher->m_notify_ops.erase(
142 std::make_pair(instance_id, this));
143 assert(count > 0);
144
145 instance_id = instance_watcher->m_leader_instance_id;
146
147 auto result = instance_watcher->m_notify_ops.insert(
148 std::make_pair(instance_id, this)).second;
149 assert(result);
150
151 notifier.reset(new librbd::watcher::Notifier(
152 instance_watcher->m_work_queue,
153 instance_watcher->m_ioctx,
154 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
155 }
156 }
157
158 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
d2e6a577 159 << ": sending to " << instance_id << dendl;
31f18b77 160 notifier->notify(bl, &response, this);
7c673cae
FG
161 }
162
163 void cancel() {
164 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
165
31f18b77
FG
166 assert(instance_watcher->m_lock.is_locked());
167
168 canceling = true;
169 instance_watcher->unsuspend_notify_request(this);
7c673cae
FG
170 }
171
172 void finish(int r) override {
173 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
174 << r << dendl;
175
176 if (r == 0 || r == -ETIMEDOUT) {
177 bool found = false;
178 for (auto &it : response.acks) {
179 auto &bl = it.second;
180 if (it.second.length() == 0) {
181 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
182 << ": no payload in ack, ignoring" << dendl;
183 continue;
184 }
185 try {
186 auto iter = bl.begin();
187 NotifyAckPayload ack;
188 ::decode(ack, iter);
189 if (ack.instance_id != instance_watcher->get_instance_id()) {
190 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
191 << ": ack instance_id (" << ack.instance_id << ") "
192 << "does not match, ignoring" << dendl;
193 continue;
194 }
195 if (ack.request_id != request_id) {
196 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
197 << ": ack request_id (" << ack.request_id << ") "
198 << "does not match, ignoring" << dendl;
199 continue;
200 }
201 r = ack.ret_val;
202 found = true;
203 break;
204 } catch (const buffer::error &err) {
205 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
206 << ": failed to decode ack: " << err.what() << dendl;
207 continue;
208 }
209 }
210
211 if (!found) {
212 if (r == -ETIMEDOUT) {
31f18b77
FG
213 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
214 << ": resending after timeout" << dendl;
215 Mutex::Locker locker(instance_watcher->m_lock);
216 send();
217 return;
7c673cae
FG
218 } else {
219 r = -EINVAL;
220 }
31f18b77
FG
221 } else {
222 if (r == -ESTALE && send_to_leader) {
223 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
224 << ": resending due to leader change" << dendl;
225 Mutex::Locker locker(instance_watcher->m_lock);
226 send();
227 return;
228 }
7c673cae
FG
229 }
230 }
231
7c673cae
FG
232 on_finish->complete(r);
233
31f18b77
FG
234 {
235 Mutex::Locker locker(instance_watcher->m_lock);
236 auto result = instance_watcher->m_notify_ops.erase(
7c673cae 237 std::make_pair(instance_id, this));
31f18b77
FG
238 assert(result > 0);
239 instance_watcher->m_notify_op_tracker.finish_op();
240 }
241
7c673cae
FG
242 delete this;
243 }
244
245 void complete(int r) override {
246 finish(r);
247 }
248};
249
31f18b77
FG
250template <typename I>
251struct InstanceWatcher<I>::C_SyncRequest : public Context {
252 InstanceWatcher<I> *instance_watcher;
253 std::string sync_id;
254 Context *on_start;
255 Context *on_complete = nullptr;
256 C_NotifyInstanceRequest *req = nullptr;
257
258 C_SyncRequest(InstanceWatcher<I> *instance_watcher,
259 const std::string &sync_id, Context *on_start)
260 : instance_watcher(instance_watcher), sync_id(sync_id),
261 on_start(on_start) {
262 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
263 << sync_id << dendl;
264 }
265
266 void finish(int r) override {
267 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
268 << r << dendl;
269
270 if (on_start != nullptr) {
271 instance_watcher->handle_notify_sync_request(this, r);
272 } else {
273 instance_watcher->handle_notify_sync_complete(this, r);
274 delete this;
275 }
276 }
277
278 // called twice
279 void complete(int r) override {
280 finish(r);
281 }
282};
283
7c673cae
FG
284#undef dout_prefix
285#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
286 << this << " " << __func__ << ": "
287template <typename I>
288void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
289 std::vector<std::string> *instance_ids,
290 Context *on_finish) {
291 librados::ObjectReadOperation op;
292 librbd::cls_client::mirror_instances_list_start(&op);
293 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
294 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
295
296 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
297 assert(r == 0);
298 aio_comp->release();
299}
300
301template <typename I>
302void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
303 ContextWQ *work_queue,
304 const std::string &instance_id,
305 Context *on_finish) {
306 auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
307 on_finish);
308 req->send();
309}
310
311template <typename I>
312InstanceWatcher<I> *InstanceWatcher<I>::create(
313 librados::IoCtx &io_ctx, ContextWQ *work_queue,
314 InstanceReplayer<I> *instance_replayer) {
315 return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
316 stringify(io_ctx.get_instance_id()));
317}
318
319template <typename I>
320InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
321 ContextWQ *work_queue,
322 InstanceReplayer<I> *instance_replayer,
323 const std::string &instance_id)
324 : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
325 m_instance_replayer(instance_replayer), m_instance_id(instance_id),
326 m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
327 m_instance_lock(librbd::ManagedLock<I>::create(
328 m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
329 m_cct->_conf->rbd_blacklist_expire_seconds)) {
330}
331
332template <typename I>
333InstanceWatcher<I>::~InstanceWatcher() {
31f18b77
FG
334 assert(m_notify_ops.empty());
335 assert(m_notify_op_tracker.empty());
336 assert(m_suspended_ops.empty());
337 assert(m_inflight_sync_reqs.empty());
338 assert(m_image_sync_throttler == nullptr);
7c673cae
FG
339 m_instance_lock->destroy();
340}
341
342template <typename I>
343int InstanceWatcher<I>::init() {
344 C_SaferCond init_ctx;
345 init(&init_ctx);
346 return init_ctx.wait();
347}
348
349template <typename I>
350void InstanceWatcher<I>::init(Context *on_finish) {
351 dout(20) << "instance_id=" << m_instance_id << dendl;
352
353 Mutex::Locker locker(m_lock);
354
355 assert(m_on_finish == nullptr);
356 m_on_finish = on_finish;
357 m_ret_val = 0;
358
359 register_instance();
360}
361
362template <typename I>
363void InstanceWatcher<I>::shut_down() {
364 C_SaferCond shut_down_ctx;
365 shut_down(&shut_down_ctx);
366 int r = shut_down_ctx.wait();
367 assert(r == 0);
368}
369
370template <typename I>
371void InstanceWatcher<I>::shut_down(Context *on_finish) {
372 dout(20) << dendl;
373
374 Mutex::Locker locker(m_lock);
375
376 assert(m_on_finish == nullptr);
377 m_on_finish = on_finish;
378 m_ret_val = 0;
379
380 release_lock();
381}
382
383template <typename I>
384void InstanceWatcher<I>::remove(Context *on_finish) {
385 dout(20) << dendl;
386
387 Mutex::Locker locker(m_lock);
388
389 assert(m_on_finish == nullptr);
390 m_on_finish = on_finish;
391 m_ret_val = 0;
392 m_removing = true;
393
394 get_instance_locker();
395}
396
397template <typename I>
398void InstanceWatcher<I>::notify_image_acquire(
399 const std::string &instance_id, const std::string &global_image_id,
d2e6a577 400 Context *on_notify_ack) {
7c673cae
FG
401 dout(20) << "instance_id=" << instance_id << ", global_image_id="
402 << global_image_id << dendl;
403
404 Mutex::Locker locker(m_lock);
405
406 assert(m_on_finish == nullptr);
407
408 if (instance_id == m_instance_id) {
d2e6a577 409 handle_image_acquire(global_image_id, on_notify_ack);
7c673cae
FG
410 } else {
411 uint64_t request_id = ++m_request_seq;
412 bufferlist bl;
d2e6a577
FG
413 ::encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}},
414 bl);
7c673cae
FG
415 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
416 std::move(bl), on_notify_ack);
417 req->send();
418 }
419}
420
421template <typename I>
422void InstanceWatcher<I>::notify_image_release(
d2e6a577
FG
423 const std::string &instance_id, const std::string &global_image_id,
424 Context *on_notify_ack) {
7c673cae
FG
425 dout(20) << "instance_id=" << instance_id << ", global_image_id="
426 << global_image_id << dendl;
427
428 Mutex::Locker locker(m_lock);
429
430 assert(m_on_finish == nullptr);
431
432 if (instance_id == m_instance_id) {
d2e6a577 433 handle_image_release(global_image_id, on_notify_ack);
7c673cae
FG
434 } else {
435 uint64_t request_id = ++m_request_seq;
436 bufferlist bl;
d2e6a577
FG
437 ::encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}},
438 bl);
439 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
440 std::move(bl), on_notify_ack);
441 req->send();
442 }
443}
444
445template <typename I>
446void InstanceWatcher<I>::notify_peer_image_removed(
447 const std::string &instance_id, const std::string &global_image_id,
448 const std::string &peer_mirror_uuid, Context *on_notify_ack) {
449 dout(20) << "instance_id=" << instance_id << ", "
450 << "global_image_id=" << global_image_id << ", "
451 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
452
453 Mutex::Locker locker(m_lock);
454 assert(m_on_finish == nullptr);
455
456 if (instance_id == m_instance_id) {
457 handle_peer_image_removed(global_image_id, peer_mirror_uuid, on_notify_ack);
458 } else {
459 uint64_t request_id = ++m_request_seq;
460 bufferlist bl;
461 ::encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
462 peer_mirror_uuid}}, bl);
7c673cae
FG
463 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
464 std::move(bl), on_notify_ack);
465 req->send();
466 }
467}
468
31f18b77
FG
469template <typename I>
470void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
471 Context *on_sync_start) {
472 dout(20) << "sync_id=" << sync_id << dendl;
473
474 Mutex::Locker locker(m_lock);
475
476 assert(m_inflight_sync_reqs.count(sync_id) == 0);
477
478 uint64_t request_id = ++m_request_seq;
479
480 bufferlist bl;
481 ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
482
483 auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
484 sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
485 std::move(bl), sync_ctx);
486
487 m_inflight_sync_reqs[sync_id] = sync_ctx;
488 sync_ctx->req->send();
489}
490
491template <typename I>
492bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
493 dout(20) << "sync_id=" << sync_id << dendl;
494
495 Mutex::Locker locker(m_lock);
496
497 auto it = m_inflight_sync_reqs.find(sync_id);
498 if (it == m_inflight_sync_reqs.end()) {
499 return false;
500 }
501
502 auto sync_ctx = it->second;
503
504 if (sync_ctx->on_start == nullptr) {
505 return false;
506 }
507
508 assert(sync_ctx->req != nullptr);
509 sync_ctx->req->cancel();
510 return true;
511}
512
513template <typename I>
514void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
515 const std::string &sync_id) {
516 dout(20) << "sync_id=" << sync_id << dendl;
517
518 Mutex::Locker locker(m_lock);
519
520 uint64_t request_id = ++m_request_seq;
521
522 bufferlist bl;
523 ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
524
525 auto ctx = new FunctionContext(
526 [this, sync_id] (int r) {
527 dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
528 Mutex::Locker locker(m_lock);
529 if (r != -ESTALE && m_image_sync_throttler != nullptr) {
530 m_image_sync_throttler->finish_op(sync_id);
531 }
532 });
533 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
534 std::move(bl), ctx);
535 req->send();
536}
537
538template <typename I>
539void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
540 dout(20) << "sync_id=" << sync_id << dendl;
541
542 Mutex::Locker locker(m_lock);
543
544 auto it = m_inflight_sync_reqs.find(sync_id);
545 assert(it != m_inflight_sync_reqs.end());
546
547 auto sync_ctx = it->second;
548 assert(sync_ctx->req == nullptr);
549
550 m_inflight_sync_reqs.erase(it);
551 m_work_queue->queue(sync_ctx, 0);
552}
553
554template <typename I>
555void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
556 int r) {
557 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
558
559 Context *on_start = nullptr;
560 {
561 Mutex::Locker locker(m_lock);
562
563 assert(sync_ctx->req != nullptr);
564 assert(sync_ctx->on_start != nullptr);
565
566 if (sync_ctx->req->canceling) {
567 r = -ECANCELED;
568 }
569
570 std::swap(sync_ctx->on_start, on_start);
571 sync_ctx->req = nullptr;
572 }
573
574 on_start->complete(r == -ECANCELED ? r : 0);
575
576 if (r == -ECANCELED) {
577 notify_sync_complete(sync_ctx->sync_id);
578 }
579}
580
581template <typename I>
582void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
583 int r) {
584 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
585
586 if (sync_ctx->on_complete != nullptr) {
587 sync_ctx->on_complete->complete(r);
588 }
589}
590
591template <typename I>
592void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
593 dout(20) << dendl;
594
595 Mutex::Locker locker(m_lock);
596 if (m_image_sync_throttler != nullptr) {
597 m_image_sync_throttler->print_status(f, ss);
598 }
599}
600
601template <typename I>
602void InstanceWatcher<I>::handle_acquire_leader() {
603 dout(20) << dendl;
604
605 Mutex::Locker locker(m_lock);
606
607 assert(m_image_sync_throttler == nullptr);
608 m_image_sync_throttler = ImageSyncThrottler<I>::create();
609
610 m_leader_instance_id = m_instance_id;
611 unsuspend_notify_requests();
612}
613
614template <typename I>
615void InstanceWatcher<I>::handle_release_leader() {
616 dout(20) << dendl;
617
618 Mutex::Locker locker(m_lock);
619
620 assert(m_image_sync_throttler != nullptr);
621
622 m_leader_instance_id.clear();
623
624 m_image_sync_throttler->drain(-ESTALE);
625 m_image_sync_throttler->destroy();
626 m_image_sync_throttler = nullptr;
627}
628
629template <typename I>
630void InstanceWatcher<I>::handle_update_leader(
631 const std::string &leader_instance_id) {
632 dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
633
634 Mutex::Locker locker(m_lock);
635
636 m_leader_instance_id = leader_instance_id;
637
638 if (!m_leader_instance_id.empty()) {
639 unsuspend_notify_requests();
640 }
641}
642
7c673cae
FG
643template <typename I>
644void InstanceWatcher<I>::cancel_notify_requests(
645 const std::string &instance_id) {
646 dout(20) << "instance_id=" << instance_id << dendl;
647
648 Mutex::Locker locker(m_lock);
649
650 for (auto op : m_notify_ops) {
31f18b77 651 if (op.first == instance_id && !op.second->send_to_leader) {
7c673cae
FG
652 op.second->cancel();
653 }
654 }
655}
656
7c673cae
FG
657template <typename I>
658void InstanceWatcher<I>::register_instance() {
659 assert(m_lock.is_locked());
660
661 dout(20) << dendl;
662
663 librados::ObjectWriteOperation op;
664 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
665 librados::AioCompletion *aio_comp = create_rados_callback<
666 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
667
668 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
669 assert(r == 0);
670 aio_comp->release();
671}
672
673template <typename I>
674void InstanceWatcher<I>::handle_register_instance(int r) {
675 dout(20) << "r=" << r << dendl;
676
677 Context *on_finish = nullptr;
678 {
679 Mutex::Locker locker(m_lock);
680
681 if (r == 0) {
682 create_instance_object();
683 return;
684 }
685
686 derr << "error registering instance: " << cpp_strerror(r) << dendl;
687
688 std::swap(on_finish, m_on_finish);
689 }
690 on_finish->complete(r);
691}
692
693
694template <typename I>
695void InstanceWatcher<I>::create_instance_object() {
696 dout(20) << dendl;
697
698 assert(m_lock.is_locked());
699
700 librados::ObjectWriteOperation op;
701 op.create(true);
702
703 librados::AioCompletion *aio_comp = create_rados_callback<
704 InstanceWatcher<I>,
705 &InstanceWatcher<I>::handle_create_instance_object>(this);
706 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
707 assert(r == 0);
708 aio_comp->release();
709}
710
711template <typename I>
712void InstanceWatcher<I>::handle_create_instance_object(int r) {
713 dout(20) << "r=" << r << dendl;
714
715 Mutex::Locker locker(m_lock);
716
717 if (r < 0) {
718 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
719 << dendl;
720
721 m_ret_val = r;
722 unregister_instance();
723 return;
724 }
725
726 register_watch();
727}
728
729template <typename I>
730void InstanceWatcher<I>::register_watch() {
731 dout(20) << dendl;
732
733 assert(m_lock.is_locked());
734
735 Context *ctx = create_async_context_callback(
736 m_work_queue, create_context_callback<
737 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
738
739 librbd::Watcher::register_watch(ctx);
740}
741
742template <typename I>
743void InstanceWatcher<I>::handle_register_watch(int r) {
744 dout(20) << "r=" << r << dendl;
745
746 Mutex::Locker locker(m_lock);
747
748 if (r < 0) {
749 derr << "error registering instance watcher for " << m_oid << " object: "
750 << cpp_strerror(r) << dendl;
751
752 m_ret_val = r;
753 remove_instance_object();
754 return;
755 }
756
757 acquire_lock();
758}
759
760template <typename I>
761void InstanceWatcher<I>::acquire_lock() {
762 dout(20) << dendl;
763
764 assert(m_lock.is_locked());
765
766 Context *ctx = create_async_context_callback(
767 m_work_queue, create_context_callback<
768 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
769
770 m_instance_lock->acquire_lock(ctx);
771}
772
773template <typename I>
774void InstanceWatcher<I>::handle_acquire_lock(int r) {
775 dout(20) << "r=" << r << dendl;
776
777 Context *on_finish = nullptr;
778 {
779 Mutex::Locker locker(m_lock);
780
781 if (r < 0) {
782
783 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
784
785 m_ret_val = r;
786 unregister_watch();
787 return;
788 }
789
790 std::swap(on_finish, m_on_finish);
791 }
792
793 on_finish->complete(r);
794}
795
796template <typename I>
797void InstanceWatcher<I>::release_lock() {
798 dout(20) << dendl;
799
800 assert(m_lock.is_locked());
801
802 Context *ctx = create_async_context_callback(
803 m_work_queue, create_context_callback<
804 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
805
806 m_instance_lock->shut_down(ctx);
807}
808
809template <typename I>
810void InstanceWatcher<I>::handle_release_lock(int r) {
811 dout(20) << "r=" << r << dendl;
812
813 Mutex::Locker locker(m_lock);
814
815 if (r < 0) {
816 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
817 }
818
819 unregister_watch();
820}
821
822template <typename I>
823void InstanceWatcher<I>::unregister_watch() {
824 dout(20) << dendl;
825
826 assert(m_lock.is_locked());
827
828 Context *ctx = create_async_context_callback(
829 m_work_queue, create_context_callback<
830 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
831
832 librbd::Watcher::unregister_watch(ctx);
833}
834
835template <typename I>
836void InstanceWatcher<I>::handle_unregister_watch(int r) {
837 dout(20) << "r=" << r << dendl;
838
839 if (r < 0) {
840 derr << "error unregistering instance watcher for " << m_oid << " object: "
841 << cpp_strerror(r) << dendl;
842 }
843
844 Mutex::Locker locker(m_lock);
845 remove_instance_object();
846}
847
848template <typename I>
849void InstanceWatcher<I>::remove_instance_object() {
850 assert(m_lock.is_locked());
851
852 dout(20) << dendl;
853
854 librados::ObjectWriteOperation op;
855 op.remove();
856
857 librados::AioCompletion *aio_comp = create_rados_callback<
858 InstanceWatcher<I>,
859 &InstanceWatcher<I>::handle_remove_instance_object>(this);
860 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
861 assert(r == 0);
862 aio_comp->release();
863}
864
865template <typename I>
866void InstanceWatcher<I>::handle_remove_instance_object(int r) {
867 dout(20) << "r=" << r << dendl;
868
869 if (m_removing && r == -ENOENT) {
870 r = 0;
871 }
872
873 if (r < 0) {
874 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
875 << dendl;
876 }
877
878 Mutex::Locker locker(m_lock);
879 unregister_instance();
880}
881
882template <typename I>
883void InstanceWatcher<I>::unregister_instance() {
884 dout(20) << dendl;
885
886 assert(m_lock.is_locked());
887
888 librados::ObjectWriteOperation op;
889 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
890 librados::AioCompletion *aio_comp = create_rados_callback<
891 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
892
893 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
894 assert(r == 0);
895 aio_comp->release();
896}
897
898template <typename I>
899void InstanceWatcher<I>::handle_unregister_instance(int r) {
900 dout(20) << "r=" << r << dendl;
901
902 if (r < 0) {
903 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
904 }
905
906 Mutex::Locker locker(m_lock);
907 wait_for_notify_ops();
908}
909
910template <typename I>
911void InstanceWatcher<I>::wait_for_notify_ops() {
912 dout(20) << dendl;
913
914 assert(m_lock.is_locked());
915
916 for (auto op : m_notify_ops) {
917 op.second->cancel();
918 }
919
920 Context *ctx = create_async_context_callback(
921 m_work_queue, create_context_callback<
922 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
923
924 m_notify_op_tracker.wait_for_ops(ctx);
925}
926
927template <typename I>
928void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
929 dout(20) << "r=" << r << dendl;
930
931 assert(r == 0);
932
933 Context *on_finish = nullptr;
934 {
935 Mutex::Locker locker(m_lock);
936
937 assert(m_notify_ops.empty());
938
939 std::swap(on_finish, m_on_finish);
940 r = m_ret_val;
941
942 if (m_removing) {
943 m_removing = false;
944 }
945 }
946 on_finish->complete(r);
947}
948
949template <typename I>
950void InstanceWatcher<I>::get_instance_locker() {
951 dout(20) << dendl;
952
953 assert(m_lock.is_locked());
954
955 Context *ctx = create_async_context_callback(
956 m_work_queue, create_context_callback<
957 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
958
959 m_instance_lock->get_locker(&m_instance_locker, ctx);
960}
961
962template <typename I>
963void InstanceWatcher<I>::handle_get_instance_locker(int r) {
964 dout(20) << "r=" << r << dendl;
965
966 Mutex::Locker locker(m_lock);
967
968 if (r < 0) {
969 if (r != -ENOENT) {
970 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
971 }
972 remove_instance_object();
973 return;
974 }
975
976 break_instance_lock();
977}
978
979template <typename I>
980void InstanceWatcher<I>::break_instance_lock() {
981 dout(20) << dendl;
982
983 assert(m_lock.is_locked());
984
985 Context *ctx = create_async_context_callback(
986 m_work_queue, create_context_callback<
987 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
988
989 m_instance_lock->break_lock(m_instance_locker, true, ctx);
990}
991
992template <typename I>
993void InstanceWatcher<I>::handle_break_instance_lock(int r) {
994 dout(20) << "r=" << r << dendl;
995
996 Mutex::Locker locker(m_lock);
997
998 if (r < 0) {
999 if (r != -ENOENT) {
1000 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
1001 }
1002 remove_instance_object();
1003 return;
1004 }
1005
1006 remove_instance_object();
1007}
1008
31f18b77
FG
1009template <typename I>
1010void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
1011 dout(20) << req << dendl;
1012
1013 assert(m_lock.is_locked());
1014
1015 auto result = m_suspended_ops.insert(req).second;
1016 assert(result);
1017}
1018
1019template <typename I>
1020bool InstanceWatcher<I>::unsuspend_notify_request(
1021 C_NotifyInstanceRequest *req) {
1022 dout(20) << req << dendl;
1023
1024 assert(m_lock.is_locked());
1025
1026 auto result = m_suspended_ops.erase(req);
1027 if (result == 0) {
1028 return false;
1029 }
1030
1031 req->send();
1032 return true;
1033}
1034
1035template <typename I>
1036void InstanceWatcher<I>::unsuspend_notify_requests() {
1037 dout(20) << dendl;
1038
1039 assert(m_lock.is_locked());
1040
1041 std::set<C_NotifyInstanceRequest *> suspended_ops;
1042 std::swap(m_suspended_ops, suspended_ops);
1043
1044 for (auto op : suspended_ops) {
1045 op->send();
1046 }
1047}
1048
7c673cae
FG
1049template <typename I>
1050Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1051 uint64_t request_id,
1052 C_NotifyAck *on_notify_ack) {
1053 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1054 << dendl;
1055
1056 Mutex::Locker locker(m_lock);
1057
1058 Context *ctx = nullptr;
1059 Request request(instance_id, request_id);
1060 auto it = m_requests.find(request);
1061
1062 if (it != m_requests.end()) {
1063 dout(20) << "duplicate for in-progress request" << dendl;
1064 delete it->on_notify_ack;
1065 m_requests.erase(it);
1066 } else {
31f18b77
FG
1067 ctx = create_async_context_callback(
1068 m_work_queue, new FunctionContext(
1069 [this, instance_id, request_id] (int r) {
1070 complete_request(instance_id, request_id, r);
1071 }));
7c673cae
FG
1072 }
1073
1074 request.on_notify_ack = on_notify_ack;
1075 m_requests.insert(request);
1076 return ctx;
1077}
1078
31f18b77
FG
1079template <typename I>
1080void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1081 uint64_t request_id, int r) {
1082 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1083 << dendl;
1084
1085 C_NotifyAck *on_notify_ack;
1086 {
1087 Mutex::Locker locker(m_lock);
1088 Request request(instance_id, request_id);
1089 auto it = m_requests.find(request);
1090 assert(it != m_requests.end());
1091 on_notify_ack = it->on_notify_ack;
1092 m_requests.erase(it);
1093 }
1094
1095 ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
1096 on_notify_ack->complete(0);
1097}
1098
7c673cae
FG
1099template <typename I>
1100void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1101 uint64_t notifier_id, bufferlist &bl) {
1102 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1103 << "notifier_id=" << notifier_id << dendl;
1104
1105 auto ctx = new C_NotifyAck(this, notify_id, handle);
1106
1107 NotifyMessage notify_message;
1108 try {
1109 bufferlist::iterator iter = bl.begin();
1110 ::decode(notify_message, iter);
1111 } catch (const buffer::error &err) {
1112 derr << "error decoding image notification: " << err.what() << dendl;
1113 ctx->complete(0);
1114 return;
1115 }
1116
1117 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1118 notify_message.payload);
1119}
1120
1121template <typename I>
1122void InstanceWatcher<I>::handle_image_acquire(
d2e6a577 1123 const std::string &global_image_id, Context *on_finish) {
7c673cae
FG
1124 dout(20) << "global_image_id=" << global_image_id << dendl;
1125
31f18b77 1126 auto ctx = new FunctionContext(
d2e6a577
FG
1127 [this, global_image_id, on_finish] (int r) {
1128 m_instance_replayer->acquire_image(this, global_image_id, on_finish);
31f18b77
FG
1129 m_notify_op_tracker.finish_op();
1130 });
1131
1132 m_notify_op_tracker.start_op();
1133 m_work_queue->queue(ctx, 0);
7c673cae
FG
1134}
1135
1136template <typename I>
1137void InstanceWatcher<I>::handle_image_release(
d2e6a577 1138 const std::string &global_image_id, Context *on_finish) {
7c673cae
FG
1139 dout(20) << "global_image_id=" << global_image_id << dendl;
1140
31f18b77 1141 auto ctx = new FunctionContext(
d2e6a577
FG
1142 [this, global_image_id, on_finish] (int r) {
1143 m_instance_replayer->release_image(global_image_id, on_finish);
1144 m_notify_op_tracker.finish_op();
1145 });
1146
1147 m_notify_op_tracker.start_op();
1148 m_work_queue->queue(ctx, 0);
1149}
1150
1151template <typename I>
1152void InstanceWatcher<I>::handle_peer_image_removed(
1153 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1154 Context *on_finish) {
1155 dout(20) << "global_image_id=" << global_image_id << ", "
1156 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
1157
1158 auto ctx = new FunctionContext(
1159 [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
1160 m_instance_replayer->remove_peer_image(global_image_id,
1161 peer_mirror_uuid, on_finish);
31f18b77
FG
1162 m_notify_op_tracker.finish_op();
1163 });
1164
1165 m_notify_op_tracker.start_op();
1166 m_work_queue->queue(ctx, 0);
1167}
1168
1169template <typename I>
1170void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1171 const std::string &sync_id,
1172 Context *on_finish) {
1173 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1174
1175 Mutex::Locker locker(m_lock);
1176
1177 if (m_image_sync_throttler == nullptr) {
1178 dout(20) << "sync request for non-leader" << dendl;
1179 m_work_queue->queue(on_finish, -ESTALE);
1180 return;
1181 }
1182
1183 Context *on_start = create_async_context_callback(
1184 m_work_queue, new FunctionContext(
1185 [this, instance_id, sync_id, on_finish] (int r) {
1186 dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
1187 << ", sync_id=" << sync_id << ", r=" << r << dendl;
1188 if (r == 0) {
1189 notify_sync_start(instance_id, sync_id);
1190 }
1191 on_finish->complete(r);
1192 }));
1193 m_image_sync_throttler->start_op(sync_id, on_start);
1194}
1195
1196template <typename I>
1197void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1198 const std::string &sync_id,
1199 Context *on_finish) {
1200 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1201
1202 Mutex::Locker locker(m_lock);
1203
1204 auto it = m_inflight_sync_reqs.find(sync_id);
1205 if (it == m_inflight_sync_reqs.end()) {
1206 dout(20) << "not found" << dendl;
1207 m_work_queue->queue(on_finish, 0);
1208 return;
1209 }
1210
1211 auto sync_ctx = it->second;
1212
1213 if (sync_ctx->on_complete != nullptr) {
1214 dout(20) << "duplicate request" << dendl;
1215 m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1216 }
1217
1218 sync_ctx->on_complete = on_finish;
7c673cae
FG
1219}
1220
1221template <typename I>
1222void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1223 const ImageAcquirePayload &payload,
1224 C_NotifyAck *on_notify_ack) {
1225 dout(20) << "image_acquire: instance_id=" << instance_id << ", "
1226 << "request_id=" << payload.request_id << dendl;
1227
1228 auto on_finish = prepare_request(instance_id, payload.request_id,
1229 on_notify_ack);
1230 if (on_finish != nullptr) {
d2e6a577 1231 handle_image_acquire(payload.global_image_id, on_finish);
7c673cae
FG
1232 }
1233}
1234
1235template <typename I>
1236void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1237 const ImageReleasePayload &payload,
1238 C_NotifyAck *on_notify_ack) {
1239 dout(20) << "image_release: instance_id=" << instance_id << ", "
1240 << "request_id=" << payload.request_id << dendl;
1241
1242 auto on_finish = prepare_request(instance_id, payload.request_id,
1243 on_notify_ack);
1244 if (on_finish != nullptr) {
d2e6a577
FG
1245 handle_image_release(payload.global_image_id, on_finish);
1246 }
1247}
1248
1249template <typename I>
1250void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1251 const PeerImageRemovedPayload &payload,
1252 C_NotifyAck *on_notify_ack) {
1253 dout(20) << "remove_peer_image: instance_id=" << instance_id << ", "
1254 << "request_id=" << payload.request_id << dendl;
1255
1256 auto on_finish = prepare_request(instance_id, payload.request_id,
1257 on_notify_ack);
1258 if (on_finish != nullptr) {
1259 handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
1260 on_finish);
7c673cae
FG
1261 }
1262}
1263
31f18b77
FG
1264template <typename I>
1265void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1266 const SyncRequestPayload &payload,
1267 C_NotifyAck *on_notify_ack) {
1268 dout(20) << "sync_request: instance_id=" << instance_id << ", "
1269 << "request_id=" << payload.request_id << dendl;
1270
1271 auto on_finish = prepare_request(instance_id, payload.request_id,
1272 on_notify_ack);
1273 if (on_finish == nullptr) {
1274 return;
1275 }
1276
1277 handle_sync_request(instance_id, payload.sync_id, on_finish);
1278}
1279
1280template <typename I>
1281void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1282 const SyncStartPayload &payload,
1283 C_NotifyAck *on_notify_ack) {
1284 dout(20) << "sync_start: instance_id=" << instance_id << ", "
1285 << "request_id=" << payload.request_id << dendl;
1286
1287 auto on_finish = prepare_request(instance_id, payload.request_id,
1288 on_notify_ack);
1289 if (on_finish == nullptr) {
1290 return;
1291 }
1292
1293 handle_sync_start(instance_id, payload.sync_id, on_finish);
1294}
1295
7c673cae
FG
1296template <typename I>
1297void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1298 const UnknownPayload &payload,
1299 C_NotifyAck *on_notify_ack) {
1300 dout(20) << "unknown: instance_id=" << instance_id << dendl;
1301
1302 on_notify_ack->complete(0);
1303}
1304
1305} // namespace mirror
1306} // namespace rbd
1307
// Explicit instantiation definition: emits the InstanceWatcher<I> member
// definitions from this file for I = librbd::ImageCtx in this translation unit.
1308template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;