]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/InstanceWatcher.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "InstanceWatcher.h"
7c673cae
FG
5#include "include/stringify.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
9#include "librbd/ManagedLock.h"
10#include "librbd/Utils.h"
11#include "InstanceReplayer.h"
31f18b77 12#include "ImageSyncThrottler.h"
11fdf7f2 13#include "common/Cond.h"
7c673cae
FG
14
15#define dout_context g_ceph_context
16#define dout_subsys ceph_subsys_rbd_mirror
17#undef dout_prefix
18#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
19
20namespace rbd {
21namespace mirror {
22
23using namespace instance_watcher;
24
25using librbd::util::create_async_context_callback;
26using librbd::util::create_context_callback;
27using librbd::util::create_rados_callback;
28using librbd::util::unique_lock_name;
29
30namespace {
31
32struct C_GetInstances : public Context {
33 std::vector<std::string> *instance_ids;
34 Context *on_finish;
35 bufferlist out_bl;
36
37 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
38 : instance_ids(instance_ids), on_finish(on_finish) {
39 }
40
41 void finish(int r) override {
11fdf7f2 42 dout(10) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
7c673cae
FG
43 << dendl;
44
45 if (r == 0) {
11fdf7f2 46 auto it = out_bl.cbegin();
7c673cae
FG
47 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
48 } else if (r == -ENOENT) {
49 r = 0;
50 }
51 on_finish->complete(r);
52 }
53};
54
55template <typename I>
56struct C_RemoveInstanceRequest : public Context {
57 InstanceWatcher<I> instance_watcher;
58 Context *on_finish;
59
60 C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
61 const std::string &instance_id, Context *on_finish)
62 : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
63 on_finish(on_finish) {
64 }
65
66 void send() {
11fdf7f2 67 dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae
FG
68
69 instance_watcher.remove(this);
70 }
71
72 void finish(int r) override {
11fdf7f2 73 dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
7c673cae 74 << r << dendl;
11fdf7f2 75 ceph_assert(r == 0);
7c673cae
FG
76
77 on_finish->complete(r);
78 }
79};
80
81} // anonymous namespace
82
83template <typename I>
84struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
85 InstanceWatcher<I> *instance_watcher;
7c673cae
FG
86 std::string instance_id;
87 uint64_t request_id;
88 bufferlist bl;
89 Context *on_finish;
31f18b77
FG
90 bool send_to_leader;
91 std::unique_ptr<librbd::watcher::Notifier> notifier;
7c673cae 92 librbd::watcher::NotifyResponse response;
31f18b77 93 bool canceling = false;
7c673cae
FG
94
95 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
96 const std::string &instance_id, uint64_t request_id,
97 bufferlist &&bl, Context *on_finish)
31f18b77
FG
98 : instance_watcher(instance_watcher), instance_id(instance_id),
99 request_id(request_id), bl(bl), on_finish(on_finish),
100 send_to_leader(instance_id.empty()) {
11fdf7f2 101 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
102 << ": instance_watcher=" << instance_watcher << ", instance_id="
103 << instance_id << ", request_id=" << request_id << dendl;
104
11fdf7f2 105 ceph_assert(instance_watcher->m_lock.is_locked());
31f18b77
FG
106
107 if (!send_to_leader) {
11fdf7f2 108 ceph_assert((!instance_id.empty()));
31f18b77
FG
109 notifier.reset(new librbd::watcher::Notifier(
110 instance_watcher->m_work_queue,
111 instance_watcher->m_ioctx,
112 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
113 }
114
115 instance_watcher->m_notify_op_tracker.start_op();
7c673cae
FG
116 auto result = instance_watcher->m_notify_ops.insert(
117 std::make_pair(instance_id, this)).second;
11fdf7f2 118 ceph_assert(result);
7c673cae
FG
119 }
120
121 void send() {
11fdf7f2 122 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae 123
11fdf7f2 124 ceph_assert(instance_watcher->m_lock.is_locked());
31f18b77
FG
125
126 if (canceling) {
11fdf7f2 127 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
128 << ": canceling" << dendl;
129 instance_watcher->m_work_queue->queue(this, -ECANCELED);
130 return;
131 }
132
133 if (send_to_leader) {
134 if (instance_watcher->m_leader_instance_id.empty()) {
11fdf7f2 135 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
31f18b77
FG
136 << ": suspending" << dendl;
137 instance_watcher->suspend_notify_request(this);
138 return;
139 }
140
141 if (instance_watcher->m_leader_instance_id != instance_id) {
142 auto count = instance_watcher->m_notify_ops.erase(
143 std::make_pair(instance_id, this));
11fdf7f2 144 ceph_assert(count > 0);
31f18b77
FG
145
146 instance_id = instance_watcher->m_leader_instance_id;
147
148 auto result = instance_watcher->m_notify_ops.insert(
149 std::make_pair(instance_id, this)).second;
11fdf7f2 150 ceph_assert(result);
31f18b77
FG
151
152 notifier.reset(new librbd::watcher::Notifier(
153 instance_watcher->m_work_queue,
154 instance_watcher->m_ioctx,
155 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
156 }
157 }
158
11fdf7f2 159 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__
d2e6a577 160 << ": sending to " << instance_id << dendl;
31f18b77 161 notifier->notify(bl, &response, this);
7c673cae
FG
162 }
163
164 void cancel() {
11fdf7f2 165 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
7c673cae 166
11fdf7f2 167 ceph_assert(instance_watcher->m_lock.is_locked());
31f18b77
FG
168
169 canceling = true;
170 instance_watcher->unsuspend_notify_request(this);
7c673cae
FG
171 }
172
173 void finish(int r) override {
11fdf7f2 174 dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
7c673cae
FG
175 << r << dendl;
176
177 if (r == 0 || r == -ETIMEDOUT) {
178 bool found = false;
179 for (auto &it : response.acks) {
180 auto &bl = it.second;
181 if (it.second.length() == 0) {
11fdf7f2
TL
182 dout(5) << "C_NotifyInstanceRequest: " << this << " " << __func__
183 << ": no payload in ack, ignoring" << dendl;
7c673cae
FG
184 continue;
185 }
186 try {
11fdf7f2 187 auto iter = bl.cbegin();
7c673cae 188 NotifyAckPayload ack;
11fdf7f2 189 decode(ack, iter);
7c673cae
FG
190 if (ack.instance_id != instance_watcher->get_instance_id()) {
191 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
192 << ": ack instance_id (" << ack.instance_id << ") "
193 << "does not match, ignoring" << dendl;
194 continue;
195 }
196 if (ack.request_id != request_id) {
197 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
198 << ": ack request_id (" << ack.request_id << ") "
199 << "does not match, ignoring" << dendl;
200 continue;
201 }
202 r = ack.ret_val;
203 found = true;
204 break;
205 } catch (const buffer::error &err) {
206 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
207 << ": failed to decode ack: " << err.what() << dendl;
208 continue;
209 }
210 }
211
212 if (!found) {
213 if (r == -ETIMEDOUT) {
31f18b77
FG
214 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
215 << ": resending after timeout" << dendl;
216 Mutex::Locker locker(instance_watcher->m_lock);
217 send();
218 return;
7c673cae
FG
219 } else {
220 r = -EINVAL;
221 }
31f18b77
FG
222 } else {
223 if (r == -ESTALE && send_to_leader) {
224 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
225 << ": resending due to leader change" << dendl;
226 Mutex::Locker locker(instance_watcher->m_lock);
227 send();
228 return;
229 }
7c673cae
FG
230 }
231 }
232
7c673cae
FG
233 on_finish->complete(r);
234
31f18b77
FG
235 {
236 Mutex::Locker locker(instance_watcher->m_lock);
237 auto result = instance_watcher->m_notify_ops.erase(
7c673cae 238 std::make_pair(instance_id, this));
11fdf7f2 239 ceph_assert(result > 0);
31f18b77
FG
240 instance_watcher->m_notify_op_tracker.finish_op();
241 }
242
7c673cae
FG
243 delete this;
244 }
245
246 void complete(int r) override {
247 finish(r);
248 }
249};
250
31f18b77
FG
251template <typename I>
252struct InstanceWatcher<I>::C_SyncRequest : public Context {
253 InstanceWatcher<I> *instance_watcher;
254 std::string sync_id;
255 Context *on_start;
256 Context *on_complete = nullptr;
257 C_NotifyInstanceRequest *req = nullptr;
258
259 C_SyncRequest(InstanceWatcher<I> *instance_watcher,
260 const std::string &sync_id, Context *on_start)
261 : instance_watcher(instance_watcher), sync_id(sync_id),
262 on_start(on_start) {
11fdf7f2 263 dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
31f18b77
FG
264 << sync_id << dendl;
265 }
266
267 void finish(int r) override {
11fdf7f2 268 dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": r="
31f18b77
FG
269 << r << dendl;
270
271 if (on_start != nullptr) {
272 instance_watcher->handle_notify_sync_request(this, r);
273 } else {
274 instance_watcher->handle_notify_sync_complete(this, r);
275 delete this;
276 }
277 }
278
279 // called twice
280 void complete(int r) override {
281 finish(r);
282 }
283};
284
7c673cae
FG
285#undef dout_prefix
286#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
287 << this << " " << __func__ << ": "
288template <typename I>
289void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
290 std::vector<std::string> *instance_ids,
291 Context *on_finish) {
292 librados::ObjectReadOperation op;
293 librbd::cls_client::mirror_instances_list_start(&op);
294 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
295 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
296
297 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
11fdf7f2 298 ceph_assert(r == 0);
7c673cae
FG
299 aio_comp->release();
300}
301
302template <typename I>
303void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
304 ContextWQ *work_queue,
305 const std::string &instance_id,
306 Context *on_finish) {
307 auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
308 on_finish);
309 req->send();
310}
311
312template <typename I>
313InstanceWatcher<I> *InstanceWatcher<I>::create(
314 librados::IoCtx &io_ctx, ContextWQ *work_queue,
315 InstanceReplayer<I> *instance_replayer) {
316 return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
317 stringify(io_ctx.get_instance_id()));
318}
319
320template <typename I>
321InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
322 ContextWQ *work_queue,
323 InstanceReplayer<I> *instance_replayer,
324 const std::string &instance_id)
325 : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
326 m_instance_replayer(instance_replayer), m_instance_id(instance_id),
327 m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
328 m_instance_lock(librbd::ManagedLock<I>::create(
329 m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
11fdf7f2 330 m_cct->_conf.get_val<uint64_t>("rbd_blacklist_expire_seconds"))) {
7c673cae
FG
331}
332
333template <typename I>
334InstanceWatcher<I>::~InstanceWatcher() {
11fdf7f2
TL
335 ceph_assert(m_requests.empty());
336 ceph_assert(m_notify_ops.empty());
337 ceph_assert(m_notify_op_tracker.empty());
338 ceph_assert(m_suspended_ops.empty());
339 ceph_assert(m_inflight_sync_reqs.empty());
340 ceph_assert(m_image_sync_throttler == nullptr);
7c673cae
FG
341 m_instance_lock->destroy();
342}
343
344template <typename I>
345int InstanceWatcher<I>::init() {
346 C_SaferCond init_ctx;
347 init(&init_ctx);
348 return init_ctx.wait();
349}
350
351template <typename I>
352void InstanceWatcher<I>::init(Context *on_finish) {
11fdf7f2 353 dout(10) << "instance_id=" << m_instance_id << dendl;
7c673cae
FG
354
355 Mutex::Locker locker(m_lock);
356
11fdf7f2 357 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
358 m_on_finish = on_finish;
359 m_ret_val = 0;
360
361 register_instance();
362}
363
364template <typename I>
365void InstanceWatcher<I>::shut_down() {
366 C_SaferCond shut_down_ctx;
367 shut_down(&shut_down_ctx);
368 int r = shut_down_ctx.wait();
11fdf7f2 369 ceph_assert(r == 0);
7c673cae
FG
370}
371
372template <typename I>
373void InstanceWatcher<I>::shut_down(Context *on_finish) {
11fdf7f2 374 dout(10) << dendl;
7c673cae
FG
375
376 Mutex::Locker locker(m_lock);
377
11fdf7f2 378 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
379 m_on_finish = on_finish;
380 m_ret_val = 0;
381
382 release_lock();
383}
384
385template <typename I>
386void InstanceWatcher<I>::remove(Context *on_finish) {
11fdf7f2 387 dout(10) << dendl;
7c673cae
FG
388
389 Mutex::Locker locker(m_lock);
390
11fdf7f2 391 ceph_assert(m_on_finish == nullptr);
7c673cae
FG
392 m_on_finish = on_finish;
393 m_ret_val = 0;
7c673cae
FG
394
395 get_instance_locker();
396}
397
398template <typename I>
399void InstanceWatcher<I>::notify_image_acquire(
400 const std::string &instance_id, const std::string &global_image_id,
d2e6a577 401 Context *on_notify_ack) {
11fdf7f2 402 dout(10) << "instance_id=" << instance_id << ", global_image_id="
7c673cae
FG
403 << global_image_id << dendl;
404
405 Mutex::Locker locker(m_lock);
406
11fdf7f2 407 ceph_assert(m_on_finish == nullptr);
7c673cae 408
11fdf7f2
TL
409 uint64_t request_id = ++m_request_seq;
410 bufferlist bl;
411 encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}}, bl);
412 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
413 std::move(bl), on_notify_ack);
414 req->send();
7c673cae
FG
415}
416
417template <typename I>
418void InstanceWatcher<I>::notify_image_release(
d2e6a577
FG
419 const std::string &instance_id, const std::string &global_image_id,
420 Context *on_notify_ack) {
11fdf7f2 421 dout(10) << "instance_id=" << instance_id << ", global_image_id="
7c673cae
FG
422 << global_image_id << dendl;
423
424 Mutex::Locker locker(m_lock);
425
11fdf7f2 426 ceph_assert(m_on_finish == nullptr);
7c673cae 427
11fdf7f2
TL
428 uint64_t request_id = ++m_request_seq;
429 bufferlist bl;
430 encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}}, bl);
431 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
432 std::move(bl), on_notify_ack);
433 req->send();
d2e6a577
FG
434}
435
436template <typename I>
437void InstanceWatcher<I>::notify_peer_image_removed(
438 const std::string &instance_id, const std::string &global_image_id,
439 const std::string &peer_mirror_uuid, Context *on_notify_ack) {
11fdf7f2 440 dout(10) << "instance_id=" << instance_id << ", "
d2e6a577
FG
441 << "global_image_id=" << global_image_id << ", "
442 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
443
444 Mutex::Locker locker(m_lock);
11fdf7f2 445 ceph_assert(m_on_finish == nullptr);
d2e6a577 446
11fdf7f2
TL
447 uint64_t request_id = ++m_request_seq;
448 bufferlist bl;
449 encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
450 peer_mirror_uuid}}, bl);
451 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
452 std::move(bl), on_notify_ack);
453 req->send();
7c673cae
FG
454}
455
31f18b77
FG
456template <typename I>
457void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
458 Context *on_sync_start) {
11fdf7f2 459 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77
FG
460
461 Mutex::Locker locker(m_lock);
462
11fdf7f2 463 ceph_assert(m_inflight_sync_reqs.count(sync_id) == 0);
31f18b77
FG
464
465 uint64_t request_id = ++m_request_seq;
466
467 bufferlist bl;
11fdf7f2 468 encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
31f18b77
FG
469
470 auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
471 sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
472 std::move(bl), sync_ctx);
473
474 m_inflight_sync_reqs[sync_id] = sync_ctx;
475 sync_ctx->req->send();
476}
477
478template <typename I>
479bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
11fdf7f2 480 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77
FG
481
482 Mutex::Locker locker(m_lock);
483
484 auto it = m_inflight_sync_reqs.find(sync_id);
485 if (it == m_inflight_sync_reqs.end()) {
486 return false;
487 }
488
489 auto sync_ctx = it->second;
490
491 if (sync_ctx->on_start == nullptr) {
492 return false;
493 }
494
11fdf7f2 495 ceph_assert(sync_ctx->req != nullptr);
31f18b77
FG
496 sync_ctx->req->cancel();
497 return true;
498}
499
500template <typename I>
501void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
502 const std::string &sync_id) {
11fdf7f2 503 dout(10) << "sync_id=" << sync_id << dendl;
31f18b77
FG
504
505 Mutex::Locker locker(m_lock);
506
507 uint64_t request_id = ++m_request_seq;
508
509 bufferlist bl;
11fdf7f2 510 encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
31f18b77
FG
511
512 auto ctx = new FunctionContext(
513 [this, sync_id] (int r) {
11fdf7f2 514 dout(10) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
31f18b77
FG
515 Mutex::Locker locker(m_lock);
516 if (r != -ESTALE && m_image_sync_throttler != nullptr) {
517 m_image_sync_throttler->finish_op(sync_id);
518 }
519 });
520 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
521 std::move(bl), ctx);
522 req->send();
523}
524
525template <typename I>
526void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
31f18b77 527 Mutex::Locker locker(m_lock);
28e407b8
AA
528 notify_sync_complete(m_lock, sync_id);
529}
530
531template <typename I>
532void InstanceWatcher<I>::notify_sync_complete(const Mutex&,
533 const std::string &sync_id) {
534 dout(10) << "sync_id=" << sync_id << dendl;
11fdf7f2 535 ceph_assert(m_lock.is_locked());
31f18b77
FG
536
537 auto it = m_inflight_sync_reqs.find(sync_id);
11fdf7f2 538 ceph_assert(it != m_inflight_sync_reqs.end());
31f18b77
FG
539
540 auto sync_ctx = it->second;
11fdf7f2 541 ceph_assert(sync_ctx->req == nullptr);
31f18b77
FG
542
543 m_inflight_sync_reqs.erase(it);
544 m_work_queue->queue(sync_ctx, 0);
545}
546
547template <typename I>
548void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
549 int r) {
11fdf7f2 550 dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
31f18b77
FG
551
552 Context *on_start = nullptr;
553 {
554 Mutex::Locker locker(m_lock);
11fdf7f2
TL
555 ceph_assert(sync_ctx->req != nullptr);
556 ceph_assert(sync_ctx->on_start != nullptr);
31f18b77
FG
557
558 if (sync_ctx->req->canceling) {
559 r = -ECANCELED;
560 }
561
562 std::swap(sync_ctx->on_start, on_start);
563 sync_ctx->req = nullptr;
28e407b8
AA
564
565 if (r == -ECANCELED) {
566 notify_sync_complete(m_lock, sync_ctx->sync_id);
567 }
31f18b77
FG
568 }
569
570 on_start->complete(r == -ECANCELED ? r : 0);
31f18b77
FG
571}
572
573template <typename I>
574void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
575 int r) {
11fdf7f2 576 dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
31f18b77
FG
577
578 if (sync_ctx->on_complete != nullptr) {
579 sync_ctx->on_complete->complete(r);
580 }
581}
582
583template <typename I>
584void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
11fdf7f2 585 dout(10) << dendl;
31f18b77
FG
586
587 Mutex::Locker locker(m_lock);
588 if (m_image_sync_throttler != nullptr) {
589 m_image_sync_throttler->print_status(f, ss);
590 }
591}
592
593template <typename I>
594void InstanceWatcher<I>::handle_acquire_leader() {
11fdf7f2 595 dout(10) << dendl;
31f18b77
FG
596
597 Mutex::Locker locker(m_lock);
598
11fdf7f2
TL
599 ceph_assert(m_image_sync_throttler == nullptr);
600 m_image_sync_throttler = ImageSyncThrottler<I>::create(m_cct);
31f18b77
FG
601
602 m_leader_instance_id = m_instance_id;
603 unsuspend_notify_requests();
604}
605
606template <typename I>
607void InstanceWatcher<I>::handle_release_leader() {
11fdf7f2 608 dout(10) << dendl;
31f18b77
FG
609
610 Mutex::Locker locker(m_lock);
611
11fdf7f2 612 ceph_assert(m_image_sync_throttler != nullptr);
31f18b77
FG
613
614 m_leader_instance_id.clear();
615
616 m_image_sync_throttler->drain(-ESTALE);
617 m_image_sync_throttler->destroy();
618 m_image_sync_throttler = nullptr;
619}
620
621template <typename I>
622void InstanceWatcher<I>::handle_update_leader(
623 const std::string &leader_instance_id) {
11fdf7f2 624 dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
31f18b77
FG
625
626 Mutex::Locker locker(m_lock);
627
628 m_leader_instance_id = leader_instance_id;
629
630 if (!m_leader_instance_id.empty()) {
631 unsuspend_notify_requests();
632 }
633}
634
7c673cae
FG
635template <typename I>
636void InstanceWatcher<I>::cancel_notify_requests(
637 const std::string &instance_id) {
11fdf7f2 638 dout(10) << "instance_id=" << instance_id << dendl;
7c673cae
FG
639
640 Mutex::Locker locker(m_lock);
641
642 for (auto op : m_notify_ops) {
31f18b77 643 if (op.first == instance_id && !op.second->send_to_leader) {
7c673cae
FG
644 op.second->cancel();
645 }
646 }
647}
648
7c673cae
FG
649template <typename I>
650void InstanceWatcher<I>::register_instance() {
11fdf7f2 651 ceph_assert(m_lock.is_locked());
7c673cae 652
11fdf7f2 653 dout(10) << dendl;
7c673cae
FG
654
655 librados::ObjectWriteOperation op;
656 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
657 librados::AioCompletion *aio_comp = create_rados_callback<
658 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
659
660 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
11fdf7f2 661 ceph_assert(r == 0);
7c673cae
FG
662 aio_comp->release();
663}
664
665template <typename I>
666void InstanceWatcher<I>::handle_register_instance(int r) {
11fdf7f2 667 dout(10) << "r=" << r << dendl;
7c673cae
FG
668
669 Context *on_finish = nullptr;
670 {
671 Mutex::Locker locker(m_lock);
672
673 if (r == 0) {
674 create_instance_object();
675 return;
676 }
677
678 derr << "error registering instance: " << cpp_strerror(r) << dendl;
679
680 std::swap(on_finish, m_on_finish);
681 }
682 on_finish->complete(r);
683}
684
685
686template <typename I>
687void InstanceWatcher<I>::create_instance_object() {
11fdf7f2 688 dout(10) << dendl;
7c673cae 689
11fdf7f2 690 ceph_assert(m_lock.is_locked());
7c673cae
FG
691
692 librados::ObjectWriteOperation op;
693 op.create(true);
694
695 librados::AioCompletion *aio_comp = create_rados_callback<
696 InstanceWatcher<I>,
697 &InstanceWatcher<I>::handle_create_instance_object>(this);
698 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
11fdf7f2 699 ceph_assert(r == 0);
7c673cae
FG
700 aio_comp->release();
701}
702
703template <typename I>
704void InstanceWatcher<I>::handle_create_instance_object(int r) {
11fdf7f2 705 dout(10) << "r=" << r << dendl;
7c673cae
FG
706
707 Mutex::Locker locker(m_lock);
708
709 if (r < 0) {
710 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
711 << dendl;
712
713 m_ret_val = r;
714 unregister_instance();
715 return;
716 }
717
718 register_watch();
719}
720
721template <typename I>
722void InstanceWatcher<I>::register_watch() {
11fdf7f2 723 dout(10) << dendl;
7c673cae 724
11fdf7f2 725 ceph_assert(m_lock.is_locked());
7c673cae
FG
726
727 Context *ctx = create_async_context_callback(
728 m_work_queue, create_context_callback<
729 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
730
731 librbd::Watcher::register_watch(ctx);
732}
733
734template <typename I>
735void InstanceWatcher<I>::handle_register_watch(int r) {
11fdf7f2 736 dout(10) << "r=" << r << dendl;
7c673cae
FG
737
738 Mutex::Locker locker(m_lock);
739
740 if (r < 0) {
741 derr << "error registering instance watcher for " << m_oid << " object: "
742 << cpp_strerror(r) << dendl;
743
744 m_ret_val = r;
745 remove_instance_object();
746 return;
747 }
748
749 acquire_lock();
750}
751
752template <typename I>
753void InstanceWatcher<I>::acquire_lock() {
11fdf7f2 754 dout(10) << dendl;
7c673cae 755
11fdf7f2 756 ceph_assert(m_lock.is_locked());
7c673cae
FG
757
758 Context *ctx = create_async_context_callback(
759 m_work_queue, create_context_callback<
760 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
761
762 m_instance_lock->acquire_lock(ctx);
763}
764
765template <typename I>
766void InstanceWatcher<I>::handle_acquire_lock(int r) {
11fdf7f2 767 dout(10) << "r=" << r << dendl;
7c673cae
FG
768
769 Context *on_finish = nullptr;
770 {
771 Mutex::Locker locker(m_lock);
772
773 if (r < 0) {
774
775 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
776
777 m_ret_val = r;
778 unregister_watch();
779 return;
780 }
781
782 std::swap(on_finish, m_on_finish);
783 }
784
785 on_finish->complete(r);
786}
787
788template <typename I>
789void InstanceWatcher<I>::release_lock() {
11fdf7f2 790 dout(10) << dendl;
7c673cae 791
11fdf7f2 792 ceph_assert(m_lock.is_locked());
7c673cae
FG
793
794 Context *ctx = create_async_context_callback(
795 m_work_queue, create_context_callback<
796 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
797
798 m_instance_lock->shut_down(ctx);
799}
800
801template <typename I>
802void InstanceWatcher<I>::handle_release_lock(int r) {
11fdf7f2 803 dout(10) << "r=" << r << dendl;
7c673cae
FG
804
805 Mutex::Locker locker(m_lock);
806
807 if (r < 0) {
808 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
809 }
810
811 unregister_watch();
812}
813
814template <typename I>
815void InstanceWatcher<I>::unregister_watch() {
11fdf7f2 816 dout(10) << dendl;
7c673cae 817
11fdf7f2 818 ceph_assert(m_lock.is_locked());
7c673cae
FG
819
820 Context *ctx = create_async_context_callback(
821 m_work_queue, create_context_callback<
822 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
823
824 librbd::Watcher::unregister_watch(ctx);
825}
826
827template <typename I>
828void InstanceWatcher<I>::handle_unregister_watch(int r) {
11fdf7f2 829 dout(10) << "r=" << r << dendl;
7c673cae
FG
830
831 if (r < 0) {
832 derr << "error unregistering instance watcher for " << m_oid << " object: "
833 << cpp_strerror(r) << dendl;
834 }
835
836 Mutex::Locker locker(m_lock);
837 remove_instance_object();
838}
839
840template <typename I>
841void InstanceWatcher<I>::remove_instance_object() {
11fdf7f2 842 ceph_assert(m_lock.is_locked());
7c673cae 843
11fdf7f2 844 dout(10) << dendl;
7c673cae
FG
845
846 librados::ObjectWriteOperation op;
847 op.remove();
848
849 librados::AioCompletion *aio_comp = create_rados_callback<
850 InstanceWatcher<I>,
851 &InstanceWatcher<I>::handle_remove_instance_object>(this);
852 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
11fdf7f2 853 ceph_assert(r == 0);
7c673cae
FG
854 aio_comp->release();
855}
856
857template <typename I>
858void InstanceWatcher<I>::handle_remove_instance_object(int r) {
11fdf7f2 859 dout(10) << "r=" << r << dendl;
7c673cae 860
11fdf7f2 861 if (r == -ENOENT) {
7c673cae
FG
862 r = 0;
863 }
864
865 if (r < 0) {
866 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
867 << dendl;
868 }
869
870 Mutex::Locker locker(m_lock);
871 unregister_instance();
872}
873
874template <typename I>
875void InstanceWatcher<I>::unregister_instance() {
11fdf7f2 876 dout(10) << dendl;
7c673cae 877
11fdf7f2 878 ceph_assert(m_lock.is_locked());
7c673cae
FG
879
880 librados::ObjectWriteOperation op;
881 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
882 librados::AioCompletion *aio_comp = create_rados_callback<
883 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
884
885 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
11fdf7f2 886 ceph_assert(r == 0);
7c673cae
FG
887 aio_comp->release();
888}
889
890template <typename I>
891void InstanceWatcher<I>::handle_unregister_instance(int r) {
11fdf7f2 892 dout(10) << "r=" << r << dendl;
7c673cae
FG
893
894 if (r < 0) {
895 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
896 }
897
898 Mutex::Locker locker(m_lock);
899 wait_for_notify_ops();
900}
901
902template <typename I>
903void InstanceWatcher<I>::wait_for_notify_ops() {
11fdf7f2 904 dout(10) << dendl;
7c673cae 905
11fdf7f2 906 ceph_assert(m_lock.is_locked());
7c673cae
FG
907
908 for (auto op : m_notify_ops) {
909 op.second->cancel();
910 }
911
912 Context *ctx = create_async_context_callback(
913 m_work_queue, create_context_callback<
914 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
915
916 m_notify_op_tracker.wait_for_ops(ctx);
917}
918
919template <typename I>
920void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
11fdf7f2 921 dout(10) << "r=" << r << dendl;
7c673cae 922
11fdf7f2 923 ceph_assert(r == 0);
7c673cae
FG
924
925 Context *on_finish = nullptr;
926 {
927 Mutex::Locker locker(m_lock);
928
11fdf7f2 929 ceph_assert(m_notify_ops.empty());
7c673cae
FG
930
931 std::swap(on_finish, m_on_finish);
932 r = m_ret_val;
7c673cae
FG
933 }
934 on_finish->complete(r);
935}
936
937template <typename I>
938void InstanceWatcher<I>::get_instance_locker() {
11fdf7f2 939 dout(10) << dendl;
7c673cae 940
11fdf7f2 941 ceph_assert(m_lock.is_locked());
7c673cae
FG
942
943 Context *ctx = create_async_context_callback(
944 m_work_queue, create_context_callback<
945 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
946
947 m_instance_lock->get_locker(&m_instance_locker, ctx);
948}
949
950template <typename I>
951void InstanceWatcher<I>::handle_get_instance_locker(int r) {
11fdf7f2 952 dout(10) << "r=" << r << dendl;
7c673cae
FG
953
954 Mutex::Locker locker(m_lock);
955
956 if (r < 0) {
957 if (r != -ENOENT) {
958 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
959 }
960 remove_instance_object();
961 return;
962 }
963
964 break_instance_lock();
965}
966
967template <typename I>
968void InstanceWatcher<I>::break_instance_lock() {
11fdf7f2 969 dout(10) << dendl;
7c673cae 970
11fdf7f2 971 ceph_assert(m_lock.is_locked());
7c673cae
FG
972
973 Context *ctx = create_async_context_callback(
974 m_work_queue, create_context_callback<
975 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
976
977 m_instance_lock->break_lock(m_instance_locker, true, ctx);
978}
979
980template <typename I>
981void InstanceWatcher<I>::handle_break_instance_lock(int r) {
11fdf7f2 982 dout(10) << "r=" << r << dendl;
7c673cae
FG
983
984 Mutex::Locker locker(m_lock);
985
986 if (r < 0) {
987 if (r != -ENOENT) {
988 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
989 }
990 remove_instance_object();
991 return;
992 }
993
994 remove_instance_object();
995}
996
31f18b77
FG
997template <typename I>
998void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
11fdf7f2 999 dout(10) << req << dendl;
31f18b77 1000
11fdf7f2 1001 ceph_assert(m_lock.is_locked());
31f18b77
FG
1002
1003 auto result = m_suspended_ops.insert(req).second;
11fdf7f2 1004 ceph_assert(result);
31f18b77
FG
1005}
1006
1007template <typename I>
1008bool InstanceWatcher<I>::unsuspend_notify_request(
1009 C_NotifyInstanceRequest *req) {
11fdf7f2 1010 dout(10) << req << dendl;
31f18b77 1011
11fdf7f2 1012 ceph_assert(m_lock.is_locked());
31f18b77
FG
1013
1014 auto result = m_suspended_ops.erase(req);
1015 if (result == 0) {
1016 return false;
1017 }
1018
1019 req->send();
1020 return true;
1021}
1022
1023template <typename I>
1024void InstanceWatcher<I>::unsuspend_notify_requests() {
11fdf7f2 1025 dout(10) << dendl;
31f18b77 1026
11fdf7f2 1027 ceph_assert(m_lock.is_locked());
31f18b77
FG
1028
1029 std::set<C_NotifyInstanceRequest *> suspended_ops;
1030 std::swap(m_suspended_ops, suspended_ops);
1031
1032 for (auto op : suspended_ops) {
1033 op->send();
1034 }
1035}
1036
7c673cae
FG
1037template <typename I>
1038Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1039 uint64_t request_id,
1040 C_NotifyAck *on_notify_ack) {
11fdf7f2 1041 dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id
7c673cae
FG
1042 << dendl;
1043
1044 Mutex::Locker locker(m_lock);
1045
1046 Context *ctx = nullptr;
1047 Request request(instance_id, request_id);
1048 auto it = m_requests.find(request);
1049
1050 if (it != m_requests.end()) {
11fdf7f2 1051 dout(10) << "duplicate for in-progress request" << dendl;
7c673cae
FG
1052 delete it->on_notify_ack;
1053 m_requests.erase(it);
1054 } else {
31f18b77
FG
1055 ctx = create_async_context_callback(
1056 m_work_queue, new FunctionContext(
1057 [this, instance_id, request_id] (int r) {
1058 complete_request(instance_id, request_id, r);
1059 }));
7c673cae
FG
1060 }
1061
1062 request.on_notify_ack = on_notify_ack;
1063 m_requests.insert(request);
1064 return ctx;
1065}
1066
31f18b77
FG
1067template <typename I>
1068void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1069 uint64_t request_id, int r) {
11fdf7f2 1070 dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id
31f18b77
FG
1071 << dendl;
1072
1073 C_NotifyAck *on_notify_ack;
1074 {
1075 Mutex::Locker locker(m_lock);
1076 Request request(instance_id, request_id);
1077 auto it = m_requests.find(request);
11fdf7f2 1078 ceph_assert(it != m_requests.end());
31f18b77
FG
1079 on_notify_ack = it->on_notify_ack;
1080 m_requests.erase(it);
1081 }
1082
11fdf7f2 1083 encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
31f18b77
FG
1084 on_notify_ack->complete(0);
1085}
1086
7c673cae
FG
1087template <typename I>
1088void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1089 uint64_t notifier_id, bufferlist &bl) {
11fdf7f2 1090 dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", "
7c673cae
FG
1091 << "notifier_id=" << notifier_id << dendl;
1092
1093 auto ctx = new C_NotifyAck(this, notify_id, handle);
1094
1095 NotifyMessage notify_message;
1096 try {
11fdf7f2
TL
1097 auto iter = bl.cbegin();
1098 decode(notify_message, iter);
7c673cae
FG
1099 } catch (const buffer::error &err) {
1100 derr << "error decoding image notification: " << err.what() << dendl;
1101 ctx->complete(0);
1102 return;
1103 }
1104
1105 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1106 notify_message.payload);
1107}
1108
1109template <typename I>
1110void InstanceWatcher<I>::handle_image_acquire(
d2e6a577 1111 const std::string &global_image_id, Context *on_finish) {
11fdf7f2 1112 dout(10) << "global_image_id=" << global_image_id << dendl;
7c673cae 1113
31f18b77 1114 auto ctx = new FunctionContext(
d2e6a577
FG
1115 [this, global_image_id, on_finish] (int r) {
1116 m_instance_replayer->acquire_image(this, global_image_id, on_finish);
31f18b77
FG
1117 m_notify_op_tracker.finish_op();
1118 });
1119
1120 m_notify_op_tracker.start_op();
1121 m_work_queue->queue(ctx, 0);
7c673cae
FG
1122}
1123
1124template <typename I>
1125void InstanceWatcher<I>::handle_image_release(
d2e6a577 1126 const std::string &global_image_id, Context *on_finish) {
11fdf7f2 1127 dout(10) << "global_image_id=" << global_image_id << dendl;
7c673cae 1128
31f18b77 1129 auto ctx = new FunctionContext(
d2e6a577
FG
1130 [this, global_image_id, on_finish] (int r) {
1131 m_instance_replayer->release_image(global_image_id, on_finish);
1132 m_notify_op_tracker.finish_op();
1133 });
1134
1135 m_notify_op_tracker.start_op();
1136 m_work_queue->queue(ctx, 0);
1137}
1138
1139template <typename I>
1140void InstanceWatcher<I>::handle_peer_image_removed(
1141 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1142 Context *on_finish) {
11fdf7f2 1143 dout(10) << "global_image_id=" << global_image_id << ", "
d2e6a577
FG
1144 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
1145
1146 auto ctx = new FunctionContext(
1147 [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
1148 m_instance_replayer->remove_peer_image(global_image_id,
1149 peer_mirror_uuid, on_finish);
31f18b77
FG
1150 m_notify_op_tracker.finish_op();
1151 });
1152
1153 m_notify_op_tracker.start_op();
1154 m_work_queue->queue(ctx, 0);
1155}
1156
1157template <typename I>
1158void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1159 const std::string &sync_id,
1160 Context *on_finish) {
11fdf7f2 1161 dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
31f18b77
FG
1162
1163 Mutex::Locker locker(m_lock);
1164
1165 if (m_image_sync_throttler == nullptr) {
11fdf7f2 1166 dout(10) << "sync request for non-leader" << dendl;
31f18b77
FG
1167 m_work_queue->queue(on_finish, -ESTALE);
1168 return;
1169 }
1170
1171 Context *on_start = create_async_context_callback(
1172 m_work_queue, new FunctionContext(
1173 [this, instance_id, sync_id, on_finish] (int r) {
11fdf7f2 1174 dout(10) << "handle_sync_request: finish: instance_id=" << instance_id
31f18b77
FG
1175 << ", sync_id=" << sync_id << ", r=" << r << dendl;
1176 if (r == 0) {
1177 notify_sync_start(instance_id, sync_id);
1178 }
1179 on_finish->complete(r);
1180 }));
1181 m_image_sync_throttler->start_op(sync_id, on_start);
1182}
1183
1184template <typename I>
1185void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1186 const std::string &sync_id,
1187 Context *on_finish) {
11fdf7f2 1188 dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
31f18b77
FG
1189
1190 Mutex::Locker locker(m_lock);
1191
1192 auto it = m_inflight_sync_reqs.find(sync_id);
1193 if (it == m_inflight_sync_reqs.end()) {
11fdf7f2 1194 dout(5) << "not found" << dendl;
31f18b77
FG
1195 m_work_queue->queue(on_finish, 0);
1196 return;
1197 }
1198
1199 auto sync_ctx = it->second;
1200
1201 if (sync_ctx->on_complete != nullptr) {
11fdf7f2 1202 dout(5) << "duplicate request" << dendl;
31f18b77
FG
1203 m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1204 }
1205
1206 sync_ctx->on_complete = on_finish;
7c673cae
FG
1207}
1208
1209template <typename I>
1210void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1211 const ImageAcquirePayload &payload,
1212 C_NotifyAck *on_notify_ack) {
11fdf7f2 1213 dout(10) << "image_acquire: instance_id=" << instance_id << ", "
7c673cae
FG
1214 << "request_id=" << payload.request_id << dendl;
1215
1216 auto on_finish = prepare_request(instance_id, payload.request_id,
1217 on_notify_ack);
1218 if (on_finish != nullptr) {
d2e6a577 1219 handle_image_acquire(payload.global_image_id, on_finish);
7c673cae
FG
1220 }
1221}
1222
1223template <typename I>
1224void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1225 const ImageReleasePayload &payload,
1226 C_NotifyAck *on_notify_ack) {
11fdf7f2 1227 dout(10) << "image_release: instance_id=" << instance_id << ", "
7c673cae
FG
1228 << "request_id=" << payload.request_id << dendl;
1229
1230 auto on_finish = prepare_request(instance_id, payload.request_id,
1231 on_notify_ack);
1232 if (on_finish != nullptr) {
d2e6a577
FG
1233 handle_image_release(payload.global_image_id, on_finish);
1234 }
1235}
1236
1237template <typename I>
1238void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1239 const PeerImageRemovedPayload &payload,
1240 C_NotifyAck *on_notify_ack) {
11fdf7f2 1241 dout(10) << "remove_peer_image: instance_id=" << instance_id << ", "
d2e6a577
FG
1242 << "request_id=" << payload.request_id << dendl;
1243
1244 auto on_finish = prepare_request(instance_id, payload.request_id,
1245 on_notify_ack);
1246 if (on_finish != nullptr) {
1247 handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
1248 on_finish);
7c673cae
FG
1249 }
1250}
1251
31f18b77
FG
1252template <typename I>
1253void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1254 const SyncRequestPayload &payload,
1255 C_NotifyAck *on_notify_ack) {
11fdf7f2 1256 dout(10) << "sync_request: instance_id=" << instance_id << ", "
31f18b77
FG
1257 << "request_id=" << payload.request_id << dendl;
1258
1259 auto on_finish = prepare_request(instance_id, payload.request_id,
1260 on_notify_ack);
1261 if (on_finish == nullptr) {
1262 return;
1263 }
1264
1265 handle_sync_request(instance_id, payload.sync_id, on_finish);
1266}
1267
1268template <typename I>
1269void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1270 const SyncStartPayload &payload,
1271 C_NotifyAck *on_notify_ack) {
11fdf7f2 1272 dout(10) << "sync_start: instance_id=" << instance_id << ", "
31f18b77
FG
1273 << "request_id=" << payload.request_id << dendl;
1274
1275 auto on_finish = prepare_request(instance_id, payload.request_id,
1276 on_notify_ack);
1277 if (on_finish == nullptr) {
1278 return;
1279 }
1280
1281 handle_sync_start(instance_id, payload.sync_id, on_finish);
1282}
1283
7c673cae
FG
1284template <typename I>
1285void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1286 const UnknownPayload &payload,
1287 C_NotifyAck *on_notify_ack) {
11fdf7f2 1288 dout(5) << "unknown: instance_id=" << instance_id << dendl;
7c673cae
FG
1289
1290 on_notify_ack->complete(0);
1291}
1292
1293} // namespace mirror
1294} // namespace rbd
1295
1296template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;