]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageReplayer.cc
import 15.2.2 octopus source
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "common/WorkQueue.h"
13 #include "global/global_context.h"
14 #include "journal/Journaler.h"
15 #include "librbd/ExclusiveLock.h"
16 #include "librbd/ImageCtx.h"
17 #include "librbd/ImageState.h"
18 #include "librbd/Journal.h"
19 #include "librbd/Operations.h"
20 #include "librbd/Utils.h"
21 #include "ImageDeleter.h"
22 #include "ImageReplayer.h"
23 #include "MirrorStatusUpdater.h"
24 #include "Threads.h"
25 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27 #include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28 #include "tools/rbd_mirror/image_replayer/Utils.h"
29 #include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30 #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
31 #include <map>
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
35 #undef dout_prefix
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39 extern PerfCounters *g_perf_counters;
40
41 namespace rbd {
42 namespace mirror {
43
44 using librbd::util::create_context_callback;
45
46 template <typename I>
47 std::ostream &operator<<(std::ostream &os,
48 const typename ImageReplayer<I>::State &state);
49
50 namespace {
51
52 template <typename I>
53 class ImageReplayerAdminSocketCommand {
54 public:
55 ImageReplayerAdminSocketCommand(const std::string &desc,
56 ImageReplayer<I> *replayer)
57 : desc(desc), replayer(replayer) {
58 }
59 virtual ~ImageReplayerAdminSocketCommand() {}
60 virtual int call(Formatter *f) = 0;
61
62 std::string desc;
63 ImageReplayer<I> *replayer;
64 bool registered = false;
65 };
66
67 template <typename I>
68 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
69 public:
70 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
71 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
72 }
73
74 int call(Formatter *f) override {
75 this->replayer->print_status(f);
76 return 0;
77 }
78 };
79
80 template <typename I>
81 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
82 public:
83 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
84 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
85 }
86
87 int call(Formatter *f) override {
88 this->replayer->start(nullptr, true);
89 return 0;
90 }
91 };
92
93 template <typename I>
94 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
95 public:
96 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
97 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
98 }
99
100 int call(Formatter *f) override {
101 this->replayer->stop(nullptr, true);
102 return 0;
103 }
104 };
105
106 template <typename I>
107 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
108 public:
109 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
110 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
111 }
112
113 int call(Formatter *f) override {
114 this->replayer->restart();
115 return 0;
116 }
117 };
118
119 template <typename I>
120 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
121 public:
122 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
123 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
124 }
125
126 int call(Formatter *f) override {
127 this->replayer->flush();
128 return 0;
129 }
130 };
131
132 template <typename I>
133 class ImageReplayerAdminSocketHook : public AdminSocketHook {
134 public:
135 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
136 ImageReplayer<I> *replayer)
137 : admin_socket(cct->get_admin_socket()),
138 commands{{"rbd mirror flush " + name,
139 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
140 {"rbd mirror restart " + name,
141 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
142 {"rbd mirror start " + name,
143 new StartCommand<I>("start rbd mirror " + name, replayer)},
144 {"rbd mirror status " + name,
145 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
146 {"rbd mirror stop " + name,
147 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
148 }
149
150 int register_commands() {
151 for (auto &it : commands) {
152 int r = admin_socket->register_command(it.first, this,
153 it.second->desc);
154 if (r < 0) {
155 return r;
156 }
157 it.second->registered = true;
158 }
159 return 0;
160 }
161
162 ~ImageReplayerAdminSocketHook() override {
163 admin_socket->unregister_commands(this);
164 for (auto &it : commands) {
165 delete it.second;
166 }
167 commands.clear();
168 }
169
170 int call(std::string_view command, const cmdmap_t& cmdmap,
171 Formatter *f,
172 std::ostream& errss,
173 bufferlist& out) override {
174 auto i = commands.find(command);
175 ceph_assert(i != commands.end());
176 return i->second->call(f);
177 }
178
179 private:
180 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
181 std::less<>> Commands;
182
183 AdminSocket *admin_socket;
184 Commands commands;
185 };
186
187 } // anonymous namespace
188
189 template <typename I>
190 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
191 const std::string &description, bool flush)
192 {
193 const std::string desc = "bootstrapping, " + description;
194 replayer->set_state_description(0, desc);
195 if (flush) {
196 replayer->update_mirror_image_status(false, boost::none);
197 }
198 }
199
200 template <typename I>
201 struct ImageReplayer<I>::ReplayerListener
202 : public image_replayer::ReplayerListener {
203 ImageReplayer<I>* image_replayer;
204
205 ReplayerListener(ImageReplayer<I>* image_replayer)
206 : image_replayer(image_replayer) {
207 }
208
209 void handle_notification() override {
210 image_replayer->handle_replayer_notification();
211 }
212 };
213
214 template <typename I>
215 ImageReplayer<I>::ImageReplayer(
216 librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
217 const std::string &global_image_id, Threads<I> *threads,
218 InstanceWatcher<I> *instance_watcher,
219 MirrorStatusUpdater<I>* local_status_updater,
220 journal::CacheManagerHandler *cache_manager_handler,
221 PoolMetaCache* pool_meta_cache) :
222 m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
223 m_global_image_id(global_image_id), m_threads(threads),
224 m_instance_watcher(instance_watcher),
225 m_local_status_updater(local_status_updater),
226 m_cache_manager_handler(cache_manager_handler),
227 m_pool_meta_cache(pool_meta_cache),
228 m_local_image_name(global_image_id),
229 m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
230 stringify(local_io_ctx.get_id()) + " " + global_image_id)),
231 m_progress_cxt(this),
232 m_replayer_listener(new ReplayerListener(this))
233 {
234 // Register asok commands using a temporary "remote_pool_name/global_image_id"
235 // name. When the image name becomes known on start the asok commands will be
236 // re-registered using "remote_pool_name/remote_image_name" name.
237
238 m_image_spec = image_replayer::util::compute_image_spec(
239 local_io_ctx, global_image_id);
240 register_admin_socket_hook();
241 }
242
243 template <typename I>
244 ImageReplayer<I>::~ImageReplayer()
245 {
246 unregister_admin_socket_hook();
247 ceph_assert(m_state_builder == nullptr);
248 ceph_assert(m_on_start_finish == nullptr);
249 ceph_assert(m_on_stop_finish == nullptr);
250 ceph_assert(m_bootstrap_request == nullptr);
251 ceph_assert(m_update_status_task == nullptr);
252 delete m_replayer_listener;
253 }
254
255 template <typename I>
256 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
257 std::lock_guard locker{m_lock};
258
259 if (!m_mirror_image_status_state) {
260 return image_replayer::HEALTH_STATE_OK;
261 } else if (*m_mirror_image_status_state ==
262 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
263 *m_mirror_image_status_state ==
264 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
265 return image_replayer::HEALTH_STATE_WARNING;
266 }
267 return image_replayer::HEALTH_STATE_ERROR;
268 }
269
270 template <typename I>
271 void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
272 dout(10) << "peer=" << peer << dendl;
273
274 std::lock_guard locker{m_lock};
275 auto it = m_peers.find(peer);
276 if (it == m_peers.end()) {
277 m_peers.insert(peer);
278 }
279 }
280
281 template <typename I>
282 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
283 dout(10) << "r=" << r << ", desc=" << desc << dendl;
284
285 std::lock_guard l{m_lock};
286 m_last_r = r;
287 m_state_desc = desc;
288 }
289
290 template <typename I>
291 void ImageReplayer<I>::start(Context *on_finish, bool manual)
292 {
293 dout(10) << "on_finish=" << on_finish << dendl;
294
295 int r = 0;
296 {
297 std::lock_guard locker{m_lock};
298 if (!is_stopped_()) {
299 derr << "already running" << dendl;
300 r = -EINVAL;
301 } else if (m_manual_stop && !manual) {
302 dout(5) << "stopped manually, ignoring start without manual flag"
303 << dendl;
304 r = -EPERM;
305 } else {
306 m_state = STATE_STARTING;
307 m_last_r = 0;
308 m_state_desc.clear();
309 m_manual_stop = false;
310 m_delete_requested = false;
311
312 if (on_finish != nullptr) {
313 ceph_assert(m_on_start_finish == nullptr);
314 m_on_start_finish = on_finish;
315 }
316 ceph_assert(m_on_stop_finish == nullptr);
317 }
318 }
319
320 if (r < 0) {
321 if (on_finish) {
322 on_finish->complete(r);
323 }
324 return;
325 }
326
327 bootstrap();
328 }
329
330 template <typename I>
331 void ImageReplayer<I>::bootstrap() {
332 dout(10) << dendl;
333
334 std::unique_lock locker{m_lock};
335 if (m_peers.empty()) {
336 locker.unlock();
337
338 dout(5) << "no peer clusters" << dendl;
339 on_start_fail(-ENOENT, "no peer clusters");
340 return;
341 }
342
343 // TODO need to support multiple remote images
344 ceph_assert(!m_peers.empty());
345 m_remote_image_peer = *m_peers.begin();
346
347 if (on_start_interrupted(m_lock)) {
348 return;
349 }
350
351 ceph_assert(m_state_builder == nullptr);
352 auto ctx = create_context_callback<
353 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
354 auto request = image_replayer::BootstrapRequest<I>::create(
355 m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
356 m_global_image_id, m_local_mirror_uuid,
357 m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
358 m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested,
359 ctx);
360
361 request->get();
362 m_bootstrap_request = request;
363 locker.unlock();
364
365 update_mirror_image_status(false, boost::none);
366 request->send();
367 }
368
369 template <typename I>
370 void ImageReplayer<I>::handle_bootstrap(int r) {
371 dout(10) << "r=" << r << dendl;
372 {
373 std::lock_guard locker{m_lock};
374 m_bootstrap_request->put();
375 m_bootstrap_request = nullptr;
376 }
377
378 if (on_start_interrupted()) {
379 return;
380 } else if (r == -ENOMSG) {
381 dout(5) << "local image is primary" << dendl;
382 on_start_fail(0, "local image is primary");
383 return;
384 } else if (r == -EREMOTEIO) {
385 dout(5) << "remote image is non-primary" << dendl;
386 on_start_fail(-EREMOTEIO, "remote image is non-primary");
387 return;
388 } else if (r == -EEXIST) {
389 on_start_fail(r, "split-brain detected");
390 return;
391 } else if (r == -ENOLINK) {
392 m_delete_requested = true;
393 on_start_fail(0, "remote image no longer exists");
394 return;
395 } else if (r < 0) {
396 on_start_fail(r, "error bootstrapping replay");
397 return;
398 } else if (m_resync_requested) {
399 on_start_fail(0, "resync requested");
400 return;
401 }
402
403 start_replay();
404 }
405
406 template <typename I>
407 void ImageReplayer<I>::start_replay() {
408 dout(10) << dendl;
409
410 std::unique_lock locker{m_lock};
411 ceph_assert(m_replayer == nullptr);
412 m_replayer = m_state_builder->create_replayer(m_threads, m_instance_watcher,
413 m_local_mirror_uuid,
414 m_pool_meta_cache,
415 m_replayer_listener);
416
417 auto ctx = create_context_callback<
418 ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
419 m_replayer->init(ctx);
420 }
421
422 template <typename I>
423 void ImageReplayer<I>::handle_start_replay(int r) {
424 dout(10) << "r=" << r << dendl;
425
426 if (on_start_interrupted()) {
427 return;
428 } else if (r < 0) {
429 std::string error_description = m_replayer->get_error_description();
430 if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
431 std::unique_lock locker{m_lock};
432 m_resync_requested = true;
433 }
434
435 // shut down not required if init failed
436 m_replayer->destroy();
437 m_replayer = nullptr;
438
439 derr << "error starting replay: " << cpp_strerror(r) << dendl;
440 on_start_fail(r, error_description);
441 return;
442 }
443
444 Context *on_finish = nullptr;
445 {
446 std::unique_lock locker{m_lock};
447 ceph_assert(m_state == STATE_STARTING);
448 m_state = STATE_REPLAYING;
449 std::swap(m_on_start_finish, on_finish);
450
451 std::unique_lock timer_locker{m_threads->timer_lock};
452 schedule_update_mirror_image_replay_status();
453 }
454
455 update_mirror_image_status(true, boost::none);
456 if (on_replay_interrupted()) {
457 if (on_finish != nullptr) {
458 on_finish->complete(r);
459 }
460 return;
461 }
462
463 dout(10) << "start succeeded" << dendl;
464 if (on_finish != nullptr) {
465 dout(10) << "on finish complete, r=" << r << dendl;
466 on_finish->complete(r);
467 }
468 }
469
470 template <typename I>
471 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
472 {
473 dout(10) << "r=" << r << ", desc=" << desc << dendl;
474 Context *ctx = new LambdaContext([this, r, desc](int _r) {
475 {
476 std::lock_guard locker{m_lock};
477 ceph_assert(m_state == STATE_STARTING);
478 m_state = STATE_STOPPING;
479 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
480 derr << "start failed: " << cpp_strerror(r) << dendl;
481 } else {
482 dout(10) << "start canceled" << dendl;
483 }
484 }
485
486 set_state_description(r, desc);
487 update_mirror_image_status(false, boost::none);
488 shut_down(r);
489 });
490 m_threads->work_queue->queue(ctx, 0);
491 }
492
493 template <typename I>
494 bool ImageReplayer<I>::on_start_interrupted() {
495 std::lock_guard locker{m_lock};
496 return on_start_interrupted(m_lock);
497 }
498
499 template <typename I>
500 bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
501 ceph_assert(ceph_mutex_is_locked(m_lock));
502 ceph_assert(m_state == STATE_STARTING);
503 if (!m_stop_requested) {
504 return false;
505 }
506
507 on_start_fail(-ECANCELED, "");
508 return true;
509 }
510
511 template <typename I>
512 void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
513 const std::string& desc)
514 {
515 dout(10) << "on_finish=" << on_finish << ", manual=" << manual
516 << ", desc=" << desc << dendl;
517
518 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
519 bool shut_down_replay = false;
520 bool running = true;
521 {
522 std::lock_guard locker{m_lock};
523
524 if (!is_running_()) {
525 running = false;
526 } else {
527 if (!is_stopped_()) {
528 if (m_state == STATE_STARTING) {
529 dout(10) << "canceling start" << dendl;
530 if (m_bootstrap_request != nullptr) {
531 bootstrap_request = m_bootstrap_request;
532 bootstrap_request->get();
533 }
534 } else {
535 dout(10) << "interrupting replay" << dendl;
536 shut_down_replay = true;
537 }
538
539 ceph_assert(m_on_stop_finish == nullptr);
540 std::swap(m_on_stop_finish, on_finish);
541 m_stop_requested = true;
542 m_manual_stop = manual;
543 }
544 }
545 }
546
547 // avoid holding lock since bootstrap request will update status
548 if (bootstrap_request != nullptr) {
549 dout(10) << "canceling bootstrap" << dendl;
550 bootstrap_request->cancel();
551 bootstrap_request->put();
552 }
553
554 if (!running) {
555 dout(20) << "not running" << dendl;
556 if (on_finish) {
557 on_finish->complete(-EINVAL);
558 }
559 return;
560 }
561
562 if (shut_down_replay) {
563 on_stop_journal_replay(r, desc);
564 } else if (on_finish != nullptr) {
565 on_finish->complete(0);
566 }
567 }
568
569 template <typename I>
570 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
571 {
572 dout(10) << dendl;
573
574 {
575 std::lock_guard locker{m_lock};
576 if (m_state != STATE_REPLAYING) {
577 // might be invoked multiple times while stopping
578 return;
579 }
580
581 m_stop_requested = true;
582 m_state = STATE_STOPPING;
583 }
584
585 cancel_update_mirror_image_replay_status();
586 set_state_description(r, desc);
587 update_mirror_image_status(true, boost::none);
588 shut_down(0);
589 }
590
591 template <typename I>
592 void ImageReplayer<I>::restart(Context *on_finish)
593 {
594 auto ctx = new LambdaContext(
595 [this, on_finish](int r) {
596 if (r < 0) {
597 // Try start anyway.
598 }
599 start(on_finish, true);
600 });
601 stop(ctx);
602 }
603
604 template <typename I>
605 void ImageReplayer<I>::flush()
606 {
607 C_SaferCond ctx;
608
609 {
610 std::unique_lock locker{m_lock};
611 if (m_state != STATE_REPLAYING) {
612 return;
613 }
614
615 dout(10) << dendl;
616 ceph_assert(m_replayer != nullptr);
617 m_replayer->flush(&ctx);
618 }
619
620 int r = ctx.wait();
621 if (r >= 0) {
622 update_mirror_image_status(false, boost::none);
623 }
624 }
625
626 template <typename I>
627 bool ImageReplayer<I>::on_replay_interrupted()
628 {
629 bool shut_down;
630 {
631 std::lock_guard locker{m_lock};
632 shut_down = m_stop_requested;
633 }
634
635 if (shut_down) {
636 on_stop_journal_replay();
637 }
638 return shut_down;
639 }
640
641 template <typename I>
642 void ImageReplayer<I>::print_status(Formatter *f)
643 {
644 dout(10) << dendl;
645
646 std::lock_guard l{m_lock};
647
648 f->open_object_section("image_replayer");
649 f->dump_string("name", m_image_spec);
650 f->dump_string("state", to_string(m_state));
651 f->close_section();
652 }
653
654 template <typename I>
655 void ImageReplayer<I>::schedule_update_mirror_image_replay_status() {
656 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
657 ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
658 if (m_state != STATE_REPLAYING) {
659 return;
660 }
661
662 dout(10) << dendl;
663
664 // periodically update the replaying status even if nothing changes
665 // so that we can adjust our performance stats
666 ceph_assert(m_update_status_task == nullptr);
667 m_update_status_task = create_context_callback<
668 ImageReplayer<I>,
669 &ImageReplayer<I>::handle_update_mirror_image_replay_status>(this);
670 m_threads->timer->add_event_after(10, m_update_status_task);
671 }
672
673 template <typename I>
674 void ImageReplayer<I>::handle_update_mirror_image_replay_status(int r) {
675 dout(10) << dendl;
676
677 auto ctx = new LambdaContext([this](int) {
678 update_mirror_image_status(false, boost::none);
679
680 {
681 std::unique_lock locker{m_lock};
682 std::unique_lock timer_locker{m_threads->timer_lock};
683 ceph_assert(m_update_status_task != nullptr);
684 m_update_status_task = nullptr;
685
686 schedule_update_mirror_image_replay_status();
687 }
688
689 m_in_flight_op_tracker.finish_op();
690 });
691
692 m_in_flight_op_tracker.start_op();
693 m_threads->work_queue->queue(ctx, 0);
694 }
695
696 template <typename I>
697 void ImageReplayer<I>::cancel_update_mirror_image_replay_status() {
698 std::unique_lock timer_locker{m_threads->timer_lock};
699 if (m_update_status_task != nullptr) {
700 dout(10) << dendl;
701
702 if (m_threads->timer->cancel_event(m_update_status_task)) {
703 m_update_status_task = nullptr;
704 }
705 }
706 }
707
708 template <typename I>
709 void ImageReplayer<I>::update_mirror_image_status(
710 bool force, const OptionalState &opt_state) {
711 dout(15) << "force=" << force << ", "
712 << "state=" << opt_state << dendl;
713
714 {
715 std::lock_guard locker{m_lock};
716 if (!force && !is_stopped_() && !is_running_()) {
717 dout(15) << "shut down in-progress: ignoring update" << dendl;
718 return;
719 }
720 }
721
722 m_in_flight_op_tracker.start_op();
723 auto ctx = new LambdaContext(
724 [this, force, opt_state](int r) {
725 set_mirror_image_status_update(force, opt_state);
726 });
727 m_threads->work_queue->queue(ctx, 0);
728 }
729
730 template <typename I>
731 void ImageReplayer<I>::set_mirror_image_status_update(
732 bool force, const OptionalState &opt_state) {
733 dout(15) << "force=" << force << ", "
734 << "state=" << opt_state << dendl;
735
736 reregister_admin_socket_hook();
737
738 State state;
739 std::string state_desc;
740 int last_r;
741 bool stopping_replay;
742
743 auto mirror_image_status_state = boost::make_optional(
744 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
745 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
746 {
747 std::lock_guard locker{m_lock};
748 state = m_state;
749 state_desc = m_state_desc;
750 mirror_image_status_state = m_mirror_image_status_state;
751 last_r = m_last_r;
752 stopping_replay = (m_replayer != nullptr);
753
754 if (m_bootstrap_request != nullptr) {
755 bootstrap_request = m_bootstrap_request;
756 bootstrap_request->get();
757 }
758 }
759
760 bool syncing = false;
761 if (bootstrap_request != nullptr) {
762 syncing = bootstrap_request->is_syncing();
763 bootstrap_request->put();
764 bootstrap_request = nullptr;
765 }
766
767 if (opt_state) {
768 state = *opt_state;
769 }
770
771 cls::rbd::MirrorImageSiteStatus status;
772 status.up = true;
773 switch (state) {
774 case STATE_STARTING:
775 if (syncing) {
776 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
777 status.description = state_desc.empty() ? "syncing" : state_desc;
778 mirror_image_status_state = status.state;
779 } else {
780 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
781 status.description = "starting replay";
782 }
783 break;
784 case STATE_REPLAYING:
785 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
786 {
787 std::string desc;
788 auto on_req_finish = new LambdaContext(
789 [this, force](int r) {
790 dout(15) << "replay status ready: r=" << r << dendl;
791 if (r >= 0) {
792 set_mirror_image_status_update(force, boost::none);
793 } else if (r == -EAGAIN) {
794 m_in_flight_op_tracker.finish_op();
795 }
796 });
797
798 ceph_assert(m_replayer != nullptr);
799 if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
800 dout(15) << "waiting for replay status" << dendl;
801 return;
802 }
803
804 status.description = "replaying, " + desc;
805 mirror_image_status_state = boost::make_optional(
806 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
807 }
808 break;
809 case STATE_STOPPING:
810 if (stopping_replay) {
811 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
812 status.description = state_desc.empty() ? "stopping replay" : state_desc;
813 break;
814 }
815 // FALLTHROUGH
816 case STATE_STOPPED:
817 if (last_r == -EREMOTEIO) {
818 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
819 status.description = state_desc;
820 mirror_image_status_state = status.state;
821 } else if (last_r < 0 && last_r != -ECANCELED) {
822 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
823 status.description = state_desc;
824 mirror_image_status_state = status.state;
825 } else {
826 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
827 status.description = state_desc.empty() ? "stopped" : state_desc;
828 mirror_image_status_state = boost::none;
829 }
830 break;
831 default:
832 ceph_assert(!"invalid state");
833 }
834
835 {
836 std::lock_guard locker{m_lock};
837 m_mirror_image_status_state = mirror_image_status_state;
838 }
839
840 // prevent the status from ping-ponging when failed replays are restarted
841 if (mirror_image_status_state &&
842 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
843 status.state = *mirror_image_status_state;
844 }
845
846 dout(15) << "status=" << status << dendl;
847 m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
848 force);
849 if (m_remote_image_peer.mirror_status_updater != nullptr) {
850 m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
851 m_global_image_id, status, force);
852 }
853
854 m_in_flight_op_tracker.finish_op();
855 }
856
857 template <typename I>
858 void ImageReplayer<I>::shut_down(int r) {
859 dout(10) << "r=" << r << dendl;
860
861 {
862 std::lock_guard locker{m_lock};
863 ceph_assert(m_state == STATE_STOPPING);
864 }
865
866 if (!m_in_flight_op_tracker.empty()) {
867 dout(15) << "waiting for in-flight operations to complete" << dendl;
868 m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
869 shut_down(r);
870 }));
871 return;
872 }
873
874 // chain the shut down sequence (reverse order)
875 Context *ctx = new LambdaContext(
876 [this, r](int _r) {
877 update_mirror_image_status(true, STATE_STOPPED);
878 handle_shut_down(r);
879 });
880
881 // destruct the state builder
882 if (m_state_builder != nullptr) {
883 ctx = new LambdaContext([this, ctx](int r) {
884 m_state_builder->close(ctx);
885 });
886 }
887
888 // close the replayer
889 if (m_replayer != nullptr) {
890 ctx = new LambdaContext([this, ctx](int r) {
891 m_replayer->destroy();
892 m_replayer = nullptr;
893 ctx->complete(0);
894 });
895 ctx = new LambdaContext([this, ctx](int r) {
896 m_replayer->shut_down(ctx);
897 });
898 }
899
900 m_threads->work_queue->queue(ctx, 0);
901 }
902
903 template <typename I>
904 void ImageReplayer<I>::handle_shut_down(int r) {
905 bool resync_requested = false;
906 bool delete_requested = false;
907 bool unregister_asok_hook = false;
908 {
909 std::lock_guard locker{m_lock};
910
911 if (m_delete_requested && m_state_builder != nullptr &&
912 !m_state_builder->local_image_id.empty()) {
913 ceph_assert(m_state_builder->remote_image_id.empty());
914 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
915 unregister_asok_hook = true;
916 std::swap(delete_requested, m_delete_requested);
917 }
918
919 std::swap(resync_requested, m_resync_requested);
920 if (!delete_requested && !resync_requested && m_last_r == -ENOENT &&
921 ((m_state_builder == nullptr) ||
922 (m_state_builder->local_image_id.empty() &&
923 m_state_builder->remote_image_id.empty()))) {
924 dout(0) << "mirror image no longer exists" << dendl;
925 unregister_asok_hook = true;
926 m_finished = true;
927 }
928 }
929
930 if (unregister_asok_hook) {
931 unregister_admin_socket_hook();
932 }
933
934 if (delete_requested || resync_requested) {
935 dout(5) << "moving image to trash" << dendl;
936 auto ctx = new LambdaContext([this, r](int) {
937 handle_shut_down(r);
938 });
939 ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
940 resync_requested, m_threads->work_queue, ctx);
941 return;
942 }
943
944 if (!m_in_flight_op_tracker.empty()) {
945 dout(15) << "waiting for in-flight operations to complete" << dendl;
946 m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
947 handle_shut_down(r);
948 }));
949 return;
950 }
951
952 if (m_local_status_updater->exists(m_global_image_id)) {
953 dout(15) << "removing local mirror image status" << dendl;
954 auto ctx = new LambdaContext([this, r](int) {
955 handle_shut_down(r);
956 });
957 m_local_status_updater->remove_mirror_image_status(m_global_image_id, ctx);
958 return;
959 }
960
961 if (m_remote_image_peer.mirror_status_updater != nullptr &&
962 m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) {
963 dout(15) << "removing remote mirror image status" << dendl;
964 auto ctx = new LambdaContext([this, r](int) {
965 handle_shut_down(r);
966 });
967 m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
968 m_global_image_id, ctx);
969 return;
970 }
971
972 if (m_state_builder != nullptr) {
973 m_state_builder->destroy();
974 m_state_builder = nullptr;
975 }
976
977 dout(10) << "stop complete" << dendl;
978 Context *on_start = nullptr;
979 Context *on_stop = nullptr;
980 {
981 std::lock_guard locker{m_lock};
982 std::swap(on_start, m_on_start_finish);
983 std::swap(on_stop, m_on_stop_finish);
984 m_stop_requested = false;
985 ceph_assert(m_state == STATE_STOPPING);
986 m_state = STATE_STOPPED;
987 }
988
989 if (on_start != nullptr) {
990 dout(10) << "on start finish complete, r=" << r << dendl;
991 on_start->complete(r);
992 r = 0;
993 }
994 if (on_stop != nullptr) {
995 dout(10) << "on stop finish complete, r=" << r << dendl;
996 on_stop->complete(r);
997 }
998 }
999
1000 template <typename I>
1001 void ImageReplayer<I>::handle_replayer_notification() {
1002 dout(10) << dendl;
1003
1004 std::unique_lock locker{m_lock};
1005 if (m_state != STATE_REPLAYING) {
1006 // might be attempting to shut down
1007 return;
1008 }
1009
1010 {
1011 // detect a rename of the local image
1012 ceph_assert(m_state_builder != nullptr &&
1013 m_state_builder->local_image_ctx != nullptr);
1014 std::shared_lock image_locker{m_state_builder->local_image_ctx->image_lock};
1015 if (m_local_image_name != m_state_builder->local_image_ctx->name) {
1016 // will re-register with new name after next status update
1017 dout(10) << "image renamed" << dendl;
1018 m_local_image_name = m_state_builder->local_image_ctx->name;
1019 }
1020 }
1021
1022 // replayer cannot be shut down while notification is in-flight
1023 ceph_assert(m_replayer != nullptr);
1024 locker.unlock();
1025
1026 if (m_replayer->is_resync_requested()) {
1027 dout(10) << "resync requested" << dendl;
1028 m_resync_requested = true;
1029 on_stop_journal_replay(0, "resync requested");
1030 return;
1031 }
1032
1033 if (!m_replayer->is_replaying()) {
1034 auto error_code = m_replayer->get_error_code();
1035 auto error_description = m_replayer->get_error_description();
1036 dout(10) << "replay interrupted: "
1037 << "r=" << error_code << ", "
1038 << "error=" << error_description << dendl;
1039 on_stop_journal_replay(error_code, error_description);
1040 return;
1041 }
1042
1043 update_mirror_image_status(false, {});
1044 }
1045
1046 template <typename I>
1047 std::string ImageReplayer<I>::to_string(const State state) {
1048 switch (state) {
1049 case ImageReplayer<I>::STATE_STARTING:
1050 return "Starting";
1051 case ImageReplayer<I>::STATE_REPLAYING:
1052 return "Replaying";
1053 case ImageReplayer<I>::STATE_STOPPING:
1054 return "Stopping";
1055 case ImageReplayer<I>::STATE_STOPPED:
1056 return "Stopped";
1057 default:
1058 break;
1059 }
1060 return "Unknown(" + stringify(state) + ")";
1061 }
1062
1063 template <typename I>
1064 void ImageReplayer<I>::register_admin_socket_hook() {
1065 ImageReplayerAdminSocketHook<I> *asok_hook;
1066 {
1067 std::lock_guard locker{m_lock};
1068 if (m_asok_hook != nullptr) {
1069 return;
1070 }
1071
1072 dout(15) << "registered asok hook: " << m_image_spec << dendl;
1073 asok_hook = new ImageReplayerAdminSocketHook<I>(
1074 g_ceph_context, m_image_spec, this);
1075 int r = asok_hook->register_commands();
1076 if (r == 0) {
1077 m_asok_hook = asok_hook;
1078 return;
1079 }
1080 derr << "error registering admin socket commands" << dendl;
1081 }
1082 delete asok_hook;
1083 }
1084
1085 template <typename I>
1086 void ImageReplayer<I>::unregister_admin_socket_hook() {
1087 dout(15) << dendl;
1088
1089 AdminSocketHook *asok_hook = nullptr;
1090 {
1091 std::lock_guard locker{m_lock};
1092 std::swap(asok_hook, m_asok_hook);
1093 }
1094 delete asok_hook;
1095 }
1096
1097 template <typename I>
1098 void ImageReplayer<I>::reregister_admin_socket_hook() {
1099 std::unique_lock locker{m_lock};
1100 if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) {
1101 m_local_image_name = m_bootstrap_request->get_local_image_name();
1102 }
1103
1104 auto image_spec = image_replayer::util::compute_image_spec(
1105 m_local_io_ctx, m_local_image_name);
1106 if (m_asok_hook != nullptr && m_image_spec == image_spec) {
1107 return;
1108 }
1109
1110 dout(15) << "old_image_spec=" << m_image_spec << ", "
1111 << "new_image_spec=" << image_spec << dendl;
1112 m_image_spec = image_spec;
1113
1114 if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
1115 // no need to re-register if stopping
1116 return;
1117 }
1118 locker.unlock();
1119
1120 unregister_admin_socket_hook();
1121 register_admin_socket_hook();
1122 }
1123
1124 template <typename I>
1125 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1126 {
1127 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1128 << "/" << replayer.get_global_image_id() << "]";
1129 return os;
1130 }
1131
1132 } // namespace mirror
1133 } // namespace rbd
1134
1135 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;