]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc
4a44a57bc223534ce51ffacb4f9c2c06b041bee0
[ceph.git] / ceph / src / tools / rbd_mirror / image_replayer / snapshot / Replayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "Replayer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "common/Timer.h"
#include "cls/rbd/cls_rbd_client.h"
#include "json_spirit/json_spirit.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/deep_copy/Handler.h"
#include "librbd/deep_copy/ImageCopyRequest.h"
#include "librbd/deep_copy/SnapshotCopyRequest.h"
#include "librbd/mirror/ImageStateUpdateRequest.h"
#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h"
#include "librbd/mirror/snapshot/GetImageStateRequest.h"
#include "librbd/mirror/snapshot/ImageMeta.h"
#include "librbd/mirror/snapshot/UnlinkPeerRequest.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/PoolMetaCache.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
#include "tools/rbd_mirror/image_replayer/Utils.h"
#include "tools/rbd_mirror/image_replayer/snapshot/ApplyImageStateRequest.h"
#include "tools/rbd_mirror/image_replayer/snapshot/StateBuilder.h"
#include "tools/rbd_mirror/image_replayer/snapshot/Utils.h"
#include <cmath>
#include <set>
35
36 #define dout_context g_ceph_context
37 #define dout_subsys ceph_subsys_rbd_mirror
38 #undef dout_prefix
39 #define dout_prefix *_dout << "rbd::mirror::image_replayer::snapshot::" \
40 << "Replayer: " << this << " " << __func__ << ": "
41
42 extern PerfCounters *g_snapshot_perf_counters;
43
44 namespace rbd {
45 namespace mirror {
46 namespace image_replayer {
47 namespace snapshot {
48
49 namespace {
50
// Rounds `value` to two decimal places and returns the magnitude of the
// result (the sign is dropped).
//
// The std::-qualified floating point functions are used deliberately: an
// unqualified abs() can bind to the C library's int abs(int) depending on
// which headers happen to be visible, which would silently truncate the
// fractional part that this helper exists to preserve.
double round_to_two_places(double value) {
  return std::fabs(std::round(value * 100.0) / 100.0);
}
54
55 template<typename I>
56 std::pair<uint64_t, librbd::SnapInfo*> get_newest_mirror_snapshot(
57 I* image_ctx) {
58 for (auto snap_info_it = image_ctx->snap_info.rbegin();
59 snap_info_it != image_ctx->snap_info.rend(); ++snap_info_it) {
60 const auto& snap_ns = snap_info_it->second.snap_namespace;
61 auto mirror_ns = boost::get<
62 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
63 if (mirror_ns == nullptr || !mirror_ns->complete) {
64 continue;
65 }
66
67 return {snap_info_it->first, &snap_info_it->second};
68 }
69
70 return {CEPH_NOSNAP, nullptr};
71 }
72
73 } // anonymous namespace
74
75 using librbd::util::create_async_context_callback;
76 using librbd::util::create_context_callback;
77 using librbd::util::create_rados_callback;
78
79 template <typename I>
80 struct Replayer<I>::C_UpdateWatchCtx : public librbd::UpdateWatchCtx {
81 Replayer<I>* replayer;
82
83 C_UpdateWatchCtx(Replayer<I>* replayer) : replayer(replayer) {
84 }
85
86 void handle_notify() override {
87 replayer->handle_image_update_notify();
88 }
89 };
90
91 template <typename I>
92 struct Replayer<I>::DeepCopyHandler : public librbd::deep_copy::Handler {
93 Replayer *replayer;
94
95 DeepCopyHandler(Replayer* replayer) : replayer(replayer) {
96 }
97
98 void handle_read(uint64_t bytes_read) override {
99 replayer->handle_copy_image_read(bytes_read);
100 }
101
102 int update_progress(uint64_t object_number, uint64_t object_count) override {
103 replayer->handle_copy_image_progress(object_number, object_count);
104 return 0;
105 }
106 };
107
108 template <typename I>
109 Replayer<I>::Replayer(
110 Threads<I>* threads,
111 InstanceWatcher<I>* instance_watcher,
112 const std::string& local_mirror_uuid,
113 PoolMetaCache* pool_meta_cache,
114 StateBuilder<I>* state_builder,
115 ReplayerListener* replayer_listener)
116 : m_threads(threads),
117 m_instance_watcher(instance_watcher),
118 m_local_mirror_uuid(local_mirror_uuid),
119 m_pool_meta_cache(pool_meta_cache),
120 m_state_builder(state_builder),
121 m_replayer_listener(replayer_listener),
122 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
123 "rbd::mirror::image_replayer::snapshot::Replayer", this))) {
124 dout(10) << dendl;
125 }
126
127 template <typename I>
128 Replayer<I>::~Replayer() {
129 dout(10) << dendl;
130
131 {
132 std::unique_lock locker{m_lock};
133 unregister_perf_counters();
134 }
135
136 ceph_assert(m_state == STATE_COMPLETE);
137 ceph_assert(m_update_watch_ctx == nullptr);
138 ceph_assert(m_deep_copy_handler == nullptr);
139 }
140
141 template <typename I>
142 void Replayer<I>::init(Context* on_finish) {
143 dout(10) << dendl;
144
145 ceph_assert(m_state == STATE_INIT);
146
147 RemotePoolMeta remote_pool_meta;
148 int r = m_pool_meta_cache->get_remote_pool_meta(
149 m_state_builder->remote_image_ctx->md_ctx.get_id(), &remote_pool_meta);
150 if (r < 0 || remote_pool_meta.mirror_peer_uuid.empty()) {
151 derr << "failed to retrieve mirror peer uuid from remote pool" << dendl;
152 m_state = STATE_COMPLETE;
153 m_threads->work_queue->queue(on_finish, r);
154 return;
155 }
156
157 m_remote_mirror_peer_uuid = remote_pool_meta.mirror_peer_uuid;
158 dout(10) << "remote_mirror_peer_uuid=" << m_remote_mirror_peer_uuid << dendl;
159
160 {
161 auto local_image_ctx = m_state_builder->local_image_ctx;
162 std::shared_lock image_locker{local_image_ctx->image_lock};
163 m_image_spec = image_replayer::util::compute_image_spec(
164 local_image_ctx->md_ctx, local_image_ctx->name);
165 }
166
167 {
168 std::unique_lock locker{m_lock};
169 register_perf_counters();
170 }
171
172 ceph_assert(m_on_init_shutdown == nullptr);
173 m_on_init_shutdown = on_finish;
174
175 register_local_update_watcher();
176 }
177
178 template <typename I>
179 void Replayer<I>::shut_down(Context* on_finish) {
180 dout(10) << dendl;
181
182 std::unique_lock locker{m_lock};
183 ceph_assert(m_on_init_shutdown == nullptr);
184 m_on_init_shutdown = on_finish;
185 m_error_code = 0;
186 m_error_description = "";
187
188 ceph_assert(m_state != STATE_INIT);
189 auto state = STATE_COMPLETE;
190 std::swap(m_state, state);
191
192 if (state == STATE_REPLAYING) {
193 // if a sync request was pending, request a cancelation
194 m_instance_watcher->cancel_sync_request(
195 m_state_builder->local_image_ctx->id);
196
197 // TODO interrupt snapshot copy and image copy state machines even if remote
198 // cluster is unreachable
199 dout(10) << "shut down pending on completion of snapshot replay" << dendl;
200 return;
201 }
202 locker.unlock();
203
204 unregister_remote_update_watcher();
205 }
206
207 template <typename I>
208 void Replayer<I>::flush(Context* on_finish) {
209 dout(10) << dendl;
210
211 // TODO
212 m_threads->work_queue->queue(on_finish, 0);
213 }
214
215 template <typename I>
216 bool Replayer<I>::get_replay_status(std::string* description,
217 Context* on_finish) {
218 dout(10) << dendl;
219
220 std::unique_lock locker{m_lock};
221 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
222 locker.unlock();
223
224 derr << "replay not running" << dendl;
225 on_finish->complete(-EAGAIN);
226 return false;
227 }
228
229 std::shared_lock local_image_locker{
230 m_state_builder->local_image_ctx->image_lock};
231 auto [local_snap_id, local_snap_info] = get_newest_mirror_snapshot(
232 m_state_builder->local_image_ctx);
233
234 std::shared_lock remote_image_locker{
235 m_state_builder->remote_image_ctx->image_lock};
236 auto [remote_snap_id, remote_snap_info] = get_newest_mirror_snapshot(
237 m_state_builder->remote_image_ctx);
238
239 if (remote_snap_info == nullptr) {
240 remote_image_locker.unlock();
241 local_image_locker.unlock();
242 locker.unlock();
243
244 derr << "remote image does not contain mirror snapshots" << dendl;
245 on_finish->complete(-EAGAIN);
246 return false;
247 }
248
249 std::string replay_state = "idle";
250 if (m_remote_snap_id_end != CEPH_NOSNAP) {
251 replay_state = "syncing";
252 }
253
254 json_spirit::mObject root_obj;
255 root_obj["replay_state"] = replay_state;
256 root_obj["remote_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
257
258 auto matching_remote_snap_id = util::compute_remote_snap_id(
259 m_state_builder->local_image_ctx->image_lock,
260 m_state_builder->local_image_ctx->snap_info,
261 local_snap_id, m_state_builder->remote_mirror_uuid);
262 auto matching_remote_snap_it =
263 m_state_builder->remote_image_ctx->snap_info.find(matching_remote_snap_id);
264 if (matching_remote_snap_id != CEPH_NOSNAP &&
265 matching_remote_snap_it !=
266 m_state_builder->remote_image_ctx->snap_info.end()) {
267 // use the timestamp from the matching remote image since
268 // the local snapshot would just be the time the snapshot was
269 // synced and not the consistency point in time.
270 root_obj["local_snapshot_timestamp"] =
271 matching_remote_snap_it->second.timestamp.sec();
272 }
273
274 matching_remote_snap_it = m_state_builder->remote_image_ctx->snap_info.find(
275 m_remote_snap_id_end);
276 if (m_remote_snap_id_end != CEPH_NOSNAP &&
277 matching_remote_snap_it !=
278 m_state_builder->remote_image_ctx->snap_info.end()) {
279 root_obj["syncing_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
280 root_obj["syncing_percent"] = static_cast<uint64_t>(
281 100 * m_local_mirror_snap_ns.last_copied_object_number /
282 static_cast<float>(std::max<uint64_t>(1U, m_local_object_count)));
283 }
284
285 m_bytes_per_second(0);
286 auto bytes_per_second = m_bytes_per_second.get_average();
287 root_obj["bytes_per_second"] = round_to_two_places(bytes_per_second);
288
289 auto bytes_per_snapshot = boost::accumulators::rolling_mean(
290 m_bytes_per_snapshot);
291 root_obj["bytes_per_snapshot"] = round_to_two_places(bytes_per_snapshot);
292
293 auto pending_bytes = bytes_per_snapshot * m_pending_snapshots;
294 if (bytes_per_second > 0 && m_pending_snapshots > 0) {
295 std::uint64_t seconds_until_synced = round_to_two_places(
296 pending_bytes / bytes_per_second);
297 if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
298 seconds_until_synced = std::numeric_limits<uint64_t>::max();
299 }
300
301 root_obj["seconds_until_synced"] = seconds_until_synced;
302 }
303
304 *description = json_spirit::write(
305 root_obj, json_spirit::remove_trailing_zeros);
306
307 local_image_locker.unlock();
308 remote_image_locker.unlock();
309 locker.unlock();
310 on_finish->complete(-EEXIST);
311 return true;
312 }
313
314 template <typename I>
315 void Replayer<I>::load_local_image_meta() {
316 dout(10) << dendl;
317
318 {
319 // reset state in case new snapshot is added while we are scanning
320 std::unique_lock locker{m_lock};
321 m_image_updated = false;
322 }
323
324 bool update_status = false;
325 {
326 auto local_image_ctx = m_state_builder->local_image_ctx;
327 std::shared_lock image_locker{local_image_ctx->image_lock};
328 auto image_spec = image_replayer::util::compute_image_spec(
329 local_image_ctx->md_ctx, local_image_ctx->name);
330 if (m_image_spec != image_spec) {
331 m_image_spec = image_spec;
332 update_status = true;
333 }
334 }
335 if (update_status) {
336 std::unique_lock locker{m_lock};
337 unregister_perf_counters();
338 register_perf_counters();
339 notify_status_updated();
340 }
341
342 ceph_assert(m_state_builder->local_image_meta != nullptr);
343 auto ctx = create_context_callback<
344 Replayer<I>, &Replayer<I>::handle_load_local_image_meta>(this);
345 m_state_builder->local_image_meta->load(ctx);
346 }
347
348 template <typename I>
349 void Replayer<I>::handle_load_local_image_meta(int r) {
350 dout(10) << "r=" << r << dendl;
351
352 if (r < 0 && r != -ENOENT) {
353 derr << "failed to load local image-meta: " << cpp_strerror(r) << dendl;
354 handle_replay_complete(r, "failed to load local image-meta");
355 return;
356 }
357
358 if (r >= 0 && m_state_builder->local_image_meta->resync_requested) {
359 m_resync_requested = true;
360
361 dout(10) << "local image resync requested" << dendl;
362 handle_replay_complete(0, "resync requested");
363 return;
364 }
365
366 refresh_local_image();
367 }
368
369 template <typename I>
370 void Replayer<I>::refresh_local_image() {
371 if (!m_state_builder->local_image_ctx->state->is_refresh_required()) {
372 refresh_remote_image();
373 return;
374 }
375
376 dout(10) << dendl;
377 auto ctx = create_context_callback<
378 Replayer<I>, &Replayer<I>::handle_refresh_local_image>(this);
379 m_state_builder->local_image_ctx->state->refresh(ctx);
380 }
381
382 template <typename I>
383 void Replayer<I>::handle_refresh_local_image(int r) {
384 dout(10) << "r=" << r << dendl;
385
386 if (r < 0) {
387 derr << "failed to refresh local image: " << cpp_strerror(r) << dendl;
388 handle_replay_complete(r, "failed to refresh local image");
389 return;
390 }
391
392 refresh_remote_image();
393 }
394
395 template <typename I>
396 void Replayer<I>::refresh_remote_image() {
397 if (!m_state_builder->remote_image_ctx->state->is_refresh_required()) {
398 std::unique_lock locker{m_lock};
399 scan_local_mirror_snapshots(&locker);
400 return;
401 }
402
403 dout(10) << dendl;
404 auto ctx = create_context_callback<
405 Replayer<I>, &Replayer<I>::handle_refresh_remote_image>(this);
406 m_state_builder->remote_image_ctx->state->refresh(ctx);
407 }
408
409 template <typename I>
410 void Replayer<I>::handle_refresh_remote_image(int r) {
411 dout(10) << "r=" << r << dendl;
412
413 if (r < 0) {
414 derr << "failed to refresh remote image: " << cpp_strerror(r) << dendl;
415 handle_replay_complete(r, "failed to refresh remote image");
416 return;
417 }
418
419 std::unique_lock locker{m_lock};
420 scan_local_mirror_snapshots(&locker);
421 }
422
423 template <typename I>
424 void Replayer<I>::scan_local_mirror_snapshots(
425 std::unique_lock<ceph::mutex>* locker) {
426 if (is_replay_interrupted(locker)) {
427 return;
428 }
429
430 dout(10) << dendl;
431
432 m_local_snap_id_start = 0;
433 m_local_snap_id_end = CEPH_NOSNAP;
434 m_local_mirror_snap_ns = {};
435 m_local_object_count = 0;
436
437 m_remote_snap_id_start = 0;
438 m_remote_snap_id_end = CEPH_NOSNAP;
439 m_remote_mirror_snap_ns = {};
440
441 std::set<uint64_t> prune_snap_ids;
442
443 auto local_image_ctx = m_state_builder->local_image_ctx;
444 std::shared_lock image_locker{local_image_ctx->image_lock};
445 for (auto snap_info_it = local_image_ctx->snap_info.begin();
446 snap_info_it != local_image_ctx->snap_info.end(); ++snap_info_it) {
447 const auto& snap_ns = snap_info_it->second.snap_namespace;
448 auto mirror_ns = boost::get<
449 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
450 if (mirror_ns == nullptr) {
451 continue;
452 }
453
454 dout(15) << "local mirror snapshot: id=" << snap_info_it->first << ", "
455 << "mirror_ns=" << *mirror_ns << dendl;
456 m_local_mirror_snap_ns = *mirror_ns;
457
458 auto local_snap_id = snap_info_it->first;
459 if (mirror_ns->is_non_primary()) {
460 if (mirror_ns->complete) {
461 // if remote has new snapshots, we would sync from here
462 m_local_snap_id_start = local_snap_id;
463 ceph_assert(m_local_snap_id_end == CEPH_NOSNAP);
464
465 if (mirror_ns->mirror_peer_uuids.empty()) {
466 // no other peer will attempt to sync to this snapshot so store as
467 // a candidate for removal
468 prune_snap_ids.insert(local_snap_id);
469 }
470 } else if (mirror_ns->last_copied_object_number == 0 &&
471 m_local_snap_id_start > 0) {
472 // snapshot might be missing image state, object-map, etc, so just
473 // delete and re-create it if we haven't started copying data
474 // objects. Also only prune this snapshot since we will need the
475 // previous mirror snapshot for syncing. Special case exception for
476 // the first non-primary snapshot since we know its snapshot is
477 // well-formed because otherwise the mirror-image-state would have
478 // forced an image deletion.
479 prune_snap_ids.clear();
480 prune_snap_ids.insert(local_snap_id);
481 break;
482 } else {
483 // start snap will be last complete mirror snapshot or initial
484 // image revision
485 m_local_snap_id_end = local_snap_id;
486 break;
487 }
488 } else if (mirror_ns->is_primary()) {
489 if (mirror_ns->complete) {
490 m_local_snap_id_start = local_snap_id;
491 ceph_assert(m_local_snap_id_end == CEPH_NOSNAP);
492 } else {
493 derr << "incomplete local primary snapshot" << dendl;
494 handle_replay_complete(locker, -EINVAL,
495 "incomplete local primary snapshot");
496 return;
497 }
498 } else {
499 derr << "unknown local mirror snapshot state" << dendl;
500 handle_replay_complete(locker, -EINVAL,
501 "invalid local mirror snapshot state");
502 return;
503 }
504 }
505 image_locker.unlock();
506
507 if (m_local_snap_id_start > 0) {
508 // remove candidate that is required for delta snapshot sync
509 prune_snap_ids.erase(m_local_snap_id_start);
510 }
511 if (!prune_snap_ids.empty()) {
512 locker->unlock();
513
514 auto prune_snap_id = *prune_snap_ids.begin();
515 dout(5) << "pruning unused non-primary snapshot " << prune_snap_id << dendl;
516 prune_non_primary_snapshot(prune_snap_id);
517 return;
518 }
519
520 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
521 if (m_local_mirror_snap_ns.is_non_primary() &&
522 m_local_mirror_snap_ns.primary_mirror_uuid !=
523 m_state_builder->remote_mirror_uuid) {
524 // TODO support multiple peers
525 derr << "local image linked to unknown peer: "
526 << m_local_mirror_snap_ns.primary_mirror_uuid << dendl;
527 handle_replay_complete(locker, -EEXIST,
528 "local image linked to unknown peer");
529 return;
530 } else if (m_local_mirror_snap_ns.state ==
531 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY) {
532 dout(5) << "local image promoted" << dendl;
533 handle_replay_complete(locker, 0, "force promoted");
534 return;
535 }
536
537 dout(10) << "found local mirror snapshot: "
538 << "local_snap_id_start=" << m_local_snap_id_start << ", "
539 << "local_snap_id_end=" << m_local_snap_id_end << ", "
540 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
541 if (!m_local_mirror_snap_ns.is_primary() &&
542 m_local_mirror_snap_ns.complete) {
543 // our remote sync should start after this completed snapshot
544 m_remote_snap_id_start = m_local_mirror_snap_ns.primary_snap_id;
545 }
546 }
547
548 // we don't have any mirror snapshots or only completed non-primary
549 // mirror snapshots
550 scan_remote_mirror_snapshots(locker);
551 }
552
553 template <typename I>
554 void Replayer<I>::scan_remote_mirror_snapshots(
555 std::unique_lock<ceph::mutex>* locker) {
556 dout(10) << dendl;
557
558 m_pending_snapshots = 0;
559
560 std::set<uint64_t> unlink_snap_ids;
561 bool split_brain = false;
562 bool remote_demoted = false;
563 auto remote_image_ctx = m_state_builder->remote_image_ctx;
564 std::shared_lock image_locker{remote_image_ctx->image_lock};
565 for (auto snap_info_it = remote_image_ctx->snap_info.begin();
566 snap_info_it != remote_image_ctx->snap_info.end(); ++snap_info_it) {
567 const auto& snap_ns = snap_info_it->second.snap_namespace;
568 auto mirror_ns = boost::get<
569 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
570 if (mirror_ns == nullptr) {
571 continue;
572 }
573
574 dout(15) << "remote mirror snapshot: id=" << snap_info_it->first << ", "
575 << "mirror_ns=" << *mirror_ns << dendl;
576 remote_demoted = mirror_ns->is_demoted();
577 if (!mirror_ns->is_primary() && !mirror_ns->is_non_primary()) {
578 derr << "unknown remote mirror snapshot state" << dendl;
579 handle_replay_complete(locker, -EINVAL,
580 "invalid remote mirror snapshot state");
581 return;
582 } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) ==
583 0) {
584 dout(15) << "skipping remote snapshot due to missing mirror peer"
585 << dendl;
586 continue;
587 }
588
589 auto remote_snap_id = snap_info_it->first;
590 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
591 // we have a local mirror snapshot
592 if (m_local_mirror_snap_ns.is_non_primary()) {
593 // previously validated that it was linked to remote
594 ceph_assert(m_local_mirror_snap_ns.primary_mirror_uuid ==
595 m_state_builder->remote_mirror_uuid);
596
597 if (m_remote_snap_id_end == CEPH_NOSNAP) {
598 // haven't found the end snap so treat this as a candidate for unlink
599 unlink_snap_ids.insert(remote_snap_id);
600 }
601 if (m_local_mirror_snap_ns.complete &&
602 m_local_mirror_snap_ns.primary_snap_id >= remote_snap_id) {
603 // skip past completed remote snapshot
604 m_remote_snap_id_start = remote_snap_id;
605 m_remote_mirror_snap_ns = *mirror_ns;
606 dout(15) << "skipping synced remote snapshot " << remote_snap_id
607 << dendl;
608 continue;
609 } else if (!m_local_mirror_snap_ns.complete &&
610 m_local_mirror_snap_ns.primary_snap_id > remote_snap_id) {
611 // skip until we get to the in-progress remote snapshot
612 dout(15) << "skipping synced remote snapshot " << remote_snap_id
613 << " while search for in-progress sync" << dendl;
614 m_remote_snap_id_start = remote_snap_id;
615 m_remote_mirror_snap_ns = *mirror_ns;
616 continue;
617 }
618 } else if (m_local_mirror_snap_ns.state ==
619 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED) {
620 // find the matching demotion snapshot in remote image
621 ceph_assert(m_local_snap_id_start > 0);
622 if (mirror_ns->state ==
623 cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED &&
624 mirror_ns->primary_mirror_uuid == m_local_mirror_uuid &&
625 mirror_ns->primary_snap_id == m_local_snap_id_start) {
626 dout(10) << "located matching demotion snapshot: "
627 << "remote_snap_id=" << remote_snap_id << ", "
628 << "local_snap_id=" << m_local_snap_id_start << dendl;
629 m_remote_snap_id_start = remote_snap_id;
630 split_brain = false;
631 continue;
632 } else if (m_remote_snap_id_start == 0) {
633 // still looking for our matching demotion snapshot
634 dout(15) << "skipping remote snapshot " << remote_snap_id << " "
635 << "while searching for demotion" << dendl;
636 split_brain = true;
637 continue;
638 }
639 } else {
640 // should not have been able to reach this
641 ceph_assert(false);
642 }
643 } else if (!mirror_ns->is_primary()) {
644 dout(15) << "skipping non-primary remote snapshot" << dendl;
645 continue;
646 }
647
648 // found candidate snapshot to sync
649 ++m_pending_snapshots;
650 if (m_remote_snap_id_end != CEPH_NOSNAP) {
651 continue;
652 }
653
654 // first primary snapshot where were are listed as a peer
655 m_remote_snap_id_end = remote_snap_id;
656 m_remote_mirror_snap_ns = *mirror_ns;
657 }
658
659 if (m_remote_snap_id_start != 0 &&
660 remote_image_ctx->snap_info.count(m_remote_snap_id_start) == 0) {
661 // the remote start snapshot was deleted out from under us
662 derr << "failed to locate remote start snapshot: "
663 << "snap_id=" << m_remote_snap_id_start << dendl;
664 split_brain = true;
665 }
666
667 image_locker.unlock();
668
669 if (!split_brain) {
670 unlink_snap_ids.erase(m_remote_snap_id_start);
671 unlink_snap_ids.erase(m_remote_snap_id_end);
672 if (!unlink_snap_ids.empty()) {
673 locker->unlock();
674
675 // retry the unlinking process for a remote snapshot that we do not
676 // need anymore
677 auto remote_snap_id = *unlink_snap_ids.begin();
678 dout(10) << "unlinking from remote snapshot " << remote_snap_id << dendl;
679 unlink_peer(remote_snap_id);
680 return;
681 }
682
683 if (m_remote_snap_id_end != CEPH_NOSNAP) {
684 dout(10) << "found remote mirror snapshot: "
685 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
686 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
687 << "remote_snap_ns=" << m_remote_mirror_snap_ns << dendl;
688 if (m_remote_mirror_snap_ns.complete) {
689 locker->unlock();
690
691 if (m_local_snap_id_end != CEPH_NOSNAP &&
692 !m_local_mirror_snap_ns.complete) {
693 // attempt to resume image-sync
694 dout(10) << "local image contains in-progress mirror snapshot"
695 << dendl;
696 get_local_image_state();
697 } else {
698 copy_snapshots();
699 }
700 return;
701 } else {
702 // might have raced with the creation of a remote mirror snapshot
703 // so we will need to refresh and rescan once it completes
704 dout(15) << "remote mirror snapshot not complete" << dendl;
705 }
706 }
707 }
708
709 if (m_image_updated) {
710 // received update notification while scanning image, restart ...
711 m_image_updated = false;
712 locker->unlock();
713
714 dout(10) << "restarting snapshot scan due to remote update notification"
715 << dendl;
716 load_local_image_meta();
717 return;
718 }
719
720 if (is_replay_interrupted(locker)) {
721 return;
722 } else if (split_brain) {
723 derr << "split-brain detected: failed to find matching non-primary "
724 << "snapshot in remote image: "
725 << "local_snap_id_start=" << m_local_snap_id_start << ", "
726 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
727 handle_replay_complete(locker, -EEXIST, "split-brain");
728 return;
729 } else if (remote_demoted) {
730 dout(10) << "remote image demoted" << dendl;
731 handle_replay_complete(locker, -EREMOTEIO, "remote image demoted");
732 return;
733 }
734
735 dout(10) << "all remote snapshots synced: idling waiting for new snapshot"
736 << dendl;
737 ceph_assert(m_state == STATE_REPLAYING);
738 m_state = STATE_IDLE;
739
740 notify_status_updated();
741 }
742
743 template <typename I>
744 void Replayer<I>::prune_non_primary_snapshot(uint64_t snap_id) {
745 dout(10) << "snap_id=" << snap_id << dendl;
746
747 auto local_image_ctx = m_state_builder->local_image_ctx;
748 bool snap_valid = false;
749 cls::rbd::SnapshotNamespace snap_namespace;
750 std::string snap_name;
751
752 {
753 std::shared_lock image_locker{local_image_ctx->image_lock};
754 auto snap_info = local_image_ctx->get_snap_info(snap_id);
755 if (snap_info != nullptr) {
756 snap_valid = true;
757 snap_namespace = snap_info->snap_namespace;
758 snap_name = snap_info->name;
759
760 ceph_assert(boost::get<cls::rbd::MirrorSnapshotNamespace>(
761 &snap_namespace) != nullptr);
762 }
763 }
764
765 if (!snap_valid) {
766 load_local_image_meta();
767 return;
768 }
769
770 auto ctx = create_context_callback<
771 Replayer<I>, &Replayer<I>::handle_prune_non_primary_snapshot>(this);
772 local_image_ctx->operations->snap_remove(snap_namespace, snap_name, ctx);
773 }
774
775 template <typename I>
776 void Replayer<I>::handle_prune_non_primary_snapshot(int r) {
777 dout(10) << "r=" << r << dendl;
778
779 if (r < 0 && r != -ENOENT) {
780 derr << "failed to prune non-primary snapshot: " << cpp_strerror(r)
781 << dendl;
782 handle_replay_complete(r, "failed to prune non-primary snapshot");
783 return;
784 }
785
786 if (is_replay_interrupted()) {
787 return;
788 }
789
790 load_local_image_meta();
791 }
792
793 template <typename I>
794 void Replayer<I>::copy_snapshots() {
795 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
796 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
797 << "local_snap_id_start=" << m_local_snap_id_start << dendl;
798
799 ceph_assert(m_remote_snap_id_start != CEPH_NOSNAP);
800 ceph_assert(m_remote_snap_id_end > 0 &&
801 m_remote_snap_id_end != CEPH_NOSNAP);
802 ceph_assert(m_local_snap_id_start != CEPH_NOSNAP);
803
804 m_local_mirror_snap_ns = {};
805 auto ctx = create_context_callback<
806 Replayer<I>, &Replayer<I>::handle_copy_snapshots>(this);
807 auto req = librbd::deep_copy::SnapshotCopyRequest<I>::create(
808 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
809 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start,
810 false, m_threads->work_queue, &m_local_mirror_snap_ns.snap_seqs,
811 ctx);
812 req->send();
813 }
814
815 template <typename I>
816 void Replayer<I>::handle_copy_snapshots(int r) {
817 dout(10) << "r=" << r << dendl;
818
819 if (r < 0) {
820 derr << "failed to copy snapshots from remote to local image: "
821 << cpp_strerror(r) << dendl;
822 handle_replay_complete(
823 r, "failed to copy snapshots from remote to local image");
824 return;
825 }
826
827 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
828 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
829 << "local_snap_id_start=" << m_local_snap_id_start << ", "
830 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
831 get_remote_image_state();
832 }
833
834 template <typename I>
835 void Replayer<I>::get_remote_image_state() {
836 dout(10) << dendl;
837
838 auto ctx = create_context_callback<
839 Replayer<I>, &Replayer<I>::handle_get_remote_image_state>(this);
840 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
841 m_state_builder->remote_image_ctx, m_remote_snap_id_end,
842 &m_image_state, ctx);
843 req->send();
844 }
845
846 template <typename I>
847 void Replayer<I>::handle_get_remote_image_state(int r) {
848 dout(10) << "r=" << r << dendl;
849
850 if (r < 0) {
851 derr << "failed to retrieve remote snapshot image state: "
852 << cpp_strerror(r) << dendl;
853 handle_replay_complete(r, "failed to retrieve remote snapshot image state");
854 return;
855 }
856
857 create_non_primary_snapshot();
858 }
859
860 template <typename I>
861 void Replayer<I>::get_local_image_state() {
862 dout(10) << dendl;
863
864 ceph_assert(m_local_snap_id_end != CEPH_NOSNAP);
865 auto ctx = create_context_callback<
866 Replayer<I>, &Replayer<I>::handle_get_local_image_state>(this);
867 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
868 m_state_builder->local_image_ctx, m_local_snap_id_end,
869 &m_image_state, ctx);
870 req->send();
871 }
872
873 template <typename I>
874 void Replayer<I>::handle_get_local_image_state(int r) {
875 dout(10) << "r=" << r << dendl;
876
877 if (r < 0) {
878 derr << "failed to retrieve local snapshot image state: "
879 << cpp_strerror(r) << dendl;
880 handle_replay_complete(r, "failed to retrieve local snapshot image state");
881 return;
882 }
883
884 request_sync();
885 }
886
887 template <typename I>
888 void Replayer<I>::create_non_primary_snapshot() {
889 auto local_image_ctx = m_state_builder->local_image_ctx;
890
891 if (m_local_snap_id_start > 0) {
892 std::shared_lock local_image_locker{local_image_ctx->image_lock};
893
894 auto local_snap_info_it = local_image_ctx->snap_info.find(
895 m_local_snap_id_start);
896 if (local_snap_info_it == local_image_ctx->snap_info.end()) {
897 local_image_locker.unlock();
898
899 derr << "failed to locate local snapshot " << m_local_snap_id_start
900 << dendl;
901 handle_replay_complete(-ENOENT, "failed to locate local start snapshot");
902 return;
903 }
904
905 auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
906 &local_snap_info_it->second.snap_namespace);
907 ceph_assert(mirror_ns != nullptr);
908
909 auto remote_image_ctx = m_state_builder->remote_image_ctx;
910 std::shared_lock remote_image_locker{remote_image_ctx->image_lock};
911
912 // (re)build a full mapping from remote to local snap ids for all user
913 // snapshots to support applying image state in the future
914 for (auto& [remote_snap_id, remote_snap_info] :
915 remote_image_ctx->snap_info) {
916 if (remote_snap_id >= m_remote_snap_id_end) {
917 break;
918 }
919
920 // we can ignore all non-user snapshots since image state only includes
921 // user snapshots
922 if (boost::get<cls::rbd::UserSnapshotNamespace>(
923 &remote_snap_info.snap_namespace) == nullptr) {
924 continue;
925 }
926
927 uint64_t local_snap_id = CEPH_NOSNAP;
928 if (mirror_ns->is_demoted() && !m_remote_mirror_snap_ns.is_demoted()) {
929 // if we are creating a non-primary snapshot following a demotion,
930 // re-build the full snapshot sequence since we don't have a valid
931 // snapshot mapping
932 auto local_snap_id_it = local_image_ctx->snap_ids.find(
933 {remote_snap_info.snap_namespace, remote_snap_info.name});
934 if (local_snap_id_it != local_image_ctx->snap_ids.end()) {
935 local_snap_id = local_snap_id_it->second;
936 }
937 } else {
938 auto snap_seq_it = mirror_ns->snap_seqs.find(remote_snap_id);
939 if (snap_seq_it != mirror_ns->snap_seqs.end()) {
940 local_snap_id = snap_seq_it->second;
941 }
942 }
943
944 if (m_local_mirror_snap_ns.snap_seqs.count(remote_snap_id) == 0 &&
945 local_snap_id != CEPH_NOSNAP) {
946 dout(15) << "mapping remote snapshot " << remote_snap_id << " to "
947 << "local snapshot " << local_snap_id << dendl;
948 m_local_mirror_snap_ns.snap_seqs[remote_snap_id] = local_snap_id;
949 }
950 }
951 }
952
953 dout(10) << "demoted=" << m_remote_mirror_snap_ns.is_demoted() << ", "
954 << "primary_mirror_uuid="
955 << m_state_builder->remote_mirror_uuid << ", "
956 << "primary_snap_id=" << m_remote_snap_id_end << ", "
957 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
958
959 auto ctx = create_context_callback<
960 Replayer<I>, &Replayer<I>::handle_create_non_primary_snapshot>(this);
961 auto req = librbd::mirror::snapshot::CreateNonPrimaryRequest<I>::create(
962 local_image_ctx, m_remote_mirror_snap_ns.is_demoted(),
963 m_state_builder->remote_mirror_uuid, m_remote_snap_id_end,
964 m_local_mirror_snap_ns.snap_seqs, m_image_state, &m_local_snap_id_end, ctx);
965 req->send();
966 }
967
968 template <typename I>
969 void Replayer<I>::handle_create_non_primary_snapshot(int r) {
970 dout(10) << "r=" << r << dendl;
971
972 if (r < 0) {
973 derr << "failed to create local mirror snapshot: " << cpp_strerror(r)
974 << dendl;
975 handle_replay_complete(r, "failed to create local mirror snapshot");
976 return;
977 }
978
979 dout(15) << "local_snap_id_end=" << m_local_snap_id_end << dendl;
980
981 update_mirror_image_state();
982 }
983
984 template <typename I>
985 void Replayer<I>::update_mirror_image_state() {
986 if (m_local_snap_id_start > 0) {
987 request_sync();
988 return;
989 }
990
991 // a newly created non-primary image has a local mirror state of CREATING
992 // until this point so that we could avoid preserving the image until
993 // the first non-primary snapshot linked the two images together.
994 dout(10) << dendl;
995 auto ctx = create_context_callback<
996 Replayer<I>, &Replayer<I>::handle_update_mirror_image_state>(this);
997 auto req = librbd::mirror::ImageStateUpdateRequest<I>::create(
998 m_state_builder->local_image_ctx->md_ctx,
999 m_state_builder->local_image_ctx->id,
1000 cls::rbd::MIRROR_IMAGE_STATE_ENABLED, {}, ctx);
1001 req->send();
1002 }
1003
1004 template <typename I>
1005 void Replayer<I>::handle_update_mirror_image_state(int r) {
1006 dout(10) << "r=" << r << dendl;
1007
1008 if (r < 0) {
1009 derr << "failed to update local mirror image state: " << cpp_strerror(r)
1010 << dendl;
1011 handle_replay_complete(r, "failed to update local mirror image state");
1012 return;
1013 }
1014
1015 request_sync();
1016 }
1017
1018 template <typename I>
1019 void Replayer<I>::request_sync() {
1020 if (m_remote_mirror_snap_ns.clean_since_snap_id == m_remote_snap_id_start) {
1021 dout(10) << "skipping unnecessary image copy: "
1022 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
1023 << "remote_mirror_snap_ns=" << m_remote_mirror_snap_ns << dendl;
1024 apply_image_state();
1025 return;
1026 }
1027
1028 dout(10) << dendl;
1029 std::unique_lock locker{m_lock};
1030 if (is_replay_interrupted(&locker)) {
1031 return;
1032 }
1033
1034 auto ctx = create_async_context_callback(
1035 m_threads->work_queue, create_context_callback<
1036 Replayer<I>, &Replayer<I>::handle_request_sync>(this));
1037 m_instance_watcher->notify_sync_request(m_state_builder->local_image_ctx->id,
1038 ctx);
1039 }
1040
1041 template <typename I>
1042 void Replayer<I>::handle_request_sync(int r) {
1043 dout(10) << "r=" << r << dendl;
1044
1045 std::unique_lock locker{m_lock};
1046 if (is_replay_interrupted(&locker)) {
1047 return;
1048 } else if (r == -ECANCELED) {
1049 dout(5) << "image-sync canceled" << dendl;
1050 handle_replay_complete(&locker, r, "image-sync canceled");
1051 return;
1052 } else if (r < 0) {
1053 derr << "failed to request image-sync: " << cpp_strerror(r) << dendl;
1054 handle_replay_complete(&locker, r, "failed to request image-sync");
1055 return;
1056 }
1057
1058 m_sync_in_progress = true;
1059 locker.unlock();
1060
1061 copy_image();
1062 }
1063
1064 template <typename I>
1065 void Replayer<I>::copy_image() {
1066 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
1067 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
1068 << "local_snap_id_start=" << m_local_snap_id_start << ", "
1069 << "last_copied_object_number="
1070 << m_local_mirror_snap_ns.last_copied_object_number << ", "
1071 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
1072
1073 m_snapshot_bytes = 0;
1074 m_snapshot_replay_start = ceph_clock_now();
1075 m_deep_copy_handler = new DeepCopyHandler(this);
1076 auto ctx = create_context_callback<
1077 Replayer<I>, &Replayer<I>::handle_copy_image>(this);
1078 auto req = librbd::deep_copy::ImageCopyRequest<I>::create(
1079 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
1080 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start, false,
1081 (m_local_mirror_snap_ns.last_copied_object_number > 0 ?
1082 librbd::deep_copy::ObjectNumber{
1083 m_local_mirror_snap_ns.last_copied_object_number} :
1084 librbd::deep_copy::ObjectNumber{}),
1085 m_local_mirror_snap_ns.snap_seqs, m_deep_copy_handler, ctx);
1086 req->send();
1087 }
1088
1089 template <typename I>
1090 void Replayer<I>::handle_copy_image(int r) {
1091 dout(10) << "r=" << r << dendl;
1092
1093 delete m_deep_copy_handler;
1094 m_deep_copy_handler = nullptr;
1095
1096 if (r < 0) {
1097 derr << "failed to copy remote image to local image: " << cpp_strerror(r)
1098 << dendl;
1099 handle_replay_complete(r, "failed to copy remote image");
1100 return;
1101 }
1102
1103 {
1104 std::unique_lock locker{m_lock};
1105 m_bytes_per_snapshot(m_snapshot_bytes);
1106 auto time = ceph_clock_now() - m_snapshot_replay_start;
1107 if (g_snapshot_perf_counters) {
1108 g_snapshot_perf_counters->inc(l_rbd_mirror_snapshot_replay_bytes,
1109 m_snapshot_bytes);
1110 g_snapshot_perf_counters->inc(l_rbd_mirror_snapshot_replay_snapshots);
1111 g_snapshot_perf_counters->tinc(
1112 l_rbd_mirror_snapshot_replay_snapshots_time, time);
1113 }
1114 if (m_perf_counters) {
1115 m_perf_counters->inc(l_rbd_mirror_snapshot_replay_bytes, m_snapshot_bytes);
1116 m_perf_counters->inc(l_rbd_mirror_snapshot_replay_snapshots);
1117 m_perf_counters->tinc(l_rbd_mirror_snapshot_replay_snapshots_time, time);
1118 }
1119 m_snapshot_bytes = 0;
1120 }
1121
1122 apply_image_state();
1123 }
1124
1125 template <typename I>
1126 void Replayer<I>::handle_copy_image_progress(uint64_t object_number,
1127 uint64_t object_count) {
1128 dout(10) << "object_number=" << object_number << ", "
1129 << "object_count=" << object_count << dendl;
1130
1131 std::unique_lock locker{m_lock};
1132 m_local_mirror_snap_ns.last_copied_object_number = std::min(
1133 object_number, object_count);
1134 m_local_object_count = object_count;
1135
1136 update_non_primary_snapshot(false);
1137 }
1138
1139 template <typename I>
1140 void Replayer<I>::handle_copy_image_read(uint64_t bytes_read) {
1141 dout(20) << "bytes_read=" << bytes_read << dendl;
1142
1143 std::unique_lock locker{m_lock};
1144 m_bytes_per_second(bytes_read);
1145 m_snapshot_bytes += bytes_read;
1146 }
1147
1148 template <typename I>
1149 void Replayer<I>::apply_image_state() {
1150 dout(10) << dendl;
1151
1152 auto ctx = create_context_callback<
1153 Replayer<I>, &Replayer<I>::handle_apply_image_state>(this);
1154 auto req = ApplyImageStateRequest<I>::create(
1155 m_local_mirror_uuid,
1156 m_state_builder->remote_mirror_uuid,
1157 m_state_builder->local_image_ctx,
1158 m_state_builder->remote_image_ctx,
1159 m_image_state, ctx);
1160 req->send();
1161 }
1162
1163 template <typename I>
1164 void Replayer<I>::handle_apply_image_state(int r) {
1165 dout(10) << "r=" << r << dendl;
1166
1167 if (r < 0 && r != -ENOENT) {
1168 derr << "failed to apply remote image state to local image: "
1169 << cpp_strerror(r) << dendl;
1170 handle_replay_complete(r, "failed to apply remote image state");
1171 return;
1172 }
1173
1174 std::unique_lock locker{m_lock};
1175 update_non_primary_snapshot(true);
1176 }
1177
1178 template <typename I>
1179 void Replayer<I>::update_non_primary_snapshot(bool complete) {
1180 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1181 if (!complete) {
1182 // disallow two in-flight updates if this isn't the completion of the sync
1183 if (m_updating_sync_point) {
1184 return;
1185 }
1186 m_updating_sync_point = true;
1187 } else {
1188 m_local_mirror_snap_ns.complete = true;
1189 }
1190
1191 dout(10) << dendl;
1192
1193 librados::ObjectWriteOperation op;
1194 librbd::cls_client::mirror_image_snapshot_set_copy_progress(
1195 &op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
1196 m_local_mirror_snap_ns.last_copied_object_number);
1197
1198 auto ctx = new C_TrackedOp(
1199 m_in_flight_op_tracker, new LambdaContext([this, complete](int r) {
1200 handle_update_non_primary_snapshot(complete, r);
1201 }));
1202 auto aio_comp = create_rados_callback(ctx);
1203 int r = m_state_builder->local_image_ctx->md_ctx.aio_operate(
1204 m_state_builder->local_image_ctx->header_oid, aio_comp, &op);
1205 ceph_assert(r == 0);
1206 aio_comp->release();
1207 }
1208
1209 template <typename I>
1210 void Replayer<I>::handle_update_non_primary_snapshot(bool complete, int r) {
1211 dout(10) << "r=" << r << dendl;
1212
1213 if (r < 0) {
1214 derr << "failed to update local snapshot progress: " << cpp_strerror(r)
1215 << dendl;
1216 if (complete) {
1217 // only fail if this was the final update
1218 handle_replay_complete(r, "failed to update local snapshot progress");
1219 return;
1220 }
1221 }
1222
1223 if (!complete) {
1224 // periodic sync-point update -- do not advance state machine
1225 std::unique_lock locker{m_lock};
1226
1227 ceph_assert(m_updating_sync_point);
1228 m_updating_sync_point = false;
1229 return;
1230 }
1231
1232 notify_image_update();
1233 }
1234
1235 template <typename I>
1236 void Replayer<I>::notify_image_update() {
1237 dout(10) << dendl;
1238
1239 auto ctx = create_context_callback<
1240 Replayer<I>, &Replayer<I>::handle_notify_image_update>(this);
1241 m_state_builder->local_image_ctx->notify_update(ctx);
1242 }
1243
1244 template <typename I>
1245 void Replayer<I>::handle_notify_image_update(int r) {
1246 dout(10) << "r=" << r << dendl;
1247
1248 if (r < 0) {
1249 derr << "failed to notify local image update: " << cpp_strerror(r) << dendl;
1250 }
1251
1252 unlink_peer(m_remote_snap_id_start);
1253 }
1254
1255 template <typename I>
1256 void Replayer<I>::unlink_peer(uint64_t remote_snap_id) {
1257 if (remote_snap_id == 0) {
1258 finish_sync();
1259 return;
1260 }
1261
1262 // local snapshot fully synced -- we no longer depend on the sync
1263 // start snapshot in the remote image
1264 dout(10) << "remote_snap_id=" << remote_snap_id << dendl;
1265
1266 auto ctx = create_context_callback<
1267 Replayer<I>, &Replayer<I>::handle_unlink_peer>(this);
1268 auto req = librbd::mirror::snapshot::UnlinkPeerRequest<I>::create(
1269 m_state_builder->remote_image_ctx, remote_snap_id,
1270 m_remote_mirror_peer_uuid, ctx);
1271 req->send();
1272 }
1273
1274 template <typename I>
1275 void Replayer<I>::handle_unlink_peer(int r) {
1276 dout(10) << "r=" << r << dendl;
1277
1278 if (r < 0 && r != -ENOENT) {
1279 derr << "failed to unlink local peer from remote image: " << cpp_strerror(r)
1280 << dendl;
1281 handle_replay_complete(r, "failed to unlink local peer from remote image");
1282 return;
1283 }
1284
1285 finish_sync();
1286 }
1287
1288 template <typename I>
1289 void Replayer<I>::finish_sync() {
1290 dout(10) << dendl;
1291
1292 {
1293 std::unique_lock locker{m_lock};
1294 notify_status_updated();
1295
1296 if (m_sync_in_progress) {
1297 m_sync_in_progress = false;
1298 m_instance_watcher->notify_sync_complete(
1299 m_state_builder->local_image_ctx->id);
1300 }
1301 }
1302
1303 if (is_replay_interrupted()) {
1304 return;
1305 }
1306
1307 load_local_image_meta();
1308 }
1309
1310 template <typename I>
1311 void Replayer<I>::register_local_update_watcher() {
1312 dout(10) << dendl;
1313
1314 m_update_watch_ctx = new C_UpdateWatchCtx(this);
1315
1316 int r = m_state_builder->local_image_ctx->state->register_update_watcher(
1317 m_update_watch_ctx, &m_local_update_watcher_handle);
1318 auto ctx = create_context_callback<
1319 Replayer<I>, &Replayer<I>::handle_register_local_update_watcher>(this);
1320 m_threads->work_queue->queue(ctx, r);
1321 }
1322
1323 template <typename I>
1324 void Replayer<I>::handle_register_local_update_watcher(int r) {
1325 dout(10) << "r=" << r << dendl;
1326
1327 if (r < 0) {
1328 derr << "failed to register local update watcher: " << cpp_strerror(r)
1329 << dendl;
1330 handle_replay_complete(r, "failed to register local image update watcher");
1331 m_state = STATE_COMPLETE;
1332
1333 delete m_update_watch_ctx;
1334 m_update_watch_ctx = nullptr;
1335
1336 Context* on_init = nullptr;
1337 std::swap(on_init, m_on_init_shutdown);
1338 on_init->complete(r);
1339 return;
1340 }
1341
1342 register_remote_update_watcher();
1343 }
1344
1345 template <typename I>
1346 void Replayer<I>::register_remote_update_watcher() {
1347 dout(10) << dendl;
1348
1349 int r = m_state_builder->remote_image_ctx->state->register_update_watcher(
1350 m_update_watch_ctx, &m_remote_update_watcher_handle);
1351 auto ctx = create_context_callback<
1352 Replayer<I>, &Replayer<I>::handle_register_remote_update_watcher>(this);
1353 m_threads->work_queue->queue(ctx, r);
1354 }
1355
1356 template <typename I>
1357 void Replayer<I>::handle_register_remote_update_watcher(int r) {
1358 dout(10) << "r=" << r << dendl;
1359
1360 if (r < 0) {
1361 derr << "failed to register remote update watcher: " << cpp_strerror(r)
1362 << dendl;
1363 handle_replay_complete(r, "failed to register remote image update watcher");
1364 m_state = STATE_COMPLETE;
1365
1366 unregister_local_update_watcher();
1367 return;
1368 }
1369
1370 m_state = STATE_REPLAYING;
1371
1372 Context* on_init = nullptr;
1373 std::swap(on_init, m_on_init_shutdown);
1374 on_init->complete(0);
1375
1376 // delay initial snapshot scan until after we have alerted
1377 // image replayer that we have initialized in case an error
1378 // occurs
1379 {
1380 std::unique_lock locker{m_lock};
1381 notify_status_updated();
1382 }
1383
1384 load_local_image_meta();
1385 }
1386
1387 template <typename I>
1388 void Replayer<I>::unregister_remote_update_watcher() {
1389 dout(10) << dendl;
1390
1391 auto ctx = create_context_callback<
1392 Replayer<I>,
1393 &Replayer<I>::handle_unregister_remote_update_watcher>(this);
1394 m_state_builder->remote_image_ctx->state->unregister_update_watcher(
1395 m_remote_update_watcher_handle, ctx);
1396 }
1397
1398 template <typename I>
1399 void Replayer<I>::handle_unregister_remote_update_watcher(int r) {
1400 dout(10) << "r=" << r << dendl;
1401
1402 if (r < 0) {
1403 derr << "failed to unregister remote update watcher: " << cpp_strerror(r)
1404 << dendl;
1405 }
1406
1407 unregister_local_update_watcher();
1408 }
1409
1410 template <typename I>
1411 void Replayer<I>::unregister_local_update_watcher() {
1412 dout(10) << dendl;
1413
1414 auto ctx = create_context_callback<
1415 Replayer<I>,
1416 &Replayer<I>::handle_unregister_local_update_watcher>(this);
1417 m_state_builder->local_image_ctx->state->unregister_update_watcher(
1418 m_local_update_watcher_handle, ctx);
1419 }
1420
1421 template <typename I>
1422 void Replayer<I>::handle_unregister_local_update_watcher(int r) {
1423 dout(10) << "r=" << r << dendl;
1424
1425 if (r < 0) {
1426 derr << "failed to unregister local update watcher: " << cpp_strerror(r)
1427 << dendl;
1428 }
1429
1430 delete m_update_watch_ctx;
1431 m_update_watch_ctx = nullptr;
1432
1433 wait_for_in_flight_ops();
1434 }
1435
1436 template <typename I>
1437 void Replayer<I>::wait_for_in_flight_ops() {
1438 dout(10) << dendl;
1439
1440 auto ctx = create_async_context_callback(
1441 m_threads->work_queue, create_context_callback<
1442 Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
1443 m_in_flight_op_tracker.wait_for_ops(ctx);
1444 }
1445
1446 template <typename I>
1447 void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
1448 dout(10) << "r=" << r << dendl;
1449
1450 Context* on_shutdown = nullptr;
1451 {
1452 std::unique_lock locker{m_lock};
1453 ceph_assert(m_on_init_shutdown != nullptr);
1454 std::swap(on_shutdown, m_on_init_shutdown);
1455 }
1456 on_shutdown->complete(m_error_code);
1457 }
1458
1459 template <typename I>
1460 void Replayer<I>::handle_image_update_notify() {
1461 dout(10) << dendl;
1462
1463 std::unique_lock locker{m_lock};
1464 if (m_state == STATE_REPLAYING) {
1465 dout(15) << "flagging snapshot rescan required" << dendl;
1466 m_image_updated = true;
1467 } else if (m_state == STATE_IDLE) {
1468 m_state = STATE_REPLAYING;
1469 locker.unlock();
1470
1471 dout(15) << "restarting idle replayer" << dendl;
1472 load_local_image_meta();
1473 }
1474 }
1475
1476 template <typename I>
1477 void Replayer<I>::handle_replay_complete(int r,
1478 const std::string& description) {
1479 std::unique_lock locker{m_lock};
1480 handle_replay_complete(&locker, r, description);
1481 }
1482
1483 template <typename I>
1484 void Replayer<I>::handle_replay_complete(std::unique_lock<ceph::mutex>* locker,
1485 int r,
1486 const std::string& description) {
1487 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1488
1489 if (m_sync_in_progress) {
1490 m_sync_in_progress = false;
1491 m_instance_watcher->notify_sync_complete(
1492 m_state_builder->local_image_ctx->id);
1493 }
1494
1495 // don't set error code and description if resuming a pending
1496 // shutdown
1497 if (is_replay_interrupted(locker)) {
1498 return;
1499 }
1500
1501 if (m_error_code == 0) {
1502 m_error_code = r;
1503 m_error_description = description;
1504 }
1505
1506 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
1507 return;
1508 }
1509
1510 m_state = STATE_COMPLETE;
1511 notify_status_updated();
1512 }
1513
1514 template <typename I>
1515 void Replayer<I>::notify_status_updated() {
1516 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1517
1518 dout(10) << dendl;
1519 auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
1520 [this](int) {
1521 m_replayer_listener->handle_notification();
1522 }));
1523 m_threads->work_queue->queue(ctx, 0);
1524 }
1525
1526 template <typename I>
1527 bool Replayer<I>::is_replay_interrupted() {
1528 std::unique_lock locker{m_lock};
1529 return is_replay_interrupted(&locker);
1530 }
1531
1532 template <typename I>
1533 bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
1534 if (m_state == STATE_COMPLETE) {
1535 locker->unlock();
1536
1537 dout(10) << "resuming pending shut down" << dendl;
1538 unregister_remote_update_watcher();
1539 return true;
1540 }
1541 return false;
1542 }
1543
1544 template <typename I>
1545 void Replayer<I>::register_perf_counters() {
1546 dout(5) << dendl;
1547
1548 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1549 ceph_assert(m_perf_counters == nullptr);
1550
1551 auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
1552 auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio");
1553 PerfCountersBuilder plb(g_ceph_context,
1554 "rbd_mirror_snapshot_image_" + m_image_spec,
1555 l_rbd_mirror_snapshot_first,
1556 l_rbd_mirror_snapshot_last);
1557 plb.add_u64_counter(l_rbd_mirror_snapshot_replay_snapshots,
1558 "snapshots", "Snapshots", "r", prio);
1559 plb.add_time_avg(l_rbd_mirror_snapshot_replay_snapshots_time,
1560 "snapshots_time", "Snapshots time", "rl", prio);
1561 plb.add_u64_counter(l_rbd_mirror_snapshot_replay_bytes, "replay_bytes",
1562 "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
1563 m_perf_counters = plb.create_perf_counters();
1564 g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
1565 }
1566
1567 template <typename I>
1568 void Replayer<I>::unregister_perf_counters() {
1569 dout(5) << dendl;
1570 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1571
1572 PerfCounters *perf_counters = nullptr;
1573 std::swap(perf_counters, m_perf_counters);
1574
1575 if (perf_counters != nullptr) {
1576 g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
1577 delete perf_counters;
1578 }
1579 }
1580
1581 } // namespace snapshot
1582 } // namespace image_replayer
1583 } // namespace mirror
1584 } // namespace rbd
1585
// explicitly instantiate the Replayer template for the librbd::ImageCtx
// image type
template class rbd::mirror::image_replayer::snapshot::Replayer<librbd::ImageCtx>;