]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / tools / rbd_mirror / image_replayer / snapshot / Replayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "Replayer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "common/Timer.h"
#include "cls/rbd/cls_rbd_client.h"
#include "json_spirit/json_spirit.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/deep_copy/Handler.h"
#include "librbd/deep_copy/ImageCopyRequest.h"
#include "librbd/deep_copy/SnapshotCopyRequest.h"
#include "librbd/mirror/ImageStateUpdateRequest.h"
#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h"
#include "librbd/mirror/snapshot/GetImageStateRequest.h"
#include "librbd/mirror/snapshot/ImageMeta.h"
#include "librbd/mirror/snapshot/UnlinkPeerRequest.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/PoolMetaCache.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
#include "tools/rbd_mirror/image_replayer/Utils.h"
#include "tools/rbd_mirror/image_replayer/snapshot/ApplyImageStateRequest.h"
#include "tools/rbd_mirror/image_replayer/snapshot/StateBuilder.h"
#include "tools/rbd_mirror/image_replayer/snapshot/Utils.h"
#include <cmath>
#include <set>
35
36 #define dout_context g_ceph_context
37 #define dout_subsys ceph_subsys_rbd_mirror
38 #undef dout_prefix
39 #define dout_prefix *_dout << "rbd::mirror::image_replayer::snapshot::" \
40 << "Replayer: " << this << " " << __func__ << ": "
41
42 extern PerfCounters *g_perf_counters;
43
44 namespace rbd {
45 namespace mirror {
46 namespace image_replayer {
47 namespace snapshot {
48
49 namespace {
50
// Round `value` to two decimal places and return the magnitude of the result
// (so a rounded "-0" or any negative input comes back non-negative).
//
// The std:: overloads are named explicitly: an unqualified abs() may resolve
// to the C library's `int abs(int)` depending on which headers happen to be
// included, which would silently truncate the fractional part.
double round_to_two_places(double value) {
  return std::abs(std::round(value * 100) / 100);
}
54
55 template<typename I>
56 std::pair<uint64_t, librbd::SnapInfo*> get_newest_mirror_snapshot(
57 I* image_ctx) {
58 for (auto snap_info_it = image_ctx->snap_info.rbegin();
59 snap_info_it != image_ctx->snap_info.rend(); ++snap_info_it) {
60 const auto& snap_ns = snap_info_it->second.snap_namespace;
61 auto mirror_ns = boost::get<
62 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
63 if (mirror_ns == nullptr || !mirror_ns->complete) {
64 continue;
65 }
66
67 return {snap_info_it->first, &snap_info_it->second};
68 }
69
70 return {CEPH_NOSNAP, nullptr};
71 }
72
73 } // anonymous namespace
74
75 using librbd::util::create_async_context_callback;
76 using librbd::util::create_context_callback;
77 using librbd::util::create_rados_callback;
78
79 template <typename I>
80 struct Replayer<I>::C_UpdateWatchCtx : public librbd::UpdateWatchCtx {
81 Replayer<I>* replayer;
82
83 C_UpdateWatchCtx(Replayer<I>* replayer) : replayer(replayer) {
84 }
85
86 void handle_notify() override {
87 replayer->handle_image_update_notify();
88 }
89 };
90
91 template <typename I>
92 struct Replayer<I>::DeepCopyHandler : public librbd::deep_copy::Handler {
93 Replayer *replayer;
94
95 DeepCopyHandler(Replayer* replayer) : replayer(replayer) {
96 }
97
98 void handle_read(uint64_t bytes_read) override {
99 replayer->handle_copy_image_read(bytes_read);
100 }
101
102 int update_progress(uint64_t object_number, uint64_t object_count) override {
103 replayer->handle_copy_image_progress(object_number, object_count);
104 return 0;
105 }
106 };
107
108 template <typename I>
109 Replayer<I>::Replayer(
110 Threads<I>* threads,
111 InstanceWatcher<I>* instance_watcher,
112 const std::string& local_mirror_uuid,
113 PoolMetaCache* pool_meta_cache,
114 StateBuilder<I>* state_builder,
115 ReplayerListener* replayer_listener)
116 : m_threads(threads),
117 m_instance_watcher(instance_watcher),
118 m_local_mirror_uuid(local_mirror_uuid),
119 m_pool_meta_cache(pool_meta_cache),
120 m_state_builder(state_builder),
121 m_replayer_listener(replayer_listener),
122 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
123 "rbd::mirror::image_replayer::snapshot::Replayer", this))) {
124 dout(10) << dendl;
125 }
126
127 template <typename I>
128 Replayer<I>::~Replayer() {
129 dout(10) << dendl;
130 ceph_assert(m_state == STATE_COMPLETE);
131 ceph_assert(m_update_watch_ctx == nullptr);
132 ceph_assert(m_deep_copy_handler == nullptr);
133 }
134
135 template <typename I>
136 void Replayer<I>::init(Context* on_finish) {
137 dout(10) << dendl;
138
139 ceph_assert(m_state == STATE_INIT);
140
141 RemotePoolMeta remote_pool_meta;
142 int r = m_pool_meta_cache->get_remote_pool_meta(
143 m_state_builder->remote_image_ctx->md_ctx.get_id(), &remote_pool_meta);
144 if (r < 0 || remote_pool_meta.mirror_peer_uuid.empty()) {
145 derr << "failed to retrieve mirror peer uuid from remote pool" << dendl;
146 m_state = STATE_COMPLETE;
147 m_threads->work_queue->queue(on_finish, r);
148 return;
149 }
150
151 m_remote_mirror_peer_uuid = remote_pool_meta.mirror_peer_uuid;
152 dout(10) << "remote_mirror_peer_uuid=" << m_remote_mirror_peer_uuid << dendl;
153
154 ceph_assert(m_on_init_shutdown == nullptr);
155 m_on_init_shutdown = on_finish;
156
157 register_local_update_watcher();
158 }
159
160 template <typename I>
161 void Replayer<I>::shut_down(Context* on_finish) {
162 dout(10) << dendl;
163
164 std::unique_lock locker{m_lock};
165 ceph_assert(m_on_init_shutdown == nullptr);
166 m_on_init_shutdown = on_finish;
167 m_error_code = 0;
168 m_error_description = "";
169
170 ceph_assert(m_state != STATE_INIT);
171 auto state = STATE_COMPLETE;
172 std::swap(m_state, state);
173
174 if (state == STATE_REPLAYING) {
175 // if a sync request was pending, request a cancelation
176 m_instance_watcher->cancel_sync_request(
177 m_state_builder->local_image_ctx->id);
178
179 // TODO interrupt snapshot copy and image copy state machines even if remote
180 // cluster is unreachable
181 dout(10) << "shut down pending on completion of snapshot replay" << dendl;
182 return;
183 }
184 locker.unlock();
185
186 unregister_remote_update_watcher();
187 }
188
189 template <typename I>
190 void Replayer<I>::flush(Context* on_finish) {
191 dout(10) << dendl;
192
193 // TODO
194 m_threads->work_queue->queue(on_finish, 0);
195 }
196
197 template <typename I>
198 bool Replayer<I>::get_replay_status(std::string* description,
199 Context* on_finish) {
200 dout(10) << dendl;
201
202 std::unique_lock locker{m_lock};
203 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
204 locker.unlock();
205
206 derr << "replay not running" << dendl;
207 on_finish->complete(-EAGAIN);
208 return false;
209 }
210
211 std::shared_lock local_image_locker{
212 m_state_builder->local_image_ctx->image_lock};
213 auto [local_snap_id, local_snap_info] = get_newest_mirror_snapshot(
214 m_state_builder->local_image_ctx);
215
216 std::shared_lock remote_image_locker{
217 m_state_builder->remote_image_ctx->image_lock};
218 auto [remote_snap_id, remote_snap_info] = get_newest_mirror_snapshot(
219 m_state_builder->remote_image_ctx);
220
221 if (remote_snap_info == nullptr) {
222 remote_image_locker.unlock();
223 local_image_locker.unlock();
224 locker.unlock();
225
226 derr << "remote image does not contain mirror snapshots" << dendl;
227 on_finish->complete(-EAGAIN);
228 return false;
229 }
230
231 std::string replay_state = "idle";
232 if (m_remote_snap_id_end != CEPH_NOSNAP) {
233 replay_state = "syncing";
234 }
235
236 json_spirit::mObject root_obj;
237 root_obj["replay_state"] = replay_state;
238 root_obj["remote_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
239
240 auto matching_remote_snap_id = util::compute_remote_snap_id(
241 m_state_builder->local_image_ctx->image_lock,
242 m_state_builder->local_image_ctx->snap_info,
243 local_snap_id, m_state_builder->remote_mirror_uuid);
244 auto matching_remote_snap_it =
245 m_state_builder->remote_image_ctx->snap_info.find(matching_remote_snap_id);
246 if (matching_remote_snap_id != CEPH_NOSNAP &&
247 matching_remote_snap_it !=
248 m_state_builder->remote_image_ctx->snap_info.end()) {
249 // use the timestamp from the matching remote image since
250 // the local snapshot would just be the time the snapshot was
251 // synced and not the consistency point in time.
252 root_obj["local_snapshot_timestamp"] =
253 matching_remote_snap_it->second.timestamp.sec();
254 }
255
256 matching_remote_snap_it = m_state_builder->remote_image_ctx->snap_info.find(
257 m_remote_snap_id_end);
258 if (m_remote_snap_id_end != CEPH_NOSNAP &&
259 matching_remote_snap_it !=
260 m_state_builder->remote_image_ctx->snap_info.end()) {
261 root_obj["syncing_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
262 root_obj["syncing_percent"] = static_cast<uint64_t>(
263 100 * m_local_mirror_snap_ns.last_copied_object_number /
264 static_cast<float>(std::max<uint64_t>(1U, m_local_object_count)));
265 }
266
267 m_bytes_per_second(0);
268 auto bytes_per_second = m_bytes_per_second.get_average();
269 root_obj["bytes_per_second"] = round_to_two_places(bytes_per_second);
270
271 auto bytes_per_snapshot = boost::accumulators::rolling_mean(
272 m_bytes_per_snapshot);
273 root_obj["bytes_per_snapshot"] = round_to_two_places(bytes_per_snapshot);
274
275 auto pending_bytes = bytes_per_snapshot * m_pending_snapshots;
276 if (bytes_per_second > 0 && m_pending_snapshots > 0) {
277 std::uint64_t seconds_until_synced = round_to_two_places(
278 pending_bytes / bytes_per_second);
279 if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
280 seconds_until_synced = std::numeric_limits<uint64_t>::max();
281 }
282
283 root_obj["seconds_until_synced"] = seconds_until_synced;
284 }
285
286 *description = json_spirit::write(
287 root_obj, json_spirit::remove_trailing_zeros);
288
289 local_image_locker.unlock();
290 remote_image_locker.unlock();
291 locker.unlock();
292 on_finish->complete(-EEXIST);
293 return true;
294 }
295
296 template <typename I>
297 void Replayer<I>::load_local_image_meta() {
298 dout(10) << dendl;
299
300 {
301 // reset state in case new snapshot is added while we are scanning
302 std::unique_lock locker{m_lock};
303 m_image_updated = false;
304 }
305
306 ceph_assert(m_state_builder->local_image_meta != nullptr);
307 auto ctx = create_context_callback<
308 Replayer<I>, &Replayer<I>::handle_load_local_image_meta>(this);
309 m_state_builder->local_image_meta->load(ctx);
310 }
311
312 template <typename I>
313 void Replayer<I>::handle_load_local_image_meta(int r) {
314 dout(10) << "r=" << r << dendl;
315
316 if (r < 0 && r != -ENOENT) {
317 derr << "failed to load local image-meta: " << cpp_strerror(r) << dendl;
318 handle_replay_complete(r, "failed to load local image-meta");
319 return;
320 }
321
322 if (r >= 0 && m_state_builder->local_image_meta->resync_requested) {
323 m_resync_requested = true;
324
325 dout(10) << "local image resync requested" << dendl;
326 handle_replay_complete(0, "resync requested");
327 return;
328 }
329
330 refresh_local_image();
331 }
332
333 template <typename I>
334 void Replayer<I>::refresh_local_image() {
335 if (!m_state_builder->local_image_ctx->state->is_refresh_required()) {
336 refresh_remote_image();
337 return;
338 }
339
340 dout(10) << dendl;
341 auto ctx = create_context_callback<
342 Replayer<I>, &Replayer<I>::handle_refresh_local_image>(this);
343 m_state_builder->local_image_ctx->state->refresh(ctx);
344 }
345
346 template <typename I>
347 void Replayer<I>::handle_refresh_local_image(int r) {
348 dout(10) << "r=" << r << dendl;
349
350 if (r < 0) {
351 derr << "failed to refresh local image: " << cpp_strerror(r) << dendl;
352 handle_replay_complete(r, "failed to refresh local image");
353 return;
354 }
355
356 refresh_remote_image();
357 }
358
359 template <typename I>
360 void Replayer<I>::refresh_remote_image() {
361 if (!m_state_builder->remote_image_ctx->state->is_refresh_required()) {
362 std::unique_lock locker{m_lock};
363 scan_local_mirror_snapshots(&locker);
364 return;
365 }
366
367 dout(10) << dendl;
368 auto ctx = create_context_callback<
369 Replayer<I>, &Replayer<I>::handle_refresh_remote_image>(this);
370 m_state_builder->remote_image_ctx->state->refresh(ctx);
371 }
372
373 template <typename I>
374 void Replayer<I>::handle_refresh_remote_image(int r) {
375 dout(10) << "r=" << r << dendl;
376
377 if (r < 0) {
378 derr << "failed to refresh remote image: " << cpp_strerror(r) << dendl;
379 handle_replay_complete(r, "failed to refresh remote image");
380 return;
381 }
382
383 std::unique_lock locker{m_lock};
384 scan_local_mirror_snapshots(&locker);
385 }
386
387 template <typename I>
388 void Replayer<I>::scan_local_mirror_snapshots(
389 std::unique_lock<ceph::mutex>* locker) {
390 if (is_replay_interrupted(locker)) {
391 return;
392 }
393
394 dout(10) << dendl;
395
396 m_local_snap_id_start = 0;
397 m_local_snap_id_end = CEPH_NOSNAP;
398 m_local_mirror_snap_ns = {};
399 m_local_object_count = 0;
400
401 m_remote_snap_id_start = 0;
402 m_remote_snap_id_end = CEPH_NOSNAP;
403 m_remote_mirror_snap_ns = {};
404
405 std::set<uint64_t> prune_snap_ids;
406
407 bool completed_non_primary_snapshots_exist = false;
408 auto local_image_ctx = m_state_builder->local_image_ctx;
409 std::shared_lock image_locker{local_image_ctx->image_lock};
410 for (auto snap_info_it = local_image_ctx->snap_info.begin();
411 snap_info_it != local_image_ctx->snap_info.end(); ++snap_info_it) {
412 const auto& snap_ns = snap_info_it->second.snap_namespace;
413 auto mirror_ns = boost::get<
414 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
415 if (mirror_ns == nullptr) {
416 continue;
417 }
418
419 dout(15) << "local mirror snapshot: id=" << snap_info_it->first << ", "
420 << "mirror_ns=" << *mirror_ns << dendl;
421 m_local_mirror_snap_ns = *mirror_ns;
422
423 auto local_snap_id = snap_info_it->first;
424 if (mirror_ns->is_non_primary()) {
425 if (mirror_ns->complete) {
426 // if remote has new snapshots, we would sync from here
427 m_local_snap_id_start = local_snap_id;
428 m_local_snap_id_end = CEPH_NOSNAP;
429 completed_non_primary_snapshots_exist = true;
430
431 if (mirror_ns->mirror_peer_uuids.empty()) {
432 // no other peer will attempt to sync to this snapshot so store as
433 // a candidate for removal
434 prune_snap_ids.insert(local_snap_id);
435 }
436 } else if (mirror_ns->last_copied_object_number == 0 &&
437 m_local_snap_id_start > 0) {
438 // shouldn't be possible, but ensure that pruning this snapshot
439 // wouldn't leave this image w/o any non-primary snapshots
440 if (!completed_non_primary_snapshots_exist) {
441 derr << "incomplete local non-primary snapshot" << dendl;
442 handle_replay_complete(locker, -EINVAL,
443 "incomplete local non-primary snapshot");
444 return;
445 }
446
447 // snapshot might be missing image state, object-map, etc, so just
448 // delete and re-create it if we haven't started copying data
449 // objects. Also only prune this snapshot since we will need the
450 // previous mirror snapshot for syncing. Special case exception for
451 // the first non-primary snapshot since we know its snapshot is
452 // well-formed because otherwise the mirror-image-state would have
453 // forced an image deletion.
454 prune_snap_ids.clear();
455 prune_snap_ids.insert(local_snap_id);
456 break;
457 } else {
458 // start snap will be last complete mirror snapshot or initial
459 // image revision
460 m_local_snap_id_end = local_snap_id;
461 break;
462 }
463 } else if (mirror_ns->is_primary()) {
464 if (mirror_ns->complete) {
465 m_local_snap_id_start = local_snap_id;
466 m_local_snap_id_end = CEPH_NOSNAP;
467 } else {
468 derr << "incomplete local primary snapshot" << dendl;
469 handle_replay_complete(locker, -EINVAL,
470 "incomplete local primary snapshot");
471 return;
472 }
473 } else {
474 derr << "unknown local mirror snapshot state" << dendl;
475 handle_replay_complete(locker, -EINVAL,
476 "invalid local mirror snapshot state");
477 return;
478 }
479 }
480 image_locker.unlock();
481
482 if (m_local_snap_id_start > 0 && m_local_snap_id_end == CEPH_NOSNAP) {
483 // remove candidate that is required for delta snapshot sync
484 prune_snap_ids.erase(m_local_snap_id_start);
485 }
486 if (!prune_snap_ids.empty()) {
487 locker->unlock();
488
489 auto prune_snap_id = *prune_snap_ids.begin();
490 dout(5) << "pruning unused non-primary snapshot " << prune_snap_id << dendl;
491 prune_non_primary_snapshot(prune_snap_id);
492 return;
493 }
494
495 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
496 if (m_local_mirror_snap_ns.is_non_primary() &&
497 m_local_mirror_snap_ns.primary_mirror_uuid !=
498 m_state_builder->remote_mirror_uuid) {
499 // TODO support multiple peers
500 derr << "local image linked to unknown peer: "
501 << m_local_mirror_snap_ns.primary_mirror_uuid << dendl;
502 handle_replay_complete(locker, -EEXIST,
503 "local image linked to unknown peer");
504 return;
505 } else if (m_local_mirror_snap_ns.state ==
506 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY) {
507 dout(5) << "local image promoted" << dendl;
508 handle_replay_complete(locker, 0, "force promoted");
509 return;
510 }
511
512 dout(10) << "found local mirror snapshot: "
513 << "local_snap_id_start=" << m_local_snap_id_start << ", "
514 << "local_snap_id_end=" << m_local_snap_id_end << ", "
515 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
516 if (!m_local_mirror_snap_ns.is_primary() &&
517 m_local_mirror_snap_ns.complete) {
518 // our remote sync should start after this completed snapshot
519 m_remote_snap_id_start = m_local_mirror_snap_ns.primary_snap_id;
520 }
521 }
522
523 // we don't have any mirror snapshots or only completed non-primary
524 // mirror snapshots
525 scan_remote_mirror_snapshots(locker);
526 }
527
528 template <typename I>
529 void Replayer<I>::scan_remote_mirror_snapshots(
530 std::unique_lock<ceph::mutex>* locker) {
531 dout(10) << dendl;
532
533 m_pending_snapshots = 0;
534
535 std::set<uint64_t> unlink_snap_ids;
536 bool split_brain = false;
537 bool remote_demoted = false;
538 auto remote_image_ctx = m_state_builder->remote_image_ctx;
539 std::shared_lock image_locker{remote_image_ctx->image_lock};
540 for (auto snap_info_it = remote_image_ctx->snap_info.begin();
541 snap_info_it != remote_image_ctx->snap_info.end(); ++snap_info_it) {
542 const auto& snap_ns = snap_info_it->second.snap_namespace;
543 auto mirror_ns = boost::get<
544 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
545 if (mirror_ns == nullptr) {
546 continue;
547 }
548
549 dout(15) << "remote mirror snapshot: id=" << snap_info_it->first << ", "
550 << "mirror_ns=" << *mirror_ns << dendl;
551 remote_demoted = mirror_ns->is_demoted();
552 if (!mirror_ns->is_primary() && !mirror_ns->is_non_primary()) {
553 derr << "unknown remote mirror snapshot state" << dendl;
554 handle_replay_complete(locker, -EINVAL,
555 "invalid remote mirror snapshot state");
556 return;
557 } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) ==
558 0) {
559 dout(15) << "skipping remote snapshot due to missing mirror peer"
560 << dendl;
561 continue;
562 }
563
564 auto remote_snap_id = snap_info_it->first;
565 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
566 // we have a local mirror snapshot
567 if (m_local_mirror_snap_ns.is_non_primary()) {
568 // previously validated that it was linked to remote
569 ceph_assert(m_local_mirror_snap_ns.primary_mirror_uuid ==
570 m_state_builder->remote_mirror_uuid);
571
572 if (m_remote_snap_id_end == CEPH_NOSNAP) {
573 // haven't found the end snap so treat this as a candidate for unlink
574 unlink_snap_ids.insert(remote_snap_id);
575 }
576 if (m_local_mirror_snap_ns.complete &&
577 m_local_mirror_snap_ns.primary_snap_id >= remote_snap_id) {
578 // skip past completed remote snapshot
579 m_remote_snap_id_start = remote_snap_id;
580 m_remote_mirror_snap_ns = *mirror_ns;
581 dout(15) << "skipping synced remote snapshot " << remote_snap_id
582 << dendl;
583 continue;
584 } else if (!m_local_mirror_snap_ns.complete &&
585 m_local_mirror_snap_ns.primary_snap_id > remote_snap_id) {
586 // skip until we get to the in-progress remote snapshot
587 dout(15) << "skipping synced remote snapshot " << remote_snap_id
588 << " while search for in-progress sync" << dendl;
589 m_remote_snap_id_start = remote_snap_id;
590 m_remote_mirror_snap_ns = *mirror_ns;
591 continue;
592 }
593 } else if (m_local_mirror_snap_ns.state ==
594 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED) {
595 // find the matching demotion snapshot in remote image
596 ceph_assert(m_local_snap_id_start > 0);
597 if (mirror_ns->state ==
598 cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED &&
599 mirror_ns->primary_mirror_uuid == m_local_mirror_uuid &&
600 mirror_ns->primary_snap_id == m_local_snap_id_start) {
601 dout(10) << "located matching demotion snapshot: "
602 << "remote_snap_id=" << remote_snap_id << ", "
603 << "local_snap_id=" << m_local_snap_id_start << dendl;
604 m_remote_snap_id_start = remote_snap_id;
605 split_brain = false;
606 continue;
607 } else if (m_remote_snap_id_start == 0) {
608 // still looking for our matching demotion snapshot
609 dout(15) << "skipping remote snapshot " << remote_snap_id << " "
610 << "while searching for demotion" << dendl;
611 split_brain = true;
612 continue;
613 }
614 } else {
615 // should not have been able to reach this
616 ceph_assert(false);
617 }
618 } else if (!mirror_ns->is_primary()) {
619 dout(15) << "skipping non-primary remote snapshot" << dendl;
620 continue;
621 }
622
623 // found candidate snapshot to sync
624 ++m_pending_snapshots;
625 if (m_remote_snap_id_end != CEPH_NOSNAP) {
626 continue;
627 }
628
629 // first primary snapshot where were are listed as a peer
630 m_remote_snap_id_end = remote_snap_id;
631 m_remote_mirror_snap_ns = *mirror_ns;
632 }
633
634 if (m_remote_snap_id_start != 0 &&
635 remote_image_ctx->snap_info.count(m_remote_snap_id_start) == 0) {
636 // the remote start snapshot was deleted out from under us
637 derr << "failed to locate remote start snapshot: "
638 << "snap_id=" << m_remote_snap_id_start << dendl;
639 split_brain = true;
640 }
641
642 image_locker.unlock();
643
644 if (!split_brain) {
645 unlink_snap_ids.erase(m_remote_snap_id_start);
646 unlink_snap_ids.erase(m_remote_snap_id_end);
647 if (!unlink_snap_ids.empty()) {
648 locker->unlock();
649
650 // retry the unlinking process for a remote snapshot that we do not
651 // need anymore
652 auto remote_snap_id = *unlink_snap_ids.begin();
653 dout(10) << "unlinking from remote snapshot " << remote_snap_id << dendl;
654 unlink_peer(remote_snap_id);
655 return;
656 }
657
658 if (m_remote_snap_id_end != CEPH_NOSNAP) {
659 dout(10) << "found remote mirror snapshot: "
660 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
661 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
662 << "remote_snap_ns=" << m_remote_mirror_snap_ns << dendl;
663 if (m_remote_mirror_snap_ns.complete) {
664 locker->unlock();
665
666 if (m_local_snap_id_end != CEPH_NOSNAP &&
667 !m_local_mirror_snap_ns.complete) {
668 // attempt to resume image-sync
669 dout(10) << "local image contains in-progress mirror snapshot"
670 << dendl;
671 get_local_image_state();
672 } else {
673 copy_snapshots();
674 }
675 return;
676 } else {
677 // might have raced with the creation of a remote mirror snapshot
678 // so we will need to refresh and rescan once it completes
679 dout(15) << "remote mirror snapshot not complete" << dendl;
680 }
681 }
682 }
683
684 if (m_image_updated) {
685 // received update notification while scanning image, restart ...
686 m_image_updated = false;
687 locker->unlock();
688
689 dout(10) << "restarting snapshot scan due to remote update notification"
690 << dendl;
691 load_local_image_meta();
692 return;
693 }
694
695 if (is_replay_interrupted(locker)) {
696 return;
697 } else if (split_brain) {
698 derr << "split-brain detected: failed to find matching non-primary "
699 << "snapshot in remote image: "
700 << "local_snap_id_start=" << m_local_snap_id_start << ", "
701 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
702 handle_replay_complete(locker, -EEXIST, "split-brain");
703 return;
704 } else if (remote_demoted) {
705 dout(10) << "remote image demoted" << dendl;
706 handle_replay_complete(locker, -EREMOTEIO, "remote image demoted");
707 return;
708 }
709
710 dout(10) << "all remote snapshots synced: idling waiting for new snapshot"
711 << dendl;
712 ceph_assert(m_state == STATE_REPLAYING);
713 m_state = STATE_IDLE;
714
715 notify_status_updated();
716 }
717
718 template <typename I>
719 void Replayer<I>::prune_non_primary_snapshot(uint64_t snap_id) {
720 dout(10) << "snap_id=" << snap_id << dendl;
721
722 auto local_image_ctx = m_state_builder->local_image_ctx;
723 bool snap_valid = false;
724 cls::rbd::SnapshotNamespace snap_namespace;
725 std::string snap_name;
726
727 {
728 std::shared_lock image_locker{local_image_ctx->image_lock};
729 auto snap_info = local_image_ctx->get_snap_info(snap_id);
730 if (snap_info != nullptr) {
731 snap_valid = true;
732 snap_namespace = snap_info->snap_namespace;
733 snap_name = snap_info->name;
734
735 ceph_assert(boost::get<cls::rbd::MirrorSnapshotNamespace>(
736 &snap_namespace) != nullptr);
737 }
738 }
739
740 if (!snap_valid) {
741 load_local_image_meta();
742 return;
743 }
744
745 auto ctx = create_context_callback<
746 Replayer<I>, &Replayer<I>::handle_prune_non_primary_snapshot>(this);
747 local_image_ctx->operations->snap_remove(snap_namespace, snap_name, ctx);
748 }
749
750 template <typename I>
751 void Replayer<I>::handle_prune_non_primary_snapshot(int r) {
752 dout(10) << "r=" << r << dendl;
753
754 if (r < 0 && r != -ENOENT) {
755 derr << "failed to prune non-primary snapshot: " << cpp_strerror(r)
756 << dendl;
757 handle_replay_complete(r, "failed to prune non-primary snapshot");
758 return;
759 }
760
761 if (is_replay_interrupted()) {
762 return;
763 }
764
765 load_local_image_meta();
766 }
767
768 template <typename I>
769 void Replayer<I>::copy_snapshots() {
770 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
771 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
772 << "local_snap_id_start=" << m_local_snap_id_start << dendl;
773
774 ceph_assert(m_remote_snap_id_start != CEPH_NOSNAP);
775 ceph_assert(m_remote_snap_id_end > 0 &&
776 m_remote_snap_id_end != CEPH_NOSNAP);
777 ceph_assert(m_local_snap_id_start != CEPH_NOSNAP);
778
779 m_local_mirror_snap_ns = {};
780 auto ctx = create_context_callback<
781 Replayer<I>, &Replayer<I>::handle_copy_snapshots>(this);
782 auto req = librbd::deep_copy::SnapshotCopyRequest<I>::create(
783 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
784 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start,
785 false, m_threads->work_queue, &m_local_mirror_snap_ns.snap_seqs,
786 ctx);
787 req->send();
788 }
789
790 template <typename I>
791 void Replayer<I>::handle_copy_snapshots(int r) {
792 dout(10) << "r=" << r << dendl;
793
794 if (r < 0) {
795 derr << "failed to copy snapshots from remote to local image: "
796 << cpp_strerror(r) << dendl;
797 handle_replay_complete(
798 r, "failed to copy snapshots from remote to local image");
799 return;
800 }
801
802 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
803 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
804 << "local_snap_id_start=" << m_local_snap_id_start << ", "
805 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
806 get_remote_image_state();
807 }
808
809 template <typename I>
810 void Replayer<I>::get_remote_image_state() {
811 dout(10) << dendl;
812
813 auto ctx = create_context_callback<
814 Replayer<I>, &Replayer<I>::handle_get_remote_image_state>(this);
815 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
816 m_state_builder->remote_image_ctx, m_remote_snap_id_end,
817 &m_image_state, ctx);
818 req->send();
819 }
820
821 template <typename I>
822 void Replayer<I>::handle_get_remote_image_state(int r) {
823 dout(10) << "r=" << r << dendl;
824
825 if (r < 0) {
826 derr << "failed to retrieve remote snapshot image state: "
827 << cpp_strerror(r) << dendl;
828 handle_replay_complete(r, "failed to retrieve remote snapshot image state");
829 return;
830 }
831
832 create_non_primary_snapshot();
833 }
834
835 template <typename I>
836 void Replayer<I>::get_local_image_state() {
837 dout(10) << dendl;
838
839 ceph_assert(m_local_snap_id_end != CEPH_NOSNAP);
840 auto ctx = create_context_callback<
841 Replayer<I>, &Replayer<I>::handle_get_local_image_state>(this);
842 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
843 m_state_builder->local_image_ctx, m_local_snap_id_end,
844 &m_image_state, ctx);
845 req->send();
846 }
847
848 template <typename I>
849 void Replayer<I>::handle_get_local_image_state(int r) {
850 dout(10) << "r=" << r << dendl;
851
852 if (r < 0) {
853 derr << "failed to retrieve local snapshot image state: "
854 << cpp_strerror(r) << dendl;
855 handle_replay_complete(r, "failed to retrieve local snapshot image state");
856 return;
857 }
858
859 request_sync();
860 }
861
862 template <typename I>
863 void Replayer<I>::create_non_primary_snapshot() {
864 auto local_image_ctx = m_state_builder->local_image_ctx;
865
866 if (m_local_snap_id_start > 0) {
867 std::shared_lock local_image_locker{local_image_ctx->image_lock};
868
869 auto local_snap_info_it = local_image_ctx->snap_info.find(
870 m_local_snap_id_start);
871 if (local_snap_info_it == local_image_ctx->snap_info.end()) {
872 local_image_locker.unlock();
873
874 derr << "failed to locate local snapshot " << m_local_snap_id_start
875 << dendl;
876 handle_replay_complete(-ENOENT, "failed to locate local start snapshot");
877 return;
878 }
879
880 auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
881 &local_snap_info_it->second.snap_namespace);
882 ceph_assert(mirror_ns != nullptr);
883
884 auto remote_image_ctx = m_state_builder->remote_image_ctx;
885 std::shared_lock remote_image_locker{remote_image_ctx->image_lock};
886
887 // (re)build a full mapping from remote to local snap ids for all user
888 // snapshots to support applying image state in the future
889 for (auto& [remote_snap_id, remote_snap_info] :
890 remote_image_ctx->snap_info) {
891 if (remote_snap_id >= m_remote_snap_id_end) {
892 break;
893 }
894
895 // we can ignore all non-user snapshots since image state only includes
896 // user snapshots
897 if (boost::get<cls::rbd::UserSnapshotNamespace>(
898 &remote_snap_info.snap_namespace) == nullptr) {
899 continue;
900 }
901
902 uint64_t local_snap_id = CEPH_NOSNAP;
903 if (mirror_ns->is_demoted() && !m_remote_mirror_snap_ns.is_demoted()) {
904 // if we are creating a non-primary snapshot following a demotion,
905 // re-build the full snapshot sequence since we don't have a valid
906 // snapshot mapping
907 auto local_snap_id_it = local_image_ctx->snap_ids.find(
908 {remote_snap_info.snap_namespace, remote_snap_info.name});
909 if (local_snap_id_it != local_image_ctx->snap_ids.end()) {
910 local_snap_id = local_snap_id_it->second;
911 }
912 } else {
913 auto snap_seq_it = mirror_ns->snap_seqs.find(remote_snap_id);
914 if (snap_seq_it != mirror_ns->snap_seqs.end()) {
915 local_snap_id = snap_seq_it->second;
916 }
917 }
918
919 if (m_local_mirror_snap_ns.snap_seqs.count(remote_snap_id) == 0 &&
920 local_snap_id != CEPH_NOSNAP) {
921 dout(15) << "mapping remote snapshot " << remote_snap_id << " to "
922 << "local snapshot " << local_snap_id << dendl;
923 m_local_mirror_snap_ns.snap_seqs[remote_snap_id] = local_snap_id;
924 }
925 }
926 }
927
928 dout(10) << "demoted=" << m_remote_mirror_snap_ns.is_demoted() << ", "
929 << "primary_mirror_uuid="
930 << m_state_builder->remote_mirror_uuid << ", "
931 << "primary_snap_id=" << m_remote_snap_id_end << ", "
932 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
933
934 auto ctx = create_context_callback<
935 Replayer<I>, &Replayer<I>::handle_create_non_primary_snapshot>(this);
936 auto req = librbd::mirror::snapshot::CreateNonPrimaryRequest<I>::create(
937 local_image_ctx, m_remote_mirror_snap_ns.is_demoted(),
938 m_state_builder->remote_mirror_uuid, m_remote_snap_id_end,
939 m_local_mirror_snap_ns.snap_seqs, m_image_state, &m_local_snap_id_end, ctx);
940 req->send();
941 }
942
943 template <typename I>
944 void Replayer<I>::handle_create_non_primary_snapshot(int r) {
945 dout(10) << "r=" << r << dendl;
946
947 if (r < 0) {
948 derr << "failed to create local mirror snapshot: " << cpp_strerror(r)
949 << dendl;
950 handle_replay_complete(r, "failed to create local mirror snapshot");
951 return;
952 }
953
954 dout(15) << "local_snap_id_end=" << m_local_snap_id_end << dendl;
955
956 update_mirror_image_state();
957 }
958
959 template <typename I>
960 void Replayer<I>::update_mirror_image_state() {
961 if (m_local_snap_id_start > 0) {
962 request_sync();
963 return;
964 }
965
966 // a newly created non-primary image has a local mirror state of CREATING
967 // until this point so that we could avoid preserving the image until
968 // the first non-primary snapshot linked the two images together.
969 dout(10) << dendl;
970 auto ctx = create_context_callback<
971 Replayer<I>, &Replayer<I>::handle_update_mirror_image_state>(this);
972 auto req = librbd::mirror::ImageStateUpdateRequest<I>::create(
973 m_state_builder->local_image_ctx->md_ctx,
974 m_state_builder->local_image_ctx->id,
975 cls::rbd::MIRROR_IMAGE_STATE_ENABLED, {}, ctx);
976 req->send();
977 }
978
979 template <typename I>
980 void Replayer<I>::handle_update_mirror_image_state(int r) {
981 dout(10) << "r=" << r << dendl;
982
983 if (r < 0) {
984 derr << "failed to update local mirror image state: " << cpp_strerror(r)
985 << dendl;
986 handle_replay_complete(r, "failed to update local mirror image state");
987 return;
988 }
989
990 request_sync();
991 }
992
993 template <typename I>
994 void Replayer<I>::request_sync() {
995 if (m_remote_mirror_snap_ns.clean_since_snap_id == m_remote_snap_id_start) {
996 dout(10) << "skipping unnecessary image copy: "
997 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
998 << "remote_mirror_snap_ns=" << m_remote_mirror_snap_ns << dendl;
999 apply_image_state();
1000 return;
1001 }
1002
1003 dout(10) << dendl;
1004 std::unique_lock locker{m_lock};
1005 if (is_replay_interrupted(&locker)) {
1006 return;
1007 }
1008
1009 auto ctx = create_async_context_callback(
1010 m_threads->work_queue, create_context_callback<
1011 Replayer<I>, &Replayer<I>::handle_request_sync>(this));
1012 m_instance_watcher->notify_sync_request(m_state_builder->local_image_ctx->id,
1013 ctx);
1014 }
1015
1016 template <typename I>
1017 void Replayer<I>::handle_request_sync(int r) {
1018 dout(10) << "r=" << r << dendl;
1019
1020 std::unique_lock locker{m_lock};
1021 if (is_replay_interrupted(&locker)) {
1022 return;
1023 } else if (r == -ECANCELED) {
1024 dout(5) << "image-sync canceled" << dendl;
1025 handle_replay_complete(&locker, r, "image-sync canceled");
1026 return;
1027 } else if (r < 0) {
1028 derr << "failed to request image-sync: " << cpp_strerror(r) << dendl;
1029 handle_replay_complete(&locker, r, "failed to request image-sync");
1030 return;
1031 }
1032
1033 m_sync_in_progress = true;
1034 locker.unlock();
1035
1036 copy_image();
1037 }
1038
1039 template <typename I>
1040 void Replayer<I>::copy_image() {
1041 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
1042 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
1043 << "local_snap_id_start=" << m_local_snap_id_start << ", "
1044 << "last_copied_object_number="
1045 << m_local_mirror_snap_ns.last_copied_object_number << ", "
1046 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
1047
1048 m_snapshot_bytes = 0;
1049 m_deep_copy_handler = new DeepCopyHandler(this);
1050 auto ctx = create_context_callback<
1051 Replayer<I>, &Replayer<I>::handle_copy_image>(this);
1052 auto req = librbd::deep_copy::ImageCopyRequest<I>::create(
1053 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
1054 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start, false,
1055 (m_local_mirror_snap_ns.last_copied_object_number > 0 ?
1056 librbd::deep_copy::ObjectNumber{
1057 m_local_mirror_snap_ns.last_copied_object_number} :
1058 librbd::deep_copy::ObjectNumber{}),
1059 m_local_mirror_snap_ns.snap_seqs, m_deep_copy_handler, ctx);
1060 req->send();
1061 }
1062
1063 template <typename I>
1064 void Replayer<I>::handle_copy_image(int r) {
1065 dout(10) << "r=" << r << dendl;
1066
1067 delete m_deep_copy_handler;
1068 m_deep_copy_handler = nullptr;
1069
1070 if (r < 0) {
1071 derr << "failed to copy remote image to local image: " << cpp_strerror(r)
1072 << dendl;
1073 handle_replay_complete(r, "failed to copy remote image");
1074 return;
1075 }
1076
1077 {
1078 std::unique_lock locker{m_lock};
1079 m_bytes_per_snapshot(m_snapshot_bytes);
1080 m_snapshot_bytes = 0;
1081 }
1082
1083 apply_image_state();
1084 }
1085
1086 template <typename I>
1087 void Replayer<I>::handle_copy_image_progress(uint64_t object_number,
1088 uint64_t object_count) {
1089 dout(10) << "object_number=" << object_number << ", "
1090 << "object_count=" << object_count << dendl;
1091
1092 std::unique_lock locker{m_lock};
1093 m_local_mirror_snap_ns.last_copied_object_number = std::min(
1094 object_number, object_count);
1095 m_local_object_count = object_count;
1096
1097 update_non_primary_snapshot(false);
1098 }
1099
1100 template <typename I>
1101 void Replayer<I>::handle_copy_image_read(uint64_t bytes_read) {
1102 dout(20) << "bytes_read=" << bytes_read << dendl;
1103
1104 std::unique_lock locker{m_lock};
1105 m_bytes_per_second(bytes_read);
1106 m_snapshot_bytes += bytes_read;
1107 }
1108
1109 template <typename I>
1110 void Replayer<I>::apply_image_state() {
1111 dout(10) << dendl;
1112
1113 auto ctx = create_context_callback<
1114 Replayer<I>, &Replayer<I>::handle_apply_image_state>(this);
1115 auto req = ApplyImageStateRequest<I>::create(
1116 m_local_mirror_uuid,
1117 m_state_builder->remote_mirror_uuid,
1118 m_state_builder->local_image_ctx,
1119 m_state_builder->remote_image_ctx,
1120 m_image_state, ctx);
1121 req->send();
1122 }
1123
1124 template <typename I>
1125 void Replayer<I>::handle_apply_image_state(int r) {
1126 dout(10) << "r=" << r << dendl;
1127
1128 if (r < 0 && r != -ENOENT) {
1129 derr << "failed to apply remote image state to local image: "
1130 << cpp_strerror(r) << dendl;
1131 handle_replay_complete(r, "failed to apply remote image state");
1132 return;
1133 }
1134
1135 std::unique_lock locker{m_lock};
1136 update_non_primary_snapshot(true);
1137 }
1138
1139 template <typename I>
1140 void Replayer<I>::update_non_primary_snapshot(bool complete) {
1141 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1142 if (!complete) {
1143 // disallow two in-flight updates if this isn't the completion of the sync
1144 if (m_updating_sync_point) {
1145 return;
1146 }
1147 m_updating_sync_point = true;
1148 } else {
1149 m_local_mirror_snap_ns.complete = true;
1150 }
1151
1152 dout(10) << dendl;
1153
1154 librados::ObjectWriteOperation op;
1155 librbd::cls_client::mirror_image_snapshot_set_copy_progress(
1156 &op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
1157 m_local_mirror_snap_ns.last_copied_object_number);
1158
1159 auto ctx = new C_TrackedOp(
1160 m_in_flight_op_tracker, new LambdaContext([this, complete](int r) {
1161 handle_update_non_primary_snapshot(complete, r);
1162 }));
1163 auto aio_comp = create_rados_callback(ctx);
1164 int r = m_state_builder->local_image_ctx->md_ctx.aio_operate(
1165 m_state_builder->local_image_ctx->header_oid, aio_comp, &op);
1166 ceph_assert(r == 0);
1167 aio_comp->release();
1168 }
1169
1170 template <typename I>
1171 void Replayer<I>::handle_update_non_primary_snapshot(bool complete, int r) {
1172 dout(10) << "r=" << r << dendl;
1173
1174 if (r < 0) {
1175 derr << "failed to update local snapshot progress: " << cpp_strerror(r)
1176 << dendl;
1177 if (complete) {
1178 // only fail if this was the final update
1179 handle_replay_complete(r, "failed to update local snapshot progress");
1180 return;
1181 }
1182 }
1183
1184 if (!complete) {
1185 // periodic sync-point update -- do not advance state machine
1186 std::unique_lock locker{m_lock};
1187
1188 ceph_assert(m_updating_sync_point);
1189 m_updating_sync_point = false;
1190 return;
1191 }
1192
1193 notify_image_update();
1194 }
1195
1196 template <typename I>
1197 void Replayer<I>::notify_image_update() {
1198 dout(10) << dendl;
1199
1200 auto ctx = create_context_callback<
1201 Replayer<I>, &Replayer<I>::handle_notify_image_update>(this);
1202 m_state_builder->local_image_ctx->notify_update(ctx);
1203 }
1204
1205 template <typename I>
1206 void Replayer<I>::handle_notify_image_update(int r) {
1207 dout(10) << "r=" << r << dendl;
1208
1209 if (r < 0) {
1210 derr << "failed to notify local image update: " << cpp_strerror(r) << dendl;
1211 }
1212
1213 unlink_peer(m_remote_snap_id_start);
1214 }
1215
1216 template <typename I>
1217 void Replayer<I>::unlink_peer(uint64_t remote_snap_id) {
1218 if (remote_snap_id == 0) {
1219 finish_sync();
1220 return;
1221 }
1222
1223 // local snapshot fully synced -- we no longer depend on the sync
1224 // start snapshot in the remote image
1225 dout(10) << "remote_snap_id=" << remote_snap_id << dendl;
1226
1227 auto ctx = create_context_callback<
1228 Replayer<I>, &Replayer<I>::handle_unlink_peer>(this);
1229 auto req = librbd::mirror::snapshot::UnlinkPeerRequest<I>::create(
1230 m_state_builder->remote_image_ctx, remote_snap_id,
1231 m_remote_mirror_peer_uuid, ctx);
1232 req->send();
1233 }
1234
1235 template <typename I>
1236 void Replayer<I>::handle_unlink_peer(int r) {
1237 dout(10) << "r=" << r << dendl;
1238
1239 if (r < 0 && r != -ENOENT) {
1240 derr << "failed to unlink local peer from remote image: " << cpp_strerror(r)
1241 << dendl;
1242 handle_replay_complete(r, "failed to unlink local peer from remote image");
1243 return;
1244 }
1245
1246 finish_sync();
1247 }
1248
1249 template <typename I>
1250 void Replayer<I>::finish_sync() {
1251 dout(10) << dendl;
1252
1253 {
1254 std::unique_lock locker{m_lock};
1255 notify_status_updated();
1256
1257 if (m_sync_in_progress) {
1258 m_sync_in_progress = false;
1259 m_instance_watcher->notify_sync_complete(
1260 m_state_builder->local_image_ctx->id);
1261 }
1262 }
1263
1264 if (is_replay_interrupted()) {
1265 return;
1266 }
1267
1268 load_local_image_meta();
1269 }
1270
1271 template <typename I>
1272 void Replayer<I>::register_local_update_watcher() {
1273 dout(10) << dendl;
1274
1275 m_update_watch_ctx = new C_UpdateWatchCtx(this);
1276
1277 int r = m_state_builder->local_image_ctx->state->register_update_watcher(
1278 m_update_watch_ctx, &m_local_update_watcher_handle);
1279 auto ctx = create_context_callback<
1280 Replayer<I>, &Replayer<I>::handle_register_local_update_watcher>(this);
1281 m_threads->work_queue->queue(ctx, r);
1282 }
1283
1284 template <typename I>
1285 void Replayer<I>::handle_register_local_update_watcher(int r) {
1286 dout(10) << "r=" << r << dendl;
1287
1288 if (r < 0) {
1289 derr << "failed to register local update watcher: " << cpp_strerror(r)
1290 << dendl;
1291 handle_replay_complete(r, "failed to register local image update watcher");
1292 m_state = STATE_COMPLETE;
1293
1294 delete m_update_watch_ctx;
1295 m_update_watch_ctx = nullptr;
1296
1297 Context* on_init = nullptr;
1298 std::swap(on_init, m_on_init_shutdown);
1299 on_init->complete(r);
1300 return;
1301 }
1302
1303 register_remote_update_watcher();
1304 }
1305
1306 template <typename I>
1307 void Replayer<I>::register_remote_update_watcher() {
1308 dout(10) << dendl;
1309
1310 int r = m_state_builder->remote_image_ctx->state->register_update_watcher(
1311 m_update_watch_ctx, &m_remote_update_watcher_handle);
1312 auto ctx = create_context_callback<
1313 Replayer<I>, &Replayer<I>::handle_register_remote_update_watcher>(this);
1314 m_threads->work_queue->queue(ctx, r);
1315 }
1316
1317 template <typename I>
1318 void Replayer<I>::handle_register_remote_update_watcher(int r) {
1319 dout(10) << "r=" << r << dendl;
1320
1321 if (r < 0) {
1322 derr << "failed to register remote update watcher: " << cpp_strerror(r)
1323 << dendl;
1324 handle_replay_complete(r, "failed to register remote image update watcher");
1325 m_state = STATE_COMPLETE;
1326
1327 unregister_local_update_watcher();
1328 return;
1329 }
1330
1331 m_state = STATE_REPLAYING;
1332
1333 Context* on_init = nullptr;
1334 std::swap(on_init, m_on_init_shutdown);
1335 on_init->complete(0);
1336
1337 // delay initial snapshot scan until after we have alerted
1338 // image replayer that we have initialized in case an error
1339 // occurs
1340 {
1341 std::unique_lock locker{m_lock};
1342 notify_status_updated();
1343 }
1344
1345 load_local_image_meta();
1346 }
1347
1348 template <typename I>
1349 void Replayer<I>::unregister_remote_update_watcher() {
1350 dout(10) << dendl;
1351
1352 auto ctx = create_context_callback<
1353 Replayer<I>,
1354 &Replayer<I>::handle_unregister_remote_update_watcher>(this);
1355 m_state_builder->remote_image_ctx->state->unregister_update_watcher(
1356 m_remote_update_watcher_handle, ctx);
1357 }
1358
1359 template <typename I>
1360 void Replayer<I>::handle_unregister_remote_update_watcher(int r) {
1361 dout(10) << "r=" << r << dendl;
1362
1363 if (r < 0) {
1364 derr << "failed to unregister remote update watcher: " << cpp_strerror(r)
1365 << dendl;
1366 handle_replay_complete(
1367 r, "failed to unregister remote image update watcher");
1368 }
1369
1370 unregister_local_update_watcher();
1371 }
1372
1373 template <typename I>
1374 void Replayer<I>::unregister_local_update_watcher() {
1375 dout(10) << dendl;
1376
1377 auto ctx = create_context_callback<
1378 Replayer<I>,
1379 &Replayer<I>::handle_unregister_local_update_watcher>(this);
1380 m_state_builder->local_image_ctx->state->unregister_update_watcher(
1381 m_local_update_watcher_handle, ctx);
1382 }
1383
1384 template <typename I>
1385 void Replayer<I>::handle_unregister_local_update_watcher(int r) {
1386 dout(10) << "r=" << r << dendl;
1387
1388 if (r < 0) {
1389 derr << "failed to unregister local update watcher: " << cpp_strerror(r)
1390 << dendl;
1391 handle_replay_complete(
1392 r, "failed to unregister local image update watcher");
1393 }
1394
1395 delete m_update_watch_ctx;
1396 m_update_watch_ctx = nullptr;
1397
1398 wait_for_in_flight_ops();
1399 }
1400
1401 template <typename I>
1402 void Replayer<I>::wait_for_in_flight_ops() {
1403 dout(10) << dendl;
1404
1405 auto ctx = create_async_context_callback(
1406 m_threads->work_queue, create_context_callback<
1407 Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
1408 m_in_flight_op_tracker.wait_for_ops(ctx);
1409 }
1410
1411 template <typename I>
1412 void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
1413 dout(10) << "r=" << r << dendl;
1414
1415 Context* on_shutdown = nullptr;
1416 {
1417 std::unique_lock locker{m_lock};
1418 ceph_assert(m_on_init_shutdown != nullptr);
1419 std::swap(on_shutdown, m_on_init_shutdown);
1420 }
1421 on_shutdown->complete(m_error_code);
1422 }
1423
1424 template <typename I>
1425 void Replayer<I>::handle_image_update_notify() {
1426 dout(10) << dendl;
1427
1428 std::unique_lock locker{m_lock};
1429 if (m_state == STATE_REPLAYING) {
1430 dout(15) << "flagging snapshot rescan required" << dendl;
1431 m_image_updated = true;
1432 } else if (m_state == STATE_IDLE) {
1433 m_state = STATE_REPLAYING;
1434 locker.unlock();
1435
1436 dout(15) << "restarting idle replayer" << dendl;
1437 load_local_image_meta();
1438 }
1439 }
1440
1441 template <typename I>
1442 void Replayer<I>::handle_replay_complete(int r,
1443 const std::string& description) {
1444 std::unique_lock locker{m_lock};
1445 handle_replay_complete(&locker, r, description);
1446 }
1447
1448 template <typename I>
1449 void Replayer<I>::handle_replay_complete(std::unique_lock<ceph::mutex>* locker,
1450 int r,
1451 const std::string& description) {
1452 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1453
1454 if (m_error_code == 0) {
1455 m_error_code = r;
1456 m_error_description = description;
1457 }
1458
1459 if (m_sync_in_progress) {
1460 m_sync_in_progress = false;
1461 m_instance_watcher->notify_sync_complete(
1462 m_state_builder->local_image_ctx->id);
1463 }
1464
1465 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
1466 return;
1467 }
1468
1469 m_state = STATE_COMPLETE;
1470 notify_status_updated();
1471 }
1472
1473 template <typename I>
1474 void Replayer<I>::notify_status_updated() {
1475 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1476
1477 dout(10) << dendl;
1478 auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
1479 [this](int) {
1480 m_replayer_listener->handle_notification();
1481 }));
1482 m_threads->work_queue->queue(ctx, 0);
1483 }
1484
1485 template <typename I>
1486 bool Replayer<I>::is_replay_interrupted() {
1487 std::unique_lock locker{m_lock};
1488 return is_replay_interrupted(&locker);
1489 }
1490
1491 template <typename I>
1492 bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
1493 if (m_state == STATE_COMPLETE) {
1494 locker->unlock();
1495
1496 dout(10) << "resuming pending shut down" << dendl;
1497 unregister_remote_update_watcher();
1498 return true;
1499 }
1500 return false;
1501 }
1502
1503 } // namespace snapshot
1504 } // namespace image_replayer
1505 } // namespace mirror
1506 } // namespace rbd
1507
1508 template class rbd::mirror::image_replayer::snapshot::Replayer<librbd::ImageCtx>;