]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/stringify.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "librbd/Utils.h"
10 #include "librbd/asio/ContextWQ.h"
11 #include "ImageReplayer.h"
12 #include "InstanceReplayer.h"
13 #include "ServiceDaemon.h"
14 #include "Threads.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
18 #undef dout_prefix
19 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
20 << this << " " << __func__ << ": "
21
22 namespace rbd {
23 namespace mirror {
24
25 namespace {
26
27 const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
28 const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
29 const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
30
31 } // anonymous namespace
32
33 using librbd::util::create_async_context_callback;
34 using librbd::util::create_context_callback;
35
// Constructs the per-pool instance replayer.  All pointer arguments are
// borrowed (stored as raw pointers) and are presumed to outlive this
// object -- TODO confirm against PoolReplayer ownership.
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
    MirrorStatusUpdater<I>* local_status_updater,
    journal::CacheManagerHandler *cache_manager_handler,
    PoolMetaCache* pool_meta_cache)
  : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
    m_threads(threads), m_service_daemon(service_daemon),
    m_local_status_updater(local_status_updater),
    m_cache_manager_handler(cache_manager_handler),
    m_pool_meta_cache(pool_meta_cache),
    // embed the pool id in the mutex name so lock reports distinguish
    // instances serving different pools
    m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
        stringify(local_io_ctx.get_id()))) {
}
51
52 template <typename I>
53 InstanceReplayer<I>::~InstanceReplayer() {
54 ceph_assert(m_image_state_check_task == nullptr);
55 ceph_assert(m_async_op_tracker.empty());
56 ceph_assert(m_image_replayers.empty());
57 }
58
59 template <typename I>
60 bool InstanceReplayer<I>::is_blocklisted() const {
61 std::lock_guard locker{m_lock};
62 return m_blocklisted;
63 }
64
65 template <typename I>
66 int InstanceReplayer<I>::init() {
67 C_SaferCond init_ctx;
68 init(&init_ctx);
69 return init_ctx.wait();
70 }
71
// Asynchronously initializes the replayer: arms the periodic image state
// check task and then completes on_finish (always with 0).  The work is
// bounced through the work queue, so the timer lock is acquired from the
// work queue context rather than the caller's.
template <typename I>
void InstanceReplayer<I>::init(Context *on_finish) {
  dout(10) << dendl;

  Context *ctx = new LambdaContext(
    [this, on_finish] (int r) {
      {
        // schedule_image_state_check_task() asserts the timer lock is held
        std::lock_guard timer_locker{m_threads->timer_lock};
        schedule_image_state_check_task();
      }
      // init cannot fail: ignore r and report success
      on_finish->complete(0);
    });

  m_threads->work_queue->queue(ctx, 0);
}
87
88 template <typename I>
89 void InstanceReplayer<I>::shut_down() {
90 C_SaferCond shut_down_ctx;
91 shut_down(&shut_down_ctx);
92 int r = shut_down_ctx.wait();
93 ceph_assert(r == 0);
94 }
95
// Begins an asynchronous shut down: cancel the periodic state check task,
// drain in-flight async ops, then stop and destroy every image replayer.
// on_finish is stored in m_on_shut_down and completed from
// handle_stop_image_replayers() when tear down finishes.
template <typename I>
void InstanceReplayer<I>::shut_down(Context *on_finish) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  // only a single shut down may be in flight
  ceph_assert(m_on_shut_down == nullptr);
  m_on_shut_down = on_finish;

  // defer via the work queue so the timer lock (taken inside
  // cancel_image_state_check_task()) is not acquired while m_lock is held
  Context *ctx = new LambdaContext(
    [this] (int r) {
      cancel_image_state_check_task();
      wait_for_ops();
    });

  m_threads->work_queue->queue(ctx, 0);
}
113
114 template <typename I>
115 void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
116 dout(10) << "peer=" << peer << dendl;
117
118 std::lock_guard locker{m_lock};
119 auto result = m_peers.insert(peer).second;
120 ceph_assert(result);
121 }
122
// Stops and destroys every tracked image replayer; on_finish fires once
// all of them have been stopped and destroyed.
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
  // unlink entries from the map as we walk it; each replayer object
  // itself stays alive until its async stop completes
  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
       it = m_image_replayers.erase(it)) {
    auto image_replayer = it->second;
    auto ctx = gather_ctx->new_sub();
    // chain destruction after the stop has finished, then complete the
    // gather sub-context
    ctx = new LambdaContext(
      [image_replayer, ctx] (int r) {
        image_replayer->destroy();
        ctx->complete(0);
      });
    stop_image_replayer(image_replayer, ctx);
  }
  gather_ctx->activate();
}
143
// Handles an acquire-image notification for global_image_id: creates and
// starts a replayer if one does not exist yet, otherwise restarts the
// existing one.  on_finish is queued immediately (completed with 0) --
// it does not wait for the replayer to actually start.
template <typename I>
void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
                                        const std::string &global_image_id,
                                        Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << dendl;

  std::lock_guard locker{m_lock};

  // new images must not be acquired once shut down has begun
  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    auto image_replayer = ImageReplayer<I>::create(
      m_local_io_ctx, m_local_mirror_uuid, global_image_id,
      m_threads, instance_watcher, m_local_status_updater,
      m_cache_manager_handler, m_pool_meta_cache);

    dout(10) << global_image_id << ": creating replayer " << image_replayer
             << dendl;

    it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                 image_replayer)).first;

    // TODO only a single peer is currently supported
    ceph_assert(m_peers.size() == 1);
    auto peer = *m_peers.begin();
    image_replayer->add_peer(peer);
    start_image_replayer(image_replayer);
  } else {
    // A duplicate acquire notification implies (1) connection hiccup or
    // (2) new leader election. For the second case, restart the replayer to
    // detect if the image has been deleted while the leader was offline
    auto& image_replayer = it->second;
    image_replayer->set_finished(false);
    // track the restart so shut down waits for it to complete
    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
  }

  m_threads->work_queue->queue(on_finish, 0);
}
183
// Handles a release-image notification: the replayer for global_image_id
// (if any) is unlinked from the map immediately; on_finish completes
// after the replayer has been stopped and destroyed.
template <typename I>
void InstanceReplayer<I>::release_image(const std::string &global_image_id,
                                        Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    // nothing to release -- still complete the caller's context
    dout(5) << global_image_id << ": not found" << dendl;
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  auto image_replayer = it->second;
  m_image_replayers.erase(it);

  // destroy the replayer only after its async stop has finished, then
  // invoke the original on_finish
  on_finish = new LambdaContext(
    [image_replayer, on_finish] (int r) {
      image_replayer->destroy();
      on_finish->complete(0);
    });
  stop_image_replayer(image_replayer, on_finish);
}
209
// Handles notification that the peer's copy of global_image_id was
// removed.  The local replayer (if any) is restarted so it can notice
// the missing peer image; on_finish is queued immediately.
template <typename I>
void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
                                            const std::string &peer_mirror_uuid,
                                            Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << ", "
           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it != m_image_replayers.end()) {
    // TODO only a single peer is currently supported, therefore
    // we can just interrupt the current image replayer and
    // it will eventually detect that the peer image is missing and
    // determine if a delete propagation is required.
    auto image_replayer = it->second;
    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
  }
  m_threads->work_queue->queue(on_finish, 0);
}
231
232 template <typename I>
233 void InstanceReplayer<I>::print_status(Formatter *f) {
234 dout(10) << dendl;
235
236 std::lock_guard locker{m_lock};
237
238 f->open_array_section("image_replayers");
239 for (auto &kv : m_image_replayers) {
240 auto &image_replayer = kv.second;
241 image_replayer->print_status(f);
242 }
243 f->close_section();
244 }
245
246 template <typename I>
247 void InstanceReplayer<I>::start()
248 {
249 dout(10) << dendl;
250
251 std::lock_guard locker{m_lock};
252
253 m_manual_stop = false;
254
255 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
256 auto gather_ctx = new C_Gather(
257 cct, new C_TrackedOp(m_async_op_tracker, nullptr));
258 for (auto &kv : m_image_replayers) {
259 auto &image_replayer = kv.second;
260 image_replayer->start(gather_ctx->new_sub(), true);
261 }
262
263 gather_ctx->activate();
264 }
265
266 template <typename I>
267 void InstanceReplayer<I>::stop()
268 {
269 stop(nullptr);
270 }
271
// Manually stops every image replayer.  With a null on_finish the stop is
// only tracked via m_async_op_tracker (C_TrackedOp wrapping a null
// context); otherwise on_finish fires after all replayers have stopped
// AND all pending tracked async ops have drained.
template <typename I>
void InstanceReplayer<I>::stop(Context *on_finish)
{
  dout(10) << dendl;

  if (on_finish == nullptr) {
    // on_finish is null here: C_TrackedOp only marks the op finished
    on_finish = new C_TrackedOp(m_async_op_tracker, on_finish);
  } else {
    on_finish = new LambdaContext(
      [this, on_finish] (int r) {
        m_async_op_tracker.wait_for_ops(on_finish);
      });
  }

  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
  auto gather_ctx = new C_Gather(cct, on_finish);
  {
    std::lock_guard locker{m_lock};

    // suppress auto-restart from the periodic state check
    m_manual_stop = true;

    for (auto &kv : m_image_replayers) {
      auto &image_replayer = kv.second;
      image_replayer->stop(gather_ctx->new_sub(), true);
    }
  }

  // activate outside m_lock; sub-contexts may already have completed
  gather_ctx->activate();
}
301
302 template <typename I>
303 void InstanceReplayer<I>::restart()
304 {
305 dout(10) << dendl;
306
307 std::lock_guard locker{m_lock};
308
309 m_manual_stop = false;
310
311 for (auto &kv : m_image_replayers) {
312 auto &image_replayer = kv.second;
313 image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
314 }
315 }
316
317 template <typename I>
318 void InstanceReplayer<I>::flush()
319 {
320 dout(10) << dendl;
321
322 std::lock_guard locker{m_lock};
323
324 for (auto &kv : m_image_replayers) {
325 auto &image_replayer = kv.second;
326 image_replayer->flush();
327 }
328 }
329
// Starts image_replayer unless its current state forbids it.  Must be
// called with m_lock held.  NOTE: in the "finished" case the replayer is
// erased from m_image_replayers and destroyed, so callers iterating the
// map must not rely on the current entry remaining valid.
template <typename I>
void InstanceReplayer<I>::start_image_replayer(
    ImageReplayer<I> *image_replayer) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  std::string global_image_id = image_replayer->get_global_image_id();
  if (!image_replayer->is_stopped()) {
    // already running (or starting/stopping) -- nothing to do
    return;
  } else if (image_replayer->is_blocklisted()) {
    // remember the blocklisting so is_blocklisted() reports it upward
    derr << "global_image_id=" << global_image_id << ": blocklisted detected "
         << "during image replay" << dendl;
    m_blocklisted = true;
    return;
  } else if (image_replayer->is_finished()) {
    // TODO temporary until policy integrated
    dout(5) << "removing image replayer for global_image_id="
            << global_image_id << dendl;
    m_image_replayers.erase(image_replayer->get_global_image_id());
    image_replayer->destroy();
    return;
  } else if (m_manual_stop) {
    // the administrator stopped replay; do not auto-restart
    return;
  }

  dout(10) << "global_image_id=" << global_image_id << dendl;
  image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
}
357
358 template <typename I>
359 void InstanceReplayer<I>::queue_start_image_replayers() {
360 dout(10) << dendl;
361
362 Context *ctx = create_context_callback<
363 InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
364 m_async_op_tracker.start_op();
365 m_threads->work_queue->queue(ctx, 0);
366 }
367
// Periodic (and queued) pass over all image replayers: restarts any that
// are stopped-but-eligible and publishes per-namespace health counters to
// the service daemon.  Balances the start_op() taken in
// queue_start_image_replayers().
template <typename I>
void InstanceReplayer<I>::start_image_replayers(int r) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};
  if (m_on_shut_down != nullptr) {
    // shutting down -- just release the tracked op
    m_async_op_tracker.finish_op();
    return;
  }

  uint64_t image_count = 0;
  uint64_t warning_count = 0;
  uint64_t error_count = 0;
  for (auto it = m_image_replayers.begin();
       it != m_image_replayers.end();) {
    // advance before the call: start_image_replayer() may erase the
    // current entry when the replayer is finished
    auto current_it(it);
    ++it;

    ++image_count;
    auto health_state = current_it->second->get_health_state();
    if (health_state == image_replayer::HEALTH_STATE_WARNING) {
      ++warning_count;
    } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
      ++error_count;
    }

    start_image_replayer(current_it->second);
  }

  // publish assigned/warning/error counts for this pool namespace
  m_service_daemon->add_or_update_namespace_attribute(
    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
    SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
  m_service_daemon->add_or_update_namespace_attribute(
    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
    SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
  m_service_daemon->add_or_update_namespace_attribute(
    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
    SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);

  m_async_op_tracker.finish_op();
}
409
// Drives image_replayer to the stopped state and then completes
// on_finish.  If the replayer is running it is stopped directly; if it is
// in some transitional state (neither stopped nor running) the attempt is
// retried after a 1 second timer delay.
template <typename I>
void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                              Context *on_finish) {
  dout(10) << image_replayer << " global_image_id="
           << image_replayer->get_global_image_id() << ", on_finish="
           << on_finish << dendl;

  if (image_replayer->is_stopped()) {
    // already stopped -- complete immediately via the work queue
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  // each (re)attempt is tracked so shut down waits for it
  m_async_op_tracker.start_op();
  Context *ctx = create_async_context_callback(
    m_threads->work_queue, new LambdaContext(
      [this, image_replayer, on_finish] (int r) {
        // re-enter until is_stopped() finally holds
        stop_image_replayer(image_replayer, on_finish);
        m_async_op_tracker.finish_op();
      }));

  if (image_replayer->is_running()) {
    image_replayer->stop(ctx, false);
  } else {
    // transitional state: re-check after a short delay
    int after = 1;
    dout(10) << "scheduling image replayer " << image_replayer << " stop after "
             << after << " sec (task " << ctx << ")" << dendl;
    ctx = new LambdaContext(
      [this, after, ctx] (int r) {
        // the timer requires its own lock; arm the retry from the work queue
        std::lock_guard timer_locker{m_threads->timer_lock};
        m_threads->timer->add_event_after(after, ctx);
      });
    m_threads->work_queue->queue(ctx, 0);
  }
}
444
445 template <typename I>
446 void InstanceReplayer<I>::wait_for_ops() {
447 dout(10) << dendl;
448
449 Context *ctx = create_context_callback<
450 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
451
452 m_async_op_tracker.wait_for_ops(ctx);
453 }
454
455 template <typename I>
456 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
457 dout(10) << "r=" << r << dendl;
458
459 ceph_assert(r == 0);
460
461 std::lock_guard locker{m_lock};
462 stop_image_replayers();
463 }
464
// Shut down stage: stop every remaining image replayer and fan the
// completions into handle_stop_image_replayers().  Must be called with
// m_lock held.
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  // async callback ensures the handler runs from the work queue rather
  // than inline under the gather completion
  Context *ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
      &InstanceReplayer<I>::handle_stop_image_replayers>(this));

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
  for (auto &it : m_image_replayers) {
    stop_image_replayer(it.second, gather_ctx->new_sub());
  }
  gather_ctx->activate();
}
481
// Final shut down stage: every replayer is stopped, so destroy them all,
// clear the map and complete the stored shut down context.  The context
// is completed outside m_lock to avoid re-entrant locking from the
// callback.
template <typename I>
void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
  dout(10) << "r=" << r << dendl;

  ceph_assert(r == 0);

  Context *on_finish = nullptr;
  {
    std::lock_guard locker{m_lock};

    for (auto &it : m_image_replayers) {
      ceph_assert(it.second->is_stopped());
      it.second->destroy();
    }
    m_image_replayers.clear();

    // claim the shut down context while still under the lock
    ceph_assert(m_on_shut_down != nullptr);
    std::swap(on_finish, m_on_shut_down);
  }
  on_finish->complete(r);
}
503
504 template <typename I>
505 void InstanceReplayer<I>::cancel_image_state_check_task() {
506 std::lock_guard timer_locker{m_threads->timer_lock};
507
508 if (m_image_state_check_task == nullptr) {
509 return;
510 }
511
512 dout(10) << m_image_state_check_task << dendl;
513 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
514 ceph_assert(canceled);
515 m_image_state_check_task = nullptr;
516 }
517
// Arms the periodic image-state check.  Requires the timer lock and no
// already-armed task.  The task reschedules itself on every firing and
// queues a start_image_replayers() pass; the interval is read from the
// rbd_mirror_image_state_check_interval config option each time, so
// config changes take effect on the next rearm.
template <typename I>
void InstanceReplayer<I>::schedule_image_state_check_task() {
  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
  ceph_assert(m_image_state_check_task == nullptr);

  m_image_state_check_task = new LambdaContext(
    [this](int r) {
      // timer callbacks run with the timer lock held
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      m_image_state_check_task = nullptr;
      schedule_image_state_check_task();
      queue_start_image_replayers();
    });

  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
  int after = cct->_conf.get_val<uint64_t>(
    "rbd_mirror_image_state_check_interval");

  dout(10) << "scheduling image state check after " << after << " sec (task "
           << m_image_state_check_task << ")" << dendl;
  m_threads->timer->add_event_after(after, m_image_state_check_task);
}
539
540 } // namespace mirror
541 } // namespace rbd
542
543 template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;