]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageReplayer.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/ImageState.h"
17 #include "librbd/Journal.h"
18 #include "librbd/Operations.h"
19 #include "librbd/Utils.h"
20 #include "librbd/asio/ContextWQ.h"
21 #include "ImageDeleter.h"
22 #include "ImageReplayer.h"
23 #include "MirrorStatusUpdater.h"
24 #include "Threads.h"
25 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27 #include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28 #include "tools/rbd_mirror/image_replayer/Utils.h"
29 #include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30 #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
31 #include <map>
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
35 #undef dout_prefix
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39 namespace rbd {
40 namespace mirror {
41
42 using librbd::util::create_context_callback;
43
// Forward declaration of the stream operator for ImageReplayer<I>::State.
// NOTE(review): no definition is visible in this file; presumably it lives
// next to ImageReplayer.h — confirm before relying on it.
template <typename I>
std::ostream &operator<<(std::ostream &os,
                         const typename ImageReplayer<I>::State &state);
47
48 namespace {
49
50 template <typename I>
51 class ImageReplayerAdminSocketCommand {
52 public:
53 ImageReplayerAdminSocketCommand(const std::string &desc,
54 ImageReplayer<I> *replayer)
55 : desc(desc), replayer(replayer) {
56 }
57 virtual ~ImageReplayerAdminSocketCommand() {}
58 virtual int call(Formatter *f) = 0;
59
60 std::string desc;
61 ImageReplayer<I> *replayer;
62 bool registered = false;
63 };
64
65 template <typename I>
66 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
67 public:
68 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
69 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
70 }
71
72 int call(Formatter *f) override {
73 this->replayer->print_status(f);
74 return 0;
75 }
76 };
77
78 template <typename I>
79 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
80 public:
81 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
82 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
83 }
84
85 int call(Formatter *f) override {
86 this->replayer->start(nullptr, true);
87 return 0;
88 }
89 };
90
91 template <typename I>
92 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
93 public:
94 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
95 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
96 }
97
98 int call(Formatter *f) override {
99 this->replayer->stop(nullptr, true);
100 return 0;
101 }
102 };
103
104 template <typename I>
105 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
106 public:
107 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
108 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
109 }
110
111 int call(Formatter *f) override {
112 this->replayer->restart();
113 return 0;
114 }
115 };
116
117 template <typename I>
118 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
119 public:
120 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
121 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
122 }
123
124 int call(Formatter *f) override {
125 this->replayer->flush();
126 return 0;
127 }
128 };
129
130 template <typename I>
131 class ImageReplayerAdminSocketHook : public AdminSocketHook {
132 public:
133 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
134 ImageReplayer<I> *replayer)
135 : admin_socket(cct->get_admin_socket()),
136 commands{{"rbd mirror flush " + name,
137 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
138 {"rbd mirror restart " + name,
139 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
140 {"rbd mirror start " + name,
141 new StartCommand<I>("start rbd mirror " + name, replayer)},
142 {"rbd mirror status " + name,
143 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
144 {"rbd mirror stop " + name,
145 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
146 }
147
148 int register_commands() {
149 for (auto &it : commands) {
150 int r = admin_socket->register_command(it.first, this,
151 it.second->desc);
152 if (r < 0) {
153 return r;
154 }
155 it.second->registered = true;
156 }
157 return 0;
158 }
159
160 ~ImageReplayerAdminSocketHook() override {
161 admin_socket->unregister_commands(this);
162 for (auto &it : commands) {
163 delete it.second;
164 }
165 commands.clear();
166 }
167
168 int call(std::string_view command, const cmdmap_t& cmdmap,
169 const bufferlist&,
170 Formatter *f,
171 std::ostream& errss,
172 bufferlist& out) override {
173 auto i = commands.find(command);
174 ceph_assert(i != commands.end());
175 return i->second->call(f);
176 }
177
178 private:
179 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
180 std::less<>> Commands;
181
182 AdminSocket *admin_socket;
183 Commands commands;
184 };
185
186 } // anonymous namespace
187
188 template <typename I>
189 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
190 const std::string &description, bool flush)
191 {
192 const std::string desc = "bootstrapping, " + description;
193 replayer->set_state_description(0, desc);
194 if (flush) {
195 replayer->update_mirror_image_status(false, boost::none);
196 }
197 }
198
199 template <typename I>
200 struct ImageReplayer<I>::ReplayerListener
201 : public image_replayer::ReplayerListener {
202 ImageReplayer<I>* image_replayer;
203
204 ReplayerListener(ImageReplayer<I>* image_replayer)
205 : image_replayer(image_replayer) {
206 }
207
208 void handle_notification() override {
209 image_replayer->handle_replayer_notification();
210 }
211 };
212
/**
 * Construct a replayer for one mirrored image, identified by its global image
 * id within the local pool.
 *
 * Nothing is started here; start() begins the bootstrap. The constructor
 * records its collaborators and registers the admin socket commands. The
 * destructor frees only the listener, so the other pointers appear to be
 * non-owning — TODO confirm against ImageReplayer.h.
 */
template <typename I>
ImageReplayer<I>::ImageReplayer(
    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
    const std::string &global_image_id, Threads<I> *threads,
    InstanceWatcher<I> *instance_watcher,
    MirrorStatusUpdater<I>* local_status_updater,
    journal::CacheManagerHandler *cache_manager_handler,
    PoolMetaCache* pool_meta_cache) :
  m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
  m_global_image_id(global_image_id), m_threads(threads),
  m_instance_watcher(instance_watcher),
  m_local_status_updater(local_status_updater),
  m_cache_manager_handler(cache_manager_handler),
  m_pool_meta_cache(pool_meta_cache),
  // placeholder until the real local image name is learned during start
  m_local_image_name(global_image_id),
  m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
      stringify(local_io_ctx.get_id()) + " " + global_image_id)),
  m_progress_cxt(this),
  m_replayer_listener(new ReplayerListener(this))
{
  // Register asok commands under a temporary image spec computed from the
  // local pool and the global image id. Once the local image name is known,
  // reregister_admin_socket_hook() (run on each status update) re-registers
  // them under a spec computed from the local pool and the local image name.

  m_image_spec = image_replayer::util::compute_image_spec(
    local_io_ctx, global_image_id);
  register_admin_socket_hook();
}
241
242 template <typename I>
243 ImageReplayer<I>::~ImageReplayer()
244 {
245 unregister_admin_socket_hook();
246 ceph_assert(m_state_builder == nullptr);
247 ceph_assert(m_on_start_finish == nullptr);
248 ceph_assert(m_on_stop_contexts.empty());
249 ceph_assert(m_bootstrap_request == nullptr);
250 ceph_assert(m_update_status_task == nullptr);
251 delete m_replayer_listener;
252 }
253
254 template <typename I>
255 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
256 std::lock_guard locker{m_lock};
257
258 if (!m_mirror_image_status_state) {
259 return image_replayer::HEALTH_STATE_OK;
260 } else if (*m_mirror_image_status_state ==
261 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
262 *m_mirror_image_status_state ==
263 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
264 return image_replayer::HEALTH_STATE_WARNING;
265 }
266 return image_replayer::HEALTH_STATE_ERROR;
267 }
268
269 template <typename I>
270 void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
271 dout(10) << "peer=" << peer << dendl;
272
273 std::lock_guard locker{m_lock};
274 auto it = m_peers.find(peer);
275 if (it == m_peers.end()) {
276 m_peers.insert(peer);
277 }
278 }
279
280 template <typename I>
281 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
282 dout(10) << "r=" << r << ", desc=" << desc << dendl;
283
284 std::lock_guard l{m_lock};
285 m_last_r = r;
286 m_state_desc = desc;
287 }
288
289 template <typename I>
290 void ImageReplayer<I>::start(Context *on_finish, bool manual, bool restart)
291 {
292 dout(10) << "on_finish=" << on_finish << dendl;
293
294 int r = 0;
295 {
296 std::lock_guard locker{m_lock};
297 if (!is_stopped_()) {
298 derr << "already running" << dendl;
299 r = -EINVAL;
300 } else if (m_manual_stop && !manual) {
301 dout(5) << "stopped manually, ignoring start without manual flag"
302 << dendl;
303 r = -EPERM;
304 } else if (restart && !m_restart_requested) {
305 dout(10) << "canceled restart" << dendl;
306 r = -ECANCELED;
307 } else {
308 m_state = STATE_STARTING;
309 m_last_r = 0;
310 m_state_desc.clear();
311 m_manual_stop = false;
312 m_delete_requested = false;
313 m_restart_requested = false;
314 m_status_removed = false;
315
316 if (on_finish != nullptr) {
317 ceph_assert(m_on_start_finish == nullptr);
318 m_on_start_finish = on_finish;
319 }
320 ceph_assert(m_on_stop_contexts.empty());
321 }
322 }
323
324 if (r < 0) {
325 if (on_finish) {
326 on_finish->complete(r);
327 }
328 return;
329 }
330
331 bootstrap();
332 }
333
/**
 * Launch the BootstrapRequest against the first known peer.
 *
 * Fails the start with -ENOENT when no peer is known. The request is given
 * pointers to m_state_builder and m_resync_requested, which it presumably
 * fills in. handle_bootstrap() receives the result.
 */
template <typename I>
void ImageReplayer<I>::bootstrap() {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  if (m_peers.empty()) {
    locker.unlock();

    dout(5) << "no peer clusters" << dendl;
    on_start_fail(-ENOENT, "no peer clusters");
    return;
  }

  // TODO need to support multiple remote images
  ceph_assert(!m_peers.empty());
  m_remote_image_peer = *m_peers.begin();

  ceph_assert(m_state_builder == nullptr);
  auto ctx = create_context_callback<
    ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
  auto request = image_replayer::BootstrapRequest<I>::create(
    m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
    m_global_image_id, m_local_mirror_uuid,
    m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
    m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested,
    ctx);

  // extra reference held via m_bootstrap_request; handle_bootstrap() puts it
  request->get();
  m_bootstrap_request = request;

  // proceed even if stop was requested to allow for m_delete_requested
  // to get set; cancel() would prevent BootstrapRequest from going into
  // image sync
  if (m_stop_requested) {
    request->cancel();
  }
  locker.unlock();

  // publish the status and send the request without holding m_lock
  update_mirror_image_status(false, boost::none);
  request->send();
}
375
376 template <typename I>
377 void ImageReplayer<I>::handle_bootstrap(int r) {
378 dout(10) << "r=" << r << dendl;
379 {
380 std::lock_guard locker{m_lock};
381 m_bootstrap_request->put();
382 m_bootstrap_request = nullptr;
383 }
384
385 // set m_delete_requested early to ensure that in case remote
386 // image no longer exists local image gets deleted even if start
387 // is interrupted
388 if (r == -ENOLINK) {
389 dout(5) << "remote image no longer exists" << dendl;
390 m_delete_requested = true;
391 }
392
393 if (on_start_interrupted()) {
394 return;
395 } else if (r == -ENOMSG) {
396 dout(5) << "local image is primary" << dendl;
397 on_start_fail(0, "local image is primary");
398 return;
399 } else if (r == -EREMOTEIO) {
400 dout(5) << "remote image is not primary" << dendl;
401 on_start_fail(-EREMOTEIO, "remote image is not primary");
402 return;
403 } else if (r == -EEXIST) {
404 on_start_fail(r, "split-brain detected");
405 return;
406 } else if (r == -ENOLINK) {
407 on_start_fail(0, "remote image no longer exists");
408 return;
409 } else if (r == -ERESTART) {
410 on_start_fail(r, "image in transient state, try again");
411 return;
412 } else if (r < 0) {
413 on_start_fail(r, "error bootstrapping replay");
414 return;
415 } else if (m_resync_requested) {
416 on_start_fail(0, "resync requested");
417 return;
418 }
419
420 start_replay();
421 }
422
423 template <typename I>
424 void ImageReplayer<I>::start_replay() {
425 dout(10) << dendl;
426
427 std::unique_lock locker{m_lock};
428 ceph_assert(m_replayer == nullptr);
429 m_replayer = m_state_builder->create_replayer(m_threads, m_instance_watcher,
430 m_local_mirror_uuid,
431 m_pool_meta_cache,
432 m_replayer_listener);
433
434 auto ctx = create_context_callback<
435 ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
436 m_replayer->init(ctx);
437 }
438
/**
 * Completion of m_replayer->init().
 *
 * On failure the replayer is destroyed without shut_down() and the start
 * fails with the replayer's error description. If init returned -ENOTCONN
 * and the replayer reports a resync request, m_resync_requested is set first.
 * On success the state moves to REPLAYING, the periodic status timer is
 * armed, and the saved start completion is called. If a stop arrived in the
 * meantime, replay shutdown begins before that completion is called.
 */
template <typename I>
void ImageReplayer<I>::handle_start_replay(int r) {
  dout(10) << "r=" << r << dendl;

  if (on_start_interrupted()) {
    return;
  } else if (r < 0) {
    std::string error_description = m_replayer->get_error_description();
    if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
      std::unique_lock locker{m_lock};
      m_resync_requested = true;
    }

    // shut down not required if init failed
    m_replayer->destroy();
    m_replayer = nullptr;

    derr << "error starting replay: " << cpp_strerror(r) << dendl;
    on_start_fail(r, error_description);
    return;
  }

  Context *on_finish = nullptr;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_state == STATE_STARTING);
    m_state = STATE_REPLAYING;
    std::swap(m_on_start_finish, on_finish);

    // schedule_update_mirror_image_replay_status() requires both m_lock and
    // timer_lock (taken in that order)
    std::unique_lock timer_locker{m_threads->timer_lock};
    schedule_update_mirror_image_replay_status();
  }

  update_mirror_image_status(true, boost::none);
  if (on_replay_interrupted()) {
    // a stop was requested while starting; shutdown has begun
    if (on_finish != nullptr) {
      on_finish->complete(r);
    }
    return;
  }

  dout(10) << "start succeeded" << dendl;
  if (on_finish != nullptr) {
    dout(10) << "on finish complete, r=" << r << dendl;
    on_finish->complete(r);
  }
}
486
487 template <typename I>
488 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
489 {
490 dout(10) << "r=" << r << ", desc=" << desc << dendl;
491 Context *ctx = new LambdaContext([this, r, desc](int _r) {
492 {
493 std::lock_guard locker{m_lock};
494 ceph_assert(m_state == STATE_STARTING);
495 m_state = STATE_STOPPING;
496 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
497 derr << "start failed: " << cpp_strerror(r) << dendl;
498 } else {
499 dout(10) << "start canceled" << dendl;
500 }
501 }
502
503 set_state_description(r, desc);
504 update_mirror_image_status(false, boost::none);
505 shut_down(r);
506 });
507 m_threads->work_queue->queue(ctx, 0);
508 }
509
510 template <typename I>
511 bool ImageReplayer<I>::on_start_interrupted() {
512 std::lock_guard locker{m_lock};
513 return on_start_interrupted(m_lock);
514 }
515
516 template <typename I>
517 bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
518 ceph_assert(ceph_mutex_is_locked(m_lock));
519 ceph_assert(m_state == STATE_STARTING);
520 if (!m_stop_requested) {
521 return false;
522 }
523
524 on_start_fail(-ECANCELED, "");
525 return true;
526 }
527
/**
 * Request that the replayer stop.
 *
 * Decided under m_lock:
 *  - if !is_running_() and is_stopped_(): on_finish (if any) is completed
 *    with -EINVAL;
 *  - if !is_running_() but not yet stopped: a stop is already in flight and
 *    on_finish joins m_on_stop_contexts;
 *  - otherwise m_stop_requested is set and on_finish is queued. A STARTING
 *    replayer has its bootstrap request (if any) canceled; any other running
 *    replayer is shut down through on_stop_journal_replay().
 *
 * @param on_finish optional completion, see above
 * @param manual    if set, marks the stop as manual so that a later start()
 *                  without the manual flag is rejected with -EPERM
 * @param restart   when false and the replayer is not running, clears a
 *                  pending restart request
 */
template <typename I>
void ImageReplayer<I>::stop(Context *on_finish, bool manual, bool restart)
{
  dout(10) << "on_finish=" << on_finish << ", manual=" << manual
           << ", restart=" << restart << dendl;

  image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
  bool shut_down_replay = false;
  bool is_stopped = false;
  {
    std::lock_guard locker{m_lock};

    if (!is_running_()) {
      if (manual && !m_manual_stop) {
        dout(10) << "marking manual" << dendl;
        m_manual_stop = true;
      }
      if (!restart && m_restart_requested) {
        dout(10) << "canceling restart" << dendl;
        m_restart_requested = false;
      }
      if (is_stopped_()) {
        dout(10) << "already stopped" << dendl;
        is_stopped = true;
      } else {
        dout(10) << "joining in-flight stop" << dendl;
        if (on_finish != nullptr) {
          m_on_stop_contexts.push_back(on_finish);
        }
      }
    } else {
      if (m_state == STATE_STARTING) {
        dout(10) << "canceling start" << dendl;
        // take a reference so the request stays valid after m_lock is dropped
        if (m_bootstrap_request != nullptr) {
          bootstrap_request = m_bootstrap_request;
          bootstrap_request->get();
        }
      } else {
        dout(10) << "interrupting replay" << dendl;
        shut_down_replay = true;
      }

      ceph_assert(m_on_stop_contexts.empty());
      if (on_finish != nullptr) {
        m_on_stop_contexts.push_back(on_finish);
      }
      m_stop_requested = true;
      m_manual_stop = manual;
    }
  }

  if (is_stopped) {
    if (on_finish) {
      on_finish->complete(-EINVAL);
    }
    return;
  }

  // avoid holding lock since bootstrap request will update status
  if (bootstrap_request != nullptr) {
    dout(10) << "canceling bootstrap" << dendl;
    bootstrap_request->cancel();
    bootstrap_request->put();
  }

  if (shut_down_replay) {
    on_stop_journal_replay();
  }
}
597
598 template <typename I>
599 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
600 {
601 dout(10) << dendl;
602
603 {
604 std::lock_guard locker{m_lock};
605 if (m_state != STATE_REPLAYING) {
606 // might be invoked multiple times while stopping
607 return;
608 }
609
610 m_stop_requested = true;
611 m_state = STATE_STOPPING;
612 }
613
614 cancel_update_mirror_image_replay_status();
615 set_state_description(r, desc);
616 update_mirror_image_status(true, boost::none);
617 shut_down(0);
618 }
619
620 template <typename I>
621 void ImageReplayer<I>::restart(Context *on_finish)
622 {
623 {
624 std::lock_guard locker{m_lock};
625 m_restart_requested = true;
626 }
627
628 auto ctx = new LambdaContext(
629 [this, on_finish](int r) {
630 if (r < 0) {
631 // Try start anyway.
632 }
633 start(on_finish, true, true);
634 });
635 stop(ctx, false, true);
636 }
637
638 template <typename I>
639 void ImageReplayer<I>::flush()
640 {
641 C_SaferCond ctx;
642
643 {
644 std::unique_lock locker{m_lock};
645 if (m_state != STATE_REPLAYING) {
646 return;
647 }
648
649 dout(10) << dendl;
650 ceph_assert(m_replayer != nullptr);
651 m_replayer->flush(&ctx);
652 }
653
654 int r = ctx.wait();
655 if (r >= 0) {
656 update_mirror_image_status(false, boost::none);
657 }
658 }
659
660 template <typename I>
661 bool ImageReplayer<I>::on_replay_interrupted()
662 {
663 bool shut_down;
664 {
665 std::lock_guard locker{m_lock};
666 shut_down = m_stop_requested;
667 }
668
669 if (shut_down) {
670 on_stop_journal_replay();
671 }
672 return shut_down;
673 }
674
675 template <typename I>
676 void ImageReplayer<I>::print_status(Formatter *f)
677 {
678 dout(10) << dendl;
679
680 std::lock_guard l{m_lock};
681
682 f->open_object_section("image_replayer");
683 f->dump_string("name", m_image_spec);
684 f->dump_string("state", to_string(m_state));
685 f->close_section();
686 }
687
688 template <typename I>
689 void ImageReplayer<I>::schedule_update_mirror_image_replay_status() {
690 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
691 ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
692 if (m_state != STATE_REPLAYING) {
693 return;
694 }
695
696 dout(10) << dendl;
697
698 // periodically update the replaying status even if nothing changes
699 // so that we can adjust our performance stats
700 ceph_assert(m_update_status_task == nullptr);
701 m_update_status_task = create_context_callback<
702 ImageReplayer<I>,
703 &ImageReplayer<I>::handle_update_mirror_image_replay_status>(this);
704 m_threads->timer->add_event_after(10, m_update_status_task);
705 }
706
707 template <typename I>
708 void ImageReplayer<I>::handle_update_mirror_image_replay_status(int r) {
709 dout(10) << dendl;
710
711 ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
712
713 ceph_assert(m_update_status_task != nullptr);
714 m_update_status_task = nullptr;
715
716 auto ctx = new LambdaContext([this](int) {
717 update_mirror_image_status(false, boost::none);
718
719 std::unique_lock locker{m_lock};
720 std::unique_lock timer_locker{m_threads->timer_lock};
721
722 schedule_update_mirror_image_replay_status();
723 m_in_flight_op_tracker.finish_op();
724 });
725
726 m_in_flight_op_tracker.start_op();
727 m_threads->work_queue->queue(ctx, 0);
728 }
729
730 template <typename I>
731 void ImageReplayer<I>::cancel_update_mirror_image_replay_status() {
732 std::unique_lock timer_locker{m_threads->timer_lock};
733 if (m_update_status_task != nullptr) {
734 dout(10) << dendl;
735
736 if (m_threads->timer->cancel_event(m_update_status_task)) {
737 m_update_status_task = nullptr;
738 }
739 }
740 }
741
742 template <typename I>
743 void ImageReplayer<I>::update_mirror_image_status(
744 bool force, const OptionalState &opt_state) {
745 dout(15) << "force=" << force << ", "
746 << "state=" << opt_state << dendl;
747
748 {
749 std::lock_guard locker{m_lock};
750 if (!force && !is_stopped_() && !is_running_()) {
751 dout(15) << "shut down in-progress: ignoring update" << dendl;
752 return;
753 }
754 }
755
756 m_in_flight_op_tracker.start_op();
757 auto ctx = new LambdaContext(
758 [this, force, opt_state](int r) {
759 set_mirror_image_status_update(force, opt_state);
760 });
761 m_threads->work_queue->queue(ctx, 0);
762 }
763
764 template <typename I>
765 void ImageReplayer<I>::set_mirror_image_status_update(
766 bool force, const OptionalState &opt_state) {
767 dout(15) << "force=" << force << ", "
768 << "state=" << opt_state << dendl;
769
770 reregister_admin_socket_hook();
771
772 State state;
773 std::string state_desc;
774 int last_r;
775 bool stopping_replay;
776
777 auto mirror_image_status_state = boost::make_optional(
778 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
779 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
780 {
781 std::lock_guard locker{m_lock};
782 state = m_state;
783 state_desc = m_state_desc;
784 mirror_image_status_state = m_mirror_image_status_state;
785 last_r = m_last_r;
786 stopping_replay = (m_replayer != nullptr);
787
788 if (m_bootstrap_request != nullptr) {
789 bootstrap_request = m_bootstrap_request;
790 bootstrap_request->get();
791 }
792 }
793
794 bool syncing = false;
795 if (bootstrap_request != nullptr) {
796 syncing = bootstrap_request->is_syncing();
797 bootstrap_request->put();
798 bootstrap_request = nullptr;
799 }
800
801 if (opt_state) {
802 state = *opt_state;
803 }
804
805 cls::rbd::MirrorImageSiteStatus status;
806 status.up = true;
807 switch (state) {
808 case STATE_STARTING:
809 if (syncing) {
810 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
811 status.description = state_desc.empty() ? "syncing" : state_desc;
812 mirror_image_status_state = status.state;
813 } else {
814 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
815 status.description = "starting replay";
816 }
817 break;
818 case STATE_REPLAYING:
819 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
820 {
821 std::string desc;
822 auto on_req_finish = new LambdaContext(
823 [this, force](int r) {
824 dout(15) << "replay status ready: r=" << r << dendl;
825 if (r >= 0) {
826 set_mirror_image_status_update(force, boost::none);
827 } else if (r == -EAGAIN) {
828 m_in_flight_op_tracker.finish_op();
829 }
830 });
831
832 ceph_assert(m_replayer != nullptr);
833 if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
834 dout(15) << "waiting for replay status" << dendl;
835 return;
836 }
837
838 status.description = "replaying, " + desc;
839 mirror_image_status_state = boost::make_optional(
840 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
841 }
842 break;
843 case STATE_STOPPING:
844 if (stopping_replay) {
845 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
846 status.description = state_desc.empty() ? "stopping replay" : state_desc;
847 break;
848 }
849 // FALLTHROUGH
850 case STATE_STOPPED:
851 if (last_r == -EREMOTEIO) {
852 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
853 status.description = state_desc;
854 mirror_image_status_state = status.state;
855 } else if (last_r < 0 && last_r != -ECANCELED) {
856 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
857 status.description = state_desc;
858 mirror_image_status_state = status.state;
859 } else {
860 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
861 status.description = state_desc.empty() ? "stopped" : state_desc;
862 mirror_image_status_state = boost::none;
863 }
864 break;
865 default:
866 ceph_assert(!"invalid state");
867 }
868
869 {
870 std::lock_guard locker{m_lock};
871 m_mirror_image_status_state = mirror_image_status_state;
872 }
873
874 // prevent the status from ping-ponging when failed replays are restarted
875 if (mirror_image_status_state &&
876 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
877 status.state = *mirror_image_status_state;
878 }
879
880 dout(15) << "status=" << status << dendl;
881 m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
882 force);
883 if (m_remote_image_peer.mirror_status_updater != nullptr) {
884 m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
885 m_global_image_id, status, force);
886 }
887
888 m_in_flight_op_tracker.finish_op();
889 }
890
/**
 * Tear down the replayer; requires STATE_STOPPING.
 *
 * First waits, by re-invoking itself, until no tracked in-flight operation
 * remains. Then queues a context chain that runs in this order:
 *   1. m_replayer->shut_down(), then destroy()  (only if a replayer exists)
 *   2. m_state_builder->close()                 (only if a builder exists)
 *   3. a forced STOPPED status update, then handle_shut_down(r)
 * The chain is built back-to-front, so each new context wraps the previous
 * one.
 *
 * @param r result later reported by handle_shut_down() to the completions
 */
template <typename I>
void ImageReplayer<I>::shut_down(int r) {
  dout(10) << "r=" << r << dendl;

  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_STOPPING);
  }

  if (!m_in_flight_op_tracker.empty()) {
    dout(15) << "waiting for in-flight operations to complete" << dendl;
    m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
        shut_down(r);
      }));
    return;
  }

  // chain the shut down sequence (reverse order)
  Context *ctx = new LambdaContext(
    [this, r](int _r) {
      update_mirror_image_status(true, STATE_STOPPED);
      handle_shut_down(r);
    });

  // destruct the state builder
  if (m_state_builder != nullptr) {
    ctx = new LambdaContext([this, ctx](int r) {
        m_state_builder->close(ctx);
      });
  }

  // close the replayer
  if (m_replayer != nullptr) {
    ctx = new LambdaContext([this, ctx](int r) {
        m_replayer->destroy();
        m_replayer = nullptr;
        ctx->complete(0);
      });
    ctx = new LambdaContext([this, ctx](int r) {
        m_replayer->shut_down(ctx);
      });
  }

  m_threads->work_queue->queue(ctx, 0);
}
936
/**
 * Final stage of shutdown. Re-enters itself until every step is complete.
 *
 * Each pass performs at most one asynchronous step and re-invokes
 * handle_shut_down(r) from that step's completion:
 *  1. if a delete (remote image gone, local image present) or a resync was
 *     requested, move the local image to the trash. The request flags are
 *     swapped out first, so the step is not repeated on re-entry;
 *  2. wait for tracked in-flight operations to drain;
 *  3. remove the local/remote mirror image status, once (m_status_removed).
 * After that the state builder is destroyed, the state becomes STOPPED, the
 * pending start completion is called with r, and each stop completion is
 * called with r (or with 0 if a start completion already received r).
 *
 * If the last result was -ENOENT and neither image id is known, the mirror
 * image is considered gone: m_finished is set and the asok hook is removed.
 */
template <typename I>
void ImageReplayer<I>::handle_shut_down(int r) {
  bool resync_requested = false;
  bool delete_requested = false;
  bool unregister_asok_hook = false;
  {
    std::lock_guard locker{m_lock};

    if (m_delete_requested && m_state_builder != nullptr &&
        !m_state_builder->local_image_id.empty()) {
      ceph_assert(m_state_builder->remote_image_id.empty());
      dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
      unregister_asok_hook = true;
      std::swap(delete_requested, m_delete_requested);
      m_delete_in_progress = true;
    }

    std::swap(resync_requested, m_resync_requested);
    if (!delete_requested && !resync_requested && m_last_r == -ENOENT &&
        ((m_state_builder == nullptr) ||
         (m_state_builder->local_image_id.empty() &&
          m_state_builder->remote_image_id.empty()))) {
      dout(0) << "mirror image no longer exists" << dendl;
      unregister_asok_hook = true;
      m_finished = true;
    }
  }

  if (unregister_asok_hook) {
    unregister_admin_socket_hook();
  }

  // step 1: trash the local image, then re-enter
  if (delete_requested || resync_requested) {
    dout(5) << "moving image to trash" << dendl;
    auto ctx = new LambdaContext([this, r](int) {
        handle_shut_down(r);
      });
    ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
                                resync_requested, m_threads->work_queue, ctx);
    return;
  }

  // step 2: drain in-flight operations, then re-enter
  if (!m_in_flight_op_tracker.empty()) {
    dout(15) << "waiting for in-flight operations to complete" << dendl;
    m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
        handle_shut_down(r);
      }));
    return;
  }

  // step 3: remove the mirror image status once, then re-enter
  if (!m_status_removed) {
    auto ctx = new LambdaContext([this, r](int) {
        m_status_removed = true;
        handle_shut_down(r);
      });
    remove_image_status(m_delete_in_progress, ctx);
    return;
  }

  if (m_state_builder != nullptr) {
    m_state_builder->destroy();
    m_state_builder = nullptr;
  }

  dout(10) << "stop complete" << dendl;
  Context *on_start = nullptr;
  Contexts on_stop_contexts;
  {
    std::lock_guard locker{m_lock};
    std::swap(on_start, m_on_start_finish);
    on_stop_contexts = std::move(m_on_stop_contexts);
    m_stop_requested = false;
    ceph_assert(m_state == STATE_STOPPING);
    m_state = STATE_STOPPED;
  }

  // the start completion gets the real result; stop completions then see 0
  if (on_start != nullptr) {
    dout(10) << "on start finish complete, r=" << r << dendl;
    on_start->complete(r);
    r = 0;
  }
  for (auto ctx : on_stop_contexts) {
    dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl;
    ctx->complete(r);
  }
}
1023
/**
 * Called by the replayer via ReplayerListener::handle_notification().
 *
 * Ignored unless REPLAYING. Picks up a rename of the local image (the asok
 * hook is re-registered on the next status update). Then it either stops
 * the replay (resync requested, or the replayer is no longer replaying) or
 * publishes a non-forced status update.
 */
template <typename I>
void ImageReplayer<I>::handle_replayer_notification() {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    // might be attempting to shut down
    return;
  }

  {
    // detect a rename of the local image
    ceph_assert(m_state_builder != nullptr &&
                m_state_builder->local_image_ctx != nullptr);
    std::shared_lock image_locker{m_state_builder->local_image_ctx->image_lock};
    if (m_local_image_name != m_state_builder->local_image_ctx->name) {
      // will re-register with new name after next status update
      dout(10) << "image renamed" << dendl;
      m_local_image_name = m_state_builder->local_image_ctx->name;
    }
  }

  // replayer cannot be shut down while notification is in-flight
  ceph_assert(m_replayer != nullptr);
  locker.unlock();

  if (m_replayer->is_resync_requested()) {
    dout(10) << "resync requested" << dendl;
    m_resync_requested = true;
    on_stop_journal_replay(0, "resync requested");
    return;
  }

  if (!m_replayer->is_replaying()) {
    auto error_code = m_replayer->get_error_code();
    auto error_description = m_replayer->get_error_description();
    dout(10) << "replay interrupted: "
             << "r=" << error_code << ", "
             << "error=" << error_description << dendl;
    on_stop_journal_replay(error_code, error_description);
    return;
  }

  update_mirror_image_status(false, {});
}
1069
1070 template <typename I>
1071 std::string ImageReplayer<I>::to_string(const State state) {
1072 switch (state) {
1073 case ImageReplayer<I>::STATE_STARTING:
1074 return "Starting";
1075 case ImageReplayer<I>::STATE_REPLAYING:
1076 return "Replaying";
1077 case ImageReplayer<I>::STATE_STOPPING:
1078 return "Stopping";
1079 case ImageReplayer<I>::STATE_STOPPED:
1080 return "Stopped";
1081 default:
1082 break;
1083 }
1084 return "Unknown(" + stringify(state) + ")";
1085 }
1086
1087 template <typename I>
1088 void ImageReplayer<I>::register_admin_socket_hook() {
1089 ImageReplayerAdminSocketHook<I> *asok_hook;
1090 {
1091 std::lock_guard locker{m_lock};
1092 if (m_asok_hook != nullptr) {
1093 return;
1094 }
1095
1096 dout(15) << "registered asok hook: " << m_image_spec << dendl;
1097 asok_hook = new ImageReplayerAdminSocketHook<I>(
1098 g_ceph_context, m_image_spec, this);
1099 int r = asok_hook->register_commands();
1100 if (r == 0) {
1101 m_asok_hook = asok_hook;
1102 return;
1103 }
1104 derr << "error registering admin socket commands" << dendl;
1105 }
1106 delete asok_hook;
1107 }
1108
1109 template <typename I>
1110 void ImageReplayer<I>::unregister_admin_socket_hook() {
1111 dout(15) << dendl;
1112
1113 AdminSocketHook *asok_hook = nullptr;
1114 {
1115 std::lock_guard locker{m_lock};
1116 std::swap(asok_hook, m_asok_hook);
1117 }
1118 delete asok_hook;
1119 }
1120
/**
 * Keep the admin socket commands registered under the current image spec.
 *
 * While STARTING, the local image name is taken from the bootstrap request.
 * If the spec computed from the local pool and m_local_image_name differs
 * from m_image_spec (or no hook is installed), m_image_spec is updated and,
 * unless the replayer is stopping or stopped, the hook is re-registered.
 */
template <typename I>
void ImageReplayer<I>::reregister_admin_socket_hook() {
  std::unique_lock locker{m_lock};
  if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) {
    m_local_image_name = m_bootstrap_request->get_local_image_name();
  }

  auto image_spec = image_replayer::util::compute_image_spec(
    m_local_io_ctx, m_local_image_name);
  if (m_asok_hook != nullptr && m_image_spec == image_spec) {
    return;
  }

  dout(15) << "old_image_spec=" << m_image_spec << ", "
           << "new_image_spec=" << image_spec << dendl;
  m_image_spec = image_spec;

  if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
    // no need to re-register if stopping
    return;
  }
  // both helpers take m_lock themselves, so release it first
  locker.unlock();

  unregister_admin_socket_hook();
  register_admin_socket_hook();
}
1147
1148 template <typename I>
1149 void ImageReplayer<I>::remove_image_status(bool force, Context *on_finish)
1150 {
1151 auto ctx = new LambdaContext([this, force, on_finish](int) {
1152 remove_image_status_remote(force, on_finish);
1153 });
1154
1155 if (m_local_status_updater->exists(m_global_image_id)) {
1156 dout(15) << "removing local mirror image status" << dendl;
1157 if (force) {
1158 m_local_status_updater->remove_mirror_image_status(
1159 m_global_image_id, true, ctx);
1160 } else {
1161 m_local_status_updater->remove_refresh_mirror_image_status(
1162 m_global_image_id, ctx);
1163 }
1164 return;
1165 }
1166
1167 ctx->complete(0);
1168 }
1169
1170 template <typename I>
1171 void ImageReplayer<I>::remove_image_status_remote(bool force, Context *on_finish)
1172 {
1173 if (m_remote_image_peer.mirror_status_updater != nullptr &&
1174 m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) {
1175 dout(15) << "removing remote mirror image status" << dendl;
1176 if (force) {
1177 m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
1178 m_global_image_id, true, on_finish);
1179 } else {
1180 m_remote_image_peer.mirror_status_updater->remove_refresh_mirror_image_status(
1181 m_global_image_id, on_finish);
1182 }
1183 return;
1184 }
1185 if (on_finish) {
1186 on_finish->complete(0);
1187 }
1188 }
1189
1190 template <typename I>
1191 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1192 {
1193 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1194 << "/" << replayer.get_global_image_id() << "]";
1195 return os;
1196 }
1197
1198 } // namespace mirror
1199 } // namespace rbd
1200
// Explicitly instantiate ImageReplayer for librbd::ImageCtx, so that the
// template member definitions in this file are emitted in this translation
// unit.
template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;