]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to 12.2.10
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "include/stringify.h"
9#include "cls/rbd/cls_rbd_client.h"
10#include "common/Timer.h"
11#include "common/WorkQueue.h"
12#include "global/global_context.h"
13#include "journal/Journaler.h"
14#include "journal/ReplayHandler.h"
15#include "journal/Settings.h"
16#include "librbd/ExclusiveLock.h"
17#include "librbd/ImageCtx.h"
18#include "librbd/ImageState.h"
19#include "librbd/Journal.h"
20#include "librbd/Operations.h"
21#include "librbd/Utils.h"
22#include "librbd/journal/Replay.h"
d2e6a577 23#include "ImageDeleter.h"
7c673cae 24#include "ImageReplayer.h"
7c673cae
FG
25#include "Threads.h"
26#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
d2e6a577 30#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
7c673cae
FG
31#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_rbd_mirror
35#undef dout_prefix
36#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39using std::map;
40using std::string;
41using std::unique_ptr;
42using std::shared_ptr;
43using std::vector;
44
45namespace rbd {
46namespace mirror {
47
48using librbd::util::create_context_callback;
49using librbd::util::create_rados_callback;
50using namespace rbd::mirror::image_replayer;
51
52template <typename I>
53std::ostream &operator<<(std::ostream &os,
54 const typename ImageReplayer<I>::State &state);
55
56namespace {
57
58template <typename I>
59struct ReplayHandler : public ::journal::ReplayHandler {
60 ImageReplayer<I> *replayer;
61 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
62 void get() override {}
63 void put() override {}
64
65 void handle_entries_available() override {
66 replayer->handle_replay_ready();
67 }
68 void handle_complete(int r) override {
69 std::stringstream ss;
70 if (r < 0) {
71 ss << "replay completed with error: " << cpp_strerror(r);
72 }
73 replayer->handle_replay_complete(r, ss.str());
74 }
75};
76
3efd9988 77template <typename I>
7c673cae
FG
78class ImageReplayerAdminSocketCommand {
79public:
3efd9988
FG
80 ImageReplayerAdminSocketCommand(const std::string &desc,
81 ImageReplayer<I> *replayer)
82 : desc(desc), replayer(replayer) {
83 }
7c673cae
FG
84 virtual ~ImageReplayerAdminSocketCommand() {}
85 virtual bool call(Formatter *f, stringstream *ss) = 0;
3efd9988
FG
86
87 std::string desc;
88 ImageReplayer<I> *replayer;
89 bool registered = false;
7c673cae
FG
90};
91
92template <typename I>
3efd9988 93class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 94public:
3efd9988
FG
95 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
96 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
97 }
7c673cae
FG
98
99 bool call(Formatter *f, stringstream *ss) override {
3efd9988 100 this->replayer->print_status(f, ss);
7c673cae
FG
101 return true;
102 }
7c673cae
FG
103};
104
105template <typename I>
3efd9988 106class StartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 107public:
3efd9988
FG
108 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
109 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
110 }
7c673cae
FG
111
112 bool call(Formatter *f, stringstream *ss) override {
3efd9988 113 this->replayer->start(nullptr, true);
7c673cae
FG
114 return true;
115 }
7c673cae
FG
116};
117
118template <typename I>
3efd9988 119class StopCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 120public:
3efd9988
FG
121 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
122 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
123 }
7c673cae
FG
124
125 bool call(Formatter *f, stringstream *ss) override {
3efd9988 126 this->replayer->stop(nullptr, true);
7c673cae
FG
127 return true;
128 }
7c673cae
FG
129};
130
131template <typename I>
3efd9988 132class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 133public:
3efd9988
FG
134 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
135 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
136 }
7c673cae
FG
137
138 bool call(Formatter *f, stringstream *ss) override {
3efd9988 139 this->replayer->restart();
7c673cae
FG
140 return true;
141 }
7c673cae
FG
142};
143
144template <typename I>
3efd9988 145class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 146public:
3efd9988
FG
147 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
148 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
149 }
7c673cae
FG
150
151 bool call(Formatter *f, stringstream *ss) override {
28e407b8 152 this->replayer->flush();
7c673cae
FG
153 return true;
154 }
7c673cae
FG
155};
156
157template <typename I>
158class ImageReplayerAdminSocketHook : public AdminSocketHook {
159public:
160 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77 161 ImageReplayer<I> *replayer)
3efd9988
FG
162 : admin_socket(cct->get_admin_socket()),
163 commands{{"rbd mirror flush " + name,
164 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
165 {"rbd mirror restart " + name,
166 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
167 {"rbd mirror start " + name,
168 new StartCommand<I>("start rbd mirror " + name, replayer)},
169 {"rbd mirror status " + name,
170 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
171 {"rbd mirror stop " + name,
172 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
d2e6a577
FG
173 }
174
175 int register_commands() {
3efd9988
FG
176 for (auto &it : commands) {
177 int r = admin_socket->register_command(it.first, it.first, this,
178 it.second->desc);
179 if (r < 0) {
180 return r;
181 }
182 it.second->registered = true;
7c673cae 183 }
d2e6a577 184 return 0;
7c673cae
FG
185 }
186
187 ~ImageReplayerAdminSocketHook() override {
3efd9988
FG
188 for (auto &it : commands) {
189 if (it.second->registered) {
190 admin_socket->unregister_command(it.first);
191 }
192 delete it.second;
7c673cae 193 }
31f18b77 194 commands.clear();
7c673cae
FG
195 }
196
197 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
198 bufferlist& out) override {
3efd9988 199 auto i = commands.find(command);
7c673cae
FG
200 assert(i != commands.end());
201 Formatter *f = Formatter::create(format);
202 stringstream ss;
203 bool r = i->second->call(f, &ss);
204 delete f;
205 out.append(ss);
206 return r;
207 }
208
209private:
3efd9988 210 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I> *> Commands;
7c673cae
FG
211
212 AdminSocket *admin_socket;
213 Commands commands;
214};
215
216uint32_t calculate_replay_delay(const utime_t &event_time,
217 int mirroring_replay_delay) {
218 if (mirroring_replay_delay <= 0) {
219 return 0;
220 }
221
222 utime_t now = ceph_clock_now();
223 if (event_time + mirroring_replay_delay <= now) {
224 return 0;
225 }
226
227 // ensure it is rounded up when converting to integer
228 return (event_time + mirroring_replay_delay - now) + 1;
229}
230
231} // anonymous namespace
232
233template <typename I>
234void ImageReplayer<I>::BootstrapProgressContext::update_progress(
235 const std::string &description, bool flush)
236{
237 const std::string desc = "bootstrapping, " + description;
238 replayer->set_state_description(0, desc);
239 if (flush) {
240 replayer->update_mirror_image_status(false, boost::none);
241 }
242}
243
244template <typename I>
245void ImageReplayer<I>::RemoteJournalerListener::handle_update(
246 ::journal::JournalMetadata *) {
247 FunctionContext *ctx = new FunctionContext([this](int r) {
248 replayer->handle_remote_journal_metadata_updated();
249 });
250 replayer->m_threads->work_queue->queue(ctx, 0);
251}
252
253template <typename I>
d2e6a577 254ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
c07f9fc5 255 ImageDeleter<I>* image_deleter,
31f18b77 256 InstanceWatcher<I> *instance_watcher,
7c673cae
FG
257 RadosRef local,
258 const std::string &local_mirror_uuid,
259 int64_t local_pool_id,
260 const std::string &global_image_id) :
261 m_threads(threads),
262 m_image_deleter(image_deleter),
31f18b77 263 m_instance_watcher(instance_watcher),
7c673cae
FG
264 m_local(local),
265 m_local_mirror_uuid(local_mirror_uuid),
266 m_local_pool_id(local_pool_id),
28e407b8 267 m_global_image_id(global_image_id), m_local_image_name(global_image_id),
7c673cae
FG
268 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
269 global_image_id),
270 m_progress_cxt(this),
271 m_journal_listener(new JournalListener(this)),
272 m_remote_listener(this)
273{
274 // Register asok commands using a temporary "remote_pool_name/global_image_id"
275 // name. When the image name becomes known on start the asok commands will be
276 // re-registered using "remote_pool_name/remote_image_name" name.
277
278 std::string pool_name;
279 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
280 if (r < 0) {
281 derr << "error resolving local pool " << m_local_pool_id
282 << ": " << cpp_strerror(r) << dendl;
283 pool_name = stringify(m_local_pool_id);
284 }
285
286 m_name = pool_name + "/" + m_global_image_id;
d2e6a577 287 register_admin_socket_hook();
7c673cae
FG
288}
289
290template <typename I>
291ImageReplayer<I>::~ImageReplayer()
292{
d2e6a577 293 unregister_admin_socket_hook();
7c673cae
FG
294 assert(m_event_preprocessor == nullptr);
295 assert(m_replay_status_formatter == nullptr);
296 assert(m_local_image_ctx == nullptr);
297 assert(m_local_replay == nullptr);
298 assert(m_remote_journaler == nullptr);
299 assert(m_replay_handler == nullptr);
300 assert(m_on_start_finish == nullptr);
301 assert(m_on_stop_finish == nullptr);
302 assert(m_bootstrap_request == nullptr);
303 assert(m_in_flight_status_updates == 0);
304
305 delete m_journal_listener;
7c673cae
FG
306}
307
c07f9fc5
FG
308template <typename I>
309image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
310 Mutex::Locker locker(m_lock);
311
312 if (!m_mirror_image_status_state) {
313 return image_replayer::HEALTH_STATE_OK;
314 } else if (*m_mirror_image_status_state ==
315 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
316 *m_mirror_image_status_state ==
317 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
318 return image_replayer::HEALTH_STATE_WARNING;
319 }
320 return image_replayer::HEALTH_STATE_ERROR;
321}
322
7c673cae 323template <typename I>
d2e6a577
FG
324void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
325 librados::IoCtx &io_ctx) {
7c673cae 326 Mutex::Locker locker(m_lock);
d2e6a577
FG
327 auto it = m_peers.find({peer_uuid});
328 if (it == m_peers.end()) {
329 m_peers.insert({peer_uuid, io_ctx});
7c673cae
FG
330 }
331}
332
7c673cae
FG
333template <typename I>
334void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
335 dout(20) << r << " " << desc << dendl;
336
337 Mutex::Locker l(m_lock);
338 m_last_r = r;
339 m_state_desc = desc;
340}
341
342template <typename I>
343void ImageReplayer<I>::start(Context *on_finish, bool manual)
344{
345 dout(20) << "on_finish=" << on_finish << dendl;
346
347 int r = 0;
348 {
349 Mutex::Locker locker(m_lock);
350 if (!is_stopped_()) {
351 derr << "already running" << dendl;
352 r = -EINVAL;
353 } else if (m_manual_stop && !manual) {
354 dout(5) << "stopped manually, ignoring start without manual flag"
355 << dendl;
356 r = -EPERM;
357 } else {
358 m_state = STATE_STARTING;
359 m_last_r = 0;
360 m_state_desc.clear();
361 m_manual_stop = false;
d2e6a577 362 m_delete_requested = false;
7c673cae
FG
363
364 if (on_finish != nullptr) {
365 assert(m_on_start_finish == nullptr);
366 m_on_start_finish = on_finish;
367 }
368 assert(m_on_stop_finish == nullptr);
369 }
370 }
371
372 if (r < 0) {
373 if (on_finish) {
374 on_finish->complete(r);
375 }
376 return;
377 }
378
379 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
380 if (r < 0) {
381 derr << "error opening ioctx for local pool " << m_local_pool_id
382 << ": " << cpp_strerror(r) << dendl;
383 on_start_fail(r, "error opening local pool");
384 return;
385 }
386
d2e6a577
FG
387 wait_for_deletion();
388}
389
390template <typename I>
391void ImageReplayer<I>::wait_for_deletion() {
392 dout(20) << dendl;
393
394 Context *ctx = create_context_callback<
395 ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
396 m_image_deleter->wait_for_scheduled_deletion(
397 m_local_pool_id, m_global_image_id, ctx, false);
398}
399
400template <typename I>
401void ImageReplayer<I>::handle_wait_for_deletion(int r) {
402 dout(20) << "r=" << r << dendl;
403
404 if (r == -ECANCELED) {
405 on_start_fail(0, "");
406 return;
407 } else if (r < 0) {
408 on_start_fail(r, "error waiting for image deletion");
409 return;
410 }
411
7c673cae
FG
412 prepare_local_image();
413}
414
415template <typename I>
416void ImageReplayer<I>::prepare_local_image() {
417 dout(20) << dendl;
418
d2e6a577 419 m_local_image_id = "";
7c673cae
FG
420 Context *ctx = create_context_callback<
421 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
422 auto req = PrepareLocalImageRequest<I>::create(
28e407b8 423 m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
7c673cae
FG
424 &m_local_image_tag_owner, m_threads->work_queue, ctx);
425 req->send();
426}
427
428template <typename I>
429void ImageReplayer<I>::handle_prepare_local_image(int r) {
430 dout(20) << "r=" << r << dendl;
431
432 if (r == -ENOENT) {
433 dout(20) << "local image does not exist" << dendl;
434 } else if (r < 0) {
435 on_start_fail(r, "error preparing local image for replay");
436 return;
28e407b8
AA
437 } else {
438 reregister_admin_socket_hook();
7c673cae
FG
439 }
440
441 // local image doesn't exist or is non-primary
d2e6a577 442 prepare_remote_image();
7c673cae
FG
443}
444
445template <typename I>
d2e6a577 446void ImageReplayer<I>::prepare_remote_image() {
7c673cae 447 dout(20) << dendl;
b32b8144
FG
448 if (m_peers.empty()) {
449 // technically nothing to bootstrap, but it handles the status update
450 bootstrap();
451 return;
452 }
7c673cae 453
d2e6a577
FG
454 // TODO need to support multiple remote images
455 assert(!m_peers.empty());
456 m_remote_image = {*m_peers.begin()};
457
458 Context *ctx = create_context_callback<
459 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
460 auto req = PrepareRemoteImageRequest<I>::create(
b32b8144
FG
461 m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
462 m_local_image_id, &m_remote_image.mirror_uuid, &m_remote_image.image_id,
463 &m_remote_journaler, &m_client_state, &m_client_meta, ctx);
d2e6a577
FG
464 req->send();
465}
466
467template <typename I>
468void ImageReplayer<I>::handle_prepare_remote_image(int r) {
469 dout(20) << "r=" << r << dendl;
470
b32b8144
FG
471 assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
472 if (r < 0 && !m_local_image_id.empty() &&
473 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
474 // local image is primary -- fall-through
475 } else if (r == -ENOENT) {
d2e6a577
FG
476 dout(20) << "remote image does not exist" << dendl;
477
478 // TODO need to support multiple remote images
479 if (!m_local_image_id.empty() &&
480 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
481 // local image exists and is non-primary and linked to the missing
482 // remote image
483
484 m_delete_requested = true;
485 on_start_fail(0, "remote image no longer exists");
486 } else {
487 on_start_fail(-ENOENT, "remote image does not exist");
488 }
489 return;
490 } else if (r < 0) {
491 on_start_fail(r, "error retrieving remote image id");
7c673cae
FG
492 return;
493 }
494
d2e6a577
FG
495 bootstrap();
496}
497
498template <typename I>
499void ImageReplayer<I>::bootstrap() {
500 dout(20) << dendl;
7c673cae 501
b32b8144
FG
502 if (!m_local_image_id.empty() &&
503 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
504 dout(5) << "local image is primary" << dendl;
505 on_start_fail(0, "local image is primary");
506 return;
507 } else if (m_peers.empty()) {
508 dout(5) << "no peer clusters" << dendl;
509 on_start_fail(-ENOENT, "no peer clusters");
510 return;
511 }
7c673cae
FG
512
513 Context *ctx = create_context_callback<
514 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
515
516 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
31f18b77 517 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
7c673cae
FG
518 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
519 m_global_image_id, m_threads->work_queue, m_threads->timer,
520 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
b32b8144
FG
521 m_remote_journaler, &m_client_state, &m_client_meta, ctx,
522 &m_resync_requested, &m_progress_cxt);
7c673cae
FG
523
524 {
525 Mutex::Locker locker(m_lock);
526 request->get();
527 m_bootstrap_request = request;
528 }
529
530 update_mirror_image_status(false, boost::none);
531 reschedule_update_status_task(10);
532
533 request->send();
534}
535
536template <typename I>
537void ImageReplayer<I>::handle_bootstrap(int r) {
538 dout(20) << "r=" << r << dendl;
539 {
540 Mutex::Locker locker(m_lock);
541 m_bootstrap_request->put();
542 m_bootstrap_request = nullptr;
543 if (m_local_image_ctx) {
544 m_local_image_id = m_local_image_ctx->id;
545 }
546 }
547
548 if (r == -EREMOTEIO) {
549 m_local_image_tag_owner = "";
c07f9fc5
FG
550 dout(5) << "remote image is non-primary" << dendl;
551 on_start_fail(-EREMOTEIO, "remote image is non-primary");
7c673cae
FG
552 return;
553 } else if (r == -EEXIST) {
554 m_local_image_tag_owner = "";
555 on_start_fail(r, "split-brain detected");
556 return;
557 } else if (r < 0) {
558 on_start_fail(r, "error bootstrapping replay");
559 return;
560 } else if (on_start_interrupted()) {
561 return;
d2e6a577
FG
562 } else if (m_resync_requested) {
563 on_start_fail(0, "resync requested");
564 return;
7c673cae
FG
565 }
566
567 assert(m_local_journal == nullptr);
568 {
569 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
570 if (m_local_image_ctx->journal != nullptr) {
571 m_local_journal = m_local_image_ctx->journal;
572 m_local_journal->add_listener(m_journal_listener);
573 }
574 }
575
576 if (m_local_journal == nullptr) {
577 on_start_fail(-EINVAL, "error accessing local journal");
578 return;
579 }
580
7c673cae
FG
581 update_mirror_image_status(false, boost::none);
582 init_remote_journaler();
583}
584
585template <typename I>
586void ImageReplayer<I>::init_remote_journaler() {
587 dout(20) << dendl;
588
589 Context *ctx = create_context_callback<
590 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
591 m_remote_journaler->init(ctx);
592}
593
594template <typename I>
595void ImageReplayer<I>::handle_init_remote_journaler(int r) {
596 dout(20) << "r=" << r << dendl;
597
598 if (r < 0) {
599 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
600 on_start_fail(r, "error initializing remote journal");
601 return;
602 } else if (on_start_interrupted()) {
603 return;
604 }
605
606 m_remote_journaler->add_listener(&m_remote_listener);
607
608 cls::journal::Client client;
609 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
610 if (r < 0) {
611 derr << "error retrieving remote journal client: " << cpp_strerror(r)
612 << dendl;
613 on_start_fail(r, "error retrieving remote journal client");
614 return;
615 }
616
b32b8144
FG
617 dout(5) << "image_id=" << m_local_image_id << ", "
618 << "client_meta.image_id=" << m_client_meta.image_id << ", "
619 << "client.state=" << client.state << dendl;
d2e6a577
FG
620 if (m_client_meta.image_id == m_local_image_id &&
621 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
7c673cae
FG
622 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
623 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
d2e6a577
FG
624 m_resync_requested = true;
625 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
626 } else {
627 on_start_fail(-ENOTCONN, "disconnected");
7c673cae 628 }
7c673cae
FG
629 return;
630 }
631
632 start_replay();
633}
634
635template <typename I>
636void ImageReplayer<I>::start_replay() {
637 dout(20) << dendl;
638
639 Context *start_ctx = create_context_callback<
640 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
641 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
642}
643
644template <typename I>
645void ImageReplayer<I>::handle_start_replay(int r) {
646 dout(20) << "r=" << r << dendl;
647
648 if (r < 0) {
649 assert(m_local_replay == nullptr);
650 derr << "error starting external replay on local image "
651 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
652 on_start_fail(r, "error starting replay on local image");
653 return;
654 }
655
91327a77
AA
656 m_replay_status_formatter =
657 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
658
7c673cae
FG
659 Context *on_finish(nullptr);
660 {
661 Mutex::Locker locker(m_lock);
662 assert(m_state == STATE_STARTING);
663 m_state = STATE_REPLAYING;
664 std::swap(m_on_start_finish, on_finish);
665 }
666
667 m_event_preprocessor = EventPreprocessor<I>::create(
668 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
669 &m_client_meta, m_threads->work_queue);
7c673cae
FG
670
671 update_mirror_image_status(true, boost::none);
672 reschedule_update_status_task(30);
673
7c673cae
FG
674 if (on_replay_interrupted()) {
675 return;
676 }
677
678 {
679 CephContext *cct = static_cast<CephContext *>(m_local->cct());
181888fb
FG
680 double poll_seconds = cct->_conf->get_val<double>(
681 "rbd_mirror_journal_poll_age");
7c673cae
FG
682
683 Mutex::Locker locker(m_lock);
684 m_replay_handler = new ReplayHandler<I>(this);
685 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
686
687 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
688 }
689
d2e6a577
FG
690 dout(20) << "start succeeded" << dendl;
691 if (on_finish != nullptr) {
692 dout(20) << "on finish complete, r=" << r << dendl;
693 on_finish->complete(r);
694 }
7c673cae
FG
695}
696
697template <typename I>
698void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
699{
700 dout(20) << "r=" << r << dendl;
701 Context *ctx = new FunctionContext([this, r, desc](int _r) {
702 {
703 Mutex::Locker locker(m_lock);
704 assert(m_state == STATE_STARTING);
705 m_state = STATE_STOPPING;
d2e6a577 706 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
7c673cae
FG
707 derr << "start failed: " << cpp_strerror(r) << dendl;
708 } else {
709 dout(20) << "start canceled" << dendl;
710 }
711 }
712
713 set_state_description(r, desc);
714 update_mirror_image_status(false, boost::none);
715 reschedule_update_status_task(-1);
716 shut_down(r);
717 });
718 m_threads->work_queue->queue(ctx, 0);
719}
720
721template <typename I>
722bool ImageReplayer<I>::on_start_interrupted()
723{
724 Mutex::Locker locker(m_lock);
725 assert(m_state == STATE_STARTING);
726 if (m_on_stop_finish == nullptr) {
727 return false;
728 }
729
730 on_start_fail(-ECANCELED);
731 return true;
732}
733
734template <typename I>
735void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
736 const std::string& desc)
737{
738 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
739 << ", desc=" << desc << dendl;
740
d2e6a577
FG
741 m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
742
7c673cae
FG
743 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
744 bool shut_down_replay = false;
745 bool running = true;
7c673cae
FG
746 {
747 Mutex::Locker locker(m_lock);
748
749 if (!is_running_()) {
750 running = false;
751 } else {
752 if (!is_stopped_()) {
753 if (m_state == STATE_STARTING) {
754 dout(20) << "canceling start" << dendl;
755 if (m_bootstrap_request) {
756 bootstrap_request = m_bootstrap_request;
757 bootstrap_request->get();
758 }
759 } else {
760 dout(20) << "interrupting replay" << dendl;
761 shut_down_replay = true;
762 }
763
764 assert(m_on_stop_finish == nullptr);
765 std::swap(m_on_stop_finish, on_finish);
766 m_stop_requested = true;
767 m_manual_stop = manual;
7c673cae
FG
768 }
769 }
770 }
771
772 // avoid holding lock since bootstrap request will update status
773 if (bootstrap_request != nullptr) {
774 bootstrap_request->cancel();
775 bootstrap_request->put();
776 }
777
7c673cae
FG
778 if (!running) {
779 dout(20) << "not running" << dendl;
780 if (on_finish) {
781 on_finish->complete(-EINVAL);
782 }
783 return;
784 }
785
786 if (shut_down_replay) {
787 on_stop_journal_replay(r, desc);
788 } else if (on_finish != nullptr) {
789 on_finish->complete(0);
790 }
791}
792
793template <typename I>
794void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
795{
796 dout(20) << "enter" << dendl;
797
798 {
799 Mutex::Locker locker(m_lock);
800 if (m_state != STATE_REPLAYING) {
801 // might be invoked multiple times while stopping
802 return;
803 }
804 m_stop_requested = true;
805 m_state = STATE_STOPPING;
806 }
807
808 set_state_description(r, desc);
809 update_mirror_image_status(false, boost::none);
810 reschedule_update_status_task(-1);
811 shut_down(0);
812}
813
814template <typename I>
815void ImageReplayer<I>::handle_replay_ready()
816{
817 dout(20) << "enter" << dendl;
818 if (on_replay_interrupted()) {
819 return;
820 }
821
822 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
823 return;
824 }
825
826 m_event_replay_tracker.start_op();
31f18b77
FG
827
828 m_lock.Lock();
829 bool stopping = (m_state == STATE_STOPPING);
830 m_lock.Unlock();
831
832 if (stopping) {
833 dout(10) << "stopping event replay" << dendl;
834 m_event_replay_tracker.finish_op();
835 return;
836 }
837
7c673cae
FG
838 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
839 preprocess_entry();
840 return;
841 }
842
843 replay_flush();
844}
845
846template <typename I>
847void ImageReplayer<I>::restart(Context *on_finish)
848{
849 FunctionContext *ctx = new FunctionContext(
850 [this, on_finish](int r) {
851 if (r < 0) {
852 // Try start anyway.
853 }
854 start(on_finish, true);
855 });
856 stop(ctx);
857}
858
859template <typename I>
860void ImageReplayer<I>::flush(Context *on_finish)
861{
862 dout(20) << "enter" << dendl;
863
864 {
865 Mutex::Locker locker(m_lock);
866 if (m_state == STATE_REPLAYING) {
867 Context *ctx = new FunctionContext(
868 [on_finish](int r) {
869 if (on_finish != nullptr) {
870 on_finish->complete(r);
871 }
872 });
873 on_flush_local_replay_flush_start(ctx);
874 return;
875 }
876 }
877
878 if (on_finish) {
879 on_finish->complete(0);
880 }
881}
882
883template <typename I>
884void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
885{
886 dout(20) << "enter" << dendl;
887 FunctionContext *ctx = new FunctionContext(
888 [this, on_flush](int r) {
889 on_flush_local_replay_flush_finish(on_flush, r);
890 });
891
892 assert(m_lock.is_locked());
893 assert(m_state == STATE_REPLAYING);
894 m_local_replay->flush(ctx);
895}
896
897template <typename I>
898void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
899 int r)
900{
901 dout(20) << "r=" << r << dendl;
902 if (r < 0) {
903 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
904 on_flush->complete(r);
905 return;
906 }
907
908 on_flush_flush_commit_position_start(on_flush);
909}
910
911template <typename I>
912void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
913{
914 FunctionContext *ctx = new FunctionContext(
915 [this, on_flush](int r) {
916 on_flush_flush_commit_position_finish(on_flush, r);
917 });
918
919 m_remote_journaler->flush_commit_position(ctx);
920}
921
922template <typename I>
923void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
924 int r)
925{
926 if (r < 0) {
927 derr << "error flushing remote journal commit position: "
928 << cpp_strerror(r) << dendl;
929 }
930
931 update_mirror_image_status(false, boost::none);
932
933 dout(20) << "flush complete, r=" << r << dendl;
934 on_flush->complete(r);
935}
936
937template <typename I>
938bool ImageReplayer<I>::on_replay_interrupted()
939{
940 bool shut_down;
941 {
942 Mutex::Locker locker(m_lock);
943 shut_down = m_stop_requested;
944 }
945
946 if (shut_down) {
947 on_stop_journal_replay();
948 }
949 return shut_down;
950}
951
952template <typename I>
953void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
954{
955 dout(20) << "enter" << dendl;
956
957 Mutex::Locker l(m_lock);
958
959 if (f) {
960 f->open_object_section("image_replayer");
961 f->dump_string("name", m_name);
962 f->dump_string("state", to_string(m_state));
963 f->close_section();
964 f->flush(*ss);
965 } else {
966 *ss << m_name << ": state: " << to_string(m_state);
967 }
968}
969
970template <typename I>
971void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
972{
973 dout(20) << "r=" << r << dendl;
974 if (r < 0) {
975 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
976 set_state_description(r, error_desc);
977 }
978
979 {
980 Mutex::Locker locker(m_lock);
981 m_stop_requested = true;
982 }
983 on_replay_interrupted();
984}
985
986template <typename I>
987void ImageReplayer<I>::replay_flush() {
988 dout(20) << dendl;
989
990 bool interrupted = false;
991 {
992 Mutex::Locker locker(m_lock);
993 if (m_state != STATE_REPLAYING) {
994 dout(20) << "replay interrupted" << dendl;
995 interrupted = true;
996 } else {
997 m_state = STATE_REPLAY_FLUSHING;
998 }
999 }
1000
1001 if (interrupted) {
1002 m_event_replay_tracker.finish_op();
1003 return;
1004 }
1005
1006 // shut down the replay to flush all IO and ops and create a new
1007 // replayer to handle the new tag epoch
1008 Context *ctx = create_context_callback<
1009 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1010 ctx = new FunctionContext([this, ctx](int r) {
1011 m_local_image_ctx->journal->stop_external_replay();
1012 m_local_replay = nullptr;
1013
1014 if (r < 0) {
1015 ctx->complete(r);
1016 return;
1017 }
1018
1019 m_local_journal->start_external_replay(&m_local_replay, ctx);
1020 });
1021 m_local_replay->shut_down(false, ctx);
1022}
1023
1024template <typename I>
1025void ImageReplayer<I>::handle_replay_flush(int r) {
1026 dout(20) << "r=" << r << dendl;
1027
1028 {
1029 Mutex::Locker locker(m_lock);
1030 assert(m_state == STATE_REPLAY_FLUSHING);
1031 m_state = STATE_REPLAYING;
1032 }
1033
1034 if (r < 0) {
1035 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1036 m_event_replay_tracker.finish_op();
1037 handle_replay_complete(r, "replay flush encountered an error");
1038 return;
1039 } else if (on_replay_interrupted()) {
1040 m_event_replay_tracker.finish_op();
1041 return;
1042 }
1043
1044 get_remote_tag();
1045}
1046
1047template <typename I>
1048void ImageReplayer<I>::get_remote_tag() {
1049 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1050
1051 Context *ctx = create_context_callback<
1052 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1053 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1054}
1055
1056template <typename I>
1057void ImageReplayer<I>::handle_get_remote_tag(int r) {
1058 dout(20) << "r=" << r << dendl;
1059
1060 if (r == 0) {
1061 try {
1062 bufferlist::iterator it = m_replay_tag.data.begin();
1063 ::decode(m_replay_tag_data, it);
1064 } catch (const buffer::error &err) {
1065 r = -EBADMSG;
1066 }
1067 }
1068
1069 if (r < 0) {
1070 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1071 << cpp_strerror(r) << dendl;
1072 m_event_replay_tracker.finish_op();
1073 handle_replay_complete(r, "failed to retrieve remote tag");
1074 return;
1075 }
1076
1077 m_replay_tag_valid = true;
1078 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1079 << m_replay_tag_data << dendl;
1080
1081 allocate_local_tag();
1082}
1083
1084template <typename I>
1085void ImageReplayer<I>::allocate_local_tag() {
28e407b8 1086 dout(15) << dendl;
7c673cae
FG
1087
1088 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
28e407b8 1089 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
7c673cae 1090 mirror_uuid = m_remote_image.mirror_uuid;
28e407b8
AA
1091 } else if (mirror_uuid == m_local_mirror_uuid) {
1092 mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
7c673cae 1093 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
28e407b8
AA
1094 // handle possible edge condition where daemon can failover and
1095 // the local image has already been promoted/demoted
1096 auto local_tag_data = m_local_journal->get_tag_data();
1097 if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
1098 (local_tag_data.predecessor.commit_valid &&
1099 local_tag_data.predecessor.mirror_uuid ==
1100 librbd::Journal<>::LOCAL_MIRROR_UUID)) {
1101 dout(15) << "skipping stale demotion event" << dendl;
1102 handle_process_entry_safe(m_replay_entry, 0);
1103 handle_replay_ready();
1104 return;
1105 } else {
1106 dout(5) << "encountered image demotion: stopping" << dendl;
1107 Mutex::Locker locker(m_lock);
1108 m_stop_requested = true;
1109 }
7c673cae
FG
1110 }
1111
1112 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1113 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1114 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1115 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1116 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1117 }
1118
28e407b8
AA
1119 dout(15) << "mirror_uuid=" << mirror_uuid << ", "
1120 << "predecessor=" << predecessor << ", "
1121 << "replay_tag_tid=" << m_replay_tag_tid << dendl;
7c673cae
FG
1122 Context *ctx = create_context_callback<
1123 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1124 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1125}
1126
1127template <typename I>
1128void ImageReplayer<I>::handle_allocate_local_tag(int r) {
28e407b8
AA
1129 dout(15) << "r=" << r << ", "
1130 << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
7c673cae
FG
1131
1132 if (r < 0) {
1133 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1134 m_event_replay_tracker.finish_op();
1135 handle_replay_complete(r, "failed to allocate journal tag");
1136 return;
1137 }
1138
1139 preprocess_entry();
1140}
1141
1142template <typename I>
1143void ImageReplayer<I>::preprocess_entry() {
1144 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1145 << dendl;
1146
1147 bufferlist data = m_replay_entry.get_data();
1148 bufferlist::iterator it = data.begin();
1149 int r = m_local_replay->decode(&it, &m_event_entry);
1150 if (r < 0) {
1151 derr << "failed to decode journal event" << dendl;
1152 m_event_replay_tracker.finish_op();
1153 handle_replay_complete(r, "failed to decode journal event");
1154 return;
1155 }
1156
1157 uint32_t delay = calculate_replay_delay(
1158 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1159 if (delay == 0) {
1160 handle_preprocess_entry_ready(0);
1161 return;
1162 }
1163
1164 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1165
1166 Mutex::Locker timer_locker(m_threads->timer_lock);
1167 assert(m_delayed_preprocess_task == nullptr);
1168 m_delayed_preprocess_task = new FunctionContext(
1169 [this](int r) {
1170 assert(m_threads->timer_lock.is_locked());
1171 m_delayed_preprocess_task = nullptr;
1172 m_threads->work_queue->queue(
1173 create_context_callback<ImageReplayer,
1174 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1175 });
1176 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1177}
1178
1179template <typename I>
1180void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1181 dout(20) << "r=" << r << dendl;
1182 assert(r == 0);
1183
1184 if (!m_event_preprocessor->is_required(m_event_entry)) {
1185 process_entry();
1186 return;
1187 }
1188
1189 Context *ctx = create_context_callback<
1190 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1191 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1192}
1193
1194template <typename I>
1195void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1196 dout(20) << "r=" << r << dendl;
1197
1198 if (r < 0) {
7c673cae 1199 m_event_replay_tracker.finish_op();
31f18b77
FG
1200
1201 if (r == -ECANCELED) {
1202 handle_replay_complete(0, "lost exclusive lock");
1203 } else {
1204 derr << "failed to preprocess journal event" << dendl;
1205 handle_replay_complete(r, "failed to preprocess journal event");
1206 }
7c673cae
FG
1207 return;
1208 }
1209
1210 process_entry();
1211}
1212
1213template <typename I>
1214void ImageReplayer<I>::process_entry() {
1215 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1216 << dendl;
1217
1218 // stop replaying events if stop has been requested
1219 if (on_replay_interrupted()) {
1220 m_event_replay_tracker.finish_op();
1221 return;
1222 }
1223
1224 Context *on_ready = create_context_callback<
1225 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1226 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1227
1228 m_local_replay->process(m_event_entry, on_ready, on_commit);
1229}
1230
1231template <typename I>
1232void ImageReplayer<I>::handle_process_entry_ready(int r) {
1233 dout(20) << dendl;
1234 assert(r == 0);
1235
28e407b8
AA
1236 bool update_status = false;
1237 {
1238 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
1239 if (m_local_image_name != m_local_image_ctx->name) {
1240 m_local_image_name = m_local_image_ctx->name;
1241 update_status = true;
1242 }
1243 }
1244
1245 if (update_status) {
1246 reschedule_update_status_task(0);
1247 }
3efd9988 1248
7c673cae
FG
1249 // attempt to process the next event
1250 handle_replay_ready();
1251}
1252
1253template <typename I>
1254void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1255 int r) {
1256 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1257 << dendl;
1258
1259 if (r < 0) {
1260 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1261 handle_replay_complete(r, "failed to commit journal event");
1262 } else {
1263 assert(m_remote_journaler != nullptr);
1264 m_remote_journaler->committed(replay_entry);
1265 }
1266 m_event_replay_tracker.finish_op();
1267}
1268
1269template <typename I>
1270bool ImageReplayer<I>::update_mirror_image_status(bool force,
1271 const OptionalState &state) {
1272 dout(20) << dendl;
1273 {
1274 Mutex::Locker locker(m_lock);
1275 if (!start_mirror_image_status_update(force, false)) {
1276 return false;
1277 }
1278 }
1279
1280 queue_mirror_image_status_update(state);
1281 return true;
1282}
1283
1284template <typename I>
1285bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1286 bool restarting) {
1287 assert(m_lock.is_locked());
1288
1289 if (!force && !is_stopped_()) {
1290 if (!is_running_()) {
1291 dout(20) << "shut down in-progress: ignoring update" << dendl;
1292 return false;
1293 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1294 dout(20) << "already sending update" << dendl;
1295 m_update_status_requested = true;
1296 return false;
1297 }
1298 }
1299
1300 dout(20) << dendl;
1301 ++m_in_flight_status_updates;
1302 return true;
1303}
1304
1305template <typename I>
1306void ImageReplayer<I>::finish_mirror_image_status_update() {
28e407b8
AA
1307 reregister_admin_socket_hook();
1308
7c673cae
FG
1309 Context *on_finish = nullptr;
1310 {
1311 Mutex::Locker locker(m_lock);
1312 assert(m_in_flight_status_updates > 0);
1313 if (--m_in_flight_status_updates > 0) {
1314 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1315 << "updates" << dendl;
1316 return;
1317 }
1318
1319 std::swap(on_finish, m_on_update_status_finish);
1320 }
1321
1322 dout(20) << dendl;
1323 if (on_finish != nullptr) {
1324 on_finish->complete(0);
1325 }
1326}
1327
1328template <typename I>
1329void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1330 dout(20) << dendl;
1331 FunctionContext *ctx = new FunctionContext(
1332 [this, state](int r) {
1333 send_mirror_status_update(state);
1334 });
1335 m_threads->work_queue->queue(ctx, 0);
1336}
1337
1338template <typename I>
1339void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1340 State state;
1341 std::string state_desc;
1342 int last_r;
7c673cae 1343 bool stopping_replay;
c07f9fc5
FG
1344
1345 OptionalMirrorImageStatusState mirror_image_status_state{
1346 boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1347 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
7c673cae
FG
1348 {
1349 Mutex::Locker locker(m_lock);
1350 state = m_state;
1351 state_desc = m_state_desc;
c07f9fc5 1352 mirror_image_status_state = m_mirror_image_status_state;
7c673cae 1353 last_r = m_last_r;
7c673cae 1354 stopping_replay = (m_local_image_ctx != nullptr);
c07f9fc5
FG
1355
1356 if (m_bootstrap_request != nullptr) {
1357 bootstrap_request = m_bootstrap_request;
1358 bootstrap_request->get();
1359 }
1360 }
1361
1362 bool syncing = false;
1363 if (bootstrap_request != nullptr) {
1364 syncing = bootstrap_request->is_syncing();
1365 bootstrap_request->put();
1366 bootstrap_request = nullptr;
7c673cae
FG
1367 }
1368
1369 if (opt_state) {
1370 state = *opt_state;
1371 }
1372
1373 cls::rbd::MirrorImageStatus status;
1374 status.up = true;
1375 switch (state) {
1376 case STATE_STARTING:
c07f9fc5 1377 if (syncing) {
7c673cae
FG
1378 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1379 status.description = state_desc.empty() ? "syncing" : state_desc;
c07f9fc5 1380 mirror_image_status_state = status.state;
7c673cae
FG
1381 } else {
1382 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1383 status.description = "starting replay";
1384 }
1385 break;
1386 case STATE_REPLAYING:
1387 case STATE_REPLAY_FLUSHING:
1388 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1389 {
1390 Context *on_req_finish = new FunctionContext(
1391 [this](int r) {
1392 dout(20) << "replay status ready: r=" << r << dendl;
1393 if (r >= 0) {
1394 send_mirror_status_update(boost::none);
1395 } else if (r == -EAGAIN) {
1396 // decrement in-flight status update counter
1397 handle_mirror_status_update(r);
1398 }
1399 });
1400
1401 std::string desc;
1402 if (!m_replay_status_formatter->get_or_send_update(&desc,
1403 on_req_finish)) {
1404 dout(20) << "waiting for replay status" << dendl;
1405 return;
1406 }
1407 status.description = "replaying, " + desc;
c07f9fc5 1408 mirror_image_status_state = boost::none;
7c673cae
FG
1409 }
1410 break;
1411 case STATE_STOPPING:
1412 if (stopping_replay) {
1413 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1414 status.description = "stopping replay";
1415 break;
1416 }
1417 // FALLTHROUGH
1418 case STATE_STOPPED:
c07f9fc5
FG
1419 if (last_r == -EREMOTEIO) {
1420 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1421 status.description = state_desc;
1422 mirror_image_status_state = status.state;
1423 } else if (last_r < 0) {
7c673cae
FG
1424 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1425 status.description = state_desc;
c07f9fc5 1426 mirror_image_status_state = status.state;
7c673cae
FG
1427 } else {
1428 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1429 status.description = state_desc.empty() ? "stopped" : state_desc;
c07f9fc5 1430 mirror_image_status_state = boost::none;
7c673cae
FG
1431 }
1432 break;
1433 default:
1434 assert(!"invalid state");
1435 }
1436
c07f9fc5
FG
1437 {
1438 Mutex::Locker locker(m_lock);
1439 m_mirror_image_status_state = mirror_image_status_state;
1440 }
1441
1442 // prevent the status from ping-ponging when failed replays are restarted
1443 if (mirror_image_status_state &&
1444 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1445 status.state = *mirror_image_status_state;
1446 }
1447
7c673cae
FG
1448 dout(20) << "status=" << status << dendl;
1449 librados::ObjectWriteOperation op;
1450 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1451
1452 librados::AioCompletion *aio_comp = create_rados_callback<
1453 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1454 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1455 assert(r == 0);
1456 aio_comp->release();
1457}
1458
1459template <typename I>
1460void ImageReplayer<I>::handle_mirror_status_update(int r) {
1461 dout(20) << "r=" << r << dendl;
1462
1463 bool running = false;
1464 bool started = false;
1465 {
1466 Mutex::Locker locker(m_lock);
1467 bool update_status_requested = false;
1468 std::swap(update_status_requested, m_update_status_requested);
1469
1470 running = is_running_();
1471 if (running && update_status_requested) {
1472 started = start_mirror_image_status_update(false, true);
1473 }
1474 }
1475
1476 // if a deferred update is available, send it -- otherwise reschedule
1477 // the timer task
1478 if (started) {
1479 queue_mirror_image_status_update(boost::none);
1480 } else if (running) {
1481 reschedule_update_status_task();
1482 }
1483
1484 // mark committed status update as no longer in-flight
1485 finish_mirror_image_status_update();
1486}
1487
1488template <typename I>
1489void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1490 dout(20) << dendl;
1491
1492 bool canceled_task = false;
1493 {
1494 Mutex::Locker locker(m_lock);
1495 Mutex::Locker timer_locker(m_threads->timer_lock);
1496
1497 if (m_update_status_task) {
1498 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1499 m_update_status_task = nullptr;
1500 }
1501
1502 if (new_interval > 0) {
1503 m_update_status_interval = new_interval;
1504 }
1505
1506 bool restarting = (new_interval == 0 || canceled_task);
1507 if (new_interval >= 0 && is_running_() &&
1508 start_mirror_image_status_update(false, restarting)) {
1509 m_update_status_task = new FunctionContext(
1510 [this](int r) {
1511 assert(m_threads->timer_lock.is_locked());
1512 m_update_status_task = nullptr;
1513
1514 queue_mirror_image_status_update(boost::none);
1515 });
1516 m_threads->timer->add_event_after(m_update_status_interval,
1517 m_update_status_task);
1518 }
1519 }
1520
1521 if (canceled_task) {
1522 dout(20) << "canceled task" << dendl;
1523 finish_mirror_image_status_update();
1524 }
1525}
1526
// Tear down the replayer, then finish via handle_shut_down(r).
//
// 1. Cancels a pending delayed-preprocess timer task (armed by
//    preprocess_entry()) and releases its in-flight replay op.
// 2. If status updates are in flight, defers itself: it re-enters with the
//    same r once finish_mirror_image_status_update() completes
//    m_on_update_status_finish.
// 3. Builds a chain of FunctionContexts in reverse order and queues its head
//    on the work queue. Each step below runs only if the member it touches
//    is set. In execution order:
//      - shut down local replay (flush);
//      - wait for local in-flight replay ops;
//      - remove the local journal listener;
//      - stop external replay and destroy the event preprocessor;
//      - drop m_local_journal;
//      - close the local image;
//      - stop remote replay, delete the replay handler and wait for ops;
//      - remove the remote listener and shut down the remote journaler;
//      - delete the remote journaler;
//      - publish STATE_STOPPED and call handle_shut_down(r).
//
// Note: the chained lambdas take their own `int r` parameter, which shadows
// this function's r. Only the final lambda captures and uses the outer r.
//
// @param r result passed through to handle_shut_down()
1527template <typename I>
1528void ImageReplayer<I>::shut_down(int r) {
1529 dout(20) << "r=" << r << dendl;
3efd9988
FG
1530
// cancel a replay that is sleeping on the replay-delay timer task
1531 bool canceled_delayed_preprocess_task = false;
1532 {
1533 Mutex::Locker timer_locker(m_threads->timer_lock);
1534 if (m_delayed_preprocess_task != nullptr) {
1535 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1536 m_delayed_preprocess_task);
1537 assert(canceled_delayed_preprocess_task);
1538 m_delayed_preprocess_task = nullptr;
1539 }
1540 }
1541 if (canceled_delayed_preprocess_task) {
1542 // wake up sleeping replay
1543 m_event_replay_tracker.finish_op();
1544 }
1545
7c673cae
FG
1546 {
1547 Mutex::Locker locker(m_lock);
1548 assert(m_state == STATE_STOPPING);
1549
1550 // if status updates are in-flight, wait for them to complete
1551 // before proceeding
// (only one deferred re-entry is registered; later calls simply return)
1552 if (m_in_flight_status_updates > 0) {
1553 if (m_on_update_status_finish == nullptr) {
1554 dout(20) << "waiting for in-flight status update" << dendl;
1555 m_on_update_status_finish = new FunctionContext(
1556 [this, r](int _r) {
1557 shut_down(r);
1558 });
1559 }
1560 return;
1561 }
1562 }
1563
31f18b77
FG
1564 // NOTE: it's important to ensure that the local image is fully
1565 // closed before attempting to close the remote journal in
1566 // case the remote cluster is unreachable
1567
7c673cae
FG
1568 // chain the shut down sequence (reverse order)
// last step executed: report STOPPED and finish with the original r
1569 Context *ctx = new FunctionContext(
1570 [this, r](int _r) {
1571 update_mirror_image_status(true, STATE_STOPPED);
1572 handle_shut_down(r);
1573 });
31f18b77
FG
1574
1575 // close the remote journal
7c673cae
FG
1576 if (m_remote_journaler != nullptr) {
1577 ctx = new FunctionContext([this, ctx](int r) {
1578 delete m_remote_journaler;
1579 m_remote_journaler = nullptr;
1580 ctx->complete(0);
1581 });
1582 ctx = new FunctionContext([this, ctx](int r) {
1583 m_remote_journaler->remove_listener(&m_remote_listener);
1584 m_remote_journaler->shut_down(ctx);
1585 });
7c673cae 1586 }
31f18b77
FG
1587
1588 // stop the replay of remote journal events
1589 if (m_replay_handler != nullptr) {
1590 ctx = new FunctionContext([this, ctx](int r) {
1591 delete m_replay_handler;
1592 m_replay_handler = nullptr;
1593
1594 m_event_replay_tracker.wait_for_ops(ctx);
1595 });
1596 ctx = new FunctionContext([this, ctx](int r) {
1597 m_remote_journaler->stop_replay(ctx);
1598 });
1599 }
1600
1601 // close the local image (release exclusive lock)
1602 if (m_local_image_ctx) {
1603 ctx = new FunctionContext([this, ctx](int r) {
1604 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1605 &m_local_image_ctx, ctx);
1606 request->send();
1607 });
1608 }
1609
1610 // shut down event replay into the local image
7c673cae
FG
1611 if (m_local_journal != nullptr) {
1612 ctx = new FunctionContext([this, ctx](int r) {
1613 m_local_journal = nullptr;
1614 ctx->complete(0);
1615 });
1616 if (m_local_replay != nullptr) {
1617 ctx = new FunctionContext([this, ctx](int r) {
1618 m_local_journal->stop_external_replay();
1619 m_local_replay = nullptr;
1620
1621 EventPreprocessor<I>::destroy(m_event_preprocessor);
1622 m_event_preprocessor = nullptr;
1623 ctx->complete(0);
1624 });
1625 }
1626 ctx = new FunctionContext([this, ctx](int r) {
7c673cae
FG
1627 // blocks if listener notification is in-progress
1628 m_local_journal->remove_listener(m_journal_listener);
7c673cae
FG
1629 ctx->complete(0);
1630 });
31f18b77
FG
1631 }
1632
1633 // wait for all local in-flight replay events to complete
// r here is the result of m_local_replay->shut_down(true, ...) when that
// step is chained below; otherwise it is the 0 passed by the work queue
1634 ctx = new FunctionContext([this, ctx](int r) {
1635 if (r < 0) {
1636 derr << "error shutting down journal replay: " << cpp_strerror(r)
1637 << dendl;
1638 }
1639
1640 m_event_replay_tracker.wait_for_ops(ctx);
1641 });
1642
1643 // flush any local in-flight replay events
1644 if (m_local_replay != nullptr) {
7c673cae 1645 ctx = new FunctionContext([this, ctx](int r) {
31f18b77 1646 m_local_replay->shut_down(true, ctx);
7c673cae
FG
1647 });
1648 }
31f18b77 1649
7c673cae
FG
// start executing the chain (its first step) asynchronously with r=0
1650 m_threads->work_queue->queue(ctx, 0);
1651}
1652
1653template <typename I>
1654void ImageReplayer<I>::handle_shut_down(int r) {
1655 reschedule_update_status_task(-1);
1656
3efd9988 1657 bool unregister_asok_hook = false;
7c673cae
FG
1658 {
1659 Mutex::Locker locker(m_lock);
1660
1661 // if status updates are in-flight, wait for them to complete
1662 // before proceeding
1663 if (m_in_flight_status_updates > 0) {
1664 if (m_on_update_status_finish == nullptr) {
1665 dout(20) << "waiting for in-flight status update" << dendl;
1666 m_on_update_status_finish = new FunctionContext(
1667 [this, r](int _r) {
1668 handle_shut_down(r);
1669 });
1670 }
1671 return;
1672 }
1673
d2e6a577
FG
1674 bool delete_requested = false;
1675 if (m_delete_requested && !m_local_image_id.empty()) {
1676 assert(m_remote_image.image_id.empty());
1677 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1678 delete_requested = true;
1679 }
1680 if (delete_requested || m_resync_requested) {
7c673cae
FG
1681 m_image_deleter->schedule_image_delete(m_local,
1682 m_local_pool_id,
c07f9fc5 1683 m_global_image_id,
d2e6a577
FG
1684 m_resync_requested);
1685
1686 m_local_image_id = "";
1687 m_resync_requested = false;
1688 if (m_delete_requested) {
3efd9988 1689 unregister_asok_hook = true;
d2e6a577
FG
1690 m_delete_requested = false;
1691 }
1692 } else if (m_last_r == -ENOENT &&
1693 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1694 dout(0) << "mirror image no longer exists" << dendl;
3efd9988 1695 unregister_asok_hook = true;
d2e6a577 1696 m_finished = true;
7c673cae
FG
1697 }
1698 }
1699
3efd9988
FG
1700 if (unregister_asok_hook) {
1701 unregister_admin_socket_hook();
1702 }
1703
7c673cae
FG
1704 dout(20) << "stop complete" << dendl;
1705 m_local_ioctx.close();
1706
1707 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1708 m_replay_status_formatter = nullptr;
1709
1710 Context *on_start = nullptr;
1711 Context *on_stop = nullptr;
1712 {
1713 Mutex::Locker locker(m_lock);
1714 std::swap(on_start, m_on_start_finish);
1715 std::swap(on_stop, m_on_stop_finish);
1716 m_stop_requested = false;
1717 assert(m_delayed_preprocess_task == nullptr);
1718 assert(m_state == STATE_STOPPING);
1719 m_state = STATE_STOPPED;
1720 }
1721
1722 if (on_start != nullptr) {
1723 dout(20) << "on start finish complete, r=" << r << dendl;
1724 on_start->complete(r);
1725 r = 0;
1726 }
1727 if (on_stop != nullptr) {
1728 dout(20) << "on stop finish complete, r=" << r << dendl;
1729 on_stop->complete(r);
1730 }
1731}
1732
1733template <typename I>
1734void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1735 dout(20) << dendl;
1736
1737 cls::journal::Client client;
1738 {
1739 Mutex::Locker locker(m_lock);
1740 if (!is_running_()) {
1741 return;
1742 }
1743
1744 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1745 if (r < 0) {
1746 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1747 return;
1748 }
1749 }
1750
1751 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1752 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1753 stop(nullptr, false, -ENOTCONN, "disconnected");
1754 }
1755}
1756
1757template <typename I>
1758std::string ImageReplayer<I>::to_string(const State state) {
1759 switch (state) {
1760 case ImageReplayer<I>::STATE_STARTING:
1761 return "Starting";
1762 case ImageReplayer<I>::STATE_REPLAYING:
1763 return "Replaying";
1764 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1765 return "ReplayFlushing";
1766 case ImageReplayer<I>::STATE_STOPPING:
1767 return "Stopping";
1768 case ImageReplayer<I>::STATE_STOPPED:
1769 return "Stopped";
1770 default:
1771 break;
1772 }
1773 return "Unknown(" + stringify(state) + ")";
1774}
1775
1776template <typename I>
1777void ImageReplayer<I>::resync_image(Context *on_finish) {
1778 dout(20) << dendl;
1779
d2e6a577
FG
1780 m_resync_requested = true;
1781 stop(on_finish);
1782}
1783
1784template <typename I>
1785void ImageReplayer<I>::register_admin_socket_hook() {
3efd9988
FG
1786 ImageReplayerAdminSocketHook<I> *asok_hook;
1787 {
1788 Mutex::Locker locker(m_lock);
1789 if (m_asok_hook != nullptr) {
1790 return;
1791 }
7c673cae 1792
28e407b8 1793 dout(15) << "registered asok hook: " << m_name << dendl;
3efd9988
FG
1794 asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1795 this);
1796 int r = asok_hook->register_commands();
1797 if (r == 0) {
1798 m_asok_hook = asok_hook;
1799 return;
1800 }
d2e6a577 1801 derr << "error registering admin socket commands" << dendl;
d2e6a577 1802 }
3efd9988 1803 delete asok_hook;
d2e6a577
FG
1804}
1805
1806template <typename I>
1807void ImageReplayer<I>::unregister_admin_socket_hook() {
28e407b8 1808 dout(15) << dendl;
d2e6a577 1809
3efd9988
FG
1810 AdminSocketHook *asok_hook = nullptr;
1811 {
1812 Mutex::Locker locker(m_lock);
1813 std::swap(asok_hook, m_asok_hook);
1814 }
1815 delete asok_hook;
1816}
1817
1818template <typename I>
28e407b8 1819void ImageReplayer<I>::reregister_admin_socket_hook() {
3efd9988
FG
1820 {
1821 Mutex::Locker locker(m_lock);
28e407b8 1822 auto name = m_local_ioctx.get_pool_name() + "/" + m_local_image_name;
3efd9988
FG
1823 if (m_name == name) {
1824 return;
1825 }
1826 m_name = name;
1827 }
1828 unregister_admin_socket_hook();
1829 register_admin_socket_hook();
7c673cae
FG
1830}
1831
1832template <typename I>
1833std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1834{
1835 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1836 << "/" << replayer.get_global_image_id() << "]";
1837 return os;
1838}
1839
1840} // namespace mirror
1841} // namespace rbd
1842
1843template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;