#include "test/librbd/mock/MockImageCtx.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
template <>
struct Threads<librbd::MockTestImageCtx> {
- Mutex &timer_lock;
+ ceph::mutex &timer_lock;
SafeTimer *timer;
ContextWQ *work_queue;
};
template <>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
- static ImageSyncThrottler* s_instance;
+struct Throttler<librbd::MockTestImageCtx> {
+ static Throttler* s_instance;
- static ImageSyncThrottler *create(CephContext *cct) {
- ceph_assert(s_instance != nullptr);
- return s_instance;
- }
-
- ImageSyncThrottler() {
+ Throttler() {
ceph_assert(s_instance == nullptr);
s_instance = this;
}
- virtual ~ImageSyncThrottler() {
+ virtual ~Throttler() {
ceph_assert(s_instance == this);
s_instance = nullptr;
}
- MOCK_METHOD0(destroy, void());
- MOCK_METHOD1(drain, void(int));
- MOCK_METHOD2(start_op, void(const std::string &, Context *));
- MOCK_METHOD1(finish_op, void(const std::string &));
+ MOCK_METHOD3(start_op, void(const std::string &, const std::string &,
+ Context *));
+ MOCK_METHOD2(finish_op, void(const std::string &, const std::string &));
+ MOCK_METHOD2(drain, void(const std::string &, int));
};
-ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
+Throttler<librbd::MockTestImageCtx>* Throttler<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace mirror
} // namespace rbd
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
auto instance_watcher = new MockInstanceWatcher(
- m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr,
+ m_instance_id);
InSequence seq;
// Init
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
auto instance_watcher = new MockInstanceWatcher(
- m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr,
+ m_instance_id);
InSequence seq;
expect_register_instance(mock_io_ctx, 0);
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
auto instance_watcher = new MockInstanceWatcher(
- m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr,
+ m_instance_id);
InSequence seq;
// Init
librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
MockInstanceReplayer mock_instance_replayer1;
auto instance_watcher1 = MockInstanceWatcher::create(
- io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1);
+ io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1, nullptr);
librados::Rados cluster;
librados::IoCtx io_ctx2;
librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
MockInstanceReplayer mock_instance_replayer2;
auto instance_watcher2 = MockInstanceWatcher::create(
- io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2);
+ io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2, nullptr);
InSequence seq;
librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
MockInstanceReplayer mock_instance_replayer1;
auto instance_watcher1 = MockInstanceWatcher::create(
- io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1);
+ io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1, nullptr);
librados::Rados cluster;
librados::IoCtx io_ctx2;
librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
MockInstanceReplayer mock_instance_replayer2;
auto instance_watcher2 = MockInstanceWatcher::create(
- io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2);
+ io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2, nullptr);
InSequence seq;
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
auto instance_watcher = new MockInstanceWatcher(
- m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr,
+ m_instance_id);
InSequence seq;
// Init
const std::string& o, librados::AioCompletionImpl *c,
bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
c->get();
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[instance_watcher, &mock_io_ctx, c, pbl](int r) {
instance_watcher->cancel_notify_requests("other");
encode(librbd::watcher::NotifyResponse(), *pbl);
const std::string& o, librados::AioCompletionImpl *c,
bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
c->get();
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[instance_watcher, &mock_io_ctx, c, pbl](int r) {
instance_watcher->cancel_notify_requests("other");
encode(librbd::watcher::NotifyResponse(), *pbl);
MockInstanceReplayer mock_instance_replayer;
auto instance_watcher = new MockInstanceWatcher(
- m_local_io_ctx, m_mock_threads->work_queue, &mock_instance_replayer,
- m_instance_id);
+ m_local_io_ctx, m_mock_threads->work_queue, &mock_instance_replayer,
+ nullptr, m_instance_id);
InSequence seq;
// Init
MockInstanceReplayer mock_instance_replayer;
auto instance_watcher = new MockInstanceWatcher(
- m_local_io_ctx, m_mock_threads->work_queue, &mock_instance_replayer,
- m_instance_id);
+ m_local_io_ctx, m_mock_threads->work_queue, &mock_instance_replayer,
+ nullptr, m_instance_id);
InSequence seq;
// Init
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
auto instance_watcher = new MockInstanceWatcher(
- m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, nullptr,
+ m_instance_id);
InSequence seq;
// Init
const std::string& o, librados::AioCompletionImpl *c,
bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
c->get();
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[instance_watcher, &mock_io_ctx, c, pbl](int r) {
instance_watcher->cancel_notify_requests("other");
encode(librbd::watcher::NotifyResponse(), *pbl);
class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
public:
- typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
+ typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
MockManagedLock mock_managed_lock;
- MockImageSyncThrottler mock_image_sync_throttler;
+ MockThrottler mock_image_sync_throttler;
std::string instance_id1;
std::string instance_id2;
librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
instance_watcher1 = MockInstanceWatcher::create(io_ctx1,
m_mock_threads->work_queue,
- nullptr);
+ nullptr,
+ &mock_image_sync_throttler);
EXPECT_EQ("", connect_cluster_pp(cluster));
EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
instance_id2 = stringify(io_ctx2.get_instance_id());
librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
instance_watcher2 = MockInstanceWatcher::create(io_ctx2,
m_mock_threads->work_queue,
- nullptr);
+ nullptr,
+ &mock_image_sync_throttler);
InSequence seq;
// Init instance watcher 1 (leader)
InSequence seq;
- expect_throttler_destroy();
+ expect_throttler_drain();
instance_watcher1->handle_release_leader();
// Shutdown instance watcher 1
TestMockInstanceWatcher::TearDown();
}
- void expect_throttler_destroy(
- std::vector<Context *> *throttler_queue = nullptr) {
- EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE))
- .WillOnce(Invoke([throttler_queue] (int r) {
- if (throttler_queue != nullptr) {
- for (auto ctx : *throttler_queue) {
- ctx->complete(r);
- }
- }
- }));
- EXPECT_CALL(mock_image_sync_throttler, destroy());
- }
-
void expect_throttler_start_op(const std::string &sync_id,
Context *on_call = nullptr,
Context **on_start_ctx = nullptr) {
- EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _))
+ EXPECT_CALL(mock_image_sync_throttler, start_op("", sync_id, _))
.WillOnce(Invoke([on_call, on_start_ctx] (const std::string &,
+ const std::string &,
Context *ctx) {
if (on_start_ctx != nullptr) {
*on_start_ctx = ctx;
void expect_throttler_finish_op(const std::string &sync_id,
Context *on_finish) {
- EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
- .WillOnce(Invoke([on_finish](const std::string &) {
+ EXPECT_CALL(mock_image_sync_throttler, finish_op("", "sync_id"))
+ .WillOnce(Invoke([on_finish](const std::string &, const std::string &) {
on_finish->complete(0);
}));
}
+
+ void expect_throttler_drain() {
+ EXPECT_CALL(mock_image_sync_throttler, drain("", -ESTALE));
+ }
};
TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
ASSERT_EQ(0, on_start1.wait());
C_SaferCond on_start2;
- EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
- .WillOnce(Invoke([this, &on_start2](const std::string &) {
+ EXPECT_CALL(mock_image_sync_throttler, finish_op("", "sync_id"))
+ .WillOnce(Invoke([this, &on_start2](const std::string &,
+ const std::string &) {
instance_watcher2->notify_sync_request("sync_id", &on_start2);
}));
expect_throttler_start_op("sync_id");
TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) {
InSequence seq;
- expect_throttler_destroy();
+ expect_throttler_drain();
instance_watcher1->handle_release_leader();
instance_watcher1->handle_acquire_leader();
}
TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) {
InSequence seq;
- expect_throttler_destroy();
+ expect_throttler_drain();
instance_watcher1->handle_release_leader();
instance_watcher2->handle_acquire_leader();
C_SaferCond on_start;
instance_watcher2->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start.wait());
- expect_throttler_destroy();
+ expect_throttler_drain();
instance_watcher2->handle_release_leader();
instance_watcher2->notify_sync_complete("sync_id");
instance_watcher1->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start_op_called.wait());
- std::vector<Context *> throttler_queue = {on_start_ctx};
- expect_throttler_destroy(&throttler_queue);
+ expect_throttler_drain();
instance_watcher1->handle_release_leader();
+ // emulate throttler queue drain on leader release
+ on_start_ctx->complete(-ESTALE);
+
expect_throttler_start_op("sync_id");
instance_watcher2->handle_acquire_leader();
instance_watcher1->handle_update_leader(instance_id2);
instance_watcher1->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
- expect_throttler_destroy();
+ expect_throttler_drain();
instance_watcher2->handle_release_leader();
instance_watcher1->handle_acquire_leader();
}
TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) {
InSequence seq;
- expect_throttler_destroy();
+ expect_throttler_drain();
instance_watcher1->handle_release_leader();
instance_watcher2->handle_acquire_leader();
instance_watcher1->handle_update_leader(instance_id2);
instance_watcher1->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start.wait());
- expect_throttler_destroy();
+ expect_throttler_drain();
instance_watcher2->handle_release_leader();
instance_watcher1->handle_acquire_leader();
instance_watcher2->handle_update_leader(instance_id1);
instance_watcher2->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start_op_called.wait());
- std::vector<Context *> throttler_queue = {on_start_ctx};
- expect_throttler_destroy(&throttler_queue);
+ expect_throttler_drain();
instance_watcher1->handle_release_leader();
+ // emulate throttler queue drain on leader release
+ on_start_ctx->complete(-ESTALE);
- EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _))
- .WillOnce(WithArg<1>(CompleteContext(0)));
+ EXPECT_CALL(mock_image_sync_throttler, start_op("", "sync_id", _))
+ .WillOnce(WithArg<2>(CompleteContext(0)));
instance_watcher2->handle_acquire_leader();
instance_watcher1->handle_update_leader(instance_id2);
instance_watcher2->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
- expect_throttler_destroy();
+ expect_throttler_drain();
instance_watcher2->handle_release_leader();
instance_watcher1->handle_acquire_leader();
}