1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librados/AioCompletionImpl.h"
5 #include "librbd/ManagedLock.h"
6 #include "test/librados/test.h"
7 #include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
8 #include "test/librados_test_stub/MockTestMemRadosClient.h"
9 #include "test/librbd/mock/MockImageCtx.h"
10 #include "test/rbd_mirror/test_mock_fixture.h"
11 #include "tools/rbd_mirror/InstanceReplayer.h"
12 #include "tools/rbd_mirror/ImageSyncThrottler.h"
13 #include "tools/rbd_mirror/InstanceWatcher.h"
14 #include "tools/rbd_mirror/Threads.h"
20 struct MockTestImageCtx
: public MockImageCtx
{
21 MockTestImageCtx(librbd::ImageCtx
&image_ctx
)
22 : librbd::MockImageCtx(image_ctx
) {
26 } // anonymous namespace
29 struct ManagedLock
<MockTestImageCtx
> {
30 static ManagedLock
* s_instance
;
32 static ManagedLock
*create(librados::IoCtx
& ioctx
, ContextWQ
*work_queue
,
33 const std::string
& oid
, librbd::Watcher
*watcher
,
34 managed_lock::Mode mode
,
35 bool blacklist_on_break_lock
,
36 uint32_t blacklist_expire_seconds
) {
37 assert(s_instance
!= nullptr);
42 assert(s_instance
== nullptr);
47 assert(s_instance
== this);
51 MOCK_METHOD0(destroy
, void());
52 MOCK_METHOD1(shut_down
, void(Context
*));
53 MOCK_METHOD1(acquire_lock
, void(Context
*));
54 MOCK_METHOD2(get_locker
, void(managed_lock::Locker
*, Context
*));
55 MOCK_METHOD3(break_lock
, void(const managed_lock::Locker
&, bool, Context
*));
58 ManagedLock
<MockTestImageCtx
> *ManagedLock
<MockTestImageCtx
>::s_instance
= nullptr;
66 struct Threads
<librbd::MockTestImageCtx
> {
69 ContextWQ
*work_queue
;
71 Threads(Threads
<librbd::ImageCtx
> *threads
)
72 : timer_lock(threads
->timer_lock
), timer(threads
->timer
),
73 work_queue(threads
->work_queue
) {
78 struct InstanceReplayer
<librbd::MockTestImageCtx
> {
79 MOCK_METHOD3(acquire_image
, void(InstanceWatcher
<librbd::MockTestImageCtx
> *,
80 const std::string
&, Context
*));
81 MOCK_METHOD2(release_image
, void(const std::string
&, Context
*));
82 MOCK_METHOD3(remove_peer_image
, void(const std::string
&, const std::string
&,
87 struct ImageSyncThrottler
<librbd::MockTestImageCtx
> {
88 static ImageSyncThrottler
* s_instance
;
90 static ImageSyncThrottler
*create() {
91 assert(s_instance
!= nullptr);
95 ImageSyncThrottler() {
96 assert(s_instance
== nullptr);
100 virtual ~ImageSyncThrottler() {
101 assert(s_instance
== this);
102 s_instance
= nullptr;
105 MOCK_METHOD0(destroy
, void());
106 MOCK_METHOD1(drain
, void(int));
107 MOCK_METHOD2(start_op
, void(const std::string
&, Context
*));
108 MOCK_METHOD1(finish_op
, void(const std::string
&));
111 ImageSyncThrottler
<librbd::MockTestImageCtx
>* ImageSyncThrottler
<librbd::MockTestImageCtx
>::s_instance
= nullptr;
113 } // namespace mirror
116 // template definitions
117 #include "tools/rbd_mirror/InstanceWatcher.cc"
123 using ::testing::InSequence
;
124 using ::testing::Invoke
;
125 using ::testing::Return
;
126 using ::testing::StrEq
;
127 using ::testing::WithArg
;
129 class TestMockInstanceWatcher
: public TestMockFixture
{
131 typedef librbd::ManagedLock
<librbd::MockTestImageCtx
> MockManagedLock
;
132 typedef InstanceReplayer
<librbd::MockTestImageCtx
> MockInstanceReplayer
;
133 typedef InstanceWatcher
<librbd::MockTestImageCtx
> MockInstanceWatcher
;
134 typedef Threads
<librbd::MockTestImageCtx
> MockThreads
;
136 std::string m_instance_id
;
138 MockThreads
*m_mock_threads
;
140 void SetUp() override
{
141 TestFixture::SetUp();
142 m_local_io_ctx
.remove(RBD_MIRROR_LEADER
);
143 EXPECT_EQ(0, m_local_io_ctx
.create(RBD_MIRROR_LEADER
, true));
145 m_instance_id
= stringify(m_local_io_ctx
.get_instance_id());
146 m_oid
= RBD_MIRROR_INSTANCE_PREFIX
+ m_instance_id
;
148 m_mock_threads
= new MockThreads(m_threads
);
151 void TearDown() override
{
152 delete m_mock_threads
;
153 TestMockFixture::TearDown();
156 void expect_register_watch(librados::MockTestMemIoCtxImpl
&mock_io_ctx
) {
157 EXPECT_CALL(mock_io_ctx
, aio_watch(m_oid
, _
, _
, _
));
160 void expect_register_watch(librados::MockTestMemIoCtxImpl
&mock_io_ctx
,
161 const std::string
&instance_id
) {
162 std::string oid
= RBD_MIRROR_INSTANCE_PREFIX
+ instance_id
;
163 EXPECT_CALL(mock_io_ctx
, aio_watch(oid
, _
, _
, _
));
166 void expect_unregister_watch(librados::MockTestMemIoCtxImpl
&mock_io_ctx
) {
167 EXPECT_CALL(mock_io_ctx
, aio_unwatch(_
, _
));
170 void expect_register_instance(librados::MockTestMemIoCtxImpl
&mock_io_ctx
,
172 EXPECT_CALL(mock_io_ctx
, exec(RBD_MIRROR_LEADER
, _
, StrEq("rbd"),
173 StrEq("mirror_instances_add"), _
, _
, _
))
174 .WillOnce(Return(r
));
177 void expect_unregister_instance(librados::MockTestMemIoCtxImpl
&mock_io_ctx
,
179 EXPECT_CALL(mock_io_ctx
, exec(RBD_MIRROR_LEADER
, _
, StrEq("rbd"),
180 StrEq("mirror_instances_remove"), _
, _
, _
))
181 .WillOnce(Return(r
));
184 void expect_acquire_lock(MockManagedLock
&mock_managed_lock
, int r
) {
185 EXPECT_CALL(mock_managed_lock
, acquire_lock(_
))
186 .WillOnce(CompleteContext(r
));
189 void expect_release_lock(MockManagedLock
&mock_managed_lock
, int r
) {
190 EXPECT_CALL(mock_managed_lock
, shut_down(_
)).WillOnce(CompleteContext(r
));
193 void expect_destroy_lock(MockManagedLock
&mock_managed_lock
,
194 Context
*ctx
= nullptr) {
195 EXPECT_CALL(mock_managed_lock
, destroy())
196 .WillOnce(Invoke([ctx
]() {
197 if (ctx
!= nullptr) {
203 void expect_get_locker(MockManagedLock
&mock_managed_lock
,
204 const librbd::managed_lock::Locker
&locker
, int r
) {
205 EXPECT_CALL(mock_managed_lock
, get_locker(_
, _
))
206 .WillOnce(Invoke([r
, locker
](librbd::managed_lock::Locker
*out
,
215 void expect_break_lock(MockManagedLock
&mock_managed_lock
,
216 const librbd::managed_lock::Locker
&locker
, int r
) {
217 EXPECT_CALL(mock_managed_lock
, break_lock(locker
, true, _
))
218 .WillOnce(WithArg
<2>(CompleteContext(r
)));
222 TEST_F(TestMockInstanceWatcher
, InitShutdown
) {
223 MockManagedLock mock_managed_lock
;
224 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
226 auto instance_watcher
= new MockInstanceWatcher(
227 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
231 expect_register_instance(mock_io_ctx
, 0);
232 expect_register_watch(mock_io_ctx
);
233 expect_acquire_lock(mock_managed_lock
, 0);
234 ASSERT_EQ(0, instance_watcher
->init());
237 expect_release_lock(mock_managed_lock
, 0);
238 expect_unregister_watch(mock_io_ctx
);
239 expect_unregister_instance(mock_io_ctx
, 0);
240 instance_watcher
->shut_down();
242 expect_destroy_lock(mock_managed_lock
);
243 delete instance_watcher
;
246 TEST_F(TestMockInstanceWatcher
, InitError
) {
247 MockManagedLock mock_managed_lock
;
248 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
250 auto instance_watcher
= new MockInstanceWatcher(
251 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
254 expect_register_instance(mock_io_ctx
, 0);
255 expect_register_watch(mock_io_ctx
);
256 expect_acquire_lock(mock_managed_lock
, -EINVAL
);
257 expect_unregister_watch(mock_io_ctx
);
258 expect_unregister_instance(mock_io_ctx
, 0);
260 ASSERT_EQ(-EINVAL
, instance_watcher
->init());
262 expect_destroy_lock(mock_managed_lock
);
263 delete instance_watcher
;
266 TEST_F(TestMockInstanceWatcher
, ShutdownError
) {
267 MockManagedLock mock_managed_lock
;
268 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
270 auto instance_watcher
= new MockInstanceWatcher(
271 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
275 expect_register_instance(mock_io_ctx
, 0);
276 expect_register_watch(mock_io_ctx
);
277 expect_acquire_lock(mock_managed_lock
, 0);
278 ASSERT_EQ(0, instance_watcher
->init());
281 expect_release_lock(mock_managed_lock
, -EINVAL
);
282 expect_unregister_watch(mock_io_ctx
);
283 expect_unregister_instance(mock_io_ctx
, 0);
284 instance_watcher
->shut_down();
286 expect_destroy_lock(mock_managed_lock
);
287 delete instance_watcher
;
291 TEST_F(TestMockInstanceWatcher
, Remove
) {
292 MockManagedLock mock_managed_lock
;
293 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
294 librbd::managed_lock::Locker
295 locker
{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
299 expect_get_locker(mock_managed_lock
, locker
, 0);
300 expect_break_lock(mock_managed_lock
, locker
, 0);
301 expect_unregister_instance(mock_io_ctx
, 0);
302 C_SaferCond on_destroy
;
303 expect_destroy_lock(mock_managed_lock
, &on_destroy
);
305 C_SaferCond on_remove
;
306 MockInstanceWatcher::remove_instance(m_local_io_ctx
,
307 m_mock_threads
->work_queue
,
308 "instance_id", &on_remove
);
309 ASSERT_EQ(0, on_remove
.wait());
310 ASSERT_EQ(0, on_destroy
.wait());
313 TEST_F(TestMockInstanceWatcher
, RemoveNoent
) {
314 MockManagedLock mock_managed_lock
;
315 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
319 expect_get_locker(mock_managed_lock
, librbd::managed_lock::Locker(), -ENOENT
);
320 expect_unregister_instance(mock_io_ctx
, 0);
321 C_SaferCond on_destroy
;
322 expect_destroy_lock(mock_managed_lock
, &on_destroy
);
324 C_SaferCond on_remove
;
325 MockInstanceWatcher::remove_instance(m_local_io_ctx
,
326 m_mock_threads
->work_queue
,
327 "instance_id", &on_remove
);
328 ASSERT_EQ(0, on_remove
.wait());
329 ASSERT_EQ(0, on_destroy
.wait());
332 TEST_F(TestMockInstanceWatcher
, ImageAcquireRelease
) {
333 MockManagedLock mock_managed_lock
;
335 librados::IoCtx
& io_ctx1
= m_local_io_ctx
;
336 std::string instance_id1
= m_instance_id
;
337 librados::MockTestMemIoCtxImpl
&mock_io_ctx1(get_mock_io_ctx(io_ctx1
));
338 MockInstanceReplayer mock_instance_replayer1
;
339 auto instance_watcher1
= MockInstanceWatcher::create(
340 io_ctx1
, m_mock_threads
->work_queue
, &mock_instance_replayer1
);
342 librados::Rados cluster
;
343 librados::IoCtx io_ctx2
;
344 EXPECT_EQ("", connect_cluster_pp(cluster
));
345 EXPECT_EQ(0, cluster
.ioctx_create(_local_pool_name
.c_str(), io_ctx2
));
346 std::string instance_id2
= stringify(io_ctx2
.get_instance_id());
347 librados::MockTestMemIoCtxImpl
&mock_io_ctx2(get_mock_io_ctx(io_ctx2
));
348 MockInstanceReplayer mock_instance_replayer2
;
349 auto instance_watcher2
= MockInstanceWatcher::create(
350 io_ctx2
, m_mock_threads
->work_queue
, &mock_instance_replayer2
);
354 // Init instance watcher 1
355 expect_register_instance(mock_io_ctx1
, 0);
356 expect_register_watch(mock_io_ctx1
, instance_id1
);
357 expect_acquire_lock(mock_managed_lock
, 0);
358 ASSERT_EQ(0, instance_watcher1
->init());
360 // Init instance watcher 2
361 expect_register_instance(mock_io_ctx2
, 0);
362 expect_register_watch(mock_io_ctx2
, instance_id2
);
363 expect_acquire_lock(mock_managed_lock
, 0);
364 ASSERT_EQ(0, instance_watcher2
->init());
366 // Acquire Image on the same instance
367 EXPECT_CALL(mock_instance_replayer1
, acquire_image(instance_watcher1
, "gid",
369 .WillOnce(WithArg
<2>(CompleteContext(0)));
370 C_SaferCond on_acquire1
;
371 instance_watcher1
->notify_image_acquire(instance_id1
, "gid", &on_acquire1
);
372 ASSERT_EQ(0, on_acquire1
.wait());
374 // Acquire Image on the other instance
375 EXPECT_CALL(mock_instance_replayer2
, acquire_image(instance_watcher2
, "gid",
377 .WillOnce(WithArg
<2>(CompleteContext(0)));
378 C_SaferCond on_acquire2
;
379 instance_watcher1
->notify_image_acquire(instance_id2
, "gid", &on_acquire2
);
380 ASSERT_EQ(0, on_acquire2
.wait());
382 // Release Image on the same instance
383 EXPECT_CALL(mock_instance_replayer1
, release_image("gid", _
))
384 .WillOnce(WithArg
<1>(CompleteContext(0)));
385 C_SaferCond on_release1
;
386 instance_watcher1
->notify_image_release(instance_id1
, "gid", &on_release1
);
387 ASSERT_EQ(0, on_release1
.wait());
389 // Release Image on the other instance
390 EXPECT_CALL(mock_instance_replayer2
, release_image("gid", _
))
391 .WillOnce(WithArg
<1>(CompleteContext(0)));
392 C_SaferCond on_release2
;
393 instance_watcher1
->notify_image_release(instance_id2
, "gid", &on_release2
);
394 ASSERT_EQ(0, on_release2
.wait());
396 // Shutdown instance watcher 1
397 expect_release_lock(mock_managed_lock
, 0);
398 expect_unregister_watch(mock_io_ctx1
);
399 expect_unregister_instance(mock_io_ctx1
, 0);
400 instance_watcher1
->shut_down();
402 expect_destroy_lock(mock_managed_lock
);
403 delete instance_watcher1
;
405 // Shutdown instance watcher 2
406 expect_release_lock(mock_managed_lock
, 0);
407 expect_unregister_watch(mock_io_ctx2
);
408 expect_unregister_instance(mock_io_ctx2
, 0);
409 instance_watcher2
->shut_down();
411 expect_destroy_lock(mock_managed_lock
);
412 delete instance_watcher2
;
415 TEST_F(TestMockInstanceWatcher
, PeerImageRemoved
) {
416 MockManagedLock mock_managed_lock
;
418 librados::IoCtx
& io_ctx1
= m_local_io_ctx
;
419 std::string instance_id1
= m_instance_id
;
420 librados::MockTestMemIoCtxImpl
&mock_io_ctx1(get_mock_io_ctx(io_ctx1
));
421 MockInstanceReplayer mock_instance_replayer1
;
422 auto instance_watcher1
= MockInstanceWatcher::create(
423 io_ctx1
, m_mock_threads
->work_queue
, &mock_instance_replayer1
);
425 librados::Rados cluster
;
426 librados::IoCtx io_ctx2
;
427 EXPECT_EQ("", connect_cluster_pp(cluster
));
428 EXPECT_EQ(0, cluster
.ioctx_create(_local_pool_name
.c_str(), io_ctx2
));
429 std::string instance_id2
= stringify(io_ctx2
.get_instance_id());
430 librados::MockTestMemIoCtxImpl
&mock_io_ctx2(get_mock_io_ctx(io_ctx2
));
431 MockInstanceReplayer mock_instance_replayer2
;
432 auto instance_watcher2
= MockInstanceWatcher::create(
433 io_ctx2
, m_mock_threads
->work_queue
, &mock_instance_replayer2
);
437 // Init instance watcher 1
438 expect_register_instance(mock_io_ctx1
, 0);
439 expect_register_watch(mock_io_ctx1
, instance_id1
);
440 expect_acquire_lock(mock_managed_lock
, 0);
441 ASSERT_EQ(0, instance_watcher1
->init());
443 // Init instance watcher 2
444 expect_register_instance(mock_io_ctx2
, 0);
445 expect_register_watch(mock_io_ctx2
, instance_id2
);
446 expect_acquire_lock(mock_managed_lock
, 0);
447 ASSERT_EQ(0, instance_watcher2
->init());
449 // Peer Image Removed on the same instance
450 EXPECT_CALL(mock_instance_replayer1
, remove_peer_image("gid", "uuid", _
))
451 .WillOnce(WithArg
<2>(CompleteContext(0)));
452 C_SaferCond on_removed1
;
453 instance_watcher1
->notify_peer_image_removed(instance_id1
, "gid", "uuid",
455 ASSERT_EQ(0, on_removed1
.wait());
457 // Peer Image Removed on the other instance
458 EXPECT_CALL(mock_instance_replayer2
, remove_peer_image("gid", "uuid", _
))
459 .WillOnce(WithArg
<2>(CompleteContext(0)));
460 C_SaferCond on_removed2
;
461 instance_watcher1
->notify_peer_image_removed(instance_id2
, "gid", "uuid",
463 ASSERT_EQ(0, on_removed2
.wait());
465 // Shutdown instance watcher 1
466 expect_release_lock(mock_managed_lock
, 0);
467 expect_unregister_watch(mock_io_ctx1
);
468 expect_unregister_instance(mock_io_ctx1
, 0);
469 instance_watcher1
->shut_down();
471 expect_destroy_lock(mock_managed_lock
);
472 delete instance_watcher1
;
474 // Shutdown instance watcher 2
475 expect_release_lock(mock_managed_lock
, 0);
476 expect_unregister_watch(mock_io_ctx2
);
477 expect_unregister_instance(mock_io_ctx2
, 0);
478 instance_watcher2
->shut_down();
480 expect_destroy_lock(mock_managed_lock
);
481 delete instance_watcher2
;
484 TEST_F(TestMockInstanceWatcher
, ImageAcquireReleaseCancel
) {
485 MockManagedLock mock_managed_lock
;
486 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
488 auto instance_watcher
= new MockInstanceWatcher(
489 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
493 expect_register_instance(mock_io_ctx
, 0);
494 expect_register_watch(mock_io_ctx
);
495 expect_acquire_lock(mock_managed_lock
, 0);
496 ASSERT_EQ(0, instance_watcher
->init());
498 // Send Acquire Image and cancel
499 EXPECT_CALL(mock_io_ctx
, aio_notify(_
, _
, _
, _
, _
))
501 [this, instance_watcher
, &mock_io_ctx
](
502 const std::string
& o
, librados::AioCompletionImpl
*c
,
503 bufferlist
& bl
, uint64_t timeout_ms
, bufferlist
*pbl
) {
505 auto ctx
= new FunctionContext(
506 [instance_watcher
, &mock_io_ctx
, c
, pbl
](int r
) {
507 instance_watcher
->cancel_notify_requests("other");
508 ::encode(librbd::watcher::NotifyResponse(), *pbl
);
509 mock_io_ctx
.get_mock_rados_client()->
510 finish_aio_completion(c
, -ETIMEDOUT
);
512 m_threads
->work_queue
->queue(ctx
, 0);
515 C_SaferCond on_acquire
;
516 instance_watcher
->notify_image_acquire("other", "gid", &on_acquire
);
517 ASSERT_EQ(-ECANCELED
, on_acquire
.wait());
519 // Send Release Image and cancel
520 EXPECT_CALL(mock_io_ctx
, aio_notify(_
, _
, _
, _
, _
))
522 [this, instance_watcher
, &mock_io_ctx
](
523 const std::string
& o
, librados::AioCompletionImpl
*c
,
524 bufferlist
& bl
, uint64_t timeout_ms
, bufferlist
*pbl
) {
526 auto ctx
= new FunctionContext(
527 [instance_watcher
, &mock_io_ctx
, c
, pbl
](int r
) {
528 instance_watcher
->cancel_notify_requests("other");
529 ::encode(librbd::watcher::NotifyResponse(), *pbl
);
530 mock_io_ctx
.get_mock_rados_client()->
531 finish_aio_completion(c
, -ETIMEDOUT
);
533 m_threads
->work_queue
->queue(ctx
, 0);
536 C_SaferCond on_release
;
537 instance_watcher
->notify_image_release("other", "gid", &on_release
);
538 ASSERT_EQ(-ECANCELED
, on_release
.wait());
541 expect_release_lock(mock_managed_lock
, 0);
542 expect_unregister_watch(mock_io_ctx
);
543 expect_unregister_instance(mock_io_ctx
, 0);
544 instance_watcher
->shut_down();
546 expect_destroy_lock(mock_managed_lock
);
547 delete instance_watcher
;
550 TEST_F(TestMockInstanceWatcher
, PeerImageRemovedCancel
) {
551 MockManagedLock mock_managed_lock
;
552 librados::MockTestMemIoCtxImpl
&mock_io_ctx(get_mock_io_ctx(m_local_io_ctx
));
554 auto instance_watcher
= new MockInstanceWatcher(
555 m_local_io_ctx
, m_mock_threads
->work_queue
, nullptr, m_instance_id
);
559 expect_register_instance(mock_io_ctx
, 0);
560 expect_register_watch(mock_io_ctx
);
561 expect_acquire_lock(mock_managed_lock
, 0);
562 ASSERT_EQ(0, instance_watcher
->init());
564 // Send Peer Image Removed and cancel
565 EXPECT_CALL(mock_io_ctx
, aio_notify(_
, _
, _
, _
, _
))
567 [this, instance_watcher
, &mock_io_ctx
](
568 const std::string
& o
, librados::AioCompletionImpl
*c
,
569 bufferlist
& bl
, uint64_t timeout_ms
, bufferlist
*pbl
) {
571 auto ctx
= new FunctionContext(
572 [instance_watcher
, &mock_io_ctx
, c
, pbl
](int r
) {
573 instance_watcher
->cancel_notify_requests("other");
574 ::encode(librbd::watcher::NotifyResponse(), *pbl
);
575 mock_io_ctx
.get_mock_rados_client()->
576 finish_aio_completion(c
, -ETIMEDOUT
);
578 m_threads
->work_queue
->queue(ctx
, 0);
581 C_SaferCond on_acquire
;
582 instance_watcher
->notify_peer_image_removed("other", "gid", "uuid",
584 ASSERT_EQ(-ECANCELED
, on_acquire
.wait());
587 expect_release_lock(mock_managed_lock
, 0);
588 expect_unregister_watch(mock_io_ctx
);
589 expect_unregister_instance(mock_io_ctx
, 0);
590 instance_watcher
->shut_down();
592 expect_destroy_lock(mock_managed_lock
);
593 delete instance_watcher
;
597 class TestMockInstanceWatcher_NotifySync
: public TestMockInstanceWatcher
{
599 typedef ImageSyncThrottler
<librbd::MockTestImageCtx
> MockImageSyncThrottler
;
601 MockManagedLock mock_managed_lock
;
602 MockImageSyncThrottler mock_image_sync_throttler
;
603 std::string instance_id1
;
604 std::string instance_id2
;
606 librados::Rados cluster
;
607 librados::IoCtx io_ctx2
;
609 MockInstanceWatcher
*instance_watcher1
;
610 MockInstanceWatcher
*instance_watcher2
;
612 void SetUp() override
{
613 TestMockInstanceWatcher::SetUp();
615 instance_id1
= m_instance_id
;
616 librados::IoCtx
& io_ctx1
= m_local_io_ctx
;
617 librados::MockTestMemIoCtxImpl
&mock_io_ctx1(get_mock_io_ctx(io_ctx1
));
618 instance_watcher1
= MockInstanceWatcher::create(io_ctx1
,
619 m_mock_threads
->work_queue
,
621 EXPECT_EQ("", connect_cluster_pp(cluster
));
622 EXPECT_EQ(0, cluster
.ioctx_create(_local_pool_name
.c_str(), io_ctx2
));
623 instance_id2
= stringify(io_ctx2
.get_instance_id());
624 librados::MockTestMemIoCtxImpl
&mock_io_ctx2(get_mock_io_ctx(io_ctx2
));
625 instance_watcher2
= MockInstanceWatcher::create(io_ctx2
,
626 m_mock_threads
->work_queue
,
630 // Init instance watcher 1 (leader)
631 expect_register_instance(mock_io_ctx1
, 0);
632 expect_register_watch(mock_io_ctx1
, instance_id1
);
633 expect_acquire_lock(mock_managed_lock
, 0);
634 EXPECT_EQ(0, instance_watcher1
->init());
635 instance_watcher1
->handle_acquire_leader();
637 // Init instance watcher 2
638 expect_register_instance(mock_io_ctx2
, 0);
639 expect_register_watch(mock_io_ctx2
, instance_id2
);
640 expect_acquire_lock(mock_managed_lock
, 0);
641 EXPECT_EQ(0, instance_watcher2
->init());
642 instance_watcher2
->handle_update_leader(instance_id1
);
645 void TearDown() override
{
646 librados::IoCtx
& io_ctx1
= m_local_io_ctx
;
647 librados::MockTestMemIoCtxImpl
&mock_io_ctx1(get_mock_io_ctx(io_ctx1
));
648 librados::MockTestMemIoCtxImpl
&mock_io_ctx2(get_mock_io_ctx(io_ctx2
));
652 expect_throttler_destroy();
653 instance_watcher1
->handle_release_leader();
655 // Shutdown instance watcher 1
656 expect_release_lock(mock_managed_lock
, 0);
657 expect_unregister_watch(mock_io_ctx1
);
658 expect_unregister_instance(mock_io_ctx1
, 0);
659 instance_watcher1
->shut_down();
661 expect_destroy_lock(mock_managed_lock
);
662 delete instance_watcher1
;
664 // Shutdown instance watcher 2
665 expect_release_lock(mock_managed_lock
, 0);
666 expect_unregister_watch(mock_io_ctx2
);
667 expect_unregister_instance(mock_io_ctx2
, 0);
668 instance_watcher2
->shut_down();
670 expect_destroy_lock(mock_managed_lock
);
671 delete instance_watcher2
;
673 TestMockInstanceWatcher::TearDown();
676 void expect_throttler_destroy(
677 std::vector
<Context
*> *throttler_queue
= nullptr) {
678 EXPECT_CALL(mock_image_sync_throttler
, drain(-ESTALE
))
679 .WillOnce(Invoke([throttler_queue
] (int r
) {
680 if (throttler_queue
!= nullptr) {
681 for (auto ctx
: *throttler_queue
) {
686 EXPECT_CALL(mock_image_sync_throttler
, destroy());
689 void expect_throttler_start_op(const std::string
&sync_id
,
690 Context
*on_call
= nullptr,
691 Context
**on_start_ctx
= nullptr) {
692 EXPECT_CALL(mock_image_sync_throttler
, start_op(sync_id
, _
))
693 .WillOnce(Invoke([on_call
, on_start_ctx
] (const std::string
&,
695 if (on_call
!= nullptr) {
696 on_call
->complete(0);
698 if (on_start_ctx
!= nullptr) {
706 void expect_throttler_finish_op(const std::string
&sync_id
,
707 Context
*on_finish
) {
708 EXPECT_CALL(mock_image_sync_throttler
, finish_op("sync_id"))
709 .WillOnce(Invoke([on_finish
](const std::string
&) {
710 on_finish
->complete(0);
715 TEST_F(TestMockInstanceWatcher_NotifySync
, StartStopOnLeader
) {
718 expect_throttler_start_op("sync_id");
719 C_SaferCond on_start
;
720 instance_watcher1
->notify_sync_request("sync_id", &on_start
);
721 ASSERT_EQ(0, on_start
.wait());
723 C_SaferCond on_finish
;
724 expect_throttler_finish_op("sync_id", &on_finish
);
725 instance_watcher1
->notify_sync_complete("sync_id");
726 ASSERT_EQ(0, on_finish
.wait());
729 TEST_F(TestMockInstanceWatcher_NotifySync
, CancelStartedOnLeader
) {
732 expect_throttler_start_op("sync_id");
733 C_SaferCond on_start
;
734 instance_watcher1
->notify_sync_request("sync_id", &on_start
);
735 ASSERT_EQ(0, on_start
.wait());
737 ASSERT_FALSE(instance_watcher1
->cancel_sync_request("sync_id"));
739 C_SaferCond on_finish
;
740 expect_throttler_finish_op("sync_id", &on_finish
);
741 instance_watcher1
->notify_sync_complete("sync_id");
742 ASSERT_EQ(0, on_finish
.wait());
745 TEST_F(TestMockInstanceWatcher_NotifySync
, StartStopOnNonLeader
) {
748 expect_throttler_start_op("sync_id");
749 C_SaferCond on_start
;
750 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
751 ASSERT_EQ(0, on_start
.wait());
753 C_SaferCond on_finish
;
754 expect_throttler_finish_op("sync_id", &on_finish
);
755 instance_watcher2
->notify_sync_complete("sync_id");
756 ASSERT_EQ(0, on_finish
.wait());
759 TEST_F(TestMockInstanceWatcher_NotifySync
, CancelStartedOnNonLeader
) {
762 expect_throttler_start_op("sync_id");
763 C_SaferCond on_start
;
764 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
765 ASSERT_EQ(0, on_start
.wait());
767 ASSERT_FALSE(instance_watcher2
->cancel_sync_request("sync_id"));
769 C_SaferCond on_finish
;
770 expect_throttler_finish_op("sync_id", &on_finish
);
771 instance_watcher2
->notify_sync_complete("sync_id");
772 ASSERT_EQ(0, on_finish
.wait());
775 TEST_F(TestMockInstanceWatcher_NotifySync
, CancelWaitingOnNonLeader
) {
778 C_SaferCond on_start_op_called
;
779 Context
*on_start_ctx
;
780 expect_throttler_start_op("sync_id", &on_start_op_called
,
782 C_SaferCond on_start
;
783 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
784 ASSERT_EQ(0, on_start_op_called
.wait());
786 ASSERT_TRUE(instance_watcher2
->cancel_sync_request("sync_id"));
787 // emulate watcher timeout
788 on_start_ctx
->complete(-ETIMEDOUT
);
789 ASSERT_EQ(-ECANCELED
, on_start
.wait());
792 TEST_F(TestMockInstanceWatcher_NotifySync
, InFlightPrevNotification
) {
793 // start sync when previous notification is still in flight
797 expect_throttler_start_op("sync_id");
798 C_SaferCond on_start1
;
799 instance_watcher2
->notify_sync_request("sync_id", &on_start1
);
800 ASSERT_EQ(0, on_start1
.wait());
802 C_SaferCond on_start2
;
803 EXPECT_CALL(mock_image_sync_throttler
, finish_op("sync_id"))
804 .WillOnce(Invoke([this, &on_start2
](const std::string
&) {
805 instance_watcher2
->notify_sync_request("sync_id", &on_start2
);
807 expect_throttler_start_op("sync_id");
808 instance_watcher2
->notify_sync_complete("sync_id");
810 ASSERT_EQ(0, on_start2
.wait());
811 C_SaferCond on_finish
;
812 expect_throttler_finish_op("sync_id", &on_finish
);
813 instance_watcher2
->notify_sync_complete("sync_id");
814 ASSERT_EQ(0, on_finish
.wait());
817 TEST_F(TestMockInstanceWatcher_NotifySync
, NoInFlightReleaseAcquireLeader
) {
820 expect_throttler_destroy();
821 instance_watcher1
->handle_release_leader();
822 instance_watcher1
->handle_acquire_leader();
825 TEST_F(TestMockInstanceWatcher_NotifySync
, StartedOnLeaderReleaseLeader
) {
828 expect_throttler_destroy();
829 instance_watcher1
->handle_release_leader();
830 instance_watcher2
->handle_acquire_leader();
832 expect_throttler_start_op("sync_id");
833 C_SaferCond on_start
;
834 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
835 ASSERT_EQ(0, on_start
.wait());
836 expect_throttler_destroy();
837 instance_watcher2
->handle_release_leader();
838 instance_watcher2
->notify_sync_complete("sync_id");
840 instance_watcher1
->handle_acquire_leader();
843 TEST_F(TestMockInstanceWatcher_NotifySync
, WaitingOnLeaderReleaseLeader
) {
846 C_SaferCond on_start_op_called
;
847 Context
*on_start_ctx
;
848 expect_throttler_start_op("sync_id", &on_start_op_called
,
850 C_SaferCond on_start
;
851 instance_watcher1
->notify_sync_request("sync_id", &on_start
);
852 ASSERT_EQ(0, on_start_op_called
.wait());
854 std::vector
<Context
*> throttler_queue
= {on_start_ctx
};
855 expect_throttler_destroy(&throttler_queue
);
856 instance_watcher1
->handle_release_leader();
857 instance_watcher2
->handle_acquire_leader();
858 instance_watcher1
->handle_update_leader(instance_id2
);
860 expect_throttler_start_op("sync_id");
861 ASSERT_EQ(0, on_start
.wait());
862 C_SaferCond on_finish
;
863 expect_throttler_finish_op("sync_id", &on_finish
);
864 instance_watcher1
->notify_sync_complete("sync_id");
865 ASSERT_EQ(0, on_finish
.wait());
867 expect_throttler_destroy();
868 instance_watcher2
->handle_release_leader();
869 instance_watcher1
->handle_acquire_leader();
872 TEST_F(TestMockInstanceWatcher_NotifySync
, StartedOnNonLeaderAcquireLeader
) {
875 expect_throttler_destroy();
876 instance_watcher1
->handle_release_leader();
877 instance_watcher2
->handle_acquire_leader();
878 instance_watcher1
->handle_update_leader(instance_id2
);
880 expect_throttler_start_op("sync_id");
881 C_SaferCond on_start
;
882 instance_watcher1
->notify_sync_request("sync_id", &on_start
);
883 ASSERT_EQ(0, on_start
.wait());
885 expect_throttler_destroy();
886 instance_watcher2
->handle_release_leader();
887 instance_watcher1
->handle_acquire_leader();
888 instance_watcher2
->handle_update_leader(instance_id2
);
890 instance_watcher1
->notify_sync_complete("sync_id");
893 TEST_F(TestMockInstanceWatcher_NotifySync
, WaitingOnNonLeaderAcquireLeader
) {
896 C_SaferCond on_start_op_called
;
897 Context
*on_start_ctx
;
898 expect_throttler_start_op("sync_id", &on_start_op_called
,
900 C_SaferCond on_start
;
901 instance_watcher2
->notify_sync_request("sync_id", &on_start
);
902 ASSERT_EQ(0, on_start_op_called
.wait());
904 std::vector
<Context
*> throttler_queue
= {on_start_ctx
};
905 expect_throttler_destroy(&throttler_queue
);
906 instance_watcher1
->handle_release_leader();
908 EXPECT_CALL(mock_image_sync_throttler
, start_op("sync_id", _
))
909 .WillOnce(WithArg
<1>(CompleteContext(0)));
910 instance_watcher2
->handle_acquire_leader();
911 instance_watcher1
->handle_update_leader(instance_id2
);
913 ASSERT_EQ(0, on_start
.wait());
915 C_SaferCond on_finish
;
916 expect_throttler_finish_op("sync_id", &on_finish
);
917 instance_watcher2
->notify_sync_complete("sync_id");
918 ASSERT_EQ(0, on_finish
.wait());
920 expect_throttler_destroy();
921 instance_watcher2
->handle_release_leader();
922 instance_watcher1
->handle_acquire_leader();
925 } // namespace mirror