#include "test/librbd/mock/MockImageCtx.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
template <>
struct InstanceReplayer<librbd::MockTestImageCtx> {
- MOCK_METHOD4(acquire_image, void(const std::string &, const std::string &,
+ MOCK_METHOD5(acquire_image, void(InstanceWatcher<librbd::MockTestImageCtx> *,
+ const std::string &, const std::string &,
const std::string &, Context *));
MOCK_METHOD5(release_image, void(const std::string &, const std::string &,
const std::string &, bool, Context *));
};
+template <>
+struct ImageSyncThrottler<librbd::MockTestImageCtx> {
+ static ImageSyncThrottler* s_instance;
+
+ static ImageSyncThrottler *create() {
+ assert(s_instance != nullptr);
+ return s_instance;
+ }
+
+ ImageSyncThrottler() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ virtual ~ImageSyncThrottler() {
+ assert(s_instance == this);
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD0(destroy, void());
+ MOCK_METHOD1(drain, void(int));
+ MOCK_METHOD2(start_op, void(const std::string &, Context *));
+ MOCK_METHOD1(finish_op, void(const std::string &));
+};
+
+ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
+
} // namespace mirror
} // namespace rbd
ASSERT_EQ(0, instance_watcher2->init());
// Acquire Image on the the same instance
- EXPECT_CALL(mock_instance_replayer1, acquire_image("gid", "uuid", "id", _))
- .WillOnce(WithArg<3>(CompleteContext(0)));
+ EXPECT_CALL(mock_instance_replayer1, acquire_image(instance_watcher1, "gid",
+ "uuid", "id", _))
+ .WillOnce(WithArg<4>(CompleteContext(0)));
C_SaferCond on_acquire1;
instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id",
&on_acquire1);
ASSERT_EQ(0, on_acquire1.wait());
// Acquire Image on the other instance
- EXPECT_CALL(mock_instance_replayer2, acquire_image("gid", "uuid", "id", _))
- .WillOnce(WithArg<3>(CompleteContext(0)));
+ EXPECT_CALL(mock_instance_replayer2, acquire_image(instance_watcher2, "gid",
+ "uuid", "id", _))
+ .WillOnce(WithArg<4>(CompleteContext(0)));
C_SaferCond on_acquire2;
instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id",
&on_acquire2);
delete instance_watcher;
}
+class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
+public:
+ typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
+
+ MockManagedLock mock_managed_lock;
+ MockImageSyncThrottler mock_image_sync_throttler;
+ std::string instance_id1;
+ std::string instance_id2;
+
+ librados::Rados cluster;
+ librados::IoCtx io_ctx2;
+
+ MockInstanceWatcher *instance_watcher1;
+ MockInstanceWatcher *instance_watcher2;
+
+ void SetUp() override {
+ TestMockInstanceWatcher::SetUp();
+
+ instance_id1 = m_instance_id;
+ librados::IoCtx& io_ctx1 = m_local_io_ctx;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+ instance_watcher1 = MockInstanceWatcher::create(io_ctx1,
+ m_mock_threads->work_queue,
+ nullptr);
+ EXPECT_EQ("", connect_cluster_pp(cluster));
+ EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
+ instance_id2 = stringify(io_ctx2.get_instance_id());
+ librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+ instance_watcher2 = MockInstanceWatcher::create(io_ctx2,
+ m_mock_threads->work_queue,
+ nullptr);
+ InSequence seq;
+
+ // Init instance watcher 1 (leader)
+ expect_register_instance(mock_io_ctx1, 0);
+ expect_register_watch(mock_io_ctx1, instance_id1);
+ expect_acquire_lock(mock_managed_lock, 0);
+ EXPECT_EQ(0, instance_watcher1->init());
+ instance_watcher1->handle_acquire_leader();
+
+ // Init instance watcher 2
+ expect_register_instance(mock_io_ctx2, 0);
+ expect_register_watch(mock_io_ctx2, instance_id2);
+ expect_acquire_lock(mock_managed_lock, 0);
+ EXPECT_EQ(0, instance_watcher2->init());
+ instance_watcher2->handle_update_leader(instance_id1);
+ }
+
+ void TearDown() override {
+ librados::IoCtx& io_ctx1 = m_local_io_ctx;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+ librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+
+ InSequence seq;
+
+ expect_throttler_destroy();
+ instance_watcher1->handle_release_leader();
+
+ // Shutdown instance watcher 1
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx1);
+ expect_unregister_instance(mock_io_ctx1, 0);
+ instance_watcher1->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher1;
+
+ // Shutdown instance watcher 2
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx2);
+ expect_unregister_instance(mock_io_ctx2, 0);
+ instance_watcher2->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher2;
+
+ TestMockInstanceWatcher::TearDown();
+ }
+
+ void expect_throttler_destroy(
+ std::vector<Context *> *throttler_queue = nullptr) {
+ EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE))
+ .WillOnce(Invoke([throttler_queue] (int r) {
+ if (throttler_queue != nullptr) {
+ for (auto ctx : *throttler_queue) {
+ ctx->complete(r);
+ }
+ }
+ }));
+ EXPECT_CALL(mock_image_sync_throttler, destroy());
+ }
+
+ void expect_throttler_start_op(const std::string &sync_id,
+ Context *on_call = nullptr,
+ Context **on_start_ctx = nullptr) {
+ EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _))
+ .WillOnce(Invoke([on_call, on_start_ctx] (const std::string &,
+ Context *ctx) {
+ if (on_call != nullptr) {
+ on_call->complete(0);
+ }
+ if (on_start_ctx != nullptr) {
+ *on_start_ctx = ctx;
+ } else {
+ ctx->complete(0);
+ }
+ }));
+ }
+
+ void expect_throttler_finish_op(const std::string &sync_id,
+ Context *on_finish) {
+ EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
+ .WillOnce(Invoke([on_finish](const std::string &) {
+ on_finish->complete(0);
+ }));
+ }
+};
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher1->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher1->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnLeader) {
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher1->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ ASSERT_FALSE(instance_watcher1->cancel_sync_request("sync_id"));
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher1->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnNonLeader) {
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher2->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnNonLeader) {
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ ASSERT_FALSE(instance_watcher2->cancel_sync_request("sync_id"));
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher2->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelWaitingOnNonLeader) {
+ InSequence seq;
+
+ C_SaferCond on_start_op_called;
+ Context *on_start_ctx;
+ expect_throttler_start_op("sync_id", &on_start_op_called,
+ &on_start_ctx);
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start_op_called.wait());
+
+ ASSERT_TRUE(instance_watcher2->cancel_sync_request("sync_id"));
+ // emulate watcher timeout
+ on_start_ctx->complete(-ETIMEDOUT);
+ ASSERT_EQ(-ECANCELED, on_start.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) {
+ // start sync when previous notification is still in flight
+
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start1;
+ instance_watcher2->notify_sync_request("sync_id", &on_start1);
+ ASSERT_EQ(0, on_start1.wait());
+
+ C_SaferCond on_start2;
+ EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
+ .WillOnce(Invoke([this, &on_start2](const std::string &) {
+ instance_watcher2->notify_sync_request("sync_id", &on_start2);
+ }));
+ expect_throttler_start_op("sync_id");
+ instance_watcher2->notify_sync_complete("sync_id");
+
+ ASSERT_EQ(0, on_start2.wait());
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher2->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) {
+ InSequence seq;
+
+ expect_throttler_destroy();
+ instance_watcher1->handle_release_leader();
+ instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) {
+ InSequence seq;
+
+ expect_throttler_destroy();
+ instance_watcher1->handle_release_leader();
+ instance_watcher2->handle_acquire_leader();
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+ expect_throttler_destroy();
+ instance_watcher2->handle_release_leader();
+ instance_watcher2->notify_sync_complete("sync_id");
+
+ instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) {
+ InSequence seq;
+
+ C_SaferCond on_start_op_called;
+ Context *on_start_ctx;
+ expect_throttler_start_op("sync_id", &on_start_op_called,
+ &on_start_ctx);
+ C_SaferCond on_start;
+ instance_watcher1->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start_op_called.wait());
+
+ std::vector<Context *> throttler_queue = {on_start_ctx};
+ expect_throttler_destroy(&throttler_queue);
+ instance_watcher1->handle_release_leader();
+ instance_watcher2->handle_acquire_leader();
+ instance_watcher1->handle_update_leader(instance_id2);
+
+ expect_throttler_start_op("sync_id");
+ ASSERT_EQ(0, on_start.wait());
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher1->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+
+ expect_throttler_destroy();
+ instance_watcher2->handle_release_leader();
+ instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) {
+ InSequence seq;
+
+ expect_throttler_destroy();
+ instance_watcher1->handle_release_leader();
+ instance_watcher2->handle_acquire_leader();
+ instance_watcher1->handle_update_leader(instance_id2);
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher1->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ expect_throttler_destroy();
+ instance_watcher2->handle_release_leader();
+ instance_watcher1->handle_acquire_leader();
+ instance_watcher2->handle_update_leader(instance_id2);
+
+ instance_watcher1->notify_sync_complete("sync_id");
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) {
+ InSequence seq;
+
+ C_SaferCond on_start_op_called;
+ Context *on_start_ctx;
+ expect_throttler_start_op("sync_id", &on_start_op_called,
+ &on_start_ctx);
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start_op_called.wait());
+
+ std::vector<Context *> throttler_queue = {on_start_ctx};
+ expect_throttler_destroy(&throttler_queue);
+ instance_watcher1->handle_release_leader();
+
+ EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _))
+ .WillOnce(WithArg<1>(CompleteContext(0)));
+ instance_watcher2->handle_acquire_leader();
+ instance_watcher1->handle_update_leader(instance_id2);
+
+ ASSERT_EQ(0, on_start.wait());
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher2->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+
+ expect_throttler_destroy();
+ instance_watcher2->handle_release_leader();
+ instance_watcher1->handle_acquire_leader();
+}
+
} // namespace mirror
} // namespace rbd