]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to 12.2.7
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageDeleter.h"
24 #include "ImageReplayer.h"
25 #include "Threads.h"
26 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
35 #undef dout_prefix
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39 using std::map;
40 using std::string;
41 using std::unique_ptr;
42 using std::shared_ptr;
43 using std::vector;
44
45 namespace rbd {
46 namespace mirror {
47
48 using librbd::util::create_context_callback;
49 using librbd::util::create_rados_callback;
50 using namespace rbd::mirror::image_replayer;
51
52 template <typename I>
53 std::ostream &operator<<(std::ostream &os,
54 const typename ImageReplayer<I>::State &state);
55
56 namespace {
57
58 template <typename I>
59 struct ReplayHandler : public ::journal::ReplayHandler {
60 ImageReplayer<I> *replayer;
61 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
62 void get() override {}
63 void put() override {}
64
65 void handle_entries_available() override {
66 replayer->handle_replay_ready();
67 }
68 void handle_complete(int r) override {
69 std::stringstream ss;
70 if (r < 0) {
71 ss << "replay completed with error: " << cpp_strerror(r);
72 }
73 replayer->handle_replay_complete(r, ss.str());
74 }
75 };
76
77 template <typename I>
78 class ImageReplayerAdminSocketCommand {
79 public:
80 ImageReplayerAdminSocketCommand(const std::string &desc,
81 ImageReplayer<I> *replayer)
82 : desc(desc), replayer(replayer) {
83 }
84 virtual ~ImageReplayerAdminSocketCommand() {}
85 virtual bool call(Formatter *f, stringstream *ss) = 0;
86
87 std::string desc;
88 ImageReplayer<I> *replayer;
89 bool registered = false;
90 };
91
92 template <typename I>
93 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
94 public:
95 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
96 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
97 }
98
99 bool call(Formatter *f, stringstream *ss) override {
100 this->replayer->print_status(f, ss);
101 return true;
102 }
103 };
104
105 template <typename I>
106 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
107 public:
108 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
109 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
110 }
111
112 bool call(Formatter *f, stringstream *ss) override {
113 this->replayer->start(nullptr, true);
114 return true;
115 }
116 };
117
118 template <typename I>
119 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
120 public:
121 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
122 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
123 }
124
125 bool call(Formatter *f, stringstream *ss) override {
126 this->replayer->stop(nullptr, true);
127 return true;
128 }
129 };
130
131 template <typename I>
132 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
133 public:
134 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
135 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
136 }
137
138 bool call(Formatter *f, stringstream *ss) override {
139 this->replayer->restart();
140 return true;
141 }
142 };
143
144 template <typename I>
145 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
146 public:
147 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
148 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
149 }
150
151 bool call(Formatter *f, stringstream *ss) override {
152 this->replayer->flush();
153 return true;
154 }
155 };
156
157 template <typename I>
158 class ImageReplayerAdminSocketHook : public AdminSocketHook {
159 public:
160 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
161 ImageReplayer<I> *replayer)
162 : admin_socket(cct->get_admin_socket()),
163 commands{{"rbd mirror flush " + name,
164 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
165 {"rbd mirror restart " + name,
166 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
167 {"rbd mirror start " + name,
168 new StartCommand<I>("start rbd mirror " + name, replayer)},
169 {"rbd mirror status " + name,
170 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
171 {"rbd mirror stop " + name,
172 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
173 }
174
175 int register_commands() {
176 for (auto &it : commands) {
177 int r = admin_socket->register_command(it.first, it.first, this,
178 it.second->desc);
179 if (r < 0) {
180 return r;
181 }
182 it.second->registered = true;
183 }
184 return 0;
185 }
186
187 ~ImageReplayerAdminSocketHook() override {
188 for (auto &it : commands) {
189 if (it.second->registered) {
190 admin_socket->unregister_command(it.first);
191 }
192 delete it.second;
193 }
194 commands.clear();
195 }
196
197 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
198 bufferlist& out) override {
199 auto i = commands.find(command);
200 assert(i != commands.end());
201 Formatter *f = Formatter::create(format);
202 stringstream ss;
203 bool r = i->second->call(f, &ss);
204 delete f;
205 out.append(ss);
206 return r;
207 }
208
209 private:
210 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I> *> Commands;
211
212 AdminSocket *admin_socket;
213 Commands commands;
214 };
215
216 uint32_t calculate_replay_delay(const utime_t &event_time,
217 int mirroring_replay_delay) {
218 if (mirroring_replay_delay <= 0) {
219 return 0;
220 }
221
222 utime_t now = ceph_clock_now();
223 if (event_time + mirroring_replay_delay <= now) {
224 return 0;
225 }
226
227 // ensure it is rounded up when converting to integer
228 return (event_time + mirroring_replay_delay - now) + 1;
229 }
230
231 } // anonymous namespace
232
233 template <typename I>
234 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
235 const std::string &description, bool flush)
236 {
237 const std::string desc = "bootstrapping, " + description;
238 replayer->set_state_description(0, desc);
239 if (flush) {
240 replayer->update_mirror_image_status(false, boost::none);
241 }
242 }
243
244 template <typename I>
245 void ImageReplayer<I>::RemoteJournalerListener::handle_update(
246 ::journal::JournalMetadata *) {
247 FunctionContext *ctx = new FunctionContext([this](int r) {
248 replayer->handle_remote_journal_metadata_updated();
249 });
250 replayer->m_threads->work_queue->queue(ctx, 0);
251 }
252
253 template <typename I>
254 ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
255 ImageDeleter<I>* image_deleter,
256 InstanceWatcher<I> *instance_watcher,
257 RadosRef local,
258 const std::string &local_mirror_uuid,
259 int64_t local_pool_id,
260 const std::string &global_image_id) :
261 m_threads(threads),
262 m_image_deleter(image_deleter),
263 m_instance_watcher(instance_watcher),
264 m_local(local),
265 m_local_mirror_uuid(local_mirror_uuid),
266 m_local_pool_id(local_pool_id),
267 m_global_image_id(global_image_id), m_local_image_name(global_image_id),
268 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
269 global_image_id),
270 m_progress_cxt(this),
271 m_journal_listener(new JournalListener(this)),
272 m_remote_listener(this)
273 {
274 // Register asok commands using a temporary "remote_pool_name/global_image_id"
275 // name. When the image name becomes known on start the asok commands will be
276 // re-registered using "remote_pool_name/remote_image_name" name.
277
278 std::string pool_name;
279 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
280 if (r < 0) {
281 derr << "error resolving local pool " << m_local_pool_id
282 << ": " << cpp_strerror(r) << dendl;
283 pool_name = stringify(m_local_pool_id);
284 }
285
286 m_name = pool_name + "/" + m_global_image_id;
287 register_admin_socket_hook();
288 }
289
290 template <typename I>
291 ImageReplayer<I>::~ImageReplayer()
292 {
293 unregister_admin_socket_hook();
294 assert(m_event_preprocessor == nullptr);
295 assert(m_replay_status_formatter == nullptr);
296 assert(m_local_image_ctx == nullptr);
297 assert(m_local_replay == nullptr);
298 assert(m_remote_journaler == nullptr);
299 assert(m_replay_handler == nullptr);
300 assert(m_on_start_finish == nullptr);
301 assert(m_on_stop_finish == nullptr);
302 assert(m_bootstrap_request == nullptr);
303 assert(m_in_flight_status_updates == 0);
304
305 delete m_journal_listener;
306 }
307
308 template <typename I>
309 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
310 Mutex::Locker locker(m_lock);
311
312 if (!m_mirror_image_status_state) {
313 return image_replayer::HEALTH_STATE_OK;
314 } else if (*m_mirror_image_status_state ==
315 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
316 *m_mirror_image_status_state ==
317 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
318 return image_replayer::HEALTH_STATE_WARNING;
319 }
320 return image_replayer::HEALTH_STATE_ERROR;
321 }
322
323 template <typename I>
324 void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
325 librados::IoCtx &io_ctx) {
326 Mutex::Locker locker(m_lock);
327 auto it = m_peers.find({peer_uuid});
328 if (it == m_peers.end()) {
329 m_peers.insert({peer_uuid, io_ctx});
330 }
331 }
332
333 template <typename I>
334 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
335 dout(20) << r << " " << desc << dendl;
336
337 Mutex::Locker l(m_lock);
338 m_last_r = r;
339 m_state_desc = desc;
340 }
341
342 template <typename I>
343 void ImageReplayer<I>::start(Context *on_finish, bool manual)
344 {
345 dout(20) << "on_finish=" << on_finish << dendl;
346
347 int r = 0;
348 {
349 Mutex::Locker locker(m_lock);
350 if (!is_stopped_()) {
351 derr << "already running" << dendl;
352 r = -EINVAL;
353 } else if (m_manual_stop && !manual) {
354 dout(5) << "stopped manually, ignoring start without manual flag"
355 << dendl;
356 r = -EPERM;
357 } else {
358 m_state = STATE_STARTING;
359 m_last_r = 0;
360 m_state_desc.clear();
361 m_manual_stop = false;
362 m_delete_requested = false;
363
364 if (on_finish != nullptr) {
365 assert(m_on_start_finish == nullptr);
366 m_on_start_finish = on_finish;
367 }
368 assert(m_on_stop_finish == nullptr);
369 }
370 }
371
372 if (r < 0) {
373 if (on_finish) {
374 on_finish->complete(r);
375 }
376 return;
377 }
378
379 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
380 if (r < 0) {
381 derr << "error opening ioctx for local pool " << m_local_pool_id
382 << ": " << cpp_strerror(r) << dendl;
383 on_start_fail(r, "error opening local pool");
384 return;
385 }
386
387 wait_for_deletion();
388 }
389
390 template <typename I>
391 void ImageReplayer<I>::wait_for_deletion() {
392 dout(20) << dendl;
393
394 Context *ctx = create_context_callback<
395 ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
396 m_image_deleter->wait_for_scheduled_deletion(
397 m_local_pool_id, m_global_image_id, ctx, false);
398 }
399
400 template <typename I>
401 void ImageReplayer<I>::handle_wait_for_deletion(int r) {
402 dout(20) << "r=" << r << dendl;
403
404 if (r == -ECANCELED) {
405 on_start_fail(0, "");
406 return;
407 } else if (r < 0) {
408 on_start_fail(r, "error waiting for image deletion");
409 return;
410 }
411
412 prepare_local_image();
413 }
414
415 template <typename I>
416 void ImageReplayer<I>::prepare_local_image() {
417 dout(20) << dendl;
418
419 m_local_image_id = "";
420 Context *ctx = create_context_callback<
421 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
422 auto req = PrepareLocalImageRequest<I>::create(
423 m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
424 &m_local_image_tag_owner, m_threads->work_queue, ctx);
425 req->send();
426 }
427
428 template <typename I>
429 void ImageReplayer<I>::handle_prepare_local_image(int r) {
430 dout(20) << "r=" << r << dendl;
431
432 if (r == -ENOENT) {
433 dout(20) << "local image does not exist" << dendl;
434 } else if (r < 0) {
435 on_start_fail(r, "error preparing local image for replay");
436 return;
437 } else {
438 reregister_admin_socket_hook();
439 }
440
441 // local image doesn't exist or is non-primary
442 prepare_remote_image();
443 }
444
445 template <typename I>
446 void ImageReplayer<I>::prepare_remote_image() {
447 dout(20) << dendl;
448 if (m_peers.empty()) {
449 // technically nothing to bootstrap, but it handles the status update
450 bootstrap();
451 return;
452 }
453
454 // TODO need to support multiple remote images
455 assert(!m_peers.empty());
456 m_remote_image = {*m_peers.begin()};
457
458 Context *ctx = create_context_callback<
459 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
460 auto req = PrepareRemoteImageRequest<I>::create(
461 m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
462 m_local_image_id, &m_remote_image.mirror_uuid, &m_remote_image.image_id,
463 &m_remote_journaler, &m_client_state, &m_client_meta, ctx);
464 req->send();
465 }
466
467 template <typename I>
468 void ImageReplayer<I>::handle_prepare_remote_image(int r) {
469 dout(20) << "r=" << r << dendl;
470
471 assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
472 if (r < 0 && !m_local_image_id.empty() &&
473 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
474 // local image is primary -- fall-through
475 } else if (r == -ENOENT) {
476 dout(20) << "remote image does not exist" << dendl;
477
478 // TODO need to support multiple remote images
479 if (!m_local_image_id.empty() &&
480 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
481 // local image exists and is non-primary and linked to the missing
482 // remote image
483
484 m_delete_requested = true;
485 on_start_fail(0, "remote image no longer exists");
486 } else {
487 on_start_fail(-ENOENT, "remote image does not exist");
488 }
489 return;
490 } else if (r < 0) {
491 on_start_fail(r, "error retrieving remote image id");
492 return;
493 }
494
495 bootstrap();
496 }
497
498 template <typename I>
499 void ImageReplayer<I>::bootstrap() {
500 dout(20) << dendl;
501
502 if (!m_local_image_id.empty() &&
503 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
504 dout(5) << "local image is primary" << dendl;
505 on_start_fail(0, "local image is primary");
506 return;
507 } else if (m_peers.empty()) {
508 dout(5) << "no peer clusters" << dendl;
509 on_start_fail(-ENOENT, "no peer clusters");
510 return;
511 }
512
513 Context *ctx = create_context_callback<
514 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
515
516 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
517 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
518 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
519 m_global_image_id, m_threads->work_queue, m_threads->timer,
520 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
521 m_remote_journaler, &m_client_state, &m_client_meta, ctx,
522 &m_resync_requested, &m_progress_cxt);
523
524 {
525 Mutex::Locker locker(m_lock);
526 request->get();
527 m_bootstrap_request = request;
528 }
529
530 update_mirror_image_status(false, boost::none);
531 reschedule_update_status_task(10);
532
533 request->send();
534 }
535
536 template <typename I>
537 void ImageReplayer<I>::handle_bootstrap(int r) {
538 dout(20) << "r=" << r << dendl;
539 {
540 Mutex::Locker locker(m_lock);
541 m_bootstrap_request->put();
542 m_bootstrap_request = nullptr;
543 if (m_local_image_ctx) {
544 m_local_image_id = m_local_image_ctx->id;
545 }
546 }
547
548 if (r == -EREMOTEIO) {
549 m_local_image_tag_owner = "";
550 dout(5) << "remote image is non-primary" << dendl;
551 on_start_fail(-EREMOTEIO, "remote image is non-primary");
552 return;
553 } else if (r == -EEXIST) {
554 m_local_image_tag_owner = "";
555 on_start_fail(r, "split-brain detected");
556 return;
557 } else if (r < 0) {
558 on_start_fail(r, "error bootstrapping replay");
559 return;
560 } else if (on_start_interrupted()) {
561 return;
562 } else if (m_resync_requested) {
563 on_start_fail(0, "resync requested");
564 return;
565 }
566
567 assert(m_local_journal == nullptr);
568 {
569 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
570 if (m_local_image_ctx->journal != nullptr) {
571 m_local_journal = m_local_image_ctx->journal;
572 m_local_journal->add_listener(m_journal_listener);
573 }
574 }
575
576 if (m_local_journal == nullptr) {
577 on_start_fail(-EINVAL, "error accessing local journal");
578 return;
579 }
580
581 update_mirror_image_status(false, boost::none);
582 init_remote_journaler();
583 }
584
585 template <typename I>
586 void ImageReplayer<I>::init_remote_journaler() {
587 dout(20) << dendl;
588
589 Context *ctx = create_context_callback<
590 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
591 m_remote_journaler->init(ctx);
592 }
593
594 template <typename I>
595 void ImageReplayer<I>::handle_init_remote_journaler(int r) {
596 dout(20) << "r=" << r << dendl;
597
598 if (r < 0) {
599 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
600 on_start_fail(r, "error initializing remote journal");
601 return;
602 } else if (on_start_interrupted()) {
603 return;
604 }
605
606 m_remote_journaler->add_listener(&m_remote_listener);
607
608 cls::journal::Client client;
609 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
610 if (r < 0) {
611 derr << "error retrieving remote journal client: " << cpp_strerror(r)
612 << dendl;
613 on_start_fail(r, "error retrieving remote journal client");
614 return;
615 }
616
617 dout(5) << "image_id=" << m_local_image_id << ", "
618 << "client_meta.image_id=" << m_client_meta.image_id << ", "
619 << "client.state=" << client.state << dendl;
620 if (m_client_meta.image_id == m_local_image_id &&
621 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
622 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
623 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
624 m_resync_requested = true;
625 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
626 } else {
627 on_start_fail(-ENOTCONN, "disconnected");
628 }
629 return;
630 }
631
632 start_replay();
633 }
634
635 template <typename I>
636 void ImageReplayer<I>::start_replay() {
637 dout(20) << dendl;
638
639 Context *start_ctx = create_context_callback<
640 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
641 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
642 }
643
644 template <typename I>
645 void ImageReplayer<I>::handle_start_replay(int r) {
646 dout(20) << "r=" << r << dendl;
647
648 if (r < 0) {
649 assert(m_local_replay == nullptr);
650 derr << "error starting external replay on local image "
651 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
652 on_start_fail(r, "error starting replay on local image");
653 return;
654 }
655
656 Context *on_finish(nullptr);
657 {
658 Mutex::Locker locker(m_lock);
659 assert(m_state == STATE_STARTING);
660 m_state = STATE_REPLAYING;
661 std::swap(m_on_start_finish, on_finish);
662 }
663
664 m_event_preprocessor = EventPreprocessor<I>::create(
665 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
666 &m_client_meta, m_threads->work_queue);
667 m_replay_status_formatter =
668 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
669
670 update_mirror_image_status(true, boost::none);
671 reschedule_update_status_task(30);
672
673 if (on_replay_interrupted()) {
674 return;
675 }
676
677 {
678 CephContext *cct = static_cast<CephContext *>(m_local->cct());
679 double poll_seconds = cct->_conf->get_val<double>(
680 "rbd_mirror_journal_poll_age");
681
682 Mutex::Locker locker(m_lock);
683 m_replay_handler = new ReplayHandler<I>(this);
684 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
685
686 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
687 }
688
689 dout(20) << "start succeeded" << dendl;
690 if (on_finish != nullptr) {
691 dout(20) << "on finish complete, r=" << r << dendl;
692 on_finish->complete(r);
693 }
694 }
695
696 template <typename I>
697 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
698 {
699 dout(20) << "r=" << r << dendl;
700 Context *ctx = new FunctionContext([this, r, desc](int _r) {
701 {
702 Mutex::Locker locker(m_lock);
703 assert(m_state == STATE_STARTING);
704 m_state = STATE_STOPPING;
705 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
706 derr << "start failed: " << cpp_strerror(r) << dendl;
707 } else {
708 dout(20) << "start canceled" << dendl;
709 }
710 }
711
712 set_state_description(r, desc);
713 update_mirror_image_status(false, boost::none);
714 reschedule_update_status_task(-1);
715 shut_down(r);
716 });
717 m_threads->work_queue->queue(ctx, 0);
718 }
719
720 template <typename I>
721 bool ImageReplayer<I>::on_start_interrupted()
722 {
723 Mutex::Locker locker(m_lock);
724 assert(m_state == STATE_STARTING);
725 if (m_on_stop_finish == nullptr) {
726 return false;
727 }
728
729 on_start_fail(-ECANCELED);
730 return true;
731 }
732
733 template <typename I>
734 void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
735 const std::string& desc)
736 {
737 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
738 << ", desc=" << desc << dendl;
739
740 m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
741
742 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
743 bool shut_down_replay = false;
744 bool running = true;
745 {
746 Mutex::Locker locker(m_lock);
747
748 if (!is_running_()) {
749 running = false;
750 } else {
751 if (!is_stopped_()) {
752 if (m_state == STATE_STARTING) {
753 dout(20) << "canceling start" << dendl;
754 if (m_bootstrap_request) {
755 bootstrap_request = m_bootstrap_request;
756 bootstrap_request->get();
757 }
758 } else {
759 dout(20) << "interrupting replay" << dendl;
760 shut_down_replay = true;
761 }
762
763 assert(m_on_stop_finish == nullptr);
764 std::swap(m_on_stop_finish, on_finish);
765 m_stop_requested = true;
766 m_manual_stop = manual;
767 }
768 }
769 }
770
771 // avoid holding lock since bootstrap request will update status
772 if (bootstrap_request != nullptr) {
773 bootstrap_request->cancel();
774 bootstrap_request->put();
775 }
776
777 if (!running) {
778 dout(20) << "not running" << dendl;
779 if (on_finish) {
780 on_finish->complete(-EINVAL);
781 }
782 return;
783 }
784
785 if (shut_down_replay) {
786 on_stop_journal_replay(r, desc);
787 } else if (on_finish != nullptr) {
788 on_finish->complete(0);
789 }
790 }
791
792 template <typename I>
793 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
794 {
795 dout(20) << "enter" << dendl;
796
797 {
798 Mutex::Locker locker(m_lock);
799 if (m_state != STATE_REPLAYING) {
800 // might be invoked multiple times while stopping
801 return;
802 }
803 m_stop_requested = true;
804 m_state = STATE_STOPPING;
805 }
806
807 set_state_description(r, desc);
808 update_mirror_image_status(false, boost::none);
809 reschedule_update_status_task(-1);
810 shut_down(0);
811 }
812
813 template <typename I>
814 void ImageReplayer<I>::handle_replay_ready()
815 {
816 dout(20) << "enter" << dendl;
817 if (on_replay_interrupted()) {
818 return;
819 }
820
821 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
822 return;
823 }
824
825 m_event_replay_tracker.start_op();
826
827 m_lock.Lock();
828 bool stopping = (m_state == STATE_STOPPING);
829 m_lock.Unlock();
830
831 if (stopping) {
832 dout(10) << "stopping event replay" << dendl;
833 m_event_replay_tracker.finish_op();
834 return;
835 }
836
837 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
838 preprocess_entry();
839 return;
840 }
841
842 replay_flush();
843 }
844
845 template <typename I>
846 void ImageReplayer<I>::restart(Context *on_finish)
847 {
848 FunctionContext *ctx = new FunctionContext(
849 [this, on_finish](int r) {
850 if (r < 0) {
851 // Try start anyway.
852 }
853 start(on_finish, true);
854 });
855 stop(ctx);
856 }
857
858 template <typename I>
859 void ImageReplayer<I>::flush(Context *on_finish)
860 {
861 dout(20) << "enter" << dendl;
862
863 {
864 Mutex::Locker locker(m_lock);
865 if (m_state == STATE_REPLAYING) {
866 Context *ctx = new FunctionContext(
867 [on_finish](int r) {
868 if (on_finish != nullptr) {
869 on_finish->complete(r);
870 }
871 });
872 on_flush_local_replay_flush_start(ctx);
873 return;
874 }
875 }
876
877 if (on_finish) {
878 on_finish->complete(0);
879 }
880 }
881
882 template <typename I>
883 void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
884 {
885 dout(20) << "enter" << dendl;
886 FunctionContext *ctx = new FunctionContext(
887 [this, on_flush](int r) {
888 on_flush_local_replay_flush_finish(on_flush, r);
889 });
890
891 assert(m_lock.is_locked());
892 assert(m_state == STATE_REPLAYING);
893 m_local_replay->flush(ctx);
894 }
895
896 template <typename I>
897 void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
898 int r)
899 {
900 dout(20) << "r=" << r << dendl;
901 if (r < 0) {
902 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
903 on_flush->complete(r);
904 return;
905 }
906
907 on_flush_flush_commit_position_start(on_flush);
908 }
909
910 template <typename I>
911 void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
912 {
913 FunctionContext *ctx = new FunctionContext(
914 [this, on_flush](int r) {
915 on_flush_flush_commit_position_finish(on_flush, r);
916 });
917
918 m_remote_journaler->flush_commit_position(ctx);
919 }
920
921 template <typename I>
922 void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
923 int r)
924 {
925 if (r < 0) {
926 derr << "error flushing remote journal commit position: "
927 << cpp_strerror(r) << dendl;
928 }
929
930 update_mirror_image_status(false, boost::none);
931
932 dout(20) << "flush complete, r=" << r << dendl;
933 on_flush->complete(r);
934 }
935
936 template <typename I>
937 bool ImageReplayer<I>::on_replay_interrupted()
938 {
939 bool shut_down;
940 {
941 Mutex::Locker locker(m_lock);
942 shut_down = m_stop_requested;
943 }
944
945 if (shut_down) {
946 on_stop_journal_replay();
947 }
948 return shut_down;
949 }
950
951 template <typename I>
952 void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
953 {
954 dout(20) << "enter" << dendl;
955
956 Mutex::Locker l(m_lock);
957
958 if (f) {
959 f->open_object_section("image_replayer");
960 f->dump_string("name", m_name);
961 f->dump_string("state", to_string(m_state));
962 f->close_section();
963 f->flush(*ss);
964 } else {
965 *ss << m_name << ": state: " << to_string(m_state);
966 }
967 }
968
969 template <typename I>
970 void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
971 {
972 dout(20) << "r=" << r << dendl;
973 if (r < 0) {
974 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
975 set_state_description(r, error_desc);
976 }
977
978 {
979 Mutex::Locker locker(m_lock);
980 m_stop_requested = true;
981 }
982 on_replay_interrupted();
983 }
984
985 template <typename I>
986 void ImageReplayer<I>::replay_flush() {
987 dout(20) << dendl;
988
989 bool interrupted = false;
990 {
991 Mutex::Locker locker(m_lock);
992 if (m_state != STATE_REPLAYING) {
993 dout(20) << "replay interrupted" << dendl;
994 interrupted = true;
995 } else {
996 m_state = STATE_REPLAY_FLUSHING;
997 }
998 }
999
1000 if (interrupted) {
1001 m_event_replay_tracker.finish_op();
1002 return;
1003 }
1004
1005 // shut down the replay to flush all IO and ops and create a new
1006 // replayer to handle the new tag epoch
1007 Context *ctx = create_context_callback<
1008 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1009 ctx = new FunctionContext([this, ctx](int r) {
1010 m_local_image_ctx->journal->stop_external_replay();
1011 m_local_replay = nullptr;
1012
1013 if (r < 0) {
1014 ctx->complete(r);
1015 return;
1016 }
1017
1018 m_local_journal->start_external_replay(&m_local_replay, ctx);
1019 });
1020 m_local_replay->shut_down(false, ctx);
1021 }
1022
1023 template <typename I>
1024 void ImageReplayer<I>::handle_replay_flush(int r) {
1025 dout(20) << "r=" << r << dendl;
1026
1027 {
1028 Mutex::Locker locker(m_lock);
1029 assert(m_state == STATE_REPLAY_FLUSHING);
1030 m_state = STATE_REPLAYING;
1031 }
1032
1033 if (r < 0) {
1034 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1035 m_event_replay_tracker.finish_op();
1036 handle_replay_complete(r, "replay flush encountered an error");
1037 return;
1038 } else if (on_replay_interrupted()) {
1039 m_event_replay_tracker.finish_op();
1040 return;
1041 }
1042
1043 get_remote_tag();
1044 }
1045
1046 template <typename I>
1047 void ImageReplayer<I>::get_remote_tag() {
1048 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1049
1050 Context *ctx = create_context_callback<
1051 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1052 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1053 }
1054
1055 template <typename I>
1056 void ImageReplayer<I>::handle_get_remote_tag(int r) {
1057 dout(20) << "r=" << r << dendl;
1058
1059 if (r == 0) {
1060 try {
1061 bufferlist::iterator it = m_replay_tag.data.begin();
1062 ::decode(m_replay_tag_data, it);
1063 } catch (const buffer::error &err) {
1064 r = -EBADMSG;
1065 }
1066 }
1067
1068 if (r < 0) {
1069 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1070 << cpp_strerror(r) << dendl;
1071 m_event_replay_tracker.finish_op();
1072 handle_replay_complete(r, "failed to retrieve remote tag");
1073 return;
1074 }
1075
1076 m_replay_tag_valid = true;
1077 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1078 << m_replay_tag_data << dendl;
1079
1080 allocate_local_tag();
1081 }
1082
1083 template <typename I>
1084 void ImageReplayer<I>::allocate_local_tag() {
1085 dout(15) << dendl;
1086
1087 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1088 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1089 mirror_uuid = m_remote_image.mirror_uuid;
1090 } else if (mirror_uuid == m_local_mirror_uuid) {
1091 mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1092 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1093 // handle possible edge condition where daemon can failover and
1094 // the local image has already been promoted/demoted
1095 auto local_tag_data = m_local_journal->get_tag_data();
1096 if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
1097 (local_tag_data.predecessor.commit_valid &&
1098 local_tag_data.predecessor.mirror_uuid ==
1099 librbd::Journal<>::LOCAL_MIRROR_UUID)) {
1100 dout(15) << "skipping stale demotion event" << dendl;
1101 handle_process_entry_safe(m_replay_entry, 0);
1102 handle_replay_ready();
1103 return;
1104 } else {
1105 dout(5) << "encountered image demotion: stopping" << dendl;
1106 Mutex::Locker locker(m_lock);
1107 m_stop_requested = true;
1108 }
1109 }
1110
1111 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1112 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1113 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1114 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1115 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1116 }
1117
1118 dout(15) << "mirror_uuid=" << mirror_uuid << ", "
1119 << "predecessor=" << predecessor << ", "
1120 << "replay_tag_tid=" << m_replay_tag_tid << dendl;
1121 Context *ctx = create_context_callback<
1122 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1123 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1124 }
1125
1126 template <typename I>
1127 void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1128 dout(15) << "r=" << r << ", "
1129 << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
1130
1131 if (r < 0) {
1132 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1133 m_event_replay_tracker.finish_op();
1134 handle_replay_complete(r, "failed to allocate journal tag");
1135 return;
1136 }
1137
1138 preprocess_entry();
1139 }
1140
1141 template <typename I>
1142 void ImageReplayer<I>::preprocess_entry() {
1143 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1144 << dendl;
1145
1146 bufferlist data = m_replay_entry.get_data();
1147 bufferlist::iterator it = data.begin();
1148 int r = m_local_replay->decode(&it, &m_event_entry);
1149 if (r < 0) {
1150 derr << "failed to decode journal event" << dendl;
1151 m_event_replay_tracker.finish_op();
1152 handle_replay_complete(r, "failed to decode journal event");
1153 return;
1154 }
1155
1156 uint32_t delay = calculate_replay_delay(
1157 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1158 if (delay == 0) {
1159 handle_preprocess_entry_ready(0);
1160 return;
1161 }
1162
1163 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1164
1165 Mutex::Locker timer_locker(m_threads->timer_lock);
1166 assert(m_delayed_preprocess_task == nullptr);
1167 m_delayed_preprocess_task = new FunctionContext(
1168 [this](int r) {
1169 assert(m_threads->timer_lock.is_locked());
1170 m_delayed_preprocess_task = nullptr;
1171 m_threads->work_queue->queue(
1172 create_context_callback<ImageReplayer,
1173 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1174 });
1175 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1176 }
1177
1178 template <typename I>
1179 void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1180 dout(20) << "r=" << r << dendl;
1181 assert(r == 0);
1182
1183 if (!m_event_preprocessor->is_required(m_event_entry)) {
1184 process_entry();
1185 return;
1186 }
1187
1188 Context *ctx = create_context_callback<
1189 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1190 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1191 }
1192
1193 template <typename I>
1194 void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1195 dout(20) << "r=" << r << dendl;
1196
1197 if (r < 0) {
1198 m_event_replay_tracker.finish_op();
1199
1200 if (r == -ECANCELED) {
1201 handle_replay_complete(0, "lost exclusive lock");
1202 } else {
1203 derr << "failed to preprocess journal event" << dendl;
1204 handle_replay_complete(r, "failed to preprocess journal event");
1205 }
1206 return;
1207 }
1208
1209 process_entry();
1210 }
1211
1212 template <typename I>
1213 void ImageReplayer<I>::process_entry() {
1214 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1215 << dendl;
1216
1217 // stop replaying events if stop has been requested
1218 if (on_replay_interrupted()) {
1219 m_event_replay_tracker.finish_op();
1220 return;
1221 }
1222
1223 Context *on_ready = create_context_callback<
1224 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1225 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1226
1227 m_local_replay->process(m_event_entry, on_ready, on_commit);
1228 }
1229
1230 template <typename I>
1231 void ImageReplayer<I>::handle_process_entry_ready(int r) {
1232 dout(20) << dendl;
1233 assert(r == 0);
1234
1235 bool update_status = false;
1236 {
1237 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
1238 if (m_local_image_name != m_local_image_ctx->name) {
1239 m_local_image_name = m_local_image_ctx->name;
1240 update_status = true;
1241 }
1242 }
1243
1244 if (update_status) {
1245 reschedule_update_status_task(0);
1246 }
1247
1248 // attempt to process the next event
1249 handle_replay_ready();
1250 }
1251
1252 template <typename I>
1253 void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1254 int r) {
1255 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1256 << dendl;
1257
1258 if (r < 0) {
1259 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1260 handle_replay_complete(r, "failed to commit journal event");
1261 } else {
1262 assert(m_remote_journaler != nullptr);
1263 m_remote_journaler->committed(replay_entry);
1264 }
1265 m_event_replay_tracker.finish_op();
1266 }
1267
1268 template <typename I>
1269 bool ImageReplayer<I>::update_mirror_image_status(bool force,
1270 const OptionalState &state) {
1271 dout(20) << dendl;
1272 {
1273 Mutex::Locker locker(m_lock);
1274 if (!start_mirror_image_status_update(force, false)) {
1275 return false;
1276 }
1277 }
1278
1279 queue_mirror_image_status_update(state);
1280 return true;
1281 }
1282
1283 template <typename I>
1284 bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1285 bool restarting) {
1286 assert(m_lock.is_locked());
1287
1288 if (!force && !is_stopped_()) {
1289 if (!is_running_()) {
1290 dout(20) << "shut down in-progress: ignoring update" << dendl;
1291 return false;
1292 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1293 dout(20) << "already sending update" << dendl;
1294 m_update_status_requested = true;
1295 return false;
1296 }
1297 }
1298
1299 dout(20) << dendl;
1300 ++m_in_flight_status_updates;
1301 return true;
1302 }
1303
1304 template <typename I>
1305 void ImageReplayer<I>::finish_mirror_image_status_update() {
1306 reregister_admin_socket_hook();
1307
1308 Context *on_finish = nullptr;
1309 {
1310 Mutex::Locker locker(m_lock);
1311 assert(m_in_flight_status_updates > 0);
1312 if (--m_in_flight_status_updates > 0) {
1313 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1314 << "updates" << dendl;
1315 return;
1316 }
1317
1318 std::swap(on_finish, m_on_update_status_finish);
1319 }
1320
1321 dout(20) << dendl;
1322 if (on_finish != nullptr) {
1323 on_finish->complete(0);
1324 }
1325 }
1326
1327 template <typename I>
1328 void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1329 dout(20) << dendl;
1330 FunctionContext *ctx = new FunctionContext(
1331 [this, state](int r) {
1332 send_mirror_status_update(state);
1333 });
1334 m_threads->work_queue->queue(ctx, 0);
1335 }
1336
1337 template <typename I>
1338 void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1339 State state;
1340 std::string state_desc;
1341 int last_r;
1342 bool stopping_replay;
1343
1344 OptionalMirrorImageStatusState mirror_image_status_state{
1345 boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1346 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
1347 {
1348 Mutex::Locker locker(m_lock);
1349 state = m_state;
1350 state_desc = m_state_desc;
1351 mirror_image_status_state = m_mirror_image_status_state;
1352 last_r = m_last_r;
1353 stopping_replay = (m_local_image_ctx != nullptr);
1354
1355 if (m_bootstrap_request != nullptr) {
1356 bootstrap_request = m_bootstrap_request;
1357 bootstrap_request->get();
1358 }
1359 }
1360
1361 bool syncing = false;
1362 if (bootstrap_request != nullptr) {
1363 syncing = bootstrap_request->is_syncing();
1364 bootstrap_request->put();
1365 bootstrap_request = nullptr;
1366 }
1367
1368 if (opt_state) {
1369 state = *opt_state;
1370 }
1371
1372 cls::rbd::MirrorImageStatus status;
1373 status.up = true;
1374 switch (state) {
1375 case STATE_STARTING:
1376 if (syncing) {
1377 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1378 status.description = state_desc.empty() ? "syncing" : state_desc;
1379 mirror_image_status_state = status.state;
1380 } else {
1381 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1382 status.description = "starting replay";
1383 }
1384 break;
1385 case STATE_REPLAYING:
1386 case STATE_REPLAY_FLUSHING:
1387 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1388 {
1389 Context *on_req_finish = new FunctionContext(
1390 [this](int r) {
1391 dout(20) << "replay status ready: r=" << r << dendl;
1392 if (r >= 0) {
1393 send_mirror_status_update(boost::none);
1394 } else if (r == -EAGAIN) {
1395 // decrement in-flight status update counter
1396 handle_mirror_status_update(r);
1397 }
1398 });
1399
1400 std::string desc;
1401 if (!m_replay_status_formatter->get_or_send_update(&desc,
1402 on_req_finish)) {
1403 dout(20) << "waiting for replay status" << dendl;
1404 return;
1405 }
1406 status.description = "replaying, " + desc;
1407 mirror_image_status_state = boost::none;
1408 }
1409 break;
1410 case STATE_STOPPING:
1411 if (stopping_replay) {
1412 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1413 status.description = "stopping replay";
1414 break;
1415 }
1416 // FALLTHROUGH
1417 case STATE_STOPPED:
1418 if (last_r == -EREMOTEIO) {
1419 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1420 status.description = state_desc;
1421 mirror_image_status_state = status.state;
1422 } else if (last_r < 0) {
1423 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1424 status.description = state_desc;
1425 mirror_image_status_state = status.state;
1426 } else {
1427 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1428 status.description = state_desc.empty() ? "stopped" : state_desc;
1429 mirror_image_status_state = boost::none;
1430 }
1431 break;
1432 default:
1433 assert(!"invalid state");
1434 }
1435
1436 {
1437 Mutex::Locker locker(m_lock);
1438 m_mirror_image_status_state = mirror_image_status_state;
1439 }
1440
1441 // prevent the status from ping-ponging when failed replays are restarted
1442 if (mirror_image_status_state &&
1443 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1444 status.state = *mirror_image_status_state;
1445 }
1446
1447 dout(20) << "status=" << status << dendl;
1448 librados::ObjectWriteOperation op;
1449 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1450
1451 librados::AioCompletion *aio_comp = create_rados_callback<
1452 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1453 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1454 assert(r == 0);
1455 aio_comp->release();
1456 }
1457
1458 template <typename I>
1459 void ImageReplayer<I>::handle_mirror_status_update(int r) {
1460 dout(20) << "r=" << r << dendl;
1461
1462 bool running = false;
1463 bool started = false;
1464 {
1465 Mutex::Locker locker(m_lock);
1466 bool update_status_requested = false;
1467 std::swap(update_status_requested, m_update_status_requested);
1468
1469 running = is_running_();
1470 if (running && update_status_requested) {
1471 started = start_mirror_image_status_update(false, true);
1472 }
1473 }
1474
1475 // if a deferred update is available, send it -- otherwise reschedule
1476 // the timer task
1477 if (started) {
1478 queue_mirror_image_status_update(boost::none);
1479 } else if (running) {
1480 reschedule_update_status_task();
1481 }
1482
1483 // mark committed status update as no longer in-flight
1484 finish_mirror_image_status_update();
1485 }
1486
1487 template <typename I>
1488 void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1489 dout(20) << dendl;
1490
1491 bool canceled_task = false;
1492 {
1493 Mutex::Locker locker(m_lock);
1494 Mutex::Locker timer_locker(m_threads->timer_lock);
1495
1496 if (m_update_status_task) {
1497 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1498 m_update_status_task = nullptr;
1499 }
1500
1501 if (new_interval > 0) {
1502 m_update_status_interval = new_interval;
1503 }
1504
1505 bool restarting = (new_interval == 0 || canceled_task);
1506 if (new_interval >= 0 && is_running_() &&
1507 start_mirror_image_status_update(false, restarting)) {
1508 m_update_status_task = new FunctionContext(
1509 [this](int r) {
1510 assert(m_threads->timer_lock.is_locked());
1511 m_update_status_task = nullptr;
1512
1513 queue_mirror_image_status_update(boost::none);
1514 });
1515 m_threads->timer->add_event_after(m_update_status_interval,
1516 m_update_status_task);
1517 }
1518 }
1519
1520 if (canceled_task) {
1521 dout(20) << "canceled task" << dendl;
1522 finish_mirror_image_status_update();
1523 }
1524 }
1525
1526 template <typename I>
1527 void ImageReplayer<I>::shut_down(int r) {
1528 dout(20) << "r=" << r << dendl;
1529
1530 bool canceled_delayed_preprocess_task = false;
1531 {
1532 Mutex::Locker timer_locker(m_threads->timer_lock);
1533 if (m_delayed_preprocess_task != nullptr) {
1534 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1535 m_delayed_preprocess_task);
1536 assert(canceled_delayed_preprocess_task);
1537 m_delayed_preprocess_task = nullptr;
1538 }
1539 }
1540 if (canceled_delayed_preprocess_task) {
1541 // wake up sleeping replay
1542 m_event_replay_tracker.finish_op();
1543 }
1544
1545 {
1546 Mutex::Locker locker(m_lock);
1547 assert(m_state == STATE_STOPPING);
1548
1549 // if status updates are in-flight, wait for them to complete
1550 // before proceeding
1551 if (m_in_flight_status_updates > 0) {
1552 if (m_on_update_status_finish == nullptr) {
1553 dout(20) << "waiting for in-flight status update" << dendl;
1554 m_on_update_status_finish = new FunctionContext(
1555 [this, r](int _r) {
1556 shut_down(r);
1557 });
1558 }
1559 return;
1560 }
1561 }
1562
1563 // NOTE: it's important to ensure that the local image is fully
1564 // closed before attempting to close the remote journal in
1565 // case the remote cluster is unreachable
1566
1567 // chain the shut down sequence (reverse order)
1568 Context *ctx = new FunctionContext(
1569 [this, r](int _r) {
1570 update_mirror_image_status(true, STATE_STOPPED);
1571 handle_shut_down(r);
1572 });
1573
1574 // close the remote journal
1575 if (m_remote_journaler != nullptr) {
1576 ctx = new FunctionContext([this, ctx](int r) {
1577 delete m_remote_journaler;
1578 m_remote_journaler = nullptr;
1579 ctx->complete(0);
1580 });
1581 ctx = new FunctionContext([this, ctx](int r) {
1582 m_remote_journaler->remove_listener(&m_remote_listener);
1583 m_remote_journaler->shut_down(ctx);
1584 });
1585 }
1586
1587 // stop the replay of remote journal events
1588 if (m_replay_handler != nullptr) {
1589 ctx = new FunctionContext([this, ctx](int r) {
1590 delete m_replay_handler;
1591 m_replay_handler = nullptr;
1592
1593 m_event_replay_tracker.wait_for_ops(ctx);
1594 });
1595 ctx = new FunctionContext([this, ctx](int r) {
1596 m_remote_journaler->stop_replay(ctx);
1597 });
1598 }
1599
1600 // close the local image (release exclusive lock)
1601 if (m_local_image_ctx) {
1602 ctx = new FunctionContext([this, ctx](int r) {
1603 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1604 &m_local_image_ctx, ctx);
1605 request->send();
1606 });
1607 }
1608
1609 // shut down event replay into the local image
1610 if (m_local_journal != nullptr) {
1611 ctx = new FunctionContext([this, ctx](int r) {
1612 m_local_journal = nullptr;
1613 ctx->complete(0);
1614 });
1615 if (m_local_replay != nullptr) {
1616 ctx = new FunctionContext([this, ctx](int r) {
1617 m_local_journal->stop_external_replay();
1618 m_local_replay = nullptr;
1619
1620 EventPreprocessor<I>::destroy(m_event_preprocessor);
1621 m_event_preprocessor = nullptr;
1622 ctx->complete(0);
1623 });
1624 }
1625 ctx = new FunctionContext([this, ctx](int r) {
1626 // blocks if listener notification is in-progress
1627 m_local_journal->remove_listener(m_journal_listener);
1628 ctx->complete(0);
1629 });
1630 }
1631
1632 // wait for all local in-flight replay events to complete
1633 ctx = new FunctionContext([this, ctx](int r) {
1634 if (r < 0) {
1635 derr << "error shutting down journal replay: " << cpp_strerror(r)
1636 << dendl;
1637 }
1638
1639 m_event_replay_tracker.wait_for_ops(ctx);
1640 });
1641
1642 // flush any local in-flight replay events
1643 if (m_local_replay != nullptr) {
1644 ctx = new FunctionContext([this, ctx](int r) {
1645 m_local_replay->shut_down(true, ctx);
1646 });
1647 }
1648
1649 m_threads->work_queue->queue(ctx, 0);
1650 }
1651
1652 template <typename I>
1653 void ImageReplayer<I>::handle_shut_down(int r) {
1654 reschedule_update_status_task(-1);
1655
1656 bool unregister_asok_hook = false;
1657 {
1658 Mutex::Locker locker(m_lock);
1659
1660 // if status updates are in-flight, wait for them to complete
1661 // before proceeding
1662 if (m_in_flight_status_updates > 0) {
1663 if (m_on_update_status_finish == nullptr) {
1664 dout(20) << "waiting for in-flight status update" << dendl;
1665 m_on_update_status_finish = new FunctionContext(
1666 [this, r](int _r) {
1667 handle_shut_down(r);
1668 });
1669 }
1670 return;
1671 }
1672
1673 bool delete_requested = false;
1674 if (m_delete_requested && !m_local_image_id.empty()) {
1675 assert(m_remote_image.image_id.empty());
1676 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1677 delete_requested = true;
1678 }
1679 if (delete_requested || m_resync_requested) {
1680 m_image_deleter->schedule_image_delete(m_local,
1681 m_local_pool_id,
1682 m_global_image_id,
1683 m_resync_requested);
1684
1685 m_local_image_id = "";
1686 m_resync_requested = false;
1687 if (m_delete_requested) {
1688 unregister_asok_hook = true;
1689 m_delete_requested = false;
1690 }
1691 } else if (m_last_r == -ENOENT &&
1692 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1693 dout(0) << "mirror image no longer exists" << dendl;
1694 unregister_asok_hook = true;
1695 m_finished = true;
1696 }
1697 }
1698
1699 if (unregister_asok_hook) {
1700 unregister_admin_socket_hook();
1701 }
1702
1703 dout(20) << "stop complete" << dendl;
1704 m_local_ioctx.close();
1705
1706 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1707 m_replay_status_formatter = nullptr;
1708
1709 Context *on_start = nullptr;
1710 Context *on_stop = nullptr;
1711 {
1712 Mutex::Locker locker(m_lock);
1713 std::swap(on_start, m_on_start_finish);
1714 std::swap(on_stop, m_on_stop_finish);
1715 m_stop_requested = false;
1716 assert(m_delayed_preprocess_task == nullptr);
1717 assert(m_state == STATE_STOPPING);
1718 m_state = STATE_STOPPED;
1719 }
1720
1721 if (on_start != nullptr) {
1722 dout(20) << "on start finish complete, r=" << r << dendl;
1723 on_start->complete(r);
1724 r = 0;
1725 }
1726 if (on_stop != nullptr) {
1727 dout(20) << "on stop finish complete, r=" << r << dendl;
1728 on_stop->complete(r);
1729 }
1730 }
1731
1732 template <typename I>
1733 void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1734 dout(20) << dendl;
1735
1736 cls::journal::Client client;
1737 {
1738 Mutex::Locker locker(m_lock);
1739 if (!is_running_()) {
1740 return;
1741 }
1742
1743 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1744 if (r < 0) {
1745 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1746 return;
1747 }
1748 }
1749
1750 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1751 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1752 stop(nullptr, false, -ENOTCONN, "disconnected");
1753 }
1754 }
1755
1756 template <typename I>
1757 std::string ImageReplayer<I>::to_string(const State state) {
1758 switch (state) {
1759 case ImageReplayer<I>::STATE_STARTING:
1760 return "Starting";
1761 case ImageReplayer<I>::STATE_REPLAYING:
1762 return "Replaying";
1763 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1764 return "ReplayFlushing";
1765 case ImageReplayer<I>::STATE_STOPPING:
1766 return "Stopping";
1767 case ImageReplayer<I>::STATE_STOPPED:
1768 return "Stopped";
1769 default:
1770 break;
1771 }
1772 return "Unknown(" + stringify(state) + ")";
1773 }
1774
1775 template <typename I>
1776 void ImageReplayer<I>::resync_image(Context *on_finish) {
1777 dout(20) << dendl;
1778
1779 m_resync_requested = true;
1780 stop(on_finish);
1781 }
1782
1783 template <typename I>
1784 void ImageReplayer<I>::register_admin_socket_hook() {
1785 ImageReplayerAdminSocketHook<I> *asok_hook;
1786 {
1787 Mutex::Locker locker(m_lock);
1788 if (m_asok_hook != nullptr) {
1789 return;
1790 }
1791
1792 dout(15) << "registered asok hook: " << m_name << dendl;
1793 asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1794 this);
1795 int r = asok_hook->register_commands();
1796 if (r == 0) {
1797 m_asok_hook = asok_hook;
1798 return;
1799 }
1800 derr << "error registering admin socket commands" << dendl;
1801 }
1802 delete asok_hook;
1803 }
1804
1805 template <typename I>
1806 void ImageReplayer<I>::unregister_admin_socket_hook() {
1807 dout(15) << dendl;
1808
1809 AdminSocketHook *asok_hook = nullptr;
1810 {
1811 Mutex::Locker locker(m_lock);
1812 std::swap(asok_hook, m_asok_hook);
1813 }
1814 delete asok_hook;
1815 }
1816
1817 template <typename I>
1818 void ImageReplayer<I>::reregister_admin_socket_hook() {
1819 {
1820 Mutex::Locker locker(m_lock);
1821 auto name = m_local_ioctx.get_pool_name() + "/" + m_local_image_name;
1822 if (m_name == name) {
1823 return;
1824 }
1825 m_name = name;
1826 }
1827 unregister_admin_socket_hook();
1828 register_admin_socket_hook();
1829 }
1830
1831 template <typename I>
1832 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1833 {
1834 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1835 << "/" << replayer.get_global_image_id() << "]";
1836 return os;
1837 }
1838
1839 } // namespace mirror
1840 } // namespace rbd
1841
1842 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;