]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/PoolWatcher.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / tools / rbd_mirror / PoolWatcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "tools/rbd_mirror/PoolWatcher.h"
5#include "include/rbd_types.h"
6#include "cls/rbd/cls_rbd_client.h"
7#include "common/debug.h"
8#include "common/errno.h"
9#include "common/Timer.h"
10#include "librbd/ImageCtx.h"
11#include "librbd/internal.h"
12#include "librbd/MirroringWatcher.h"
13#include "librbd/Utils.h"
14#include "librbd/api/Image.h"
15#include "librbd/api/Mirror.h"
16#include "tools/rbd_mirror/Threads.h"
17#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
18#include <boost/bind.hpp>
19
20#define dout_context g_ceph_context
21#define dout_subsys ceph_subsys_rbd_mirror
22#undef dout_prefix
23#define dout_prefix *_dout << "rbd::mirror::PoolWatcher: " << this << " " \
24 << __func__ << ": "
25
26using std::list;
27using std::string;
28using std::unique_ptr;
29using std::vector;
30using librbd::util::create_context_callback;
31using librbd::util::create_rados_callback;
32
33namespace rbd {
34namespace mirror {
35
36template <typename I>
37class PoolWatcher<I>::MirroringWatcher : public librbd::MirroringWatcher<I> {
38public:
39 using ContextWQ = typename std::decay<
40 typename std::remove_pointer<
41 decltype(Threads<I>::work_queue)>::type>::type;
42
43 MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
44 PoolWatcher *pool_watcher)
45 : librbd::MirroringWatcher<I>(io_ctx, work_queue),
46 m_pool_watcher(pool_watcher) {
47 }
48
49 void handle_rewatch_complete(int r) override {
50 m_pool_watcher->handle_rewatch_complete(r);
51 }
52
53 void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) override {
54 // invalidate all image state and refresh the pool contents
55 m_pool_watcher->schedule_refresh_images(5);
56 }
57
58 void handle_image_updated(cls::rbd::MirrorImageState state,
59 const std::string &remote_image_id,
60 const std::string &global_image_id) override {
61 bool enabled = (state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
62 m_pool_watcher->handle_image_updated(remote_image_id, global_image_id,
63 enabled);
64 }
65
66private:
67 PoolWatcher *m_pool_watcher;
68};
69
70template <typename I>
71PoolWatcher<I>::PoolWatcher(Threads<I> *threads, librados::IoCtx &remote_io_ctx,
72 Listener &listener)
73 : m_threads(threads), m_remote_io_ctx(remote_io_ctx), m_listener(listener),
74 m_lock(librbd::util::unique_lock_name("rbd::mirror::PoolWatcher", this)) {
75 m_mirroring_watcher = new MirroringWatcher(m_remote_io_ctx,
76 m_threads->work_queue, this);
77}
78
79template <typename I>
80PoolWatcher<I>::~PoolWatcher() {
81 delete m_mirroring_watcher;
82}
83
84template <typename I>
85bool PoolWatcher<I>::is_blacklisted() const {
86 Mutex::Locker locker(m_lock);
87 return m_blacklisted;
88}
89
90template <typename I>
91void PoolWatcher<I>::init(Context *on_finish) {
92 dout(5) << dendl;
93
94 {
95 Mutex::Locker locker(m_lock);
96 m_on_init_finish = on_finish;
97
98 assert(!m_refresh_in_progress);
99 m_refresh_in_progress = true;
100 }
101
102 // start async updates for mirror image directory
103 register_watcher();
104}
105
106template <typename I>
107void PoolWatcher<I>::shut_down(Context *on_finish) {
108 dout(5) << dendl;
109
110 {
111 Mutex::Locker timer_locker(m_threads->timer_lock);
112 Mutex::Locker locker(m_lock);
113
114 assert(!m_shutting_down);
115 m_shutting_down = true;
116 if (m_timer_ctx != nullptr) {
117 m_threads->timer->cancel_event(m_timer_ctx);
118 m_timer_ctx = nullptr;
119 }
120 }
121
122 // in-progress unregister tracked as async op
123 unregister_watcher();
124
125 m_async_op_tracker.wait_for_ops(on_finish);
126}
127
128template <typename I>
129void PoolWatcher<I>::register_watcher() {
130 {
131 Mutex::Locker locker(m_lock);
132 assert(m_image_ids_invalid);
133 assert(m_refresh_in_progress);
134 }
135
136 // if the watch registration is in-flight, let the watcher
137 // handle the transition -- only (re-)register if it's not registered
138 if (!m_mirroring_watcher->is_unregistered()) {
139 refresh_images();
140 return;
141 }
142
143 // first time registering or the watch failed
144 dout(5) << dendl;
145 m_async_op_tracker.start_op();
146
147 Context *ctx = create_context_callback<
148 PoolWatcher, &PoolWatcher<I>::handle_register_watcher>(this);
149 m_mirroring_watcher->register_watch(ctx);
150}
151
152template <typename I>
153void PoolWatcher<I>::handle_register_watcher(int r) {
154 dout(5) << "r=" << r << dendl;
155
156 {
157 Mutex::Locker locker(m_lock);
158 assert(m_image_ids_invalid);
159 assert(m_refresh_in_progress);
160 if (r < 0) {
161 m_refresh_in_progress = false;
162 }
163 }
164
165 Context *on_init_finish = nullptr;
166 if (r >= 0) {
167 refresh_images();
168 } else if (r == -EBLACKLISTED) {
169 dout(0) << "detected client is blacklisted" << dendl;
170
171 Mutex::Locker locker(m_lock);
172 m_blacklisted = true;
173 std::swap(on_init_finish, m_on_init_finish);
174 } else if (r == -ENOENT) {
175 dout(5) << "mirroring directory does not exist" << dendl;
176 schedule_refresh_images(30);
177 } else {
178 derr << "unexpected error registering mirroring directory watch: "
179 << cpp_strerror(r) << dendl;
180 schedule_refresh_images(10);
181 }
182
183 m_async_op_tracker.finish_op();
184 if (on_init_finish != nullptr) {
185 on_init_finish->complete(r);
186 }
187}
188
189template <typename I>
190void PoolWatcher<I>::unregister_watcher() {
191 dout(5) << dendl;
192
193 m_async_op_tracker.start_op();
194 Context *ctx = new FunctionContext([this](int r) {
195 dout(5) << "unregister_watcher: r=" << r << dendl;
196 if (r < 0) {
197 derr << "error unregistering watcher for "
198 << m_mirroring_watcher->get_oid() << " object: " << cpp_strerror(r)
199 << dendl;
200 }
201 m_async_op_tracker.finish_op();
202 });
203
204 m_mirroring_watcher->unregister_watch(ctx);
205}
206
207template <typename I>
208void PoolWatcher<I>::refresh_images() {
209 dout(5) << dendl;
210
211 {
212 Mutex::Locker locker(m_lock);
213 assert(m_image_ids_invalid);
214 assert(m_refresh_in_progress);
215
216 // clear all pending notification events since we need to perform
217 // a full image list refresh
218 m_pending_added_image_ids.clear();
219 m_pending_removed_image_ids.clear();
220 }
221
222 m_async_op_tracker.start_op();
223 m_refresh_image_ids.clear();
224 Context *ctx = create_context_callback<
225 PoolWatcher, &PoolWatcher<I>::handle_refresh_images>(this);
226 auto req = pool_watcher::RefreshImagesRequest<I>::create(m_remote_io_ctx,
227 &m_refresh_image_ids,
228 ctx);
229 req->send();
230}
231
232template <typename I>
233void PoolWatcher<I>::handle_refresh_images(int r) {
234 dout(5) << "r=" << r << dendl;
235
236 bool retry_refresh = false;
237 Context *on_init_finish = nullptr;
238 {
239 Mutex::Locker locker(m_lock);
240 assert(m_image_ids_invalid);
241 assert(m_refresh_in_progress);
242
243 if (r >= 0) {
244 m_pending_image_ids = std::move(m_refresh_image_ids);
245 } else if (r == -EBLACKLISTED) {
246 dout(0) << "detected client is blacklisted during image refresh" << dendl;
247
248 m_blacklisted = true;
249 m_refresh_in_progress = false;
250 std::swap(on_init_finish, m_on_init_finish);
251 } else if (r == -ENOENT) {
252 dout(5) << "mirroring directory not found" << dendl;
253 m_pending_image_ids.clear();
254 r = 0;
255 } else {
256 m_refresh_in_progress = false;
257 retry_refresh = true;
258 }
259 }
260
261 if (retry_refresh) {
262 derr << "failed to retrieve mirroring directory: " << cpp_strerror(r)
263 << dendl;
264 schedule_refresh_images(10);
265 } else if (r >= 0) {
266 get_mirror_uuid();
267 return;
268 }
269
270 m_async_op_tracker.finish_op();
271 if (on_init_finish != nullptr) {
272 assert(r == -EBLACKLISTED);
273 on_init_finish->complete(r);
274 }
275}
276
277template <typename I>
278void PoolWatcher<I>::get_mirror_uuid() {
279 dout(5) << dendl;
280
281 librados::ObjectReadOperation op;
282 librbd::cls_client::mirror_uuid_get_start(&op);
283
284 m_out_bl.clear();
285 librados::AioCompletion *aio_comp = create_rados_callback<
286 PoolWatcher, &PoolWatcher<I>::handle_get_mirror_uuid>(this);
287 int r = m_remote_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op, &m_out_bl);
288 assert(r == 0);
289 aio_comp->release();
290}
291
292template <typename I>
293void PoolWatcher<I>::handle_get_mirror_uuid(int r) {
294 dout(5) << "r=" << r << dendl;
295
296 bool deferred_refresh = false;
297 bool retry_refresh = false;
298 Context *on_init_finish = nullptr;
299 {
300 Mutex::Locker locker(m_lock);
301 assert(m_image_ids_invalid);
302 assert(m_refresh_in_progress);
303 m_refresh_in_progress = false;
304
305 m_pending_mirror_uuid = "";
306 if (r >= 0) {
307 bufferlist::iterator it = m_out_bl.begin();
308 r = librbd::cls_client::mirror_uuid_get_finish(
309 &it, &m_pending_mirror_uuid);
310 }
311 if (r >= 0 && m_pending_mirror_uuid.empty()) {
312 r = -ENOENT;
313 }
314
315 if (m_deferred_refresh) {
316 // need to refresh -- skip the notification
317 deferred_refresh = true;
318 } else if (r >= 0) {
319 dout(10) << "mirror_uuid=" << m_pending_mirror_uuid << dendl;
320 m_image_ids_invalid = false;
321 std::swap(on_init_finish, m_on_init_finish);
322 schedule_listener();
323 } else if (r == -EBLACKLISTED) {
324 dout(0) << "detected client is blacklisted during image refresh" << dendl;
325
326 m_blacklisted = true;
327 std::swap(on_init_finish, m_on_init_finish);
328 } else if (r == -ENOENT) {
329 dout(5) << "mirroring uuid not found" << dendl;
330 std::swap(on_init_finish, m_on_init_finish);
331 retry_refresh = true;
332 } else {
333 retry_refresh = true;
334 }
335 }
336
337 if (deferred_refresh) {
338 dout(5) << "scheduling deferred refresh" << dendl;
339 schedule_refresh_images(0);
340 } else if (retry_refresh) {
341 derr << "failed to retrieve mirror uuid: " << cpp_strerror(r)
342 << dendl;
343 schedule_refresh_images(10);
344 }
345
346 m_async_op_tracker.finish_op();
347 if (on_init_finish != nullptr) {
348 on_init_finish->complete(r);
349 }
350}
351
352template <typename I>
353void PoolWatcher<I>::schedule_refresh_images(double interval) {
354 Mutex::Locker timer_locker(m_threads->timer_lock);
355 Mutex::Locker locker(m_lock);
356 if (m_shutting_down || m_refresh_in_progress || m_timer_ctx != nullptr) {
357 if (m_refresh_in_progress && !m_deferred_refresh) {
358 dout(5) << "deferring refresh until in-flight refresh completes" << dendl;
359 m_deferred_refresh = true;
360 }
361 return;
362 }
363
364 m_image_ids_invalid = true;
365 m_timer_ctx = new FunctionContext([this](int r) {
366 process_refresh_images();
367 });
368 m_threads->timer->add_event_after(interval, m_timer_ctx);
369}
370
371template <typename I>
372void PoolWatcher<I>::handle_rewatch_complete(int r) {
373 dout(5) << "r=" << r << dendl;
374
375 if (r == -EBLACKLISTED) {
376 dout(0) << "detected client is blacklisted" << dendl;
377
378 Mutex::Locker locker(m_lock);
379 m_blacklisted = true;
380 return;
381 } else if (r == -ENOENT) {
382 dout(5) << "mirroring directory deleted" << dendl;
383 } else if (r < 0) {
384 derr << "unexpected error re-registering mirroring directory watch: "
385 << cpp_strerror(r) << dendl;
386 }
387
388 schedule_refresh_images(5);
389}
390
391template <typename I>
392void PoolWatcher<I>::handle_image_updated(const std::string &remote_image_id,
393 const std::string &global_image_id,
394 bool enabled) {
395 dout(10) << "remote_image_id=" << remote_image_id << ", "
396 << "global_image_id=" << global_image_id << ", "
397 << "enabled=" << enabled << dendl;
398
399 Mutex::Locker locker(m_lock);
400 ImageId image_id(global_image_id, remote_image_id);
401 m_pending_added_image_ids.erase(image_id);
402 m_pending_removed_image_ids.erase(image_id);
403
404 if (enabled) {
405 m_pending_added_image_ids.insert(image_id);
406 schedule_listener();
407 } else {
408 m_pending_removed_image_ids.insert(image_id);
409 schedule_listener();
410 }
411}
412
413template <typename I>
414void PoolWatcher<I>::process_refresh_images() {
415 assert(m_threads->timer_lock.is_locked());
416 assert(m_timer_ctx != nullptr);
417 m_timer_ctx = nullptr;
418
419 {
420 Mutex::Locker locker(m_lock);
421 assert(!m_refresh_in_progress);
422 m_refresh_in_progress = true;
423 m_deferred_refresh = false;
424 }
425
426 // execute outside of the timer's lock
427 m_async_op_tracker.start_op();
428 Context *ctx = new FunctionContext([this](int r) {
429 register_watcher();
430 m_async_op_tracker.finish_op();
431 });
432 m_threads->work_queue->queue(ctx, 0);
433}
434
435template <typename I>
436void PoolWatcher<I>::schedule_listener() {
437 assert(m_lock.is_locked());
438 m_pending_updates = true;
439 if (m_shutting_down || m_image_ids_invalid || m_notify_listener_in_progress) {
440 return;
441 }
442
443 dout(20) << dendl;
444
445 m_async_op_tracker.start_op();
446 Context *ctx = new FunctionContext([this](int r) {
447 notify_listener();
448 m_async_op_tracker.finish_op();
449 });
450
451 m_notify_listener_in_progress = true;
452 m_threads->work_queue->queue(ctx, 0);
453}
454
455template <typename I>
456void PoolWatcher<I>::notify_listener() {
457 dout(10) << dendl;
458
459 std::string mirror_uuid;
460 ImageIds added_image_ids;
461 ImageIds removed_image_ids;
462 {
463 Mutex::Locker locker(m_lock);
464 assert(m_notify_listener_in_progress);
465
466 // if the mirror uuid is updated, treat it as the removal of all
467 // images in the pool
468 if (m_mirror_uuid != m_pending_mirror_uuid) {
469 if (!m_mirror_uuid.empty()) {
470 dout(0) << "mirror uuid updated:"
471 << "old=" << m_mirror_uuid << ", "
472 << "new=" << m_pending_mirror_uuid << dendl;
473 }
474
475 mirror_uuid = m_mirror_uuid;
476 removed_image_ids = std::move(m_image_ids);
477 m_image_ids.clear();
478 }
479 }
480
481 if (!removed_image_ids.empty()) {
482 m_listener.handle_update(mirror_uuid, {}, std::move(removed_image_ids));
483 removed_image_ids.clear();
484 }
485
486 {
487 Mutex::Locker locker(m_lock);
488 assert(m_notify_listener_in_progress);
489
490 // if the watch failed while we didn't own the lock, we are going
491 // to need to perform a full refresh
492 if (m_image_ids_invalid) {
493 m_notify_listener_in_progress = false;
494 return;
495 }
496
497 // merge add/remove notifications into pending set (a given image
498 // can only be in one set or another)
499 for (auto &image_id : m_pending_removed_image_ids) {
500 dout(20) << "image_id=" << image_id << dendl;
501 m_pending_image_ids.erase(image_id);
502 }
503
504 for (auto &image_id : m_pending_added_image_ids) {
505 dout(20) << "image_id=" << image_id << dendl;
506 m_pending_image_ids.erase(image_id);
507 m_pending_image_ids.insert(image_id);
508 }
509 m_pending_added_image_ids.clear();
510
511 // compute added/removed images
512 for (auto &image_id : m_image_ids) {
513 auto it = m_pending_image_ids.find(image_id);
514 if (it == m_pending_image_ids.end() || it->id != image_id.id) {
515 removed_image_ids.insert(image_id);
516 }
517 }
518 for (auto &image_id : m_pending_image_ids) {
519 auto it = m_image_ids.find(image_id);
520 if (it == m_image_ids.end() || it->id != image_id.id) {
521 added_image_ids.insert(image_id);
522 }
523 }
524
525 m_pending_updates = false;
526 m_image_ids = m_pending_image_ids;
527
528 m_mirror_uuid = m_pending_mirror_uuid;
529 mirror_uuid = m_mirror_uuid;
530 }
531
532 m_listener.handle_update(mirror_uuid, std::move(added_image_ids),
533 std::move(removed_image_ids));
534
535 {
536 Mutex::Locker locker(m_lock);
537 m_notify_listener_in_progress = false;
538 if (m_pending_updates) {
539 schedule_listener();
540 }
541 }
542}
543
544} // namespace mirror
545} // namespace rbd
546
// Explicit instantiation of the PoolWatcher template for librbd::ImageCtx.
547template class rbd::mirror::PoolWatcher<librbd::ImageCtx>;