]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to v12.1.3
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageDeleter.h"
24 #include "ImageReplayer.h"
25 #include "Threads.h"
26 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
35 #undef dout_prefix
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39 using std::map;
40 using std::string;
41 using std::unique_ptr;
42 using std::shared_ptr;
43 using std::vector;
44
45 namespace rbd {
46 namespace mirror {
47
48 using librbd::util::create_context_callback;
49 using librbd::util::create_rados_callback;
50 using namespace rbd::mirror::image_replayer;
51
52 template <typename I>
53 std::ostream &operator<<(std::ostream &os,
54 const typename ImageReplayer<I>::State &state);
55
56 namespace {
57
58 template <typename I>
59 struct ReplayHandler : public ::journal::ReplayHandler {
60 ImageReplayer<I> *replayer;
61 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
62 void get() override {}
63 void put() override {}
64
65 void handle_entries_available() override {
66 replayer->handle_replay_ready();
67 }
68 void handle_complete(int r) override {
69 std::stringstream ss;
70 if (r < 0) {
71 ss << "replay completed with error: " << cpp_strerror(r);
72 }
73 replayer->handle_replay_complete(r, ss.str());
74 }
75 };
76
77 class ImageReplayerAdminSocketCommand {
78 public:
79 virtual ~ImageReplayerAdminSocketCommand() {}
80 virtual bool call(Formatter *f, stringstream *ss) = 0;
81 };
82
83 template <typename I>
84 class StatusCommand : public ImageReplayerAdminSocketCommand {
85 public:
86 explicit StatusCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
87
88 bool call(Formatter *f, stringstream *ss) override {
89 replayer->print_status(f, ss);
90 return true;
91 }
92
93 private:
94 ImageReplayer<I> *replayer;
95 };
96
97 template <typename I>
98 class StartCommand : public ImageReplayerAdminSocketCommand {
99 public:
100 explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
101
102 bool call(Formatter *f, stringstream *ss) override {
103 replayer->start(nullptr, true);
104 return true;
105 }
106
107 private:
108 ImageReplayer<I> *replayer;
109 };
110
111 template <typename I>
112 class StopCommand : public ImageReplayerAdminSocketCommand {
113 public:
114 explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
115
116 bool call(Formatter *f, stringstream *ss) override {
117 replayer->stop(nullptr, true);
118 return true;
119 }
120
121 private:
122 ImageReplayer<I> *replayer;
123 };
124
125 template <typename I>
126 class RestartCommand : public ImageReplayerAdminSocketCommand {
127 public:
128 explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
129
130 bool call(Formatter *f, stringstream *ss) override {
131 replayer->restart();
132 return true;
133 }
134
135 private:
136 ImageReplayer<I> *replayer;
137 };
138
139 template <typename I>
140 class FlushCommand : public ImageReplayerAdminSocketCommand {
141 public:
142 explicit FlushCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
143
144 bool call(Formatter *f, stringstream *ss) override {
145 C_SaferCond cond;
146 replayer->flush(&cond);
147 int r = cond.wait();
148 if (r < 0) {
149 *ss << "flush: " << cpp_strerror(r);
150 return false;
151 }
152 return true;
153 }
154
155 private:
156 ImageReplayer<I> *replayer;
157 };
158
159 template <typename I>
160 class ImageReplayerAdminSocketHook : public AdminSocketHook {
161 public:
162 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
163 ImageReplayer<I> *replayer)
164 : admin_socket(cct->get_admin_socket()), name(name), replayer(replayer),
165 lock("ImageReplayerAdminSocketHook::lock " +
166 replayer->get_global_image_id()) {
167 }
168
169 int register_commands() {
170 std::string command;
171 int r;
172
173 command = "rbd mirror status " + name;
174 r = admin_socket->register_command(command, command, this,
175 "get status for rbd mirror " + name);
176 if (r < 0) {
177 return r;
178 }
179 commands[command] = new StatusCommand<I>(replayer);
180
181 command = "rbd mirror start " + name;
182 r = admin_socket->register_command(command, command, this,
183 "start rbd mirror " + name);
184 if (r < 0) {
185 return r;
186 }
187 commands[command] = new StartCommand<I>(replayer);
188
189 command = "rbd mirror stop " + name;
190 r = admin_socket->register_command(command, command, this,
191 "stop rbd mirror " + name);
192 if (r < 0) {
193 return r;
194 }
195 commands[command] = new StopCommand<I>(replayer);
196
197 command = "rbd mirror restart " + name;
198 r = admin_socket->register_command(command, command, this,
199 "restart rbd mirror " + name);
200 if (r < 0) {
201 return r;
202 }
203 commands[command] = new RestartCommand<I>(replayer);
204
205 command = "rbd mirror flush " + name;
206 r = admin_socket->register_command(command, command, this,
207 "flush rbd mirror " + name);
208 if (r < 0) {
209 return r;
210 }
211 commands[command] = new FlushCommand<I>(replayer);
212
213 return 0;
214 }
215
216 ~ImageReplayerAdminSocketHook() override {
217 Mutex::Locker locker(lock);
218 for (Commands::const_iterator i = commands.begin(); i != commands.end();
219 ++i) {
220 (void)admin_socket->unregister_command(i->first);
221 delete i->second;
222 }
223 commands.clear();
224 }
225
226 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
227 bufferlist& out) override {
228 Mutex::Locker locker(lock);
229 Commands::const_iterator i = commands.find(command);
230 assert(i != commands.end());
231 Formatter *f = Formatter::create(format);
232 stringstream ss;
233 bool r = i->second->call(f, &ss);
234 delete f;
235 out.append(ss);
236 return r;
237 }
238
239 private:
240 typedef std::map<std::string, ImageReplayerAdminSocketCommand*> Commands;
241
242 AdminSocket *admin_socket;
243 std::string name;
244 ImageReplayer<I> *replayer;
245 Mutex lock;
246 Commands commands;
247 };
248
249 uint32_t calculate_replay_delay(const utime_t &event_time,
250 int mirroring_replay_delay) {
251 if (mirroring_replay_delay <= 0) {
252 return 0;
253 }
254
255 utime_t now = ceph_clock_now();
256 if (event_time + mirroring_replay_delay <= now) {
257 return 0;
258 }
259
260 // ensure it is rounded up when converting to integer
261 return (event_time + mirroring_replay_delay - now) + 1;
262 }
263
264 } // anonymous namespace
265
266 template <typename I>
267 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
268 const std::string &description, bool flush)
269 {
270 const std::string desc = "bootstrapping, " + description;
271 replayer->set_state_description(0, desc);
272 if (flush) {
273 replayer->update_mirror_image_status(false, boost::none);
274 }
275 }
276
277 template <typename I>
278 void ImageReplayer<I>::RemoteJournalerListener::handle_update(
279 ::journal::JournalMetadata *) {
280 FunctionContext *ctx = new FunctionContext([this](int r) {
281 replayer->handle_remote_journal_metadata_updated();
282 });
283 replayer->m_threads->work_queue->queue(ctx, 0);
284 }
285
286 template <typename I>
287 ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
288 ImageDeleter<I>* image_deleter,
289 InstanceWatcher<I> *instance_watcher,
290 RadosRef local,
291 const std::string &local_mirror_uuid,
292 int64_t local_pool_id,
293 const std::string &global_image_id) :
294 m_threads(threads),
295 m_image_deleter(image_deleter),
296 m_instance_watcher(instance_watcher),
297 m_local(local),
298 m_local_mirror_uuid(local_mirror_uuid),
299 m_local_pool_id(local_pool_id),
300 m_global_image_id(global_image_id),
301 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
302 global_image_id),
303 m_progress_cxt(this),
304 m_journal_listener(new JournalListener(this)),
305 m_remote_listener(this)
306 {
307 // Register asok commands using a temporary "remote_pool_name/global_image_id"
308 // name. When the image name becomes known on start the asok commands will be
309 // re-registered using "remote_pool_name/remote_image_name" name.
310
311 std::string pool_name;
312 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
313 if (r < 0) {
314 derr << "error resolving local pool " << m_local_pool_id
315 << ": " << cpp_strerror(r) << dendl;
316 pool_name = stringify(m_local_pool_id);
317 }
318
319 m_name = pool_name + "/" + m_global_image_id;
320 register_admin_socket_hook();
321 }
322
323 template <typename I>
324 ImageReplayer<I>::~ImageReplayer()
325 {
326 unregister_admin_socket_hook();
327 assert(m_event_preprocessor == nullptr);
328 assert(m_replay_status_formatter == nullptr);
329 assert(m_local_image_ctx == nullptr);
330 assert(m_local_replay == nullptr);
331 assert(m_remote_journaler == nullptr);
332 assert(m_replay_handler == nullptr);
333 assert(m_on_start_finish == nullptr);
334 assert(m_on_stop_finish == nullptr);
335 assert(m_bootstrap_request == nullptr);
336 assert(m_in_flight_status_updates == 0);
337
338 delete m_journal_listener;
339 }
340
341 template <typename I>
342 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
343 Mutex::Locker locker(m_lock);
344
345 if (!m_mirror_image_status_state) {
346 return image_replayer::HEALTH_STATE_OK;
347 } else if (*m_mirror_image_status_state ==
348 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
349 *m_mirror_image_status_state ==
350 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
351 return image_replayer::HEALTH_STATE_WARNING;
352 }
353 return image_replayer::HEALTH_STATE_ERROR;
354 }
355
356 template <typename I>
357 void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
358 librados::IoCtx &io_ctx) {
359 Mutex::Locker locker(m_lock);
360 auto it = m_peers.find({peer_uuid});
361 if (it == m_peers.end()) {
362 m_peers.insert({peer_uuid, io_ctx});
363 }
364 }
365
366 template <typename I>
367 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
368 dout(20) << r << " " << desc << dendl;
369
370 Mutex::Locker l(m_lock);
371 m_last_r = r;
372 m_state_desc = desc;
373 }
374
375 template <typename I>
376 void ImageReplayer<I>::start(Context *on_finish, bool manual)
377 {
378 dout(20) << "on_finish=" << on_finish << dendl;
379
380 int r = 0;
381 {
382 Mutex::Locker locker(m_lock);
383 if (!is_stopped_()) {
384 derr << "already running" << dendl;
385 r = -EINVAL;
386 } else if (m_manual_stop && !manual) {
387 dout(5) << "stopped manually, ignoring start without manual flag"
388 << dendl;
389 r = -EPERM;
390 } else {
391 m_state = STATE_STARTING;
392 m_last_r = 0;
393 m_state_desc.clear();
394 m_manual_stop = false;
395 m_delete_requested = false;
396
397 if (on_finish != nullptr) {
398 assert(m_on_start_finish == nullptr);
399 m_on_start_finish = on_finish;
400 }
401 assert(m_on_stop_finish == nullptr);
402 }
403 }
404
405 if (r < 0) {
406 if (on_finish) {
407 on_finish->complete(r);
408 }
409 return;
410 }
411
412 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
413 if (r < 0) {
414 derr << "error opening ioctx for local pool " << m_local_pool_id
415 << ": " << cpp_strerror(r) << dendl;
416 on_start_fail(r, "error opening local pool");
417 return;
418 }
419
420 wait_for_deletion();
421 }
422
423 template <typename I>
424 void ImageReplayer<I>::wait_for_deletion() {
425 dout(20) << dendl;
426
427 Context *ctx = create_context_callback<
428 ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
429 m_image_deleter->wait_for_scheduled_deletion(
430 m_local_pool_id, m_global_image_id, ctx, false);
431 }
432
433 template <typename I>
434 void ImageReplayer<I>::handle_wait_for_deletion(int r) {
435 dout(20) << "r=" << r << dendl;
436
437 if (r == -ECANCELED) {
438 on_start_fail(0, "");
439 return;
440 } else if (r < 0) {
441 on_start_fail(r, "error waiting for image deletion");
442 return;
443 }
444
445 prepare_local_image();
446 }
447
448 template <typename I>
449 void ImageReplayer<I>::prepare_local_image() {
450 dout(20) << dendl;
451
452 m_local_image_id = "";
453 Context *ctx = create_context_callback<
454 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
455 auto req = PrepareLocalImageRequest<I>::create(
456 m_local_ioctx, m_global_image_id, &m_local_image_id,
457 &m_local_image_tag_owner, m_threads->work_queue, ctx);
458 req->send();
459 }
460
461 template <typename I>
462 void ImageReplayer<I>::handle_prepare_local_image(int r) {
463 dout(20) << "r=" << r << dendl;
464
465 if (r == -ENOENT) {
466 dout(20) << "local image does not exist" << dendl;
467 } else if (r < 0) {
468 on_start_fail(r, "error preparing local image for replay");
469 return;
470 } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
471 dout(5) << "local image is primary" << dendl;
472 on_start_fail(0, "local image is primary");
473 return;
474 }
475
476 // local image doesn't exist or is non-primary
477 prepare_remote_image();
478 }
479
480 template <typename I>
481 void ImageReplayer<I>::prepare_remote_image() {
482 dout(20) << dendl;
483
484 // TODO need to support multiple remote images
485 assert(!m_peers.empty());
486 m_remote_image = {*m_peers.begin()};
487
488 Context *ctx = create_context_callback<
489 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
490 auto req = PrepareRemoteImageRequest<I>::create(
491 m_remote_image.io_ctx, m_global_image_id, &m_remote_image.mirror_uuid,
492 &m_remote_image.image_id, ctx);
493 req->send();
494 }
495
496 template <typename I>
497 void ImageReplayer<I>::handle_prepare_remote_image(int r) {
498 dout(20) << "r=" << r << dendl;
499
500 if (r == -ENOENT) {
501 dout(20) << "remote image does not exist" << dendl;
502
503 // TODO need to support multiple remote images
504 if (!m_local_image_id.empty() &&
505 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
506 // local image exists and is non-primary and linked to the missing
507 // remote image
508
509 m_delete_requested = true;
510 on_start_fail(0, "remote image no longer exists");
511 } else {
512 on_start_fail(-ENOENT, "remote image does not exist");
513 }
514 return;
515 } else if (r < 0) {
516 on_start_fail(r, "error retrieving remote image id");
517 return;
518 }
519
520 bootstrap();
521 }
522
523 template <typename I>
524 void ImageReplayer<I>::bootstrap() {
525 dout(20) << dendl;
526
527 CephContext *cct = static_cast<CephContext *>(m_local->cct());
528 journal::Settings settings;
529 settings.commit_interval = cct->_conf->rbd_mirror_journal_commit_age;
530 settings.max_fetch_bytes = cct->_conf->rbd_mirror_journal_max_fetch_bytes;
531
532 m_remote_journaler = new Journaler(m_threads->work_queue,
533 m_threads->timer,
534 &m_threads->timer_lock,
535 m_remote_image.io_ctx,
536 m_remote_image.image_id,
537 m_local_mirror_uuid, settings);
538
539 Context *ctx = create_context_callback<
540 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
541
542 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
543 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
544 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
545 m_global_image_id, m_threads->work_queue, m_threads->timer,
546 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
547 m_remote_journaler, &m_client_meta, ctx, &m_resync_requested,
548 &m_progress_cxt);
549
550 {
551 Mutex::Locker locker(m_lock);
552 request->get();
553 m_bootstrap_request = request;
554 }
555
556 update_mirror_image_status(false, boost::none);
557 reschedule_update_status_task(10);
558
559 request->send();
560 }
561
562 template <typename I>
563 void ImageReplayer<I>::handle_bootstrap(int r) {
564 dout(20) << "r=" << r << dendl;
565 {
566 Mutex::Locker locker(m_lock);
567 m_bootstrap_request->put();
568 m_bootstrap_request = nullptr;
569 if (m_local_image_ctx) {
570 m_local_image_id = m_local_image_ctx->id;
571 }
572 }
573
574 if (r == -EREMOTEIO) {
575 m_local_image_tag_owner = "";
576 dout(5) << "remote image is non-primary" << dendl;
577 on_start_fail(-EREMOTEIO, "remote image is non-primary");
578 return;
579 } else if (r == -EEXIST) {
580 m_local_image_tag_owner = "";
581 on_start_fail(r, "split-brain detected");
582 return;
583 } else if (r < 0) {
584 on_start_fail(r, "error bootstrapping replay");
585 return;
586 } else if (on_start_interrupted()) {
587 return;
588 } else if (m_resync_requested) {
589 on_start_fail(0, "resync requested");
590 return;
591 }
592
593 assert(m_local_journal == nullptr);
594 {
595 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
596 if (m_local_image_ctx->journal != nullptr) {
597 m_local_journal = m_local_image_ctx->journal;
598 m_local_journal->add_listener(m_journal_listener);
599 }
600 }
601
602 if (m_local_journal == nullptr) {
603 on_start_fail(-EINVAL, "error accessing local journal");
604 return;
605 }
606
607 {
608 Mutex::Locker locker(m_lock);
609 std::string name = m_local_ioctx.get_pool_name() + "/" +
610 m_local_image_ctx->name;
611 if (m_name != name) {
612 m_name = name;
613 if (m_asok_hook) {
614 // Re-register asok commands using the new name.
615 delete m_asok_hook;
616 m_asok_hook = nullptr;
617 }
618 }
619 register_admin_socket_hook();
620 }
621
622 update_mirror_image_status(false, boost::none);
623 init_remote_journaler();
624 }
625
626 template <typename I>
627 void ImageReplayer<I>::init_remote_journaler() {
628 dout(20) << dendl;
629
630 Context *ctx = create_context_callback<
631 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
632 m_remote_journaler->init(ctx);
633 }
634
635 template <typename I>
636 void ImageReplayer<I>::handle_init_remote_journaler(int r) {
637 dout(20) << "r=" << r << dendl;
638
639 if (r < 0) {
640 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
641 on_start_fail(r, "error initializing remote journal");
642 return;
643 } else if (on_start_interrupted()) {
644 return;
645 }
646
647 m_remote_journaler->add_listener(&m_remote_listener);
648
649 cls::journal::Client client;
650 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
651 if (r < 0) {
652 derr << "error retrieving remote journal client: " << cpp_strerror(r)
653 << dendl;
654 on_start_fail(r, "error retrieving remote journal client");
655 return;
656 }
657
658 derr << "image_id=" << m_local_image_id << ", "
659 << "m_client_meta.image_id=" << m_client_meta.image_id << ", "
660 << "client.state=" << client.state << dendl;
661 if (m_client_meta.image_id == m_local_image_id &&
662 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
663 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
664 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
665 m_resync_requested = true;
666 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
667 } else {
668 on_start_fail(-ENOTCONN, "disconnected");
669 }
670 return;
671 }
672
673 start_replay();
674 }
675
676 template <typename I>
677 void ImageReplayer<I>::start_replay() {
678 dout(20) << dendl;
679
680 Context *start_ctx = create_context_callback<
681 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
682 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
683 }
684
685 template <typename I>
686 void ImageReplayer<I>::handle_start_replay(int r) {
687 dout(20) << "r=" << r << dendl;
688
689 if (r < 0) {
690 assert(m_local_replay == nullptr);
691 derr << "error starting external replay on local image "
692 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
693 on_start_fail(r, "error starting replay on local image");
694 return;
695 }
696
697 Context *on_finish(nullptr);
698 {
699 Mutex::Locker locker(m_lock);
700 assert(m_state == STATE_STARTING);
701 m_state = STATE_REPLAYING;
702 std::swap(m_on_start_finish, on_finish);
703 }
704
705 m_event_preprocessor = EventPreprocessor<I>::create(
706 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
707 &m_client_meta, m_threads->work_queue);
708 m_replay_status_formatter =
709 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
710
711 update_mirror_image_status(true, boost::none);
712 reschedule_update_status_task(30);
713
714 if (on_replay_interrupted()) {
715 return;
716 }
717
718 {
719 CephContext *cct = static_cast<CephContext *>(m_local->cct());
720 double poll_seconds = cct->_conf->rbd_mirror_journal_poll_age;
721
722 Mutex::Locker locker(m_lock);
723 m_replay_handler = new ReplayHandler<I>(this);
724 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
725
726 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
727 }
728
729 dout(20) << "start succeeded" << dendl;
730 if (on_finish != nullptr) {
731 dout(20) << "on finish complete, r=" << r << dendl;
732 on_finish->complete(r);
733 }
734 }
735
736 template <typename I>
737 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
738 {
739 dout(20) << "r=" << r << dendl;
740 Context *ctx = new FunctionContext([this, r, desc](int _r) {
741 {
742 Mutex::Locker locker(m_lock);
743 assert(m_state == STATE_STARTING);
744 m_state = STATE_STOPPING;
745 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
746 derr << "start failed: " << cpp_strerror(r) << dendl;
747 } else {
748 dout(20) << "start canceled" << dendl;
749 }
750 }
751
752 set_state_description(r, desc);
753 update_mirror_image_status(false, boost::none);
754 reschedule_update_status_task(-1);
755 shut_down(r);
756 });
757 m_threads->work_queue->queue(ctx, 0);
758 }
759
760 template <typename I>
761 bool ImageReplayer<I>::on_start_interrupted()
762 {
763 Mutex::Locker locker(m_lock);
764 assert(m_state == STATE_STARTING);
765 if (m_on_stop_finish == nullptr) {
766 return false;
767 }
768
769 on_start_fail(-ECANCELED);
770 return true;
771 }
772
773 template <typename I>
774 void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
775 const std::string& desc)
776 {
777 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
778 << ", desc=" << desc << dendl;
779
780 m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
781
782 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
783 bool shut_down_replay = false;
784 bool running = true;
785 bool canceled_task = false;
786 {
787 Mutex::Locker locker(m_lock);
788
789 if (!is_running_()) {
790 running = false;
791 } else {
792 if (!is_stopped_()) {
793 if (m_state == STATE_STARTING) {
794 dout(20) << "canceling start" << dendl;
795 if (m_bootstrap_request) {
796 bootstrap_request = m_bootstrap_request;
797 bootstrap_request->get();
798 }
799 } else {
800 dout(20) << "interrupting replay" << dendl;
801 shut_down_replay = true;
802 }
803
804 assert(m_on_stop_finish == nullptr);
805 std::swap(m_on_stop_finish, on_finish);
806 m_stop_requested = true;
807 m_manual_stop = manual;
808
809 Mutex::Locker timer_locker(m_threads->timer_lock);
810 if (m_delayed_preprocess_task != nullptr) {
811 canceled_task = m_threads->timer->cancel_event(
812 m_delayed_preprocess_task);
813 assert(canceled_task);
814 m_delayed_preprocess_task = nullptr;
815 }
816 }
817 }
818 }
819
820 // avoid holding lock since bootstrap request will update status
821 if (bootstrap_request != nullptr) {
822 bootstrap_request->cancel();
823 bootstrap_request->put();
824 }
825
826 if (canceled_task) {
827 m_event_replay_tracker.finish_op();
828 on_replay_interrupted();
829 }
830
831 if (!running) {
832 dout(20) << "not running" << dendl;
833 if (on_finish) {
834 on_finish->complete(-EINVAL);
835 }
836 return;
837 }
838
839 if (shut_down_replay) {
840 on_stop_journal_replay(r, desc);
841 } else if (on_finish != nullptr) {
842 on_finish->complete(0);
843 }
844 }
845
846 template <typename I>
847 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
848 {
849 dout(20) << "enter" << dendl;
850
851 {
852 Mutex::Locker locker(m_lock);
853 if (m_state != STATE_REPLAYING) {
854 // might be invoked multiple times while stopping
855 return;
856 }
857 m_stop_requested = true;
858 m_state = STATE_STOPPING;
859 }
860
861 set_state_description(r, desc);
862 update_mirror_image_status(false, boost::none);
863 reschedule_update_status_task(-1);
864 shut_down(0);
865 }
866
867 template <typename I>
868 void ImageReplayer<I>::handle_replay_ready()
869 {
870 dout(20) << "enter" << dendl;
871 if (on_replay_interrupted()) {
872 return;
873 }
874
875 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
876 return;
877 }
878
879 m_event_replay_tracker.start_op();
880
881 m_lock.Lock();
882 bool stopping = (m_state == STATE_STOPPING);
883 m_lock.Unlock();
884
885 if (stopping) {
886 dout(10) << "stopping event replay" << dendl;
887 m_event_replay_tracker.finish_op();
888 return;
889 }
890
891 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
892 preprocess_entry();
893 return;
894 }
895
896 replay_flush();
897 }
898
899 template <typename I>
900 void ImageReplayer<I>::restart(Context *on_finish)
901 {
902 FunctionContext *ctx = new FunctionContext(
903 [this, on_finish](int r) {
904 if (r < 0) {
905 // Try start anyway.
906 }
907 start(on_finish, true);
908 });
909 stop(ctx);
910 }
911
912 template <typename I>
913 void ImageReplayer<I>::flush(Context *on_finish)
914 {
915 dout(20) << "enter" << dendl;
916
917 {
918 Mutex::Locker locker(m_lock);
919 if (m_state == STATE_REPLAYING) {
920 Context *ctx = new FunctionContext(
921 [on_finish](int r) {
922 if (on_finish != nullptr) {
923 on_finish->complete(r);
924 }
925 });
926 on_flush_local_replay_flush_start(ctx);
927 return;
928 }
929 }
930
931 if (on_finish) {
932 on_finish->complete(0);
933 }
934 }
935
936 template <typename I>
937 void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
938 {
939 dout(20) << "enter" << dendl;
940 FunctionContext *ctx = new FunctionContext(
941 [this, on_flush](int r) {
942 on_flush_local_replay_flush_finish(on_flush, r);
943 });
944
945 assert(m_lock.is_locked());
946 assert(m_state == STATE_REPLAYING);
947 m_local_replay->flush(ctx);
948 }
949
950 template <typename I>
951 void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
952 int r)
953 {
954 dout(20) << "r=" << r << dendl;
955 if (r < 0) {
956 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
957 on_flush->complete(r);
958 return;
959 }
960
961 on_flush_flush_commit_position_start(on_flush);
962 }
963
964 template <typename I>
965 void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
966 {
967 FunctionContext *ctx = new FunctionContext(
968 [this, on_flush](int r) {
969 on_flush_flush_commit_position_finish(on_flush, r);
970 });
971
972 m_remote_journaler->flush_commit_position(ctx);
973 }
974
975 template <typename I>
976 void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
977 int r)
978 {
979 if (r < 0) {
980 derr << "error flushing remote journal commit position: "
981 << cpp_strerror(r) << dendl;
982 }
983
984 update_mirror_image_status(false, boost::none);
985
986 dout(20) << "flush complete, r=" << r << dendl;
987 on_flush->complete(r);
988 }
989
990 template <typename I>
991 bool ImageReplayer<I>::on_replay_interrupted()
992 {
993 bool shut_down;
994 {
995 Mutex::Locker locker(m_lock);
996 shut_down = m_stop_requested;
997 }
998
999 if (shut_down) {
1000 on_stop_journal_replay();
1001 }
1002 return shut_down;
1003 }
1004
1005 template <typename I>
1006 void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
1007 {
1008 dout(20) << "enter" << dendl;
1009
1010 Mutex::Locker l(m_lock);
1011
1012 if (f) {
1013 f->open_object_section("image_replayer");
1014 f->dump_string("name", m_name);
1015 f->dump_string("state", to_string(m_state));
1016 f->close_section();
1017 f->flush(*ss);
1018 } else {
1019 *ss << m_name << ": state: " << to_string(m_state);
1020 }
1021 }
1022
1023 template <typename I>
1024 void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
1025 {
1026 dout(20) << "r=" << r << dendl;
1027 if (r < 0) {
1028 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
1029 set_state_description(r, error_desc);
1030 }
1031
1032 {
1033 Mutex::Locker locker(m_lock);
1034 m_stop_requested = true;
1035 }
1036 on_replay_interrupted();
1037 }
1038
1039 template <typename I>
1040 void ImageReplayer<I>::replay_flush() {
1041 dout(20) << dendl;
1042
1043 bool interrupted = false;
1044 {
1045 Mutex::Locker locker(m_lock);
1046 if (m_state != STATE_REPLAYING) {
1047 dout(20) << "replay interrupted" << dendl;
1048 interrupted = true;
1049 } else {
1050 m_state = STATE_REPLAY_FLUSHING;
1051 }
1052 }
1053
1054 if (interrupted) {
1055 m_event_replay_tracker.finish_op();
1056 return;
1057 }
1058
1059 // shut down the replay to flush all IO and ops and create a new
1060 // replayer to handle the new tag epoch
1061 Context *ctx = create_context_callback<
1062 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1063 ctx = new FunctionContext([this, ctx](int r) {
1064 m_local_image_ctx->journal->stop_external_replay();
1065 m_local_replay = nullptr;
1066
1067 if (r < 0) {
1068 ctx->complete(r);
1069 return;
1070 }
1071
1072 m_local_journal->start_external_replay(&m_local_replay, ctx);
1073 });
1074 m_local_replay->shut_down(false, ctx);
1075 }
1076
1077 template <typename I>
1078 void ImageReplayer<I>::handle_replay_flush(int r) {
1079 dout(20) << "r=" << r << dendl;
1080
1081 {
1082 Mutex::Locker locker(m_lock);
1083 assert(m_state == STATE_REPLAY_FLUSHING);
1084 m_state = STATE_REPLAYING;
1085 }
1086
1087 if (r < 0) {
1088 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1089 m_event_replay_tracker.finish_op();
1090 handle_replay_complete(r, "replay flush encountered an error");
1091 return;
1092 } else if (on_replay_interrupted()) {
1093 m_event_replay_tracker.finish_op();
1094 return;
1095 }
1096
1097 get_remote_tag();
1098 }
1099
1100 template <typename I>
1101 void ImageReplayer<I>::get_remote_tag() {
1102 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1103
1104 Context *ctx = create_context_callback<
1105 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1106 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1107 }
1108
1109 template <typename I>
1110 void ImageReplayer<I>::handle_get_remote_tag(int r) {
1111 dout(20) << "r=" << r << dendl;
1112
1113 if (r == 0) {
1114 try {
1115 bufferlist::iterator it = m_replay_tag.data.begin();
1116 ::decode(m_replay_tag_data, it);
1117 } catch (const buffer::error &err) {
1118 r = -EBADMSG;
1119 }
1120 }
1121
1122 if (r < 0) {
1123 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1124 << cpp_strerror(r) << dendl;
1125 m_event_replay_tracker.finish_op();
1126 handle_replay_complete(r, "failed to retrieve remote tag");
1127 return;
1128 }
1129
1130 m_replay_tag_valid = true;
1131 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1132 << m_replay_tag_data << dendl;
1133
1134 allocate_local_tag();
1135 }
1136
1137 template <typename I>
1138 void ImageReplayer<I>::allocate_local_tag() {
1139 dout(20) << dendl;
1140
1141 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1142 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
1143 mirror_uuid == m_local_mirror_uuid) {
1144 mirror_uuid = m_remote_image.mirror_uuid;
1145 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1146 dout(5) << "encountered image demotion: stopping" << dendl;
1147 Mutex::Locker locker(m_lock);
1148 m_stop_requested = true;
1149 }
1150
1151 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1152 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1153 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1154 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1155 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1156 }
1157
1158 dout(20) << "mirror_uuid=" << mirror_uuid << ", "
1159 << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
1160 << "replay_tag_tid=" << m_replay_tag_tid << ", "
1161 << "replay_tag_data=" << m_replay_tag_data << dendl;
1162 Context *ctx = create_context_callback<
1163 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1164 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1165 }
1166
1167 template <typename I>
1168 void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1169 dout(20) << "r=" << r << dendl;
1170
1171 if (r < 0) {
1172 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1173 m_event_replay_tracker.finish_op();
1174 handle_replay_complete(r, "failed to allocate journal tag");
1175 return;
1176 }
1177
1178 preprocess_entry();
1179 }
1180
1181 template <typename I>
1182 void ImageReplayer<I>::preprocess_entry() {
1183 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1184 << dendl;
1185
1186 bufferlist data = m_replay_entry.get_data();
1187 bufferlist::iterator it = data.begin();
1188 int r = m_local_replay->decode(&it, &m_event_entry);
1189 if (r < 0) {
1190 derr << "failed to decode journal event" << dendl;
1191 m_event_replay_tracker.finish_op();
1192 handle_replay_complete(r, "failed to decode journal event");
1193 return;
1194 }
1195
1196 uint32_t delay = calculate_replay_delay(
1197 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1198 if (delay == 0) {
1199 handle_preprocess_entry_ready(0);
1200 return;
1201 }
1202
1203 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1204
1205 Mutex::Locker timer_locker(m_threads->timer_lock);
1206 assert(m_delayed_preprocess_task == nullptr);
1207 m_delayed_preprocess_task = new FunctionContext(
1208 [this](int r) {
1209 assert(m_threads->timer_lock.is_locked());
1210 m_delayed_preprocess_task = nullptr;
1211 m_threads->work_queue->queue(
1212 create_context_callback<ImageReplayer,
1213 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1214 });
1215 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1216 }
1217
1218 template <typename I>
1219 void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1220 dout(20) << "r=" << r << dendl;
1221 assert(r == 0);
1222
1223 if (!m_event_preprocessor->is_required(m_event_entry)) {
1224 process_entry();
1225 return;
1226 }
1227
1228 Context *ctx = create_context_callback<
1229 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1230 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1231 }
1232
1233 template <typename I>
1234 void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1235 dout(20) << "r=" << r << dendl;
1236
1237 if (r < 0) {
1238 m_event_replay_tracker.finish_op();
1239
1240 if (r == -ECANCELED) {
1241 handle_replay_complete(0, "lost exclusive lock");
1242 } else {
1243 derr << "failed to preprocess journal event" << dendl;
1244 handle_replay_complete(r, "failed to preprocess journal event");
1245 }
1246 return;
1247 }
1248
1249 process_entry();
1250 }
1251
1252 template <typename I>
1253 void ImageReplayer<I>::process_entry() {
1254 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1255 << dendl;
1256
1257 // stop replaying events if stop has been requested
1258 if (on_replay_interrupted()) {
1259 m_event_replay_tracker.finish_op();
1260 return;
1261 }
1262
1263 Context *on_ready = create_context_callback<
1264 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1265 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1266
1267 m_local_replay->process(m_event_entry, on_ready, on_commit);
1268 }
1269
1270 template <typename I>
1271 void ImageReplayer<I>::handle_process_entry_ready(int r) {
1272 dout(20) << dendl;
1273 assert(r == 0);
1274
1275 // attempt to process the next event
1276 handle_replay_ready();
1277 }
1278
1279 template <typename I>
1280 void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1281 int r) {
1282 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1283 << dendl;
1284
1285 if (r < 0) {
1286 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1287 handle_replay_complete(r, "failed to commit journal event");
1288 } else {
1289 assert(m_remote_journaler != nullptr);
1290 m_remote_journaler->committed(replay_entry);
1291 }
1292 m_event_replay_tracker.finish_op();
1293 }
1294
1295 template <typename I>
1296 bool ImageReplayer<I>::update_mirror_image_status(bool force,
1297 const OptionalState &state) {
1298 dout(20) << dendl;
1299 {
1300 Mutex::Locker locker(m_lock);
1301 if (!start_mirror_image_status_update(force, false)) {
1302 return false;
1303 }
1304 }
1305
1306 queue_mirror_image_status_update(state);
1307 return true;
1308 }
1309
1310 template <typename I>
1311 bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1312 bool restarting) {
1313 assert(m_lock.is_locked());
1314
1315 if (!force && !is_stopped_()) {
1316 if (!is_running_()) {
1317 dout(20) << "shut down in-progress: ignoring update" << dendl;
1318 return false;
1319 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1320 dout(20) << "already sending update" << dendl;
1321 m_update_status_requested = true;
1322 return false;
1323 }
1324 }
1325
1326 dout(20) << dendl;
1327 ++m_in_flight_status_updates;
1328 return true;
1329 }
1330
1331 template <typename I>
1332 void ImageReplayer<I>::finish_mirror_image_status_update() {
1333 Context *on_finish = nullptr;
1334 {
1335 Mutex::Locker locker(m_lock);
1336 assert(m_in_flight_status_updates > 0);
1337 if (--m_in_flight_status_updates > 0) {
1338 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1339 << "updates" << dendl;
1340 return;
1341 }
1342
1343 std::swap(on_finish, m_on_update_status_finish);
1344 }
1345
1346 dout(20) << dendl;
1347 if (on_finish != nullptr) {
1348 on_finish->complete(0);
1349 }
1350 }
1351
1352 template <typename I>
1353 void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1354 dout(20) << dendl;
1355 FunctionContext *ctx = new FunctionContext(
1356 [this, state](int r) {
1357 send_mirror_status_update(state);
1358 });
1359 m_threads->work_queue->queue(ctx, 0);
1360 }
1361
1362 template <typename I>
1363 void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1364 State state;
1365 std::string state_desc;
1366 int last_r;
1367 bool stopping_replay;
1368
1369 OptionalMirrorImageStatusState mirror_image_status_state{
1370 boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1371 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
1372 {
1373 Mutex::Locker locker(m_lock);
1374 state = m_state;
1375 state_desc = m_state_desc;
1376 mirror_image_status_state = m_mirror_image_status_state;
1377 last_r = m_last_r;
1378 stopping_replay = (m_local_image_ctx != nullptr);
1379
1380 if (m_bootstrap_request != nullptr) {
1381 bootstrap_request = m_bootstrap_request;
1382 bootstrap_request->get();
1383 }
1384 }
1385
1386 bool syncing = false;
1387 if (bootstrap_request != nullptr) {
1388 syncing = bootstrap_request->is_syncing();
1389 bootstrap_request->put();
1390 bootstrap_request = nullptr;
1391 }
1392
1393 if (opt_state) {
1394 state = *opt_state;
1395 }
1396
1397 cls::rbd::MirrorImageStatus status;
1398 status.up = true;
1399 switch (state) {
1400 case STATE_STARTING:
1401 if (syncing) {
1402 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1403 status.description = state_desc.empty() ? "syncing" : state_desc;
1404 mirror_image_status_state = status.state;
1405 } else {
1406 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1407 status.description = "starting replay";
1408 }
1409 break;
1410 case STATE_REPLAYING:
1411 case STATE_REPLAY_FLUSHING:
1412 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1413 {
1414 Context *on_req_finish = new FunctionContext(
1415 [this](int r) {
1416 dout(20) << "replay status ready: r=" << r << dendl;
1417 if (r >= 0) {
1418 send_mirror_status_update(boost::none);
1419 } else if (r == -EAGAIN) {
1420 // decrement in-flight status update counter
1421 handle_mirror_status_update(r);
1422 }
1423 });
1424
1425 std::string desc;
1426 if (!m_replay_status_formatter->get_or_send_update(&desc,
1427 on_req_finish)) {
1428 dout(20) << "waiting for replay status" << dendl;
1429 return;
1430 }
1431 status.description = "replaying, " + desc;
1432 mirror_image_status_state = boost::none;
1433 }
1434 break;
1435 case STATE_STOPPING:
1436 if (stopping_replay) {
1437 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1438 status.description = "stopping replay";
1439 break;
1440 }
1441 // FALLTHROUGH
1442 case STATE_STOPPED:
1443 if (last_r == -EREMOTEIO) {
1444 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1445 status.description = state_desc;
1446 mirror_image_status_state = status.state;
1447 } else if (last_r < 0) {
1448 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1449 status.description = state_desc;
1450 mirror_image_status_state = status.state;
1451 } else {
1452 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1453 status.description = state_desc.empty() ? "stopped" : state_desc;
1454 mirror_image_status_state = boost::none;
1455 }
1456 break;
1457 default:
1458 assert(!"invalid state");
1459 }
1460
1461 {
1462 Mutex::Locker locker(m_lock);
1463 m_mirror_image_status_state = mirror_image_status_state;
1464 }
1465
1466 // prevent the status from ping-ponging when failed replays are restarted
1467 if (mirror_image_status_state &&
1468 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1469 status.state = *mirror_image_status_state;
1470 }
1471
1472 dout(20) << "status=" << status << dendl;
1473 librados::ObjectWriteOperation op;
1474 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1475
1476 librados::AioCompletion *aio_comp = create_rados_callback<
1477 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1478 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1479 assert(r == 0);
1480 aio_comp->release();
1481 }
1482
1483 template <typename I>
1484 void ImageReplayer<I>::handle_mirror_status_update(int r) {
1485 dout(20) << "r=" << r << dendl;
1486
1487 bool running = false;
1488 bool started = false;
1489 {
1490 Mutex::Locker locker(m_lock);
1491 bool update_status_requested = false;
1492 std::swap(update_status_requested, m_update_status_requested);
1493
1494 running = is_running_();
1495 if (running && update_status_requested) {
1496 started = start_mirror_image_status_update(false, true);
1497 }
1498 }
1499
1500 // if a deferred update is available, send it -- otherwise reschedule
1501 // the timer task
1502 if (started) {
1503 queue_mirror_image_status_update(boost::none);
1504 } else if (running) {
1505 reschedule_update_status_task();
1506 }
1507
1508 // mark committed status update as no longer in-flight
1509 finish_mirror_image_status_update();
1510 }
1511
1512 template <typename I>
1513 void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1514 dout(20) << dendl;
1515
1516 bool canceled_task = false;
1517 {
1518 Mutex::Locker locker(m_lock);
1519 Mutex::Locker timer_locker(m_threads->timer_lock);
1520
1521 if (m_update_status_task) {
1522 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1523 m_update_status_task = nullptr;
1524 }
1525
1526 if (new_interval > 0) {
1527 m_update_status_interval = new_interval;
1528 }
1529
1530 bool restarting = (new_interval == 0 || canceled_task);
1531 if (new_interval >= 0 && is_running_() &&
1532 start_mirror_image_status_update(false, restarting)) {
1533 m_update_status_task = new FunctionContext(
1534 [this](int r) {
1535 assert(m_threads->timer_lock.is_locked());
1536 m_update_status_task = nullptr;
1537
1538 queue_mirror_image_status_update(boost::none);
1539 });
1540 m_threads->timer->add_event_after(m_update_status_interval,
1541 m_update_status_task);
1542 }
1543 }
1544
1545 if (canceled_task) {
1546 dout(20) << "canceled task" << dendl;
1547 finish_mirror_image_status_update();
1548 }
1549 }
1550
1551 template <typename I>
1552 void ImageReplayer<I>::shut_down(int r) {
1553 dout(20) << "r=" << r << dendl;
1554 {
1555 Mutex::Locker locker(m_lock);
1556 assert(m_state == STATE_STOPPING);
1557
1558 // if status updates are in-flight, wait for them to complete
1559 // before proceeding
1560 if (m_in_flight_status_updates > 0) {
1561 if (m_on_update_status_finish == nullptr) {
1562 dout(20) << "waiting for in-flight status update" << dendl;
1563 m_on_update_status_finish = new FunctionContext(
1564 [this, r](int _r) {
1565 shut_down(r);
1566 });
1567 }
1568 return;
1569 }
1570 }
1571
1572 // NOTE: it's important to ensure that the local image is fully
1573 // closed before attempting to close the remote journal in
1574 // case the remote cluster is unreachable
1575
1576 // chain the shut down sequence (reverse order)
1577 Context *ctx = new FunctionContext(
1578 [this, r](int _r) {
1579 update_mirror_image_status(true, STATE_STOPPED);
1580 handle_shut_down(r);
1581 });
1582
1583 // close the remote journal
1584 if (m_remote_journaler != nullptr) {
1585 ctx = new FunctionContext([this, ctx](int r) {
1586 delete m_remote_journaler;
1587 m_remote_journaler = nullptr;
1588 ctx->complete(0);
1589 });
1590 ctx = new FunctionContext([this, ctx](int r) {
1591 m_remote_journaler->remove_listener(&m_remote_listener);
1592 m_remote_journaler->shut_down(ctx);
1593 });
1594 }
1595
1596 // stop the replay of remote journal events
1597 if (m_replay_handler != nullptr) {
1598 ctx = new FunctionContext([this, ctx](int r) {
1599 delete m_replay_handler;
1600 m_replay_handler = nullptr;
1601
1602 m_event_replay_tracker.wait_for_ops(ctx);
1603 });
1604 ctx = new FunctionContext([this, ctx](int r) {
1605 m_remote_journaler->stop_replay(ctx);
1606 });
1607 }
1608
1609 // close the local image (release exclusive lock)
1610 if (m_local_image_ctx) {
1611 ctx = new FunctionContext([this, ctx](int r) {
1612 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1613 &m_local_image_ctx, ctx);
1614 request->send();
1615 });
1616 }
1617
1618 // shut down event replay into the local image
1619 if (m_local_journal != nullptr) {
1620 ctx = new FunctionContext([this, ctx](int r) {
1621 m_local_journal = nullptr;
1622 ctx->complete(0);
1623 });
1624 if (m_local_replay != nullptr) {
1625 ctx = new FunctionContext([this, ctx](int r) {
1626 m_local_journal->stop_external_replay();
1627 m_local_replay = nullptr;
1628
1629 EventPreprocessor<I>::destroy(m_event_preprocessor);
1630 m_event_preprocessor = nullptr;
1631 ctx->complete(0);
1632 });
1633 }
1634 ctx = new FunctionContext([this, ctx](int r) {
1635 // blocks if listener notification is in-progress
1636 m_local_journal->remove_listener(m_journal_listener);
1637 ctx->complete(0);
1638 });
1639 }
1640
1641 // wait for all local in-flight replay events to complete
1642 ctx = new FunctionContext([this, ctx](int r) {
1643 if (r < 0) {
1644 derr << "error shutting down journal replay: " << cpp_strerror(r)
1645 << dendl;
1646 }
1647
1648 m_event_replay_tracker.wait_for_ops(ctx);
1649 });
1650
1651 // flush any local in-flight replay events
1652 if (m_local_replay != nullptr) {
1653 ctx = new FunctionContext([this, ctx](int r) {
1654 m_local_replay->shut_down(true, ctx);
1655 });
1656 }
1657
1658 m_threads->work_queue->queue(ctx, 0);
1659 }
1660
1661 template <typename I>
1662 void ImageReplayer<I>::handle_shut_down(int r) {
1663 reschedule_update_status_task(-1);
1664
1665 {
1666 Mutex::Locker locker(m_lock);
1667
1668 // if status updates are in-flight, wait for them to complete
1669 // before proceeding
1670 if (m_in_flight_status_updates > 0) {
1671 if (m_on_update_status_finish == nullptr) {
1672 dout(20) << "waiting for in-flight status update" << dendl;
1673 m_on_update_status_finish = new FunctionContext(
1674 [this, r](int _r) {
1675 handle_shut_down(r);
1676 });
1677 }
1678 return;
1679 }
1680
1681 bool delete_requested = false;
1682 if (m_delete_requested && !m_local_image_id.empty()) {
1683 assert(m_remote_image.image_id.empty());
1684 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1685 delete_requested = true;
1686 }
1687 if (delete_requested || m_resync_requested) {
1688 m_image_deleter->schedule_image_delete(m_local,
1689 m_local_pool_id,
1690 m_global_image_id,
1691 m_resync_requested);
1692
1693 m_local_image_id = "";
1694 m_resync_requested = false;
1695 if (m_delete_requested) {
1696 unregister_admin_socket_hook();
1697 m_delete_requested = false;
1698 }
1699 } else if (m_last_r == -ENOENT &&
1700 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1701 dout(0) << "mirror image no longer exists" << dendl;
1702 unregister_admin_socket_hook();
1703 m_finished = true;
1704 }
1705 }
1706
1707 dout(20) << "stop complete" << dendl;
1708 m_local_ioctx.close();
1709
1710 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1711 m_replay_status_formatter = nullptr;
1712
1713 Context *on_start = nullptr;
1714 Context *on_stop = nullptr;
1715 {
1716 Mutex::Locker locker(m_lock);
1717 std::swap(on_start, m_on_start_finish);
1718 std::swap(on_stop, m_on_stop_finish);
1719 m_stop_requested = false;
1720 assert(m_delayed_preprocess_task == nullptr);
1721 assert(m_state == STATE_STOPPING);
1722 m_state = STATE_STOPPED;
1723 }
1724
1725 if (on_start != nullptr) {
1726 dout(20) << "on start finish complete, r=" << r << dendl;
1727 on_start->complete(r);
1728 r = 0;
1729 }
1730 if (on_stop != nullptr) {
1731 dout(20) << "on stop finish complete, r=" << r << dendl;
1732 on_stop->complete(r);
1733 }
1734 }
1735
1736 template <typename I>
1737 void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1738 dout(20) << dendl;
1739
1740 cls::journal::Client client;
1741 {
1742 Mutex::Locker locker(m_lock);
1743 if (!is_running_()) {
1744 return;
1745 }
1746
1747 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1748 if (r < 0) {
1749 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1750 return;
1751 }
1752 }
1753
1754 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1755 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1756 stop(nullptr, false, -ENOTCONN, "disconnected");
1757 }
1758 }
1759
1760 template <typename I>
1761 std::string ImageReplayer<I>::to_string(const State state) {
1762 switch (state) {
1763 case ImageReplayer<I>::STATE_STARTING:
1764 return "Starting";
1765 case ImageReplayer<I>::STATE_REPLAYING:
1766 return "Replaying";
1767 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1768 return "ReplayFlushing";
1769 case ImageReplayer<I>::STATE_STOPPING:
1770 return "Stopping";
1771 case ImageReplayer<I>::STATE_STOPPED:
1772 return "Stopped";
1773 default:
1774 break;
1775 }
1776 return "Unknown(" + stringify(state) + ")";
1777 }
1778
1779 template <typename I>
1780 void ImageReplayer<I>::resync_image(Context *on_finish) {
1781 dout(20) << dendl;
1782
1783 m_resync_requested = true;
1784 stop(on_finish);
1785 }
1786
1787 template <typename I>
1788 void ImageReplayer<I>::register_admin_socket_hook() {
1789 if (m_asok_hook != nullptr) {
1790 return;
1791 }
1792
1793 dout(20) << "registered asok hook: " << m_name << dendl;
1794 auto asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1795 this);
1796 int r = asok_hook->register_commands();
1797 if (r < 0) {
1798 derr << "error registering admin socket commands" << dendl;
1799 delete asok_hook;
1800 asok_hook = nullptr;
1801 return;
1802 }
1803
1804 m_asok_hook = asok_hook;
1805 }
1806
1807 template <typename I>
1808 void ImageReplayer<I>::unregister_admin_socket_hook() {
1809 dout(20) << dendl;
1810
1811 delete m_asok_hook;
1812 m_asok_hook = nullptr;
1813 }
1814
1815 template <typename I>
1816 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1817 {
1818 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1819 << "/" << replayer.get_global_image_id() << "]";
1820 return os;
1821 }
1822
1823 } // namespace mirror
1824 } // namespace rbd
1825
1826 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;