]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
update sources to v12.2.3
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "include/stringify.h"
9#include "cls/rbd/cls_rbd_client.h"
10#include "common/Timer.h"
11#include "common/WorkQueue.h"
12#include "global/global_context.h"
13#include "journal/Journaler.h"
14#include "journal/ReplayHandler.h"
15#include "journal/Settings.h"
16#include "librbd/ExclusiveLock.h"
17#include "librbd/ImageCtx.h"
18#include "librbd/ImageState.h"
19#include "librbd/Journal.h"
20#include "librbd/Operations.h"
21#include "librbd/Utils.h"
22#include "librbd/journal/Replay.h"
d2e6a577 23#include "ImageDeleter.h"
7c673cae 24#include "ImageReplayer.h"
7c673cae
FG
25#include "Threads.h"
26#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
d2e6a577 30#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
7c673cae
FG
31#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_rbd_mirror
35#undef dout_prefix
36#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
39using std::map;
40using std::string;
41using std::unique_ptr;
42using std::shared_ptr;
43using std::vector;
44
45namespace rbd {
46namespace mirror {
47
48using librbd::util::create_context_callback;
49using librbd::util::create_rados_callback;
50using namespace rbd::mirror::image_replayer;
51
52template <typename I>
53std::ostream &operator<<(std::ostream &os,
54 const typename ImageReplayer<I>::State &state);
55
56namespace {
57
58template <typename I>
59struct ReplayHandler : public ::journal::ReplayHandler {
60 ImageReplayer<I> *replayer;
61 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
62 void get() override {}
63 void put() override {}
64
65 void handle_entries_available() override {
66 replayer->handle_replay_ready();
67 }
68 void handle_complete(int r) override {
69 std::stringstream ss;
70 if (r < 0) {
71 ss << "replay completed with error: " << cpp_strerror(r);
72 }
73 replayer->handle_replay_complete(r, ss.str());
74 }
75};
76
3efd9988 77template <typename I>
7c673cae
FG
78class ImageReplayerAdminSocketCommand {
79public:
3efd9988
FG
80 ImageReplayerAdminSocketCommand(const std::string &desc,
81 ImageReplayer<I> *replayer)
82 : desc(desc), replayer(replayer) {
83 }
7c673cae
FG
84 virtual ~ImageReplayerAdminSocketCommand() {}
85 virtual bool call(Formatter *f, stringstream *ss) = 0;
3efd9988
FG
86
87 std::string desc;
88 ImageReplayer<I> *replayer;
89 bool registered = false;
7c673cae
FG
90};
91
92template <typename I>
3efd9988 93class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 94public:
3efd9988
FG
95 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
96 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
97 }
7c673cae
FG
98
99 bool call(Formatter *f, stringstream *ss) override {
3efd9988 100 this->replayer->print_status(f, ss);
7c673cae
FG
101 return true;
102 }
7c673cae
FG
103};
104
105template <typename I>
3efd9988 106class StartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 107public:
3efd9988
FG
108 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
109 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
110 }
7c673cae
FG
111
112 bool call(Formatter *f, stringstream *ss) override {
3efd9988 113 this->replayer->start(nullptr, true);
7c673cae
FG
114 return true;
115 }
7c673cae
FG
116};
117
118template <typename I>
3efd9988 119class StopCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 120public:
3efd9988
FG
121 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
122 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
123 }
7c673cae
FG
124
125 bool call(Formatter *f, stringstream *ss) override {
3efd9988 126 this->replayer->stop(nullptr, true);
7c673cae
FG
127 return true;
128 }
7c673cae
FG
129};
130
131template <typename I>
3efd9988 132class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 133public:
3efd9988
FG
134 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
135 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
136 }
7c673cae
FG
137
138 bool call(Formatter *f, stringstream *ss) override {
3efd9988 139 this->replayer->restart();
7c673cae
FG
140 return true;
141 }
7c673cae
FG
142};
143
144template <typename I>
3efd9988 145class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 146public:
3efd9988
FG
147 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
148 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
149 }
7c673cae
FG
150
151 bool call(Formatter *f, stringstream *ss) override {
152 C_SaferCond cond;
3efd9988 153 this->replayer->flush(&cond);
7c673cae
FG
154 int r = cond.wait();
155 if (r < 0) {
156 *ss << "flush: " << cpp_strerror(r);
157 return false;
158 }
159 return true;
160 }
7c673cae
FG
161};
162
163template <typename I>
164class ImageReplayerAdminSocketHook : public AdminSocketHook {
165public:
166 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77 167 ImageReplayer<I> *replayer)
3efd9988
FG
168 : admin_socket(cct->get_admin_socket()),
169 commands{{"rbd mirror flush " + name,
170 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
171 {"rbd mirror restart " + name,
172 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
173 {"rbd mirror start " + name,
174 new StartCommand<I>("start rbd mirror " + name, replayer)},
175 {"rbd mirror status " + name,
176 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
177 {"rbd mirror stop " + name,
178 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
d2e6a577
FG
179 }
180
181 int register_commands() {
3efd9988
FG
182 for (auto &it : commands) {
183 int r = admin_socket->register_command(it.first, it.first, this,
184 it.second->desc);
185 if (r < 0) {
186 return r;
187 }
188 it.second->registered = true;
7c673cae 189 }
d2e6a577 190 return 0;
7c673cae
FG
191 }
192
193 ~ImageReplayerAdminSocketHook() override {
3efd9988
FG
194 for (auto &it : commands) {
195 if (it.second->registered) {
196 admin_socket->unregister_command(it.first);
197 }
198 delete it.second;
7c673cae 199 }
31f18b77 200 commands.clear();
7c673cae
FG
201 }
202
203 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
204 bufferlist& out) override {
3efd9988 205 auto i = commands.find(command);
7c673cae
FG
206 assert(i != commands.end());
207 Formatter *f = Formatter::create(format);
208 stringstream ss;
209 bool r = i->second->call(f, &ss);
210 delete f;
211 out.append(ss);
212 return r;
213 }
214
215private:
3efd9988 216 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I> *> Commands;
7c673cae
FG
217
218 AdminSocket *admin_socket;
219 Commands commands;
220};
221
222uint32_t calculate_replay_delay(const utime_t &event_time,
223 int mirroring_replay_delay) {
224 if (mirroring_replay_delay <= 0) {
225 return 0;
226 }
227
228 utime_t now = ceph_clock_now();
229 if (event_time + mirroring_replay_delay <= now) {
230 return 0;
231 }
232
233 // ensure it is rounded up when converting to integer
234 return (event_time + mirroring_replay_delay - now) + 1;
235}
236
237} // anonymous namespace
238
239template <typename I>
240void ImageReplayer<I>::BootstrapProgressContext::update_progress(
241 const std::string &description, bool flush)
242{
243 const std::string desc = "bootstrapping, " + description;
244 replayer->set_state_description(0, desc);
245 if (flush) {
246 replayer->update_mirror_image_status(false, boost::none);
247 }
248}
249
250template <typename I>
251void ImageReplayer<I>::RemoteJournalerListener::handle_update(
252 ::journal::JournalMetadata *) {
253 FunctionContext *ctx = new FunctionContext([this](int r) {
254 replayer->handle_remote_journal_metadata_updated();
255 });
256 replayer->m_threads->work_queue->queue(ctx, 0);
257}
258
259template <typename I>
d2e6a577 260ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
c07f9fc5 261 ImageDeleter<I>* image_deleter,
31f18b77 262 InstanceWatcher<I> *instance_watcher,
7c673cae
FG
263 RadosRef local,
264 const std::string &local_mirror_uuid,
265 int64_t local_pool_id,
266 const std::string &global_image_id) :
267 m_threads(threads),
268 m_image_deleter(image_deleter),
31f18b77 269 m_instance_watcher(instance_watcher),
7c673cae
FG
270 m_local(local),
271 m_local_mirror_uuid(local_mirror_uuid),
272 m_local_pool_id(local_pool_id),
273 m_global_image_id(global_image_id),
274 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
275 global_image_id),
276 m_progress_cxt(this),
277 m_journal_listener(new JournalListener(this)),
278 m_remote_listener(this)
279{
280 // Register asok commands using a temporary "remote_pool_name/global_image_id"
281 // name. When the image name becomes known on start the asok commands will be
282 // re-registered using "remote_pool_name/remote_image_name" name.
283
284 std::string pool_name;
285 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
286 if (r < 0) {
287 derr << "error resolving local pool " << m_local_pool_id
288 << ": " << cpp_strerror(r) << dendl;
289 pool_name = stringify(m_local_pool_id);
290 }
291
292 m_name = pool_name + "/" + m_global_image_id;
d2e6a577 293 register_admin_socket_hook();
7c673cae
FG
294}
295
296template <typename I>
297ImageReplayer<I>::~ImageReplayer()
298{
d2e6a577 299 unregister_admin_socket_hook();
7c673cae
FG
300 assert(m_event_preprocessor == nullptr);
301 assert(m_replay_status_formatter == nullptr);
302 assert(m_local_image_ctx == nullptr);
303 assert(m_local_replay == nullptr);
304 assert(m_remote_journaler == nullptr);
305 assert(m_replay_handler == nullptr);
306 assert(m_on_start_finish == nullptr);
307 assert(m_on_stop_finish == nullptr);
308 assert(m_bootstrap_request == nullptr);
309 assert(m_in_flight_status_updates == 0);
310
311 delete m_journal_listener;
7c673cae
FG
312}
313
c07f9fc5
FG
314template <typename I>
315image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
316 Mutex::Locker locker(m_lock);
317
318 if (!m_mirror_image_status_state) {
319 return image_replayer::HEALTH_STATE_OK;
320 } else if (*m_mirror_image_status_state ==
321 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
322 *m_mirror_image_status_state ==
323 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
324 return image_replayer::HEALTH_STATE_WARNING;
325 }
326 return image_replayer::HEALTH_STATE_ERROR;
327}
328
7c673cae 329template <typename I>
d2e6a577
FG
330void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
331 librados::IoCtx &io_ctx) {
7c673cae 332 Mutex::Locker locker(m_lock);
d2e6a577
FG
333 auto it = m_peers.find({peer_uuid});
334 if (it == m_peers.end()) {
335 m_peers.insert({peer_uuid, io_ctx});
7c673cae
FG
336 }
337}
338
7c673cae
FG
339template <typename I>
340void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
341 dout(20) << r << " " << desc << dendl;
342
343 Mutex::Locker l(m_lock);
344 m_last_r = r;
345 m_state_desc = desc;
346}
347
348template <typename I>
349void ImageReplayer<I>::start(Context *on_finish, bool manual)
350{
351 dout(20) << "on_finish=" << on_finish << dendl;
352
353 int r = 0;
354 {
355 Mutex::Locker locker(m_lock);
356 if (!is_stopped_()) {
357 derr << "already running" << dendl;
358 r = -EINVAL;
359 } else if (m_manual_stop && !manual) {
360 dout(5) << "stopped manually, ignoring start without manual flag"
361 << dendl;
362 r = -EPERM;
363 } else {
364 m_state = STATE_STARTING;
365 m_last_r = 0;
366 m_state_desc.clear();
367 m_manual_stop = false;
d2e6a577 368 m_delete_requested = false;
7c673cae
FG
369
370 if (on_finish != nullptr) {
371 assert(m_on_start_finish == nullptr);
372 m_on_start_finish = on_finish;
373 }
374 assert(m_on_stop_finish == nullptr);
375 }
376 }
377
378 if (r < 0) {
379 if (on_finish) {
380 on_finish->complete(r);
381 }
382 return;
383 }
384
385 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
386 if (r < 0) {
387 derr << "error opening ioctx for local pool " << m_local_pool_id
388 << ": " << cpp_strerror(r) << dendl;
389 on_start_fail(r, "error opening local pool");
390 return;
391 }
392
d2e6a577
FG
393 wait_for_deletion();
394}
395
396template <typename I>
397void ImageReplayer<I>::wait_for_deletion() {
398 dout(20) << dendl;
399
400 Context *ctx = create_context_callback<
401 ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
402 m_image_deleter->wait_for_scheduled_deletion(
403 m_local_pool_id, m_global_image_id, ctx, false);
404}
405
406template <typename I>
407void ImageReplayer<I>::handle_wait_for_deletion(int r) {
408 dout(20) << "r=" << r << dendl;
409
410 if (r == -ECANCELED) {
411 on_start_fail(0, "");
412 return;
413 } else if (r < 0) {
414 on_start_fail(r, "error waiting for image deletion");
415 return;
416 }
417
7c673cae
FG
418 prepare_local_image();
419}
420
421template <typename I>
422void ImageReplayer<I>::prepare_local_image() {
423 dout(20) << dendl;
424
d2e6a577 425 m_local_image_id = "";
7c673cae
FG
426 Context *ctx = create_context_callback<
427 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
428 auto req = PrepareLocalImageRequest<I>::create(
429 m_local_ioctx, m_global_image_id, &m_local_image_id,
430 &m_local_image_tag_owner, m_threads->work_queue, ctx);
431 req->send();
432}
433
434template <typename I>
435void ImageReplayer<I>::handle_prepare_local_image(int r) {
436 dout(20) << "r=" << r << dendl;
437
438 if (r == -ENOENT) {
439 dout(20) << "local image does not exist" << dendl;
440 } else if (r < 0) {
441 on_start_fail(r, "error preparing local image for replay");
442 return;
7c673cae
FG
443 }
444
445 // local image doesn't exist or is non-primary
d2e6a577 446 prepare_remote_image();
7c673cae
FG
447}
448
449template <typename I>
d2e6a577 450void ImageReplayer<I>::prepare_remote_image() {
7c673cae 451 dout(20) << dendl;
b32b8144
FG
452 if (m_peers.empty()) {
453 // technically nothing to bootstrap, but it handles the status update
454 bootstrap();
455 return;
456 }
7c673cae 457
d2e6a577
FG
458 // TODO need to support multiple remote images
459 assert(!m_peers.empty());
460 m_remote_image = {*m_peers.begin()};
461
462 Context *ctx = create_context_callback<
463 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
464 auto req = PrepareRemoteImageRequest<I>::create(
b32b8144
FG
465 m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
466 m_local_image_id, &m_remote_image.mirror_uuid, &m_remote_image.image_id,
467 &m_remote_journaler, &m_client_state, &m_client_meta, ctx);
d2e6a577
FG
468 req->send();
469}
470
471template <typename I>
472void ImageReplayer<I>::handle_prepare_remote_image(int r) {
473 dout(20) << "r=" << r << dendl;
474
b32b8144
FG
475 assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
476 if (r < 0 && !m_local_image_id.empty() &&
477 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
478 // local image is primary -- fall-through
479 } else if (r == -ENOENT) {
d2e6a577
FG
480 dout(20) << "remote image does not exist" << dendl;
481
482 // TODO need to support multiple remote images
483 if (!m_local_image_id.empty() &&
484 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
485 // local image exists and is non-primary and linked to the missing
486 // remote image
487
488 m_delete_requested = true;
489 on_start_fail(0, "remote image no longer exists");
490 } else {
491 on_start_fail(-ENOENT, "remote image does not exist");
492 }
493 return;
494 } else if (r < 0) {
495 on_start_fail(r, "error retrieving remote image id");
7c673cae
FG
496 return;
497 }
498
d2e6a577
FG
499 bootstrap();
500}
501
502template <typename I>
503void ImageReplayer<I>::bootstrap() {
504 dout(20) << dendl;
7c673cae 505
b32b8144
FG
506 if (!m_local_image_id.empty() &&
507 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
508 dout(5) << "local image is primary" << dendl;
509 on_start_fail(0, "local image is primary");
510 return;
511 } else if (m_peers.empty()) {
512 dout(5) << "no peer clusters" << dendl;
513 on_start_fail(-ENOENT, "no peer clusters");
514 return;
515 }
7c673cae
FG
516
517 Context *ctx = create_context_callback<
518 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
519
520 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
31f18b77 521 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
7c673cae
FG
522 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
523 m_global_image_id, m_threads->work_queue, m_threads->timer,
524 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
b32b8144
FG
525 m_remote_journaler, &m_client_state, &m_client_meta, ctx,
526 &m_resync_requested, &m_progress_cxt);
7c673cae
FG
527
528 {
529 Mutex::Locker locker(m_lock);
530 request->get();
531 m_bootstrap_request = request;
532 }
533
534 update_mirror_image_status(false, boost::none);
535 reschedule_update_status_task(10);
536
537 request->send();
538}
539
540template <typename I>
541void ImageReplayer<I>::handle_bootstrap(int r) {
542 dout(20) << "r=" << r << dendl;
543 {
544 Mutex::Locker locker(m_lock);
545 m_bootstrap_request->put();
546 m_bootstrap_request = nullptr;
547 if (m_local_image_ctx) {
548 m_local_image_id = m_local_image_ctx->id;
549 }
550 }
551
552 if (r == -EREMOTEIO) {
553 m_local_image_tag_owner = "";
c07f9fc5
FG
554 dout(5) << "remote image is non-primary" << dendl;
555 on_start_fail(-EREMOTEIO, "remote image is non-primary");
7c673cae
FG
556 return;
557 } else if (r == -EEXIST) {
558 m_local_image_tag_owner = "";
559 on_start_fail(r, "split-brain detected");
560 return;
561 } else if (r < 0) {
562 on_start_fail(r, "error bootstrapping replay");
563 return;
564 } else if (on_start_interrupted()) {
565 return;
d2e6a577
FG
566 } else if (m_resync_requested) {
567 on_start_fail(0, "resync requested");
568 return;
7c673cae
FG
569 }
570
571 assert(m_local_journal == nullptr);
572 {
573 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
574 if (m_local_image_ctx->journal != nullptr) {
575 m_local_journal = m_local_image_ctx->journal;
576 m_local_journal->add_listener(m_journal_listener);
577 }
578 }
579
580 if (m_local_journal == nullptr) {
581 on_start_fail(-EINVAL, "error accessing local journal");
582 return;
583 }
584
3efd9988 585 on_name_changed();
7c673cae
FG
586
587 update_mirror_image_status(false, boost::none);
588 init_remote_journaler();
589}
590
591template <typename I>
592void ImageReplayer<I>::init_remote_journaler() {
593 dout(20) << dendl;
594
595 Context *ctx = create_context_callback<
596 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
597 m_remote_journaler->init(ctx);
598}
599
600template <typename I>
601void ImageReplayer<I>::handle_init_remote_journaler(int r) {
602 dout(20) << "r=" << r << dendl;
603
604 if (r < 0) {
605 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
606 on_start_fail(r, "error initializing remote journal");
607 return;
608 } else if (on_start_interrupted()) {
609 return;
610 }
611
612 m_remote_journaler->add_listener(&m_remote_listener);
613
614 cls::journal::Client client;
615 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
616 if (r < 0) {
617 derr << "error retrieving remote journal client: " << cpp_strerror(r)
618 << dendl;
619 on_start_fail(r, "error retrieving remote journal client");
620 return;
621 }
622
b32b8144
FG
623 dout(5) << "image_id=" << m_local_image_id << ", "
624 << "client_meta.image_id=" << m_client_meta.image_id << ", "
625 << "client.state=" << client.state << dendl;
d2e6a577
FG
626 if (m_client_meta.image_id == m_local_image_id &&
627 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
7c673cae
FG
628 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
629 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
d2e6a577
FG
630 m_resync_requested = true;
631 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
632 } else {
633 on_start_fail(-ENOTCONN, "disconnected");
7c673cae 634 }
7c673cae
FG
635 return;
636 }
637
638 start_replay();
639}
640
641template <typename I>
642void ImageReplayer<I>::start_replay() {
643 dout(20) << dendl;
644
645 Context *start_ctx = create_context_callback<
646 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
647 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
648}
649
650template <typename I>
651void ImageReplayer<I>::handle_start_replay(int r) {
652 dout(20) << "r=" << r << dendl;
653
654 if (r < 0) {
655 assert(m_local_replay == nullptr);
656 derr << "error starting external replay on local image "
657 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
658 on_start_fail(r, "error starting replay on local image");
659 return;
660 }
661
662 Context *on_finish(nullptr);
663 {
664 Mutex::Locker locker(m_lock);
665 assert(m_state == STATE_STARTING);
666 m_state = STATE_REPLAYING;
667 std::swap(m_on_start_finish, on_finish);
668 }
669
670 m_event_preprocessor = EventPreprocessor<I>::create(
671 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
672 &m_client_meta, m_threads->work_queue);
673 m_replay_status_formatter =
674 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
675
676 update_mirror_image_status(true, boost::none);
677 reschedule_update_status_task(30);
678
7c673cae
FG
679 if (on_replay_interrupted()) {
680 return;
681 }
682
683 {
684 CephContext *cct = static_cast<CephContext *>(m_local->cct());
181888fb
FG
685 double poll_seconds = cct->_conf->get_val<double>(
686 "rbd_mirror_journal_poll_age");
7c673cae
FG
687
688 Mutex::Locker locker(m_lock);
689 m_replay_handler = new ReplayHandler<I>(this);
690 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
691
692 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
693 }
694
d2e6a577
FG
695 dout(20) << "start succeeded" << dendl;
696 if (on_finish != nullptr) {
697 dout(20) << "on finish complete, r=" << r << dendl;
698 on_finish->complete(r);
699 }
7c673cae
FG
700}
701
702template <typename I>
703void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
704{
705 dout(20) << "r=" << r << dendl;
706 Context *ctx = new FunctionContext([this, r, desc](int _r) {
707 {
708 Mutex::Locker locker(m_lock);
709 assert(m_state == STATE_STARTING);
710 m_state = STATE_STOPPING;
d2e6a577 711 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
7c673cae
FG
712 derr << "start failed: " << cpp_strerror(r) << dendl;
713 } else {
714 dout(20) << "start canceled" << dendl;
715 }
716 }
717
718 set_state_description(r, desc);
719 update_mirror_image_status(false, boost::none);
720 reschedule_update_status_task(-1);
721 shut_down(r);
722 });
723 m_threads->work_queue->queue(ctx, 0);
724}
725
726template <typename I>
727bool ImageReplayer<I>::on_start_interrupted()
728{
729 Mutex::Locker locker(m_lock);
730 assert(m_state == STATE_STARTING);
731 if (m_on_stop_finish == nullptr) {
732 return false;
733 }
734
735 on_start_fail(-ECANCELED);
736 return true;
737}
738
739template <typename I>
740void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
741 const std::string& desc)
742{
743 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
744 << ", desc=" << desc << dendl;
745
d2e6a577
FG
746 m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
747
7c673cae
FG
748 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
749 bool shut_down_replay = false;
750 bool running = true;
7c673cae
FG
751 {
752 Mutex::Locker locker(m_lock);
753
754 if (!is_running_()) {
755 running = false;
756 } else {
757 if (!is_stopped_()) {
758 if (m_state == STATE_STARTING) {
759 dout(20) << "canceling start" << dendl;
760 if (m_bootstrap_request) {
761 bootstrap_request = m_bootstrap_request;
762 bootstrap_request->get();
763 }
764 } else {
765 dout(20) << "interrupting replay" << dendl;
766 shut_down_replay = true;
767 }
768
769 assert(m_on_stop_finish == nullptr);
770 std::swap(m_on_stop_finish, on_finish);
771 m_stop_requested = true;
772 m_manual_stop = manual;
7c673cae
FG
773 }
774 }
775 }
776
777 // avoid holding lock since bootstrap request will update status
778 if (bootstrap_request != nullptr) {
779 bootstrap_request->cancel();
780 bootstrap_request->put();
781 }
782
7c673cae
FG
783 if (!running) {
784 dout(20) << "not running" << dendl;
785 if (on_finish) {
786 on_finish->complete(-EINVAL);
787 }
788 return;
789 }
790
791 if (shut_down_replay) {
792 on_stop_journal_replay(r, desc);
793 } else if (on_finish != nullptr) {
794 on_finish->complete(0);
795 }
796}
797
798template <typename I>
799void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
800{
801 dout(20) << "enter" << dendl;
802
803 {
804 Mutex::Locker locker(m_lock);
805 if (m_state != STATE_REPLAYING) {
806 // might be invoked multiple times while stopping
807 return;
808 }
809 m_stop_requested = true;
810 m_state = STATE_STOPPING;
811 }
812
813 set_state_description(r, desc);
814 update_mirror_image_status(false, boost::none);
815 reschedule_update_status_task(-1);
816 shut_down(0);
817}
818
819template <typename I>
820void ImageReplayer<I>::handle_replay_ready()
821{
822 dout(20) << "enter" << dendl;
823 if (on_replay_interrupted()) {
824 return;
825 }
826
827 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
828 return;
829 }
830
831 m_event_replay_tracker.start_op();
31f18b77
FG
832
833 m_lock.Lock();
834 bool stopping = (m_state == STATE_STOPPING);
835 m_lock.Unlock();
836
837 if (stopping) {
838 dout(10) << "stopping event replay" << dendl;
839 m_event_replay_tracker.finish_op();
840 return;
841 }
842
7c673cae
FG
843 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
844 preprocess_entry();
845 return;
846 }
847
848 replay_flush();
849}
850
851template <typename I>
852void ImageReplayer<I>::restart(Context *on_finish)
853{
854 FunctionContext *ctx = new FunctionContext(
855 [this, on_finish](int r) {
856 if (r < 0) {
857 // Try start anyway.
858 }
859 start(on_finish, true);
860 });
861 stop(ctx);
862}
863
864template <typename I>
865void ImageReplayer<I>::flush(Context *on_finish)
866{
867 dout(20) << "enter" << dendl;
868
869 {
870 Mutex::Locker locker(m_lock);
871 if (m_state == STATE_REPLAYING) {
872 Context *ctx = new FunctionContext(
873 [on_finish](int r) {
874 if (on_finish != nullptr) {
875 on_finish->complete(r);
876 }
877 });
878 on_flush_local_replay_flush_start(ctx);
879 return;
880 }
881 }
882
883 if (on_finish) {
884 on_finish->complete(0);
885 }
886}
887
888template <typename I>
889void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
890{
891 dout(20) << "enter" << dendl;
892 FunctionContext *ctx = new FunctionContext(
893 [this, on_flush](int r) {
894 on_flush_local_replay_flush_finish(on_flush, r);
895 });
896
897 assert(m_lock.is_locked());
898 assert(m_state == STATE_REPLAYING);
899 m_local_replay->flush(ctx);
900}
901
902template <typename I>
903void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
904 int r)
905{
906 dout(20) << "r=" << r << dendl;
907 if (r < 0) {
908 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
909 on_flush->complete(r);
910 return;
911 }
912
913 on_flush_flush_commit_position_start(on_flush);
914}
915
916template <typename I>
917void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
918{
919 FunctionContext *ctx = new FunctionContext(
920 [this, on_flush](int r) {
921 on_flush_flush_commit_position_finish(on_flush, r);
922 });
923
924 m_remote_journaler->flush_commit_position(ctx);
925}
926
927template <typename I>
928void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
929 int r)
930{
931 if (r < 0) {
932 derr << "error flushing remote journal commit position: "
933 << cpp_strerror(r) << dendl;
934 }
935
936 update_mirror_image_status(false, boost::none);
937
938 dout(20) << "flush complete, r=" << r << dendl;
939 on_flush->complete(r);
940}
941
942template <typename I>
943bool ImageReplayer<I>::on_replay_interrupted()
944{
945 bool shut_down;
946 {
947 Mutex::Locker locker(m_lock);
948 shut_down = m_stop_requested;
949 }
950
951 if (shut_down) {
952 on_stop_journal_replay();
953 }
954 return shut_down;
955}
956
957template <typename I>
958void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
959{
960 dout(20) << "enter" << dendl;
961
962 Mutex::Locker l(m_lock);
963
964 if (f) {
965 f->open_object_section("image_replayer");
966 f->dump_string("name", m_name);
967 f->dump_string("state", to_string(m_state));
968 f->close_section();
969 f->flush(*ss);
970 } else {
971 *ss << m_name << ": state: " << to_string(m_state);
972 }
973}
974
975template <typename I>
976void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
977{
978 dout(20) << "r=" << r << dendl;
979 if (r < 0) {
980 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
981 set_state_description(r, error_desc);
982 }
983
984 {
985 Mutex::Locker locker(m_lock);
986 m_stop_requested = true;
987 }
988 on_replay_interrupted();
989}
990
991template <typename I>
992void ImageReplayer<I>::replay_flush() {
993 dout(20) << dendl;
994
995 bool interrupted = false;
996 {
997 Mutex::Locker locker(m_lock);
998 if (m_state != STATE_REPLAYING) {
999 dout(20) << "replay interrupted" << dendl;
1000 interrupted = true;
1001 } else {
1002 m_state = STATE_REPLAY_FLUSHING;
1003 }
1004 }
1005
1006 if (interrupted) {
1007 m_event_replay_tracker.finish_op();
1008 return;
1009 }
1010
1011 // shut down the replay to flush all IO and ops and create a new
1012 // replayer to handle the new tag epoch
1013 Context *ctx = create_context_callback<
1014 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1015 ctx = new FunctionContext([this, ctx](int r) {
1016 m_local_image_ctx->journal->stop_external_replay();
1017 m_local_replay = nullptr;
1018
1019 if (r < 0) {
1020 ctx->complete(r);
1021 return;
1022 }
1023
1024 m_local_journal->start_external_replay(&m_local_replay, ctx);
1025 });
1026 m_local_replay->shut_down(false, ctx);
1027}
1028
1029template <typename I>
1030void ImageReplayer<I>::handle_replay_flush(int r) {
1031 dout(20) << "r=" << r << dendl;
1032
1033 {
1034 Mutex::Locker locker(m_lock);
1035 assert(m_state == STATE_REPLAY_FLUSHING);
1036 m_state = STATE_REPLAYING;
1037 }
1038
1039 if (r < 0) {
1040 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1041 m_event_replay_tracker.finish_op();
1042 handle_replay_complete(r, "replay flush encountered an error");
1043 return;
1044 } else if (on_replay_interrupted()) {
1045 m_event_replay_tracker.finish_op();
1046 return;
1047 }
1048
1049 get_remote_tag();
1050}
1051
1052template <typename I>
1053void ImageReplayer<I>::get_remote_tag() {
1054 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1055
1056 Context *ctx = create_context_callback<
1057 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1058 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1059}
1060
1061template <typename I>
1062void ImageReplayer<I>::handle_get_remote_tag(int r) {
1063 dout(20) << "r=" << r << dendl;
1064
1065 if (r == 0) {
1066 try {
1067 bufferlist::iterator it = m_replay_tag.data.begin();
1068 ::decode(m_replay_tag_data, it);
1069 } catch (const buffer::error &err) {
1070 r = -EBADMSG;
1071 }
1072 }
1073
1074 if (r < 0) {
1075 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1076 << cpp_strerror(r) << dendl;
1077 m_event_replay_tracker.finish_op();
1078 handle_replay_complete(r, "failed to retrieve remote tag");
1079 return;
1080 }
1081
1082 m_replay_tag_valid = true;
1083 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1084 << m_replay_tag_data << dendl;
1085
1086 allocate_local_tag();
1087}
1088
1089template <typename I>
1090void ImageReplayer<I>::allocate_local_tag() {
1091 dout(20) << dendl;
1092
1093 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1094 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
1095 mirror_uuid == m_local_mirror_uuid) {
1096 mirror_uuid = m_remote_image.mirror_uuid;
1097 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1098 dout(5) << "encountered image demotion: stopping" << dendl;
1099 Mutex::Locker locker(m_lock);
1100 m_stop_requested = true;
1101 }
1102
1103 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1104 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1105 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1106 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1107 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1108 }
1109
1110 dout(20) << "mirror_uuid=" << mirror_uuid << ", "
1111 << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
1112 << "replay_tag_tid=" << m_replay_tag_tid << ", "
1113 << "replay_tag_data=" << m_replay_tag_data << dendl;
1114 Context *ctx = create_context_callback<
1115 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1116 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1117}
1118
1119template <typename I>
1120void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1121 dout(20) << "r=" << r << dendl;
1122
1123 if (r < 0) {
1124 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1125 m_event_replay_tracker.finish_op();
1126 handle_replay_complete(r, "failed to allocate journal tag");
1127 return;
1128 }
1129
1130 preprocess_entry();
1131}
1132
1133template <typename I>
1134void ImageReplayer<I>::preprocess_entry() {
1135 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1136 << dendl;
1137
1138 bufferlist data = m_replay_entry.get_data();
1139 bufferlist::iterator it = data.begin();
1140 int r = m_local_replay->decode(&it, &m_event_entry);
1141 if (r < 0) {
1142 derr << "failed to decode journal event" << dendl;
1143 m_event_replay_tracker.finish_op();
1144 handle_replay_complete(r, "failed to decode journal event");
1145 return;
1146 }
1147
1148 uint32_t delay = calculate_replay_delay(
1149 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1150 if (delay == 0) {
1151 handle_preprocess_entry_ready(0);
1152 return;
1153 }
1154
1155 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1156
1157 Mutex::Locker timer_locker(m_threads->timer_lock);
1158 assert(m_delayed_preprocess_task == nullptr);
1159 m_delayed_preprocess_task = new FunctionContext(
1160 [this](int r) {
1161 assert(m_threads->timer_lock.is_locked());
1162 m_delayed_preprocess_task = nullptr;
1163 m_threads->work_queue->queue(
1164 create_context_callback<ImageReplayer,
1165 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1166 });
1167 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1168}
1169
1170template <typename I>
1171void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1172 dout(20) << "r=" << r << dendl;
1173 assert(r == 0);
1174
1175 if (!m_event_preprocessor->is_required(m_event_entry)) {
1176 process_entry();
1177 return;
1178 }
1179
1180 Context *ctx = create_context_callback<
1181 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1182 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1183}
1184
1185template <typename I>
1186void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1187 dout(20) << "r=" << r << dendl;
1188
1189 if (r < 0) {
7c673cae 1190 m_event_replay_tracker.finish_op();
31f18b77
FG
1191
1192 if (r == -ECANCELED) {
1193 handle_replay_complete(0, "lost exclusive lock");
1194 } else {
1195 derr << "failed to preprocess journal event" << dendl;
1196 handle_replay_complete(r, "failed to preprocess journal event");
1197 }
7c673cae
FG
1198 return;
1199 }
1200
1201 process_entry();
1202}
1203
1204template <typename I>
1205void ImageReplayer<I>::process_entry() {
1206 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1207 << dendl;
1208
1209 // stop replaying events if stop has been requested
1210 if (on_replay_interrupted()) {
1211 m_event_replay_tracker.finish_op();
1212 return;
1213 }
1214
1215 Context *on_ready = create_context_callback<
1216 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1217 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1218
1219 m_local_replay->process(m_event_entry, on_ready, on_commit);
1220}
1221
1222template <typename I>
1223void ImageReplayer<I>::handle_process_entry_ready(int r) {
1224 dout(20) << dendl;
1225 assert(r == 0);
1226
3efd9988
FG
1227 on_name_changed();
1228
7c673cae
FG
1229 // attempt to process the next event
1230 handle_replay_ready();
1231}
1232
1233template <typename I>
1234void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1235 int r) {
1236 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1237 << dendl;
1238
1239 if (r < 0) {
1240 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1241 handle_replay_complete(r, "failed to commit journal event");
1242 } else {
1243 assert(m_remote_journaler != nullptr);
1244 m_remote_journaler->committed(replay_entry);
1245 }
1246 m_event_replay_tracker.finish_op();
1247}
1248
1249template <typename I>
1250bool ImageReplayer<I>::update_mirror_image_status(bool force,
1251 const OptionalState &state) {
1252 dout(20) << dendl;
1253 {
1254 Mutex::Locker locker(m_lock);
1255 if (!start_mirror_image_status_update(force, false)) {
1256 return false;
1257 }
1258 }
1259
1260 queue_mirror_image_status_update(state);
1261 return true;
1262}
1263
1264template <typename I>
1265bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1266 bool restarting) {
1267 assert(m_lock.is_locked());
1268
1269 if (!force && !is_stopped_()) {
1270 if (!is_running_()) {
1271 dout(20) << "shut down in-progress: ignoring update" << dendl;
1272 return false;
1273 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1274 dout(20) << "already sending update" << dendl;
1275 m_update_status_requested = true;
1276 return false;
1277 }
1278 }
1279
1280 dout(20) << dendl;
1281 ++m_in_flight_status_updates;
1282 return true;
1283}
1284
1285template <typename I>
1286void ImageReplayer<I>::finish_mirror_image_status_update() {
1287 Context *on_finish = nullptr;
1288 {
1289 Mutex::Locker locker(m_lock);
1290 assert(m_in_flight_status_updates > 0);
1291 if (--m_in_flight_status_updates > 0) {
1292 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1293 << "updates" << dendl;
1294 return;
1295 }
1296
1297 std::swap(on_finish, m_on_update_status_finish);
1298 }
1299
1300 dout(20) << dendl;
1301 if (on_finish != nullptr) {
1302 on_finish->complete(0);
1303 }
1304}
1305
1306template <typename I>
1307void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1308 dout(20) << dendl;
1309 FunctionContext *ctx = new FunctionContext(
1310 [this, state](int r) {
1311 send_mirror_status_update(state);
1312 });
1313 m_threads->work_queue->queue(ctx, 0);
1314}
1315
1316template <typename I>
1317void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1318 State state;
1319 std::string state_desc;
1320 int last_r;
7c673cae 1321 bool stopping_replay;
c07f9fc5
FG
1322
1323 OptionalMirrorImageStatusState mirror_image_status_state{
1324 boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1325 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
7c673cae
FG
1326 {
1327 Mutex::Locker locker(m_lock);
1328 state = m_state;
1329 state_desc = m_state_desc;
c07f9fc5 1330 mirror_image_status_state = m_mirror_image_status_state;
7c673cae 1331 last_r = m_last_r;
7c673cae 1332 stopping_replay = (m_local_image_ctx != nullptr);
c07f9fc5
FG
1333
1334 if (m_bootstrap_request != nullptr) {
1335 bootstrap_request = m_bootstrap_request;
1336 bootstrap_request->get();
1337 }
1338 }
1339
1340 bool syncing = false;
1341 if (bootstrap_request != nullptr) {
1342 syncing = bootstrap_request->is_syncing();
1343 bootstrap_request->put();
1344 bootstrap_request = nullptr;
7c673cae
FG
1345 }
1346
1347 if (opt_state) {
1348 state = *opt_state;
1349 }
1350
1351 cls::rbd::MirrorImageStatus status;
1352 status.up = true;
1353 switch (state) {
1354 case STATE_STARTING:
c07f9fc5 1355 if (syncing) {
7c673cae
FG
1356 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1357 status.description = state_desc.empty() ? "syncing" : state_desc;
c07f9fc5 1358 mirror_image_status_state = status.state;
7c673cae
FG
1359 } else {
1360 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1361 status.description = "starting replay";
1362 }
1363 break;
1364 case STATE_REPLAYING:
1365 case STATE_REPLAY_FLUSHING:
1366 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1367 {
1368 Context *on_req_finish = new FunctionContext(
1369 [this](int r) {
1370 dout(20) << "replay status ready: r=" << r << dendl;
1371 if (r >= 0) {
1372 send_mirror_status_update(boost::none);
1373 } else if (r == -EAGAIN) {
1374 // decrement in-flight status update counter
1375 handle_mirror_status_update(r);
1376 }
1377 });
1378
1379 std::string desc;
1380 if (!m_replay_status_formatter->get_or_send_update(&desc,
1381 on_req_finish)) {
1382 dout(20) << "waiting for replay status" << dendl;
1383 return;
1384 }
1385 status.description = "replaying, " + desc;
c07f9fc5 1386 mirror_image_status_state = boost::none;
7c673cae
FG
1387 }
1388 break;
1389 case STATE_STOPPING:
1390 if (stopping_replay) {
1391 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1392 status.description = "stopping replay";
1393 break;
1394 }
1395 // FALLTHROUGH
1396 case STATE_STOPPED:
c07f9fc5
FG
1397 if (last_r == -EREMOTEIO) {
1398 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1399 status.description = state_desc;
1400 mirror_image_status_state = status.state;
1401 } else if (last_r < 0) {
7c673cae
FG
1402 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1403 status.description = state_desc;
c07f9fc5 1404 mirror_image_status_state = status.state;
7c673cae
FG
1405 } else {
1406 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1407 status.description = state_desc.empty() ? "stopped" : state_desc;
c07f9fc5 1408 mirror_image_status_state = boost::none;
7c673cae
FG
1409 }
1410 break;
1411 default:
1412 assert(!"invalid state");
1413 }
1414
c07f9fc5
FG
1415 {
1416 Mutex::Locker locker(m_lock);
1417 m_mirror_image_status_state = mirror_image_status_state;
1418 }
1419
1420 // prevent the status from ping-ponging when failed replays are restarted
1421 if (mirror_image_status_state &&
1422 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1423 status.state = *mirror_image_status_state;
1424 }
1425
7c673cae
FG
1426 dout(20) << "status=" << status << dendl;
1427 librados::ObjectWriteOperation op;
1428 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1429
1430 librados::AioCompletion *aio_comp = create_rados_callback<
1431 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1432 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1433 assert(r == 0);
1434 aio_comp->release();
1435}
1436
1437template <typename I>
1438void ImageReplayer<I>::handle_mirror_status_update(int r) {
1439 dout(20) << "r=" << r << dendl;
1440
1441 bool running = false;
1442 bool started = false;
1443 {
1444 Mutex::Locker locker(m_lock);
1445 bool update_status_requested = false;
1446 std::swap(update_status_requested, m_update_status_requested);
1447
1448 running = is_running_();
1449 if (running && update_status_requested) {
1450 started = start_mirror_image_status_update(false, true);
1451 }
1452 }
1453
1454 // if a deferred update is available, send it -- otherwise reschedule
1455 // the timer task
1456 if (started) {
1457 queue_mirror_image_status_update(boost::none);
1458 } else if (running) {
1459 reschedule_update_status_task();
1460 }
1461
1462 // mark committed status update as no longer in-flight
1463 finish_mirror_image_status_update();
1464}
1465
1466template <typename I>
1467void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1468 dout(20) << dendl;
1469
1470 bool canceled_task = false;
1471 {
1472 Mutex::Locker locker(m_lock);
1473 Mutex::Locker timer_locker(m_threads->timer_lock);
1474
1475 if (m_update_status_task) {
1476 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1477 m_update_status_task = nullptr;
1478 }
1479
1480 if (new_interval > 0) {
1481 m_update_status_interval = new_interval;
1482 }
1483
1484 bool restarting = (new_interval == 0 || canceled_task);
1485 if (new_interval >= 0 && is_running_() &&
1486 start_mirror_image_status_update(false, restarting)) {
1487 m_update_status_task = new FunctionContext(
1488 [this](int r) {
1489 assert(m_threads->timer_lock.is_locked());
1490 m_update_status_task = nullptr;
1491
1492 queue_mirror_image_status_update(boost::none);
1493 });
1494 m_threads->timer->add_event_after(m_update_status_interval,
1495 m_update_status_task);
1496 }
1497 }
1498
1499 if (canceled_task) {
1500 dout(20) << "canceled task" << dendl;
1501 finish_mirror_image_status_update();
1502 }
1503}
1504
1505template <typename I>
1506void ImageReplayer<I>::shut_down(int r) {
1507 dout(20) << "r=" << r << dendl;
3efd9988
FG
1508
1509 bool canceled_delayed_preprocess_task = false;
1510 {
1511 Mutex::Locker timer_locker(m_threads->timer_lock);
1512 if (m_delayed_preprocess_task != nullptr) {
1513 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1514 m_delayed_preprocess_task);
1515 assert(canceled_delayed_preprocess_task);
1516 m_delayed_preprocess_task = nullptr;
1517 }
1518 }
1519 if (canceled_delayed_preprocess_task) {
1520 // wake up sleeping replay
1521 m_event_replay_tracker.finish_op();
1522 }
1523
7c673cae
FG
1524 {
1525 Mutex::Locker locker(m_lock);
1526 assert(m_state == STATE_STOPPING);
1527
1528 // if status updates are in-flight, wait for them to complete
1529 // before proceeding
1530 if (m_in_flight_status_updates > 0) {
1531 if (m_on_update_status_finish == nullptr) {
1532 dout(20) << "waiting for in-flight status update" << dendl;
1533 m_on_update_status_finish = new FunctionContext(
1534 [this, r](int _r) {
1535 shut_down(r);
1536 });
1537 }
1538 return;
1539 }
1540 }
1541
31f18b77
FG
1542 // NOTE: it's important to ensure that the local image is fully
1543 // closed before attempting to close the remote journal in
1544 // case the remote cluster is unreachable
1545
7c673cae
FG
1546 // chain the shut down sequence (reverse order)
1547 Context *ctx = new FunctionContext(
1548 [this, r](int _r) {
1549 update_mirror_image_status(true, STATE_STOPPED);
1550 handle_shut_down(r);
1551 });
31f18b77
FG
1552
1553 // close the remote journal
7c673cae
FG
1554 if (m_remote_journaler != nullptr) {
1555 ctx = new FunctionContext([this, ctx](int r) {
1556 delete m_remote_journaler;
1557 m_remote_journaler = nullptr;
1558 ctx->complete(0);
1559 });
1560 ctx = new FunctionContext([this, ctx](int r) {
1561 m_remote_journaler->remove_listener(&m_remote_listener);
1562 m_remote_journaler->shut_down(ctx);
1563 });
7c673cae 1564 }
31f18b77
FG
1565
1566 // stop the replay of remote journal events
1567 if (m_replay_handler != nullptr) {
1568 ctx = new FunctionContext([this, ctx](int r) {
1569 delete m_replay_handler;
1570 m_replay_handler = nullptr;
1571
1572 m_event_replay_tracker.wait_for_ops(ctx);
1573 });
1574 ctx = new FunctionContext([this, ctx](int r) {
1575 m_remote_journaler->stop_replay(ctx);
1576 });
1577 }
1578
1579 // close the local image (release exclusive lock)
1580 if (m_local_image_ctx) {
1581 ctx = new FunctionContext([this, ctx](int r) {
1582 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1583 &m_local_image_ctx, ctx);
1584 request->send();
1585 });
1586 }
1587
1588 // shut down event replay into the local image
7c673cae
FG
1589 if (m_local_journal != nullptr) {
1590 ctx = new FunctionContext([this, ctx](int r) {
1591 m_local_journal = nullptr;
1592 ctx->complete(0);
1593 });
1594 if (m_local_replay != nullptr) {
1595 ctx = new FunctionContext([this, ctx](int r) {
1596 m_local_journal->stop_external_replay();
1597 m_local_replay = nullptr;
1598
1599 EventPreprocessor<I>::destroy(m_event_preprocessor);
1600 m_event_preprocessor = nullptr;
1601 ctx->complete(0);
1602 });
1603 }
1604 ctx = new FunctionContext([this, ctx](int r) {
7c673cae
FG
1605 // blocks if listener notification is in-progress
1606 m_local_journal->remove_listener(m_journal_listener);
7c673cae
FG
1607 ctx->complete(0);
1608 });
31f18b77
FG
1609 }
1610
1611 // wait for all local in-flight replay events to complete
1612 ctx = new FunctionContext([this, ctx](int r) {
1613 if (r < 0) {
1614 derr << "error shutting down journal replay: " << cpp_strerror(r)
1615 << dendl;
1616 }
1617
1618 m_event_replay_tracker.wait_for_ops(ctx);
1619 });
1620
1621 // flush any local in-flight replay events
1622 if (m_local_replay != nullptr) {
7c673cae 1623 ctx = new FunctionContext([this, ctx](int r) {
31f18b77 1624 m_local_replay->shut_down(true, ctx);
7c673cae
FG
1625 });
1626 }
31f18b77 1627
7c673cae
FG
1628 m_threads->work_queue->queue(ctx, 0);
1629}
1630
1631template <typename I>
1632void ImageReplayer<I>::handle_shut_down(int r) {
1633 reschedule_update_status_task(-1);
1634
3efd9988 1635 bool unregister_asok_hook = false;
7c673cae
FG
1636 {
1637 Mutex::Locker locker(m_lock);
1638
1639 // if status updates are in-flight, wait for them to complete
1640 // before proceeding
1641 if (m_in_flight_status_updates > 0) {
1642 if (m_on_update_status_finish == nullptr) {
1643 dout(20) << "waiting for in-flight status update" << dendl;
1644 m_on_update_status_finish = new FunctionContext(
1645 [this, r](int _r) {
1646 handle_shut_down(r);
1647 });
1648 }
1649 return;
1650 }
1651
d2e6a577
FG
1652 bool delete_requested = false;
1653 if (m_delete_requested && !m_local_image_id.empty()) {
1654 assert(m_remote_image.image_id.empty());
1655 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1656 delete_requested = true;
1657 }
1658 if (delete_requested || m_resync_requested) {
7c673cae
FG
1659 m_image_deleter->schedule_image_delete(m_local,
1660 m_local_pool_id,
c07f9fc5 1661 m_global_image_id,
d2e6a577
FG
1662 m_resync_requested);
1663
1664 m_local_image_id = "";
1665 m_resync_requested = false;
1666 if (m_delete_requested) {
3efd9988 1667 unregister_asok_hook = true;
d2e6a577
FG
1668 m_delete_requested = false;
1669 }
1670 } else if (m_last_r == -ENOENT &&
1671 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1672 dout(0) << "mirror image no longer exists" << dendl;
3efd9988 1673 unregister_asok_hook = true;
d2e6a577 1674 m_finished = true;
7c673cae
FG
1675 }
1676 }
1677
3efd9988
FG
1678 if (unregister_asok_hook) {
1679 unregister_admin_socket_hook();
1680 }
1681
7c673cae
FG
1682 dout(20) << "stop complete" << dendl;
1683 m_local_ioctx.close();
1684
1685 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1686 m_replay_status_formatter = nullptr;
1687
1688 Context *on_start = nullptr;
1689 Context *on_stop = nullptr;
1690 {
1691 Mutex::Locker locker(m_lock);
1692 std::swap(on_start, m_on_start_finish);
1693 std::swap(on_stop, m_on_stop_finish);
1694 m_stop_requested = false;
1695 assert(m_delayed_preprocess_task == nullptr);
1696 assert(m_state == STATE_STOPPING);
1697 m_state = STATE_STOPPED;
1698 }
1699
1700 if (on_start != nullptr) {
1701 dout(20) << "on start finish complete, r=" << r << dendl;
1702 on_start->complete(r);
1703 r = 0;
1704 }
1705 if (on_stop != nullptr) {
1706 dout(20) << "on stop finish complete, r=" << r << dendl;
1707 on_stop->complete(r);
1708 }
1709}
1710
1711template <typename I>
1712void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1713 dout(20) << dendl;
1714
1715 cls::journal::Client client;
1716 {
1717 Mutex::Locker locker(m_lock);
1718 if (!is_running_()) {
1719 return;
1720 }
1721
1722 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1723 if (r < 0) {
1724 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1725 return;
1726 }
1727 }
1728
1729 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1730 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1731 stop(nullptr, false, -ENOTCONN, "disconnected");
1732 }
1733}
1734
1735template <typename I>
1736std::string ImageReplayer<I>::to_string(const State state) {
1737 switch (state) {
1738 case ImageReplayer<I>::STATE_STARTING:
1739 return "Starting";
1740 case ImageReplayer<I>::STATE_REPLAYING:
1741 return "Replaying";
1742 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1743 return "ReplayFlushing";
1744 case ImageReplayer<I>::STATE_STOPPING:
1745 return "Stopping";
1746 case ImageReplayer<I>::STATE_STOPPED:
1747 return "Stopped";
1748 default:
1749 break;
1750 }
1751 return "Unknown(" + stringify(state) + ")";
1752}
1753
1754template <typename I>
1755void ImageReplayer<I>::resync_image(Context *on_finish) {
1756 dout(20) << dendl;
1757
d2e6a577
FG
1758 m_resync_requested = true;
1759 stop(on_finish);
1760}
1761
1762template <typename I>
1763void ImageReplayer<I>::register_admin_socket_hook() {
3efd9988
FG
1764 ImageReplayerAdminSocketHook<I> *asok_hook;
1765 {
1766 Mutex::Locker locker(m_lock);
1767 if (m_asok_hook != nullptr) {
1768 return;
1769 }
7c673cae 1770
3efd9988
FG
1771 dout(20) << "registered asok hook: " << m_name << dendl;
1772 asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1773 this);
1774 int r = asok_hook->register_commands();
1775 if (r == 0) {
1776 m_asok_hook = asok_hook;
1777 return;
1778 }
d2e6a577 1779 derr << "error registering admin socket commands" << dendl;
d2e6a577 1780 }
3efd9988 1781 delete asok_hook;
d2e6a577
FG
1782}
1783
1784template <typename I>
1785void ImageReplayer<I>::unregister_admin_socket_hook() {
1786 dout(20) << dendl;
1787
3efd9988
FG
1788 AdminSocketHook *asok_hook = nullptr;
1789 {
1790 Mutex::Locker locker(m_lock);
1791 std::swap(asok_hook, m_asok_hook);
1792 }
1793 delete asok_hook;
1794}
1795
1796template <typename I>
1797void ImageReplayer<I>::on_name_changed() {
1798 {
1799 Mutex::Locker locker(m_lock);
1800 std::string name = m_local_ioctx.get_pool_name() + "/" +
1801 m_local_image_ctx->name;
1802 if (m_name == name) {
1803 return;
1804 }
1805 m_name = name;
1806 }
1807 unregister_admin_socket_hook();
1808 register_admin_socket_hook();
7c673cae
FG
1809}
1810
1811template <typename I>
1812std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1813{
1814 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1815 << "/" << replayer.get_global_image_id() << "]";
1816 return os;
1817}
1818
1819} // namespace mirror
1820} // namespace rbd
1821
1822template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;