]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
buildsys: use download.ceph.com to download source tar ball
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
11fdf7f2 6#include "common/admin_socket.h"
7c673cae
FG
7#include "common/debug.h"
8#include "common/errno.h"
9#include "include/stringify.h"
10#include "cls/rbd/cls_rbd_client.h"
11#include "common/Timer.h"
12#include "common/WorkQueue.h"
13#include "global/global_context.h"
14#include "journal/Journaler.h"
15#include "journal/ReplayHandler.h"
16#include "journal/Settings.h"
17#include "librbd/ExclusiveLock.h"
18#include "librbd/ImageCtx.h"
19#include "librbd/ImageState.h"
20#include "librbd/Journal.h"
21#include "librbd/Operations.h"
22#include "librbd/Utils.h"
23#include "librbd/journal/Replay.h"
d2e6a577 24#include "ImageDeleter.h"
7c673cae 25#include "ImageReplayer.h"
7c673cae
FG
26#include "Threads.h"
27#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
28#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
29#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
30#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
d2e6a577 31#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
7c673cae
FG
32#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33
34#define dout_context g_ceph_context
35#define dout_subsys ceph_subsys_rbd_mirror
36#undef dout_prefix
37#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
38 << __func__ << ": "
39
40using std::map;
41using std::string;
42using std::unique_ptr;
43using std::shared_ptr;
44using std::vector;
45
11fdf7f2
TL
46extern PerfCounters *g_perf_counters;
47
7c673cae
FG
48namespace rbd {
49namespace mirror {
50
51using librbd::util::create_context_callback;
52using librbd::util::create_rados_callback;
53using namespace rbd::mirror::image_replayer;
54
55template <typename I>
56std::ostream &operator<<(std::ostream &os,
57 const typename ImageReplayer<I>::State &state);
58
59namespace {
60
61template <typename I>
62struct ReplayHandler : public ::journal::ReplayHandler {
63 ImageReplayer<I> *replayer;
64 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
65 void get() override {}
66 void put() override {}
67
68 void handle_entries_available() override {
69 replayer->handle_replay_ready();
70 }
71 void handle_complete(int r) override {
72 std::stringstream ss;
73 if (r < 0) {
74 ss << "replay completed with error: " << cpp_strerror(r);
75 }
76 replayer->handle_replay_complete(r, ss.str());
77 }
78};
79
3efd9988 80template <typename I>
7c673cae
FG
81class ImageReplayerAdminSocketCommand {
82public:
3efd9988
FG
83 ImageReplayerAdminSocketCommand(const std::string &desc,
84 ImageReplayer<I> *replayer)
85 : desc(desc), replayer(replayer) {
86 }
7c673cae
FG
87 virtual ~ImageReplayerAdminSocketCommand() {}
88 virtual bool call(Formatter *f, stringstream *ss) = 0;
3efd9988
FG
89
90 std::string desc;
91 ImageReplayer<I> *replayer;
92 bool registered = false;
7c673cae
FG
93};
94
95template <typename I>
3efd9988 96class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 97public:
3efd9988
FG
98 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
99 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
100 }
7c673cae
FG
101
102 bool call(Formatter *f, stringstream *ss) override {
3efd9988 103 this->replayer->print_status(f, ss);
7c673cae
FG
104 return true;
105 }
7c673cae
FG
106};
107
108template <typename I>
3efd9988 109class StartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 110public:
3efd9988
FG
111 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
112 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
113 }
7c673cae
FG
114
115 bool call(Formatter *f, stringstream *ss) override {
3efd9988 116 this->replayer->start(nullptr, true);
7c673cae
FG
117 return true;
118 }
7c673cae
FG
119};
120
121template <typename I>
3efd9988 122class StopCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 123public:
3efd9988
FG
124 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
125 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
126 }
7c673cae
FG
127
128 bool call(Formatter *f, stringstream *ss) override {
3efd9988 129 this->replayer->stop(nullptr, true);
7c673cae
FG
130 return true;
131 }
7c673cae
FG
132};
133
134template <typename I>
3efd9988 135class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 136public:
3efd9988
FG
137 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
138 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
139 }
7c673cae
FG
140
141 bool call(Formatter *f, stringstream *ss) override {
3efd9988 142 this->replayer->restart();
7c673cae
FG
143 return true;
144 }
7c673cae
FG
145};
146
147template <typename I>
3efd9988 148class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 149public:
3efd9988
FG
150 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
151 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
152 }
7c673cae
FG
153
154 bool call(Formatter *f, stringstream *ss) override {
28e407b8 155 this->replayer->flush();
7c673cae
FG
156 return true;
157 }
7c673cae
FG
158};
159
160template <typename I>
161class ImageReplayerAdminSocketHook : public AdminSocketHook {
162public:
163 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77 164 ImageReplayer<I> *replayer)
3efd9988
FG
165 : admin_socket(cct->get_admin_socket()),
166 commands{{"rbd mirror flush " + name,
167 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
168 {"rbd mirror restart " + name,
169 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
170 {"rbd mirror start " + name,
171 new StartCommand<I>("start rbd mirror " + name, replayer)},
172 {"rbd mirror status " + name,
173 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
174 {"rbd mirror stop " + name,
175 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
d2e6a577
FG
176 }
177
178 int register_commands() {
3efd9988
FG
179 for (auto &it : commands) {
180 int r = admin_socket->register_command(it.first, it.first, this,
181 it.second->desc);
182 if (r < 0) {
183 return r;
184 }
185 it.second->registered = true;
7c673cae 186 }
d2e6a577 187 return 0;
7c673cae
FG
188 }
189
190 ~ImageReplayerAdminSocketHook() override {
3efd9988
FG
191 for (auto &it : commands) {
192 if (it.second->registered) {
193 admin_socket->unregister_command(it.first);
194 }
195 delete it.second;
7c673cae 196 }
31f18b77 197 commands.clear();
7c673cae
FG
198 }
199
11fdf7f2
TL
200 bool call(std::string_view command, const cmdmap_t& cmdmap,
201 std::string_view format, bufferlist& out) override {
3efd9988 202 auto i = commands.find(command);
11fdf7f2 203 ceph_assert(i != commands.end());
7c673cae
FG
204 Formatter *f = Formatter::create(format);
205 stringstream ss;
206 bool r = i->second->call(f, &ss);
207 delete f;
208 out.append(ss);
209 return r;
210 }
211
212private:
11fdf7f2
TL
213 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
214 std::less<>> Commands;
7c673cae
FG
215
216 AdminSocket *admin_socket;
217 Commands commands;
218};
219
220uint32_t calculate_replay_delay(const utime_t &event_time,
221 int mirroring_replay_delay) {
222 if (mirroring_replay_delay <= 0) {
223 return 0;
224 }
225
226 utime_t now = ceph_clock_now();
227 if (event_time + mirroring_replay_delay <= now) {
228 return 0;
229 }
230
231 // ensure it is rounded up when converting to integer
232 return (event_time + mirroring_replay_delay - now) + 1;
233}
234
235} // anonymous namespace
236
237template <typename I>
238void ImageReplayer<I>::BootstrapProgressContext::update_progress(
239 const std::string &description, bool flush)
240{
241 const std::string desc = "bootstrapping, " + description;
242 replayer->set_state_description(0, desc);
243 if (flush) {
244 replayer->update_mirror_image_status(false, boost::none);
245 }
246}
247
248template <typename I>
249void ImageReplayer<I>::RemoteJournalerListener::handle_update(
250 ::journal::JournalMetadata *) {
251 FunctionContext *ctx = new FunctionContext([this](int r) {
252 replayer->handle_remote_journal_metadata_updated();
253 });
254 replayer->m_threads->work_queue->queue(ctx, 0);
255}
256
257template <typename I>
d2e6a577 258ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
31f18b77 259 InstanceWatcher<I> *instance_watcher,
7c673cae
FG
260 RadosRef local,
261 const std::string &local_mirror_uuid,
262 int64_t local_pool_id,
263 const std::string &global_image_id) :
264 m_threads(threads),
31f18b77 265 m_instance_watcher(instance_watcher),
7c673cae
FG
266 m_local(local),
267 m_local_mirror_uuid(local_mirror_uuid),
268 m_local_pool_id(local_pool_id),
28e407b8 269 m_global_image_id(global_image_id), m_local_image_name(global_image_id),
7c673cae
FG
270 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
271 global_image_id),
272 m_progress_cxt(this),
273 m_journal_listener(new JournalListener(this)),
274 m_remote_listener(this)
275{
276 // Register asok commands using a temporary "remote_pool_name/global_image_id"
277 // name. When the image name becomes known on start the asok commands will be
278 // re-registered using "remote_pool_name/remote_image_name" name.
279
280 std::string pool_name;
281 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
282 if (r < 0) {
283 derr << "error resolving local pool " << m_local_pool_id
284 << ": " << cpp_strerror(r) << dendl;
285 pool_name = stringify(m_local_pool_id);
286 }
287
288 m_name = pool_name + "/" + m_global_image_id;
d2e6a577 289 register_admin_socket_hook();
7c673cae
FG
290}
291
292template <typename I>
293ImageReplayer<I>::~ImageReplayer()
294{
d2e6a577 295 unregister_admin_socket_hook();
11fdf7f2
TL
296 ceph_assert(m_event_preprocessor == nullptr);
297 ceph_assert(m_replay_status_formatter == nullptr);
298 ceph_assert(m_local_image_ctx == nullptr);
299 ceph_assert(m_local_replay == nullptr);
300 ceph_assert(m_remote_journaler == nullptr);
301 ceph_assert(m_replay_handler == nullptr);
302 ceph_assert(m_on_start_finish == nullptr);
303 ceph_assert(m_on_stop_finish == nullptr);
304 ceph_assert(m_bootstrap_request == nullptr);
305 ceph_assert(m_in_flight_status_updates == 0);
7c673cae
FG
306
307 delete m_journal_listener;
7c673cae
FG
308}
309
c07f9fc5
FG
310template <typename I>
311image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
312 Mutex::Locker locker(m_lock);
313
314 if (!m_mirror_image_status_state) {
315 return image_replayer::HEALTH_STATE_OK;
316 } else if (*m_mirror_image_status_state ==
317 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
318 *m_mirror_image_status_state ==
319 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
320 return image_replayer::HEALTH_STATE_WARNING;
321 }
322 return image_replayer::HEALTH_STATE_ERROR;
323}
324
7c673cae 325template <typename I>
d2e6a577
FG
326void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
327 librados::IoCtx &io_ctx) {
7c673cae 328 Mutex::Locker locker(m_lock);
d2e6a577
FG
329 auto it = m_peers.find({peer_uuid});
330 if (it == m_peers.end()) {
331 m_peers.insert({peer_uuid, io_ctx});
7c673cae
FG
332 }
333}
334
7c673cae
FG
335template <typename I>
336void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
11fdf7f2 337 dout(10) << r << " " << desc << dendl;
7c673cae
FG
338
339 Mutex::Locker l(m_lock);
340 m_last_r = r;
341 m_state_desc = desc;
342}
343
344template <typename I>
345void ImageReplayer<I>::start(Context *on_finish, bool manual)
346{
11fdf7f2 347 dout(10) << "on_finish=" << on_finish << dendl;
7c673cae
FG
348
349 int r = 0;
350 {
351 Mutex::Locker locker(m_lock);
352 if (!is_stopped_()) {
353 derr << "already running" << dendl;
354 r = -EINVAL;
355 } else if (m_manual_stop && !manual) {
356 dout(5) << "stopped manually, ignoring start without manual flag"
357 << dendl;
358 r = -EPERM;
359 } else {
360 m_state = STATE_STARTING;
361 m_last_r = 0;
362 m_state_desc.clear();
363 m_manual_stop = false;
d2e6a577 364 m_delete_requested = false;
7c673cae
FG
365
366 if (on_finish != nullptr) {
11fdf7f2 367 ceph_assert(m_on_start_finish == nullptr);
7c673cae
FG
368 m_on_start_finish = on_finish;
369 }
11fdf7f2 370 ceph_assert(m_on_stop_finish == nullptr);
7c673cae
FG
371 }
372 }
373
374 if (r < 0) {
375 if (on_finish) {
376 on_finish->complete(r);
377 }
378 return;
379 }
380
11fdf7f2
TL
381 m_local_ioctx.reset(new librados::IoCtx{});
382 r = m_local->ioctx_create2(m_local_pool_id, *m_local_ioctx);
7c673cae 383 if (r < 0) {
11fdf7f2
TL
384 m_local_ioctx.reset();
385
7c673cae
FG
386 derr << "error opening ioctx for local pool " << m_local_pool_id
387 << ": " << cpp_strerror(r) << dendl;
388 on_start_fail(r, "error opening local pool");
389 return;
390 }
391
392 prepare_local_image();
393}
394
395template <typename I>
396void ImageReplayer<I>::prepare_local_image() {
11fdf7f2 397 dout(10) << dendl;
7c673cae 398
d2e6a577 399 m_local_image_id = "";
7c673cae
FG
400 Context *ctx = create_context_callback<
401 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
402 auto req = PrepareLocalImageRequest<I>::create(
11fdf7f2 403 *m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
7c673cae
FG
404 &m_local_image_tag_owner, m_threads->work_queue, ctx);
405 req->send();
406}
407
408template <typename I>
409void ImageReplayer<I>::handle_prepare_local_image(int r) {
11fdf7f2 410 dout(10) << "r=" << r << dendl;
7c673cae
FG
411
412 if (r == -ENOENT) {
11fdf7f2 413 dout(10) << "local image does not exist" << dendl;
7c673cae
FG
414 } else if (r < 0) {
415 on_start_fail(r, "error preparing local image for replay");
416 return;
28e407b8
AA
417 } else {
418 reregister_admin_socket_hook();
7c673cae
FG
419 }
420
421 // local image doesn't exist or is non-primary
d2e6a577 422 prepare_remote_image();
7c673cae
FG
423}
424
425template <typename I>
d2e6a577 426void ImageReplayer<I>::prepare_remote_image() {
11fdf7f2 427 dout(10) << dendl;
b32b8144
FG
428 if (m_peers.empty()) {
429 // technically nothing to bootstrap, but it handles the status update
430 bootstrap();
431 return;
432 }
7c673cae 433
d2e6a577 434 // TODO need to support multiple remote images
11fdf7f2 435 ceph_assert(!m_peers.empty());
d2e6a577
FG
436 m_remote_image = {*m_peers.begin()};
437
11fdf7f2
TL
438 auto cct = static_cast<CephContext *>(m_local->cct());
439 journal::Settings journal_settings;
440 journal_settings.commit_interval = cct->_conf.get_val<double>(
441 "rbd_mirror_journal_commit_age");
442 journal_settings.max_fetch_bytes = cct->_conf.get_val<Option::size_t>(
443 "rbd_mirror_journal_max_fetch_bytes");
444
d2e6a577
FG
445 Context *ctx = create_context_callback<
446 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
447 auto req = PrepareRemoteImageRequest<I>::create(
b32b8144 448 m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
11fdf7f2
TL
449 m_local_image_id, journal_settings, &m_remote_image.mirror_uuid,
450 &m_remote_image.image_id, &m_remote_journaler, &m_client_state,
451 &m_client_meta, ctx);
d2e6a577
FG
452 req->send();
453}
454
455template <typename I>
456void ImageReplayer<I>::handle_prepare_remote_image(int r) {
11fdf7f2 457 dout(10) << "r=" << r << dendl;
d2e6a577 458
11fdf7f2 459 ceph_assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
b32b8144
FG
460 if (r < 0 && !m_local_image_id.empty() &&
461 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
462 // local image is primary -- fall-through
463 } else if (r == -ENOENT) {
11fdf7f2 464 dout(10) << "remote image does not exist" << dendl;
d2e6a577
FG
465
466 // TODO need to support multiple remote images
11fdf7f2 467 if (m_remote_image.image_id.empty() && !m_local_image_id.empty() &&
d2e6a577
FG
468 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
469 // local image exists and is non-primary and linked to the missing
470 // remote image
471
472 m_delete_requested = true;
473 on_start_fail(0, "remote image no longer exists");
474 } else {
475 on_start_fail(-ENOENT, "remote image does not exist");
476 }
477 return;
478 } else if (r < 0) {
479 on_start_fail(r, "error retrieving remote image id");
7c673cae
FG
480 return;
481 }
482
d2e6a577
FG
483 bootstrap();
484}
485
486template <typename I>
487void ImageReplayer<I>::bootstrap() {
11fdf7f2 488 dout(10) << dendl;
7c673cae 489
b32b8144
FG
490 if (!m_local_image_id.empty() &&
491 m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
492 dout(5) << "local image is primary" << dendl;
493 on_start_fail(0, "local image is primary");
494 return;
495 } else if (m_peers.empty()) {
496 dout(5) << "no peer clusters" << dendl;
497 on_start_fail(-ENOENT, "no peer clusters");
498 return;
499 }
7c673cae 500
11fdf7f2 501 BootstrapRequest<I> *request = nullptr;
7c673cae
FG
502 {
503 Mutex::Locker locker(m_lock);
11fdf7f2
TL
504 if (on_start_interrupted(m_lock)) {
505 return;
506 }
507
508 auto ctx = create_context_callback<
509 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
510 request = BootstrapRequest<I>::create(
511 m_threads, *m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
512 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
513 m_global_image_id, m_local_mirror_uuid, m_remote_image.mirror_uuid,
514 m_remote_journaler, &m_client_state, &m_client_meta, ctx,
515 &m_resync_requested, &m_progress_cxt);
7c673cae
FG
516 request->get();
517 m_bootstrap_request = request;
518 }
519
520 update_mirror_image_status(false, boost::none);
521 reschedule_update_status_task(10);
522
523 request->send();
524}
525
526template <typename I>
527void ImageReplayer<I>::handle_bootstrap(int r) {
11fdf7f2 528 dout(10) << "r=" << r << dendl;
7c673cae
FG
529 {
530 Mutex::Locker locker(m_lock);
531 m_bootstrap_request->put();
532 m_bootstrap_request = nullptr;
533 if (m_local_image_ctx) {
534 m_local_image_id = m_local_image_ctx->id;
535 }
536 }
537
11fdf7f2
TL
538 if (on_start_interrupted()) {
539 return;
540 } else if (r == -EREMOTEIO) {
7c673cae 541 m_local_image_tag_owner = "";
c07f9fc5
FG
542 dout(5) << "remote image is non-primary" << dendl;
543 on_start_fail(-EREMOTEIO, "remote image is non-primary");
7c673cae
FG
544 return;
545 } else if (r == -EEXIST) {
546 m_local_image_tag_owner = "";
547 on_start_fail(r, "split-brain detected");
548 return;
549 } else if (r < 0) {
550 on_start_fail(r, "error bootstrapping replay");
551 return;
d2e6a577
FG
552 } else if (m_resync_requested) {
553 on_start_fail(0, "resync requested");
554 return;
7c673cae
FG
555 }
556
11fdf7f2 557 ceph_assert(m_local_journal == nullptr);
7c673cae
FG
558 {
559 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
560 if (m_local_image_ctx->journal != nullptr) {
561 m_local_journal = m_local_image_ctx->journal;
562 m_local_journal->add_listener(m_journal_listener);
563 }
564 }
565
566 if (m_local_journal == nullptr) {
567 on_start_fail(-EINVAL, "error accessing local journal");
568 return;
569 }
570
7c673cae
FG
571 update_mirror_image_status(false, boost::none);
572 init_remote_journaler();
573}
574
575template <typename I>
576void ImageReplayer<I>::init_remote_journaler() {
11fdf7f2 577 dout(10) << dendl;
7c673cae
FG
578
579 Context *ctx = create_context_callback<
580 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
581 m_remote_journaler->init(ctx);
582}
583
584template <typename I>
585void ImageReplayer<I>::handle_init_remote_journaler(int r) {
11fdf7f2 586 dout(10) << "r=" << r << dendl;
7c673cae 587
11fdf7f2
TL
588 if (on_start_interrupted()) {
589 return;
590 } else if (r < 0) {
7c673cae
FG
591 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
592 on_start_fail(r, "error initializing remote journal");
593 return;
7c673cae
FG
594 }
595
596 m_remote_journaler->add_listener(&m_remote_listener);
597
598 cls::journal::Client client;
599 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
600 if (r < 0) {
601 derr << "error retrieving remote journal client: " << cpp_strerror(r)
602 << dendl;
603 on_start_fail(r, "error retrieving remote journal client");
604 return;
605 }
606
b32b8144
FG
607 dout(5) << "image_id=" << m_local_image_id << ", "
608 << "client_meta.image_id=" << m_client_meta.image_id << ", "
609 << "client.state=" << client.state << dendl;
d2e6a577
FG
610 if (m_client_meta.image_id == m_local_image_id &&
611 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
7c673cae 612 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
11fdf7f2 613 if (m_local_image_ctx->config.template get_val<bool>("rbd_mirroring_resync_after_disconnect")) {
d2e6a577
FG
614 m_resync_requested = true;
615 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
616 } else {
617 on_start_fail(-ENOTCONN, "disconnected");
7c673cae 618 }
7c673cae
FG
619 return;
620 }
621
622 start_replay();
623}
624
625template <typename I>
626void ImageReplayer<I>::start_replay() {
11fdf7f2 627 dout(10) << dendl;
7c673cae
FG
628
629 Context *start_ctx = create_context_callback<
630 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
631 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
632}
633
634template <typename I>
635void ImageReplayer<I>::handle_start_replay(int r) {
11fdf7f2 636 dout(10) << "r=" << r << dendl;
7c673cae
FG
637
638 if (r < 0) {
11fdf7f2 639 ceph_assert(m_local_replay == nullptr);
7c673cae
FG
640 derr << "error starting external replay on local image "
641 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
642 on_start_fail(r, "error starting replay on local image");
643 return;
644 }
645
91327a77
AA
646 m_replay_status_formatter =
647 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
648
7c673cae
FG
649 Context *on_finish(nullptr);
650 {
651 Mutex::Locker locker(m_lock);
11fdf7f2 652 ceph_assert(m_state == STATE_STARTING);
7c673cae
FG
653 m_state = STATE_REPLAYING;
654 std::swap(m_on_start_finish, on_finish);
655 }
656
657 m_event_preprocessor = EventPreprocessor<I>::create(
658 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
659 &m_client_meta, m_threads->work_queue);
7c673cae
FG
660
661 update_mirror_image_status(true, boost::none);
662 reschedule_update_status_task(30);
663
7c673cae
FG
664 if (on_replay_interrupted()) {
665 return;
666 }
667
668 {
669 CephContext *cct = static_cast<CephContext *>(m_local->cct());
11fdf7f2 670 double poll_seconds = cct->_conf.get_val<double>(
181888fb 671 "rbd_mirror_journal_poll_age");
7c673cae
FG
672
673 Mutex::Locker locker(m_lock);
674 m_replay_handler = new ReplayHandler<I>(this);
675 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
676
11fdf7f2 677 dout(10) << "m_remote_journaler=" << *m_remote_journaler << dendl;
7c673cae
FG
678 }
679
11fdf7f2 680 dout(10) << "start succeeded" << dendl;
d2e6a577 681 if (on_finish != nullptr) {
11fdf7f2 682 dout(10) << "on finish complete, r=" << r << dendl;
d2e6a577
FG
683 on_finish->complete(r);
684 }
7c673cae
FG
685}
686
687template <typename I>
688void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
689{
11fdf7f2 690 dout(10) << "r=" << r << dendl;
7c673cae
FG
691 Context *ctx = new FunctionContext([this, r, desc](int _r) {
692 {
693 Mutex::Locker locker(m_lock);
11fdf7f2 694 ceph_assert(m_state == STATE_STARTING);
7c673cae 695 m_state = STATE_STOPPING;
d2e6a577 696 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
7c673cae
FG
697 derr << "start failed: " << cpp_strerror(r) << dendl;
698 } else {
11fdf7f2 699 dout(10) << "start canceled" << dendl;
7c673cae
FG
700 }
701 }
702
703 set_state_description(r, desc);
11fdf7f2
TL
704 if (m_local_ioctx) {
705 update_mirror_image_status(false, boost::none);
706 }
7c673cae
FG
707 reschedule_update_status_task(-1);
708 shut_down(r);
709 });
710 m_threads->work_queue->queue(ctx, 0);
711}
712
713template <typename I>
11fdf7f2 714bool ImageReplayer<I>::on_start_interrupted() {
7c673cae 715 Mutex::Locker locker(m_lock);
11fdf7f2
TL
716 return on_start_interrupted(m_lock);
717}
718
719template <typename I>
720bool ImageReplayer<I>::on_start_interrupted(Mutex& lock) {
721 ceph_assert(m_lock.is_locked());
722 ceph_assert(m_state == STATE_STARTING);
723 if (!m_stop_requested) {
7c673cae
FG
724 return false;
725 }
726
11fdf7f2 727 on_start_fail(-ECANCELED, "");
7c673cae
FG
728 return true;
729}
730
731template <typename I>
732void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
733 const std::string& desc)
734{
11fdf7f2 735 dout(10) << "on_finish=" << on_finish << ", manual=" << manual
7c673cae
FG
736 << ", desc=" << desc << dendl;
737
738 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
739 bool shut_down_replay = false;
740 bool running = true;
7c673cae
FG
741 {
742 Mutex::Locker locker(m_lock);
743
744 if (!is_running_()) {
745 running = false;
746 } else {
747 if (!is_stopped_()) {
748 if (m_state == STATE_STARTING) {
11fdf7f2
TL
749 dout(10) << "canceling start" << dendl;
750 if (m_bootstrap_request != nullptr) {
7c673cae
FG
751 bootstrap_request = m_bootstrap_request;
752 bootstrap_request->get();
753 }
754 } else {
11fdf7f2 755 dout(10) << "interrupting replay" << dendl;
7c673cae
FG
756 shut_down_replay = true;
757 }
758
11fdf7f2 759 ceph_assert(m_on_stop_finish == nullptr);
7c673cae
FG
760 std::swap(m_on_stop_finish, on_finish);
761 m_stop_requested = true;
762 m_manual_stop = manual;
7c673cae
FG
763 }
764 }
765 }
766
767 // avoid holding lock since bootstrap request will update status
768 if (bootstrap_request != nullptr) {
11fdf7f2 769 dout(10) << "canceling bootstrap" << dendl;
7c673cae
FG
770 bootstrap_request->cancel();
771 bootstrap_request->put();
772 }
773
7c673cae
FG
774 if (!running) {
775 dout(20) << "not running" << dendl;
776 if (on_finish) {
777 on_finish->complete(-EINVAL);
778 }
779 return;
780 }
781
782 if (shut_down_replay) {
783 on_stop_journal_replay(r, desc);
784 } else if (on_finish != nullptr) {
785 on_finish->complete(0);
786 }
787}
788
789template <typename I>
790void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
791{
11fdf7f2 792 dout(10) << dendl;
7c673cae
FG
793
794 {
795 Mutex::Locker locker(m_lock);
796 if (m_state != STATE_REPLAYING) {
797 // might be invoked multiple times while stopping
798 return;
799 }
800 m_stop_requested = true;
801 m_state = STATE_STOPPING;
802 }
803
804 set_state_description(r, desc);
a8e16298 805 update_mirror_image_status(true, boost::none);
7c673cae
FG
806 reschedule_update_status_task(-1);
807 shut_down(0);
808}
809
810template <typename I>
811void ImageReplayer<I>::handle_replay_ready()
812{
11fdf7f2 813 dout(20) << dendl;
7c673cae
FG
814 if (on_replay_interrupted()) {
815 return;
816 }
817
818 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
819 return;
820 }
821
822 m_event_replay_tracker.start_op();
31f18b77
FG
823
824 m_lock.Lock();
825 bool stopping = (m_state == STATE_STOPPING);
826 m_lock.Unlock();
827
828 if (stopping) {
829 dout(10) << "stopping event replay" << dendl;
830 m_event_replay_tracker.finish_op();
831 return;
832 }
833
7c673cae
FG
834 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
835 preprocess_entry();
836 return;
837 }
838
839 replay_flush();
840}
841
842template <typename I>
843void ImageReplayer<I>::restart(Context *on_finish)
844{
845 FunctionContext *ctx = new FunctionContext(
846 [this, on_finish](int r) {
847 if (r < 0) {
848 // Try start anyway.
849 }
850 start(on_finish, true);
851 });
852 stop(ctx);
853}
854
855template <typename I>
856void ImageReplayer<I>::flush(Context *on_finish)
857{
11fdf7f2 858 dout(10) << dendl;
7c673cae
FG
859
860 {
861 Mutex::Locker locker(m_lock);
862 if (m_state == STATE_REPLAYING) {
863 Context *ctx = new FunctionContext(
864 [on_finish](int r) {
865 if (on_finish != nullptr) {
866 on_finish->complete(r);
867 }
868 });
869 on_flush_local_replay_flush_start(ctx);
870 return;
871 }
872 }
873
874 if (on_finish) {
875 on_finish->complete(0);
876 }
877}
878
879template <typename I>
880void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
881{
11fdf7f2 882 dout(10) << dendl;
7c673cae
FG
883 FunctionContext *ctx = new FunctionContext(
884 [this, on_flush](int r) {
885 on_flush_local_replay_flush_finish(on_flush, r);
886 });
887
11fdf7f2
TL
888 ceph_assert(m_lock.is_locked());
889 ceph_assert(m_state == STATE_REPLAYING);
7c673cae
FG
890 m_local_replay->flush(ctx);
891}
892
893template <typename I>
894void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
895 int r)
896{
11fdf7f2 897 dout(10) << "r=" << r << dendl;
7c673cae
FG
898 if (r < 0) {
899 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
900 on_flush->complete(r);
901 return;
902 }
903
904 on_flush_flush_commit_position_start(on_flush);
905}
906
907template <typename I>
908void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
909{
910 FunctionContext *ctx = new FunctionContext(
911 [this, on_flush](int r) {
912 on_flush_flush_commit_position_finish(on_flush, r);
913 });
914
915 m_remote_journaler->flush_commit_position(ctx);
916}
917
918template <typename I>
919void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
920 int r)
921{
922 if (r < 0) {
923 derr << "error flushing remote journal commit position: "
924 << cpp_strerror(r) << dendl;
925 }
926
927 update_mirror_image_status(false, boost::none);
928
929 dout(20) << "flush complete, r=" << r << dendl;
930 on_flush->complete(r);
931}
932
933template <typename I>
934bool ImageReplayer<I>::on_replay_interrupted()
935{
936 bool shut_down;
937 {
938 Mutex::Locker locker(m_lock);
939 shut_down = m_stop_requested;
940 }
941
942 if (shut_down) {
943 on_stop_journal_replay();
944 }
945 return shut_down;
946}
947
948template <typename I>
949void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
950{
11fdf7f2 951 dout(10) << dendl;
7c673cae
FG
952
953 Mutex::Locker l(m_lock);
954
955 if (f) {
956 f->open_object_section("image_replayer");
957 f->dump_string("name", m_name);
958 f->dump_string("state", to_string(m_state));
959 f->close_section();
960 f->flush(*ss);
961 } else {
962 *ss << m_name << ": state: " << to_string(m_state);
963 }
964}
965
966template <typename I>
967void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
968{
11fdf7f2 969 dout(10) << "r=" << r << dendl;
7c673cae
FG
970 if (r < 0) {
971 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
972 set_state_description(r, error_desc);
973 }
974
975 {
976 Mutex::Locker locker(m_lock);
977 m_stop_requested = true;
978 }
979 on_replay_interrupted();
980}
981
982template <typename I>
983void ImageReplayer<I>::replay_flush() {
11fdf7f2 984 dout(10) << dendl;
7c673cae
FG
985
986 bool interrupted = false;
987 {
988 Mutex::Locker locker(m_lock);
989 if (m_state != STATE_REPLAYING) {
11fdf7f2 990 dout(10) << "replay interrupted" << dendl;
7c673cae
FG
991 interrupted = true;
992 } else {
993 m_state = STATE_REPLAY_FLUSHING;
994 }
995 }
996
997 if (interrupted) {
998 m_event_replay_tracker.finish_op();
999 return;
1000 }
1001
1002 // shut down the replay to flush all IO and ops and create a new
1003 // replayer to handle the new tag epoch
1004 Context *ctx = create_context_callback<
1005 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1006 ctx = new FunctionContext([this, ctx](int r) {
1007 m_local_image_ctx->journal->stop_external_replay();
1008 m_local_replay = nullptr;
1009
1010 if (r < 0) {
1011 ctx->complete(r);
1012 return;
1013 }
1014
1015 m_local_journal->start_external_replay(&m_local_replay, ctx);
1016 });
1017 m_local_replay->shut_down(false, ctx);
1018}
1019
1020template <typename I>
1021void ImageReplayer<I>::handle_replay_flush(int r) {
11fdf7f2 1022 dout(10) << "r=" << r << dendl;
7c673cae
FG
1023
1024 {
1025 Mutex::Locker locker(m_lock);
11fdf7f2 1026 ceph_assert(m_state == STATE_REPLAY_FLUSHING);
7c673cae
FG
1027 m_state = STATE_REPLAYING;
1028 }
1029
1030 if (r < 0) {
1031 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1032 m_event_replay_tracker.finish_op();
1033 handle_replay_complete(r, "replay flush encountered an error");
1034 return;
1035 } else if (on_replay_interrupted()) {
1036 m_event_replay_tracker.finish_op();
1037 return;
1038 }
1039
1040 get_remote_tag();
1041}
1042
1043template <typename I>
1044void ImageReplayer<I>::get_remote_tag() {
11fdf7f2 1045 dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
7c673cae
FG
1046
1047 Context *ctx = create_context_callback<
1048 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1049 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1050}
1051
1052template <typename I>
1053void ImageReplayer<I>::handle_get_remote_tag(int r) {
11fdf7f2 1054 dout(15) << "r=" << r << dendl;
7c673cae
FG
1055
1056 if (r == 0) {
1057 try {
11fdf7f2
TL
1058 auto it = m_replay_tag.data.cbegin();
1059 decode(m_replay_tag_data, it);
7c673cae
FG
1060 } catch (const buffer::error &err) {
1061 r = -EBADMSG;
1062 }
1063 }
1064
1065 if (r < 0) {
1066 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1067 << cpp_strerror(r) << dendl;
1068 m_event_replay_tracker.finish_op();
1069 handle_replay_complete(r, "failed to retrieve remote tag");
1070 return;
1071 }
1072
1073 m_replay_tag_valid = true;
11fdf7f2 1074 dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
7c673cae
FG
1075 << m_replay_tag_data << dendl;
1076
1077 allocate_local_tag();
1078}
1079
1080template <typename I>
1081void ImageReplayer<I>::allocate_local_tag() {
28e407b8 1082 dout(15) << dendl;
7c673cae
FG
1083
1084 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
28e407b8 1085 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
7c673cae 1086 mirror_uuid = m_remote_image.mirror_uuid;
28e407b8
AA
1087 } else if (mirror_uuid == m_local_mirror_uuid) {
1088 mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
7c673cae 1089 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
28e407b8
AA
1090 // handle possible edge condition where daemon can failover and
1091 // the local image has already been promoted/demoted
1092 auto local_tag_data = m_local_journal->get_tag_data();
1093 if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
1094 (local_tag_data.predecessor.commit_valid &&
1095 local_tag_data.predecessor.mirror_uuid ==
1096 librbd::Journal<>::LOCAL_MIRROR_UUID)) {
1097 dout(15) << "skipping stale demotion event" << dendl;
11fdf7f2 1098 handle_process_entry_safe(m_replay_entry, m_replay_start_time, 0);
28e407b8
AA
1099 handle_replay_ready();
1100 return;
1101 } else {
1102 dout(5) << "encountered image demotion: stopping" << dendl;
1103 Mutex::Locker locker(m_lock);
1104 m_stop_requested = true;
1105 }
7c673cae
FG
1106 }
1107
1108 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1109 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1110 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1111 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1112 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1113 }
1114
28e407b8
AA
1115 dout(15) << "mirror_uuid=" << mirror_uuid << ", "
1116 << "predecessor=" << predecessor << ", "
1117 << "replay_tag_tid=" << m_replay_tag_tid << dendl;
7c673cae
FG
1118 Context *ctx = create_context_callback<
1119 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1120 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1121}
1122
1123template <typename I>
1124void ImageReplayer<I>::handle_allocate_local_tag(int r) {
28e407b8
AA
1125 dout(15) << "r=" << r << ", "
1126 << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
7c673cae
FG
1127
1128 if (r < 0) {
1129 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1130 m_event_replay_tracker.finish_op();
1131 handle_replay_complete(r, "failed to allocate journal tag");
1132 return;
1133 }
1134
1135 preprocess_entry();
1136}
1137
1138template <typename I>
1139void ImageReplayer<I>::preprocess_entry() {
1140 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1141 << dendl;
1142
1143 bufferlist data = m_replay_entry.get_data();
11fdf7f2 1144 auto it = data.cbegin();
7c673cae
FG
1145 int r = m_local_replay->decode(&it, &m_event_entry);
1146 if (r < 0) {
1147 derr << "failed to decode journal event" << dendl;
1148 m_event_replay_tracker.finish_op();
1149 handle_replay_complete(r, "failed to decode journal event");
1150 return;
1151 }
1152
1153 uint32_t delay = calculate_replay_delay(
1154 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1155 if (delay == 0) {
1156 handle_preprocess_entry_ready(0);
1157 return;
1158 }
1159
1160 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1161
1162 Mutex::Locker timer_locker(m_threads->timer_lock);
11fdf7f2 1163 ceph_assert(m_delayed_preprocess_task == nullptr);
7c673cae
FG
1164 m_delayed_preprocess_task = new FunctionContext(
1165 [this](int r) {
11fdf7f2 1166 ceph_assert(m_threads->timer_lock.is_locked());
7c673cae
FG
1167 m_delayed_preprocess_task = nullptr;
1168 m_threads->work_queue->queue(
1169 create_context_callback<ImageReplayer,
1170 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1171 });
1172 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1173}
1174
1175template <typename I>
1176void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1177 dout(20) << "r=" << r << dendl;
11fdf7f2 1178 ceph_assert(r == 0);
7c673cae 1179
11fdf7f2 1180 m_replay_start_time = ceph_clock_now();
7c673cae
FG
1181 if (!m_event_preprocessor->is_required(m_event_entry)) {
1182 process_entry();
1183 return;
1184 }
1185
1186 Context *ctx = create_context_callback<
1187 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1188 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1189}
1190
1191template <typename I>
1192void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1193 dout(20) << "r=" << r << dendl;
1194
1195 if (r < 0) {
7c673cae 1196 m_event_replay_tracker.finish_op();
31f18b77
FG
1197
1198 if (r == -ECANCELED) {
1199 handle_replay_complete(0, "lost exclusive lock");
1200 } else {
1201 derr << "failed to preprocess journal event" << dendl;
1202 handle_replay_complete(r, "failed to preprocess journal event");
1203 }
7c673cae
FG
1204 return;
1205 }
1206
1207 process_entry();
1208}
1209
1210template <typename I>
1211void ImageReplayer<I>::process_entry() {
1212 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1213 << dendl;
1214
1215 // stop replaying events if stop has been requested
1216 if (on_replay_interrupted()) {
1217 m_event_replay_tracker.finish_op();
1218 return;
1219 }
1220
1221 Context *on_ready = create_context_callback<
1222 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
11fdf7f2
TL
1223 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
1224 m_replay_start_time);
7c673cae
FG
1225
1226 m_local_replay->process(m_event_entry, on_ready, on_commit);
1227}
1228
1229template <typename I>
1230void ImageReplayer<I>::handle_process_entry_ready(int r) {
1231 dout(20) << dendl;
11fdf7f2 1232 ceph_assert(r == 0);
7c673cae 1233
28e407b8
AA
1234 bool update_status = false;
1235 {
1236 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
1237 if (m_local_image_name != m_local_image_ctx->name) {
1238 m_local_image_name = m_local_image_ctx->name;
1239 update_status = true;
1240 }
1241 }
1242
1243 if (update_status) {
1244 reschedule_update_status_task(0);
1245 }
3efd9988 1246
7c673cae
FG
1247 // attempt to process the next event
1248 handle_replay_ready();
1249}
1250
1251template <typename I>
11fdf7f2
TL
1252void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry &replay_entry,
1253 const utime_t &replay_start_time,
7c673cae
FG
1254 int r) {
1255 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1256 << dendl;
1257
1258 if (r < 0) {
1259 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1260 handle_replay_complete(r, "failed to commit journal event");
1261 } else {
11fdf7f2 1262 ceph_assert(m_remote_journaler != nullptr);
7c673cae
FG
1263 m_remote_journaler->committed(replay_entry);
1264 }
11fdf7f2
TL
1265
1266 auto bytes = replay_entry.get_data().length();
1267 auto latency = ceph_clock_now() - replay_start_time;
1268
1269 if (g_perf_counters) {
1270 g_perf_counters->inc(l_rbd_mirror_replay);
1271 g_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
1272 g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
1273 }
1274
1275 auto ctx = new FunctionContext(
1276 [this, bytes, latency](int r) {
1277 Mutex::Locker locker(m_lock);
1278 if (m_perf_counters) {
1279 m_perf_counters->inc(l_rbd_mirror_replay);
1280 m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
1281 m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
1282 }
1283 m_event_replay_tracker.finish_op();
1284 });
1285 m_threads->work_queue->queue(ctx, 0);
7c673cae
FG
1286}
1287
1288template <typename I>
1289bool ImageReplayer<I>::update_mirror_image_status(bool force,
1290 const OptionalState &state) {
11fdf7f2 1291 dout(15) << dendl;
7c673cae
FG
1292 {
1293 Mutex::Locker locker(m_lock);
1294 if (!start_mirror_image_status_update(force, false)) {
1295 return false;
1296 }
1297 }
1298
1299 queue_mirror_image_status_update(state);
1300 return true;
1301}
1302
1303template <typename I>
1304bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1305 bool restarting) {
11fdf7f2 1306 ceph_assert(m_lock.is_locked());
7c673cae
FG
1307
1308 if (!force && !is_stopped_()) {
1309 if (!is_running_()) {
11fdf7f2 1310 dout(15) << "shut down in-progress: ignoring update" << dendl;
7c673cae
FG
1311 return false;
1312 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
11fdf7f2 1313 dout(15) << "already sending update" << dendl;
7c673cae
FG
1314 m_update_status_requested = true;
1315 return false;
1316 }
1317 }
1318
7c673cae 1319 ++m_in_flight_status_updates;
11fdf7f2 1320 dout(15) << "in-flight updates=" << m_in_flight_status_updates << dendl;
7c673cae
FG
1321 return true;
1322}
1323
1324template <typename I>
1325void ImageReplayer<I>::finish_mirror_image_status_update() {
28e407b8
AA
1326 reregister_admin_socket_hook();
1327
7c673cae
FG
1328 Context *on_finish = nullptr;
1329 {
1330 Mutex::Locker locker(m_lock);
11fdf7f2 1331 ceph_assert(m_in_flight_status_updates > 0);
7c673cae 1332 if (--m_in_flight_status_updates > 0) {
11fdf7f2 1333 dout(15) << "waiting on " << m_in_flight_status_updates << " in-flight "
7c673cae
FG
1334 << "updates" << dendl;
1335 return;
1336 }
1337
1338 std::swap(on_finish, m_on_update_status_finish);
1339 }
1340
11fdf7f2 1341 dout(15) << dendl;
7c673cae
FG
1342 if (on_finish != nullptr) {
1343 on_finish->complete(0);
1344 }
1345}
1346
1347template <typename I>
1348void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
11fdf7f2 1349 dout(15) << dendl;
7c673cae
FG
1350 FunctionContext *ctx = new FunctionContext(
1351 [this, state](int r) {
1352 send_mirror_status_update(state);
1353 });
1354 m_threads->work_queue->queue(ctx, 0);
1355}
1356
// Builds the cls::rbd::MirrorImageStatus matching the replayer state (or
// opt_state, when it is set) and writes it to the RBD_MIRRORING object with
// an asynchronous rados operation on m_local_ioctx. Completion of the write
// is delivered to handle_mirror_status_update().
//
// While replaying, the description comes from m_replay_status_formatter; if
// it is not available yet this method returns without sending anything and
// relies on the formatter callback below to continue.
1357template <typename I>
1358void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1359 State state;
1360 std::string state_desc;
1361 int last_r;
7c673cae 1362 bool stopping_replay;
c07f9fc5 1363
11fdf7f2
TL
// start from an empty optional; make_optional(false, ...) carries no value
1364 OptionalMirrorImageStatusState mirror_image_status_state =
1365 boost::make_optional(false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
c07f9fc5 1366 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
7c673cae
FG
// snapshot every member needed below while holding m_lock
1367 {
1368 Mutex::Locker locker(m_lock);
1369 state = m_state;
1370 state_desc = m_state_desc;
c07f9fc5 1371 mirror_image_status_state = m_mirror_image_status_state;
7c673cae 1372 last_r = m_last_r;
// true for as long as the local image context is still set
7c673cae 1373 stopping_replay = (m_local_image_ctx != nullptr);
c07f9fc5
FG
1374
// get() here is paired with put() below so the request can be queried after
// m_lock is released -- presumably reference counting; confirm against
// BootstrapRequest
1375 if (m_bootstrap_request != nullptr) {
1376 bootstrap_request = m_bootstrap_request;
1377 bootstrap_request->get();
1378 }
1379 }
1380
1381 bool syncing = false;
1382 if (bootstrap_request != nullptr) {
1383 syncing = bootstrap_request->is_syncing();
1384 bootstrap_request->put();
1385 bootstrap_request = nullptr;
7c673cae
FG
1386 }
1387
// a caller-supplied state overrides the state read from m_state
1388 if (opt_state) {
1389 state = *opt_state;
1390 }
1391
1392 cls::rbd::MirrorImageStatus status;
1393 status.up = true;
1394 switch (state) {
1395 case STATE_STARTING:
c07f9fc5 1396 if (syncing) {
7c673cae
FG
1397 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1398 status.description = state_desc.empty() ? "syncing" : state_desc;
c07f9fc5 1399 mirror_image_status_state = status.state;
7c673cae
FG
1400 } else {
1401 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1402 status.description = "starting replay";
1403 }
1404 break;
1405 case STATE_REPLAYING:
1406 case STATE_REPLAY_FLUSHING:
1407 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1408 {
// formatter callback: r >= 0 re-enters this method, -EAGAIN only releases
// the in-flight accounting; any other negative value is ignored here
1409 Context *on_req_finish = new FunctionContext(
1410 [this](int r) {
11fdf7f2 1411 dout(15) << "replay status ready: r=" << r << dendl;
7c673cae
FG
1412 if (r >= 0) {
1413 send_mirror_status_update(boost::none);
1414 } else if (r == -EAGAIN) {
1415 // decrement in-flight status update counter
1416 handle_mirror_status_update(r);
1417 }
1418 });
1419
1420 std::string desc;
11fdf7f2 1421 ceph_assert(m_replay_status_formatter != nullptr);
7c673cae
FG
// a false return means the description is not ready: nothing is sent now
1422 if (!m_replay_status_formatter->get_or_send_update(&desc,
1423 on_req_finish)) {
11fdf7f2 1424 dout(15) << "waiting for replay status" << dendl;
7c673cae
FG
1425 return;
1426 }
1427 status.description = "replaying, " + desc;
11fdf7f2
TL
// replaying successfully: clear any previously remembered status state
1428 mirror_image_status_state = boost::make_optional(
1429 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
7c673cae
FG
1430 }
1431 break;
1432 case STATE_STOPPING:
1433 if (stopping_replay) {
1434 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
a8e16298 1435 status.description = state_desc.empty() ? "stopping replay" : state_desc;
7c673cae
FG
1436 break;
1437 }
// without a local image context, STOPPING is reported like STOPPED
1438 // FALLTHROUGH
1439 case STATE_STOPPED:
c07f9fc5
FG
1440 if (last_r == -EREMOTEIO) {
1441 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1442 status.description = state_desc;
1443 mirror_image_status_state = status.state;
1444 } else if (last_r < 0) {
7c673cae
FG
1445 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1446 status.description = state_desc;
c07f9fc5 1447 mirror_image_status_state = status.state;
7c673cae
FG
1448 } else {
1449 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1450 status.description = state_desc.empty() ? "stopped" : state_desc;
c07f9fc5 1451 mirror_image_status_state = boost::none;
7c673cae
FG
1452 }
1453 break;
1454 default:
11fdf7f2 1455 ceph_assert(!"invalid state");
7c673cae
FG
1456 }
1457
c07f9fc5
FG
// remember the (possibly updated) status state for the next invocation
1458 {
1459 Mutex::Locker locker(m_lock);
1460 m_mirror_image_status_state = mirror_image_status_state;
1461 }
1462
1463 // prevent the status from ping-ponging when failed replays are restarted
1464 if (mirror_image_status_state &&
1465 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1466 status.state = *mirror_image_status_state;
1467 }
1468
11fdf7f2 1469 dout(15) << "status=" << status << dendl;
7c673cae
FG
1470 librados::ObjectWriteOperation op;
1471 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1472
11fdf7f2 1473 ceph_assert(m_local_ioctx);
7c673cae
FG
// submit the write; handle_mirror_status_update() receives the result
1474 librados::AioCompletion *aio_comp = create_rados_callback<
1475 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
11fdf7f2
TL
1476 int r = m_local_ioctx->aio_operate(RBD_MIRRORING, aio_comp, &op);
1477 ceph_assert(r == 0);
7c673cae
FG
1478 aio_comp->release();
1479}
1480
1481template <typename I>
1482void ImageReplayer<I>::handle_mirror_status_update(int r) {
11fdf7f2 1483 dout(15) << "r=" << r << dendl;
7c673cae
FG
1484
1485 bool running = false;
1486 bool started = false;
1487 {
1488 Mutex::Locker locker(m_lock);
1489 bool update_status_requested = false;
1490 std::swap(update_status_requested, m_update_status_requested);
1491
1492 running = is_running_();
1493 if (running && update_status_requested) {
1494 started = start_mirror_image_status_update(false, true);
1495 }
1496 }
1497
1498 // if a deferred update is available, send it -- otherwise reschedule
1499 // the timer task
1500 if (started) {
1501 queue_mirror_image_status_update(boost::none);
1502 } else if (running) {
11fdf7f2 1503 reschedule_update_status_task(0);
7c673cae
FG
1504 }
1505
1506 // mark committed status update as no longer in-flight
1507 finish_mirror_image_status_update();
1508}
1509
1510template <typename I>
1511void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
7c673cae
FG
1512 bool canceled_task = false;
1513 {
1514 Mutex::Locker locker(m_lock);
1515 Mutex::Locker timer_locker(m_threads->timer_lock);
1516
1517 if (m_update_status_task) {
11fdf7f2
TL
1518 dout(15) << "canceling existing status update task" << dendl;
1519
7c673cae
FG
1520 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1521 m_update_status_task = nullptr;
1522 }
1523
1524 if (new_interval > 0) {
1525 m_update_status_interval = new_interval;
1526 }
1527
7c673cae 1528 if (new_interval >= 0 && is_running_() &&
f64942e4 1529 start_mirror_image_status_update(true, false)) {
7c673cae
FG
1530 m_update_status_task = new FunctionContext(
1531 [this](int r) {
11fdf7f2 1532 ceph_assert(m_threads->timer_lock.is_locked());
7c673cae
FG
1533 m_update_status_task = nullptr;
1534
1535 queue_mirror_image_status_update(boost::none);
1536 });
11fdf7f2
TL
1537 dout(15) << "scheduling status update task after "
1538 << m_update_status_interval << " seconds" << dendl;
7c673cae
FG
1539 m_threads->timer->add_event_after(m_update_status_interval,
1540 m_update_status_task);
1541 }
1542 }
1543
1544 if (canceled_task) {
f64942e4 1545 // decrement in-flight status update counter for canceled task
7c673cae
FG
1546 finish_mirror_image_status_update();
1547 }
1548}
1549
// Tears the replayer down. Expects m_state == STATE_STOPPING (asserted).
//
// Steps performed directly: cancel a pending delayed-preprocess timer task,
// cancel the periodic status update task, and -- if status updates are still
// in flight -- park a context that re-invokes shut_down(r) once they finish.
//
// The remaining work is expressed as a chain of contexts. Each new context
// wraps the previous one, so the chain is BUILT in reverse order: the stage
// constructed last runs first. The chain is started via the work queue and
// ends in handle_shut_down(r).
1550template <typename I>
1551void ImageReplayer<I>::shut_down(int r) {
11fdf7f2 1552 dout(10) << "r=" << r << dendl;
3efd9988
FG
1553
1554 bool canceled_delayed_preprocess_task = false;
1555 {
1556 Mutex::Locker timer_locker(m_threads->timer_lock);
1557 if (m_delayed_preprocess_task != nullptr) {
1558 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1559 m_delayed_preprocess_task);
11fdf7f2 1560 ceph_assert(canceled_delayed_preprocess_task);
3efd9988
FG
1561 m_delayed_preprocess_task = nullptr;
1562 }
1563 }
1564 if (canceled_delayed_preprocess_task) {
1565 // wake up sleeping replay
1566 m_event_replay_tracker.finish_op();
1567 }
1568
11fdf7f2
TL
// a negative interval only cancels the periodic status update task
1569 reschedule_update_status_task(-1);
1570
7c673cae
FG
1571 {
1572 Mutex::Locker locker(m_lock);
11fdf7f2 1573 ceph_assert(m_state == STATE_STOPPING);
7c673cae
FG
1574
1575 // if status updates are in-flight, wait for them to complete
1576 // before proceeding
1577 if (m_in_flight_status_updates > 0) {
1578 if (m_on_update_status_finish == nullptr) {
11fdf7f2 1579 dout(15) << "waiting for in-flight status update" << dendl;
7c673cae
FG
// re-enter shut_down with the original r once the updates have drained
1580 m_on_update_status_finish = new FunctionContext(
1581 [this, r](int _r) {
1582 shut_down(r);
1583 });
1584 }
1585 return;
1586 }
1587 }
1588
31f18b77
FG
1589 // NOTE: it's important to ensure that the local image is fully
1590 // closed before attempting to close the remote journal in
1591 // case the remote cluster is unreachable
1592
7c673cae
FG
1593 // chain the shut down sequence (reverse order)
// final stage: publish STATE_STOPPED (only when a local ioctx exists) and
// hand over to handle_shut_down() with the original r
1594 Context *ctx = new FunctionContext(
1595 [this, r](int _r) {
11fdf7f2
TL
1596 if (m_local_ioctx) {
1597 update_mirror_image_status(true, STATE_STOPPED);
1598 }
7c673cae
FG
1599 handle_shut_down(r);
1600 });
31f18b77
FG
1601
1602 // close the remote journal
7c673cae
FG
// two contexts: the second (runs first) removes the listener and shuts the
// journaler down, the first (runs after) deletes it
1603 if (m_remote_journaler != nullptr) {
1604 ctx = new FunctionContext([this, ctx](int r) {
1605 delete m_remote_journaler;
1606 m_remote_journaler = nullptr;
1607 ctx->complete(0);
1608 });
1609 ctx = new FunctionContext([this, ctx](int r) {
1610 m_remote_journaler->remove_listener(&m_remote_listener);
1611 m_remote_journaler->shut_down(ctx);
1612 });
7c673cae 1613 }
31f18b77
FG
1614
1615 // stop the replay of remote journal events
// NOTE(review): m_remote_journaler is dereferenced here without its own null
// check -- presumably a replay handler implies a journaler; confirm
1616 if (m_replay_handler != nullptr) {
1617 ctx = new FunctionContext([this, ctx](int r) {
1618 delete m_replay_handler;
1619 m_replay_handler = nullptr;
1620
1621 m_event_replay_tracker.wait_for_ops(ctx);
1622 });
1623 ctx = new FunctionContext([this, ctx](int r) {
1624 m_remote_journaler->stop_replay(ctx);
1625 });
1626 }
1627
1628 // close the local image (release exclusive lock)
1629 if (m_local_image_ctx) {
1630 ctx = new FunctionContext([this, ctx](int r) {
1631 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1632 &m_local_image_ctx, ctx);
1633 request->send();
1634 });
1635 }
1636
1637 // shut down event replay into the local image
7c673cae
FG
// execution order of the stages below: remove journal listener, stop the
// external replay and destroy the preprocessor (only if a local replay
// exists), then clear m_local_journal
1638 if (m_local_journal != nullptr) {
1639 ctx = new FunctionContext([this, ctx](int r) {
1640 m_local_journal = nullptr;
1641 ctx->complete(0);
1642 });
1643 if (m_local_replay != nullptr) {
1644 ctx = new FunctionContext([this, ctx](int r) {
1645 m_local_journal->stop_external_replay();
1646 m_local_replay = nullptr;
1647
1648 EventPreprocessor<I>::destroy(m_event_preprocessor);
1649 m_event_preprocessor = nullptr;
1650 ctx->complete(0);
1651 });
1652 }
1653 ctx = new FunctionContext([this, ctx](int r) {
7c673cae
FG
1654 // blocks if listener notification is in-progress
1655 m_local_journal->remove_listener(m_journal_listener);
7c673cae
FG
1656 ctx->complete(0);
1657 });
31f18b77
FG
1658 }
1659
1660 // wait for all local in-flight replay events to complete
// an error from the preceding stage is only logged; the chain continues
1661 ctx = new FunctionContext([this, ctx](int r) {
1662 if (r < 0) {
1663 derr << "error shutting down journal replay: " << cpp_strerror(r)
1664 << dendl;
1665 }
1666
1667 m_event_replay_tracker.wait_for_ops(ctx);
1668 });
1669
1670 // flush any local in-flight replay events
1671 if (m_local_replay != nullptr) {
7c673cae 1672 ctx = new FunctionContext([this, ctx](int r) {
31f18b77 1673 m_local_replay->shut_down(true, ctx);
7c673cae
FG
1674 });
1675 }
31f18b77 1676
7c673cae
FG
// kick off the chain: the most recently created context runs first
1677 m_threads->work_queue->queue(ctx, 0);
1678}
1679
// Final stage of the shut down sequence. The method is re-entrant: it parks
// itself while status updates are in flight, and it calls itself again after
// moving the local image to the trash (delete or resync requested). Once
// nothing is pending it destroys the replay status formatter, switches to
// STATE_STOPPED and completes the pending start/stop contexts.
//
// r is the result forwarded to the on-start context; if an on-start context
// consumed it, the on-stop context is completed with 0 instead.
1680template <typename I>
1681void ImageReplayer<I>::handle_shut_down(int r) {
// a negative interval only cancels the periodic status update task
1682 reschedule_update_status_task(-1);
1683
11fdf7f2
TL
1684 bool resync_requested = false;
1685 bool delete_requested = false;
3efd9988 1686 bool unregister_asok_hook = false;
7c673cae
FG
1687 {
1688 Mutex::Locker locker(m_lock);
1689
1690 // if status updates are in-flight, wait for them to complete
1691 // before proceeding
1692 if (m_in_flight_status_updates > 0) {
1693 if (m_on_update_status_finish == nullptr) {
11fdf7f2 1694 dout(15) << "waiting for in-flight status update" << dendl;
7c673cae
FG
1695 m_on_update_status_finish = new FunctionContext(
1696 [this, r](int _r) {
1697 handle_shut_down(r);
1698 });
1699 }
1700 return;
1701 }
1702
d2e6a577 1703 if (m_delete_requested && !m_local_image_id.empty()) {
11fdf7f2 1704 ceph_assert(m_remote_image.image_id.empty());
d2e6a577 1705 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
11fdf7f2
TL
1706 unregister_asok_hook = true;
// swapping with the false local consumes the member flag
1707 std::swap(delete_requested, m_delete_requested);
d2e6a577 1708 }
d2e6a577 1709
11fdf7f2
TL
// likewise consumes m_resync_requested (member becomes false)
1710 std::swap(resync_requested, m_resync_requested);
1711 if (delete_requested || resync_requested) {
d2e6a577 1712 m_local_image_id = "";
d2e6a577
FG
1713 } else if (m_last_r == -ENOENT &&
1714 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1715 dout(0) << "mirror image no longer exists" << dendl;
3efd9988 1716 unregister_asok_hook = true;
d2e6a577 1717 m_finished = true;
7c673cae
FG
1718 }
1719 }
1720
3efd9988
FG
// performed after m_lock has been released (the scope above has ended)
1721 if (unregister_asok_hook) {
1722 unregister_admin_socket_hook();
1723 }
1724
11fdf7f2
TL
// move the local image to the trash, then run this method again; both
// request flags were consumed above, so the second pass skips this branch
// unless a new request was set in the meantime
1725 if (delete_requested || resync_requested) {
1726 dout(5) << "moving image to trash" << dendl;
1727 auto ctx = new FunctionContext([this, r](int) {
1728 handle_shut_down(r);
1729 });
1730 ImageDeleter<I>::trash_move(*m_local_ioctx, m_global_image_id,
1731 resync_requested, m_threads->work_queue, ctx);
1732 return;
1733 }
7c673cae 1734
11fdf7f2 1735 dout(10) << "stop complete" << dendl;
7c673cae
FG
1736 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1737 m_replay_status_formatter = nullptr;
1738
1739 Context *on_start = nullptr;
1740 Context *on_stop = nullptr;
// take ownership of the pending contexts and publish STATE_STOPPED under
// m_lock; the contexts are completed after the lock is released
1741 {
1742 Mutex::Locker locker(m_lock);
1743 std::swap(on_start, m_on_start_finish);
1744 std::swap(on_stop, m_on_stop_finish);
1745 m_stop_requested = false;
11fdf7f2
TL
1746 ceph_assert(m_delayed_preprocess_task == nullptr);
1747 ceph_assert(m_state == STATE_STOPPING);
7c673cae
FG
1748 m_state = STATE_STOPPED;
1749 }
1750
1751 if (on_start != nullptr) {
11fdf7f2 1752 dout(10) << "on start finish complete, r=" << r << dendl;
7c673cae
FG
1753 on_start->complete(r);
// the result has been delivered to on_start; on_stop then sees success
1754 r = 0;
1755 }
1756 if (on_stop != nullptr) {
11fdf7f2 1757 dout(10) << "on stop finish complete, r=" << r << dendl;
7c673cae
FG
1758 on_stop->complete(r);
1759 }
1760}
1761
1762template <typename I>
1763void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1764 dout(20) << dendl;
1765
1766 cls::journal::Client client;
1767 {
1768 Mutex::Locker locker(m_lock);
1769 if (!is_running_()) {
1770 return;
1771 }
1772
1773 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1774 if (r < 0) {
1775 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1776 return;
1777 }
1778 }
1779
1780 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1781 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1782 stop(nullptr, false, -ENOTCONN, "disconnected");
1783 }
1784}
1785
1786template <typename I>
1787std::string ImageReplayer<I>::to_string(const State state) {
1788 switch (state) {
1789 case ImageReplayer<I>::STATE_STARTING:
1790 return "Starting";
1791 case ImageReplayer<I>::STATE_REPLAYING:
1792 return "Replaying";
1793 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1794 return "ReplayFlushing";
1795 case ImageReplayer<I>::STATE_STOPPING:
1796 return "Stopping";
1797 case ImageReplayer<I>::STATE_STOPPED:
1798 return "Stopped";
1799 default:
1800 break;
1801 }
1802 return "Unknown(" + stringify(state) + ")";
1803}
1804
1805template <typename I>
1806void ImageReplayer<I>::resync_image(Context *on_finish) {
11fdf7f2 1807 dout(10) << dendl;
7c673cae 1808
d2e6a577
FG
1809 m_resync_requested = true;
1810 stop(on_finish);
1811}
1812
1813template <typename I>
1814void ImageReplayer<I>::register_admin_socket_hook() {
3efd9988
FG
1815 ImageReplayerAdminSocketHook<I> *asok_hook;
1816 {
1817 Mutex::Locker locker(m_lock);
1818 if (m_asok_hook != nullptr) {
1819 return;
1820 }
7c673cae 1821
11fdf7f2
TL
1822 ceph_assert(m_perf_counters == nullptr);
1823
28e407b8 1824 dout(15) << "registered asok hook: " << m_name << dendl;
3efd9988
FG
1825 asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1826 this);
1827 int r = asok_hook->register_commands();
1828 if (r == 0) {
1829 m_asok_hook = asok_hook;
11fdf7f2
TL
1830
1831 CephContext *cct = static_cast<CephContext *>(m_local->cct());
1832 auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_perf_stats_prio");
1833 PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_" + m_name,
1834 l_rbd_mirror_first, l_rbd_mirror_last);
1835 plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
1836 plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
1837 "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
1838 plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
1839 "Replay latency", "rl", prio);
1840 m_perf_counters = plb.create_perf_counters();
1841 g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
1842
3efd9988
FG
1843 return;
1844 }
d2e6a577 1845 derr << "error registering admin socket commands" << dendl;
d2e6a577 1846 }
3efd9988 1847 delete asok_hook;
d2e6a577
FG
1848}
1849
1850template <typename I>
1851void ImageReplayer<I>::unregister_admin_socket_hook() {
28e407b8 1852 dout(15) << dendl;
d2e6a577 1853
3efd9988 1854 AdminSocketHook *asok_hook = nullptr;
11fdf7f2 1855 PerfCounters *perf_counters = nullptr;
3efd9988
FG
1856 {
1857 Mutex::Locker locker(m_lock);
1858 std::swap(asok_hook, m_asok_hook);
11fdf7f2 1859 std::swap(perf_counters, m_perf_counters);
3efd9988
FG
1860 }
1861 delete asok_hook;
11fdf7f2
TL
1862 if (perf_counters != nullptr) {
1863 g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
1864 delete perf_counters;
1865 }
3efd9988
FG
1866}
1867
1868template <typename I>
28e407b8 1869void ImageReplayer<I>::reregister_admin_socket_hook() {
3efd9988
FG
1870 {
1871 Mutex::Locker locker(m_lock);
11fdf7f2
TL
1872 auto name = m_local_ioctx->get_pool_name() + "/" + m_local_image_name;
1873 if (m_asok_hook != nullptr && m_name == name) {
3efd9988
FG
1874 return;
1875 }
1876 m_name = name;
1877 }
1878 unregister_admin_socket_hook();
1879 register_admin_socket_hook();
7c673cae
FG
1880}
1881
1882template <typename I>
1883std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1884{
1885 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1886 << "/" << replayer.get_global_image_id() << "]";
1887 return os;
1888}
1889
1890} // namespace mirror
1891} // namespace rbd
1892
1893template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;