]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/InstanceWatcher.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "InstanceWatcher.h"
7c673cae
FG
5#include "include/stringify.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
9#include "librbd/ManagedLock.h"
10#include "librbd/Utils.h"
11#include "InstanceReplayer.h"
9f95a23c 12#include "Throttler.h"
11fdf7f2 13#include "common/Cond.h"
7c673cae
FG
14
15#define dout_context g_ceph_context
16#define dout_subsys ceph_subsys_rbd_mirror
17#undef dout_prefix
18#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
19
20namespace rbd {
21namespace mirror {
22
23using namespace instance_watcher;
24
25using librbd::util::create_async_context_callback;
26using librbd::util::create_context_callback;
27using librbd::util::create_rados_callback;
28using librbd::util::unique_lock_name;
29
30namespace {
31
32struct C_GetInstances : public Context {
33 std::vector<std::string> *instance_ids;
34 Context *on_finish;
35 bufferlist out_bl;
36
37 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
38 : instance_ids(instance_ids), on_finish(on_finish) {
39 }
40
41 void finish(int r) override {
11fdf7f2 42 dout(10) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
7c673cae
FG
43 << dendl;
44
45 if (r == 0) {
11fdf7f2 46 auto it = out_bl.cbegin();
7c673cae
FG
47 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
48 } else if (r == -ENOENT) {
49 r = 0;
50 }
51 on_finish->complete(r);
52 }
53};
54
55template <typename I>
56struct C_RemoveInstanceRequest : public Context {
57 InstanceWatcher<I> instance_watcher;
58 Context *on_finish;
59
60 C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
61 const std::string &instance_id, Context *on_finish)
9f95a23c 62 : instance_watcher(io_ctx, work_queue, nullptr, nullptr, instance_id),
7c673cae
FG
63 on_finish(on_finish) {
64 }
65
66 void send() {
11fdf7f2 67 dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae
FG
68
69 instance_watcher.remove(this);
70 }
71
72 void finish(int r) override {
11fdf7f2 73 dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
7c673cae 74 << r << dendl;
11fdf7f2 75 ceph_assert(r == 0);
7c673cae
FG
76
77 on_finish->complete(r);
78 }
79};
80
81} // anonymous namespace
82
83template <typename I>
84struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
85 InstanceWatcher<I> *instance_watcher;
7c673cae
FG
86 std::string instance_id;
87 uint64_t request_id;
88 bufferlist bl;
89 Context *on_finish;
31f18b77
FG
90 bool send_to_leader;
91 std::unique_ptr<librbd::watcher::Notifier> notifier;
7c673cae 92 librbd::watcher::NotifyResponse response;
31f18b77 93 bool canceling = false;
7c673cae
FG
94
95 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
96 const std::string &instance_id, uint64_t request_id,
97 bufferlist &&bl, Context *on_finish)
31f18b77
FG
98 : instance_watcher(instance_watcher), instance_id(instance_id),
99 request_id(request_id), bl(bl), on_finish(on_finish),
100 send_to_leader(instance_id.empty()) {
11fdf7f2 101 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
102 << ": instance_watcher=" << instance_watcher << ", instance_id="
103 << instance_id << ", request_id=" << request_id << dendl;
104
9f95a23c 105 ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock));
31f18b77
FG
106
107 if (!send_to_leader) {
11fdf7f2 108 ceph_assert((!instance_id.empty()));
31f18b77
FG
109 notifier.reset(new librbd::watcher::Notifier(
110 instance_watcher->m_work_queue,
111 instance_watcher->m_ioctx,
112 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
113 }
114
115 instance_watcher->m_notify_op_tracker.start_op();
7c673cae
FG
116 auto result = instance_watcher->m_notify_ops.insert(
117 std::make_pair(instance_id, this)).second;
11fdf7f2 118 ceph_assert(result);
7c673cae
FG
119 }
120
121 void send() {
11fdf7f2 122 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae 123
9f95a23c 124 ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock));
31f18b77
FG
125
126 if (canceling) {
11fdf7f2 127 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
128 << ": canceling" << dendl;
129 instance_watcher->m_work_queue->queue(this, -ECANCELED);
130 return;
131 }
132
133 if (send_to_leader) {
134 if (instance_watcher->m_leader_instance_id.empty()) {
11fdf7f2 135 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
136 << ": suspending" << dendl;
137 instance_watcher->suspend_notify_request(this);
138 return;
139 }
140
141 if (instance_watcher->m_leader_instance_id != instance_id) {
142 auto count = instance_watcher->m_notify_ops.erase(
143 std::make_pair(instance_id, this));
11fdf7f2 144 ceph_assert(count > 0);
31f18b77
FG
145
146 instance_id = instance_watcher->m_leader_instance_id;
147
148 auto result = instance_watcher->m_notify_ops.insert(
149 std::make_pair(instance_id, this)).second;
11fdf7f2 150 ceph_assert(result);
31f18b77
FG
151
152 notifier.reset(new librbd::watcher::Notifier(
153 instance_watcher->m_work_queue,
154 instance_watcher->m_ioctx,
155 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
156 }
157 }
158
11fdf7f2 159 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
d2e6a577 160 << ": sending to " << instance_id << dendl;
31f18b77 161 notifier->notify(bl, &response, this);
7c673cae
FG
162 }
163
164 void cancel() {
11fdf7f2 165 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae 166
9f95a23c 167 ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock));
31f18b77
FG
168
169 canceling = true;
170 instance_watcher->unsuspend_notify_request(this);
7c673cae
FG
171 }
172
173 void finish(int r) override {
11fdf7f2 174 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
7c673cae
FG
175 << r << dendl;
176
177 if (r == 0 || r == -ETIMEDOUT) {
178 bool found = false;
179 for (auto &it : response.acks) {
180 auto &bl = it.second;
181 if (it.second.length() == 0) {
11fdf7f2
TL
182 dout(5) << "C_NotifyInstanceRequest: " << this << " " << __func__
183 << ": no payload in ack, ignoring" << dendl;
7c673cae
FG
184 continue;
185 }
186 try {
11fdf7f2 187 auto iter = bl.cbegin();
7c673cae 188 NotifyAckPayload ack;
11fdf7f2 189 decode(ack, iter);
7c673cae
FG
190 if (ack.instance_id != instance_watcher->get_instance_id()) {
191 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
192 << ": ack instance_id (" << ack.instance_id << ") "
193 << "does not match, ignoring" << dendl;
194 continue;
195 }
196 if (ack.request_id != request_id) {
197 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
198 << ": ack request_id (" << ack.request_id << ") "
199 << "does not match, ignoring" << dendl;
200 continue;
201 }
202 r = ack.ret_val;
203 found = true;
204 break;
205 } catch (const buffer::error &err) {
206 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
207 << ": failed to decode ack: " << err.what() << dendl;
208 continue;
209 }
210 }
211
212 if (!found) {
213 if (r == -ETIMEDOUT) {
31f18b77
FG
214 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
215 << ": resending after timeout" << dendl;
9f95a23c 216 std::lock_guard locker{instance_watcher->m_lock};
31f18b77
FG
217 send();
218 return;
7c673cae
FG
219 } else {
220 r = -EINVAL;
221 }
31f18b77
FG
222 } else {
223 if (r == -ESTALE && send_to_leader) {
224 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
225 << ": resending due to leader change" << dendl;
9f95a23c 226 std::lock_guard locker{instance_watcher->m_lock};
31f18b77
FG
227 send();
228 return;
229 }
7c673cae
FG
230 }
231 }
232
7c673cae
FG
233 on_finish->complete(r);
234
31f18b77 235 {
9f95a23c 236 std::lock_guard locker{instance_watcher->m_lock};
31f18b77 237 auto result = instance_watcher->m_notify_ops.erase(
7c673cae 238 std::make_pair(instance_id, this));
11fdf7f2 239 ceph_assert(result > 0);
31f18b77
FG
240 instance_watcher->m_notify_op_tracker.finish_op();
241 }
242
7c673cae
FG
243 delete this;
244 }
245
246 void complete(int r) override {
247 finish(r);
248 }
249};
250
31f18b77
FG
251template <typename I>
252struct InstanceWatcher<I>::C_SyncRequest : public Context {
253 InstanceWatcher<I> *instance_watcher;
254 std::string sync_id;
255 Context *on_start;
256 Context *on_complete = nullptr;
257 C_NotifyInstanceRequest *req = nullptr;
258
259 C_SyncRequest(InstanceWatcher<I> *instance_watcher,
260 const std::string &sync_id, Context *on_start)
261 : instance_watcher(instance_watcher), sync_id(sync_id),
262 on_start(on_start) {
11fdf7f2 263 dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
31f18b77
FG
264 << sync_id << dendl;
265 }
266
267 void finish(int r) override {
11fdf7f2 268 dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": r="
31f18b77
FG
269 << r << dendl;
270
271 if (on_start != nullptr) {
272 instance_watcher->handle_notify_sync_request(this, r);
273 } else {
274 instance_watcher->handle_notify_sync_complete(this, r);
275 delete this;
276 }
277 }
278
279 // called twice
280 void complete(int r) override {
281 finish(r);
282 }
283};
284
7c673cae
FG
285#undef dout_prefix
286#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
287 << this << " " << __func__ << ": "
288template <typename I>
289void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
290 std::vector<std::string> *instance_ids,
291 Context *on_finish) {
292 librados::ObjectReadOperation op;
293 librbd::cls_client::mirror_instances_list_start(&op);
294 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
295 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
296
297 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
11fdf7f2 298 ceph_assert(r == 0);
7c673cae
FG
299 aio_comp->release();
300}
301
302template <typename I>
303void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
304 ContextWQ *work_queue,
305 const std::string &instance_id,
306 Context *on_finish) {
307 auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
308 on_finish);
309 req->send();
310}
311
312template <typename I>
313InstanceWatcher<I> *InstanceWatcher<I>::create(
314 librados::IoCtx &io_ctx, ContextWQ *work_queue,
9f95a23c
TL
315 InstanceReplayer<I> *instance_replayer,
316 Throttler<I> *image_sync_throttler) {
7c673cae 317 return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
9f95a23c 318 image_sync_throttler,
7c673cae
FG
319 stringify(io_ctx.get_instance_id()));
320}
321
322template <typename I>
323InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
324 ContextWQ *work_queue,
325 InstanceReplayer<I> *instance_replayer,
9f95a23c 326 Throttler<I> *image_sync_throttler,
7c673cae
FG
327 const std::string &instance_id)
328 : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
9f95a23c
TL
329 m_instance_replayer(instance_replayer),
330 m_image_sync_throttler(image_sync_throttler), m_instance_id(instance_id),
331 m_lock(ceph::make_mutex(
332 unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this))),
7c673cae
FG
333 m_instance_lock(librbd::ManagedLock<I>::create(
334 m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
11fdf7f2 335 m_cct->_conf.get_val<uint64_t>("rbd_blacklist_expire_seconds"))) {
7c673cae
FG
336}
337
338template <typename I>
339InstanceWatcher<I>::~InstanceWatcher() {
11fdf7f2
TL
340 ceph_assert(m_requests.empty());
341 ceph_assert(m_notify_ops.empty());
342 ceph_assert(m_notify_op_tracker.empty());
343 ceph_assert(m_suspended_ops.empty());
344 ceph_assert(m_inflight_sync_reqs.empty());
7c673cae
FG
345 m_instance_lock->destroy();
346}
347
348template <typename I>
349int InstanceWatcher<I>::init() {
350 C_SaferCond init_ctx;
351 init(&init_ctx);
352 return init_ctx.wait();
353}
354
355template <typename I>
356void InstanceWatcher<I>::init(Context *on_finish) {
11fdf7f2 357 dout(10) << "instance_id=" << m_instance_id << dendl;
7c673cae 358
9f95a23c 359 std::lock_guard locker{m_lock};
7c673cae 360
11fdf7f2 361 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
362 m_on_finish = on_finish;
363 m_ret_val = 0;
364
365 register_instance();
366}
367
368template <typename I>
369void InstanceWatcher<I>::shut_down() {
370 C_SaferCond shut_down_ctx;
371 shut_down(&shut_down_ctx);
372 int r = shut_down_ctx.wait();
11fdf7f2 373 ceph_assert(r == 0);
7c673cae
FG
374}
375
376template <typename I>
377void InstanceWatcher<I>::shut_down(Context *on_finish) {
11fdf7f2 378 dout(10) << dendl;
7c673cae 379
9f95a23c 380 std::lock_guard locker{m_lock};
7c673cae 381
11fdf7f2 382 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
383 m_on_finish = on_finish;
384 m_ret_val = 0;
385
386 release_lock();
387}
388
389template <typename I>
390void InstanceWatcher<I>::remove(Context *on_finish) {
11fdf7f2 391 dout(10) << dendl;
7c673cae 392
9f95a23c 393 std::lock_guard locker{m_lock};
7c673cae 394
11fdf7f2 395 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
396 m_on_finish = on_finish;
397 m_ret_val = 0;
7c673cae
FG
398
399 get_instance_locker();
400}
401
402template <typename I>
403void InstanceWatcher<I>::notify_image_acquire(
404 const std::string &instance_id, const std::string &global_image_id,
d2e6a577 405 Context *on_notify_ack) {
11fdf7f2 406 dout(10) << "instance_id=" << instance_id << ", global_image_id="
7c673cae
FG
407 << global_image_id << dendl;
408
9f95a23c 409 std::lock_guard locker{m_lock};
7c673cae 410
11fdf7f2 411 ceph_assert(m_on_finish == nullptr);
7c673cae 412
11fdf7f2
TL
413 uint64_t request_id = ++m_request_seq;
414 bufferlist bl;
415 encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}}, bl);
416 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
417 std::move(bl), on_notify_ack);
418 req->send();
7c673cae
FG
419}
420
421template <typename I>
422void InstanceWatcher<I>::notify_image_release(
d2e6a577
FG
423 const std::string &instance_id, const std::string &global_image_id,
424 Context *on_notify_ack) {
11fdf7f2 425 dout(10) << "instance_id=" << instance_id << ", global_image_id="
7c673cae
FG
426 << global_image_id << dendl;
427
9f95a23c 428 std::lock_guard locker{m_lock};
7c673cae 429
11fdf7f2 430 ceph_assert(m_on_finish == nullptr);
7c673cae 431
11fdf7f2
TL
432 uint64_t request_id = ++m_request_seq;
433 bufferlist bl;
434 encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}}, bl);
435 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
436 std::move(bl), on_notify_ack);
437 req->send();
d2e6a577
FG
438}
439
440template <typename I>
441void InstanceWatcher<I>::notify_peer_image_removed(
442 const std::string &instance_id, const std::string &global_image_id,
443 const std::string &peer_mirror_uuid, Context *on_notify_ack) {
11fdf7f2 444 dout(10) << "instance_id=" << instance_id << ", "
d2e6a577
FG
445 << "global_image_id=" << global_image_id << ", "
446 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
447
9f95a23c 448 std::lock_guard locker{m_lock};
11fdf7f2 449 ceph_assert(m_on_finish == nullptr);
d2e6a577 450
11fdf7f2
TL
451 uint64_t request_id = ++m_request_seq;
452 bufferlist bl;
453 encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
454 peer_mirror_uuid}}, bl);
455 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
456 std::move(bl), on_notify_ack);
457 req->send();
7c673cae
FG
458}
459
31f18b77
FG
460template <typename I>
461void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
462 Context *on_sync_start) {
11fdf7f2 463 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77 464
9f95a23c 465 std::lock_guard locker{m_lock};
31f18b77 466
11fdf7f2 467 ceph_assert(m_inflight_sync_reqs.count(sync_id) == 0);
31f18b77
FG
468
469 uint64_t request_id = ++m_request_seq;
470
471 bufferlist bl;
11fdf7f2 472 encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
31f18b77
FG
473
474 auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
475 sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
476 std::move(bl), sync_ctx);
477
478 m_inflight_sync_reqs[sync_id] = sync_ctx;
479 sync_ctx->req->send();
480}
481
482template <typename I>
483bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
11fdf7f2 484 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77 485
9f95a23c 486 std::lock_guard locker{m_lock};
31f18b77
FG
487
488 auto it = m_inflight_sync_reqs.find(sync_id);
489 if (it == m_inflight_sync_reqs.end()) {
490 return false;
491 }
492
493 auto sync_ctx = it->second;
494
495 if (sync_ctx->on_start == nullptr) {
496 return false;
497 }
498
11fdf7f2 499 ceph_assert(sync_ctx->req != nullptr);
31f18b77
FG
500 sync_ctx->req->cancel();
501 return true;
502}
503
504template <typename I>
505void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
506 const std::string &sync_id) {
11fdf7f2 507 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77 508
9f95a23c 509 std::lock_guard locker{m_lock};
31f18b77
FG
510
511 uint64_t request_id = ++m_request_seq;
512
513 bufferlist bl;
11fdf7f2 514 encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
31f18b77 515
9f95a23c 516 auto ctx = new LambdaContext(
31f18b77 517 [this, sync_id] (int r) {
11fdf7f2 518 dout(10) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
9f95a23c
TL
519 std::lock_guard locker{m_lock};
520 if (r != -ESTALE && is_leader()) {
521 m_image_sync_throttler->finish_op(m_ioctx.get_namespace(), sync_id);
31f18b77
FG
522 }
523 });
524 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
525 std::move(bl), ctx);
526 req->send();
527}
528
529template <typename I>
530void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
9f95a23c 531 std::lock_guard locker{m_lock};
28e407b8
AA
532 notify_sync_complete(m_lock, sync_id);
533}
534
535template <typename I>
9f95a23c 536void InstanceWatcher<I>::notify_sync_complete(const ceph::mutex&,
28e407b8
AA
537 const std::string &sync_id) {
538 dout(10) << "sync_id=" << sync_id << dendl;
9f95a23c 539 ceph_assert(ceph_mutex_is_locked(m_lock));
31f18b77
FG
540
541 auto it = m_inflight_sync_reqs.find(sync_id);
11fdf7f2 542 ceph_assert(it != m_inflight_sync_reqs.end());
31f18b77
FG
543
544 auto sync_ctx = it->second;
11fdf7f2 545 ceph_assert(sync_ctx->req == nullptr);
31f18b77
FG
546
547 m_inflight_sync_reqs.erase(it);
548 m_work_queue->queue(sync_ctx, 0);
549}
550
551template <typename I>
552void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
553 int r) {
11fdf7f2 554 dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
31f18b77
FG
555
556 Context *on_start = nullptr;
557 {
9f95a23c 558 std::lock_guard locker{m_lock};
11fdf7f2
TL
559 ceph_assert(sync_ctx->req != nullptr);
560 ceph_assert(sync_ctx->on_start != nullptr);
31f18b77
FG
561
562 if (sync_ctx->req->canceling) {
563 r = -ECANCELED;
564 }
565
566 std::swap(sync_ctx->on_start, on_start);
567 sync_ctx->req = nullptr;
28e407b8
AA
568
569 if (r == -ECANCELED) {
570 notify_sync_complete(m_lock, sync_ctx->sync_id);
571 }
31f18b77
FG
572 }
573
574 on_start->complete(r == -ECANCELED ? r : 0);
31f18b77
FG
575}
576
577template <typename I>
578void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
579 int r) {
11fdf7f2 580 dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
31f18b77
FG
581
582 if (sync_ctx->on_complete != nullptr) {
583 sync_ctx->on_complete->complete(r);
584 }
585}
586
31f18b77
FG
587template <typename I>
588void InstanceWatcher<I>::handle_acquire_leader() {
11fdf7f2 589 dout(10) << dendl;
31f18b77 590
9f95a23c 591 std::lock_guard locker{m_lock};
31f18b77
FG
592
593 m_leader_instance_id = m_instance_id;
594 unsuspend_notify_requests();
595}
596
597template <typename I>
598void InstanceWatcher<I>::handle_release_leader() {
11fdf7f2 599 dout(10) << dendl;
31f18b77 600
9f95a23c 601 std::lock_guard locker{m_lock};
31f18b77
FG
602
603 m_leader_instance_id.clear();
604
9f95a23c 605 m_image_sync_throttler->drain(m_ioctx.get_namespace(), -ESTALE);
31f18b77
FG
606}
607
608template <typename I>
609void InstanceWatcher<I>::handle_update_leader(
610 const std::string &leader_instance_id) {
11fdf7f2 611 dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
31f18b77 612
9f95a23c 613 std::lock_guard locker{m_lock};
31f18b77
FG
614
615 m_leader_instance_id = leader_instance_id;
616
617 if (!m_leader_instance_id.empty()) {
618 unsuspend_notify_requests();
619 }
620}
621
7c673cae
FG
622template <typename I>
623void InstanceWatcher<I>::cancel_notify_requests(
624 const std::string &instance_id) {
11fdf7f2 625 dout(10) << "instance_id=" << instance_id << dendl;
7c673cae 626
9f95a23c 627 std::lock_guard locker{m_lock};
7c673cae
FG
628
629 for (auto op : m_notify_ops) {
31f18b77 630 if (op.first == instance_id && !op.second->send_to_leader) {
7c673cae
FG
631 op.second->cancel();
632 }
633 }
634}
635
7c673cae
FG
636template <typename I>
637void InstanceWatcher<I>::register_instance() {
9f95a23c 638 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae 639
11fdf7f2 640 dout(10) << dendl;
7c673cae
FG
641
642 librados::ObjectWriteOperation op;
643 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
644 librados::AioCompletion *aio_comp = create_rados_callback<
645 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
646
647 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
11fdf7f2 648 ceph_assert(r == 0);
7c673cae
FG
649 aio_comp->release();
650}
651
652template <typename I>
653void InstanceWatcher<I>::handle_register_instance(int r) {
11fdf7f2 654 dout(10) << "r=" << r << dendl;
7c673cae
FG
655
656 Context *on_finish = nullptr;
657 {
9f95a23c 658 std::lock_guard locker{m_lock};
7c673cae
FG
659
660 if (r == 0) {
661 create_instance_object();
662 return;
663 }
664
665 derr << "error registering instance: " << cpp_strerror(r) << dendl;
666
667 std::swap(on_finish, m_on_finish);
668 }
669 on_finish->complete(r);
670}
671
672
673template <typename I>
674void InstanceWatcher<I>::create_instance_object() {
11fdf7f2 675 dout(10) << dendl;
7c673cae 676
9f95a23c 677 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
678
679 librados::ObjectWriteOperation op;
680 op.create(true);
681
682 librados::AioCompletion *aio_comp = create_rados_callback<
683 InstanceWatcher<I>,
684 &InstanceWatcher<I>::handle_create_instance_object>(this);
685 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
11fdf7f2 686 ceph_assert(r == 0);
7c673cae
FG
687 aio_comp->release();
688}
689
690template <typename I>
691void InstanceWatcher<I>::handle_create_instance_object(int r) {
11fdf7f2 692 dout(10) << "r=" << r << dendl;
7c673cae 693
9f95a23c 694 std::lock_guard locker{m_lock};
7c673cae
FG
695
696 if (r < 0) {
697 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
698 << dendl;
699
700 m_ret_val = r;
701 unregister_instance();
702 return;
703 }
704
705 register_watch();
706}
707
708template <typename I>
709void InstanceWatcher<I>::register_watch() {
11fdf7f2 710 dout(10) << dendl;
7c673cae 711
9f95a23c 712 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
713
714 Context *ctx = create_async_context_callback(
715 m_work_queue, create_context_callback<
716 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
717
718 librbd::Watcher::register_watch(ctx);
719}
720
721template <typename I>
722void InstanceWatcher<I>::handle_register_watch(int r) {
11fdf7f2 723 dout(10) << "r=" << r << dendl;
7c673cae 724
9f95a23c 725 std::lock_guard locker{m_lock};
7c673cae
FG
726
727 if (r < 0) {
728 derr << "error registering instance watcher for " << m_oid << " object: "
729 << cpp_strerror(r) << dendl;
730
731 m_ret_val = r;
732 remove_instance_object();
733 return;
734 }
735
736 acquire_lock();
737}
738
739template <typename I>
740void InstanceWatcher<I>::acquire_lock() {
11fdf7f2 741 dout(10) << dendl;
7c673cae 742
9f95a23c 743 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
744
745 Context *ctx = create_async_context_callback(
746 m_work_queue, create_context_callback<
747 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
748
749 m_instance_lock->acquire_lock(ctx);
750}
751
752template <typename I>
753void InstanceWatcher<I>::handle_acquire_lock(int r) {
11fdf7f2 754 dout(10) << "r=" << r << dendl;
7c673cae
FG
755
756 Context *on_finish = nullptr;
757 {
9f95a23c 758 std::lock_guard locker{m_lock};
7c673cae
FG
759
760 if (r < 0) {
761
762 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
763
764 m_ret_val = r;
765 unregister_watch();
766 return;
767 }
768
769 std::swap(on_finish, m_on_finish);
770 }
771
772 on_finish->complete(r);
773}
774
775template <typename I>
776void InstanceWatcher<I>::release_lock() {
11fdf7f2 777 dout(10) << dendl;
7c673cae 778
9f95a23c 779 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
780
781 Context *ctx = create_async_context_callback(
782 m_work_queue, create_context_callback<
783 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
784
785 m_instance_lock->shut_down(ctx);
786}
787
788template <typename I>
789void InstanceWatcher<I>::handle_release_lock(int r) {
11fdf7f2 790 dout(10) << "r=" << r << dendl;
7c673cae 791
9f95a23c 792 std::lock_guard locker{m_lock};
7c673cae
FG
793
794 if (r < 0) {
795 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
796 }
797
798 unregister_watch();
799}
800
801template <typename I>
802void InstanceWatcher<I>::unregister_watch() {
11fdf7f2 803 dout(10) << dendl;
7c673cae 804
9f95a23c 805 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
806
807 Context *ctx = create_async_context_callback(
808 m_work_queue, create_context_callback<
809 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
810
811 librbd::Watcher::unregister_watch(ctx);
812}
813
814template <typename I>
815void InstanceWatcher<I>::handle_unregister_watch(int r) {
11fdf7f2 816 dout(10) << "r=" << r << dendl;
7c673cae
FG
817
818 if (r < 0) {
819 derr << "error unregistering instance watcher for " << m_oid << " object: "
820 << cpp_strerror(r) << dendl;
821 }
822
9f95a23c 823 std::lock_guard locker{m_lock};
7c673cae
FG
824 remove_instance_object();
825}
826
827template <typename I>
828void InstanceWatcher<I>::remove_instance_object() {
9f95a23c 829 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae 830
11fdf7f2 831 dout(10) << dendl;
7c673cae
FG
832
833 librados::ObjectWriteOperation op;
834 op.remove();
835
836 librados::AioCompletion *aio_comp = create_rados_callback<
837 InstanceWatcher<I>,
838 &InstanceWatcher<I>::handle_remove_instance_object>(this);
839 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
11fdf7f2 840 ceph_assert(r == 0);
7c673cae
FG
841 aio_comp->release();
842}
843
844template <typename I>
845void InstanceWatcher<I>::handle_remove_instance_object(int r) {
11fdf7f2 846 dout(10) << "r=" << r << dendl;
7c673cae 847
11fdf7f2 848 if (r == -ENOENT) {
7c673cae
FG
849 r = 0;
850 }
851
852 if (r < 0) {
853 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
854 << dendl;
855 }
856
9f95a23c 857 std::lock_guard locker{m_lock};
7c673cae
FG
858 unregister_instance();
859}
860
861template <typename I>
862void InstanceWatcher<I>::unregister_instance() {
11fdf7f2 863 dout(10) << dendl;
7c673cae 864
9f95a23c 865 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
866
867 librados::ObjectWriteOperation op;
868 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
869 librados::AioCompletion *aio_comp = create_rados_callback<
870 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
871
872 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
11fdf7f2 873 ceph_assert(r == 0);
7c673cae
FG
874 aio_comp->release();
875}
876
877template <typename I>
878void InstanceWatcher<I>::handle_unregister_instance(int r) {
11fdf7f2 879 dout(10) << "r=" << r << dendl;
7c673cae
FG
880
881 if (r < 0) {
882 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
883 }
884
9f95a23c 885 std::lock_guard locker{m_lock};
7c673cae
FG
886 wait_for_notify_ops();
887}
888
889template <typename I>
890void InstanceWatcher<I>::wait_for_notify_ops() {
11fdf7f2 891 dout(10) << dendl;
7c673cae 892
9f95a23c 893 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
894
895 for (auto op : m_notify_ops) {
896 op.second->cancel();
897 }
898
899 Context *ctx = create_async_context_callback(
900 m_work_queue, create_context_callback<
901 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
902
903 m_notify_op_tracker.wait_for_ops(ctx);
904}
905
906template <typename I>
907void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
11fdf7f2 908 dout(10) << "r=" << r << dendl;
7c673cae 909
11fdf7f2 910 ceph_assert(r == 0);
7c673cae
FG
911
912 Context *on_finish = nullptr;
913 {
9f95a23c 914 std::lock_guard locker{m_lock};
7c673cae 915
11fdf7f2 916 ceph_assert(m_notify_ops.empty());
7c673cae
FG
917
918 std::swap(on_finish, m_on_finish);
919 r = m_ret_val;
7c673cae
FG
920 }
921 on_finish->complete(r);
922}
923
924template <typename I>
925void InstanceWatcher<I>::get_instance_locker() {
11fdf7f2 926 dout(10) << dendl;
7c673cae 927
9f95a23c 928 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
929
930 Context *ctx = create_async_context_callback(
931 m_work_queue, create_context_callback<
932 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
933
934 m_instance_lock->get_locker(&m_instance_locker, ctx);
935}
936
937template <typename I>
938void InstanceWatcher<I>::handle_get_instance_locker(int r) {
11fdf7f2 939 dout(10) << "r=" << r << dendl;
7c673cae 940
9f95a23c 941 std::lock_guard locker{m_lock};
7c673cae
FG
942
943 if (r < 0) {
944 if (r != -ENOENT) {
945 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
946 }
947 remove_instance_object();
948 return;
949 }
950
951 break_instance_lock();
952}
953
954template <typename I>
955void InstanceWatcher<I>::break_instance_lock() {
11fdf7f2 956 dout(10) << dendl;
7c673cae 957
9f95a23c 958 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
959
960 Context *ctx = create_async_context_callback(
961 m_work_queue, create_context_callback<
962 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
963
964 m_instance_lock->break_lock(m_instance_locker, true, ctx);
965}
966
967template <typename I>
968void InstanceWatcher<I>::handle_break_instance_lock(int r) {
11fdf7f2 969 dout(10) << "r=" << r << dendl;
7c673cae 970
9f95a23c 971 std::lock_guard locker{m_lock};
7c673cae
FG
972
973 if (r < 0) {
974 if (r != -ENOENT) {
975 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
976 }
977 remove_instance_object();
978 return;
979 }
980
981 remove_instance_object();
982}
983
31f18b77
FG
984template <typename I>
985void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
11fdf7f2 986 dout(10) << req << dendl;
31f18b77 987
9f95a23c 988 ceph_assert(ceph_mutex_is_locked(m_lock));
31f18b77
FG
989
990 auto result = m_suspended_ops.insert(req).second;
11fdf7f2 991 ceph_assert(result);
31f18b77
FG
992}
993
994template <typename I>
995bool InstanceWatcher<I>::unsuspend_notify_request(
996 C_NotifyInstanceRequest *req) {
11fdf7f2 997 dout(10) << req << dendl;
31f18b77 998
9f95a23c 999 ceph_assert(ceph_mutex_is_locked(m_lock));
31f18b77
FG
1000
1001 auto result = m_suspended_ops.erase(req);
1002 if (result == 0) {
1003 return false;
1004 }
1005
1006 req->send();
1007 return true;
1008}
1009
1010template <typename I>
1011void InstanceWatcher<I>::unsuspend_notify_requests() {
11fdf7f2 1012 dout(10) << dendl;
31f18b77 1013
9f95a23c 1014 ceph_assert(ceph_mutex_is_locked(m_lock));
31f18b77
FG
1015
1016 std::set<C_NotifyInstanceRequest *> suspended_ops;
1017 std::swap(m_suspended_ops, suspended_ops);
1018
1019 for (auto op : suspended_ops) {
1020 op->send();
1021 }
1022}
1023
7c673cae
FG
1024template <typename I>
1025Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1026 uint64_t request_id,
1027 C_NotifyAck *on_notify_ack) {
11fdf7f2 1028 dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id
7c673cae
FG
1029 << dendl;
1030
9f95a23c 1031 std::lock_guard locker{m_lock};
7c673cae
FG
1032
1033 Context *ctx = nullptr;
1034 Request request(instance_id, request_id);
1035 auto it = m_requests.find(request);
1036
1037 if (it != m_requests.end()) {
11fdf7f2 1038 dout(10) << "duplicate for in-progress request" << dendl;
7c673cae
FG
1039 delete it->on_notify_ack;
1040 m_requests.erase(it);
1041 } else {
31f18b77 1042 ctx = create_async_context_callback(
9f95a23c 1043 m_work_queue, new LambdaContext(
31f18b77
FG
1044 [this, instance_id, request_id] (int r) {
1045 complete_request(instance_id, request_id, r);
1046 }));
7c673cae
FG
1047 }
1048
1049 request.on_notify_ack = on_notify_ack;
1050 m_requests.insert(request);
1051 return ctx;
1052}
1053
31f18b77
FG
1054template <typename I>
1055void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1056 uint64_t request_id, int r) {
11fdf7f2 1057 dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id
31f18b77
FG
1058 << dendl;
1059
1060 C_NotifyAck *on_notify_ack;
1061 {
9f95a23c 1062 std::lock_guard locker{m_lock};
31f18b77
FG
1063 Request request(instance_id, request_id);
1064 auto it = m_requests.find(request);
11fdf7f2 1065 ceph_assert(it != m_requests.end());
31f18b77
FG
1066 on_notify_ack = it->on_notify_ack;
1067 m_requests.erase(it);
1068 }
1069
11fdf7f2 1070 encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
31f18b77
FG
1071 on_notify_ack->complete(0);
1072}
1073
7c673cae
FG
1074template <typename I>
1075void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1076 uint64_t notifier_id, bufferlist &bl) {
11fdf7f2 1077 dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", "
7c673cae
FG
1078 << "notifier_id=" << notifier_id << dendl;
1079
1080 auto ctx = new C_NotifyAck(this, notify_id, handle);
1081
1082 NotifyMessage notify_message;
1083 try {
11fdf7f2
TL
1084 auto iter = bl.cbegin();
1085 decode(notify_message, iter);
7c673cae
FG
1086 } catch (const buffer::error &err) {
1087 derr << "error decoding image notification: " << err.what() << dendl;
1088 ctx->complete(0);
1089 return;
1090 }
1091
1092 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1093 notify_message.payload);
1094}
1095
1096template <typename I>
1097void InstanceWatcher<I>::handle_image_acquire(
d2e6a577 1098 const std::string &global_image_id, Context *on_finish) {
11fdf7f2 1099 dout(10) << "global_image_id=" << global_image_id << dendl;
7c673cae 1100
9f95a23c 1101 auto ctx = new LambdaContext(
d2e6a577
FG
1102 [this, global_image_id, on_finish] (int r) {
1103 m_instance_replayer->acquire_image(this, global_image_id, on_finish);
31f18b77
FG
1104 m_notify_op_tracker.finish_op();
1105 });
1106
1107 m_notify_op_tracker.start_op();
1108 m_work_queue->queue(ctx, 0);
7c673cae
FG
1109}
1110
1111template <typename I>
1112void InstanceWatcher<I>::handle_image_release(
d2e6a577 1113 const std::string &global_image_id, Context *on_finish) {
11fdf7f2 1114 dout(10) << "global_image_id=" << global_image_id << dendl;
7c673cae 1115
9f95a23c 1116 auto ctx = new LambdaContext(
d2e6a577
FG
1117 [this, global_image_id, on_finish] (int r) {
1118 m_instance_replayer->release_image(global_image_id, on_finish);
1119 m_notify_op_tracker.finish_op();
1120 });
1121
1122 m_notify_op_tracker.start_op();
1123 m_work_queue->queue(ctx, 0);
1124}
1125
1126template <typename I>
1127void InstanceWatcher<I>::handle_peer_image_removed(
1128 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1129 Context *on_finish) {
11fdf7f2 1130 dout(10) << "global_image_id=" << global_image_id << ", "
d2e6a577
FG
1131 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
1132
9f95a23c 1133 auto ctx = new LambdaContext(
d2e6a577
FG
1134 [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
1135 m_instance_replayer->remove_peer_image(global_image_id,
1136 peer_mirror_uuid, on_finish);
31f18b77
FG
1137 m_notify_op_tracker.finish_op();
1138 });
1139
1140 m_notify_op_tracker.start_op();
1141 m_work_queue->queue(ctx, 0);
1142}
1143
1144template <typename I>
1145void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1146 const std::string &sync_id,
1147 Context *on_finish) {
11fdf7f2 1148 dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
31f18b77 1149
9f95a23c 1150 std::lock_guard locker{m_lock};
31f18b77 1151
9f95a23c 1152 if (!is_leader()) {
11fdf7f2 1153 dout(10) << "sync request for non-leader" << dendl;
31f18b77
FG
1154 m_work_queue->queue(on_finish, -ESTALE);
1155 return;
1156 }
1157
1158 Context *on_start = create_async_context_callback(
9f95a23c 1159 m_work_queue, new LambdaContext(
31f18b77 1160 [this, instance_id, sync_id, on_finish] (int r) {
11fdf7f2 1161 dout(10) << "handle_sync_request: finish: instance_id=" << instance_id
31f18b77
FG
1162 << ", sync_id=" << sync_id << ", r=" << r << dendl;
1163 if (r == 0) {
1164 notify_sync_start(instance_id, sync_id);
1165 }
494da23a
TL
1166 if (r == -ENOENT) {
1167 r = 0;
1168 }
31f18b77
FG
1169 on_finish->complete(r);
1170 }));
9f95a23c 1171 m_image_sync_throttler->start_op(m_ioctx.get_namespace(), sync_id, on_start);
31f18b77
FG
1172}
1173
1174template <typename I>
1175void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1176 const std::string &sync_id,
1177 Context *on_finish) {
11fdf7f2 1178 dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
31f18b77 1179
9f95a23c 1180 std::lock_guard locker{m_lock};
31f18b77
FG
1181
1182 auto it = m_inflight_sync_reqs.find(sync_id);
1183 if (it == m_inflight_sync_reqs.end()) {
11fdf7f2 1184 dout(5) << "not found" << dendl;
31f18b77
FG
1185 m_work_queue->queue(on_finish, 0);
1186 return;
1187 }
1188
1189 auto sync_ctx = it->second;
1190
1191 if (sync_ctx->on_complete != nullptr) {
11fdf7f2 1192 dout(5) << "duplicate request" << dendl;
31f18b77
FG
1193 m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1194 }
1195
1196 sync_ctx->on_complete = on_finish;
7c673cae
FG
1197}
1198
1199template <typename I>
1200void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1201 const ImageAcquirePayload &payload,
1202 C_NotifyAck *on_notify_ack) {
11fdf7f2 1203 dout(10) << "image_acquire: instance_id=" << instance_id << ", "
7c673cae
FG
1204 << "request_id=" << payload.request_id << dendl;
1205
1206 auto on_finish = prepare_request(instance_id, payload.request_id,
1207 on_notify_ack);
1208 if (on_finish != nullptr) {
d2e6a577 1209 handle_image_acquire(payload.global_image_id, on_finish);
7c673cae
FG
1210 }
1211}
1212
1213template <typename I>
1214void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1215 const ImageReleasePayload &payload,
1216 C_NotifyAck *on_notify_ack) {
11fdf7f2 1217 dout(10) << "image_release: instance_id=" << instance_id << ", "
7c673cae
FG
1218 << "request_id=" << payload.request_id << dendl;
1219
1220 auto on_finish = prepare_request(instance_id, payload.request_id,
1221 on_notify_ack);
1222 if (on_finish != nullptr) {
d2e6a577
FG
1223 handle_image_release(payload.global_image_id, on_finish);
1224 }
1225}
1226
1227template <typename I>
1228void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1229 const PeerImageRemovedPayload &payload,
1230 C_NotifyAck *on_notify_ack) {
11fdf7f2 1231 dout(10) << "remove_peer_image: instance_id=" << instance_id << ", "
d2e6a577
FG
1232 << "request_id=" << payload.request_id << dendl;
1233
1234 auto on_finish = prepare_request(instance_id, payload.request_id,
1235 on_notify_ack);
1236 if (on_finish != nullptr) {
1237 handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
1238 on_finish);
7c673cae
FG
1239 }
1240}
1241
31f18b77
FG
1242template <typename I>
1243void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1244 const SyncRequestPayload &payload,
1245 C_NotifyAck *on_notify_ack) {
11fdf7f2 1246 dout(10) << "sync_request: instance_id=" << instance_id << ", "
31f18b77
FG
1247 << "request_id=" << payload.request_id << dendl;
1248
1249 auto on_finish = prepare_request(instance_id, payload.request_id,
1250 on_notify_ack);
1251 if (on_finish == nullptr) {
1252 return;
1253 }
1254
1255 handle_sync_request(instance_id, payload.sync_id, on_finish);
1256}
1257
1258template <typename I>
1259void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1260 const SyncStartPayload &payload,
1261 C_NotifyAck *on_notify_ack) {
11fdf7f2 1262 dout(10) << "sync_start: instance_id=" << instance_id << ", "
31f18b77
FG
1263 << "request_id=" << payload.request_id << dendl;
1264
1265 auto on_finish = prepare_request(instance_id, payload.request_id,
1266 on_notify_ack);
1267 if (on_finish == nullptr) {
1268 return;
1269 }
1270
1271 handle_sync_start(instance_id, payload.sync_id, on_finish);
1272}
1273
7c673cae
FG
1274template <typename I>
1275void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1276 const UnknownPayload &payload,
1277 C_NotifyAck *on_notify_ack) {
11fdf7f2 1278 dout(5) << "unknown: instance_id=" << instance_id << dendl;
7c673cae
FG
1279
1280 on_notify_ack->complete(0);
1281}
1282
1283} // namespace mirror
1284} // namespace rbd
1285
1286template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;