1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/rados/librados.hpp"
5 #include "librbd/internal.h"
6 #include "librbd/Utils.h"
7 #include "librbd/api/Mirror.h"
8 #include "test/librbd/test_support.h"
9 #include "test/rbd_mirror/test_fixture.h"
10 #include "tools/rbd_mirror/LeaderWatcher.h"
11 #include "tools/rbd_mirror/Threads.h"
13 #include "test/librados/test.h"
14 #include "gtest/gtest.h"
16 using librbd::util::unique_lock_name
;
17 using rbd::mirror::LeaderWatcher
;
19 void register_test_leader_watcher() {
22 class TestLeaderWatcher
: public ::rbd::mirror::TestFixture
{
24 class Listener
: public rbd::mirror::LeaderWatcher
<>::Listener
{
27 : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock", this)) {
30 void on_acquire(int r
, Context
*ctx
) {
31 Mutex::Locker
locker(m_test_lock
);
36 void on_release(int r
, Context
*ctx
) {
37 Mutex::Locker
locker(m_test_lock
);
42 int acquire_count() const {
43 Mutex::Locker
locker(m_test_lock
);
44 return m_acquire_count
;
47 int release_count() const {
48 Mutex::Locker
locker(m_test_lock
);
49 return m_release_count
;
52 void post_acquire_handler(Context
*on_finish
) override
{
53 Mutex::Locker
locker(m_test_lock
);
55 on_finish
->complete(m_on_acquire_r
);
57 if (m_on_acquire
!= nullptr) {
58 m_on_acquire
->complete(0);
59 m_on_acquire
= nullptr;
63 void pre_release_handler(Context
*on_finish
) override
{
64 Mutex::Locker
locker(m_test_lock
);
66 on_finish
->complete(m_on_release_r
);
68 if (m_on_release
!= nullptr) {
69 m_on_release
->complete(0);
70 m_on_release
= nullptr;
75 mutable Mutex m_test_lock
;
76 int m_acquire_count
= 0;
77 int m_release_count
= 0;
78 int m_on_acquire_r
= 0;
79 int m_on_release_r
= 0;
80 Context
*m_on_acquire
= nullptr;
81 Context
*m_on_release
= nullptr;
85 librados::Rados cluster
;
86 librados::IoCtx io_ctx
;
89 std::list
<std::unique_ptr
<Connection
> > m_connections
;
91 void SetUp() override
{
93 EXPECT_EQ(0, librbd::api::Mirror
<>::mode_set(m_local_io_ctx
,
94 RBD_MIRROR_MODE_POOL
));
96 if (is_librados_test_stub()) {
97 // speed testing up a little
98 EXPECT_EQ(0, _rados
->conf_set("rbd_mirror_leader_heartbeat_interval",
103 bool is_librados_test_stub() {
105 EXPECT_EQ(0, _rados
->cluster_fsid(&fsid
));
106 return fsid
== "00000000-1111-2222-3333-444444444444";
109 librados::IoCtx
&create_connection(bool no_heartbeats
= false) {
110 m_connections
.push_back(std::unique_ptr
<Connection
>(new Connection()));
111 Connection
*c
= m_connections
.back().get();
113 EXPECT_EQ("", connect_cluster_pp(c
->cluster
));
115 EXPECT_EQ(0, c
->cluster
.conf_set("rbd_mirror_leader_heartbeat_interval",
117 } else if (is_librados_test_stub()) {
118 EXPECT_EQ(0, c
->cluster
.conf_set("rbd_mirror_leader_heartbeat_interval",
121 EXPECT_EQ(0, c
->cluster
.ioctx_create(_local_pool_name
.c_str(), c
->io_ctx
));
127 TEST_F(TestLeaderWatcher
, InitShutdown
)
130 LeaderWatcher
<> leader_watcher(m_threads
, m_local_io_ctx
, &listener
);
132 C_SaferCond on_init_acquire
;
133 listener
.on_acquire(0, &on_init_acquire
);
134 ASSERT_EQ(0, leader_watcher
.init());
135 ASSERT_EQ(0, on_init_acquire
.wait());
136 ASSERT_TRUE(leader_watcher
.is_leader());
138 leader_watcher
.shut_down();
139 ASSERT_EQ(1, listener
.acquire_count());
140 ASSERT_EQ(1, listener
.release_count());
141 ASSERT_FALSE(leader_watcher
.is_leader());
144 TEST_F(TestLeaderWatcher
, Release
)
147 LeaderWatcher
<> leader_watcher(m_threads
, m_local_io_ctx
, &listener
);
149 C_SaferCond on_init_acquire
;
150 listener
.on_acquire(0, &on_init_acquire
);
151 ASSERT_EQ(0, leader_watcher
.init());
152 ASSERT_EQ(0, on_init_acquire
.wait());
153 ASSERT_TRUE(leader_watcher
.is_leader());
155 C_SaferCond on_release
;
156 C_SaferCond on_acquire
;
157 listener
.on_release(0, &on_release
);
158 listener
.on_acquire(0, &on_acquire
);
159 leader_watcher
.release_leader();
160 ASSERT_EQ(0, on_release
.wait());
161 ASSERT_FALSE(leader_watcher
.is_leader());
163 // wait for lock re-acquired due to no another locker
164 ASSERT_EQ(0, on_acquire
.wait());
165 ASSERT_TRUE(leader_watcher
.is_leader());
167 C_SaferCond on_release2
;
168 listener
.on_release(0, &on_release2
);
169 leader_watcher
.release_leader();
170 ASSERT_EQ(0, on_release2
.wait());
172 leader_watcher
.shut_down();
173 ASSERT_EQ(2, listener
.acquire_count());
174 ASSERT_EQ(2, listener
.release_count());
177 TEST_F(TestLeaderWatcher
, ListenerError
)
180 LeaderWatcher
<> leader_watcher(m_threads
, m_local_io_ctx
, &listener
);
182 // make listener return error on acquire
183 C_SaferCond on_init_acquire
, on_init_release
;
184 listener
.on_acquire(-EINVAL
, &on_init_acquire
);
185 listener
.on_release(0, &on_init_release
);
186 ASSERT_EQ(0, leader_watcher
.init());
187 ASSERT_EQ(0, on_init_acquire
.wait());
188 ASSERT_EQ(0, on_init_release
.wait());
189 ASSERT_FALSE(leader_watcher
.is_leader());
191 // wait for lock re-acquired due to no another locker
192 C_SaferCond on_acquire
;
193 listener
.on_acquire(0, &on_acquire
);
194 ASSERT_EQ(0, on_acquire
.wait());
195 ASSERT_TRUE(leader_watcher
.is_leader());
197 // make listener return error on release
198 C_SaferCond on_release
;
199 listener
.on_release(-EINVAL
, &on_release
);
200 leader_watcher
.release_leader();
201 ASSERT_EQ(0, on_release
.wait());
202 ASSERT_FALSE(leader_watcher
.is_leader());
204 leader_watcher
.shut_down();
205 ASSERT_EQ(2, listener
.acquire_count());
206 ASSERT_EQ(2, listener
.release_count());
207 ASSERT_FALSE(leader_watcher
.is_leader());
210 TEST_F(TestLeaderWatcher
, Two
)
213 LeaderWatcher
<> leader_watcher1(m_threads
, create_connection(), &listener1
);
215 C_SaferCond on_init_acquire
;
216 listener1
.on_acquire(0, &on_init_acquire
);
217 ASSERT_EQ(0, leader_watcher1
.init());
218 ASSERT_EQ(0, on_init_acquire
.wait());
221 LeaderWatcher
<> leader_watcher2(m_threads
, create_connection(), &listener2
);
223 ASSERT_EQ(0, leader_watcher2
.init());
224 ASSERT_TRUE(leader_watcher1
.is_leader());
225 ASSERT_FALSE(leader_watcher2
.is_leader());
227 C_SaferCond on_release
;
228 C_SaferCond on_acquire
;
229 listener1
.on_release(0, &on_release
);
230 listener2
.on_acquire(0, &on_acquire
);
231 leader_watcher1
.release_leader();
232 ASSERT_EQ(0, on_release
.wait());
233 ASSERT_FALSE(leader_watcher1
.is_leader());
235 // wait for lock acquired by another watcher
236 ASSERT_EQ(0, on_acquire
.wait());
237 ASSERT_TRUE(leader_watcher2
.is_leader());
239 leader_watcher1
.shut_down();
240 leader_watcher2
.shut_down();
242 ASSERT_EQ(1, listener1
.acquire_count());
243 ASSERT_EQ(1, listener1
.release_count());
244 ASSERT_EQ(1, listener2
.acquire_count());
245 ASSERT_EQ(1, listener2
.release_count());
248 TEST_F(TestLeaderWatcher
, Break
)
250 Listener listener1
, listener2
;
251 LeaderWatcher
<> leader_watcher1(m_threads
,
252 create_connection(true /* no heartbeats */),
254 LeaderWatcher
<> leader_watcher2(m_threads
, create_connection(), &listener2
);
256 C_SaferCond on_init_acquire
;
257 listener1
.on_acquire(0, &on_init_acquire
);
258 ASSERT_EQ(0, leader_watcher1
.init());
259 ASSERT_EQ(0, on_init_acquire
.wait());
261 C_SaferCond on_acquire
;
262 listener2
.on_acquire(0, &on_acquire
);
263 ASSERT_EQ(0, leader_watcher2
.init());
264 ASSERT_FALSE(leader_watcher2
.is_leader());
266 // wait for lock broken due to no heartbeats and re-acquired
267 ASSERT_EQ(0, on_acquire
.wait());
268 ASSERT_TRUE(leader_watcher2
.is_leader());
270 leader_watcher1
.shut_down();
271 leader_watcher2
.shut_down();
274 TEST_F(TestLeaderWatcher
, Stress
)
276 const int WATCHERS_COUNT
= 20;
277 std::list
<LeaderWatcher
<> *> leader_watchers
;
280 for (int i
= 0; i
< WATCHERS_COUNT
; i
++) {
281 auto leader_watcher
=
282 new LeaderWatcher
<>(m_threads
, create_connection(), &listener
);
283 leader_watchers
.push_back(leader_watcher
);
286 C_SaferCond on_init_acquire
;
287 listener
.on_acquire(0, &on_init_acquire
);
288 for (auto &leader_watcher
: leader_watchers
) {
289 ASSERT_EQ(0, leader_watcher
->init());
291 ASSERT_EQ(0, on_init_acquire
.wait());
294 C_SaferCond on_acquire
;
295 listener
.on_acquire(0, &on_acquire
);
296 std::unique_ptr
<LeaderWatcher
<> > leader_watcher
;
297 for (auto it
= leader_watchers
.begin(); it
!= leader_watchers
.end(); ) {
298 if ((*it
)->is_leader()) {
299 ASSERT_FALSE(leader_watcher
);
300 leader_watcher
.reset(*it
);
301 it
= leader_watchers
.erase(it
);
307 ASSERT_TRUE(leader_watcher
);
308 leader_watcher
->shut_down();
309 if (leader_watchers
.empty()) {
312 ASSERT_EQ(0, on_acquire
.wait());