]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to v12.1.2
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageReplayer.h"
24 #include "Threads.h"
25 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
27 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
28 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
29 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
30
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_rbd_mirror
33 #undef dout_prefix
34 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
35 << __func__ << ": "
36
37 using std::map;
38 using std::string;
39 using std::unique_ptr;
40 using std::shared_ptr;
41 using std::vector;
42
43 namespace rbd {
44 namespace mirror {
45
46 using librbd::util::create_context_callback;
47 using librbd::util::create_rados_callback;
48 using namespace rbd::mirror::image_replayer;
49
50 template <typename I>
51 std::ostream &operator<<(std::ostream &os,
52 const typename ImageReplayer<I>::State &state);
53
54 namespace {
55
56 template <typename I>
57 struct ReplayHandler : public ::journal::ReplayHandler {
58 ImageReplayer<I> *replayer;
59 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
60 void get() override {}
61 void put() override {}
62
63 void handle_entries_available() override {
64 replayer->handle_replay_ready();
65 }
66 void handle_complete(int r) override {
67 std::stringstream ss;
68 if (r < 0) {
69 ss << "replay completed with error: " << cpp_strerror(r);
70 }
71 replayer->handle_replay_complete(r, ss.str());
72 }
73 };
74
75 class ImageReplayerAdminSocketCommand {
76 public:
77 virtual ~ImageReplayerAdminSocketCommand() {}
78 virtual bool call(Formatter *f, stringstream *ss) = 0;
79 };
80
81 template <typename I>
82 class StatusCommand : public ImageReplayerAdminSocketCommand {
83 public:
84 explicit StatusCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
85
86 bool call(Formatter *f, stringstream *ss) override {
87 replayer->print_status(f, ss);
88 return true;
89 }
90
91 private:
92 ImageReplayer<I> *replayer;
93 };
94
95 template <typename I>
96 class StartCommand : public ImageReplayerAdminSocketCommand {
97 public:
98 explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
99
100 bool call(Formatter *f, stringstream *ss) override {
101 replayer->start(nullptr, true);
102 return true;
103 }
104
105 private:
106 ImageReplayer<I> *replayer;
107 };
108
109 template <typename I>
110 class StopCommand : public ImageReplayerAdminSocketCommand {
111 public:
112 explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
113
114 bool call(Formatter *f, stringstream *ss) override {
115 replayer->stop(nullptr, true);
116 return true;
117 }
118
119 private:
120 ImageReplayer<I> *replayer;
121 };
122
123 template <typename I>
124 class RestartCommand : public ImageReplayerAdminSocketCommand {
125 public:
126 explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
127
128 bool call(Formatter *f, stringstream *ss) override {
129 replayer->restart();
130 return true;
131 }
132
133 private:
134 ImageReplayer<I> *replayer;
135 };
136
137 template <typename I>
138 class FlushCommand : public ImageReplayerAdminSocketCommand {
139 public:
140 explicit FlushCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
141
142 bool call(Formatter *f, stringstream *ss) override {
143 C_SaferCond cond;
144 replayer->flush(&cond);
145 int r = cond.wait();
146 if (r < 0) {
147 *ss << "flush: " << cpp_strerror(r);
148 return false;
149 }
150 return true;
151 }
152
153 private:
154 ImageReplayer<I> *replayer;
155 };
156
157 template <typename I>
158 class ImageReplayerAdminSocketHook : public AdminSocketHook {
159 public:
160 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
161 ImageReplayer<I> *replayer)
162 : admin_socket(cct->get_admin_socket()),
163 lock("ImageReplayerAdminSocketHook::lock " +
164 replayer->get_global_image_id()) {
165 std::string command;
166 int r;
167
168 command = "rbd mirror status " + name;
169 r = admin_socket->register_command(command, command, this,
170 "get status for rbd mirror " + name);
171 if (r == 0) {
172 commands[command] = new StatusCommand<I>(replayer);
173 }
174
175 command = "rbd mirror start " + name;
176 r = admin_socket->register_command(command, command, this,
177 "start rbd mirror " + name);
178 if (r == 0) {
179 commands[command] = new StartCommand<I>(replayer);
180 }
181
182 command = "rbd mirror stop " + name;
183 r = admin_socket->register_command(command, command, this,
184 "stop rbd mirror " + name);
185 if (r == 0) {
186 commands[command] = new StopCommand<I>(replayer);
187 }
188
189 command = "rbd mirror restart " + name;
190 r = admin_socket->register_command(command, command, this,
191 "restart rbd mirror " + name);
192 if (r == 0) {
193 commands[command] = new RestartCommand<I>(replayer);
194 }
195
196 command = "rbd mirror flush " + name;
197 r = admin_socket->register_command(command, command, this,
198 "flush rbd mirror " + name);
199 if (r == 0) {
200 commands[command] = new FlushCommand<I>(replayer);
201 }
202 }
203
204 ~ImageReplayerAdminSocketHook() override {
205 Mutex::Locker locker(lock);
206 for (Commands::const_iterator i = commands.begin(); i != commands.end();
207 ++i) {
208 (void)admin_socket->unregister_command(i->first);
209 delete i->second;
210 }
211 commands.clear();
212 }
213
214 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
215 bufferlist& out) override {
216 Mutex::Locker locker(lock);
217 Commands::const_iterator i = commands.find(command);
218 assert(i != commands.end());
219 Formatter *f = Formatter::create(format);
220 stringstream ss;
221 bool r = i->second->call(f, &ss);
222 delete f;
223 out.append(ss);
224 return r;
225 }
226
227 private:
228 typedef std::map<std::string, ImageReplayerAdminSocketCommand*> Commands;
229
230 AdminSocket *admin_socket;
231 Mutex lock;
232 Commands commands;
233 };
234
235 uint32_t calculate_replay_delay(const utime_t &event_time,
236 int mirroring_replay_delay) {
237 if (mirroring_replay_delay <= 0) {
238 return 0;
239 }
240
241 utime_t now = ceph_clock_now();
242 if (event_time + mirroring_replay_delay <= now) {
243 return 0;
244 }
245
246 // ensure it is rounded up when converting to integer
247 return (event_time + mirroring_replay_delay - now) + 1;
248 }
249
250 } // anonymous namespace
251
252 template <typename I>
253 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
254 const std::string &description, bool flush)
255 {
256 const std::string desc = "bootstrapping, " + description;
257 replayer->set_state_description(0, desc);
258 if (flush) {
259 replayer->update_mirror_image_status(false, boost::none);
260 }
261 }
262
263 template <typename I>
264 void ImageReplayer<I>::RemoteJournalerListener::handle_update(
265 ::journal::JournalMetadata *) {
266 FunctionContext *ctx = new FunctionContext([this](int r) {
267 replayer->handle_remote_journal_metadata_updated();
268 });
269 replayer->m_threads->work_queue->queue(ctx, 0);
270 }
271
272 template <typename I>
273 ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
274 ImageDeleter<I>* image_deleter,
275 InstanceWatcher<I> *instance_watcher,
276 RadosRef local,
277 const std::string &local_mirror_uuid,
278 int64_t local_pool_id,
279 const std::string &global_image_id) :
280 m_threads(threads),
281 m_image_deleter(image_deleter),
282 m_instance_watcher(instance_watcher),
283 m_local(local),
284 m_local_mirror_uuid(local_mirror_uuid),
285 m_local_pool_id(local_pool_id),
286 m_global_image_id(global_image_id),
287 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
288 global_image_id),
289 m_progress_cxt(this),
290 m_journal_listener(new JournalListener(this)),
291 m_remote_listener(this)
292 {
293 // Register asok commands using a temporary "remote_pool_name/global_image_id"
294 // name. When the image name becomes known on start the asok commands will be
295 // re-registered using "remote_pool_name/remote_image_name" name.
296
297 std::string pool_name;
298 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
299 if (r < 0) {
300 derr << "error resolving local pool " << m_local_pool_id
301 << ": " << cpp_strerror(r) << dendl;
302 pool_name = stringify(m_local_pool_id);
303 }
304
305 m_name = pool_name + "/" + m_global_image_id;
306 dout(20) << "registered asok hook: " << m_name << dendl;
307 m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
308 this);
309 }
310
311 template <typename I>
312 ImageReplayer<I>::~ImageReplayer()
313 {
314 assert(m_event_preprocessor == nullptr);
315 assert(m_replay_status_formatter == nullptr);
316 assert(m_local_image_ctx == nullptr);
317 assert(m_local_replay == nullptr);
318 assert(m_remote_journaler == nullptr);
319 assert(m_replay_handler == nullptr);
320 assert(m_on_start_finish == nullptr);
321 assert(m_on_stop_finish == nullptr);
322 assert(m_bootstrap_request == nullptr);
323 assert(m_in_flight_status_updates == 0);
324
325 delete m_journal_listener;
326 delete m_asok_hook;
327 }
328
329 template <typename I>
330 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
331 Mutex::Locker locker(m_lock);
332
333 if (!m_mirror_image_status_state) {
334 return image_replayer::HEALTH_STATE_OK;
335 } else if (*m_mirror_image_status_state ==
336 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
337 *m_mirror_image_status_state ==
338 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
339 return image_replayer::HEALTH_STATE_WARNING;
340 }
341 return image_replayer::HEALTH_STATE_ERROR;
342 }
343
344 template <typename I>
345 void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
346 const std::string &image_id,
347 librados::IoCtx &io_ctx) {
348 Mutex::Locker locker(m_lock);
349
350 RemoteImage remote_image(mirror_uuid, image_id, io_ctx);
351 auto it = m_remote_images.find(remote_image);
352 if (it == m_remote_images.end()) {
353 m_remote_images.insert(remote_image);
354 }
355 }
356
357 template <typename I>
358 void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
359 const std::string &image_id,
360 bool schedule_delete) {
361 Mutex::Locker locker(m_lock);
362 m_remote_images.erase({mirror_uuid, image_id});
363 }
364
365 template <typename I>
366 bool ImageReplayer<I>::remote_images_empty() const {
367 Mutex::Locker locker(m_lock);
368 return m_remote_images.empty();
369 }
370
371 template <typename I>
372 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
373 dout(20) << r << " " << desc << dendl;
374
375 Mutex::Locker l(m_lock);
376 m_last_r = r;
377 m_state_desc = desc;
378 }
379
380 template <typename I>
381 void ImageReplayer<I>::start(Context *on_finish, bool manual)
382 {
383 dout(20) << "on_finish=" << on_finish << dendl;
384
385 int r = 0;
386 {
387 Mutex::Locker locker(m_lock);
388 if (!is_stopped_()) {
389 derr << "already running" << dendl;
390 r = -EINVAL;
391 } else if (m_manual_stop && !manual) {
392 dout(5) << "stopped manually, ignoring start without manual flag"
393 << dendl;
394 r = -EPERM;
395 } else {
396 m_state = STATE_STARTING;
397 m_last_r = 0;
398 m_state_desc.clear();
399 m_manual_stop = false;
400
401 if (on_finish != nullptr) {
402 assert(m_on_start_finish == nullptr);
403 m_on_start_finish = on_finish;
404 }
405 assert(m_on_stop_finish == nullptr);
406 }
407 }
408
409 if (r < 0) {
410 if (on_finish) {
411 on_finish->complete(r);
412 }
413 return;
414 }
415
416 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
417 if (r < 0) {
418 derr << "error opening ioctx for local pool " << m_local_pool_id
419 << ": " << cpp_strerror(r) << dendl;
420 on_start_fail(r, "error opening local pool");
421 return;
422 }
423
424 prepare_local_image();
425 }
426
427 template <typename I>
428 void ImageReplayer<I>::prepare_local_image() {
429 dout(20) << dendl;
430
431 Context *ctx = create_context_callback<
432 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
433 auto req = PrepareLocalImageRequest<I>::create(
434 m_local_ioctx, m_global_image_id, &m_local_image_id,
435 &m_local_image_tag_owner, m_threads->work_queue, ctx);
436 req->send();
437 }
438
439 template <typename I>
440 void ImageReplayer<I>::handle_prepare_local_image(int r) {
441 dout(20) << "r=" << r << dendl;
442
443 if (r == -ENOENT) {
444 dout(20) << "local image does not exist" << dendl;
445 } else if (r < 0) {
446 on_start_fail(r, "error preparing local image for replay");
447 return;
448 } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
449 dout(5) << "local image is primary" << dendl;
450 on_start_fail(0, "local image is primary");
451 return;
452 }
453
454 // local image doesn't exist or is non-primary
455 bootstrap();
456 }
457
458 template <typename I>
459 void ImageReplayer<I>::bootstrap() {
460 dout(20) << dendl;
461
462 if (m_remote_images.empty()) {
463 on_start_fail(-EREMOTEIO, "waiting for primary remote image");
464 return;
465 }
466
467 // TODO bootstrap will need to support multiple remote images
468 m_remote_image = *m_remote_images.begin();
469
470 CephContext *cct = static_cast<CephContext *>(m_local->cct());
471 journal::Settings settings;
472 settings.commit_interval = cct->_conf->rbd_mirror_journal_commit_age;
473 settings.max_fetch_bytes = cct->_conf->rbd_mirror_journal_max_fetch_bytes;
474
475 m_remote_journaler = new Journaler(m_threads->work_queue,
476 m_threads->timer,
477 &m_threads->timer_lock,
478 m_remote_image.io_ctx,
479 m_remote_image.image_id,
480 m_local_mirror_uuid, settings);
481
482 Context *ctx = create_context_callback<
483 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
484
485 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
486 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
487 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
488 m_global_image_id, m_threads->work_queue, m_threads->timer,
489 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
490 m_remote_journaler, &m_client_meta, ctx, &m_do_resync, &m_progress_cxt);
491
492 {
493 Mutex::Locker locker(m_lock);
494 request->get();
495 m_bootstrap_request = request;
496 }
497
498 update_mirror_image_status(false, boost::none);
499 reschedule_update_status_task(10);
500
501 request->send();
502 }
503
504 template <typename I>
505 void ImageReplayer<I>::handle_bootstrap(int r) {
506 dout(20) << "r=" << r << dendl;
507 {
508 Mutex::Locker locker(m_lock);
509 m_bootstrap_request->put();
510 m_bootstrap_request = nullptr;
511 if (m_local_image_ctx) {
512 m_local_image_id = m_local_image_ctx->id;
513 }
514 }
515
516 if (r == -EREMOTEIO) {
517 m_local_image_tag_owner = "";
518 dout(5) << "remote image is non-primary" << dendl;
519 on_start_fail(-EREMOTEIO, "remote image is non-primary");
520 return;
521 } else if (r == -EEXIST) {
522 m_local_image_tag_owner = "";
523 on_start_fail(r, "split-brain detected");
524 return;
525 } else if (r < 0) {
526 on_start_fail(r, "error bootstrapping replay");
527 return;
528 } else if (on_start_interrupted()) {
529 return;
530 }
531
532 assert(m_local_journal == nullptr);
533 {
534 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
535 if (m_local_image_ctx->journal != nullptr) {
536 m_local_journal = m_local_image_ctx->journal;
537 m_local_journal->add_listener(m_journal_listener);
538 }
539 }
540
541 if (m_local_journal == nullptr) {
542 on_start_fail(-EINVAL, "error accessing local journal");
543 return;
544 }
545
546 {
547 Mutex::Locker locker(m_lock);
548
549 if (m_do_resync) {
550 Context *on_finish = m_on_start_finish;
551 m_stopping_for_resync = true;
552 FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
553 if (r < 0) {
554 if (on_finish) {
555 on_finish->complete(r);
556 }
557 return;
558 }
559 resync_image(on_finish);
560 });
561 m_on_start_finish = ctx;
562 }
563
564 std::string name = m_local_ioctx.get_pool_name() + "/" +
565 m_local_image_ctx->name;
566 if (m_name != name) {
567 m_name = name;
568 if (m_asok_hook) {
569 // Re-register asok commands using the new name.
570 delete m_asok_hook;
571 m_asok_hook = nullptr;
572 }
573 }
574 if (!m_asok_hook) {
575 dout(20) << "registered asok hook: " << m_name << dendl;
576 m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
577 this);
578 }
579 }
580
581 update_mirror_image_status(false, boost::none);
582 init_remote_journaler();
583 }
584
585 template <typename I>
586 void ImageReplayer<I>::init_remote_journaler() {
587 dout(20) << dendl;
588
589 Context *ctx = create_context_callback<
590 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
591 m_remote_journaler->init(ctx);
592 }
593
594 template <typename I>
595 void ImageReplayer<I>::handle_init_remote_journaler(int r) {
596 dout(20) << "r=" << r << dendl;
597
598 if (r < 0) {
599 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
600 on_start_fail(r, "error initializing remote journal");
601 return;
602 } else if (on_start_interrupted()) {
603 return;
604 }
605
606 m_remote_journaler->add_listener(&m_remote_listener);
607
608 cls::journal::Client client;
609 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
610 if (r < 0) {
611 derr << "error retrieving remote journal client: " << cpp_strerror(r)
612 << dendl;
613 on_start_fail(r, "error retrieving remote journal client");
614 return;
615 }
616
617 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
618 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
619 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
620 Mutex::Locker locker(m_lock);
621 m_stopping_for_resync = true;
622 }
623 on_start_fail(-ENOTCONN, "disconnected");
624 return;
625 }
626
627 start_replay();
628 }
629
630 template <typename I>
631 void ImageReplayer<I>::start_replay() {
632 dout(20) << dendl;
633
634 Context *start_ctx = create_context_callback<
635 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
636 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
637 }
638
639 template <typename I>
640 void ImageReplayer<I>::handle_start_replay(int r) {
641 dout(20) << "r=" << r << dendl;
642
643 if (r < 0) {
644 assert(m_local_replay == nullptr);
645 derr << "error starting external replay on local image "
646 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
647 on_start_fail(r, "error starting replay on local image");
648 return;
649 }
650
651 Context *on_finish(nullptr);
652 {
653 Mutex::Locker locker(m_lock);
654 assert(m_state == STATE_STARTING);
655 m_state = STATE_REPLAYING;
656 std::swap(m_on_start_finish, on_finish);
657 }
658
659 m_event_preprocessor = EventPreprocessor<I>::create(
660 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
661 &m_client_meta, m_threads->work_queue);
662 m_replay_status_formatter =
663 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
664
665 update_mirror_image_status(true, boost::none);
666 reschedule_update_status_task(30);
667
668 dout(20) << "start succeeded" << dendl;
669 if (on_finish != nullptr) {
670 dout(20) << "on finish complete, r=" << r << dendl;
671 on_finish->complete(r);
672 }
673
674 if (on_replay_interrupted()) {
675 return;
676 }
677
678 {
679 CephContext *cct = static_cast<CephContext *>(m_local->cct());
680 double poll_seconds = cct->_conf->rbd_mirror_journal_poll_age;
681
682 Mutex::Locker locker(m_lock);
683 m_replay_handler = new ReplayHandler<I>(this);
684 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
685
686 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
687 }
688
689 }
690
691 template <typename I>
692 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
693 {
694 dout(20) << "r=" << r << dendl;
695 Context *ctx = new FunctionContext([this, r, desc](int _r) {
696 {
697 Mutex::Locker locker(m_lock);
698 assert(m_state == STATE_STARTING);
699 m_state = STATE_STOPPING;
700 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO) {
701 derr << "start failed: " << cpp_strerror(r) << dendl;
702 } else {
703 dout(20) << "start canceled" << dendl;
704 }
705 }
706
707 set_state_description(r, desc);
708 update_mirror_image_status(false, boost::none);
709 reschedule_update_status_task(-1);
710 shut_down(r);
711 });
712 m_threads->work_queue->queue(ctx, 0);
713 }
714
715 template <typename I>
716 bool ImageReplayer<I>::on_start_interrupted()
717 {
718 Mutex::Locker locker(m_lock);
719 assert(m_state == STATE_STARTING);
720 if (m_on_stop_finish == nullptr) {
721 return false;
722 }
723
724 on_start_fail(-ECANCELED);
725 return true;
726 }
727
728 template <typename I>
729 void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
730 const std::string& desc)
731 {
732 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
733 << ", desc=" << desc << dendl;
734
735 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
736 bool shut_down_replay = false;
737 bool running = true;
738 bool canceled_task = false;
739 {
740 Mutex::Locker locker(m_lock);
741
742 if (!is_running_()) {
743 running = false;
744 } else {
745 if (!is_stopped_()) {
746 if (m_state == STATE_STARTING) {
747 dout(20) << "canceling start" << dendl;
748 if (m_bootstrap_request) {
749 bootstrap_request = m_bootstrap_request;
750 bootstrap_request->get();
751 }
752 } else {
753 dout(20) << "interrupting replay" << dendl;
754 shut_down_replay = true;
755 }
756
757 assert(m_on_stop_finish == nullptr);
758 std::swap(m_on_stop_finish, on_finish);
759 m_stop_requested = true;
760 m_manual_stop = manual;
761
762 Mutex::Locker timer_locker(m_threads->timer_lock);
763 if (m_delayed_preprocess_task != nullptr) {
764 canceled_task = m_threads->timer->cancel_event(
765 m_delayed_preprocess_task);
766 assert(canceled_task);
767 m_delayed_preprocess_task = nullptr;
768 }
769 }
770 }
771 }
772
773 // avoid holding lock since bootstrap request will update status
774 if (bootstrap_request != nullptr) {
775 bootstrap_request->cancel();
776 bootstrap_request->put();
777 }
778
779 if (canceled_task) {
780 m_event_replay_tracker.finish_op();
781 on_replay_interrupted();
782 }
783
784 if (!running) {
785 dout(20) << "not running" << dendl;
786 if (on_finish) {
787 on_finish->complete(-EINVAL);
788 }
789 return;
790 }
791
792 if (shut_down_replay) {
793 on_stop_journal_replay(r, desc);
794 } else if (on_finish != nullptr) {
795 on_finish->complete(0);
796 }
797 }
798
799 template <typename I>
800 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
801 {
802 dout(20) << "enter" << dendl;
803
804 {
805 Mutex::Locker locker(m_lock);
806 if (m_state != STATE_REPLAYING) {
807 // might be invoked multiple times while stopping
808 return;
809 }
810 m_stop_requested = true;
811 m_state = STATE_STOPPING;
812 }
813
814 set_state_description(r, desc);
815 update_mirror_image_status(false, boost::none);
816 reschedule_update_status_task(-1);
817 shut_down(0);
818 }
819
820 template <typename I>
821 void ImageReplayer<I>::handle_replay_ready()
822 {
823 dout(20) << "enter" << dendl;
824 if (on_replay_interrupted()) {
825 return;
826 }
827
828 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
829 return;
830 }
831
832 m_event_replay_tracker.start_op();
833
834 m_lock.Lock();
835 bool stopping = (m_state == STATE_STOPPING);
836 m_lock.Unlock();
837
838 if (stopping) {
839 dout(10) << "stopping event replay" << dendl;
840 m_event_replay_tracker.finish_op();
841 return;
842 }
843
844 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
845 preprocess_entry();
846 return;
847 }
848
849 replay_flush();
850 }
851
852 template <typename I>
853 void ImageReplayer<I>::restart(Context *on_finish)
854 {
855 FunctionContext *ctx = new FunctionContext(
856 [this, on_finish](int r) {
857 if (r < 0) {
858 // Try start anyway.
859 }
860 start(on_finish, true);
861 });
862 stop(ctx);
863 }
864
865 template <typename I>
866 void ImageReplayer<I>::flush(Context *on_finish)
867 {
868 dout(20) << "enter" << dendl;
869
870 {
871 Mutex::Locker locker(m_lock);
872 if (m_state == STATE_REPLAYING) {
873 Context *ctx = new FunctionContext(
874 [on_finish](int r) {
875 if (on_finish != nullptr) {
876 on_finish->complete(r);
877 }
878 });
879 on_flush_local_replay_flush_start(ctx);
880 return;
881 }
882 }
883
884 if (on_finish) {
885 on_finish->complete(0);
886 }
887 }
888
889 template <typename I>
890 void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
891 {
892 dout(20) << "enter" << dendl;
893 FunctionContext *ctx = new FunctionContext(
894 [this, on_flush](int r) {
895 on_flush_local_replay_flush_finish(on_flush, r);
896 });
897
898 assert(m_lock.is_locked());
899 assert(m_state == STATE_REPLAYING);
900 m_local_replay->flush(ctx);
901 }
902
903 template <typename I>
904 void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
905 int r)
906 {
907 dout(20) << "r=" << r << dendl;
908 if (r < 0) {
909 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
910 on_flush->complete(r);
911 return;
912 }
913
914 on_flush_flush_commit_position_start(on_flush);
915 }
916
917 template <typename I>
918 void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
919 {
920 FunctionContext *ctx = new FunctionContext(
921 [this, on_flush](int r) {
922 on_flush_flush_commit_position_finish(on_flush, r);
923 });
924
925 m_remote_journaler->flush_commit_position(ctx);
926 }
927
928 template <typename I>
929 void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
930 int r)
931 {
932 if (r < 0) {
933 derr << "error flushing remote journal commit position: "
934 << cpp_strerror(r) << dendl;
935 }
936
937 update_mirror_image_status(false, boost::none);
938
939 dout(20) << "flush complete, r=" << r << dendl;
940 on_flush->complete(r);
941 }
942
943 template <typename I>
944 bool ImageReplayer<I>::on_replay_interrupted()
945 {
946 bool shut_down;
947 {
948 Mutex::Locker locker(m_lock);
949 shut_down = m_stop_requested;
950 }
951
952 if (shut_down) {
953 on_stop_journal_replay();
954 }
955 return shut_down;
956 }
957
958 template <typename I>
959 void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
960 {
961 dout(20) << "enter" << dendl;
962
963 Mutex::Locker l(m_lock);
964
965 if (f) {
966 f->open_object_section("image_replayer");
967 f->dump_string("name", m_name);
968 f->dump_string("state", to_string(m_state));
969 f->close_section();
970 f->flush(*ss);
971 } else {
972 *ss << m_name << ": state: " << to_string(m_state);
973 }
974 }
975
976 template <typename I>
977 void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
978 {
979 dout(20) << "r=" << r << dendl;
980 if (r < 0) {
981 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
982 set_state_description(r, error_desc);
983 }
984
985 {
986 Mutex::Locker locker(m_lock);
987 m_stop_requested = true;
988 }
989 on_replay_interrupted();
990 }
991
992 template <typename I>
993 void ImageReplayer<I>::replay_flush() {
994 dout(20) << dendl;
995
996 bool interrupted = false;
997 {
998 Mutex::Locker locker(m_lock);
999 if (m_state != STATE_REPLAYING) {
1000 dout(20) << "replay interrupted" << dendl;
1001 interrupted = true;
1002 } else {
1003 m_state = STATE_REPLAY_FLUSHING;
1004 }
1005 }
1006
1007 if (interrupted) {
1008 m_event_replay_tracker.finish_op();
1009 return;
1010 }
1011
1012 // shut down the replay to flush all IO and ops and create a new
1013 // replayer to handle the new tag epoch
1014 Context *ctx = create_context_callback<
1015 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1016 ctx = new FunctionContext([this, ctx](int r) {
1017 m_local_image_ctx->journal->stop_external_replay();
1018 m_local_replay = nullptr;
1019
1020 if (r < 0) {
1021 ctx->complete(r);
1022 return;
1023 }
1024
1025 m_local_journal->start_external_replay(&m_local_replay, ctx);
1026 });
1027 m_local_replay->shut_down(false, ctx);
1028 }
1029
1030 template <typename I>
1031 void ImageReplayer<I>::handle_replay_flush(int r) {
1032 dout(20) << "r=" << r << dendl;
1033
1034 {
1035 Mutex::Locker locker(m_lock);
1036 assert(m_state == STATE_REPLAY_FLUSHING);
1037 m_state = STATE_REPLAYING;
1038 }
1039
1040 if (r < 0) {
1041 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1042 m_event_replay_tracker.finish_op();
1043 handle_replay_complete(r, "replay flush encountered an error");
1044 return;
1045 } else if (on_replay_interrupted()) {
1046 m_event_replay_tracker.finish_op();
1047 return;
1048 }
1049
1050 get_remote_tag();
1051 }
1052
1053 template <typename I>
1054 void ImageReplayer<I>::get_remote_tag() {
1055 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1056
1057 Context *ctx = create_context_callback<
1058 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1059 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1060 }
1061
1062 template <typename I>
1063 void ImageReplayer<I>::handle_get_remote_tag(int r) {
1064 dout(20) << "r=" << r << dendl;
1065
1066 if (r == 0) {
1067 try {
1068 bufferlist::iterator it = m_replay_tag.data.begin();
1069 ::decode(m_replay_tag_data, it);
1070 } catch (const buffer::error &err) {
1071 r = -EBADMSG;
1072 }
1073 }
1074
1075 if (r < 0) {
1076 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1077 << cpp_strerror(r) << dendl;
1078 m_event_replay_tracker.finish_op();
1079 handle_replay_complete(r, "failed to retrieve remote tag");
1080 return;
1081 }
1082
1083 m_replay_tag_valid = true;
1084 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1085 << m_replay_tag_data << dendl;
1086
1087 allocate_local_tag();
1088 }
1089
1090 template <typename I>
1091 void ImageReplayer<I>::allocate_local_tag() {
1092 dout(20) << dendl;
1093
1094 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1095 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
1096 mirror_uuid == m_local_mirror_uuid) {
1097 mirror_uuid = m_remote_image.mirror_uuid;
1098 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1099 dout(5) << "encountered image demotion: stopping" << dendl;
1100 Mutex::Locker locker(m_lock);
1101 m_stop_requested = true;
1102 }
1103
1104 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1105 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1106 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1107 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1108 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1109 }
1110
1111 dout(20) << "mirror_uuid=" << mirror_uuid << ", "
1112 << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
1113 << "replay_tag_tid=" << m_replay_tag_tid << ", "
1114 << "replay_tag_data=" << m_replay_tag_data << dendl;
1115 Context *ctx = create_context_callback<
1116 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1117 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1118 }
1119
1120 template <typename I>
1121 void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1122 dout(20) << "r=" << r << dendl;
1123
1124 if (r < 0) {
1125 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1126 m_event_replay_tracker.finish_op();
1127 handle_replay_complete(r, "failed to allocate journal tag");
1128 return;
1129 }
1130
1131 preprocess_entry();
1132 }
1133
1134 template <typename I>
1135 void ImageReplayer<I>::preprocess_entry() {
1136 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1137 << dendl;
1138
1139 bufferlist data = m_replay_entry.get_data();
1140 bufferlist::iterator it = data.begin();
1141 int r = m_local_replay->decode(&it, &m_event_entry);
1142 if (r < 0) {
1143 derr << "failed to decode journal event" << dendl;
1144 m_event_replay_tracker.finish_op();
1145 handle_replay_complete(r, "failed to decode journal event");
1146 return;
1147 }
1148
1149 uint32_t delay = calculate_replay_delay(
1150 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1151 if (delay == 0) {
1152 handle_preprocess_entry_ready(0);
1153 return;
1154 }
1155
1156 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1157
1158 Mutex::Locker timer_locker(m_threads->timer_lock);
1159 assert(m_delayed_preprocess_task == nullptr);
1160 m_delayed_preprocess_task = new FunctionContext(
1161 [this](int r) {
1162 assert(m_threads->timer_lock.is_locked());
1163 m_delayed_preprocess_task = nullptr;
1164 m_threads->work_queue->queue(
1165 create_context_callback<ImageReplayer,
1166 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1167 });
1168 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1169 }
1170
1171 template <typename I>
1172 void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1173 dout(20) << "r=" << r << dendl;
1174 assert(r == 0);
1175
1176 if (!m_event_preprocessor->is_required(m_event_entry)) {
1177 process_entry();
1178 return;
1179 }
1180
1181 Context *ctx = create_context_callback<
1182 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1183 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1184 }
1185
1186 template <typename I>
1187 void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1188 dout(20) << "r=" << r << dendl;
1189
1190 if (r < 0) {
1191 m_event_replay_tracker.finish_op();
1192
1193 if (r == -ECANCELED) {
1194 handle_replay_complete(0, "lost exclusive lock");
1195 } else {
1196 derr << "failed to preprocess journal event" << dendl;
1197 handle_replay_complete(r, "failed to preprocess journal event");
1198 }
1199 return;
1200 }
1201
1202 process_entry();
1203 }
1204
1205 template <typename I>
1206 void ImageReplayer<I>::process_entry() {
1207 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1208 << dendl;
1209
1210 // stop replaying events if stop has been requested
1211 if (on_replay_interrupted()) {
1212 m_event_replay_tracker.finish_op();
1213 return;
1214 }
1215
1216 Context *on_ready = create_context_callback<
1217 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1218 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1219
1220 m_local_replay->process(m_event_entry, on_ready, on_commit);
1221 }
1222
1223 template <typename I>
1224 void ImageReplayer<I>::handle_process_entry_ready(int r) {
1225 dout(20) << dendl;
1226 assert(r == 0);
1227
1228 // attempt to process the next event
1229 handle_replay_ready();
1230 }
1231
1232 template <typename I>
1233 void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1234 int r) {
1235 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1236 << dendl;
1237
1238 if (r < 0) {
1239 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1240 handle_replay_complete(r, "failed to commit journal event");
1241 } else {
1242 assert(m_remote_journaler != nullptr);
1243 m_remote_journaler->committed(replay_entry);
1244 }
1245 m_event_replay_tracker.finish_op();
1246 }
1247
1248 template <typename I>
1249 bool ImageReplayer<I>::update_mirror_image_status(bool force,
1250 const OptionalState &state) {
1251 dout(20) << dendl;
1252 {
1253 Mutex::Locker locker(m_lock);
1254 if (!start_mirror_image_status_update(force, false)) {
1255 return false;
1256 }
1257 }
1258
1259 queue_mirror_image_status_update(state);
1260 return true;
1261 }
1262
1263 template <typename I>
1264 bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1265 bool restarting) {
1266 assert(m_lock.is_locked());
1267
1268 if (!force && !is_stopped_()) {
1269 if (!is_running_()) {
1270 dout(20) << "shut down in-progress: ignoring update" << dendl;
1271 return false;
1272 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1273 dout(20) << "already sending update" << dendl;
1274 m_update_status_requested = true;
1275 return false;
1276 }
1277 }
1278
1279 dout(20) << dendl;
1280 ++m_in_flight_status_updates;
1281 return true;
1282 }
1283
1284 template <typename I>
1285 void ImageReplayer<I>::finish_mirror_image_status_update() {
1286 Context *on_finish = nullptr;
1287 {
1288 Mutex::Locker locker(m_lock);
1289 assert(m_in_flight_status_updates > 0);
1290 if (--m_in_flight_status_updates > 0) {
1291 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1292 << "updates" << dendl;
1293 return;
1294 }
1295
1296 std::swap(on_finish, m_on_update_status_finish);
1297 }
1298
1299 dout(20) << dendl;
1300 if (on_finish != nullptr) {
1301 on_finish->complete(0);
1302 }
1303 }
1304
1305 template <typename I>
1306 void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1307 dout(20) << dendl;
1308 FunctionContext *ctx = new FunctionContext(
1309 [this, state](int r) {
1310 send_mirror_status_update(state);
1311 });
1312 m_threads->work_queue->queue(ctx, 0);
1313 }
1314
// Builds a cls::rbd::MirrorImageStatus from the replayer's current state and
// writes it asynchronously to the RBD_MIRRORING object of the local pool;
// the write completion is delivered to handle_mirror_status_update().
//
// opt_state, when set, overrides the state sampled from m_state.
1315 template <typename I>
1316 void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1317 State state;
1318 std::string state_desc;
1319 int last_r;
1320 bool stopping_replay;
1321
1322 OptionalMirrorImageStatusState mirror_image_status_state{
1323 boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1324 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
// snapshot everything needed under m_lock; a reference is taken on the
// bootstrap request so it can be queried after the lock is dropped
1325 {
1326 Mutex::Locker locker(m_lock);
1327 state = m_state;
1328 state_desc = m_state_desc;
1329 mirror_image_status_state = m_mirror_image_status_state;
1330 last_r = m_last_r;
1331 stopping_replay = (m_local_image_ctx != nullptr);
1332
1333 if (m_bootstrap_request != nullptr) {
1334 bootstrap_request = m_bootstrap_request;
1335 bootstrap_request->get();
1336 }
1337 }
1338
// query the sync state outside of m_lock, then drop the reference again
1339 bool syncing = false;
1340 if (bootstrap_request != nullptr) {
1341 syncing = bootstrap_request->is_syncing();
1342 bootstrap_request->put();
1343 bootstrap_request = nullptr;
1344 }
1345
1346 if (opt_state) {
1347 state = *opt_state;
1348 }
1349
// map the replayer state onto the reported status; some branches also update
// mirror_image_status_state, which is written back to the member below
1350 cls::rbd::MirrorImageStatus status;
1351 status.up = true;
1352 switch (state) {
1353 case STATE_STARTING:
1354 if (syncing) {
1355 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1356 status.description = state_desc.empty() ? "syncing" : state_desc;
1357 mirror_image_status_state = status.state;
1358 } else {
1359 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1360 status.description = "starting replay";
1361 }
1362 break;
1363 case STATE_REPLAYING:
1364 case STATE_REPLAY_FLUSHING:
1365 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1366 {
// callback handed to the replay status formatter: r >= 0 re-enters this
// function, -EAGAIN releases the in-flight update, and any other negative
// value is ignored here
1367 Context *on_req_finish = new FunctionContext(
1368 [this](int r) {
1369 dout(20) << "replay status ready: r=" << r << dendl;
1370 if (r >= 0) {
1371 send_mirror_status_update(boost::none);
1372 } else if (r == -EAGAIN) {
1373 // decrement in-flight status update counter
1374 handle_mirror_status_update(r);
1375 }
1376 });
1377
// a false return means the description is not available yet; nothing is
// written in this invocation
1378 std::string desc;
1379 if (!m_replay_status_formatter->get_or_send_update(&desc,
1380 on_req_finish)) {
1381 dout(20) << "waiting for replay status" << dendl;
1382 return;
1383 }
1384 status.description = "replaying, " + desc;
1385 mirror_image_status_state = boost::none;
1386 }
1387 break;
1388 case STATE_STOPPING:
// stopping_replay is true while m_local_image_ctx is still set; otherwise the
// stop is reported like STATE_STOPPED
1389 if (stopping_replay) {
1390 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1391 status.description = "stopping replay";
1392 break;
1393 }
1394 // FALLTHROUGH
1395 case STATE_STOPPED:
// the last recorded result selects between unknown (-EREMOTEIO), error (any
// other negative value) and a clean stop
1396 if (last_r == -EREMOTEIO) {
1397 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1398 status.description = state_desc;
1399 mirror_image_status_state = status.state;
1400 } else if (last_r < 0) {
1401 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1402 status.description = state_desc;
1403 mirror_image_status_state = status.state;
1404 } else {
1405 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1406 status.description = state_desc.empty() ? "stopped" : state_desc;
1407 mirror_image_status_state = boost::none;
1408 }
1409 break;
1410 default:
1411 assert(!"invalid state");
1412 }
1413
// persist the (possibly updated) remembered status state
1414 {
1415 Mutex::Locker locker(m_lock);
1416 m_mirror_image_status_state = mirror_image_status_state;
1417 }
1418
1419 // prevent the status from ping-ponging when failed replays are restarted
1420 if (mirror_image_status_state &&
1421 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1422 status.state = *mirror_image_status_state;
1423 }
1424
// issue the asynchronous write; submission itself is asserted to succeed
1425 dout(20) << "status=" << status << dendl;
1426 librados::ObjectWriteOperation op;
1427 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1428
1429 librados::AioCompletion *aio_comp = create_rados_callback<
1430 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1431 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1432 assert(r == 0);
1433 aio_comp->release();
1434 }
1435
1436 template <typename I>
1437 void ImageReplayer<I>::handle_mirror_status_update(int r) {
1438 dout(20) << "r=" << r << dendl;
1439
1440 bool running = false;
1441 bool started = false;
1442 {
1443 Mutex::Locker locker(m_lock);
1444 bool update_status_requested = false;
1445 std::swap(update_status_requested, m_update_status_requested);
1446
1447 running = is_running_();
1448 if (running && update_status_requested) {
1449 started = start_mirror_image_status_update(false, true);
1450 }
1451 }
1452
1453 // if a deferred update is available, send it -- otherwise reschedule
1454 // the timer task
1455 if (started) {
1456 queue_mirror_image_status_update(boost::none);
1457 } else if (running) {
1458 reschedule_update_status_task();
1459 }
1460
1461 // mark committed status update as no longer in-flight
1462 finish_mirror_image_status_update();
1463 }
1464
1465 template <typename I>
1466 void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1467 dout(20) << dendl;
1468
1469 bool canceled_task = false;
1470 {
1471 Mutex::Locker locker(m_lock);
1472 Mutex::Locker timer_locker(m_threads->timer_lock);
1473
1474 if (m_update_status_task) {
1475 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1476 m_update_status_task = nullptr;
1477 }
1478
1479 if (new_interval > 0) {
1480 m_update_status_interval = new_interval;
1481 }
1482
1483 bool restarting = (new_interval == 0 || canceled_task);
1484 if (new_interval >= 0 && is_running_() &&
1485 start_mirror_image_status_update(false, restarting)) {
1486 m_update_status_task = new FunctionContext(
1487 [this](int r) {
1488 assert(m_threads->timer_lock.is_locked());
1489 m_update_status_task = nullptr;
1490
1491 queue_mirror_image_status_update(boost::none);
1492 });
1493 m_threads->timer->add_event_after(m_update_status_interval,
1494 m_update_status_task);
1495 }
1496 }
1497
1498 if (canceled_task) {
1499 dout(20) << "canceled task" << dendl;
1500 finish_mirror_image_status_update();
1501 }
1502 }
1503
// Starts the asynchronous shut down sequence; the state must already be
// STATE_STOPPING (asserted).
//
// The sequence is assembled as a chain of contexts in REVERSE order: each
// newly created context wraps the previously built one, so the context
// created last runs first.  Whether a step is part of the chain is decided
// by the member checks made while building it, not when the step runs.
// Resulting execution order:
//   1. flush in-flight local replay events      (if m_local_replay)
//   2. wait for in-flight replay ops
//   3. remove the local journal listener        (if m_local_journal)
//   4. stop external replay, destroy the event
//      preprocessor                             (if m_local_replay)
//   5. clear m_local_journal                    (if m_local_journal)
//   6. close the local image                    (if m_local_image_ctx)
//   7. stop remote replay, delete the replay
//      handler, wait for replay ops             (if m_replay_handler)
//   8. unregister the remote journal client     (if m_stopping_for_resync)
//   9. remove the remote listener, shut down and
//      delete the remote journaler              (if m_remote_journaler)
//  10. request a forced STATE_STOPPED status update and call
//      handle_shut_down(r)
//
// @param r result to pass on to handle_shut_down()
1504 template <typename I>
1505 void ImageReplayer<I>::shut_down(int r) {
1506 dout(20) << "r=" << r << dendl;
1507 {
1508 Mutex::Locker locker(m_lock);
1509 assert(m_state == STATE_STOPPING);
1510
1511 // if status updates are in-flight, wait for them to complete
1512 // before proceeding
// (shut_down(r) is re-invoked from the stored context; if a context is
// already stored, this call returns without replacing it)
1513 if (m_in_flight_status_updates > 0) {
1514 if (m_on_update_status_finish == nullptr) {
1515 dout(20) << "waiting for in-flight status update" << dendl;
1516 m_on_update_status_finish = new FunctionContext(
1517 [this, r](int _r) {
1518 shut_down(r);
1519 });
1520 }
1521 return;
1522 }
1523 }
1524
1525 // NOTE: it's important to ensure that the local image is fully
1526 // closed before attempting to close the remote journal in
1527 // case the remote cluster is unreachable
1528
1529 // chain the shut down sequence (reverse order)
// final step: uses the captured r; the chain's own result (_r) is ignored
1530 Context *ctx = new FunctionContext(
1531 [this, r](int _r) {
1532 update_mirror_image_status(true, STATE_STOPPED);
1533 handle_shut_down(r);
1534 });
1535
1536 // close the remote journal
1537 if (m_remote_journaler != nullptr) {
1538 ctx = new FunctionContext([this, ctx](int r) {
1539 delete m_remote_journaler;
1540 m_remote_journaler = nullptr;
1541 ctx->complete(0);
1542 });
1543 ctx = new FunctionContext([this, ctx](int r) {
1544 m_remote_journaler->remove_listener(&m_remote_listener);
1545 m_remote_journaler->shut_down(ctx);
1546 });
// a resync additionally unregisters this client from the remote journal
1547 if (m_stopping_for_resync) {
1548 ctx = new FunctionContext([this, ctx](int r) {
1549 m_remote_journaler->unregister_client(ctx);
1550 });
1551 }
1552 }
1553
1554 // stop the replay of remote journal events
1555 if (m_replay_handler != nullptr) {
1556 ctx = new FunctionContext([this, ctx](int r) {
1557 delete m_replay_handler;
1558 m_replay_handler = nullptr;
1559
1560 m_event_replay_tracker.wait_for_ops(ctx);
1561 });
1562 ctx = new FunctionContext([this, ctx](int r) {
1563 m_remote_journaler->stop_replay(ctx);
1564 });
1565 }
1566
1567 // close the local image (release exclusive lock)
1568 if (m_local_image_ctx) {
1569 ctx = new FunctionContext([this, ctx](int r) {
1570 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1571 &m_local_image_ctx, ctx);
1572 request->send();
1573 });
1574 }
1575
1576 // shut down event replay into the local image
1577 if (m_local_journal != nullptr) {
1578 ctx = new FunctionContext([this, ctx](int r) {
1579 m_local_journal = nullptr;
1580 ctx->complete(0);
1581 });
1582 if (m_local_replay != nullptr) {
1583 ctx = new FunctionContext([this, ctx](int r) {
1584 m_local_journal->stop_external_replay();
1585 m_local_replay = nullptr;
1586
1587 EventPreprocessor<I>::destroy(m_event_preprocessor);
1588 m_event_preprocessor = nullptr;
1589 ctx->complete(0);
1590 });
1591 }
1592 ctx = new FunctionContext([this, ctx](int r) {
1593 // blocks if listener notification is in-progress
1594 m_local_journal->remove_listener(m_journal_listener);
1595 ctx->complete(0);
1596 });
1597 }
1598
1599 // wait for all local in-flight replay events to complete
// (an error from the preceding step is only logged; the chain continues)
1600 ctx = new FunctionContext([this, ctx](int r) {
1601 if (r < 0) {
1602 derr << "error shutting down journal replay: " << cpp_strerror(r)
1603 << dendl;
1604 }
1605
1606 m_event_replay_tracker.wait_for_ops(ctx);
1607 });
1608
1609 // flush any local in-flight replay events
1610 if (m_local_replay != nullptr) {
1611 ctx = new FunctionContext([this, ctx](int r) {
1612 m_local_replay->shut_down(true, ctx);
1613 });
1614 }
1615
// kick off the chain from the work queue
1616 m_threads->work_queue->queue(ctx, 0);
1617 }
1618
1619 template <typename I>
1620 void ImageReplayer<I>::handle_shut_down(int r) {
1621 reschedule_update_status_task(-1);
1622
1623 {
1624 Mutex::Locker locker(m_lock);
1625
1626 // if status updates are in-flight, wait for them to complete
1627 // before proceeding
1628 if (m_in_flight_status_updates > 0) {
1629 if (m_on_update_status_finish == nullptr) {
1630 dout(20) << "waiting for in-flight status update" << dendl;
1631 m_on_update_status_finish = new FunctionContext(
1632 [this, r](int _r) {
1633 handle_shut_down(r);
1634 });
1635 }
1636 return;
1637 }
1638
1639 if (m_stopping_for_resync) {
1640 m_image_deleter->schedule_image_delete(m_local,
1641 m_local_pool_id,
1642 m_global_image_id,
1643 true);
1644 m_stopping_for_resync = false;
1645 }
1646 }
1647
1648 dout(20) << "stop complete" << dendl;
1649 m_local_ioctx.close();
1650
1651 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1652 m_replay_status_formatter = nullptr;
1653
1654 Context *on_start = nullptr;
1655 Context *on_stop = nullptr;
1656 {
1657 Mutex::Locker locker(m_lock);
1658 std::swap(on_start, m_on_start_finish);
1659 std::swap(on_stop, m_on_stop_finish);
1660 m_stop_requested = false;
1661 assert(m_delayed_preprocess_task == nullptr);
1662 assert(m_state == STATE_STOPPING);
1663 m_state = STATE_STOPPED;
1664 }
1665
1666 if (on_start != nullptr) {
1667 dout(20) << "on start finish complete, r=" << r << dendl;
1668 on_start->complete(r);
1669 r = 0;
1670 }
1671 if (on_stop != nullptr) {
1672 dout(20) << "on stop finish complete, r=" << r << dendl;
1673 on_stop->complete(r);
1674 }
1675 }
1676
1677 template <typename I>
1678 void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1679 dout(20) << dendl;
1680
1681 cls::journal::Client client;
1682 {
1683 Mutex::Locker locker(m_lock);
1684 if (!is_running_()) {
1685 return;
1686 }
1687
1688 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1689 if (r < 0) {
1690 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1691 return;
1692 }
1693 }
1694
1695 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1696 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1697 stop(nullptr, false, -ENOTCONN, "disconnected");
1698 }
1699 }
1700
1701 template <typename I>
1702 std::string ImageReplayer<I>::to_string(const State state) {
1703 switch (state) {
1704 case ImageReplayer<I>::STATE_STARTING:
1705 return "Starting";
1706 case ImageReplayer<I>::STATE_REPLAYING:
1707 return "Replaying";
1708 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1709 return "ReplayFlushing";
1710 case ImageReplayer<I>::STATE_STOPPING:
1711 return "Stopping";
1712 case ImageReplayer<I>::STATE_STOPPED:
1713 return "Stopped";
1714 default:
1715 break;
1716 }
1717 return "Unknown(" + stringify(state) + ")";
1718 }
1719
1720 template <typename I>
1721 void ImageReplayer<I>::resync_image(Context *on_finish) {
1722 dout(20) << dendl;
1723
1724 {
1725 Mutex::Locker l(m_lock);
1726 m_stopping_for_resync = true;
1727 }
1728
1729 stop(on_finish);
1730 }
1731
1732 template <typename I>
1733 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1734 {
1735 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1736 << "/" << replayer.get_global_image_id() << "]";
1737 return os;
1738 }
1739
1740 } // namespace mirror
1741 } // namespace rbd
1742
// Explicit instantiation of the ImageReplayer template for librbd::ImageCtx.
1743 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;