]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rbd_mirror/test_LeaderWatcher.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / test / rbd_mirror / test_LeaderWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/rados/librados.hpp"
5 #include "librbd/internal.h"
6 #include "librbd/Utils.h"
7 #include "librbd/api/Mirror.h"
8 #include "test/librbd/test_support.h"
9 #include "test/rbd_mirror/test_fixture.h"
10 #include "tools/rbd_mirror/LeaderWatcher.h"
11 #include "tools/rbd_mirror/Threads.h"
12
13 #include "test/librados/test.h"
14 #include "gtest/gtest.h"
15
16 using librbd::util::unique_lock_name;
17 using rbd::mirror::LeaderWatcher;
18
// Intentionally empty registration hook for this test file.
// NOTE(review): presumably referenced by the test runner so that this
// translation unit gets linked in -- confirm against the caller.
void register_test_leader_watcher() {}
21
22 class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
23 public:
24 class Listener : public rbd::mirror::LeaderWatcher<>::Listener {
25 public:
26 Listener()
27 : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock", this)) {
28 }
29
30 void on_acquire(int r, Context *ctx) {
31 Mutex::Locker locker(m_test_lock);
32 m_on_acquire_r = r;
33 m_on_acquire = ctx;
34 }
35
36 void on_release(int r, Context *ctx) {
37 Mutex::Locker locker(m_test_lock);
38 m_on_release_r = r;
39 m_on_release = ctx;
40 }
41
42 int acquire_count() const {
43 Mutex::Locker locker(m_test_lock);
44 return m_acquire_count;
45 }
46
47 int release_count() const {
48 Mutex::Locker locker(m_test_lock);
49 return m_release_count;
50 }
51
52 void post_acquire_handler(Context *on_finish) override {
53 Mutex::Locker locker(m_test_lock);
54 m_acquire_count++;
55 on_finish->complete(m_on_acquire_r);
56 m_on_acquire_r = 0;
57 if (m_on_acquire != nullptr) {
58 m_on_acquire->complete(0);
59 m_on_acquire = nullptr;
60 }
61 }
62
63 void pre_release_handler(Context *on_finish) override {
64 Mutex::Locker locker(m_test_lock);
65 m_release_count++;
66 on_finish->complete(m_on_release_r);
67 m_on_release_r = 0;
68 if (m_on_release != nullptr) {
69 m_on_release->complete(0);
70 m_on_release = nullptr;
71 }
72 }
73
74 private:
75 mutable Mutex m_test_lock;
76 int m_acquire_count = 0;
77 int m_release_count = 0;
78 int m_on_acquire_r = 0;
79 int m_on_release_r = 0;
80 Context *m_on_acquire = nullptr;
81 Context *m_on_release = nullptr;
82 };
83
84 struct Connection {
85 librados::Rados cluster;
86 librados::IoCtx io_ctx;
87 };
88
89 std::list<std::unique_ptr<Connection> > m_connections;
90
91 void SetUp() override {
92 TestFixture::SetUp();
93 EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_local_io_ctx,
94 RBD_MIRROR_MODE_POOL));
95
96 if (is_librados_test_stub()) {
97 // speed testing up a little
98 EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval",
99 "1"));
100 }
101 }
102
103 bool is_librados_test_stub() {
104 std::string fsid;
105 EXPECT_EQ(0, _rados->cluster_fsid(&fsid));
106 return fsid == "00000000-1111-2222-3333-444444444444";
107 }
108
109 librados::IoCtx &create_connection(bool no_heartbeats = false) {
110 m_connections.push_back(std::unique_ptr<Connection>(new Connection()));
111 Connection *c = m_connections.back().get();
112
113 EXPECT_EQ("", connect_cluster_pp(c->cluster));
114 if (no_heartbeats) {
115 EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
116 "3600"));
117 } else if (is_librados_test_stub()) {
118 EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
119 "1"));
120 }
121 EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx));
122
123 return c->io_ctx;
124 }
125 };
126
127 TEST_F(TestLeaderWatcher, InitShutdown)
128 {
129 Listener listener;
130 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
131
132 C_SaferCond on_init_acquire;
133 listener.on_acquire(0, &on_init_acquire);
134 ASSERT_EQ(0, leader_watcher.init());
135 ASSERT_EQ(0, on_init_acquire.wait());
136 ASSERT_TRUE(leader_watcher.is_leader());
137
138 leader_watcher.shut_down();
139 ASSERT_EQ(1, listener.acquire_count());
140 ASSERT_EQ(1, listener.release_count());
141 ASSERT_FALSE(leader_watcher.is_leader());
142 }
143
144 TEST_F(TestLeaderWatcher, Release)
145 {
146 Listener listener;
147 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
148
149 C_SaferCond on_init_acquire;
150 listener.on_acquire(0, &on_init_acquire);
151 ASSERT_EQ(0, leader_watcher.init());
152 ASSERT_EQ(0, on_init_acquire.wait());
153 ASSERT_TRUE(leader_watcher.is_leader());
154
155 C_SaferCond on_release;
156 C_SaferCond on_acquire;
157 listener.on_release(0, &on_release);
158 listener.on_acquire(0, &on_acquire);
159 leader_watcher.release_leader();
160 ASSERT_EQ(0, on_release.wait());
161 ASSERT_FALSE(leader_watcher.is_leader());
162
163 // wait for lock re-acquired due to no another locker
164 ASSERT_EQ(0, on_acquire.wait());
165 ASSERT_TRUE(leader_watcher.is_leader());
166
167 C_SaferCond on_release2;
168 listener.on_release(0, &on_release2);
169 leader_watcher.release_leader();
170 ASSERT_EQ(0, on_release2.wait());
171
172 leader_watcher.shut_down();
173 ASSERT_EQ(2, listener.acquire_count());
174 ASSERT_EQ(2, listener.release_count());
175 }
176
177 TEST_F(TestLeaderWatcher, ListenerError)
178 {
179 Listener listener;
180 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
181
182 // make listener return error on acquire
183 C_SaferCond on_init_acquire, on_init_release;
184 listener.on_acquire(-EINVAL, &on_init_acquire);
185 listener.on_release(0, &on_init_release);
186 ASSERT_EQ(0, leader_watcher.init());
187 ASSERT_EQ(0, on_init_acquire.wait());
188 ASSERT_EQ(0, on_init_release.wait());
189 ASSERT_FALSE(leader_watcher.is_leader());
190
191 // wait for lock re-acquired due to no another locker
192 C_SaferCond on_acquire;
193 listener.on_acquire(0, &on_acquire);
194 ASSERT_EQ(0, on_acquire.wait());
195 ASSERT_TRUE(leader_watcher.is_leader());
196
197 // make listener return error on release
198 C_SaferCond on_release;
199 listener.on_release(-EINVAL, &on_release);
200 leader_watcher.release_leader();
201 ASSERT_EQ(0, on_release.wait());
202 ASSERT_FALSE(leader_watcher.is_leader());
203
204 leader_watcher.shut_down();
205 ASSERT_EQ(2, listener.acquire_count());
206 ASSERT_EQ(2, listener.release_count());
207 ASSERT_FALSE(leader_watcher.is_leader());
208 }
209
210 TEST_F(TestLeaderWatcher, Two)
211 {
212 Listener listener1;
213 LeaderWatcher<> leader_watcher1(m_threads, create_connection(), &listener1);
214
215 C_SaferCond on_init_acquire;
216 listener1.on_acquire(0, &on_init_acquire);
217 ASSERT_EQ(0, leader_watcher1.init());
218 ASSERT_EQ(0, on_init_acquire.wait());
219
220 Listener listener2;
221 LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
222
223 ASSERT_EQ(0, leader_watcher2.init());
224 ASSERT_TRUE(leader_watcher1.is_leader());
225 ASSERT_FALSE(leader_watcher2.is_leader());
226
227 C_SaferCond on_release;
228 C_SaferCond on_acquire;
229 listener1.on_release(0, &on_release);
230 listener2.on_acquire(0, &on_acquire);
231 leader_watcher1.release_leader();
232 ASSERT_EQ(0, on_release.wait());
233 ASSERT_FALSE(leader_watcher1.is_leader());
234
235 // wait for lock acquired by another watcher
236 ASSERT_EQ(0, on_acquire.wait());
237 ASSERT_TRUE(leader_watcher2.is_leader());
238
239 leader_watcher1.shut_down();
240 leader_watcher2.shut_down();
241
242 ASSERT_EQ(1, listener1.acquire_count());
243 ASSERT_EQ(1, listener1.release_count());
244 ASSERT_EQ(1, listener2.acquire_count());
245 ASSERT_EQ(1, listener2.release_count());
246 }
247
248 TEST_F(TestLeaderWatcher, Break)
249 {
250 Listener listener1, listener2;
251 LeaderWatcher<> leader_watcher1(m_threads,
252 create_connection(true /* no heartbeats */),
253 &listener1);
254 LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
255
256 C_SaferCond on_init_acquire;
257 listener1.on_acquire(0, &on_init_acquire);
258 ASSERT_EQ(0, leader_watcher1.init());
259 ASSERT_EQ(0, on_init_acquire.wait());
260
261 C_SaferCond on_acquire;
262 listener2.on_acquire(0, &on_acquire);
263 ASSERT_EQ(0, leader_watcher2.init());
264 ASSERT_FALSE(leader_watcher2.is_leader());
265
266 // wait for lock broken due to no heartbeats and re-acquired
267 ASSERT_EQ(0, on_acquire.wait());
268 ASSERT_TRUE(leader_watcher2.is_leader());
269
270 leader_watcher1.shut_down();
271 leader_watcher2.shut_down();
272 }
273
274 TEST_F(TestLeaderWatcher, Stress)
275 {
276 const int WATCHERS_COUNT = 20;
277 std::list<LeaderWatcher<> *> leader_watchers;
278 Listener listener;
279
280 for (int i = 0; i < WATCHERS_COUNT; i++) {
281 auto leader_watcher =
282 new LeaderWatcher<>(m_threads, create_connection(), &listener);
283 leader_watchers.push_back(leader_watcher);
284 }
285
286 C_SaferCond on_init_acquire;
287 listener.on_acquire(0, &on_init_acquire);
288 for (auto &leader_watcher : leader_watchers) {
289 ASSERT_EQ(0, leader_watcher->init());
290 }
291 ASSERT_EQ(0, on_init_acquire.wait());
292
293 while (true) {
294 C_SaferCond on_acquire;
295 listener.on_acquire(0, &on_acquire);
296 std::unique_ptr<LeaderWatcher<> > leader_watcher;
297 for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) {
298 if ((*it)->is_leader()) {
299 ASSERT_FALSE(leader_watcher);
300 leader_watcher.reset(*it);
301 it = leader_watchers.erase(it);
302 } else {
303 it++;
304 }
305 }
306
307 ASSERT_TRUE(leader_watcher);
308 leader_watcher->shut_down();
309 if (leader_watchers.empty()) {
310 break;
311 }
312 ASSERT_EQ(0, on_acquire.wait());
313 }
314 }