]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to v12.2.1
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "include/stringify.h"
9#include "cls/rbd/cls_rbd_client.h"
10#include "common/Timer.h"
11#include "common/WorkQueue.h"
12#include "global/global_context.h"
13#include "journal/Journaler.h"
14#include "journal/ReplayHandler.h"
15#include "journal/Settings.h"
16#include "librbd/ExclusiveLock.h"
17#include "librbd/ImageCtx.h"
18#include "librbd/ImageState.h"
19#include "librbd/Journal.h"
20#include "librbd/Operations.h"
21#include "librbd/Utils.h"
22#include "librbd/journal/Replay.h"
d2e6a577 23#include "ImageDeleter.h"
7c673cae 24#include "ImageReplayer.h"
7c673cae
FG
25#include "Threads.h"
26#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
d2e6a577 30#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
7c673cae
FG
31#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_rbd_mirror
35#undef dout_prefix
36#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39using std::map;
40using std::string;
41using std::unique_ptr;
42using std::shared_ptr;
43using std::vector;
44
45namespace rbd {
46namespace mirror {
47
48using librbd::util::create_context_callback;
49using librbd::util::create_rados_callback;
50using namespace rbd::mirror::image_replayer;
51
52template <typename I>
53std::ostream &operator<<(std::ostream &os,
54 const typename ImageReplayer<I>::State &state);
55
56namespace {
57
58template <typename I>
59struct ReplayHandler : public ::journal::ReplayHandler {
60 ImageReplayer<I> *replayer;
61 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
62 void get() override {}
63 void put() override {}
64
65 void handle_entries_available() override {
66 replayer->handle_replay_ready();
67 }
68 void handle_complete(int r) override {
69 std::stringstream ss;
70 if (r < 0) {
71 ss << "replay completed with error: " << cpp_strerror(r);
72 }
73 replayer->handle_replay_complete(r, ss.str());
74 }
75};
76
77class ImageReplayerAdminSocketCommand {
78public:
79 virtual ~ImageReplayerAdminSocketCommand() {}
80 virtual bool call(Formatter *f, stringstream *ss) = 0;
81};
82
83template <typename I>
84class StatusCommand : public ImageReplayerAdminSocketCommand {
85public:
86 explicit StatusCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
87
88 bool call(Formatter *f, stringstream *ss) override {
89 replayer->print_status(f, ss);
90 return true;
91 }
92
93private:
94 ImageReplayer<I> *replayer;
95};
96
97template <typename I>
98class StartCommand : public ImageReplayerAdminSocketCommand {
99public:
100 explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
101
102 bool call(Formatter *f, stringstream *ss) override {
103 replayer->start(nullptr, true);
104 return true;
105 }
106
107private:
108 ImageReplayer<I> *replayer;
109};
110
111template <typename I>
112class StopCommand : public ImageReplayerAdminSocketCommand {
113public:
114 explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
115
116 bool call(Formatter *f, stringstream *ss) override {
117 replayer->stop(nullptr, true);
118 return true;
119 }
120
121private:
122 ImageReplayer<I> *replayer;
123};
124
125template <typename I>
126class RestartCommand : public ImageReplayerAdminSocketCommand {
127public:
128 explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
129
130 bool call(Formatter *f, stringstream *ss) override {
131 replayer->restart();
132 return true;
133 }
134
135private:
136 ImageReplayer<I> *replayer;
137};
138
139template <typename I>
140class FlushCommand : public ImageReplayerAdminSocketCommand {
141public:
142 explicit FlushCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
143
144 bool call(Formatter *f, stringstream *ss) override {
145 C_SaferCond cond;
146 replayer->flush(&cond);
147 int r = cond.wait();
148 if (r < 0) {
149 *ss << "flush: " << cpp_strerror(r);
150 return false;
151 }
152 return true;
153 }
154
155private:
156 ImageReplayer<I> *replayer;
157};
158
159template <typename I>
160class ImageReplayerAdminSocketHook : public AdminSocketHook {
161public:
162 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77 163 ImageReplayer<I> *replayer)
d2e6a577 164 : admin_socket(cct->get_admin_socket()), name(name), replayer(replayer),
31f18b77
FG
165 lock("ImageReplayerAdminSocketHook::lock " +
166 replayer->get_global_image_id()) {
d2e6a577
FG
167 }
168
169 int register_commands() {
7c673cae
FG
170 std::string command;
171 int r;
172
173 command = "rbd mirror status " + name;
174 r = admin_socket->register_command(command, command, this,
175 "get status for rbd mirror " + name);
d2e6a577
FG
176 if (r < 0) {
177 return r;
7c673cae 178 }
d2e6a577 179 commands[command] = new StatusCommand<I>(replayer);
7c673cae
FG
180
181 command = "rbd mirror start " + name;
182 r = admin_socket->register_command(command, command, this,
183 "start rbd mirror " + name);
d2e6a577
FG
184 if (r < 0) {
185 return r;
7c673cae 186 }
d2e6a577 187 commands[command] = new StartCommand<I>(replayer);
7c673cae
FG
188
189 command = "rbd mirror stop " + name;
190 r = admin_socket->register_command(command, command, this,
191 "stop rbd mirror " + name);
d2e6a577
FG
192 if (r < 0) {
193 return r;
7c673cae 194 }
d2e6a577 195 commands[command] = new StopCommand<I>(replayer);
7c673cae
FG
196
197 command = "rbd mirror restart " + name;
198 r = admin_socket->register_command(command, command, this,
199 "restart rbd mirror " + name);
d2e6a577
FG
200 if (r < 0) {
201 return r;
7c673cae 202 }
d2e6a577 203 commands[command] = new RestartCommand<I>(replayer);
7c673cae
FG
204
205 command = "rbd mirror flush " + name;
206 r = admin_socket->register_command(command, command, this,
207 "flush rbd mirror " + name);
d2e6a577
FG
208 if (r < 0) {
209 return r;
7c673cae 210 }
d2e6a577
FG
211 commands[command] = new FlushCommand<I>(replayer);
212
213 return 0;
7c673cae
FG
214 }
215
216 ~ImageReplayerAdminSocketHook() override {
31f18b77 217 Mutex::Locker locker(lock);
7c673cae
FG
218 for (Commands::const_iterator i = commands.begin(); i != commands.end();
219 ++i) {
220 (void)admin_socket->unregister_command(i->first);
221 delete i->second;
222 }
31f18b77 223 commands.clear();
7c673cae
FG
224 }
225
226 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
227 bufferlist& out) override {
31f18b77 228 Mutex::Locker locker(lock);
7c673cae
FG
229 Commands::const_iterator i = commands.find(command);
230 assert(i != commands.end());
231 Formatter *f = Formatter::create(format);
232 stringstream ss;
233 bool r = i->second->call(f, &ss);
234 delete f;
235 out.append(ss);
236 return r;
237 }
238
239private:
240 typedef std::map<std::string, ImageReplayerAdminSocketCommand*> Commands;
241
242 AdminSocket *admin_socket;
d2e6a577
FG
243 std::string name;
244 ImageReplayer<I> *replayer;
31f18b77 245 Mutex lock;
7c673cae
FG
246 Commands commands;
247};
248
249uint32_t calculate_replay_delay(const utime_t &event_time,
250 int mirroring_replay_delay) {
251 if (mirroring_replay_delay <= 0) {
252 return 0;
253 }
254
255 utime_t now = ceph_clock_now();
256 if (event_time + mirroring_replay_delay <= now) {
257 return 0;
258 }
259
260 // ensure it is rounded up when converting to integer
261 return (event_time + mirroring_replay_delay - now) + 1;
262}
263
264} // anonymous namespace
265
266template <typename I>
267void ImageReplayer<I>::BootstrapProgressContext::update_progress(
268 const std::string &description, bool flush)
269{
270 const std::string desc = "bootstrapping, " + description;
271 replayer->set_state_description(0, desc);
272 if (flush) {
273 replayer->update_mirror_image_status(false, boost::none);
274 }
275}
276
277template <typename I>
278void ImageReplayer<I>::RemoteJournalerListener::handle_update(
279 ::journal::JournalMetadata *) {
280 FunctionContext *ctx = new FunctionContext([this](int r) {
281 replayer->handle_remote_journal_metadata_updated();
282 });
283 replayer->m_threads->work_queue->queue(ctx, 0);
284}
285
286template <typename I>
d2e6a577 287ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
c07f9fc5 288 ImageDeleter<I>* image_deleter,
31f18b77 289 InstanceWatcher<I> *instance_watcher,
7c673cae
FG
290 RadosRef local,
291 const std::string &local_mirror_uuid,
292 int64_t local_pool_id,
293 const std::string &global_image_id) :
294 m_threads(threads),
295 m_image_deleter(image_deleter),
31f18b77 296 m_instance_watcher(instance_watcher),
7c673cae
FG
297 m_local(local),
298 m_local_mirror_uuid(local_mirror_uuid),
299 m_local_pool_id(local_pool_id),
300 m_global_image_id(global_image_id),
301 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
302 global_image_id),
303 m_progress_cxt(this),
304 m_journal_listener(new JournalListener(this)),
305 m_remote_listener(this)
306{
307 // Register asok commands using a temporary "remote_pool_name/global_image_id"
308 // name. When the image name becomes known on start the asok commands will be
309 // re-registered using "remote_pool_name/remote_image_name" name.
310
311 std::string pool_name;
312 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
313 if (r < 0) {
314 derr << "error resolving local pool " << m_local_pool_id
315 << ": " << cpp_strerror(r) << dendl;
316 pool_name = stringify(m_local_pool_id);
317 }
318
319 m_name = pool_name + "/" + m_global_image_id;
d2e6a577 320 register_admin_socket_hook();
7c673cae
FG
321}
322
323template <typename I>
324ImageReplayer<I>::~ImageReplayer()
325{
d2e6a577 326 unregister_admin_socket_hook();
7c673cae
FG
327 assert(m_event_preprocessor == nullptr);
328 assert(m_replay_status_formatter == nullptr);
329 assert(m_local_image_ctx == nullptr);
330 assert(m_local_replay == nullptr);
331 assert(m_remote_journaler == nullptr);
332 assert(m_replay_handler == nullptr);
333 assert(m_on_start_finish == nullptr);
334 assert(m_on_stop_finish == nullptr);
335 assert(m_bootstrap_request == nullptr);
336 assert(m_in_flight_status_updates == 0);
337
338 delete m_journal_listener;
7c673cae
FG
339}
340
c07f9fc5
FG
341template <typename I>
342image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
343 Mutex::Locker locker(m_lock);
344
345 if (!m_mirror_image_status_state) {
346 return image_replayer::HEALTH_STATE_OK;
347 } else if (*m_mirror_image_status_state ==
348 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
349 *m_mirror_image_status_state ==
350 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
351 return image_replayer::HEALTH_STATE_WARNING;
352 }
353 return image_replayer::HEALTH_STATE_ERROR;
354}
355
7c673cae 356template <typename I>
d2e6a577
FG
357void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
358 librados::IoCtx &io_ctx) {
7c673cae 359 Mutex::Locker locker(m_lock);
d2e6a577
FG
360 auto it = m_peers.find({peer_uuid});
361 if (it == m_peers.end()) {
362 m_peers.insert({peer_uuid, io_ctx});
7c673cae
FG
363 }
364}
365
7c673cae
FG
366template <typename I>
367void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
368 dout(20) << r << " " << desc << dendl;
369
370 Mutex::Locker l(m_lock);
371 m_last_r = r;
372 m_state_desc = desc;
373}
374
375template <typename I>
376void ImageReplayer<I>::start(Context *on_finish, bool manual)
377{
378 dout(20) << "on_finish=" << on_finish << dendl;
379
380 int r = 0;
381 {
382 Mutex::Locker locker(m_lock);
383 if (!is_stopped_()) {
384 derr << "already running" << dendl;
385 r = -EINVAL;
386 } else if (m_manual_stop && !manual) {
387 dout(5) << "stopped manually, ignoring start without manual flag"
388 << dendl;
389 r = -EPERM;
390 } else {
391 m_state = STATE_STARTING;
392 m_last_r = 0;
393 m_state_desc.clear();
394 m_manual_stop = false;
d2e6a577 395 m_delete_requested = false;
7c673cae
FG
396
397 if (on_finish != nullptr) {
398 assert(m_on_start_finish == nullptr);
399 m_on_start_finish = on_finish;
400 }
401 assert(m_on_stop_finish == nullptr);
402 }
403 }
404
405 if (r < 0) {
406 if (on_finish) {
407 on_finish->complete(r);
408 }
409 return;
410 }
411
412 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
413 if (r < 0) {
414 derr << "error opening ioctx for local pool " << m_local_pool_id
415 << ": " << cpp_strerror(r) << dendl;
416 on_start_fail(r, "error opening local pool");
417 return;
418 }
419
d2e6a577
FG
420 wait_for_deletion();
421}
422
423template <typename I>
424void ImageReplayer<I>::wait_for_deletion() {
425 dout(20) << dendl;
426
427 Context *ctx = create_context_callback<
428 ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
429 m_image_deleter->wait_for_scheduled_deletion(
430 m_local_pool_id, m_global_image_id, ctx, false);
431}
432
433template <typename I>
434void ImageReplayer<I>::handle_wait_for_deletion(int r) {
435 dout(20) << "r=" << r << dendl;
436
437 if (r == -ECANCELED) {
438 on_start_fail(0, "");
439 return;
440 } else if (r < 0) {
441 on_start_fail(r, "error waiting for image deletion");
442 return;
443 }
444
7c673cae
FG
445 prepare_local_image();
446}
447
448template <typename I>
449void ImageReplayer<I>::prepare_local_image() {
450 dout(20) << dendl;
451
d2e6a577 452 m_local_image_id = "";
7c673cae
FG
453 Context *ctx = create_context_callback<
454 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
455 auto req = PrepareLocalImageRequest<I>::create(
456 m_local_ioctx, m_global_image_id, &m_local_image_id,
457 &m_local_image_tag_owner, m_threads->work_queue, ctx);
458 req->send();
459}
460
461template <typename I>
462void ImageReplayer<I>::handle_prepare_local_image(int r) {
463 dout(20) << "r=" << r << dendl;
464
465 if (r == -ENOENT) {
466 dout(20) << "local image does not exist" << dendl;
467 } else if (r < 0) {
468 on_start_fail(r, "error preparing local image for replay");
469 return;
470 } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
471 dout(5) << "local image is primary" << dendl;
472 on_start_fail(0, "local image is primary");
473 return;
474 }
475
476 // local image doesn't exist or is non-primary
d2e6a577 477 prepare_remote_image();
7c673cae
FG
478}
479
480template <typename I>
d2e6a577 481void ImageReplayer<I>::prepare_remote_image() {
7c673cae
FG
482 dout(20) << dendl;
483
d2e6a577
FG
484 // TODO need to support multiple remote images
485 assert(!m_peers.empty());
486 m_remote_image = {*m_peers.begin()};
487
488 Context *ctx = create_context_callback<
489 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
490 auto req = PrepareRemoteImageRequest<I>::create(
491 m_remote_image.io_ctx, m_global_image_id, &m_remote_image.mirror_uuid,
492 &m_remote_image.image_id, ctx);
493 req->send();
494}
495
496template <typename I>
497void ImageReplayer<I>::handle_prepare_remote_image(int r) {
498 dout(20) << "r=" << r << dendl;
499
500 if (r == -ENOENT) {
501 dout(20) << "remote image does not exist" << dendl;
502
503 // TODO need to support multiple remote images
504 if (!m_local_image_id.empty() &&
505 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
506 // local image exists and is non-primary and linked to the missing
507 // remote image
508
509 m_delete_requested = true;
510 on_start_fail(0, "remote image no longer exists");
511 } else {
512 on_start_fail(-ENOENT, "remote image does not exist");
513 }
514 return;
515 } else if (r < 0) {
516 on_start_fail(r, "error retrieving remote image id");
7c673cae
FG
517 return;
518 }
519
d2e6a577
FG
520 bootstrap();
521}
522
523template <typename I>
524void ImageReplayer<I>::bootstrap() {
525 dout(20) << dendl;
7c673cae
FG
526
527 CephContext *cct = static_cast<CephContext *>(m_local->cct());
528 journal::Settings settings;
181888fb
FG
529 settings.commit_interval = cct->_conf->get_val<double>(
530 "rbd_mirror_journal_commit_age");
531 settings.max_fetch_bytes = cct->_conf->get_val<uint64_t>(
532 "rbd_mirror_journal_max_fetch_bytes");
7c673cae
FG
533
534 m_remote_journaler = new Journaler(m_threads->work_queue,
535 m_threads->timer,
536 &m_threads->timer_lock,
537 m_remote_image.io_ctx,
538 m_remote_image.image_id,
539 m_local_mirror_uuid, settings);
540
541 Context *ctx = create_context_callback<
542 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
543
544 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
31f18b77 545 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
7c673cae
FG
546 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
547 m_global_image_id, m_threads->work_queue, m_threads->timer,
548 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
d2e6a577
FG
549 m_remote_journaler, &m_client_meta, ctx, &m_resync_requested,
550 &m_progress_cxt);
7c673cae
FG
551
552 {
553 Mutex::Locker locker(m_lock);
554 request->get();
555 m_bootstrap_request = request;
556 }
557
558 update_mirror_image_status(false, boost::none);
559 reschedule_update_status_task(10);
560
561 request->send();
562}
563
564template <typename I>
565void ImageReplayer<I>::handle_bootstrap(int r) {
566 dout(20) << "r=" << r << dendl;
567 {
568 Mutex::Locker locker(m_lock);
569 m_bootstrap_request->put();
570 m_bootstrap_request = nullptr;
571 if (m_local_image_ctx) {
572 m_local_image_id = m_local_image_ctx->id;
573 }
574 }
575
576 if (r == -EREMOTEIO) {
577 m_local_image_tag_owner = "";
c07f9fc5
FG
578 dout(5) << "remote image is non-primary" << dendl;
579 on_start_fail(-EREMOTEIO, "remote image is non-primary");
7c673cae
FG
580 return;
581 } else if (r == -EEXIST) {
582 m_local_image_tag_owner = "";
583 on_start_fail(r, "split-brain detected");
584 return;
585 } else if (r < 0) {
586 on_start_fail(r, "error bootstrapping replay");
587 return;
588 } else if (on_start_interrupted()) {
589 return;
d2e6a577
FG
590 } else if (m_resync_requested) {
591 on_start_fail(0, "resync requested");
592 return;
7c673cae
FG
593 }
594
595 assert(m_local_journal == nullptr);
596 {
597 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
598 if (m_local_image_ctx->journal != nullptr) {
599 m_local_journal = m_local_image_ctx->journal;
600 m_local_journal->add_listener(m_journal_listener);
601 }
602 }
603
604 if (m_local_journal == nullptr) {
605 on_start_fail(-EINVAL, "error accessing local journal");
606 return;
607 }
608
609 {
610 Mutex::Locker locker(m_lock);
7c673cae
FG
611 std::string name = m_local_ioctx.get_pool_name() + "/" +
612 m_local_image_ctx->name;
613 if (m_name != name) {
614 m_name = name;
615 if (m_asok_hook) {
616 // Re-register asok commands using the new name.
617 delete m_asok_hook;
618 m_asok_hook = nullptr;
619 }
620 }
d2e6a577 621 register_admin_socket_hook();
7c673cae
FG
622 }
623
624 update_mirror_image_status(false, boost::none);
625 init_remote_journaler();
626}
627
628template <typename I>
629void ImageReplayer<I>::init_remote_journaler() {
630 dout(20) << dendl;
631
632 Context *ctx = create_context_callback<
633 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
634 m_remote_journaler->init(ctx);
635}
636
637template <typename I>
638void ImageReplayer<I>::handle_init_remote_journaler(int r) {
639 dout(20) << "r=" << r << dendl;
640
641 if (r < 0) {
642 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
643 on_start_fail(r, "error initializing remote journal");
644 return;
645 } else if (on_start_interrupted()) {
646 return;
647 }
648
649 m_remote_journaler->add_listener(&m_remote_listener);
650
651 cls::journal::Client client;
652 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
653 if (r < 0) {
654 derr << "error retrieving remote journal client: " << cpp_strerror(r)
655 << dendl;
656 on_start_fail(r, "error retrieving remote journal client");
657 return;
658 }
659
d2e6a577
FG
660 derr << "image_id=" << m_local_image_id << ", "
661 << "m_client_meta.image_id=" << m_client_meta.image_id << ", "
662 << "client.state=" << client.state << dendl;
663 if (m_client_meta.image_id == m_local_image_id &&
664 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
7c673cae
FG
665 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
666 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
d2e6a577
FG
667 m_resync_requested = true;
668 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
669 } else {
670 on_start_fail(-ENOTCONN, "disconnected");
7c673cae 671 }
7c673cae
FG
672 return;
673 }
674
675 start_replay();
676}
677
678template <typename I>
679void ImageReplayer<I>::start_replay() {
680 dout(20) << dendl;
681
682 Context *start_ctx = create_context_callback<
683 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
684 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
685}
686
687template <typename I>
688void ImageReplayer<I>::handle_start_replay(int r) {
689 dout(20) << "r=" << r << dendl;
690
691 if (r < 0) {
692 assert(m_local_replay == nullptr);
693 derr << "error starting external replay on local image "
694 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
695 on_start_fail(r, "error starting replay on local image");
696 return;
697 }
698
699 Context *on_finish(nullptr);
700 {
701 Mutex::Locker locker(m_lock);
702 assert(m_state == STATE_STARTING);
703 m_state = STATE_REPLAYING;
704 std::swap(m_on_start_finish, on_finish);
705 }
706
707 m_event_preprocessor = EventPreprocessor<I>::create(
708 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
709 &m_client_meta, m_threads->work_queue);
710 m_replay_status_formatter =
711 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
712
713 update_mirror_image_status(true, boost::none);
714 reschedule_update_status_task(30);
715
7c673cae
FG
716 if (on_replay_interrupted()) {
717 return;
718 }
719
720 {
721 CephContext *cct = static_cast<CephContext *>(m_local->cct());
181888fb
FG
722 double poll_seconds = cct->_conf->get_val<double>(
723 "rbd_mirror_journal_poll_age");
7c673cae
FG
724
725 Mutex::Locker locker(m_lock);
726 m_replay_handler = new ReplayHandler<I>(this);
727 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
728
729 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
730 }
731
d2e6a577
FG
732 dout(20) << "start succeeded" << dendl;
733 if (on_finish != nullptr) {
734 dout(20) << "on finish complete, r=" << r << dendl;
735 on_finish->complete(r);
736 }
7c673cae
FG
737}
738
739template <typename I>
740void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
741{
742 dout(20) << "r=" << r << dendl;
743 Context *ctx = new FunctionContext([this, r, desc](int _r) {
744 {
745 Mutex::Locker locker(m_lock);
746 assert(m_state == STATE_STARTING);
747 m_state = STATE_STOPPING;
d2e6a577 748 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
7c673cae
FG
749 derr << "start failed: " << cpp_strerror(r) << dendl;
750 } else {
751 dout(20) << "start canceled" << dendl;
752 }
753 }
754
755 set_state_description(r, desc);
756 update_mirror_image_status(false, boost::none);
757 reschedule_update_status_task(-1);
758 shut_down(r);
759 });
760 m_threads->work_queue->queue(ctx, 0);
761}
762
763template <typename I>
764bool ImageReplayer<I>::on_start_interrupted()
765{
766 Mutex::Locker locker(m_lock);
767 assert(m_state == STATE_STARTING);
768 if (m_on_stop_finish == nullptr) {
769 return false;
770 }
771
772 on_start_fail(-ECANCELED);
773 return true;
774}
775
776template <typename I>
777void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
778 const std::string& desc)
779{
780 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
781 << ", desc=" << desc << dendl;
782
d2e6a577
FG
783 m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
784
7c673cae
FG
785 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
786 bool shut_down_replay = false;
787 bool running = true;
788 bool canceled_task = false;
789 {
790 Mutex::Locker locker(m_lock);
791
792 if (!is_running_()) {
793 running = false;
794 } else {
795 if (!is_stopped_()) {
796 if (m_state == STATE_STARTING) {
797 dout(20) << "canceling start" << dendl;
798 if (m_bootstrap_request) {
799 bootstrap_request = m_bootstrap_request;
800 bootstrap_request->get();
801 }
802 } else {
803 dout(20) << "interrupting replay" << dendl;
804 shut_down_replay = true;
805 }
806
807 assert(m_on_stop_finish == nullptr);
808 std::swap(m_on_stop_finish, on_finish);
809 m_stop_requested = true;
810 m_manual_stop = manual;
811
812 Mutex::Locker timer_locker(m_threads->timer_lock);
813 if (m_delayed_preprocess_task != nullptr) {
814 canceled_task = m_threads->timer->cancel_event(
815 m_delayed_preprocess_task);
816 assert(canceled_task);
817 m_delayed_preprocess_task = nullptr;
818 }
819 }
820 }
821 }
822
823 // avoid holding lock since bootstrap request will update status
824 if (bootstrap_request != nullptr) {
825 bootstrap_request->cancel();
826 bootstrap_request->put();
827 }
828
829 if (canceled_task) {
830 m_event_replay_tracker.finish_op();
831 on_replay_interrupted();
832 }
833
834 if (!running) {
835 dout(20) << "not running" << dendl;
836 if (on_finish) {
837 on_finish->complete(-EINVAL);
838 }
839 return;
840 }
841
842 if (shut_down_replay) {
843 on_stop_journal_replay(r, desc);
844 } else if (on_finish != nullptr) {
845 on_finish->complete(0);
846 }
847}
848
849template <typename I>
850void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
851{
852 dout(20) << "enter" << dendl;
853
854 {
855 Mutex::Locker locker(m_lock);
856 if (m_state != STATE_REPLAYING) {
857 // might be invoked multiple times while stopping
858 return;
859 }
860 m_stop_requested = true;
861 m_state = STATE_STOPPING;
862 }
863
864 set_state_description(r, desc);
865 update_mirror_image_status(false, boost::none);
866 reschedule_update_status_task(-1);
867 shut_down(0);
868}
869
870template <typename I>
871void ImageReplayer<I>::handle_replay_ready()
872{
873 dout(20) << "enter" << dendl;
874 if (on_replay_interrupted()) {
875 return;
876 }
877
878 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
879 return;
880 }
881
882 m_event_replay_tracker.start_op();
31f18b77
FG
883
884 m_lock.Lock();
885 bool stopping = (m_state == STATE_STOPPING);
886 m_lock.Unlock();
887
888 if (stopping) {
889 dout(10) << "stopping event replay" << dendl;
890 m_event_replay_tracker.finish_op();
891 return;
892 }
893
7c673cae
FG
894 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
895 preprocess_entry();
896 return;
897 }
898
899 replay_flush();
900}
901
902template <typename I>
903void ImageReplayer<I>::restart(Context *on_finish)
904{
905 FunctionContext *ctx = new FunctionContext(
906 [this, on_finish](int r) {
907 if (r < 0) {
908 // Try start anyway.
909 }
910 start(on_finish, true);
911 });
912 stop(ctx);
913}
914
915template <typename I>
916void ImageReplayer<I>::flush(Context *on_finish)
917{
918 dout(20) << "enter" << dendl;
919
920 {
921 Mutex::Locker locker(m_lock);
922 if (m_state == STATE_REPLAYING) {
923 Context *ctx = new FunctionContext(
924 [on_finish](int r) {
925 if (on_finish != nullptr) {
926 on_finish->complete(r);
927 }
928 });
929 on_flush_local_replay_flush_start(ctx);
930 return;
931 }
932 }
933
934 if (on_finish) {
935 on_finish->complete(0);
936 }
937}
938
939template <typename I>
940void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
941{
942 dout(20) << "enter" << dendl;
943 FunctionContext *ctx = new FunctionContext(
944 [this, on_flush](int r) {
945 on_flush_local_replay_flush_finish(on_flush, r);
946 });
947
948 assert(m_lock.is_locked());
949 assert(m_state == STATE_REPLAYING);
950 m_local_replay->flush(ctx);
951}
952
953template <typename I>
954void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
955 int r)
956{
957 dout(20) << "r=" << r << dendl;
958 if (r < 0) {
959 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
960 on_flush->complete(r);
961 return;
962 }
963
964 on_flush_flush_commit_position_start(on_flush);
965}
966
967template <typename I>
968void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
969{
970 FunctionContext *ctx = new FunctionContext(
971 [this, on_flush](int r) {
972 on_flush_flush_commit_position_finish(on_flush, r);
973 });
974
975 m_remote_journaler->flush_commit_position(ctx);
976}
977
978template <typename I>
979void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
980 int r)
981{
982 if (r < 0) {
983 derr << "error flushing remote journal commit position: "
984 << cpp_strerror(r) << dendl;
985 }
986
987 update_mirror_image_status(false, boost::none);
988
989 dout(20) << "flush complete, r=" << r << dendl;
990 on_flush->complete(r);
991}
992
993template <typename I>
994bool ImageReplayer<I>::on_replay_interrupted()
995{
996 bool shut_down;
997 {
998 Mutex::Locker locker(m_lock);
999 shut_down = m_stop_requested;
1000 }
1001
1002 if (shut_down) {
1003 on_stop_journal_replay();
1004 }
1005 return shut_down;
1006}
1007
1008template <typename I>
1009void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
1010{
1011 dout(20) << "enter" << dendl;
1012
1013 Mutex::Locker l(m_lock);
1014
1015 if (f) {
1016 f->open_object_section("image_replayer");
1017 f->dump_string("name", m_name);
1018 f->dump_string("state", to_string(m_state));
1019 f->close_section();
1020 f->flush(*ss);
1021 } else {
1022 *ss << m_name << ": state: " << to_string(m_state);
1023 }
1024}
1025
1026template <typename I>
1027void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
1028{
1029 dout(20) << "r=" << r << dendl;
1030 if (r < 0) {
1031 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
1032 set_state_description(r, error_desc);
1033 }
1034
1035 {
1036 Mutex::Locker locker(m_lock);
1037 m_stop_requested = true;
1038 }
1039 on_replay_interrupted();
1040}
1041
1042template <typename I>
1043void ImageReplayer<I>::replay_flush() {
1044 dout(20) << dendl;
1045
1046 bool interrupted = false;
1047 {
1048 Mutex::Locker locker(m_lock);
1049 if (m_state != STATE_REPLAYING) {
1050 dout(20) << "replay interrupted" << dendl;
1051 interrupted = true;
1052 } else {
1053 m_state = STATE_REPLAY_FLUSHING;
1054 }
1055 }
1056
1057 if (interrupted) {
1058 m_event_replay_tracker.finish_op();
1059 return;
1060 }
1061
1062 // shut down the replay to flush all IO and ops and create a new
1063 // replayer to handle the new tag epoch
1064 Context *ctx = create_context_callback<
1065 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1066 ctx = new FunctionContext([this, ctx](int r) {
1067 m_local_image_ctx->journal->stop_external_replay();
1068 m_local_replay = nullptr;
1069
1070 if (r < 0) {
1071 ctx->complete(r);
1072 return;
1073 }
1074
1075 m_local_journal->start_external_replay(&m_local_replay, ctx);
1076 });
1077 m_local_replay->shut_down(false, ctx);
1078}
1079
1080template <typename I>
1081void ImageReplayer<I>::handle_replay_flush(int r) {
1082 dout(20) << "r=" << r << dendl;
1083
1084 {
1085 Mutex::Locker locker(m_lock);
1086 assert(m_state == STATE_REPLAY_FLUSHING);
1087 m_state = STATE_REPLAYING;
1088 }
1089
1090 if (r < 0) {
1091 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1092 m_event_replay_tracker.finish_op();
1093 handle_replay_complete(r, "replay flush encountered an error");
1094 return;
1095 } else if (on_replay_interrupted()) {
1096 m_event_replay_tracker.finish_op();
1097 return;
1098 }
1099
1100 get_remote_tag();
1101}
1102
1103template <typename I>
1104void ImageReplayer<I>::get_remote_tag() {
1105 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1106
1107 Context *ctx = create_context_callback<
1108 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1109 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1110}
1111
1112template <typename I>
1113void ImageReplayer<I>::handle_get_remote_tag(int r) {
1114 dout(20) << "r=" << r << dendl;
1115
1116 if (r == 0) {
1117 try {
1118 bufferlist::iterator it = m_replay_tag.data.begin();
1119 ::decode(m_replay_tag_data, it);
1120 } catch (const buffer::error &err) {
1121 r = -EBADMSG;
1122 }
1123 }
1124
1125 if (r < 0) {
1126 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1127 << cpp_strerror(r) << dendl;
1128 m_event_replay_tracker.finish_op();
1129 handle_replay_complete(r, "failed to retrieve remote tag");
1130 return;
1131 }
1132
1133 m_replay_tag_valid = true;
1134 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1135 << m_replay_tag_data << dendl;
1136
1137 allocate_local_tag();
1138}
1139
1140template <typename I>
1141void ImageReplayer<I>::allocate_local_tag() {
1142 dout(20) << dendl;
1143
1144 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1145 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
1146 mirror_uuid == m_local_mirror_uuid) {
1147 mirror_uuid = m_remote_image.mirror_uuid;
1148 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1149 dout(5) << "encountered image demotion: stopping" << dendl;
1150 Mutex::Locker locker(m_lock);
1151 m_stop_requested = true;
1152 }
1153
1154 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1155 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1156 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1157 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1158 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1159 }
1160
1161 dout(20) << "mirror_uuid=" << mirror_uuid << ", "
1162 << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
1163 << "replay_tag_tid=" << m_replay_tag_tid << ", "
1164 << "replay_tag_data=" << m_replay_tag_data << dendl;
1165 Context *ctx = create_context_callback<
1166 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1167 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1168}
1169
1170template <typename I>
1171void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1172 dout(20) << "r=" << r << dendl;
1173
1174 if (r < 0) {
1175 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1176 m_event_replay_tracker.finish_op();
1177 handle_replay_complete(r, "failed to allocate journal tag");
1178 return;
1179 }
1180
1181 preprocess_entry();
1182}
1183
1184template <typename I>
1185void ImageReplayer<I>::preprocess_entry() {
1186 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1187 << dendl;
1188
1189 bufferlist data = m_replay_entry.get_data();
1190 bufferlist::iterator it = data.begin();
1191 int r = m_local_replay->decode(&it, &m_event_entry);
1192 if (r < 0) {
1193 derr << "failed to decode journal event" << dendl;
1194 m_event_replay_tracker.finish_op();
1195 handle_replay_complete(r, "failed to decode journal event");
1196 return;
1197 }
1198
1199 uint32_t delay = calculate_replay_delay(
1200 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1201 if (delay == 0) {
1202 handle_preprocess_entry_ready(0);
1203 return;
1204 }
1205
1206 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1207
1208 Mutex::Locker timer_locker(m_threads->timer_lock);
1209 assert(m_delayed_preprocess_task == nullptr);
1210 m_delayed_preprocess_task = new FunctionContext(
1211 [this](int r) {
1212 assert(m_threads->timer_lock.is_locked());
1213 m_delayed_preprocess_task = nullptr;
1214 m_threads->work_queue->queue(
1215 create_context_callback<ImageReplayer,
1216 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1217 });
1218 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1219}
1220
1221template <typename I>
1222void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1223 dout(20) << "r=" << r << dendl;
1224 assert(r == 0);
1225
1226 if (!m_event_preprocessor->is_required(m_event_entry)) {
1227 process_entry();
1228 return;
1229 }
1230
1231 Context *ctx = create_context_callback<
1232 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1233 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1234}
1235
1236template <typename I>
1237void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1238 dout(20) << "r=" << r << dendl;
1239
1240 if (r < 0) {
7c673cae 1241 m_event_replay_tracker.finish_op();
31f18b77
FG
1242
1243 if (r == -ECANCELED) {
1244 handle_replay_complete(0, "lost exclusive lock");
1245 } else {
1246 derr << "failed to preprocess journal event" << dendl;
1247 handle_replay_complete(r, "failed to preprocess journal event");
1248 }
7c673cae
FG
1249 return;
1250 }
1251
1252 process_entry();
1253}
1254
1255template <typename I>
1256void ImageReplayer<I>::process_entry() {
1257 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1258 << dendl;
1259
1260 // stop replaying events if stop has been requested
1261 if (on_replay_interrupted()) {
1262 m_event_replay_tracker.finish_op();
1263 return;
1264 }
1265
1266 Context *on_ready = create_context_callback<
1267 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1268 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1269
1270 m_local_replay->process(m_event_entry, on_ready, on_commit);
1271}
1272
1273template <typename I>
1274void ImageReplayer<I>::handle_process_entry_ready(int r) {
1275 dout(20) << dendl;
1276 assert(r == 0);
1277
1278 // attempt to process the next event
1279 handle_replay_ready();
1280}
1281
1282template <typename I>
1283void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1284 int r) {
1285 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1286 << dendl;
1287
1288 if (r < 0) {
1289 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1290 handle_replay_complete(r, "failed to commit journal event");
1291 } else {
1292 assert(m_remote_journaler != nullptr);
1293 m_remote_journaler->committed(replay_entry);
1294 }
1295 m_event_replay_tracker.finish_op();
1296}
1297
1298template <typename I>
1299bool ImageReplayer<I>::update_mirror_image_status(bool force,
1300 const OptionalState &state) {
1301 dout(20) << dendl;
1302 {
1303 Mutex::Locker locker(m_lock);
1304 if (!start_mirror_image_status_update(force, false)) {
1305 return false;
1306 }
1307 }
1308
1309 queue_mirror_image_status_update(state);
1310 return true;
1311}
1312
1313template <typename I>
1314bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1315 bool restarting) {
1316 assert(m_lock.is_locked());
1317
1318 if (!force && !is_stopped_()) {
1319 if (!is_running_()) {
1320 dout(20) << "shut down in-progress: ignoring update" << dendl;
1321 return false;
1322 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1323 dout(20) << "already sending update" << dendl;
1324 m_update_status_requested = true;
1325 return false;
1326 }
1327 }
1328
1329 dout(20) << dendl;
1330 ++m_in_flight_status_updates;
1331 return true;
1332}
1333
1334template <typename I>
1335void ImageReplayer<I>::finish_mirror_image_status_update() {
1336 Context *on_finish = nullptr;
1337 {
1338 Mutex::Locker locker(m_lock);
1339 assert(m_in_flight_status_updates > 0);
1340 if (--m_in_flight_status_updates > 0) {
1341 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1342 << "updates" << dendl;
1343 return;
1344 }
1345
1346 std::swap(on_finish, m_on_update_status_finish);
1347 }
1348
1349 dout(20) << dendl;
1350 if (on_finish != nullptr) {
1351 on_finish->complete(0);
1352 }
1353}
1354
1355template <typename I>
1356void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1357 dout(20) << dendl;
1358 FunctionContext *ctx = new FunctionContext(
1359 [this, state](int r) {
1360 send_mirror_status_update(state);
1361 });
1362 m_threads->work_queue->queue(ctx, 0);
1363}
1364
1365template <typename I>
1366void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1367 State state;
1368 std::string state_desc;
1369 int last_r;
7c673cae 1370 bool stopping_replay;
c07f9fc5
FG
1371
1372 OptionalMirrorImageStatusState mirror_image_status_state{
1373 boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1374 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
7c673cae
FG
1375 {
1376 Mutex::Locker locker(m_lock);
1377 state = m_state;
1378 state_desc = m_state_desc;
c07f9fc5 1379 mirror_image_status_state = m_mirror_image_status_state;
7c673cae 1380 last_r = m_last_r;
7c673cae 1381 stopping_replay = (m_local_image_ctx != nullptr);
c07f9fc5
FG
1382
1383 if (m_bootstrap_request != nullptr) {
1384 bootstrap_request = m_bootstrap_request;
1385 bootstrap_request->get();
1386 }
1387 }
1388
1389 bool syncing = false;
1390 if (bootstrap_request != nullptr) {
1391 syncing = bootstrap_request->is_syncing();
1392 bootstrap_request->put();
1393 bootstrap_request = nullptr;
7c673cae
FG
1394 }
1395
1396 if (opt_state) {
1397 state = *opt_state;
1398 }
1399
1400 cls::rbd::MirrorImageStatus status;
1401 status.up = true;
1402 switch (state) {
1403 case STATE_STARTING:
c07f9fc5 1404 if (syncing) {
7c673cae
FG
1405 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1406 status.description = state_desc.empty() ? "syncing" : state_desc;
c07f9fc5 1407 mirror_image_status_state = status.state;
7c673cae
FG
1408 } else {
1409 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1410 status.description = "starting replay";
1411 }
1412 break;
1413 case STATE_REPLAYING:
1414 case STATE_REPLAY_FLUSHING:
1415 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1416 {
1417 Context *on_req_finish = new FunctionContext(
1418 [this](int r) {
1419 dout(20) << "replay status ready: r=" << r << dendl;
1420 if (r >= 0) {
1421 send_mirror_status_update(boost::none);
1422 } else if (r == -EAGAIN) {
1423 // decrement in-flight status update counter
1424 handle_mirror_status_update(r);
1425 }
1426 });
1427
1428 std::string desc;
1429 if (!m_replay_status_formatter->get_or_send_update(&desc,
1430 on_req_finish)) {
1431 dout(20) << "waiting for replay status" << dendl;
1432 return;
1433 }
1434 status.description = "replaying, " + desc;
c07f9fc5 1435 mirror_image_status_state = boost::none;
7c673cae
FG
1436 }
1437 break;
1438 case STATE_STOPPING:
1439 if (stopping_replay) {
1440 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1441 status.description = "stopping replay";
1442 break;
1443 }
1444 // FALLTHROUGH
1445 case STATE_STOPPED:
c07f9fc5
FG
1446 if (last_r == -EREMOTEIO) {
1447 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1448 status.description = state_desc;
1449 mirror_image_status_state = status.state;
1450 } else if (last_r < 0) {
7c673cae
FG
1451 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1452 status.description = state_desc;
c07f9fc5 1453 mirror_image_status_state = status.state;
7c673cae
FG
1454 } else {
1455 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1456 status.description = state_desc.empty() ? "stopped" : state_desc;
c07f9fc5 1457 mirror_image_status_state = boost::none;
7c673cae
FG
1458 }
1459 break;
1460 default:
1461 assert(!"invalid state");
1462 }
1463
c07f9fc5
FG
1464 {
1465 Mutex::Locker locker(m_lock);
1466 m_mirror_image_status_state = mirror_image_status_state;
1467 }
1468
1469 // prevent the status from ping-ponging when failed replays are restarted
1470 if (mirror_image_status_state &&
1471 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1472 status.state = *mirror_image_status_state;
1473 }
1474
7c673cae
FG
1475 dout(20) << "status=" << status << dendl;
1476 librados::ObjectWriteOperation op;
1477 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1478
1479 librados::AioCompletion *aio_comp = create_rados_callback<
1480 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1481 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1482 assert(r == 0);
1483 aio_comp->release();
1484}
1485
1486template <typename I>
1487void ImageReplayer<I>::handle_mirror_status_update(int r) {
1488 dout(20) << "r=" << r << dendl;
1489
1490 bool running = false;
1491 bool started = false;
1492 {
1493 Mutex::Locker locker(m_lock);
1494 bool update_status_requested = false;
1495 std::swap(update_status_requested, m_update_status_requested);
1496
1497 running = is_running_();
1498 if (running && update_status_requested) {
1499 started = start_mirror_image_status_update(false, true);
1500 }
1501 }
1502
1503 // if a deferred update is available, send it -- otherwise reschedule
1504 // the timer task
1505 if (started) {
1506 queue_mirror_image_status_update(boost::none);
1507 } else if (running) {
1508 reschedule_update_status_task();
1509 }
1510
1511 // mark committed status update as no longer in-flight
1512 finish_mirror_image_status_update();
1513}
1514
1515template <typename I>
1516void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1517 dout(20) << dendl;
1518
1519 bool canceled_task = false;
1520 {
1521 Mutex::Locker locker(m_lock);
1522 Mutex::Locker timer_locker(m_threads->timer_lock);
1523
1524 if (m_update_status_task) {
1525 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1526 m_update_status_task = nullptr;
1527 }
1528
1529 if (new_interval > 0) {
1530 m_update_status_interval = new_interval;
1531 }
1532
1533 bool restarting = (new_interval == 0 || canceled_task);
1534 if (new_interval >= 0 && is_running_() &&
1535 start_mirror_image_status_update(false, restarting)) {
1536 m_update_status_task = new FunctionContext(
1537 [this](int r) {
1538 assert(m_threads->timer_lock.is_locked());
1539 m_update_status_task = nullptr;
1540
1541 queue_mirror_image_status_update(boost::none);
1542 });
1543 m_threads->timer->add_event_after(m_update_status_interval,
1544 m_update_status_task);
1545 }
1546 }
1547
1548 if (canceled_task) {
1549 dout(20) << "canceled task" << dendl;
1550 finish_mirror_image_status_update();
1551 }
1552}
1553
1554template <typename I>
1555void ImageReplayer<I>::shut_down(int r) {
1556 dout(20) << "r=" << r << dendl;
1557 {
1558 Mutex::Locker locker(m_lock);
1559 assert(m_state == STATE_STOPPING);
1560
1561 // if status updates are in-flight, wait for them to complete
1562 // before proceeding
1563 if (m_in_flight_status_updates > 0) {
1564 if (m_on_update_status_finish == nullptr) {
1565 dout(20) << "waiting for in-flight status update" << dendl;
1566 m_on_update_status_finish = new FunctionContext(
1567 [this, r](int _r) {
1568 shut_down(r);
1569 });
1570 }
1571 return;
1572 }
1573 }
1574
31f18b77
FG
1575 // NOTE: it's important to ensure that the local image is fully
1576 // closed before attempting to close the remote journal in
1577 // case the remote cluster is unreachable
1578
7c673cae
FG
1579 // chain the shut down sequence (reverse order)
1580 Context *ctx = new FunctionContext(
1581 [this, r](int _r) {
1582 update_mirror_image_status(true, STATE_STOPPED);
1583 handle_shut_down(r);
1584 });
31f18b77
FG
1585
1586 // close the remote journal
7c673cae
FG
1587 if (m_remote_journaler != nullptr) {
1588 ctx = new FunctionContext([this, ctx](int r) {
1589 delete m_remote_journaler;
1590 m_remote_journaler = nullptr;
1591 ctx->complete(0);
1592 });
1593 ctx = new FunctionContext([this, ctx](int r) {
1594 m_remote_journaler->remove_listener(&m_remote_listener);
1595 m_remote_journaler->shut_down(ctx);
1596 });
7c673cae 1597 }
31f18b77
FG
1598
1599 // stop the replay of remote journal events
1600 if (m_replay_handler != nullptr) {
1601 ctx = new FunctionContext([this, ctx](int r) {
1602 delete m_replay_handler;
1603 m_replay_handler = nullptr;
1604
1605 m_event_replay_tracker.wait_for_ops(ctx);
1606 });
1607 ctx = new FunctionContext([this, ctx](int r) {
1608 m_remote_journaler->stop_replay(ctx);
1609 });
1610 }
1611
1612 // close the local image (release exclusive lock)
1613 if (m_local_image_ctx) {
1614 ctx = new FunctionContext([this, ctx](int r) {
1615 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1616 &m_local_image_ctx, ctx);
1617 request->send();
1618 });
1619 }
1620
1621 // shut down event replay into the local image
7c673cae
FG
1622 if (m_local_journal != nullptr) {
1623 ctx = new FunctionContext([this, ctx](int r) {
1624 m_local_journal = nullptr;
1625 ctx->complete(0);
1626 });
1627 if (m_local_replay != nullptr) {
1628 ctx = new FunctionContext([this, ctx](int r) {
1629 m_local_journal->stop_external_replay();
1630 m_local_replay = nullptr;
1631
1632 EventPreprocessor<I>::destroy(m_event_preprocessor);
1633 m_event_preprocessor = nullptr;
1634 ctx->complete(0);
1635 });
1636 }
1637 ctx = new FunctionContext([this, ctx](int r) {
7c673cae
FG
1638 // blocks if listener notification is in-progress
1639 m_local_journal->remove_listener(m_journal_listener);
7c673cae
FG
1640 ctx->complete(0);
1641 });
31f18b77
FG
1642 }
1643
1644 // wait for all local in-flight replay events to complete
1645 ctx = new FunctionContext([this, ctx](int r) {
1646 if (r < 0) {
1647 derr << "error shutting down journal replay: " << cpp_strerror(r)
1648 << dendl;
1649 }
1650
1651 m_event_replay_tracker.wait_for_ops(ctx);
1652 });
1653
1654 // flush any local in-flight replay events
1655 if (m_local_replay != nullptr) {
7c673cae 1656 ctx = new FunctionContext([this, ctx](int r) {
31f18b77 1657 m_local_replay->shut_down(true, ctx);
7c673cae
FG
1658 });
1659 }
31f18b77 1660
7c673cae
FG
1661 m_threads->work_queue->queue(ctx, 0);
1662}
1663
1664template <typename I>
1665void ImageReplayer<I>::handle_shut_down(int r) {
1666 reschedule_update_status_task(-1);
1667
1668 {
1669 Mutex::Locker locker(m_lock);
1670
1671 // if status updates are in-flight, wait for them to complete
1672 // before proceeding
1673 if (m_in_flight_status_updates > 0) {
1674 if (m_on_update_status_finish == nullptr) {
1675 dout(20) << "waiting for in-flight status update" << dendl;
1676 m_on_update_status_finish = new FunctionContext(
1677 [this, r](int _r) {
1678 handle_shut_down(r);
1679 });
1680 }
1681 return;
1682 }
1683
d2e6a577
FG
1684 bool delete_requested = false;
1685 if (m_delete_requested && !m_local_image_id.empty()) {
1686 assert(m_remote_image.image_id.empty());
1687 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1688 delete_requested = true;
1689 }
1690 if (delete_requested || m_resync_requested) {
7c673cae
FG
1691 m_image_deleter->schedule_image_delete(m_local,
1692 m_local_pool_id,
c07f9fc5 1693 m_global_image_id,
d2e6a577
FG
1694 m_resync_requested);
1695
1696 m_local_image_id = "";
1697 m_resync_requested = false;
1698 if (m_delete_requested) {
1699 unregister_admin_socket_hook();
1700 m_delete_requested = false;
1701 }
1702 } else if (m_last_r == -ENOENT &&
1703 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1704 dout(0) << "mirror image no longer exists" << dendl;
1705 unregister_admin_socket_hook();
1706 m_finished = true;
7c673cae
FG
1707 }
1708 }
1709
1710 dout(20) << "stop complete" << dendl;
1711 m_local_ioctx.close();
1712
1713 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1714 m_replay_status_formatter = nullptr;
1715
1716 Context *on_start = nullptr;
1717 Context *on_stop = nullptr;
1718 {
1719 Mutex::Locker locker(m_lock);
1720 std::swap(on_start, m_on_start_finish);
1721 std::swap(on_stop, m_on_stop_finish);
1722 m_stop_requested = false;
1723 assert(m_delayed_preprocess_task == nullptr);
1724 assert(m_state == STATE_STOPPING);
1725 m_state = STATE_STOPPED;
1726 }
1727
1728 if (on_start != nullptr) {
1729 dout(20) << "on start finish complete, r=" << r << dendl;
1730 on_start->complete(r);
1731 r = 0;
1732 }
1733 if (on_stop != nullptr) {
1734 dout(20) << "on stop finish complete, r=" << r << dendl;
1735 on_stop->complete(r);
1736 }
1737}
1738
1739template <typename I>
1740void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1741 dout(20) << dendl;
1742
1743 cls::journal::Client client;
1744 {
1745 Mutex::Locker locker(m_lock);
1746 if (!is_running_()) {
1747 return;
1748 }
1749
1750 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1751 if (r < 0) {
1752 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1753 return;
1754 }
1755 }
1756
1757 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1758 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1759 stop(nullptr, false, -ENOTCONN, "disconnected");
1760 }
1761}
1762
1763template <typename I>
1764std::string ImageReplayer<I>::to_string(const State state) {
1765 switch (state) {
1766 case ImageReplayer<I>::STATE_STARTING:
1767 return "Starting";
1768 case ImageReplayer<I>::STATE_REPLAYING:
1769 return "Replaying";
1770 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1771 return "ReplayFlushing";
1772 case ImageReplayer<I>::STATE_STOPPING:
1773 return "Stopping";
1774 case ImageReplayer<I>::STATE_STOPPED:
1775 return "Stopped";
1776 default:
1777 break;
1778 }
1779 return "Unknown(" + stringify(state) + ")";
1780}
1781
1782template <typename I>
1783void ImageReplayer<I>::resync_image(Context *on_finish) {
1784 dout(20) << dendl;
1785
d2e6a577
FG
1786 m_resync_requested = true;
1787 stop(on_finish);
1788}
1789
1790template <typename I>
1791void ImageReplayer<I>::register_admin_socket_hook() {
1792 if (m_asok_hook != nullptr) {
1793 return;
7c673cae
FG
1794 }
1795
d2e6a577
FG
1796 dout(20) << "registered asok hook: " << m_name << dendl;
1797 auto asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1798 this);
1799 int r = asok_hook->register_commands();
1800 if (r < 0) {
1801 derr << "error registering admin socket commands" << dendl;
1802 delete asok_hook;
1803 asok_hook = nullptr;
1804 return;
1805 }
1806
1807 m_asok_hook = asok_hook;
1808}
1809
1810template <typename I>
1811void ImageReplayer<I>::unregister_admin_socket_hook() {
1812 dout(20) << dendl;
1813
1814 delete m_asok_hook;
1815 m_asok_hook = nullptr;
7c673cae
FG
1816}
1817
1818template <typename I>
1819std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1820{
1821 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1822 << "/" << replayer.get_global_image_id() << "]";
1823 return os;
1824}
1825
1826} // namespace mirror
1827} // namespace rbd
1828
1829template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;