// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "include/rados/librados.hpp"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/api/Mirror.h"
#include "test/librbd/test_support.h"
#include "test/rbd_mirror/test_fixture.h"
#include "tools/rbd_mirror/LeaderWatcher.h"
#include "tools/rbd_mirror/Threads.h"
#include "common/Cond.h"

#include "test/librados/test_cxx.h"
#include "gtest/gtest.h"

using librbd::util::unique_lock_name;
using rbd::mirror::LeaderWatcher;

void register_test_leader_watcher() {
}

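// Fixture that enables pool-mode mirroring on the local pool and can open
// extra cluster connections for the multi-watcher tests below.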
class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
public:
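  // Listener stub: counts post-acquire/pre-release handler invocations and
  // lets a test inject a return code plus a context that is completed when
  // the corresponding handler fires.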
  class Listener : public rbd::mirror::leader_watcher::Listener {
  public:
    Listener()
      : m_test_lock(ceph::make_mutex(
          unique_lock_name("LeaderWatcher::m_test_lock", this))) {
    }

    void on_acquire(int r, Context *ctx) {
      std::lock_guard locker{m_test_lock};
      m_on_acquire_r = r;
      m_on_acquire = ctx;
    }

    void on_release(int r, Context *ctx) {
      std::lock_guard locker{m_test_lock};
      m_on_release_r = r;
      m_on_release = ctx;
    }

    int acquire_count() const {
      std::lock_guard locker{m_test_lock};
      return m_acquire_count;
    }

    int release_count() const {
      std::lock_guard locker{m_test_lock};
      return m_release_count;
    }

    void post_acquire_handler(Context *on_finish) override {
      std::lock_guard locker{m_test_lock};
      m_acquire_count++;
      on_finish->complete(m_on_acquire_r);
      m_on_acquire_r = 0;
      if (m_on_acquire != nullptr) {
        m_on_acquire->complete(0);
        m_on_acquire = nullptr;
      }
    }

    void pre_release_handler(Context *on_finish) override {
      std::lock_guard locker{m_test_lock};
      m_release_count++;
      on_finish->complete(m_on_release_r);
      m_on_release_r = 0;
      if (m_on_release != nullptr) {
        m_on_release->complete(0);
        m_on_release = nullptr;
      }
    }

    void update_leader_handler(const std::string &leader_instance_id) override {
    }

    void handle_instances_added(const InstanceIds& instance_ids) override {
    }
    void handle_instances_removed(const InstanceIds& instance_ids) override {
    }

  private:
    mutable ceph::mutex m_test_lock;
    int m_acquire_count = 0;
    int m_release_count = 0;
    int m_on_acquire_r = 0;
    int m_on_release_r = 0;
    Context *m_on_acquire = nullptr;
    Context *m_on_release = nullptr;
  };

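  // Dedicated RADOS client per additional watcher, so each LeaderWatcher in
  // the multi-watcher tests holds its own cluster connection.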
  struct Connection {
    librados::Rados cluster;
    librados::IoCtx io_ctx;
  };

  std::list<std::unique_ptr<Connection> > m_connections;

  void SetUp() override {
    TestFixture::SetUp();
    EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_local_io_ctx,
                                                 RBD_MIRROR_MODE_POOL));

    if (is_librados_test_stub(*_rados)) {
      // speed testing up a little
      EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval",
                                    "1"));
    }
  }

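  // Open a separate cluster connection and an IoCtx for the local pool.  With
  // no_heartbeats the heartbeat interval is raised to an hour, so the leader
  // lock can later be broken by another watcher.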
  librados::IoCtx &create_connection(bool no_heartbeats = false) {
    m_connections.push_back(std::unique_ptr<Connection>(new Connection()));
    Connection *c = m_connections.back().get();

    EXPECT_EQ("", connect_cluster_pp(c->cluster));
    if (no_heartbeats) {
      EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
                                       "3600"));
    } else if (is_librados_test_stub(*_rados)) {
      EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
                                       "1"));
    }
    EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx));

    return c->io_ctx;
  }
};

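// A single watcher acquires leadership on init() and releases it on
// shut_down().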
TEST_F(TestLeaderWatcher, InitShutdown)
{
  Listener listener;
  LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);

  C_SaferCond on_init_acquire;
  listener.on_acquire(0, &on_init_acquire);
  ASSERT_EQ(0, leader_watcher.init());
  ASSERT_EQ(0, on_init_acquire.wait());
  ASSERT_TRUE(leader_watcher.is_leader());

  leader_watcher.shut_down();
  ASSERT_EQ(1, listener.acquire_count());
  ASSERT_EQ(1, listener.release_count());
  ASSERT_FALSE(leader_watcher.is_leader());
}

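// With no other watcher present, a released leader lock is re-acquired by the
// same watcher.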
TEST_F(TestLeaderWatcher, Release)
{
  Listener listener;
  LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);

  C_SaferCond on_init_acquire;
  listener.on_acquire(0, &on_init_acquire);
  ASSERT_EQ(0, leader_watcher.init());
  ASSERT_EQ(0, on_init_acquire.wait());
  ASSERT_TRUE(leader_watcher.is_leader());

  C_SaferCond on_release;
  C_SaferCond on_acquire;
  listener.on_release(0, &on_release);
  listener.on_acquire(0, &on_acquire);
  leader_watcher.release_leader();
  ASSERT_EQ(0, on_release.wait());
  ASSERT_FALSE(leader_watcher.is_leader());

  // wait for the lock to be re-acquired since there is no other locker
  ASSERT_EQ(0, on_acquire.wait());
  ASSERT_TRUE(leader_watcher.is_leader());

  C_SaferCond on_release2;
  listener.on_release(0, &on_release2);
  leader_watcher.release_leader();
  ASSERT_EQ(0, on_release2.wait());

  leader_watcher.shut_down();
  ASSERT_EQ(2, listener.acquire_count());
  ASSERT_EQ(2, listener.release_count());
}

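// An error returned from the listener's acquire handler causes the lock to be
// released and then re-acquired; an error from the release handler still
// results in the lock being released.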
TEST_F(TestLeaderWatcher, ListenerError)
{
  Listener listener;
  LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);

  // make listener return error on acquire
  C_SaferCond on_init_acquire, on_init_release;
  listener.on_acquire(-EINVAL, &on_init_acquire);
  listener.on_release(0, &on_init_release);
  ASSERT_EQ(0, leader_watcher.init());
  ASSERT_EQ(0, on_init_acquire.wait());
  ASSERT_EQ(0, on_init_release.wait());
  ASSERT_FALSE(leader_watcher.is_leader());

  // wait for the lock to be re-acquired since there is no other locker
  C_SaferCond on_acquire;
  listener.on_acquire(0, &on_acquire);
  ASSERT_EQ(0, on_acquire.wait());
  ASSERT_TRUE(leader_watcher.is_leader());

  // make listener return error on release
  C_SaferCond on_release;
  listener.on_release(-EINVAL, &on_release);
  leader_watcher.release_leader();
  ASSERT_EQ(0, on_release.wait());
  ASSERT_FALSE(leader_watcher.is_leader());

  leader_watcher.shut_down();
  ASSERT_EQ(2, listener.acquire_count());
  ASSERT_EQ(2, listener.release_count());
  ASSERT_FALSE(leader_watcher.is_leader());
}

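// With two watchers, releasing leadership from the first watcher hands the
// lock over to the second.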
TEST_F(TestLeaderWatcher, Two)
{
  Listener listener1;
  LeaderWatcher<> leader_watcher1(m_threads, create_connection(), &listener1);

  C_SaferCond on_init_acquire;
  listener1.on_acquire(0, &on_init_acquire);
  ASSERT_EQ(0, leader_watcher1.init());
  ASSERT_EQ(0, on_init_acquire.wait());

  Listener listener2;
  LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);

  ASSERT_EQ(0, leader_watcher2.init());
  ASSERT_TRUE(leader_watcher1.is_leader());
  ASSERT_FALSE(leader_watcher2.is_leader());

  C_SaferCond on_release;
  C_SaferCond on_acquire;
  listener1.on_release(0, &on_release);
  listener2.on_acquire(0, &on_acquire);
  leader_watcher1.release_leader();
  ASSERT_EQ(0, on_release.wait());
  ASSERT_FALSE(leader_watcher1.is_leader());

  // wait for the lock to be acquired by the other watcher
  ASSERT_EQ(0, on_acquire.wait());
  ASSERT_TRUE(leader_watcher2.is_leader());

  leader_watcher1.shut_down();
  leader_watcher2.shut_down();

  ASSERT_EQ(1, listener1.acquire_count());
  ASSERT_EQ(1, listener1.release_count());
  ASSERT_EQ(1, listener2.acquire_count());
  ASSERT_EQ(1, listener2.release_count());
}

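// A leader that does not send heartbeats has its lock broken and taken over
// by the second watcher.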
TEST_F(TestLeaderWatcher, Break)
{
  Listener listener1, listener2;
  LeaderWatcher<> leader_watcher1(m_threads,
                                  create_connection(true /* no heartbeats */),
                                  &listener1);
  LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);

  C_SaferCond on_init_acquire;
  listener1.on_acquire(0, &on_init_acquire);
  ASSERT_EQ(0, leader_watcher1.init());
  ASSERT_EQ(0, on_init_acquire.wait());

  C_SaferCond on_acquire;
  listener2.on_acquire(0, &on_acquire);
  ASSERT_EQ(0, leader_watcher2.init());
  ASSERT_FALSE(leader_watcher2.is_leader());

  // wait for the lock to be broken due to missing heartbeats and re-acquired
  ASSERT_EQ(0, on_acquire.wait());
  ASSERT_TRUE(leader_watcher2.is_leader());

  leader_watcher1.shut_down();
  leader_watcher2.shut_down();
}

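// Start twenty watchers sharing one listener and repeatedly shut down the
// current leader; leadership must migrate to one of the remaining watchers
// each time until none are left.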
TEST_F(TestLeaderWatcher, Stress)
{
  const int WATCHERS_COUNT = 20;
  std::list<LeaderWatcher<> *> leader_watchers;
  Listener listener;

  for (int i = 0; i < WATCHERS_COUNT; i++) {
    auto leader_watcher =
      new LeaderWatcher<>(m_threads, create_connection(), &listener);
    leader_watchers.push_back(leader_watcher);
  }

  C_SaferCond on_init_acquire;
  listener.on_acquire(0, &on_init_acquire);
  for (auto &leader_watcher : leader_watchers) {
    ASSERT_EQ(0, leader_watcher->init());
  }
  ASSERT_EQ(0, on_init_acquire.wait());

  while (true) {
    C_SaferCond on_acquire;
    listener.on_acquire(0, &on_acquire);
    std::unique_ptr<LeaderWatcher<> > leader_watcher;
    for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) {
      if ((*it)->is_leader()) {
        ASSERT_FALSE(leader_watcher);
        leader_watcher.reset(*it);
        it = leader_watchers.erase(it);
      } else {
        it++;
      }
    }

    ASSERT_TRUE(leader_watcher);
    leader_watcher->shut_down();
    if (leader_watchers.empty()) {
      break;
    }
    ASSERT_EQ(0, on_acquire.wait());
  }
}