ceph/src/tools/rbd_mirror/ImageReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "common/WorkQueue.h"
13 #include "global/global_context.h"
14 #include "journal/Journaler.h"
15 #include "journal/ReplayHandler.h"
16 #include "journal/Settings.h"
17 #include "librbd/ExclusiveLock.h"
18 #include "librbd/ImageCtx.h"
19 #include "librbd/ImageState.h"
20 #include "librbd/Journal.h"
21 #include "librbd/Operations.h"
22 #include "librbd/Utils.h"
23 #include "librbd/journal/Replay.h"
24 #include "ImageDeleter.h"
25 #include "ImageReplayer.h"
26 #include "Threads.h"
27 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
28 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
29 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
32 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_rbd_mirror
36 #undef dout_prefix
37 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
38 << __func__ << ": "
39
40 using std::map;
41 using std::string;
42 using std::unique_ptr;
43 using std::shared_ptr;
44 using std::vector;
45
46 extern PerfCounters *g_perf_counters;
47
48 namespace rbd {
49 namespace mirror {
50
51 using librbd::util::create_context_callback;
52 using librbd::util::create_rados_callback;
53 using namespace rbd::mirror::image_replayer;
54
55 template <typename I>
56 std::ostream &operator<<(std::ostream &os,
57 const typename ImageReplayer<I>::State &state);
58
59 namespace {
60
61 template <typename I>
62 struct ReplayHandler : public ::journal::ReplayHandler {
63 ImageReplayer<I> *replayer;
64 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
65 void get() override {}
66 void put() override {}
67
68 void handle_entries_available() override {
69 replayer->handle_replay_ready();
70 }
71 void handle_complete(int r) override {
72 std::stringstream ss;
73 if (r < 0) {
74 ss << "replay completed with error: " << cpp_strerror(r);
75 }
76 replayer->handle_replay_complete(r, ss.str());
77 }
78 };
79
80 template <typename I>
81 class ImageReplayerAdminSocketCommand {
82 public:
83 ImageReplayerAdminSocketCommand(const std::string &desc,
84 ImageReplayer<I> *replayer)
85 : desc(desc), replayer(replayer) {
86 }
87 virtual ~ImageReplayerAdminSocketCommand() {}
88 virtual bool call(Formatter *f, stringstream *ss) = 0;
89
90 std::string desc;
91 ImageReplayer<I> *replayer;
92 bool registered = false;
93 };
94
95 template <typename I>
96 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
97 public:
98 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
99 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
100 }
101
102 bool call(Formatter *f, stringstream *ss) override {
103 this->replayer->print_status(f, ss);
104 return true;
105 }
106 };
107
108 template <typename I>
109 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
110 public:
111 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
112 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
113 }
114
115 bool call(Formatter *f, stringstream *ss) override {
116 this->replayer->start(nullptr, true);
117 return true;
118 }
119 };
120
121 template <typename I>
122 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
123 public:
124 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
125 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
126 }
127
128 bool call(Formatter *f, stringstream *ss) override {
129 this->replayer->stop(nullptr, true);
130 return true;
131 }
132 };
133
134 template <typename I>
135 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
136 public:
137 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
138 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
139 }
140
141 bool call(Formatter *f, stringstream *ss) override {
142 this->replayer->restart();
143 return true;
144 }
145 };
146
147 template <typename I>
148 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
149 public:
150 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
151 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
152 }
153
154 bool call(Formatter *f, stringstream *ss) override {
155 this->replayer->flush();
156 return true;
157 }
158 };
159
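// Per-image admin socket hook: registers the "rbd mirror flush|restart|start|
// status|stop <name>" commands for this replayer.  As a usage sketch (socket
// path and image name are illustrative only, not taken from this file), the
// commands can be driven through the daemon's admin socket:
//
//   ceph --admin-daemon /var/run/ceph/ceph-client.rbd-mirror.a.asok \
//       rbd mirror status mypool/myimage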
160 template <typename I>
161 class ImageReplayerAdminSocketHook : public AdminSocketHook {
162 public:
163 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
164 ImageReplayer<I> *replayer)
165 : admin_socket(cct->get_admin_socket()),
166 commands{{"rbd mirror flush " + name,
167 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
168 {"rbd mirror restart " + name,
169 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
170 {"rbd mirror start " + name,
171 new StartCommand<I>("start rbd mirror " + name, replayer)},
172 {"rbd mirror status " + name,
173 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
174 {"rbd mirror stop " + name,
175 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
176 }
177
178 int register_commands() {
179 for (auto &it : commands) {
180 int r = admin_socket->register_command(it.first, it.first, this,
181 it.second->desc);
182 if (r < 0) {
183 return r;
184 }
185 it.second->registered = true;
186 }
187 return 0;
188 }
189
190 ~ImageReplayerAdminSocketHook() override {
191 for (auto &it : commands) {
192 if (it.second->registered) {
193 admin_socket->unregister_command(it.first);
194 }
195 delete it.second;
196 }
197 commands.clear();
198 }
199
200 bool call(std::string_view command, const cmdmap_t& cmdmap,
201 std::string_view format, bufferlist& out) override {
202 auto i = commands.find(command);
203 ceph_assert(i != commands.end());
204 Formatter *f = Formatter::create(format);
205 stringstream ss;
206 bool r = i->second->call(f, &ss);
207 delete f;
208 out.append(ss);
209 return r;
210 }
211
212 private:
213 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
214 std::less<>> Commands;
215
216 AdminSocket *admin_socket;
217 Commands commands;
218 };
219
220 uint32_t calculate_replay_delay(const utime_t &event_time,
221 int mirroring_replay_delay) {
222 if (mirroring_replay_delay <= 0) {
223 return 0;
224 }
225
226 utime_t now = ceph_clock_now();
227 if (event_time + mirroring_replay_delay <= now) {
228 return 0;
229 }
230
231 // ensure it is rounded up when converting to integer
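  // e.g. with a 10 second replay delay and an event that occurred 3.4 seconds
  // ago, 6.6 seconds remain; truncating to an integer yields 6, so the +1
  // below results in a 7 second delay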
232 return (event_time + mirroring_replay_delay - now) + 1;
233 }
234
235 } // anonymous namespace
236
237 template <typename I>
238 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
239 const std::string &description, bool flush)
240 {
241 const std::string desc = "bootstrapping, " + description;
242 replayer->set_state_description(0, desc);
243 if (flush) {
244 replayer->update_mirror_image_status(false, boost::none);
245 }
246 }
247
248 template <typename I>
249 void ImageReplayer<I>::RemoteJournalerListener::handle_update(
250 ::journal::JournalMetadata *) {
251 FunctionContext *ctx = new FunctionContext([this](int r) {
252 replayer->handle_remote_journal_metadata_updated();
253 });
254 replayer->m_threads->work_queue->queue(ctx, 0);
255 }
256
257 template <typename I>
258 ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
259 InstanceWatcher<I> *instance_watcher,
260 RadosRef local,
261 const std::string &local_mirror_uuid,
262 int64_t local_pool_id,
263 const std::string &global_image_id) :
264 m_threads(threads),
265 m_instance_watcher(instance_watcher),
266 m_local(local),
267 m_local_mirror_uuid(local_mirror_uuid),
268 m_local_pool_id(local_pool_id),
269 m_global_image_id(global_image_id), m_local_image_name(global_image_id),
270 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
271 global_image_id),
272 m_progress_cxt(this),
273 m_journal_listener(new JournalListener(this)),
274 m_remote_listener(this)
275 {
276 // Register asok commands using a temporary "local_pool_name/global_image_id"
277 // name. When the image name becomes known on start, the asok commands will be
278 // re-registered using the "local_pool_name/local_image_name" name.
279
280 std::string pool_name;
281 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
282 if (r < 0) {
283 derr << "error resolving local pool " << m_local_pool_id
284 << ": " << cpp_strerror(r) << dendl;
285 pool_name = stringify(m_local_pool_id);
286 }
287
288 m_name = pool_name + "/" + m_global_image_id;
289 register_admin_socket_hook();
290 }
291
292 template <typename I>
293 ImageReplayer<I>::~ImageReplayer()
294 {
295 unregister_admin_socket_hook();
296 ceph_assert(m_event_preprocessor == nullptr);
297 ceph_assert(m_replay_status_formatter == nullptr);
298 ceph_assert(m_local_image_ctx == nullptr);
299 ceph_assert(m_local_replay == nullptr);
300 ceph_assert(m_remote_journaler == nullptr);
301 ceph_assert(m_replay_handler == nullptr);
302 ceph_assert(m_on_start_finish == nullptr);
303 ceph_assert(m_on_stop_finish == nullptr);
304 ceph_assert(m_bootstrap_request == nullptr);
305 ceph_assert(m_in_flight_status_updates == 0);
306
307 delete m_journal_listener;
308 }
309
310 template <typename I>
311 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
312 Mutex::Locker locker(m_lock);
313
314 if (!m_mirror_image_status_state) {
315 return image_replayer::HEALTH_STATE_OK;
316 } else if (*m_mirror_image_status_state ==
317 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
318 *m_mirror_image_status_state ==
319 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
320 return image_replayer::HEALTH_STATE_WARNING;
321 }
322 return image_replayer::HEALTH_STATE_ERROR;
323 }
324
325 template <typename I>
326 void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
327 librados::IoCtx &io_ctx) {
328 Mutex::Locker locker(m_lock);
329 auto it = m_peers.find({peer_uuid});
330 if (it == m_peers.end()) {
331 m_peers.insert({peer_uuid, io_ctx});
332 }
333 }
334
335 template <typename I>
336 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
337 dout(10) << r << " " << desc << dendl;
338
339 Mutex::Locker l(m_lock);
340 m_last_r = r;
341 m_state_desc = desc;
342 }
343
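// Start sequence (each step is asynchronous and chains into the next):
//   start() -> prepare_local_image() -> prepare_remote_image() -> bootstrap()
//     -> init_remote_journaler() -> start_replay()
// Any failure is routed through on_start_fail(), which transitions the state
// machine to STATE_STOPPING and invokes shut_down().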
344 template <typename I>
345 void ImageReplayer<I>::start(Context *on_finish, bool manual)
346 {
347 dout(10) << "on_finish=" << on_finish << dendl;
348
349 int r = 0;
350 {
351 Mutex::Locker locker(m_lock);
352 if (!is_stopped_()) {
353 derr << "already running" << dendl;
354 r = -EINVAL;
355 } else if (m_manual_stop && !manual) {
356 dout(5) << "stopped manually, ignoring start without manual flag"
357 << dendl;
358 r = -EPERM;
359 } else {
360 m_state = STATE_STARTING;
361 m_last_r = 0;
362 m_state_desc.clear();
363 m_manual_stop = false;
364 m_delete_requested = false;
365
366 if (on_finish != nullptr) {
367 ceph_assert(m_on_start_finish == nullptr);
368 m_on_start_finish = on_finish;
369 }
370 ceph_assert(m_on_stop_finish == nullptr);
371 }
372 }
373
374 if (r < 0) {
375 if (on_finish) {
376 on_finish->complete(r);
377 }
378 return;
379 }
380
381 m_local_ioctx.reset(new librados::IoCtx{});
382 r = m_local->ioctx_create2(m_local_pool_id, *m_local_ioctx);
383 if (r < 0) {
384 m_local_ioctx.reset();
385
386 derr << "error opening ioctx for local pool " << m_local_pool_id
387 << ": " << cpp_strerror(r) << dendl;
388 on_start_fail(r, "error opening local pool");
389 return;
390 }
391
392 prepare_local_image();
393 }
394
395 template <typename I>
396 void ImageReplayer<I>::prepare_local_image() {
397 dout(10) << dendl;
398
399 m_local_image_id = "";
400 Context *ctx = create_context_callback<
401 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
402 auto req = PrepareLocalImageRequest<I>::create(
403 *m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
404 &m_local_image_tag_owner, m_threads->work_queue, ctx);
405 req->send();
406 }
407
408 template <typename I>
409 void ImageReplayer<I>::handle_prepare_local_image(int r) {
410 dout(10) << "r=" << r << dendl;
411
412 if (r == -ENOENT) {
413 dout(10) << "local image does not exist" << dendl;
414 } else if (r < 0) {
415 on_start_fail(r, "error preparing local image for replay");
416 return;
417 } else {
418 reregister_admin_socket_hook();
419 }
420
421 // local image doesn't exist or is non-primary
422 prepare_remote_image();
423 }
424
425 template <typename I>
426 void ImageReplayer<I>::prepare_remote_image() {
427 dout(10) << dendl;
428 if (m_peers.empty()) {
429 // technically nothing to bootstrap, but it handles the status update
430 bootstrap();
431 return;
432 }
433
434 // TODO need to support multiple remote images
435 ceph_assert(!m_peers.empty());
436 m_remote_image = {*m_peers.begin()};
437
438 auto cct = static_cast<CephContext *>(m_local->cct());
439 journal::Settings journal_settings;
440 journal_settings.commit_interval = cct->_conf.get_val<double>(
441 "rbd_mirror_journal_commit_age");
442 journal_settings.max_fetch_bytes = cct->_conf.get_val<Option::size_t>(
443 "rbd_mirror_journal_max_fetch_bytes");
444
445 Context *ctx = create_context_callback<
446 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
447 auto req = PrepareRemoteImageRequest<I>::create(
448 m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
449 m_local_image_id, journal_settings, &m_remote_image.mirror_uuid,
450 &m_remote_image.image_id, &m_remote_journaler, &m_client_state,
451 &m_client_meta, ctx);
452 req->send();
453 }
454
455 template <typename I>
456 void ImageReplayer<I>::handle_prepare_remote_image(int r) {
457 dout(10) << "r=" << r << dendl;
458
459 ceph_assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
460 if (r < 0 && !m_local_image_id.empty() &&
461 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
462 // local image is primary -- fall-through
463 } else if (r == -ENOENT) {
464 dout(10) << "remote image does not exist" << dendl;
465
466 // TODO need to support multiple remote images
467 if (m_remote_image.image_id.empty() && !m_local_image_id.empty() &&
468 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
469 // local image exists and is non-primary and linked to the missing
470 // remote image
471
472 m_delete_requested = true;
473 on_start_fail(0, "remote image no longer exists");
474 } else {
475 on_start_fail(-ENOENT, "remote image does not exist");
476 }
477 return;
478 } else if (r < 0) {
479 on_start_fail(r, "error retrieving remote image id");
480 return;
481 }
482
483 bootstrap();
484 }
485
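// BootstrapRequest opens (or, if needed, creates and syncs) the local image
// and brings the mirror peer client state to a point where journal-based
// replay can resume; see image_replayer/BootstrapRequest.cc for the details --
// this is only a summary of how it is driven from the replayer.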
486 template <typename I>
487 void ImageReplayer<I>::bootstrap() {
488 dout(10) << dendl;
489
490 if (!m_local_image_id.empty() &&
491 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
492 dout(5) << "local image is primary" << dendl;
493 on_start_fail(0, "local image is primary");
494 return;
495 } else if (m_peers.empty()) {
496 dout(5) << "no peer clusters" << dendl;
497 on_start_fail(-ENOENT, "no peer clusters");
498 return;
499 }
500
501 BootstrapRequest<I> *request = nullptr;
502 {
503 Mutex::Locker locker(m_lock);
504 if (on_start_interrupted(m_lock)) {
505 return;
506 }
507
508 auto ctx = create_context_callback<
509 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
510 request = BootstrapRequest<I>::create(
511 m_threads, *m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
512 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
513 m_global_image_id, m_local_mirror_uuid, m_remote_image.mirror_uuid,
514 m_remote_journaler, &m_client_state, &m_client_meta, ctx,
515 &m_resync_requested, &m_progress_cxt);
516 request->get();
517 m_bootstrap_request = request;
518 }
519
520 update_mirror_image_status(false, boost::none);
521 reschedule_update_status_task(10);
522
523 request->send();
524 }
525
526 template <typename I>
527 void ImageReplayer<I>::handle_bootstrap(int r) {
528 dout(10) << "r=" << r << dendl;
529 {
530 Mutex::Locker locker(m_lock);
531 m_bootstrap_request->put();
532 m_bootstrap_request = nullptr;
533 if (m_local_image_ctx) {
534 m_local_image_id = m_local_image_ctx->id;
535 }
536 }
537
538 if (on_start_interrupted()) {
539 return;
540 } else if (r == -EREMOTEIO) {
541 m_local_image_tag_owner = "";
542 dout(5) << "remote image is non-primary" << dendl;
543 on_start_fail(-EREMOTEIO, "remote image is non-primary");
544 return;
545 } else if (r == -EEXIST) {
546 m_local_image_tag_owner = "";
547 on_start_fail(r, "split-brain detected");
548 return;
549 } else if (r < 0) {
550 on_start_fail(r, "error bootstrapping replay");
551 return;
552 } else if (m_resync_requested) {
553 on_start_fail(0, "resync requested");
554 return;
555 }
556
557 ceph_assert(m_local_journal == nullptr);
558 {
559 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
560 if (m_local_image_ctx->journal != nullptr) {
561 m_local_journal = m_local_image_ctx->journal;
562 m_local_journal->add_listener(m_journal_listener);
563 }
564 }
565
566 if (m_local_journal == nullptr) {
567 on_start_fail(-EINVAL, "error accessing local journal");
568 return;
569 }
570
571 update_mirror_image_status(false, boost::none);
572 init_remote_journaler();
573 }
574
575 template <typename I>
576 void ImageReplayer<I>::init_remote_journaler() {
577 dout(10) << dendl;
578
579 Context *ctx = create_context_callback<
580 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
581 m_remote_journaler->init(ctx);
582 }
583
584 template <typename I>
585 void ImageReplayer<I>::handle_init_remote_journaler(int r) {
586 dout(10) << "r=" << r << dendl;
587
588 if (on_start_interrupted()) {
589 return;
590 } else if (r < 0) {
591 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
592 on_start_fail(r, "error initializing remote journal");
593 return;
594 }
595
596 m_remote_journaler->add_listener(&m_remote_listener);
597
598 cls::journal::Client client;
599 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
600 if (r < 0) {
601 derr << "error retrieving remote journal client: " << cpp_strerror(r)
602 << dendl;
603 on_start_fail(r, "error retrieving remote journal client");
604 return;
605 }
606
607 dout(5) << "image_id=" << m_local_image_id << ", "
608 << "client_meta.image_id=" << m_client_meta.image_id << ", "
609 << "client.state=" << client.state << dendl;
610 if (m_client_meta.image_id == m_local_image_id &&
611 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
612 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
613 if (m_local_image_ctx->config.template get_val<bool>("rbd_mirroring_resync_after_disconnect")) {
614 m_resync_requested = true;
615 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
616 } else {
617 on_start_fail(-ENOTCONN, "disconnected");
618 }
619 return;
620 }
621
622 start_replay();
623 }
624
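// Attach an external replay handle to the local image's journal; once ready,
// handle_start_replay() flips the state to STATE_REPLAYING and starts live
// replay of the remote journal via m_remote_journaler->start_live_replay().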
625 template <typename I>
626 void ImageReplayer<I>::start_replay() {
627 dout(10) << dendl;
628
629 Context *start_ctx = create_context_callback<
630 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
631 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
632 }
633
634 template <typename I>
635 void ImageReplayer<I>::handle_start_replay(int r) {
636 dout(10) << "r=" << r << dendl;
637
638 if (r < 0) {
639 ceph_assert(m_local_replay == nullptr);
640 derr << "error starting external replay on local image "
641 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
642 on_start_fail(r, "error starting replay on local image");
643 return;
644 }
645
646 m_replay_status_formatter =
647 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
648
649 Context *on_finish(nullptr);
650 {
651 Mutex::Locker locker(m_lock);
652 ceph_assert(m_state == STATE_STARTING);
653 m_state = STATE_REPLAYING;
654 std::swap(m_on_start_finish, on_finish);
655 }
656
657 m_event_preprocessor = EventPreprocessor<I>::create(
658 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
659 &m_client_meta, m_threads->work_queue);
660
661 update_mirror_image_status(true, boost::none);
662 reschedule_update_status_task(30);
663
664 if (on_replay_interrupted()) {
665 return;
666 }
667
668 {
669 CephContext *cct = static_cast<CephContext *>(m_local->cct());
670 double poll_seconds = cct->_conf.get_val<double>(
671 "rbd_mirror_journal_poll_age");
672
673 Mutex::Locker locker(m_lock);
674 m_replay_handler = new ReplayHandler<I>(this);
675 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
676
677 dout(10) << "m_remote_journaler=" << *m_remote_journaler << dendl;
678 }
679
680 dout(10) << "start succeeded" << dendl;
681 if (on_finish != nullptr) {
682 dout(10) << "on finish complete, r=" << r << dendl;
683 on_finish->complete(r);
684 }
685 }
686
687 template <typename I>
688 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
689 {
690 dout(10) << "r=" << r << dendl;
691 Context *ctx = new FunctionContext([this, r, desc](int _r) {
692 {
693 Mutex::Locker locker(m_lock);
694 ceph_assert(m_state == STATE_STARTING);
695 m_state = STATE_STOPPING;
696 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
697 derr << "start failed: " << cpp_strerror(r) << dendl;
698 } else {
699 dout(10) << "start canceled" << dendl;
700 }
701 }
702
703 set_state_description(r, desc);
704 if (m_local_ioctx) {
705 update_mirror_image_status(false, boost::none);
706 }
707 reschedule_update_status_task(-1);
708 shut_down(r);
709 });
710 m_threads->work_queue->queue(ctx, 0);
711 }
712
713 template <typename I>
714 bool ImageReplayer<I>::on_start_interrupted() {
715 Mutex::Locker locker(m_lock);
716 return on_start_interrupted(m_lock);
717 }
718
719 template <typename I>
720 bool ImageReplayer<I>::on_start_interrupted(Mutex& lock) {
721 ceph_assert(m_lock.is_locked());
722 ceph_assert(m_state == STATE_STARTING);
723 if (!m_stop_requested) {
724 return false;
725 }
726
727 on_start_fail(-ECANCELED, "");
728 return true;
729 }
730
731 template <typename I>
732 void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
733 const std::string& desc)
734 {
735 dout(10) << "on_finish=" << on_finish << ", manual=" << manual
736 << ", desc=" << desc << dendl;
737
738 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
739 bool shut_down_replay = false;
740 bool running = true;
741 {
742 Mutex::Locker locker(m_lock);
743
744 if (!is_running_()) {
745 running = false;
746 } else {
747 if (!is_stopped_()) {
748 if (m_state == STATE_STARTING) {
749 dout(10) << "canceling start" << dendl;
750 if (m_bootstrap_request != nullptr) {
751 bootstrap_request = m_bootstrap_request;
752 bootstrap_request->get();
753 }
754 } else {
755 dout(10) << "interrupting replay" << dendl;
756 shut_down_replay = true;
757 }
758
759 ceph_assert(m_on_stop_finish == nullptr);
760 std::swap(m_on_stop_finish, on_finish);
761 m_stop_requested = true;
762 m_manual_stop = manual;
763 }
764 }
765 }
766
767 // avoid holding lock since bootstrap request will update status
768 if (bootstrap_request != nullptr) {
769 dout(10) << "canceling bootstrap" << dendl;
770 bootstrap_request->cancel();
771 bootstrap_request->put();
772 }
773
774 if (!running) {
775 dout(20) << "not running" << dendl;
776 if (on_finish) {
777 on_finish->complete(-EINVAL);
778 }
779 return;
780 }
781
782 if (shut_down_replay) {
783 on_stop_journal_replay(r, desc);
784 } else if (on_finish != nullptr) {
785 on_finish->complete(0);
786 }
787 }
788
789 template <typename I>
790 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
791 {
792 dout(10) << dendl;
793
794 {
795 Mutex::Locker locker(m_lock);
796 if (m_state != STATE_REPLAYING) {
797 // might be invoked multiple times while stopping
798 return;
799 }
800 m_stop_requested = true;
801 m_state = STATE_STOPPING;
802 }
803
804 set_state_description(r, desc);
805 update_mirror_image_status(true, boost::none);
806 reschedule_update_status_task(-1);
807 shut_down(0);
808 }
809
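// Per-entry replay pipeline (kicked by ReplayHandler::handle_entries_available):
//   handle_replay_ready() -> [replay_flush() -> get_remote_tag()
//     -> allocate_local_tag()] -> preprocess_entry() -> process_entry()
//     -> handle_process_entry_safe()
// The bracketed steps only run when the entry belongs to a new remote tag tid.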
810 template <typename I>
811 void ImageReplayer<I>::handle_replay_ready()
812 {
813 dout(20) << dendl;
814 if (on_replay_interrupted()) {
815 return;
816 }
817
818 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
819 return;
820 }
821
822 m_event_replay_tracker.start_op();
823
824 m_lock.Lock();
825 bool stopping = (m_state == STATE_STOPPING);
826 m_lock.Unlock();
827
828 if (stopping) {
829 dout(10) << "stopping event replay" << dendl;
830 m_event_replay_tracker.finish_op();
831 return;
832 }
833
834 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
835 preprocess_entry();
836 return;
837 }
838
839 replay_flush();
840 }
841
842 template <typename I>
843 void ImageReplayer<I>::restart(Context *on_finish)
844 {
845 FunctionContext *ctx = new FunctionContext(
846 [this, on_finish](int r) {
847 if (r < 0) {
848 // stop() may fail if the replayer is already stopped -- try to start anyway.
849 }
850 start(on_finish, true);
851 });
852 stop(ctx);
853 }
854
855 template <typename I>
856 void ImageReplayer<I>::flush()
857 {
858 dout(10) << dendl;
859 C_SaferCond ctx;
860 flush_local_replay(&ctx);
861 ctx.wait();
862
863 update_mirror_image_status(false, boost::none);
864 }
865
866 template <typename I>
867 void ImageReplayer<I>::flush_local_replay(Context* on_flush)
868 {
869 m_lock.Lock();
870 if (m_state != STATE_REPLAYING) {
871 m_lock.Unlock();
872 on_flush->complete(0);
873 return;
874 }
875
876 dout(15) << dendl;
877 auto ctx = new FunctionContext(
878 [this, on_flush](int r) {
879 handle_flush_local_replay(on_flush, r);
880 });
881 m_local_replay->flush(ctx);
882 m_lock.Unlock();
883 }
884
885 template <typename I>
886 void ImageReplayer<I>::handle_flush_local_replay(Context* on_flush, int r)
887 {
888 dout(15) << "r=" << r << dendl;
889 if (r < 0) {
890 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
891 on_flush->complete(r);
892 return;
893 }
894
895 flush_commit_position(on_flush);
896 }
897
898 template <typename I>
899 void ImageReplayer<I>::flush_commit_position(Context* on_flush)
900 {
901 m_lock.Lock();
902 if (m_state != STATE_REPLAYING) {
903 m_lock.Unlock();
904 on_flush->complete(0);
905 return;
906 }
907
908 dout(15) << dendl;
909 auto ctx = new FunctionContext(
910 [this, on_flush](int r) {
911 handle_flush_commit_position(on_flush, r);
912 });
913 m_remote_journaler->flush_commit_position(ctx);
914 m_lock.Unlock();
915 }
916
917 template <typename I>
918 void ImageReplayer<I>::handle_flush_commit_position(Context* on_flush, int r)
919 {
920 dout(15) << "r=" << r << dendl;
921 if (r < 0) {
922 derr << "error flushing remote journal commit position: "
923 << cpp_strerror(r) << dendl;
924 }
925
926 on_flush->complete(r);
927 }
928
929 template <typename I>
930 bool ImageReplayer<I>::on_replay_interrupted()
931 {
932 bool shut_down;
933 {
934 Mutex::Locker locker(m_lock);
935 shut_down = m_stop_requested;
936 }
937
938 if (shut_down) {
939 on_stop_journal_replay();
940 }
941 return shut_down;
942 }
943
944 template <typename I>
945 void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
946 {
947 dout(10) << dendl;
948
949 Mutex::Locker l(m_lock);
950
951 if (f) {
952 f->open_object_section("image_replayer");
953 f->dump_string("name", m_name);
954 f->dump_string("state", to_string(m_state));
955 f->close_section();
956 f->flush(*ss);
957 } else {
958 *ss << m_name << ": state: " << to_string(m_state);
959 }
960 }
961
962 template <typename I>
963 void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
964 {
965 dout(10) << "r=" << r << dendl;
966 if (r < 0) {
967 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
968 set_state_description(r, error_desc);
969 }
970
971 {
972 Mutex::Locker locker(m_lock);
973 m_stop_requested = true;
974 }
975 on_replay_interrupted();
976 }
977
978 template <typename I>
979 void ImageReplayer<I>::replay_flush() {
980 dout(10) << dendl;
981
982 bool interrupted = false;
983 {
984 Mutex::Locker locker(m_lock);
985 if (m_state != STATE_REPLAYING) {
986 dout(10) << "replay interrupted" << dendl;
987 interrupted = true;
988 } else {
989 m_state = STATE_REPLAY_FLUSHING;
990 }
991 }
992
993 if (interrupted) {
994 m_event_replay_tracker.finish_op();
995 return;
996 }
997
998 // shut down the replay to flush all IO and ops and create a new
999 // replayer to handle the new tag epoch
1000 Context *ctx = create_context_callback<
1001 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1002 ctx = new FunctionContext([this, ctx](int r) {
1003 m_local_image_ctx->journal->stop_external_replay();
1004 m_local_replay = nullptr;
1005
1006 if (r < 0) {
1007 ctx->complete(r);
1008 return;
1009 }
1010
1011 m_local_journal->start_external_replay(&m_local_replay, ctx);
1012 });
1013 m_local_replay->shut_down(false, ctx);
1014 }
1015
1016 template <typename I>
1017 void ImageReplayer<I>::handle_replay_flush(int r) {
1018 dout(10) << "r=" << r << dendl;
1019
1020 {
1021 Mutex::Locker locker(m_lock);
1022 ceph_assert(m_state == STATE_REPLAY_FLUSHING);
1023 m_state = STATE_REPLAYING;
1024 }
1025
1026 if (r < 0) {
1027 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1028 m_event_replay_tracker.finish_op();
1029 handle_replay_complete(r, "replay flush encountered an error");
1030 return;
1031 } else if (on_replay_interrupted()) {
1032 m_event_replay_tracker.finish_op();
1033 return;
1034 }
1035
1036 get_remote_tag();
1037 }
1038
1039 template <typename I>
1040 void ImageReplayer<I>::get_remote_tag() {
1041 dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
1042
1043 Context *ctx = create_context_callback<
1044 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1045 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1046 }
1047
1048 template <typename I>
1049 void ImageReplayer<I>::handle_get_remote_tag(int r) {
1050 dout(15) << "r=" << r << dendl;
1051
1052 if (r == 0) {
1053 try {
1054 auto it = m_replay_tag.data.cbegin();
1055 decode(m_replay_tag_data, it);
1056 } catch (const buffer::error &err) {
1057 r = -EBADMSG;
1058 }
1059 }
1060
1061 if (r < 0) {
1062 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1063 << cpp_strerror(r) << dendl;
1064 m_event_replay_tracker.finish_op();
1065 handle_replay_complete(r, "failed to retrieve remote tag");
1066 return;
1067 }
1068
1069 m_replay_tag_valid = true;
1070 dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
1071 << m_replay_tag_data << dendl;
1072
1073 allocate_local_tag();
1074 }
1075
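// Translate the remote tag's owner into the local journal's frame of
// reference: LOCAL_MIRROR_UUID in the remote journal refers to the remote
// (primary) cluster and becomes the remote mirror uuid here, while a tag that
// names this cluster's mirror uuid becomes LOCAL_MIRROR_UUID; the same mapping
// is applied to the tag predecessor below.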
1076 template <typename I>
1077 void ImageReplayer<I>::allocate_local_tag() {
1078 dout(15) << dendl;
1079
1080 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1081 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1082 mirror_uuid = m_remote_image.mirror_uuid;
1083 } else if (mirror_uuid == m_local_mirror_uuid) {
1084 mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1085 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1086 // handle a possible edge condition where the daemon fails over and
1087 // the local image has already been promoted/demoted
1088 auto local_tag_data = m_local_journal->get_tag_data();
1089 if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
1090 (local_tag_data.predecessor.commit_valid &&
1091 local_tag_data.predecessor.mirror_uuid ==
1092 librbd::Journal<>::LOCAL_MIRROR_UUID)) {
1093 dout(15) << "skipping stale demotion event" << dendl;
1094 handle_process_entry_safe(m_replay_entry, m_replay_start_time, 0);
1095 handle_replay_ready();
1096 return;
1097 } else {
1098 dout(5) << "encountered image demotion: stopping" << dendl;
1099 Mutex::Locker locker(m_lock);
1100 m_stop_requested = true;
1101 }
1102 }
1103
1104 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1105 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1106 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1107 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1108 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1109 }
1110
1111 dout(15) << "mirror_uuid=" << mirror_uuid << ", "
1112 << "predecessor=" << predecessor << ", "
1113 << "replay_tag_tid=" << m_replay_tag_tid << dendl;
1114 Context *ctx = create_context_callback<
1115 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1116 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1117 }
1118
1119 template <typename I>
1120 void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1121 dout(15) << "r=" << r << ", "
1122 << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
1123
1124 if (r < 0) {
1125 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1126 m_event_replay_tracker.finish_op();
1127 handle_replay_complete(r, "failed to allocate journal tag");
1128 return;
1129 }
1130
1131 preprocess_entry();
1132 }
1133
1134 template <typename I>
1135 void ImageReplayer<I>::preprocess_entry() {
1136 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1137 << dendl;
1138
1139 bufferlist data = m_replay_entry.get_data();
1140 auto it = data.cbegin();
1141 int r = m_local_replay->decode(&it, &m_event_entry);
1142 if (r < 0) {
1143 derr << "failed to decode journal event" << dendl;
1144 m_event_replay_tracker.finish_op();
1145 handle_replay_complete(r, "failed to decode journal event");
1146 return;
1147 }
1148
1149 uint32_t delay = calculate_replay_delay(
1150 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1151 if (delay == 0) {
1152 handle_preprocess_entry_ready(0);
1153 return;
1154 }
1155
1156 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1157
1158 Mutex::Locker timer_locker(m_threads->timer_lock);
1159 ceph_assert(m_delayed_preprocess_task == nullptr);
1160 m_delayed_preprocess_task = new FunctionContext(
1161 [this](int r) {
1162 ceph_assert(m_threads->timer_lock.is_locked());
1163 m_delayed_preprocess_task = nullptr;
1164 m_threads->work_queue->queue(
1165 create_context_callback<ImageReplayer,
1166 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1167 });
1168 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1169 }
1170
1171 template <typename I>
1172 void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1173 dout(20) << "r=" << r << dendl;
1174 ceph_assert(r == 0);
1175
1176 m_replay_start_time = ceph_clock_now();
1177 if (!m_event_preprocessor->is_required(m_event_entry)) {
1178 process_entry();
1179 return;
1180 }
1181
1182 Context *ctx = create_context_callback<
1183 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1184 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1185 }
1186
1187 template <typename I>
1188 void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1189 dout(20) << "r=" << r << dendl;
1190
1191 if (r < 0) {
1192 m_event_replay_tracker.finish_op();
1193
1194 if (r == -ECANCELED) {
1195 handle_replay_complete(0, "lost exclusive lock");
1196 } else {
1197 derr << "failed to preprocess journal event" << dendl;
1198 handle_replay_complete(r, "failed to preprocess journal event");
1199 }
1200 return;
1201 }
1202
1203 process_entry();
1204 }
1205
1206 template <typename I>
1207 void ImageReplayer<I>::process_entry() {
1208 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1209 << dendl;
1210
1211 // stop replaying events if stop has been requested
1212 if (on_replay_interrupted()) {
1213 m_event_replay_tracker.finish_op();
1214 return;
1215 }
1216
1217 Context *on_ready = create_context_callback<
1218 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1219 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
1220 m_replay_start_time);
1221
1222 m_local_replay->process(m_event_entry, on_ready, on_commit);
1223 }
1224
1225 template <typename I>
1226 void ImageReplayer<I>::handle_process_entry_ready(int r) {
1227 dout(20) << dendl;
1228 ceph_assert(r == 0);
1229
1230 bool update_status = false;
1231 {
1232 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
1233 if (m_local_image_name != m_local_image_ctx->name) {
1234 m_local_image_name = m_local_image_ctx->name;
1235 update_status = true;
1236 }
1237 }
1238
1239 if (update_status) {
1240 reschedule_update_status_task(0);
1241 }
1242
1243 // attempt to process the next event
1244 handle_replay_ready();
1245 }
1246
1247 template <typename I>
1248 void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry &replay_entry,
1249 const utime_t &replay_start_time,
1250 int r) {
1251 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1252 << dendl;
1253
1254 if (r < 0) {
1255 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1256 handle_replay_complete(r, "failed to commit journal event");
1257 } else {
1258 ceph_assert(m_remote_journaler != nullptr);
1259 m_remote_journaler->committed(replay_entry);
1260 }
1261
1262 auto bytes = replay_entry.get_data().length();
1263 auto latency = ceph_clock_now() - replay_start_time;
1264
1265 if (g_perf_counters) {
1266 g_perf_counters->inc(l_rbd_mirror_replay);
1267 g_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
1268 g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
1269 }
1270
1271 auto ctx = new FunctionContext(
1272 [this, bytes, latency](int r) {
1273 Mutex::Locker locker(m_lock);
1274 if (m_perf_counters) {
1275 m_perf_counters->inc(l_rbd_mirror_replay);
1276 m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
1277 m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
1278 }
1279 m_event_replay_tracker.finish_op();
1280 });
1281 m_threads->work_queue->queue(ctx, 0);
1282 }
1283
1284 template <typename I>
1285 bool ImageReplayer<I>::update_mirror_image_status(bool force,
1286 const OptionalState &state) {
1287 dout(15) << dendl;
1288 {
1289 Mutex::Locker locker(m_lock);
1290 if (!start_mirror_image_status_update(force, false)) {
1291 return false;
1292 }
1293 }
1294
1295 queue_mirror_image_status_update(state);
1296 return true;
1297 }
1298
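// Status updates are reference counted under m_lock: a request that arrives
// while another update is in flight only sets m_update_status_requested, and
// the deferred update is re-issued from handle_mirror_status_update() once the
// outstanding one completes.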
1299 template <typename I>
1300 bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1301 bool restarting) {
1302 ceph_assert(m_lock.is_locked());
1303
1304 if (!force && !is_stopped_()) {
1305 if (!is_running_()) {
1306 dout(15) << "shut down in-progress: ignoring update" << dendl;
1307 return false;
1308 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1309 dout(15) << "already sending update" << dendl;
1310 m_update_status_requested = true;
1311 return false;
1312 }
1313 }
1314
1315 ++m_in_flight_status_updates;
1316 dout(15) << "in-flight updates=" << m_in_flight_status_updates << dendl;
1317 return true;
1318 }
1319
1320 template <typename I>
1321 void ImageReplayer<I>::finish_mirror_image_status_update() {
1322 reregister_admin_socket_hook();
1323
1324 Context *on_finish = nullptr;
1325 {
1326 Mutex::Locker locker(m_lock);
1327 ceph_assert(m_in_flight_status_updates > 0);
1328 if (--m_in_flight_status_updates > 0) {
1329 dout(15) << "waiting on " << m_in_flight_status_updates << " in-flight "
1330 << "updates" << dendl;
1331 return;
1332 }
1333
1334 std::swap(on_finish, m_on_update_status_finish);
1335 }
1336
1337 dout(15) << dendl;
1338 if (on_finish != nullptr) {
1339 on_finish->complete(0);
1340 }
1341 }
1342
1343 template <typename I>
1344 void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1345 dout(15) << dendl;
1346
1347 auto ctx = new FunctionContext(
1348 [this, state](int r) {
1349 send_mirror_status_update(state);
1350 });
1351
1352 // ensure pending IO is flushed and the commit position is updated
1353 // prior to updating the mirror status
1354 ctx = new FunctionContext(
1355 [this, ctx](int r) {
1356 flush_local_replay(ctx);
1357 });
1358 m_threads->work_queue->queue(ctx, 0);
1359 }
1360
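// Map the replayer state to a cls::rbd::MirrorImageStatus: STARTING reports
// "syncing" or "starting replay", REPLAYING/REPLAY_FLUSHING report the
// ReplayStatusFormatter description, STOPPING reports "stopping replay" while
// the local image is still open, and STOPPED reports an error for last_r < 0
// or a clean "stopped" otherwise.  The result is written to the RBD_MIRRORING
// object via cls_client::mirror_image_status_set().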
1361 template <typename I>
1362 void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1363 State state;
1364 std::string state_desc;
1365 int last_r;
1366 bool stopping_replay;
1367
1368 OptionalMirrorImageStatusState mirror_image_status_state =
1369 boost::make_optional(false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
1370 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
1371 {
1372 Mutex::Locker locker(m_lock);
1373 state = m_state;
1374 state_desc = m_state_desc;
1375 mirror_image_status_state = m_mirror_image_status_state;
1376 last_r = m_last_r;
1377 stopping_replay = (m_local_image_ctx != nullptr);
1378
1379 if (m_bootstrap_request != nullptr) {
1380 bootstrap_request = m_bootstrap_request;
1381 bootstrap_request->get();
1382 }
1383 }
1384
1385 bool syncing = false;
1386 if (bootstrap_request != nullptr) {
1387 syncing = bootstrap_request->is_syncing();
1388 bootstrap_request->put();
1389 bootstrap_request = nullptr;
1390 }
1391
1392 if (opt_state) {
1393 state = *opt_state;
1394 }
1395
1396 cls::rbd::MirrorImageStatus status;
1397 status.up = true;
1398 switch (state) {
1399 case STATE_STARTING:
1400 if (syncing) {
1401 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1402 status.description = state_desc.empty() ? "syncing" : state_desc;
1403 mirror_image_status_state = status.state;
1404 } else {
1405 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1406 status.description = "starting replay";
1407 }
1408 break;
1409 case STATE_REPLAYING:
1410 case STATE_REPLAY_FLUSHING:
1411 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1412 {
1413 Context *on_req_finish = new FunctionContext(
1414 [this](int r) {
1415 dout(15) << "replay status ready: r=" << r << dendl;
1416 if (r >= 0) {
1417 send_mirror_status_update(boost::none);
1418 } else if (r == -EAGAIN) {
1419 // decrement in-flight status update counter
1420 handle_mirror_status_update(r);
1421 }
1422 });
1423
1424 std::string desc;
1425 ceph_assert(m_replay_status_formatter != nullptr);
1426 if (!m_replay_status_formatter->get_or_send_update(&desc,
1427 on_req_finish)) {
1428 dout(15) << "waiting for replay status" << dendl;
1429 return;
1430 }
1431 status.description = "replaying, " + desc;
1432 mirror_image_status_state = boost::make_optional(
1433 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
1434 }
1435 break;
1436 case STATE_STOPPING:
1437 if (stopping_replay) {
1438 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1439 status.description = state_desc.empty() ? "stopping replay" : state_desc;
1440 break;
1441 }
1442 // FALLTHROUGH
1443 case STATE_STOPPED:
1444 if (last_r == -EREMOTEIO) {
1445 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1446 status.description = state_desc;
1447 mirror_image_status_state = status.state;
1448 } else if (last_r < 0) {
1449 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1450 status.description = state_desc;
1451 mirror_image_status_state = status.state;
1452 } else {
1453 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1454 status.description = state_desc.empty() ? "stopped" : state_desc;
1455 mirror_image_status_state = boost::none;
1456 }
1457 break;
1458 default:
1459 ceph_assert(!"invalid state");
1460 }
1461
1462 {
1463 Mutex::Locker locker(m_lock);
1464 m_mirror_image_status_state = mirror_image_status_state;
1465 }
1466
1467 // prevent the status from ping-ponging when failed replays are restarted
1468 if (mirror_image_status_state &&
1469 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1470 status.state = *mirror_image_status_state;
1471 }
1472
1473 dout(15) << "status=" << status << dendl;
1474 librados::ObjectWriteOperation op;
1475 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1476
1477 ceph_assert(m_local_ioctx);
1478 librados::AioCompletion *aio_comp = create_rados_callback<
1479 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1480 int r = m_local_ioctx->aio_operate(RBD_MIRRORING, aio_comp, &op);
1481 ceph_assert(r == 0);
1482 aio_comp->release();
1483 }
1484
1485 template <typename I>
1486 void ImageReplayer<I>::handle_mirror_status_update(int r) {
1487 dout(15) << "r=" << r << dendl;
1488
1489 bool running = false;
1490 bool started = false;
1491 {
1492 Mutex::Locker locker(m_lock);
1493 bool update_status_requested = false;
1494 std::swap(update_status_requested, m_update_status_requested);
1495
1496 running = is_running_();
1497 if (running && update_status_requested) {
1498 started = start_mirror_image_status_update(false, true);
1499 }
1500 }
1501
1502 // if a deferred update is available, send it -- otherwise reschedule
1503 // the timer task
1504 if (started) {
1505 queue_mirror_image_status_update(boost::none);
1506 } else if (running) {
1507 reschedule_update_status_task(0);
1508 }
1509
1510 // mark committed status update as no longer in-flight
1511 finish_mirror_image_status_update();
1512 }
1513
1514 template <typename I>
1515 void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1516 bool canceled_task = false;
1517 {
1518 Mutex::Locker locker(m_lock);
1519 Mutex::Locker timer_locker(m_threads->timer_lock);
1520
1521 if (m_update_status_task) {
1522 dout(15) << "canceling existing status update task" << dendl;
1523
1524 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1525 m_update_status_task = nullptr;
1526 }
1527
1528 if (new_interval > 0) {
1529 m_update_status_interval = new_interval;
1530 }
1531
1532 if (new_interval >= 0 && is_running_() &&
1533 start_mirror_image_status_update(true, false)) {
1534 m_update_status_task = new FunctionContext(
1535 [this](int r) {
1536 ceph_assert(m_threads->timer_lock.is_locked());
1537 m_update_status_task = nullptr;
1538
1539 queue_mirror_image_status_update(boost::none);
1540 });
1541 dout(15) << "scheduling status update task after "
1542 << m_update_status_interval << " seconds" << dendl;
1543 m_threads->timer->add_event_after(m_update_status_interval,
1544 m_update_status_task);
1545 }
1546 }
1547
1548 if (canceled_task) {
1549 // decrement in-flight status update counter for canceled task
1550 finish_mirror_image_status_update();
1551 }
1552 }
1553
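// Shut-down builds a chain of FunctionContexts in reverse, so the effective
// order is: flush the local replay, wait for in-flight replay events, detach
// the local journal listener, stop external replay, close the local image,
// stop remote journal replay, shut down the remote journaler, and finally
// publish the STOPPED status and call handle_shut_down().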
1554 template <typename I>
1555 void ImageReplayer<I>::shut_down(int r) {
1556 dout(10) << "r=" << r << dendl;
1557
1558 bool canceled_delayed_preprocess_task = false;
1559 {
1560 Mutex::Locker timer_locker(m_threads->timer_lock);
1561 if (m_delayed_preprocess_task != nullptr) {
1562 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1563 m_delayed_preprocess_task);
1564 ceph_assert(canceled_delayed_preprocess_task);
1565 m_delayed_preprocess_task = nullptr;
1566 }
1567 }
1568 if (canceled_delayed_preprocess_task) {
1569 // wake up sleeping replay
1570 m_event_replay_tracker.finish_op();
1571 }
1572
1573 reschedule_update_status_task(-1);
1574
1575 {
1576 Mutex::Locker locker(m_lock);
1577 ceph_assert(m_state == STATE_STOPPING);
1578
1579 // if status updates are in-flight, wait for them to complete
1580 // before proceeding
1581 if (m_in_flight_status_updates > 0) {
1582 if (m_on_update_status_finish == nullptr) {
1583 dout(15) << "waiting for in-flight status update" << dendl;
1584 m_on_update_status_finish = new FunctionContext(
1585 [this, r](int _r) {
1586 shut_down(r);
1587 });
1588 }
1589 return;
1590 }
1591 }
1592
1593 // NOTE: it's important to ensure that the local image is fully
1594 // closed before attempting to close the remote journal in
1595 // case the remote cluster is unreachable
1596
1597 // chain the shut down sequence (reverse order)
1598 Context *ctx = new FunctionContext(
1599 [this, r](int _r) {
1600 if (m_local_ioctx) {
1601 update_mirror_image_status(true, STATE_STOPPED);
1602 }
1603 handle_shut_down(r);
1604 });
1605
1606 // close the remote journal
1607 if (m_remote_journaler != nullptr) {
1608 ctx = new FunctionContext([this, ctx](int r) {
1609 delete m_remote_journaler;
1610 m_remote_journaler = nullptr;
1611 ctx->complete(0);
1612 });
1613 ctx = new FunctionContext([this, ctx](int r) {
1614 m_remote_journaler->remove_listener(&m_remote_listener);
1615 m_remote_journaler->shut_down(ctx);
1616 });
1617 }
1618
1619 // stop the replay of remote journal events
1620 if (m_replay_handler != nullptr) {
1621 ctx = new FunctionContext([this, ctx](int r) {
1622 delete m_replay_handler;
1623 m_replay_handler = nullptr;
1624
1625 m_event_replay_tracker.wait_for_ops(ctx);
1626 });
1627 ctx = new FunctionContext([this, ctx](int r) {
1628 m_remote_journaler->stop_replay(ctx);
1629 });
1630 }
1631
1632 // close the local image (release exclusive lock)
1633 if (m_local_image_ctx) {
1634 ctx = new FunctionContext([this, ctx](int r) {
1635 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1636 &m_local_image_ctx, ctx);
1637 request->send();
1638 });
1639 }
1640
1641 // shut down event replay into the local image
1642 if (m_local_journal != nullptr) {
1643 ctx = new FunctionContext([this, ctx](int r) {
1644 m_local_journal = nullptr;
1645 ctx->complete(0);
1646 });
1647 if (m_local_replay != nullptr) {
1648 ctx = new FunctionContext([this, ctx](int r) {
1649 m_local_journal->stop_external_replay();
1650 m_local_replay = nullptr;
1651
1652 EventPreprocessor<I>::destroy(m_event_preprocessor);
1653 m_event_preprocessor = nullptr;
1654 ctx->complete(0);
1655 });
1656 }
1657 ctx = new FunctionContext([this, ctx](int r) {
1658 // blocks if listener notification is in-progress
1659 m_local_journal->remove_listener(m_journal_listener);
1660 ctx->complete(0);
1661 });
1662 }
1663
1664 // wait for all local in-flight replay events to complete
1665 ctx = new FunctionContext([this, ctx](int r) {
1666 if (r < 0) {
1667 derr << "error shutting down journal replay: " << cpp_strerror(r)
1668 << dendl;
1669 }
1670
1671 m_event_replay_tracker.wait_for_ops(ctx);
1672 });
1673
1674 // flush any local in-flight replay events
1675 if (m_local_replay != nullptr) {
1676 ctx = new FunctionContext([this, ctx](int r) {
1677 m_local_replay->shut_down(true, ctx);
1678 });
1679 }
1680
1681 m_threads->work_queue->queue(ctx, 0);
1682 }
1683
1684 template <typename I>
1685 void ImageReplayer<I>::handle_shut_down(int r) {
1686 reschedule_update_status_task(-1);
1687
1688 bool resync_requested = false;
1689 bool delete_requested = false;
1690 bool unregister_asok_hook = false;
1691 {
1692 Mutex::Locker locker(m_lock);
1693
1694 // if status updates are in-flight, wait for them to complete
1695 // before proceeding
1696 if (m_in_flight_status_updates > 0) {
1697 if (m_on_update_status_finish == nullptr) {
1698 dout(15) << "waiting for in-flight status update" << dendl;
1699 m_on_update_status_finish = new FunctionContext(
1700 [this, r](int _r) {
1701 handle_shut_down(r);
1702 });
1703 }
1704 return;
1705 }
1706
1707 if (m_delete_requested && !m_local_image_id.empty()) {
1708 ceph_assert(m_remote_image.image_id.empty());
1709 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1710 unregister_asok_hook = true;
1711 std::swap(delete_requested, m_delete_requested);
1712 }
1713
1714 std::swap(resync_requested, m_resync_requested);
1715 if (delete_requested || resync_requested) {
1716 m_local_image_id = "";
1717 } else if (m_last_r == -ENOENT &&
1718 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1719 dout(0) << "mirror image no longer exists" << dendl;
1720 unregister_asok_hook = true;
1721 m_finished = true;
1722 }
1723 }
1724
1725 if (unregister_asok_hook) {
1726 unregister_admin_socket_hook();
1727 }
1728
1729 if (delete_requested || resync_requested) {
1730 dout(5) << "moving image to trash" << dendl;
1731 auto ctx = new FunctionContext([this, r](int) {
1732 handle_shut_down(r);
1733 });
1734 ImageDeleter<I>::trash_move(*m_local_ioctx, m_global_image_id,
1735 resync_requested, m_threads->work_queue, ctx);
1736 return;
1737 }
1738
1739 dout(10) << "stop complete" << dendl;
1740 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1741 m_replay_status_formatter = nullptr;
1742
1743 Context *on_start = nullptr;
1744 Context *on_stop = nullptr;
1745 {
1746 Mutex::Locker locker(m_lock);
1747 std::swap(on_start, m_on_start_finish);
1748 std::swap(on_stop, m_on_stop_finish);
1749 m_stop_requested = false;
1750 ceph_assert(m_delayed_preprocess_task == nullptr);
1751 ceph_assert(m_state == STATE_STOPPING);
1752 m_state = STATE_STOPPED;
1753 }
1754
1755 if (on_start != nullptr) {
1756 dout(10) << "on start finish complete, r=" << r << dendl;
1757 on_start->complete(r);
1758 r = 0;
1759 }
1760 if (on_stop != nullptr) {
1761 dout(10) << "on stop finish complete, r=" << r << dendl;
1762 on_stop->complete(r);
1763 }
1764 }
1765
1766 template <typename I>
1767 void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1768 dout(20) << dendl;
1769
1770 cls::journal::Client client;
1771 {
1772 Mutex::Locker locker(m_lock);
1773 if (!is_running_()) {
1774 return;
1775 }
1776
1777 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1778 if (r < 0) {
1779 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1780 return;
1781 }
1782 }
1783
1784 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1785 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1786 stop(nullptr, false, -ENOTCONN, "disconnected");
1787 }
1788 }
1789
1790 template <typename I>
1791 std::string ImageReplayer<I>::to_string(const State state) {
1792 switch (state) {
1793 case ImageReplayer<I>::STATE_STARTING:
1794 return "Starting";
1795 case ImageReplayer<I>::STATE_REPLAYING:
1796 return "Replaying";
1797 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1798 return "ReplayFlushing";
1799 case ImageReplayer<I>::STATE_STOPPING:
1800 return "Stopping";
1801 case ImageReplayer<I>::STATE_STOPPED:
1802 return "Stopped";
1803 default:
1804 break;
1805 }
1806 return "Unknown(" + stringify(state) + ")";
1807 }
1808
1809 template <typename I>
1810 void ImageReplayer<I>::resync_image(Context *on_finish) {
1811 dout(10) << dendl;
1812
1813 m_resync_requested = true;
1814 stop(on_finish);
1815 }
1816
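// Alongside the asok hook, create a per-image PerfCounters instance
// ("rbd_mirror_<pool>/<image>") tracking replay count, replayed bytes and
// replay latency; both are torn down together in
// unregister_admin_socket_hook().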
1817 template <typename I>
1818 void ImageReplayer<I>::register_admin_socket_hook() {
1819 ImageReplayerAdminSocketHook<I> *asok_hook;
1820 {
1821 Mutex::Locker locker(m_lock);
1822 if (m_asok_hook != nullptr) {
1823 return;
1824 }
1825
1826 ceph_assert(m_perf_counters == nullptr);
1827
1828 dout(15) << "registered asok hook: " << m_name << dendl;
1829 asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1830 this);
1831 int r = asok_hook->register_commands();
1832 if (r == 0) {
1833 m_asok_hook = asok_hook;
1834
1835 CephContext *cct = static_cast<CephContext *>(m_local->cct());
1836 auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_perf_stats_prio");
1837 PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_" + m_name,
1838 l_rbd_mirror_first, l_rbd_mirror_last);
1839 plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
1840 plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
1841 "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
1842 plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
1843 "Replay latency", "rl", prio);
1844 m_perf_counters = plb.create_perf_counters();
1845 g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
1846
1847 return;
1848 }
1849 derr << "error registering admin socket commands" << dendl;
1850 }
1851 delete asok_hook;
1852 }
1853
1854 template <typename I>
1855 void ImageReplayer<I>::unregister_admin_socket_hook() {
1856 dout(15) << dendl;
1857
1858 AdminSocketHook *asok_hook = nullptr;
1859 PerfCounters *perf_counters = nullptr;
1860 {
1861 Mutex::Locker locker(m_lock);
1862 std::swap(asok_hook, m_asok_hook);
1863 std::swap(perf_counters, m_perf_counters);
1864 }
1865 delete asok_hook;
1866 if (perf_counters != nullptr) {
1867 g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
1868 delete perf_counters;
1869 }
1870 }
1871
1872 template <typename I>
1873 void ImageReplayer<I>::reregister_admin_socket_hook() {
1874 {
1875 Mutex::Locker locker(m_lock);
1876 auto name = m_local_ioctx->get_pool_name() + "/" + m_local_image_name;
1877 if (m_asok_hook != nullptr && m_name == name) {
1878 return;
1879 }
1880 m_name = name;
1881 }
1882 unregister_admin_socket_hook();
1883 register_admin_socket_hook();
1884 }
1885
1886 template <typename I>
1887 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1888 {
1889 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1890 << "/" << replayer.get_global_image_id() << "]";
1891 return os;
1892 }
1893
1894 } // namespace mirror
1895 } // namespace rbd
1896
1897 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;