1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/rados/librados.hpp"
5 #include "librbd/internal.h"
6 #include "librbd/Utils.h"
7 #include "librbd/api/Mirror.h"
8 #include "test/librbd/test_support.h"
9 #include "test/rbd_mirror/test_fixture.h"
10 #include "tools/rbd_mirror/LeaderWatcher.h"
11 #include "tools/rbd_mirror/Threads.h"
12 #include "common/Cond.h"
14 #include "test/librados/test_cxx.h"
15 #include "gtest/gtest.h"
17 using librbd::util::unique_lock_name
;
18 using rbd::mirror::LeaderWatcher
;
20 void register_test_leader_watcher() {
23 class TestLeaderWatcher
: public ::rbd::mirror::TestFixture
{
25 class Listener
: public rbd::mirror::leader_watcher::Listener
{
28 : m_test_lock(ceph::make_mutex(
29 unique_lock_name("LeaderWatcher::m_test_lock", this))) {
32 void on_acquire(int r
, Context
*ctx
) {
33 std::lock_guard locker
{m_test_lock
};
38 void on_release(int r
, Context
*ctx
) {
39 std::lock_guard locker
{m_test_lock
};
44 int acquire_count() const {
45 std::lock_guard locker
{m_test_lock
};
46 return m_acquire_count
;
49 int release_count() const {
50 std::lock_guard locker
{m_test_lock
};
51 return m_release_count
;
54 void post_acquire_handler(Context
*on_finish
) override
{
55 std::lock_guard locker
{m_test_lock
};
57 on_finish
->complete(m_on_acquire_r
);
59 if (m_on_acquire
!= nullptr) {
60 m_on_acquire
->complete(0);
61 m_on_acquire
= nullptr;
65 void pre_release_handler(Context
*on_finish
) override
{
66 std::lock_guard locker
{m_test_lock
};
68 on_finish
->complete(m_on_release_r
);
70 if (m_on_release
!= nullptr) {
71 m_on_release
->complete(0);
72 m_on_release
= nullptr;
76 void update_leader_handler(const std::string
&leader_instance_id
) override
{
79 void handle_instances_added(const InstanceIds
& instance_ids
) override
{
81 void handle_instances_removed(const InstanceIds
& instance_ids
) override
{
85 mutable ceph::mutex m_test_lock
;
86 int m_acquire_count
= 0;
87 int m_release_count
= 0;
88 int m_on_acquire_r
= 0;
89 int m_on_release_r
= 0;
90 Context
*m_on_acquire
= nullptr;
91 Context
*m_on_release
= nullptr;
95 librados::Rados cluster
;
96 librados::IoCtx io_ctx
;
99 std::list
<std::unique_ptr
<Connection
> > m_connections
;
101 void SetUp() override
{
102 TestFixture::SetUp();
103 EXPECT_EQ(0, librbd::api::Mirror
<>::mode_set(m_local_io_ctx
,
104 RBD_MIRROR_MODE_POOL
));
106 if (is_librados_test_stub(*_rados
)) {
107 // speed testing up a little
108 EXPECT_EQ(0, _rados
->conf_set("rbd_mirror_leader_heartbeat_interval",
113 librados::IoCtx
&create_connection(bool no_heartbeats
= false) {
114 m_connections
.push_back(std::unique_ptr
<Connection
>(new Connection()));
115 Connection
*c
= m_connections
.back().get();
117 EXPECT_EQ("", connect_cluster_pp(c
->cluster
));
119 EXPECT_EQ(0, c
->cluster
.conf_set("rbd_mirror_leader_heartbeat_interval",
121 } else if (is_librados_test_stub(*_rados
)) {
122 EXPECT_EQ(0, c
->cluster
.conf_set("rbd_mirror_leader_heartbeat_interval",
125 EXPECT_EQ(0, c
->cluster
.ioctx_create(_local_pool_name
.c_str(), c
->io_ctx
));
131 TEST_F(TestLeaderWatcher
, InitShutdown
)
134 LeaderWatcher
<> leader_watcher(m_threads
, m_local_io_ctx
, &listener
);
136 C_SaferCond on_init_acquire
;
137 listener
.on_acquire(0, &on_init_acquire
);
138 ASSERT_EQ(0, leader_watcher
.init());
139 ASSERT_EQ(0, on_init_acquire
.wait());
140 ASSERT_TRUE(leader_watcher
.is_leader());
142 leader_watcher
.shut_down();
143 ASSERT_EQ(1, listener
.acquire_count());
144 ASSERT_EQ(1, listener
.release_count());
145 ASSERT_FALSE(leader_watcher
.is_leader());
148 TEST_F(TestLeaderWatcher
, Release
)
151 LeaderWatcher
<> leader_watcher(m_threads
, m_local_io_ctx
, &listener
);
153 C_SaferCond on_init_acquire
;
154 listener
.on_acquire(0, &on_init_acquire
);
155 ASSERT_EQ(0, leader_watcher
.init());
156 ASSERT_EQ(0, on_init_acquire
.wait());
157 ASSERT_TRUE(leader_watcher
.is_leader());
159 C_SaferCond on_release
;
160 C_SaferCond on_acquire
;
161 listener
.on_release(0, &on_release
);
162 listener
.on_acquire(0, &on_acquire
);
163 leader_watcher
.release_leader();
164 ASSERT_EQ(0, on_release
.wait());
165 ASSERT_FALSE(leader_watcher
.is_leader());
167 // wait for lock re-acquired due to no another locker
168 ASSERT_EQ(0, on_acquire
.wait());
169 ASSERT_TRUE(leader_watcher
.is_leader());
171 C_SaferCond on_release2
;
172 listener
.on_release(0, &on_release2
);
173 leader_watcher
.release_leader();
174 ASSERT_EQ(0, on_release2
.wait());
176 leader_watcher
.shut_down();
177 ASSERT_EQ(2, listener
.acquire_count());
178 ASSERT_EQ(2, listener
.release_count());
181 TEST_F(TestLeaderWatcher
, ListenerError
)
184 LeaderWatcher
<> leader_watcher(m_threads
, m_local_io_ctx
, &listener
);
186 // make listener return error on acquire
187 C_SaferCond on_init_acquire
, on_init_release
;
188 listener
.on_acquire(-EINVAL
, &on_init_acquire
);
189 listener
.on_release(0, &on_init_release
);
190 ASSERT_EQ(0, leader_watcher
.init());
191 ASSERT_EQ(0, on_init_acquire
.wait());
192 ASSERT_EQ(0, on_init_release
.wait());
193 ASSERT_FALSE(leader_watcher
.is_leader());
195 // wait for lock re-acquired due to no another locker
196 C_SaferCond on_acquire
;
197 listener
.on_acquire(0, &on_acquire
);
198 ASSERT_EQ(0, on_acquire
.wait());
199 ASSERT_TRUE(leader_watcher
.is_leader());
201 // make listener return error on release
202 C_SaferCond on_release
;
203 listener
.on_release(-EINVAL
, &on_release
);
204 leader_watcher
.release_leader();
205 ASSERT_EQ(0, on_release
.wait());
206 ASSERT_FALSE(leader_watcher
.is_leader());
208 leader_watcher
.shut_down();
209 ASSERT_EQ(2, listener
.acquire_count());
210 ASSERT_EQ(2, listener
.release_count());
211 ASSERT_FALSE(leader_watcher
.is_leader());
214 TEST_F(TestLeaderWatcher
, Two
)
217 LeaderWatcher
<> leader_watcher1(m_threads
, create_connection(), &listener1
);
219 C_SaferCond on_init_acquire
;
220 listener1
.on_acquire(0, &on_init_acquire
);
221 ASSERT_EQ(0, leader_watcher1
.init());
222 ASSERT_EQ(0, on_init_acquire
.wait());
225 LeaderWatcher
<> leader_watcher2(m_threads
, create_connection(), &listener2
);
227 ASSERT_EQ(0, leader_watcher2
.init());
228 ASSERT_TRUE(leader_watcher1
.is_leader());
229 ASSERT_FALSE(leader_watcher2
.is_leader());
231 C_SaferCond on_release
;
232 C_SaferCond on_acquire
;
233 listener1
.on_release(0, &on_release
);
234 listener2
.on_acquire(0, &on_acquire
);
235 leader_watcher1
.release_leader();
236 ASSERT_EQ(0, on_release
.wait());
237 ASSERT_FALSE(leader_watcher1
.is_leader());
239 // wait for lock acquired by another watcher
240 ASSERT_EQ(0, on_acquire
.wait());
241 ASSERT_TRUE(leader_watcher2
.is_leader());
243 leader_watcher1
.shut_down();
244 leader_watcher2
.shut_down();
246 ASSERT_EQ(1, listener1
.acquire_count());
247 ASSERT_EQ(1, listener1
.release_count());
248 ASSERT_EQ(1, listener2
.acquire_count());
249 ASSERT_EQ(1, listener2
.release_count());
252 TEST_F(TestLeaderWatcher
, Break
)
254 Listener listener1
, listener2
;
255 LeaderWatcher
<> leader_watcher1(m_threads
,
256 create_connection(true /* no heartbeats */),
258 LeaderWatcher
<> leader_watcher2(m_threads
, create_connection(), &listener2
);
260 C_SaferCond on_init_acquire
;
261 listener1
.on_acquire(0, &on_init_acquire
);
262 ASSERT_EQ(0, leader_watcher1
.init());
263 ASSERT_EQ(0, on_init_acquire
.wait());
265 C_SaferCond on_acquire
;
266 listener2
.on_acquire(0, &on_acquire
);
267 ASSERT_EQ(0, leader_watcher2
.init());
268 ASSERT_FALSE(leader_watcher2
.is_leader());
270 // wait for lock broken due to no heartbeats and re-acquired
271 ASSERT_EQ(0, on_acquire
.wait());
272 ASSERT_TRUE(leader_watcher2
.is_leader());
274 leader_watcher1
.shut_down();
275 leader_watcher2
.shut_down();
278 TEST_F(TestLeaderWatcher
, Stress
)
280 const int WATCHERS_COUNT
= 20;
281 std::list
<LeaderWatcher
<> *> leader_watchers
;
284 for (int i
= 0; i
< WATCHERS_COUNT
; i
++) {
285 auto leader_watcher
=
286 new LeaderWatcher
<>(m_threads
, create_connection(), &listener
);
287 leader_watchers
.push_back(leader_watcher
);
290 C_SaferCond on_init_acquire
;
291 listener
.on_acquire(0, &on_init_acquire
);
292 for (auto &leader_watcher
: leader_watchers
) {
293 ASSERT_EQ(0, leader_watcher
->init());
295 ASSERT_EQ(0, on_init_acquire
.wait());
298 C_SaferCond on_acquire
;
299 listener
.on_acquire(0, &on_acquire
);
300 std::unique_ptr
<LeaderWatcher
<> > leader_watcher
;
301 for (auto it
= leader_watchers
.begin(); it
!= leader_watchers
.end(); ) {
302 if ((*it
)->is_leader()) {
303 ASSERT_FALSE(leader_watcher
);
304 leader_watcher
.reset(*it
);
305 it
= leader_watchers
.erase(it
);
311 ASSERT_TRUE(leader_watcher
);
312 leader_watcher
->shut_down();
313 if (leader_watchers
.empty()) {
316 ASSERT_EQ(0, on_acquire
.wait());