update sources to ceph Nautilus 14.2.1

diff --git a/ceph/src/tools/rbd_mirror/PoolReplayer.cc b/ceph/src/tools/rbd_mirror/PoolReplayer.cc
index c766e8609038189444610a1d3ebf68893700b92e..370d81f447065601c13d8af7cc6d787c00bcacc9 100644
--- a/ceph/src/tools/rbd_mirror/PoolReplayer.cc
+++ b/ceph/src/tools/rbd_mirror/PoolReplayer.cc
@@ -16,7 +16,9 @@
 #include "librbd/internal.h"
 #include "librbd/Utils.h"
 #include "librbd/Watcher.h"
+#include "librbd/api/Config.h"
 #include "librbd/api/Mirror.h"
+#include "ImageMap.h"
 #include "InstanceReplayer.h"
 #include "InstanceWatcher.h"
 #include "LeaderWatcher.h"
@@ -41,8 +43,11 @@ using librbd::util::create_async_context_callback;
 namespace rbd {
 namespace mirror {
 
+using ::operator<<;
+
 namespace {
 
+const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
 const std::string SERVICE_DAEMON_LEADER_KEY("leader");
 const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
 const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
@@ -50,93 +55,101 @@ const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
 const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
   {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};
 
+template <typename I>
 class PoolReplayerAdminSocketCommand {
 public:
-  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
+  PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer)
     : pool_replayer(pool_replayer) {
   }
   virtual ~PoolReplayerAdminSocketCommand() {}
   virtual bool call(Formatter *f, stringstream *ss) = 0;
 protected:
-  PoolReplayer *pool_replayer;
+  PoolReplayer<I> *pool_replayer;
 };
 
-class StatusCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class StatusCommand : public PoolReplayerAdminSocketCommand<I> {
 public:
-  explicit StatusCommand(PoolReplayer *pool_replayer)
-    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  explicit StatusCommand(PoolReplayer<I> *pool_replayer)
+    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
   bool call(Formatter *f, stringstream *ss) override {
-    pool_replayer->print_status(f, ss);
+    this->pool_replayer->print_status(f, ss);
     return true;
   }
 };
 
-class StartCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class StartCommand : public PoolReplayerAdminSocketCommand<I> {
 public:
-  explicit StartCommand(PoolReplayer *pool_replayer)
-    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  explicit StartCommand(PoolReplayer<I> *pool_replayer)
+    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
   bool call(Formatter *f, stringstream *ss) override {
-    pool_replayer->start();
+    this->pool_replayer->start();
     return true;
   }
 };
 
-class StopCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class StopCommand : public PoolReplayerAdminSocketCommand<I> {
 public:
-  explicit StopCommand(PoolReplayer *pool_replayer)
-    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  explicit StopCommand(PoolReplayer<I> *pool_replayer)
+    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
   bool call(Formatter *f, stringstream *ss) override {
-    pool_replayer->stop(true);
+    this->pool_replayer->stop(true);
     return true;
   }
 };
 
-class RestartCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class RestartCommand : public PoolReplayerAdminSocketCommand<I> {
 public:
-  explicit RestartCommand(PoolReplayer *pool_replayer)
-    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  explicit RestartCommand(PoolReplayer<I> *pool_replayer)
+    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
   bool call(Formatter *f, stringstream *ss) override {
-    pool_replayer->restart();
+    this->pool_replayer->restart();
     return true;
   }
 };
 
-class FlushCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class FlushCommand : public PoolReplayerAdminSocketCommand<I> {
 public:
-  explicit FlushCommand(PoolReplayer *pool_replayer)
-    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  explicit FlushCommand(PoolReplayer<I> *pool_replayer)
+    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
   bool call(Formatter *f, stringstream *ss) override {
-    pool_replayer->flush();
+    this->pool_replayer->flush();
     return true;
   }
 };
 
-class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> {
 public:
-  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
-    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer)
+    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
   bool call(Formatter *f, stringstream *ss) override {
-    pool_replayer->release_leader();
+    this->pool_replayer->release_leader();
     return true;
   }
 };
 
+template <typename I>
 class PoolReplayerAdminSocketHook : public AdminSocketHook {
 public:
   PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
-                               PoolReplayer *pool_replayer)
+                              PoolReplayer<I> *pool_replayer)
     : admin_socket(cct->get_admin_socket()) {
     std::string command;
     int r;
@@ -145,57 +158,56 @@ public:
     r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
     if (r == 0) {
-      commands[command] = new StatusCommand(pool_replayer);
+      commands[command] = new StatusCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror start " + name;
     r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
     if (r == 0) {
-      commands[command] = new StartCommand(pool_replayer);
+      commands[command] = new StartCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror stop " + name;
     r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
     if (r == 0) {
-      commands[command] = new StopCommand(pool_replayer);
+      commands[command] = new StopCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror restart " + name;
     r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
     if (r == 0) {
-      commands[command] = new RestartCommand(pool_replayer);
+      commands[command] = new RestartCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror flush " + name;
     r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
     if (r == 0) {
-      commands[command] = new FlushCommand(pool_replayer);
+      commands[command] = new FlushCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror leader release " + name;
     r = admin_socket->register_command(command, command, this,
                                        "release rbd mirror leader " + name);
     if (r == 0) {
-      commands[command] = new LeaderReleaseCommand(pool_replayer);
+      commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
     }
   }
 
   ~PoolReplayerAdminSocketHook() override {
-    for (Commands::const_iterator i = commands.begin(); i != commands.end();
-        ++i) {
+    for (auto i = commands.begin(); i != commands.end(); ++i) {
       (void)admin_socket->unregister_command(i->first);
       delete i->second;
     }
   }
 
-  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
-           bufferlist& out) override {
-    Commands::const_iterator i = commands.find(command);
-    assert(i != commands.end());
+  bool call(std::string_view command, const cmdmap_t& cmdmap,
+           std::string_view format, bufferlist& out) override {
+    auto i = commands.find(command);
+    ceph_assert(i != commands.end());
     Formatter *f = Formatter::create(format);
     stringstream ss;
     bool r = i->second->call(f, &ss);
@@ -205,7 +217,8 @@ public:
   }
 
 private:
-  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;
+  typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*,
+                  std::less<>> Commands;
 
   AdminSocket *admin_socket;
   Commands commands;
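
A note on the hunk above: switching call() to take std::string_view only compiles against the command map because the new Commands typedef adds std::less<> as a transparent comparator, which enables heterogeneous lookup. A minimal standalone sketch of that idiom (illustrative, not part of the diff):

    #include <functional>
    #include <map>
    #include <string>
    #include <string_view>

    int main() {
      // std::less<> (a transparent comparator) lets find() accept any
      // type comparable with the key, so no temporary std::string is
      // constructed for each admin-socket dispatch.
      std::map<std::string, int, std::less<>> commands{
        {"rbd mirror status rbd/pool", 1}};
      std::string_view cmd = "rbd mirror status rbd/pool";
      auto it = commands.find(cmd);
      return it != commands.end() ? 0 : 1;
    }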
@@ -213,57 +226,62 @@ private:
 
 } // anonymous namespace
 
-PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
-                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
-                          ImageDeleter<>* image_deleter,
-                          int64_t local_pool_id, const peer_t &peer,
-                          const std::vector<const char*> &args) :
+template <typename I>
+PoolReplayer<I>::PoolReplayer(Threads<I> *threads,
+                              ServiceDaemon<I>* service_daemon,
+                             int64_t local_pool_id, const PeerSpec &peer,
+                             const std::vector<const char*> &args) :
   m_threads(threads),
   m_service_daemon(service_daemon),
-  m_image_deleter(image_deleter),
   m_local_pool_id(local_pool_id),
   m_peer(peer),
   m_args(args),
   m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
   m_local_pool_watcher_listener(this, true),
   m_remote_pool_watcher_listener(this, false),
+  m_image_map_listener(this),
   m_pool_replayer_thread(this),
   m_leader_listener(this)
 {
 }
 
-PoolReplayer::~PoolReplayer()
+template <typename I>
+PoolReplayer<I>::~PoolReplayer()
 {
   delete m_asok_hook;
   shut_down();
 }
 
-bool PoolReplayer::is_blacklisted() const {
+template <typename I>
+bool PoolReplayer<I>::is_blacklisted() const {
   Mutex::Locker locker(m_lock);
   return m_blacklisted;
 }
 
-bool PoolReplayer::is_leader() const {
+template <typename I>
+bool PoolReplayer<I>::is_leader() const {
   Mutex::Locker locker(m_lock);
   return m_leader_watcher && m_leader_watcher->is_leader();
 }
 
-bool PoolReplayer::is_running() const {
+template <typename I>
+bool PoolReplayer<I>::is_running() const {
   return m_pool_replayer_thread.is_started();
 }
 
-void PoolReplayer::init()
+template <typename I>
+void PoolReplayer<I>::init()
 {
-  assert(!m_pool_replayer_thread.is_started());
+  ceph_assert(!m_pool_replayer_thread.is_started());
 
   // reset state
   m_stopping = false;
   m_blacklisted = false;
 
-  dout(20) << "replaying for " << m_peer << dendl;
+  dout(10) << "replaying for " << m_peer << dendl;
   int r = init_rados(g_ceph_context->_conf->cluster,
                      g_ceph_context->_conf->name.to_str(),
-                     "local cluster", &m_local_rados, false);
+                     "", "", "local cluster", &m_local_rados, false);
   if (r < 0) {
     m_callout_id = m_service_daemon->add_or_update_callout(
       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
@@ -272,6 +290,7 @@ void PoolReplayer::init()
   }
 
   r = init_rados(m_peer.cluster_name, m_peer.client_name,
+                 m_peer.mon_host, m_peer.key,
                  std::string("remote peer ") + stringify(m_peer),
                  &m_remote_rados, true);
   if (r < 0) {
@@ -288,6 +307,9 @@ void PoolReplayer::init()
     return;
   }
 
+  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+  librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);
+
   std::string local_mirror_uuid;
   r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                           &local_mirror_uuid);
@@ -311,15 +333,15 @@ void PoolReplayer::init()
     return;
   }
 
-  dout(20) << "connected to " << m_peer << dendl;
+  dout(10) << "connected to " << m_peer << dendl;
 
-  m_instance_replayer.reset(InstanceReplayer<>::create(
-    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
-    local_mirror_uuid, m_local_pool_id));
+  m_instance_replayer.reset(InstanceReplayer<I>::create(
+    m_threads, m_service_daemon, m_local_rados, local_mirror_uuid,
+    m_local_pool_id));
   m_instance_replayer->init();
   m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
 
-  m_instance_watcher.reset(InstanceWatcher<>::create(
+  m_instance_watcher.reset(InstanceWatcher<I>::create(
     m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
   r = m_instance_watcher->init();
   if (r < 0) {
@@ -329,9 +351,12 @@ void PoolReplayer::init()
       "unable to initialize instance messenger object");
     return;
   }
+  m_service_daemon->add_or_update_attribute(
+      m_local_pool_id, SERVICE_DAEMON_INSTANCE_ID_KEY,
+      m_instance_watcher->get_instance_id());
 
-  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
-                                             &m_leader_listener));
+  m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
+                                                  &m_leader_listener));
   r = m_leader_watcher->init();
   if (r < 0) {
     derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
@@ -349,7 +374,8 @@ void PoolReplayer::init()
   m_pool_replayer_thread.create("pool replayer");
 }
 
-void PoolReplayer::shut_down() {
+template <typename I>
+void PoolReplayer<I>::shut_down() {
   m_stopping = true;
   {
     Mutex::Locker l(m_lock);
@@ -372,17 +398,22 @@ void PoolReplayer::shut_down() {
   m_instance_watcher.reset();
   m_instance_replayer.reset();
 
-  assert(!m_local_pool_watcher);
-  assert(!m_remote_pool_watcher);
+  ceph_assert(!m_image_map);
+  ceph_assert(!m_image_deleter);
+  ceph_assert(!m_local_pool_watcher);
+  ceph_assert(!m_remote_pool_watcher);
   m_local_rados.reset();
   m_remote_rados.reset();
 }
 
-int PoolReplayer::init_rados(const std::string &cluster_name,
-                            const std::string &client_name,
-                            const std::string &description,
-                            RadosRef *rados_ref,
-                             bool strip_cluster_overrides) {
+template <typename I>
+int PoolReplayer<I>::init_rados(const std::string &cluster_name,
+                               const std::string &client_name,
+                                const std::string &mon_host,
+                                const std::string &key,
+                               const std::string &description,
+                               RadosRef *rados_ref,
+                                bool strip_cluster_overrides) {
   rados_ref->reset(new librados::Rados());
 
   // NOTE: manually bootstrap a CephContext here instead of via
@@ -400,8 +431,8 @@ int PoolReplayer::init_rados(const std::string &cluster_name,
   cct->_conf->cluster = cluster_name;
 
   // librados::Rados::conf_read_file
-  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
-  if (r < 0) {
+  int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
+  if (r < 0 && r != -ENOENT) {
     derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
     cct->put();
@@ -415,27 +446,27 @@ int PoolReplayer::init_rados(const std::string &cluster_name,
     // remote peer connections shouldn't apply cluster-specific
     // configuration settings
     for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
-      config_values[key] = cct->_conf->get_val<std::string>(key);
+      config_values[key] = cct->_conf.get_val<std::string>(key);
     }
   }
 
-  cct->_conf->parse_env();
+  cct->_conf.parse_env(cct->get_module_type());
 
   // librados::Rados::conf_parse_env
   std::vector<const char*> args;
-  env_to_vec(args, nullptr);
-  r = cct->_conf->parse_argv(args);
+  r = cct->_conf.parse_argv(args);
   if (r < 0) {
     derr << "could not parse environment for " << description << ":"
          << cpp_strerror(r) << dendl;
     cct->put();
     return r;
   }
+  cct->_conf.parse_env(cct->get_module_type());
 
   if (!m_args.empty()) {
     // librados::Rados::conf_parse_argv
     args = m_args;
-    r = cct->_conf->parse_argv(args);
+    r = cct->_conf.parse_argv(args);
     if (r < 0) {
       derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
@@ -448,28 +479,48 @@ int PoolReplayer::init_rados(const std::string &cluster_name,
     // remote peer connections shouldn't apply cluster-specific
     // configuration settings
     for (auto& pair : config_values) {
-      auto value = cct->_conf->get_val<std::string>(pair.first);
+      auto value = cct->_conf.get_val<std::string>(pair.first);
       if (pair.second != value) {
         dout(0) << "reverting global config option override: "
                 << pair.first << ": " << value << " -> " << pair.second
                 << dendl;
-        cct->_conf->set_val_or_die(pair.first, pair.second);
+        cct->_conf.set_val_or_die(pair.first, pair.second);
       }
     }
   }
 
   if (!g_ceph_context->_conf->admin_socket.empty()) {
-    cct->_conf->set_val_or_die("admin_socket",
+    cct->_conf.set_val_or_die("admin_socket",
                                "$run_dir/$name.$pid.$cluster.$cctid.asok");
   }
 
+  if (!mon_host.empty()) {
+    r = cct->_conf.set_val("mon_host", mon_host);
+    if (r < 0) {
+      derr << "failed to set mon_host config for " << description << ": "
+           << cpp_strerror(r) << dendl;
+      cct->put();
+      return r;
+    }
+  }
+
+  if (!key.empty()) {
+    r = cct->_conf.set_val("key", key);
+    if (r < 0) {
+      derr << "failed to set key config for " << description << ": "
+           << cpp_strerror(r) << dendl;
+      cct->put();
+      return r;
+    }
+  }
+
   // disable unnecessary librbd cache
-  cct->_conf->set_val_or_die("rbd_cache", "false");
-  cct->_conf->apply_changes(nullptr);
-  cct->_conf->complain_about_parse_errors(cct);
+  cct->_conf.set_val_or_die("rbd_cache", "false");
+  cct->_conf.apply_changes(nullptr);
+  cct->_conf.complain_about_parse_errors(cct);
 
   r = (*rados_ref)->init_with_context(cct);
-  assert(r == 0);
+  ceph_assert(r == 0);
   cct->put();
 
   r = (*rados_ref)->connect();
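
For orientation: the new mon_host/key parameters threaded through init_rados are what allow a remote peer to be reached without a local ceph.conf or keyring for that cluster. A hedged sketch of the same idea through the public librados API (the client name, monitor address, and key below are placeholders, not values from this change):

    #include <rados/librados.hpp>

    // Sketch only; rbd-mirror bootstraps a CephContext manually as in
    // the hunk above rather than going through this convenience path.
    int connect_peer(librados::Rados *rados) {
      int r = rados->init2("client.rbd-mirror-peer", "ceph", 0);
      if (r < 0) {
        return r;
      }
      // Equivalent in spirit to the set_val("mon_host") / set_val("key")
      // calls added above.
      r = rados->conf_set("mon_host", "203.0.113.10:6789");
      if (r < 0) {
        return r;
      }
      r = rados->conf_set("key", "<base64 cephx key>");
      if (r < 0) {
        return r;
      }
      return rados->connect();
    }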
@@ -482,7 +533,8 @@ int PoolReplayer::init_rados(const std::string &cluster_name,
   return 0;
 }
 
-void PoolReplayer::run()
+template <typename I>
+void PoolReplayer<I>::run()
 {
   dout(20) << "enter" << dendl;
 
@@ -493,8 +545,8 @@ void PoolReplayer::run()
       m_asok_hook_name = asok_hook_name;
       delete m_asok_hook;
 
-      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
-                                                   m_asok_hook_name, this);
+      m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context,
+                                                      m_asok_hook_name, this);
     }
 
     Mutex::Locker locker(m_lock);
@@ -513,7 +565,8 @@ void PoolReplayer::run()
   m_instance_replayer->stop();
 }
 
-void PoolReplayer::print_status(Formatter *f, stringstream *ss)
+template <typename I>
+void PoolReplayer<I>::print_status(Formatter *f, stringstream *ss)
 {
   dout(20) << "enter" << dendl;
 
@@ -528,6 +581,14 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss)
   f->dump_stream("peer") << m_peer;
   f->dump_string("instance_id", m_instance_watcher->get_instance_id());
 
+  std::string state("running");
+  if (m_manual_stop) {
+    state = "stopped (manual)";
+  } else if (m_stopping) {
+    state = "stopped";
+  }
+  f->dump_string("state", state);
+
   std::string leader_instance_id;
   m_leader_watcher->get_leader_instance_id(&leader_instance_id);
   f->dump_string("leader_instance_id", leader_instance_id);
@@ -545,10 +606,10 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss)
   }
 
   f->dump_string("local_cluster_admin_socket",
-                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
+                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf.
                      get_val<std::string>("admin_socket"));
   f->dump_string("remote_cluster_admin_socket",
-                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
+                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf.
                      get_val<std::string>("admin_socket"));
 
   f->open_object_section("sync_throttler");
@@ -557,11 +618,18 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss)
 
   m_instance_replayer->print_status(f, ss);
 
+  if (m_image_deleter) {
+    f->open_object_section("image_deleter");
+    m_image_deleter->print_status(f, ss);
+    f->close_section();
+  }
+
   f->close_section();
   f->flush(*ss);
 }
 
-void PoolReplayer::start()
+template <typename I>
+void PoolReplayer<I>::start()
 {
   dout(20) << "enter" << dendl;
 
@@ -571,10 +639,12 @@ void PoolReplayer::start()
     return;
   }
 
+  m_manual_stop = false;
   m_instance_replayer->start();
 }
 
-void PoolReplayer::stop(bool manual)
+template <typename I>
+void PoolReplayer<I>::stop(bool manual)
 {
   dout(20) << "enter: manual=" << manual << dendl;
 
@@ -587,10 +657,12 @@ void PoolReplayer::stop(bool manual)
     return;
   }
 
+  m_manual_stop = true;
   m_instance_replayer->stop();
 }
 
-void PoolReplayer::restart()
+template <typename I>
+void PoolReplayer<I>::restart()
 {
   dout(20) << "enter" << dendl;
 
@@ -603,7 +675,8 @@ void PoolReplayer::restart()
   m_instance_replayer->restart();
 }
 
-void PoolReplayer::flush()
+template <typename I>
+void PoolReplayer<I>::flush()
 {
   dout(20) << "enter" << dendl;
 
@@ -616,7 +689,8 @@ void PoolReplayer::flush()
   m_instance_replayer->flush();
 }
 
-void PoolReplayer::release_leader()
+template <typename I>
+void PoolReplayer<I>::release_leader()
 {
   dout(20) << "enter" << dendl;
 
@@ -629,9 +703,10 @@ void PoolReplayer::release_leader()
   m_leader_watcher->release_leader();
 }
 
-void PoolReplayer::handle_update(const std::string &mirror_uuid,
-                                ImageIds &&added_image_ids,
-                                ImageIds &&removed_image_ids) {
+template <typename I>
+void PoolReplayer<I>::handle_update(const std::string &mirror_uuid,
+                                   ImageIds &&added_image_ids,
+                                   ImageIds &&removed_image_ids) {
   if (m_stopping) {
     return;
   }
@@ -653,58 +728,80 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
       m_remote_pool_watcher->get_image_count());
   }
 
-  m_update_op_tracker.start_op();
-  Context *ctx = new FunctionContext([this](int r) {
-      dout(20) << "complete handle_update: r=" << r << dendl;
-      m_update_op_tracker.finish_op();
-    });
-
-  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
-
-  for (auto &image_id : added_image_ids) {
-    // for now always send to myself (the leader)
-    std::string &instance_id = m_instance_watcher->get_instance_id();
-    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
-                                             gather_ctx->new_sub());
+  std::set<std::string> added_global_image_ids;
+  for (auto& image_id : added_image_ids) {
+    added_global_image_ids.insert(image_id.global_id);
   }
 
-  if (!mirror_uuid.empty()) {
-    for (auto &image_id : removed_image_ids) {
-      // for now always send to myself (the leader)
-      std::string &instance_id = m_instance_watcher->get_instance_id();
-      m_instance_watcher->notify_peer_image_removed(instance_id,
-                                                    image_id.global_id,
-                                                    mirror_uuid,
-                                                    gather_ctx->new_sub());
-    }
+  std::set<std::string> removed_global_image_ids;
+  for (auto& image_id : removed_image_ids) {
+    removed_global_image_ids.insert(image_id.global_id);
   }
 
-  gather_ctx->activate();
+  m_image_map->update_images(mirror_uuid,
+                             std::move(added_global_image_ids),
+                             std::move(removed_global_image_ids));
 }
 
-void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
-  dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
+  dout(10) << dendl;
 
   m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                             SERVICE_DAEMON_LEADER_KEY, true);
   m_instance_watcher->handle_acquire_leader();
-  init_local_pool_watcher(on_finish);
+  init_image_map(on_finish);
 }
 
-void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
-  dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
+  dout(10) << dendl;
 
-  m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
+  m_service_daemon->remove_attribute(m_local_pool_id,
+                                     SERVICE_DAEMON_LEADER_KEY);
   m_instance_watcher->handle_release_leader();
-  shut_down_pool_watchers(on_finish);
+  shut_down_image_deleter(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::init_image_map(Context *on_finish) {
+  dout(5) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  ceph_assert(!m_image_map);
+  m_image_map.reset(ImageMap<I>::create(m_local_io_ctx, m_threads,
+                                        m_instance_watcher->get_instance_id(),
+                                        m_image_map_listener));
+
+  auto ctx = new FunctionContext([this, on_finish](int r) {
+      handle_init_image_map(r, on_finish);
+    });
+  m_image_map->init(create_async_context_callback(
+    m_threads->work_queue, ctx));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_image_map(int r, Context *on_finish) {
+  dout(5) << "r=" << r << dendl;
+  if (r < 0) {
+    derr << "failed to init image map: " << cpp_strerror(r) << dendl;
+    on_finish = new FunctionContext([on_finish, r](int) {
+        on_finish->complete(r);
+      });
+    shut_down_image_map(on_finish);
+    return;
+  }
+
+  init_local_pool_watcher(on_finish);
 }
 
-void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
-  dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::init_local_pool_watcher(Context *on_finish) {
+  dout(10) << dendl;
 
   Mutex::Locker locker(m_lock);
-  assert(!m_local_pool_watcher);
-  m_local_pool_watcher.reset(new PoolWatcher<>(
+  ceph_assert(!m_local_pool_watcher);
+  m_local_pool_watcher.reset(PoolWatcher<I>::create(
     m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
 
   // ensure the initial set of local images is up-to-date
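
One idiom repeats in handle_init_image_map above and in the failure paths below: when a stage fails, on_finish is rewrapped so that cleanup runs first but the caller still observes the original error code rather than the cleanup's result. Distilled into a hypothetical helper (Context and FunctionContext are the existing Ceph callback types):

    // Hypothetical helper, not in the source: whatever result the
    // cleanup chain reports is discarded and saved_r is surfaced.
    Context *preserve_error(Context *on_finish, int saved_r) {
      return new FunctionContext([on_finish, saved_r](int) {
          on_finish->complete(saved_r);
        });
    }

    // Usage, matching the shape of the handlers in this file:
    //   on_finish = preserve_error(on_finish, r);
    //   shut_down_image_map(on_finish);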
@@ -716,36 +813,133 @@ void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
     m_threads->work_queue, ctx));
 }
 
-void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
-  dout(20) << "r=" << r << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_init_local_pool_watcher(
+    int r, Context *on_finish) {
+  dout(10) << "r=" << r << dendl;
   if (r < 0) {
     derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
-    on_finish->complete(r);
+    on_finish = new FunctionContext([on_finish, r](int) {
+        on_finish->complete(r);
+      });
+    shut_down_pool_watchers(on_finish);
     return;
   }
 
   init_remote_pool_watcher(on_finish);
 }
 
-void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
-  dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::init_remote_pool_watcher(Context *on_finish) {
+  dout(10) << dendl;
 
   Mutex::Locker locker(m_lock);
-  assert(!m_remote_pool_watcher);
-  m_remote_pool_watcher.reset(new PoolWatcher<>(
+  ceph_assert(!m_remote_pool_watcher);
+  m_remote_pool_watcher.reset(PoolWatcher<I>::create(
     m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
+
+  auto ctx = new FunctionContext([this, on_finish](int r) {
+      handle_init_remote_pool_watcher(r, on_finish);
+    });
   m_remote_pool_watcher->init(create_async_context_callback(
+    m_threads->work_queue, ctx));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_remote_pool_watcher(
+    int r, Context *on_finish) {
+  dout(10) << "r=" << r << dendl;
+  if (r == -ENOENT) {
+    // Technically nothing to do since the other side doesn't
+    // have mirroring enabled. Eventually the remote pool watcher will
+    // detect images (if mirroring is enabled), so no point propagating
+    // an error which would just busy-spin the state machines.
+    dout(0) << "remote peer does not have mirroring configured" << dendl;
+  } else if (r < 0) {
+    derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
+    on_finish = new FunctionContext([on_finish, r](int) {
+        on_finish->complete(r);
+      });
+    shut_down_pool_watchers(on_finish);
+    return;
+  }
+
+  init_image_deleter(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::init_image_deleter(Context *on_finish) {
+  dout(10) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  ceph_assert(!m_image_deleter);
+
+  on_finish = new FunctionContext([this, on_finish](int r) {
+      handle_init_image_deleter(r, on_finish);
+    });
+  m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
+                                                m_service_daemon));
+  m_image_deleter->init(create_async_context_callback(
     m_threads->work_queue, on_finish));
+}
 
+template <typename I>
+void PoolReplayer<I>::handle_init_image_deleter(int r, Context *on_finish) {
+  dout(10) << "r=" << r << dendl;
+  if (r < 0) {
+    derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
+    on_finish = new FunctionContext([on_finish, r](int) {
+        on_finish->complete(r);
+      });
+    shut_down_image_deleter(on_finish);
+    return;
+  }
+
+  on_finish->complete(0);
+
+  Mutex::Locker locker(m_lock);
   m_cond.Signal();
 }
 
-void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
-  dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::shut_down_image_deleter(Context* on_finish) {
+  dout(10) << dendl;
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_image_deleter) {
+      Context *ctx = new FunctionContext([this, on_finish](int r) {
+          handle_shut_down_image_deleter(r, on_finish);
+       });
+      ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+      m_image_deleter->shut_down(ctx);
+      return;
+    }
+  }
+  shut_down_pool_watchers(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_image_deleter(
+    int r, Context* on_finish) {
+  dout(10) << "r=" << r << dendl;
 
   {
     Mutex::Locker locker(m_lock);
-    if (m_local_pool_watcher) { 
+    ceph_assert(m_image_deleter);
+    m_image_deleter.reset();
+  }
+
+  shut_down_pool_watchers(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down_pool_watchers(Context *on_finish) {
+  dout(10) << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_local_pool_watcher) {
       Context *ctx = new FunctionContext([this, on_finish](int r) {
           handle_shut_down_pool_watchers(r, on_finish);
        });
@@ -764,12 +958,14 @@ void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
   on_finish->complete(0);
 }
 
-void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
-  dout(20) << "r=" << r << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_pool_watchers(
+    int r, Context *on_finish) {
+  dout(10) << "r=" << r << dendl;
 
   {
     Mutex::Locker locker(m_lock);
-    assert(m_local_pool_watcher);
+    ceph_assert(m_local_pool_watcher);
     m_local_pool_watcher.reset();
 
     if (m_remote_pool_watcher) {
@@ -779,8 +975,9 @@ void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
   wait_for_update_ops(on_finish);
 }
 
-void PoolReplayer::wait_for_update_ops(Context *on_finish) {
-  dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::wait_for_update_ops(Context *on_finish) {
+  dout(10) << dendl;
 
   Mutex::Locker locker(m_lock);
 
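
Stepping back, the leader-release teardown is now a chain of asynchronous stages, each completing the next through a Context callback. The order, as implemented by the handlers in this file:

    // handle_pre_release_leader()
    //   -> shut_down_image_deleter()
    //   -> shut_down_pool_watchers()
    //   -> wait_for_update_ops()
    //   -> shut_down_image_map()
    //   -> m_instance_replayer->release_all(on_finish)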
@@ -792,20 +989,117 @@ void PoolReplayer::wait_for_update_ops(Context *on_finish) {
   m_update_op_tracker.wait_for_ops(ctx);
 }
 
-void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
-  dout(20) << "r=" << r << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_wait_for_update_ops(int r, Context *on_finish) {
+  dout(10) << "r=" << r << dendl;
+  ceph_assert(r == 0);
 
-  assert(r == 0);
+  shut_down_image_map(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down_image_map(Context *on_finish) {
+  dout(5) << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_image_map) {
+      on_finish = new FunctionContext([this, on_finish](int r) {
+          handle_shut_down_image_map(r, on_finish);
+        });
+      m_image_map->shut_down(create_async_context_callback(
+        m_threads->work_queue, on_finish));
+      return;
+    }
+  }
+
+  on_finish->complete(0);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) {
+  dout(5) << "r=" << r << dendl;
+  if (r < 0 && r != -EBLACKLISTED) {
+    derr << "failed to shut down image map: " << cpp_strerror(r) << dendl;
+  }
 
   Mutex::Locker locker(m_lock);
+  ceph_assert(m_image_map);
+  m_image_map.reset();
+
   m_instance_replayer->release_all(on_finish);
 }
 
-void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
-  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_update_leader(
+    const std::string &leader_instance_id) {
+  dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
 
   m_instance_watcher->handle_update_leader(leader_instance_id);
 }
 
+template <typename I>
+void PoolReplayer<I>::handle_acquire_image(const std::string &global_image_id,
+                                           const std::string &instance_id,
+                                           Context* on_finish) {
+  dout(5) << "global_image_id=" << global_image_id << ", "
+          << "instance_id=" << instance_id << dendl;
+
+  m_instance_watcher->notify_image_acquire(instance_id, global_image_id,
+                                           on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_release_image(const std::string &global_image_id,
+                                           const std::string &instance_id,
+                                           Context* on_finish) {
+  dout(5) << "global_image_id=" << global_image_id << ", "
+          << "instance_id=" << instance_id << dendl;
+
+  m_instance_watcher->notify_image_release(instance_id, global_image_id,
+                                           on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
+                                          const std::string &global_image_id,
+                                          const std::string &instance_id,
+                                          Context* on_finish) {
+  ceph_assert(!mirror_uuid.empty());
+  dout(5) << "mirror_uuid=" << mirror_uuid << ", "
+          << "global_image_id=" << global_image_id << ", "
+          << "instance_id=" << instance_id << dendl;
+
+  m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id,
+                                                mirror_uuid, on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_instances_added(const InstanceIds &instance_ids) {
+  dout(5) << "instance_ids=" << instance_ids << dendl;
+  Mutex::Locker locker(m_lock);
+  if (!m_leader_watcher->is_leader()) {
+    return;
+  }
+
+  ceph_assert(m_image_map);
+  m_image_map->update_instances_added(instance_ids);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_instances_removed(
+    const InstanceIds &instance_ids) {
+  dout(5) << "instance_ids=" << instance_ids << dendl;
+  Mutex::Locker locker(m_lock);
+  if (!m_leader_watcher->is_leader()) {
+    return;
+  }
+
+  ceph_assert(m_image_map);
+  m_image_map->update_instances_removed(instance_ids);
+}
+
 } // namespace mirror
 } // namespace rbd
+
+template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;
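
Because PoolReplayer is now a class template whose definitions live in this .cc file, the explicit instantiation above is what actually emits code for the linker; the header only declares the template. The general pattern, reduced to a toy example with hypothetical names:

    // widget.h: declaration only.
    template <typename I>
    class Widget {
    public:
      void run();
    };

    // widget.cc: definitions plus one explicit instantiation, mirroring
    // the PoolReplayer<librbd::ImageCtx> line above.
    template <typename I>
    void Widget<I>::run() {
      // ...
    }

    template class Widget<int>;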