]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to 12.2.7
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
index f68d51d40d8d1aebb1f2f6aaadd03e4b8be43916..28678172b1727f48d42bb62253e8056e93c8f569 100644 (file)
 #include "librbd/Operations.h"
 #include "librbd/Utils.h"
 #include "librbd/journal/Replay.h"
+#include "ImageDeleter.h"
 #include "ImageReplayer.h"
 #include "Threads.h"
 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
 
 #define dout_context g_ceph_context
@@ -72,86 +74,84 @@ struct ReplayHandler : public ::journal::ReplayHandler {
   }
 };
 
+template <typename I>
 class ImageReplayerAdminSocketCommand {
 public:
+  ImageReplayerAdminSocketCommand(const std::string &desc,
+                                  ImageReplayer<I> *replayer)
+    : desc(desc), replayer(replayer) {
+  }
   virtual ~ImageReplayerAdminSocketCommand() {}
   virtual bool call(Formatter *f, stringstream *ss) = 0;
+
+  std::string desc;
+  ImageReplayer<I> *replayer;
+  bool registered = false;
 };
 
 template <typename I>
-class StatusCommand : public ImageReplayerAdminSocketCommand {
+class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
 public:
-  explicit StatusCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+  explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
 
   bool call(Formatter *f, stringstream *ss) override {
-    replayer->print_status(f, ss);
+    this->replayer->print_status(f, ss);
     return true;
   }
-
-private:
-  ImageReplayer<I> *replayer;
 };
 
 template <typename I>
-class StartCommand : public ImageReplayerAdminSocketCommand {
+class StartCommand : public ImageReplayerAdminSocketCommand<I> {
 public:
-  explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+  explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
 
   bool call(Formatter *f, stringstream *ss) override {
-    replayer->start(nullptr, true);
+    this->replayer->start(nullptr, true);
     return true;
   }
-
-private:
-  ImageReplayer<I> *replayer;
 };
 
 template <typename I>
-class StopCommand : public ImageReplayerAdminSocketCommand {
+class StopCommand : public ImageReplayerAdminSocketCommand<I> {
 public:
-  explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+  explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
 
   bool call(Formatter *f, stringstream *ss) override {
-    replayer->stop(nullptr, true);
+    this->replayer->stop(nullptr, true);
     return true;
   }
-
-private:
-  ImageReplayer<I> *replayer;
 };
 
 template <typename I>
-class RestartCommand : public ImageReplayerAdminSocketCommand {
+class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
 public:
-  explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+  explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
 
   bool call(Formatter *f, stringstream *ss) override {
-    replayer->restart();
+    this->replayer->restart();
     return true;
   }
-
-private:
-  ImageReplayer<I> *replayer;
 };
 
 template <typename I>
-class FlushCommand : public ImageReplayerAdminSocketCommand {
+class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
 public:
-  explicit FlushCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+  explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+  }
 
   bool call(Formatter *f, stringstream *ss) override {
-    C_SaferCond cond;
-    replayer->flush(&cond);
-    int r = cond.wait();
-    if (r < 0) {
-      *ss << "flush: " << cpp_strerror(r);
-      return false;
-    }
+    this->replayer->flush();
     return true;
   }
-
-private:
-  ImageReplayer<I> *replayer;
 };
 
 template <typename I>
@@ -160,61 +160,43 @@ public:
   ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                               ImageReplayer<I> *replayer)
     : admin_socket(cct->get_admin_socket()),
-      lock("ImageReplayerAdminSocketHook::lock " +
-             replayer->get_global_image_id()) {
-    std::string command;
-    int r;
-
-    command = "rbd mirror status " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "get status for rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new StatusCommand<I>(replayer);
-    }
-
-    command = "rbd mirror start " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "start rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new StartCommand<I>(replayer);
-    }
-
-    command = "rbd mirror stop " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "stop rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new StopCommand<I>(replayer);
-    }
-
-    command = "rbd mirror restart " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "restart rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new RestartCommand<I>(replayer);
-    }
-
-    command = "rbd mirror flush " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "flush rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new FlushCommand<I>(replayer);
+      commands{{"rbd mirror flush " + name,
+                new FlushCommand<I>("flush rbd mirror " + name, replayer)},
+               {"rbd mirror restart " + name,
+                new RestartCommand<I>("restart rbd mirror " + name, replayer)},
+               {"rbd mirror start " + name,
+                new StartCommand<I>("start rbd mirror " + name, replayer)},
+               {"rbd mirror status " + name,
+                new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
+               {"rbd mirror stop " + name,
+                new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
+  }
+
+  int register_commands() {
+    for (auto &it : commands) {
+      int r = admin_socket->register_command(it.first, it.first, this,
+                                             it.second->desc);
+      if (r < 0) {
+        return r;
+      }
+      it.second->registered = true;
     }
+    return 0;
   }
 
   ~ImageReplayerAdminSocketHook() override {
-    Mutex::Locker locker(lock);
-    for (Commands::const_iterator i = commands.begin(); i != commands.end();
-        ++i) {
-      (void)admin_socket->unregister_command(i->first);
-      delete i->second;
+    for (auto &it : commands) {
+      if (it.second->registered) {
+        admin_socket->unregister_command(it.first);
+      }
+      delete it.second;
     }
     commands.clear();
   }
 
   bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
-    Mutex::Locker locker(lock);
-    Commands::const_iterator i = commands.find(command);
+    auto i = commands.find(command);
     assert(i != commands.end());
     Formatter *f = Formatter::create(format);
     stringstream ss;
@@ -225,10 +207,9 @@ public:
   }
 
 private:
-  typedef std::map<std::string, ImageReplayerAdminSocketCommand*> Commands;
+  typedef std::map<std::string, ImageReplayerAdminSocketCommand<I> *> Commands;
 
   AdminSocket *admin_socket;
-  Mutex lock;
   Commands commands;
 };
 
@@ -270,7 +251,7 @@ void ImageReplayer<I>::RemoteJournalerListener::handle_update(
 }
 
 template <typename I>
-ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
+ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
                                 ImageDeleter<I>* image_deleter,
                                 InstanceWatcher<I> *instance_watcher,
                                 RadosRef local,
@@ -283,7 +264,7 @@ ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
   m_local(local),
   m_local_mirror_uuid(local_mirror_uuid),
   m_local_pool_id(local_pool_id),
-  m_global_image_id(global_image_id),
+  m_global_image_id(global_image_id), m_local_image_name(global_image_id),
   m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
         global_image_id),
   m_progress_cxt(this),
@@ -303,14 +284,13 @@ ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
   }
 
   m_name = pool_name + "/" + m_global_image_id;
-  dout(20) << "registered asok hook: " << m_name << dendl;
-  m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
-                                                    this);
+  register_admin_socket_hook();
 }
 
 template <typename I>
 ImageReplayer<I>::~ImageReplayer()
 {
+  unregister_admin_socket_hook();
   assert(m_event_preprocessor == nullptr);
   assert(m_replay_status_formatter == nullptr);
   assert(m_local_image_ctx == nullptr);
@@ -323,7 +303,6 @@ ImageReplayer<I>::~ImageReplayer()
   assert(m_in_flight_status_updates == 0);
 
   delete m_journal_listener;
-  delete m_asok_hook;
 }
 
 template <typename I>
@@ -342,32 +321,15 @@ image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
 }
 
 template <typename I>
-void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
-                                        const std::string &image_id,
-                                        librados::IoCtx &io_ctx) {
+void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
+                                librados::IoCtx &io_ctx) {
   Mutex::Locker locker(m_lock);
-
-  RemoteImage remote_image(mirror_uuid, image_id, io_ctx);
-  auto it = m_remote_images.find(remote_image);
-  if (it == m_remote_images.end()) {
-    m_remote_images.insert(remote_image);
+  auto it = m_peers.find({peer_uuid});
+  if (it == m_peers.end()) {
+    m_peers.insert({peer_uuid, io_ctx});
   }
 }
 
-template <typename I>
-void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
-                                           const std::string &image_id,
-                                          bool schedule_delete) {
-  Mutex::Locker locker(m_lock);
-  m_remote_images.erase({mirror_uuid, image_id});
-}
-
-template <typename I>
-bool ImageReplayer<I>::remote_images_empty() const {
-  Mutex::Locker locker(m_lock);
-  return m_remote_images.empty();
-}
-
 template <typename I>
 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
   dout(20) << r << " " << desc << dendl;
@@ -397,6 +359,7 @@ void ImageReplayer<I>::start(Context *on_finish, bool manual)
       m_last_r = 0;
       m_state_desc.clear();
       m_manual_stop = false;
+      m_delete_requested = false;
 
       if (on_finish != nullptr) {
         assert(m_on_start_finish == nullptr);
@@ -421,6 +384,31 @@ void ImageReplayer<I>::start(Context *on_finish, bool manual)
     return;
   }
 
+  wait_for_deletion();
+}
+
+template <typename I>
+void ImageReplayer<I>::wait_for_deletion() {
+  dout(20) << dendl;
+
+  Context *ctx = create_context_callback<
+    ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
+  m_image_deleter->wait_for_scheduled_deletion(
+    m_local_pool_id, m_global_image_id, ctx, false);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_wait_for_deletion(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  if (r == -ECANCELED) {
+    on_start_fail(0, "");
+    return;
+  } else if (r < 0) {
+    on_start_fail(r, "error waiting for image deletion");
+    return;
+  }
+
   prepare_local_image();
 }
 
@@ -428,10 +416,11 @@ template <typename I>
 void ImageReplayer<I>::prepare_local_image() {
   dout(20) << dendl;
 
+  m_local_image_id = "";
   Context *ctx = create_context_callback<
     ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
   auto req = PrepareLocalImageRequest<I>::create(
-    m_local_ioctx, m_global_image_id, &m_local_image_id,
+    m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
     &m_local_image_tag_owner, m_threads->work_queue, ctx);
   req->send();
 }
@@ -445,39 +434,81 @@ void ImageReplayer<I>::handle_prepare_local_image(int r) {
   } else if (r < 0) {
     on_start_fail(r, "error preparing local image for replay");
     return;
-  } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
-    dout(5) << "local image is primary" << dendl;
-    on_start_fail(0, "local image is primary");
-    return;
+  } else {
+    reregister_admin_socket_hook();
   }
 
   // local image doesn't exist or is non-primary
-  bootstrap();
+  prepare_remote_image();
 }
 
 template <typename I>
-void ImageReplayer<I>::bootstrap() {
+void ImageReplayer<I>::prepare_remote_image() {
   dout(20) << dendl;
+  if (m_peers.empty()) {
+    // technically nothing to bootstrap, but it handles the status update
+    bootstrap();
+    return;
+  }
+
+  // TODO need to support multiple remote images
+  assert(!m_peers.empty());
+  m_remote_image = {*m_peers.begin()};
+
+  Context *ctx = create_context_callback<
+    ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
+  auto req = PrepareRemoteImageRequest<I>::create(
+    m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
+    m_local_image_id, &m_remote_image.mirror_uuid, &m_remote_image.image_id,
+    &m_remote_journaler, &m_client_state, &m_client_meta, ctx);
+  req->send();
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_prepare_remote_image(int r) {
+  dout(20) << "r=" << r << dendl;
 
-  if (m_remote_images.empty()) {
-    on_start_fail(-EREMOTEIO, "waiting for primary remote image");
+  assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
+  if (r < 0 && !m_local_image_id.empty() &&
+      m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+    // local image is primary -- fall-through
+  } else if (r == -ENOENT) {
+    dout(20) << "remote image does not exist" << dendl;
+
+    // TODO need to support multiple remote images
+    if (!m_local_image_id.empty() &&
+        m_local_image_tag_owner == m_remote_image.mirror_uuid) {
+      // local image exists and is non-primary and linked to the missing
+      // remote image
+
+      m_delete_requested = true;
+      on_start_fail(0, "remote image no longer exists");
+    } else {
+      on_start_fail(-ENOENT, "remote image does not exist");
+    }
+    return;
+  } else if (r < 0) {
+    on_start_fail(r, "error retrieving remote image id");
     return;
   }
 
-  // TODO bootstrap will need to support multiple remote images
-  m_remote_image = *m_remote_images.begin();
+  bootstrap();
+}
 
-  CephContext *cct = static_cast<CephContext *>(m_local->cct());
-  journal::Settings settings;
-  settings.commit_interval = cct->_conf->rbd_mirror_journal_commit_age;
-  settings.max_fetch_bytes = cct->_conf->rbd_mirror_journal_max_fetch_bytes;
+template <typename I>
+void ImageReplayer<I>::bootstrap() {
+  dout(20) << dendl;
 
-  m_remote_journaler = new Journaler(m_threads->work_queue,
-                                     m_threads->timer,
-                                     &m_threads->timer_lock,
-                                     m_remote_image.io_ctx,
-                                     m_remote_image.image_id,
-                                     m_local_mirror_uuid, settings);
+  if (!m_local_image_id.empty() &&
+      m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+    dout(5) << "local image is primary" << dendl;
+    on_start_fail(0, "local image is primary");
+    return;
+  } else if (m_peers.empty()) {
+    dout(5) << "no peer clusters" << dendl;
+    on_start_fail(-ENOENT, "no peer clusters");
+    return;
+  }
 
   Context *ctx = create_context_callback<
     ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
@@ -487,7 +518,8 @@ void ImageReplayer<I>::bootstrap() {
     &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
     m_global_image_id, m_threads->work_queue, m_threads->timer,
     &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
-    m_remote_journaler, &m_client_meta, ctx, &m_do_resync, &m_progress_cxt);
+    m_remote_journaler, &m_client_state, &m_client_meta, ctx,
+    &m_resync_requested, &m_progress_cxt);
 
   {
     Mutex::Locker locker(m_lock);
@@ -527,6 +559,9 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
     return;
   } else if (on_start_interrupted()) {
     return;
+  } else if (m_resync_requested) {
+    on_start_fail(0, "resync requested");
+    return;
   }
 
   assert(m_local_journal == nullptr);
@@ -543,41 +578,6 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
     return;
   }
 
-  {
-    Mutex::Locker locker(m_lock);
-
-    if (m_do_resync) {
-      Context *on_finish = m_on_start_finish;
-      m_stopping_for_resync = true;
-      FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
-         if (r < 0) {
-           if (on_finish) {
-             on_finish->complete(r);
-           }
-           return;
-         }
-          resync_image(on_finish);
-        });
-      m_on_start_finish = ctx;
-    }
-
-    std::string name = m_local_ioctx.get_pool_name() + "/" +
-                       m_local_image_ctx->name;
-    if (m_name != name) {
-      m_name = name;
-      if (m_asok_hook) {
-       // Re-register asok commands using the new name.
-       delete m_asok_hook;
-       m_asok_hook = nullptr;
-      }
-    }
-    if (!m_asok_hook) {
-      dout(20) << "registered asok hook: " << m_name << dendl;
-      m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
-                                                        this);
-    }
-  }
-
   update_mirror_image_status(false, boost::none);
   init_remote_journaler();
 }
@@ -614,13 +614,18 @@ void ImageReplayer<I>::handle_init_remote_journaler(int r) {
     return;
   }
 
-  if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+  dout(5) << "image_id=" << m_local_image_id << ", "
+          << "client_meta.image_id=" << m_client_meta.image_id << ", "
+          << "client.state=" << client.state << dendl;
+  if (m_client_meta.image_id == m_local_image_id &&
+      client.state != cls::journal::CLIENT_STATE_CONNECTED) {
     dout(5) << "client flagged disconnected, stopping image replay" << dendl;
     if (m_local_image_ctx->mirroring_resync_after_disconnect) {
-      Mutex::Locker locker(m_lock);
-      m_stopping_for_resync = true;
+      m_resync_requested = true;
+      on_start_fail(-ENOTCONN, "disconnected: automatic resync");
+    } else {
+      on_start_fail(-ENOTCONN, "disconnected");
     }
-    on_start_fail(-ENOTCONN, "disconnected");
     return;
   }
 
@@ -665,19 +670,14 @@ void ImageReplayer<I>::handle_start_replay(int r) {
   update_mirror_image_status(true, boost::none);
   reschedule_update_status_task(30);
 
-  dout(20) << "start succeeded" << dendl;
-  if (on_finish != nullptr) {
-    dout(20) << "on finish complete, r=" << r << dendl;
-    on_finish->complete(r);
-  }
-
   if (on_replay_interrupted()) {
     return;
   }
 
   {
     CephContext *cct = static_cast<CephContext *>(m_local->cct());
-    double poll_seconds = cct->_conf->rbd_mirror_journal_poll_age;
+    double poll_seconds = cct->_conf->get_val<double>(
+      "rbd_mirror_journal_poll_age");
 
     Mutex::Locker locker(m_lock);
     m_replay_handler = new ReplayHandler<I>(this);
@@ -686,6 +686,11 @@ void ImageReplayer<I>::handle_start_replay(int r) {
     dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
   }
 
+  dout(20) << "start succeeded" << dendl;
+  if (on_finish != nullptr) {
+    dout(20) << "on finish complete, r=" << r << dendl;
+    on_finish->complete(r);
+  }
 }
 
 template <typename I>
@@ -697,7 +702,7 @@ void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
         Mutex::Locker locker(m_lock);
         assert(m_state == STATE_STARTING);
         m_state = STATE_STOPPING;
-        if (r < 0 && r != -ECANCELED && r != -EREMOTEIO) {
+        if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
           derr << "start failed: " << cpp_strerror(r) << dendl;
         } else {
           dout(20) << "start canceled" << dendl;
@@ -732,10 +737,11 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
   dout(20) << "on_finish=" << on_finish << ", manual=" << manual
           << ", desc=" << desc << dendl;
 
+  m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
+
   image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
   bool shut_down_replay = false;
   bool running = true;
-  bool canceled_task = false;
   {
     Mutex::Locker locker(m_lock);
 
@@ -758,14 +764,6 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
         std::swap(m_on_stop_finish, on_finish);
         m_stop_requested = true;
         m_manual_stop = manual;
-
-       Mutex::Locker timer_locker(m_threads->timer_lock);
-        if (m_delayed_preprocess_task != nullptr) {
-          canceled_task = m_threads->timer->cancel_event(
-            m_delayed_preprocess_task);
-          assert(canceled_task);
-          m_delayed_preprocess_task = nullptr;
-        }
       }
     }
   }
@@ -776,11 +774,6 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
     bootstrap_request->put();
   }
 
-  if (canceled_task) {
-    m_event_replay_tracker.finish_op();
-    on_replay_interrupted();
-  }
-
   if (!running) {
     dout(20) << "not running" << dendl;
     if (on_finish) {
@@ -1089,16 +1082,30 @@ void ImageReplayer<I>::handle_get_remote_tag(int r) {
 
 template <typename I>
 void ImageReplayer<I>::allocate_local_tag() {
-  dout(20) << dendl;
+  dout(15) << dendl;
 
   std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
-  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
-      mirror_uuid == m_local_mirror_uuid) {
+  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
     mirror_uuid = m_remote_image.mirror_uuid;
+  } else if (mirror_uuid == m_local_mirror_uuid) {
+    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
   } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
-    dout(5) << "encountered image demotion: stopping" << dendl;
-    Mutex::Locker locker(m_lock);
-    m_stop_requested = true;
+    // handle possible edge condition where daemon can failover and
+    // the local image has already been promoted/demoted
+    auto local_tag_data = m_local_journal->get_tag_data();
+    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
+        (local_tag_data.predecessor.commit_valid &&
+         local_tag_data.predecessor.mirror_uuid ==
+           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
+      dout(15) << "skipping stale demotion event" << dendl;
+      handle_process_entry_safe(m_replay_entry, 0);
+      handle_replay_ready();
+      return;
+    } else {
+      dout(5) << "encountered image demotion: stopping" << dendl;
+      Mutex::Locker locker(m_lock);
+      m_stop_requested = true;
+    }
   }
 
   librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
@@ -1108,10 +1115,9 @@ void ImageReplayer<I>::allocate_local_tag() {
     predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
   }
 
-  dout(20) << "mirror_uuid=" << mirror_uuid << ", "
-           << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
-           << "replay_tag_tid=" << m_replay_tag_tid << ", "
-           << "replay_tag_data=" << m_replay_tag_data << dendl;
+  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
+           << "predecessor=" << predecessor << ", "
+           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
   Context *ctx = create_context_callback<
     ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
   m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
@@ -1119,7 +1125,8 @@ void ImageReplayer<I>::allocate_local_tag() {
 
 template <typename I>
 void ImageReplayer<I>::handle_allocate_local_tag(int r) {
-  dout(20) << "r=" << r << dendl;
+  dout(15) << "r=" << r << ", "
+           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
 
   if (r < 0) {
     derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
@@ -1225,6 +1232,19 @@ void ImageReplayer<I>::handle_process_entry_ready(int r) {
   dout(20) << dendl;
   assert(r == 0);
 
+  bool update_status = false;
+  {
+    RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
+    if (m_local_image_name != m_local_image_ctx->name) {
+      m_local_image_name = m_local_image_ctx->name;
+      update_status = true;
+    }
+  }
+
+  if (update_status) {
+    reschedule_update_status_task(0);
+  }
+
   // attempt to process the next event
   handle_replay_ready();
 }
@@ -1283,6 +1303,8 @@ bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
 
 template <typename I>
 void ImageReplayer<I>::finish_mirror_image_status_update() {
+  reregister_admin_socket_hook();
+
   Context *on_finish = nullptr;
   {
     Mutex::Locker locker(m_lock);
@@ -1504,6 +1526,22 @@ void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
 template <typename I>
 void ImageReplayer<I>::shut_down(int r) {
   dout(20) << "r=" << r << dendl;
+
+  bool canceled_delayed_preprocess_task = false;
+  {
+    Mutex::Locker timer_locker(m_threads->timer_lock);
+    if (m_delayed_preprocess_task != nullptr) {
+      canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
+        m_delayed_preprocess_task);
+      assert(canceled_delayed_preprocess_task);
+      m_delayed_preprocess_task = nullptr;
+    }
+  }
+  if (canceled_delayed_preprocess_task) {
+    // wake up sleeping replay
+    m_event_replay_tracker.finish_op();
+  }
+
   {
     Mutex::Locker locker(m_lock);
     assert(m_state == STATE_STOPPING);
@@ -1544,11 +1582,6 @@ void ImageReplayer<I>::shut_down(int r) {
        m_remote_journaler->remove_listener(&m_remote_listener);
         m_remote_journaler->shut_down(ctx);
       });
-    if (m_stopping_for_resync) {
-      ctx = new FunctionContext([this, ctx](int r) {
-          m_remote_journaler->unregister_client(ctx);
-        });
-    }
   }
 
   // stop the replay of remote journal events
@@ -1620,6 +1653,7 @@ template <typename I>
 void ImageReplayer<I>::handle_shut_down(int r) {
   reschedule_update_status_task(-1);
 
+  bool unregister_asok_hook = false;
   {
     Mutex::Locker locker(m_lock);
 
@@ -1636,15 +1670,36 @@ void ImageReplayer<I>::handle_shut_down(int r) {
       return;
     }
 
-    if (m_stopping_for_resync) {
+    bool delete_requested = false;
+    if (m_delete_requested && !m_local_image_id.empty()) {
+      assert(m_remote_image.image_id.empty());
+      dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
+      delete_requested = true;
+    }
+    if (delete_requested || m_resync_requested) {
       m_image_deleter->schedule_image_delete(m_local,
                                              m_local_pool_id,
                                              m_global_image_id,
-                                             true);
-      m_stopping_for_resync = false;
+                                             m_resync_requested);
+
+      m_local_image_id = "";
+      m_resync_requested = false;
+      if (m_delete_requested) {
+        unregister_asok_hook = true;
+        m_delete_requested = false;
+      }
+    } else if (m_last_r == -ENOENT &&
+               m_local_image_id.empty() && m_remote_image.image_id.empty()) {
+      dout(0) << "mirror image no longer exists" << dendl;
+      unregister_asok_hook = true;
+      m_finished = true;
     }
   }
 
+  if (unregister_asok_hook) {
+    unregister_admin_socket_hook();
+  }
+
   dout(20) << "stop complete" << dendl;
   m_local_ioctx.close();
 
@@ -1721,12 +1776,56 @@ template <typename I>
 void ImageReplayer<I>::resync_image(Context *on_finish) {
   dout(20) << dendl;
 
+  m_resync_requested = true;
+  stop(on_finish);
+}
+
+template <typename I>
+void ImageReplayer<I>::register_admin_socket_hook() {
+  ImageReplayerAdminSocketHook<I> *asok_hook;
   {
-    Mutex::Locker l(m_lock);
-    m_stopping_for_resync = true;
+    Mutex::Locker locker(m_lock);
+    if (m_asok_hook != nullptr) {
+      return;
+    }
+
+    dout(15) << "registered asok hook: " << m_name << dendl;
+    asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
+                                                    this);
+    int r = asok_hook->register_commands();
+    if (r == 0) {
+      m_asok_hook = asok_hook;
+      return;
+    }
+    derr << "error registering admin socket commands" << dendl;
   }
+  delete asok_hook;
+}
 
-  stop(on_finish);
+template <typename I>
+void ImageReplayer<I>::unregister_admin_socket_hook() {
+  dout(15) << dendl;
+
+  AdminSocketHook *asok_hook = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    std::swap(asok_hook, m_asok_hook);
+  }
+  delete asok_hook;
+}
+
+template <typename I>
+void ImageReplayer<I>::reregister_admin_socket_hook() {
+  {
+    Mutex::Locker locker(m_lock);
+    auto name = m_local_ioctx.get_pool_name() + "/" + m_local_image_name;
+    if (m_name == name) {
+      return;
+    }
+    m_name = name;
+  }
+  unregister_admin_socket_hook();
+  register_admin_socket_hook();
 }
 
 template <typename I>