]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
import ceph 16.2.6
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/stringify.h"
9f95a23c 5#include "common/Cond.h"
7c673cae
FG
6#include "common/Timer.h"
7#include "common/debug.h"
8#include "common/errno.h"
9#include "librbd/Utils.h"
f67539c2 10#include "librbd/asio/ContextWQ.h"
7c673cae
FG
11#include "ImageReplayer.h"
12#include "InstanceReplayer.h"
c07f9fc5 13#include "ServiceDaemon.h"
7c673cae
FG
14#include "Threads.h"
15
16#define dout_context g_ceph_context
17#define dout_subsys ceph_subsys_rbd_mirror
18#undef dout_prefix
19#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
20 << this << " " << __func__ << ": "
21
22namespace rbd {
23namespace mirror {
24
c07f9fc5
FG
namespace {

// Service-daemon attribute keys updated per pool namespace by
// start_image_replayers(): the number of image replayers owned by this
// instance, and how many of them report a warning / error health state.
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY{"image_assigned_count"};
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY{"image_warning_count"};
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY{"image_error_count"};

} // anonymous namespace
32
7c673cae
FG
33using librbd::util::create_async_context_callback;
34using librbd::util::create_context_callback;
35
36template <typename I>
37InstanceReplayer<I>::InstanceReplayer(
9f95a23c 38 librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
c07f9fc5 39 Threads<I> *threads, ServiceDaemon<I>* service_daemon,
9f95a23c
TL
40 MirrorStatusUpdater<I>* local_status_updater,
41 journal::CacheManagerHandler *cache_manager_handler,
42 PoolMetaCache* pool_meta_cache)
43 : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
44 m_threads(threads), m_service_daemon(service_daemon),
45 m_local_status_updater(local_status_updater),
46 m_cache_manager_handler(cache_manager_handler),
47 m_pool_meta_cache(pool_meta_cache),
48 m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
49 stringify(local_io_ctx.get_id()))) {
7c673cae
FG
50}
51
52template <typename I>
53InstanceReplayer<I>::~InstanceReplayer() {
11fdf7f2
TL
54 ceph_assert(m_image_state_check_task == nullptr);
55 ceph_assert(m_async_op_tracker.empty());
56 ceph_assert(m_image_replayers.empty());
7c673cae
FG
57}
58
9f95a23c 59template <typename I>
f67539c2 60bool InstanceReplayer<I>::is_blocklisted() const {
9f95a23c 61 std::lock_guard locker{m_lock};
f67539c2 62 return m_blocklisted;
9f95a23c
TL
63}
64
7c673cae
FG
65template <typename I>
66int InstanceReplayer<I>::init() {
67 C_SaferCond init_ctx;
68 init(&init_ctx);
69 return init_ctx.wait();
70}
71
72template <typename I>
73void InstanceReplayer<I>::init(Context *on_finish) {
11fdf7f2 74 dout(10) << dendl;
7c673cae 75
9f95a23c 76 Context *ctx = new LambdaContext(
7c673cae
FG
77 [this, on_finish] (int r) {
78 {
9f95a23c 79 std::lock_guard timer_locker{m_threads->timer_lock};
7c673cae
FG
80 schedule_image_state_check_task();
81 }
82 on_finish->complete(0);
83 });
84
85 m_threads->work_queue->queue(ctx, 0);
86}
87
88template <typename I>
89void InstanceReplayer<I>::shut_down() {
90 C_SaferCond shut_down_ctx;
91 shut_down(&shut_down_ctx);
92 int r = shut_down_ctx.wait();
11fdf7f2 93 ceph_assert(r == 0);
7c673cae
FG
94}
95
96template <typename I>
97void InstanceReplayer<I>::shut_down(Context *on_finish) {
11fdf7f2 98 dout(10) << dendl;
7c673cae 99
9f95a23c 100 std::lock_guard locker{m_lock};
7c673cae 101
11fdf7f2 102 ceph_assert(m_on_shut_down == nullptr);
7c673cae
FG
103 m_on_shut_down = on_finish;
104
9f95a23c 105 Context *ctx = new LambdaContext(
7c673cae
FG
106 [this] (int r) {
107 cancel_image_state_check_task();
108 wait_for_ops();
109 });
110
111 m_threads->work_queue->queue(ctx, 0);
112}
113
114template <typename I>
9f95a23c
TL
115void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
116 dout(10) << "peer=" << peer << dendl;
7c673cae 117
9f95a23c
TL
118 std::lock_guard locker{m_lock};
119 auto result = m_peers.insert(peer).second;
11fdf7f2 120 ceph_assert(result);
7c673cae
FG
121}
122
7c673cae
FG
123template <typename I>
124void InstanceReplayer<I>::release_all(Context *on_finish) {
11fdf7f2 125 dout(10) << dendl;
7c673cae 126
9f95a23c 127 std::lock_guard locker{m_lock};
7c673cae
FG
128
129 C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
130 for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
131 it = m_image_replayers.erase(it)) {
132 auto image_replayer = it->second;
133 auto ctx = gather_ctx->new_sub();
9f95a23c 134 ctx = new LambdaContext(
7c673cae
FG
135 [image_replayer, ctx] (int r) {
136 image_replayer->destroy();
137 ctx->complete(0);
138 });
139 stop_image_replayer(image_replayer, ctx);
140 }
141 gather_ctx->activate();
142}
143
144template <typename I>
31f18b77
FG
145void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
146 const std::string &global_image_id,
7c673cae 147 Context *on_finish) {
11fdf7f2 148 dout(10) << "global_image_id=" << global_image_id << dendl;
7c673cae 149
9f95a23c 150 std::lock_guard locker{m_lock};
7c673cae 151
11fdf7f2 152 ceph_assert(m_on_shut_down == nullptr);
7c673cae
FG
153
154 auto it = m_image_replayers.find(global_image_id);
7c673cae
FG
155 if (it == m_image_replayers.end()) {
156 auto image_replayer = ImageReplayer<I>::create(
9f95a23c
TL
157 m_local_io_ctx, m_local_mirror_uuid, global_image_id,
158 m_threads, instance_watcher, m_local_status_updater,
159 m_cache_manager_handler, m_pool_meta_cache);
7c673cae 160
11fdf7f2 161 dout(10) << global_image_id << ": creating replayer " << image_replayer
7c673cae
FG
162 << dendl;
163
164 it = m_image_replayers.insert(std::make_pair(global_image_id,
165 image_replayer)).first;
d2e6a577
FG
166
167 // TODO only a single peer is currently supported
11fdf7f2 168 ceph_assert(m_peers.size() == 1);
d2e6a577 169 auto peer = *m_peers.begin();
9f95a23c 170 image_replayer->add_peer(peer);
11fdf7f2
TL
171 start_image_replayer(image_replayer);
172 } else {
173 // A duplicate acquire notification implies (1) connection hiccup or
174 // (2) new leader election. For the second case, restart the replayer to
175 // detect if the image has been deleted while the leader was offline
176 auto& image_replayer = it->second;
177 image_replayer->set_finished(false);
1911f103 178 image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
7c673cae
FG
179 }
180
7c673cae
FG
181 m_threads->work_queue->queue(on_finish, 0);
182}
183
184template <typename I>
185void InstanceReplayer<I>::release_image(const std::string &global_image_id,
7c673cae 186 Context *on_finish) {
11fdf7f2 187 dout(10) << "global_image_id=" << global_image_id << dendl;
7c673cae 188
9f95a23c 189 std::lock_guard locker{m_lock};
11fdf7f2 190 ceph_assert(m_on_shut_down == nullptr);
7c673cae
FG
191
192 auto it = m_image_replayers.find(global_image_id);
7c673cae 193 if (it == m_image_replayers.end()) {
11fdf7f2 194 dout(5) << global_image_id << ": not found" << dendl;
7c673cae
FG
195 m_threads->work_queue->queue(on_finish, 0);
196 return;
197 }
198
199 auto image_replayer = it->second;
7c673cae
FG
200 m_image_replayers.erase(it);
201
9f95a23c 202 on_finish = new LambdaContext(
7c673cae
FG
203 [image_replayer, on_finish] (int r) {
204 image_replayer->destroy();
205 on_finish->complete(0);
206 });
d2e6a577
FG
207 stop_image_replayer(image_replayer, on_finish);
208}
7c673cae 209
d2e6a577
FG
210template <typename I>
211void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
212 const std::string &peer_mirror_uuid,
213 Context *on_finish) {
11fdf7f2 214 dout(10) << "global_image_id=" << global_image_id << ", "
d2e6a577 215 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
7c673cae 216
9f95a23c 217 std::lock_guard locker{m_lock};
11fdf7f2 218 ceph_assert(m_on_shut_down == nullptr);
d2e6a577
FG
219
220 auto it = m_image_replayers.find(global_image_id);
221 if (it != m_image_replayers.end()) {
222 // TODO only a single peer is currently supported, therefore
223 // we can just interrupt the current image replayer and
224 // it will eventually detect that the peer image is missing and
225 // determine if a delete propagation is required.
226 auto image_replayer = it->second;
1911f103 227 image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
d2e6a577
FG
228 }
229 m_threads->work_queue->queue(on_finish, 0);
7c673cae
FG
230}
231
232template <typename I>
9f95a23c 233void InstanceReplayer<I>::print_status(Formatter *f) {
11fdf7f2 234 dout(10) << dendl;
7c673cae 235
9f95a23c 236 std::lock_guard locker{m_lock};
7c673cae
FG
237
238 f->open_array_section("image_replayers");
239 for (auto &kv : m_image_replayers) {
240 auto &image_replayer = kv.second;
9f95a23c 241 image_replayer->print_status(f);
7c673cae
FG
242 }
243 f->close_section();
244}
245
246template <typename I>
247void InstanceReplayer<I>::start()
248{
11fdf7f2 249 dout(10) << dendl;
7c673cae 250
9f95a23c 251 std::lock_guard locker{m_lock};
7c673cae
FG
252
253 m_manual_stop = false;
254
1911f103
TL
255 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
256 auto gather_ctx = new C_Gather(
257 cct, new C_TrackedOp(m_async_op_tracker, nullptr));
7c673cae
FG
258 for (auto &kv : m_image_replayers) {
259 auto &image_replayer = kv.second;
1911f103 260 image_replayer->start(gather_ctx->new_sub(), true);
7c673cae 261 }
1911f103
TL
262
263 gather_ctx->activate();
7c673cae
FG
264}
265
266template <typename I>
267void InstanceReplayer<I>::stop()
268{
1911f103 269 stop(nullptr);
7c673cae
FG
270}
271
9f95a23c
TL
272template <typename I>
273void InstanceReplayer<I>::stop(Context *on_finish)
274{
275 dout(10) << dendl;
276
e306af50
TL
277 if (on_finish == nullptr) {
278 on_finish = new C_TrackedOp(m_async_op_tracker, on_finish);
279 } else {
280 on_finish = new LambdaContext(
281 [this, on_finish] (int r) {
282 m_async_op_tracker.wait_for_ops(on_finish);
283 });
284 }
285
9f95a23c 286 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
e306af50 287 auto gather_ctx = new C_Gather(cct, on_finish);
9f95a23c
TL
288 {
289 std::lock_guard locker{m_lock};
290
291 m_manual_stop = true;
292
293 for (auto &kv : m_image_replayers) {
294 auto &image_replayer = kv.second;
295 image_replayer->stop(gather_ctx->new_sub(), true);
296 }
297 }
298
299 gather_ctx->activate();
300}
301
7c673cae
FG
302template <typename I>
303void InstanceReplayer<I>::restart()
304{
11fdf7f2 305 dout(10) << dendl;
7c673cae 306
9f95a23c 307 std::lock_guard locker{m_lock};
7c673cae
FG
308
309 m_manual_stop = false;
310
311 for (auto &kv : m_image_replayers) {
312 auto &image_replayer = kv.second;
1911f103 313 image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
7c673cae
FG
314 }
315}
316
317template <typename I>
318void InstanceReplayer<I>::flush()
319{
11fdf7f2 320 dout(10) << dendl;
7c673cae 321
9f95a23c 322 std::lock_guard locker{m_lock};
7c673cae
FG
323
324 for (auto &kv : m_image_replayers) {
325 auto &image_replayer = kv.second;
326 image_replayer->flush();
327 }
328}
329
330template <typename I>
331void InstanceReplayer<I>::start_image_replayer(
332 ImageReplayer<I> *image_replayer) {
9f95a23c 333 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
334
335 std::string global_image_id = image_replayer->get_global_image_id();
7c673cae
FG
336 if (!image_replayer->is_stopped()) {
337 return;
f67539c2
TL
338 } else if (image_replayer->is_blocklisted()) {
339 derr << "global_image_id=" << global_image_id << ": blocklisted detected "
11fdf7f2 340 << "during image replay" << dendl;
f67539c2 341 m_blocklisted = true;
7c673cae 342 return;
d2e6a577
FG
343 } else if (image_replayer->is_finished()) {
344 // TODO temporary until policy integrated
345 dout(5) << "removing image replayer for global_image_id="
346 << global_image_id << dendl;
347 m_image_replayers.erase(image_replayer->get_global_image_id());
348 image_replayer->destroy();
349 return;
11fdf7f2
TL
350 } else if (m_manual_stop) {
351 return;
7c673cae
FG
352 }
353
11fdf7f2 354 dout(10) << "global_image_id=" << global_image_id << dendl;
1911f103 355 image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
7c673cae
FG
356}
357
358template <typename I>
c07f9fc5 359void InstanceReplayer<I>::queue_start_image_replayers() {
11fdf7f2 360 dout(10) << dendl;
7c673cae 361
c07f9fc5
FG
362 Context *ctx = create_context_callback<
363 InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
7c673cae
FG
364 m_async_op_tracker.start_op();
365 m_threads->work_queue->queue(ctx, 0);
366}
367
c07f9fc5
FG
368template <typename I>
369void InstanceReplayer<I>::start_image_replayers(int r) {
11fdf7f2 370 dout(10) << dendl;
c07f9fc5 371
9f95a23c 372 std::lock_guard locker{m_lock};
c07f9fc5 373 if (m_on_shut_down != nullptr) {
522d829b 374 m_async_op_tracker.finish_op();
c07f9fc5
FG
375 return;
376 }
377
d2e6a577
FG
378 uint64_t image_count = 0;
379 uint64_t warning_count = 0;
380 uint64_t error_count = 0;
381 for (auto it = m_image_replayers.begin();
382 it != m_image_replayers.end();) {
383 auto current_it(it);
384 ++it;
385
c07f9fc5 386 ++image_count;
d2e6a577 387 auto health_state = current_it->second->get_health_state();
c07f9fc5
FG
388 if (health_state == image_replayer::HEALTH_STATE_WARNING) {
389 ++warning_count;
390 } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
391 ++error_count;
392 }
393
d2e6a577 394 start_image_replayer(current_it->second);
c07f9fc5
FG
395 }
396
9f95a23c
TL
397 m_service_daemon->add_or_update_namespace_attribute(
398 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
399 SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
400 m_service_daemon->add_or_update_namespace_attribute(
401 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
402 SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
403 m_service_daemon->add_or_update_namespace_attribute(
404 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
405 SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
c07f9fc5
FG
406
407 m_async_op_tracker.finish_op();
408}
7c673cae
FG
409
410template <typename I>
411void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
412 Context *on_finish) {
11fdf7f2 413 dout(10) << image_replayer << " global_image_id="
7c673cae
FG
414 << image_replayer->get_global_image_id() << ", on_finish="
415 << on_finish << dendl;
416
417 if (image_replayer->is_stopped()) {
418 m_threads->work_queue->queue(on_finish, 0);
419 return;
420 }
421
422 m_async_op_tracker.start_op();
423 Context *ctx = create_async_context_callback(
9f95a23c 424 m_threads->work_queue, new LambdaContext(
7c673cae
FG
425 [this, image_replayer, on_finish] (int r) {
426 stop_image_replayer(image_replayer, on_finish);
427 m_async_op_tracker.finish_op();
428 }));
429
430 if (image_replayer->is_running()) {
431 image_replayer->stop(ctx, false);
432 } else {
433 int after = 1;
11fdf7f2 434 dout(10) << "scheduling image replayer " << image_replayer << " stop after "
7c673cae 435 << after << " sec (task " << ctx << ")" << dendl;
9f95a23c 436 ctx = new LambdaContext(
7c673cae 437 [this, after, ctx] (int r) {
9f95a23c 438 std::lock_guard timer_locker{m_threads->timer_lock};
7c673cae
FG
439 m_threads->timer->add_event_after(after, ctx);
440 });
441 m_threads->work_queue->queue(ctx, 0);
442 }
443}
444
445template <typename I>
446void InstanceReplayer<I>::wait_for_ops() {
11fdf7f2 447 dout(10) << dendl;
7c673cae
FG
448
449 Context *ctx = create_context_callback<
450 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
451
452 m_async_op_tracker.wait_for_ops(ctx);
453}
454
455template <typename I>
456void InstanceReplayer<I>::handle_wait_for_ops(int r) {
11fdf7f2 457 dout(10) << "r=" << r << dendl;
7c673cae 458
11fdf7f2 459 ceph_assert(r == 0);
7c673cae 460
9f95a23c 461 std::lock_guard locker{m_lock};
7c673cae
FG
462 stop_image_replayers();
463}
464
465template <typename I>
466void InstanceReplayer<I>::stop_image_replayers() {
11fdf7f2 467 dout(10) << dendl;
7c673cae 468
9f95a23c 469 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
470
471 Context *ctx = create_async_context_callback(
472 m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
473 &InstanceReplayer<I>::handle_stop_image_replayers>(this));
474
475 C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
476 for (auto &it : m_image_replayers) {
477 stop_image_replayer(it.second, gather_ctx->new_sub());
478 }
479 gather_ctx->activate();
480}
481
482template <typename I>
483void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
11fdf7f2 484 dout(10) << "r=" << r << dendl;
7c673cae 485
11fdf7f2 486 ceph_assert(r == 0);
7c673cae
FG
487
488 Context *on_finish = nullptr;
489 {
9f95a23c 490 std::lock_guard locker{m_lock};
7c673cae
FG
491
492 for (auto &it : m_image_replayers) {
11fdf7f2 493 ceph_assert(it.second->is_stopped());
7c673cae
FG
494 it.second->destroy();
495 }
496 m_image_replayers.clear();
497
11fdf7f2 498 ceph_assert(m_on_shut_down != nullptr);
7c673cae
FG
499 std::swap(on_finish, m_on_shut_down);
500 }
501 on_finish->complete(r);
502}
503
504template <typename I>
505void InstanceReplayer<I>::cancel_image_state_check_task() {
9f95a23c 506 std::lock_guard timer_locker{m_threads->timer_lock};
7c673cae
FG
507
508 if (m_image_state_check_task == nullptr) {
509 return;
510 }
511
11fdf7f2 512 dout(10) << m_image_state_check_task << dendl;
7c673cae 513 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
11fdf7f2 514 ceph_assert(canceled);
7c673cae
FG
515 m_image_state_check_task = nullptr;
516}
517
518template <typename I>
519void InstanceReplayer<I>::schedule_image_state_check_task() {
9f95a23c 520 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
11fdf7f2 521 ceph_assert(m_image_state_check_task == nullptr);
7c673cae 522
9f95a23c 523 m_image_state_check_task = new LambdaContext(
7c673cae 524 [this](int r) {
9f95a23c 525 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
7c673cae
FG
526 m_image_state_check_task = nullptr;
527 schedule_image_state_check_task();
c07f9fc5 528 queue_start_image_replayers();
7c673cae
FG
529 });
530
9f95a23c 531 auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
11fdf7f2 532 int after = cct->_conf.get_val<uint64_t>(
181888fb 533 "rbd_mirror_image_state_check_interval");
7c673cae 534
11fdf7f2 535 dout(10) << "scheduling image state check after " << after << " sec (task "
7c673cae
FG
536 << m_image_state_check_task << ")" << dendl;
537 m_threads->timer->add_event_after(after, m_image_state_check_task);
538}
539
540} // namespace mirror
541} // namespace rbd
542
543template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;