]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/image_replayer/snapshot/Replayer.cc
import ceph 15.2.10
[ceph.git] / ceph / src / tools / rbd_mirror / image_replayer / snapshot / Replayer.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "Replayer.h"
5#include "common/debug.h"
6#include "common/errno.h"
7#include "include/stringify.h"
8#include "common/Timer.h"
9#include "common/WorkQueue.h"
10#include "cls/rbd/cls_rbd_client.h"
1911f103 11#include "json_spirit/json_spirit.h"
9f95a23c
TL
12#include "librbd/ImageCtx.h"
13#include "librbd/ImageState.h"
1911f103 14#include "librbd/Operations.h"
9f95a23c 15#include "librbd/Utils.h"
1911f103 16#include "librbd/deep_copy/Handler.h"
9f95a23c
TL
17#include "librbd/deep_copy/ImageCopyRequest.h"
18#include "librbd/deep_copy/SnapshotCopyRequest.h"
19#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h"
20#include "librbd/mirror/snapshot/GetImageStateRequest.h"
21#include "librbd/mirror/snapshot/ImageMeta.h"
22#include "librbd/mirror/snapshot/UnlinkPeerRequest.h"
23#include "tools/rbd_mirror/InstanceWatcher.h"
24#include "tools/rbd_mirror/PoolMetaCache.h"
25#include "tools/rbd_mirror/Threads.h"
26#include "tools/rbd_mirror/Types.h"
27#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
29#include "tools/rbd_mirror/image_replayer/Utils.h"
30#include "tools/rbd_mirror/image_replayer/snapshot/ApplyImageStateRequest.h"
31#include "tools/rbd_mirror/image_replayer/snapshot/StateBuilder.h"
32#include "tools/rbd_mirror/image_replayer/snapshot/Utils.h"
#include <cmath>
1911f103 33#include <set>
9f95a23c
TL
34
35#define dout_context g_ceph_context
36#define dout_subsys ceph_subsys_rbd_mirror
37#undef dout_prefix
38#define dout_prefix *_dout << "rbd::mirror::image_replayer::snapshot::" \
39 << "Replayer: " << this << " " << __func__ << ": "
40
41extern PerfCounters *g_perf_counters;
42
43namespace rbd {
44namespace mirror {
45namespace image_replayer {
46namespace snapshot {
47
48namespace {
49
1911f103
TL
// Rounds `value` to two decimal places and returns the magnitude of the
// result (the sign is discarded).
//
// The std::-qualified overloads are used on purpose: an unqualified `abs`
// may resolve to the C library's `int abs(int)`, which would silently
// truncate the fractional part (e.g. 1.23 -> 1).
double round_to_two_places(double value) {
  return std::abs(std::round(value * 100) / 100);
}
53
9f95a23c
TL
54template<typename I>
55std::pair<uint64_t, librbd::SnapInfo*> get_newest_mirror_snapshot(
56 I* image_ctx) {
57 for (auto snap_info_it = image_ctx->snap_info.rbegin();
58 snap_info_it != image_ctx->snap_info.rend(); ++snap_info_it) {
59 const auto& snap_ns = snap_info_it->second.snap_namespace;
60 auto mirror_ns = boost::get<
61 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
62 if (mirror_ns == nullptr || !mirror_ns->complete) {
63 continue;
64 }
65
66 return {snap_info_it->first, &snap_info_it->second};
67 }
68
69 return {CEPH_NOSNAP, nullptr};
70}
71
72} // anonymous namespace
73
74using librbd::util::create_async_context_callback;
75using librbd::util::create_context_callback;
76using librbd::util::create_rados_callback;
77
78template <typename I>
79struct Replayer<I>::C_UpdateWatchCtx : public librbd::UpdateWatchCtx {
80 Replayer<I>* replayer;
81
82 C_UpdateWatchCtx(Replayer<I>* replayer) : replayer(replayer) {
83 }
84
85 void handle_notify() override {
86 replayer->handle_image_update_notify();
87 }
88};
89
90template <typename I>
1911f103 91struct Replayer<I>::DeepCopyHandler : public librbd::deep_copy::Handler {
9f95a23c 92 Replayer *replayer;
9f95a23c 93
1911f103 94 DeepCopyHandler(Replayer* replayer) : replayer(replayer) {
9f95a23c 95 }
9f95a23c 96
1911f103
TL
97 void handle_read(uint64_t bytes_read) override {
98 replayer->handle_copy_image_read(bytes_read);
9f95a23c
TL
99 }
100
101 int update_progress(uint64_t object_number, uint64_t object_count) override {
102 replayer->handle_copy_image_progress(object_number, object_count);
103 return 0;
104 }
105};
106
107template <typename I>
108Replayer<I>::Replayer(
109 Threads<I>* threads,
110 InstanceWatcher<I>* instance_watcher,
111 const std::string& local_mirror_uuid,
112 PoolMetaCache* pool_meta_cache,
113 StateBuilder<I>* state_builder,
114 ReplayerListener* replayer_listener)
115 : m_threads(threads),
116 m_instance_watcher(instance_watcher),
117 m_local_mirror_uuid(local_mirror_uuid),
118 m_pool_meta_cache(pool_meta_cache),
119 m_state_builder(state_builder),
120 m_replayer_listener(replayer_listener),
121 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
122 "rbd::mirror::image_replayer::snapshot::Replayer", this))) {
123 dout(10) << dendl;
124}
125
126template <typename I>
127Replayer<I>::~Replayer() {
128 dout(10) << dendl;
129 ceph_assert(m_state == STATE_COMPLETE);
130 ceph_assert(m_update_watch_ctx == nullptr);
1911f103 131 ceph_assert(m_deep_copy_handler == nullptr);
9f95a23c
TL
132}
133
134template <typename I>
135void Replayer<I>::init(Context* on_finish) {
136 dout(10) << dendl;
137
138 ceph_assert(m_state == STATE_INIT);
139
140 RemotePoolMeta remote_pool_meta;
141 int r = m_pool_meta_cache->get_remote_pool_meta(
142 m_state_builder->remote_image_ctx->md_ctx.get_id(), &remote_pool_meta);
143 if (r < 0 || remote_pool_meta.mirror_peer_uuid.empty()) {
144 derr << "failed to retrieve mirror peer uuid from remote pool" << dendl;
145 m_state = STATE_COMPLETE;
146 m_threads->work_queue->queue(on_finish, r);
147 return;
148 }
149
150 m_remote_mirror_peer_uuid = remote_pool_meta.mirror_peer_uuid;
151 dout(10) << "remote_mirror_peer_uuid=" << m_remote_mirror_peer_uuid << dendl;
152
153 ceph_assert(m_on_init_shutdown == nullptr);
154 m_on_init_shutdown = on_finish;
155
156 register_local_update_watcher();
157}
158
159template <typename I>
160void Replayer<I>::shut_down(Context* on_finish) {
161 dout(10) << dendl;
162
163 std::unique_lock locker{m_lock};
164 ceph_assert(m_on_init_shutdown == nullptr);
165 m_on_init_shutdown = on_finish;
166 m_error_code = 0;
167 m_error_description = "";
168
169 ceph_assert(m_state != STATE_INIT);
170 auto state = STATE_COMPLETE;
171 std::swap(m_state, state);
172
173 if (state == STATE_REPLAYING) {
174 // if a sync request was pending, request a cancelation
175 m_instance_watcher->cancel_sync_request(
176 m_state_builder->local_image_ctx->id);
177
178 // TODO interrupt snapshot copy and image copy state machines even if remote
179 // cluster is unreachable
180 dout(10) << "shut down pending on completion of snapshot replay" << dendl;
181 return;
182 }
183 locker.unlock();
184
185 unregister_remote_update_watcher();
186}
187
188template <typename I>
189void Replayer<I>::flush(Context* on_finish) {
190 dout(10) << dendl;
191
192 // TODO
193 m_threads->work_queue->queue(on_finish, 0);
194}
195
196template <typename I>
197bool Replayer<I>::get_replay_status(std::string* description,
198 Context* on_finish) {
199 dout(10) << dendl;
200
201 std::unique_lock locker{m_lock};
202 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
203 locker.unlock();
204
205 derr << "replay not running" << dendl;
206 on_finish->complete(-EAGAIN);
207 return false;
208 }
209
210 std::shared_lock local_image_locker{
211 m_state_builder->local_image_ctx->image_lock};
212 auto [local_snap_id, local_snap_info] = get_newest_mirror_snapshot(
213 m_state_builder->local_image_ctx);
214
215 std::shared_lock remote_image_locker{
216 m_state_builder->remote_image_ctx->image_lock};
217 auto [remote_snap_id, remote_snap_info] = get_newest_mirror_snapshot(
218 m_state_builder->remote_image_ctx);
219
220 if (remote_snap_info == nullptr) {
221 remote_image_locker.unlock();
222 local_image_locker.unlock();
223 locker.unlock();
224
225 derr << "remote image does not contain mirror snapshots" << dendl;
226 on_finish->complete(-EAGAIN);
227 return false;
228 }
229
230 std::string replay_state = "idle";
231 if (m_remote_snap_id_end != CEPH_NOSNAP) {
232 replay_state = "syncing";
233 }
234
1911f103
TL
235 json_spirit::mObject root_obj;
236 root_obj["replay_state"] = replay_state;
237 root_obj["remote_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
9f95a23c
TL
238
239 auto matching_remote_snap_id = util::compute_remote_snap_id(
240 m_state_builder->local_image_ctx->image_lock,
241 m_state_builder->local_image_ctx->snap_info,
242 local_snap_id, m_state_builder->remote_mirror_uuid);
243 auto matching_remote_snap_it =
244 m_state_builder->remote_image_ctx->snap_info.find(matching_remote_snap_id);
245 if (matching_remote_snap_id != CEPH_NOSNAP &&
246 matching_remote_snap_it !=
247 m_state_builder->remote_image_ctx->snap_info.end()) {
248 // use the timestamp from the matching remote image since
249 // the local snapshot would just be the time the snapshot was
250 // synced and not the consistency point in time.
1911f103
TL
251 root_obj["local_snapshot_timestamp"] =
252 matching_remote_snap_it->second.timestamp.sec();
9f95a23c
TL
253 }
254
255 matching_remote_snap_it = m_state_builder->remote_image_ctx->snap_info.find(
256 m_remote_snap_id_end);
257 if (m_remote_snap_id_end != CEPH_NOSNAP &&
258 matching_remote_snap_it !=
259 m_state_builder->remote_image_ctx->snap_info.end()) {
1911f103
TL
260 root_obj["syncing_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
261 root_obj["syncing_percent"] = static_cast<uint64_t>(
9f95a23c 262 100 * m_local_mirror_snap_ns.last_copied_object_number /
1911f103 263 static_cast<float>(std::max<uint64_t>(1U, m_local_object_count)));
9f95a23c
TL
264 }
265
1911f103
TL
266 m_bytes_per_second(0);
267 auto bytes_per_second = m_bytes_per_second.get_average();
268 root_obj["bytes_per_second"] = round_to_two_places(bytes_per_second);
269
270 auto bytes_per_snapshot = boost::accumulators::rolling_mean(
271 m_bytes_per_snapshot);
272 root_obj["bytes_per_snapshot"] = round_to_two_places(bytes_per_snapshot);
273
274 auto pending_bytes = bytes_per_snapshot * m_pending_snapshots;
275 if (bytes_per_second > 0 && m_pending_snapshots > 0) {
276 auto seconds_until_synced = round_to_two_places(
277 pending_bytes / bytes_per_second);
278 if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
279 seconds_until_synced = std::numeric_limits<uint64_t>::max();
280 }
281
282 root_obj["seconds_until_synced"] = static_cast<uint64_t>(
283 seconds_until_synced);
284 }
285
286 *description = json_spirit::write(
287 root_obj, json_spirit::remove_trailing_zeros);
9f95a23c
TL
288
289 local_image_locker.unlock();
290 remote_image_locker.unlock();
291 locker.unlock();
292 on_finish->complete(-EEXIST);
293 return true;
294}
295
296template <typename I>
297void Replayer<I>::load_local_image_meta() {
298 dout(10) << dendl;
299
300 {
301 // reset state in case new snapshot is added while we are scanning
302 std::unique_lock locker{m_lock};
303 m_image_updated = false;
304 }
305
306 ceph_assert(m_state_builder->local_image_meta != nullptr);
307 auto ctx = create_context_callback<
308 Replayer<I>, &Replayer<I>::handle_load_local_image_meta>(this);
309 m_state_builder->local_image_meta->load(ctx);
310}
311
312template <typename I>
313void Replayer<I>::handle_load_local_image_meta(int r) {
314 dout(10) << "r=" << r << dendl;
315
316 if (r < 0 && r != -ENOENT) {
317 derr << "failed to load local image-meta: " << cpp_strerror(r) << dendl;
318 handle_replay_complete(r, "failed to load local image-meta");
319 return;
320 }
321
322 if (r >= 0 && m_state_builder->local_image_meta->resync_requested) {
323 m_resync_requested = true;
324
325 dout(10) << "local image resync requested" << dendl;
326 handle_replay_complete(0, "resync requested");
327 return;
328 }
329
330 refresh_local_image();
331}
332
333template <typename I>
334void Replayer<I>::refresh_local_image() {
335 if (!m_state_builder->local_image_ctx->state->is_refresh_required()) {
336 refresh_remote_image();
337 return;
338 }
339
340 dout(10) << dendl;
341 auto ctx = create_context_callback<
342 Replayer<I>, &Replayer<I>::handle_refresh_local_image>(this);
343 m_state_builder->local_image_ctx->state->refresh(ctx);
344}
345
346template <typename I>
347void Replayer<I>::handle_refresh_local_image(int r) {
348 dout(10) << "r=" << r << dendl;
349
350 if (r < 0) {
351 derr << "failed to refresh local image: " << cpp_strerror(r) << dendl;
352 handle_replay_complete(r, "failed to refresh local image");
353 return;
354 }
355
356 refresh_remote_image();
357}
358
359template <typename I>
360void Replayer<I>::refresh_remote_image() {
361 if (!m_state_builder->remote_image_ctx->state->is_refresh_required()) {
362 std::unique_lock locker{m_lock};
363 scan_local_mirror_snapshots(&locker);
364 return;
365 }
366
367 dout(10) << dendl;
368 auto ctx = create_context_callback<
369 Replayer<I>, &Replayer<I>::handle_refresh_remote_image>(this);
370 m_state_builder->remote_image_ctx->state->refresh(ctx);
371}
372
373template <typename I>
374void Replayer<I>::handle_refresh_remote_image(int r) {
375 dout(10) << "r=" << r << dendl;
376
377 if (r < 0) {
378 derr << "failed to refresh remote image: " << cpp_strerror(r) << dendl;
379 handle_replay_complete(r, "failed to refresh remote image");
380 return;
381 }
382
383 std::unique_lock locker{m_lock};
384 scan_local_mirror_snapshots(&locker);
385}
386
387template <typename I>
388void Replayer<I>::scan_local_mirror_snapshots(
389 std::unique_lock<ceph::mutex>* locker) {
390 if (is_replay_interrupted(locker)) {
391 return;
392 }
393
394 dout(10) << dendl;
395
396 m_local_snap_id_start = 0;
397 m_local_snap_id_end = CEPH_NOSNAP;
398 m_local_mirror_snap_ns = {};
399 m_local_object_count = 0;
400
401 m_remote_snap_id_start = 0;
402 m_remote_snap_id_end = CEPH_NOSNAP;
403 m_remote_mirror_snap_ns = {};
404
1911f103
TL
405 std::set<uint64_t> prune_snap_ids;
406
9f95a23c
TL
407 auto local_image_ctx = m_state_builder->local_image_ctx;
408 std::shared_lock image_locker{local_image_ctx->image_lock};
409 for (auto snap_info_it = local_image_ctx->snap_info.begin();
410 snap_info_it != local_image_ctx->snap_info.end(); ++snap_info_it) {
411 const auto& snap_ns = snap_info_it->second.snap_namespace;
412 auto mirror_ns = boost::get<
413 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
414 if (mirror_ns == nullptr) {
415 continue;
416 }
417
418 dout(15) << "local mirror snapshot: id=" << snap_info_it->first << ", "
419 << "mirror_ns=" << *mirror_ns << dendl;
420 m_local_mirror_snap_ns = *mirror_ns;
421
422 auto local_snap_id = snap_info_it->first;
423 if (mirror_ns->is_non_primary()) {
424 if (mirror_ns->complete) {
425 // if remote has new snapshots, we would sync from here
426 m_local_snap_id_start = local_snap_id;
427 m_local_snap_id_end = CEPH_NOSNAP;
1911f103
TL
428
429 if (mirror_ns->mirror_peer_uuids.empty()) {
430 // no other peer will attempt to sync to this snapshot so store as
431 // a candidate for removal
432 prune_snap_ids.insert(local_snap_id);
433 }
9f95a23c 434 } else {
1911f103
TL
435 if (mirror_ns->last_copied_object_number == 0) {
436 // snapshot might be missing image state, object-map, etc, so just
437 // delete and re-create it if we haven't started copying data
cd265ab1
TL
438 // objects. Also only prune this snapshot since we will need the
439 // previous mirror snapshot for syncing.
440 prune_snap_ids.clear();
1911f103
TL
441 prune_snap_ids.insert(local_snap_id);
442 break;
443 }
cd265ab1
TL
444
445 // start snap will be last complete mirror snapshot or initial
446 // image revision
447 m_local_snap_id_end = local_snap_id;
9f95a23c
TL
448 }
449 } else if (mirror_ns->is_primary()) {
450 if (mirror_ns->complete) {
451 m_local_snap_id_start = local_snap_id;
452 m_local_snap_id_end = CEPH_NOSNAP;
453 } else {
454 derr << "incomplete local primary snapshot" << dendl;
455 handle_replay_complete(locker, -EINVAL,
456 "incomplete local primary snapshot");
457 return;
458 }
459 } else {
460 derr << "unknown local mirror snapshot state" << dendl;
461 handle_replay_complete(locker, -EINVAL,
462 "invalid local mirror snapshot state");
463 return;
464 }
465 }
466 image_locker.unlock();
467
1911f103
TL
468 if (m_local_snap_id_start > 0 && m_local_snap_id_end == CEPH_NOSNAP) {
469 // remove candidate that is required for delta snapshot sync
470 prune_snap_ids.erase(m_local_snap_id_start);
471 }
472 if (!prune_snap_ids.empty()) {
473 locker->unlock();
474
475 auto prune_snap_id = *prune_snap_ids.begin();
476 dout(5) << "pruning unused non-primary snapshot " << prune_snap_id << dendl;
477 prune_non_primary_snapshot(prune_snap_id);
478 return;
479 }
480
9f95a23c
TL
481 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
482 if (m_local_mirror_snap_ns.is_non_primary() &&
483 m_local_mirror_snap_ns.primary_mirror_uuid !=
484 m_state_builder->remote_mirror_uuid) {
485 // TODO support multiple peers
486 derr << "local image linked to unknown peer: "
487 << m_local_mirror_snap_ns.primary_mirror_uuid << dendl;
488 handle_replay_complete(locker, -EEXIST,
489 "local image linked to unknown peer");
490 return;
491 } else if (m_local_mirror_snap_ns.state ==
492 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY) {
493 dout(5) << "local image promoted" << dendl;
494 handle_replay_complete(locker, 0, "force promoted");
495 return;
496 }
497
498 dout(10) << "found local mirror snapshot: "
499 << "local_snap_id_start=" << m_local_snap_id_start << ", "
500 << "local_snap_id_end=" << m_local_snap_id_end << ", "
501 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
502 if (!m_local_mirror_snap_ns.is_primary() &&
503 m_local_mirror_snap_ns.complete) {
504 // our remote sync should start after this completed snapshot
505 m_remote_snap_id_start = m_local_mirror_snap_ns.primary_snap_id;
506 }
507 }
508
509 // we don't have any mirror snapshots or only completed non-primary
510 // mirror snapshots
511 scan_remote_mirror_snapshots(locker);
512}
513
514template <typename I>
515void Replayer<I>::scan_remote_mirror_snapshots(
516 std::unique_lock<ceph::mutex>* locker) {
517 dout(10) << dendl;
518
1911f103
TL
519 m_pending_snapshots = 0;
520
521 std::set<uint64_t> unlink_snap_ids;
9f95a23c
TL
522 bool split_brain = false;
523 bool remote_demoted = false;
524 auto remote_image_ctx = m_state_builder->remote_image_ctx;
525 std::shared_lock image_locker{remote_image_ctx->image_lock};
526 for (auto snap_info_it = remote_image_ctx->snap_info.begin();
527 snap_info_it != remote_image_ctx->snap_info.end(); ++snap_info_it) {
528 const auto& snap_ns = snap_info_it->second.snap_namespace;
529 auto mirror_ns = boost::get<
530 cls::rbd::MirrorSnapshotNamespace>(&snap_ns);
531 if (mirror_ns == nullptr) {
532 continue;
533 }
534
535 dout(15) << "remote mirror snapshot: id=" << snap_info_it->first << ", "
536 << "mirror_ns=" << *mirror_ns << dendl;
1911f103 537 remote_demoted = mirror_ns->is_demoted();
9f95a23c
TL
538 if (!mirror_ns->is_primary() && !mirror_ns->is_non_primary()) {
539 derr << "unknown remote mirror snapshot state" << dendl;
540 handle_replay_complete(locker, -EINVAL,
541 "invalid remote mirror snapshot state");
542 return;
1911f103
TL
543 } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) ==
544 0) {
545 dout(15) << "skipping remote snapshot due to missing mirror peer"
546 << dendl;
547 continue;
9f95a23c
TL
548 }
549
550 auto remote_snap_id = snap_info_it->first;
551 if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
552 // we have a local mirror snapshot
553 if (m_local_mirror_snap_ns.is_non_primary()) {
554 // previously validated that it was linked to remote
555 ceph_assert(m_local_mirror_snap_ns.primary_mirror_uuid ==
556 m_state_builder->remote_mirror_uuid);
557
1911f103 558 unlink_snap_ids.insert(remote_snap_id);
9f95a23c
TL
559 if (m_local_mirror_snap_ns.complete &&
560 m_local_mirror_snap_ns.primary_snap_id >= remote_snap_id) {
561 // skip past completed remote snapshot
562 m_remote_snap_id_start = remote_snap_id;
1911f103 563 m_remote_mirror_snap_ns = *mirror_ns;
9f95a23c
TL
564 dout(15) << "skipping synced remote snapshot " << remote_snap_id
565 << dendl;
566 continue;
567 } else if (!m_local_mirror_snap_ns.complete &&
568 m_local_mirror_snap_ns.primary_snap_id > remote_snap_id) {
569 // skip until we get to the in-progress remote snapshot
570 dout(15) << "skipping synced remote snapshot " << remote_snap_id
571 << " while search for in-progress sync" << dendl;
572 m_remote_snap_id_start = remote_snap_id;
1911f103 573 m_remote_mirror_snap_ns = *mirror_ns;
9f95a23c
TL
574 continue;
575 }
576 } else if (m_local_mirror_snap_ns.state ==
577 cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY_DEMOTED) {
578 // find the matching demotion snapshot in remote image
579 ceph_assert(m_local_snap_id_start > 0);
580 if (mirror_ns->state ==
581 cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY_DEMOTED &&
582 mirror_ns->primary_mirror_uuid == m_local_mirror_uuid &&
583 mirror_ns->primary_snap_id == m_local_snap_id_start) {
584 dout(10) << "located matching demotion snapshot: "
585 << "remote_snap_id=" << remote_snap_id << ", "
586 << "local_snap_id=" << m_local_snap_id_start << dendl;
587 m_remote_snap_id_start = remote_snap_id;
588 split_brain = false;
589 continue;
590 } else if (m_remote_snap_id_start == 0) {
591 // still looking for our matching demotion snapshot
592 dout(15) << "skipping remote snapshot " << remote_snap_id << " "
593 << "while searching for demotion" << dendl;
594 split_brain = true;
595 continue;
596 }
597 } else {
598 // should not have been able to reach this
599 ceph_assert(false);
600 }
1911f103 601 } else if (!mirror_ns->is_primary()) {
9f95a23c
TL
602 dout(15) << "skipping non-primary remote snapshot" << dendl;
603 continue;
1911f103
TL
604 }
605
606 // found candidate snapshot to sync
607 ++m_pending_snapshots;
608 if (m_remote_snap_id_end != CEPH_NOSNAP) {
9f95a23c
TL
609 continue;
610 }
611
1911f103 612 // first primary snapshot where were are listed as a peer
9f95a23c
TL
613 m_remote_snap_id_end = remote_snap_id;
614 m_remote_mirror_snap_ns = *mirror_ns;
9f95a23c
TL
615 }
616 image_locker.unlock();
617
1911f103
TL
618 unlink_snap_ids.erase(m_remote_snap_id_start);
619 unlink_snap_ids.erase(m_remote_snap_id_end);
620 if (!unlink_snap_ids.empty()) {
621 locker->unlock();
622
623 // retry the unlinking process for a remote snapshot that we do not
624 // need anymore
625 auto remote_snap_id = *unlink_snap_ids.begin();
626 dout(10) << "unlinking from remote snapshot " << remote_snap_id << dendl;
627 unlink_peer(remote_snap_id);
628 return;
629 }
630
9f95a23c
TL
631 if (m_remote_snap_id_end != CEPH_NOSNAP) {
632 dout(10) << "found remote mirror snapshot: "
633 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
634 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
635 << "remote_snap_ns=" << m_remote_mirror_snap_ns << dendl;
636 if (m_remote_mirror_snap_ns.complete) {
637 locker->unlock();
638
639 if (m_local_snap_id_end != CEPH_NOSNAP &&
640 !m_local_mirror_snap_ns.complete) {
641 // attempt to resume image-sync
642 dout(10) << "local image contains in-progress mirror snapshot"
643 << dendl;
644 get_local_image_state();
645 } else {
646 copy_snapshots();
647 }
648 return;
649 } else {
650 // might have raced with the creation of a remote mirror snapshot
651 // so we will need to refresh and rescan once it completes
652 dout(15) << "remote mirror snapshot not complete" << dendl;
653 }
654 }
655
656 if (m_image_updated) {
657 // received update notification while scanning image, restart ...
658 m_image_updated = false;
659 locker->unlock();
660
661 dout(10) << "restarting snapshot scan due to remote update notification"
662 << dendl;
663 load_local_image_meta();
664 return;
665 }
666
667 if (is_replay_interrupted(locker)) {
668 return;
669 } else if (split_brain) {
670 derr << "split-brain detected: failed to find matching non-primary "
671 << "snapshot in remote image: "
672 << "local_snap_id_start=" << m_local_snap_id_start << ", "
673 << "local_snap_ns=" << m_local_mirror_snap_ns << dendl;
674 handle_replay_complete(locker, -EEXIST, "split-brain");
675 return;
676 } else if (remote_demoted) {
677 dout(10) << "remote image demoted" << dendl;
678 handle_replay_complete(locker, -EREMOTEIO, "remote image demoted");
679 return;
680 }
681
682 dout(10) << "all remote snapshots synced: idling waiting for new snapshot"
683 << dendl;
684 ceph_assert(m_state == STATE_REPLAYING);
685 m_state = STATE_IDLE;
686
687 notify_status_updated();
688}
689
1911f103
TL
690template <typename I>
691void Replayer<I>::prune_non_primary_snapshot(uint64_t snap_id) {
692 dout(10) << "snap_id=" << snap_id << dendl;
693
694 auto local_image_ctx = m_state_builder->local_image_ctx;
695 bool snap_valid = false;
696 cls::rbd::SnapshotNamespace snap_namespace;
697 std::string snap_name;
698
699 {
700 std::shared_lock image_locker{local_image_ctx->image_lock};
701 auto snap_info = local_image_ctx->get_snap_info(snap_id);
702 if (snap_info != nullptr) {
703 snap_valid = true;
704 snap_namespace = snap_info->snap_namespace;
705 snap_name = snap_info->name;
706
707 ceph_assert(boost::get<cls::rbd::MirrorSnapshotNamespace>(
708 &snap_namespace) != nullptr);
709 }
710 }
711
712 if (!snap_valid) {
713 load_local_image_meta();
714 return;
715 }
716
717 auto ctx = create_context_callback<
718 Replayer<I>, &Replayer<I>::handle_prune_non_primary_snapshot>(this);
719 local_image_ctx->operations->snap_remove(snap_namespace, snap_name, ctx);
720}
721
722template <typename I>
723void Replayer<I>::handle_prune_non_primary_snapshot(int r) {
724 dout(10) << "r=" << r << dendl;
725
726 if (r < 0 && r != -ENOENT) {
727 derr << "failed to prune non-primary snapshot: " << cpp_strerror(r)
728 << dendl;
729 handle_replay_complete(r, "failed to prune non-primary snapshot");
730 return;
731 }
732
733 if (is_replay_interrupted()) {
734 return;
735 }
736
737 load_local_image_meta();
738}
739
9f95a23c
TL
740template <typename I>
741void Replayer<I>::copy_snapshots() {
742 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
743 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
744 << "local_snap_id_start=" << m_local_snap_id_start << dendl;
745
746 ceph_assert(m_remote_snap_id_start != CEPH_NOSNAP);
747 ceph_assert(m_remote_snap_id_end > 0 &&
748 m_remote_snap_id_end != CEPH_NOSNAP);
749 ceph_assert(m_local_snap_id_start != CEPH_NOSNAP);
750
751 m_local_mirror_snap_ns = {};
752 auto ctx = create_context_callback<
753 Replayer<I>, &Replayer<I>::handle_copy_snapshots>(this);
754 auto req = librbd::deep_copy::SnapshotCopyRequest<I>::create(
755 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
756 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start,
757 false, m_threads->work_queue, &m_local_mirror_snap_ns.snap_seqs,
758 ctx);
759 req->send();
760}
761
762template <typename I>
763void Replayer<I>::handle_copy_snapshots(int r) {
764 dout(10) << "r=" << r << dendl;
765
766 if (r < 0) {
767 derr << "failed to copy snapshots from remote to local image: "
768 << cpp_strerror(r) << dendl;
769 handle_replay_complete(
770 r, "failed to copy snapshots from remote to local image");
771 return;
772 }
773
774 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
775 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
776 << "local_snap_id_start=" << m_local_snap_id_start << ", "
777 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
778 get_remote_image_state();
779}
780
781template <typename I>
782void Replayer<I>::get_remote_image_state() {
783 dout(10) << dendl;
784
785 auto ctx = create_context_callback<
786 Replayer<I>, &Replayer<I>::handle_get_remote_image_state>(this);
787 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
788 m_state_builder->remote_image_ctx, m_remote_snap_id_end,
789 &m_image_state, ctx);
790 req->send();
791}
792
793template <typename I>
794void Replayer<I>::handle_get_remote_image_state(int r) {
795 dout(10) << "r=" << r << dendl;
796
797 if (r < 0) {
798 derr << "failed to retrieve remote snapshot image state: "
799 << cpp_strerror(r) << dendl;
800 handle_replay_complete(r, "failed to retrieve remote snapshot image state");
801 return;
802 }
803
804 create_non_primary_snapshot();
805}
806
807template <typename I>
808void Replayer<I>::get_local_image_state() {
809 dout(10) << dendl;
810
811 ceph_assert(m_local_snap_id_end != CEPH_NOSNAP);
812 auto ctx = create_context_callback<
813 Replayer<I>, &Replayer<I>::handle_get_local_image_state>(this);
814 auto req = librbd::mirror::snapshot::GetImageStateRequest<I>::create(
815 m_state_builder->local_image_ctx, m_local_snap_id_end,
816 &m_image_state, ctx);
817 req->send();
818}
819
820template <typename I>
821void Replayer<I>::handle_get_local_image_state(int r) {
822 dout(10) << "r=" << r << dendl;
823
824 if (r < 0) {
825 derr << "failed to retrieve local snapshot image state: "
826 << cpp_strerror(r) << dendl;
827 handle_replay_complete(r, "failed to retrieve local snapshot image state");
828 return;
829 }
830
831 request_sync();
832}
833
834template <typename I>
835void Replayer<I>::create_non_primary_snapshot() {
1911f103
TL
836 auto local_image_ctx = m_state_builder->local_image_ctx;
837
838 if (m_local_snap_id_start > 0) {
839 std::shared_lock local_image_locker{local_image_ctx->image_lock};
840
841 auto local_snap_info_it = local_image_ctx->snap_info.find(
842 m_local_snap_id_start);
843 if (local_snap_info_it == local_image_ctx->snap_info.end()) {
844 local_image_locker.unlock();
845
846 derr << "failed to locate local snapshot " << m_local_snap_id_start
847 << dendl;
848 handle_replay_complete(-ENOENT, "failed to locate local start snapshot");
849 return;
850 }
851
852 auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
853 &local_snap_info_it->second.snap_namespace);
854 ceph_assert(mirror_ns != nullptr);
855
856 auto remote_image_ctx = m_state_builder->remote_image_ctx;
857 std::shared_lock remote_image_locker{remote_image_ctx->image_lock};
858
859 // (re)build a full mapping from remote to local snap ids for all user
860 // snapshots to support applying image state in the future
861 for (auto& [remote_snap_id, remote_snap_info] :
862 remote_image_ctx->snap_info) {
863 if (remote_snap_id >= m_remote_snap_id_end) {
864 break;
865 }
866
867 // we can ignore all non-user snapshots since image state only includes
868 // user snapshots
869 if (boost::get<cls::rbd::UserSnapshotNamespace>(
870 &remote_snap_info.snap_namespace) == nullptr) {
871 continue;
872 }
873
874 uint64_t local_snap_id = CEPH_NOSNAP;
875 if (mirror_ns->is_demoted() && !m_remote_mirror_snap_ns.is_demoted()) {
876 // if we are creating a non-primary snapshot following a demotion,
877 // re-build the full snapshot sequence since we don't have a valid
878 // snapshot mapping
879 auto local_snap_id_it = local_image_ctx->snap_ids.find(
880 {remote_snap_info.snap_namespace, remote_snap_info.name});
881 if (local_snap_id_it != local_image_ctx->snap_ids.end()) {
882 local_snap_id = local_snap_id_it->second;
883 }
884 } else {
885 auto snap_seq_it = mirror_ns->snap_seqs.find(remote_snap_id);
886 if (snap_seq_it != mirror_ns->snap_seqs.end()) {
887 local_snap_id = snap_seq_it->second;
888 }
889 }
890
891 if (m_local_mirror_snap_ns.snap_seqs.count(remote_snap_id) == 0 &&
892 local_snap_id != CEPH_NOSNAP) {
893 dout(15) << "mapping remote snapshot " << remote_snap_id << " to "
894 << "local snapshot " << local_snap_id << dendl;
895 m_local_mirror_snap_ns.snap_seqs[remote_snap_id] = local_snap_id;
896 }
897 }
898 }
899
900 dout(10) << "demoted=" << m_remote_mirror_snap_ns.is_demoted() << ", "
901 << "primary_mirror_uuid="
902 << m_state_builder->remote_mirror_uuid << ", "
903 << "primary_snap_id=" << m_remote_snap_id_end << ", "
904 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
9f95a23c
TL
905
906 auto ctx = create_context_callback<
907 Replayer<I>, &Replayer<I>::handle_create_non_primary_snapshot>(this);
908 auto req = librbd::mirror::snapshot::CreateNonPrimaryRequest<I>::create(
1911f103 909 local_image_ctx, m_remote_mirror_snap_ns.is_demoted(),
9f95a23c
TL
910 m_state_builder->remote_mirror_uuid, m_remote_snap_id_end,
911 m_local_mirror_snap_ns.snap_seqs, m_image_state, &m_local_snap_id_end, ctx);
912 req->send();
913}
914
915template <typename I>
916void Replayer<I>::handle_create_non_primary_snapshot(int r) {
917 dout(10) << "r=" << r << dendl;
918
919 if (r < 0) {
920 derr << "failed to create local mirror snapshot: " << cpp_strerror(r)
921 << dendl;
922 handle_replay_complete(r, "failed to create local mirror snapshot");
923 return;
924 }
925
926 dout(15) << "local_snap_id_end=" << m_local_snap_id_end << dendl;
927
928 request_sync();
929}
930
931template <typename I>
932void Replayer<I>::request_sync() {
1911f103
TL
933 if (m_remote_mirror_snap_ns.clean_since_snap_id == m_remote_snap_id_start) {
934 dout(10) << "skipping unnecessary image copy: "
935 << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
936 << "remote_mirror_snap_ns=" << m_remote_mirror_snap_ns << dendl;
937 apply_image_state();
938 return;
939 }
9f95a23c 940
1911f103 941 dout(10) << dendl;
9f95a23c 942 std::unique_lock locker{m_lock};
1911f103
TL
943 if (is_replay_interrupted(&locker)) {
944 return;
945 }
946
9f95a23c
TL
947 auto ctx = create_async_context_callback(
948 m_threads->work_queue, create_context_callback<
949 Replayer<I>, &Replayer<I>::handle_request_sync>(this));
950 m_instance_watcher->notify_sync_request(m_state_builder->local_image_ctx->id,
951 ctx);
952}
953
954template <typename I>
955void Replayer<I>::handle_request_sync(int r) {
956 dout(10) << "r=" << r << dendl;
957
958 std::unique_lock locker{m_lock};
959 if (is_replay_interrupted(&locker)) {
960 return;
961 } else if (r == -ECANCELED) {
962 dout(5) << "image-sync canceled" << dendl;
963 handle_replay_complete(&locker, r, "image-sync canceled");
964 return;
965 } else if (r < 0) {
966 derr << "failed to request image-sync: " << cpp_strerror(r) << dendl;
967 handle_replay_complete(&locker, r, "failed to request image-sync");
968 return;
969 }
970
971 m_sync_in_progress = true;
972 locker.unlock();
973
974 copy_image();
975}
976
977template <typename I>
978void Replayer<I>::copy_image() {
979 dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
980 << "remote_snap_id_end=" << m_remote_snap_id_end << ", "
981 << "local_snap_id_start=" << m_local_snap_id_start << ", "
982 << "last_copied_object_number="
983 << m_local_mirror_snap_ns.last_copied_object_number << ", "
984 << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
985
1911f103
TL
986 m_snapshot_bytes = 0;
987 m_deep_copy_handler = new DeepCopyHandler(this);
9f95a23c
TL
988 auto ctx = create_context_callback<
989 Replayer<I>, &Replayer<I>::handle_copy_image>(this);
990 auto req = librbd::deep_copy::ImageCopyRequest<I>::create(
991 m_state_builder->remote_image_ctx, m_state_builder->local_image_ctx,
992 m_remote_snap_id_start, m_remote_snap_id_end, m_local_snap_id_start, false,
993 (m_local_mirror_snap_ns.last_copied_object_number > 0 ?
994 librbd::deep_copy::ObjectNumber{
995 m_local_mirror_snap_ns.last_copied_object_number} :
996 librbd::deep_copy::ObjectNumber{}),
1911f103 997 m_local_mirror_snap_ns.snap_seqs, m_deep_copy_handler, ctx);
9f95a23c
TL
998 req->send();
999}
1000
1001template <typename I>
1002void Replayer<I>::handle_copy_image(int r) {
1003 dout(10) << "r=" << r << dendl;
1004
1911f103
TL
1005 delete m_deep_copy_handler;
1006 m_deep_copy_handler = nullptr;
9f95a23c
TL
1007
1008 if (r < 0) {
1009 derr << "failed to copy remote image to local image: " << cpp_strerror(r)
1010 << dendl;
1011 handle_replay_complete(r, "failed to copy remote image");
1012 return;
1013 }
1014
1911f103
TL
1015 {
1016 std::unique_lock locker{m_lock};
1017 m_bytes_per_snapshot(m_snapshot_bytes);
1018 m_snapshot_bytes = 0;
1019 }
1020
9f95a23c
TL
1021 apply_image_state();
1022}
1023
1024template <typename I>
1025void Replayer<I>::handle_copy_image_progress(uint64_t object_number,
1026 uint64_t object_count) {
1027 dout(10) << "object_number=" << object_number << ", "
1028 << "object_count=" << object_count << dendl;
1029
1030 std::unique_lock locker{m_lock};
1031 m_local_mirror_snap_ns.last_copied_object_number = std::min(
1032 object_number, object_count);
1033 m_local_object_count = object_count;
1034
1035 update_non_primary_snapshot(false);
1036}
1037
1911f103
TL
1038template <typename I>
1039void Replayer<I>::handle_copy_image_read(uint64_t bytes_read) {
1040 dout(20) << "bytes_read=" << bytes_read << dendl;
1041
1042 std::unique_lock locker{m_lock};
1043 m_bytes_per_second(bytes_read);
1044 m_snapshot_bytes += bytes_read;
1045}
1046
9f95a23c
TL
1047template <typename I>
1048void Replayer<I>::apply_image_state() {
1049 dout(10) << dendl;
1050
1051 auto ctx = create_context_callback<
1052 Replayer<I>, &Replayer<I>::handle_apply_image_state>(this);
1053 auto req = ApplyImageStateRequest<I>::create(
1054 m_local_mirror_uuid,
1055 m_state_builder->remote_mirror_uuid,
1056 m_state_builder->local_image_ctx,
1057 m_state_builder->remote_image_ctx,
1058 m_image_state, ctx);
1059 req->send();
1060}
1061
1062template <typename I>
1063void Replayer<I>::handle_apply_image_state(int r) {
1064 dout(10) << "r=" << r << dendl;
1065
1066 if (r < 0 && r != -ENOENT) {
1067 derr << "failed to apply remote image state to local image: "
1068 << cpp_strerror(r) << dendl;
1069 handle_replay_complete(r, "failed to apply remote image state");
1070 return;
1071 }
1072
1073 std::unique_lock locker{m_lock};
1074 update_non_primary_snapshot(true);
1075}
1076
1077template <typename I>
1078void Replayer<I>::update_non_primary_snapshot(bool complete) {
1079 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1080 if (!complete) {
1081 // disallow two in-flight updates if this isn't the completion of the sync
1082 if (m_updating_sync_point) {
1083 return;
1084 }
1085 m_updating_sync_point = true;
1086 } else {
1087 m_local_mirror_snap_ns.complete = true;
1088 }
1089
1090 dout(10) << dendl;
1091
1092 librados::ObjectWriteOperation op;
1093 librbd::cls_client::mirror_image_snapshot_set_copy_progress(
1094 &op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
1095 m_local_mirror_snap_ns.last_copied_object_number);
1096
1911f103
TL
1097 auto ctx = new C_TrackedOp(
1098 m_in_flight_op_tracker, new LambdaContext([this, complete](int r) {
9f95a23c
TL
1099 handle_update_non_primary_snapshot(complete, r);
1100 }));
1101 auto aio_comp = create_rados_callback(ctx);
1102 int r = m_state_builder->local_image_ctx->md_ctx.aio_operate(
1103 m_state_builder->local_image_ctx->header_oid, aio_comp, &op);
1104 ceph_assert(r == 0);
1105 aio_comp->release();
1106}
1107
1108template <typename I>
1109void Replayer<I>::handle_update_non_primary_snapshot(bool complete, int r) {
1110 dout(10) << "r=" << r << dendl;
1111
1112 if (r < 0) {
1113 derr << "failed to update local snapshot progress: " << cpp_strerror(r)
1114 << dendl;
1115 if (complete) {
1116 // only fail if this was the final update
1117 handle_replay_complete(r, "failed to update local snapshot progress");
1118 return;
1119 }
1120 }
1121
1122 if (!complete) {
1123 // periodic sync-point update -- do not advance state machine
1124 std::unique_lock locker{m_lock};
1125
1126 ceph_assert(m_updating_sync_point);
1127 m_updating_sync_point = false;
1128 return;
1129 }
1130
1131 notify_image_update();
1132}
1133
1134template <typename I>
1135void Replayer<I>::notify_image_update() {
1136 dout(10) << dendl;
1137
1138 auto ctx = create_context_callback<
1139 Replayer<I>, &Replayer<I>::handle_notify_image_update>(this);
1140 m_state_builder->local_image_ctx->notify_update(ctx);
1141}
1142
1143template <typename I>
1144void Replayer<I>::handle_notify_image_update(int r) {
1145 dout(10) << "r=" << r << dendl;
1146
1147 if (r < 0) {
1148 derr << "failed to notify local image update: " << cpp_strerror(r) << dendl;
1149 }
1150
1911f103 1151 unlink_peer(m_remote_snap_id_start);
9f95a23c
TL
1152}
1153
1154template <typename I>
1911f103
TL
1155void Replayer<I>::unlink_peer(uint64_t remote_snap_id) {
1156 if (remote_snap_id == 0) {
9f95a23c
TL
1157 finish_sync();
1158 return;
1159 }
1160
1161 // local snapshot fully synced -- we no longer depend on the sync
1162 // start snapshot in the remote image
1911f103 1163 dout(10) << "remote_snap_id=" << remote_snap_id << dendl;
9f95a23c
TL
1164
1165 auto ctx = create_context_callback<
1166 Replayer<I>, &Replayer<I>::handle_unlink_peer>(this);
1167 auto req = librbd::mirror::snapshot::UnlinkPeerRequest<I>::create(
1911f103 1168 m_state_builder->remote_image_ctx, remote_snap_id,
9f95a23c
TL
1169 m_remote_mirror_peer_uuid, ctx);
1170 req->send();
1171}
1172
1173template <typename I>
1174void Replayer<I>::handle_unlink_peer(int r) {
1175 dout(10) << "r=" << r << dendl;
1176
1177 if (r < 0 && r != -ENOENT) {
1178 derr << "failed to unlink local peer from remote image: " << cpp_strerror(r)
1179 << dendl;
1180 handle_replay_complete(r, "failed to unlink local peer from remote image");
1181 return;
1182 }
1183
1184 finish_sync();
1185}
1186
1187template <typename I>
1188void Replayer<I>::finish_sync() {
1189 dout(10) << dendl;
1190
1191 {
1192 std::unique_lock locker{m_lock};
1193 notify_status_updated();
1194
1911f103
TL
1195 if (m_sync_in_progress) {
1196 m_sync_in_progress = false;
1197 m_instance_watcher->notify_sync_complete(
1198 m_state_builder->local_image_ctx->id);
1199 }
9f95a23c
TL
1200 }
1201
1202 if (is_replay_interrupted()) {
1203 return;
1204 }
1205
1206 load_local_image_meta();
1207}
1208
1209template <typename I>
1210void Replayer<I>::register_local_update_watcher() {
1211 dout(10) << dendl;
1212
1213 m_update_watch_ctx = new C_UpdateWatchCtx(this);
1214
1215 int r = m_state_builder->local_image_ctx->state->register_update_watcher(
1216 m_update_watch_ctx, &m_local_update_watcher_handle);
1217 auto ctx = create_context_callback<
1218 Replayer<I>, &Replayer<I>::handle_register_local_update_watcher>(this);
1219 m_threads->work_queue->queue(ctx, r);
1220}
1221
1222template <typename I>
1223void Replayer<I>::handle_register_local_update_watcher(int r) {
1224 dout(10) << "r=" << r << dendl;
1225
1226 if (r < 0) {
1227 derr << "failed to register local update watcher: " << cpp_strerror(r)
1228 << dendl;
1229 handle_replay_complete(r, "failed to register local image update watcher");
1230 m_state = STATE_COMPLETE;
1231
1232 delete m_update_watch_ctx;
1233 m_update_watch_ctx = nullptr;
1234
1235 Context* on_init = nullptr;
1236 std::swap(on_init, m_on_init_shutdown);
1237 on_init->complete(r);
1238 return;
1239 }
1240
1241 register_remote_update_watcher();
1242}
1243
1244template <typename I>
1245void Replayer<I>::register_remote_update_watcher() {
1246 dout(10) << dendl;
1247
1248 int r = m_state_builder->remote_image_ctx->state->register_update_watcher(
1249 m_update_watch_ctx, &m_remote_update_watcher_handle);
1250 auto ctx = create_context_callback<
1251 Replayer<I>, &Replayer<I>::handle_register_remote_update_watcher>(this);
1252 m_threads->work_queue->queue(ctx, r);
1253}
1254
1255template <typename I>
1256void Replayer<I>::handle_register_remote_update_watcher(int r) {
1257 dout(10) << "r=" << r << dendl;
1258
1259 if (r < 0) {
1260 derr << "failed to register remote update watcher: " << cpp_strerror(r)
1261 << dendl;
1262 handle_replay_complete(r, "failed to register remote image update watcher");
1263 m_state = STATE_COMPLETE;
1264
1265 unregister_local_update_watcher();
1266 return;
1267 }
1268
1269 m_state = STATE_REPLAYING;
1270
1271 Context* on_init = nullptr;
1272 std::swap(on_init, m_on_init_shutdown);
1273 on_init->complete(0);
1274
1275 // delay initial snapshot scan until after we have alerted
1276 // image replayer that we have initialized in case an error
1277 // occurs
1278 {
1279 std::unique_lock locker{m_lock};
1280 notify_status_updated();
1281 }
1282
1283 load_local_image_meta();
1284}
1285
1286template <typename I>
1287void Replayer<I>::unregister_remote_update_watcher() {
1288 dout(10) << dendl;
1289
1290 auto ctx = create_context_callback<
1291 Replayer<I>,
1292 &Replayer<I>::handle_unregister_remote_update_watcher>(this);
1293 m_state_builder->remote_image_ctx->state->unregister_update_watcher(
1294 m_remote_update_watcher_handle, ctx);
1295}
1296
1297template <typename I>
1298void Replayer<I>::handle_unregister_remote_update_watcher(int r) {
1299 dout(10) << "r=" << r << dendl;
1300
1301 if (r < 0) {
1302 derr << "failed to unregister remote update watcher: " << cpp_strerror(r)
1303 << dendl;
1304 handle_replay_complete(
1305 r, "failed to unregister remote image update watcher");
1306 }
1307
1308 unregister_local_update_watcher();
1309}
1310
1311template <typename I>
1312void Replayer<I>::unregister_local_update_watcher() {
1313 dout(10) << dendl;
1314
1315 auto ctx = create_context_callback<
1316 Replayer<I>,
1317 &Replayer<I>::handle_unregister_local_update_watcher>(this);
1318 m_state_builder->local_image_ctx->state->unregister_update_watcher(
1319 m_local_update_watcher_handle, ctx);
1320}
1321
1322template <typename I>
1323void Replayer<I>::handle_unregister_local_update_watcher(int r) {
1324 dout(10) << "r=" << r << dendl;
1325
1326 if (r < 0) {
1327 derr << "failed to unregister local update watcher: " << cpp_strerror(r)
1328 << dendl;
1329 handle_replay_complete(
1330 r, "failed to unregister local image update watcher");
1331 }
1332
1333 delete m_update_watch_ctx;
1334 m_update_watch_ctx = nullptr;
1335
1336 wait_for_in_flight_ops();
1337}
1338
1339template <typename I>
1340void Replayer<I>::wait_for_in_flight_ops() {
1341 dout(10) << dendl;
1342
1343 auto ctx = create_async_context_callback(
1344 m_threads->work_queue, create_context_callback<
1345 Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
1346 m_in_flight_op_tracker.wait_for_ops(ctx);
1347}
1348
1349template <typename I>
1350void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
1351 dout(10) << "r=" << r << dendl;
1352
1353 Context* on_shutdown = nullptr;
1354 {
1355 std::unique_lock locker{m_lock};
1356 ceph_assert(m_on_init_shutdown != nullptr);
1357 std::swap(on_shutdown, m_on_init_shutdown);
1358 }
1359 on_shutdown->complete(m_error_code);
1360}
1361
1362template <typename I>
1363void Replayer<I>::handle_image_update_notify() {
1364 dout(10) << dendl;
1365
1366 std::unique_lock locker{m_lock};
1367 if (m_state == STATE_REPLAYING) {
1368 dout(15) << "flagging snapshot rescan required" << dendl;
1369 m_image_updated = true;
1370 } else if (m_state == STATE_IDLE) {
1371 m_state = STATE_REPLAYING;
1372 locker.unlock();
1373
1374 dout(15) << "restarting idle replayer" << dendl;
1375 load_local_image_meta();
1376 }
1377}
1378
1379template <typename I>
1380void Replayer<I>::handle_replay_complete(int r,
1381 const std::string& description) {
1382 std::unique_lock locker{m_lock};
1383 handle_replay_complete(&locker, r, description);
1384}
1385
1386template <typename I>
1387void Replayer<I>::handle_replay_complete(std::unique_lock<ceph::mutex>* locker,
1388 int r,
1389 const std::string& description) {
1390 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1391
1392 if (m_error_code == 0) {
1393 m_error_code = r;
1394 m_error_description = description;
1395 }
1396
1397 if (m_sync_in_progress) {
1398 m_sync_in_progress = false;
1399 m_instance_watcher->notify_sync_complete(
1400 m_state_builder->local_image_ctx->id);
1401 }
1402
1403 if (m_state != STATE_REPLAYING && m_state != STATE_IDLE) {
1404 return;
1405 }
1406
1407 m_state = STATE_COMPLETE;
1408 notify_status_updated();
1409}
1410
1411template <typename I>
1412void Replayer<I>::notify_status_updated() {
1413 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1414
1415 dout(10) << dendl;
1911f103 1416 auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
9f95a23c
TL
1417 [this](int) {
1418 m_replayer_listener->handle_notification();
1419 }));
1420 m_threads->work_queue->queue(ctx, 0);
1421}
1422
1423template <typename I>
1424bool Replayer<I>::is_replay_interrupted() {
1425 std::unique_lock locker{m_lock};
1426 return is_replay_interrupted(&locker);
1427}
1428
1429template <typename I>
1430bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
1431 if (m_state == STATE_COMPLETE) {
1432 locker->unlock();
1433
1434 dout(10) << "resuming pending shut down" << dendl;
1435 unregister_remote_update_watcher();
1436 return true;
1437 }
1438 return false;
1439}
1440
1441} // namespace snapshot
1442} // namespace image_replayer
1443} // namespace mirror
1444} // namespace rbd
1445
// Explicitly instantiate Replayer for librbd::ImageCtx so that the member
// definitions in this file are emitted in this translation unit.
1446template class rbd::mirror::image_replayer::snapshot::Replayer<librbd::ImageCtx>;