]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/rbd_mirror/test_mock_PoolReplayer.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / rbd_mirror / test_mock_PoolReplayer.cc
index 4ceb9be3c4762ca2e6991e65b1ca7e9fb84acced..42e5a42a6ea8bed11d1e3794d8aaa2dbe508dcb2 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "librbd/api/Config.h"
+#include "librbd/api/Namespace.h"
 #include "test/librbd/mock/MockImageCtx.h"
 #include "test/librados_test_stub/MockTestMemCluster.h"
 #include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
 #include "test/rbd_mirror/test_mock_fixture.h"
 #include "test/rbd_mirror/mock/MockContextWQ.h"
 #include "test/rbd_mirror/mock/MockSafeTimer.h"
-#include "tools/rbd_mirror/PoolReplayer.h"
-#include "tools/rbd_mirror/ImageDeleter.h"
-#include "tools/rbd_mirror/ImageMap.h"
-#include "tools/rbd_mirror/InstanceWatcher.h"
-#include "tools/rbd_mirror/InstanceReplayer.h"
+#include "tools/rbd_mirror/Throttler.h"
 #include "tools/rbd_mirror/LeaderWatcher.h"
-#include "tools/rbd_mirror/PoolWatcher.h"
+#include "tools/rbd_mirror/NamespaceReplayer.h"
+#include "tools/rbd_mirror/PoolMetaCache.h"
+#include "tools/rbd_mirror/PoolReplayer.h"
+#include "tools/rbd_mirror/RemotePoolPoller.h"
 #include "tools/rbd_mirror/ServiceDaemon.h"
 #include "tools/rbd_mirror/Threads.h"
+#include "common/Formatter.h"
 
 namespace librbd {
 
@@ -41,158 +42,155 @@ public:
   }
 };
 
-}
-
-} // namespace librbd
-
-namespace rbd {
-namespace mirror {
-
 template <>
-struct ImageDeleter<librbd::MockTestImageCtx> {
-  static ImageDeleter* s_instance;
+class Namespace<MockTestImageCtx> {
+public:
+  static Namespace* s_instance;
 
-  static ImageDeleter* create(librados::IoCtx &ioctx,
-                              Threads<librbd::MockTestImageCtx> *threads,
-                              ServiceDaemon<librbd::MockTestImageCtx> *service_daemon) {
-    ceph_assert(s_instance != nullptr);
-    return s_instance;
-  }
+  static int list(librados::IoCtx& io_ctx, std::vector<std::string> *names) {
+    if (s_instance) {
+      return s_instance->list(names);
+    }
 
-  MOCK_METHOD1(init, void(Context*));
-  MOCK_METHOD1(shut_down, void(Context*));
-  MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*));
+    return 0;
+  }
 
-  ImageDeleter() {
+  Namespace() {
     s_instance = this;
   }
-};
-
-ImageDeleter<librbd::MockTestImageCtx>* ImageDeleter<librbd::MockTestImageCtx>::s_instance = nullptr;
 
-template<>
-struct ImageMap<librbd::MockTestImageCtx> {
-  static ImageMap* s_instance;
+  void add(const std::string &name) {
+    std::lock_guard locker{m_lock};
 
-  static ImageMap *create(librados::IoCtx &ioctx,
-                          Threads<librbd::MockTestImageCtx> *threads,
-                          const std::string& instance_id,
-                          image_map::Listener &listener) {
-    ceph_assert(s_instance != nullptr);
-    return s_instance;
+    m_names.insert(name);
   }
 
-  MOCK_METHOD1(init, void(Context*));
-  MOCK_METHOD1(shut_down, void(Context*));
-
-  MOCK_METHOD1(update_instances_added, void(const std::vector<std::string>&));
-  MOCK_METHOD1(update_instances_removed, void(const std::vector<std::string>&));
+  void remove(const std::string &name) {
+    std::lock_guard locker{m_lock};
 
-  MOCK_METHOD3(update_images_mock, void(const std::string&,
-                                        const std::set<std::string>&,
-                                        const std::set<std::string>&));
-  void update_images(const std::string& mirror_uuid,
-                     std::set<std::string>&& added,
-                     std::set<std::string>&& removed) {
-    update_images_mock(mirror_uuid, added, removed);
+    m_names.erase(name);
   }
 
-  ImageMap() {
-    s_instance = this;
+  void clear() {
+    std::lock_guard locker{m_lock};
+
+    m_names.clear();
   }
-};
 
-ImageMap<librbd::MockTestImageCtx>* ImageMap<librbd::MockTestImageCtx>::s_instance = nullptr;
+private:
+  ceph::mutex m_lock = ceph::make_mutex("Namespace");
+  std::set<std::string> m_names;
 
-template<>
-struct InstanceReplayer<librbd::MockTestImageCtx> {
-  static InstanceReplayer* s_instance;
+  int list(std::vector<std::string> *names) {
+    std::lock_guard locker{m_lock};
 
-  static InstanceReplayer* create(Threads<librbd::MockTestImageCtx> *threads,
-                                  ServiceDaemon<librbd::MockTestImageCtx> *service_daemon,
-                                  RadosRef rados, const std::string& uuid,
-                                  int64_t pool_id) {
-    ceph_assert(s_instance != nullptr);
-    return s_instance;
+    names->clear();
+    names->insert(names->begin(), m_names.begin(), m_names.end());
+    return 0;
   }
+};
 
-  MOCK_METHOD0(start, void());
-  MOCK_METHOD0(stop, void());
-  MOCK_METHOD0(restart, void());
-  MOCK_METHOD0(flush, void());
+Namespace<librbd::MockTestImageCtx>* Namespace<librbd::MockTestImageCtx>::s_instance = nullptr;
 
-  MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*));
+} // namespace api
 
-  MOCK_METHOD2(add_peer, void(const std::string&, librados::IoCtx&));
+} // namespace librbd
 
-  MOCK_METHOD0(init, void());
-  MOCK_METHOD0(shut_down, void());
-  MOCK_METHOD1(release_all, void(Context*));
+namespace rbd {
+namespace mirror {
 
-  InstanceReplayer() {
+template <>
+struct Throttler<librbd::MockTestImageCtx> {
+  static Throttler* s_instance;
+
+  static Throttler *create(
+      CephContext *cct,
+      const std::string &max_concurrent_ops_config_param_name) {
+    return s_instance;
+  }
+
+  Throttler() {
+    ceph_assert(s_instance == nullptr);
     s_instance = this;
   }
-};
 
-InstanceReplayer<librbd::MockTestImageCtx>* InstanceReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
+  virtual ~Throttler() {
+    ceph_assert(s_instance == this);
+    s_instance = nullptr;
+  }
 
-template<>
-struct InstanceWatcher<librbd::MockTestImageCtx> {
-  static InstanceWatcher* s_instance;
+  MOCK_METHOD1(print_status, void(Formatter*));
+};
 
-  static InstanceWatcher* create(librados::IoCtx &ioctx,
-                                 MockContextWQ* work_queue,
-                                 InstanceReplayer<librbd::MockTestImageCtx>* instance_replayer) {
-    ceph_assert(s_instance != nullptr);
-    return s_instance;
-  }
+Throttler<librbd::MockTestImageCtx>* Throttler<librbd::MockTestImageCtx>::s_instance = nullptr;
 
-  MOCK_METHOD0(handle_acquire_leader, void());
-  MOCK_METHOD0(handle_release_leader, void());
+template <>
+struct NamespaceReplayer<librbd::MockTestImageCtx> {
+  static std::map<std::string, NamespaceReplayer *> s_instances;
+
+  static NamespaceReplayer *create(
+      const std::string &name,
+      librados::IoCtx &local_ioctx,
+      librados::IoCtx &remote_ioctx,
+      const std::string &local_mirror_uuid,
+      const std::string& local_mirror_peer_uuid,
+      const RemotePoolMeta& remote_pool_meta,
+      Threads<librbd::MockTestImageCtx> *threads,
+      Throttler<librbd::MockTestImageCtx> *image_sync_throttler,
+      Throttler<librbd::MockTestImageCtx> *image_deletion_throttler,
+      ServiceDaemon<librbd::MockTestImageCtx> *service_daemon,
+      journal::CacheManagerHandler *cache_manager_handler,
+      PoolMetaCache* pool_meta_cache) {
+    ceph_assert(s_instances.count(name));
+    auto namespace_replayer = s_instances[name];
+    s_instances.erase(name);
+    return namespace_replayer;
+  }
 
+  MOCK_METHOD0(is_blacklisted, bool());
   MOCK_METHOD0(get_instance_id, std::string());
 
-  MOCK_METHOD2(print_sync_status, void(Formatter*, std::stringstream*));
+  MOCK_METHOD1(init, void(Context*));
+  MOCK_METHOD1(shut_down, void(Context*));
 
-  MOCK_METHOD0(init, int());
-  MOCK_METHOD0(shut_down, void());
-
-  MOCK_METHOD3(notify_image_acquire, void(const std::string&,
-                                          const std::string&,
-                                          Context*));
-  MOCK_METHOD3(notify_image_release, void(const std::string&,
-                                          const std::string&,
-                                          Context*));
-  MOCK_METHOD4(notify_peer_image_removed, void(const std::string&,
-                                               const std::string&,
-                                               const std::string&,
-                                               Context*));
-
-  MOCK_METHOD1(handle_update_leader, void(const std::string&));
-
-  InstanceWatcher() {
-    s_instance = this;
-  }
+  MOCK_METHOD1(handle_acquire_leader, void(Context *));
+  MOCK_METHOD1(handle_release_leader, void(Context *));
+  MOCK_METHOD1(handle_update_leader, void(const std::string &));
+  MOCK_METHOD1(handle_instances_added, void(const std::vector<std::string> &));
+  MOCK_METHOD1(handle_instances_removed, void(const std::vector<std::string> &));
+
+  MOCK_METHOD1(print_status, void(Formatter*));
+  MOCK_METHOD0(start, void());
+  MOCK_METHOD0(stop, void());
+  MOCK_METHOD0(restart, void());
+  MOCK_METHOD0(flush, void());
 
+  NamespaceReplayer(const std::string &name = "") {
+    ceph_assert(!s_instances.count(name));
+    s_instances[name] = this;
+  }
 };
 
-InstanceWatcher<librbd::MockTestImageCtx>* InstanceWatcher<librbd::MockTestImageCtx>::s_instance = nullptr;
+std::map<std::string, NamespaceReplayer<librbd::MockTestImageCtx> *> NamespaceReplayer<librbd::MockTestImageCtx>::s_instances;
 
 template<>
 struct LeaderWatcher<librbd::MockTestImageCtx> {
   static LeaderWatcher* s_instance;
+  leader_watcher::Listener* listener = nullptr;
 
   static LeaderWatcher *create(Threads<librbd::MockTestImageCtx> *threads,
                                librados::IoCtx &ioctx,
                                leader_watcher::Listener* listener) {
     ceph_assert(s_instance != nullptr);
+    s_instance->listener = listener;
     return s_instance;
   }
 
+  MOCK_METHOD0(is_blacklisted, bool());
   MOCK_METHOD0(is_leader, bool());
   MOCK_METHOD0(release_leader, void());
 
-  MOCK_METHOD1(get_leader_instance_id, void(std::string*));
+  MOCK_METHOD1(get_leader_instance_id, bool(std::string*));
   MOCK_METHOD1(list_instances, void(std::vector<std::string>*));
 
   MOCK_METHOD0(init, int());
@@ -207,33 +205,37 @@ struct LeaderWatcher<librbd::MockTestImageCtx> {
 LeaderWatcher<librbd::MockTestImageCtx>* LeaderWatcher<librbd::MockTestImageCtx>::s_instance = nullptr;
 
 template<>
-struct PoolWatcher<librbd::MockTestImageCtx> {
-  static PoolWatcher* s_instance;
+struct RemotePoolPoller<librbd::MockTestImageCtx> {
+  static RemotePoolPoller* s_instance;
 
-  static PoolWatcher *create(Threads<librbd::MockTestImageCtx> *threads,
-                             librados::IoCtx &ioctx,
-                             pool_watcher::Listener& listener) {
+  remote_pool_poller::Listener* listener = nullptr;
+
+  static RemotePoolPoller* create(
+      Threads<librbd::MockTestImageCtx>* threads,
+      librados::IoCtx& remote_io_ctx,
+      const std::string& local_site_name,
+      const std::string& local_mirror_uuid,
+      remote_pool_poller::Listener& listener) {
     ceph_assert(s_instance != nullptr);
+    s_instance->listener = &listener;
     return s_instance;
   }
 
-  MOCK_METHOD0(is_blacklisted, bool());
-
-  MOCK_METHOD0(get_image_count, uint64_t());
-
   MOCK_METHOD1(init, void(Context*));
   MOCK_METHOD1(shut_down, void(Context*));
 
-  PoolWatcher() {
+  RemotePoolPoller() {
     s_instance = this;
   }
-
 };
 
-PoolWatcher<librbd::MockTestImageCtx>* PoolWatcher<librbd::MockTestImageCtx>::s_instance = nullptr;
+RemotePoolPoller<librbd::MockTestImageCtx>* RemotePoolPoller<librbd::MockTestImageCtx>::s_instance = nullptr;
 
 template<>
 struct ServiceDaemon<librbd::MockTestImageCtx> {
+  MOCK_METHOD2(add_namespace, void(int64_t, const std::string &));
+  MOCK_METHOD2(remove_namespace, void(int64_t, const std::string &));
+
   MOCK_METHOD3(add_or_update_attribute,
                void(int64_t, const std::string&,
                     const service_daemon::AttributeValue&));
@@ -249,8 +251,8 @@ struct ServiceDaemon<librbd::MockTestImageCtx> {
 template <>
 struct Threads<librbd::MockTestImageCtx> {
   MockSafeTimer *timer;
-  Mutex &timer_lock;
-  Cond timer_cond;
+  ceph::mutex &timer_lock;
+  ceph::condition_variable timer_cond;
 
   MockContextWQ *work_queue;
 
@@ -275,6 +277,7 @@ namespace rbd {
 namespace mirror {
 
 using ::testing::_;
+using ::testing::AtLeast;
 using ::testing::DoAll;
 using ::testing::InSequence;
 using ::testing::Invoke;
@@ -284,15 +287,22 @@ using ::testing::WithArg;
 
 class TestMockPoolReplayer : public TestMockFixture {
 public:
+  typedef librbd::api::Namespace<librbd::MockTestImageCtx> MockNamespace;
   typedef PoolReplayer<librbd::MockTestImageCtx> MockPoolReplayer;
-  typedef ImageMap<librbd::MockTestImageCtx> MockImageMap;
-  typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
-  typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
+  typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
+  typedef NamespaceReplayer<librbd::MockTestImageCtx> MockNamespaceReplayer;
+  typedef RemotePoolPoller<librbd::MockTestImageCtx> MockRemotePoolPoller;
   typedef LeaderWatcher<librbd::MockTestImageCtx> MockLeaderWatcher;
-  typedef PoolWatcher<librbd::MockTestImageCtx> MockPoolWatcher;
   typedef ServiceDaemon<librbd::MockTestImageCtx> MockServiceDaemon;
   typedef Threads<librbd::MockTestImageCtx> MockThreads;
 
+  void expect_work_queue(MockThreads &mock_threads) {
+    EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
+      .WillRepeatedly(Invoke([this](Context *ctx, int r) {
+            m_threads->work_queue->queue(ctx, r);
+          }));
+  }
+
   void expect_connect(librados::MockTestMemCluster& mock_cluster,
                       librados::MockTestMemRadosClient* mock_rados_client,
                       const std::string& cluster_name, CephContext** cct_ref) {
@@ -303,7 +313,6 @@ public:
                     cct->get();
                     *cct_ref = cct;
                   }
-
                   return mock_rados_client;
                 }));
   }
@@ -330,63 +339,201 @@ public:
                       Return(r)));
   }
 
-  void expect_instance_replayer_init(MockInstanceReplayer& mock_instance_replayer) {
-    EXPECT_CALL(mock_instance_replayer, init());
+  void expect_mirror_mode_get(librados::MockTestMemIoCtxImpl *io_ctx_impl,
+                              cls::rbd::MirrorMode mirror_mode, int r) {
+    bufferlist out_bl;
+    encode(mirror_mode, out_bl);
+
+    EXPECT_CALL(*io_ctx_impl,
+                exec(RBD_MIRRORING, _, StrEq("rbd"), StrEq("mirror_mode_get"),
+                     _, _, _))
+      .WillOnce(DoAll(WithArg<5>(Invoke([out_bl](bufferlist *bl) {
+                                          *bl = out_bl;
+                                        })),
+          Return(r)));
   }
 
-  void expect_instance_replayer_shut_down(MockInstanceReplayer& mock_instance_replayer) {
-    EXPECT_CALL(mock_instance_replayer, shut_down());
+  void expect_mirror_mode_get(librados::MockTestMemIoCtxImpl *io_ctx_impl) {
+    EXPECT_CALL(*io_ctx_impl,
+                exec(RBD_MIRRORING, _, StrEq("rbd"), StrEq("mirror_mode_get"),
+                     _, _, _))
+      .WillRepeatedly(DoAll(WithArg<5>(Invoke([](bufferlist *bl) {
+                encode(cls::rbd::MIRROR_MODE_POOL, *bl);
+              })),
+          Return(0)));
   }
 
-  void expect_instance_replayer_stop(MockInstanceReplayer& mock_instance_replayer) {
-    EXPECT_CALL(mock_instance_replayer, stop());
+  void expect_leader_watcher_init(MockLeaderWatcher& mock_leader_watcher,
+                                  int r) {
+    EXPECT_CALL(mock_leader_watcher, init())
+      .WillOnce(Return(r));
   }
 
-  void expect_instance_replayer_add_peer(MockInstanceReplayer& mock_instance_replayer,
-                                        const std::string& uuid) {
-    EXPECT_CALL(mock_instance_replayer, add_peer(uuid, _));
+  void expect_leader_watcher_shut_down(MockLeaderWatcher& mock_leader_watcher) {
+    EXPECT_CALL(mock_leader_watcher, shut_down());
+  }
+
+  void expect_leader_watcher_get_leader_instance_id(
+      MockLeaderWatcher& mock_leader_watcher) {
+    EXPECT_CALL(mock_leader_watcher, get_leader_instance_id(_))
+      .WillRepeatedly(Return(true));
   }
 
-  void expect_instance_watcher_get_instance_id(
-      MockInstanceWatcher& mock_instance_watcher,
+  void expect_leader_watcher_list_instances(
+      MockLeaderWatcher& mock_leader_watcher) {
+    EXPECT_CALL(mock_leader_watcher, list_instances(_))
+      .Times(AtLeast(0));
+  }
+
+  void expect_remote_pool_poller_init(
+      MockRemotePoolPoller& mock_remote_pool_poller,
+      const RemotePoolMeta& remote_pool_meta, int r) {
+    EXPECT_CALL(mock_remote_pool_poller, init(_))
+      .WillOnce(Invoke(
+                  [this, &mock_remote_pool_poller, remote_pool_meta, r]
+                  (Context* ctx) {
+                    if (r >= 0) {
+                      mock_remote_pool_poller.listener->handle_updated(
+                        remote_pool_meta);
+                    }
+
+                    m_threads->work_queue->queue(ctx, r);
+                }));
+  }
+
+  void expect_remote_pool_poller_shut_down(
+      MockRemotePoolPoller& mock_remote_pool_poller, int r) {
+    EXPECT_CALL(mock_remote_pool_poller, shut_down(_))
+      .WillOnce(Invoke(
+                  [this, r](Context* ctx) {
+                    m_threads->work_queue->queue(ctx, r);
+                }));
+  }
+
+  void expect_leader_watcher_is_blacklisted(
+      MockLeaderWatcher &mock_leader_watcher, bool blacklisted) {
+    EXPECT_CALL(mock_leader_watcher, is_blacklisted())
+      .WillRepeatedly(Return(blacklisted));
+  }
+
+  void expect_namespace_replayer_is_blacklisted(
+      MockNamespaceReplayer &mock_namespace_replayer,
+      bool blacklisted) {
+    EXPECT_CALL(mock_namespace_replayer, is_blacklisted())
+      .WillRepeatedly(Return(blacklisted));
+  }
+
+  void expect_namespace_replayer_get_instance_id(
+      MockNamespaceReplayer &mock_namespace_replayer,
       const std::string &instance_id) {
-    EXPECT_CALL(mock_instance_watcher, get_instance_id())
+    EXPECT_CALL(mock_namespace_replayer, get_instance_id())
       .WillOnce(Return(instance_id));
   }
 
-  void expect_instance_watcher_init(MockInstanceWatcher& mock_instance_watcher,
-                                    int r) {
-    EXPECT_CALL(mock_instance_watcher, init())
-      .WillOnce(Return(r));
+  void expect_namespace_replayer_init(
+      MockNamespaceReplayer &mock_namespace_replayer, int r,
+      Context *on_init = nullptr) {
+
+    EXPECT_CALL(mock_namespace_replayer, init(_))
+      .WillOnce(Invoke([this, r, on_init](Context* ctx) {
+                         m_threads->work_queue->queue(ctx, r);
+                         if (on_init != nullptr) {
+                           m_threads->work_queue->queue(on_init, r);
+                         }
+                       }));
   }
 
-  void expect_instance_watcher_shut_down(MockInstanceWatcher& mock_instance_watcher) {
-    EXPECT_CALL(mock_instance_watcher, shut_down());
+  void expect_namespace_replayer_shut_down(
+      MockNamespaceReplayer &mock_namespace_replayer,
+      Context *on_shut_down = nullptr) {
+    EXPECT_CALL(mock_namespace_replayer, shut_down(_))
+      .WillOnce(Invoke([this, on_shut_down](Context* ctx) {
+                         m_threads->work_queue->queue(ctx);
+                         if (on_shut_down != nullptr) {
+                           m_threads->work_queue->queue(on_shut_down);
+                         }
+                       }));
   }
 
-  void expect_leader_watcher_init(MockLeaderWatcher& mock_leader_watcher,
-                                  int r) {
-    EXPECT_CALL(mock_leader_watcher, init())
-      .WillOnce(Return(r));
+  void expect_namespace_replayer_handle_acquire_leader(
+      MockNamespaceReplayer &mock_namespace_replayer, int r,
+      Context *on_acquire = nullptr) {
+    EXPECT_CALL(mock_namespace_replayer, handle_acquire_leader(_))
+      .WillOnce(Invoke([this, r, on_acquire](Context* ctx) {
+                         m_threads->work_queue->queue(ctx, r);
+                         if (on_acquire != nullptr) {
+                           m_threads->work_queue->queue(on_acquire, r);
+                         }
+                       }));
   }
 
-  void expect_leader_watcher_shut_down(MockLeaderWatcher& mock_leader_watcher) {
-    EXPECT_CALL(mock_leader_watcher, shut_down());
+  void expect_namespace_replayer_handle_release_leader(
+      MockNamespaceReplayer &mock_namespace_replayer, int r,
+      Context *on_release = nullptr) {
+    EXPECT_CALL(mock_namespace_replayer, handle_release_leader(_))
+      .WillOnce(Invoke([this, r, on_release](Context* ctx) {
+                         m_threads->work_queue->queue(ctx, r);
+                         if (on_release != nullptr) {
+                           m_threads->work_queue->queue(on_release, r);
+                         }
+                       }));
+  }
+
+  void expect_namespace_replayer_handle_update_leader(
+      MockNamespaceReplayer &mock_namespace_replayer,
+      const std::string &leader_instance_id,
+      Context *on_update = nullptr) {
+    EXPECT_CALL(mock_namespace_replayer,
+                handle_update_leader(leader_instance_id))
+      .WillOnce(Invoke([on_update](const std::string &) {
+                         if (on_update != nullptr) {
+                           on_update->complete(0);
+                         }
+                       }));
+  }
+
+  void expect_namespace_replayer_handle_instances_added(
+      MockNamespaceReplayer &mock_namespace_replayer) {
+    EXPECT_CALL(mock_namespace_replayer, handle_instances_added(_));
+  }
+
+  void expect_namespace_replayer_handle_instances_removed(
+      MockNamespaceReplayer &mock_namespace_replayer) {
+    EXPECT_CALL(mock_namespace_replayer, handle_instances_removed(_));
+  }
+
+  void expect_service_daemon_add_namespace(
+      MockServiceDaemon &mock_service_daemon,
+      const std::string& namespace_name) {
+    EXPECT_CALL(mock_service_daemon,
+                add_namespace(m_local_io_ctx.get_id(), namespace_name));
+  }
+
+  void expect_service_daemon_remove_namespace(
+      MockServiceDaemon &mock_service_daemon,
+      const std::string& namespace_name) {
+    EXPECT_CALL(mock_service_daemon,
+                remove_namespace(m_local_io_ctx.get_id(), namespace_name));
   }
 
   void expect_service_daemon_add_or_update_attribute(
       MockServiceDaemon &mock_service_daemon, const std::string& key,
       const service_daemon::AttributeValue& value) {
-    EXPECT_CALL(mock_service_daemon, add_or_update_attribute(_, _, _));
+    EXPECT_CALL(mock_service_daemon, add_or_update_attribute(_, key, value));
+  }
+
+  void expect_service_daemon_remove_attribute(
+      MockServiceDaemon &mock_service_daemon, const std::string& key) {
+    EXPECT_CALL(mock_service_daemon, remove_attribute(_, key));
   }
 
   void expect_service_daemon_add_or_update_instance_id_attribute(
-      MockInstanceWatcher& mock_instance_watcher,
-      MockServiceDaemon &mock_service_daemon) {
-    expect_instance_watcher_get_instance_id(mock_instance_watcher, "1234");
-    expect_service_daemon_add_or_update_attribute(mock_service_daemon,
-                                                  "instance_id", "1234");
+      MockServiceDaemon &mock_service_daemon, const std::string &instance_id) {
+    expect_service_daemon_add_or_update_attribute(
+        mock_service_daemon, "instance_id", {instance_id});
   }
+
+  PoolMetaCache m_pool_meta_cache{g_ceph_context};
 };
 
 TEST_F(TestMockPoolReplayer, ConfigKeyOverride) {
@@ -394,6 +541,17 @@ TEST_F(TestMockPoolReplayer, ConfigKeyOverride) {
   peer_spec.mon_host = "123";
   peer_spec.key = "234";
 
+  auto mock_default_namespace_replayer = new MockNamespaceReplayer();
+  expect_namespace_replayer_is_blacklisted(*mock_default_namespace_replayer,
+                                           false);
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  auto mock_leader_watcher = new MockLeaderWatcher();
+  expect_leader_watcher_get_leader_instance_id(*mock_leader_watcher);
+  expect_leader_watcher_is_blacklisted(*mock_leader_watcher, false);
+
   InSequence seq;
 
   auto& mock_cluster = get_mock_cluster();
@@ -412,35 +570,362 @@ TEST_F(TestMockPoolReplayer, ConfigKeyOverride) {
   expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx);
 
   expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0);
+  auto mock_remote_pool_poller = new MockRemotePoolPoller();
+  expect_remote_pool_poller_init(*mock_remote_pool_poller,
+                                 {"remote mirror uuid", ""}, 0);
+  expect_namespace_replayer_init(*mock_default_namespace_replayer, 0);
+  expect_leader_watcher_init(*mock_leader_watcher, 0);
+
+  MockServiceDaemon mock_service_daemon;
+  std::string instance_id = stringify(mock_local_io_ctx->get_instance_id());
+  expect_service_daemon_add_or_update_instance_id_attribute(
+    mock_service_daemon, instance_id);
 
-  auto mock_instance_replayer = new MockInstanceReplayer();
-  expect_instance_replayer_init(*mock_instance_replayer);
-  expect_instance_replayer_add_peer(*mock_instance_replayer, "uuid");
+  MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
+                                 &m_pool_meta_cache,
+                                 m_local_io_ctx.get_id(), peer_spec, {});
+  pool_replayer.init("siteA");
+
+  ASSERT_TRUE(remote_cct != nullptr);
+  ASSERT_EQ("123", remote_cct->_conf.get_val<std::string>("mon_host"));
+  ASSERT_EQ("234", remote_cct->_conf.get_val<std::string>("key"));
+  remote_cct->put();
 
-  auto mock_instance_watcher = new MockInstanceWatcher();
-  expect_instance_watcher_init(*mock_instance_watcher, 0);
+  expect_leader_watcher_shut_down(*mock_leader_watcher);
+  expect_namespace_replayer_shut_down(*mock_default_namespace_replayer);
+  expect_remote_pool_poller_shut_down(*mock_remote_pool_poller, 0);
+
+  pool_replayer.shut_down();
+}
+
+TEST_F(TestMockPoolReplayer, AcquireReleaseLeader) {
+  PeerSpec peer_spec{"uuid", "cluster name", "client.name"};
+  peer_spec.mon_host = "123";
+  peer_spec.key = "234";
+
+  auto mock_default_namespace_replayer = new MockNamespaceReplayer();
+  expect_namespace_replayer_is_blacklisted(*mock_default_namespace_replayer,
+                                           false);
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  auto mock_leader_watcher = new MockLeaderWatcher();
+  expect_leader_watcher_get_leader_instance_id(*mock_leader_watcher);
+  expect_leader_watcher_list_instances(*mock_leader_watcher);
+  expect_leader_watcher_is_blacklisted(*mock_leader_watcher, false);
+
+  InSequence seq;
+
+  auto& mock_cluster = get_mock_cluster();
+  auto mock_local_rados_client = mock_cluster.do_create_rados_client(
+    g_ceph_context);
+  expect_connect(mock_cluster, mock_local_rados_client, "ceph", nullptr);
+
+  auto mock_remote_rados_client = mock_cluster.do_create_rados_client(
+    g_ceph_context);
+  expect_connect(mock_cluster, mock_remote_rados_client, "cluster name",
+                 nullptr);
+
+  auto mock_local_io_ctx = mock_local_rados_client->do_create_ioctx(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_pool_name());
+  expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx);
+
+  expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0);
+  auto mock_remote_pool_poller = new MockRemotePoolPoller();
+  expect_remote_pool_poller_init(*mock_remote_pool_poller,
+                                 {"remote mirror uuid", ""}, 0);
+  expect_namespace_replayer_init(*mock_default_namespace_replayer, 0);
+  expect_leader_watcher_init(*mock_leader_watcher, 0);
 
   MockServiceDaemon mock_service_daemon;
+  std::string instance_id = stringify(mock_local_io_ctx->get_instance_id());
   expect_service_daemon_add_or_update_instance_id_attribute(
-      *mock_instance_watcher, mock_service_daemon);
+    mock_service_daemon, instance_id);
+
+  MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
+                                 &m_pool_meta_cache,
+                                 m_local_io_ctx.get_id(), peer_spec, {});
+  pool_replayer.init("siteA");
+
+  expect_service_daemon_add_or_update_attribute(
+      mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true);
+  expect_namespace_replayer_handle_acquire_leader(
+      *mock_default_namespace_replayer, 0);
+
+  C_SaferCond on_acquire;
+  mock_leader_watcher->listener->post_acquire_handler(&on_acquire);
+  ASSERT_EQ(0, on_acquire.wait());
+
+  expect_service_daemon_remove_attribute(mock_service_daemon,
+                                         SERVICE_DAEMON_LEADER_KEY);
+  expect_namespace_replayer_handle_release_leader(
+      *mock_default_namespace_replayer, 0);
+
+  C_SaferCond on_release;
+  mock_leader_watcher->listener->pre_release_handler(&on_release);
+  ASSERT_EQ(0, on_release.wait());
+
+  expect_leader_watcher_shut_down(*mock_leader_watcher);
+  expect_namespace_replayer_shut_down(*mock_default_namespace_replayer);
+  expect_remote_pool_poller_shut_down(*mock_remote_pool_poller, 0);
+
+  pool_replayer.shut_down();
+}
+
+TEST_F(TestMockPoolReplayer, Namespaces) {
+  PeerSpec peer_spec{"uuid", "cluster name", "client.name"};
+  peer_spec.mon_host = "123";
+  peer_spec.key = "234";
+
+  g_ceph_context->_conf.set_val(
+      "rbd_mirror_pool_replayers_refresh_interval", "1");
+
+  MockNamespace mock_namespace;
+
+  auto mock_default_namespace_replayer = new MockNamespaceReplayer();
+  expect_namespace_replayer_is_blacklisted(*mock_default_namespace_replayer,
+                                           false);
+
+  auto mock_ns1_namespace_replayer = new MockNamespaceReplayer("ns1");
+  expect_namespace_replayer_is_blacklisted(*mock_ns1_namespace_replayer,
+                                           false);
+
+  auto mock_ns2_namespace_replayer = new MockNamespaceReplayer("ns2");
+  expect_namespace_replayer_is_blacklisted(*mock_ns2_namespace_replayer,
+                                           false);
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
 
   auto mock_leader_watcher = new MockLeaderWatcher();
+  expect_leader_watcher_get_leader_instance_id(*mock_leader_watcher);
+  expect_leader_watcher_list_instances(*mock_leader_watcher);
+  expect_leader_watcher_is_blacklisted(*mock_leader_watcher, false);
+
+  auto& mock_cluster = get_mock_cluster();
+  auto mock_local_rados_client = mock_cluster.do_create_rados_client(
+      g_ceph_context);
+  auto mock_local_io_ctx = mock_local_rados_client->do_create_ioctx(
+      m_local_io_ctx.get_id(), m_local_io_ctx.get_pool_name());
+  auto mock_remote_rados_client = mock_cluster.do_create_rados_client(
+      g_ceph_context);
+
+  expect_mirror_mode_get(mock_local_io_ctx);
+
+  InSequence seq;
+
+  expect_connect(mock_cluster, mock_local_rados_client, "ceph", nullptr);
+  expect_connect(mock_cluster, mock_remote_rados_client, "cluster name",
+                 nullptr);
+  expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx);
+  expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0);
+  auto mock_remote_pool_poller = new MockRemotePoolPoller();
+  expect_remote_pool_poller_init(*mock_remote_pool_poller,
+                                 {"remote mirror uuid", ""}, 0);
+  expect_namespace_replayer_init(*mock_default_namespace_replayer, 0);
   expect_leader_watcher_init(*mock_leader_watcher, 0);
 
-  MockThreads mock_threads(m_threads);
-  MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon,
+  MockServiceDaemon mock_service_daemon;
+  std::string instance_id = stringify(mock_local_io_ctx->get_instance_id());
+  expect_service_daemon_add_or_update_instance_id_attribute(
+    mock_service_daemon, instance_id);
+
+  MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
+                                 &m_pool_meta_cache,
                                  m_local_io_ctx.get_id(), peer_spec, {});
-  pool_replayer.init();
+  pool_replayer.init("siteA");
+
+  C_SaferCond on_ns1_init;
+  expect_namespace_replayer_init(*mock_ns1_namespace_replayer, 0);
+  expect_service_daemon_add_namespace(mock_service_daemon, "ns1");
+  expect_namespace_replayer_handle_update_leader(*mock_ns1_namespace_replayer,
+                                                 "", &on_ns1_init);
+
+  mock_namespace.add("ns1");
+  ASSERT_EQ(0, on_ns1_init.wait());
+
+  expect_service_daemon_add_or_update_attribute(
+      mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true);
+  expect_namespace_replayer_handle_acquire_leader(
+      *mock_default_namespace_replayer, 0);
+  expect_namespace_replayer_handle_acquire_leader(
+      *mock_ns1_namespace_replayer, 0);
+
+  C_SaferCond on_acquire;
+  mock_leader_watcher->listener->post_acquire_handler(&on_acquire);
+  ASSERT_EQ(0, on_acquire.wait());
+
+  expect_namespace_replayer_init(*mock_ns2_namespace_replayer, 0);
+  expect_service_daemon_add_namespace(mock_service_daemon, "ns2");
+  C_SaferCond on_ns2_acquire;
+  expect_namespace_replayer_handle_acquire_leader(
+      *mock_ns2_namespace_replayer, 0, &on_ns2_acquire);
+  expect_namespace_replayer_handle_instances_added(
+      *mock_ns2_namespace_replayer);
+
+  mock_namespace.add("ns2");
+  ASSERT_EQ(0, on_ns2_acquire.wait());
+
+  C_SaferCond on_ns2_shut_down;
+  expect_service_daemon_remove_namespace(mock_service_daemon, "ns2");
+  expect_namespace_replayer_shut_down(*mock_ns2_namespace_replayer,
+                                      &on_ns2_shut_down);
+  mock_namespace.remove("ns2");
+  ASSERT_EQ(0, on_ns2_shut_down.wait());
+
+  expect_service_daemon_remove_attribute(mock_service_daemon,
+                                         SERVICE_DAEMON_LEADER_KEY);
+  expect_namespace_replayer_handle_release_leader(
+      *mock_default_namespace_replayer, 0);
+  expect_namespace_replayer_handle_release_leader(
+      *mock_ns1_namespace_replayer, 0);
+
+  C_SaferCond on_release;
+  mock_leader_watcher->listener->pre_release_handler(&on_release);
+  ASSERT_EQ(0, on_release.wait());
+
+  expect_service_daemon_remove_namespace(mock_service_daemon, "ns1");
+  expect_namespace_replayer_shut_down(*mock_ns1_namespace_replayer);
+  expect_leader_watcher_shut_down(*mock_leader_watcher);
+  expect_namespace_replayer_shut_down(*mock_default_namespace_replayer);
+  expect_remote_pool_poller_shut_down(*mock_remote_pool_poller, 0);
 
-  ASSERT_TRUE(remote_cct != nullptr);
-  ASSERT_EQ("123", remote_cct->_conf.get_val<std::string>("mon_host"));
-  ASSERT_EQ("234", remote_cct->_conf.get_val<std::string>("key"));
-  remote_cct->put();
+  pool_replayer.shut_down();
+}
+
+TEST_F(TestMockPoolReplayer, NamespacesError) {
+  PeerSpec peer_spec{"uuid", "cluster name", "client.name"};
+  peer_spec.mon_host = "123";
+  peer_spec.key = "234";
+
+  g_ceph_context->_conf.set_val(
+      "rbd_mirror_pool_replayers_refresh_interval", "1");
+
+  MockNamespace mock_namespace;
+
+  auto mock_default_namespace_replayer = new MockNamespaceReplayer();
+  expect_namespace_replayer_is_blacklisted(*mock_default_namespace_replayer,
+                                           false);
+  auto mock_ns1_namespace_replayer = new MockNamespaceReplayer("ns1");
+  auto mock_ns2_namespace_replayer = new MockNamespaceReplayer("ns2");
+  expect_namespace_replayer_is_blacklisted(*mock_ns2_namespace_replayer,
+                                           false);
+  auto mock_ns3_namespace_replayer = new MockNamespaceReplayer("ns3");
+
+  MockThreads mock_threads(m_threads);
+  expect_work_queue(mock_threads);
+
+  auto mock_leader_watcher = new MockLeaderWatcher();
+  expect_leader_watcher_get_leader_instance_id(*mock_leader_watcher);
+  expect_leader_watcher_list_instances(*mock_leader_watcher);
+  expect_leader_watcher_is_blacklisted(*mock_leader_watcher, false);
+
+  auto& mock_cluster = get_mock_cluster();
+  auto mock_local_rados_client = mock_cluster.do_create_rados_client(
+      g_ceph_context);
+  auto mock_local_io_ctx = mock_local_rados_client->do_create_ioctx(
+      m_local_io_ctx.get_id(), m_local_io_ctx.get_pool_name());
+  auto mock_remote_rados_client = mock_cluster.do_create_rados_client(
+      g_ceph_context);
+
+  expect_mirror_mode_get(mock_local_io_ctx);
+
+  InSequence seq;
+
+  expect_connect(mock_cluster, mock_local_rados_client, "ceph", nullptr);
+  expect_connect(mock_cluster, mock_remote_rados_client, "cluster name",
+                 nullptr);
+  expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx);
+  expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0);
+  auto mock_remote_pool_poller = new MockRemotePoolPoller();
+  expect_remote_pool_poller_init(*mock_remote_pool_poller,
+                                 {"remote mirror uuid", ""}, 0);
+  expect_namespace_replayer_init(*mock_default_namespace_replayer, 0);
+  expect_leader_watcher_init(*mock_leader_watcher, 0);
+
+  MockServiceDaemon mock_service_daemon;
+  std::string instance_id = stringify(mock_local_io_ctx->get_instance_id());
+  expect_service_daemon_add_or_update_instance_id_attribute(
+    mock_service_daemon, instance_id);
+
+  MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
+                                 &m_pool_meta_cache,
+                                 m_local_io_ctx.get_id(), peer_spec, {});
+  pool_replayer.init("siteA");
+
+  // test namespace replayer init fails for non leader
+
+  C_SaferCond on_ns1_init;
+  Context* ctx = new LambdaContext(
+      [&mock_namespace, &on_ns1_init](int r) {
+        mock_namespace.remove("ns1");
+        on_ns1_init.complete(r);
+      });
+  expect_namespace_replayer_init(*mock_ns1_namespace_replayer, -EINVAL, ctx);
+  mock_namespace.add("ns1");
+  ASSERT_EQ(-EINVAL, on_ns1_init.wait());
+
+  // test acquire leader fails when default namespace replayer fails
+
+  expect_service_daemon_add_or_update_attribute(
+    mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true);
+  expect_namespace_replayer_handle_acquire_leader(
+    *mock_default_namespace_replayer, -EINVAL);
+
+  C_SaferCond on_acquire1;
+  mock_leader_watcher->listener->post_acquire_handler(&on_acquire1);
+  ASSERT_EQ(-EINVAL, on_acquire1.wait());
+
+  // test acquire leader succeeds when non-default namespace replayer fails
+
+  C_SaferCond on_ns2_init;
+  expect_namespace_replayer_init(*mock_ns2_namespace_replayer, 0);
+  expect_service_daemon_add_namespace(mock_service_daemon, "ns2");
+  expect_namespace_replayer_handle_update_leader(*mock_ns2_namespace_replayer,
+                                                 "", &on_ns2_init);
+  mock_namespace.add("ns2");
+  ASSERT_EQ(0, on_ns2_init.wait());
+
+  expect_service_daemon_add_or_update_attribute(
+      mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true);
+  expect_namespace_replayer_handle_acquire_leader(
+      *mock_default_namespace_replayer, 0);
+
+  expect_namespace_replayer_handle_acquire_leader(*mock_ns2_namespace_replayer,
+                                                  -EINVAL);
+  ctx = new LambdaContext(
+      [&mock_namespace](int) {
+        mock_namespace.remove("ns2");
+      });
+  expect_service_daemon_remove_namespace(mock_service_daemon, "ns2");
+  expect_namespace_replayer_shut_down(*mock_ns2_namespace_replayer, ctx);
+  mock_namespace.add("ns2");
+
+  C_SaferCond on_acquire2;
+  mock_leader_watcher->listener->post_acquire_handler(&on_acquire2);
+  ASSERT_EQ(0, on_acquire2.wait());
+
+  // test namespace replayer init fails on acquire leader
+
+  C_SaferCond on_ns3_shut_down;
+  ctx = new LambdaContext(
+      [&mock_namespace, &on_ns3_shut_down](int) {
+        mock_namespace.remove("ns3");
+        on_ns3_shut_down.complete(0);
+      });
+  expect_namespace_replayer_init(*mock_ns3_namespace_replayer, 0);
+  expect_service_daemon_add_namespace(mock_service_daemon, "ns3");
+  expect_namespace_replayer_handle_acquire_leader(*mock_ns3_namespace_replayer,
+                                                  -EINVAL);
+  expect_service_daemon_remove_namespace(mock_service_daemon, "ns3");
+  expect_namespace_replayer_shut_down(*mock_ns3_namespace_replayer, ctx);
+  mock_namespace.add("ns3");
+  ASSERT_EQ(0, on_ns3_shut_down.wait());
 
-  expect_instance_replayer_stop(*mock_instance_replayer);
   expect_leader_watcher_shut_down(*mock_leader_watcher);
-  expect_instance_watcher_shut_down(*mock_instance_watcher);
-  expect_instance_replayer_shut_down(*mock_instance_replayer);
+  expect_namespace_replayer_shut_down(*mock_default_namespace_replayer);
+  expect_remote_pool_poller_shut_down(*mock_remote_pool_poller, 0);
 
   pool_replayer.shut_down();
 }