]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/InstanceWatcher.cc
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "InstanceWatcher.h"
7c673cae
FG
5#include "include/stringify.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
f67539c2 9#include "librbd/AsioEngine.h"
7c673cae
FG
10#include "librbd/ManagedLock.h"
11#include "librbd/Utils.h"
f67539c2 12#include "librbd/asio/ContextWQ.h"
7c673cae 13#include "InstanceReplayer.h"
9f95a23c 14#include "Throttler.h"
11fdf7f2 15#include "common/Cond.h"
7c673cae
FG
16
17#define dout_context g_ceph_context
18#define dout_subsys ceph_subsys_rbd_mirror
19#undef dout_prefix
20#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
21
22namespace rbd {
23namespace mirror {
24
25using namespace instance_watcher;
26
27using librbd::util::create_async_context_callback;
28using librbd::util::create_context_callback;
29using librbd::util::create_rados_callback;
30using librbd::util::unique_lock_name;
31
32namespace {
33
34struct C_GetInstances : public Context {
35 std::vector<std::string> *instance_ids;
36 Context *on_finish;
37 bufferlist out_bl;
38
39 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
40 : instance_ids(instance_ids), on_finish(on_finish) {
41 }
42
43 void finish(int r) override {
11fdf7f2 44 dout(10) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
7c673cae
FG
45 << dendl;
46
47 if (r == 0) {
11fdf7f2 48 auto it = out_bl.cbegin();
7c673cae
FG
49 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
50 } else if (r == -ENOENT) {
51 r = 0;
52 }
53 on_finish->complete(r);
54 }
55};
56
57template <typename I>
58struct C_RemoveInstanceRequest : public Context {
59 InstanceWatcher<I> instance_watcher;
60 Context *on_finish;
61
f67539c2
TL
62 C_RemoveInstanceRequest(librados::IoCtx &io_ctx,
63 librbd::AsioEngine& asio_engine,
7c673cae 64 const std::string &instance_id, Context *on_finish)
f67539c2 65 : instance_watcher(io_ctx, asio_engine, nullptr, nullptr, instance_id),
7c673cae
FG
66 on_finish(on_finish) {
67 }
68
69 void send() {
11fdf7f2 70 dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae
FG
71
72 instance_watcher.remove(this);
73 }
74
75 void finish(int r) override {
11fdf7f2 76 dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
7c673cae 77 << r << dendl;
11fdf7f2 78 ceph_assert(r == 0);
7c673cae
FG
79
80 on_finish->complete(r);
81 }
82};
83
84} // anonymous namespace
85
86template <typename I>
87struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
88 InstanceWatcher<I> *instance_watcher;
7c673cae
FG
89 std::string instance_id;
90 uint64_t request_id;
91 bufferlist bl;
92 Context *on_finish;
31f18b77
FG
93 bool send_to_leader;
94 std::unique_ptr<librbd::watcher::Notifier> notifier;
7c673cae 95 librbd::watcher::NotifyResponse response;
31f18b77 96 bool canceling = false;
7c673cae
FG
97
98 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
99 const std::string &instance_id, uint64_t request_id,
100 bufferlist &&bl, Context *on_finish)
31f18b77
FG
101 : instance_watcher(instance_watcher), instance_id(instance_id),
102 request_id(request_id), bl(bl), on_finish(on_finish),
103 send_to_leader(instance_id.empty()) {
11fdf7f2 104 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
105 << ": instance_watcher=" << instance_watcher << ", instance_id="
106 << instance_id << ", request_id=" << request_id << dendl;
107
9f95a23c 108 ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock));
31f18b77
FG
109
110 if (!send_to_leader) {
11fdf7f2 111 ceph_assert((!instance_id.empty()));
31f18b77
FG
112 notifier.reset(new librbd::watcher::Notifier(
113 instance_watcher->m_work_queue,
114 instance_watcher->m_ioctx,
115 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
116 }
117
118 instance_watcher->m_notify_op_tracker.start_op();
7c673cae
FG
119 auto result = instance_watcher->m_notify_ops.insert(
120 std::make_pair(instance_id, this)).second;
11fdf7f2 121 ceph_assert(result);
7c673cae
FG
122 }
123
124 void send() {
11fdf7f2 125 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae 126
9f95a23c 127 ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock));
31f18b77
FG
128
129 if (canceling) {
11fdf7f2 130 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
131 << ": canceling" << dendl;
132 instance_watcher->m_work_queue->queue(this, -ECANCELED);
133 return;
134 }
135
136 if (send_to_leader) {
137 if (instance_watcher->m_leader_instance_id.empty()) {
11fdf7f2 138 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
139 << ": suspending" << dendl;
140 instance_watcher->suspend_notify_request(this);
141 return;
142 }
143
144 if (instance_watcher->m_leader_instance_id != instance_id) {
145 auto count = instance_watcher->m_notify_ops.erase(
146 std::make_pair(instance_id, this));
11fdf7f2 147 ceph_assert(count > 0);
31f18b77
FG
148
149 instance_id = instance_watcher->m_leader_instance_id;
150
151 auto result = instance_watcher->m_notify_ops.insert(
152 std::make_pair(instance_id, this)).second;
11fdf7f2 153 ceph_assert(result);
31f18b77
FG
154
155 notifier.reset(new librbd::watcher::Notifier(
156 instance_watcher->m_work_queue,
157 instance_watcher->m_ioctx,
158 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
159 }
160 }
161
11fdf7f2 162 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
d2e6a577 163 << ": sending to " << instance_id << dendl;
31f18b77 164 notifier->notify(bl, &response, this);
7c673cae
FG
165 }
166
167 void cancel() {
11fdf7f2 168 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae 169
9f95a23c 170 ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock));
31f18b77
FG
171
172 canceling = true;
173 instance_watcher->unsuspend_notify_request(this);
7c673cae
FG
174 }
175
176 void finish(int r) override {
11fdf7f2 177 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
7c673cae
FG
178 << r << dendl;
179
180 if (r == 0 || r == -ETIMEDOUT) {
181 bool found = false;
182 for (auto &it : response.acks) {
183 auto &bl = it.second;
184 if (it.second.length() == 0) {
11fdf7f2
TL
185 dout(5) << "C_NotifyInstanceRequest: " << this << " " << __func__
186 << ": no payload in ack, ignoring" << dendl;
7c673cae
FG
187 continue;
188 }
189 try {
11fdf7f2 190 auto iter = bl.cbegin();
7c673cae 191 NotifyAckPayload ack;
11fdf7f2 192 decode(ack, iter);
7c673cae
FG
193 if (ack.instance_id != instance_watcher->get_instance_id()) {
194 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
195 << ": ack instance_id (" << ack.instance_id << ") "
196 << "does not match, ignoring" << dendl;
197 continue;
198 }
199 if (ack.request_id != request_id) {
200 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
201 << ": ack request_id (" << ack.request_id << ") "
202 << "does not match, ignoring" << dendl;
203 continue;
204 }
205 r = ack.ret_val;
206 found = true;
207 break;
208 } catch (const buffer::error &err) {
209 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
210 << ": failed to decode ack: " << err.what() << dendl;
211 continue;
212 }
213 }
214
215 if (!found) {
216 if (r == -ETIMEDOUT) {
31f18b77
FG
217 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
218 << ": resending after timeout" << dendl;
9f95a23c 219 std::lock_guard locker{instance_watcher->m_lock};
31f18b77
FG
220 send();
221 return;
7c673cae
FG
222 } else {
223 r = -EINVAL;
224 }
31f18b77
FG
225 } else {
226 if (r == -ESTALE && send_to_leader) {
227 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
228 << ": resending due to leader change" << dendl;
9f95a23c 229 std::lock_guard locker{instance_watcher->m_lock};
31f18b77
FG
230 send();
231 return;
232 }
7c673cae
FG
233 }
234 }
235
7c673cae
FG
236 on_finish->complete(r);
237
31f18b77 238 {
9f95a23c 239 std::lock_guard locker{instance_watcher->m_lock};
31f18b77 240 auto result = instance_watcher->m_notify_ops.erase(
7c673cae 241 std::make_pair(instance_id, this));
11fdf7f2 242 ceph_assert(result > 0);
31f18b77
FG
243 instance_watcher->m_notify_op_tracker.finish_op();
244 }
245
7c673cae
FG
246 delete this;
247 }
248
249 void complete(int r) override {
250 finish(r);
251 }
252};
253
31f18b77
FG
254template <typename I>
255struct InstanceWatcher<I>::C_SyncRequest : public Context {
256 InstanceWatcher<I> *instance_watcher;
257 std::string sync_id;
258 Context *on_start;
259 Context *on_complete = nullptr;
260 C_NotifyInstanceRequest *req = nullptr;
261
262 C_SyncRequest(InstanceWatcher<I> *instance_watcher,
263 const std::string &sync_id, Context *on_start)
264 : instance_watcher(instance_watcher), sync_id(sync_id),
265 on_start(on_start) {
11fdf7f2 266 dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
31f18b77
FG
267 << sync_id << dendl;
268 }
269
270 void finish(int r) override {
11fdf7f2 271 dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": r="
31f18b77
FG
272 << r << dendl;
273
274 if (on_start != nullptr) {
275 instance_watcher->handle_notify_sync_request(this, r);
276 } else {
277 instance_watcher->handle_notify_sync_complete(this, r);
278 delete this;
279 }
280 }
281
282 // called twice
283 void complete(int r) override {
284 finish(r);
285 }
286};
287
7c673cae
FG
288#undef dout_prefix
289#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
290 << this << " " << __func__ << ": "
291template <typename I>
292void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
293 std::vector<std::string> *instance_ids,
294 Context *on_finish) {
295 librados::ObjectReadOperation op;
296 librbd::cls_client::mirror_instances_list_start(&op);
297 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
298 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
299
300 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
11fdf7f2 301 ceph_assert(r == 0);
7c673cae
FG
302 aio_comp->release();
303}
304
305template <typename I>
306void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
f67539c2 307 librbd::AsioEngine& asio_engine,
7c673cae
FG
308 const std::string &instance_id,
309 Context *on_finish) {
f67539c2 310 auto req = new C_RemoveInstanceRequest<I>(io_ctx, asio_engine, instance_id,
7c673cae
FG
311 on_finish);
312 req->send();
313}
314
315template <typename I>
316InstanceWatcher<I> *InstanceWatcher<I>::create(
f67539c2 317 librados::IoCtx &io_ctx, librbd::AsioEngine& asio_engine,
9f95a23c
TL
318 InstanceReplayer<I> *instance_replayer,
319 Throttler<I> *image_sync_throttler) {
f67539c2 320 return new InstanceWatcher<I>(io_ctx, asio_engine, instance_replayer,
9f95a23c 321 image_sync_throttler,
7c673cae
FG
322 stringify(io_ctx.get_instance_id()));
323}
324
325template <typename I>
326InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
f67539c2 327 librbd::AsioEngine& asio_engine,
7c673cae 328 InstanceReplayer<I> *instance_replayer,
9f95a23c 329 Throttler<I> *image_sync_throttler,
7c673cae 330 const std::string &instance_id)
f67539c2
TL
331 : Watcher(io_ctx, asio_engine.get_work_queue(),
332 RBD_MIRROR_INSTANCE_PREFIX + instance_id),
9f95a23c
TL
333 m_instance_replayer(instance_replayer),
334 m_image_sync_throttler(image_sync_throttler), m_instance_id(instance_id),
335 m_lock(ceph::make_mutex(
336 unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this))),
7c673cae 337 m_instance_lock(librbd::ManagedLock<I>::create(
f67539c2
TL
338 m_ioctx, asio_engine, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
339 m_cct->_conf.get_val<uint64_t>("rbd_blocklist_expire_seconds"))) {
7c673cae
FG
340}
341
342template <typename I>
343InstanceWatcher<I>::~InstanceWatcher() {
11fdf7f2
TL
344 ceph_assert(m_requests.empty());
345 ceph_assert(m_notify_ops.empty());
346 ceph_assert(m_notify_op_tracker.empty());
347 ceph_assert(m_suspended_ops.empty());
348 ceph_assert(m_inflight_sync_reqs.empty());
7c673cae
FG
349 m_instance_lock->destroy();
350}
351
352template <typename I>
353int InstanceWatcher<I>::init() {
354 C_SaferCond init_ctx;
355 init(&init_ctx);
356 return init_ctx.wait();
357}
358
359template <typename I>
360void InstanceWatcher<I>::init(Context *on_finish) {
11fdf7f2 361 dout(10) << "instance_id=" << m_instance_id << dendl;
7c673cae 362
9f95a23c 363 std::lock_guard locker{m_lock};
7c673cae 364
11fdf7f2 365 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
366 m_on_finish = on_finish;
367 m_ret_val = 0;
368
369 register_instance();
370}
371
372template <typename I>
373void InstanceWatcher<I>::shut_down() {
374 C_SaferCond shut_down_ctx;
375 shut_down(&shut_down_ctx);
376 int r = shut_down_ctx.wait();
11fdf7f2 377 ceph_assert(r == 0);
7c673cae
FG
378}
379
380template <typename I>
381void InstanceWatcher<I>::shut_down(Context *on_finish) {
11fdf7f2 382 dout(10) << dendl;
7c673cae 383
9f95a23c 384 std::lock_guard locker{m_lock};
7c673cae 385
11fdf7f2 386 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
387 m_on_finish = on_finish;
388 m_ret_val = 0;
389
390 release_lock();
391}
392
393template <typename I>
394void InstanceWatcher<I>::remove(Context *on_finish) {
11fdf7f2 395 dout(10) << dendl;
7c673cae 396
9f95a23c 397 std::lock_guard locker{m_lock};
7c673cae 398
11fdf7f2 399 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
400 m_on_finish = on_finish;
401 m_ret_val = 0;
7c673cae
FG
402
403 get_instance_locker();
404}
405
406template <typename I>
407void InstanceWatcher<I>::notify_image_acquire(
408 const std::string &instance_id, const std::string &global_image_id,
d2e6a577 409 Context *on_notify_ack) {
11fdf7f2 410 dout(10) << "instance_id=" << instance_id << ", global_image_id="
7c673cae
FG
411 << global_image_id << dendl;
412
9f95a23c 413 std::lock_guard locker{m_lock};
7c673cae 414
11fdf7f2 415 ceph_assert(m_on_finish == nullptr);
7c673cae 416
11fdf7f2
TL
417 uint64_t request_id = ++m_request_seq;
418 bufferlist bl;
419 encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}}, bl);
420 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
421 std::move(bl), on_notify_ack);
422 req->send();
7c673cae
FG
423}
424
425template <typename I>
426void InstanceWatcher<I>::notify_image_release(
d2e6a577
FG
427 const std::string &instance_id, const std::string &global_image_id,
428 Context *on_notify_ack) {
11fdf7f2 429 dout(10) << "instance_id=" << instance_id << ", global_image_id="
7c673cae
FG
430 << global_image_id << dendl;
431
9f95a23c 432 std::lock_guard locker{m_lock};
7c673cae 433
11fdf7f2 434 ceph_assert(m_on_finish == nullptr);
7c673cae 435
11fdf7f2
TL
436 uint64_t request_id = ++m_request_seq;
437 bufferlist bl;
438 encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}}, bl);
439 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
440 std::move(bl), on_notify_ack);
441 req->send();
d2e6a577
FG
442}
443
444template <typename I>
445void InstanceWatcher<I>::notify_peer_image_removed(
446 const std::string &instance_id, const std::string &global_image_id,
447 const std::string &peer_mirror_uuid, Context *on_notify_ack) {
11fdf7f2 448 dout(10) << "instance_id=" << instance_id << ", "
d2e6a577
FG
449 << "global_image_id=" << global_image_id << ", "
450 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
451
9f95a23c 452 std::lock_guard locker{m_lock};
11fdf7f2 453 ceph_assert(m_on_finish == nullptr);
d2e6a577 454
11fdf7f2
TL
455 uint64_t request_id = ++m_request_seq;
456 bufferlist bl;
457 encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
458 peer_mirror_uuid}}, bl);
459 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
460 std::move(bl), on_notify_ack);
461 req->send();
7c673cae
FG
462}
463
31f18b77
FG
464template <typename I>
465void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
466 Context *on_sync_start) {
11fdf7f2 467 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77 468
9f95a23c 469 std::lock_guard locker{m_lock};
31f18b77 470
11fdf7f2 471 ceph_assert(m_inflight_sync_reqs.count(sync_id) == 0);
31f18b77
FG
472
473 uint64_t request_id = ++m_request_seq;
474
475 bufferlist bl;
11fdf7f2 476 encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
31f18b77
FG
477
478 auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
479 sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
480 std::move(bl), sync_ctx);
481
482 m_inflight_sync_reqs[sync_id] = sync_ctx;
483 sync_ctx->req->send();
484}
485
486template <typename I>
487bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
11fdf7f2 488 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77 489
9f95a23c 490 std::lock_guard locker{m_lock};
31f18b77
FG
491
492 auto it = m_inflight_sync_reqs.find(sync_id);
493 if (it == m_inflight_sync_reqs.end()) {
494 return false;
495 }
496
497 auto sync_ctx = it->second;
498
499 if (sync_ctx->on_start == nullptr) {
500 return false;
501 }
502
11fdf7f2 503 ceph_assert(sync_ctx->req != nullptr);
31f18b77
FG
504 sync_ctx->req->cancel();
505 return true;
506}
507
508template <typename I>
509void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
510 const std::string &sync_id) {
11fdf7f2 511 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77 512
9f95a23c 513 std::lock_guard locker{m_lock};
31f18b77
FG
514
515 uint64_t request_id = ++m_request_seq;
516
517 bufferlist bl;
11fdf7f2 518 encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
31f18b77 519
9f95a23c 520 auto ctx = new LambdaContext(
31f18b77 521 [this, sync_id] (int r) {
11fdf7f2 522 dout(10) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
9f95a23c
TL
523 std::lock_guard locker{m_lock};
524 if (r != -ESTALE && is_leader()) {
525 m_image_sync_throttler->finish_op(m_ioctx.get_namespace(), sync_id);
31f18b77
FG
526 }
527 });
528 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
529 std::move(bl), ctx);
530 req->send();
531}
532
533template <typename I>
534void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
9f95a23c 535 std::lock_guard locker{m_lock};
28e407b8
AA
536 notify_sync_complete(m_lock, sync_id);
537}
538
539template <typename I>
9f95a23c 540void InstanceWatcher<I>::notify_sync_complete(const ceph::mutex&,
28e407b8
AA
541 const std::string &sync_id) {
542 dout(10) << "sync_id=" << sync_id << dendl;
9f95a23c 543 ceph_assert(ceph_mutex_is_locked(m_lock));
31f18b77
FG
544
545 auto it = m_inflight_sync_reqs.find(sync_id);
11fdf7f2 546 ceph_assert(it != m_inflight_sync_reqs.end());
31f18b77
FG
547
548 auto sync_ctx = it->second;
11fdf7f2 549 ceph_assert(sync_ctx->req == nullptr);
31f18b77
FG
550
551 m_inflight_sync_reqs.erase(it);
552 m_work_queue->queue(sync_ctx, 0);
553}
554
555template <typename I>
556void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
557 int r) {
11fdf7f2 558 dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
31f18b77
FG
559
560 Context *on_start = nullptr;
561 {
9f95a23c 562 std::lock_guard locker{m_lock};
11fdf7f2
TL
563 ceph_assert(sync_ctx->req != nullptr);
564 ceph_assert(sync_ctx->on_start != nullptr);
31f18b77
FG
565
566 if (sync_ctx->req->canceling) {
567 r = -ECANCELED;
568 }
569
570 std::swap(sync_ctx->on_start, on_start);
571 sync_ctx->req = nullptr;
28e407b8
AA
572
573 if (r == -ECANCELED) {
574 notify_sync_complete(m_lock, sync_ctx->sync_id);
575 }
31f18b77
FG
576 }
577
578 on_start->complete(r == -ECANCELED ? r : 0);
31f18b77
FG
579}
580
581template <typename I>
582void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
583 int r) {
11fdf7f2 584 dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
31f18b77
FG
585
586 if (sync_ctx->on_complete != nullptr) {
587 sync_ctx->on_complete->complete(r);
588 }
589}
590
31f18b77
FG
591template <typename I>
592void InstanceWatcher<I>::handle_acquire_leader() {
11fdf7f2 593 dout(10) << dendl;
31f18b77 594
9f95a23c 595 std::lock_guard locker{m_lock};
31f18b77
FG
596
597 m_leader_instance_id = m_instance_id;
598 unsuspend_notify_requests();
599}
600
601template <typename I>
602void InstanceWatcher<I>::handle_release_leader() {
11fdf7f2 603 dout(10) << dendl;
31f18b77 604
9f95a23c 605 std::lock_guard locker{m_lock};
31f18b77
FG
606
607 m_leader_instance_id.clear();
608
9f95a23c 609 m_image_sync_throttler->drain(m_ioctx.get_namespace(), -ESTALE);
31f18b77
FG
610}
611
612template <typename I>
613void InstanceWatcher<I>::handle_update_leader(
614 const std::string &leader_instance_id) {
11fdf7f2 615 dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
31f18b77 616
9f95a23c 617 std::lock_guard locker{m_lock};
31f18b77
FG
618
619 m_leader_instance_id = leader_instance_id;
620
621 if (!m_leader_instance_id.empty()) {
622 unsuspend_notify_requests();
623 }
624}
625
7c673cae
FG
626template <typename I>
627void InstanceWatcher<I>::cancel_notify_requests(
628 const std::string &instance_id) {
11fdf7f2 629 dout(10) << "instance_id=" << instance_id << dendl;
7c673cae 630
9f95a23c 631 std::lock_guard locker{m_lock};
7c673cae
FG
632
633 for (auto op : m_notify_ops) {
31f18b77 634 if (op.first == instance_id && !op.second->send_to_leader) {
7c673cae
FG
635 op.second->cancel();
636 }
637 }
638}
639
7c673cae
FG
640template <typename I>
641void InstanceWatcher<I>::register_instance() {
9f95a23c 642 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae 643
11fdf7f2 644 dout(10) << dendl;
7c673cae
FG
645
646 librados::ObjectWriteOperation op;
647 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
648 librados::AioCompletion *aio_comp = create_rados_callback<
649 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
650
651 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
11fdf7f2 652 ceph_assert(r == 0);
7c673cae
FG
653 aio_comp->release();
654}
655
656template <typename I>
657void InstanceWatcher<I>::handle_register_instance(int r) {
11fdf7f2 658 dout(10) << "r=" << r << dendl;
7c673cae
FG
659
660 Context *on_finish = nullptr;
661 {
9f95a23c 662 std::lock_guard locker{m_lock};
7c673cae
FG
663
664 if (r == 0) {
665 create_instance_object();
666 return;
667 }
668
669 derr << "error registering instance: " << cpp_strerror(r) << dendl;
670
671 std::swap(on_finish, m_on_finish);
672 }
673 on_finish->complete(r);
674}
675
676
677template <typename I>
678void InstanceWatcher<I>::create_instance_object() {
11fdf7f2 679 dout(10) << dendl;
7c673cae 680
9f95a23c 681 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
682
683 librados::ObjectWriteOperation op;
684 op.create(true);
685
686 librados::AioCompletion *aio_comp = create_rados_callback<
687 InstanceWatcher<I>,
688 &InstanceWatcher<I>::handle_create_instance_object>(this);
689 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
11fdf7f2 690 ceph_assert(r == 0);
7c673cae
FG
691 aio_comp->release();
692}
693
694template <typename I>
695void InstanceWatcher<I>::handle_create_instance_object(int r) {
11fdf7f2 696 dout(10) << "r=" << r << dendl;
7c673cae 697
9f95a23c 698 std::lock_guard locker{m_lock};
7c673cae
FG
699
700 if (r < 0) {
701 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
702 << dendl;
703
704 m_ret_val = r;
705 unregister_instance();
706 return;
707 }
708
709 register_watch();
710}
711
712template <typename I>
713void InstanceWatcher<I>::register_watch() {
11fdf7f2 714 dout(10) << dendl;
7c673cae 715
9f95a23c 716 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
717
718 Context *ctx = create_async_context_callback(
719 m_work_queue, create_context_callback<
720 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
721
722 librbd::Watcher::register_watch(ctx);
723}
724
725template <typename I>
726void InstanceWatcher<I>::handle_register_watch(int r) {
11fdf7f2 727 dout(10) << "r=" << r << dendl;
7c673cae 728
9f95a23c 729 std::lock_guard locker{m_lock};
7c673cae
FG
730
731 if (r < 0) {
732 derr << "error registering instance watcher for " << m_oid << " object: "
733 << cpp_strerror(r) << dendl;
734
735 m_ret_val = r;
736 remove_instance_object();
737 return;
738 }
739
740 acquire_lock();
741}
742
743template <typename I>
744void InstanceWatcher<I>::acquire_lock() {
11fdf7f2 745 dout(10) << dendl;
7c673cae 746
9f95a23c 747 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
748
749 Context *ctx = create_async_context_callback(
750 m_work_queue, create_context_callback<
751 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
752
753 m_instance_lock->acquire_lock(ctx);
754}
755
756template <typename I>
757void InstanceWatcher<I>::handle_acquire_lock(int r) {
11fdf7f2 758 dout(10) << "r=" << r << dendl;
7c673cae
FG
759
760 Context *on_finish = nullptr;
761 {
9f95a23c 762 std::lock_guard locker{m_lock};
7c673cae
FG
763
764 if (r < 0) {
765
766 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
767
768 m_ret_val = r;
769 unregister_watch();
770 return;
771 }
772
773 std::swap(on_finish, m_on_finish);
774 }
775
776 on_finish->complete(r);
777}
778
779template <typename I>
780void InstanceWatcher<I>::release_lock() {
11fdf7f2 781 dout(10) << dendl;
7c673cae 782
9f95a23c 783 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
784
785 Context *ctx = create_async_context_callback(
786 m_work_queue, create_context_callback<
787 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
788
789 m_instance_lock->shut_down(ctx);
790}
791
792template <typename I>
793void InstanceWatcher<I>::handle_release_lock(int r) {
11fdf7f2 794 dout(10) << "r=" << r << dendl;
7c673cae 795
9f95a23c 796 std::lock_guard locker{m_lock};
7c673cae
FG
797
798 if (r < 0) {
799 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
800 }
801
802 unregister_watch();
803}
804
805template <typename I>
806void InstanceWatcher<I>::unregister_watch() {
11fdf7f2 807 dout(10) << dendl;
7c673cae 808
9f95a23c 809 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
810
811 Context *ctx = create_async_context_callback(
812 m_work_queue, create_context_callback<
813 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
814
815 librbd::Watcher::unregister_watch(ctx);
816}
817
818template <typename I>
819void InstanceWatcher<I>::handle_unregister_watch(int r) {
11fdf7f2 820 dout(10) << "r=" << r << dendl;
7c673cae
FG
821
822 if (r < 0) {
823 derr << "error unregistering instance watcher for " << m_oid << " object: "
824 << cpp_strerror(r) << dendl;
825 }
826
9f95a23c 827 std::lock_guard locker{m_lock};
7c673cae
FG
828 remove_instance_object();
829}
830
831template <typename I>
832void InstanceWatcher<I>::remove_instance_object() {
9f95a23c 833 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae 834
11fdf7f2 835 dout(10) << dendl;
7c673cae
FG
836
837 librados::ObjectWriteOperation op;
838 op.remove();
839
840 librados::AioCompletion *aio_comp = create_rados_callback<
841 InstanceWatcher<I>,
842 &InstanceWatcher<I>::handle_remove_instance_object>(this);
843 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
11fdf7f2 844 ceph_assert(r == 0);
7c673cae
FG
845 aio_comp->release();
846}
847
848template <typename I>
849void InstanceWatcher<I>::handle_remove_instance_object(int r) {
11fdf7f2 850 dout(10) << "r=" << r << dendl;
7c673cae 851
11fdf7f2 852 if (r == -ENOENT) {
7c673cae
FG
853 r = 0;
854 }
855
856 if (r < 0) {
857 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
858 << dendl;
859 }
860
9f95a23c 861 std::lock_guard locker{m_lock};
7c673cae
FG
862 unregister_instance();
863}
864
865template <typename I>
866void InstanceWatcher<I>::unregister_instance() {
11fdf7f2 867 dout(10) << dendl;
7c673cae 868
9f95a23c 869 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
870
871 librados::ObjectWriteOperation op;
872 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
873 librados::AioCompletion *aio_comp = create_rados_callback<
874 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
875
876 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
11fdf7f2 877 ceph_assert(r == 0);
7c673cae
FG
878 aio_comp->release();
879}
880
881template <typename I>
882void InstanceWatcher<I>::handle_unregister_instance(int r) {
11fdf7f2 883 dout(10) << "r=" << r << dendl;
7c673cae
FG
884
885 if (r < 0) {
886 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
887 }
888
9f95a23c 889 std::lock_guard locker{m_lock};
7c673cae
FG
890 wait_for_notify_ops();
891}
892
893template <typename I>
894void InstanceWatcher<I>::wait_for_notify_ops() {
11fdf7f2 895 dout(10) << dendl;
7c673cae 896
9f95a23c 897 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
898
899 for (auto op : m_notify_ops) {
900 op.second->cancel();
901 }
902
903 Context *ctx = create_async_context_callback(
904 m_work_queue, create_context_callback<
905 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
906
907 m_notify_op_tracker.wait_for_ops(ctx);
908}
909
910template <typename I>
911void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
11fdf7f2 912 dout(10) << "r=" << r << dendl;
7c673cae 913
11fdf7f2 914 ceph_assert(r == 0);
7c673cae
FG
915
916 Context *on_finish = nullptr;
917 {
9f95a23c 918 std::lock_guard locker{m_lock};
7c673cae 919
11fdf7f2 920 ceph_assert(m_notify_ops.empty());
7c673cae
FG
921
922 std::swap(on_finish, m_on_finish);
923 r = m_ret_val;
7c673cae
FG
924 }
925 on_finish->complete(r);
926}
927
928template <typename I>
929void InstanceWatcher<I>::get_instance_locker() {
11fdf7f2 930 dout(10) << dendl;
7c673cae 931
9f95a23c 932 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
933
934 Context *ctx = create_async_context_callback(
935 m_work_queue, create_context_callback<
936 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
937
938 m_instance_lock->get_locker(&m_instance_locker, ctx);
939}
940
941template <typename I>
942void InstanceWatcher<I>::handle_get_instance_locker(int r) {
11fdf7f2 943 dout(10) << "r=" << r << dendl;
7c673cae 944
9f95a23c 945 std::lock_guard locker{m_lock};
7c673cae
FG
946
947 if (r < 0) {
948 if (r != -ENOENT) {
949 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
950 }
951 remove_instance_object();
952 return;
953 }
954
955 break_instance_lock();
956}
957
958template <typename I>
959void InstanceWatcher<I>::break_instance_lock() {
11fdf7f2 960 dout(10) << dendl;
7c673cae 961
9f95a23c 962 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
963
964 Context *ctx = create_async_context_callback(
965 m_work_queue, create_context_callback<
966 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
967
968 m_instance_lock->break_lock(m_instance_locker, true, ctx);
969}
970
971template <typename I>
972void InstanceWatcher<I>::handle_break_instance_lock(int r) {
11fdf7f2 973 dout(10) << "r=" << r << dendl;
7c673cae 974
9f95a23c 975 std::lock_guard locker{m_lock};
7c673cae
FG
976
977 if (r < 0) {
978 if (r != -ENOENT) {
979 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
980 }
981 remove_instance_object();
982 return;
983 }
984
985 remove_instance_object();
986}
987
31f18b77
FG
988template <typename I>
989void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
11fdf7f2 990 dout(10) << req << dendl;
31f18b77 991
9f95a23c 992 ceph_assert(ceph_mutex_is_locked(m_lock));
31f18b77
FG
993
994 auto result = m_suspended_ops.insert(req).second;
11fdf7f2 995 ceph_assert(result);
31f18b77
FG
996}
997
998template <typename I>
999bool InstanceWatcher<I>::unsuspend_notify_request(
1000 C_NotifyInstanceRequest *req) {
11fdf7f2 1001 dout(10) << req << dendl;
31f18b77 1002
9f95a23c 1003 ceph_assert(ceph_mutex_is_locked(m_lock));
31f18b77
FG
1004
1005 auto result = m_suspended_ops.erase(req);
1006 if (result == 0) {
1007 return false;
1008 }
1009
1010 req->send();
1011 return true;
1012}
1013
1014template <typename I>
1015void InstanceWatcher<I>::unsuspend_notify_requests() {
11fdf7f2 1016 dout(10) << dendl;
31f18b77 1017
9f95a23c 1018 ceph_assert(ceph_mutex_is_locked(m_lock));
31f18b77
FG
1019
1020 std::set<C_NotifyInstanceRequest *> suspended_ops;
1021 std::swap(m_suspended_ops, suspended_ops);
1022
1023 for (auto op : suspended_ops) {
1024 op->send();
1025 }
1026}
1027
7c673cae
FG
1028template <typename I>
1029Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1030 uint64_t request_id,
1031 C_NotifyAck *on_notify_ack) {
11fdf7f2 1032 dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id
7c673cae
FG
1033 << dendl;
1034
9f95a23c 1035 std::lock_guard locker{m_lock};
7c673cae
FG
1036
1037 Context *ctx = nullptr;
1038 Request request(instance_id, request_id);
1039 auto it = m_requests.find(request);
1040
1041 if (it != m_requests.end()) {
11fdf7f2 1042 dout(10) << "duplicate for in-progress request" << dendl;
7c673cae
FG
1043 delete it->on_notify_ack;
1044 m_requests.erase(it);
1045 } else {
31f18b77 1046 ctx = create_async_context_callback(
9f95a23c 1047 m_work_queue, new LambdaContext(
31f18b77
FG
1048 [this, instance_id, request_id] (int r) {
1049 complete_request(instance_id, request_id, r);
1050 }));
7c673cae
FG
1051 }
1052
1053 request.on_notify_ack = on_notify_ack;
1054 m_requests.insert(request);
1055 return ctx;
1056}
1057
31f18b77
FG
1058template <typename I>
1059void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1060 uint64_t request_id, int r) {
11fdf7f2 1061 dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id
31f18b77
FG
1062 << dendl;
1063
1064 C_NotifyAck *on_notify_ack;
1065 {
9f95a23c 1066 std::lock_guard locker{m_lock};
31f18b77
FG
1067 Request request(instance_id, request_id);
1068 auto it = m_requests.find(request);
11fdf7f2 1069 ceph_assert(it != m_requests.end());
31f18b77
FG
1070 on_notify_ack = it->on_notify_ack;
1071 m_requests.erase(it);
1072 }
1073
11fdf7f2 1074 encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
31f18b77
FG
1075 on_notify_ack->complete(0);
1076}
1077
7c673cae
FG
1078template <typename I>
1079void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1080 uint64_t notifier_id, bufferlist &bl) {
11fdf7f2 1081 dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", "
7c673cae
FG
1082 << "notifier_id=" << notifier_id << dendl;
1083
1084 auto ctx = new C_NotifyAck(this, notify_id, handle);
1085
1086 NotifyMessage notify_message;
1087 try {
11fdf7f2
TL
1088 auto iter = bl.cbegin();
1089 decode(notify_message, iter);
7c673cae
FG
1090 } catch (const buffer::error &err) {
1091 derr << "error decoding image notification: " << err.what() << dendl;
1092 ctx->complete(0);
1093 return;
1094 }
1095
1096 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1097 notify_message.payload);
1098}
1099
1100template <typename I>
1101void InstanceWatcher<I>::handle_image_acquire(
d2e6a577 1102 const std::string &global_image_id, Context *on_finish) {
11fdf7f2 1103 dout(10) << "global_image_id=" << global_image_id << dendl;
7c673cae 1104
9f95a23c 1105 auto ctx = new LambdaContext(
d2e6a577
FG
1106 [this, global_image_id, on_finish] (int r) {
1107 m_instance_replayer->acquire_image(this, global_image_id, on_finish);
31f18b77
FG
1108 m_notify_op_tracker.finish_op();
1109 });
1110
1111 m_notify_op_tracker.start_op();
1112 m_work_queue->queue(ctx, 0);
7c673cae
FG
1113}
1114
1115template <typename I>
1116void InstanceWatcher<I>::handle_image_release(
d2e6a577 1117 const std::string &global_image_id, Context *on_finish) {
11fdf7f2 1118 dout(10) << "global_image_id=" << global_image_id << dendl;
7c673cae 1119
9f95a23c 1120 auto ctx = new LambdaContext(
d2e6a577
FG
1121 [this, global_image_id, on_finish] (int r) {
1122 m_instance_replayer->release_image(global_image_id, on_finish);
1123 m_notify_op_tracker.finish_op();
1124 });
1125
1126 m_notify_op_tracker.start_op();
1127 m_work_queue->queue(ctx, 0);
1128}
1129
1130template <typename I>
1131void InstanceWatcher<I>::handle_peer_image_removed(
1132 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1133 Context *on_finish) {
11fdf7f2 1134 dout(10) << "global_image_id=" << global_image_id << ", "
d2e6a577
FG
1135 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
1136
9f95a23c 1137 auto ctx = new LambdaContext(
d2e6a577
FG
1138 [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
1139 m_instance_replayer->remove_peer_image(global_image_id,
1140 peer_mirror_uuid, on_finish);
31f18b77
FG
1141 m_notify_op_tracker.finish_op();
1142 });
1143
1144 m_notify_op_tracker.start_op();
1145 m_work_queue->queue(ctx, 0);
1146}
1147
1148template <typename I>
1149void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1150 const std::string &sync_id,
1151 Context *on_finish) {
11fdf7f2 1152 dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
31f18b77 1153
9f95a23c 1154 std::lock_guard locker{m_lock};
31f18b77 1155
9f95a23c 1156 if (!is_leader()) {
11fdf7f2 1157 dout(10) << "sync request for non-leader" << dendl;
31f18b77
FG
1158 m_work_queue->queue(on_finish, -ESTALE);
1159 return;
1160 }
1161
1162 Context *on_start = create_async_context_callback(
9f95a23c 1163 m_work_queue, new LambdaContext(
31f18b77 1164 [this, instance_id, sync_id, on_finish] (int r) {
11fdf7f2 1165 dout(10) << "handle_sync_request: finish: instance_id=" << instance_id
31f18b77
FG
1166 << ", sync_id=" << sync_id << ", r=" << r << dendl;
1167 if (r == 0) {
1168 notify_sync_start(instance_id, sync_id);
1169 }
494da23a
TL
1170 if (r == -ENOENT) {
1171 r = 0;
1172 }
31f18b77
FG
1173 on_finish->complete(r);
1174 }));
9f95a23c 1175 m_image_sync_throttler->start_op(m_ioctx.get_namespace(), sync_id, on_start);
31f18b77
FG
1176}
1177
1178template <typename I>
1179void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1180 const std::string &sync_id,
1181 Context *on_finish) {
11fdf7f2 1182 dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
31f18b77 1183
9f95a23c 1184 std::lock_guard locker{m_lock};
31f18b77
FG
1185
1186 auto it = m_inflight_sync_reqs.find(sync_id);
1187 if (it == m_inflight_sync_reqs.end()) {
11fdf7f2 1188 dout(5) << "not found" << dendl;
31f18b77
FG
1189 m_work_queue->queue(on_finish, 0);
1190 return;
1191 }
1192
1193 auto sync_ctx = it->second;
1194
1195 if (sync_ctx->on_complete != nullptr) {
11fdf7f2 1196 dout(5) << "duplicate request" << dendl;
31f18b77
FG
1197 m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1198 }
1199
1200 sync_ctx->on_complete = on_finish;
7c673cae
FG
1201}
1202
1203template <typename I>
1204void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1205 const ImageAcquirePayload &payload,
1206 C_NotifyAck *on_notify_ack) {
11fdf7f2 1207 dout(10) << "image_acquire: instance_id=" << instance_id << ", "
7c673cae
FG
1208 << "request_id=" << payload.request_id << dendl;
1209
1210 auto on_finish = prepare_request(instance_id, payload.request_id,
1211 on_notify_ack);
1212 if (on_finish != nullptr) {
d2e6a577 1213 handle_image_acquire(payload.global_image_id, on_finish);
7c673cae
FG
1214 }
1215}
1216
1217template <typename I>
1218void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1219 const ImageReleasePayload &payload,
1220 C_NotifyAck *on_notify_ack) {
11fdf7f2 1221 dout(10) << "image_release: instance_id=" << instance_id << ", "
7c673cae
FG
1222 << "request_id=" << payload.request_id << dendl;
1223
1224 auto on_finish = prepare_request(instance_id, payload.request_id,
1225 on_notify_ack);
1226 if (on_finish != nullptr) {
d2e6a577
FG
1227 handle_image_release(payload.global_image_id, on_finish);
1228 }
1229}
1230
1231template <typename I>
1232void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1233 const PeerImageRemovedPayload &payload,
1234 C_NotifyAck *on_notify_ack) {
11fdf7f2 1235 dout(10) << "remove_peer_image: instance_id=" << instance_id << ", "
d2e6a577
FG
1236 << "request_id=" << payload.request_id << dendl;
1237
1238 auto on_finish = prepare_request(instance_id, payload.request_id,
1239 on_notify_ack);
1240 if (on_finish != nullptr) {
1241 handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
1242 on_finish);
7c673cae
FG
1243 }
1244}
1245
31f18b77
FG
1246template <typename I>
1247void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1248 const SyncRequestPayload &payload,
1249 C_NotifyAck *on_notify_ack) {
11fdf7f2 1250 dout(10) << "sync_request: instance_id=" << instance_id << ", "
31f18b77
FG
1251 << "request_id=" << payload.request_id << dendl;
1252
1253 auto on_finish = prepare_request(instance_id, payload.request_id,
1254 on_notify_ack);
1255 if (on_finish == nullptr) {
1256 return;
1257 }
1258
1259 handle_sync_request(instance_id, payload.sync_id, on_finish);
1260}
1261
1262template <typename I>
1263void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1264 const SyncStartPayload &payload,
1265 C_NotifyAck *on_notify_ack) {
11fdf7f2 1266 dout(10) << "sync_start: instance_id=" << instance_id << ", "
31f18b77
FG
1267 << "request_id=" << payload.request_id << dendl;
1268
1269 auto on_finish = prepare_request(instance_id, payload.request_id,
1270 on_notify_ack);
1271 if (on_finish == nullptr) {
1272 return;
1273 }
1274
1275 handle_sync_start(instance_id, payload.sync_id, on_finish);
1276}
1277
7c673cae
FG
1278template <typename I>
1279void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1280 const UnknownPayload &payload,
1281 C_NotifyAck *on_notify_ack) {
11fdf7f2 1282 dout(5) << "unknown: instance_id=" << instance_id << dendl;
7c673cae
FG
1283
1284 on_notify_ack->complete(0);
1285}
1286
1287} // namespace mirror
1288} // namespace rbd
1289
1290template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;