]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rbd_mirror/test_mock_InstanceWatcher.cc
bdd56bfc39dfc3b85fffec9699972f552a083cab
[ceph.git] / ceph / src / test / rbd_mirror / test_mock_InstanceWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librados/AioCompletionImpl.h"
5 #include "librbd/ManagedLock.h"
6 #include "test/librados/test.h"
7 #include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
8 #include "test/librados_test_stub/MockTestMemRadosClient.h"
9 #include "test/librbd/mock/MockImageCtx.h"
10 #include "test/rbd_mirror/test_mock_fixture.h"
11 #include "tools/rbd_mirror/InstanceReplayer.h"
12 #include "tools/rbd_mirror/ImageSyncThrottler.h"
13 #include "tools/rbd_mirror/InstanceWatcher.h"
14 #include "tools/rbd_mirror/Threads.h"
15
16 namespace librbd {
17
18 namespace {
19
20 struct MockTestImageCtx : public MockImageCtx {
21 MockTestImageCtx(librbd::ImageCtx &image_ctx)
22 : librbd::MockImageCtx(image_ctx) {
23 }
24 };
25
26 } // anonymous namespace
27
28 template <>
29 struct ManagedLock<MockTestImageCtx> {
30 static ManagedLock* s_instance;
31
32 static ManagedLock *create(librados::IoCtx& ioctx, ContextWQ *work_queue,
33 const std::string& oid, librbd::Watcher *watcher,
34 managed_lock::Mode mode,
35 bool blacklist_on_break_lock,
36 uint32_t blacklist_expire_seconds) {
37 assert(s_instance != nullptr);
38 return s_instance;
39 }
40
41 ManagedLock() {
42 assert(s_instance == nullptr);
43 s_instance = this;
44 }
45
46 ~ManagedLock() {
47 assert(s_instance == this);
48 s_instance = nullptr;
49 }
50
51 MOCK_METHOD0(destroy, void());
52 MOCK_METHOD1(shut_down, void(Context *));
53 MOCK_METHOD1(acquire_lock, void(Context *));
54 MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *));
55 MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *));
56 };
57
58 ManagedLock<MockTestImageCtx> *ManagedLock<MockTestImageCtx>::s_instance = nullptr;
59
60 } // namespace librbd
61
62 namespace rbd {
63 namespace mirror {
64
65 template <>
66 struct Threads<librbd::MockTestImageCtx> {
67 Mutex &timer_lock;
68 SafeTimer *timer;
69 ContextWQ *work_queue;
70
71 Threads(Threads<librbd::ImageCtx> *threads)
72 : timer_lock(threads->timer_lock), timer(threads->timer),
73 work_queue(threads->work_queue) {
74 }
75 };
76
77 template <>
78 struct InstanceReplayer<librbd::MockTestImageCtx> {
79 MOCK_METHOD5(acquire_image, void(InstanceWatcher<librbd::MockTestImageCtx> *,
80 const std::string &, const std::string &,
81 const std::string &, Context *));
82 MOCK_METHOD5(release_image, void(const std::string &, const std::string &,
83 const std::string &, bool, Context *));
84 };
85
86 template <>
87 struct ImageSyncThrottler<librbd::MockTestImageCtx> {
88 static ImageSyncThrottler* s_instance;
89
90 static ImageSyncThrottler *create() {
91 assert(s_instance != nullptr);
92 return s_instance;
93 }
94
95 ImageSyncThrottler() {
96 assert(s_instance == nullptr);
97 s_instance = this;
98 }
99
100 virtual ~ImageSyncThrottler() {
101 assert(s_instance == this);
102 s_instance = nullptr;
103 }
104
105 MOCK_METHOD0(destroy, void());
106 MOCK_METHOD1(drain, void(int));
107 MOCK_METHOD2(start_op, void(const std::string &, Context *));
108 MOCK_METHOD1(finish_op, void(const std::string &));
109 };
110
111 ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
112
113 } // namespace mirror
114 } // namespace rbd
115
116 // template definitions
117 #include "tools/rbd_mirror/InstanceWatcher.cc"
118
119 namespace rbd {
120 namespace mirror {
121
122 using ::testing::_;
123 using ::testing::InSequence;
124 using ::testing::Invoke;
125 using ::testing::Return;
126 using ::testing::StrEq;
127 using ::testing::WithArg;
128
129 class TestMockInstanceWatcher : public TestMockFixture {
130 public:
131 typedef librbd::ManagedLock<librbd::MockTestImageCtx> MockManagedLock;
132 typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
133 typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
134 typedef Threads<librbd::MockTestImageCtx> MockThreads;
135
136 std::string m_instance_id;
137 std::string m_oid;
138 MockThreads *m_mock_threads;
139
140 void SetUp() override {
141 TestFixture::SetUp();
142 m_local_io_ctx.remove(RBD_MIRROR_LEADER);
143 EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
144
145 m_instance_id = stringify(m_local_io_ctx.get_instance_id());
146 m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id;
147
148 m_mock_threads = new MockThreads(m_threads);
149 }
150
151 void TearDown() override {
152 delete m_mock_threads;
153 TestMockFixture::TearDown();
154 }
155
156 void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
157 EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _));
158 }
159
160 void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx,
161 const std::string &instance_id) {
162 std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id;
163 EXPECT_CALL(mock_io_ctx, aio_watch(oid, _, _, _));
164 }
165
166 void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
167 EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _));
168 }
169
170 void expect_register_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx,
171 int r) {
172 EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"),
173 StrEq("mirror_instances_add"), _, _, _))
174 .WillOnce(Return(r));
175 }
176
177 void expect_unregister_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx,
178 int r) {
179 EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"),
180 StrEq("mirror_instances_remove"), _, _, _))
181 .WillOnce(Return(r));
182 }
183
184 void expect_acquire_lock(MockManagedLock &mock_managed_lock, int r) {
185 EXPECT_CALL(mock_managed_lock, acquire_lock(_))
186 .WillOnce(CompleteContext(r));
187 }
188
189 void expect_release_lock(MockManagedLock &mock_managed_lock, int r) {
190 EXPECT_CALL(mock_managed_lock, shut_down(_)).WillOnce(CompleteContext(r));
191 }
192
193 void expect_destroy_lock(MockManagedLock &mock_managed_lock,
194 Context *ctx = nullptr) {
195 EXPECT_CALL(mock_managed_lock, destroy())
196 .WillOnce(Invoke([ctx]() {
197 if (ctx != nullptr) {
198 ctx->complete(0);
199 }
200 }));
201 }
202
203 void expect_get_locker(MockManagedLock &mock_managed_lock,
204 const librbd::managed_lock::Locker &locker, int r) {
205 EXPECT_CALL(mock_managed_lock, get_locker(_, _))
206 .WillOnce(Invoke([r, locker](librbd::managed_lock::Locker *out,
207 Context *ctx) {
208 if (r == 0) {
209 *out = locker;
210 }
211 ctx->complete(r);
212 }));
213 }
214
215 void expect_break_lock(MockManagedLock &mock_managed_lock,
216 const librbd::managed_lock::Locker &locker, int r) {
217 EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _))
218 .WillOnce(WithArg<2>(CompleteContext(r)));
219 }
220 };
221
222 TEST_F(TestMockInstanceWatcher, InitShutdown) {
223 MockManagedLock mock_managed_lock;
224 librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
225
226 auto instance_watcher = new MockInstanceWatcher(
227 m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
228 InSequence seq;
229
230 // Init
231 expect_register_instance(mock_io_ctx, 0);
232 expect_register_watch(mock_io_ctx);
233 expect_acquire_lock(mock_managed_lock, 0);
234 ASSERT_EQ(0, instance_watcher->init());
235
236 // Shutdown
237 expect_release_lock(mock_managed_lock, 0);
238 expect_unregister_watch(mock_io_ctx);
239 expect_unregister_instance(mock_io_ctx, 0);
240 instance_watcher->shut_down();
241
242 expect_destroy_lock(mock_managed_lock);
243 delete instance_watcher;
244 }
245
246 TEST_F(TestMockInstanceWatcher, InitError) {
247 MockManagedLock mock_managed_lock;
248 librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
249
250 auto instance_watcher = new MockInstanceWatcher(
251 m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
252 InSequence seq;
253
254 expect_register_instance(mock_io_ctx, 0);
255 expect_register_watch(mock_io_ctx);
256 expect_acquire_lock(mock_managed_lock, -EINVAL);
257 expect_unregister_watch(mock_io_ctx);
258 expect_unregister_instance(mock_io_ctx, 0);
259
260 ASSERT_EQ(-EINVAL, instance_watcher->init());
261
262 expect_destroy_lock(mock_managed_lock);
263 delete instance_watcher;
264 }
265
266 TEST_F(TestMockInstanceWatcher, ShutdownError) {
267 MockManagedLock mock_managed_lock;
268 librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
269
270 auto instance_watcher = new MockInstanceWatcher(
271 m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
272 InSequence seq;
273
274 // Init
275 expect_register_instance(mock_io_ctx, 0);
276 expect_register_watch(mock_io_ctx);
277 expect_acquire_lock(mock_managed_lock, 0);
278 ASSERT_EQ(0, instance_watcher->init());
279
280 // Shutdown
281 expect_release_lock(mock_managed_lock, -EINVAL);
282 expect_unregister_watch(mock_io_ctx);
283 expect_unregister_instance(mock_io_ctx, 0);
284 instance_watcher->shut_down();
285
286 expect_destroy_lock(mock_managed_lock);
287 delete instance_watcher;
288 }
289
290
291 TEST_F(TestMockInstanceWatcher, Remove) {
292 MockManagedLock mock_managed_lock;
293 librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
294 librbd::managed_lock::Locker
295 locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
296
297 InSequence seq;
298
299 expect_get_locker(mock_managed_lock, locker, 0);
300 expect_break_lock(mock_managed_lock, locker, 0);
301 expect_unregister_instance(mock_io_ctx, 0);
302 C_SaferCond on_destroy;
303 expect_destroy_lock(mock_managed_lock, &on_destroy);
304
305 C_SaferCond on_remove;
306 MockInstanceWatcher::remove_instance(m_local_io_ctx,
307 m_mock_threads->work_queue,
308 "instance_id", &on_remove);
309 ASSERT_EQ(0, on_remove.wait());
310 ASSERT_EQ(0, on_destroy.wait());
311 }
312
313 TEST_F(TestMockInstanceWatcher, RemoveNoent) {
314 MockManagedLock mock_managed_lock;
315 librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
316
317 InSequence seq;
318
319 expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), -ENOENT);
320 expect_unregister_instance(mock_io_ctx, 0);
321 C_SaferCond on_destroy;
322 expect_destroy_lock(mock_managed_lock, &on_destroy);
323
324 C_SaferCond on_remove;
325 MockInstanceWatcher::remove_instance(m_local_io_ctx,
326 m_mock_threads->work_queue,
327 "instance_id", &on_remove);
328 ASSERT_EQ(0, on_remove.wait());
329 ASSERT_EQ(0, on_destroy.wait());
330 }
331
332 TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) {
333 MockManagedLock mock_managed_lock;
334
335 librados::IoCtx& io_ctx1 = m_local_io_ctx;
336 std::string instance_id1 = m_instance_id;
337 librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
338 MockInstanceReplayer mock_instance_replayer1;
339 auto instance_watcher1 = MockInstanceWatcher::create(
340 io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1);
341
342 librados::Rados cluster;
343 librados::IoCtx io_ctx2;
344 EXPECT_EQ("", connect_cluster_pp(cluster));
345 EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
346 std::string instance_id2 = stringify(io_ctx2.get_instance_id());
347 librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
348 MockInstanceReplayer mock_instance_replayer2;
349 auto instance_watcher2 = MockInstanceWatcher::create(
350 io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2);
351
352 InSequence seq;
353
354 // Init instance watcher 1
355 expect_register_instance(mock_io_ctx1, 0);
356 expect_register_watch(mock_io_ctx1, instance_id1);
357 expect_acquire_lock(mock_managed_lock, 0);
358 ASSERT_EQ(0, instance_watcher1->init());
359
360 // Init instance watcher 2
361 expect_register_instance(mock_io_ctx2, 0);
362 expect_register_watch(mock_io_ctx2, instance_id2);
363 expect_acquire_lock(mock_managed_lock, 0);
364 ASSERT_EQ(0, instance_watcher2->init());
365
366 // Acquire Image on the the same instance
367 EXPECT_CALL(mock_instance_replayer1, acquire_image(instance_watcher1, "gid",
368 "uuid", "id", _))
369 .WillOnce(WithArg<4>(CompleteContext(0)));
370 C_SaferCond on_acquire1;
371 instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id",
372 &on_acquire1);
373 ASSERT_EQ(0, on_acquire1.wait());
374
375 // Acquire Image on the other instance
376 EXPECT_CALL(mock_instance_replayer2, acquire_image(instance_watcher2, "gid",
377 "uuid", "id", _))
378 .WillOnce(WithArg<4>(CompleteContext(0)));
379 C_SaferCond on_acquire2;
380 instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id",
381 &on_acquire2);
382 ASSERT_EQ(0, on_acquire2.wait());
383
384 // Release Image on the the same instance
385 EXPECT_CALL(mock_instance_replayer1, release_image("gid", "uuid", "id", true,
386 _))
387 .WillOnce(WithArg<4>(CompleteContext(0)));
388 C_SaferCond on_release1;
389 instance_watcher1->notify_image_release(instance_id1, "gid", "uuid", "id",
390 true, &on_release1);
391 ASSERT_EQ(0, on_release1.wait());
392
393 // Release Image on the other instance
394 EXPECT_CALL(mock_instance_replayer2, release_image("gid", "uuid", "id", true,
395 _))
396 .WillOnce(WithArg<4>(CompleteContext(0)));
397 C_SaferCond on_release2;
398 instance_watcher1->notify_image_release(instance_id2, "gid", "uuid", "id",
399 true, &on_release2);
400 ASSERT_EQ(0, on_release2.wait());
401
402 // Shutdown instance watcher 1
403 expect_release_lock(mock_managed_lock, 0);
404 expect_unregister_watch(mock_io_ctx1);
405 expect_unregister_instance(mock_io_ctx1, 0);
406 instance_watcher1->shut_down();
407
408 expect_destroy_lock(mock_managed_lock);
409 delete instance_watcher1;
410
411 // Shutdown instance watcher 2
412 expect_release_lock(mock_managed_lock, 0);
413 expect_unregister_watch(mock_io_ctx2);
414 expect_unregister_instance(mock_io_ctx2, 0);
415 instance_watcher2->shut_down();
416
417 expect_destroy_lock(mock_managed_lock);
418 delete instance_watcher2;
419 }
420
421 TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) {
422 MockManagedLock mock_managed_lock;
423 librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
424
425 auto instance_watcher = new MockInstanceWatcher(
426 m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
427 InSequence seq;
428
429 // Init
430 expect_register_instance(mock_io_ctx, 0);
431 expect_register_watch(mock_io_ctx);
432 expect_acquire_lock(mock_managed_lock, 0);
433 ASSERT_EQ(0, instance_watcher->init());
434
435 // Send Acquire Image and cancel
436 EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
437 .WillOnce(Invoke(
438 [this, instance_watcher, &mock_io_ctx](
439 const std::string& o, librados::AioCompletionImpl *c,
440 bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
441 c->get();
442 auto ctx = new FunctionContext(
443 [instance_watcher, &mock_io_ctx, c, pbl](int r) {
444 instance_watcher->cancel_notify_requests("other");
445 ::encode(librbd::watcher::NotifyResponse(), *pbl);
446 mock_io_ctx.get_mock_rados_client()->
447 finish_aio_completion(c, -ETIMEDOUT);
448 });
449 m_threads->work_queue->queue(ctx, 0);
450 }));
451
452 C_SaferCond on_acquire;
453 instance_watcher->notify_image_acquire("other", "gid", "uuid", "id",
454 &on_acquire);
455 ASSERT_EQ(-ECANCELED, on_acquire.wait());
456
457 // Send Release Image and cancel
458 EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
459 .WillOnce(Invoke(
460 [this, instance_watcher, &mock_io_ctx](
461 const std::string& o, librados::AioCompletionImpl *c,
462 bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
463 c->get();
464 auto ctx = new FunctionContext(
465 [instance_watcher, &mock_io_ctx, c, pbl](int r) {
466 instance_watcher->cancel_notify_requests("other");
467 ::encode(librbd::watcher::NotifyResponse(), *pbl);
468 mock_io_ctx.get_mock_rados_client()->
469 finish_aio_completion(c, -ETIMEDOUT);
470 });
471 m_threads->work_queue->queue(ctx, 0);
472 }));
473
474 C_SaferCond on_release;
475 instance_watcher->notify_image_release("other", "gid", "uuid", "id",
476 true, &on_release);
477 ASSERT_EQ(-ECANCELED, on_release.wait());
478
479 // Shutdown
480 expect_release_lock(mock_managed_lock, 0);
481 expect_unregister_watch(mock_io_ctx);
482 expect_unregister_instance(mock_io_ctx, 0);
483 instance_watcher->shut_down();
484
485 expect_destroy_lock(mock_managed_lock);
486 delete instance_watcher;
487 }
488
489 class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
490 public:
491 typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
492
493 MockManagedLock mock_managed_lock;
494 MockImageSyncThrottler mock_image_sync_throttler;
495 std::string instance_id1;
496 std::string instance_id2;
497
498 librados::Rados cluster;
499 librados::IoCtx io_ctx2;
500
501 MockInstanceWatcher *instance_watcher1;
502 MockInstanceWatcher *instance_watcher2;
503
504 void SetUp() override {
505 TestMockInstanceWatcher::SetUp();
506
507 instance_id1 = m_instance_id;
508 librados::IoCtx& io_ctx1 = m_local_io_ctx;
509 librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
510 instance_watcher1 = MockInstanceWatcher::create(io_ctx1,
511 m_mock_threads->work_queue,
512 nullptr);
513 EXPECT_EQ("", connect_cluster_pp(cluster));
514 EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
515 instance_id2 = stringify(io_ctx2.get_instance_id());
516 librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
517 instance_watcher2 = MockInstanceWatcher::create(io_ctx2,
518 m_mock_threads->work_queue,
519 nullptr);
520 InSequence seq;
521
522 // Init instance watcher 1 (leader)
523 expect_register_instance(mock_io_ctx1, 0);
524 expect_register_watch(mock_io_ctx1, instance_id1);
525 expect_acquire_lock(mock_managed_lock, 0);
526 EXPECT_EQ(0, instance_watcher1->init());
527 instance_watcher1->handle_acquire_leader();
528
529 // Init instance watcher 2
530 expect_register_instance(mock_io_ctx2, 0);
531 expect_register_watch(mock_io_ctx2, instance_id2);
532 expect_acquire_lock(mock_managed_lock, 0);
533 EXPECT_EQ(0, instance_watcher2->init());
534 instance_watcher2->handle_update_leader(instance_id1);
535 }
536
537 void TearDown() override {
538 librados::IoCtx& io_ctx1 = m_local_io_ctx;
539 librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
540 librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
541
542 InSequence seq;
543
544 expect_throttler_destroy();
545 instance_watcher1->handle_release_leader();
546
547 // Shutdown instance watcher 1
548 expect_release_lock(mock_managed_lock, 0);
549 expect_unregister_watch(mock_io_ctx1);
550 expect_unregister_instance(mock_io_ctx1, 0);
551 instance_watcher1->shut_down();
552
553 expect_destroy_lock(mock_managed_lock);
554 delete instance_watcher1;
555
556 // Shutdown instance watcher 2
557 expect_release_lock(mock_managed_lock, 0);
558 expect_unregister_watch(mock_io_ctx2);
559 expect_unregister_instance(mock_io_ctx2, 0);
560 instance_watcher2->shut_down();
561
562 expect_destroy_lock(mock_managed_lock);
563 delete instance_watcher2;
564
565 TestMockInstanceWatcher::TearDown();
566 }
567
568 void expect_throttler_destroy(
569 std::vector<Context *> *throttler_queue = nullptr) {
570 EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE))
571 .WillOnce(Invoke([throttler_queue] (int r) {
572 if (throttler_queue != nullptr) {
573 for (auto ctx : *throttler_queue) {
574 ctx->complete(r);
575 }
576 }
577 }));
578 EXPECT_CALL(mock_image_sync_throttler, destroy());
579 }
580
581 void expect_throttler_start_op(const std::string &sync_id,
582 Context *on_call = nullptr,
583 Context **on_start_ctx = nullptr) {
584 EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _))
585 .WillOnce(Invoke([on_call, on_start_ctx] (const std::string &,
586 Context *ctx) {
587 if (on_call != nullptr) {
588 on_call->complete(0);
589 }
590 if (on_start_ctx != nullptr) {
591 *on_start_ctx = ctx;
592 } else {
593 ctx->complete(0);
594 }
595 }));
596 }
597
598 void expect_throttler_finish_op(const std::string &sync_id,
599 Context *on_finish) {
600 EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
601 .WillOnce(Invoke([on_finish](const std::string &) {
602 on_finish->complete(0);
603 }));
604 }
605 };
606
607 TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
608 InSequence seq;
609
610 expect_throttler_start_op("sync_id");
611 C_SaferCond on_start;
612 instance_watcher1->notify_sync_request("sync_id", &on_start);
613 ASSERT_EQ(0, on_start.wait());
614
615 C_SaferCond on_finish;
616 expect_throttler_finish_op("sync_id", &on_finish);
617 instance_watcher1->notify_sync_complete("sync_id");
618 ASSERT_EQ(0, on_finish.wait());
619 }
620
621 TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnLeader) {
622 InSequence seq;
623
624 expect_throttler_start_op("sync_id");
625 C_SaferCond on_start;
626 instance_watcher1->notify_sync_request("sync_id", &on_start);
627 ASSERT_EQ(0, on_start.wait());
628
629 ASSERT_FALSE(instance_watcher1->cancel_sync_request("sync_id"));
630
631 C_SaferCond on_finish;
632 expect_throttler_finish_op("sync_id", &on_finish);
633 instance_watcher1->notify_sync_complete("sync_id");
634 ASSERT_EQ(0, on_finish.wait());
635 }
636
637 TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnNonLeader) {
638 InSequence seq;
639
640 expect_throttler_start_op("sync_id");
641 C_SaferCond on_start;
642 instance_watcher2->notify_sync_request("sync_id", &on_start);
643 ASSERT_EQ(0, on_start.wait());
644
645 C_SaferCond on_finish;
646 expect_throttler_finish_op("sync_id", &on_finish);
647 instance_watcher2->notify_sync_complete("sync_id");
648 ASSERT_EQ(0, on_finish.wait());
649 }
650
651 TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnNonLeader) {
652 InSequence seq;
653
654 expect_throttler_start_op("sync_id");
655 C_SaferCond on_start;
656 instance_watcher2->notify_sync_request("sync_id", &on_start);
657 ASSERT_EQ(0, on_start.wait());
658
659 ASSERT_FALSE(instance_watcher2->cancel_sync_request("sync_id"));
660
661 C_SaferCond on_finish;
662 expect_throttler_finish_op("sync_id", &on_finish);
663 instance_watcher2->notify_sync_complete("sync_id");
664 ASSERT_EQ(0, on_finish.wait());
665 }
666
667 TEST_F(TestMockInstanceWatcher_NotifySync, CancelWaitingOnNonLeader) {
668 InSequence seq;
669
670 C_SaferCond on_start_op_called;
671 Context *on_start_ctx;
672 expect_throttler_start_op("sync_id", &on_start_op_called,
673 &on_start_ctx);
674 C_SaferCond on_start;
675 instance_watcher2->notify_sync_request("sync_id", &on_start);
676 ASSERT_EQ(0, on_start_op_called.wait());
677
678 ASSERT_TRUE(instance_watcher2->cancel_sync_request("sync_id"));
679 // emulate watcher timeout
680 on_start_ctx->complete(-ETIMEDOUT);
681 ASSERT_EQ(-ECANCELED, on_start.wait());
682 }
683
684 TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) {
685 // start sync when previous notification is still in flight
686
687 InSequence seq;
688
689 expect_throttler_start_op("sync_id");
690 C_SaferCond on_start1;
691 instance_watcher2->notify_sync_request("sync_id", &on_start1);
692 ASSERT_EQ(0, on_start1.wait());
693
694 C_SaferCond on_start2;
695 EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
696 .WillOnce(Invoke([this, &on_start2](const std::string &) {
697 instance_watcher2->notify_sync_request("sync_id", &on_start2);
698 }));
699 expect_throttler_start_op("sync_id");
700 instance_watcher2->notify_sync_complete("sync_id");
701
702 ASSERT_EQ(0, on_start2.wait());
703 C_SaferCond on_finish;
704 expect_throttler_finish_op("sync_id", &on_finish);
705 instance_watcher2->notify_sync_complete("sync_id");
706 ASSERT_EQ(0, on_finish.wait());
707 }
708
709 TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) {
710 InSequence seq;
711
712 expect_throttler_destroy();
713 instance_watcher1->handle_release_leader();
714 instance_watcher1->handle_acquire_leader();
715 }
716
717 TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) {
718 InSequence seq;
719
720 expect_throttler_destroy();
721 instance_watcher1->handle_release_leader();
722 instance_watcher2->handle_acquire_leader();
723
724 expect_throttler_start_op("sync_id");
725 C_SaferCond on_start;
726 instance_watcher2->notify_sync_request("sync_id", &on_start);
727 ASSERT_EQ(0, on_start.wait());
728 expect_throttler_destroy();
729 instance_watcher2->handle_release_leader();
730 instance_watcher2->notify_sync_complete("sync_id");
731
732 instance_watcher1->handle_acquire_leader();
733 }
734
735 TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) {
736 InSequence seq;
737
738 C_SaferCond on_start_op_called;
739 Context *on_start_ctx;
740 expect_throttler_start_op("sync_id", &on_start_op_called,
741 &on_start_ctx);
742 C_SaferCond on_start;
743 instance_watcher1->notify_sync_request("sync_id", &on_start);
744 ASSERT_EQ(0, on_start_op_called.wait());
745
746 std::vector<Context *> throttler_queue = {on_start_ctx};
747 expect_throttler_destroy(&throttler_queue);
748 instance_watcher1->handle_release_leader();
749 instance_watcher2->handle_acquire_leader();
750 instance_watcher1->handle_update_leader(instance_id2);
751
752 expect_throttler_start_op("sync_id");
753 ASSERT_EQ(0, on_start.wait());
754 C_SaferCond on_finish;
755 expect_throttler_finish_op("sync_id", &on_finish);
756 instance_watcher1->notify_sync_complete("sync_id");
757 ASSERT_EQ(0, on_finish.wait());
758
759 expect_throttler_destroy();
760 instance_watcher2->handle_release_leader();
761 instance_watcher1->handle_acquire_leader();
762 }
763
764 TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) {
765 InSequence seq;
766
767 expect_throttler_destroy();
768 instance_watcher1->handle_release_leader();
769 instance_watcher2->handle_acquire_leader();
770 instance_watcher1->handle_update_leader(instance_id2);
771
772 expect_throttler_start_op("sync_id");
773 C_SaferCond on_start;
774 instance_watcher1->notify_sync_request("sync_id", &on_start);
775 ASSERT_EQ(0, on_start.wait());
776
777 expect_throttler_destroy();
778 instance_watcher2->handle_release_leader();
779 instance_watcher1->handle_acquire_leader();
780 instance_watcher2->handle_update_leader(instance_id2);
781
782 instance_watcher1->notify_sync_complete("sync_id");
783 }
784
785 TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) {
786 InSequence seq;
787
788 C_SaferCond on_start_op_called;
789 Context *on_start_ctx;
790 expect_throttler_start_op("sync_id", &on_start_op_called,
791 &on_start_ctx);
792 C_SaferCond on_start;
793 instance_watcher2->notify_sync_request("sync_id", &on_start);
794 ASSERT_EQ(0, on_start_op_called.wait());
795
796 std::vector<Context *> throttler_queue = {on_start_ctx};
797 expect_throttler_destroy(&throttler_queue);
798 instance_watcher1->handle_release_leader();
799
800 EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _))
801 .WillOnce(WithArg<1>(CompleteContext(0)));
802 instance_watcher2->handle_acquire_leader();
803 instance_watcher1->handle_update_leader(instance_id2);
804
805 ASSERT_EQ(0, on_start.wait());
806
807 C_SaferCond on_finish;
808 expect_throttler_finish_op("sync_id", &on_finish);
809 instance_watcher2->notify_sync_complete("sync_id");
810 ASSERT_EQ(0, on_finish.wait());
811
812 expect_throttler_destroy();
813 instance_watcher2->handle_release_leader();
814 instance_watcher1->handle_acquire_leader();
815 }
816
817 } // namespace mirror
818 } // namespace rbd