1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "tools/rbd_mirror/PoolWatcher.h"
5 #include "include/rbd_types.h"
6 #include "cls/rbd/cls_rbd_client.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/Timer.h"
10 #include "librbd/ImageCtx.h"
11 #include "librbd/internal.h"
12 #include "librbd/MirroringWatcher.h"
13 #include "librbd/Utils.h"
14 #include "librbd/api/Image.h"
15 #include "librbd/api/Mirror.h"
16 #include "tools/rbd_mirror/Threads.h"
17 #include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
18 #include <boost/bind.hpp>
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rbd_mirror
23 #define dout_prefix *_dout << "rbd::mirror::PoolWatcher: " << this << " " \
28 using std::unique_ptr
;
30 using librbd::util::create_context_callback
;
31 using librbd::util::create_rados_callback
;
// Nested watcher type of PoolWatcher<I>. It derives from
// librbd::MirroringWatcher<I> and relays each overridden notification to
// the PoolWatcher stored in m_pool_watcher.
// NOTE(review): this view of the class is incomplete (access specifiers,
// closing braces and the tail of one call are not visible) -- confirm
// against the full file before relying on these notes.
37 class PoolWatcher
<I
>::MirroringWatcher
: public librbd::MirroringWatcher
<I
> {
// ContextWQ: the decayed pointee type of Threads<I>::work_queue.
39 using ContextWQ
= typename
std::decay
<
40 typename
std::remove_pointer
<
41 decltype(Threads
<I
>::work_queue
)>::type
>::type
;
// Passes io_ctx and work_queue to the librbd::MirroringWatcher<I> base and
// remembers pool_watcher as the target for the relayed callbacks.
43 MirroringWatcher(librados::IoCtx
&io_ctx
, ContextWQ
*work_queue
,
44 PoolWatcher
*pool_watcher
)
45 : librbd::MirroringWatcher
<I
>(io_ctx
, work_queue
),
46 m_pool_watcher(pool_watcher
) {
// Relays the rewatch result code r unchanged to the pool watcher.
49 void handle_rewatch_complete(int r
) override
{
50 m_pool_watcher
->handle_rewatch_complete(r
);
// mirror_mode is not read by the visible body; any mode update simply
// requests a refresh with an interval argument of 5 (units not shown here).
53 void handle_mode_updated(cls::rbd::MirrorMode mirror_mode
) override
{
54 // invalidate all image state and refresh the pool contents
55 m_pool_watcher
->schedule_refresh_images(5);
// Reduces state to a boolean (true only for MIRROR_IMAGE_STATE_ENABLED) and
// relays the image ids to the pool watcher.
// NOTE(review): the end of the relayed call is cut off in this view;
// presumably `enabled` is the final argument -- confirm.
58 void handle_image_updated(cls::rbd::MirrorImageState state
,
59 const std::string
&image_id
,
60 const std::string
&global_image_id
) override
{
61 bool enabled
= (state
== cls::rbd::MIRROR_IMAGE_STATE_ENABLED
);
62 m_pool_watcher
->handle_image_updated(image_id
, global_image_id
,
// PoolWatcher that receives every relayed callback; set once in the
// constructor.
67 PoolWatcher
*m_pool_watcher
;
// Constructor. The visible initializers copy mirror_uuid into m_mirror_uuid
// and build m_lock with a name derived from "rbd::mirror::PoolWatcher" and
// this; the body then allocates the nested MirroringWatcher.
// NOTE(review): part of the member-initializer list is not visible in this
// view (m_threads and m_io_ctx are used below but their initialization is
// not shown) -- confirm against the full file.
71 PoolWatcher
<I
>::PoolWatcher(Threads
<I
> *threads
,
72 librados::IoCtx
&io_ctx
,
73 const std::string
& mirror_uuid
,
74 pool_watcher::Listener
&listener
)
77 m_mirror_uuid(mirror_uuid
),
79 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
80 "rbd::mirror::PoolWatcher", this))) {
// Heap-allocated with new; the matching delete is in ~PoolWatcher().
81 m_mirroring_watcher
= new MirroringWatcher(m_io_ctx
,
82 m_threads
->work_queue
, this);
// Destructor: frees the MirroringWatcher that the constructor allocated
// with new.
86 PoolWatcher
<I
>::~PoolWatcher() {
87 delete m_mirroring_watcher
;
// Const query that takes m_lock before producing its result.
// NOTE(review): the return statement is not visible in this view;
// presumably it returns m_blacklisted (the flag set by the error handlers
// in this file) -- confirm.
91 bool PoolWatcher
<I
>::is_blacklisted() const {
92 std::lock_guard locker
{m_lock
};
// Starts initialization. on_finish is stored in m_on_init_finish so a later
// completion handler can take it (the handlers in this file swap it out and
// call complete(r) on it).
// NOTE(review): the statement following the trailing comment is not visible
// in this view.
97 void PoolWatcher
<I
>::init(Context
*on_finish
) {
101 std::lock_guard locker
{m_lock
};
102 m_on_init_finish
= on_finish
;
// A refresh must not already be running; mark one as in flight from now on.
104 ceph_assert(!m_refresh_in_progress
);
105 m_refresh_in_progress
= true;
108 // start async updates for mirror image directory
// Begins shutdown: flags the watcher as shutting down, cancels any pending
// refresh timer event, unregisters the mirroring watch, and passes
// on_finish to the async-op tracker.
// Must be called at most once (asserted below).
112 template <typename I
>
113 void PoolWatcher
<I
>::shut_down(Context
*on_finish
) {
// Both the timer lock and m_lock are taken together because the code below
// touches m_timer_ctx as well as PoolWatcher state.
117 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
119 ceph_assert(!m_shutting_down
);
120 m_shutting_down
= true;
// Drop a scheduled-but-not-yet-fired refresh, if there is one.
121 if (m_timer_ctx
!= nullptr) {
122 m_threads
->timer
->cancel_event(m_timer_ctx
);
123 m_timer_ctx
= nullptr;
127 // in-progress unregister tracked as async op
128 unregister_watcher();
// presumably on_finish is completed once all tracked ops have finished --
// TODO confirm against the async-op tracker's wait_for_ops contract.
130 m_async_op_tracker
.wait_for_ops(on_finish
);
// Registers the mirroring watch, with handle_register_watcher() as the
// completion callback. Expects the image id set to be marked invalid and a
// refresh to be flagged as in progress (both asserted under m_lock).
// NOTE(review): the body of the is_unregistered() check and some closing
// braces are not visible in this view.
133 template <typename I
>
134 void PoolWatcher
<I
>::register_watcher() {
136 std::lock_guard locker
{m_lock
};
137 ceph_assert(m_image_ids_invalid
);
138 ceph_assert(m_refresh_in_progress
);
141 // if the watch registration is in-flight, let the watcher
142 // handle the transition -- only (re-)register if it's not registered
143 if (!m_mirroring_watcher
->is_unregistered()) {
148 // first time registering or the watch failed
// Tracked as an async op; the matching finish_op() is in
// handle_register_watcher().
150 m_async_op_tracker
.start_op();
152 Context
*ctx
= create_context_callback
<
153 PoolWatcher
, &PoolWatcher
<I
>::handle_register_watcher
>(this);
154 m_mirroring_watcher
->register_watch(ctx
);
// Completion callback for register_watcher(); r is the registration result.
// Visible outcomes:
//   -EBLACKLISTED : set m_blacklisted and take the pending init callback
//   -ENOENT       : schedule a refresh with interval 30 and take the pending
//                   init callback
//   otherwise     : log the error and schedule a refresh with interval 10
// The tracked async op is then finished and any taken init callback is
// completed with r.
// NOTE(review): the head of the first branch (the one preceding
// "} else if (r == -EBLACKLISTED)") and several braces are not visible in
// this view, so the success path cannot be described from here.
157 template <typename I
>
158 void PoolWatcher
<I
>::handle_register_watcher(int r
) {
159 dout(5) << "r=" << r
<< dendl
;
162 std::lock_guard locker
{m_lock
};
163 ceph_assert(m_image_ids_invalid
);
164 ceph_assert(m_refresh_in_progress
);
// NOTE(review): an intervening line is missing before this assignment, so
// it may be conditional -- confirm.
166 m_refresh_in_progress
= false;
// Stays nullptr unless one of the branches below swaps in m_on_init_finish.
170 Context
*on_init_finish
= nullptr;
173 } else if (r
== -EBLACKLISTED
) {
174 dout(0) << "detected client is blacklisted" << dendl
;
176 std::lock_guard locker
{m_lock
};
177 m_blacklisted
= true;
// The swap moves the member into the local and leaves nullptr in the member.
178 std::swap(on_init_finish
, m_on_init_finish
);
179 } else if (r
== -ENOENT
) {
180 dout(5) << "mirroring directory does not exist" << dendl
;
181 schedule_refresh_images(30);
183 std::lock_guard locker
{m_lock
};
184 std::swap(on_init_finish
, m_on_init_finish
);
186 derr
<< "unexpected error registering mirroring directory watch: "
187 << cpp_strerror(r
) << dendl
;
188 schedule_refresh_images(10);
// Pairs with the start_op() issued in register_watcher().
191 m_async_op_tracker
.finish_op();
192 if (on_init_finish
!= nullptr) {
193 on_init_finish
->complete(r
);
// Unregisters the mirroring watch. The request is tracked as an async op:
// start_op() here, finish_op() inside the completion lambda.
// NOTE(review): the condition guarding the error log and the end of that
// log statement are not visible in this view.
197 template <typename I
>
198 void PoolWatcher
<I
>::unregister_watcher() {
201 m_async_op_tracker
.start_op();
// Completion callback: logs the result code and, on the error path, the
// watched object's oid together with the decoded error string.
202 Context
*ctx
= new LambdaContext([this](int r
) {
203 dout(5) << "unregister_watcher: r=" << r
<< dendl
;
205 derr
<< "error unregistering watcher for "
206 << m_mirroring_watcher
->get_oid() << " object: " << cpp_strerror(r
)
209 m_async_op_tracker
.finish_op();
212 m_mirroring_watcher
->unregister_watch(ctx
);
// Starts a full reload of the pool's image ids into m_refresh_image_ids via
// a pool_watcher::RefreshImagesRequest, with handle_refresh_images() as the
// completion callback.
// NOTE(review): the tail of the create() call and whatever follows it are
// not visible in this view.
215 template <typename I
>
216 void PoolWatcher
<I
>::refresh_images() {
220 std::lock_guard locker
{m_lock
};
221 ceph_assert(m_image_ids_invalid
);
222 ceph_assert(m_refresh_in_progress
);
224 // clear all pending notification events since we need to perform
225 // a full image list refresh
226 m_pending_added_image_ids
.clear();
227 m_pending_removed_image_ids
.clear();
// Tracked as an async op; the matching finish_op() is in
// handle_refresh_images().
230 m_async_op_tracker
.start_op();
// Start from an empty result set; its address is handed to the request
// below.
231 m_refresh_image_ids
.clear();
232 Context
*ctx
= create_context_callback
<
233 PoolWatcher
, &PoolWatcher
<I
>::handle_refresh_images
>(this);
234 auto req
= pool_watcher::RefreshImagesRequest
<I
>::create(m_io_ctx
,
235 &m_refresh_image_ids
,
// Completion callback for refresh_images(); r is the request result.
// Under m_lock it clears m_refresh_in_progress and picks one of the visible
// outcomes: defer (another refresh was requested meanwhile), publish the
// fetched ids into m_pending_image_ids, record blacklisting, or retry.
// After that it schedules the follow-up refresh if one is needed, finishes
// the tracked async op, and completes any taken init callback with r.
// NOTE(review): several branch heads and braces are not visible in this
// view, so the exact condition guarding each section below is uncertain --
// confirm against the full file.
240 template <typename I
>
241 void PoolWatcher
<I
>::handle_refresh_images(int r
) {
242 dout(5) << "r=" << r
<< dendl
;
// Decisions are recorded in these locals and acted on further down.
244 bool deferred_refresh
= false;
245 bool retry_refresh
= false;
246 Context
*on_init_finish
= nullptr;
248 std::lock_guard locker
{m_lock
};
249 ceph_assert(m_image_ids_invalid
);
250 ceph_assert(m_refresh_in_progress
);
251 m_refresh_in_progress
= false;
// presumably the r == -ENOENT case: treat a missing directory as an empty
// image list -- TODO confirm, the guarding condition is not visible.
254 dout(5) << "mirroring directory not found" << dendl
;
256 m_refresh_image_ids
.clear();
259 if (m_deferred_refresh
) {
260 // need to refresh -- skip the notification
261 deferred_refresh
= true;
// Otherwise (presumably the else branch) the fetched ids become the pending
// set, the id set is marked valid again, and the init callback is taken.
263 m_pending_image_ids
= std::move(m_refresh_image_ids
);
264 m_image_ids_invalid
= false;
265 std::swap(on_init_finish
, m_on_init_finish
);
268 } else if (r
== -EBLACKLISTED
) {
269 dout(0) << "detected client is blacklisted during image refresh" << dendl
;
271 m_blacklisted
= true;
272 std::swap(on_init_finish
, m_on_init_finish
);
// presumably the fallback for any other error -- TODO confirm.
274 retry_refresh
= true;
// Act on the decisions recorded above.
278 if (deferred_refresh
) {
279 dout(5) << "scheduling deferred refresh" << dendl
;
280 schedule_refresh_images(0);
281 } else if (retry_refresh
) {
282 derr
<< "failed to retrieve mirroring directory: " << cpp_strerror(r
)
284 schedule_refresh_images(10);
// Pairs with the start_op() issued in refresh_images().
287 m_async_op_tracker
.finish_op();
288 if (on_init_finish
!= nullptr) {
289 on_init_finish
->complete(r
);
// Requests a full image refresh via a timer event whose callback runs
// process_refresh_images().
// If the watcher is shutting down, a refresh is already in flight, or a
// timer event is already pending, no new event is armed on that path; when
// a refresh is in flight the request is recorded in m_deferred_refresh.
// NOTE(review): the rest of the add_event_after() argument list (presumably
// including `interval`, whose units are not shown) and the early-exit
// statement are not visible in this view.
293 template <typename I
>
294 void PoolWatcher
<I
>::schedule_refresh_images(double interval
) {
// Takes the timer lock together with m_lock since m_timer_ctx is touched.
295 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
296 if (m_shutting_down
|| m_refresh_in_progress
|| m_timer_ctx
!= nullptr) {
297 if (m_refresh_in_progress
&& !m_deferred_refresh
) {
298 dout(5) << "deferring refresh until in-flight refresh completes" << dendl
;
299 m_deferred_refresh
= true;
// Mark the cached ids stale before arming the timer.
304 m_image_ids_invalid
= true;
305 m_timer_ctx
= m_threads
->timer
->add_event_after(
307 new LambdaContext([this](int r
) {
308 process_refresh_images();
// Invoked (via MirroringWatcher) when a rewatch attempt completes with
// result r. Visible handling: -EBLACKLISTED sets m_blacklisted under m_lock,
// -ENOENT is logged as the directory being deleted, and another branch logs
// an unexpected error. A refresh with interval 5 is then scheduled.
// NOTE(review): braces and possibly early returns are not visible in this
// view, so it is unclear which branches reach the final
// schedule_refresh_images(5) -- confirm against the full file.
312 template <typename I
>
313 void PoolWatcher
<I
>::handle_rewatch_complete(int r
) {
314 dout(5) << "r=" << r
<< dendl
;
316 if (r
== -EBLACKLISTED
) {
317 dout(0) << "detected client is blacklisted" << dendl
;
319 std::lock_guard locker
{m_lock
};
320 m_blacklisted
= true;
322 } else if (r
== -ENOENT
) {
323 dout(5) << "mirroring directory deleted" << dendl
;
325 derr
<< "unexpected error re-registering mirroring directory watch: "
326 << cpp_strerror(r
) << dendl
;
329 schedule_refresh_images(5);
// Records a per-image notification in the pending added/removed sets under
// m_lock. Any earlier pending entry for the same image is erased from both
// sets first, then the image is inserted into one of them.
// NOTE(review): the third parameter (the `enabled` value logged below) and
// the conditions selecting between the two insert() calls are not visible
// in this view; presumably enabled -> added set, otherwise -> removed set.
// Confirm against the full file.
332 template <typename I
>
333 void PoolWatcher
<I
>::handle_image_updated(const std::string
&id
,
334 const std::string
&global_image_id
,
336 dout(10) << "image_id=" << id
<< ", "
337 << "global_image_id=" << global_image_id
<< ", "
338 << "enabled=" << enabled
<< dendl
;
340 std::lock_guard locker
{m_lock
};
// Note the argument order: ImageId is built from (global_image_id, id).
341 ImageId
image_id(global_image_id
, id
);
342 m_pending_added_image_ids
.erase(image_id
);
343 m_pending_removed_image_ids
.erase(image_id
);
346 m_pending_added_image_ids
.insert(image_id
);
349 m_pending_removed_image_ids
.insert(image_id
);
// Timer callback armed by schedule_refresh_images(). Requires the timer
// lock to be held and m_timer_ctx to be set (both asserted). It clears the
// timer handle, flags a refresh as in progress, and queues a context on the
// work queue so the remaining work runs outside the timer's lock.
// NOTE(review): the queued lambda's body, apart from finish_op(), is not
// visible in this view.
354 template <typename I
>
355 void PoolWatcher
<I
>::process_refresh_images() {
356 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
357 ceph_assert(m_timer_ctx
!= nullptr);
358 m_timer_ctx
= nullptr;
361 std::lock_guard locker
{m_lock
};
362 ceph_assert(!m_refresh_in_progress
);
363 m_refresh_in_progress
= true;
// This refresh satisfies any request deferred while one was in flight.
364 m_deferred_refresh
= false;
367 // execute outside of the timer's lock
368 m_async_op_tracker
.start_op();
369 Context
*ctx
= new LambdaContext([this](int r
) {
371 m_async_op_tracker
.finish_op();
373 m_threads
->work_queue
->queue(ctx
, 0);
// Marks updates as pending and, unless the watcher is shutting down, the
// image ids are invalid, or a notification is already in progress, queues a
// tracked context on the work queue and flags a notification as in
// progress. The caller must hold m_lock (asserted).
// NOTE(review): the early-exit statement inside the `if` and the queued
// lambda's body, apart from finish_op(), are not visible in this view;
// presumably the lambda calls notify_listener() -- confirm.
376 template <typename I
>
377 void PoolWatcher
<I
>::schedule_listener() {
378 ceph_assert(ceph_mutex_is_locked(m_lock
));
// Set unconditionally, i.e. before the check below.
379 m_pending_updates
= true;
380 if (m_shutting_down
|| m_image_ids_invalid
|| m_notify_listener_in_progress
) {
386 m_async_op_tracker
.start_op();
387 Context
*ctx
= new LambdaContext([this](int r
) {
389 m_async_op_tracker
.finish_op();
392 m_notify_listener_in_progress
= true;
393 m_threads
->work_queue
->queue(ctx
, 0);
// Computes which images were added or removed since the last notification
// and reports them to m_listener via handle_update().
// Visible flow: an initial locked section, an optional removal-only
// handle_update(), a second locked section that merges pending
// notifications and diffs m_image_ids against m_pending_image_ids, the main
// handle_update(), then a final locked section that clears the in-progress
// flag and checks m_pending_updates.
// NOTE(review): scope braces are not visible in this view. The existing
// comment below ("while we didn't own the lock") indicates the listener
// calls run with m_lock released, but confirm the lock scopes against the
// full file. Statements that fill the local mirror_uuid / removed_image_ids
// before the first handle_update() are also not visible.
396 template <typename I
>
397 void PoolWatcher
<I
>::notify_listener() {
400 std::string mirror_uuid
;
401 ImageIds added_image_ids
;
402 ImageIds removed_image_ids
;
404 std::lock_guard locker
{m_lock
};
405 ceph_assert(m_notify_listener_in_progress
);
// Removal-only notification: passes the local mirror_uuid with an empty
// added set, then resets the moved-from local for reuse below.
408 if (!removed_image_ids
.empty()) {
409 m_listener
.handle_update(mirror_uuid
, {}, std::move(removed_image_ids
));
410 removed_image_ids
.clear();
414 std::lock_guard locker
{m_lock
};
415 ceph_assert(m_notify_listener_in_progress
);
417 // if the watch failed while we didn't own the lock, we are going
418 // to need to perform a full refresh
419 if (m_image_ids_invalid
) {
420 m_notify_listener_in_progress
= false;
424 // merge add/remove notifications into pending set (a given image
425 // can only be in one set or another)
426 for (auto &image_id
: m_pending_removed_image_ids
) {
427 dout(20) << "image_id=" << image_id
<< dendl
;
428 m_pending_image_ids
.erase(image_id
);
// Erase-then-insert: presumably so that an existing entry which compares
// equal but carries a different id is replaced by the incoming one (see the
// it->id checks below) -- TODO confirm against ImageId's ordering.
431 for (auto &image_id
: m_pending_added_image_ids
) {
432 dout(20) << "image_id=" << image_id
<< dendl
;
433 m_pending_image_ids
.erase(image_id
);
434 m_pending_image_ids
.insert(image_id
);
436 m_pending_added_image_ids
.clear();
438 // compute added/removed images
// Removed: in m_image_ids but missing from the pending set, or found there
// with a different id.
439 for (auto &image_id
: m_image_ids
) {
440 auto it
= m_pending_image_ids
.find(image_id
);
441 if (it
== m_pending_image_ids
.end() || it
->id
!= image_id
.id
) {
442 removed_image_ids
.insert(image_id
);
// Added: in the pending set but missing from m_image_ids, or found there
// with a different id.
445 for (auto &image_id
: m_pending_image_ids
) {
446 auto it
= m_image_ids
.find(image_id
);
447 if (it
== m_image_ids
.end() || it
->id
!= image_id
.id
) {
448 added_image_ids
.insert(image_id
);
// The pending set becomes the current set; pending updates are consumed.
452 m_pending_updates
= false;
453 m_image_ids
= m_pending_image_ids
;
// Main notification; note this call uses the member m_mirror_uuid.
456 m_listener
.handle_update(m_mirror_uuid
, std::move(added_image_ids
),
457 std::move(removed_image_ids
));
460 std::lock_guard locker
{m_lock
};
461 m_notify_listener_in_progress
= false;
// Updates arrived while notifying; the body of this branch is not visible
// here (presumably it schedules another notification -- TODO confirm).
462 if (m_pending_updates
) {
468 } // namespace mirror
// Explicit instantiation of PoolWatcher for librbd::ImageCtx, so the
// template member definitions in this file are emitted for that type.
471 template class rbd::mirror::PoolWatcher
<librbd::ImageCtx
>;