]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rbd_mirror/test_LeaderWatcher.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / rbd_mirror / test_LeaderWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/rados/librados.hpp"
5 #include "librbd/internal.h"
6 #include "librbd/Utils.h"
7 #include "librbd/api/Mirror.h"
8 #include "test/librbd/test_support.h"
9 #include "test/rbd_mirror/test_fixture.h"
10 #include "tools/rbd_mirror/LeaderWatcher.h"
11 #include "tools/rbd_mirror/Threads.h"
12 #include "common/Cond.h"
13
14 #include "test/librados/test_cxx.h"
15 #include "gtest/gtest.h"
16
17 using librbd::util::unique_lock_name;
18 using rbd::mirror::LeaderWatcher;
19
// Deliberately empty hook. Presumably referenced from the test runner so that
// this translation unit is linked into the test binary -- TODO confirm against
// the caller.
void register_test_leader_watcher() {}
22
23 class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
24 public:
25 class Listener : public rbd::mirror::leader_watcher::Listener {
26 public:
27 Listener()
28 : m_test_lock(ceph::make_mutex(
29 unique_lock_name("LeaderWatcher::m_test_lock", this))) {
30 }
31
32 void on_acquire(int r, Context *ctx) {
33 std::lock_guard locker{m_test_lock};
34 m_on_acquire_r = r;
35 m_on_acquire = ctx;
36 }
37
38 void on_release(int r, Context *ctx) {
39 std::lock_guard locker{m_test_lock};
40 m_on_release_r = r;
41 m_on_release = ctx;
42 }
43
44 int acquire_count() const {
45 std::lock_guard locker{m_test_lock};
46 return m_acquire_count;
47 }
48
49 int release_count() const {
50 std::lock_guard locker{m_test_lock};
51 return m_release_count;
52 }
53
54 void post_acquire_handler(Context *on_finish) override {
55 std::lock_guard locker{m_test_lock};
56 m_acquire_count++;
57 on_finish->complete(m_on_acquire_r);
58 m_on_acquire_r = 0;
59 if (m_on_acquire != nullptr) {
60 m_on_acquire->complete(0);
61 m_on_acquire = nullptr;
62 }
63 }
64
65 void pre_release_handler(Context *on_finish) override {
66 std::lock_guard locker{m_test_lock};
67 m_release_count++;
68 on_finish->complete(m_on_release_r);
69 m_on_release_r = 0;
70 if (m_on_release != nullptr) {
71 m_on_release->complete(0);
72 m_on_release = nullptr;
73 }
74 }
75
76 void update_leader_handler(const std::string &leader_instance_id) override {
77 }
78
79 void handle_instances_added(const InstanceIds& instance_ids) override {
80 }
81 void handle_instances_removed(const InstanceIds& instance_ids) override {
82 }
83
84 private:
85 mutable ceph::mutex m_test_lock;
86 int m_acquire_count = 0;
87 int m_release_count = 0;
88 int m_on_acquire_r = 0;
89 int m_on_release_r = 0;
90 Context *m_on_acquire = nullptr;
91 Context *m_on_release = nullptr;
92 };
93
94 struct Connection {
95 librados::Rados cluster;
96 librados::IoCtx io_ctx;
97 };
98
99 std::list<std::unique_ptr<Connection> > m_connections;
100
101 void SetUp() override {
102 TestFixture::SetUp();
103 EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_local_io_ctx,
104 RBD_MIRROR_MODE_POOL));
105
106 if (is_librados_test_stub(*_rados)) {
107 // speed testing up a little
108 EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval",
109 "1"));
110 }
111 }
112
113 librados::IoCtx &create_connection(bool no_heartbeats = false) {
114 m_connections.push_back(std::unique_ptr<Connection>(new Connection()));
115 Connection *c = m_connections.back().get();
116
117 EXPECT_EQ("", connect_cluster_pp(c->cluster));
118 if (no_heartbeats) {
119 EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
120 "3600"));
121 } else if (is_librados_test_stub(*_rados)) {
122 EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
123 "1"));
124 }
125 EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx));
126
127 return c->io_ctx;
128 }
129 };
130
131 TEST_F(TestLeaderWatcher, InitShutdown)
132 {
133 Listener listener;
134 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
135
136 C_SaferCond on_init_acquire;
137 listener.on_acquire(0, &on_init_acquire);
138 ASSERT_EQ(0, leader_watcher.init());
139 ASSERT_EQ(0, on_init_acquire.wait());
140 ASSERT_TRUE(leader_watcher.is_leader());
141
142 leader_watcher.shut_down();
143 ASSERT_EQ(1, listener.acquire_count());
144 ASSERT_EQ(1, listener.release_count());
145 ASSERT_FALSE(leader_watcher.is_leader());
146 }
147
148 TEST_F(TestLeaderWatcher, Release)
149 {
150 Listener listener;
151 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
152
153 C_SaferCond on_init_acquire;
154 listener.on_acquire(0, &on_init_acquire);
155 ASSERT_EQ(0, leader_watcher.init());
156 ASSERT_EQ(0, on_init_acquire.wait());
157 ASSERT_TRUE(leader_watcher.is_leader());
158
159 C_SaferCond on_release;
160 C_SaferCond on_acquire;
161 listener.on_release(0, &on_release);
162 listener.on_acquire(0, &on_acquire);
163 leader_watcher.release_leader();
164 ASSERT_EQ(0, on_release.wait());
165 ASSERT_FALSE(leader_watcher.is_leader());
166
167 // wait for lock re-acquired due to no another locker
168 ASSERT_EQ(0, on_acquire.wait());
169 ASSERT_TRUE(leader_watcher.is_leader());
170
171 C_SaferCond on_release2;
172 listener.on_release(0, &on_release2);
173 leader_watcher.release_leader();
174 ASSERT_EQ(0, on_release2.wait());
175
176 leader_watcher.shut_down();
177 ASSERT_EQ(2, listener.acquire_count());
178 ASSERT_EQ(2, listener.release_count());
179 }
180
181 TEST_F(TestLeaderWatcher, ListenerError)
182 {
183 Listener listener;
184 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
185
186 // make listener return error on acquire
187 C_SaferCond on_init_acquire, on_init_release;
188 listener.on_acquire(-EINVAL, &on_init_acquire);
189 listener.on_release(0, &on_init_release);
190 ASSERT_EQ(0, leader_watcher.init());
191 ASSERT_EQ(0, on_init_acquire.wait());
192 ASSERT_EQ(0, on_init_release.wait());
193 ASSERT_FALSE(leader_watcher.is_leader());
194
195 // wait for lock re-acquired due to no another locker
196 C_SaferCond on_acquire;
197 listener.on_acquire(0, &on_acquire);
198 ASSERT_EQ(0, on_acquire.wait());
199 ASSERT_TRUE(leader_watcher.is_leader());
200
201 // make listener return error on release
202 C_SaferCond on_release;
203 listener.on_release(-EINVAL, &on_release);
204 leader_watcher.release_leader();
205 ASSERT_EQ(0, on_release.wait());
206 ASSERT_FALSE(leader_watcher.is_leader());
207
208 leader_watcher.shut_down();
209 ASSERT_EQ(2, listener.acquire_count());
210 ASSERT_EQ(2, listener.release_count());
211 ASSERT_FALSE(leader_watcher.is_leader());
212 }
213
214 TEST_F(TestLeaderWatcher, Two)
215 {
216 Listener listener1;
217 LeaderWatcher<> leader_watcher1(m_threads, create_connection(), &listener1);
218
219 C_SaferCond on_init_acquire;
220 listener1.on_acquire(0, &on_init_acquire);
221 ASSERT_EQ(0, leader_watcher1.init());
222 ASSERT_EQ(0, on_init_acquire.wait());
223
224 Listener listener2;
225 LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
226
227 ASSERT_EQ(0, leader_watcher2.init());
228 ASSERT_TRUE(leader_watcher1.is_leader());
229 ASSERT_FALSE(leader_watcher2.is_leader());
230
231 C_SaferCond on_release;
232 C_SaferCond on_acquire;
233 listener1.on_release(0, &on_release);
234 listener2.on_acquire(0, &on_acquire);
235 leader_watcher1.release_leader();
236 ASSERT_EQ(0, on_release.wait());
237 ASSERT_FALSE(leader_watcher1.is_leader());
238
239 // wait for lock acquired by another watcher
240 ASSERT_EQ(0, on_acquire.wait());
241 ASSERT_TRUE(leader_watcher2.is_leader());
242
243 leader_watcher1.shut_down();
244 leader_watcher2.shut_down();
245
246 ASSERT_EQ(1, listener1.acquire_count());
247 ASSERT_EQ(1, listener1.release_count());
248 ASSERT_EQ(1, listener2.acquire_count());
249 ASSERT_EQ(1, listener2.release_count());
250 }
251
252 TEST_F(TestLeaderWatcher, Break)
253 {
254 Listener listener1, listener2;
255 LeaderWatcher<> leader_watcher1(m_threads,
256 create_connection(true /* no heartbeats */),
257 &listener1);
258 LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
259
260 C_SaferCond on_init_acquire;
261 listener1.on_acquire(0, &on_init_acquire);
262 ASSERT_EQ(0, leader_watcher1.init());
263 ASSERT_EQ(0, on_init_acquire.wait());
264
265 C_SaferCond on_acquire;
266 listener2.on_acquire(0, &on_acquire);
267 ASSERT_EQ(0, leader_watcher2.init());
268 ASSERT_FALSE(leader_watcher2.is_leader());
269
270 // wait for lock broken due to no heartbeats and re-acquired
271 ASSERT_EQ(0, on_acquire.wait());
272 ASSERT_TRUE(leader_watcher2.is_leader());
273
274 leader_watcher1.shut_down();
275 leader_watcher2.shut_down();
276 }
277
278 TEST_F(TestLeaderWatcher, Stress)
279 {
280 const int WATCHERS_COUNT = 20;
281 std::list<LeaderWatcher<> *> leader_watchers;
282 Listener listener;
283
284 for (int i = 0; i < WATCHERS_COUNT; i++) {
285 auto leader_watcher =
286 new LeaderWatcher<>(m_threads, create_connection(), &listener);
287 leader_watchers.push_back(leader_watcher);
288 }
289
290 C_SaferCond on_init_acquire;
291 listener.on_acquire(0, &on_init_acquire);
292 for (auto &leader_watcher : leader_watchers) {
293 ASSERT_EQ(0, leader_watcher->init());
294 }
295 ASSERT_EQ(0, on_init_acquire.wait());
296
297 while (true) {
298 C_SaferCond on_acquire;
299 listener.on_acquire(0, &on_acquire);
300 std::unique_ptr<LeaderWatcher<> > leader_watcher;
301 for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) {
302 if ((*it)->is_leader()) {
303 ASSERT_FALSE(leader_watcher);
304 leader_watcher.reset(*it);
305 it = leader_watchers.erase(it);
306 } else {
307 it++;
308 }
309 }
310
311 ASSERT_TRUE(leader_watcher);
312 leader_watcher->shut_down();
313 if (leader_watchers.empty()) {
314 break;
315 }
316 ASSERT_EQ(0, on_acquire.wait());
317 }
318 }