]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageReplayer.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "include/compat.h"
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "common/Timer.h"
#include "global/global_context.h"
#include "journal/Journaler.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/Journal.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
#include "ImageDeleter.h"
#include "ImageReplayer.h"
#include "MirrorStatusUpdater.h"
#include "Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
#include "tools/rbd_mirror/image_replayer/StateBuilder.h"
#include "tools/rbd_mirror/image_replayer/Utils.h"
#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
#include <map>
#include <memory>
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
35 #undef dout_prefix
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39 namespace rbd {
40 namespace mirror {
41
42 using librbd::util::create_context_callback;
43
// Forward declaration of a stream operator for ImageReplayer<I>::State.
// NOTE(review): `I` appears only in a non-deduced context
// (`typename ImageReplayer<I>::State`), so ordinary `os << state` expressions
// cannot select this template. No definition is visible in this file either
// (states are rendered through ImageReplayer<I>::to_string() in
// print_status()). Confirm whether this declaration is still needed.
template <typename I>
std::ostream &operator<<(std::ostream &os,
                         const typename ImageReplayer<I>::State &state);
47
48 namespace {
49
50 template <typename I>
51 class ImageReplayerAdminSocketCommand {
52 public:
53 ImageReplayerAdminSocketCommand(const std::string &desc,
54 ImageReplayer<I> *replayer)
55 : desc(desc), replayer(replayer) {
56 }
57 virtual ~ImageReplayerAdminSocketCommand() {}
58 virtual int call(Formatter *f) = 0;
59
60 std::string desc;
61 ImageReplayer<I> *replayer;
62 bool registered = false;
63 };
64
65 template <typename I>
66 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
67 public:
68 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
69 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
70 }
71
72 int call(Formatter *f) override {
73 this->replayer->print_status(f);
74 return 0;
75 }
76 };
77
78 template <typename I>
79 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
80 public:
81 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
82 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
83 }
84
85 int call(Formatter *f) override {
86 this->replayer->start(nullptr, true);
87 return 0;
88 }
89 };
90
91 template <typename I>
92 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
93 public:
94 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
95 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
96 }
97
98 int call(Formatter *f) override {
99 this->replayer->stop(nullptr, true);
100 return 0;
101 }
102 };
103
104 template <typename I>
105 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
106 public:
107 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
108 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
109 }
110
111 int call(Formatter *f) override {
112 this->replayer->restart();
113 return 0;
114 }
115 };
116
117 template <typename I>
118 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
119 public:
120 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
121 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
122 }
123
124 int call(Formatter *f) override {
125 this->replayer->flush();
126 return 0;
127 }
128 };
129
130 template <typename I>
131 class ImageReplayerAdminSocketHook : public AdminSocketHook {
132 public:
133 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
134 ImageReplayer<I> *replayer)
135 : admin_socket(cct->get_admin_socket()),
136 commands{{"rbd mirror flush " + name,
137 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
138 {"rbd mirror restart " + name,
139 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
140 {"rbd mirror start " + name,
141 new StartCommand<I>("start rbd mirror " + name, replayer)},
142 {"rbd mirror status " + name,
143 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
144 {"rbd mirror stop " + name,
145 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
146 }
147
148 int register_commands() {
149 for (auto &it : commands) {
150 int r = admin_socket->register_command(it.first, this,
151 it.second->desc);
152 if (r < 0) {
153 return r;
154 }
155 it.second->registered = true;
156 }
157 return 0;
158 }
159
160 ~ImageReplayerAdminSocketHook() override {
161 admin_socket->unregister_commands(this);
162 for (auto &it : commands) {
163 delete it.second;
164 }
165 commands.clear();
166 }
167
168 int call(std::string_view command, const cmdmap_t& cmdmap,
169 Formatter *f,
170 std::ostream& errss,
171 bufferlist& out) override {
172 auto i = commands.find(command);
173 ceph_assert(i != commands.end());
174 return i->second->call(f);
175 }
176
177 private:
178 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
179 std::less<>> Commands;
180
181 AdminSocket *admin_socket;
182 Commands commands;
183 };
184
185 } // anonymous namespace
186
187 template <typename I>
188 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
189 const std::string &description, bool flush)
190 {
191 const std::string desc = "bootstrapping, " + description;
192 replayer->set_state_description(0, desc);
193 if (flush) {
194 replayer->update_mirror_image_status(false, boost::none);
195 }
196 }
197
198 template <typename I>
199 struct ImageReplayer<I>::ReplayerListener
200 : public image_replayer::ReplayerListener {
201 ImageReplayer<I>* image_replayer;
202
203 ReplayerListener(ImageReplayer<I>* image_replayer)
204 : image_replayer(image_replayer) {
205 }
206
207 void handle_notification() override {
208 image_replayer->handle_replayer_notification();
209 }
210 };
211
/**
 * Construct the replayer for one mirrored image, identified by
 * @p global_image_id, in the local pool referenced by @p local_io_ctx.
 *
 * The threads, instance watcher, status updater, cache manager handler and
 * pool meta cache pointers are stored as-is. Nothing in this file deletes
 * them; the destructor only deletes the ReplayerListener created here.
 * Admin socket commands are registered before the constructor returns.
 */
template <typename I>
ImageReplayer<I>::ImageReplayer(
    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
    const std::string &global_image_id, Threads<I> *threads,
    InstanceWatcher<I> *instance_watcher,
    MirrorStatusUpdater<I>* local_status_updater,
    journal::CacheManagerHandler *cache_manager_handler,
    PoolMetaCache* pool_meta_cache) :
  m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
  m_global_image_id(global_image_id), m_threads(threads),
  m_instance_watcher(instance_watcher),
  m_local_status_updater(local_status_updater),
  m_cache_manager_handler(cache_manager_handler),
  m_pool_meta_cache(pool_meta_cache),
  // the global image id stands in until the local image name is learned
  m_local_image_name(global_image_id),
  m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
      stringify(local_io_ctx.get_id()) + " " + global_image_id)),
  m_progress_cxt(this),
  m_replayer_listener(new ReplayerListener(this))
{
  // Register the asok commands under a temporary name computed from the
  // local pool and the global image id. Once the local image name is known,
  // reregister_admin_socket_hook() re-registers them under a name computed
  // from the local pool and the local image name.

  m_image_spec = image_replayer::util::compute_image_spec(
    local_io_ctx, global_image_id);
  register_admin_socket_hook();
}
240
/**
 * Destroy the replayer.
 *
 * The admin socket hook is unregistered first. The replayer must already be
 * fully stopped, which the assertions enforce: no state builder, no pending
 * start callback, no stop callbacks, no bootstrap request and no status
 * timer task.
 */
template <typename I>
ImageReplayer<I>::~ImageReplayer()
{
  unregister_admin_socket_hook();
  ceph_assert(m_state_builder == nullptr);
  ceph_assert(m_on_start_finish == nullptr);
  ceph_assert(m_on_stop_contexts.empty());
  ceph_assert(m_bootstrap_request == nullptr);
  ceph_assert(m_update_status_task == nullptr);
  // the listener is the only object the constructor allocated
  delete m_replayer_listener;
}
252
253 template <typename I>
254 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
255 std::lock_guard locker{m_lock};
256
257 if (!m_mirror_image_status_state) {
258 return image_replayer::HEALTH_STATE_OK;
259 } else if (*m_mirror_image_status_state ==
260 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
261 *m_mirror_image_status_state ==
262 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
263 return image_replayer::HEALTH_STATE_WARNING;
264 }
265 return image_replayer::HEALTH_STATE_ERROR;
266 }
267
268 template <typename I>
269 void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
270 dout(10) << "peer=" << peer << dendl;
271
272 std::lock_guard locker{m_lock};
273 auto it = m_peers.find(peer);
274 if (it == m_peers.end()) {
275 m_peers.insert(peer);
276 }
277 }
278
279 template <typename I>
280 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
281 dout(10) << "r=" << r << ", desc=" << desc << dendl;
282
283 std::lock_guard l{m_lock};
284 m_last_r = r;
285 m_state_desc = desc;
286 }
287
/**
 * Start replication of this image.
 *
 * @param on_finish  Optional. If the request is rejected, it is completed
 *                   immediately with -EINVAL (not stopped), -EPERM or
 *                   -ECANCELED. Otherwise it is stored and completed when
 *                   the start attempt finishes (handle_start_replay() or
 *                   handle_shut_down()).
 * @param manual     A start with manual == false is refused (-EPERM) after
 *                   a manual stop.
 * @param restart    Set by restart(); refused (-ECANCELED) if the pending
 *                   restart was canceled in the meantime.
 */
template <typename I>
void ImageReplayer<I>::start(Context *on_finish, bool manual, bool restart)
{
  dout(10) << "on_finish=" << on_finish << dendl;

  int r = 0;
  {
    std::lock_guard locker{m_lock};
    if (!is_stopped_()) {
      derr << "already running" << dendl;
      r = -EINVAL;
    } else if (m_manual_stop && !manual) {
      dout(5) << "stopped manually, ignoring start without manual flag"
              << dendl;
      r = -EPERM;
    } else if (restart && !m_restart_requested) {
      dout(10) << "canceled restart" << dendl;
      r = -ECANCELED;
    } else {
      // accepted: reset per-run state before bootstrapping
      m_state = STATE_STARTING;
      m_last_r = 0;
      m_state_desc.clear();
      m_manual_stop = false;
      m_delete_requested = false;
      m_restart_requested = false;
      m_status_removed = false;

      if (on_finish != nullptr) {
        ceph_assert(m_on_start_finish == nullptr);
        m_on_start_finish = on_finish;
      }
      ceph_assert(m_on_stop_contexts.empty());
    }
  }

  // rejected requests complete the callback right away, outside m_lock
  if (r < 0) {
    if (on_finish) {
      on_finish->complete(r);
    }
    return;
  }

  bootstrap();
}
332
/**
 * Begin bootstrapping for an accepted start.
 *
 * Picks the remote peer, creates the BootstrapRequest and sends it. The
 * request is handed pointers to m_state_builder and m_resync_requested,
 * presumably so it can report them back. The start is failed with -ENOENT
 * when no peer has been added, and bootstrapping is abandoned if a stop has
 * already been requested.
 */
template <typename I>
void ImageReplayer<I>::bootstrap() {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  if (m_peers.empty()) {
    locker.unlock();

    dout(5) << "no peer clusters" << dendl;
    on_start_fail(-ENOENT, "no peer clusters");
    return;
  }

  // TODO need to support multiple remote images
  ceph_assert(!m_peers.empty());
  // only the first peer is used for now
  m_remote_image_peer = *m_peers.begin();

  if (on_start_interrupted(m_lock)) {
    return;
  }

  ceph_assert(m_state_builder == nullptr);
  auto ctx = create_context_callback<
    ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
  auto request = image_replayer::BootstrapRequest<I>::create(
    m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
    m_global_image_id, m_local_mirror_uuid,
    m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
    m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested,
    ctx);

  // Hold a reference while the request is published in m_bootstrap_request.
  // stop(), set_mirror_image_status_update() and
  // reregister_admin_socket_hook() use it from there. handle_bootstrap()
  // drops the reference.
  request->get();
  m_bootstrap_request = request;
  locker.unlock();

  // publish the starting status and send the request without holding m_lock
  update_mirror_image_status(false, boost::none);
  request->send();
}
371
372 template <typename I>
373 void ImageReplayer<I>::handle_bootstrap(int r) {
374 dout(10) << "r=" << r << dendl;
375 {
376 std::lock_guard locker{m_lock};
377 m_bootstrap_request->put();
378 m_bootstrap_request = nullptr;
379 }
380
381 if (on_start_interrupted()) {
382 return;
383 } else if (r == -ENOMSG) {
384 dout(5) << "local image is primary" << dendl;
385 on_start_fail(0, "local image is primary");
386 return;
387 } else if (r == -EREMOTEIO) {
388 dout(5) << "remote image is not primary" << dendl;
389 on_start_fail(-EREMOTEIO, "remote image is not primary");
390 return;
391 } else if (r == -EEXIST) {
392 on_start_fail(r, "split-brain detected");
393 return;
394 } else if (r == -ENOLINK) {
395 m_delete_requested = true;
396 on_start_fail(0, "remote image no longer exists");
397 return;
398 } else if (r == -ERESTART) {
399 on_start_fail(r, "image in transient state, try again");
400 return;
401 } else if (r < 0) {
402 on_start_fail(r, "error bootstrapping replay");
403 return;
404 } else if (m_resync_requested) {
405 on_start_fail(0, "resync requested");
406 return;
407 }
408
409 start_replay();
410 }
411
412 template <typename I>
413 void ImageReplayer<I>::start_replay() {
414 dout(10) << dendl;
415
416 std::unique_lock locker{m_lock};
417 ceph_assert(m_replayer == nullptr);
418 m_replayer = m_state_builder->create_replayer(m_threads, m_instance_watcher,
419 m_local_mirror_uuid,
420 m_pool_meta_cache,
421 m_replayer_listener);
422
423 auto ctx = create_context_callback<
424 ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
425 m_replayer->init(ctx);
426 }
427
/**
 * Completion of m_replayer->init() issued by start_replay().
 *
 * On failure:
 *   - the replayer is destroyed without a shut down (init failed);
 *   - the start is failed with the replayer's error description;
 *   - -ENOTCONN combined with a replayer-side resync request also sets
 *     m_resync_requested for handle_shut_down().
 *
 * On success:
 *   - the state becomes STATE_REPLAYING;
 *   - the periodic status refresh is armed and the status is published;
 *   - the pending start callback, if any, is completed with r. If a stop was
 *     requested in the meantime, replay is handed to the stop path first.
 */
template <typename I>
void ImageReplayer<I>::handle_start_replay(int r) {
  dout(10) << "r=" << r << dendl;

  if (on_start_interrupted()) {
    return;
  } else if (r < 0) {
    // capture the description before the replayer is destroyed below
    std::string error_description = m_replayer->get_error_description();
    if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
      std::unique_lock locker{m_lock};
      m_resync_requested = true;
    }

    // shut down not required if init failed
    m_replayer->destroy();
    m_replayer = nullptr;

    derr << "error starting replay: " << cpp_strerror(r) << dendl;
    on_start_fail(r, error_description);
    return;
  }

  Context *on_finish = nullptr;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_state == STATE_STARTING);
    m_state = STATE_REPLAYING;
    // take ownership of the pending start callback, if any
    std::swap(m_on_start_finish, on_finish);

    // schedule_update_mirror_image_replay_status() asserts that both m_lock
    // and the timer lock are held (lock order: m_lock, then timer lock)
    std::unique_lock timer_locker{m_threads->timer_lock};
    schedule_update_mirror_image_replay_status();
  }

  update_mirror_image_status(true, boost::none);
  // A stop may have been requested meanwhile. Hand off to the stop path;
  // the start itself still completes successfully with r.
  if (on_replay_interrupted()) {
    if (on_finish != nullptr) {
      on_finish->complete(r);
    }
    return;
  }

  dout(10) << "start succeeded" << dendl;
  if (on_finish != nullptr) {
    dout(10) << "on finish complete, r=" << r << dendl;
    on_finish->complete(r);
  }
}
475
/**
 * Abort an in-progress start with result @p r and description @p desc.
 *
 * The work is deferred to the work queue. Callers such as
 * on_start_interrupted(ceph::mutex&) invoke this while holding m_lock, which
 * the queued context acquires itself. The queued context then:
 *   - moves the state from STATE_STARTING to STATE_STOPPING;
 *   - records r/desc and publishes the status;
 *   - starts the shut down sequence with r.
 * r == 0, -ECANCELED, -EREMOTEIO and -ENOENT are logged as a cancellation
 * rather than a failure.
 */
template <typename I>
void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
{
  dout(10) << "r=" << r << ", desc=" << desc << dendl;
  Context *ctx = new LambdaContext([this, r, desc](int _r) {
      {
        std::lock_guard locker{m_lock};
        ceph_assert(m_state == STATE_STARTING);
        m_state = STATE_STOPPING;
        if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
          derr << "start failed: " << cpp_strerror(r) << dendl;
        } else {
          dout(10) << "start canceled" << dendl;
        }
      }

      set_state_description(r, desc);
      update_mirror_image_status(false, boost::none);
      shut_down(r);
  });
  m_threads->work_queue->queue(ctx, 0);
}
498
499 template <typename I>
500 bool ImageReplayer<I>::on_start_interrupted() {
501 std::lock_guard locker{m_lock};
502 return on_start_interrupted(m_lock);
503 }
504
505 template <typename I>
506 bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
507 ceph_assert(ceph_mutex_is_locked(m_lock));
508 ceph_assert(m_state == STATE_STARTING);
509 if (!m_stop_requested) {
510 return false;
511 }
512
513 on_start_fail(-ECANCELED, "");
514 return true;
515 }
516
/**
 * Stop replication of this image.
 *
 * @param on_finish  Optional. Completed with -EINVAL if already stopped.
 *                   Otherwise queued in m_on_stop_contexts and completed by
 *                   handle_shut_down() when the stop finishes.
 * @param manual     Marks the stop as manual; later start() calls without
 *                   the manual flag are then refused.
 * @param restart    Set by restart(). A stop with restart == false cancels
 *                   any pending restart.
 *
 * While starting, the bootstrap request (if any) is canceled, and the start
 * path sees m_stop_requested and fails the start. While replaying, replay
 * is shut down via on_stop_journal_replay().
 */
template <typename I>
void ImageReplayer<I>::stop(Context *on_finish, bool manual, bool restart)
{
  dout(10) << "on_finish=" << on_finish << ", manual=" << manual
           << ", restart=" << restart << dendl;

  image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
  bool shut_down_replay = false;
  bool is_stopped = false;
  {
    std::lock_guard locker{m_lock};

    if (!is_running_()) {
      // Not running: either stopped, or a stop is already in flight. Only
      // update the flags, and join the in-flight stop if there is one.
      if (manual && !m_manual_stop) {
        dout(10) << "marking manual" << dendl;
        m_manual_stop = true;
      }
      if (!restart && m_restart_requested) {
        dout(10) << "canceling restart" << dendl;
        m_restart_requested = false;
      }
      if (is_stopped_()) {
        dout(10) << "already stopped" << dendl;
        is_stopped = true;
      } else {
        dout(10) << "joining in-flight stop" << dendl;
        if (on_finish != nullptr) {
          m_on_stop_contexts.push_back(on_finish);
        }
      }
    } else {
      if (m_state == STATE_STARTING) {
        dout(10) << "canceling start" << dendl;
        // take a reference so the request can be canceled outside m_lock
        if (m_bootstrap_request != nullptr) {
          bootstrap_request = m_bootstrap_request;
          bootstrap_request->get();
        }
      } else {
        dout(10) << "interrupting replay" << dendl;
        shut_down_replay = true;
      }

      ceph_assert(m_on_stop_contexts.empty());
      if (on_finish != nullptr) {
        m_on_stop_contexts.push_back(on_finish);
      }
      // read by on_start_interrupted() and on_replay_interrupted()
      m_stop_requested = true;
      m_manual_stop = manual;
    }
  }

  if (is_stopped) {
    if (on_finish) {
      on_finish->complete(-EINVAL);
    }
    return;
  }

  // avoid holding lock since bootstrap request will update status
  if (bootstrap_request != nullptr) {
    dout(10) << "canceling bootstrap" << dendl;
    bootstrap_request->cancel();
    bootstrap_request->put();
  }

  if (shut_down_replay) {
    on_stop_journal_replay();
  }
}
586
587 template <typename I>
588 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
589 {
590 dout(10) << dendl;
591
592 {
593 std::lock_guard locker{m_lock};
594 if (m_state != STATE_REPLAYING) {
595 // might be invoked multiple times while stopping
596 return;
597 }
598
599 m_stop_requested = true;
600 m_state = STATE_STOPPING;
601 }
602
603 cancel_update_mirror_image_replay_status();
604 set_state_description(r, desc);
605 update_mirror_image_status(true, boost::none);
606 shut_down(0);
607 }
608
609 template <typename I>
610 void ImageReplayer<I>::restart(Context *on_finish)
611 {
612 {
613 std::lock_guard locker{m_lock};
614 m_restart_requested = true;
615 }
616
617 auto ctx = new LambdaContext(
618 [this, on_finish](int r) {
619 if (r < 0) {
620 // Try start anyway.
621 }
622 start(on_finish, true, true);
623 });
624 stop(ctx, false, true);
625 }
626
627 template <typename I>
628 void ImageReplayer<I>::flush()
629 {
630 C_SaferCond ctx;
631
632 {
633 std::unique_lock locker{m_lock};
634 if (m_state != STATE_REPLAYING) {
635 return;
636 }
637
638 dout(10) << dendl;
639 ceph_assert(m_replayer != nullptr);
640 m_replayer->flush(&ctx);
641 }
642
643 int r = ctx.wait();
644 if (r >= 0) {
645 update_mirror_image_status(false, boost::none);
646 }
647 }
648
649 template <typename I>
650 bool ImageReplayer<I>::on_replay_interrupted()
651 {
652 bool shut_down;
653 {
654 std::lock_guard locker{m_lock};
655 shut_down = m_stop_requested;
656 }
657
658 if (shut_down) {
659 on_stop_journal_replay();
660 }
661 return shut_down;
662 }
663
664 template <typename I>
665 void ImageReplayer<I>::print_status(Formatter *f)
666 {
667 dout(10) << dendl;
668
669 std::lock_guard l{m_lock};
670
671 f->open_object_section("image_replayer");
672 f->dump_string("name", m_image_spec);
673 f->dump_string("state", to_string(m_state));
674 f->close_section();
675 }
676
677 template <typename I>
678 void ImageReplayer<I>::schedule_update_mirror_image_replay_status() {
679 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
680 ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
681 if (m_state != STATE_REPLAYING) {
682 return;
683 }
684
685 dout(10) << dendl;
686
687 // periodically update the replaying status even if nothing changes
688 // so that we can adjust our performance stats
689 ceph_assert(m_update_status_task == nullptr);
690 m_update_status_task = create_context_callback<
691 ImageReplayer<I>,
692 &ImageReplayer<I>::handle_update_mirror_image_replay_status>(this);
693 m_threads->timer->add_event_after(10, m_update_status_task);
694 }
695
/**
 * Timer callback for the periodic replay status refresh. Runs with the timer
 * lock held.
 *
 * Clears the pending task pointer and defers the actual status update to the
 * work queue. The deferred work is tracked as an in-flight op so that
 * shut_down() waits for it. Afterwards the refresh is re-armed, which
 * schedule_update_mirror_image_replay_status() only does while still
 * replaying.
 */
template <typename I>
void ImageReplayer<I>::handle_update_mirror_image_replay_status(int r) {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));

  ceph_assert(m_update_status_task != nullptr);
  m_update_status_task = nullptr;

  auto ctx = new LambdaContext([this](int) {
      update_mirror_image_status(false, boost::none);

      // same lock order as handle_start_replay(): m_lock, then timer lock
      std::unique_lock locker{m_lock};
      std::unique_lock timer_locker{m_threads->timer_lock};

      schedule_update_mirror_image_replay_status();
      m_in_flight_op_tracker.finish_op();
    });

  m_in_flight_op_tracker.start_op();
  m_threads->work_queue->queue(ctx, 0);
}
718
719 template <typename I>
720 void ImageReplayer<I>::cancel_update_mirror_image_replay_status() {
721 std::unique_lock timer_locker{m_threads->timer_lock};
722 if (m_update_status_task != nullptr) {
723 dout(10) << dendl;
724
725 if (m_threads->timer->cancel_event(m_update_status_task)) {
726 m_update_status_task = nullptr;
727 }
728 }
729 }
730
731 template <typename I>
732 void ImageReplayer<I>::update_mirror_image_status(
733 bool force, const OptionalState &opt_state) {
734 dout(15) << "force=" << force << ", "
735 << "state=" << opt_state << dendl;
736
737 {
738 std::lock_guard locker{m_lock};
739 if (!force && !is_stopped_() && !is_running_()) {
740 dout(15) << "shut down in-progress: ignoring update" << dendl;
741 return;
742 }
743 }
744
745 m_in_flight_op_tracker.start_op();
746 auto ctx = new LambdaContext(
747 [this, force, opt_state](int r) {
748 set_mirror_image_status_update(force, opt_state);
749 });
750 m_threads->work_queue->queue(ctx, 0);
751 }
752
753 template <typename I>
754 void ImageReplayer<I>::set_mirror_image_status_update(
755 bool force, const OptionalState &opt_state) {
756 dout(15) << "force=" << force << ", "
757 << "state=" << opt_state << dendl;
758
759 reregister_admin_socket_hook();
760
761 State state;
762 std::string state_desc;
763 int last_r;
764 bool stopping_replay;
765
766 auto mirror_image_status_state = boost::make_optional(
767 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
768 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
769 {
770 std::lock_guard locker{m_lock};
771 state = m_state;
772 state_desc = m_state_desc;
773 mirror_image_status_state = m_mirror_image_status_state;
774 last_r = m_last_r;
775 stopping_replay = (m_replayer != nullptr);
776
777 if (m_bootstrap_request != nullptr) {
778 bootstrap_request = m_bootstrap_request;
779 bootstrap_request->get();
780 }
781 }
782
783 bool syncing = false;
784 if (bootstrap_request != nullptr) {
785 syncing = bootstrap_request->is_syncing();
786 bootstrap_request->put();
787 bootstrap_request = nullptr;
788 }
789
790 if (opt_state) {
791 state = *opt_state;
792 }
793
794 cls::rbd::MirrorImageSiteStatus status;
795 status.up = true;
796 switch (state) {
797 case STATE_STARTING:
798 if (syncing) {
799 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
800 status.description = state_desc.empty() ? "syncing" : state_desc;
801 mirror_image_status_state = status.state;
802 } else {
803 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
804 status.description = "starting replay";
805 }
806 break;
807 case STATE_REPLAYING:
808 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
809 {
810 std::string desc;
811 auto on_req_finish = new LambdaContext(
812 [this, force](int r) {
813 dout(15) << "replay status ready: r=" << r << dendl;
814 if (r >= 0) {
815 set_mirror_image_status_update(force, boost::none);
816 } else if (r == -EAGAIN) {
817 m_in_flight_op_tracker.finish_op();
818 }
819 });
820
821 ceph_assert(m_replayer != nullptr);
822 if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
823 dout(15) << "waiting for replay status" << dendl;
824 return;
825 }
826
827 status.description = "replaying, " + desc;
828 mirror_image_status_state = boost::make_optional(
829 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
830 }
831 break;
832 case STATE_STOPPING:
833 if (stopping_replay) {
834 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
835 status.description = state_desc.empty() ? "stopping replay" : state_desc;
836 break;
837 }
838 // FALLTHROUGH
839 case STATE_STOPPED:
840 if (last_r == -EREMOTEIO) {
841 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
842 status.description = state_desc;
843 mirror_image_status_state = status.state;
844 } else if (last_r < 0 && last_r != -ECANCELED) {
845 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
846 status.description = state_desc;
847 mirror_image_status_state = status.state;
848 } else {
849 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
850 status.description = state_desc.empty() ? "stopped" : state_desc;
851 mirror_image_status_state = boost::none;
852 }
853 break;
854 default:
855 ceph_assert(!"invalid state");
856 }
857
858 {
859 std::lock_guard locker{m_lock};
860 m_mirror_image_status_state = mirror_image_status_state;
861 }
862
863 // prevent the status from ping-ponging when failed replays are restarted
864 if (mirror_image_status_state &&
865 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
866 status.state = *mirror_image_status_state;
867 }
868
869 dout(15) << "status=" << status << dendl;
870 m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
871 force);
872 if (m_remote_image_peer.mirror_status_updater != nullptr) {
873 m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
874 m_global_image_id, status, force);
875 }
876
877 m_in_flight_op_tracker.finish_op();
878 }
879
/**
 * Tear down replay resources after a failed start or a stop.
 *
 * Requires STATE_STOPPING. First waits, by re-entering, until no in-flight
 * operations remain. Then runs the following on the work queue:
 *   replayer shut_down -> replayer destroy -> state builder close ->
 *   publish STATE_STOPPED status -> handle_shut_down(r)
 * Steps for a missing replayer or state builder are skipped. @p r is the
 * original result carried through to handle_shut_down(); the results of the
 * intermediate steps are not propagated.
 */
template <typename I>
void ImageReplayer<I>::shut_down(int r) {
  dout(10) << "r=" << r << dendl;

  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_STOPPING);
  }

  if (!m_in_flight_op_tracker.empty()) {
    dout(15) << "waiting for in-flight operations to complete" << dendl;
    m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
        shut_down(r);
      }));
    return;
  }

  // chain the shut down sequence (reverse order)
  Context *ctx = new LambdaContext(
    [this, r](int _r) {
      update_mirror_image_status(true, STATE_STOPPED);
      handle_shut_down(r);
    });

  // close the state builder (it is destroyed later, in handle_shut_down())
  if (m_state_builder != nullptr) {
    ctx = new LambdaContext([this, ctx](int r) {
        m_state_builder->close(ctx);
      });
  }

  // close the replayer: shut it down first, then destroy it
  if (m_replayer != nullptr) {
    ctx = new LambdaContext([this, ctx](int r) {
        m_replayer->destroy();
        m_replayer = nullptr;
        ctx->complete(0);
      });
    ctx = new LambdaContext([this, ctx](int r) {
        m_replayer->shut_down(ctx);
      });
  }

  m_threads->work_queue->queue(ctx, 0);
}
925
926 template <typename I>
927 void ImageReplayer<I>::handle_shut_down(int r) {
928 bool resync_requested = false;
929 bool delete_requested = false;
930 bool unregister_asok_hook = false;
931 {
932 std::lock_guard locker{m_lock};
933
934 if (m_delete_requested && m_state_builder != nullptr &&
935 !m_state_builder->local_image_id.empty()) {
936 ceph_assert(m_state_builder->remote_image_id.empty());
937 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
938 unregister_asok_hook = true;
939 std::swap(delete_requested, m_delete_requested);
940 m_delete_in_progress = true;
941 }
942
943 std::swap(resync_requested, m_resync_requested);
944 if (!delete_requested && !resync_requested && m_last_r == -ENOENT &&
945 ((m_state_builder == nullptr) ||
946 (m_state_builder->local_image_id.empty() &&
947 m_state_builder->remote_image_id.empty()))) {
948 dout(0) << "mirror image no longer exists" << dendl;
949 unregister_asok_hook = true;
950 m_finished = true;
951 }
952 }
953
954 if (unregister_asok_hook) {
955 unregister_admin_socket_hook();
956 }
957
958 if (delete_requested || resync_requested) {
959 dout(5) << "moving image to trash" << dendl;
960 auto ctx = new LambdaContext([this, r](int) {
961 handle_shut_down(r);
962 });
963 ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
964 resync_requested, m_threads->work_queue, ctx);
965 return;
966 }
967
968 if (!m_in_flight_op_tracker.empty()) {
969 dout(15) << "waiting for in-flight operations to complete" << dendl;
970 m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
971 handle_shut_down(r);
972 }));
973 return;
974 }
975
976 if (!m_status_removed) {
977 auto ctx = new LambdaContext([this, r](int) {
978 m_status_removed = true;
979 handle_shut_down(r);
980 });
981 remove_image_status(m_delete_in_progress, ctx);
982 return;
983 }
984
985 if (m_state_builder != nullptr) {
986 m_state_builder->destroy();
987 m_state_builder = nullptr;
988 }
989
990 dout(10) << "stop complete" << dendl;
991 Context *on_start = nullptr;
992 Contexts on_stop_contexts;
993 {
994 std::lock_guard locker{m_lock};
995 std::swap(on_start, m_on_start_finish);
996 on_stop_contexts = std::move(m_on_stop_contexts);
997 m_stop_requested = false;
998 ceph_assert(m_state == STATE_STOPPING);
999 m_state = STATE_STOPPED;
1000 }
1001
1002 if (on_start != nullptr) {
1003 dout(10) << "on start finish complete, r=" << r << dendl;
1004 on_start->complete(r);
1005 r = 0;
1006 }
1007 for (auto ctx : on_stop_contexts) {
1008 dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl;
1009 ctx->complete(r);
1010 }
1011 }
1012
1013 template <typename I>
1014 void ImageReplayer<I>::handle_replayer_notification() {
1015 dout(10) << dendl;
1016
1017 std::unique_lock locker{m_lock};
1018 if (m_state != STATE_REPLAYING) {
1019 // might be attempting to shut down
1020 return;
1021 }
1022
1023 {
1024 // detect a rename of the local image
1025 ceph_assert(m_state_builder != nullptr &&
1026 m_state_builder->local_image_ctx != nullptr);
1027 std::shared_lock image_locker{m_state_builder->local_image_ctx->image_lock};
1028 if (m_local_image_name != m_state_builder->local_image_ctx->name) {
1029 // will re-register with new name after next status update
1030 dout(10) << "image renamed" << dendl;
1031 m_local_image_name = m_state_builder->local_image_ctx->name;
1032 }
1033 }
1034
1035 // replayer cannot be shut down while notification is in-flight
1036 ceph_assert(m_replayer != nullptr);
1037 locker.unlock();
1038
1039 if (m_replayer->is_resync_requested()) {
1040 dout(10) << "resync requested" << dendl;
1041 m_resync_requested = true;
1042 on_stop_journal_replay(0, "resync requested");
1043 return;
1044 }
1045
1046 if (!m_replayer->is_replaying()) {
1047 auto error_code = m_replayer->get_error_code();
1048 auto error_description = m_replayer->get_error_description();
1049 dout(10) << "replay interrupted: "
1050 << "r=" << error_code << ", "
1051 << "error=" << error_description << dendl;
1052 on_stop_journal_replay(error_code, error_description);
1053 return;
1054 }
1055
1056 update_mirror_image_status(false, {});
1057 }
1058
1059 template <typename I>
1060 std::string ImageReplayer<I>::to_string(const State state) {
1061 switch (state) {
1062 case ImageReplayer<I>::STATE_STARTING:
1063 return "Starting";
1064 case ImageReplayer<I>::STATE_REPLAYING:
1065 return "Replaying";
1066 case ImageReplayer<I>::STATE_STOPPING:
1067 return "Stopping";
1068 case ImageReplayer<I>::STATE_STOPPED:
1069 return "Stopped";
1070 default:
1071 break;
1072 }
1073 return "Unknown(" + stringify(state) + ")";
1074 }
1075
1076 template <typename I>
1077 void ImageReplayer<I>::register_admin_socket_hook() {
1078 ImageReplayerAdminSocketHook<I> *asok_hook;
1079 {
1080 std::lock_guard locker{m_lock};
1081 if (m_asok_hook != nullptr) {
1082 return;
1083 }
1084
1085 dout(15) << "registered asok hook: " << m_image_spec << dendl;
1086 asok_hook = new ImageReplayerAdminSocketHook<I>(
1087 g_ceph_context, m_image_spec, this);
1088 int r = asok_hook->register_commands();
1089 if (r == 0) {
1090 m_asok_hook = asok_hook;
1091 return;
1092 }
1093 derr << "error registering admin socket commands" << dendl;
1094 }
1095 delete asok_hook;
1096 }
1097
1098 template <typename I>
1099 void ImageReplayer<I>::unregister_admin_socket_hook() {
1100 dout(15) << dendl;
1101
1102 AdminSocketHook *asok_hook = nullptr;
1103 {
1104 std::lock_guard locker{m_lock};
1105 std::swap(asok_hook, m_asok_hook);
1106 }
1107 delete asok_hook;
1108 }
1109
/**
 * Re-register the admin socket commands when the image spec computed from
 * the local image name has changed, or when no hook is registered.
 *
 * While starting, the local image name is taken from the in-flight
 * bootstrap request. m_image_spec is updated even when stopping or stopped,
 * but no re-registration happens in those states.
 */
template <typename I>
void ImageReplayer<I>::reregister_admin_socket_hook() {
  std::unique_lock locker{m_lock};
  if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) {
    m_local_image_name = m_bootstrap_request->get_local_image_name();
  }

  auto image_spec = image_replayer::util::compute_image_spec(
    m_local_io_ctx, m_local_image_name);
  if (m_asok_hook != nullptr && m_image_spec == image_spec) {
    // already registered under the current name
    return;
  }

  dout(15) << "old_image_spec=" << m_image_spec << ", "
           << "new_image_spec=" << image_spec << dendl;
  m_image_spec = image_spec;

  if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
    // no need to re-register if stopping
    return;
  }
  // both helpers below acquire m_lock themselves
  locker.unlock();

  unregister_admin_socket_hook();
  register_admin_socket_hook();
}
1136
1137 template <typename I>
1138 void ImageReplayer<I>::remove_image_status(bool force, Context *on_finish)
1139 {
1140 auto ctx = new LambdaContext([this, force, on_finish](int) {
1141 remove_image_status_remote(force, on_finish);
1142 });
1143
1144 if (m_local_status_updater->exists(m_global_image_id)) {
1145 dout(15) << "removing local mirror image status" << dendl;
1146 if (force) {
1147 m_local_status_updater->remove_mirror_image_status(
1148 m_global_image_id, true, ctx);
1149 } else {
1150 m_local_status_updater->remove_refresh_mirror_image_status(
1151 m_global_image_id, ctx);
1152 }
1153 return;
1154 }
1155
1156 ctx->complete(0);
1157 }
1158
1159 template <typename I>
1160 void ImageReplayer<I>::remove_image_status_remote(bool force, Context *on_finish)
1161 {
1162 if (m_remote_image_peer.mirror_status_updater != nullptr &&
1163 m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) {
1164 dout(15) << "removing remote mirror image status" << dendl;
1165 if (force) {
1166 m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
1167 m_global_image_id, true, on_finish);
1168 } else {
1169 m_remote_image_peer.mirror_status_updater->remove_refresh_mirror_image_status(
1170 m_global_image_id, on_finish);
1171 }
1172 return;
1173 }
1174 if (on_finish) {
1175 on_finish->complete(0);
1176 }
1177 }
1178
1179 template <typename I>
1180 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1181 {
1182 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1183 << "/" << replayer.get_global_image_id() << "]";
1184 return os;
1185 }
1186
1187 } // namespace mirror
1188 } // namespace rbd
1189
// Explicit instantiation for the concrete librbd image context type, so the
// template member definitions above are emitted in this translation unit.
template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;