]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to 12.2.7
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "include/stringify.h"
9#include "cls/rbd/cls_rbd_client.h"
10#include "common/Timer.h"
11#include "common/WorkQueue.h"
12#include "global/global_context.h"
13#include "journal/Journaler.h"
14#include "journal/ReplayHandler.h"
15#include "journal/Settings.h"
16#include "librbd/ExclusiveLock.h"
17#include "librbd/ImageCtx.h"
18#include "librbd/ImageState.h"
19#include "librbd/Journal.h"
20#include "librbd/Operations.h"
21#include "librbd/Utils.h"
22#include "librbd/journal/Replay.h"
d2e6a577 23#include "ImageDeleter.h"
7c673cae 24#include "ImageReplayer.h"
7c673cae
FG
25#include "Threads.h"
26#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
d2e6a577 30#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
7c673cae
FG
31#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_rbd_mirror
35#undef dout_prefix
36#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39using std::map;
40using std::string;
41using std::unique_ptr;
42using std::shared_ptr;
43using std::vector;
44
45namespace rbd {
46namespace mirror {
47
48using librbd::util::create_context_callback;
49using librbd::util::create_rados_callback;
50using namespace rbd::mirror::image_replayer;
51
52template <typename I>
53std::ostream &operator<<(std::ostream &os,
54 const typename ImageReplayer<I>::State &state);
55
56namespace {
57
58template <typename I>
59struct ReplayHandler : public ::journal::ReplayHandler {
60 ImageReplayer<I> *replayer;
61 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
62 void get() override {}
63 void put() override {}
64
65 void handle_entries_available() override {
66 replayer->handle_replay_ready();
67 }
68 void handle_complete(int r) override {
69 std::stringstream ss;
70 if (r < 0) {
71 ss << "replay completed with error: " << cpp_strerror(r);
72 }
73 replayer->handle_replay_complete(r, ss.str());
74 }
75};
76
3efd9988 77template <typename I>
7c673cae
FG
78class ImageReplayerAdminSocketCommand {
79public:
3efd9988
FG
80 ImageReplayerAdminSocketCommand(const std::string &desc,
81 ImageReplayer<I> *replayer)
82 : desc(desc), replayer(replayer) {
83 }
7c673cae
FG
84 virtual ~ImageReplayerAdminSocketCommand() {}
85 virtual bool call(Formatter *f, stringstream *ss) = 0;
3efd9988
FG
86
87 std::string desc;
88 ImageReplayer<I> *replayer;
89 bool registered = false;
7c673cae
FG
90};
91
92template <typename I>
3efd9988 93class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 94public:
3efd9988
FG
95 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
96 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
97 }
7c673cae
FG
98
99 bool call(Formatter *f, stringstream *ss) override {
3efd9988 100 this->replayer->print_status(f, ss);
7c673cae
FG
101 return true;
102 }
7c673cae
FG
103};
104
105template <typename I>
3efd9988 106class StartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 107public:
3efd9988
FG
108 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
109 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
110 }
7c673cae
FG
111
112 bool call(Formatter *f, stringstream *ss) override {
3efd9988 113 this->replayer->start(nullptr, true);
7c673cae
FG
114 return true;
115 }
7c673cae
FG
116};
117
118template <typename I>
3efd9988 119class StopCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 120public:
3efd9988
FG
121 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
122 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
123 }
7c673cae
FG
124
125 bool call(Formatter *f, stringstream *ss) override {
3efd9988 126 this->replayer->stop(nullptr, true);
7c673cae
FG
127 return true;
128 }
7c673cae
FG
129};
130
131template <typename I>
3efd9988 132class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 133public:
3efd9988
FG
134 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
135 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
136 }
7c673cae
FG
137
138 bool call(Formatter *f, stringstream *ss) override {
3efd9988 139 this->replayer->restart();
7c673cae
FG
140 return true;
141 }
7c673cae
FG
142};
143
144template <typename I>
3efd9988 145class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 146public:
3efd9988
FG
147 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
148 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
149 }
7c673cae
FG
150
151 bool call(Formatter *f, stringstream *ss) override {
28e407b8 152 this->replayer->flush();
7c673cae
FG
153 return true;
154 }
7c673cae
FG
155};
156
157template <typename I>
158class ImageReplayerAdminSocketHook : public AdminSocketHook {
159public:
160 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77 161 ImageReplayer<I> *replayer)
3efd9988
FG
162 : admin_socket(cct->get_admin_socket()),
163 commands{{"rbd mirror flush " + name,
164 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
165 {"rbd mirror restart " + name,
166 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
167 {"rbd mirror start " + name,
168 new StartCommand<I>("start rbd mirror " + name, replayer)},
169 {"rbd mirror status " + name,
170 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
171 {"rbd mirror stop " + name,
172 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
d2e6a577
FG
173 }
174
175 int register_commands() {
3efd9988
FG
176 for (auto &it : commands) {
177 int r = admin_socket->register_command(it.first, it.first, this,
178 it.second->desc);
179 if (r < 0) {
180 return r;
181 }
182 it.second->registered = true;
7c673cae 183 }
d2e6a577 184 return 0;
7c673cae
FG
185 }
186
187 ~ImageReplayerAdminSocketHook() override {
3efd9988
FG
188 for (auto &it : commands) {
189 if (it.second->registered) {
190 admin_socket->unregister_command(it.first);
191 }
192 delete it.second;
7c673cae 193 }
31f18b77 194 commands.clear();
7c673cae
FG
195 }
196
197 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
198 bufferlist& out) override {
3efd9988 199 auto i = commands.find(command);
7c673cae
FG
200 assert(i != commands.end());
201 Formatter *f = Formatter::create(format);
202 stringstream ss;
203 bool r = i->second->call(f, &ss);
204 delete f;
205 out.append(ss);
206 return r;
207 }
208
209private:
3efd9988 210 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I> *> Commands;
7c673cae
FG
211
212 AdminSocket *admin_socket;
213 Commands commands;
214};
215
216uint32_t calculate_replay_delay(const utime_t &event_time,
217 int mirroring_replay_delay) {
218 if (mirroring_replay_delay <= 0) {
219 return 0;
220 }
221
222 utime_t now = ceph_clock_now();
223 if (event_time + mirroring_replay_delay <= now) {
224 return 0;
225 }
226
227 // ensure it is rounded up when converting to integer
228 return (event_time + mirroring_replay_delay - now) + 1;
229}
230
231} // anonymous namespace
232
233template <typename I>
234void ImageReplayer<I>::BootstrapProgressContext::update_progress(
235 const std::string &description, bool flush)
236{
237 const std::string desc = "bootstrapping, " + description;
238 replayer->set_state_description(0, desc);
239 if (flush) {
240 replayer->update_mirror_image_status(false, boost::none);
241 }
242}
243
244template <typename I>
245void ImageReplayer<I>::RemoteJournalerListener::handle_update(
246 ::journal::JournalMetadata *) {
247 FunctionContext *ctx = new FunctionContext([this](int r) {
248 replayer->handle_remote_journal_metadata_updated();
249 });
250 replayer->m_threads->work_queue->queue(ctx, 0);
251}
252
253template <typename I>
d2e6a577 254ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
c07f9fc5 255 ImageDeleter<I>* image_deleter,
31f18b77 256 InstanceWatcher<I> *instance_watcher,
7c673cae
FG
257 RadosRef local,
258 const std::string &local_mirror_uuid,
259 int64_t local_pool_id,
260 const std::string &global_image_id) :
261 m_threads(threads),
262 m_image_deleter(image_deleter),
31f18b77 263 m_instance_watcher(instance_watcher),
7c673cae
FG
264 m_local(local),
265 m_local_mirror_uuid(local_mirror_uuid),
266 m_local_pool_id(local_pool_id),
28e407b8 267 m_global_image_id(global_image_id), m_local_image_name(global_image_id),
7c673cae
FG
268 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
269 global_image_id),
270 m_progress_cxt(this),
271 m_journal_listener(new JournalListener(this)),
272 m_remote_listener(this)
273{
274 // Register asok commands using a temporary "remote_pool_name/global_image_id"
275 // name. When the image name becomes known on start the asok commands will be
276 // re-registered using "remote_pool_name/remote_image_name" name.
277
278 std::string pool_name;
279 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
280 if (r < 0) {
281 derr << "error resolving local pool " << m_local_pool_id
282 << ": " << cpp_strerror(r) << dendl;
283 pool_name = stringify(m_local_pool_id);
284 }
285
286 m_name = pool_name + "/" + m_global_image_id;
d2e6a577 287 register_admin_socket_hook();
7c673cae
FG
288}
289
290template <typename I>
291ImageReplayer<I>::~ImageReplayer()
292{
d2e6a577 293 unregister_admin_socket_hook();
7c673cae
FG
294 assert(m_event_preprocessor == nullptr);
295 assert(m_replay_status_formatter == nullptr);
296 assert(m_local_image_ctx == nullptr);
297 assert(m_local_replay == nullptr);
298 assert(m_remote_journaler == nullptr);
299 assert(m_replay_handler == nullptr);
300 assert(m_on_start_finish == nullptr);
301 assert(m_on_stop_finish == nullptr);
302 assert(m_bootstrap_request == nullptr);
303 assert(m_in_flight_status_updates == 0);
304
305 delete m_journal_listener;
7c673cae
FG
306}
307
c07f9fc5
FG
308template <typename I>
309image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
310 Mutex::Locker locker(m_lock);
311
312 if (!m_mirror_image_status_state) {
313 return image_replayer::HEALTH_STATE_OK;
314 } else if (*m_mirror_image_status_state ==
315 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
316 *m_mirror_image_status_state ==
317 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
318 return image_replayer::HEALTH_STATE_WARNING;
319 }
320 return image_replayer::HEALTH_STATE_ERROR;
321}
322
7c673cae 323template <typename I>
d2e6a577
FG
324void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
325 librados::IoCtx &io_ctx) {
7c673cae 326 Mutex::Locker locker(m_lock);
d2e6a577
FG
327 auto it = m_peers.find({peer_uuid});
328 if (it == m_peers.end()) {
329 m_peers.insert({peer_uuid, io_ctx});
7c673cae
FG
330 }
331}
332
7c673cae
FG
333template <typename I>
334void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
335 dout(20) << r << " " << desc << dendl;
336
337 Mutex::Locker l(m_lock);
338 m_last_r = r;
339 m_state_desc = desc;
340}
341
342template <typename I>
343void ImageReplayer<I>::start(Context *on_finish, bool manual)
344{
345 dout(20) << "on_finish=" << on_finish << dendl;
346
347 int r = 0;
348 {
349 Mutex::Locker locker(m_lock);
350 if (!is_stopped_()) {
351 derr << "already running" << dendl;
352 r = -EINVAL;
353 } else if (m_manual_stop && !manual) {
354 dout(5) << "stopped manually, ignoring start without manual flag"
355 << dendl;
356 r = -EPERM;
357 } else {
358 m_state = STATE_STARTING;
359 m_last_r = 0;
360 m_state_desc.clear();
361 m_manual_stop = false;
d2e6a577 362 m_delete_requested = false;
7c673cae
FG
363
364 if (on_finish != nullptr) {
365 assert(m_on_start_finish == nullptr);
366 m_on_start_finish = on_finish;
367 }
368 assert(m_on_stop_finish == nullptr);
369 }
370 }
371
372 if (r < 0) {
373 if (on_finish) {
374 on_finish->complete(r);
375 }
376 return;
377 }
378
379 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
380 if (r < 0) {
381 derr << "error opening ioctx for local pool " << m_local_pool_id
382 << ": " << cpp_strerror(r) << dendl;
383 on_start_fail(r, "error opening local pool");
384 return;
385 }
386
d2e6a577
FG
387 wait_for_deletion();
388}
389
390template <typename I>
391void ImageReplayer<I>::wait_for_deletion() {
392 dout(20) << dendl;
393
394 Context *ctx = create_context_callback<
395 ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
396 m_image_deleter->wait_for_scheduled_deletion(
397 m_local_pool_id, m_global_image_id, ctx, false);
398}
399
400template <typename I>
401void ImageReplayer<I>::handle_wait_for_deletion(int r) {
402 dout(20) << "r=" << r << dendl;
403
404 if (r == -ECANCELED) {
405 on_start_fail(0, "");
406 return;
407 } else if (r < 0) {
408 on_start_fail(r, "error waiting for image deletion");
409 return;
410 }
411
7c673cae
FG
412 prepare_local_image();
413}
414
415template <typename I>
416void ImageReplayer<I>::prepare_local_image() {
417 dout(20) << dendl;
418
d2e6a577 419 m_local_image_id = "";
7c673cae
FG
420 Context *ctx = create_context_callback<
421 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
422 auto req = PrepareLocalImageRequest<I>::create(
28e407b8 423 m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
7c673cae
FG
424 &m_local_image_tag_owner, m_threads->work_queue, ctx);
425 req->send();
426}
427
428template <typename I>
429void ImageReplayer<I>::handle_prepare_local_image(int r) {
430 dout(20) << "r=" << r << dendl;
431
432 if (r == -ENOENT) {
433 dout(20) << "local image does not exist" << dendl;
434 } else if (r < 0) {
435 on_start_fail(r, "error preparing local image for replay");
436 return;
28e407b8
AA
437 } else {
438 reregister_admin_socket_hook();
7c673cae
FG
439 }
440
441 // local image doesn't exist or is non-primary
d2e6a577 442 prepare_remote_image();
7c673cae
FG
443}
444
445template <typename I>
d2e6a577 446void ImageReplayer<I>::prepare_remote_image() {
7c673cae 447 dout(20) << dendl;
b32b8144
FG
448 if (m_peers.empty()) {
449 // technically nothing to bootstrap, but it handles the status update
450 bootstrap();
451 return;
452 }
7c673cae 453
d2e6a577
FG
454 // TODO need to support multiple remote images
455 assert(!m_peers.empty());
456 m_remote_image = {*m_peers.begin()};
457
458 Context *ctx = create_context_callback<
459 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
460 auto req = PrepareRemoteImageRequest<I>::create(
b32b8144
FG
461 m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
462 m_local_image_id, &m_remote_image.mirror_uuid, &m_remote_image.image_id,
463 &m_remote_journaler, &m_client_state, &m_client_meta, ctx);
d2e6a577
FG
464 req->send();
465}
466
467template <typename I>
468void ImageReplayer<I>::handle_prepare_remote_image(int r) {
469 dout(20) << "r=" << r << dendl;
470
b32b8144
FG
471 assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
472 if (r < 0 && !m_local_image_id.empty() &&
473 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
474 // local image is primary -- fall-through
475 } else if (r == -ENOENT) {
d2e6a577
FG
476 dout(20) << "remote image does not exist" << dendl;
477
478 // TODO need to support multiple remote images
479 if (!m_local_image_id.empty() &&
480 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
481 // local image exists and is non-primary and linked to the missing
482 // remote image
483
484 m_delete_requested = true;
485 on_start_fail(0, "remote image no longer exists");
486 } else {
487 on_start_fail(-ENOENT, "remote image does not exist");
488 }
489 return;
490 } else if (r < 0) {
491 on_start_fail(r, "error retrieving remote image id");
7c673cae
FG
492 return;
493 }
494
d2e6a577
FG
495 bootstrap();
496}
497
498template <typename I>
499void ImageReplayer<I>::bootstrap() {
500 dout(20) << dendl;
7c673cae 501
b32b8144
FG
502 if (!m_local_image_id.empty() &&
503 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
504 dout(5) << "local image is primary" << dendl;
505 on_start_fail(0, "local image is primary");
506 return;
507 } else if (m_peers.empty()) {
508 dout(5) << "no peer clusters" << dendl;
509 on_start_fail(-ENOENT, "no peer clusters");
510 return;
511 }
7c673cae
FG
512
513 Context *ctx = create_context_callback<
514 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
515
516 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
31f18b77 517 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
7c673cae
FG
518 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
519 m_global_image_id, m_threads->work_queue, m_threads->timer,
520 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
b32b8144
FG
521 m_remote_journaler, &m_client_state, &m_client_meta, ctx,
522 &m_resync_requested, &m_progress_cxt);
7c673cae
FG
523
524 {
525 Mutex::Locker locker(m_lock);
526 request->get();
527 m_bootstrap_request = request;
528 }
529
530 update_mirror_image_status(false, boost::none);
531 reschedule_update_status_task(10);
532
533 request->send();
534}
535
536template <typename I>
537void ImageReplayer<I>::handle_bootstrap(int r) {
538 dout(20) << "r=" << r << dendl;
539 {
540 Mutex::Locker locker(m_lock);
541 m_bootstrap_request->put();
542 m_bootstrap_request = nullptr;
543 if (m_local_image_ctx) {
544 m_local_image_id = m_local_image_ctx->id;
545 }
546 }
547
548 if (r == -EREMOTEIO) {
549 m_local_image_tag_owner = "";
c07f9fc5
FG
550 dout(5) << "remote image is non-primary" << dendl;
551 on_start_fail(-EREMOTEIO, "remote image is non-primary");
7c673cae
FG
552 return;
553 } else if (r == -EEXIST) {
554 m_local_image_tag_owner = "";
555 on_start_fail(r, "split-brain detected");
556 return;
557 } else if (r < 0) {
558 on_start_fail(r, "error bootstrapping replay");
559 return;
560 } else if (on_start_interrupted()) {
561 return;
d2e6a577
FG
562 } else if (m_resync_requested) {
563 on_start_fail(0, "resync requested");
564 return;
7c673cae
FG
565 }
566
567 assert(m_local_journal == nullptr);
568 {
569 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
570 if (m_local_image_ctx->journal != nullptr) {
571 m_local_journal = m_local_image_ctx->journal;
572 m_local_journal->add_listener(m_journal_listener);
573 }
574 }
575
576 if (m_local_journal == nullptr) {
577 on_start_fail(-EINVAL, "error accessing local journal");
578 return;
579 }
580
7c673cae
FG
581 update_mirror_image_status(false, boost::none);
582 init_remote_journaler();
583}
584
585template <typename I>
586void ImageReplayer<I>::init_remote_journaler() {
587 dout(20) << dendl;
588
589 Context *ctx = create_context_callback<
590 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
591 m_remote_journaler->init(ctx);
592}
593
594template <typename I>
595void ImageReplayer<I>::handle_init_remote_journaler(int r) {
596 dout(20) << "r=" << r << dendl;
597
598 if (r < 0) {
599 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
600 on_start_fail(r, "error initializing remote journal");
601 return;
602 } else if (on_start_interrupted()) {
603 return;
604 }
605
606 m_remote_journaler->add_listener(&m_remote_listener);
607
608 cls::journal::Client client;
609 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
610 if (r < 0) {
611 derr << "error retrieving remote journal client: " << cpp_strerror(r)
612 << dendl;
613 on_start_fail(r, "error retrieving remote journal client");
614 return;
615 }
616
b32b8144
FG
617 dout(5) << "image_id=" << m_local_image_id << ", "
618 << "client_meta.image_id=" << m_client_meta.image_id << ", "
619 << "client.state=" << client.state << dendl;
d2e6a577
FG
620 if (m_client_meta.image_id == m_local_image_id &&
621 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
7c673cae
FG
622 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
623 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
d2e6a577
FG
624 m_resync_requested = true;
625 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
626 } else {
627 on_start_fail(-ENOTCONN, "disconnected");
7c673cae 628 }
7c673cae
FG
629 return;
630 }
631
632 start_replay();
633}
634
635template <typename I>
636void ImageReplayer<I>::start_replay() {
637 dout(20) << dendl;
638
639 Context *start_ctx = create_context_callback<
640 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
641 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
642}
643
644template <typename I>
645void ImageReplayer<I>::handle_start_replay(int r) {
646 dout(20) << "r=" << r << dendl;
647
648 if (r < 0) {
649 assert(m_local_replay == nullptr);
650 derr << "error starting external replay on local image "
651 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
652 on_start_fail(r, "error starting replay on local image");
653 return;
654 }
655
656 Context *on_finish(nullptr);
657 {
658 Mutex::Locker locker(m_lock);
659 assert(m_state == STATE_STARTING);
660 m_state = STATE_REPLAYING;
661 std::swap(m_on_start_finish, on_finish);
662 }
663
664 m_event_preprocessor = EventPreprocessor<I>::create(
665 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
666 &m_client_meta, m_threads->work_queue);
667 m_replay_status_formatter =
668 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
669
670 update_mirror_image_status(true, boost::none);
671 reschedule_update_status_task(30);
672
7c673cae
FG
673 if (on_replay_interrupted()) {
674 return;
675 }
676
677 {
678 CephContext *cct = static_cast<CephContext *>(m_local->cct());
181888fb
FG
679 double poll_seconds = cct->_conf->get_val<double>(
680 "rbd_mirror_journal_poll_age");
7c673cae
FG
681
682 Mutex::Locker locker(m_lock);
683 m_replay_handler = new ReplayHandler<I>(this);
684 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
685
686 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
687 }
688
d2e6a577
FG
689 dout(20) << "start succeeded" << dendl;
690 if (on_finish != nullptr) {
691 dout(20) << "on finish complete, r=" << r << dendl;
692 on_finish->complete(r);
693 }
7c673cae
FG
694}
695
696template <typename I>
697void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
698{
699 dout(20) << "r=" << r << dendl;
700 Context *ctx = new FunctionContext([this, r, desc](int _r) {
701 {
702 Mutex::Locker locker(m_lock);
703 assert(m_state == STATE_STARTING);
704 m_state = STATE_STOPPING;
d2e6a577 705 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
7c673cae
FG
706 derr << "start failed: " << cpp_strerror(r) << dendl;
707 } else {
708 dout(20) << "start canceled" << dendl;
709 }
710 }
711
712 set_state_description(r, desc);
713 update_mirror_image_status(false, boost::none);
714 reschedule_update_status_task(-1);
715 shut_down(r);
716 });
717 m_threads->work_queue->queue(ctx, 0);
718}
719
720template <typename I>
721bool ImageReplayer<I>::on_start_interrupted()
722{
723 Mutex::Locker locker(m_lock);
724 assert(m_state == STATE_STARTING);
725 if (m_on_stop_finish == nullptr) {
726 return false;
727 }
728
729 on_start_fail(-ECANCELED);
730 return true;
731}
732
733template <typename I>
734void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
735 const std::string& desc)
736{
737 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
738 << ", desc=" << desc << dendl;
739
d2e6a577
FG
740 m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
741
7c673cae
FG
742 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
743 bool shut_down_replay = false;
744 bool running = true;
7c673cae
FG
745 {
746 Mutex::Locker locker(m_lock);
747
748 if (!is_running_()) {
749 running = false;
750 } else {
751 if (!is_stopped_()) {
752 if (m_state == STATE_STARTING) {
753 dout(20) << "canceling start" << dendl;
754 if (m_bootstrap_request) {
755 bootstrap_request = m_bootstrap_request;
756 bootstrap_request->get();
757 }
758 } else {
759 dout(20) << "interrupting replay" << dendl;
760 shut_down_replay = true;
761 }
762
763 assert(m_on_stop_finish == nullptr);
764 std::swap(m_on_stop_finish, on_finish);
765 m_stop_requested = true;
766 m_manual_stop = manual;
7c673cae
FG
767 }
768 }
769 }
770
771 // avoid holding lock since bootstrap request will update status
772 if (bootstrap_request != nullptr) {
773 bootstrap_request->cancel();
774 bootstrap_request->put();
775 }
776
7c673cae
FG
777 if (!running) {
778 dout(20) << "not running" << dendl;
779 if (on_finish) {
780 on_finish->complete(-EINVAL);
781 }
782 return;
783 }
784
785 if (shut_down_replay) {
786 on_stop_journal_replay(r, desc);
787 } else if (on_finish != nullptr) {
788 on_finish->complete(0);
789 }
790}
791
792template <typename I>
793void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
794{
795 dout(20) << "enter" << dendl;
796
797 {
798 Mutex::Locker locker(m_lock);
799 if (m_state != STATE_REPLAYING) {
800 // might be invoked multiple times while stopping
801 return;
802 }
803 m_stop_requested = true;
804 m_state = STATE_STOPPING;
805 }
806
807 set_state_description(r, desc);
808 update_mirror_image_status(false, boost::none);
809 reschedule_update_status_task(-1);
810 shut_down(0);
811}
812
813template <typename I>
814void ImageReplayer<I>::handle_replay_ready()
815{
816 dout(20) << "enter" << dendl;
817 if (on_replay_interrupted()) {
818 return;
819 }
820
821 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
822 return;
823 }
824
825 m_event_replay_tracker.start_op();
31f18b77
FG
826
827 m_lock.Lock();
828 bool stopping = (m_state == STATE_STOPPING);
829 m_lock.Unlock();
830
831 if (stopping) {
832 dout(10) << "stopping event replay" << dendl;
833 m_event_replay_tracker.finish_op();
834 return;
835 }
836
7c673cae
FG
837 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
838 preprocess_entry();
839 return;
840 }
841
842 replay_flush();
843}
844
845template <typename I>
846void ImageReplayer<I>::restart(Context *on_finish)
847{
848 FunctionContext *ctx = new FunctionContext(
849 [this, on_finish](int r) {
850 if (r < 0) {
851 // Try start anyway.
852 }
853 start(on_finish, true);
854 });
855 stop(ctx);
856}
857
858template <typename I>
859void ImageReplayer<I>::flush(Context *on_finish)
860{
861 dout(20) << "enter" << dendl;
862
863 {
864 Mutex::Locker locker(m_lock);
865 if (m_state == STATE_REPLAYING) {
866 Context *ctx = new FunctionContext(
867 [on_finish](int r) {
868 if (on_finish != nullptr) {
869 on_finish->complete(r);
870 }
871 });
872 on_flush_local_replay_flush_start(ctx);
873 return;
874 }
875 }
876
877 if (on_finish) {
878 on_finish->complete(0);
879 }
880}
881
882template <typename I>
883void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
884{
885 dout(20) << "enter" << dendl;
886 FunctionContext *ctx = new FunctionContext(
887 [this, on_flush](int r) {
888 on_flush_local_replay_flush_finish(on_flush, r);
889 });
890
891 assert(m_lock.is_locked());
892 assert(m_state == STATE_REPLAYING);
893 m_local_replay->flush(ctx);
894}
895
896template <typename I>
897void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
898 int r)
899{
900 dout(20) << "r=" << r << dendl;
901 if (r < 0) {
902 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
903 on_flush->complete(r);
904 return;
905 }
906
907 on_flush_flush_commit_position_start(on_flush);
908}
909
910template <typename I>
911void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
912{
913 FunctionContext *ctx = new FunctionContext(
914 [this, on_flush](int r) {
915 on_flush_flush_commit_position_finish(on_flush, r);
916 });
917
918 m_remote_journaler->flush_commit_position(ctx);
919}
920
921template <typename I>
922void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
923 int r)
924{
925 if (r < 0) {
926 derr << "error flushing remote journal commit position: "
927 << cpp_strerror(r) << dendl;
928 }
929
930 update_mirror_image_status(false, boost::none);
931
932 dout(20) << "flush complete, r=" << r << dendl;
933 on_flush->complete(r);
934}
935
936template <typename I>
937bool ImageReplayer<I>::on_replay_interrupted()
938{
939 bool shut_down;
940 {
941 Mutex::Locker locker(m_lock);
942 shut_down = m_stop_requested;
943 }
944
945 if (shut_down) {
946 on_stop_journal_replay();
947 }
948 return shut_down;
949}
950
951template <typename I>
952void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
953{
954 dout(20) << "enter" << dendl;
955
956 Mutex::Locker l(m_lock);
957
958 if (f) {
959 f->open_object_section("image_replayer");
960 f->dump_string("name", m_name);
961 f->dump_string("state", to_string(m_state));
962 f->close_section();
963 f->flush(*ss);
964 } else {
965 *ss << m_name << ": state: " << to_string(m_state);
966 }
967}
968
969template <typename I>
970void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
971{
972 dout(20) << "r=" << r << dendl;
973 if (r < 0) {
974 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
975 set_state_description(r, error_desc);
976 }
977
978 {
979 Mutex::Locker locker(m_lock);
980 m_stop_requested = true;
981 }
982 on_replay_interrupted();
983}
984
985template <typename I>
986void ImageReplayer<I>::replay_flush() {
987 dout(20) << dendl;
988
989 bool interrupted = false;
990 {
991 Mutex::Locker locker(m_lock);
992 if (m_state != STATE_REPLAYING) {
993 dout(20) << "replay interrupted" << dendl;
994 interrupted = true;
995 } else {
996 m_state = STATE_REPLAY_FLUSHING;
997 }
998 }
999
1000 if (interrupted) {
1001 m_event_replay_tracker.finish_op();
1002 return;
1003 }
1004
1005 // shut down the replay to flush all IO and ops and create a new
1006 // replayer to handle the new tag epoch
1007 Context *ctx = create_context_callback<
1008 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1009 ctx = new FunctionContext([this, ctx](int r) {
1010 m_local_image_ctx->journal->stop_external_replay();
1011 m_local_replay = nullptr;
1012
1013 if (r < 0) {
1014 ctx->complete(r);
1015 return;
1016 }
1017
1018 m_local_journal->start_external_replay(&m_local_replay, ctx);
1019 });
1020 m_local_replay->shut_down(false, ctx);
1021}
1022
1023template <typename I>
1024void ImageReplayer<I>::handle_replay_flush(int r) {
1025 dout(20) << "r=" << r << dendl;
1026
1027 {
1028 Mutex::Locker locker(m_lock);
1029 assert(m_state == STATE_REPLAY_FLUSHING);
1030 m_state = STATE_REPLAYING;
1031 }
1032
1033 if (r < 0) {
1034 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1035 m_event_replay_tracker.finish_op();
1036 handle_replay_complete(r, "replay flush encountered an error");
1037 return;
1038 } else if (on_replay_interrupted()) {
1039 m_event_replay_tracker.finish_op();
1040 return;
1041 }
1042
1043 get_remote_tag();
1044}
1045
1046template <typename I>
1047void ImageReplayer<I>::get_remote_tag() {
1048 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1049
1050 Context *ctx = create_context_callback<
1051 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1052 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1053}
1054
1055template <typename I>
1056void ImageReplayer<I>::handle_get_remote_tag(int r) {
1057 dout(20) << "r=" << r << dendl;
1058
1059 if (r == 0) {
1060 try {
1061 bufferlist::iterator it = m_replay_tag.data.begin();
1062 ::decode(m_replay_tag_data, it);
1063 } catch (const buffer::error &err) {
1064 r = -EBADMSG;
1065 }
1066 }
1067
1068 if (r < 0) {
1069 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1070 << cpp_strerror(r) << dendl;
1071 m_event_replay_tracker.finish_op();
1072 handle_replay_complete(r, "failed to retrieve remote tag");
1073 return;
1074 }
1075
1076 m_replay_tag_valid = true;
1077 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1078 << m_replay_tag_data << dendl;
1079
1080 allocate_local_tag();
1081}
1082
1083template <typename I>
1084void ImageReplayer<I>::allocate_local_tag() {
28e407b8 1085 dout(15) << dendl;
7c673cae
FG
1086
1087 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
28e407b8 1088 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
7c673cae 1089 mirror_uuid = m_remote_image.mirror_uuid;
28e407b8
AA
1090 } else if (mirror_uuid == m_local_mirror_uuid) {
1091 mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
7c673cae 1092 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
28e407b8
AA
1093 // handle possible edge condition where daemon can failover and
1094 // the local image has already been promoted/demoted
1095 auto local_tag_data = m_local_journal->get_tag_data();
1096 if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
1097 (local_tag_data.predecessor.commit_valid &&
1098 local_tag_data.predecessor.mirror_uuid ==
1099 librbd::Journal<>::LOCAL_MIRROR_UUID)) {
1100 dout(15) << "skipping stale demotion event" << dendl;
1101 handle_process_entry_safe(m_replay_entry, 0);
1102 handle_replay_ready();
1103 return;
1104 } else {
1105 dout(5) << "encountered image demotion: stopping" << dendl;
1106 Mutex::Locker locker(m_lock);
1107 m_stop_requested = true;
1108 }
7c673cae
FG
1109 }
1110
1111 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1112 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1113 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1114 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1115 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1116 }
1117
28e407b8
AA
1118 dout(15) << "mirror_uuid=" << mirror_uuid << ", "
1119 << "predecessor=" << predecessor << ", "
1120 << "replay_tag_tid=" << m_replay_tag_tid << dendl;
7c673cae
FG
1121 Context *ctx = create_context_callback<
1122 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1123 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1124}
1125
1126template <typename I>
1127void ImageReplayer<I>::handle_allocate_local_tag(int r) {
28e407b8
AA
1128 dout(15) << "r=" << r << ", "
1129 << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
7c673cae
FG
1130
1131 if (r < 0) {
1132 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1133 m_event_replay_tracker.finish_op();
1134 handle_replay_complete(r, "failed to allocate journal tag");
1135 return;
1136 }
1137
1138 preprocess_entry();
1139}
1140
1141template <typename I>
1142void ImageReplayer<I>::preprocess_entry() {
1143 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1144 << dendl;
1145
1146 bufferlist data = m_replay_entry.get_data();
1147 bufferlist::iterator it = data.begin();
1148 int r = m_local_replay->decode(&it, &m_event_entry);
1149 if (r < 0) {
1150 derr << "failed to decode journal event" << dendl;
1151 m_event_replay_tracker.finish_op();
1152 handle_replay_complete(r, "failed to decode journal event");
1153 return;
1154 }
1155
1156 uint32_t delay = calculate_replay_delay(
1157 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1158 if (delay == 0) {
1159 handle_preprocess_entry_ready(0);
1160 return;
1161 }
1162
1163 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1164
1165 Mutex::Locker timer_locker(m_threads->timer_lock);
1166 assert(m_delayed_preprocess_task == nullptr);
1167 m_delayed_preprocess_task = new FunctionContext(
1168 [this](int r) {
1169 assert(m_threads->timer_lock.is_locked());
1170 m_delayed_preprocess_task = nullptr;
1171 m_threads->work_queue->queue(
1172 create_context_callback<ImageReplayer,
1173 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1174 });
1175 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1176}
1177
1178template <typename I>
1179void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1180 dout(20) << "r=" << r << dendl;
1181 assert(r == 0);
1182
1183 if (!m_event_preprocessor->is_required(m_event_entry)) {
1184 process_entry();
1185 return;
1186 }
1187
1188 Context *ctx = create_context_callback<
1189 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1190 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1191}
1192
1193template <typename I>
1194void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1195 dout(20) << "r=" << r << dendl;
1196
1197 if (r < 0) {
7c673cae 1198 m_event_replay_tracker.finish_op();
31f18b77
FG
1199
1200 if (r == -ECANCELED) {
1201 handle_replay_complete(0, "lost exclusive lock");
1202 } else {
1203 derr << "failed to preprocess journal event" << dendl;
1204 handle_replay_complete(r, "failed to preprocess journal event");
1205 }
7c673cae
FG
1206 return;
1207 }
1208
1209 process_entry();
1210}
1211
1212template <typename I>
1213void ImageReplayer<I>::process_entry() {
1214 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1215 << dendl;
1216
1217 // stop replaying events if stop has been requested
1218 if (on_replay_interrupted()) {
1219 m_event_replay_tracker.finish_op();
1220 return;
1221 }
1222
1223 Context *on_ready = create_context_callback<
1224 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1225 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1226
1227 m_local_replay->process(m_event_entry, on_ready, on_commit);
1228}
1229
1230template <typename I>
1231void ImageReplayer<I>::handle_process_entry_ready(int r) {
1232 dout(20) << dendl;
1233 assert(r == 0);
1234
28e407b8
AA
1235 bool update_status = false;
1236 {
1237 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
1238 if (m_local_image_name != m_local_image_ctx->name) {
1239 m_local_image_name = m_local_image_ctx->name;
1240 update_status = true;
1241 }
1242 }
1243
1244 if (update_status) {
1245 reschedule_update_status_task(0);
1246 }
3efd9988 1247
7c673cae
FG
1248 // attempt to process the next event
1249 handle_replay_ready();
1250}
1251
1252template <typename I>
1253void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1254 int r) {
1255 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1256 << dendl;
1257
1258 if (r < 0) {
1259 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1260 handle_replay_complete(r, "failed to commit journal event");
1261 } else {
1262 assert(m_remote_journaler != nullptr);
1263 m_remote_journaler->committed(replay_entry);
1264 }
1265 m_event_replay_tracker.finish_op();
1266}
1267
1268template <typename I>
1269bool ImageReplayer<I>::update_mirror_image_status(bool force,
1270 const OptionalState &state) {
1271 dout(20) << dendl;
1272 {
1273 Mutex::Locker locker(m_lock);
1274 if (!start_mirror_image_status_update(force, false)) {
1275 return false;
1276 }
1277 }
1278
1279 queue_mirror_image_status_update(state);
1280 return true;
1281}
1282
1283template <typename I>
1284bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1285 bool restarting) {
1286 assert(m_lock.is_locked());
1287
1288 if (!force && !is_stopped_()) {
1289 if (!is_running_()) {
1290 dout(20) << "shut down in-progress: ignoring update" << dendl;
1291 return false;
1292 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1293 dout(20) << "already sending update" << dendl;
1294 m_update_status_requested = true;
1295 return false;
1296 }
1297 }
1298
1299 dout(20) << dendl;
1300 ++m_in_flight_status_updates;
1301 return true;
1302}
1303
1304template <typename I>
1305void ImageReplayer<I>::finish_mirror_image_status_update() {
28e407b8
AA
1306 reregister_admin_socket_hook();
1307
7c673cae
FG
1308 Context *on_finish = nullptr;
1309 {
1310 Mutex::Locker locker(m_lock);
1311 assert(m_in_flight_status_updates > 0);
1312 if (--m_in_flight_status_updates > 0) {
1313 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1314 << "updates" << dendl;
1315 return;
1316 }
1317
1318 std::swap(on_finish, m_on_update_status_finish);
1319 }
1320
1321 dout(20) << dendl;
1322 if (on_finish != nullptr) {
1323 on_finish->complete(0);
1324 }
1325}
1326
1327template <typename I>
1328void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1329 dout(20) << dendl;
1330 FunctionContext *ctx = new FunctionContext(
1331 [this, state](int r) {
1332 send_mirror_status_update(state);
1333 });
1334 m_threads->work_queue->queue(ctx, 0);
1335}
1336
1337template <typename I>
1338void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1339 State state;
1340 std::string state_desc;
1341 int last_r;
7c673cae 1342 bool stopping_replay;
c07f9fc5
FG
1343
1344 OptionalMirrorImageStatusState mirror_image_status_state{
1345 boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1346 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
7c673cae
FG
1347 {
1348 Mutex::Locker locker(m_lock);
1349 state = m_state;
1350 state_desc = m_state_desc;
c07f9fc5 1351 mirror_image_status_state = m_mirror_image_status_state;
7c673cae 1352 last_r = m_last_r;
7c673cae 1353 stopping_replay = (m_local_image_ctx != nullptr);
c07f9fc5
FG
1354
1355 if (m_bootstrap_request != nullptr) {
1356 bootstrap_request = m_bootstrap_request;
1357 bootstrap_request->get();
1358 }
1359 }
1360
1361 bool syncing = false;
1362 if (bootstrap_request != nullptr) {
1363 syncing = bootstrap_request->is_syncing();
1364 bootstrap_request->put();
1365 bootstrap_request = nullptr;
7c673cae
FG
1366 }
1367
1368 if (opt_state) {
1369 state = *opt_state;
1370 }
1371
1372 cls::rbd::MirrorImageStatus status;
1373 status.up = true;
1374 switch (state) {
1375 case STATE_STARTING:
c07f9fc5 1376 if (syncing) {
7c673cae
FG
1377 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1378 status.description = state_desc.empty() ? "syncing" : state_desc;
c07f9fc5 1379 mirror_image_status_state = status.state;
7c673cae
FG
1380 } else {
1381 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1382 status.description = "starting replay";
1383 }
1384 break;
1385 case STATE_REPLAYING:
1386 case STATE_REPLAY_FLUSHING:
1387 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1388 {
1389 Context *on_req_finish = new FunctionContext(
1390 [this](int r) {
1391 dout(20) << "replay status ready: r=" << r << dendl;
1392 if (r >= 0) {
1393 send_mirror_status_update(boost::none);
1394 } else if (r == -EAGAIN) {
1395 // decrement in-flight status update counter
1396 handle_mirror_status_update(r);
1397 }
1398 });
1399
1400 std::string desc;
1401 if (!m_replay_status_formatter->get_or_send_update(&desc,
1402 on_req_finish)) {
1403 dout(20) << "waiting for replay status" << dendl;
1404 return;
1405 }
1406 status.description = "replaying, " + desc;
c07f9fc5 1407 mirror_image_status_state = boost::none;
7c673cae
FG
1408 }
1409 break;
1410 case STATE_STOPPING:
1411 if (stopping_replay) {
1412 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1413 status.description = "stopping replay";
1414 break;
1415 }
1416 // FALLTHROUGH
1417 case STATE_STOPPED:
c07f9fc5
FG
1418 if (last_r == -EREMOTEIO) {
1419 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1420 status.description = state_desc;
1421 mirror_image_status_state = status.state;
1422 } else if (last_r < 0) {
7c673cae
FG
1423 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1424 status.description = state_desc;
c07f9fc5 1425 mirror_image_status_state = status.state;
7c673cae
FG
1426 } else {
1427 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1428 status.description = state_desc.empty() ? "stopped" : state_desc;
c07f9fc5 1429 mirror_image_status_state = boost::none;
7c673cae
FG
1430 }
1431 break;
1432 default:
1433 assert(!"invalid state");
1434 }
1435
c07f9fc5
FG
1436 {
1437 Mutex::Locker locker(m_lock);
1438 m_mirror_image_status_state = mirror_image_status_state;
1439 }
1440
1441 // prevent the status from ping-ponging when failed replays are restarted
1442 if (mirror_image_status_state &&
1443 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1444 status.state = *mirror_image_status_state;
1445 }
1446
7c673cae
FG
1447 dout(20) << "status=" << status << dendl;
1448 librados::ObjectWriteOperation op;
1449 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1450
1451 librados::AioCompletion *aio_comp = create_rados_callback<
1452 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1453 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1454 assert(r == 0);
1455 aio_comp->release();
1456}
1457
1458template <typename I>
1459void ImageReplayer<I>::handle_mirror_status_update(int r) {
1460 dout(20) << "r=" << r << dendl;
1461
1462 bool running = false;
1463 bool started = false;
1464 {
1465 Mutex::Locker locker(m_lock);
1466 bool update_status_requested = false;
1467 std::swap(update_status_requested, m_update_status_requested);
1468
1469 running = is_running_();
1470 if (running && update_status_requested) {
1471 started = start_mirror_image_status_update(false, true);
1472 }
1473 }
1474
1475 // if a deferred update is available, send it -- otherwise reschedule
1476 // the timer task
1477 if (started) {
1478 queue_mirror_image_status_update(boost::none);
1479 } else if (running) {
1480 reschedule_update_status_task();
1481 }
1482
1483 // mark committed status update as no longer in-flight
1484 finish_mirror_image_status_update();
1485}
1486
1487template <typename I>
1488void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1489 dout(20) << dendl;
1490
1491 bool canceled_task = false;
1492 {
1493 Mutex::Locker locker(m_lock);
1494 Mutex::Locker timer_locker(m_threads->timer_lock);
1495
1496 if (m_update_status_task) {
1497 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1498 m_update_status_task = nullptr;
1499 }
1500
1501 if (new_interval > 0) {
1502 m_update_status_interval = new_interval;
1503 }
1504
1505 bool restarting = (new_interval == 0 || canceled_task);
1506 if (new_interval >= 0 && is_running_() &&
1507 start_mirror_image_status_update(false, restarting)) {
1508 m_update_status_task = new FunctionContext(
1509 [this](int r) {
1510 assert(m_threads->timer_lock.is_locked());
1511 m_update_status_task = nullptr;
1512
1513 queue_mirror_image_status_update(boost::none);
1514 });
1515 m_threads->timer->add_event_after(m_update_status_interval,
1516 m_update_status_task);
1517 }
1518 }
1519
1520 if (canceled_task) {
1521 dout(20) << "canceled task" << dendl;
1522 finish_mirror_image_status_update();
1523 }
1524}
1525
1526template <typename I>
1527void ImageReplayer<I>::shut_down(int r) {
1528 dout(20) << "r=" << r << dendl;
3efd9988
FG
1529
1530 bool canceled_delayed_preprocess_task = false;
1531 {
1532 Mutex::Locker timer_locker(m_threads->timer_lock);
1533 if (m_delayed_preprocess_task != nullptr) {
1534 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1535 m_delayed_preprocess_task);
1536 assert(canceled_delayed_preprocess_task);
1537 m_delayed_preprocess_task = nullptr;
1538 }
1539 }
1540 if (canceled_delayed_preprocess_task) {
1541 // wake up sleeping replay
1542 m_event_replay_tracker.finish_op();
1543 }
1544
7c673cae
FG
1545 {
1546 Mutex::Locker locker(m_lock);
1547 assert(m_state == STATE_STOPPING);
1548
1549 // if status updates are in-flight, wait for them to complete
1550 // before proceeding
1551 if (m_in_flight_status_updates > 0) {
1552 if (m_on_update_status_finish == nullptr) {
1553 dout(20) << "waiting for in-flight status update" << dendl;
1554 m_on_update_status_finish = new FunctionContext(
1555 [this, r](int _r) {
1556 shut_down(r);
1557 });
1558 }
1559 return;
1560 }
1561 }
1562
31f18b77
FG
1563 // NOTE: it's important to ensure that the local image is fully
1564 // closed before attempting to close the remote journal in
1565 // case the remote cluster is unreachable
1566
7c673cae
FG
1567 // chain the shut down sequence (reverse order)
1568 Context *ctx = new FunctionContext(
1569 [this, r](int _r) {
1570 update_mirror_image_status(true, STATE_STOPPED);
1571 handle_shut_down(r);
1572 });
31f18b77
FG
1573
1574 // close the remote journal
7c673cae
FG
1575 if (m_remote_journaler != nullptr) {
1576 ctx = new FunctionContext([this, ctx](int r) {
1577 delete m_remote_journaler;
1578 m_remote_journaler = nullptr;
1579 ctx->complete(0);
1580 });
1581 ctx = new FunctionContext([this, ctx](int r) {
1582 m_remote_journaler->remove_listener(&m_remote_listener);
1583 m_remote_journaler->shut_down(ctx);
1584 });
7c673cae 1585 }
31f18b77
FG
1586
1587 // stop the replay of remote journal events
1588 if (m_replay_handler != nullptr) {
1589 ctx = new FunctionContext([this, ctx](int r) {
1590 delete m_replay_handler;
1591 m_replay_handler = nullptr;
1592
1593 m_event_replay_tracker.wait_for_ops(ctx);
1594 });
1595 ctx = new FunctionContext([this, ctx](int r) {
1596 m_remote_journaler->stop_replay(ctx);
1597 });
1598 }
1599
1600 // close the local image (release exclusive lock)
1601 if (m_local_image_ctx) {
1602 ctx = new FunctionContext([this, ctx](int r) {
1603 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1604 &m_local_image_ctx, ctx);
1605 request->send();
1606 });
1607 }
1608
1609 // shut down event replay into the local image
7c673cae
FG
1610 if (m_local_journal != nullptr) {
1611 ctx = new FunctionContext([this, ctx](int r) {
1612 m_local_journal = nullptr;
1613 ctx->complete(0);
1614 });
1615 if (m_local_replay != nullptr) {
1616 ctx = new FunctionContext([this, ctx](int r) {
1617 m_local_journal->stop_external_replay();
1618 m_local_replay = nullptr;
1619
1620 EventPreprocessor<I>::destroy(m_event_preprocessor);
1621 m_event_preprocessor = nullptr;
1622 ctx->complete(0);
1623 });
1624 }
1625 ctx = new FunctionContext([this, ctx](int r) {
7c673cae
FG
1626 // blocks if listener notification is in-progress
1627 m_local_journal->remove_listener(m_journal_listener);
7c673cae
FG
1628 ctx->complete(0);
1629 });
31f18b77
FG
1630 }
1631
1632 // wait for all local in-flight replay events to complete
1633 ctx = new FunctionContext([this, ctx](int r) {
1634 if (r < 0) {
1635 derr << "error shutting down journal replay: " << cpp_strerror(r)
1636 << dendl;
1637 }
1638
1639 m_event_replay_tracker.wait_for_ops(ctx);
1640 });
1641
1642 // flush any local in-flight replay events
1643 if (m_local_replay != nullptr) {
7c673cae 1644 ctx = new FunctionContext([this, ctx](int r) {
31f18b77 1645 m_local_replay->shut_down(true, ctx);
7c673cae
FG
1646 });
1647 }
31f18b77 1648
7c673cae
FG
1649 m_threads->work_queue->queue(ctx, 0);
1650}
1651
1652template <typename I>
1653void ImageReplayer<I>::handle_shut_down(int r) {
1654 reschedule_update_status_task(-1);
1655
3efd9988 1656 bool unregister_asok_hook = false;
7c673cae
FG
1657 {
1658 Mutex::Locker locker(m_lock);
1659
1660 // if status updates are in-flight, wait for them to complete
1661 // before proceeding
1662 if (m_in_flight_status_updates > 0) {
1663 if (m_on_update_status_finish == nullptr) {
1664 dout(20) << "waiting for in-flight status update" << dendl;
1665 m_on_update_status_finish = new FunctionContext(
1666 [this, r](int _r) {
1667 handle_shut_down(r);
1668 });
1669 }
1670 return;
1671 }
1672
d2e6a577
FG
1673 bool delete_requested = false;
1674 if (m_delete_requested && !m_local_image_id.empty()) {
1675 assert(m_remote_image.image_id.empty());
1676 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1677 delete_requested = true;
1678 }
1679 if (delete_requested || m_resync_requested) {
7c673cae
FG
1680 m_image_deleter->schedule_image_delete(m_local,
1681 m_local_pool_id,
c07f9fc5 1682 m_global_image_id,
d2e6a577
FG
1683 m_resync_requested);
1684
1685 m_local_image_id = "";
1686 m_resync_requested = false;
1687 if (m_delete_requested) {
3efd9988 1688 unregister_asok_hook = true;
d2e6a577
FG
1689 m_delete_requested = false;
1690 }
1691 } else if (m_last_r == -ENOENT &&
1692 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1693 dout(0) << "mirror image no longer exists" << dendl;
3efd9988 1694 unregister_asok_hook = true;
d2e6a577 1695 m_finished = true;
7c673cae
FG
1696 }
1697 }
1698
3efd9988
FG
1699 if (unregister_asok_hook) {
1700 unregister_admin_socket_hook();
1701 }
1702
7c673cae
FG
1703 dout(20) << "stop complete" << dendl;
1704 m_local_ioctx.close();
1705
1706 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1707 m_replay_status_formatter = nullptr;
1708
1709 Context *on_start = nullptr;
1710 Context *on_stop = nullptr;
1711 {
1712 Mutex::Locker locker(m_lock);
1713 std::swap(on_start, m_on_start_finish);
1714 std::swap(on_stop, m_on_stop_finish);
1715 m_stop_requested = false;
1716 assert(m_delayed_preprocess_task == nullptr);
1717 assert(m_state == STATE_STOPPING);
1718 m_state = STATE_STOPPED;
1719 }
1720
1721 if (on_start != nullptr) {
1722 dout(20) << "on start finish complete, r=" << r << dendl;
1723 on_start->complete(r);
1724 r = 0;
1725 }
1726 if (on_stop != nullptr) {
1727 dout(20) << "on stop finish complete, r=" << r << dendl;
1728 on_stop->complete(r);
1729 }
1730}
1731
1732template <typename I>
1733void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1734 dout(20) << dendl;
1735
1736 cls::journal::Client client;
1737 {
1738 Mutex::Locker locker(m_lock);
1739 if (!is_running_()) {
1740 return;
1741 }
1742
1743 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1744 if (r < 0) {
1745 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1746 return;
1747 }
1748 }
1749
1750 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1751 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1752 stop(nullptr, false, -ENOTCONN, "disconnected");
1753 }
1754}
1755
1756template <typename I>
1757std::string ImageReplayer<I>::to_string(const State state) {
1758 switch (state) {
1759 case ImageReplayer<I>::STATE_STARTING:
1760 return "Starting";
1761 case ImageReplayer<I>::STATE_REPLAYING:
1762 return "Replaying";
1763 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1764 return "ReplayFlushing";
1765 case ImageReplayer<I>::STATE_STOPPING:
1766 return "Stopping";
1767 case ImageReplayer<I>::STATE_STOPPED:
1768 return "Stopped";
1769 default:
1770 break;
1771 }
1772 return "Unknown(" + stringify(state) + ")";
1773}
1774
1775template <typename I>
1776void ImageReplayer<I>::resync_image(Context *on_finish) {
1777 dout(20) << dendl;
1778
d2e6a577
FG
1779 m_resync_requested = true;
1780 stop(on_finish);
1781}
1782
1783template <typename I>
1784void ImageReplayer<I>::register_admin_socket_hook() {
3efd9988
FG
1785 ImageReplayerAdminSocketHook<I> *asok_hook;
1786 {
1787 Mutex::Locker locker(m_lock);
1788 if (m_asok_hook != nullptr) {
1789 return;
1790 }
7c673cae 1791
28e407b8 1792 dout(15) << "registered asok hook: " << m_name << dendl;
3efd9988
FG
1793 asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1794 this);
1795 int r = asok_hook->register_commands();
1796 if (r == 0) {
1797 m_asok_hook = asok_hook;
1798 return;
1799 }
d2e6a577 1800 derr << "error registering admin socket commands" << dendl;
d2e6a577 1801 }
3efd9988 1802 delete asok_hook;
d2e6a577
FG
1803}
1804
1805template <typename I>
1806void ImageReplayer<I>::unregister_admin_socket_hook() {
28e407b8 1807 dout(15) << dendl;
d2e6a577 1808
3efd9988
FG
1809 AdminSocketHook *asok_hook = nullptr;
1810 {
1811 Mutex::Locker locker(m_lock);
1812 std::swap(asok_hook, m_asok_hook);
1813 }
1814 delete asok_hook;
1815}
1816
1817template <typename I>
28e407b8 1818void ImageReplayer<I>::reregister_admin_socket_hook() {
3efd9988
FG
1819 {
1820 Mutex::Locker locker(m_lock);
28e407b8 1821 auto name = m_local_ioctx.get_pool_name() + "/" + m_local_image_name;
3efd9988
FG
1822 if (m_name == name) {
1823 return;
1824 }
1825 m_name = name;
1826 }
1827 unregister_admin_socket_hook();
1828 register_admin_socket_hook();
7c673cae
FG
1829}
1830
1831template <typename I>
1832std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1833{
1834 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1835 << "/" << replayer.get_global_image_id() << "]";
1836 return os;
1837}
1838
1839} // namespace mirror
1840} // namespace rbd
1841
1842template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;