1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librados/AioCompletionImpl.h"
5 #include "librbd/ManagedLock.h"
6 #include "test/librados/test.h"
7 #include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
8 #include "test/librados_test_stub/MockTestMemRadosClient.h"
9 #include "test/librbd/mock/MockImageCtx.h"
10 #include "test/rbd_mirror/test_mock_fixture.h"
11 #include "tools/rbd_mirror/InstanceReplayer.h"
12 #include "tools/rbd_mirror/ImageSyncThrottler.h"
13 #include "tools/rbd_mirror/InstanceWatcher.h"
14 #include "tools/rbd_mirror/Threads.h"
20 struct MockTestImageCtx
: public MockImageCtx
{
21 MockTestImageCtx(librbd::ImageCtx
&image_ctx
)
22 : librbd::MockImageCtx(image_ctx
) {
26 } // anonymous namespace
29 struct ManagedLock
<MockTestImageCtx
> {
30 static ManagedLock
* s_instance
;
32 static ManagedLock
*create(librados::IoCtx
& ioctx
, ContextWQ
*work_queue
,
33 const std::string
& oid
, librbd::Watcher
*watcher
,
34 managed_lock::Mode mode
,
35 bool blacklist_on_break_lock
,
36 uint32_t blacklist_expire_seconds
) {
37 assert(s_instance
!= nullptr);
42 assert(s_instance
== nullptr);
47 assert(s_instance
== this);
51 MOCK_METHOD0(destroy
, void());
52 MOCK_METHOD1(shut_down
, void(Context
*));
53 MOCK_METHOD1(acquire_lock
, void(Context
*));
54 MOCK_METHOD2(get_locker
, void(managed_lock::Locker
*, Context
*));
55 MOCK_METHOD3(break_lock
, void(const managed_lock::Locker
&, bool, Context
*));
58 ManagedLock
<MockTestImageCtx
> *ManagedLock
<MockTestImageCtx
>::s_instance
= nullptr;
66 struct Threads
<librbd::MockTestImageCtx
> {
69 ContextWQ
*work_queue
;
71 Threads(Threads
<librbd::ImageCtx
> *threads
)
72 : timer_lock(threads
->timer_lock
), timer(threads
->timer
),
73 work_queue(threads
->work_queue
) {
78 struct InstanceReplayer
<librbd::MockTestImageCtx
> {
79 MOCK_METHOD5(acquire_image
, void(InstanceWatcher
<librbd::MockTestImageCtx
> *,
80 const std::string
&, const std::string
&,
81 const std::string
&, Context
*));
82 MOCK_METHOD5(release_image
, void(const std::string
&, const std::string
&,
83 const std::string
&, bool, Context
*));
87 struct ImageSyncThrottler
<librbd::MockTestImageCtx
> {
88 static ImageSyncThrottler
* s_instance
;
90 static ImageSyncThrottler
*create() {
91 assert(s_instance
!= nullptr);
95 ImageSyncThrottler() {
96 assert(s_instance
== nullptr);
100 virtual ~ImageSyncThrottler() {
101 assert(s_instance
== this);
102 s_instance
= nullptr;
105 MOCK_METHOD0(destroy
, void());
106 MOCK_METHOD1(drain
, void(int));
107 MOCK_METHOD2(start_op
, void(const std::string
&, Context
*));
108 MOCK_METHOD1(finish_op
, void(const std::string
&));
111 ImageSyncThrottler
<librbd::MockTestImageCtx
>* ImageSyncThrottler
<librbd::MockTestImageCtx
>::s_instance
= nullptr;
113 } // namespace mirror
116 // template definitions
117 #include "tools/rbd_mirror/InstanceWatcher.cc"
123 using ::testing::InSequence
;
124 using ::testing::Invoke
;
125 using ::testing::Return
;
126 using ::testing::StrEq
;
127 using ::testing::WithArg
;
129 class TestMockInstanceWatcher
: public TestMockFixture
{
131 typedef librbd::ManagedLock
<librbd::MockTestImageCtx
> MockManagedLock
;
132 typedef InstanceReplayer
<librbd::MockTestImageCtx
> MockInstanceReplayer
;
133 typedef InstanceWatcher
<librbd::MockTestImageCtx
> MockInstanceWatcher
;
134 typedef Threads
<librbd::MockTestImageCtx
> MockThreads
;
136 std::string m_instance_id
;
138 MockThreads
*m_mock_threads
;
140 void SetUp() override
{
141 TestFixture::SetUp();
142 m_local_io_ctx
.remove(RBD_MIRROR_LEADER
);
143 EXPECT_EQ(0, m_local_io_ctx
.create(RBD_MIRROR_LEADER
, true));
145 m_instance_id
= stringify(m_local_io_ctx
.get_instance_id());
146 m_oid
= RBD_MIRROR_INSTANCE_PREFIX
+ m_instance_id
;
148 m_mock_threads
= new MockThreads(m_threads
);
151 void TearDown() override
{
152 delete m_mock_threads
;
153 TestMockFixture::TearDown();
156 void expect_register_watch(librados::MockTestMemIoCtxImpl
&mock_io_ctx
) {
157 EXPECT_CALL(mock_io_ctx
, aio_watch(m_oid
, _
, _
, _
));
160 void expect_register_watch(librados::MockTestMemIoCtxImpl
&mock_io_ctx
,
161 const std::string
&instance_id
) {
162 std::string oid
= RBD_MIRROR_INSTANCE_PREFIX
+ instance_id
;
163 EXPECT_CALL(mock_io_ctx
, aio_watch(oid
, _
, _
, _
));
166 void expect_unregister_watch(librados::MockTestMemIoCtxImpl
&mock_io_ctx
) {
167 EXPECT_CALL(mock_io_ctx
, aio_unwatch(_
, _
));
170 void expect_register_instance(librados::MockTestMemIoCtxImpl
&mock_io_ctx
,
172 EXPECT_CALL(mock_io_ctx
, exec(RBD_MIRROR_LEADER
, _
, StrEq("rbd"),
173 StrEq("mirror_instances_add"), _
, _
, _
))
174 .WillOnce(Return(r
));
177 void expect_unregister_instance(librados::MockTestMemIoCtxImpl
&mock_io_ctx
,
179 EXPECT_CALL(mock_io_ctx
, exec(RBD_MIRROR_LEADER
, _
, StrEq("rbd"),
180 StrEq("mirror_instances_remove"), _
, _
, _
))
181 .WillOnce(Return(r
));
184 void expect_acquire_lock(MockManagedLock
&mock_managed_lock
, int r
) {
185 EXPECT_CALL(mock_managed_lock
, acquire_lock(_
))
186 .WillOnce(CompleteContext(r
));
189 void expect_release_lock(MockManagedLock
&mock_managed_lock
, int r
) {
190 EXPECT_CALL(mock_managed_lock
, shut_down(_
)).WillOnce(CompleteContext(r
));
193 void expect_destroy_lock(MockManagedLock
&mock_managed_lock
,
194 Context
*ctx
= nullptr) {
195 EXPECT_CALL(mock_managed_lock
, destroy())
196 .WillOnce(Invoke([ctx
]() {
197 if (ctx
!= nullptr) {
203 void expect_get_locker(MockManagedLock
&mock_managed_lock
,
204 const librbd::managed_lock::Locker
&locker
, int r
) {
205 EXPECT_CALL(mock_managed_lock
, get_locker(_
, _
))
206 .WillOnce(Invoke([r
, locker
](librbd::managed_lock::Locker
*out
,
215 void expect_break_lock(MockManagedLock
&mock_managed_lock
,
216 const librbd::managed_lock::Locker
&locker
, int r
) {
217 EXPECT_CALL(mock_managed_lock
, break_lock(locker
, true, _
))
218 .WillOnce(WithArg
<2>(CompleteContext(r
)));
222 TEST_F(TestMockInstanceWatcher
, InitShutdown
) {
223 MockManagedLock mock_managed_lock
;
224 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
226 auto instance_watcher
= new MockInstanceWatcher(
227 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
231 expect_register_instance(mock_io_ctx
, 0);
232 expect_register_watch(mock_io_ctx
);
233 expect_acquire_lock(mock_managed_lock
, 0);
234 ASSERT_EQ(0, instance_watcher
->init());
237 expect_release_lock(mock_managed_lock
, 0);
238 expect_unregister_watch(mock_io_ctx
);
239 expect_unregister_instance(mock_io_ctx
, 0);
240 instance_watcher
->shut_down();
242 expect_destroy_lock(mock_managed_lock
);
243 delete instance_watcher
;
246 TEST_F(TestMockInstanceWatcher
, InitError
) {
247 MockManagedLock mock_managed_lock
;
248 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
250 auto instance_watcher
= new MockInstanceWatcher(
251 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
254 expect_register_instance(mock_io_ctx
, 0);
255 expect_register_watch(mock_io_ctx
);
256 expect_acquire_lock(mock_managed_lock
, -EINVAL
);
257 expect_unregister_watch(mock_io_ctx
);
258 expect_unregister_instance(mock_io_ctx
, 0);
260 ASSERT_EQ(-EINVAL
, instance_watcher
->init());
262 expect_destroy_lock(mock_managed_lock
);
263 delete instance_watcher
;
266 TEST_F(TestMockInstanceWatcher
, ShutdownError
) {
267 MockManagedLock mock_managed_lock
;
268 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
270 auto instance_watcher
= new MockInstanceWatcher(
271 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
275 expect_register_instance(mock_io_ctx
, 0);
276 expect_register_watch(mock_io_ctx
);
277 expect_acquire_lock(mock_managed_lock
, 0);
278 ASSERT_EQ(0, instance_watcher
->init());
281 expect_release_lock(mock_managed_lock
, -EINVAL
);
282 expect_unregister_watch(mock_io_ctx
);
283 expect_unregister_instance(mock_io_ctx
, 0);
284 instance_watcher
->shut_down();
286 expect_destroy_lock(mock_managed_lock
);
287 delete instance_watcher
;
291 TEST_F(TestMockInstanceWatcher
, Remove
) {
292 MockManagedLock mock_managed_lock
;
293 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
294 librbd::managed_lock::Locker
295 locker
{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
299 expect_get_locker(mock_managed_lock
, locker
, 0);
300 expect_break_lock(mock_managed_lock
, locker
, 0);
301 expect_unregister_instance(mock_io_ctx
, 0);
302 C_SaferCond on_destroy
;
303 expect_destroy_lock(mock_managed_lock
, &on_destroy
);
305 C_SaferCond on_remove
;
306 MockInstanceWatcher::remove_instance(m_local_io_ctx
,
307 m_mock_threads
->work_queue
,
308 "instance_id", &on_remove
);
309 ASSERT_EQ(0, on_remove
.wait());
310 ASSERT_EQ(0, on_destroy
.wait());
313 TEST_F(TestMockInstanceWatcher
, RemoveNoent
) {
314 MockManagedLock mock_managed_lock
;
315 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
319 expect_get_locker(mock_managed_lock
, librbd::managed_lock::Locker(), -ENOENT
);
320 expect_unregister_instance(mock_io_ctx
, 0);
321 C_SaferCond on_destroy
;
322 expect_destroy_lock(mock_managed_lock
, &on_destroy
);
324 C_SaferCond on_remove
;
325 MockInstanceWatcher::remove_instance(m_local_io_ctx
,
326 m_mock_threads
->work_queue
,
327 "instance_id", &on_remove
);
328 ASSERT_EQ(0, on_remove
.wait());
329 ASSERT_EQ(0, on_destroy
.wait());
332 TEST_F(TestMockInstanceWatcher
, ImageAcquireRelease
) {
333 MockManagedLock mock_managed_lock
;
335 librados::IoCtx
& io_ctx1
= m_local_io_ctx
;
336 std::string instance_id1
= m_instance_id
;
337 librados::MockTestMemIoCtxImpl
&mock_io_ctx1(get_mock_io_ctx(io_ctx1
));
338 MockInstanceReplayer mock_instance_replayer1
;
339 auto instance_watcher1
= MockInstanceWatcher::create(
340 io_ctx1
, m_mock_threads
->work_queue
, &mock_instance_replayer1
);
342 librados::Rados cluster
;
343 librados::IoCtx io_ctx2
;
344 EXPECT_EQ("", connect_cluster_pp(cluster
));
345 EXPECT_EQ(0, cluster
.ioctx_create(_local_pool_name
.c_str(), io_ctx2
));
346 std::string instance_id2
= stringify(io_ctx2
.get_instance_id());
347 librados::MockTestMemIoCtxImpl
&mock_io_ctx2(get_mock_io_ctx(io_ctx2
));
348 MockInstanceReplayer mock_instance_replayer2
;
349 auto instance_watcher2
= MockInstanceWatcher::create(
350 io_ctx2
, m_mock_threads
->work_queue
, &mock_instance_replayer2
);
354 // Init instance watcher 1
355 expect_register_instance(mock_io_ctx1
, 0);
356 expect_register_watch(mock_io_ctx1
, instance_id1
);
357 expect_acquire_lock(mock_managed_lock
, 0);
358 ASSERT_EQ(0, instance_watcher1
->init());
360 // Init instance watcher 2
361 expect_register_instance(mock_io_ctx2
, 0);
362 expect_register_watch(mock_io_ctx2
, instance_id2
);
363 expect_acquire_lock(mock_managed_lock
, 0);
364 ASSERT_EQ(0, instance_watcher2
->init());
366 // Acquire Image on the same instance
367 EXPECT_CALL(mock_instance_replayer1
, acquire_image(instance_watcher1
, "gid",
369 .WillOnce(WithArg
<4>(CompleteContext(0)));
370 C_SaferCond on_acquire1
;
371 instance_watcher1
->notify_image_acquire(instance_id1
, "gid", "uuid", "id",
373 ASSERT_EQ(0, on_acquire1
.wait());
375 // Acquire Image on the other instance
376 EXPECT_CALL(mock_instance_replayer2
, acquire_image(instance_watcher2
, "gid",
378 .WillOnce(WithArg
<4>(CompleteContext(0)));
379 C_SaferCond on_acquire2
;
380 instance_watcher1
->notify_image_acquire(instance_id2
, "gid", "uuid", "id",
382 ASSERT_EQ(0, on_acquire2
.wait());
384 // Release Image on the same instance
385 EXPECT_CALL(mock_instance_replayer1
, release_image("gid", "uuid", "id", true,
387 .WillOnce(WithArg
<4>(CompleteContext(0)));
388 C_SaferCond on_release1
;
389 instance_watcher1
->notify_image_release(instance_id1
, "gid", "uuid", "id",
391 ASSERT_EQ(0, on_release1
.wait());
393 // Release Image on the other instance
394 EXPECT_CALL(mock_instance_replayer2
, release_image("gid", "uuid", "id", true,
396 .WillOnce(WithArg
<4>(CompleteContext(0)));
397 C_SaferCond on_release2
;
398 instance_watcher1
->notify_image_release(instance_id2
, "gid", "uuid", "id",
400 ASSERT_EQ(0, on_release2
.wait());
402 // Shutdown instance watcher 1
403 expect_release_lock(mock_managed_lock
, 0);
404 expect_unregister_watch(mock_io_ctx1
);
405 expect_unregister_instance(mock_io_ctx1
, 0);
406 instance_watcher1
->shut_down();
408 expect_destroy_lock(mock_managed_lock
);
409 delete instance_watcher1
;
411 // Shutdown instance watcher 2
412 expect_release_lock(mock_managed_lock
, 0);
413 expect_unregister_watch(mock_io_ctx2
);
414 expect_unregister_instance(mock_io_ctx2
, 0);
415 instance_watcher2
->shut_down();
417 expect_destroy_lock(mock_managed_lock
);
418 delete instance_watcher2
;
421 TEST_F(TestMockInstanceWatcher
, ImageAcquireReleaseCancel
) {
422 MockManagedLock mock_managed_lock
;
423 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
425 auto instance_watcher
= new MockInstanceWatcher(
426 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
430 expect_register_instance(mock_io_ctx
, 0);
431 expect_register_watch(mock_io_ctx
);
432 expect_acquire_lock(mock_managed_lock
, 0);
433 ASSERT_EQ(0, instance_watcher
->init());
435 // Send Acquire Image and cancel
436 EXPECT_CALL(mock_io_ctx
, aio_notify(_
, _
, _
, _
, _
))
438 [this, instance_watcher
, &mock_io_ctx
](
439 const std::string
& o
, librados::AioCompletionImpl
*c
,
440 bufferlist
& bl
, uint64_t timeout_ms
, bufferlist
*pbl
) {
442 auto ctx
= new FunctionContext(
443 [instance_watcher
, &mock_io_ctx
, c
, pbl
](int r
) {
444 instance_watcher
->cancel_notify_requests("other");
445 ::encode(librbd::watcher::NotifyResponse(), *pbl
);
446 mock_io_ctx
.get_mock_rados_client()->
447 finish_aio_completion(c
, -ETIMEDOUT
);
449 m_threads
->work_queue
->queue(ctx
, 0);
452 C_SaferCond on_acquire
;
453 instance_watcher
->notify_image_acquire("other", "gid", "uuid", "id",
455 ASSERT_EQ(-ECANCELED
, on_acquire
.wait());
457 // Send Release Image and cancel
458 EXPECT_CALL(mock_io_ctx
, aio_notify(_
, _
, _
, _
, _
))
460 [this, instance_watcher
, &mock_io_ctx
](
461 const std::string
& o
, librados::AioCompletionImpl
*c
,
462 bufferlist
& bl
, uint64_t timeout_ms
, bufferlist
*pbl
) {
464 auto ctx
= new FunctionContext(
465 [instance_watcher
, &mock_io_ctx
, c
, pbl
](int r
) {
466 instance_watcher
->cancel_notify_requests("other");
467 ::encode(librbd::watcher::NotifyResponse(), *pbl
);
468 mock_io_ctx
.get_mock_rados_client()->
469 finish_aio_completion(c
, -ETIMEDOUT
);
471 m_threads
->work_queue
->queue(ctx
, 0);
474 C_SaferCond on_release
;
475 instance_watcher
->notify_image_release("other", "gid", "uuid", "id",
477 ASSERT_EQ(-ECANCELED
, on_release
.wait());
480 expect_release_lock(mock_managed_lock
, 0);
481 expect_unregister_watch(mock_io_ctx
);
482 expect_unregister_instance(mock_io_ctx
, 0);
483 instance_watcher
->shut_down();
485 expect_destroy_lock(mock_managed_lock
);
486 delete instance_watcher
;
489 class TestMockInstanceWatcher_NotifySync
: public TestMockInstanceWatcher
{
491 typedef ImageSyncThrottler
<librbd::MockTestImageCtx
> MockImageSyncThrottler
;
493 MockManagedLock mock_managed_lock
;
494 MockImageSyncThrottler mock_image_sync_throttler
;
495 std::string instance_id1
;
496 std::string instance_id2
;
498 librados::Rados cluster
;
499 librados::IoCtx io_ctx2
;
501 MockInstanceWatcher
*instance_watcher1
;
502 MockInstanceWatcher
*instance_watcher2
;
504 void SetUp() override
{
505 TestMockInstanceWatcher::SetUp();
507 instance_id1
= m_instance_id
;
508 librados::IoCtx
& io_ctx1
= m_local_io_ctx
;
509 librados::MockTestMemIoCtxImpl
&mock_io_ctx1(get_mock_io_ctx(io_ctx1
));
510 instance_watcher1
= MockInstanceWatcher::create(io_ctx1
,
511 m_mock_threads
->work_queue
,
513 EXPECT_EQ("", connect_cluster_pp(cluster
));
514 EXPECT_EQ(0, cluster
.ioctx_create(_local_pool_name
.c_str(), io_ctx2
));
515 instance_id2
= stringify(io_ctx2
.get_instance_id());
516 librados::MockTestMemIoCtxImpl
&mock_io_ctx2(get_mock_io_ctx(io_ctx2
));
517 instance_watcher2
= MockInstanceWatcher::create(io_ctx2
,
518 m_mock_threads
->work_queue
,
522 // Init instance watcher 1 (leader)
523 expect_register_instance(mock_io_ctx1
, 0);
524 expect_register_watch(mock_io_ctx1
, instance_id1
);
525 expect_acquire_lock(mock_managed_lock
, 0);
526 EXPECT_EQ(0, instance_watcher1
->init());
527 instance_watcher1
->handle_acquire_leader();
529 // Init instance watcher 2
530 expect_register_instance(mock_io_ctx2
, 0);
531 expect_register_watch(mock_io_ctx2
, instance_id2
);
532 expect_acquire_lock(mock_managed_lock
, 0);
533 EXPECT_EQ(0, instance_watcher2
->init());
534 instance_watcher2
->handle_update_leader(instance_id1
);
537 void TearDown() override
{
538 librados::IoCtx
& io_ctx1
= m_local_io_ctx
;
539 librados::MockTestMemIoCtxImpl
&mock_io_ctx1(get_mock_io_ctx(io_ctx1
));
540 librados::MockTestMemIoCtxImpl
&mock_io_ctx2(get_mock_io_ctx(io_ctx2
));
544 expect_throttler_destroy();
545 instance_watcher1
->handle_release_leader();
547 // Shutdown instance watcher 1
548 expect_release_lock(mock_managed_lock
, 0);
549 expect_unregister_watch(mock_io_ctx1
);
550 expect_unregister_instance(mock_io_ctx1
, 0);
551 instance_watcher1
->shut_down();
553 expect_destroy_lock(mock_managed_lock
);
554 delete instance_watcher1
;
556 // Shutdown instance watcher 2
557 expect_release_lock(mock_managed_lock
, 0);
558 expect_unregister_watch(mock_io_ctx2
);
559 expect_unregister_instance(mock_io_ctx2
, 0);
560 instance_watcher2
->shut_down();
562 expect_destroy_lock(mock_managed_lock
);
563 delete instance_watcher2
;
565 TestMockInstanceWatcher::TearDown();
568 void expect_throttler_destroy(
569 std::vector
<Context
*> *throttler_queue
= nullptr) {
570 EXPECT_CALL(mock_image_sync_throttler
, drain(-ESTALE
))
571 .WillOnce(Invoke([throttler_queue
] (int r
) {
572 if (throttler_queue
!= nullptr) {
573 for (auto ctx
: *throttler_queue
) {
578 EXPECT_CALL(mock_image_sync_throttler
, destroy());
581 void expect_throttler_start_op(const std::string
&sync_id
,
582 Context
*on_call
= nullptr,
583 Context
**on_start_ctx
= nullptr) {
584 EXPECT_CALL(mock_image_sync_throttler
, start_op(sync_id
, _
))
585 .WillOnce(Invoke([on_call
, on_start_ctx
] (const std::string
&,
587 if (on_call
!= nullptr) {
588 on_call
->complete(0);
590 if (on_start_ctx
!= nullptr) {
598 void expect_throttler_finish_op(const std::string
&sync_id
,
599 Context
*on_finish
) {
600 EXPECT_CALL(mock_image_sync_throttler
, finish_op("sync_id"))
601 .WillOnce(Invoke([on_finish
](const std::string
&) {
602 on_finish
->complete(0);
607 TEST_F(TestMockInstanceWatcher_NotifySync
, StartStopOnLeader
) {
610 expect_throttler_start_op("sync_id");
611 C_SaferCond on_start
;
612 instance_watcher1
->notify_sync_request("sync_id", &on_start
);
613 ASSERT_EQ(0, on_start
.wait());
615 C_SaferCond on_finish
;
616 expect_throttler_finish_op("sync_id", &on_finish
);
617 instance_watcher1
->notify_sync_complete("sync_id");
618 ASSERT_EQ(0, on_finish
.wait());
621 TEST_F(TestMockInstanceWatcher_NotifySync
, CancelStartedOnLeader
) {
624 expect_throttler_start_op("sync_id");
625 C_SaferCond on_start
;
626 instance_watcher1
->notify_sync_request("sync_id", &on_start
);
627 ASSERT_EQ(0, on_start
.wait());
629 ASSERT_FALSE(instance_watcher1
->cancel_sync_request("sync_id"));
631 C_SaferCond on_finish
;
632 expect_throttler_finish_op("sync_id", &on_finish
);
633 instance_watcher1
->notify_sync_complete("sync_id");
634 ASSERT_EQ(0, on_finish
.wait());
637 TEST_F(TestMockInstanceWatcher_NotifySync
, StartStopOnNonLeader
) {
640 expect_throttler_start_op("sync_id");
641 C_SaferCond on_start
;
642 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
643 ASSERT_EQ(0, on_start
.wait());
645 C_SaferCond on_finish
;
646 expect_throttler_finish_op("sync_id", &on_finish
);
647 instance_watcher2
->notify_sync_complete("sync_id");
648 ASSERT_EQ(0, on_finish
.wait());
651 TEST_F(TestMockInstanceWatcher_NotifySync
, CancelStartedOnNonLeader
) {
654 expect_throttler_start_op("sync_id");
655 C_SaferCond on_start
;
656 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
657 ASSERT_EQ(0, on_start
.wait());
659 ASSERT_FALSE(instance_watcher2
->cancel_sync_request("sync_id"));
661 C_SaferCond on_finish
;
662 expect_throttler_finish_op("sync_id", &on_finish
);
663 instance_watcher2
->notify_sync_complete("sync_id");
664 ASSERT_EQ(0, on_finish
.wait());
667 TEST_F(TestMockInstanceWatcher_NotifySync
, CancelWaitingOnNonLeader
) {
670 C_SaferCond on_start_op_called
;
671 Context
*on_start_ctx
;
672 expect_throttler_start_op("sync_id", &on_start_op_called
,
674 C_SaferCond on_start
;
675 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
676 ASSERT_EQ(0, on_start_op_called
.wait());
678 ASSERT_TRUE(instance_watcher2
->cancel_sync_request("sync_id"));
679 // emulate watcher timeout
680 on_start_ctx
->complete(-ETIMEDOUT
);
681 ASSERT_EQ(-ECANCELED
, on_start
.wait());
684 TEST_F(TestMockInstanceWatcher_NotifySync
, InFlightPrevNotification
) {
685 // start sync when previous notification is still in flight
689 expect_throttler_start_op("sync_id");
690 C_SaferCond on_start1
;
691 instance_watcher2
->notify_sync_request("sync_id", &on_start1
);
692 ASSERT_EQ(0, on_start1
.wait());
694 C_SaferCond on_start2
;
695 EXPECT_CALL(mock_image_sync_throttler
, finish_op("sync_id"))
696 .WillOnce(Invoke([this, &on_start2
](const std::string
&) {
697 instance_watcher2
->notify_sync_request("sync_id", &on_start2
);
699 expect_throttler_start_op("sync_id");
700 instance_watcher2
->notify_sync_complete("sync_id");
702 ASSERT_EQ(0, on_start2
.wait());
703 C_SaferCond on_finish
;
704 expect_throttler_finish_op("sync_id", &on_finish
);
705 instance_watcher2
->notify_sync_complete("sync_id");
706 ASSERT_EQ(0, on_finish
.wait());
709 TEST_F(TestMockInstanceWatcher_NotifySync
, NoInFlightReleaseAcquireLeader
) {
712 expect_throttler_destroy();
713 instance_watcher1
->handle_release_leader();
714 instance_watcher1
->handle_acquire_leader();
717 TEST_F(TestMockInstanceWatcher_NotifySync
, StartedOnLeaderReleaseLeader
) {
720 expect_throttler_destroy();
721 instance_watcher1
->handle_release_leader();
722 instance_watcher2
->handle_acquire_leader();
724 expect_throttler_start_op("sync_id");
725 C_SaferCond on_start
;
726 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
727 ASSERT_EQ(0, on_start
.wait());
728 expect_throttler_destroy();
729 instance_watcher2
->handle_release_leader();
730 instance_watcher2
->notify_sync_complete("sync_id");
732 instance_watcher1
->handle_acquire_leader();
735 TEST_F(TestMockInstanceWatcher_NotifySync
, WaitingOnLeaderReleaseLeader
) {
738 C_SaferCond on_start_op_called
;
739 Context
*on_start_ctx
;
740 expect_throttler_start_op("sync_id", &on_start_op_called
,
742 C_SaferCond on_start
;
743 instance_watcher1
->notify_sync_request("sync_id", &on_start
);
744 ASSERT_EQ(0, on_start_op_called
.wait());
746 std::vector
<Context
*> throttler_queue
= {on_start_ctx
};
747 expect_throttler_destroy(&throttler_queue
);
748 instance_watcher1
->handle_release_leader();
749 instance_watcher2
->handle_acquire_leader();
750 instance_watcher1
->handle_update_leader(instance_id2
);
752 expect_throttler_start_op("sync_id");
753 ASSERT_EQ(0, on_start
.wait());
754 C_SaferCond on_finish
;
755 expect_throttler_finish_op("sync_id", &on_finish
);
756 instance_watcher1
->notify_sync_complete("sync_id");
757 ASSERT_EQ(0, on_finish
.wait());
759 expect_throttler_destroy();
760 instance_watcher2
->handle_release_leader();
761 instance_watcher1
->handle_acquire_leader();
764 TEST_F(TestMockInstanceWatcher_NotifySync
, StartedOnNonLeaderAcquireLeader
) {
767 expect_throttler_destroy();
768 instance_watcher1
->handle_release_leader();
769 instance_watcher2
->handle_acquire_leader();
770 instance_watcher1
->handle_update_leader(instance_id2
);
772 expect_throttler_start_op("sync_id");
773 C_SaferCond on_start
;
774 instance_watcher1
->notify_sync_request("sync_id", &on_start
);
775 ASSERT_EQ(0, on_start
.wait());
777 expect_throttler_destroy();
778 instance_watcher2
->handle_release_leader();
779 instance_watcher1
->handle_acquire_leader();
780 instance_watcher2
->handle_update_leader(instance_id2
);
782 instance_watcher1
->notify_sync_complete("sync_id");
785 TEST_F(TestMockInstanceWatcher_NotifySync
, WaitingOnNonLeaderAcquireLeader
) {
788 C_SaferCond on_start_op_called
;
789 Context
*on_start_ctx
;
790 expect_throttler_start_op("sync_id", &on_start_op_called
,
792 C_SaferCond on_start
;
793 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
794 ASSERT_EQ(0, on_start_op_called
.wait());
796 std::vector
<Context
*> throttler_queue
= {on_start_ctx
};
797 expect_throttler_destroy(&throttler_queue
);
798 instance_watcher1
->handle_release_leader();
800 EXPECT_CALL(mock_image_sync_throttler
, start_op("sync_id", _
))
801 .WillOnce(WithArg
<1>(CompleteContext(0)));
802 instance_watcher2
->handle_acquire_leader();
803 instance_watcher1
->handle_update_leader(instance_id2
);
805 ASSERT_EQ(0, on_start
.wait());
807 C_SaferCond on_finish
;
808 expect_throttler_finish_op("sync_id", &on_finish
);
809 instance_watcher2
->notify_sync_complete("sync_id");
810 ASSERT_EQ(0, on_finish
.wait());
812 expect_throttler_destroy();
813 instance_watcher2
->handle_release_leader();
814 instance_watcher1
->handle_acquire_leader();
817 } // namespace mirror