]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/PoolWatcher.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / rbd_mirror / PoolWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "tools/rbd_mirror/PoolWatcher.h"
5 #include "include/rbd_types.h"
6 #include "cls/rbd/cls_rbd_client.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/Timer.h"
10 #include "librbd/ImageCtx.h"
11 #include "librbd/internal.h"
12 #include "librbd/MirroringWatcher.h"
13 #include "librbd/Utils.h"
14 #include "librbd/api/Image.h"
15 #include "librbd/api/Mirror.h"
16 #include "tools/rbd_mirror/Threads.h"
17 #include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
18 #include <boost/bind.hpp>
19
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rbd_mirror
22 #undef dout_prefix
23 #define dout_prefix *_dout << "rbd::mirror::PoolWatcher: " << this << " " \
24 << __func__ << ": "
25
26 using std::list;
27 using std::string;
28 using std::unique_ptr;
29 using std::vector;
30 using librbd::util::create_context_callback;
31 using librbd::util::create_rados_callback;
32
33 namespace rbd {
34 namespace mirror {
35
36 template <typename I>
37 class PoolWatcher<I>::MirroringWatcher : public librbd::MirroringWatcher<I> {
38 public:
39 using ContextWQ = typename std::decay<
40 typename std::remove_pointer<
41 decltype(Threads<I>::work_queue)>::type>::type;
42
43 MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
44 PoolWatcher *pool_watcher)
45 : librbd::MirroringWatcher<I>(io_ctx, work_queue),
46 m_pool_watcher(pool_watcher) {
47 }
48
49 void handle_rewatch_complete(int r) override {
50 m_pool_watcher->handle_rewatch_complete(r);
51 }
52
53 void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) override {
54 // invalidate all image state and refresh the pool contents
55 m_pool_watcher->schedule_refresh_images(5);
56 }
57
58 void handle_image_updated(cls::rbd::MirrorImageState state,
59 const std::string &image_id,
60 const std::string &global_image_id) override {
61 bool enabled = (state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
62 m_pool_watcher->handle_image_updated(image_id, global_image_id,
63 enabled);
64 }
65
66 private:
67 PoolWatcher *m_pool_watcher;
68 };
69
70 template <typename I>
71 PoolWatcher<I>::PoolWatcher(Threads<I> *threads,
72 librados::IoCtx &io_ctx,
73 const std::string& mirror_uuid,
74 pool_watcher::Listener &listener)
75 : m_threads(threads),
76 m_io_ctx(io_ctx),
77 m_mirror_uuid(mirror_uuid),
78 m_listener(listener),
79 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
80 "rbd::mirror::PoolWatcher", this))) {
81 m_mirroring_watcher = new MirroringWatcher(m_io_ctx,
82 m_threads->work_queue, this);
83 }
84
85 template <typename I>
86 PoolWatcher<I>::~PoolWatcher() {
87 delete m_mirroring_watcher;
88 }
89
90 template <typename I>
91 bool PoolWatcher<I>::is_blacklisted() const {
92 std::lock_guard locker{m_lock};
93 return m_blacklisted;
94 }
95
96 template <typename I>
97 void PoolWatcher<I>::init(Context *on_finish) {
98 dout(5) << dendl;
99
100 {
101 std::lock_guard locker{m_lock};
102 m_on_init_finish = on_finish;
103
104 ceph_assert(!m_refresh_in_progress);
105 m_refresh_in_progress = true;
106 }
107
108 // start async updates for mirror image directory
109 register_watcher();
110 }
111
112 template <typename I>
113 void PoolWatcher<I>::shut_down(Context *on_finish) {
114 dout(5) << dendl;
115
116 {
117 std::scoped_lock locker{m_threads->timer_lock, m_lock};
118
119 ceph_assert(!m_shutting_down);
120 m_shutting_down = true;
121 if (m_timer_ctx != nullptr) {
122 m_threads->timer->cancel_event(m_timer_ctx);
123 m_timer_ctx = nullptr;
124 }
125 }
126
127 // in-progress unregister tracked as async op
128 unregister_watcher();
129
130 m_async_op_tracker.wait_for_ops(on_finish);
131 }
132
133 template <typename I>
134 void PoolWatcher<I>::register_watcher() {
135 {
136 std::lock_guard locker{m_lock};
137 ceph_assert(m_image_ids_invalid);
138 ceph_assert(m_refresh_in_progress);
139 }
140
141 // if the watch registration is in-flight, let the watcher
142 // handle the transition -- only (re-)register if it's not registered
143 if (!m_mirroring_watcher->is_unregistered()) {
144 refresh_images();
145 return;
146 }
147
148 // first time registering or the watch failed
149 dout(5) << dendl;
150 m_async_op_tracker.start_op();
151
152 Context *ctx = create_context_callback<
153 PoolWatcher, &PoolWatcher<I>::handle_register_watcher>(this);
154 m_mirroring_watcher->register_watch(ctx);
155 }
156
157 template <typename I>
158 void PoolWatcher<I>::handle_register_watcher(int r) {
159 dout(5) << "r=" << r << dendl;
160
161 {
162 std::lock_guard locker{m_lock};
163 ceph_assert(m_image_ids_invalid);
164 ceph_assert(m_refresh_in_progress);
165 if (r < 0) {
166 m_refresh_in_progress = false;
167 }
168 }
169
170 Context *on_init_finish = nullptr;
171 if (r >= 0) {
172 refresh_images();
173 } else if (r == -EBLACKLISTED) {
174 dout(0) << "detected client is blacklisted" << dendl;
175
176 std::lock_guard locker{m_lock};
177 m_blacklisted = true;
178 std::swap(on_init_finish, m_on_init_finish);
179 } else if (r == -ENOENT) {
180 dout(5) << "mirroring directory does not exist" << dendl;
181 schedule_refresh_images(30);
182
183 std::lock_guard locker{m_lock};
184 std::swap(on_init_finish, m_on_init_finish);
185 } else {
186 derr << "unexpected error registering mirroring directory watch: "
187 << cpp_strerror(r) << dendl;
188 schedule_refresh_images(10);
189 }
190
191 m_async_op_tracker.finish_op();
192 if (on_init_finish != nullptr) {
193 on_init_finish->complete(r);
194 }
195 }
196
197 template <typename I>
198 void PoolWatcher<I>::unregister_watcher() {
199 dout(5) << dendl;
200
201 m_async_op_tracker.start_op();
202 Context *ctx = new LambdaContext([this](int r) {
203 dout(5) << "unregister_watcher: r=" << r << dendl;
204 if (r < 0) {
205 derr << "error unregistering watcher for "
206 << m_mirroring_watcher->get_oid() << " object: " << cpp_strerror(r)
207 << dendl;
208 }
209 m_async_op_tracker.finish_op();
210 });
211
212 m_mirroring_watcher->unregister_watch(ctx);
213 }
214
215 template <typename I>
216 void PoolWatcher<I>::refresh_images() {
217 dout(5) << dendl;
218
219 {
220 std::lock_guard locker{m_lock};
221 ceph_assert(m_image_ids_invalid);
222 ceph_assert(m_refresh_in_progress);
223
224 // clear all pending notification events since we need to perform
225 // a full image list refresh
226 m_pending_added_image_ids.clear();
227 m_pending_removed_image_ids.clear();
228 }
229
230 m_async_op_tracker.start_op();
231 m_refresh_image_ids.clear();
232 Context *ctx = create_context_callback<
233 PoolWatcher, &PoolWatcher<I>::handle_refresh_images>(this);
234 auto req = pool_watcher::RefreshImagesRequest<I>::create(m_io_ctx,
235 &m_refresh_image_ids,
236 ctx);
237 req->send();
238 }
239
240 template <typename I>
241 void PoolWatcher<I>::handle_refresh_images(int r) {
242 dout(5) << "r=" << r << dendl;
243
244 bool deferred_refresh = false;
245 bool retry_refresh = false;
246 Context *on_init_finish = nullptr;
247 {
248 std::lock_guard locker{m_lock};
249 ceph_assert(m_image_ids_invalid);
250 ceph_assert(m_refresh_in_progress);
251 m_refresh_in_progress = false;
252
253 if (r == -ENOENT) {
254 dout(5) << "mirroring directory not found" << dendl;
255 r = 0;
256 m_refresh_image_ids.clear();
257 }
258
259 if (m_deferred_refresh) {
260 // need to refresh -- skip the notification
261 deferred_refresh = true;
262 } else if (r >= 0) {
263 m_pending_image_ids = std::move(m_refresh_image_ids);
264 m_image_ids_invalid = false;
265 std::swap(on_init_finish, m_on_init_finish);
266
267 schedule_listener();
268 } else if (r == -EBLACKLISTED) {
269 dout(0) << "detected client is blacklisted during image refresh" << dendl;
270
271 m_blacklisted = true;
272 std::swap(on_init_finish, m_on_init_finish);
273 } else {
274 retry_refresh = true;
275 }
276 }
277
278 if (deferred_refresh) {
279 dout(5) << "scheduling deferred refresh" << dendl;
280 schedule_refresh_images(0);
281 } else if (retry_refresh) {
282 derr << "failed to retrieve mirroring directory: " << cpp_strerror(r)
283 << dendl;
284 schedule_refresh_images(10);
285 }
286
287 m_async_op_tracker.finish_op();
288 if (on_init_finish != nullptr) {
289 on_init_finish->complete(r);
290 }
291 }
292
293 template <typename I>
294 void PoolWatcher<I>::schedule_refresh_images(double interval) {
295 std::scoped_lock locker{m_threads->timer_lock, m_lock};
296 if (m_shutting_down || m_refresh_in_progress || m_timer_ctx != nullptr) {
297 if (m_refresh_in_progress && !m_deferred_refresh) {
298 dout(5) << "deferring refresh until in-flight refresh completes" << dendl;
299 m_deferred_refresh = true;
300 }
301 return;
302 }
303
304 m_image_ids_invalid = true;
305 m_timer_ctx = m_threads->timer->add_event_after(
306 interval,
307 new LambdaContext([this](int r) {
308 process_refresh_images();
309 }));
310 }
311
312 template <typename I>
313 void PoolWatcher<I>::handle_rewatch_complete(int r) {
314 dout(5) << "r=" << r << dendl;
315
316 if (r == -EBLACKLISTED) {
317 dout(0) << "detected client is blacklisted" << dendl;
318
319 std::lock_guard locker{m_lock};
320 m_blacklisted = true;
321 return;
322 } else if (r == -ENOENT) {
323 dout(5) << "mirroring directory deleted" << dendl;
324 } else if (r < 0) {
325 derr << "unexpected error re-registering mirroring directory watch: "
326 << cpp_strerror(r) << dendl;
327 }
328
329 schedule_refresh_images(5);
330 }
331
332 template <typename I>
333 void PoolWatcher<I>::handle_image_updated(const std::string &id,
334 const std::string &global_image_id,
335 bool enabled) {
336 dout(10) << "image_id=" << id << ", "
337 << "global_image_id=" << global_image_id << ", "
338 << "enabled=" << enabled << dendl;
339
340 std::lock_guard locker{m_lock};
341 ImageId image_id(global_image_id, id);
342 m_pending_added_image_ids.erase(image_id);
343 m_pending_removed_image_ids.erase(image_id);
344
345 if (enabled) {
346 m_pending_added_image_ids.insert(image_id);
347 schedule_listener();
348 } else {
349 m_pending_removed_image_ids.insert(image_id);
350 schedule_listener();
351 }
352 }
353
354 template <typename I>
355 void PoolWatcher<I>::process_refresh_images() {
356 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
357 ceph_assert(m_timer_ctx != nullptr);
358 m_timer_ctx = nullptr;
359
360 {
361 std::lock_guard locker{m_lock};
362 ceph_assert(!m_refresh_in_progress);
363 m_refresh_in_progress = true;
364 m_deferred_refresh = false;
365 }
366
367 // execute outside of the timer's lock
368 m_async_op_tracker.start_op();
369 Context *ctx = new LambdaContext([this](int r) {
370 register_watcher();
371 m_async_op_tracker.finish_op();
372 });
373 m_threads->work_queue->queue(ctx, 0);
374 }
375
376 template <typename I>
377 void PoolWatcher<I>::schedule_listener() {
378 ceph_assert(ceph_mutex_is_locked(m_lock));
379 m_pending_updates = true;
380 if (m_shutting_down || m_image_ids_invalid || m_notify_listener_in_progress) {
381 return;
382 }
383
384 dout(20) << dendl;
385
386 m_async_op_tracker.start_op();
387 Context *ctx = new LambdaContext([this](int r) {
388 notify_listener();
389 m_async_op_tracker.finish_op();
390 });
391
392 m_notify_listener_in_progress = true;
393 m_threads->work_queue->queue(ctx, 0);
394 }
395
396 template <typename I>
397 void PoolWatcher<I>::notify_listener() {
398 dout(10) << dendl;
399
400 std::string mirror_uuid;
401 ImageIds added_image_ids;
402 ImageIds removed_image_ids;
403 {
404 std::lock_guard locker{m_lock};
405 ceph_assert(m_notify_listener_in_progress);
406 }
407
408 if (!removed_image_ids.empty()) {
409 m_listener.handle_update(mirror_uuid, {}, std::move(removed_image_ids));
410 removed_image_ids.clear();
411 }
412
413 {
414 std::lock_guard locker{m_lock};
415 ceph_assert(m_notify_listener_in_progress);
416
417 // if the watch failed while we didn't own the lock, we are going
418 // to need to perform a full refresh
419 if (m_image_ids_invalid) {
420 m_notify_listener_in_progress = false;
421 return;
422 }
423
424 // merge add/remove notifications into pending set (a given image
425 // can only be in one set or another)
426 for (auto &image_id : m_pending_removed_image_ids) {
427 dout(20) << "image_id=" << image_id << dendl;
428 m_pending_image_ids.erase(image_id);
429 }
430
431 for (auto &image_id : m_pending_added_image_ids) {
432 dout(20) << "image_id=" << image_id << dendl;
433 m_pending_image_ids.erase(image_id);
434 m_pending_image_ids.insert(image_id);
435 }
436 m_pending_added_image_ids.clear();
437
438 // compute added/removed images
439 for (auto &image_id : m_image_ids) {
440 auto it = m_pending_image_ids.find(image_id);
441 if (it == m_pending_image_ids.end() || it->id != image_id.id) {
442 removed_image_ids.insert(image_id);
443 }
444 }
445 for (auto &image_id : m_pending_image_ids) {
446 auto it = m_image_ids.find(image_id);
447 if (it == m_image_ids.end() || it->id != image_id.id) {
448 added_image_ids.insert(image_id);
449 }
450 }
451
452 m_pending_updates = false;
453 m_image_ids = m_pending_image_ids;
454 }
455
456 m_listener.handle_update(m_mirror_uuid, std::move(added_image_ids),
457 std::move(removed_image_ids));
458
459 {
460 std::lock_guard locker{m_lock};
461 m_notify_listener_in_progress = false;
462 if (m_pending_updates) {
463 schedule_listener();
464 }
465 }
466 }
467
468 } // namespace mirror
469 } // namespace rbd
470
471 template class rbd::mirror::PoolWatcher<librbd::ImageCtx>;