1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
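// ImageReplayer drives journal-based mirroring for a single image: it
// bootstraps a local copy of the remote (primary) image, replays the remote
// journal events against the local image, and periodically reports the
// replication status to the local pool's mirroring status object.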
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "common/WorkQueue.h"
13 #include "global/global_context.h"
14 #include "journal/Journaler.h"
15 #include "journal/ReplayHandler.h"
16 #include "journal/Settings.h"
17 #include "librbd/ExclusiveLock.h"
18 #include "librbd/ImageCtx.h"
19 #include "librbd/ImageState.h"
20 #include "librbd/Journal.h"
21 #include "librbd/Operations.h"
22 #include "librbd/Utils.h"
23 #include "librbd/journal/Replay.h"
24 #include "ImageDeleter.h"
25 #include "ImageReplayer.h"
26 #include "Threads.h"
27 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
28 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
29 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
32 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_rbd_mirror
36 #undef dout_prefix
37 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
38 << __func__ << ": "
39
40 using std::map;
41 using std::string;
42 using std::unique_ptr;
43 using std::shared_ptr;
44 using std::vector;
45
46 extern PerfCounters *g_perf_counters;
47
48 namespace rbd {
49 namespace mirror {
50
51 using librbd::util::create_context_callback;
52 using librbd::util::create_rados_callback;
53 using namespace rbd::mirror::image_replayer;
54
55 template <typename I>
56 std::ostream &operator<<(std::ostream &os,
57 const typename ImageReplayer<I>::State &state);
58
59 namespace {
60
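// Adapter handed to the remote Journaler: entry-availability and replay
// completion callbacks are forwarded to the owning ImageReplayer.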
61 template <typename I>
62 struct ReplayHandler : public ::journal::ReplayHandler {
63 ImageReplayer<I> *replayer;
64 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
65 void get() override {}
66 void put() override {}
67
68 void handle_entries_available() override {
69 replayer->handle_replay_ready();
70 }
71 void handle_complete(int r) override {
72 std::stringstream ss;
73 if (r < 0) {
74 ss << "replay completed with error: " << cpp_strerror(r);
75 }
76 replayer->handle_replay_complete(r, ss.str());
77 }
78 };
79
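// Base class for the per-image admin socket commands below; it tracks the
// help text and whether the command was successfully registered so the hook
// knows what to unregister on shutdown.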
80 template <typename I>
81 class ImageReplayerAdminSocketCommand {
82 public:
83 ImageReplayerAdminSocketCommand(const std::string &desc,
84 ImageReplayer<I> *replayer)
85 : desc(desc), replayer(replayer) {
86 }
87 virtual ~ImageReplayerAdminSocketCommand() {}
88 virtual bool call(Formatter *f, stringstream *ss) = 0;
89
90 std::string desc;
91 ImageReplayer<I> *replayer;
92 bool registered = false;
93 };
94
95 template <typename I>
96 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
97 public:
98 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
99 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
100 }
101
102 bool call(Formatter *f, stringstream *ss) override {
103 this->replayer->print_status(f, ss);
104 return true;
105 }
106 };
107
108 template <typename I>
109 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
110 public:
111 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
112 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
113 }
114
115 bool call(Formatter *f, stringstream *ss) override {
116 this->replayer->start(nullptr, true);
117 return true;
118 }
119 };
120
121 template <typename I>
122 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
123 public:
124 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
125 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
126 }
127
128 bool call(Formatter *f, stringstream *ss) override {
129 this->replayer->stop(nullptr, true);
130 return true;
131 }
132 };
133
134 template <typename I>
135 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
136 public:
137 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
138 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
139 }
140
141 bool call(Formatter *f, stringstream *ss) override {
142 this->replayer->restart();
143 return true;
144 }
145 };
146
147 template <typename I>
148 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
149 public:
150 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
151 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
152 }
153
154 bool call(Formatter *f, stringstream *ss) override {
155 this->replayer->flush();
156 return true;
157 }
158 };
159
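// Registers the per-image "rbd mirror <flush|restart|start|status|stop> <name>"
// admin socket commands and dispatches them to the replayer. For example
// (the socket path is illustrative and depends on the daemon's configuration):
//   ceph daemon /var/run/ceph/ceph-client.rbd-mirror.0.asok \
//       rbd mirror status <pool>/<image>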
160 template <typename I>
161 class ImageReplayerAdminSocketHook : public AdminSocketHook {
162 public:
163 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
164 ImageReplayer<I> *replayer)
165 : admin_socket(cct->get_admin_socket()),
166 commands{{"rbd mirror flush " + name,
167 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
168 {"rbd mirror restart " + name,
169 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
170 {"rbd mirror start " + name,
171 new StartCommand<I>("start rbd mirror " + name, replayer)},
172 {"rbd mirror status " + name,
173 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
174 {"rbd mirror stop " + name,
175 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
176 }
177
178 int register_commands() {
179 for (auto &it : commands) {
180 int r = admin_socket->register_command(it.first, it.first, this,
181 it.second->desc);
182 if (r < 0) {
183 return r;
184 }
185 it.second->registered = true;
186 }
187 return 0;
188 }
189
190 ~ImageReplayerAdminSocketHook() override {
191 for (auto &it : commands) {
192 if (it.second->registered) {
193 admin_socket->unregister_command(it.first);
194 }
195 delete it.second;
196 }
197 commands.clear();
198 }
199
200 bool call(std::string_view command, const cmdmap_t& cmdmap,
201 std::string_view format, bufferlist& out) override {
202 auto i = commands.find(command);
203 ceph_assert(i != commands.end());
204 Formatter *f = Formatter::create(format);
205 stringstream ss;
206 bool r = i->second->call(f, &ss);
207 delete f;
208 out.append(ss);
209 return r;
210 }
211
212 private:
213 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
214 std::less<>> Commands;
215
216 AdminSocket *admin_socket;
217 Commands commands;
218 };
219
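// Returns how many whole seconds to wait before applying an event when a
// mirroring replay delay is configured; 0 means apply immediately. For
// example, with a 300 second delay and an event recorded 100 seconds ago,
// the remaining ~200 seconds are rounded up and the caller waits 201 seconds.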
220 uint32_t calculate_replay_delay(const utime_t &event_time,
221 int mirroring_replay_delay) {
222 if (mirroring_replay_delay <= 0) {
223 return 0;
224 }
225
226 utime_t now = ceph_clock_now();
227 if (event_time + mirroring_replay_delay <= now) {
228 return 0;
229 }
230
231 // ensure it is rounded up when converting to integer
232 return (event_time + mirroring_replay_delay - now) + 1;
233 }
234
235 } // anonymous namespace
236
237 template <typename I>
238 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
239 const std::string &description, bool flush)
240 {
241 const std::string desc = "bootstrapping, " + description;
242 replayer->set_state_description(0, desc);
243 if (flush) {
244 replayer->update_mirror_image_status(false, boost::none);
245 }
246 }
247
248 template <typename I>
249 void ImageReplayer<I>::RemoteJournalerListener::handle_update(
250 ::journal::JournalMetadata *) {
251 FunctionContext *ctx = new FunctionContext([this](int r) {
252 replayer->handle_remote_journal_metadata_updated();
253 });
254 replayer->m_threads->work_queue->queue(ctx, 0);
255 }
256
257 template <typename I>
258 ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
259 InstanceWatcher<I> *instance_watcher,
260 RadosRef local,
261 const std::string &local_mirror_uuid,
262 int64_t local_pool_id,
263 const std::string &global_image_id) :
264 m_threads(threads),
265 m_instance_watcher(instance_watcher),
266 m_local(local),
267 m_local_mirror_uuid(local_mirror_uuid),
268 m_local_pool_id(local_pool_id),
269 m_global_image_id(global_image_id), m_local_image_name(global_image_id),
270 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
271 global_image_id),
272 m_progress_cxt(this),
273 m_journal_listener(new JournalListener(this)),
274 m_remote_listener(this)
275 {
276   // Register asok commands using a temporary "local_pool_name/global_image_id"
277   // name. Once the local image name becomes known on start, the asok commands
278   // are re-registered using the "local_pool_name/local_image_name" name.
279
280 std::string pool_name;
281 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
282 if (r < 0) {
283 derr << "error resolving local pool " << m_local_pool_id
284 << ": " << cpp_strerror(r) << dendl;
285 pool_name = stringify(m_local_pool_id);
286 }
287
288 m_name = pool_name + "/" + m_global_image_id;
289 register_admin_socket_hook();
290 }
291
292 template <typename I>
293 ImageReplayer<I>::~ImageReplayer()
294 {
295 unregister_admin_socket_hook();
296 ceph_assert(m_event_preprocessor == nullptr);
297 ceph_assert(m_replay_status_formatter == nullptr);
298 ceph_assert(m_local_image_ctx == nullptr);
299 ceph_assert(m_local_replay == nullptr);
300 ceph_assert(m_remote_journaler == nullptr);
301 ceph_assert(m_replay_handler == nullptr);
302 ceph_assert(m_on_start_finish == nullptr);
303 ceph_assert(m_on_stop_finish == nullptr);
304 ceph_assert(m_bootstrap_request == nullptr);
305 ceph_assert(m_in_flight_status_updates == 0);
306
307 delete m_journal_listener;
308 }
309
310 template <typename I>
311 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
312 Mutex::Locker locker(m_lock);
313
314 if (!m_mirror_image_status_state) {
315 return image_replayer::HEALTH_STATE_OK;
316 } else if (*m_mirror_image_status_state ==
317 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
318 *m_mirror_image_status_state ==
319 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
320 return image_replayer::HEALTH_STATE_WARNING;
321 }
322 return image_replayer::HEALTH_STATE_ERROR;
323 }
324
325 template <typename I>
326 void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
327 librados::IoCtx &io_ctx) {
328 Mutex::Locker locker(m_lock);
329 auto it = m_peers.find({peer_uuid});
330 if (it == m_peers.end()) {
331 m_peers.insert({peer_uuid, io_ctx});
332 }
333 }
334
335 template <typename I>
336 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
337 dout(10) << r << " " << desc << dendl;
338
339 Mutex::Locker l(m_lock);
340 m_last_r = r;
341 m_state_desc = desc;
342 }
343
344 template <typename I>
345 void ImageReplayer<I>::start(Context *on_finish, bool manual)
346 {
347 dout(10) << "on_finish=" << on_finish << dendl;
348
349 int r = 0;
350 {
351 Mutex::Locker locker(m_lock);
352 if (!is_stopped_()) {
353 derr << "already running" << dendl;
354 r = -EINVAL;
355 } else if (m_manual_stop && !manual) {
356 dout(5) << "stopped manually, ignoring start without manual flag"
357 << dendl;
358 r = -EPERM;
359 } else {
360 m_state = STATE_STARTING;
361 m_last_r = 0;
362 m_state_desc.clear();
363 m_manual_stop = false;
364 m_delete_requested = false;
365
366 if (on_finish != nullptr) {
367 ceph_assert(m_on_start_finish == nullptr);
368 m_on_start_finish = on_finish;
369 }
370 ceph_assert(m_on_stop_finish == nullptr);
371 }
372 }
373
374 if (r < 0) {
375 if (on_finish) {
376 on_finish->complete(r);
377 }
378 return;
379 }
380
381 m_local_ioctx.reset(new librados::IoCtx{});
382 r = m_local->ioctx_create2(m_local_pool_id, *m_local_ioctx);
383 if (r < 0) {
384 m_local_ioctx.reset();
385
386 derr << "error opening ioctx for local pool " << m_local_pool_id
387 << ": " << cpp_strerror(r) << dendl;
388 on_start_fail(r, "error opening local pool");
389 return;
390 }
391
392 prepare_local_image();
393 }
394
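// Resolves the local image (id, name and journal tag owner) associated with
// the global image id, if one already exists in the local pool.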
395 template <typename I>
396 void ImageReplayer<I>::prepare_local_image() {
397 dout(10) << dendl;
398
399 m_local_image_id = "";
400 Context *ctx = create_context_callback<
401 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
402 auto req = PrepareLocalImageRequest<I>::create(
403 *m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
404 &m_local_image_tag_owner, m_threads->work_queue, ctx);
405 req->send();
406 }
407
408 template <typename I>
409 void ImageReplayer<I>::handle_prepare_local_image(int r) {
410 dout(10) << "r=" << r << dendl;
411
412 if (r == -ENOENT) {
413 dout(10) << "local image does not exist" << dendl;
414 } else if (r < 0) {
415 on_start_fail(r, "error preparing local image for replay");
416 return;
417 } else {
418 reregister_admin_socket_hook();
419 }
420
421 // local image doesn't exist or is non-primary
422 prepare_remote_image();
423 }
424
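// Selects the remote peer (only a single peer is currently supported) and
// opens a Journaler for the remote image using the configured
// rbd_mirror_journal_* settings.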
425 template <typename I>
426 void ImageReplayer<I>::prepare_remote_image() {
427 dout(10) << dendl;
428 if (m_peers.empty()) {
429 // technically nothing to bootstrap, but it handles the status update
430 bootstrap();
431 return;
432 }
433
434 // TODO need to support multiple remote images
435 ceph_assert(!m_peers.empty());
436 m_remote_image = {*m_peers.begin()};
437
438 auto cct = static_cast<CephContext *>(m_local->cct());
439 journal::Settings journal_settings;
440 journal_settings.commit_interval = cct->_conf.get_val<double>(
441 "rbd_mirror_journal_commit_age");
442 journal_settings.max_fetch_bytes = cct->_conf.get_val<Option::size_t>(
443 "rbd_mirror_journal_max_fetch_bytes");
444
445 Context *ctx = create_context_callback<
446 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
447 auto req = PrepareRemoteImageRequest<I>::create(
448 m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
449 m_local_image_id, journal_settings, &m_remote_image.mirror_uuid,
450 &m_remote_image.image_id, &m_remote_journaler, &m_client_state,
451 &m_client_meta, ctx);
452 req->send();
453 }
454
455 template <typename I>
456 void ImageReplayer<I>::handle_prepare_remote_image(int r) {
457 dout(10) << "r=" << r << dendl;
458
459 ceph_assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
460 if (r < 0 && !m_local_image_id.empty() &&
461 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
462 // local image is primary -- fall-through
463 } else if (r == -ENOENT) {
464 dout(10) << "remote image does not exist" << dendl;
465
466 // TODO need to support multiple remote images
467 if (m_remote_image.image_id.empty() && !m_local_image_id.empty() &&
468 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
469 // local image exists and is non-primary and linked to the missing
470 // remote image
471
472 m_delete_requested = true;
473 on_start_fail(0, "remote image no longer exists");
474 } else {
475 on_start_fail(-ENOENT, "remote image does not exist");
476 }
477 return;
478 } else if (r < 0) {
479 on_start_fail(r, "error retrieving remote image id");
480 return;
481 }
482
483 bootstrap();
484 }
485
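// Runs the BootstrapRequest, which opens the local image (creating and
// syncing it from the remote image if necessary) and detects whether a
// resync has been requested.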
486 template <typename I>
487 void ImageReplayer<I>::bootstrap() {
488 dout(10) << dendl;
489
490 if (!m_local_image_id.empty() &&
491 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
492 dout(5) << "local image is primary" << dendl;
493 on_start_fail(0, "local image is primary");
494 return;
495 } else if (m_peers.empty()) {
496 dout(5) << "no peer clusters" << dendl;
497 on_start_fail(-ENOENT, "no peer clusters");
498 return;
499 }
500
501 BootstrapRequest<I> *request = nullptr;
502 {
503 Mutex::Locker locker(m_lock);
504 if (on_start_interrupted(m_lock)) {
505 return;
506 }
507
508 auto ctx = create_context_callback<
509 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
510 request = BootstrapRequest<I>::create(
511 m_threads, *m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
512 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
513 m_global_image_id, m_local_mirror_uuid, m_remote_image.mirror_uuid,
514 m_remote_journaler, &m_client_state, &m_client_meta, ctx,
515 &m_resync_requested, &m_progress_cxt);
516 request->get();
517 m_bootstrap_request = request;
518 }
519
520 update_mirror_image_status(false, boost::none);
521 reschedule_update_status_task(10);
522
523 request->send();
524 }
525
526 template <typename I>
527 void ImageReplayer<I>::handle_bootstrap(int r) {
528 dout(10) << "r=" << r << dendl;
529 {
530 Mutex::Locker locker(m_lock);
531 m_bootstrap_request->put();
532 m_bootstrap_request = nullptr;
533 if (m_local_image_ctx) {
534 m_local_image_id = m_local_image_ctx->id;
535 }
536 }
537
538 if (on_start_interrupted()) {
539 return;
540 } else if (r == -EREMOTEIO) {
541 m_local_image_tag_owner = "";
542 dout(5) << "remote image is non-primary" << dendl;
543 on_start_fail(-EREMOTEIO, "remote image is non-primary");
544 return;
545 } else if (r == -EEXIST) {
546 m_local_image_tag_owner = "";
547 on_start_fail(r, "split-brain detected");
548 return;
549 } else if (r < 0) {
550 on_start_fail(r, "error bootstrapping replay");
551 return;
552 } else if (m_resync_requested) {
553 on_start_fail(0, "resync requested");
554 return;
555 }
556
557 ceph_assert(m_local_journal == nullptr);
558 {
559 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
560 if (m_local_image_ctx->journal != nullptr) {
561 m_local_journal = m_local_image_ctx->journal;
562 m_local_journal->add_listener(m_journal_listener);
563 }
564 }
565
566 if (m_local_journal == nullptr) {
567 on_start_fail(-EINVAL, "error accessing local journal");
568 return;
569 }
570
571 update_mirror_image_status(false, boost::none);
572 init_remote_journaler();
573 }
574
575 template <typename I>
576 void ImageReplayer<I>::init_remote_journaler() {
577 dout(10) << dendl;
578
579 Context *ctx = create_context_callback<
580 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
581 m_remote_journaler->init(ctx);
582 }
583
584 template <typename I>
585 void ImageReplayer<I>::handle_init_remote_journaler(int r) {
586 dout(10) << "r=" << r << dendl;
587
588 if (on_start_interrupted()) {
589 return;
590 } else if (r < 0) {
591 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
592 on_start_fail(r, "error initializing remote journal");
593 return;
594 }
595
596 m_remote_journaler->add_listener(&m_remote_listener);
597
598 cls::journal::Client client;
599 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
600 if (r < 0) {
601 derr << "error retrieving remote journal client: " << cpp_strerror(r)
602 << dendl;
603 on_start_fail(r, "error retrieving remote journal client");
604 return;
605 }
606
607 dout(5) << "image_id=" << m_local_image_id << ", "
608 << "client_meta.image_id=" << m_client_meta.image_id << ", "
609 << "client.state=" << client.state << dendl;
610 if (m_client_meta.image_id == m_local_image_id &&
611 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
612 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
613 if (m_local_image_ctx->config.template get_val<bool>("rbd_mirroring_resync_after_disconnect")) {
614 m_resync_requested = true;
615 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
616 } else {
617 on_start_fail(-ENOTCONN, "disconnected");
618 }
619 return;
620 }
621
622 start_replay();
623 }
624
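// Starts external replay on the local journal; handle_start_replay() then
// transitions to STATE_REPLAYING and begins live replay of the remote journal.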
625 template <typename I>
626 void ImageReplayer<I>::start_replay() {
627 dout(10) << dendl;
628
629 Context *start_ctx = create_context_callback<
630 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
631 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
632 }
633
634 template <typename I>
635 void ImageReplayer<I>::handle_start_replay(int r) {
636 dout(10) << "r=" << r << dendl;
637
638 if (r < 0) {
639 ceph_assert(m_local_replay == nullptr);
640 derr << "error starting external replay on local image "
641 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
642 on_start_fail(r, "error starting replay on local image");
643 return;
644 }
645
646 m_replay_status_formatter =
647 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
648
649 Context *on_finish(nullptr);
650 {
651 Mutex::Locker locker(m_lock);
652 ceph_assert(m_state == STATE_STARTING);
653 m_state = STATE_REPLAYING;
654 std::swap(m_on_start_finish, on_finish);
655 }
656
657 m_event_preprocessor = EventPreprocessor<I>::create(
658 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
659 &m_client_meta, m_threads->work_queue);
660
661 update_mirror_image_status(true, boost::none);
662 reschedule_update_status_task(30);
663
664 if (on_replay_interrupted()) {
665 return;
666 }
667
668 {
669 CephContext *cct = static_cast<CephContext *>(m_local->cct());
670 double poll_seconds = cct->_conf.get_val<double>(
671 "rbd_mirror_journal_poll_age");
672
673 Mutex::Locker locker(m_lock);
674 m_replay_handler = new ReplayHandler<I>(this);
675 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
676
677 dout(10) << "m_remote_journaler=" << *m_remote_journaler << dendl;
678 }
679
680 dout(10) << "start succeeded" << dendl;
681 if (on_finish != nullptr) {
682 dout(10) << "on finish complete, r=" << r << dendl;
683 on_finish->complete(r);
684 }
685 }
686
687 template <typename I>
688 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
689 {
690 dout(10) << "r=" << r << dendl;
691 Context *ctx = new FunctionContext([this, r, desc](int _r) {
692 {
693 Mutex::Locker locker(m_lock);
694 ceph_assert(m_state == STATE_STARTING);
695 m_state = STATE_STOPPING;
696 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
697 derr << "start failed: " << cpp_strerror(r) << dendl;
698 } else {
699 dout(10) << "start canceled" << dendl;
700 }
701 }
702
703 set_state_description(r, desc);
704 if (m_local_ioctx) {
705 update_mirror_image_status(false, boost::none);
706 }
707 reschedule_update_status_task(-1);
708 shut_down(r);
709 });
710 m_threads->work_queue->queue(ctx, 0);
711 }
712
713 template <typename I>
714 bool ImageReplayer<I>::on_start_interrupted() {
715 Mutex::Locker locker(m_lock);
716 return on_start_interrupted(m_lock);
717 }
718
719 template <typename I>
720 bool ImageReplayer<I>::on_start_interrupted(Mutex& lock) {
721 ceph_assert(m_lock.is_locked());
722 ceph_assert(m_state == STATE_STARTING);
723 if (!m_stop_requested) {
724 return false;
725 }
726
727 on_start_fail(-ECANCELED, "");
728 return true;
729 }
730
731 template <typename I>
732 void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
733 const std::string& desc)
734 {
735 dout(10) << "on_finish=" << on_finish << ", manual=" << manual
736 << ", desc=" << desc << dendl;
737
738 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
739 bool shut_down_replay = false;
740 bool running = true;
741 {
742 Mutex::Locker locker(m_lock);
743
744 if (!is_running_()) {
745 running = false;
746 } else {
747 if (!is_stopped_()) {
748 if (m_state == STATE_STARTING) {
749 dout(10) << "canceling start" << dendl;
750 if (m_bootstrap_request != nullptr) {
751 bootstrap_request = m_bootstrap_request;
752 bootstrap_request->get();
753 }
754 } else {
755 dout(10) << "interrupting replay" << dendl;
756 shut_down_replay = true;
757 }
758
759 ceph_assert(m_on_stop_finish == nullptr);
760 std::swap(m_on_stop_finish, on_finish);
761 m_stop_requested = true;
762 m_manual_stop = manual;
763 }
764 }
765 }
766
767 // avoid holding lock since bootstrap request will update status
768 if (bootstrap_request != nullptr) {
769 dout(10) << "canceling bootstrap" << dendl;
770 bootstrap_request->cancel();
771 bootstrap_request->put();
772 }
773
774 if (!running) {
775 dout(20) << "not running" << dendl;
776 if (on_finish) {
777 on_finish->complete(-EINVAL);
778 }
779 return;
780 }
781
782 if (shut_down_replay) {
783 on_stop_journal_replay(r, desc);
784 } else if (on_finish != nullptr) {
785 on_finish->complete(0);
786 }
787 }
788
789 template <typename I>
790 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
791 {
792 dout(10) << dendl;
793
794 {
795 Mutex::Locker locker(m_lock);
796 if (m_state != STATE_REPLAYING) {
797 // might be invoked multiple times while stopping
798 return;
799 }
800 m_stop_requested = true;
801 m_state = STATE_STOPPING;
802 }
803
804 set_state_description(r, desc);
805 update_mirror_image_status(true, boost::none);
806 reschedule_update_status_task(-1);
807 shut_down(0);
808 }
809
810 template <typename I>
811 void ImageReplayer<I>::handle_replay_ready()
812 {
813 dout(20) << dendl;
814 if (on_replay_interrupted()) {
815 return;
816 }
817
818 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
819 return;
820 }
821
822 m_event_replay_tracker.start_op();
823
824 m_lock.Lock();
825 bool stopping = (m_state == STATE_STOPPING);
826 m_lock.Unlock();
827
828 if (stopping) {
829 dout(10) << "stopping event replay" << dendl;
830 m_event_replay_tracker.finish_op();
831 return;
832 }
833
834 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
835 preprocess_entry();
836 return;
837 }
838
839 replay_flush();
840 }
841
842 template <typename I>
843 void ImageReplayer<I>::restart(Context *on_finish)
844 {
845 FunctionContext *ctx = new FunctionContext(
846 [this, on_finish](int r) {
847 if (r < 0) {
848 // Try start anyway.
849 }
850 start(on_finish, true);
851 });
852 stop(ctx);
853 }
854
855 template <typename I>
856 void ImageReplayer<I>::flush(Context *on_finish)
857 {
858 dout(10) << dendl;
859
860 {
861 Mutex::Locker locker(m_lock);
862 if (m_state == STATE_REPLAYING) {
863 Context *ctx = new FunctionContext(
864 [on_finish](int r) {
865 if (on_finish != nullptr) {
866 on_finish->complete(r);
867 }
868 });
869 on_flush_local_replay_flush_start(ctx);
870 return;
871 }
872 }
873
874 if (on_finish) {
875 on_finish->complete(0);
876 }
877 }
878
879 template <typename I>
880 void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
881 {
882 dout(10) << dendl;
883 FunctionContext *ctx = new FunctionContext(
884 [this, on_flush](int r) {
885 on_flush_local_replay_flush_finish(on_flush, r);
886 });
887
888 ceph_assert(m_lock.is_locked());
889 ceph_assert(m_state == STATE_REPLAYING);
890 m_local_replay->flush(ctx);
891 }
892
893 template <typename I>
894 void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
895 int r)
896 {
897 dout(10) << "r=" << r << dendl;
898 if (r < 0) {
899 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
900 on_flush->complete(r);
901 return;
902 }
903
904 on_flush_flush_commit_position_start(on_flush);
905 }
906
907 template <typename I>
908 void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
909 {
910 FunctionContext *ctx = new FunctionContext(
911 [this, on_flush](int r) {
912 on_flush_flush_commit_position_finish(on_flush, r);
913 });
914
915 m_remote_journaler->flush_commit_position(ctx);
916 }
917
918 template <typename I>
919 void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
920 int r)
921 {
922 if (r < 0) {
923 derr << "error flushing remote journal commit position: "
924 << cpp_strerror(r) << dendl;
925 }
926
927 update_mirror_image_status(false, boost::none);
928
929 dout(20) << "flush complete, r=" << r << dendl;
930 on_flush->complete(r);
931 }
932
933 template <typename I>
934 bool ImageReplayer<I>::on_replay_interrupted()
935 {
936 bool shut_down;
937 {
938 Mutex::Locker locker(m_lock);
939 shut_down = m_stop_requested;
940 }
941
942 if (shut_down) {
943 on_stop_journal_replay();
944 }
945 return shut_down;
946 }
947
948 template <typename I>
949 void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
950 {
951 dout(10) << dendl;
952
953 Mutex::Locker l(m_lock);
954
955 if (f) {
956 f->open_object_section("image_replayer");
957 f->dump_string("name", m_name);
958 f->dump_string("state", to_string(m_state));
959 f->close_section();
960 f->flush(*ss);
961 } else {
962 *ss << m_name << ": state: " << to_string(m_state);
963 }
964 }
965
966 template <typename I>
967 void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
968 {
969 dout(10) << "r=" << r << dendl;
970 if (r < 0) {
971 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
972 set_state_description(r, error_desc);
973 }
974
975 {
976 Mutex::Locker locker(m_lock);
977 m_stop_requested = true;
978 }
979 on_replay_interrupted();
980 }
981
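// Invoked when the next entry belongs to a tag that has not been replayed
// yet: the current external replay is shut down (flushing in-flight IO and
// ops) and a new one is started before the remote tag is fetched.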
982 template <typename I>
983 void ImageReplayer<I>::replay_flush() {
984 dout(10) << dendl;
985
986 bool interrupted = false;
987 {
988 Mutex::Locker locker(m_lock);
989 if (m_state != STATE_REPLAYING) {
990 dout(10) << "replay interrupted" << dendl;
991 interrupted = true;
992 } else {
993 m_state = STATE_REPLAY_FLUSHING;
994 }
995 }
996
997 if (interrupted) {
998 m_event_replay_tracker.finish_op();
999 return;
1000 }
1001
1002 // shut down the replay to flush all IO and ops and create a new
1003 // replayer to handle the new tag epoch
1004 Context *ctx = create_context_callback<
1005 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1006 ctx = new FunctionContext([this, ctx](int r) {
1007 m_local_image_ctx->journal->stop_external_replay();
1008 m_local_replay = nullptr;
1009
1010 if (r < 0) {
1011 ctx->complete(r);
1012 return;
1013 }
1014
1015 m_local_journal->start_external_replay(&m_local_replay, ctx);
1016 });
1017 m_local_replay->shut_down(false, ctx);
1018 }
1019
1020 template <typename I>
1021 void ImageReplayer<I>::handle_replay_flush(int r) {
1022 dout(10) << "r=" << r << dendl;
1023
1024 {
1025 Mutex::Locker locker(m_lock);
1026 ceph_assert(m_state == STATE_REPLAY_FLUSHING);
1027 m_state = STATE_REPLAYING;
1028 }
1029
1030 if (r < 0) {
1031 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1032 m_event_replay_tracker.finish_op();
1033 handle_replay_complete(r, "replay flush encountered an error");
1034 return;
1035 } else if (on_replay_interrupted()) {
1036 m_event_replay_tracker.finish_op();
1037 return;
1038 }
1039
1040 get_remote_tag();
1041 }
1042
1043 template <typename I>
1044 void ImageReplayer<I>::get_remote_tag() {
1045 dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
1046
1047 Context *ctx = create_context_callback<
1048 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1049 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1050 }
1051
1052 template <typename I>
1053 void ImageReplayer<I>::handle_get_remote_tag(int r) {
1054 dout(15) << "r=" << r << dendl;
1055
1056 if (r == 0) {
1057 try {
1058 auto it = m_replay_tag.data.cbegin();
1059 decode(m_replay_tag_data, it);
1060 } catch (const buffer::error &err) {
1061 r = -EBADMSG;
1062 }
1063 }
1064
1065 if (r < 0) {
1066 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1067 << cpp_strerror(r) << dendl;
1068 m_event_replay_tracker.finish_op();
1069 handle_replay_complete(r, "failed to retrieve remote tag");
1070 return;
1071 }
1072
1073 m_replay_tag_valid = true;
1074 dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
1075 << m_replay_tag_data << dendl;
1076
1077 allocate_local_tag();
1078 }
1079
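// Translates the remote tag's mirror uuid namespace into the local journal's
// namespace (the remote's LOCAL_MIRROR_UUID maps to the remote mirror uuid
// and the local mirror uuid maps back to LOCAL_MIRROR_UUID) and allocates
// the corresponding tag in the local journal.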
1080 template <typename I>
1081 void ImageReplayer<I>::allocate_local_tag() {
1082 dout(15) << dendl;
1083
1084 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1085 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1086 mirror_uuid = m_remote_image.mirror_uuid;
1087 } else if (mirror_uuid == m_local_mirror_uuid) {
1088 mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1089 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1090     // handle a possible edge condition where the daemon fails over and
1091     // the local image has already been promoted/demoted
1092 auto local_tag_data = m_local_journal->get_tag_data();
1093 if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
1094 (local_tag_data.predecessor.commit_valid &&
1095 local_tag_data.predecessor.mirror_uuid ==
1096 librbd::Journal<>::LOCAL_MIRROR_UUID)) {
1097 dout(15) << "skipping stale demotion event" << dendl;
1098 handle_process_entry_safe(m_replay_entry, m_replay_start_time, 0);
1099 handle_replay_ready();
1100 return;
1101 } else {
1102 dout(5) << "encountered image demotion: stopping" << dendl;
1103 Mutex::Locker locker(m_lock);
1104 m_stop_requested = true;
1105 }
1106 }
1107
1108 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1109 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1110 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1111 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1112 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1113 }
1114
1115 dout(15) << "mirror_uuid=" << mirror_uuid << ", "
1116 << "predecessor=" << predecessor << ", "
1117 << "replay_tag_tid=" << m_replay_tag_tid << dendl;
1118 Context *ctx = create_context_callback<
1119 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1120 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1121 }
1122
1123 template <typename I>
1124 void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1125 dout(15) << "r=" << r << ", "
1126 << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
1127
1128 if (r < 0) {
1129 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1130 m_event_replay_tracker.finish_op();
1131 handle_replay_complete(r, "failed to allocate journal tag");
1132 return;
1133 }
1134
1135 preprocess_entry();
1136 }
1137
1138 template <typename I>
1139 void ImageReplayer<I>::preprocess_entry() {
1140 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1141 << dendl;
1142
1143 bufferlist data = m_replay_entry.get_data();
1144 auto it = data.cbegin();
1145 int r = m_local_replay->decode(&it, &m_event_entry);
1146 if (r < 0) {
1147 derr << "failed to decode journal event" << dendl;
1148 m_event_replay_tracker.finish_op();
1149 handle_replay_complete(r, "failed to decode journal event");
1150 return;
1151 }
1152
1153 uint32_t delay = calculate_replay_delay(
1154 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1155 if (delay == 0) {
1156 handle_preprocess_entry_ready(0);
1157 return;
1158 }
1159
1160 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1161
1162 Mutex::Locker timer_locker(m_threads->timer_lock);
1163 ceph_assert(m_delayed_preprocess_task == nullptr);
1164 m_delayed_preprocess_task = new FunctionContext(
1165 [this](int r) {
1166 ceph_assert(m_threads->timer_lock.is_locked());
1167 m_delayed_preprocess_task = nullptr;
1168 m_threads->work_queue->queue(
1169 create_context_callback<ImageReplayer,
1170 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1171 });
1172 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1173 }
1174
1175 template <typename I>
1176 void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1177 dout(20) << "r=" << r << dendl;
1178 ceph_assert(r == 0);
1179
1180 m_replay_start_time = ceph_clock_now();
1181 if (!m_event_preprocessor->is_required(m_event_entry)) {
1182 process_entry();
1183 return;
1184 }
1185
1186 Context *ctx = create_context_callback<
1187 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1188 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1189 }
1190
1191 template <typename I>
1192 void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1193 dout(20) << "r=" << r << dendl;
1194
1195 if (r < 0) {
1196 m_event_replay_tracker.finish_op();
1197
1198 if (r == -ECANCELED) {
1199 handle_replay_complete(0, "lost exclusive lock");
1200 } else {
1201 derr << "failed to preprocess journal event" << dendl;
1202 handle_replay_complete(r, "failed to preprocess journal event");
1203 }
1204 return;
1205 }
1206
1207 process_entry();
1208 }
1209
1210 template <typename I>
1211 void ImageReplayer<I>::process_entry() {
1212 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1213 << dendl;
1214
1215 // stop replaying events if stop has been requested
1216 if (on_replay_interrupted()) {
1217 m_event_replay_tracker.finish_op();
1218 return;
1219 }
1220
1221 Context *on_ready = create_context_callback<
1222 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1223 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
1224 m_replay_start_time);
1225
1226 m_local_replay->process(m_event_entry, on_ready, on_commit);
1227 }
1228
1229 template <typename I>
1230 void ImageReplayer<I>::handle_process_entry_ready(int r) {
1231 dout(20) << dendl;
1232 ceph_assert(r == 0);
1233
1234 bool update_status = false;
1235 {
1236 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
1237 if (m_local_image_name != m_local_image_ctx->name) {
1238 m_local_image_name = m_local_image_ctx->name;
1239 update_status = true;
1240 }
1241 }
1242
1243 if (update_status) {
1244 reschedule_update_status_task(0);
1245 }
1246
1247 // attempt to process the next event
1248 handle_replay_ready();
1249 }
1250
1251 template <typename I>
1252 void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry &replay_entry,
1253 const utime_t &replay_start_time,
1254 int r) {
1255 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1256 << dendl;
1257
1258 if (r < 0) {
1259 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1260 handle_replay_complete(r, "failed to commit journal event");
1261 } else {
1262 ceph_assert(m_remote_journaler != nullptr);
1263 m_remote_journaler->committed(replay_entry);
1264 }
1265
1266 auto bytes = replay_entry.get_data().length();
1267 auto latency = ceph_clock_now() - replay_start_time;
1268
1269 if (g_perf_counters) {
1270 g_perf_counters->inc(l_rbd_mirror_replay);
1271 g_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
1272 g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
1273 }
1274
1275 auto ctx = new FunctionContext(
1276 [this, bytes, latency](int r) {
1277 Mutex::Locker locker(m_lock);
1278 if (m_perf_counters) {
1279 m_perf_counters->inc(l_rbd_mirror_replay);
1280 m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
1281 m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
1282 }
1283 m_event_replay_tracker.finish_op();
1284 });
1285 m_threads->work_queue->queue(ctx, 0);
1286 }
1287
1288 template <typename I>
1289 bool ImageReplayer<I>::update_mirror_image_status(bool force,
1290 const OptionalState &state) {
1291 dout(15) << dendl;
1292 {
1293 Mutex::Locker locker(m_lock);
1294 if (!start_mirror_image_status_update(force, false)) {
1295 return false;
1296 }
1297 }
1298
1299 queue_mirror_image_status_update(state);
1300 return true;
1301 }
1302
1303 template <typename I>
1304 bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1305 bool restarting) {
1306 ceph_assert(m_lock.is_locked());
1307
1308 if (!force && !is_stopped_()) {
1309 if (!is_running_()) {
1310 dout(15) << "shut down in-progress: ignoring update" << dendl;
1311 return false;
1312 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1313 dout(15) << "already sending update" << dendl;
1314 m_update_status_requested = true;
1315 return false;
1316 }
1317 }
1318
1319 ++m_in_flight_status_updates;
1320 dout(15) << "in-flight updates=" << m_in_flight_status_updates << dendl;
1321 return true;
1322 }
1323
1324 template <typename I>
1325 void ImageReplayer<I>::finish_mirror_image_status_update() {
1326 reregister_admin_socket_hook();
1327
1328 Context *on_finish = nullptr;
1329 {
1330 Mutex::Locker locker(m_lock);
1331 ceph_assert(m_in_flight_status_updates > 0);
1332 if (--m_in_flight_status_updates > 0) {
1333 dout(15) << "waiting on " << m_in_flight_status_updates << " in-flight "
1334 << "updates" << dendl;
1335 return;
1336 }
1337
1338 std::swap(on_finish, m_on_update_status_finish);
1339 }
1340
1341 dout(15) << dendl;
1342 if (on_finish != nullptr) {
1343 on_finish->complete(0);
1344 }
1345 }
1346
1347 template <typename I>
1348 void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1349 dout(15) << dendl;
1350 FunctionContext *ctx = new FunctionContext(
1351 [this, state](int r) {
1352 send_mirror_status_update(state);
1353 });
1354 m_threads->work_queue->queue(ctx, 0);
1355 }
1356
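// Translates the current replayer state into a cls::rbd::MirrorImageStatus
// and asynchronously writes it to the pool's RBD_MIRRORING object.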
1357 template <typename I>
1358 void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1359 State state;
1360 std::string state_desc;
1361 int last_r;
1362 bool stopping_replay;
1363
1364 OptionalMirrorImageStatusState mirror_image_status_state =
1365 boost::make_optional(false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
1366 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
1367 {
1368 Mutex::Locker locker(m_lock);
1369 state = m_state;
1370 state_desc = m_state_desc;
1371 mirror_image_status_state = m_mirror_image_status_state;
1372 last_r = m_last_r;
1373 stopping_replay = (m_local_image_ctx != nullptr);
1374
1375 if (m_bootstrap_request != nullptr) {
1376 bootstrap_request = m_bootstrap_request;
1377 bootstrap_request->get();
1378 }
1379 }
1380
1381 bool syncing = false;
1382 if (bootstrap_request != nullptr) {
1383 syncing = bootstrap_request->is_syncing();
1384 bootstrap_request->put();
1385 bootstrap_request = nullptr;
1386 }
1387
1388 if (opt_state) {
1389 state = *opt_state;
1390 }
1391
1392 cls::rbd::MirrorImageStatus status;
1393 status.up = true;
1394 switch (state) {
1395 case STATE_STARTING:
1396 if (syncing) {
1397 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1398 status.description = state_desc.empty() ? "syncing" : state_desc;
1399 mirror_image_status_state = status.state;
1400 } else {
1401 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1402 status.description = "starting replay";
1403 }
1404 break;
1405 case STATE_REPLAYING:
1406 case STATE_REPLAY_FLUSHING:
1407 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1408 {
1409 Context *on_req_finish = new FunctionContext(
1410 [this](int r) {
1411 dout(15) << "replay status ready: r=" << r << dendl;
1412 if (r >= 0) {
1413 send_mirror_status_update(boost::none);
1414 } else if (r == -EAGAIN) {
1415 // decrement in-flight status update counter
1416 handle_mirror_status_update(r);
1417 }
1418 });
1419
1420 std::string desc;
1421 ceph_assert(m_replay_status_formatter != nullptr);
1422 if (!m_replay_status_formatter->get_or_send_update(&desc,
1423 on_req_finish)) {
1424 dout(15) << "waiting for replay status" << dendl;
1425 return;
1426 }
1427 status.description = "replaying, " + desc;
1428 mirror_image_status_state = boost::make_optional(
1429 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
1430 }
1431 break;
1432 case STATE_STOPPING:
1433 if (stopping_replay) {
1434 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1435 status.description = state_desc.empty() ? "stopping replay" : state_desc;
1436 break;
1437 }
1438 // FALLTHROUGH
1439 case STATE_STOPPED:
1440 if (last_r == -EREMOTEIO) {
1441 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1442 status.description = state_desc;
1443 mirror_image_status_state = status.state;
1444 } else if (last_r < 0) {
1445 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1446 status.description = state_desc;
1447 mirror_image_status_state = status.state;
1448 } else {
1449 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1450 status.description = state_desc.empty() ? "stopped" : state_desc;
1451 mirror_image_status_state = boost::none;
1452 }
1453 break;
1454 default:
1455 ceph_assert(!"invalid state");
1456 }
1457
1458 {
1459 Mutex::Locker locker(m_lock);
1460 m_mirror_image_status_state = mirror_image_status_state;
1461 }
1462
1463 // prevent the status from ping-ponging when failed replays are restarted
1464 if (mirror_image_status_state &&
1465 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1466 status.state = *mirror_image_status_state;
1467 }
1468
1469 dout(15) << "status=" << status << dendl;
1470 librados::ObjectWriteOperation op;
1471 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1472
1473 ceph_assert(m_local_ioctx);
1474 librados::AioCompletion *aio_comp = create_rados_callback<
1475 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1476 int r = m_local_ioctx->aio_operate(RBD_MIRRORING, aio_comp, &op);
1477 ceph_assert(r == 0);
1478 aio_comp->release();
1479 }
1480
1481 template <typename I>
1482 void ImageReplayer<I>::handle_mirror_status_update(int r) {
1483 dout(15) << "r=" << r << dendl;
1484
1485 bool running = false;
1486 bool started = false;
1487 {
1488 Mutex::Locker locker(m_lock);
1489 bool update_status_requested = false;
1490 std::swap(update_status_requested, m_update_status_requested);
1491
1492 running = is_running_();
1493 if (running && update_status_requested) {
1494 started = start_mirror_image_status_update(false, true);
1495 }
1496 }
1497
1498 // if a deferred update is available, send it -- otherwise reschedule
1499 // the timer task
1500 if (started) {
1501 queue_mirror_image_status_update(boost::none);
1502 } else if (running) {
1503 reschedule_update_status_task(0);
1504 }
1505
1506 // mark committed status update as no longer in-flight
1507 finish_mirror_image_status_update();
1508 }
1509
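// new_interval semantics: a negative value cancels any pending status update
// task, zero reschedules using the current interval, and a positive value
// replaces the interval before rescheduling.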
1510 template <typename I>
1511 void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1512 bool canceled_task = false;
1513 {
1514 Mutex::Locker locker(m_lock);
1515 Mutex::Locker timer_locker(m_threads->timer_lock);
1516
1517 if (m_update_status_task) {
1518 dout(15) << "canceling existing status update task" << dendl;
1519
1520 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1521 m_update_status_task = nullptr;
1522 }
1523
1524 if (new_interval > 0) {
1525 m_update_status_interval = new_interval;
1526 }
1527
1528 if (new_interval >= 0 && is_running_() &&
1529 start_mirror_image_status_update(true, false)) {
1530 m_update_status_task = new FunctionContext(
1531 [this](int r) {
1532 ceph_assert(m_threads->timer_lock.is_locked());
1533 m_update_status_task = nullptr;
1534
1535 queue_mirror_image_status_update(boost::none);
1536 });
1537 dout(15) << "scheduling status update task after "
1538 << m_update_status_interval << " seconds" << dendl;
1539 m_threads->timer->add_event_after(m_update_status_interval,
1540 m_update_status_task);
1541 }
1542 }
1543
1544 if (canceled_task) {
1545 // decrement in-flight status update counter for canceled task
1546 finish_mirror_image_status_update();
1547 }
1548 }
1549
1550 template <typename I>
1551 void ImageReplayer<I>::shut_down(int r) {
1552 dout(10) << "r=" << r << dendl;
1553
1554 bool canceled_delayed_preprocess_task = false;
1555 {
1556 Mutex::Locker timer_locker(m_threads->timer_lock);
1557 if (m_delayed_preprocess_task != nullptr) {
1558 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1559 m_delayed_preprocess_task);
1560 ceph_assert(canceled_delayed_preprocess_task);
1561 m_delayed_preprocess_task = nullptr;
1562 }
1563 }
1564 if (canceled_delayed_preprocess_task) {
1565 // wake up sleeping replay
1566 m_event_replay_tracker.finish_op();
1567 }
1568
1569 reschedule_update_status_task(-1);
1570
1571 {
1572 Mutex::Locker locker(m_lock);
1573 ceph_assert(m_state == STATE_STOPPING);
1574
1575 // if status updates are in-flight, wait for them to complete
1576 // before proceeding
1577 if (m_in_flight_status_updates > 0) {
1578 if (m_on_update_status_finish == nullptr) {
1579 dout(15) << "waiting for in-flight status update" << dendl;
1580 m_on_update_status_finish = new FunctionContext(
1581 [this, r](int _r) {
1582 shut_down(r);
1583 });
1584 }
1585 return;
1586 }
1587 }
1588
1589 // NOTE: it's important to ensure that the local image is fully
1590 // closed before attempting to close the remote journal in
1591 // case the remote cluster is unreachable
1592
1593 // chain the shut down sequence (reverse order)
1594 Context *ctx = new FunctionContext(
1595 [this, r](int _r) {
1596 if (m_local_ioctx) {
1597 update_mirror_image_status(true, STATE_STOPPED);
1598 }
1599 handle_shut_down(r);
1600 });
1601
1602 // close the remote journal
1603 if (m_remote_journaler != nullptr) {
1604 ctx = new FunctionContext([this, ctx](int r) {
1605 delete m_remote_journaler;
1606 m_remote_journaler = nullptr;
1607 ctx->complete(0);
1608 });
1609 ctx = new FunctionContext([this, ctx](int r) {
1610 m_remote_journaler->remove_listener(&m_remote_listener);
1611 m_remote_journaler->shut_down(ctx);
1612 });
1613 }
1614
1615 // stop the replay of remote journal events
1616 if (m_replay_handler != nullptr) {
1617 ctx = new FunctionContext([this, ctx](int r) {
1618 delete m_replay_handler;
1619 m_replay_handler = nullptr;
1620
1621 m_event_replay_tracker.wait_for_ops(ctx);
1622 });
1623 ctx = new FunctionContext([this, ctx](int r) {
1624 m_remote_journaler->stop_replay(ctx);
1625 });
1626 }
1627
1628 // close the local image (release exclusive lock)
1629 if (m_local_image_ctx) {
1630 ctx = new FunctionContext([this, ctx](int r) {
1631 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1632 &m_local_image_ctx, ctx);
1633 request->send();
1634 });
1635 }
1636
1637 // shut down event replay into the local image
1638 if (m_local_journal != nullptr) {
1639 ctx = new FunctionContext([this, ctx](int r) {
1640 m_local_journal = nullptr;
1641 ctx->complete(0);
1642 });
1643 if (m_local_replay != nullptr) {
1644 ctx = new FunctionContext([this, ctx](int r) {
1645 m_local_journal->stop_external_replay();
1646 m_local_replay = nullptr;
1647
1648 EventPreprocessor<I>::destroy(m_event_preprocessor);
1649 m_event_preprocessor = nullptr;
1650 ctx->complete(0);
1651 });
1652 }
1653 ctx = new FunctionContext([this, ctx](int r) {
1654 // blocks if listener notification is in-progress
1655 m_local_journal->remove_listener(m_journal_listener);
1656 ctx->complete(0);
1657 });
1658 }
1659
1660 // wait for all local in-flight replay events to complete
1661 ctx = new FunctionContext([this, ctx](int r) {
1662 if (r < 0) {
1663 derr << "error shutting down journal replay: " << cpp_strerror(r)
1664 << dendl;
1665 }
1666
1667 m_event_replay_tracker.wait_for_ops(ctx);
1668 });
1669
1670 // flush any local in-flight replay events
1671 if (m_local_replay != nullptr) {
1672 ctx = new FunctionContext([this, ctx](int r) {
1673 m_local_replay->shut_down(true, ctx);
1674 });
1675 }
1676
1677 m_threads->work_queue->queue(ctx, 0);
1678 }
1679
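// Final stage of shut down: waits for in-flight status updates, moves the
// local image to the trash when deletion or resync was requested, and
// completes any pending start/stop callbacks.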
1680 template <typename I>
1681 void ImageReplayer<I>::handle_shut_down(int r) {
1682 reschedule_update_status_task(-1);
1683
1684 bool resync_requested = false;
1685 bool delete_requested = false;
1686 bool unregister_asok_hook = false;
1687 {
1688 Mutex::Locker locker(m_lock);
1689
1690 // if status updates are in-flight, wait for them to complete
1691 // before proceeding
1692 if (m_in_flight_status_updates > 0) {
1693 if (m_on_update_status_finish == nullptr) {
1694 dout(15) << "waiting for in-flight status update" << dendl;
1695 m_on_update_status_finish = new FunctionContext(
1696 [this, r](int _r) {
1697 handle_shut_down(r);
1698 });
1699 }
1700 return;
1701 }
1702
1703 if (m_delete_requested && !m_local_image_id.empty()) {
1704 ceph_assert(m_remote_image.image_id.empty());
1705 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1706 unregister_asok_hook = true;
1707 std::swap(delete_requested, m_delete_requested);
1708 }
1709
1710 std::swap(resync_requested, m_resync_requested);
1711 if (delete_requested || resync_requested) {
1712 m_local_image_id = "";
1713 } else if (m_last_r == -ENOENT &&
1714 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1715 dout(0) << "mirror image no longer exists" << dendl;
1716 unregister_asok_hook = true;
1717 m_finished = true;
1718 }
1719 }
1720
1721 if (unregister_asok_hook) {
1722 unregister_admin_socket_hook();
1723 }
1724
1725 if (delete_requested || resync_requested) {
1726 dout(5) << "moving image to trash" << dendl;
1727 auto ctx = new FunctionContext([this, r](int) {
1728 handle_shut_down(r);
1729 });
1730 ImageDeleter<I>::trash_move(*m_local_ioctx, m_global_image_id,
1731 resync_requested, m_threads->work_queue, ctx);
1732 return;
1733 }
1734
1735 dout(10) << "stop complete" << dendl;
1736 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1737 m_replay_status_formatter = nullptr;
1738
1739 Context *on_start = nullptr;
1740 Context *on_stop = nullptr;
1741 {
1742 Mutex::Locker locker(m_lock);
1743 std::swap(on_start, m_on_start_finish);
1744 std::swap(on_stop, m_on_stop_finish);
1745 m_stop_requested = false;
1746 ceph_assert(m_delayed_preprocess_task == nullptr);
1747 ceph_assert(m_state == STATE_STOPPING);
1748 m_state = STATE_STOPPED;
1749 }
1750
1751 if (on_start != nullptr) {
1752 dout(10) << "on start finish complete, r=" << r << dendl;
1753 on_start->complete(r);
1754 r = 0;
1755 }
1756 if (on_stop != nullptr) {
1757 dout(10) << "on stop finish complete, r=" << r << dendl;
1758 on_stop->complete(r);
1759 }
1760 }
1761
1762 template <typename I>
1763 void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1764 dout(20) << dendl;
1765
1766 cls::journal::Client client;
1767 {
1768 Mutex::Locker locker(m_lock);
1769 if (!is_running_()) {
1770 return;
1771 }
1772
1773 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1774 if (r < 0) {
1775 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1776 return;
1777 }
1778 }
1779
1780 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1781 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1782 stop(nullptr, false, -ENOTCONN, "disconnected");
1783 }
1784 }
1785
1786 template <typename I>
1787 std::string ImageReplayer<I>::to_string(const State state) {
1788 switch (state) {
1789 case ImageReplayer<I>::STATE_STARTING:
1790 return "Starting";
1791 case ImageReplayer<I>::STATE_REPLAYING:
1792 return "Replaying";
1793 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1794 return "ReplayFlushing";
1795 case ImageReplayer<I>::STATE_STOPPING:
1796 return "Stopping";
1797 case ImageReplayer<I>::STATE_STOPPED:
1798 return "Stopped";
1799 default:
1800 break;
1801 }
1802 return "Unknown(" + stringify(state) + ")";
1803 }
1804
1805 template <typename I>
1806 void ImageReplayer<I>::resync_image(Context *on_finish) {
1807 dout(10) << dendl;
1808
1809 m_resync_requested = true;
1810 stop(on_finish);
1811 }
1812
1813 template <typename I>
1814 void ImageReplayer<I>::register_admin_socket_hook() {
1815 ImageReplayerAdminSocketHook<I> *asok_hook;
1816 {
1817 Mutex::Locker locker(m_lock);
1818 if (m_asok_hook != nullptr) {
1819 return;
1820 }
1821
1822 ceph_assert(m_perf_counters == nullptr);
1823
1824 dout(15) << "registered asok hook: " << m_name << dendl;
1825 asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1826 this);
1827 int r = asok_hook->register_commands();
1828 if (r == 0) {
1829 m_asok_hook = asok_hook;
1830
1831 CephContext *cct = static_cast<CephContext *>(m_local->cct());
1832 auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_perf_stats_prio");
1833 PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_" + m_name,
1834 l_rbd_mirror_first, l_rbd_mirror_last);
1835 plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
1836 plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
1837 "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
1838 plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
1839 "Replay latency", "rl", prio);
1840 m_perf_counters = plb.create_perf_counters();
1841 g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
1842
1843 return;
1844 }
1845 derr << "error registering admin socket commands" << dendl;
1846 }
1847 delete asok_hook;
1848 }
1849
1850 template <typename I>
1851 void ImageReplayer<I>::unregister_admin_socket_hook() {
1852 dout(15) << dendl;
1853
1854 AdminSocketHook *asok_hook = nullptr;
1855 PerfCounters *perf_counters = nullptr;
1856 {
1857 Mutex::Locker locker(m_lock);
1858 std::swap(asok_hook, m_asok_hook);
1859 std::swap(perf_counters, m_perf_counters);
1860 }
1861 delete asok_hook;
1862 if (perf_counters != nullptr) {
1863 g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
1864 delete perf_counters;
1865 }
1866 }
1867
1868 template <typename I>
1869 void ImageReplayer<I>::reregister_admin_socket_hook() {
1870 {
1871 Mutex::Locker locker(m_lock);
1872 auto name = m_local_ioctx->get_pool_name() + "/" + m_local_image_name;
1873 if (m_asok_hook != nullptr && m_name == name) {
1874 return;
1875 }
1876 m_name = name;
1877 }
1878 unregister_admin_socket_hook();
1879 register_admin_socket_hook();
1880 }
1881
1882 template <typename I>
1883 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1884 {
1885 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1886 << "/" << replayer.get_global_image_id() << "]";
1887 return os;
1888 }
1889
1890 } // namespace mirror
1891 } // namespace rbd
1892
1893 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;