]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / tools / rbd_mirror / image_replayer / snapshot / Replayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "Replayer.h"
5 #include "common/debug.h"
6 #include "common/errno.h"
7 #include "common/perf_counters.h"
8 #include "common/perf_counters_key.h"
9 #include "include/stringify.h"
10 #include "common/Timer.h"
11 #include "cls/rbd/cls_rbd_client.h"
12 #include "json_spirit/json_spirit.h"
13 #include "librbd/ImageCtx.h"
14 #include "librbd/ImageState.h"
15 #include "librbd/Operations.h"
16 #include "librbd/Utils.h"
17 #include "librbd/asio/ContextWQ.h"
18 #include "librbd/deep_copy/Handler.h"
19 #include "librbd/deep_copy/ImageCopyRequest.h"
20 #include "librbd/deep_copy/SnapshotCopyRequest.h"
21 #include "librbd/mirror/ImageStateUpdateRequest.h"
22 #include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h"
23 #include "librbd/mirror/snapshot/GetImageStateRequest.h"
24 #include "librbd/mirror/snapshot/ImageMeta.h"
25 #include "librbd/mirror/snapshot/UnlinkPeerRequest.h"
26 #include "tools/rbd_mirror/InstanceWatcher.h"
27 #include "tools/rbd_mirror/PoolMetaCache.h"
28 #include "tools/rbd_mirror/Threads.h"
29 #include "tools/rbd_mirror/Types.h"
30 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
32 #include "tools/rbd_mirror/image_replayer/Utils.h"
33 #include "tools/rbd_mirror/image_replayer/snapshot/ApplyImageStateRequest.h"
34 #include "tools/rbd_mirror/image_replayer/snapshot/StateBuilder.h"
35 #include "tools/rbd_mirror/image_replayer/snapshot/Utils.h"
36 #include <set>
37
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_rbd_mirror
40 #undef dout_prefix
41 #define dout_prefix *_dout << "rbd::mirror::image_replayer::snapshot::" \
42 << "Replayer: " << this << " " << __func__ << ": "
43
44 extern PerfCounters *g_snapshot_perf_counters;
45
46 namespace rbd {
47 namespace mirror {
48 namespace image_replayer {
49 namespace snapshot {
50
51 namespace {
52
// Round `value` to two decimal places and return the magnitude of the
// result (so a tiny negative input yields 0 rather than -0).
double round_to_two_places(double value) {
  // fabs() always operates on double. An unqualified abs() may resolve to
  // ::abs(int) depending on which headers happen to be in scope, which would
  // silently truncate the fractional part.
  return fabs(round(value * 100) / 100);
}
56
57 template<typename I>
58 std::pair<uint64_t, librbd::SnapInfo*> get_newest_mirror_snapshot(
59 I* image_ctx) {
60 for (auto snap_info_it = image_ctx->snap_info.rbegin();
61 snap_info_it != image_ctx->snap_info.rend(); ++snap_info_it) {
62 const auto& snap_ns = snap_info_it->second.snap_namespace;
63 auto mirror_ns = std::get_if<
64 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
65 if (mirror_ns == nullptr || !mirror_ns->complete) {
66 continue;
67 }
68
69 return {snap_info_it->first, &snap_info_it->second};
70 }
71
72 return {CEPH_NOSNAP, nullptr};
73 }
74
75 } // anonymous namespace
76
77 using librbd::util::create_async_context_callback;
78 using librbd::util::create_context_callback;
79 using librbd::util::create_rados_callback;
80
81 template <typename I>
82 struct Replayer<I>::C_UpdateWatchCtx : public librbd::UpdateWatchCtx {
83 Replayer<I>* replayer;
84
85 C_UpdateWatchCtx(Replayer<I>* replayer) : replayer(replayer) {
86 }
87
88 void handle_notify() override {
89 replayer->handle_image_update_notify();
90 }
91 };
92
93 template <typename I>
94 struct Replayer<I>::DeepCopyHandler : public librbd::deep_copy::Handler {
95 Replayer *replayer;
96
97 DeepCopyHandler(Replayer* replayer) : replayer(replayer) {
98 }
99
100 void handle_read(uint64_t bytes_read) override {
101 replayer->handle_copy_image_read(bytes_read);
102 }
103
104 int update_progress(uint64_t object_number, uint64_t object_count) override {
105 replayer->handle_copy_image_progress(object_number, object_count);
106 return 0;
107 }
108 };
109
110 template <typename I>
111 Replayer<I>::Replayer(
112 Threads<I>* threads,
113 InstanceWatcher<I>* instance_watcher,
114 const std::string& local_mirror_uuid,
115 PoolMetaCache* pool_meta_cache,
116 StateBuilder<I>* state_builder,
117 ReplayerListener* replayer_listener)
118 : m_threads(threads),
119 m_instance_watcher(instance_watcher),
120 m_local_mirror_uuid(local_mirror_uuid),
121 m_pool_meta_cache(pool_meta_cache),
122 m_state_builder(state_builder),
123 m_replayer_listener(replayer_listener),
124 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
125 "rbd::mirror::image_replayer::snapshot::Replayer", this))) {
126 dout(10) << dendl;
127 }
128
129 template <typename I>
130 Replayer<I>::~Replayer() {
131 dout(10) << dendl;
132
133 {
134 std::unique_lock locker{m_lock};
135 unregister_perf_counters();
136 }
137
138 ceph_assert(m_state == STATE_COMPLETE);
139 ceph_assert(m_update_watch_ctx == nullptr);
140 ceph_assert(m_deep_copy_handler == nullptr);
141 }
142
143 template <typename I>
144 void Replayer<I>::init(Context* on_finish) {
145 dout(10) << dendl;
146
147 ceph_assert(m_state == STATE_INIT);
148
149 RemotePoolMeta remote_pool_meta;
150 int r = m_pool_meta_cache->get_remote_pool_meta(
151 m_state_builder->remote_image_ctx->md_ctx.get_id(), &remote_pool_meta);
152 if (r < 0 || remote_pool_meta.mirror_peer_uuid.empty()) {
153 derr << "failed to retrieve mirror peer uuid from remote pool" << dendl;
154 m_state = STATE_COMPLETE;
155 m_threads->work_queue->queue(on_finish, r);
156 return;
157 }
158
159 m_remote_mirror_peer_uuid = remote_pool_meta.mirror_peer_uuid;
160 dout(10) << "remote_mirror_peer_uuid=" << m_remote_mirror_peer_uuid << dendl;
161
162 {
163 auto local_image_ctx = m_state_builder->local_image_ctx;
164 std::shared_lock image_locker{local_image_ctx->image_lock};
165 m_image_spec = image_replayer::util::compute_image_spec(
166 local_image_ctx->md_ctx, local_image_ctx->name);
167 }
168
169 {
170 std::unique_lock locker{m_lock};
171 register_perf_counters();
172 }
173
174 ceph_assert(m_on_init_shutdown == nullptr);
175 m_on_init_shutdown = on_finish;
176
177 register_local_update_watcher();
178 }
179
180 template <typename I>
181 void Replayer<I>::shut_down(Context* on_finish) {
182 dout(10) << dendl;
183
184 std::unique_lock locker{m_lock};
185 ceph_assert(m_on_init_shutdown == nullptr);
186 m_on_init_shutdown = on_finish;
187 m_error_code = 0;
188 m_error_description = "";
189
190 ceph_assert(m_state != STATE_INIT);
191 auto state = STATE_COMPLETE;
192 std::swap(m_state, state);
193
194 if (state == STATE_REPLAYING) {
195 // if a sync request was pending, request a cancelation
196 m_instance_watcher->cancel_sync_request(
197 m_state_builder->local_image_ctx->id);
198
199 // TODO interrupt snapshot copy and image copy state machines even if remote
200 // cluster is unreachable
201 dout(10) << "shut down pending on completion of snapshot replay" << dendl;
202 return;
203 }
204 locker.unlock();
205
206 unregister_remote_update_watcher();
207 }
208
209 template <typename I>
210 void Replayer<I>::flush(Context* on_finish) {
211 dout(10) << dendl;
212
213 // TODO
214 m_threads->work_queue->queue(on_finish, 0);
215 }
216
217 template <typename I>
218 bool Replayer<I>::get_replay_status(std::string* description,
219 Context* on_finish) {
220 dout(10) << dendl;
221
222 std::unique_lock locker{m_lock};
223 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
224 locker.unlock();
225
226 derr << "replay not running" << dendl;
227 on_finish->complete(-EAGAIN);
228 return false;
229 }
230
231 std::shared_lock local_image_locker{
232 m_state_builder->local_image_ctx->image_lock};
233 auto [local_snap_id, local_snap_info] = get_newest_mirror_snapshot(
234 m_state_builder->local_image_ctx);
235
236 std::shared_lock remote_image_locker{
237 m_state_builder->remote_image_ctx->image_lock};
238 auto [remote_snap_id, remote_snap_info] = get_newest_mirror_snapshot(
239 m_state_builder->remote_image_ctx);
240
241 if (remote_snap_info == nullptr) {
242 remote_image_locker.unlock();
243 local_image_locker.unlock();
244 locker.unlock();
245
246 derr << "remote image does not contain mirror snapshots" << dendl;
247 on_finish->complete(-EAGAIN);
248 return false;
249 }
250
251 std::string replay_state = "idle";
252 if (m_remote_snap_id_end != CEPH_NOSNAP) {
253 replay_state = "syncing";
254 }
255
256 json_spirit::mObject root_obj;
257 root_obj["replay_state"] = replay_state;
258 root_obj["remote_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
259 if (m_perf_counters) {
260 m_perf_counters->tset(l_rbd_mirror_snapshot_remote_timestamp,
261 remote_snap_info->timestamp);
262 }
263
264 auto matching_remote_snap_id = util::compute_remote_snap_id(
265 m_state_builder->local_image_ctx->image_lock,
266 m_state_builder->local_image_ctx->snap_info,
267 local_snap_id, m_state_builder->remote_mirror_uuid);
268 auto matching_remote_snap_it =
269 m_state_builder->remote_image_ctx->snap_info.find(matching_remote_snap_id);
270 if (matching_remote_snap_id != CEPH_NOSNAP &&
271 matching_remote_snap_it !=
272 m_state_builder->remote_image_ctx->snap_info.end()) {
273 // use the timestamp from the matching remote image since
274 // the local snapshot would just be the time the snapshot was
275 // synced and not the consistency point in time.
276 root_obj["local_snapshot_timestamp"] =
277 matching_remote_snap_it->second.timestamp.sec();
278 if (m_perf_counters) {
279 m_perf_counters->tset(l_rbd_mirror_snapshot_local_timestamp,
280 matching_remote_snap_it->second.timestamp);
281 }
282 }
283
284 matching_remote_snap_it = m_state_builder->remote_image_ctx->snap_info.find(
285 m_remote_snap_id_end);
286 if (m_remote_snap_id_end != CEPH_NOSNAP &&
287 matching_remote_snap_it !=
288 m_state_builder->remote_image_ctx->snap_info.end()) {
289 root_obj["syncing_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
290
291 if (m_local_object_count > 0) {
292 root_obj["syncing_percent"] =
293 100 * m_local_mirror_snap_ns.last_copied_object_number /
294 m_local_object_count;
295 } else {
296 // Set syncing_percent to 0 if m_local_object_count has
297 // not yet been set (last_copied_object_number may be > 0
298 // if the sync is being resumed).
299 root_obj["syncing_percent"] = 0;
300 }
301 }
302
303 m_bytes_per_second(0);
304 auto bytes_per_second = m_bytes_per_second.get_average();
305 root_obj["bytes_per_second"] = round_to_two_places(bytes_per_second);
306
307 auto bytes_per_snapshot = boost::accumulators::rolling_mean(
308 m_bytes_per_snapshot);
309 root_obj["bytes_per_snapshot"] = round_to_two_places(bytes_per_snapshot);
310
311 root_obj["last_snapshot_sync_seconds"] = m_last_snapshot_sync_seconds;
312 root_obj["last_snapshot_bytes"] = m_last_snapshot_bytes;
313
314 auto pending_bytes = bytes_per_snapshot * m_pending_snapshots;
315 if (bytes_per_second > 0 && m_pending_snapshots > 0) {
316 std::uint64_t seconds_until_synced = round_to_two_places(
317 pending_bytes / bytes_per_second);
318 if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
319 seconds_until_synced = std::numeric_limits<uint64_t>::max();
320 }
321
322 root_obj["seconds_until_synced"] = seconds_until_synced;
323 }
324
325 *description = json_spirit::write(
326 root_obj, json_spirit::remove_trailing_zeros);
327
328 local_image_locker.unlock();
329 remote_image_locker.unlock();
330 locker.unlock();
331 on_finish->complete(-EEXIST);
332 return true;
333 }
334
335 template <typename I>
336 void Replayer<I>::load_local_image_meta() {
337 dout(10) << dendl;
338
339 {
340 // reset state in case new snapshot is added while we are scanning
341 std::unique_lock locker{m_lock};
342 m_image_updated = false;
343 }
344
345 bool update_status = false;
346 {
347 auto local_image_ctx = m_state_builder->local_image_ctx;
348 std::shared_lock image_locker{local_image_ctx->image_lock};
349 auto image_spec = image_replayer::util::compute_image_spec(
350 local_image_ctx->md_ctx, local_image_ctx->name);
351 if (m_image_spec != image_spec) {
352 m_image_spec = image_spec;
353 update_status = true;
354 }
355 }
356 if (update_status) {
357 std::unique_lock locker{m_lock};
358 unregister_perf_counters();
359 register_perf_counters();
360 notify_status_updated();
361 }
362
363 ceph_assert(m_state_builder->local_image_meta != nullptr);
364 auto ctx = create_context_callback<
365 Replayer<I>, &Replayer<I>::handle_load_local_image_meta>(this);
366 m_state_builder->local_image_meta->load(ctx);
367 }
368
369 template <typename I>
370 void Replayer<I>::handle_load_local_image_meta(int r) {
371 dout(10) << "r=" << r << dendl;
372
373 if (r < 0 && r != -ENOENT) {
374 derr << "failed to load local image-meta: " << cpp_strerror(r) << dendl;
375 handle_replay_complete(r, "failed to load local image-meta");
376 return;
377 }
378
379 if (r >= 0 && m_state_builder->local_image_meta->resync_requested) {
380 m_resync_requested = true;
381
382 dout(10) << "local image resync requested" << dendl;
383 handle_replay_complete(0, "resync requested");
384 return;
385 }
386
387 refresh_local_image();
388 }
389
390 template <typename I>
391 void Replayer<I>::refresh_local_image() {
392 if (!m_state_builder->local_image_ctx->state->is_refresh_required()) {
393 refresh_remote_image();
394 return;
395 }
396
397 dout(10) << dendl;
398 auto ctx = create_context_callback<
399 Replayer<I>, &Replayer<I>::handle_refresh_local_image>(this);
400 m_state_builder->local_image_ctx->state->refresh(ctx);
401 }
402
403 template <typename I>
404 void Replayer<I>::handle_refresh_local_image(int r) {
405 dout(10) << "r=" << r << dendl;
406
407 if (r < 0) {
408 derr << "failed to refresh local image: " << cpp_strerror(r) << dendl;
409 handle_replay_complete(r, "failed to refresh local image");
410 return;
411 }
412
413 refresh_remote_image();
414 }
415
416 template <typename I>
417 void Replayer<I>::refresh_remote_image() {
418 if (!m_state_builder->remote_image_ctx->state->is_refresh_required()) {
419 std::unique_lock locker{m_lock};
420 scan_local_mirror_snapshots(&locker);
421 return;
422 }
423
424 dout(10) << dendl;
425 auto ctx = create_context_callback<
426 Replayer<I>, &Replayer<I>::handle_refresh_remote_image>(this);
427 m_state_builder->remote_image_ctx->state->refresh(ctx);
428 }
429
430 template <typename I>
431 void Replayer<I>::handle_refresh_remote_image(int r) {
432 dout(10) << "r=" << r << dendl;
433
434 if (r < 0) {
435 derr << "failed to refresh remote image: " << cpp_strerror(r) << dendl;
436 handle_replay_complete(r, "failed to refresh remote image");
437 return;
438 }
439
440 std::unique_lock locker{m_lock};
441 scan_local_mirror_snapshots(&locker);
442 }
443
// Scan the local image's mirror snapshots to work out the local sync window:
// m_local_snap_id_start (last complete mirror snapshot seen) and
// m_local_snap_id_end (an incomplete non-primary snapshot, if any).
//
// Depending on what is found, this either completes the replay (error,
// unknown peer or force promotion), prunes a single unneeded non-primary
// snapshot (which restarts the scan via load_local_image_meta() once it
// finishes), or continues with scan_remote_mirror_snapshots().
//
// `locker` is the caller's unique lock; the callers in this file construct
// it on m_lock. It is released here before pruning and is otherwise handed
// on to handle_replay_complete() / scan_remote_mirror_snapshots().
444 template <typename I>
445 void Replayer<I>::scan_local_mirror_snapshots(
446 std::unique_lock<ceph::mutex>* locker) {
447 if (is_replay_interrupted(locker)) {
448 return;
449 }
450
451 dout(10) << dendl;
452
// reset the results of any previous scan pass
453 m_local_snap_id_start = 0;
454 m_local_snap_id_end = CEPH_NOSNAP;
455 m_local_mirror_snap_ns = {};
456 m_local_object_count = 0;
457
458 m_remote_snap_id_start = 0;
459 m_remote_snap_id_end = CEPH_NOSNAP;
460 m_remote_mirror_snap_ns = {};
461
// local non-primary snapshots that are candidates for removal
462 std::set<uint64_t> prune_snap_ids;
463
464 auto local_image_ctx = m_state_builder->local_image_ctx;
465 std::shared_lock image_locker{local_image_ctx->image_lock};
466 for (auto snap_info_it = local_image_ctx->snap_info.begin();
467 snap_info_it != local_image_ctx->snap_info.end(); ++snap_info_it) {
468 const auto& snap_ns = snap_info_it->second.snap_namespace;
469 auto mirror_ns = std::get_if<
470 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
471 if (mirror_ns == nullptr) {
// only mirror snapshots are of interest
472 continue;
473 }
474
475 dout(15) << "local mirror snapshot: id=" << snap_info_it->first << ", "
476 << "mirror_ns=" << *mirror_ns << dendl;
// remember the namespace of the most recently visited mirror snapshot
477 m_local_mirror_snap_ns = *mirror_ns;
478
479 auto local_snap_id = snap_info_it->first;
480 if (mirror_ns->is_non_primary()) {
481 if (mirror_ns->complete) {
482 // if remote has new snapshots, we would sync from here
483 m_local_snap_id_start = local_snap_id;
484 ceph_assert(m_local_snap_id_end == CEPH_NOSNAP);
485
486 if (mirror_ns->mirror_peer_uuids.empty()) {
487 // no other peer will attempt to sync to this snapshot so store as
488 // a candidate for removal
489 prune_snap_ids.insert(local_snap_id);
490 }
491 } else if (mirror_ns->last_copied_object_number == 0 &&
492 m_local_snap_id_start > 0) {
493 // snapshot might be missing image state, object-map, etc, so just
494 // delete and re-create it if we haven't started copying data
495 // objects. Also only prune this snapshot since we will need the
496 // previous mirror snapshot for syncing. Special case exception for
497 // the first non-primary snapshot since we know its snapshot is
498 // well-formed because otherwise the mirror-image-state would have
499 // forced an image deletion.
500 prune_snap_ids.clear();
501 prune_snap_ids.insert(local_snap_id);
502 break;
503 } else {
504 // start snap will be last complete mirror snapshot or initial
505 // image revision
506 m_local_snap_id_end = local_snap_id;
507 break;
508 }
509 } else if (mirror_ns->is_primary()) {
510 if (mirror_ns->complete) {
511 m_local_snap_id_start = local_snap_id;
512 ceph_assert(m_local_snap_id_end == CEPH_NOSNAP);
513 } else {
514 derr << "incomplete local primary snapshot" << dendl;
515 handle_replay_complete(locker, -EINVAL,
516 "incomplete local primary snapshot");
517 return;
518 }
519 } else {
520 derr << "unknown local mirror snapshot state" << dendl;
521 handle_replay_complete(locker, -EINVAL,
522 "invalid local mirror snapshot state");
523 return;
524 }
525 }
526 image_locker.unlock();
527
528 if (m_local_snap_id_start > 0) {
529 // remove candidate that is required for delta snapshot sync
530 prune_snap_ids.erase(m_local_snap_id_start);
531 }
532 if (!prune_snap_ids.empty()) {
533 locker->unlock();
534
// only the lowest-id candidate is pruned per pass
535 auto prune_snap_id = *prune_snap_ids.begin();
536 dout(5) << "pruning unused non-primary snapshot " << prune_snap_id << dendl;
537 prune_non_primary_snapshot(prune_snap_id);
538 return;
539 }
540
541 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
// at least one local mirror snapshot was found: validate its origin
542 if (m_local_mirror_snap_ns.is_non_primary() &&
543 m_local_mirror_snap_ns.primary_mirror_uuid !=
544 m_state_builder->remote_mirror_uuid) {
545 // TODO support multiple peers
546 derr << "local image linked to unknown peer: "
547 << m_local_mirror_snap_ns.primary_mirror_uuid << dendl;
548 handle_replay_complete(locker, -EEXIST,
549 "local image linked to unknown peer");
550 return;
551 } else if (m_local_mirror_snap_ns.state ==
552 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY) {
553 dout(5) << "local image promoted" << dendl;
554 handle_replay_complete(locker, 0, "force promoted");
555 return;
556 }
557
558 dout(10) << "found local mirror snapshot: "
559 << "local_snap_id_start=" << m_local_snap_id_start << ", "
560 << "local_snap_id_end=" << m_local_snap_id_end << ", "
561 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
562 if (!m_local_mirror_snap_ns.is_primary() &&
563 m_local_mirror_snap_ns.complete) {
564 // our remote sync should start after this completed snapshot
565 m_remote_snap_id_start = m_local_mirror_snap_ns.primary_snap_id;
566 }
567 }
568
569 // we don't have any mirror snapshots or only completed non-primary
570 // mirror snapshots
571 scan_remote_mirror_snapshots(locker);
572 }
573
// Scan the remote image's mirror snapshots, using the local sync window
// computed by scan_local_mirror_snapshots(), to pick the next remote
// snapshot to sync (m_remote_snap_id_end), count the pending snapshots and
// collect remote snapshots that can be unlinked.
//
// Possible outcomes, in the order they are checked below: unlink one remote
// snapshot, start (or resume) syncing the selected snapshot, restart the
// scan because an update notification arrived, complete the replay
// (interrupted, split-brain or remote demoted), or switch to STATE_IDLE.
//
// `locker` is the caller's unique lock; it is released before any of the
// follow-up steps that are started without it.
574 template <typename I>
575 void Replayer<I>::scan_remote_mirror_snapshots(
576 std::unique_lock<ceph::mutex>* locker) {
577 dout(10) << dendl;
578
579 m_pending_snapshots = 0;
580
// remote snapshots visited before the end snapshot was found
581 std::set<uint64_t> unlink_snap_ids;
582 bool split_brain = false;
583 bool remote_demoted = false;
584 auto remote_image_ctx = m_state_builder->remote_image_ctx;
585 std::shared_lock image_locker{remote_image_ctx->image_lock};
586 for (auto snap_info_it = remote_image_ctx->snap_info.begin();
587 snap_info_it != remote_image_ctx->snap_info.end(); ++snap_info_it) {
588 const auto& snap_ns = snap_info_it->second.snap_namespace;
589 auto mirror_ns = std::get_if<
590 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
591 if (mirror_ns == nullptr) {
592 continue;
593 }
594
595 dout(15) << "remote mirror snapshot: id=" << snap_info_it->first << ", "
596 << "mirror_ns=" << *mirror_ns << dendl;
// overwritten for every mirror snapshot, so the value that survives the
// loop is that of the last mirror snapshot visited
597 remote_demoted = mirror_ns->is_demoted();
598 if (!mirror_ns->is_primary() && !mirror_ns->is_non_primary()) {
599 derr << "unknown remote mirror snapshot state" << dendl;
600 handle_replay_complete(locker, -EINVAL,
601 "invalid remote mirror snapshot state");
602 return;
603 } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) ==
604 0) {
605 dout(15) << "skipping remote snapshot due to missing mirror peer"
606 << dendl;
607 continue;
608 }
609
610 auto remote_snap_id = snap_info_it->first;
611 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
612 // we have a local mirror snapshot
613 if (m_local_mirror_snap_ns.is_non_primary()) {
614 // previously validated that it was linked to remote
615 ceph_assert(m_local_mirror_snap_ns.primary_mirror_uuid ==
616 m_state_builder->remote_mirror_uuid);
617
618 if (m_remote_snap_id_end == CEPH_NOSNAP) {
619 // haven't found the end snap so treat this as a candidate for unlink
620 unlink_snap_ids.insert(remote_snap_id);
621 }
622 if (m_local_mirror_snap_ns.complete &&
623 m_local_mirror_snap_ns.primary_snap_id >= remote_snap_id) {
624 // skip past completed remote snapshot
625 m_remote_snap_id_start = remote_snap_id;
626 m_remote_mirror_snap_ns = *mirror_ns;
627 dout(15) << "skipping synced remote snapshot " << remote_snap_id
628 << dendl;
629 continue;
630 } else if (!m_local_mirror_snap_ns.complete &&
631 m_local_mirror_snap_ns.primary_snap_id > remote_snap_id) {
632 // skip until we get to the in-progress remote snapshot
633 dout(15) << "skipping synced remote snapshot " << remote_snap_id
634 << " while search for in-progress sync" << dendl;
635 m_remote_snap_id_start = remote_snap_id;
636 m_remote_mirror_snap_ns = *mirror_ns;
637 continue;
638 }
639 } else if (m_local_mirror_snap_ns.state ==
640 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED) {
641 // find the matching demotion snapshot in remote image
642 ceph_assert(m_local_snap_id_start > 0);
643 if (mirror_ns->state ==
644 cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED &&
645 mirror_ns->primary_mirror_uuid == m_local_mirror_uuid &&
646 mirror_ns->primary_snap_id == m_local_snap_id_start) {
647 dout(10) << "located matching demotion snapshot: "
648 << "remote_snap_id=" << remote_snap_id << ", "
649 << "local_snap_id=" << m_local_snap_id_start << dendl;
650 m_remote_snap_id_start = remote_snap_id;
651 split_brain = false;
652 continue;
653 } else if (m_remote_snap_id_start == 0) {
654 // still looking for our matching demotion snapshot
655 dout(15) << "skipping remote snapshot " << remote_snap_id << " "
656 << "while searching for demotion" << dendl;
657 split_brain = true;
658 continue;
659 }
660 } else {
661 // should not have been able to reach this
662 ceph_assert(false);
663 }
664 } else if (!mirror_ns->is_primary()) {
665 dout(15) << "skipping non-primary remote snapshot" << dendl;
666 continue;
667 }
668
669 // found candidate snapshot to sync
670 ++m_pending_snapshots;
671 if (m_remote_snap_id_end != CEPH_NOSNAP) {
// end snapshot already chosen; later candidates are only counted
672 continue;
673 }
674
675 // first primary snapshot where we are listed as a peer
676 m_remote_snap_id_end = remote_snap_id;
677 m_remote_mirror_snap_ns = *mirror_ns;
678 }
679
680 if (m_remote_snap_id_start != 0 &&
681 remote_image_ctx->snap_info.count(m_remote_snap_id_start) == 0) {
682 // the remote start snapshot was deleted out from under us
683 derr << "failed to locate remote start snapshot: "
684 << "snap_id=" << m_remote_snap_id_start << dendl;
685 split_brain = true;
686 }
687
688 image_locker.unlock();
689
690 if (!split_brain) {
// the start and end snapshots are still needed, so never unlink those
691 unlink_snap_ids.erase(m_remote_snap_id_start);
692 unlink_snap_ids.erase(m_remote_snap_id_end);
693 if (!unlink_snap_ids.empty()) {
694 locker->unlock();
695
696 // retry the unlinking process for a remote snapshot that we do not
697 // need anymore
698 auto remote_snap_id = *unlink_snap_ids.begin();
699 dout(10) << "unlinking from remote snapshot " << remote_snap_id << dendl;
700 unlink_peer(remote_snap_id);
701 return;
702 }
703
704 if (m_remote_snap_id_end != CEPH_NOSNAP) {
705 dout(10) << "found remote mirror snapshot: "
706 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
707 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
708 << "remote_snap_ns=" << m_remote_mirror_snap_ns << dendl;
709 if (m_remote_mirror_snap_ns.complete) {
710 locker->unlock();
711
712 if (m_local_snap_id_end != CEPH_NOSNAP &&
713 !m_local_mirror_snap_ns.complete) {
714 // attempt to resume image-sync
715 dout(10) << "local image contains in-progress mirror snapshot"
716 << dendl;
717 get_local_image_state();
718 } else {
719 copy_snapshots();
720 }
721 return;
722 } else {
723 // might have raced with the creation of a remote mirror snapshot
724 // so we will need to refresh and rescan once it completes
725 dout(15) << "remote mirror snapshot not complete" << dendl;
726 }
727 }
728 }
729
730 if (m_image_updated) {
731 // received update notification while scanning image, restart ...
732 m_image_updated = false;
733 locker->unlock();
734
735 dout(10) << "restarting snapshot scan due to remote update notification"
736 << dendl;
737 load_local_image_meta();
738 return;
739 }
740
741 if (is_replay_interrupted(locker)) {
742 return;
743 } else if (split_brain) {
744 derr << "split-brain detected: failed to find matching non-primary "
745 << "snapshot in remote image: "
746 << "local_snap_id_start=" << m_local_snap_id_start << ", "
747 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
748 handle_replay_complete(locker, -EEXIST, "split-brain");
749 return;
750 } else if (remote_demoted) {
751 dout(10) << "remote image demoted" << dendl;
752 handle_replay_complete(locker, -EREMOTEIO, "remote image demoted");
753 return;
754 }
755
756 dout(10) << "all remote snapshots synced: idling waiting for new snapshot"
757 << dendl;
758 ceph_assert(m_state == STATE_REPLAYING);
759 m_state = STATE_IDLE;
760
761 notify_status_updated();
762 }
763
764 template <typename I>
765 void Replayer<I>::prune_non_primary_snapshot(uint64_t snap_id) {
766 dout(10) << "snap_id=" << snap_id << dendl;
767
768 auto local_image_ctx = m_state_builder->local_image_ctx;
769 bool snap_valid = false;
770 cls::rbd::SnapshotNamespace snap_namespace;
771 std::string snap_name;
772
773 {
774 std::shared_lock image_locker{local_image_ctx->image_lock};
775 auto snap_info = local_image_ctx->get_snap_info(snap_id);
776 if (snap_info != nullptr) {
777 snap_valid = true;
778 snap_namespace = snap_info->snap_namespace;
779 snap_name = snap_info->name;
780
781 ceph_assert(std::holds_alternative<cls::rbd::MirrorSnapshotNamespace>(
782 snap_namespace));
783 }
784 }
785
786 if (!snap_valid) {
787 load_local_image_meta();
788 return;
789 }
790
791 auto ctx = create_context_callback<
792 Replayer<I>, &Replayer<I>::handle_prune_non_primary_snapshot>(this);
793 local_image_ctx->operations->snap_remove(snap_namespace, snap_name, ctx);
794 }
795
796 template <typename I>
797 void Replayer<I>::handle_prune_non_primary_snapshot(int r) {
798 dout(10) << "r=" << r << dendl;
799
800 if (r < 0 && r != -ENOENT) {
801 derr << "failed to prune non-primary snapshot: " << cpp_strerror(r)
802 << dendl;
803 handle_replay_complete(r, "failed to prune non-primary snapshot");
804 return;
805 }
806
807 if (is_replay_interrupted()) {
808 return;
809 }
810
811 load_local_image_meta();
812 }
813
814 template <typename I>
815 void Replayer<I>::copy_snapshots() {
816 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
817 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
818 << "local_snap_id_start=" << m_local_snap_id_start << dendl;
819
820 ceph_assert(m_remote_snap_id_start != CEPH_NOSNAP);
821 ceph_assert(m_remote_snap_id_end > 0 &&
822 m_remote_snap_id_end != CEPH_NOSNAP);
823 ceph_assert(m_local_snap_id_start != CEPH_NOSNAP);
824
825 m_local_mirror_snap_ns = {};
826 auto ctx = create_context_callback<
827 Replayer<I>, &Replayer<I>::handle_copy_snapshots>(this);
828 auto req = librbd::deep_copy::SnapshotCopyRequest<I>::create(
829 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
830 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start,
831 false, m_threads->work_queue, &m_local_mirror_snap_ns.snap_seqs,
832 ctx);
833 req->send();
834 }
835
836 template <typename I>
837 void Replayer<I>::handle_copy_snapshots(int r) {
838 dout(10) << "r=" << r << dendl;
839
840 if (r < 0) {
841 derr << "failed to copy snapshots from remote to local image: "
842 << cpp_strerror(r) << dendl;
843 handle_replay_complete(
844 r, "failed to copy snapshots from remote to local image");
845 return;
846 }
847
848 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
849 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
850 << "local_snap_id_start=" << m_local_snap_id_start << ", "
851 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
852 get_remote_image_state();
853 }
854
855 template <typename I>
856 void Replayer<I>::get_remote_image_state() {
857 dout(10) << dendl;
858
859 auto ctx = create_context_callback<
860 Replayer<I>, &Replayer<I>::handle_get_remote_image_state>(this);
861 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
862 m_state_builder->remote_image_ctx, m_remote_snap_id_end,
863 &m_image_state, ctx);
864 req->send();
865 }
866
867 template <typename I>
868 void Replayer<I>::handle_get_remote_image_state(int r) {
869 dout(10) << "r=" << r << dendl;
870
871 if (r < 0) {
872 derr << "failed to retrieve remote snapshot image state: "
873 << cpp_strerror(r) << dendl;
874 handle_replay_complete(r, "failed to retrieve remote snapshot image state");
875 return;
876 }
877
878 create_non_primary_snapshot();
879 }
880
881 template <typename I>
882 void Replayer<I>::get_local_image_state() {
883 dout(10) << dendl;
884
885 ceph_assert(m_local_snap_id_end != CEPH_NOSNAP);
886 auto ctx = create_context_callback<
887 Replayer<I>, &Replayer<I>::handle_get_local_image_state>(this);
888 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
889 m_state_builder->local_image_ctx, m_local_snap_id_end,
890 &m_image_state, ctx);
891 req->send();
892 }
893
894 template <typename I>
895 void Replayer<I>::handle_get_local_image_state(int r) {
896 dout(10) << "r=" << r << dendl;
897
898 if (r < 0) {
899 derr << "failed to retrieve local snapshot image state: "
900 << cpp_strerror(r) << dendl;
901 handle_replay_complete(r, "failed to retrieve local snapshot image state");
902 return;
903 }
904
905 request_sync();
906 }
907
908 template <typename I>
909 void Replayer<I>::create_non_primary_snapshot() {
910 auto local_image_ctx = m_state_builder->local_image_ctx;
911
912 if (m_local_snap_id_start > 0) {
913 std::shared_lock local_image_locker{local_image_ctx->image_lock};
914
915 auto local_snap_info_it = local_image_ctx->snap_info.find(
916 m_local_snap_id_start);
917 if (local_snap_info_it == local_image_ctx->snap_info.end()) {
918 local_image_locker.unlock();
919
920 derr << "failed to locate local snapshot " << m_local_snap_id_start
921 << dendl;
922 handle_replay_complete(-ENOENT, "failed to locate local start snapshot");
923 return;
924 }
925
926 auto mirror_ns = std::get_if<cls::rbd::MirrorSnapshotNamespace>(
927 &local_snap_info_it->second.snap_namespace);
928 ceph_assert(mirror_ns != nullptr);
929
930 auto remote_image_ctx = m_state_builder->remote_image_ctx;
931 std::shared_lock remote_image_locker{remote_image_ctx->image_lock};
932
933 // (re)build a full mapping from remote to local snap ids for all user
934 // snapshots to support applying image state in the future
935 for (auto& [remote_snap_id, remote_snap_info] :
936 remote_image_ctx->snap_info) {
937 if (remote_snap_id >= m_remote_snap_id_end) {
938 break;
939 }
940
941 // we can ignore all non-user snapshots since image state only includes
942 // user snapshots
943 if (!std::holds_alternative<cls::rbd::UserSnapshotNamespace>(
944 remote_snap_info.snap_namespace)) {
945 continue;
946 }
947
948 uint64_t local_snap_id = CEPH_NOSNAP;
949 if (mirror_ns->is_demoted() && !m_remote_mirror_snap_ns.is_demoted()) {
950 // if we are creating a non-primary snapshot following a demotion,
951 // re-build the full snapshot sequence since we don't have a valid
952 // snapshot mapping
953 auto local_snap_id_it = local_image_ctx->snap_ids.find(
954 {remote_snap_info.snap_namespace, remote_snap_info.name});
955 if (local_snap_id_it != local_image_ctx->snap_ids.end()) {
956 local_snap_id = local_snap_id_it->second;
957 }
958 } else {
959 auto snap_seq_it = mirror_ns->snap_seqs.find(remote_snap_id);
960 if (snap_seq_it != mirror_ns->snap_seqs.end()) {
961 local_snap_id = snap_seq_it->second;
962 }
963 }
964
965 if (m_local_mirror_snap_ns.snap_seqs.count(remote_snap_id) == 0 &&
966 local_snap_id != CEPH_NOSNAP) {
967 dout(15) << "mapping remote snapshot " << remote_snap_id << " to "
968 << "local snapshot " << local_snap_id << dendl;
969 m_local_mirror_snap_ns.snap_seqs[remote_snap_id] = local_snap_id;
970 }
971 }
972 }
973
974 dout(10) << "demoted=" << m_remote_mirror_snap_ns.is_demoted() << ", "
975 << "primary_mirror_uuid="
976 << m_state_builder->remote_mirror_uuid << ", "
977 << "primary_snap_id=" << m_remote_snap_id_end << ", "
978 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
979
980 auto ctx = create_context_callback<
981 Replayer<I>, &Replayer<I>::handle_create_non_primary_snapshot>(this);
982 auto req = librbd::mirror::snapshot::CreateNonPrimaryRequest<I>::create(
983 local_image_ctx, m_remote_mirror_snap_ns.is_demoted(),
984 m_state_builder->remote_mirror_uuid, m_remote_snap_id_end,
985 m_local_mirror_snap_ns.snap_seqs, m_image_state, &m_local_snap_id_end, ctx);
986 req->send();
987 }
988
989 template <typename I>
990 void Replayer<I>::handle_create_non_primary_snapshot(int r) {
991 dout(10) << "r=" << r << dendl;
992
993 if (r < 0) {
994 derr << "failed to create local mirror snapshot: " << cpp_strerror(r)
995 << dendl;
996 handle_replay_complete(r, "failed to create local mirror snapshot");
997 return;
998 }
999
1000 dout(15) << "local_snap_id_end=" << m_local_snap_id_end << dendl;
1001
1002 update_mirror_image_state();
1003 }
1004
1005 template <typename I>
1006 void Replayer<I>::update_mirror_image_state() {
1007 if (m_local_snap_id_start > 0) {
1008 request_sync();
1009 return;
1010 }
1011
1012 // a newly created non-primary image has a local mirror state of CREATING
1013 // until this point so that we could avoid preserving the image until
1014 // the first non-primary snapshot linked the two images together.
1015 dout(10) << dendl;
1016 auto ctx = create_context_callback<
1017 Replayer<I>, &Replayer<I>::handle_update_mirror_image_state>(this);
1018 auto req = librbd::mirror::ImageStateUpdateRequest<I>::create(
1019 m_state_builder->local_image_ctx->md_ctx,
1020 m_state_builder->local_image_ctx->id,
1021 cls::rbd::MIRROR_IMAGE_STATE_ENABLED, {}, ctx);
1022 req->send();
1023 }
1024
1025 template <typename I>
1026 void Replayer<I>::handle_update_mirror_image_state(int r) {
1027 dout(10) << "r=" << r << dendl;
1028
1029 if (r < 0) {
1030 derr << "failed to update local mirror image state: " << cpp_strerror(r)
1031 << dendl;
1032 handle_replay_complete(r, "failed to update local mirror image state");
1033 return;
1034 }
1035
1036 request_sync();
1037 }
1038
1039 template <typename I>
1040 void Replayer<I>::request_sync() {
1041 if (m_remote_mirror_snap_ns.clean_since_snap_id == m_remote_snap_id_start) {
1042 dout(10) << "skipping unnecessary image copy: "
1043 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
1044 << "remote_mirror_snap_ns=" << m_remote_mirror_snap_ns << dendl;
1045 apply_image_state();
1046 return;
1047 }
1048
1049 dout(10) << dendl;
1050 std::unique_lock locker{m_lock};
1051 if (is_replay_interrupted(&locker)) {
1052 return;
1053 }
1054
1055 auto ctx = create_async_context_callback(
1056 m_threads->work_queue, create_context_callback<
1057 Replayer<I>, &Replayer<I>::handle_request_sync>(this));
1058 m_instance_watcher->notify_sync_request(m_state_builder->local_image_ctx->id,
1059 ctx);
1060 }
1061
1062 template <typename I>
1063 void Replayer<I>::handle_request_sync(int r) {
1064 dout(10) << "r=" << r << dendl;
1065
1066 std::unique_lock locker{m_lock};
1067 if (is_replay_interrupted(&locker)) {
1068 return;
1069 } else if (r == -ECANCELED) {
1070 dout(5) << "image-sync canceled" << dendl;
1071 handle_replay_complete(&locker, r, "image-sync canceled");
1072 return;
1073 } else if (r < 0) {
1074 derr << "failed to request image-sync: " << cpp_strerror(r) << dendl;
1075 handle_replay_complete(&locker, r, "failed to request image-sync");
1076 return;
1077 }
1078
1079 m_sync_in_progress = true;
1080 locker.unlock();
1081
1082 copy_image();
1083 }
1084
1085 template <typename I>
1086 void Replayer<I>::copy_image() {
1087 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
1088 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
1089 << "local_snap_id_start=" << m_local_snap_id_start << ", "
1090 << "last_copied_object_number="
1091 << m_local_mirror_snap_ns.last_copied_object_number << ", "
1092 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
1093
1094 m_snapshot_bytes = 0;
1095 m_snapshot_replay_start = ceph_clock_now();
1096 m_deep_copy_handler = new DeepCopyHandler(this);
1097 auto ctx = create_context_callback<
1098 Replayer<I>, &Replayer<I>::handle_copy_image>(this);
1099 auto req = librbd::deep_copy::ImageCopyRequest<I>::create(
1100 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
1101 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start, false,
1102 (m_local_mirror_snap_ns.last_copied_object_number > 0 ?
1103 librbd::deep_copy::ObjectNumber{
1104 m_local_mirror_snap_ns.last_copied_object_number} :
1105 librbd::deep_copy::ObjectNumber{}),
1106 m_local_mirror_snap_ns.snap_seqs, m_deep_copy_handler, ctx);
1107 req->send();
1108 }
1109
1110 template <typename I>
1111 void Replayer<I>::handle_copy_image(int r) {
1112 dout(10) << "r=" << r << dendl;
1113
1114 delete m_deep_copy_handler;
1115 m_deep_copy_handler = nullptr;
1116
1117 if (r < 0) {
1118 derr << "failed to copy remote image to local image: " << cpp_strerror(r)
1119 << dendl;
1120 handle_replay_complete(r, "failed to copy remote image");
1121 return;
1122 }
1123
1124 {
1125 std::unique_lock locker{m_lock};
1126 m_last_snapshot_bytes = m_snapshot_bytes;
1127 m_bytes_per_snapshot(m_snapshot_bytes);
1128 utime_t duration = ceph_clock_now() - m_snapshot_replay_start;
1129 m_last_snapshot_sync_seconds = duration.sec();
1130
1131 if (g_snapshot_perf_counters) {
1132 g_snapshot_perf_counters->inc(l_rbd_mirror_snapshot_sync_bytes,
1133 m_snapshot_bytes);
1134 g_snapshot_perf_counters->inc(l_rbd_mirror_snapshot_snapshots);
1135 g_snapshot_perf_counters->tinc(l_rbd_mirror_snapshot_sync_time,
1136 duration);
1137 }
1138 if (m_perf_counters) {
1139 m_perf_counters->inc(l_rbd_mirror_snapshot_sync_bytes, m_snapshot_bytes);
1140 m_perf_counters->inc(l_rbd_mirror_snapshot_snapshots);
1141 m_perf_counters->tinc(l_rbd_mirror_snapshot_sync_time, duration);
1142 m_perf_counters->tset(l_rbd_mirror_snapshot_last_sync_time, duration);
1143 m_perf_counters->set(l_rbd_mirror_snapshot_last_sync_bytes,
1144 m_snapshot_bytes);
1145 }
1146 }
1147
1148 apply_image_state();
1149 }
1150
1151 template <typename I>
1152 void Replayer<I>::handle_copy_image_progress(uint64_t object_number,
1153 uint64_t object_count) {
1154 dout(10) << "object_number=" << object_number << ", "
1155 << "object_count=" << object_count << dendl;
1156
1157 std::unique_lock locker{m_lock};
1158 m_local_mirror_snap_ns.last_copied_object_number = std::min(
1159 object_number, object_count);
1160 m_local_object_count = object_count;
1161
1162 update_non_primary_snapshot(false);
1163 }
1164
1165 template <typename I>
1166 void Replayer<I>::handle_copy_image_read(uint64_t bytes_read) {
1167 dout(20) << "bytes_read=" << bytes_read << dendl;
1168
1169 std::unique_lock locker{m_lock};
1170 m_bytes_per_second(bytes_read);
1171 m_snapshot_bytes += bytes_read;
1172 }
1173
1174 template <typename I>
1175 void Replayer<I>::apply_image_state() {
1176 dout(10) << dendl;
1177
1178 auto ctx = create_context_callback<
1179 Replayer<I>, &Replayer<I>::handle_apply_image_state>(this);
1180 auto req = ApplyImageStateRequest<I>::create(
1181 m_local_mirror_uuid,
1182 m_state_builder->remote_mirror_uuid,
1183 m_state_builder->local_image_ctx,
1184 m_state_builder->remote_image_ctx,
1185 m_image_state, ctx);
1186 req->send();
1187 }
1188
1189 template <typename I>
1190 void Replayer<I>::handle_apply_image_state(int r) {
1191 dout(10) << "r=" << r << dendl;
1192
1193 if (r < 0 && r != -ENOENT) {
1194 derr << "failed to apply remote image state to local image: "
1195 << cpp_strerror(r) << dendl;
1196 handle_replay_complete(r, "failed to apply remote image state");
1197 return;
1198 }
1199
1200 std::unique_lock locker{m_lock};
1201 update_non_primary_snapshot(true);
1202 }
1203
1204 template <typename I>
1205 void Replayer<I>::update_non_primary_snapshot(bool complete) {
1206 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1207 if (!complete) {
1208 // disallow two in-flight updates if this isn't the completion of the sync
1209 if (m_updating_sync_point) {
1210 return;
1211 }
1212 m_updating_sync_point = true;
1213 } else {
1214 m_local_mirror_snap_ns.complete = true;
1215 }
1216
1217 dout(10) << dendl;
1218
1219 librados::ObjectWriteOperation op;
1220 librbd::cls_client::mirror_image_snapshot_set_copy_progress(
1221 &op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
1222 m_local_mirror_snap_ns.last_copied_object_number);
1223
1224 auto ctx = new C_TrackedOp(
1225 m_in_flight_op_tracker, new LambdaContext([this, complete](int r) {
1226 handle_update_non_primary_snapshot(complete, r);
1227 }));
1228 auto aio_comp = create_rados_callback(ctx);
1229 int r = m_state_builder->local_image_ctx->md_ctx.aio_operate(
1230 m_state_builder->local_image_ctx->header_oid, aio_comp, &op);
1231 ceph_assert(r == 0);
1232 aio_comp->release();
1233 }
1234
1235 template <typename I>
1236 void Replayer<I>::handle_update_non_primary_snapshot(bool complete, int r) {
1237 dout(10) << "r=" << r << dendl;
1238
1239 if (r < 0) {
1240 derr << "failed to update local snapshot progress: " << cpp_strerror(r)
1241 << dendl;
1242 if (complete) {
1243 // only fail if this was the final update
1244 handle_replay_complete(r, "failed to update local snapshot progress");
1245 return;
1246 }
1247 }
1248
1249 if (!complete) {
1250 // periodic sync-point update -- do not advance state machine
1251 std::unique_lock locker{m_lock};
1252
1253 ceph_assert(m_updating_sync_point);
1254 m_updating_sync_point = false;
1255 return;
1256 }
1257
1258 notify_image_update();
1259 }
1260
1261 template <typename I>
1262 void Replayer<I>::notify_image_update() {
1263 dout(10) << dendl;
1264
1265 auto ctx = create_context_callback<
1266 Replayer<I>, &Replayer<I>::handle_notify_image_update>(this);
1267 m_state_builder->local_image_ctx->notify_update(ctx);
1268 }
1269
1270 template <typename I>
1271 void Replayer<I>::handle_notify_image_update(int r) {
1272 dout(10) << "r=" << r << dendl;
1273
1274 if (r < 0) {
1275 derr << "failed to notify local image update: " << cpp_strerror(r) << dendl;
1276 }
1277
1278 unlink_peer(m_remote_snap_id_start);
1279 }
1280
1281 template <typename I>
1282 void Replayer<I>::unlink_peer(uint64_t remote_snap_id) {
1283 if (remote_snap_id == 0) {
1284 finish_sync();
1285 return;
1286 }
1287
1288 // local snapshot fully synced -- we no longer depend on the sync
1289 // start snapshot in the remote image
1290 dout(10) << "remote_snap_id=" << remote_snap_id << dendl;
1291
1292 auto ctx = create_context_callback<
1293 Replayer<I>, &Replayer<I>::handle_unlink_peer>(this);
1294 auto req = librbd::mirror::snapshot::UnlinkPeerRequest<I>::create(
1295 m_state_builder->remote_image_ctx, remote_snap_id,
1296 m_remote_mirror_peer_uuid, false, ctx);
1297 req->send();
1298 }
1299
1300 template <typename I>
1301 void Replayer<I>::handle_unlink_peer(int r) {
1302 dout(10) << "r=" << r << dendl;
1303
1304 if (r < 0 && r != -ENOENT) {
1305 derr << "failed to unlink local peer from remote image: " << cpp_strerror(r)
1306 << dendl;
1307 handle_replay_complete(r, "failed to unlink local peer from remote image");
1308 return;
1309 }
1310
1311 finish_sync();
1312 }
1313
1314 template <typename I>
1315 void Replayer<I>::finish_sync() {
1316 dout(10) << dendl;
1317
1318 {
1319 std::unique_lock locker{m_lock};
1320 notify_status_updated();
1321
1322 if (m_sync_in_progress) {
1323 m_sync_in_progress = false;
1324 m_instance_watcher->notify_sync_complete(
1325 m_state_builder->local_image_ctx->id);
1326 }
1327 }
1328
1329 if (is_replay_interrupted()) {
1330 return;
1331 }
1332
1333 load_local_image_meta();
1334 }
1335
1336 template <typename I>
1337 void Replayer<I>::register_local_update_watcher() {
1338 dout(10) << dendl;
1339
1340 m_update_watch_ctx = new C_UpdateWatchCtx(this);
1341
1342 int r = m_state_builder->local_image_ctx->state->register_update_watcher(
1343 m_update_watch_ctx, &m_local_update_watcher_handle);
1344 auto ctx = create_context_callback<
1345 Replayer<I>, &Replayer<I>::handle_register_local_update_watcher>(this);
1346 m_threads->work_queue->queue(ctx, r);
1347 }
1348
1349 template <typename I>
1350 void Replayer<I>::handle_register_local_update_watcher(int r) {
1351 dout(10) << "r=" << r << dendl;
1352
1353 if (r < 0) {
1354 derr << "failed to register local update watcher: " << cpp_strerror(r)
1355 << dendl;
1356 handle_replay_complete(r, "failed to register local image update watcher");
1357 m_state = STATE_COMPLETE;
1358
1359 delete m_update_watch_ctx;
1360 m_update_watch_ctx = nullptr;
1361
1362 Context* on_init = nullptr;
1363 std::swap(on_init, m_on_init_shutdown);
1364 on_init->complete(r);
1365 return;
1366 }
1367
1368 register_remote_update_watcher();
1369 }
1370
1371 template <typename I>
1372 void Replayer<I>::register_remote_update_watcher() {
1373 dout(10) << dendl;
1374
1375 int r = m_state_builder->remote_image_ctx->state->register_update_watcher(
1376 m_update_watch_ctx, &m_remote_update_watcher_handle);
1377 auto ctx = create_context_callback<
1378 Replayer<I>, &Replayer<I>::handle_register_remote_update_watcher>(this);
1379 m_threads->work_queue->queue(ctx, r);
1380 }
1381
1382 template <typename I>
1383 void Replayer<I>::handle_register_remote_update_watcher(int r) {
1384 dout(10) << "r=" << r << dendl;
1385
1386 if (r < 0) {
1387 derr << "failed to register remote update watcher: " << cpp_strerror(r)
1388 << dendl;
1389 handle_replay_complete(r, "failed to register remote image update watcher");
1390 m_state = STATE_COMPLETE;
1391
1392 unregister_local_update_watcher();
1393 return;
1394 }
1395
1396 m_state = STATE_REPLAYING;
1397
1398 Context* on_init = nullptr;
1399 std::swap(on_init, m_on_init_shutdown);
1400 on_init->complete(0);
1401
1402 // delay initial snapshot scan until after we have alerted
1403 // image replayer that we have initialized in case an error
1404 // occurs
1405 {
1406 std::unique_lock locker{m_lock};
1407 notify_status_updated();
1408 }
1409
1410 load_local_image_meta();
1411 }
1412
1413 template <typename I>
1414 void Replayer<I>::unregister_remote_update_watcher() {
1415 dout(10) << dendl;
1416
1417 auto ctx = create_context_callback<
1418 Replayer<I>,
1419 &Replayer<I>::handle_unregister_remote_update_watcher>(this);
1420 m_state_builder->remote_image_ctx->state->unregister_update_watcher(
1421 m_remote_update_watcher_handle, ctx);
1422 }
1423
1424 template <typename I>
1425 void Replayer<I>::handle_unregister_remote_update_watcher(int r) {
1426 dout(10) << "r=" << r << dendl;
1427
1428 if (r < 0) {
1429 derr << "failed to unregister remote update watcher: " << cpp_strerror(r)
1430 << dendl;
1431 }
1432
1433 unregister_local_update_watcher();
1434 }
1435
1436 template <typename I>
1437 void Replayer<I>::unregister_local_update_watcher() {
1438 dout(10) << dendl;
1439
1440 auto ctx = create_context_callback<
1441 Replayer<I>,
1442 &Replayer<I>::handle_unregister_local_update_watcher>(this);
1443 m_state_builder->local_image_ctx->state->unregister_update_watcher(
1444 m_local_update_watcher_handle, ctx);
1445 }
1446
1447 template <typename I>
1448 void Replayer<I>::handle_unregister_local_update_watcher(int r) {
1449 dout(10) << "r=" << r << dendl;
1450
1451 if (r < 0) {
1452 derr << "failed to unregister local update watcher: " << cpp_strerror(r)
1453 << dendl;
1454 }
1455
1456 delete m_update_watch_ctx;
1457 m_update_watch_ctx = nullptr;
1458
1459 wait_for_in_flight_ops();
1460 }
1461
1462 template <typename I>
1463 void Replayer<I>::wait_for_in_flight_ops() {
1464 dout(10) << dendl;
1465
1466 auto ctx = create_async_context_callback(
1467 m_threads->work_queue, create_context_callback<
1468 Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
1469 m_in_flight_op_tracker.wait_for_ops(ctx);
1470 }
1471
1472 template <typename I>
1473 void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
1474 dout(10) << "r=" << r << dendl;
1475
1476 Context* on_shutdown = nullptr;
1477 {
1478 std::unique_lock locker{m_lock};
1479 ceph_assert(m_on_init_shutdown != nullptr);
1480 std::swap(on_shutdown, m_on_init_shutdown);
1481 }
1482 on_shutdown->complete(m_error_code);
1483 }
1484
1485 template <typename I>
1486 void Replayer<I>::handle_image_update_notify() {
1487 dout(10) << dendl;
1488
1489 std::unique_lock locker{m_lock};
1490 if (m_state == STATE_REPLAYING) {
1491 dout(15) << "flagging snapshot rescan required" << dendl;
1492 m_image_updated = true;
1493 } else if (m_state == STATE_IDLE) {
1494 m_state = STATE_REPLAYING;
1495 locker.unlock();
1496
1497 dout(15) << "restarting idle replayer" << dendl;
1498 load_local_image_meta();
1499 }
1500 }
1501
1502 template <typename I>
1503 void Replayer<I>::handle_replay_complete(int r,
1504 const std::string& description) {
1505 std::unique_lock locker{m_lock};
1506 handle_replay_complete(&locker, r, description);
1507 }
1508
1509 template <typename I>
1510 void Replayer<I>::handle_replay_complete(std::unique_lock<ceph::mutex>* locker,
1511 int r,
1512 const std::string& description) {
1513 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1514
1515 if (m_sync_in_progress) {
1516 m_sync_in_progress = false;
1517 m_instance_watcher->notify_sync_complete(
1518 m_state_builder->local_image_ctx->id);
1519 }
1520
1521 // don't set error code and description if resuming a pending
1522 // shutdown
1523 if (is_replay_interrupted(locker)) {
1524 return;
1525 }
1526
1527 if (m_error_code == 0) {
1528 m_error_code = r;
1529 m_error_description = description;
1530 }
1531
1532 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
1533 return;
1534 }
1535
1536 m_state = STATE_COMPLETE;
1537 notify_status_updated();
1538 }
1539
1540 template <typename I>
1541 void Replayer<I>::notify_status_updated() {
1542 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1543
1544 dout(10) << dendl;
1545 auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
1546 [this](int) {
1547 m_replayer_listener->handle_notification();
1548 }));
1549 m_threads->work_queue->queue(ctx, 0);
1550 }
1551
1552 template <typename I>
1553 bool Replayer<I>::is_replay_interrupted() {
1554 std::unique_lock locker{m_lock};
1555 return is_replay_interrupted(&locker);
1556 }
1557
1558 template <typename I>
1559 bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
1560 if (m_state == STATE_COMPLETE) {
1561 locker->unlock();
1562
1563 dout(10) << "resuming pending shut down" << dendl;
1564 unregister_remote_update_watcher();
1565 return true;
1566 }
1567 return false;
1568 }
1569
1570 template <typename I>
1571 void Replayer<I>::register_perf_counters() {
1572 dout(5) << dendl;
1573
1574 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1575 ceph_assert(m_perf_counters == nullptr);
1576
1577 auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
1578 auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio");
1579
1580 auto local_image_ctx = m_state_builder->local_image_ctx;
1581 std::string labels = ceph::perf_counters::key_create(
1582 "rbd_mirror_snapshot_image",
1583 {{"pool", local_image_ctx->md_ctx.get_pool_name()},
1584 {"namespace", local_image_ctx->md_ctx.get_namespace()},
1585 {"image", local_image_ctx->name}});
1586
1587 PerfCountersBuilder plb(g_ceph_context, labels, l_rbd_mirror_snapshot_first,
1588 l_rbd_mirror_snapshot_last);
1589 plb.add_u64_counter(l_rbd_mirror_snapshot_snapshots, "snapshots",
1590 "Number of snapshots synced", nullptr, prio);
1591 plb.add_time_avg(l_rbd_mirror_snapshot_sync_time, "sync_time",
1592 "Average sync time", nullptr, prio);
1593 plb.add_u64_counter(l_rbd_mirror_snapshot_sync_bytes, "sync_bytes",
1594 "Total bytes synced", nullptr, prio, unit_t(UNIT_BYTES));
1595 plb.add_time(l_rbd_mirror_snapshot_remote_timestamp, "remote_timestamp",
1596 "Timestamp of the remote snapshot", nullptr, prio);
1597 plb.add_time(l_rbd_mirror_snapshot_local_timestamp, "local_timestamp",
1598 "Timestamp of the local snapshot", nullptr, prio);
1599 plb.add_time(l_rbd_mirror_snapshot_last_sync_time, "last_sync_time",
1600 "Time taken to sync the last snapshot", nullptr, prio);
1601 plb.add_u64(l_rbd_mirror_snapshot_last_sync_bytes, "last_sync_bytes",
1602 "Bytes synced for the last snapshot", nullptr, prio,
1603 unit_t(UNIT_BYTES));
1604
1605 m_perf_counters = plb.create_perf_counters();
1606 g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
1607 }
1608
1609 template <typename I>
1610 void Replayer<I>::unregister_perf_counters() {
1611 dout(5) << dendl;
1612 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1613
1614 PerfCounters *perf_counters = nullptr;
1615 std::swap(perf_counters, m_perf_counters);
1616
1617 if (perf_counters != nullptr) {
1618 g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
1619 delete perf_counters;
1620 }
1621 }
1622
1623 } // namespace snapshot
1624 } // namespace image_replayer
1625 } // namespace mirror
1626 } // namespace rbd
1627
1628 template class rbd::mirror::image_replayer::snapshot::Replayer<librbd::ImageCtx>;