]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to v12.1.0
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "include/stringify.h"
9#include "cls/rbd/cls_rbd_client.h"
10#include "common/Timer.h"
11#include "common/WorkQueue.h"
12#include "global/global_context.h"
13#include "journal/Journaler.h"
14#include "journal/ReplayHandler.h"
15#include "journal/Settings.h"
16#include "librbd/ExclusiveLock.h"
17#include "librbd/ImageCtx.h"
18#include "librbd/ImageState.h"
19#include "librbd/Journal.h"
20#include "librbd/Operations.h"
21#include "librbd/Utils.h"
22#include "librbd/journal/Replay.h"
23#include "ImageReplayer.h"
7c673cae
FG
24#include "Threads.h"
25#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
27#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
28#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
29#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
30
// Logging configuration for this file: messages go to the rbd_mirror
// subsystem and are prefixed with the replayer itself (streamed via *this)
// followed by the name of the calling function.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
                           << __func__ << ": "
36
37using std::map;
38using std::string;
39using std::unique_ptr;
40using std::shared_ptr;
41using std::vector;
42
43namespace rbd {
44namespace mirror {
45
46using librbd::util::create_context_callback;
47using librbd::util::create_rados_callback;
48using namespace rbd::mirror::image_replayer;
49
50template <typename I>
51std::ostream &operator<<(std::ostream &os,
52 const typename ImageReplayer<I>::State &state);
53
54namespace {
55
56template <typename I>
57struct ReplayHandler : public ::journal::ReplayHandler {
58 ImageReplayer<I> *replayer;
59 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
60 void get() override {}
61 void put() override {}
62
63 void handle_entries_available() override {
64 replayer->handle_replay_ready();
65 }
66 void handle_complete(int r) override {
67 std::stringstream ss;
68 if (r < 0) {
69 ss << "replay completed with error: " << cpp_strerror(r);
70 }
71 replayer->handle_replay_complete(r, ss.str());
72 }
73};
74
75class ImageReplayerAdminSocketCommand {
76public:
77 virtual ~ImageReplayerAdminSocketCommand() {}
78 virtual bool call(Formatter *f, stringstream *ss) = 0;
79};
80
81template <typename I>
82class StatusCommand : public ImageReplayerAdminSocketCommand {
83public:
84 explicit StatusCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
85
86 bool call(Formatter *f, stringstream *ss) override {
87 replayer->print_status(f, ss);
88 return true;
89 }
90
91private:
92 ImageReplayer<I> *replayer;
93};
94
95template <typename I>
96class StartCommand : public ImageReplayerAdminSocketCommand {
97public:
98 explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
99
100 bool call(Formatter *f, stringstream *ss) override {
101 replayer->start(nullptr, true);
102 return true;
103 }
104
105private:
106 ImageReplayer<I> *replayer;
107};
108
109template <typename I>
110class StopCommand : public ImageReplayerAdminSocketCommand {
111public:
112 explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
113
114 bool call(Formatter *f, stringstream *ss) override {
115 replayer->stop(nullptr, true);
116 return true;
117 }
118
119private:
120 ImageReplayer<I> *replayer;
121};
122
123template <typename I>
124class RestartCommand : public ImageReplayerAdminSocketCommand {
125public:
126 explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
127
128 bool call(Formatter *f, stringstream *ss) override {
129 replayer->restart();
130 return true;
131 }
132
133private:
134 ImageReplayer<I> *replayer;
135};
136
137template <typename I>
138class FlushCommand : public ImageReplayerAdminSocketCommand {
139public:
140 explicit FlushCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
141
142 bool call(Formatter *f, stringstream *ss) override {
143 C_SaferCond cond;
144 replayer->flush(&cond);
145 int r = cond.wait();
146 if (r < 0) {
147 *ss << "flush: " << cpp_strerror(r);
148 return false;
149 }
150 return true;
151 }
152
153private:
154 ImageReplayer<I> *replayer;
155};
156
157template <typename I>
158class ImageReplayerAdminSocketHook : public AdminSocketHook {
159public:
160 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77
FG
161 ImageReplayer<I> *replayer)
162 : admin_socket(cct->get_admin_socket()),
163 lock("ImageReplayerAdminSocketHook::lock " +
164 replayer->get_global_image_id()) {
7c673cae
FG
165 std::string command;
166 int r;
167
168 command = "rbd mirror status " + name;
169 r = admin_socket->register_command(command, command, this,
170 "get status for rbd mirror " + name);
171 if (r == 0) {
172 commands[command] = new StatusCommand<I>(replayer);
173 }
174
175 command = "rbd mirror start " + name;
176 r = admin_socket->register_command(command, command, this,
177 "start rbd mirror " + name);
178 if (r == 0) {
179 commands[command] = new StartCommand<I>(replayer);
180 }
181
182 command = "rbd mirror stop " + name;
183 r = admin_socket->register_command(command, command, this,
184 "stop rbd mirror " + name);
185 if (r == 0) {
186 commands[command] = new StopCommand<I>(replayer);
187 }
188
189 command = "rbd mirror restart " + name;
190 r = admin_socket->register_command(command, command, this,
191 "restart rbd mirror " + name);
192 if (r == 0) {
193 commands[command] = new RestartCommand<I>(replayer);
194 }
195
196 command = "rbd mirror flush " + name;
197 r = admin_socket->register_command(command, command, this,
198 "flush rbd mirror " + name);
199 if (r == 0) {
200 commands[command] = new FlushCommand<I>(replayer);
201 }
202 }
203
204 ~ImageReplayerAdminSocketHook() override {
31f18b77 205 Mutex::Locker locker(lock);
7c673cae
FG
206 for (Commands::const_iterator i = commands.begin(); i != commands.end();
207 ++i) {
208 (void)admin_socket->unregister_command(i->first);
209 delete i->second;
210 }
31f18b77 211 commands.clear();
7c673cae
FG
212 }
213
214 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
215 bufferlist& out) override {
31f18b77 216 Mutex::Locker locker(lock);
7c673cae
FG
217 Commands::const_iterator i = commands.find(command);
218 assert(i != commands.end());
219 Formatter *f = Formatter::create(format);
220 stringstream ss;
221 bool r = i->second->call(f, &ss);
222 delete f;
223 out.append(ss);
224 return r;
225 }
226
227private:
228 typedef std::map<std::string, ImageReplayerAdminSocketCommand*> Commands;
229
230 AdminSocket *admin_socket;
31f18b77 231 Mutex lock;
7c673cae
FG
232 Commands commands;
233};
234
235uint32_t calculate_replay_delay(const utime_t &event_time,
236 int mirroring_replay_delay) {
237 if (mirroring_replay_delay <= 0) {
238 return 0;
239 }
240
241 utime_t now = ceph_clock_now();
242 if (event_time + mirroring_replay_delay <= now) {
243 return 0;
244 }
245
246 // ensure it is rounded up when converting to integer
247 return (event_time + mirroring_replay_delay - now) + 1;
248}
249
250} // anonymous namespace
251
252template <typename I>
253void ImageReplayer<I>::BootstrapProgressContext::update_progress(
254 const std::string &description, bool flush)
255{
256 const std::string desc = "bootstrapping, " + description;
257 replayer->set_state_description(0, desc);
258 if (flush) {
259 replayer->update_mirror_image_status(false, boost::none);
260 }
261}
262
263template <typename I>
264void ImageReplayer<I>::RemoteJournalerListener::handle_update(
265 ::journal::JournalMetadata *) {
266 FunctionContext *ctx = new FunctionContext([this](int r) {
267 replayer->handle_remote_journal_metadata_updated();
268 });
269 replayer->m_threads->work_queue->queue(ctx, 0);
270}
271
272template <typename I>
273ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
274 shared_ptr<ImageDeleter> image_deleter,
31f18b77 275 InstanceWatcher<I> *instance_watcher,
7c673cae
FG
276 RadosRef local,
277 const std::string &local_mirror_uuid,
278 int64_t local_pool_id,
279 const std::string &global_image_id) :
280 m_threads(threads),
281 m_image_deleter(image_deleter),
31f18b77 282 m_instance_watcher(instance_watcher),
7c673cae
FG
283 m_local(local),
284 m_local_mirror_uuid(local_mirror_uuid),
285 m_local_pool_id(local_pool_id),
286 m_global_image_id(global_image_id),
287 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
288 global_image_id),
289 m_progress_cxt(this),
290 m_journal_listener(new JournalListener(this)),
291 m_remote_listener(this)
292{
293 // Register asok commands using a temporary "remote_pool_name/global_image_id"
294 // name. When the image name becomes known on start the asok commands will be
295 // re-registered using "remote_pool_name/remote_image_name" name.
296
297 std::string pool_name;
298 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
299 if (r < 0) {
300 derr << "error resolving local pool " << m_local_pool_id
301 << ": " << cpp_strerror(r) << dendl;
302 pool_name = stringify(m_local_pool_id);
303 }
304
305 m_name = pool_name + "/" + m_global_image_id;
306 dout(20) << "registered asok hook: " << m_name << dendl;
307 m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
308 this);
309}
310
311template <typename I>
312ImageReplayer<I>::~ImageReplayer()
313{
314 assert(m_event_preprocessor == nullptr);
315 assert(m_replay_status_formatter == nullptr);
316 assert(m_local_image_ctx == nullptr);
317 assert(m_local_replay == nullptr);
318 assert(m_remote_journaler == nullptr);
319 assert(m_replay_handler == nullptr);
320 assert(m_on_start_finish == nullptr);
321 assert(m_on_stop_finish == nullptr);
322 assert(m_bootstrap_request == nullptr);
323 assert(m_in_flight_status_updates == 0);
324
325 delete m_journal_listener;
326 delete m_asok_hook;
327}
328
329template <typename I>
330void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
331 const std::string &image_id,
332 librados::IoCtx &io_ctx) {
333 Mutex::Locker locker(m_lock);
334
335 RemoteImage remote_image(mirror_uuid, image_id, io_ctx);
336 auto it = m_remote_images.find(remote_image);
337 if (it == m_remote_images.end()) {
338 m_remote_images.insert(remote_image);
339 }
340}
341
342template <typename I>
343void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
344 const std::string &image_id,
345 bool schedule_delete) {
346 Mutex::Locker locker(m_lock);
347 m_remote_images.erase({mirror_uuid, image_id});
348}
349
350template <typename I>
351bool ImageReplayer<I>::remote_images_empty() const {
352 Mutex::Locker locker(m_lock);
353 return m_remote_images.empty();
354}
355
356template <typename I>
357void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
358 dout(20) << r << " " << desc << dendl;
359
360 Mutex::Locker l(m_lock);
361 m_last_r = r;
362 m_state_desc = desc;
363}
364
365template <typename I>
366void ImageReplayer<I>::start(Context *on_finish, bool manual)
367{
368 dout(20) << "on_finish=" << on_finish << dendl;
369
370 int r = 0;
371 {
372 Mutex::Locker locker(m_lock);
373 if (!is_stopped_()) {
374 derr << "already running" << dendl;
375 r = -EINVAL;
376 } else if (m_manual_stop && !manual) {
377 dout(5) << "stopped manually, ignoring start without manual flag"
378 << dendl;
379 r = -EPERM;
380 } else {
381 m_state = STATE_STARTING;
382 m_last_r = 0;
383 m_state_desc.clear();
384 m_manual_stop = false;
385
386 if (on_finish != nullptr) {
387 assert(m_on_start_finish == nullptr);
388 m_on_start_finish = on_finish;
389 }
390 assert(m_on_stop_finish == nullptr);
391 }
392 }
393
394 if (r < 0) {
395 if (on_finish) {
396 on_finish->complete(r);
397 }
398 return;
399 }
400
401 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
402 if (r < 0) {
403 derr << "error opening ioctx for local pool " << m_local_pool_id
404 << ": " << cpp_strerror(r) << dendl;
405 on_start_fail(r, "error opening local pool");
406 return;
407 }
408
409 prepare_local_image();
410}
411
412template <typename I>
413void ImageReplayer<I>::prepare_local_image() {
414 dout(20) << dendl;
415
416 Context *ctx = create_context_callback<
417 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
418 auto req = PrepareLocalImageRequest<I>::create(
419 m_local_ioctx, m_global_image_id, &m_local_image_id,
420 &m_local_image_tag_owner, m_threads->work_queue, ctx);
421 req->send();
422}
423
424template <typename I>
425void ImageReplayer<I>::handle_prepare_local_image(int r) {
426 dout(20) << "r=" << r << dendl;
427
428 if (r == -ENOENT) {
429 dout(20) << "local image does not exist" << dendl;
430 } else if (r < 0) {
431 on_start_fail(r, "error preparing local image for replay");
432 return;
433 } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
434 dout(5) << "local image is primary" << dendl;
435 on_start_fail(0, "local image is primary");
436 return;
437 }
438
439 // local image doesn't exist or is non-primary
440 bootstrap();
441}
442
443template <typename I>
444void ImageReplayer<I>::bootstrap() {
445 dout(20) << dendl;
446
447 if (m_remote_images.empty()) {
448 on_start_fail(0, "waiting for primary remote image");
449 return;
450 }
451
452 // TODO bootstrap will need to support multiple remote images
453 m_remote_image = *m_remote_images.begin();
454
455 CephContext *cct = static_cast<CephContext *>(m_local->cct());
456 journal::Settings settings;
457 settings.commit_interval = cct->_conf->rbd_mirror_journal_commit_age;
458 settings.max_fetch_bytes = cct->_conf->rbd_mirror_journal_max_fetch_bytes;
459
460 m_remote_journaler = new Journaler(m_threads->work_queue,
461 m_threads->timer,
462 &m_threads->timer_lock,
463 m_remote_image.io_ctx,
464 m_remote_image.image_id,
465 m_local_mirror_uuid, settings);
466
467 Context *ctx = create_context_callback<
468 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
469
470 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
31f18b77 471 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
7c673cae
FG
472 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
473 m_global_image_id, m_threads->work_queue, m_threads->timer,
474 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
475 m_remote_journaler, &m_client_meta, ctx, &m_do_resync, &m_progress_cxt);
476
477 {
478 Mutex::Locker locker(m_lock);
479 request->get();
480 m_bootstrap_request = request;
481 }
482
483 update_mirror_image_status(false, boost::none);
484 reschedule_update_status_task(10);
485
486 request->send();
487}
488
489template <typename I>
490void ImageReplayer<I>::handle_bootstrap(int r) {
491 dout(20) << "r=" << r << dendl;
492 {
493 Mutex::Locker locker(m_lock);
494 m_bootstrap_request->put();
495 m_bootstrap_request = nullptr;
496 if (m_local_image_ctx) {
497 m_local_image_id = m_local_image_ctx->id;
498 }
499 }
500
501 if (r == -EREMOTEIO) {
502 m_local_image_tag_owner = "";
503 dout(5) << "remote image is non-primary or local image is primary" << dendl;
504 on_start_fail(0, "remote image is non-primary or local image is primary");
505 return;
506 } else if (r == -EEXIST) {
507 m_local_image_tag_owner = "";
508 on_start_fail(r, "split-brain detected");
509 return;
510 } else if (r < 0) {
511 on_start_fail(r, "error bootstrapping replay");
512 return;
513 } else if (on_start_interrupted()) {
514 return;
515 }
516
517 assert(m_local_journal == nullptr);
518 {
519 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
520 if (m_local_image_ctx->journal != nullptr) {
521 m_local_journal = m_local_image_ctx->journal;
522 m_local_journal->add_listener(m_journal_listener);
523 }
524 }
525
526 if (m_local_journal == nullptr) {
527 on_start_fail(-EINVAL, "error accessing local journal");
528 return;
529 }
530
531 {
532 Mutex::Locker locker(m_lock);
533
534 if (m_do_resync) {
535 Context *on_finish = m_on_start_finish;
536 m_stopping_for_resync = true;
537 FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
538 if (r < 0) {
539 if (on_finish) {
540 on_finish->complete(r);
541 }
542 return;
543 }
544 resync_image(on_finish);
545 });
546 m_on_start_finish = ctx;
547 }
548
549 std::string name = m_local_ioctx.get_pool_name() + "/" +
550 m_local_image_ctx->name;
551 if (m_name != name) {
552 m_name = name;
553 if (m_asok_hook) {
554 // Re-register asok commands using the new name.
555 delete m_asok_hook;
556 m_asok_hook = nullptr;
557 }
558 }
559 if (!m_asok_hook) {
560 dout(20) << "registered asok hook: " << m_name << dendl;
561 m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
562 this);
563 }
564 }
565
566 update_mirror_image_status(false, boost::none);
567 init_remote_journaler();
568}
569
570template <typename I>
571void ImageReplayer<I>::init_remote_journaler() {
572 dout(20) << dendl;
573
574 Context *ctx = create_context_callback<
575 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
576 m_remote_journaler->init(ctx);
577}
578
579template <typename I>
580void ImageReplayer<I>::handle_init_remote_journaler(int r) {
581 dout(20) << "r=" << r << dendl;
582
583 if (r < 0) {
584 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
585 on_start_fail(r, "error initializing remote journal");
586 return;
587 } else if (on_start_interrupted()) {
588 return;
589 }
590
591 m_remote_journaler->add_listener(&m_remote_listener);
592
593 cls::journal::Client client;
594 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
595 if (r < 0) {
596 derr << "error retrieving remote journal client: " << cpp_strerror(r)
597 << dendl;
598 on_start_fail(r, "error retrieving remote journal client");
599 return;
600 }
601
602 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
603 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
604 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
605 Mutex::Locker locker(m_lock);
606 m_stopping_for_resync = true;
607 }
608 on_start_fail(-ENOTCONN, "disconnected");
609 return;
610 }
611
612 start_replay();
613}
614
615template <typename I>
616void ImageReplayer<I>::start_replay() {
617 dout(20) << dendl;
618
619 Context *start_ctx = create_context_callback<
620 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
621 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
622}
623
624template <typename I>
625void ImageReplayer<I>::handle_start_replay(int r) {
626 dout(20) << "r=" << r << dendl;
627
628 if (r < 0) {
629 assert(m_local_replay == nullptr);
630 derr << "error starting external replay on local image "
631 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
632 on_start_fail(r, "error starting replay on local image");
633 return;
634 }
635
636 Context *on_finish(nullptr);
637 {
638 Mutex::Locker locker(m_lock);
639 assert(m_state == STATE_STARTING);
640 m_state = STATE_REPLAYING;
641 std::swap(m_on_start_finish, on_finish);
642 }
643
644 m_event_preprocessor = EventPreprocessor<I>::create(
645 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
646 &m_client_meta, m_threads->work_queue);
647 m_replay_status_formatter =
648 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
649
650 update_mirror_image_status(true, boost::none);
651 reschedule_update_status_task(30);
652
653 dout(20) << "start succeeded" << dendl;
654 if (on_finish != nullptr) {
655 dout(20) << "on finish complete, r=" << r << dendl;
656 on_finish->complete(r);
657 }
658
659 if (on_replay_interrupted()) {
660 return;
661 }
662
663 {
664 CephContext *cct = static_cast<CephContext *>(m_local->cct());
665 double poll_seconds = cct->_conf->rbd_mirror_journal_poll_age;
666
667 Mutex::Locker locker(m_lock);
668 m_replay_handler = new ReplayHandler<I>(this);
669 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
670
671 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
672 }
673
674}
675
676template <typename I>
677void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
678{
679 dout(20) << "r=" << r << dendl;
680 Context *ctx = new FunctionContext([this, r, desc](int _r) {
681 {
682 Mutex::Locker locker(m_lock);
683 assert(m_state == STATE_STARTING);
684 m_state = STATE_STOPPING;
685 if (r < 0 && r != -ECANCELED) {
686 derr << "start failed: " << cpp_strerror(r) << dendl;
687 } else {
688 dout(20) << "start canceled" << dendl;
689 }
690 }
691
692 set_state_description(r, desc);
693 update_mirror_image_status(false, boost::none);
694 reschedule_update_status_task(-1);
695 shut_down(r);
696 });
697 m_threads->work_queue->queue(ctx, 0);
698}
699
700template <typename I>
701bool ImageReplayer<I>::on_start_interrupted()
702{
703 Mutex::Locker locker(m_lock);
704 assert(m_state == STATE_STARTING);
705 if (m_on_stop_finish == nullptr) {
706 return false;
707 }
708
709 on_start_fail(-ECANCELED);
710 return true;
711}
712
713template <typename I>
714void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
715 const std::string& desc)
716{
717 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
718 << ", desc=" << desc << dendl;
719
720 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
721 bool shut_down_replay = false;
722 bool running = true;
723 bool canceled_task = false;
724 {
725 Mutex::Locker locker(m_lock);
726
727 if (!is_running_()) {
728 running = false;
729 } else {
730 if (!is_stopped_()) {
731 if (m_state == STATE_STARTING) {
732 dout(20) << "canceling start" << dendl;
733 if (m_bootstrap_request) {
734 bootstrap_request = m_bootstrap_request;
735 bootstrap_request->get();
736 }
737 } else {
738 dout(20) << "interrupting replay" << dendl;
739 shut_down_replay = true;
740 }
741
742 assert(m_on_stop_finish == nullptr);
743 std::swap(m_on_stop_finish, on_finish);
744 m_stop_requested = true;
745 m_manual_stop = manual;
746
747 Mutex::Locker timer_locker(m_threads->timer_lock);
748 if (m_delayed_preprocess_task != nullptr) {
749 canceled_task = m_threads->timer->cancel_event(
750 m_delayed_preprocess_task);
751 assert(canceled_task);
752 m_delayed_preprocess_task = nullptr;
753 }
754 }
755 }
756 }
757
758 // avoid holding lock since bootstrap request will update status
759 if (bootstrap_request != nullptr) {
760 bootstrap_request->cancel();
761 bootstrap_request->put();
762 }
763
764 if (canceled_task) {
765 m_event_replay_tracker.finish_op();
766 on_replay_interrupted();
767 }
768
769 if (!running) {
770 dout(20) << "not running" << dendl;
771 if (on_finish) {
772 on_finish->complete(-EINVAL);
773 }
774 return;
775 }
776
777 if (shut_down_replay) {
778 on_stop_journal_replay(r, desc);
779 } else if (on_finish != nullptr) {
780 on_finish->complete(0);
781 }
782}
783
784template <typename I>
785void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
786{
787 dout(20) << "enter" << dendl;
788
789 {
790 Mutex::Locker locker(m_lock);
791 if (m_state != STATE_REPLAYING) {
792 // might be invoked multiple times while stopping
793 return;
794 }
795 m_stop_requested = true;
796 m_state = STATE_STOPPING;
797 }
798
799 set_state_description(r, desc);
800 update_mirror_image_status(false, boost::none);
801 reschedule_update_status_task(-1);
802 shut_down(0);
803}
804
805template <typename I>
806void ImageReplayer<I>::handle_replay_ready()
807{
808 dout(20) << "enter" << dendl;
809 if (on_replay_interrupted()) {
810 return;
811 }
812
813 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
814 return;
815 }
816
817 m_event_replay_tracker.start_op();
31f18b77
FG
818
819 m_lock.Lock();
820 bool stopping = (m_state == STATE_STOPPING);
821 m_lock.Unlock();
822
823 if (stopping) {
824 dout(10) << "stopping event replay" << dendl;
825 m_event_replay_tracker.finish_op();
826 return;
827 }
828
7c673cae
FG
829 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
830 preprocess_entry();
831 return;
832 }
833
834 replay_flush();
835}
836
837template <typename I>
838void ImageReplayer<I>::restart(Context *on_finish)
839{
840 FunctionContext *ctx = new FunctionContext(
841 [this, on_finish](int r) {
842 if (r < 0) {
843 // Try start anyway.
844 }
845 start(on_finish, true);
846 });
847 stop(ctx);
848}
849
850template <typename I>
851void ImageReplayer<I>::flush(Context *on_finish)
852{
853 dout(20) << "enter" << dendl;
854
855 {
856 Mutex::Locker locker(m_lock);
857 if (m_state == STATE_REPLAYING) {
858 Context *ctx = new FunctionContext(
859 [on_finish](int r) {
860 if (on_finish != nullptr) {
861 on_finish->complete(r);
862 }
863 });
864 on_flush_local_replay_flush_start(ctx);
865 return;
866 }
867 }
868
869 if (on_finish) {
870 on_finish->complete(0);
871 }
872}
873
874template <typename I>
875void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
876{
877 dout(20) << "enter" << dendl;
878 FunctionContext *ctx = new FunctionContext(
879 [this, on_flush](int r) {
880 on_flush_local_replay_flush_finish(on_flush, r);
881 });
882
883 assert(m_lock.is_locked());
884 assert(m_state == STATE_REPLAYING);
885 m_local_replay->flush(ctx);
886}
887
888template <typename I>
889void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
890 int r)
891{
892 dout(20) << "r=" << r << dendl;
893 if (r < 0) {
894 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
895 on_flush->complete(r);
896 return;
897 }
898
899 on_flush_flush_commit_position_start(on_flush);
900}
901
902template <typename I>
903void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
904{
905 FunctionContext *ctx = new FunctionContext(
906 [this, on_flush](int r) {
907 on_flush_flush_commit_position_finish(on_flush, r);
908 });
909
910 m_remote_journaler->flush_commit_position(ctx);
911}
912
913template <typename I>
914void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
915 int r)
916{
917 if (r < 0) {
918 derr << "error flushing remote journal commit position: "
919 << cpp_strerror(r) << dendl;
920 }
921
922 update_mirror_image_status(false, boost::none);
923
924 dout(20) << "flush complete, r=" << r << dendl;
925 on_flush->complete(r);
926}
927
928template <typename I>
929bool ImageReplayer<I>::on_replay_interrupted()
930{
931 bool shut_down;
932 {
933 Mutex::Locker locker(m_lock);
934 shut_down = m_stop_requested;
935 }
936
937 if (shut_down) {
938 on_stop_journal_replay();
939 }
940 return shut_down;
941}
942
943template <typename I>
944void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
945{
946 dout(20) << "enter" << dendl;
947
948 Mutex::Locker l(m_lock);
949
950 if (f) {
951 f->open_object_section("image_replayer");
952 f->dump_string("name", m_name);
953 f->dump_string("state", to_string(m_state));
954 f->close_section();
955 f->flush(*ss);
956 } else {
957 *ss << m_name << ": state: " << to_string(m_state);
958 }
959}
960
961template <typename I>
962void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
963{
964 dout(20) << "r=" << r << dendl;
965 if (r < 0) {
966 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
967 set_state_description(r, error_desc);
968 }
969
970 {
971 Mutex::Locker locker(m_lock);
972 m_stop_requested = true;
973 }
974 on_replay_interrupted();
975}
976
977template <typename I>
978void ImageReplayer<I>::replay_flush() {
979 dout(20) << dendl;
980
981 bool interrupted = false;
982 {
983 Mutex::Locker locker(m_lock);
984 if (m_state != STATE_REPLAYING) {
985 dout(20) << "replay interrupted" << dendl;
986 interrupted = true;
987 } else {
988 m_state = STATE_REPLAY_FLUSHING;
989 }
990 }
991
992 if (interrupted) {
993 m_event_replay_tracker.finish_op();
994 return;
995 }
996
997 // shut down the replay to flush all IO and ops and create a new
998 // replayer to handle the new tag epoch
999 Context *ctx = create_context_callback<
1000 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1001 ctx = new FunctionContext([this, ctx](int r) {
1002 m_local_image_ctx->journal->stop_external_replay();
1003 m_local_replay = nullptr;
1004
1005 if (r < 0) {
1006 ctx->complete(r);
1007 return;
1008 }
1009
1010 m_local_journal->start_external_replay(&m_local_replay, ctx);
1011 });
1012 m_local_replay->shut_down(false, ctx);
1013}
1014
1015template <typename I>
1016void ImageReplayer<I>::handle_replay_flush(int r) {
1017 dout(20) << "r=" << r << dendl;
1018
1019 {
1020 Mutex::Locker locker(m_lock);
1021 assert(m_state == STATE_REPLAY_FLUSHING);
1022 m_state = STATE_REPLAYING;
1023 }
1024
1025 if (r < 0) {
1026 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1027 m_event_replay_tracker.finish_op();
1028 handle_replay_complete(r, "replay flush encountered an error");
1029 return;
1030 } else if (on_replay_interrupted()) {
1031 m_event_replay_tracker.finish_op();
1032 return;
1033 }
1034
1035 get_remote_tag();
1036}
1037
1038template <typename I>
1039void ImageReplayer<I>::get_remote_tag() {
1040 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1041
1042 Context *ctx = create_context_callback<
1043 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1044 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1045}
1046
1047template <typename I>
1048void ImageReplayer<I>::handle_get_remote_tag(int r) {
1049 dout(20) << "r=" << r << dendl;
1050
1051 if (r == 0) {
1052 try {
1053 bufferlist::iterator it = m_replay_tag.data.begin();
1054 ::decode(m_replay_tag_data, it);
1055 } catch (const buffer::error &err) {
1056 r = -EBADMSG;
1057 }
1058 }
1059
1060 if (r < 0) {
1061 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1062 << cpp_strerror(r) << dendl;
1063 m_event_replay_tracker.finish_op();
1064 handle_replay_complete(r, "failed to retrieve remote tag");
1065 return;
1066 }
1067
1068 m_replay_tag_valid = true;
1069 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1070 << m_replay_tag_data << dendl;
1071
1072 allocate_local_tag();
1073}
1074
1075template <typename I>
1076void ImageReplayer<I>::allocate_local_tag() {
1077 dout(20) << dendl;
1078
1079 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1080 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
1081 mirror_uuid == m_local_mirror_uuid) {
1082 mirror_uuid = m_remote_image.mirror_uuid;
1083 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1084 dout(5) << "encountered image demotion: stopping" << dendl;
1085 Mutex::Locker locker(m_lock);
1086 m_stop_requested = true;
1087 }
1088
1089 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1090 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1091 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1092 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1093 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1094 }
1095
1096 dout(20) << "mirror_uuid=" << mirror_uuid << ", "
1097 << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
1098 << "replay_tag_tid=" << m_replay_tag_tid << ", "
1099 << "replay_tag_data=" << m_replay_tag_data << dendl;
1100 Context *ctx = create_context_callback<
1101 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1102 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1103}
1104
1105template <typename I>
1106void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1107 dout(20) << "r=" << r << dendl;
1108
1109 if (r < 0) {
1110 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1111 m_event_replay_tracker.finish_op();
1112 handle_replay_complete(r, "failed to allocate journal tag");
1113 return;
1114 }
1115
1116 preprocess_entry();
1117}
1118
1119template <typename I>
1120void ImageReplayer<I>::preprocess_entry() {
1121 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1122 << dendl;
1123
1124 bufferlist data = m_replay_entry.get_data();
1125 bufferlist::iterator it = data.begin();
1126 int r = m_local_replay->decode(&it, &m_event_entry);
1127 if (r < 0) {
1128 derr << "failed to decode journal event" << dendl;
1129 m_event_replay_tracker.finish_op();
1130 handle_replay_complete(r, "failed to decode journal event");
1131 return;
1132 }
1133
1134 uint32_t delay = calculate_replay_delay(
1135 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1136 if (delay == 0) {
1137 handle_preprocess_entry_ready(0);
1138 return;
1139 }
1140
1141 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1142
1143 Mutex::Locker timer_locker(m_threads->timer_lock);
1144 assert(m_delayed_preprocess_task == nullptr);
1145 m_delayed_preprocess_task = new FunctionContext(
1146 [this](int r) {
1147 assert(m_threads->timer_lock.is_locked());
1148 m_delayed_preprocess_task = nullptr;
1149 m_threads->work_queue->queue(
1150 create_context_callback<ImageReplayer,
1151 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1152 });
1153 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1154}
1155
1156template <typename I>
1157void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1158 dout(20) << "r=" << r << dendl;
1159 assert(r == 0);
1160
1161 if (!m_event_preprocessor->is_required(m_event_entry)) {
1162 process_entry();
1163 return;
1164 }
1165
1166 Context *ctx = create_context_callback<
1167 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1168 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1169}
1170
1171template <typename I>
1172void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1173 dout(20) << "r=" << r << dendl;
1174
1175 if (r < 0) {
7c673cae 1176 m_event_replay_tracker.finish_op();
31f18b77
FG
1177
1178 if (r == -ECANCELED) {
1179 handle_replay_complete(0, "lost exclusive lock");
1180 } else {
1181 derr << "failed to preprocess journal event" << dendl;
1182 handle_replay_complete(r, "failed to preprocess journal event");
1183 }
7c673cae
FG
1184 return;
1185 }
1186
1187 process_entry();
1188}
1189
1190template <typename I>
1191void ImageReplayer<I>::process_entry() {
1192 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1193 << dendl;
1194
1195 // stop replaying events if stop has been requested
1196 if (on_replay_interrupted()) {
1197 m_event_replay_tracker.finish_op();
1198 return;
1199 }
1200
1201 Context *on_ready = create_context_callback<
1202 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1203 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1204
1205 m_local_replay->process(m_event_entry, on_ready, on_commit);
1206}
1207
1208template <typename I>
1209void ImageReplayer<I>::handle_process_entry_ready(int r) {
1210 dout(20) << dendl;
1211 assert(r == 0);
1212
1213 // attempt to process the next event
1214 handle_replay_ready();
1215}
1216
1217template <typename I>
1218void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1219 int r) {
1220 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1221 << dendl;
1222
1223 if (r < 0) {
1224 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1225 handle_replay_complete(r, "failed to commit journal event");
1226 } else {
1227 assert(m_remote_journaler != nullptr);
1228 m_remote_journaler->committed(replay_entry);
1229 }
1230 m_event_replay_tracker.finish_op();
1231}
1232
1233template <typename I>
1234bool ImageReplayer<I>::update_mirror_image_status(bool force,
1235 const OptionalState &state) {
1236 dout(20) << dendl;
1237 {
1238 Mutex::Locker locker(m_lock);
1239 if (!start_mirror_image_status_update(force, false)) {
1240 return false;
1241 }
1242 }
1243
1244 queue_mirror_image_status_update(state);
1245 return true;
1246}
1247
1248template <typename I>
1249bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1250 bool restarting) {
1251 assert(m_lock.is_locked());
1252
1253 if (!force && !is_stopped_()) {
1254 if (!is_running_()) {
1255 dout(20) << "shut down in-progress: ignoring update" << dendl;
1256 return false;
1257 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1258 dout(20) << "already sending update" << dendl;
1259 m_update_status_requested = true;
1260 return false;
1261 }
1262 }
1263
1264 dout(20) << dendl;
1265 ++m_in_flight_status_updates;
1266 return true;
1267}
1268
1269template <typename I>
1270void ImageReplayer<I>::finish_mirror_image_status_update() {
1271 Context *on_finish = nullptr;
1272 {
1273 Mutex::Locker locker(m_lock);
1274 assert(m_in_flight_status_updates > 0);
1275 if (--m_in_flight_status_updates > 0) {
1276 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1277 << "updates" << dendl;
1278 return;
1279 }
1280
1281 std::swap(on_finish, m_on_update_status_finish);
1282 }
1283
1284 dout(20) << dendl;
1285 if (on_finish != nullptr) {
1286 on_finish->complete(0);
1287 }
1288}
1289
1290template <typename I>
1291void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1292 dout(20) << dendl;
1293 FunctionContext *ctx = new FunctionContext(
1294 [this, state](int r) {
1295 send_mirror_status_update(state);
1296 });
1297 m_threads->work_queue->queue(ctx, 0);
1298}
1299
1300template <typename I>
1301void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1302 State state;
1303 std::string state_desc;
1304 int last_r;
1305 bool bootstrapping;
1306 bool stopping_replay;
1307 {
1308 Mutex::Locker locker(m_lock);
1309 state = m_state;
1310 state_desc = m_state_desc;
1311 last_r = m_last_r;
1312 bootstrapping = (m_bootstrap_request != nullptr);
1313 stopping_replay = (m_local_image_ctx != nullptr);
1314 }
1315
1316 if (opt_state) {
1317 state = *opt_state;
1318 }
1319
1320 cls::rbd::MirrorImageStatus status;
1321 status.up = true;
1322 switch (state) {
1323 case STATE_STARTING:
1324 if (bootstrapping) {
1325 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1326 status.description = state_desc.empty() ? "syncing" : state_desc;
1327 } else {
1328 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1329 status.description = "starting replay";
1330 }
1331 break;
1332 case STATE_REPLAYING:
1333 case STATE_REPLAY_FLUSHING:
1334 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1335 {
1336 Context *on_req_finish = new FunctionContext(
1337 [this](int r) {
1338 dout(20) << "replay status ready: r=" << r << dendl;
1339 if (r >= 0) {
1340 send_mirror_status_update(boost::none);
1341 } else if (r == -EAGAIN) {
1342 // decrement in-flight status update counter
1343 handle_mirror_status_update(r);
1344 }
1345 });
1346
1347 std::string desc;
1348 if (!m_replay_status_formatter->get_or_send_update(&desc,
1349 on_req_finish)) {
1350 dout(20) << "waiting for replay status" << dendl;
1351 return;
1352 }
1353 status.description = "replaying, " + desc;
1354 }
1355 break;
1356 case STATE_STOPPING:
1357 if (stopping_replay) {
1358 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1359 status.description = "stopping replay";
1360 break;
1361 }
1362 // FALLTHROUGH
1363 case STATE_STOPPED:
1364 if (last_r < 0) {
1365 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1366 status.description = state_desc;
1367 } else {
1368 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1369 status.description = state_desc.empty() ? "stopped" : state_desc;
1370 }
1371 break;
1372 default:
1373 assert(!"invalid state");
1374 }
1375
1376 dout(20) << "status=" << status << dendl;
1377 librados::ObjectWriteOperation op;
1378 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1379
1380 librados::AioCompletion *aio_comp = create_rados_callback<
1381 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1382 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1383 assert(r == 0);
1384 aio_comp->release();
1385}
1386
1387template <typename I>
1388void ImageReplayer<I>::handle_mirror_status_update(int r) {
1389 dout(20) << "r=" << r << dendl;
1390
1391 bool running = false;
1392 bool started = false;
1393 {
1394 Mutex::Locker locker(m_lock);
1395 bool update_status_requested = false;
1396 std::swap(update_status_requested, m_update_status_requested);
1397
1398 running = is_running_();
1399 if (running && update_status_requested) {
1400 started = start_mirror_image_status_update(false, true);
1401 }
1402 }
1403
1404 // if a deferred update is available, send it -- otherwise reschedule
1405 // the timer task
1406 if (started) {
1407 queue_mirror_image_status_update(boost::none);
1408 } else if (running) {
1409 reschedule_update_status_task();
1410 }
1411
1412 // mark committed status update as no longer in-flight
1413 finish_mirror_image_status_update();
1414}
1415
1416template <typename I>
1417void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1418 dout(20) << dendl;
1419
1420 bool canceled_task = false;
1421 {
1422 Mutex::Locker locker(m_lock);
1423 Mutex::Locker timer_locker(m_threads->timer_lock);
1424
1425 if (m_update_status_task) {
1426 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1427 m_update_status_task = nullptr;
1428 }
1429
1430 if (new_interval > 0) {
1431 m_update_status_interval = new_interval;
1432 }
1433
1434 bool restarting = (new_interval == 0 || canceled_task);
1435 if (new_interval >= 0 && is_running_() &&
1436 start_mirror_image_status_update(false, restarting)) {
1437 m_update_status_task = new FunctionContext(
1438 [this](int r) {
1439 assert(m_threads->timer_lock.is_locked());
1440 m_update_status_task = nullptr;
1441
1442 queue_mirror_image_status_update(boost::none);
1443 });
1444 m_threads->timer->add_event_after(m_update_status_interval,
1445 m_update_status_task);
1446 }
1447 }
1448
1449 if (canceled_task) {
1450 dout(20) << "canceled task" << dendl;
1451 finish_mirror_image_status_update();
1452 }
1453}
1454
1455template <typename I>
1456void ImageReplayer<I>::shut_down(int r) {
1457 dout(20) << "r=" << r << dendl;
1458 {
1459 Mutex::Locker locker(m_lock);
1460 assert(m_state == STATE_STOPPING);
1461
1462 // if status updates are in-flight, wait for them to complete
1463 // before proceeding
1464 if (m_in_flight_status_updates > 0) {
1465 if (m_on_update_status_finish == nullptr) {
1466 dout(20) << "waiting for in-flight status update" << dendl;
1467 m_on_update_status_finish = new FunctionContext(
1468 [this, r](int _r) {
1469 shut_down(r);
1470 });
1471 }
1472 return;
1473 }
1474 }
1475
31f18b77
FG
1476 // NOTE: it's important to ensure that the local image is fully
1477 // closed before attempting to close the remote journal in
1478 // case the remote cluster is unreachable
1479
7c673cae
FG
1480 // chain the shut down sequence (reverse order)
1481 Context *ctx = new FunctionContext(
1482 [this, r](int _r) {
1483 update_mirror_image_status(true, STATE_STOPPED);
1484 handle_shut_down(r);
1485 });
31f18b77
FG
1486
1487 // close the remote journal
7c673cae
FG
1488 if (m_remote_journaler != nullptr) {
1489 ctx = new FunctionContext([this, ctx](int r) {
1490 delete m_remote_journaler;
1491 m_remote_journaler = nullptr;
1492 ctx->complete(0);
1493 });
1494 ctx = new FunctionContext([this, ctx](int r) {
1495 m_remote_journaler->remove_listener(&m_remote_listener);
1496 m_remote_journaler->shut_down(ctx);
1497 });
1498 if (m_stopping_for_resync) {
1499 ctx = new FunctionContext([this, ctx](int r) {
1500 m_remote_journaler->unregister_client(ctx);
1501 });
1502 }
1503 }
31f18b77
FG
1504
1505 // stop the replay of remote journal events
1506 if (m_replay_handler != nullptr) {
1507 ctx = new FunctionContext([this, ctx](int r) {
1508 delete m_replay_handler;
1509 m_replay_handler = nullptr;
1510
1511 m_event_replay_tracker.wait_for_ops(ctx);
1512 });
1513 ctx = new FunctionContext([this, ctx](int r) {
1514 m_remote_journaler->stop_replay(ctx);
1515 });
1516 }
1517
1518 // close the local image (release exclusive lock)
1519 if (m_local_image_ctx) {
1520 ctx = new FunctionContext([this, ctx](int r) {
1521 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1522 &m_local_image_ctx, ctx);
1523 request->send();
1524 });
1525 }
1526
1527 // shut down event replay into the local image
7c673cae
FG
1528 if (m_local_journal != nullptr) {
1529 ctx = new FunctionContext([this, ctx](int r) {
1530 m_local_journal = nullptr;
1531 ctx->complete(0);
1532 });
1533 if (m_local_replay != nullptr) {
1534 ctx = new FunctionContext([this, ctx](int r) {
1535 m_local_journal->stop_external_replay();
1536 m_local_replay = nullptr;
1537
1538 EventPreprocessor<I>::destroy(m_event_preprocessor);
1539 m_event_preprocessor = nullptr;
1540 ctx->complete(0);
1541 });
1542 }
1543 ctx = new FunctionContext([this, ctx](int r) {
7c673cae
FG
1544 // blocks if listener notification is in-progress
1545 m_local_journal->remove_listener(m_journal_listener);
7c673cae
FG
1546 ctx->complete(0);
1547 });
31f18b77
FG
1548 }
1549
1550 // wait for all local in-flight replay events to complete
1551 ctx = new FunctionContext([this, ctx](int r) {
1552 if (r < 0) {
1553 derr << "error shutting down journal replay: " << cpp_strerror(r)
1554 << dendl;
1555 }
1556
1557 m_event_replay_tracker.wait_for_ops(ctx);
1558 });
1559
1560 // flush any local in-flight replay events
1561 if (m_local_replay != nullptr) {
7c673cae 1562 ctx = new FunctionContext([this, ctx](int r) {
31f18b77 1563 m_local_replay->shut_down(true, ctx);
7c673cae
FG
1564 });
1565 }
31f18b77 1566
7c673cae
FG
1567 m_threads->work_queue->queue(ctx, 0);
1568}
1569
1570template <typename I>
1571void ImageReplayer<I>::handle_shut_down(int r) {
1572 reschedule_update_status_task(-1);
1573
1574 {
1575 Mutex::Locker locker(m_lock);
1576
1577 // if status updates are in-flight, wait for them to complete
1578 // before proceeding
1579 if (m_in_flight_status_updates > 0) {
1580 if (m_on_update_status_finish == nullptr) {
1581 dout(20) << "waiting for in-flight status update" << dendl;
1582 m_on_update_status_finish = new FunctionContext(
1583 [this, r](int _r) {
1584 handle_shut_down(r);
1585 });
1586 }
1587 return;
1588 }
1589
1590 if (m_stopping_for_resync) {
1591 m_image_deleter->schedule_image_delete(m_local,
1592 m_local_pool_id,
1593 m_global_image_id);
1594 m_stopping_for_resync = false;
1595 }
1596 }
1597
1598 dout(20) << "stop complete" << dendl;
1599 m_local_ioctx.close();
1600
1601 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1602 m_replay_status_formatter = nullptr;
1603
1604 Context *on_start = nullptr;
1605 Context *on_stop = nullptr;
1606 {
1607 Mutex::Locker locker(m_lock);
1608 std::swap(on_start, m_on_start_finish);
1609 std::swap(on_stop, m_on_stop_finish);
1610 m_stop_requested = false;
1611 assert(m_delayed_preprocess_task == nullptr);
1612 assert(m_state == STATE_STOPPING);
1613 m_state = STATE_STOPPED;
1614 }
1615
1616 if (on_start != nullptr) {
1617 dout(20) << "on start finish complete, r=" << r << dendl;
1618 on_start->complete(r);
1619 r = 0;
1620 }
1621 if (on_stop != nullptr) {
1622 dout(20) << "on stop finish complete, r=" << r << dendl;
1623 on_stop->complete(r);
1624 }
1625}
1626
1627template <typename I>
1628void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1629 dout(20) << dendl;
1630
1631 cls::journal::Client client;
1632 {
1633 Mutex::Locker locker(m_lock);
1634 if (!is_running_()) {
1635 return;
1636 }
1637
1638 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1639 if (r < 0) {
1640 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1641 return;
1642 }
1643 }
1644
1645 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1646 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1647 stop(nullptr, false, -ENOTCONN, "disconnected");
1648 }
1649}
1650
1651template <typename I>
1652std::string ImageReplayer<I>::to_string(const State state) {
1653 switch (state) {
1654 case ImageReplayer<I>::STATE_STARTING:
1655 return "Starting";
1656 case ImageReplayer<I>::STATE_REPLAYING:
1657 return "Replaying";
1658 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1659 return "ReplayFlushing";
1660 case ImageReplayer<I>::STATE_STOPPING:
1661 return "Stopping";
1662 case ImageReplayer<I>::STATE_STOPPED:
1663 return "Stopped";
1664 default:
1665 break;
1666 }
1667 return "Unknown(" + stringify(state) + ")";
1668}
1669
1670template <typename I>
1671void ImageReplayer<I>::resync_image(Context *on_finish) {
1672 dout(20) << dendl;
1673
1674 {
1675 Mutex::Locker l(m_lock);
1676 m_stopping_for_resync = true;
1677 }
1678
1679 stop(on_finish);
1680}
1681
1682template <typename I>
1683std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1684{
1685 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1686 << "/" << replayer.get_global_image_id() << "]";
1687 return os;
1688}
1689
1690} // namespace mirror
1691} // namespace rbd
1692
1693template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;