]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/rbd_mirror/PoolReplayer.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / rbd_mirror / PoolReplayer.cc
index ac3463256704a9f95fa67ac953b53fb9a5823f8d..75067813aa59db992cd81d3309594e7210f0de66 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "PoolReplayer.h"
 #include <boost/bind.hpp>
+#include "common/Cond.h"
 #include "common/Formatter.h"
 #include "common/admin_socket.h"
 #include "common/ceph_argparse.h"
 #include "common/common_init.h"
 #include "common/debug.h"
 #include "common/errno.h"
-#include "include/stringify.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "global/global_context.h"
-#include "librbd/internal.h"
-#include "librbd/Utils.h"
-#include "librbd/Watcher.h"
 #include "librbd/api/Config.h"
-#include "librbd/api/Mirror.h"
-#include "ImageMap.h"
-#include "InstanceReplayer.h"
-#include "InstanceWatcher.h"
-#include "LeaderWatcher.h"
+#include "librbd/api/Namespace.h"
+#include "PoolMetaCache.h"
+#include "RemotePoolPoller.h"
 #include "ServiceDaemon.h"
 #include "Threads.h"
 
 #define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                            << this << " " << __func__ << ": "
 
-using std::chrono::seconds;
-using std::map;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
-using librbd::cls_client::dir_get_name;
-using librbd::util::create_async_context_callback;
-
 namespace rbd {
 namespace mirror {
 
@@ -49,8 +35,6 @@ namespace {
 
 const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
 const std::string SERVICE_DAEMON_LEADER_KEY("leader");
-const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
-const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
 
 const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
   {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};
@@ -62,7 +46,7 @@ public:
     : pool_replayer(pool_replayer) {
   }
   virtual ~PoolReplayerAdminSocketCommand() {}
-  virtual bool call(Formatter *f, stringstream *ss) = 0;
+  virtual int call(Formatter *f) = 0;
 protected:
   PoolReplayer<I> *pool_replayer;
 };
@@ -74,9 +58,9 @@ public:
     : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
-  bool call(Formatter *f, stringstream *ss) override {
-    this->pool_replayer->print_status(f, ss);
-    return true;
+  int call(Formatter *f) override {
+    this->pool_replayer->print_status(f);
+    return 0;
   }
 };
 
@@ -87,9 +71,9 @@ public:
     : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
-  bool call(Formatter *f, stringstream *ss) override {
+  int call(Formatter *f) override {
     this->pool_replayer->start();
-    return true;
+    return 0;
   }
 };
 
@@ -100,9 +84,9 @@ public:
     : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
-  bool call(Formatter *f, stringstream *ss) override {
+  int call(Formatter *f) override {
     this->pool_replayer->stop(true);
-    return true;
+    return 0;
   }
 };
 
@@ -113,9 +97,9 @@ public:
     : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
-  bool call(Formatter *f, stringstream *ss) override {
+  int call(Formatter *f) override {
     this->pool_replayer->restart();
-    return true;
+    return 0;
   }
 };
 
@@ -126,9 +110,9 @@ public:
     : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
-  bool call(Formatter *f, stringstream *ss) override {
+  int call(Formatter *f) override {
     this->pool_replayer->flush();
-    return true;
+    return 0;
   }
 };
 
@@ -139,9 +123,9 @@ public:
     : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
   }
 
-  bool call(Formatter *f, stringstream *ss) override {
+  int call(Formatter *f) override {
     this->pool_replayer->release_leader();
-    return true;
+    return 0;
   }
 };
 
@@ -155,42 +139,42 @@ public:
     int r;
 
     command = "rbd mirror status " + name;
-    r = admin_socket->register_command(command, command, this,
+    r = admin_socket->register_command(command, this,
                                       "get status for rbd mirror " + name);
     if (r == 0) {
       commands[command] = new StatusCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror start " + name;
-    r = admin_socket->register_command(command, command, this,
+    r = admin_socket->register_command(command, this,
                                       "start rbd mirror " + name);
     if (r == 0) {
       commands[command] = new StartCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror stop " + name;
-    r = admin_socket->register_command(command, command, this,
+    r = admin_socket->register_command(command, this,
                                       "stop rbd mirror " + name);
     if (r == 0) {
       commands[command] = new StopCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror restart " + name;
-    r = admin_socket->register_command(command, command, this,
+    r = admin_socket->register_command(command, this,
                                       "restart rbd mirror " + name);
     if (r == 0) {
       commands[command] = new RestartCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror flush " + name;
-    r = admin_socket->register_command(command, command, this,
+    r = admin_socket->register_command(command, this,
                                       "flush rbd mirror " + name);
     if (r == 0) {
       commands[command] = new FlushCommand<I>(pool_replayer);
     }
 
     command = "rbd mirror leader release " + name;
-    r = admin_socket->register_command(command, command, this,
+    r = admin_socket->register_command(command, this,
                                        "release rbd mirror leader " + name);
     if (r == 0) {
       commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
@@ -198,22 +182,19 @@ public:
   }
 
   ~PoolReplayerAdminSocketHook() override {
+    (void)admin_socket->unregister_commands(this);
     for (auto i = commands.begin(); i != commands.end(); ++i) {
-      (void)admin_socket->unregister_command(i->first);
       delete i->second;
     }
   }
 
-  bool call(std::string_view command, const cmdmap_t& cmdmap,
-           std::string_view format, bufferlist& out) override {
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+          Formatter *f,
+          std::ostream& ss,
+          bufferlist& out) override {
     auto i = commands.find(command);
     ceph_assert(i != commands.end());
-    Formatter *f = Formatter::create(format);
-    stringstream ss;
-    bool r = i->second->call(f, &ss);
-    delete f;
-    out.append(ss);
-    return r;
+    return i->second->call(f);
   }
 
 private:
@@ -227,40 +208,55 @@ private:
 } // anonymous namespace
 
 template <typename I>
-PoolReplayer<I>::PoolReplayer(Threads<I> *threads,
-                              ServiceDaemon<I>* service_daemon,
-                             int64_t local_pool_id, const PeerSpec &peer,
-                             const std::vector<const char*> &args) :
+struct PoolReplayer<I>::RemotePoolPollerListener
+  : public remote_pool_poller::Listener {
+
+  PoolReplayer<I>* m_pool_replayer;
+
+  RemotePoolPollerListener(PoolReplayer<I>* pool_replayer)
+    : m_pool_replayer(pool_replayer) {
+  }
+
+  void handle_updated(const RemotePoolMeta& remote_pool_meta) override {
+    m_pool_replayer->handle_remote_pool_meta_updated(remote_pool_meta);
+  }
+};
+
+template <typename I>
+PoolReplayer<I>::PoolReplayer(
+    Threads<I> *threads, ServiceDaemon<I> *service_daemon,
+    journal::CacheManagerHandler *cache_manager_handler,
+    PoolMetaCache* pool_meta_cache, int64_t local_pool_id,
+    const PeerSpec &peer, const std::vector<const char*> &args) :
   m_threads(threads),
   m_service_daemon(service_daemon),
+  m_cache_manager_handler(cache_manager_handler),
+  m_pool_meta_cache(pool_meta_cache),
   m_local_pool_id(local_pool_id),
   m_peer(peer),
   m_args(args),
-  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
-  m_local_pool_watcher_listener(this, true),
-  m_remote_pool_watcher_listener(this, false),
-  m_image_map_listener(this),
+  m_lock(ceph::make_mutex("rbd::mirror::PoolReplayer " + stringify(peer))),
   m_pool_replayer_thread(this),
-  m_leader_listener(this)
-{
+  m_leader_listener(this) {
 }
 
 template <typename I>
 PoolReplayer<I>::~PoolReplayer()
 {
-  delete m_asok_hook;
   shut_down();
+
+  ceph_assert(m_asok_hook == nullptr);
 }
 
 template <typename I>
 bool PoolReplayer<I>::is_blacklisted() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return m_blacklisted;
 }
 
 template <typename I>
 bool PoolReplayer<I>::is_leader() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return m_leader_watcher && m_leader_watcher->is_leader();
 }
 
@@ -270,15 +266,15 @@ bool PoolReplayer<I>::is_running() const {
 }
 
 template <typename I>
-void PoolReplayer<I>::init()
-{
-  Mutex::Locker l(m_lock);
+void PoolReplayer<I>::init(const std::string& site_name) {
+  std::lock_guard locker{m_lock};
 
   ceph_assert(!m_pool_replayer_thread.is_started());
 
   // reset state
   m_stopping = false;
   m_blacklisted = false;
+  m_site_name = site_name;
 
   dout(10) << "replaying for " << m_peer << dendl;
   int r = init_rados(g_ceph_context->_conf->cluster,
@@ -312,9 +308,8 @@ void PoolReplayer<I>::init()
   auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
   librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);
 
-  std::string local_mirror_uuid;
   r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
-                                          &local_mirror_uuid);
+                                          &m_local_mirror_uuid);
   if (r < 0) {
     derr << "failed to retrieve local mirror uuid from pool "
          << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
@@ -337,25 +332,53 @@ void PoolReplayer<I>::init()
 
   dout(10) << "connected to " << m_peer << dendl;
 
-  m_instance_replayer.reset(InstanceReplayer<I>::create(
-    m_threads, m_service_daemon, m_local_rados, local_mirror_uuid,
-    m_local_pool_id));
-  m_instance_replayer->init();
-  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
+  m_image_sync_throttler.reset(
+      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs"));
+
+  m_image_deletion_throttler.reset(
+      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_deletions"));
 
-  m_instance_watcher.reset(InstanceWatcher<I>::create(
-    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
-  r = m_instance_watcher->init();
+  m_remote_pool_poller_listener.reset(new RemotePoolPollerListener(this));
+  m_remote_pool_poller.reset(RemotePoolPoller<I>::create(
+    m_threads, m_remote_io_ctx, m_site_name, m_local_mirror_uuid,
+    *m_remote_pool_poller_listener));
+
+  C_SaferCond on_pool_poller_init;
+  m_remote_pool_poller->init(&on_pool_poller_init);
+  r = on_pool_poller_init.wait();
   if (r < 0) {
-    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
+    derr << "failed to initialize remote pool poller: " << cpp_strerror(r)
+         << dendl;
     m_callout_id = m_service_daemon->add_or_update_callout(
       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
-      "unable to initialize instance messenger object");
+      "unable to initialize remote pool poller");
+    m_remote_pool_poller.reset();
+    return;
+  }
+  ceph_assert(!m_remote_pool_meta.mirror_uuid.empty());
+  m_pool_meta_cache->set_remote_pool_meta(
+    m_remote_io_ctx.get_id(), m_remote_pool_meta);
+  m_pool_meta_cache->set_local_pool_meta(
+    m_local_io_ctx.get_id(), {m_local_mirror_uuid});
+
+  m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
+      "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
+      m_remote_pool_meta, m_threads, m_image_sync_throttler.get(),
+      m_image_deletion_throttler.get(), m_service_daemon,
+      m_cache_manager_handler, m_pool_meta_cache));
+
+  C_SaferCond on_init;
+  m_default_namespace_replayer->init(&on_init);
+  r = on_init.wait();
+  if (r < 0) {
+    derr << "error initializing default namespace replayer: " << cpp_strerror(r)
+         << dendl;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to initialize default namespace replayer");
+    m_default_namespace_replayer.reset();
     return;
   }
-  m_service_daemon->add_or_update_attribute(
-      m_local_pool_id, SERVICE_DAEMON_INSTANCE_ID_KEY,
-      m_instance_watcher->get_instance_id());
 
   m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
                                                   &m_leader_listener));
@@ -365,6 +388,7 @@ void PoolReplayer<I>::init()
     m_callout_id = m_service_daemon->add_or_update_callout(
       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
       "unable to initialize leader messenger object");
+    m_leader_watcher.reset();
     return;
   }
 
@@ -373,37 +397,50 @@ void PoolReplayer<I>::init()
     m_callout_id = service_daemon::CALLOUT_ID_NONE;
   }
 
+  m_service_daemon->add_or_update_attribute(
+    m_local_io_ctx.get_id(), SERVICE_DAEMON_INSTANCE_ID_KEY,
+    stringify(m_local_io_ctx.get_instance_id()));
+
   m_pool_replayer_thread.create("pool replayer");
 }
 
 template <typename I>
 void PoolReplayer<I>::shut_down() {
-  m_stopping = true;
   {
-    Mutex::Locker l(m_lock);
-    m_cond.Signal();
+    std::lock_guard l{m_lock};
+    m_stopping = true;
+    m_cond.notify_all();
   }
   if (m_pool_replayer_thread.is_started()) {
     m_pool_replayer_thread.join();
   }
+
   if (m_leader_watcher) {
     m_leader_watcher->shut_down();
   }
-  if (m_instance_watcher) {
-    m_instance_watcher->shut_down();
+  m_leader_watcher.reset();
+
+  if (m_default_namespace_replayer) {
+    C_SaferCond on_shut_down;
+    m_default_namespace_replayer->shut_down(&on_shut_down);
+    on_shut_down.wait();
   }
-  if (m_instance_replayer) {
-    m_instance_replayer->shut_down();
+  m_default_namespace_replayer.reset();
+
+  if (m_remote_pool_poller) {
+    C_SaferCond ctx;
+    m_remote_pool_poller->shut_down(&ctx);
+    ctx.wait();
+
+    m_pool_meta_cache->remove_remote_pool_meta(m_remote_io_ctx.get_id());
+    m_pool_meta_cache->remove_local_pool_meta(m_local_io_ctx.get_id());
   }
+  m_remote_pool_poller.reset();
+  m_remote_pool_poller_listener.reset();
 
-  m_leader_watcher.reset();
-  m_instance_watcher.reset();
-  m_instance_replayer.reset();
+  m_image_sync_throttler.reset();
+  m_image_deletion_throttler.reset();
 
-  ceph_assert(!m_image_map);
-  ceph_assert(!m_image_deleter);
-  ceph_assert(!m_local_pool_watcher);
-  ceph_assert(!m_remote_pool_watcher);
   m_local_rados.reset();
   m_remote_rados.reset();
 }
@@ -516,7 +553,7 @@ int PoolReplayer<I>::init_rados(const std::string &cluster_name,
   // disable unnecessary librbd cache
   cct->_conf.set_val_or_die("rbd_cache", "false");
   cct->_conf.apply_changes(nullptr);
-  cct->_conf.complain_about_parse_errors(cct);
+  cct->_conf.complain_about_parse_error(cct);
 
   rados_ref->reset(new librados::Rados());
 
@@ -535,11 +572,10 @@ int PoolReplayer<I>::init_rados(const std::string &cluster_name,
 }
 
 template <typename I>
-void PoolReplayer<I>::run()
-{
-  dout(20) << "enter" << dendl;
+void PoolReplayer<I>::run() {
+  dout(20) << dendl;
 
-  while (!m_stopping) {
+  while (true) {
     std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                  m_peer.cluster_name;
     if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
@@ -550,26 +586,177 @@ void PoolReplayer<I>::run()
                                                       m_asok_hook_name, this);
     }
 
-    Mutex::Locker locker(m_lock);
-    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
-       (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
+    with_namespace_replayers([this]() { update_namespace_replayers(); });
+
+    std::unique_lock locker{m_lock};
+
+    if (m_leader_watcher->is_blacklisted() ||
+        m_default_namespace_replayer->is_blacklisted()) {
       m_blacklisted = true;
       m_stopping = true;
+    }
+
+    for (auto &it : m_namespace_replayers) {
+      if (it.second->is_blacklisted()) {
+        m_blacklisted = true;
+        m_stopping = true;
+        break;
+      }
+    }
+
+    if (m_stopping) {
       break;
     }
 
-    if (!m_stopping) {
-      m_cond.WaitInterval(m_lock, utime_t(1, 0));
+    auto seconds = g_ceph_context->_conf.get_val<uint64_t>(
+        "rbd_mirror_pool_replayers_refresh_interval");
+    m_cond.wait_for(locker, ceph::make_timespan(seconds));
+  }
+
+  // shut down namespace replayers
+  with_namespace_replayers([this]() { update_namespace_replayers(); });
+
+  delete m_asok_hook;
+  m_asok_hook = nullptr;
+}
+
+template <typename I>
+void PoolReplayer<I>::update_namespace_replayers() {
+  dout(20) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  std::set<std::string> mirroring_namespaces;
+  if (!m_stopping) {
+    int r = list_mirroring_namespaces(&mirroring_namespaces);
+    if (r < 0) {
+      return;
+    }
+  }
+
+  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+  C_SaferCond cond;
+  auto gather_ctx = new C_Gather(cct, &cond);
+  for (auto it = m_namespace_replayers.begin();
+       it != m_namespace_replayers.end(); ) {
+    auto iter = mirroring_namespaces.find(it->first);
+    if (iter == mirroring_namespaces.end()) {
+      auto namespace_replayer = it->second;
+      auto on_shut_down = new LambdaContext(
+        [namespace_replayer, ctx=gather_ctx->new_sub()](int r) {
+          delete namespace_replayer;
+          ctx->complete(r);
+        });
+      m_service_daemon->remove_namespace(m_local_pool_id, it->first);
+      namespace_replayer->shut_down(on_shut_down);
+      it = m_namespace_replayers.erase(it);
+    } else {
+      mirroring_namespaces.erase(iter);
+      it++;
+    }
+  }
+
+  for (auto &name : mirroring_namespaces) {
+    auto namespace_replayer = NamespaceReplayer<I>::create(
+        name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
+        m_remote_pool_meta, m_threads, m_image_sync_throttler.get(),
+        m_image_deletion_throttler.get(), m_service_daemon,
+        m_cache_manager_handler, m_pool_meta_cache);
+    auto on_init = new LambdaContext(
+        [this, namespace_replayer, name, &mirroring_namespaces,
+         ctx=gather_ctx->new_sub()](int r) {
+          std::lock_guard locker{m_lock};
+          if (r < 0) {
+            derr << "failed to initialize namespace replayer for namespace "
+                 << name << ": " << cpp_strerror(r) << dendl;
+            delete namespace_replayer;
+            mirroring_namespaces.erase(name);
+          } else {
+            m_namespace_replayers[name] = namespace_replayer;
+            m_service_daemon->add_namespace(m_local_pool_id, name);
+          }
+          ctx->complete(r);
+        });
+    namespace_replayer->init(on_init);
+  }
+
+  gather_ctx->activate();
+
+  m_lock.unlock();
+  cond.wait();
+  m_lock.lock();
+
+  if (m_leader) {
+    C_SaferCond acquire_cond;
+    auto acquire_gather_ctx = new C_Gather(cct, &acquire_cond);
+
+    for (auto &name : mirroring_namespaces) {
+      namespace_replayer_acquire_leader(name, acquire_gather_ctx->new_sub());
+    }
+    acquire_gather_ctx->activate();
+
+    m_lock.unlock();
+    acquire_cond.wait();
+    m_lock.lock();
+
+    std::vector<std::string> instance_ids;
+    m_leader_watcher->list_instances(&instance_ids);
+
+    for (auto &name : mirroring_namespaces) {
+      auto it = m_namespace_replayers.find(name);
+      if (it == m_namespace_replayers.end()) {
+        // acuire leader for this namespace replayer failed
+        continue;
+      }
+      it->second->handle_instances_added(instance_ids);
+    }
+  } else {
+    std::string leader_instance_id;
+    if (m_leader_watcher->get_leader_instance_id(&leader_instance_id)) {
+      for (auto &name : mirroring_namespaces) {
+        m_namespace_replayers[name]->handle_update_leader(leader_instance_id);
+      }
+    }
+  }
+}
+
+template <typename I>
+int PoolReplayer<I>::list_mirroring_namespaces(
+    std::set<std::string> *namespaces) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  std::vector<std::string> names;
+
+  int r = librbd::api::Namespace<I>::list(m_local_io_ctx, &names);
+  if (r < 0) {
+    derr << "failed to list namespaces: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  for (auto &name : names) {
+    cls::rbd::MirrorMode mirror_mode = cls::rbd::MIRROR_MODE_DISABLED;
+    int r = librbd::cls_client::mirror_mode_get(&m_local_io_ctx, &mirror_mode);
+    if (r < 0 && r != -ENOENT) {
+      derr << "failed to get namespace mirror mode: " << cpp_strerror(r)
+           << dendl;
+      if (m_namespace_replayers.count(name) == 0) {
+        continue;
+      }
+    } else if (mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
+      dout(10) << "mirroring is disabled for namespace " << name << dendl;
+      continue;
     }
+
+    namespaces->insert(name);
   }
 
-  m_instance_replayer->stop();
+  return 0;
 }
 
 template <typename I>
 void PoolReplayer<I>::reopen_logs()
 {
-  Mutex::Locker l(m_lock);
+  std::lock_guard locker{m_lock};
 
   if (m_local_rados) {
     reinterpret_cast<CephContext *>(m_local_rados->cct())->reopen_logs();
@@ -580,135 +767,207 @@ void PoolReplayer<I>::reopen_logs()
 }
 
 template <typename I>
-void PoolReplayer<I>::print_status(Formatter *f, stringstream *ss)
-{
-  dout(20) << "enter" << dendl;
+void PoolReplayer<I>::namespace_replayer_acquire_leader(const std::string &name,
+                                                        Context *on_finish) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  auto it = m_namespace_replayers.find(name);
+  ceph_assert(it != m_namespace_replayers.end());
+
+  on_finish = new LambdaContext(
+      [this, name, on_finish](int r) {
+        if (r < 0) {
+          derr << "failed to handle acquire leader for namespace: "
+               << name << ": " << cpp_strerror(r) << dendl;
+
+          // remove the namespace replayer -- update_namespace_replayers will
+          // retry to create it and acquire leader.
+
+          std::lock_guard locker{m_lock};
+
+          auto namespace_replayer = m_namespace_replayers[name];
+          m_namespace_replayers.erase(name);
+          auto on_shut_down = new LambdaContext(
+              [namespace_replayer, on_finish](int r) {
+                delete namespace_replayer;
+                on_finish->complete(r);
+              });
+          m_service_daemon->remove_namespace(m_local_pool_id, name);
+          namespace_replayer->shut_down(on_shut_down);
+          return;
+        }
+        on_finish->complete(0);
+      });
 
-  if (!f) {
-    return;
-  }
+  it->second->handle_acquire_leader(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::print_status(Formatter *f) {
+  dout(20) << dendl;
 
-  Mutex::Locker l(m_lock);
+  assert(f);
+
+  std::lock_guard l{m_lock};
 
   f->open_object_section("pool_replayer_status");
-  f->dump_string("pool", m_local_io_ctx.get_pool_name());
   f->dump_stream("peer") << m_peer;
-  f->dump_string("instance_id", m_instance_watcher->get_instance_id());
+  if (m_local_io_ctx.is_valid()) {
+    f->dump_string("pool", m_local_io_ctx.get_pool_name());
+    f->dump_stream("instance_id") << m_local_io_ctx.get_instance_id();
+  }
 
   std::string state("running");
   if (m_manual_stop) {
     state = "stopped (manual)";
   } else if (m_stopping) {
     state = "stopped";
+  } else if (!is_running()) {
+    state = "error";
   }
   f->dump_string("state", state);
 
-  std::string leader_instance_id;
-  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
-  f->dump_string("leader_instance_id", leader_instance_id);
-
-  bool leader = m_leader_watcher->is_leader();
-  f->dump_bool("leader", leader);
-  if (leader) {
-    std::vector<std::string> instance_ids;
-    m_leader_watcher->list_instances(&instance_ids);
-    f->open_array_section("instances");
-    for (auto instance_id : instance_ids) {
-      f->dump_string("instance_id", instance_id);
+  if (m_leader_watcher) {
+    std::string leader_instance_id;
+    m_leader_watcher->get_leader_instance_id(&leader_instance_id);
+    f->dump_string("leader_instance_id", leader_instance_id);
+
+    bool leader = m_leader_watcher->is_leader();
+    f->dump_bool("leader", leader);
+    if (leader) {
+      std::vector<std::string> instance_ids;
+      m_leader_watcher->list_instances(&instance_ids);
+      f->open_array_section("instances");
+      for (auto instance_id : instance_ids) {
+        f->dump_string("instance_id", instance_id);
+      }
+      f->close_section(); // instances
     }
-    f->close_section();
   }
 
-  f->dump_string("local_cluster_admin_socket",
-                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf.
-                     get_val<std::string>("admin_socket"));
-  f->dump_string("remote_cluster_admin_socket",
-                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf.
-                     get_val<std::string>("admin_socket"));
+  if (m_local_rados) {
+    auto cct = reinterpret_cast<CephContext *>(m_local_rados->cct());
+    f->dump_string("local_cluster_admin_socket",
+                   cct->_conf.get_val<std::string>("admin_socket"));
+  }
+  if (m_remote_rados) {
+    auto cct = reinterpret_cast<CephContext *>(m_remote_rados->cct());
+    f->dump_string("remote_cluster_admin_socket",
+                   cct->_conf.get_val<std::string>("admin_socket"));
+  }
+
+  if (m_image_sync_throttler) {
+    f->open_object_section("sync_throttler");
+    m_image_sync_throttler->print_status(f);
+    f->close_section(); // sync_throttler
+  }
 
-  f->open_object_section("sync_throttler");
-  m_instance_watcher->print_sync_status(f, ss);
-  f->close_section();
+  if (m_image_deletion_throttler) {
+    f->open_object_section("deletion_throttler");
+    m_image_deletion_throttler->print_status(f);
+    f->close_section(); // deletion_throttler
+  }
 
-  m_instance_replayer->print_status(f, ss);
+  if (m_default_namespace_replayer) {
+    m_default_namespace_replayer->print_status(f);
+  }
 
-  if (m_image_deleter) {
-    f->open_object_section("image_deleter");
-    m_image_deleter->print_status(f, ss);
-    f->close_section();
+  f->open_array_section("namespaces");
+  for (auto &it : m_namespace_replayers) {
+    f->open_object_section("namespace");
+    f->dump_string("name", it.first);
+    it.second->print_status(f);
+    f->close_section(); // namespace
   }
+  f->close_section(); // namespaces
 
-  f->close_section();
-  f->flush(*ss);
+  f->close_section(); // pool_replayer_status
 }
 
 template <typename I>
-void PoolReplayer<I>::start()
-{
-  dout(20) << "enter" << dendl;
+void PoolReplayer<I>::start() {
+  dout(20) << dendl;
 
-  Mutex::Locker l(m_lock);
+  std::lock_guard l{m_lock};
 
   if (m_stopping) {
     return;
   }
 
   m_manual_stop = false;
-  m_instance_replayer->start();
+
+  if (m_default_namespace_replayer) {
+    m_default_namespace_replayer->start();
+  }
+  for (auto &it : m_namespace_replayers) {
+    it.second->start();
+  }
 }
 
 template <typename I>
-void PoolReplayer<I>::stop(bool manual)
-{
+void PoolReplayer<I>::stop(bool manual) {
   dout(20) << "enter: manual=" << manual << dendl;
 
-  Mutex::Locker l(m_lock);
+  std::lock_guard l{m_lock};
   if (!manual) {
     m_stopping = true;
-    m_cond.Signal();
+    m_cond.notify_all();
     return;
   } else if (m_stopping) {
     return;
   }
 
   m_manual_stop = true;
-  m_instance_replayer->stop();
+
+  if (m_default_namespace_replayer) {
+    m_default_namespace_replayer->stop();
+  }
+  for (auto &it : m_namespace_replayers) {
+    it.second->stop();
+  }
 }
 
 template <typename I>
-void PoolReplayer<I>::restart()
-{
-  dout(20) << "enter" << dendl;
+void PoolReplayer<I>::restart() {
+  dout(20) << dendl;
 
-  Mutex::Locker l(m_lock);
+  std::lock_guard l{m_lock};
 
   if (m_stopping) {
     return;
   }
 
-  m_instance_replayer->restart();
+  if (m_default_namespace_replayer) {
+    m_default_namespace_replayer->restart();
+  }
+  for (auto &it : m_namespace_replayers) {
+    it.second->restart();
+  }
 }
 
 template <typename I>
-void PoolReplayer<I>::flush()
-{
-  dout(20) << "enter" << dendl;
+void PoolReplayer<I>::flush() {
+  dout(20) << dendl;
 
-  Mutex::Locker l(m_lock);
+  std::lock_guard l{m_lock};
 
   if (m_stopping || m_manual_stop) {
     return;
   }
 
-  m_instance_replayer->flush();
+  if (m_default_namespace_replayer) {
+    m_default_namespace_replayer->flush();
+  }
+  for (auto &it : m_namespace_replayers) {
+    it.second->flush();
+  }
 }
 
 template <typename I>
-void PoolReplayer<I>::release_leader()
-{
-  dout(20) << "enter" << dendl;
+void PoolReplayer<I>::release_leader() {
+  dout(20) << dendl;
 
-  Mutex::Locker l(m_lock);
+  std::lock_guard l{m_lock};
 
   if (m_stopping || !m_leader_watcher) {
     return;
@@ -717,400 +976,132 @@ void PoolReplayer<I>::release_leader()
   m_leader_watcher->release_leader();
 }
 
-template <typename I>
-void PoolReplayer<I>::handle_update(const std::string &mirror_uuid,
-                                   ImageIds &&added_image_ids,
-                                   ImageIds &&removed_image_ids) {
-  if (m_stopping) {
-    return;
-  }
-
-  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
-           << "added_count=" << added_image_ids.size() << ", "
-           << "removed_count=" << removed_image_ids.size() << dendl;
-  Mutex::Locker locker(m_lock);
-  if (!m_leader_watcher->is_leader()) {
-    return;
-  }
-
-  m_service_daemon->add_or_update_attribute(
-    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
-    m_local_pool_watcher->get_image_count());
-  if (m_remote_pool_watcher) {
-    m_service_daemon->add_or_update_attribute(
-      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
-      m_remote_pool_watcher->get_image_count());
-  }
-
-  std::set<std::string> added_global_image_ids;
-  for (auto& image_id : added_image_ids) {
-    added_global_image_ids.insert(image_id.global_id);
-  }
-
-  std::set<std::string> removed_global_image_ids;
-  for (auto& image_id : removed_image_ids) {
-    removed_global_image_ids.insert(image_id.global_id);
-  }
-
-  m_image_map->update_images(mirror_uuid,
-                             std::move(added_global_image_ids),
-                             std::move(removed_global_image_ids));
-}
-
 template <typename I>
 void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
-  dout(10) << dendl;
-
-  m_service_daemon->add_or_update_attribute(m_local_pool_id,
-                                            SERVICE_DAEMON_LEADER_KEY, true);
-  m_instance_watcher->handle_acquire_leader();
-  init_image_map(on_finish);
+  dout(20) << dendl;
+
+  with_namespace_replayers(
+      [this](Context *on_finish) {
+        dout(10) << "handle_post_acquire_leader" << dendl;
+
+        ceph_assert(ceph_mutex_is_locked(m_lock));
+
+        m_service_daemon->add_or_update_attribute(m_local_pool_id,
+                                                  SERVICE_DAEMON_LEADER_KEY,
+                                                  true);
+        auto ctx = new LambdaContext(
+            [this, on_finish](int r) {
+              if (r == 0) {
+                std::lock_guard locker{m_lock};
+                m_leader = true;
+              }
+              on_finish->complete(r);
+            });
+
+        auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+        auto gather_ctx = new C_Gather(cct, ctx);
+
+        m_default_namespace_replayer->handle_acquire_leader(
+            gather_ctx->new_sub());
+
+        for (auto &it : m_namespace_replayers) {
+          namespace_replayer_acquire_leader(it.first, gather_ctx->new_sub());
+        }
+
+        gather_ctx->activate();
+      }, on_finish);
 }
 
 template <typename I>
 void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
-  dout(10) << dendl;
-
-  m_service_daemon->remove_attribute(m_local_pool_id,
-                                     SERVICE_DAEMON_LEADER_KEY);
-  m_instance_watcher->handle_release_leader();
-  shut_down_image_deleter(on_finish);
-}
-
-template <typename I>
-void PoolReplayer<I>::init_image_map(Context *on_finish) {
-  dout(5) << dendl;
-
-  Mutex::Locker locker(m_lock);
-  ceph_assert(!m_image_map);
-  m_image_map.reset(ImageMap<I>::create(m_local_io_ctx, m_threads,
-                                        m_instance_watcher->get_instance_id(),
-                                        m_image_map_listener));
-
-  auto ctx = new FunctionContext([this, on_finish](int r) {
-      handle_init_image_map(r, on_finish);
-    });
-  m_image_map->init(create_async_context_callback(
-    m_threads->work_queue, ctx));
-}
-
-template <typename I>
-void PoolReplayer<I>::handle_init_image_map(int r, Context *on_finish) {
-  dout(5) << "r=" << r << dendl;
-  if (r < 0) {
-    derr << "failed to init image map: " << cpp_strerror(r) << dendl;
-    on_finish = new FunctionContext([on_finish, r](int) {
-        on_finish->complete(r);
-      });
-    shut_down_image_map(on_finish);
-    return;
-  }
+  dout(20) << dendl;
 
-  init_local_pool_watcher(on_finish);
-}
-
-template <typename I>
-void PoolReplayer<I>::init_local_pool_watcher(Context *on_finish) {
-  dout(10) << dendl;
-
-  Mutex::Locker locker(m_lock);
-  ceph_assert(!m_local_pool_watcher);
-  m_local_pool_watcher.reset(PoolWatcher<I>::create(
-    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
-
-  // ensure the initial set of local images is up-to-date
-  // after acquiring the leader role
-  auto ctx = new FunctionContext([this, on_finish](int r) {
-      handle_init_local_pool_watcher(r, on_finish);
-    });
-  m_local_pool_watcher->init(create_async_context_callback(
-    m_threads->work_queue, ctx));
-}
+  with_namespace_replayers(
+      [this](Context *on_finish) {
+        dout(10) << "handle_pre_release_leader" << dendl;
 
-template <typename I>
-void PoolReplayer<I>::handle_init_local_pool_watcher(
-    int r, Context *on_finish) {
-  dout(10) << "r=" << r << dendl;
-  if (r < 0) {
-    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
-    on_finish = new FunctionContext([on_finish, r](int) {
-        on_finish->complete(r);
-      });
-    shut_down_pool_watchers(on_finish);
-    return;
-  }
+        ceph_assert(ceph_mutex_is_locked(m_lock));
 
-  init_remote_pool_watcher(on_finish);
-}
+        m_leader = false;
+        m_service_daemon->remove_attribute(m_local_pool_id,
+                                           SERVICE_DAEMON_LEADER_KEY);
 
-template <typename I>
-void PoolReplayer<I>::init_remote_pool_watcher(Context *on_finish) {
-  dout(10) << dendl;
-
-  Mutex::Locker locker(m_lock);
-  ceph_assert(!m_remote_pool_watcher);
-  m_remote_pool_watcher.reset(PoolWatcher<I>::create(
-    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
-
-  auto ctx = new FunctionContext([this, on_finish](int r) {
-      handle_init_remote_pool_watcher(r, on_finish);
-    });
-  m_remote_pool_watcher->init(create_async_context_callback(
-    m_threads->work_queue, ctx));
-}
+        auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+        auto gather_ctx = new C_Gather(cct, on_finish);
 
-template <typename I>
-void PoolReplayer<I>::handle_init_remote_pool_watcher(
-    int r, Context *on_finish) {
-  dout(10) << "r=" << r << dendl;
-  if (r == -ENOENT) {
-    // Technically nothing to do since the other side doesn't
-    // have mirroring enabled. Eventually the remote pool watcher will
-    // detect images (if mirroring is enabled), so no point propagating
-    // an error which would just busy-spin the state machines.
-    dout(0) << "remote peer does not have mirroring configured" << dendl;
-  } else if (r < 0) {
-    derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
-    on_finish = new FunctionContext([on_finish, r](int) {
-        on_finish->complete(r);
-      });
-    shut_down_pool_watchers(on_finish);
-    return;
-  }
+        m_default_namespace_replayer->handle_release_leader(
+            gather_ctx->new_sub());
 
-  init_image_deleter(on_finish);
-}
+        for (auto &it : m_namespace_replayers) {
+          it.second->handle_release_leader(gather_ctx->new_sub());
+        }
 
-template <typename I>
-void PoolReplayer<I>::init_image_deleter(Context *on_finish) {
-  dout(10) << dendl;
-
-  Mutex::Locker locker(m_lock);
-  ceph_assert(!m_image_deleter);
-
-  on_finish = new FunctionContext([this, on_finish](int r) {
-      handle_init_image_deleter(r, on_finish);
-    });
-  m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
-                                                m_service_daemon));
-  m_image_deleter->init(create_async_context_callback(
-    m_threads->work_queue, on_finish));
+        gather_ctx->activate();
+      }, on_finish);
 }
 
 template <typename I>
-void PoolReplayer<I>::handle_init_image_deleter(int r, Context *on_finish) {
-  dout(10) << "r=" << r << dendl;
-  if (r < 0) {
-    derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
-    on_finish = new FunctionContext([on_finish, r](int) {
-        on_finish->complete(r);
-      });
-    shut_down_image_deleter(on_finish);
-    return;
-  }
-
-  on_finish->complete(0);
-
-  Mutex::Locker locker(m_lock);
-  m_cond.Signal();
-}
+void PoolReplayer<I>::handle_update_leader(
+    const std::string &leader_instance_id) {
+  dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
 
-template <typename I>
-void PoolReplayer<I>::shut_down_image_deleter(Context* on_finish) {
-  dout(10) << dendl;
-  {
-    Mutex::Locker locker(m_lock);
-    if (m_image_deleter) {
-      Context *ctx = new FunctionContext([this, on_finish](int r) {
-          handle_shut_down_image_deleter(r, on_finish);
-       });
-      ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
-      m_image_deleter->shut_down(ctx);
-      return;
-    }
-  }
-  shut_down_pool_watchers(on_finish);
-}
+  std::lock_guard locker{m_lock};
 
-template <typename I>
-void PoolReplayer<I>::handle_shut_down_image_deleter(
-    int r, Context* on_finish) {
-  dout(10) << "r=" << r << dendl;
+  m_default_namespace_replayer->handle_update_leader(leader_instance_id);
 
-  {
-    Mutex::Locker locker(m_lock);
-    ceph_assert(m_image_deleter);
-    m_image_deleter.reset();
+  for (auto &it : m_namespace_replayers) {
+    it.second->handle_update_leader(leader_instance_id);
   }
-
-  shut_down_pool_watchers(on_finish);
 }
 
 template <typename I>
-void PoolReplayer<I>::shut_down_pool_watchers(Context *on_finish) {
-  dout(10) << dendl;
+void PoolReplayer<I>::handle_instances_added(
+    const std::vector<std::string> &instance_ids) {
+  dout(5) << "instance_ids=" << instance_ids << dendl;
 
-  {
-    Mutex::Locker locker(m_lock);
-    if (m_local_pool_watcher) {
-      Context *ctx = new FunctionContext([this, on_finish](int r) {
-          handle_shut_down_pool_watchers(r, on_finish);
-       });
-      ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
-      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
-      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
-      if (m_remote_pool_watcher) {
-       m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
-      }
-      gather_ctx->activate();
-      return;
-    }
+  std::lock_guard locker{m_lock};
+  if (!m_leader_watcher->is_leader()) {
+    return;
   }
 
-  on_finish->complete(0);
-}
-
-template <typename I>
-void PoolReplayer<I>::handle_shut_down_pool_watchers(
-    int r, Context *on_finish) {
-  dout(10) << "r=" << r << dendl;
-
-  {
-    Mutex::Locker locker(m_lock);
-    ceph_assert(m_local_pool_watcher);
-    m_local_pool_watcher.reset();
+  m_default_namespace_replayer->handle_instances_added(instance_ids);
 
-    if (m_remote_pool_watcher) {
-      m_remote_pool_watcher.reset();
-    }
+  for (auto &it : m_namespace_replayers) {
+    it.second->handle_instances_added(instance_ids);
   }
-  wait_for_update_ops(on_finish);
-}
-
-template <typename I>
-void PoolReplayer<I>::wait_for_update_ops(Context *on_finish) {
-  dout(10) << dendl;
-
-  Mutex::Locker locker(m_lock);
-
-  Context *ctx = new FunctionContext([this, on_finish](int r) {
-      handle_wait_for_update_ops(r, on_finish);
-    });
-  ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
-  m_update_op_tracker.wait_for_ops(ctx);
-}
-
-template <typename I>
-void PoolReplayer<I>::handle_wait_for_update_ops(int r, Context *on_finish) {
-  dout(10) << "r=" << r << dendl;
-  ceph_assert(r == 0);
-
-  shut_down_image_map(on_finish);
 }
 
 template <typename I>
-void PoolReplayer<I>::shut_down_image_map(Context *on_finish) {
-  dout(5) << dendl;
+void PoolReplayer<I>::handle_instances_removed(
+    const std::vector<std::string> &instance_ids) {
+  dout(5) << "instance_ids=" << instance_ids << dendl;
 
-  {
-    Mutex::Locker locker(m_lock);
-    if (m_image_map) {
-      on_finish = new FunctionContext([this, on_finish](int r) {
-          handle_shut_down_image_map(r, on_finish);
-        });
-      m_image_map->shut_down(create_async_context_callback(
-        m_threads->work_queue, on_finish));
-      return;
-    }
+  std::lock_guard locker{m_lock};
+  if (!m_leader_watcher->is_leader()) {
+    return;
   }
 
-  on_finish->complete(0);
-}
+  m_default_namespace_replayer->handle_instances_removed(instance_ids);
 
-template <typename I>
-void PoolReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) {
-  dout(5) << "r=" << r << dendl;
-  if (r < 0 && r != -EBLACKLISTED) {
-    derr << "failed to shut down image map: " << cpp_strerror(r) << dendl;
+  for (auto &it : m_namespace_replayers) {
+    it.second->handle_instances_removed(instance_ids);
   }
-
-  Mutex::Locker locker(m_lock);
-  ceph_assert(m_image_map);
-  m_image_map.reset();
-
-  m_instance_replayer->release_all(on_finish);
-}
-
-template <typename I>
-void PoolReplayer<I>::handle_update_leader(
-    const std::string &leader_instance_id) {
-  dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
-
-  m_instance_watcher->handle_update_leader(leader_instance_id);
-}
-
-template <typename I>
-void PoolReplayer<I>::handle_acquire_image(const std::string &global_image_id,
-                                           const std::string &instance_id,
-                                           Context* on_finish) {
-  dout(5) << "global_image_id=" << global_image_id << ", "
-          << "instance_id=" << instance_id << dendl;
-
-  m_instance_watcher->notify_image_acquire(instance_id, global_image_id,
-                                           on_finish);
-}
-
-template <typename I>
-void PoolReplayer<I>::handle_release_image(const std::string &global_image_id,
-                                           const std::string &instance_id,
-                                           Context* on_finish) {
-  dout(5) << "global_image_id=" << global_image_id << ", "
-          << "instance_id=" << instance_id << dendl;
-
-  m_instance_watcher->notify_image_release(instance_id, global_image_id,
-                                           on_finish);
-}
-
-template <typename I>
-void PoolReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
-                                          const std::string &global_image_id,
-                                          const std::string &instance_id,
-                                          Context* on_finish) {
-  ceph_assert(!mirror_uuid.empty());
-  dout(5) << "mirror_uuid=" << mirror_uuid << ", "
-          << "global_image_id=" << global_image_id << ", "
-          << "instance_id=" << instance_id << dendl;
-
-  m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id,
-                                                mirror_uuid, on_finish);
 }
 
 template <typename I>
-void PoolReplayer<I>::handle_instances_added(const InstanceIds &instance_ids) {
-  dout(5) << "instance_ids=" << instance_ids << dendl;
-  Mutex::Locker locker(m_lock);
-  if (!m_leader_watcher->is_leader()) {
-    return;
-  }
-
-  ceph_assert(m_image_map);
-  m_image_map->update_instances_added(instance_ids);
-}
+void PoolReplayer<I>::handle_remote_pool_meta_updated(
+    const RemotePoolMeta& remote_pool_meta) {
+  dout(5) << "remote_pool_meta=" << remote_pool_meta << dendl;
 
-template <typename I>
-void PoolReplayer<I>::handle_instances_removed(
-    const InstanceIds &instance_ids) {
-  dout(5) << "instance_ids=" << instance_ids << dendl;
-  Mutex::Locker locker(m_lock);
-  if (!m_leader_watcher->is_leader()) {
+  if (!m_default_namespace_replayer) {
+    m_remote_pool_meta = remote_pool_meta;
     return;
   }
 
-  ceph_assert(m_image_map);
-  m_image_map->update_instances_removed(instance_ids);
+  derr << "remote pool metadata updated unexpectedly" << dendl;
+  std::unique_lock locker{m_lock};
+  m_stopping = true;
+  m_cond.notify_all();
 }
 
 } // namespace mirror