]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc
0f149f0d1d89968edb42121b46b7db088dee876b
[ceph.git] / ceph / src / tools / rbd_mirror / image_replayer / snapshot / Replayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "Replayer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "cls/rbd/cls_rbd_client.h"
#include "json_spirit/json_spirit.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/deep_copy/Handler.h"
#include "librbd/deep_copy/ImageCopyRequest.h"
#include "librbd/deep_copy/SnapshotCopyRequest.h"
#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h"
#include "librbd/mirror/snapshot/GetImageStateRequest.h"
#include "librbd/mirror/snapshot/ImageMeta.h"
#include "librbd/mirror/snapshot/UnlinkPeerRequest.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/PoolMetaCache.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
#include "tools/rbd_mirror/image_replayer/Utils.h"
#include "tools/rbd_mirror/image_replayer/snapshot/ApplyImageStateRequest.h"
#include "tools/rbd_mirror/image_replayer/snapshot/StateBuilder.h"
#include "tools/rbd_mirror/image_replayer/snapshot/Utils.h"
#include <cmath>
#include <limits>
#include <set>
34
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_rbd_mirror
37 #undef dout_prefix
38 #define dout_prefix *_dout << "rbd::mirror::image_replayer::snapshot::" \
39 << "Replayer: " << this << " " << __func__ << ": "
40
41 extern PerfCounters *g_perf_counters;
42
43 namespace rbd {
44 namespace mirror {
45 namespace image_replayer {
46 namespace snapshot {
47
48 namespace {
49
// Rounds `value` to two decimal places and returns its magnitude.
//
// std::round / std::abs are spelled out explicitly: an unqualified abs()
// can resolve to the C library's `int abs(int)`, which would silently
// truncate the fractional part (e.g. 3.14 would be reported as 3).
double round_to_two_places(double value) {
  return std::abs(std::round(value * 100) / 100);
}
53
54 template<typename I>
55 std::pair<uint64_t, librbd::SnapInfo*> get_newest_mirror_snapshot(
56 I* image_ctx) {
57 for (auto snap_info_it = image_ctx->snap_info.rbegin();
58 snap_info_it != image_ctx->snap_info.rend(); ++snap_info_it) {
59 const auto& snap_ns = snap_info_it->second.snap_namespace;
60 auto mirror_ns = boost::get<
61 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
62 if (mirror_ns == nullptr || !mirror_ns->complete) {
63 continue;
64 }
65
66 return {snap_info_it->first, &snap_info_it->second};
67 }
68
69 return {CEPH_NOSNAP, nullptr};
70 }
71
72 } // anonymous namespace
73
74 using librbd::util::create_async_context_callback;
75 using librbd::util::create_context_callback;
76 using librbd::util::create_rados_callback;
77
78 template <typename I>
79 struct Replayer<I>::C_UpdateWatchCtx : public librbd::UpdateWatchCtx {
80 Replayer<I>* replayer;
81
82 C_UpdateWatchCtx(Replayer<I>* replayer) : replayer(replayer) {
83 }
84
85 void handle_notify() override {
86 replayer->handle_image_update_notify();
87 }
88 };
89
90 template <typename I>
91 struct Replayer<I>::DeepCopyHandler : public librbd::deep_copy::Handler {
92 Replayer *replayer;
93
94 DeepCopyHandler(Replayer* replayer) : replayer(replayer) {
95 }
96
97 void handle_read(uint64_t bytes_read) override {
98 replayer->handle_copy_image_read(bytes_read);
99 }
100
101 int update_progress(uint64_t object_number, uint64_t object_count) override {
102 replayer->handle_copy_image_progress(object_number, object_count);
103 return 0;
104 }
105 };
106
107 template <typename I>
108 Replayer<I>::Replayer(
109 Threads<I>* threads,
110 InstanceWatcher<I>* instance_watcher,
111 const std::string& local_mirror_uuid,
112 PoolMetaCache* pool_meta_cache,
113 StateBuilder<I>* state_builder,
114 ReplayerListener* replayer_listener)
115 : m_threads(threads),
116 m_instance_watcher(instance_watcher),
117 m_local_mirror_uuid(local_mirror_uuid),
118 m_pool_meta_cache(pool_meta_cache),
119 m_state_builder(state_builder),
120 m_replayer_listener(replayer_listener),
121 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
122 "rbd::mirror::image_replayer::snapshot::Replayer", this))) {
123 dout(10) << dendl;
124 }
125
126 template <typename I>
127 Replayer<I>::~Replayer() {
128 dout(10) << dendl;
129 ceph_assert(m_state == STATE_COMPLETE);
130 ceph_assert(m_update_watch_ctx == nullptr);
131 ceph_assert(m_deep_copy_handler == nullptr);
132 }
133
134 template <typename I>
135 void Replayer<I>::init(Context* on_finish) {
136 dout(10) << dendl;
137
138 ceph_assert(m_state == STATE_INIT);
139
140 RemotePoolMeta remote_pool_meta;
141 int r = m_pool_meta_cache->get_remote_pool_meta(
142 m_state_builder->remote_image_ctx->md_ctx.get_id(), &remote_pool_meta);
143 if (r < 0 || remote_pool_meta.mirror_peer_uuid.empty()) {
144 derr << "failed to retrieve mirror peer uuid from remote pool" << dendl;
145 m_state = STATE_COMPLETE;
146 m_threads->work_queue->queue(on_finish, r);
147 return;
148 }
149
150 m_remote_mirror_peer_uuid = remote_pool_meta.mirror_peer_uuid;
151 dout(10) << "remote_mirror_peer_uuid=" << m_remote_mirror_peer_uuid << dendl;
152
153 ceph_assert(m_on_init_shutdown == nullptr);
154 m_on_init_shutdown = on_finish;
155
156 register_local_update_watcher();
157 }
158
159 template <typename I>
160 void Replayer<I>::shut_down(Context* on_finish) {
161 dout(10) << dendl;
162
163 std::unique_lock locker{m_lock};
164 ceph_assert(m_on_init_shutdown == nullptr);
165 m_on_init_shutdown = on_finish;
166 m_error_code = 0;
167 m_error_description = "";
168
169 ceph_assert(m_state != STATE_INIT);
170 auto state = STATE_COMPLETE;
171 std::swap(m_state, state);
172
173 if (state == STATE_REPLAYING) {
174 // if a sync request was pending, request a cancelation
175 m_instance_watcher->cancel_sync_request(
176 m_state_builder->local_image_ctx->id);
177
178 // TODO interrupt snapshot copy and image copy state machines even if remote
179 // cluster is unreachable
180 dout(10) << "shut down pending on completion of snapshot replay" << dendl;
181 return;
182 }
183 locker.unlock();
184
185 unregister_remote_update_watcher();
186 }
187
188 template <typename I>
189 void Replayer<I>::flush(Context* on_finish) {
190 dout(10) << dendl;
191
192 // TODO
193 m_threads->work_queue->queue(on_finish, 0);
194 }
195
196 template <typename I>
197 bool Replayer<I>::get_replay_status(std::string* description,
198 Context* on_finish) {
199 dout(10) << dendl;
200
201 std::unique_lock locker{m_lock};
202 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
203 locker.unlock();
204
205 derr << "replay not running" << dendl;
206 on_finish->complete(-EAGAIN);
207 return false;
208 }
209
210 std::shared_lock local_image_locker{
211 m_state_builder->local_image_ctx->image_lock};
212 auto [local_snap_id, local_snap_info] = get_newest_mirror_snapshot(
213 m_state_builder->local_image_ctx);
214
215 std::shared_lock remote_image_locker{
216 m_state_builder->remote_image_ctx->image_lock};
217 auto [remote_snap_id, remote_snap_info] = get_newest_mirror_snapshot(
218 m_state_builder->remote_image_ctx);
219
220 if (remote_snap_info == nullptr) {
221 remote_image_locker.unlock();
222 local_image_locker.unlock();
223 locker.unlock();
224
225 derr << "remote image does not contain mirror snapshots" << dendl;
226 on_finish->complete(-EAGAIN);
227 return false;
228 }
229
230 std::string replay_state = "idle";
231 if (m_remote_snap_id_end != CEPH_NOSNAP) {
232 replay_state = "syncing";
233 }
234
235 json_spirit::mObject root_obj;
236 root_obj["replay_state"] = replay_state;
237 root_obj["remote_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
238
239 auto matching_remote_snap_id = util::compute_remote_snap_id(
240 m_state_builder->local_image_ctx->image_lock,
241 m_state_builder->local_image_ctx->snap_info,
242 local_snap_id, m_state_builder->remote_mirror_uuid);
243 auto matching_remote_snap_it =
244 m_state_builder->remote_image_ctx->snap_info.find(matching_remote_snap_id);
245 if (matching_remote_snap_id != CEPH_NOSNAP &&
246 matching_remote_snap_it !=
247 m_state_builder->remote_image_ctx->snap_info.end()) {
248 // use the timestamp from the matching remote image since
249 // the local snapshot would just be the time the snapshot was
250 // synced and not the consistency point in time.
251 root_obj["local_snapshot_timestamp"] =
252 matching_remote_snap_it->second.timestamp.sec();
253 }
254
255 matching_remote_snap_it = m_state_builder->remote_image_ctx->snap_info.find(
256 m_remote_snap_id_end);
257 if (m_remote_snap_id_end != CEPH_NOSNAP &&
258 matching_remote_snap_it !=
259 m_state_builder->remote_image_ctx->snap_info.end()) {
260 root_obj["syncing_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
261 root_obj["syncing_percent"] = static_cast<uint64_t>(
262 100 * m_local_mirror_snap_ns.last_copied_object_number /
263 static_cast<float>(std::max<uint64_t>(1U, m_local_object_count)));
264 }
265
266 m_bytes_per_second(0);
267 auto bytes_per_second = m_bytes_per_second.get_average();
268 root_obj["bytes_per_second"] = round_to_two_places(bytes_per_second);
269
270 auto bytes_per_snapshot = boost::accumulators::rolling_mean(
271 m_bytes_per_snapshot);
272 root_obj["bytes_per_snapshot"] = round_to_two_places(bytes_per_snapshot);
273
274 auto pending_bytes = bytes_per_snapshot * m_pending_snapshots;
275 if (bytes_per_second > 0 && m_pending_snapshots > 0) {
276 auto seconds_until_synced = round_to_two_places(
277 pending_bytes / bytes_per_second);
278 if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
279 seconds_until_synced = std::numeric_limits<uint64_t>::max();
280 }
281
282 root_obj["seconds_until_synced"] = static_cast<uint64_t>(
283 seconds_until_synced);
284 }
285
286 *description = json_spirit::write(
287 root_obj, json_spirit::remove_trailing_zeros);
288
289 local_image_locker.unlock();
290 remote_image_locker.unlock();
291 locker.unlock();
292 on_finish->complete(-EEXIST);
293 return true;
294 }
295
296 template <typename I>
297 void Replayer<I>::load_local_image_meta() {
298 dout(10) << dendl;
299
300 {
301 // reset state in case new snapshot is added while we are scanning
302 std::unique_lock locker{m_lock};
303 m_image_updated = false;
304 }
305
306 ceph_assert(m_state_builder->local_image_meta != nullptr);
307 auto ctx = create_context_callback<
308 Replayer<I>, &Replayer<I>::handle_load_local_image_meta>(this);
309 m_state_builder->local_image_meta->load(ctx);
310 }
311
312 template <typename I>
313 void Replayer<I>::handle_load_local_image_meta(int r) {
314 dout(10) << "r=" << r << dendl;
315
316 if (r < 0 && r != -ENOENT) {
317 derr << "failed to load local image-meta: " << cpp_strerror(r) << dendl;
318 handle_replay_complete(r, "failed to load local image-meta");
319 return;
320 }
321
322 if (r >= 0 && m_state_builder->local_image_meta->resync_requested) {
323 m_resync_requested = true;
324
325 dout(10) << "local image resync requested" << dendl;
326 handle_replay_complete(0, "resync requested");
327 return;
328 }
329
330 refresh_local_image();
331 }
332
333 template <typename I>
334 void Replayer<I>::refresh_local_image() {
335 if (!m_state_builder->local_image_ctx->state->is_refresh_required()) {
336 refresh_remote_image();
337 return;
338 }
339
340 dout(10) << dendl;
341 auto ctx = create_context_callback<
342 Replayer<I>, &Replayer<I>::handle_refresh_local_image>(this);
343 m_state_builder->local_image_ctx->state->refresh(ctx);
344 }
345
346 template <typename I>
347 void Replayer<I>::handle_refresh_local_image(int r) {
348 dout(10) << "r=" << r << dendl;
349
350 if (r < 0) {
351 derr << "failed to refresh local image: " << cpp_strerror(r) << dendl;
352 handle_replay_complete(r, "failed to refresh local image");
353 return;
354 }
355
356 refresh_remote_image();
357 }
358
359 template <typename I>
360 void Replayer<I>::refresh_remote_image() {
361 if (!m_state_builder->remote_image_ctx->state->is_refresh_required()) {
362 std::unique_lock locker{m_lock};
363 scan_local_mirror_snapshots(&locker);
364 return;
365 }
366
367 dout(10) << dendl;
368 auto ctx = create_context_callback<
369 Replayer<I>, &Replayer<I>::handle_refresh_remote_image>(this);
370 m_state_builder->remote_image_ctx->state->refresh(ctx);
371 }
372
373 template <typename I>
374 void Replayer<I>::handle_refresh_remote_image(int r) {
375 dout(10) << "r=" << r << dendl;
376
377 if (r < 0) {
378 derr << "failed to refresh remote image: " << cpp_strerror(r) << dendl;
379 handle_replay_complete(r, "failed to refresh remote image");
380 return;
381 }
382
383 std::unique_lock locker{m_lock};
384 scan_local_mirror_snapshots(&locker);
385 }
386
// Scans the local image's mirror snapshots (snap_info order, oldest first)
// to determine where replay must start or resume. All local/remote sync
// cursors are reset first, then:
//  - m_local_snap_id_start: last complete mirror snapshot (primary or
//    non-primary); a delta sync starts from it.
//  - m_local_snap_id_end: an incomplete non-primary snapshot whose image
//    sync is still in progress (CEPH_NOSNAP if none).
//  - m_local_mirror_snap_ns: namespace of the last mirror snapshot visited.
// Unused non-primary snapshots are pruned one at a time; each prune restarts
// the replay iteration via load_local_image_meta(). A local image linked to
// a different peer, or one that was force-promoted, ends the replay.
// Otherwise scanning continues with scan_remote_mirror_snapshots().
//
// `locker` must hold m_lock on entry. It is unlocked before pruning, and
// otherwise handed on to handle_replay_complete() or
// scan_remote_mirror_snapshots().
387 template <typename I>
388 void Replayer<I>::scan_local_mirror_snapshots(
389 std::unique_lock<ceph::mutex>* locker) {
390 if (is_replay_interrupted(locker)) {
391 return;
392 }
393
394 dout(10) << dendl;
395
// reset every sync cursor before rebuilding them from the snapshot lists
396 m_local_snap_id_start = 0;
397 m_local_snap_id_end = CEPH_NOSNAP;
398 m_local_mirror_snap_ns = {};
399 m_local_object_count = 0;
400
401 m_remote_snap_id_start = 0;
402 m_remote_snap_id_end = CEPH_NOSNAP;
403 m_remote_mirror_snap_ns = {};
404
405 std::set<uint64_t> prune_snap_ids;
406
407 auto local_image_ctx = m_state_builder->local_image_ctx;
408 std::shared_lock image_locker{local_image_ctx->image_lock};
409 for (auto snap_info_it = local_image_ctx->snap_info.begin();
410 snap_info_it != local_image_ctx->snap_info.end(); ++snap_info_it) {
411 const auto& snap_ns = snap_info_it->second.snap_namespace;
412 auto mirror_ns = boost::get<
413 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
414 if (mirror_ns == nullptr) {
// not a mirror snapshot (e.g. a user snapshot): irrelevant here
415 continue;
416 }
417
418 dout(15) << "local mirror snapshot: id=" << snap_info_it->first << ", "
419 << "mirror_ns=" << *mirror_ns << dendl;
// remember the most recent mirror snapshot seen so far
420 m_local_mirror_snap_ns = *mirror_ns;
421
422 auto local_snap_id = snap_info_it->first;
423 if (mirror_ns->is_non_primary()) {
424 if (mirror_ns->complete) {
425 // if remote has new snapshots, we would sync from here
426 m_local_snap_id_start = local_snap_id;
427 m_local_snap_id_end = CEPH_NOSNAP;
428
429 if (mirror_ns->mirror_peer_uuids.empty()) {
430 // no other peer will attempt to sync to this snapshot so store as
431 // a candidate for removal
432 prune_snap_ids.insert(local_snap_id);
433 }
434 } else {
435 // start snap will be last complete mirror snapshot or initial
436 // image revision
437 m_local_snap_id_end = local_snap_id;
438
439 if (mirror_ns->last_copied_object_number == 0) {
440 // snapshot might be missing image state, object-map, etc, so just
441 // delete and re-create it if we haven't started copying data
442 // objects
443 prune_snap_ids.insert(local_snap_id);
444 break;
445 }
446 }
447 } else if (mirror_ns->is_primary()) {
448 if (mirror_ns->complete) {
449 m_local_snap_id_start = local_snap_id;
450 m_local_snap_id_end = CEPH_NOSNAP;
451 } else {
452 derr << "incomplete local primary snapshot" << dendl;
// NOTE(review): invoked while image_locker (shared local image_lock)
// is still held — confirm handle_replay_complete() never needs it
453 handle_replay_complete(locker, -EINVAL,
454 "incomplete local primary snapshot");
455 return;
456 }
457 } else {
458 derr << "unknown local mirror snapshot state" << dendl;
// NOTE(review): same as above — image_locker is still held here
459 handle_replay_complete(locker, -EINVAL,
460 "invalid local mirror snapshot state");
461 return;
462 }
463 }
464 image_locker.unlock();
465
466 if (m_local_snap_id_start > 0 && m_local_snap_id_end == CEPH_NOSNAP) {
467 // remove candidate that is required for delta snapshot sync
468 prune_snap_ids.erase(m_local_snap_id_start);
469 }
470 if (!prune_snap_ids.empty()) {
// prune one snapshot per pass; its completion restarts the iteration
471 locker->unlock();
472
473 auto prune_snap_id = *prune_snap_ids.begin();
474 dout(5) << "pruning unused non-primary snapshot " << prune_snap_id << dendl;
475 prune_non_primary_snapshot(prune_snap_id);
476 return;
477 }
478
// a local mirror snapshot exists: validate it before syncing on top of it
479 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
480 if (m_local_mirror_snap_ns.is_non_primary() &&
481 m_local_mirror_snap_ns.primary_mirror_uuid !=
482 m_state_builder->remote_mirror_uuid) {
483 // TODO support multiple peers
484 derr << "local image linked to unknown peer: "
485 << m_local_mirror_snap_ns.primary_mirror_uuid << dendl;
486 handle_replay_complete(locker, -EEXIST,
487 "local image linked to unknown peer");
488 return;
489 } else if (m_local_mirror_snap_ns.state ==
490 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY) {
// the newest local mirror snapshot is a (non-demoted) primary one
491 dout(5) << "local image promoted" << dendl;
492 handle_replay_complete(locker, 0, "force promoted");
493 return;
494 }
495
496 dout(10) << "found local mirror snapshot: "
497 << "local_snap_id_start=" << m_local_snap_id_start << ", "
498 << "local_snap_id_end=" << m_local_snap_id_end << ", "
499 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
500 if (!m_local_mirror_snap_ns.is_primary() &&
501 m_local_mirror_snap_ns.complete) {
502 // our remote sync should start after this completed snapshot
503 m_remote_snap_id_start = m_local_mirror_snap_ns.primary_snap_id;
504 }
505 }
506
507 // we don't have any mirror snapshots or only completed non-primary
508 // mirror snapshots
509 scan_remote_mirror_snapshots(locker);
510 }
511
// Scans the remote image's mirror snapshots (snap_info order, oldest first),
// considering only those that list m_remote_mirror_peer_uuid as a peer. The
// scan is relative to the local position computed by
// scan_local_mirror_snapshots():
//  - m_remote_snap_id_start: last remote snapshot already synced, or the
//    remote demotion snapshot matching the local demotion.
//  - m_remote_snap_id_end: first remote snapshot still to be synced
//    (CEPH_NOSNAP if none); m_remote_mirror_snap_ns then holds its
//    namespace.
//  - m_pending_snapshots: number of remote snapshots still to be synced.
// Afterwards, in priority order, it does one of the following:
//  - unlinks one remote snapshot that is no longer needed;
//  - starts (copy_snapshots()) or resumes (get_local_image_state()) a sync
//    if the end snapshot is complete;
//  - restarts the iteration if an update notification arrived meanwhile;
//  - ends the replay on interruption, split-brain or remote demotion.
// If none of these apply, the replayer goes to STATE_IDLE.
//
// `locker` must hold m_lock on entry.
512 template <typename I>
513 void Replayer<I>::scan_remote_mirror_snapshots(
514 std::unique_lock<ceph::mutex>* locker) {
515 dout(10) << dendl;
516
517 m_pending_snapshots = 0;
518
519 std::set<uint64_t> unlink_snap_ids;
520 bool split_brain = false;
521 bool remote_demoted = false;
522 auto remote_image_ctx = m_state_builder->remote_image_ctx;
523 std::shared_lock image_locker{remote_image_ctx->image_lock};
524 for (auto snap_info_it = remote_image_ctx->snap_info.begin();
525 snap_info_it != remote_image_ctx->snap_info.end(); ++snap_info_it) {
526 const auto& snap_ns = snap_info_it->second.snap_namespace;
527 auto mirror_ns = boost::get<
528 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
529 if (mirror_ns == nullptr) {
530 continue;
531 }
532
533 dout(15) << "remote mirror snapshot: id=" << snap_info_it->first << ", "
534 << "mirror_ns=" << *mirror_ns << dendl;
// updated for every mirror snapshot (even ones without our peer), so after
// the loop it tells whether the newest remote mirror snapshot is a demotion
535 remote_demoted = mirror_ns->is_demoted();
536 if (!mirror_ns->is_primary() && !mirror_ns->is_non_primary()) {
537 derr << "unknown remote mirror snapshot state" << dendl;
// NOTE(review): invoked while image_locker (shared remote image_lock)
// is still held — confirm handle_replay_complete() never needs it
538 handle_replay_complete(locker, -EINVAL,
539 "invalid remote mirror snapshot state");
540 return;
541 } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) ==
542 0) {
543 dout(15) << "skipping remote snapshot due to missing mirror peer"
544 << dendl;
545 continue;
546 }
547
548 auto remote_snap_id = snap_info_it->first;
549 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
550 // we have a local mirror snapshot
551 if (m_local_mirror_snap_ns.is_non_primary()) {
552 // previously validated that it was linked to remote
553 ceph_assert(m_local_mirror_snap_ns.primary_mirror_uuid ==
554 m_state_builder->remote_mirror_uuid);
555
// every remote snapshot linked to us is an unlink candidate; the
// snapshots bounding the sync are removed from the set after the loop
556 unlink_snap_ids.insert(remote_snap_id);
557 if (m_local_mirror_snap_ns.complete &&
558 m_local_mirror_snap_ns.primary_snap_id >= remote_snap_id) {
559 // skip past completed remote snapshot
560 m_remote_snap_id_start = remote_snap_id;
561 m_remote_mirror_snap_ns = *mirror_ns;
562 dout(15) << "skipping synced remote snapshot " << remote_snap_id
563 << dendl;
564 continue;
565 } else if (!m_local_mirror_snap_ns.complete &&
566 m_local_mirror_snap_ns.primary_snap_id > remote_snap_id) {
567 // skip until we get to the in-progress remote snapshot
568 dout(15) << "skipping synced remote snapshot " << remote_snap_id
569 << " while search for in-progress sync" << dendl;
570 m_remote_snap_id_start = remote_snap_id;
571 m_remote_mirror_snap_ns = *mirror_ns;
572 continue;
573 }
574 } else if (m_local_mirror_snap_ns.state ==
575 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED) {
576 // find the matching demotion snapshot in remote image
577 ceph_assert(m_local_snap_id_start > 0);
578 if (mirror_ns->state ==
579 cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED &&
580 mirror_ns->primary_mirror_uuid == m_local_mirror_uuid &&
581 mirror_ns->primary_snap_id == m_local_snap_id_start) {
582 dout(10) << "located matching demotion snapshot: "
583 << "remote_snap_id=" << remote_snap_id << ", "
584 << "local_snap_id=" << m_local_snap_id_start << dendl;
585 m_remote_snap_id_start = remote_snap_id;
586 split_brain = false;
587 continue;
588 } else if (m_remote_snap_id_start == 0) {
589 // still looking for our matching demotion snapshot
590 dout(15) << "skipping remote snapshot " << remote_snap_id << " "
591 << "while searching for demotion" << dendl;
592 split_brain = true;
593 continue;
594 }
595 } else {
596 // should not have been able to reach this
597 ceph_assert(false);
598 }
599 } else if (!mirror_ns->is_primary()) {
// no local mirror snapshot yet: only primary snapshots can seed a sync
600 dout(15) << "skipping non-primary remote snapshot" << dendl;
601 continue;
602 }
603
604 // found candidate snapshot to sync
605 ++m_pending_snapshots;
// only the first candidate becomes the sync target; later ones are just
// counted in m_pending_snapshots
606 if (m_remote_snap_id_end != CEPH_NOSNAP) {
607 continue;
608 }
609
610 // first candidate snapshot where we are listed as a peer
611 m_remote_snap_id_end = remote_snap_id;
612 m_remote_mirror_snap_ns = *mirror_ns;
613 }
614 image_locker.unlock();
615
// keep the snapshots that bound the current sync linked
616 unlink_snap_ids.erase(m_remote_snap_id_start);
617 unlink_snap_ids.erase(m_remote_snap_id_end);
618 if (!unlink_snap_ids.empty()) {
619 locker->unlock();
620
621 // retry the unlinking process for a remote snapshot that we do not
622 // need anymore
623 auto remote_snap_id = *unlink_snap_ids.begin();
624 dout(10) << "unlinking from remote snapshot " << remote_snap_id << dendl;
625 unlink_peer(remote_snap_id);
626 return;
627 }
628
629 if (m_remote_snap_id_end != CEPH_NOSNAP) {
630 dout(10) << "found remote mirror snapshot: "
631 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
632 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
633 << "remote_snap_ns=" << m_remote_mirror_snap_ns << dendl;
634 if (m_remote_mirror_snap_ns.complete) {
635 locker->unlock();
636
637 if (m_local_snap_id_end != CEPH_NOSNAP &&
638 !m_local_mirror_snap_ns.complete) {
639 // attempt to resume image-sync
640 dout(10) << "local image contains in-progress mirror snapshot"
641 << dendl;
642 get_local_image_state();
643 } else {
644 copy_snapshots();
645 }
646 return;
647 } else {
648 // might have raced with the creation of a remote mirror snapshot
649 // so we will need to refresh and rescan once it completes
650 dout(15) << "remote mirror snapshot not complete" << dendl;
651 }
652 }
653
654 if (m_image_updated) {
655 // received update notification while scanning image, restart ...
656 m_image_updated = false;
657 locker->unlock();
658
659 dout(10) << "restarting snapshot scan due to remote update notification"
660 << dendl;
661 load_local_image_meta();
662 return;
663 }
664
665 if (is_replay_interrupted(locker)) {
666 return;
667 } else if (split_brain) {
668 derr << "split-brain detected: failed to find matching non-primary "
669 << "snapshot in remote image: "
670 << "local_snap_id_start=" << m_local_snap_id_start << ", "
671 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
672 handle_replay_complete(locker, -EEXIST, "split-brain");
673 return;
674 } else if (remote_demoted) {
675 dout(10) << "remote image demoted" << dendl;
676 handle_replay_complete(locker, -EREMOTEIO, "remote image demoted");
677 return;
678 }
679
// nothing to sync: go idle until an update notification restarts replay
680 dout(10) << "all remote snapshots synced: idling waiting for new snapshot"
681 << dendl;
682 ceph_assert(m_state == STATE_REPLAYING);
683 m_state = STATE_IDLE;
684
685 notify_status_updated();
686 }
687
688 template <typename I>
689 void Replayer<I>::prune_non_primary_snapshot(uint64_t snap_id) {
690 dout(10) << "snap_id=" << snap_id << dendl;
691
692 auto local_image_ctx = m_state_builder->local_image_ctx;
693 bool snap_valid = false;
694 cls::rbd::SnapshotNamespace snap_namespace;
695 std::string snap_name;
696
697 {
698 std::shared_lock image_locker{local_image_ctx->image_lock};
699 auto snap_info = local_image_ctx->get_snap_info(snap_id);
700 if (snap_info != nullptr) {
701 snap_valid = true;
702 snap_namespace = snap_info->snap_namespace;
703 snap_name = snap_info->name;
704
705 ceph_assert(boost::get<cls::rbd::MirrorSnapshotNamespace>(
706 &snap_namespace) != nullptr);
707 }
708 }
709
710 if (!snap_valid) {
711 load_local_image_meta();
712 return;
713 }
714
715 auto ctx = create_context_callback<
716 Replayer<I>, &Replayer<I>::handle_prune_non_primary_snapshot>(this);
717 local_image_ctx->operations->snap_remove(snap_namespace, snap_name, ctx);
718 }
719
720 template <typename I>
721 void Replayer<I>::handle_prune_non_primary_snapshot(int r) {
722 dout(10) << "r=" << r << dendl;
723
724 if (r < 0 && r != -ENOENT) {
725 derr << "failed to prune non-primary snapshot: " << cpp_strerror(r)
726 << dendl;
727 handle_replay_complete(r, "failed to prune non-primary snapshot");
728 return;
729 }
730
731 if (is_replay_interrupted()) {
732 return;
733 }
734
735 load_local_image_meta();
736 }
737
738 template <typename I>
739 void Replayer<I>::copy_snapshots() {
740 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
741 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
742 << "local_snap_id_start=" << m_local_snap_id_start << dendl;
743
744 ceph_assert(m_remote_snap_id_start != CEPH_NOSNAP);
745 ceph_assert(m_remote_snap_id_end > 0 &&
746 m_remote_snap_id_end != CEPH_NOSNAP);
747 ceph_assert(m_local_snap_id_start != CEPH_NOSNAP);
748
749 m_local_mirror_snap_ns = {};
750 auto ctx = create_context_callback<
751 Replayer<I>, &Replayer<I>::handle_copy_snapshots>(this);
752 auto req = librbd::deep_copy::SnapshotCopyRequest<I>::create(
753 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
754 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start,
755 false, m_threads->work_queue, &m_local_mirror_snap_ns.snap_seqs,
756 ctx);
757 req->send();
758 }
759
760 template <typename I>
761 void Replayer<I>::handle_copy_snapshots(int r) {
762 dout(10) << "r=" << r << dendl;
763
764 if (r < 0) {
765 derr << "failed to copy snapshots from remote to local image: "
766 << cpp_strerror(r) << dendl;
767 handle_replay_complete(
768 r, "failed to copy snapshots from remote to local image");
769 return;
770 }
771
772 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
773 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
774 << "local_snap_id_start=" << m_local_snap_id_start << ", "
775 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
776 get_remote_image_state();
777 }
778
779 template <typename I>
780 void Replayer<I>::get_remote_image_state() {
781 dout(10) << dendl;
782
783 auto ctx = create_context_callback<
784 Replayer<I>, &Replayer<I>::handle_get_remote_image_state>(this);
785 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
786 m_state_builder->remote_image_ctx, m_remote_snap_id_end,
787 &m_image_state, ctx);
788 req->send();
789 }
790
791 template <typename I>
792 void Replayer<I>::handle_get_remote_image_state(int r) {
793 dout(10) << "r=" << r << dendl;
794
795 if (r < 0) {
796 derr << "failed to retrieve remote snapshot image state: "
797 << cpp_strerror(r) << dendl;
798 handle_replay_complete(r, "failed to retrieve remote snapshot image state");
799 return;
800 }
801
802 create_non_primary_snapshot();
803 }
804
805 template <typename I>
806 void Replayer<I>::get_local_image_state() {
807 dout(10) << dendl;
808
809 ceph_assert(m_local_snap_id_end != CEPH_NOSNAP);
810 auto ctx = create_context_callback<
811 Replayer<I>, &Replayer<I>::handle_get_local_image_state>(this);
812 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
813 m_state_builder->local_image_ctx, m_local_snap_id_end,
814 &m_image_state, ctx);
815 req->send();
816 }
817
818 template <typename I>
819 void Replayer<I>::handle_get_local_image_state(int r) {
820 dout(10) << "r=" << r << dendl;
821
822 if (r < 0) {
823 derr << "failed to retrieve local snapshot image state: "
824 << cpp_strerror(r) << dendl;
825 handle_replay_complete(r, "failed to retrieve local snapshot image state");
826 return;
827 }
828
829 request_sync();
830 }
831
// Creates the local non-primary mirror snapshot for remote snapshot
// m_remote_snap_id_end.
//
// When a local start snapshot exists, m_local_mirror_snap_ns.snap_seqs is
// first (re)built so that it maps every remote *user* snapshot below
// m_remote_snap_id_end to its local snapshot id. The lookup is by
// namespace+name when the local start snapshot is a demotion and the remote
// end snapshot is not; otherwise it uses the start snapshot's own snap_seqs.
// Mappings already present in snap_seqs (e.g. produced by copy_snapshots())
// are never overwritten.
//
// The new snapshot's id is written to m_local_snap_id_end, and processing
// continues in handle_create_non_primary_snapshot().
832 template <typename I>
833 void Replayer<I>::create_non_primary_snapshot() {
834 auto local_image_ctx = m_state_builder->local_image_ctx;
835
836 if (m_local_snap_id_start > 0) {
837 std::shared_lock local_image_locker{local_image_ctx->image_lock};
838
839 auto local_snap_info_it = local_image_ctx->snap_info.find(
840 m_local_snap_id_start);
841 if (local_snap_info_it == local_image_ctx->snap_info.end()) {
842 local_image_locker.unlock();
843
844 derr << "failed to locate local snapshot " << m_local_snap_id_start
845 << dendl;
846 handle_replay_complete(-ENOENT, "failed to locate local start snapshot");
847 return;
848 }
849
850 auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
851 &local_snap_info_it->second.snap_namespace);
852 ceph_assert(mirror_ns != nullptr);
853
// both image locks are held while the two snapshot tables are compared
854 auto remote_image_ctx = m_state_builder->remote_image_ctx;
855 std::shared_lock remote_image_locker{remote_image_ctx->image_lock};
856
857 // (re)build a full mapping from remote to local snap ids for all user
858 // snapshots to support applying image state in the future
859 for (auto& [remote_snap_id, remote_snap_info] :
860 remote_image_ctx->snap_info) {
861 if (remote_snap_id >= m_remote_snap_id_end) {
862 break;
863 }
864
865 // we can ignore all non-user snapshots since image state only includes
866 // user snapshots
867 if (boost::get<cls::rbd::UserSnapshotNamespace>(
868 &remote_snap_info.snap_namespace) == nullptr) {
869 continue;
870 }
871
872 uint64_t local_snap_id = CEPH_NOSNAP;
873 if (mirror_ns->is_demoted() && !m_remote_mirror_snap_ns.is_demoted()) {
874 // if we are creating a non-primary snapshot following a demotion,
875 // re-build the full snapshot sequence since we don't have a valid
876 // snapshot mapping
877 auto local_snap_id_it = local_image_ctx->snap_ids.find(
878 {remote_snap_info.snap_namespace, remote_snap_info.name});
879 if (local_snap_id_it != local_image_ctx->snap_ids.end()) {
880 local_snap_id = local_snap_id_it->second;
881 }
882 } else {
883 auto snap_seq_it = mirror_ns->snap_seqs.find(remote_snap_id);
884 if (snap_seq_it != mirror_ns->snap_seqs.end()) {
885 local_snap_id = snap_seq_it->second;
886 }
887 }
888
// never overwrite an existing mapping; skip snapshots with no local match
889 if (m_local_mirror_snap_ns.snap_seqs.count(remote_snap_id) == 0 &&
890 local_snap_id != CEPH_NOSNAP) {
891 dout(15) << "mapping remote snapshot " << remote_snap_id << " to "
892 << "local snapshot " << local_snap_id << dendl;
893 m_local_mirror_snap_ns.snap_seqs[remote_snap_id] = local_snap_id;
894 }
895 }
896 }
897
898 dout(10) << "demoted=" << m_remote_mirror_snap_ns.is_demoted() << ", "
899 << "primary_mirror_uuid="
900 << m_state_builder->remote_mirror_uuid << ", "
901 << "primary_snap_id=" << m_remote_snap_id_end << ", "
902 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
903
// the id of the newly created local snapshot is stored in m_local_snap_id_end
904 auto ctx = create_context_callback<
905 Replayer<I>, &Replayer<I>::handle_create_non_primary_snapshot>(this);
906 auto req = librbd::mirror::snapshot::CreateNonPrimaryRequest<I>::create(
907 local_image_ctx, m_remote_mirror_snap_ns.is_demoted(),
908 m_state_builder->remote_mirror_uuid, m_remote_snap_id_end,
909 m_local_mirror_snap_ns.snap_seqs, m_image_state, &m_local_snap_id_end, ctx);
910 req->send();
911 }
912
913 template <typename I>
914 void Replayer<I>::handle_create_non_primary_snapshot(int r) {
915 dout(10) << "r=" << r << dendl;
916
917 if (r < 0) {
918 derr << "failed to create local mirror snapshot: " << cpp_strerror(r)
919 << dendl;
920 handle_replay_complete(r, "failed to create local mirror snapshot");
921 return;
922 }
923
924 dout(15) << "local_snap_id_end=" << m_local_snap_id_end << dendl;
925
926 request_sync();
927 }
928
929 template <typename I>
930 void Replayer<I>::request_sync() {
931 if (m_remote_mirror_snap_ns.clean_since_snap_id == m_remote_snap_id_start) {
932 dout(10) << "skipping unnecessary image copy: "
933 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
934 << "remote_mirror_snap_ns=" << m_remote_mirror_snap_ns << dendl;
935 apply_image_state();
936 return;
937 }
938
939 dout(10) << dendl;
940 std::unique_lock locker{m_lock};
941 if (is_replay_interrupted(&locker)) {
942 return;
943 }
944
945 auto ctx = create_async_context_callback(
946 m_threads->work_queue, create_context_callback<
947 Replayer<I>, &Replayer<I>::handle_request_sync>(this));
948 m_instance_watcher->notify_sync_request(m_state_builder->local_image_ctx->id,
949 ctx);
950 }
951
952 template <typename I>
953 void Replayer<I>::handle_request_sync(int r) {
954 dout(10) << "r=" << r << dendl;
955
956 std::unique_lock locker{m_lock};
957 if (is_replay_interrupted(&locker)) {
958 return;
959 } else if (r == -ECANCELED) {
960 dout(5) << "image-sync canceled" << dendl;
961 handle_replay_complete(&locker, r, "image-sync canceled");
962 return;
963 } else if (r < 0) {
964 derr << "failed to request image-sync: " << cpp_strerror(r) << dendl;
965 handle_replay_complete(&locker, r, "failed to request image-sync");
966 return;
967 }
968
969 m_sync_in_progress = true;
970 locker.unlock();
971
972 copy_image();
973 }
974
975 template <typename I>
976 void Replayer<I>::copy_image() {
977 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
978 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
979 << "local_snap_id_start=" << m_local_snap_id_start << ", "
980 << "last_copied_object_number="
981 << m_local_mirror_snap_ns.last_copied_object_number << ", "
982 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
983
984 m_snapshot_bytes = 0;
985 m_deep_copy_handler = new DeepCopyHandler(this);
986 auto ctx = create_context_callback<
987 Replayer<I>, &Replayer<I>::handle_copy_image>(this);
988 auto req = librbd::deep_copy::ImageCopyRequest<I>::create(
989 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
990 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start, false,
991 (m_local_mirror_snap_ns.last_copied_object_number > 0 ?
992 librbd::deep_copy::ObjectNumber{
993 m_local_mirror_snap_ns.last_copied_object_number} :
994 librbd::deep_copy::ObjectNumber{}),
995 m_local_mirror_snap_ns.snap_seqs, m_deep_copy_handler, ctx);
996 req->send();
997 }
998
999 template <typename I>
1000 void Replayer<I>::handle_copy_image(int r) {
1001 dout(10) << "r=" << r << dendl;
1002
1003 delete m_deep_copy_handler;
1004 m_deep_copy_handler = nullptr;
1005
1006 if (r < 0) {
1007 derr << "failed to copy remote image to local image: " << cpp_strerror(r)
1008 << dendl;
1009 handle_replay_complete(r, "failed to copy remote image");
1010 return;
1011 }
1012
1013 {
1014 std::unique_lock locker{m_lock};
1015 m_bytes_per_snapshot(m_snapshot_bytes);
1016 m_snapshot_bytes = 0;
1017 }
1018
1019 apply_image_state();
1020 }
1021
1022 template <typename I>
1023 void Replayer<I>::handle_copy_image_progress(uint64_t object_number,
1024 uint64_t object_count) {
1025 dout(10) << "object_number=" << object_number << ", "
1026 << "object_count=" << object_count << dendl;
1027
1028 std::unique_lock locker{m_lock};
1029 m_local_mirror_snap_ns.last_copied_object_number = std::min(
1030 object_number, object_count);
1031 m_local_object_count = object_count;
1032
1033 update_non_primary_snapshot(false);
1034 }
1035
1036 template <typename I>
1037 void Replayer<I>::handle_copy_image_read(uint64_t bytes_read) {
1038 dout(20) << "bytes_read=" << bytes_read << dendl;
1039
1040 std::unique_lock locker{m_lock};
1041 m_bytes_per_second(bytes_read);
1042 m_snapshot_bytes += bytes_read;
1043 }
1044
1045 template <typename I>
1046 void Replayer<I>::apply_image_state() {
1047 dout(10) << dendl;
1048
1049 auto ctx = create_context_callback<
1050 Replayer<I>, &Replayer<I>::handle_apply_image_state>(this);
1051 auto req = ApplyImageStateRequest<I>::create(
1052 m_local_mirror_uuid,
1053 m_state_builder->remote_mirror_uuid,
1054 m_state_builder->local_image_ctx,
1055 m_state_builder->remote_image_ctx,
1056 m_image_state, ctx);
1057 req->send();
1058 }
1059
1060 template <typename I>
1061 void Replayer<I>::handle_apply_image_state(int r) {
1062 dout(10) << "r=" << r << dendl;
1063
1064 if (r < 0 && r != -ENOENT) {
1065 derr << "failed to apply remote image state to local image: "
1066 << cpp_strerror(r) << dendl;
1067 handle_replay_complete(r, "failed to apply remote image state");
1068 return;
1069 }
1070
1071 std::unique_lock locker{m_lock};
1072 update_non_primary_snapshot(true);
1073 }
1074
1075 template <typename I>
1076 void Replayer<I>::update_non_primary_snapshot(bool complete) {
1077 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1078 if (!complete) {
1079 // disallow two in-flight updates if this isn't the completion of the sync
1080 if (m_updating_sync_point) {
1081 return;
1082 }
1083 m_updating_sync_point = true;
1084 } else {
1085 m_local_mirror_snap_ns.complete = true;
1086 }
1087
1088 dout(10) << dendl;
1089
1090 librados::ObjectWriteOperation op;
1091 librbd::cls_client::mirror_image_snapshot_set_copy_progress(
1092 &op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
1093 m_local_mirror_snap_ns.last_copied_object_number);
1094
1095 auto ctx = new C_TrackedOp(
1096 m_in_flight_op_tracker, new LambdaContext([this, complete](int r) {
1097 handle_update_non_primary_snapshot(complete, r);
1098 }));
1099 auto aio_comp = create_rados_callback(ctx);
1100 int r = m_state_builder->local_image_ctx->md_ctx.aio_operate(
1101 m_state_builder->local_image_ctx->header_oid, aio_comp, &op);
1102 ceph_assert(r == 0);
1103 aio_comp->release();
1104 }
1105
1106 template <typename I>
1107 void Replayer<I>::handle_update_non_primary_snapshot(bool complete, int r) {
1108 dout(10) << "r=" << r << dendl;
1109
1110 if (r < 0) {
1111 derr << "failed to update local snapshot progress: " << cpp_strerror(r)
1112 << dendl;
1113 if (complete) {
1114 // only fail if this was the final update
1115 handle_replay_complete(r, "failed to update local snapshot progress");
1116 return;
1117 }
1118 }
1119
1120 if (!complete) {
1121 // periodic sync-point update -- do not advance state machine
1122 std::unique_lock locker{m_lock};
1123
1124 ceph_assert(m_updating_sync_point);
1125 m_updating_sync_point = false;
1126 return;
1127 }
1128
1129 notify_image_update();
1130 }
1131
1132 template <typename I>
1133 void Replayer<I>::notify_image_update() {
1134 dout(10) << dendl;
1135
1136 auto ctx = create_context_callback<
1137 Replayer<I>, &Replayer<I>::handle_notify_image_update>(this);
1138 m_state_builder->local_image_ctx->notify_update(ctx);
1139 }
1140
1141 template <typename I>
1142 void Replayer<I>::handle_notify_image_update(int r) {
1143 dout(10) << "r=" << r << dendl;
1144
1145 if (r < 0) {
1146 derr << "failed to notify local image update: " << cpp_strerror(r) << dendl;
1147 }
1148
1149 unlink_peer(m_remote_snap_id_start);
1150 }
1151
1152 template <typename I>
1153 void Replayer<I>::unlink_peer(uint64_t remote_snap_id) {
1154 if (remote_snap_id == 0) {
1155 finish_sync();
1156 return;
1157 }
1158
1159 // local snapshot fully synced -- we no longer depend on the sync
1160 // start snapshot in the remote image
1161 dout(10) << "remote_snap_id=" << remote_snap_id << dendl;
1162
1163 auto ctx = create_context_callback<
1164 Replayer<I>, &Replayer<I>::handle_unlink_peer>(this);
1165 auto req = librbd::mirror::snapshot::UnlinkPeerRequest<I>::create(
1166 m_state_builder->remote_image_ctx, remote_snap_id,
1167 m_remote_mirror_peer_uuid, ctx);
1168 req->send();
1169 }
1170
1171 template <typename I>
1172 void Replayer<I>::handle_unlink_peer(int r) {
1173 dout(10) << "r=" << r << dendl;
1174
1175 if (r < 0 && r != -ENOENT) {
1176 derr << "failed to unlink local peer from remote image: " << cpp_strerror(r)
1177 << dendl;
1178 handle_replay_complete(r, "failed to unlink local peer from remote image");
1179 return;
1180 }
1181
1182 finish_sync();
1183 }
1184
1185 template <typename I>
1186 void Replayer<I>::finish_sync() {
1187 dout(10) << dendl;
1188
1189 {
1190 std::unique_lock locker{m_lock};
1191 notify_status_updated();
1192
1193 if (m_sync_in_progress) {
1194 m_sync_in_progress = false;
1195 m_instance_watcher->notify_sync_complete(
1196 m_state_builder->local_image_ctx->id);
1197 }
1198 }
1199
1200 if (is_replay_interrupted()) {
1201 return;
1202 }
1203
1204 load_local_image_meta();
1205 }
1206
1207 template <typename I>
1208 void Replayer<I>::register_local_update_watcher() {
1209 dout(10) << dendl;
1210
1211 m_update_watch_ctx = new C_UpdateWatchCtx(this);
1212
1213 int r = m_state_builder->local_image_ctx->state->register_update_watcher(
1214 m_update_watch_ctx, &m_local_update_watcher_handle);
1215 auto ctx = create_context_callback<
1216 Replayer<I>, &Replayer<I>::handle_register_local_update_watcher>(this);
1217 m_threads->work_queue->queue(ctx, r);
1218 }
1219
1220 template <typename I>
1221 void Replayer<I>::handle_register_local_update_watcher(int r) {
1222 dout(10) << "r=" << r << dendl;
1223
1224 if (r < 0) {
1225 derr << "failed to register local update watcher: " << cpp_strerror(r)
1226 << dendl;
1227 handle_replay_complete(r, "failed to register local image update watcher");
1228 m_state = STATE_COMPLETE;
1229
1230 delete m_update_watch_ctx;
1231 m_update_watch_ctx = nullptr;
1232
1233 Context* on_init = nullptr;
1234 std::swap(on_init, m_on_init_shutdown);
1235 on_init->complete(r);
1236 return;
1237 }
1238
1239 register_remote_update_watcher();
1240 }
1241
1242 template <typename I>
1243 void Replayer<I>::register_remote_update_watcher() {
1244 dout(10) << dendl;
1245
1246 int r = m_state_builder->remote_image_ctx->state->register_update_watcher(
1247 m_update_watch_ctx, &m_remote_update_watcher_handle);
1248 auto ctx = create_context_callback<
1249 Replayer<I>, &Replayer<I>::handle_register_remote_update_watcher>(this);
1250 m_threads->work_queue->queue(ctx, r);
1251 }
1252
1253 template <typename I>
1254 void Replayer<I>::handle_register_remote_update_watcher(int r) {
1255 dout(10) << "r=" << r << dendl;
1256
1257 if (r < 0) {
1258 derr << "failed to register remote update watcher: " << cpp_strerror(r)
1259 << dendl;
1260 handle_replay_complete(r, "failed to register remote image update watcher");
1261 m_state = STATE_COMPLETE;
1262
1263 unregister_local_update_watcher();
1264 return;
1265 }
1266
1267 m_state = STATE_REPLAYING;
1268
1269 Context* on_init = nullptr;
1270 std::swap(on_init, m_on_init_shutdown);
1271 on_init->complete(0);
1272
1273 // delay initial snapshot scan until after we have alerted
1274 // image replayer that we have initialized in case an error
1275 // occurs
1276 {
1277 std::unique_lock locker{m_lock};
1278 notify_status_updated();
1279 }
1280
1281 load_local_image_meta();
1282 }
1283
1284 template <typename I>
1285 void Replayer<I>::unregister_remote_update_watcher() {
1286 dout(10) << dendl;
1287
1288 auto ctx = create_context_callback<
1289 Replayer<I>,
1290 &Replayer<I>::handle_unregister_remote_update_watcher>(this);
1291 m_state_builder->remote_image_ctx->state->unregister_update_watcher(
1292 m_remote_update_watcher_handle, ctx);
1293 }
1294
1295 template <typename I>
1296 void Replayer<I>::handle_unregister_remote_update_watcher(int r) {
1297 dout(10) << "r=" << r << dendl;
1298
1299 if (r < 0) {
1300 derr << "failed to unregister remote update watcher: " << cpp_strerror(r)
1301 << dendl;
1302 handle_replay_complete(
1303 r, "failed to unregister remote image update watcher");
1304 }
1305
1306 unregister_local_update_watcher();
1307 }
1308
1309 template <typename I>
1310 void Replayer<I>::unregister_local_update_watcher() {
1311 dout(10) << dendl;
1312
1313 auto ctx = create_context_callback<
1314 Replayer<I>,
1315 &Replayer<I>::handle_unregister_local_update_watcher>(this);
1316 m_state_builder->local_image_ctx->state->unregister_update_watcher(
1317 m_local_update_watcher_handle, ctx);
1318 }
1319
1320 template <typename I>
1321 void Replayer<I>::handle_unregister_local_update_watcher(int r) {
1322 dout(10) << "r=" << r << dendl;
1323
1324 if (r < 0) {
1325 derr << "failed to unregister local update watcher: " << cpp_strerror(r)
1326 << dendl;
1327 handle_replay_complete(
1328 r, "failed to unregister local image update watcher");
1329 }
1330
1331 delete m_update_watch_ctx;
1332 m_update_watch_ctx = nullptr;
1333
1334 wait_for_in_flight_ops();
1335 }
1336
1337 template <typename I>
1338 void Replayer<I>::wait_for_in_flight_ops() {
1339 dout(10) << dendl;
1340
1341 auto ctx = create_async_context_callback(
1342 m_threads->work_queue, create_context_callback<
1343 Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
1344 m_in_flight_op_tracker.wait_for_ops(ctx);
1345 }
1346
1347 template <typename I>
1348 void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
1349 dout(10) << "r=" << r << dendl;
1350
1351 Context* on_shutdown = nullptr;
1352 {
1353 std::unique_lock locker{m_lock};
1354 ceph_assert(m_on_init_shutdown != nullptr);
1355 std::swap(on_shutdown, m_on_init_shutdown);
1356 }
1357 on_shutdown->complete(m_error_code);
1358 }
1359
1360 template <typename I>
1361 void Replayer<I>::handle_image_update_notify() {
1362 dout(10) << dendl;
1363
1364 std::unique_lock locker{m_lock};
1365 if (m_state == STATE_REPLAYING) {
1366 dout(15) << "flagging snapshot rescan required" << dendl;
1367 m_image_updated = true;
1368 } else if (m_state == STATE_IDLE) {
1369 m_state = STATE_REPLAYING;
1370 locker.unlock();
1371
1372 dout(15) << "restarting idle replayer" << dendl;
1373 load_local_image_meta();
1374 }
1375 }
1376
1377 template <typename I>
1378 void Replayer<I>::handle_replay_complete(int r,
1379 const std::string& description) {
1380 std::unique_lock locker{m_lock};
1381 handle_replay_complete(&locker, r, description);
1382 }
1383
1384 template <typename I>
1385 void Replayer<I>::handle_replay_complete(std::unique_lock<ceph::mutex>* locker,
1386 int r,
1387 const std::string& description) {
1388 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1389
1390 if (m_error_code == 0) {
1391 m_error_code = r;
1392 m_error_description = description;
1393 }
1394
1395 if (m_sync_in_progress) {
1396 m_sync_in_progress = false;
1397 m_instance_watcher->notify_sync_complete(
1398 m_state_builder->local_image_ctx->id);
1399 }
1400
1401 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
1402 return;
1403 }
1404
1405 m_state = STATE_COMPLETE;
1406 notify_status_updated();
1407 }
1408
1409 template <typename I>
1410 void Replayer<I>::notify_status_updated() {
1411 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1412
1413 dout(10) << dendl;
1414 auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
1415 [this](int) {
1416 m_replayer_listener->handle_notification();
1417 }));
1418 m_threads->work_queue->queue(ctx, 0);
1419 }
1420
1421 template <typename I>
1422 bool Replayer<I>::is_replay_interrupted() {
1423 std::unique_lock locker{m_lock};
1424 return is_replay_interrupted(&locker);
1425 }
1426
1427 template <typename I>
1428 bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
1429 if (m_state == STATE_COMPLETE) {
1430 locker->unlock();
1431
1432 dout(10) << "resuming pending shut down" << dendl;
1433 unregister_remote_update_watcher();
1434 return true;
1435 }
1436 return false;
1437 }
1438
1439 } // namespace snapshot
1440 } // namespace image_replayer
1441 } // namespace mirror
1442 } // namespace rbd
1443
1444 template class rbd::mirror::image_replayer::snapshot::Replayer<librbd::ImageCtx>;