]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
11fdf7f2 6#include "common/admin_socket.h"
7c673cae
FG
7#include "common/debug.h"
8#include "common/errno.h"
9#include "include/stringify.h"
10#include "cls/rbd/cls_rbd_client.h"
11#include "common/Timer.h"
12#include "common/WorkQueue.h"
13#include "global/global_context.h"
14#include "journal/Journaler.h"
7c673cae
FG
15#include "librbd/ExclusiveLock.h"
16#include "librbd/ImageCtx.h"
17#include "librbd/ImageState.h"
18#include "librbd/Journal.h"
19#include "librbd/Operations.h"
20#include "librbd/Utils.h"
d2e6a577 21#include "ImageDeleter.h"
7c673cae 22#include "ImageReplayer.h"
9f95a23c 23#include "MirrorStatusUpdater.h"
7c673cae
FG
24#include "Threads.h"
25#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
9f95a23c
TL
26#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27#include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28#include "tools/rbd_mirror/image_replayer/Utils.h"
29#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
31#include <map>
7c673cae
FG
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_rbd_mirror
35#undef dout_prefix
36#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
11fdf7f2
TL
39extern PerfCounters *g_perf_counters;
40
7c673cae
FG
41namespace rbd {
42namespace mirror {
43
44using librbd::util::create_context_callback;
7c673cae
FG
45
46template <typename I>
47std::ostream &operator<<(std::ostream &os,
48 const typename ImageReplayer<I>::State &state);
49
50namespace {
51
3efd9988 52template <typename I>
7c673cae
FG
53class ImageReplayerAdminSocketCommand {
54public:
3efd9988
FG
55 ImageReplayerAdminSocketCommand(const std::string &desc,
56 ImageReplayer<I> *replayer)
57 : desc(desc), replayer(replayer) {
58 }
7c673cae 59 virtual ~ImageReplayerAdminSocketCommand() {}
9f95a23c 60 virtual int call(Formatter *f) = 0;
3efd9988
FG
61
62 std::string desc;
63 ImageReplayer<I> *replayer;
64 bool registered = false;
7c673cae
FG
65};
66
67template <typename I>
3efd9988 68class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 69public:
3efd9988
FG
70 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
71 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
72 }
7c673cae 73
9f95a23c
TL
74 int call(Formatter *f) override {
75 this->replayer->print_status(f);
76 return 0;
7c673cae 77 }
7c673cae
FG
78};
79
80template <typename I>
3efd9988 81class StartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 82public:
3efd9988
FG
83 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
84 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
85 }
7c673cae 86
9f95a23c 87 int call(Formatter *f) override {
3efd9988 88 this->replayer->start(nullptr, true);
9f95a23c 89 return 0;
7c673cae 90 }
7c673cae
FG
91};
92
93template <typename I>
3efd9988 94class StopCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 95public:
3efd9988
FG
96 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
97 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
98 }
7c673cae 99
9f95a23c 100 int call(Formatter *f) override {
3efd9988 101 this->replayer->stop(nullptr, true);
9f95a23c 102 return 0;
7c673cae 103 }
7c673cae
FG
104};
105
106template <typename I>
3efd9988 107class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 108public:
3efd9988
FG
109 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
110 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
111 }
7c673cae 112
9f95a23c 113 int call(Formatter *f) override {
3efd9988 114 this->replayer->restart();
9f95a23c 115 return 0;
7c673cae 116 }
7c673cae
FG
117};
118
119template <typename I>
3efd9988 120class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 121public:
3efd9988
FG
122 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
123 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
124 }
7c673cae 125
9f95a23c 126 int call(Formatter *f) override {
28e407b8 127 this->replayer->flush();
9f95a23c 128 return 0;
7c673cae 129 }
7c673cae
FG
130};
131
132template <typename I>
133class ImageReplayerAdminSocketHook : public AdminSocketHook {
134public:
135 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77 136 ImageReplayer<I> *replayer)
3efd9988
FG
137 : admin_socket(cct->get_admin_socket()),
138 commands{{"rbd mirror flush " + name,
139 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
140 {"rbd mirror restart " + name,
141 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
142 {"rbd mirror start " + name,
143 new StartCommand<I>("start rbd mirror " + name, replayer)},
144 {"rbd mirror status " + name,
145 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
146 {"rbd mirror stop " + name,
147 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
d2e6a577
FG
148 }
149
150 int register_commands() {
3efd9988 151 for (auto &it : commands) {
9f95a23c 152 int r = admin_socket->register_command(it.first, this,
3efd9988
FG
153 it.second->desc);
154 if (r < 0) {
155 return r;
156 }
157 it.second->registered = true;
7c673cae 158 }
d2e6a577 159 return 0;
7c673cae
FG
160 }
161
162 ~ImageReplayerAdminSocketHook() override {
9f95a23c 163 admin_socket->unregister_commands(this);
3efd9988 164 for (auto &it : commands) {
3efd9988 165 delete it.second;
7c673cae 166 }
31f18b77 167 commands.clear();
7c673cae
FG
168 }
169
9f95a23c
TL
170 int call(std::string_view command, const cmdmap_t& cmdmap,
171 Formatter *f,
172 std::ostream& errss,
173 bufferlist& out) override {
3efd9988 174 auto i = commands.find(command);
11fdf7f2 175 ceph_assert(i != commands.end());
9f95a23c 176 return i->second->call(f);
7c673cae
FG
177 }
178
179private:
11fdf7f2
TL
180 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
181 std::less<>> Commands;
7c673cae
FG
182
183 AdminSocket *admin_socket;
184 Commands commands;
185};
186
7c673cae
FG
187} // anonymous namespace
188
189template <typename I>
190void ImageReplayer<I>::BootstrapProgressContext::update_progress(
191 const std::string &description, bool flush)
192{
193 const std::string desc = "bootstrapping, " + description;
194 replayer->set_state_description(0, desc);
195 if (flush) {
196 replayer->update_mirror_image_status(false, boost::none);
197 }
198}
199
200template <typename I>
9f95a23c
TL
201struct ImageReplayer<I>::ReplayerListener
202 : public image_replayer::ReplayerListener {
203 ImageReplayer<I>* image_replayer;
204
205 ReplayerListener(ImageReplayer<I>* image_replayer)
206 : image_replayer(image_replayer) {
207 }
208
209 void handle_notification() override {
210 image_replayer->handle_replayer_notification();
211 }
212};
7c673cae
FG
213
214template <typename I>
9f95a23c
TL
215ImageReplayer<I>::ImageReplayer(
216 librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
217 const std::string &global_image_id, Threads<I> *threads,
218 InstanceWatcher<I> *instance_watcher,
219 MirrorStatusUpdater<I>* local_status_updater,
220 journal::CacheManagerHandler *cache_manager_handler,
221 PoolMetaCache* pool_meta_cache) :
222 m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
223 m_global_image_id(global_image_id), m_threads(threads),
31f18b77 224 m_instance_watcher(instance_watcher),
9f95a23c
TL
225 m_local_status_updater(local_status_updater),
226 m_cache_manager_handler(cache_manager_handler),
227 m_pool_meta_cache(pool_meta_cache),
228 m_local_image_name(global_image_id),
229 m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
230 stringify(local_io_ctx.get_id()) + " " + global_image_id)),
7c673cae 231 m_progress_cxt(this),
9f95a23c 232 m_replayer_listener(new ReplayerListener(this))
7c673cae
FG
233{
234 // Register asok commands using a temporary "remote_pool_name/global_image_id"
235 // name. When the image name becomes known on start the asok commands will be
236 // re-registered using "remote_pool_name/remote_image_name" name.
237
9f95a23c
TL
238 m_image_spec = image_replayer::util::compute_image_spec(
239 local_io_ctx, global_image_id);
d2e6a577 240 register_admin_socket_hook();
7c673cae
FG
241}
242
243template <typename I>
244ImageReplayer<I>::~ImageReplayer()
245{
d2e6a577 246 unregister_admin_socket_hook();
9f95a23c 247 ceph_assert(m_state_builder == nullptr);
11fdf7f2
TL
248 ceph_assert(m_on_start_finish == nullptr);
249 ceph_assert(m_on_stop_finish == nullptr);
250 ceph_assert(m_bootstrap_request == nullptr);
9f95a23c 251 delete m_replayer_listener;
7c673cae
FG
252}
253
c07f9fc5
FG
254template <typename I>
255image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
9f95a23c 256 std::lock_guard locker{m_lock};
c07f9fc5
FG
257
258 if (!m_mirror_image_status_state) {
259 return image_replayer::HEALTH_STATE_OK;
260 } else if (*m_mirror_image_status_state ==
261 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
262 *m_mirror_image_status_state ==
263 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
264 return image_replayer::HEALTH_STATE_WARNING;
265 }
266 return image_replayer::HEALTH_STATE_ERROR;
267}
268
7c673cae 269template <typename I>
9f95a23c
TL
270void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
271 dout(10) << "peer=" << peer << dendl;
272
273 std::lock_guard locker{m_lock};
274 auto it = m_peers.find(peer);
d2e6a577 275 if (it == m_peers.end()) {
9f95a23c 276 m_peers.insert(peer);
7c673cae
FG
277 }
278}
279
7c673cae
FG
280template <typename I>
281void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
9f95a23c 282 dout(10) << "r=" << r << ", desc=" << desc << dendl;
7c673cae 283
9f95a23c 284 std::lock_guard l{m_lock};
7c673cae
FG
285 m_last_r = r;
286 m_state_desc = desc;
287}
288
289template <typename I>
290void ImageReplayer<I>::start(Context *on_finish, bool manual)
291{
11fdf7f2 292 dout(10) << "on_finish=" << on_finish << dendl;
7c673cae
FG
293
294 int r = 0;
295 {
9f95a23c 296 std::lock_guard locker{m_lock};
7c673cae
FG
297 if (!is_stopped_()) {
298 derr << "already running" << dendl;
299 r = -EINVAL;
300 } else if (m_manual_stop && !manual) {
301 dout(5) << "stopped manually, ignoring start without manual flag"
302 << dendl;
303 r = -EPERM;
304 } else {
305 m_state = STATE_STARTING;
306 m_last_r = 0;
307 m_state_desc.clear();
308 m_manual_stop = false;
d2e6a577 309 m_delete_requested = false;
7c673cae
FG
310
311 if (on_finish != nullptr) {
11fdf7f2 312 ceph_assert(m_on_start_finish == nullptr);
7c673cae
FG
313 m_on_start_finish = on_finish;
314 }
11fdf7f2 315 ceph_assert(m_on_stop_finish == nullptr);
7c673cae
FG
316 }
317 }
318
319 if (r < 0) {
320 if (on_finish) {
321 on_finish->complete(r);
322 }
323 return;
324 }
325
9f95a23c 326 bootstrap();
7c673cae
FG
327}
328
329template <typename I>
9f95a23c 330void ImageReplayer<I>::bootstrap() {
11fdf7f2 331 dout(10) << dendl;
7c673cae 332
9f95a23c 333 std::unique_lock locker{m_lock};
b32b8144 334 if (m_peers.empty()) {
9f95a23c
TL
335 locker.unlock();
336
337 dout(5) << "no peer clusters" << dendl;
338 on_start_fail(-ENOENT, "no peer clusters");
b32b8144
FG
339 return;
340 }
7c673cae 341
d2e6a577 342 // TODO need to support multiple remote images
11fdf7f2 343 ceph_assert(!m_peers.empty());
9f95a23c 344 m_remote_image_peer = *m_peers.begin();
7c673cae 345
9f95a23c 346 if (on_start_interrupted(m_lock)) {
b32b8144
FG
347 return;
348 }
7c673cae 349
9f95a23c
TL
350 ceph_assert(m_state_builder == nullptr);
351 auto ctx = create_context_callback<
11fdf7f2 352 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
9f95a23c
TL
353 auto request = image_replayer::BootstrapRequest<I>::create(
354 m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
355 m_global_image_id, m_local_mirror_uuid,
356 m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
357 m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested,
358 ctx);
7c673cae 359
9f95a23c
TL
360 request->get();
361 m_bootstrap_request = request;
362 locker.unlock();
7c673cae 363
9f95a23c 364 update_mirror_image_status(false, boost::none);
7c673cae
FG
365 request->send();
366}
367
368template <typename I>
369void ImageReplayer<I>::handle_bootstrap(int r) {
11fdf7f2 370 dout(10) << "r=" << r << dendl;
7c673cae 371 {
9f95a23c 372 std::lock_guard locker{m_lock};
7c673cae
FG
373 m_bootstrap_request->put();
374 m_bootstrap_request = nullptr;
7c673cae
FG
375 }
376
11fdf7f2
TL
377 if (on_start_interrupted()) {
378 return;
9f95a23c
TL
379 } else if (r == -ENOMSG) {
380 dout(5) << "local image is primary" << dendl;
381 on_start_fail(0, "local image is primary");
382 return;
11fdf7f2 383 } else if (r == -EREMOTEIO) {
c07f9fc5
FG
384 dout(5) << "remote image is non-primary" << dendl;
385 on_start_fail(-EREMOTEIO, "remote image is non-primary");
7c673cae
FG
386 return;
387 } else if (r == -EEXIST) {
7c673cae
FG
388 on_start_fail(r, "split-brain detected");
389 return;
9f95a23c
TL
390 } else if (r == -ENOLINK) {
391 m_delete_requested = true;
392 on_start_fail(0, "remote image no longer exists");
393 return;
7c673cae
FG
394 } else if (r < 0) {
395 on_start_fail(r, "error bootstrapping replay");
396 return;
d2e6a577
FG
397 } else if (m_resync_requested) {
398 on_start_fail(0, "resync requested");
399 return;
7c673cae
FG
400 }
401
9f95a23c 402 start_replay();
7c673cae
FG
403}
404
405template <typename I>
9f95a23c 406void ImageReplayer<I>::start_replay() {
11fdf7f2 407 dout(10) << dendl;
7c673cae 408
9f95a23c
TL
409 std::unique_lock locker{m_lock};
410 ceph_assert(m_replayer == nullptr);
411 m_replayer = m_state_builder->create_replayer(m_threads, m_instance_watcher,
412 m_local_mirror_uuid,
413 m_pool_meta_cache,
414 m_replayer_listener);
415
416 auto ctx = create_context_callback<
417 ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
418 m_replayer->init(ctx);
7c673cae
FG
419}
420
421template <typename I>
9f95a23c 422void ImageReplayer<I>::handle_start_replay(int r) {
11fdf7f2 423 dout(10) << "r=" << r << dendl;
7c673cae 424
11fdf7f2
TL
425 if (on_start_interrupted()) {
426 return;
427 } else if (r < 0) {
9f95a23c
TL
428 std::string error_description = m_replayer->get_error_description();
429 if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
430 std::unique_lock locker{m_lock};
d2e6a577 431 m_resync_requested = true;
7c673cae 432 }
7c673cae 433
9f95a23c
TL
434 // shut down not required if init failed
435 m_replayer->destroy();
436 m_replayer = nullptr;
7c673cae 437
9f95a23c
TL
438 derr << "error starting replay: " << cpp_strerror(r) << dendl;
439 on_start_fail(r, error_description);
7c673cae
FG
440 return;
441 }
442
9f95a23c 443 Context *on_finish = nullptr;
7c673cae 444 {
9f95a23c 445 std::unique_lock locker{m_lock};
11fdf7f2 446 ceph_assert(m_state == STATE_STARTING);
7c673cae
FG
447 m_state = STATE_REPLAYING;
448 std::swap(m_on_start_finish, on_finish);
449 }
450
7c673cae 451 update_mirror_image_status(true, boost::none);
7c673cae 452 if (on_replay_interrupted()) {
9f95a23c
TL
453 if (on_finish != nullptr) {
454 on_finish->complete(r);
455 }
7c673cae
FG
456 return;
457 }
458
11fdf7f2 459 dout(10) << "start succeeded" << dendl;
d2e6a577 460 if (on_finish != nullptr) {
11fdf7f2 461 dout(10) << "on finish complete, r=" << r << dendl;
d2e6a577
FG
462 on_finish->complete(r);
463 }
7c673cae
FG
464}
465
466template <typename I>
467void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
468{
9f95a23c
TL
469 dout(10) << "r=" << r << ", desc=" << desc << dendl;
470 Context *ctx = new LambdaContext([this, r, desc](int _r) {
7c673cae 471 {
9f95a23c 472 std::lock_guard locker{m_lock};
11fdf7f2 473 ceph_assert(m_state == STATE_STARTING);
7c673cae 474 m_state = STATE_STOPPING;
d2e6a577 475 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
7c673cae
FG
476 derr << "start failed: " << cpp_strerror(r) << dendl;
477 } else {
11fdf7f2 478 dout(10) << "start canceled" << dendl;
7c673cae
FG
479 }
480 }
481
482 set_state_description(r, desc);
9f95a23c 483 update_mirror_image_status(false, boost::none);
7c673cae
FG
484 shut_down(r);
485 });
486 m_threads->work_queue->queue(ctx, 0);
487}
488
489template <typename I>
11fdf7f2 490bool ImageReplayer<I>::on_start_interrupted() {
9f95a23c 491 std::lock_guard locker{m_lock};
11fdf7f2
TL
492 return on_start_interrupted(m_lock);
493}
494
495template <typename I>
9f95a23c
TL
496bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
497 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
498 ceph_assert(m_state == STATE_STARTING);
499 if (!m_stop_requested) {
7c673cae
FG
500 return false;
501 }
502
11fdf7f2 503 on_start_fail(-ECANCELED, "");
7c673cae
FG
504 return true;
505}
506
507template <typename I>
508void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
509 const std::string& desc)
510{
11fdf7f2 511 dout(10) << "on_finish=" << on_finish << ", manual=" << manual
7c673cae
FG
512 << ", desc=" << desc << dendl;
513
514 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
515 bool shut_down_replay = false;
516 bool running = true;
7c673cae 517 {
9f95a23c 518 std::lock_guard locker{m_lock};
7c673cae
FG
519
520 if (!is_running_()) {
521 running = false;
522 } else {
523 if (!is_stopped_()) {
524 if (m_state == STATE_STARTING) {
11fdf7f2
TL
525 dout(10) << "canceling start" << dendl;
526 if (m_bootstrap_request != nullptr) {
7c673cae
FG
527 bootstrap_request = m_bootstrap_request;
528 bootstrap_request->get();
529 }
530 } else {
11fdf7f2 531 dout(10) << "interrupting replay" << dendl;
7c673cae
FG
532 shut_down_replay = true;
533 }
534
11fdf7f2 535 ceph_assert(m_on_stop_finish == nullptr);
7c673cae
FG
536 std::swap(m_on_stop_finish, on_finish);
537 m_stop_requested = true;
538 m_manual_stop = manual;
7c673cae
FG
539 }
540 }
541 }
542
543 // avoid holding lock since bootstrap request will update status
544 if (bootstrap_request != nullptr) {
11fdf7f2 545 dout(10) << "canceling bootstrap" << dendl;
7c673cae
FG
546 bootstrap_request->cancel();
547 bootstrap_request->put();
548 }
549
7c673cae
FG
550 if (!running) {
551 dout(20) << "not running" << dendl;
552 if (on_finish) {
553 on_finish->complete(-EINVAL);
554 }
555 return;
556 }
557
558 if (shut_down_replay) {
559 on_stop_journal_replay(r, desc);
560 } else if (on_finish != nullptr) {
561 on_finish->complete(0);
562 }
563}
564
565template <typename I>
566void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
567{
11fdf7f2 568 dout(10) << dendl;
7c673cae
FG
569
570 {
9f95a23c 571 std::lock_guard locker{m_lock};
7c673cae
FG
572 if (m_state != STATE_REPLAYING) {
573 // might be invoked multiple times while stopping
574 return;
575 }
9f95a23c 576
7c673cae
FG
577 m_stop_requested = true;
578 m_state = STATE_STOPPING;
579 }
580
581 set_state_description(r, desc);
a8e16298 582 update_mirror_image_status(true, boost::none);
7c673cae
FG
583 shut_down(0);
584}
585
7c673cae
FG
586template <typename I>
587void ImageReplayer<I>::restart(Context *on_finish)
588{
9f95a23c 589 auto ctx = new LambdaContext(
7c673cae
FG
590 [this, on_finish](int r) {
591 if (r < 0) {
592 // Try start anyway.
593 }
594 start(on_finish, true);
595 });
596 stop(ctx);
597}
598
599template <typename I>
81eedcae 600void ImageReplayer<I>::flush()
7c673cae 601{
81eedcae 602 C_SaferCond ctx;
7c673cae 603
9f95a23c
TL
604 {
605 std::unique_lock locker{m_lock};
606 if (m_state != STATE_REPLAYING) {
607 return;
608 }
7c673cae 609
9f95a23c
TL
610 dout(10) << dendl;
611 ceph_assert(m_replayer != nullptr);
612 m_replayer->flush(&ctx);
81eedcae
TL
613 }
614
9f95a23c
TL
615 int r = ctx.wait();
616 if (r >= 0) {
617 update_mirror_image_status(false, boost::none);
7c673cae 618 }
7c673cae
FG
619}
620
621template <typename I>
622bool ImageReplayer<I>::on_replay_interrupted()
623{
624 bool shut_down;
625 {
9f95a23c 626 std::lock_guard locker{m_lock};
7c673cae
FG
627 shut_down = m_stop_requested;
628 }
629
630 if (shut_down) {
631 on_stop_journal_replay();
632 }
633 return shut_down;
634}
635
636template <typename I>
9f95a23c 637void ImageReplayer<I>::print_status(Formatter *f)
7c673cae 638{
11fdf7f2 639 dout(10) << dendl;
7c673cae 640
9f95a23c 641 std::lock_guard l{m_lock};
7c673cae 642
9f95a23c
TL
643 f->open_object_section("image_replayer");
644 f->dump_string("name", m_image_spec);
645 f->dump_string("state", to_string(m_state));
646 f->close_section();
7c673cae
FG
647}
648
649template <typename I>
9f95a23c
TL
650void ImageReplayer<I>::update_mirror_image_status(
651 bool force, const OptionalState &opt_state) {
652 dout(15) << "force=" << force << ", "
653 << "state=" << opt_state << dendl;
7c673cae
FG
654
655 {
9f95a23c
TL
656 std::lock_guard locker{m_lock};
657 if (!force && !is_stopped_() && !is_running_()) {
658 dout(15) << "shut down in-progress: ignoring update" << dendl;
28e407b8 659 return;
28e407b8
AA
660 }
661 }
662
9f95a23c
TL
663 m_in_flight_op_tracker.start_op();
664 auto ctx = new LambdaContext(
665 [this, force, opt_state](int r) {
666 set_mirror_image_status_update(force, opt_state);
11fdf7f2
TL
667 });
668 m_threads->work_queue->queue(ctx, 0);
7c673cae
FG
669}
670
671template <typename I>
9f95a23c
TL
672void ImageReplayer<I>::set_mirror_image_status_update(
673 bool force, const OptionalState &opt_state) {
674 dout(15) << "force=" << force << ", "
675 << "state=" << opt_state << dendl;
7c673cae 676
28e407b8
AA
677 reregister_admin_socket_hook();
678
7c673cae
FG
679 State state;
680 std::string state_desc;
681 int last_r;
7c673cae 682 bool stopping_replay;
c07f9fc5 683
9f95a23c
TL
684 auto mirror_image_status_state = boost::make_optional(
685 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
c07f9fc5 686 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
7c673cae 687 {
9f95a23c 688 std::lock_guard locker{m_lock};
7c673cae
FG
689 state = m_state;
690 state_desc = m_state_desc;
c07f9fc5 691 mirror_image_status_state = m_mirror_image_status_state;
7c673cae 692 last_r = m_last_r;
9f95a23c 693 stopping_replay = (m_replayer != nullptr);
c07f9fc5
FG
694
695 if (m_bootstrap_request != nullptr) {
696 bootstrap_request = m_bootstrap_request;
697 bootstrap_request->get();
698 }
699 }
700
701 bool syncing = false;
702 if (bootstrap_request != nullptr) {
703 syncing = bootstrap_request->is_syncing();
704 bootstrap_request->put();
705 bootstrap_request = nullptr;
7c673cae
FG
706 }
707
708 if (opt_state) {
709 state = *opt_state;
710 }
711
9f95a23c 712 cls::rbd::MirrorImageSiteStatus status;
7c673cae
FG
713 status.up = true;
714 switch (state) {
715 case STATE_STARTING:
c07f9fc5 716 if (syncing) {
7c673cae
FG
717 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
718 status.description = state_desc.empty() ? "syncing" : state_desc;
c07f9fc5 719 mirror_image_status_state = status.state;
7c673cae
FG
720 } else {
721 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
722 status.description = "starting replay";
723 }
724 break;
725 case STATE_REPLAYING:
7c673cae
FG
726 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
727 {
9f95a23c
TL
728 std::string desc;
729 auto on_req_finish = new LambdaContext(
730 [this, force](int r) {
11fdf7f2 731 dout(15) << "replay status ready: r=" << r << dendl;
7c673cae 732 if (r >= 0) {
9f95a23c 733 set_mirror_image_status_update(force, boost::none);
7c673cae 734 } else if (r == -EAGAIN) {
9f95a23c 735 m_in_flight_op_tracker.finish_op();
7c673cae
FG
736 }
737 });
738
9f95a23c
TL
739 ceph_assert(m_replayer != nullptr);
740 if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
11fdf7f2 741 dout(15) << "waiting for replay status" << dendl;
7c673cae
FG
742 return;
743 }
9f95a23c 744
7c673cae 745 status.description = "replaying, " + desc;
11fdf7f2
TL
746 mirror_image_status_state = boost::make_optional(
747 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
7c673cae
FG
748 }
749 break;
750 case STATE_STOPPING:
751 if (stopping_replay) {
752 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
a8e16298 753 status.description = state_desc.empty() ? "stopping replay" : state_desc;
7c673cae
FG
754 break;
755 }
756 // FALLTHROUGH
757 case STATE_STOPPED:
c07f9fc5
FG
758 if (last_r == -EREMOTEIO) {
759 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
760 status.description = state_desc;
761 mirror_image_status_state = status.state;
9f95a23c 762 } else if (last_r < 0 && last_r != -ECANCELED) {
7c673cae
FG
763 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
764 status.description = state_desc;
c07f9fc5 765 mirror_image_status_state = status.state;
7c673cae
FG
766 } else {
767 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
768 status.description = state_desc.empty() ? "stopped" : state_desc;
c07f9fc5 769 mirror_image_status_state = boost::none;
7c673cae
FG
770 }
771 break;
772 default:
11fdf7f2 773 ceph_assert(!"invalid state");
7c673cae
FG
774 }
775
c07f9fc5 776 {
9f95a23c 777 std::lock_guard locker{m_lock};
c07f9fc5
FG
778 m_mirror_image_status_state = mirror_image_status_state;
779 }
780
781 // prevent the status from ping-ponging when failed replays are restarted
782 if (mirror_image_status_state &&
783 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
784 status.state = *mirror_image_status_state;
785 }
786
11fdf7f2 787 dout(15) << "status=" << status << dendl;
9f95a23c
TL
788 m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
789 force);
790 if (m_remote_image_peer.mirror_status_updater != nullptr) {
791 m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
792 m_global_image_id, status, force);
7c673cae
FG
793 }
794
9f95a23c 795 m_in_flight_op_tracker.finish_op();
7c673cae
FG
796}
797
798template <typename I>
799void ImageReplayer<I>::shut_down(int r) {
11fdf7f2 800 dout(10) << "r=" << r << dendl;
3efd9988 801
3efd9988 802 {
9f95a23c 803 std::lock_guard locker{m_lock};
11fdf7f2 804 ceph_assert(m_state == STATE_STOPPING);
7c673cae
FG
805 }
806
9f95a23c
TL
807 if (!m_in_flight_op_tracker.empty()) {
808 dout(15) << "waiting for in-flight operations to complete" << dendl;
809 m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
810 shut_down(r);
811 }));
812 return;
813 }
31f18b77 814
7c673cae 815 // chain the shut down sequence (reverse order)
9f95a23c 816 Context *ctx = new LambdaContext(
7c673cae 817 [this, r](int _r) {
9f95a23c 818 update_mirror_image_status(true, STATE_STOPPED);
7c673cae
FG
819 handle_shut_down(r);
820 });
31f18b77 821
9f95a23c
TL
822 // destruct the state builder
823 if (m_state_builder != nullptr) {
824 ctx = new LambdaContext([this, ctx](int r) {
825 m_state_builder->close(ctx);
31f18b77
FG
826 });
827 }
828
9f95a23c
TL
829 // close the replayer
830 if (m_replayer != nullptr) {
831 ctx = new LambdaContext([this, ctx](int r) {
832 m_replayer->destroy();
833 m_replayer = nullptr;
834 ctx->complete(0);
835 });
836 ctx = new LambdaContext([this, ctx](int r) {
837 m_replayer->shut_down(ctx);
31f18b77 838 });
7c673cae 839 }
31f18b77 840
7c673cae
FG
841 m_threads->work_queue->queue(ctx, 0);
842}
843
844template <typename I>
845void ImageReplayer<I>::handle_shut_down(int r) {
11fdf7f2
TL
846 bool resync_requested = false;
847 bool delete_requested = false;
3efd9988 848 bool unregister_asok_hook = false;
7c673cae 849 {
9f95a23c 850 std::lock_guard locker{m_lock};
7c673cae 851
9f95a23c
TL
852 if (m_delete_requested && m_state_builder != nullptr &&
853 !m_state_builder->local_image_id.empty()) {
854 ceph_assert(m_state_builder->remote_image_id.empty());
d2e6a577 855 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
11fdf7f2
TL
856 unregister_asok_hook = true;
857 std::swap(delete_requested, m_delete_requested);
d2e6a577 858 }
d2e6a577 859
11fdf7f2 860 std::swap(resync_requested, m_resync_requested);
9f95a23c
TL
861 if (!delete_requested && !resync_requested && m_last_r == -ENOENT &&
862 ((m_state_builder == nullptr) ||
863 (m_state_builder->local_image_id.empty() &&
864 m_state_builder->remote_image_id.empty()))) {
d2e6a577 865 dout(0) << "mirror image no longer exists" << dendl;
3efd9988 866 unregister_asok_hook = true;
d2e6a577 867 m_finished = true;
7c673cae
FG
868 }
869 }
870
3efd9988
FG
871 if (unregister_asok_hook) {
872 unregister_admin_socket_hook();
873 }
874
11fdf7f2
TL
875 if (delete_requested || resync_requested) {
876 dout(5) << "moving image to trash" << dendl;
9f95a23c 877 auto ctx = new LambdaContext([this, r](int) {
11fdf7f2
TL
878 handle_shut_down(r);
879 });
9f95a23c 880 ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
11fdf7f2
TL
881 resync_requested, m_threads->work_queue, ctx);
882 return;
883 }
7c673cae 884
9f95a23c
TL
885 if (!m_in_flight_op_tracker.empty()) {
886 dout(15) << "waiting for in-flight operations to complete" << dendl;
887 m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
888 handle_shut_down(r);
889 }));
890 return;
891 }
7c673cae 892
9f95a23c
TL
893 if (m_local_status_updater->exists(m_global_image_id)) {
894 dout(15) << "removing local mirror image status" << dendl;
895 auto ctx = new LambdaContext([this, r](int) {
896 handle_shut_down(r);
897 });
898 m_local_status_updater->remove_mirror_image_status(m_global_image_id, ctx);
899 return;
900 }
901
902 if (m_remote_image_peer.mirror_status_updater != nullptr &&
903 m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) {
904 dout(15) << "removing remote mirror image status" << dendl;
905 auto ctx = new LambdaContext([this, r](int) {
906 handle_shut_down(r);
907 });
908 m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
909 m_global_image_id, ctx);
910 return;
911 }
912
913 if (m_state_builder != nullptr) {
914 m_state_builder->destroy();
915 m_state_builder = nullptr;
916 }
917
918 dout(10) << "stop complete" << dendl;
7c673cae
FG
919 Context *on_start = nullptr;
920 Context *on_stop = nullptr;
921 {
9f95a23c 922 std::lock_guard locker{m_lock};
7c673cae
FG
923 std::swap(on_start, m_on_start_finish);
924 std::swap(on_stop, m_on_stop_finish);
925 m_stop_requested = false;
11fdf7f2 926 ceph_assert(m_state == STATE_STOPPING);
7c673cae
FG
927 m_state = STATE_STOPPED;
928 }
929
930 if (on_start != nullptr) {
11fdf7f2 931 dout(10) << "on start finish complete, r=" << r << dendl;
7c673cae
FG
932 on_start->complete(r);
933 r = 0;
934 }
935 if (on_stop != nullptr) {
11fdf7f2 936 dout(10) << "on stop finish complete, r=" << r << dendl;
7c673cae
FG
937 on_stop->complete(r);
938 }
939}
940
941template <typename I>
9f95a23c
TL
942void ImageReplayer<I>::handle_replayer_notification() {
943 dout(10) << dendl;
944
945 std::unique_lock locker{m_lock};
946 if (m_state != STATE_REPLAYING) {
947 // might be attempting to shut down
948 return;
949 }
7c673cae 950
7c673cae 951 {
9f95a23c
TL
952 // detect a rename of the local image
953 ceph_assert(m_state_builder != nullptr &&
954 m_state_builder->local_image_ctx != nullptr);
955 std::shared_lock image_locker{m_state_builder->local_image_ctx->image_lock};
956 if (m_local_image_name != m_state_builder->local_image_ctx->name) {
957 // will re-register with new name after next status update
958 dout(10) << "image renamed" << dendl;
959 m_local_image_name = m_state_builder->local_image_ctx->name;
7c673cae 960 }
9f95a23c 961 }
7c673cae 962
9f95a23c
TL
963 // replayer cannot be shut down while notification is in-flight
964 ceph_assert(m_replayer != nullptr);
965 locker.unlock();
966
967 if (m_replayer->is_resync_requested()) {
968 dout(10) << "resync requested" << dendl;
969 m_resync_requested = true;
970 on_stop_journal_replay(0, "resync requested");
971 return;
7c673cae
FG
972 }
973
9f95a23c
TL
974 if (!m_replayer->is_replaying()) {
975 auto error_code = m_replayer->get_error_code();
976 auto error_description = m_replayer->get_error_description();
977 dout(10) << "replay interrupted: "
978 << "r=" << error_code << ", "
979 << "error=" << error_description << dendl;
980 on_stop_journal_replay(error_code, error_description);
981 return;
7c673cae 982 }
9f95a23c
TL
983
984 update_mirror_image_status(false, {});
7c673cae
FG
985}
986
987template <typename I>
988std::string ImageReplayer<I>::to_string(const State state) {
989 switch (state) {
990 case ImageReplayer<I>::STATE_STARTING:
991 return "Starting";
992 case ImageReplayer<I>::STATE_REPLAYING:
993 return "Replaying";
7c673cae
FG
994 case ImageReplayer<I>::STATE_STOPPING:
995 return "Stopping";
996 case ImageReplayer<I>::STATE_STOPPED:
997 return "Stopped";
998 default:
999 break;
1000 }
1001 return "Unknown(" + stringify(state) + ")";
1002}
1003
d2e6a577
FG
1004template <typename I>
1005void ImageReplayer<I>::register_admin_socket_hook() {
3efd9988
FG
1006 ImageReplayerAdminSocketHook<I> *asok_hook;
1007 {
9f95a23c 1008 std::lock_guard locker{m_lock};
3efd9988
FG
1009 if (m_asok_hook != nullptr) {
1010 return;
1011 }
7c673cae 1012
9f95a23c
TL
1013 dout(15) << "registered asok hook: " << m_image_spec << dendl;
1014 asok_hook = new ImageReplayerAdminSocketHook<I>(
1015 g_ceph_context, m_image_spec, this);
3efd9988
FG
1016 int r = asok_hook->register_commands();
1017 if (r == 0) {
1018 m_asok_hook = asok_hook;
1019 return;
1020 }
d2e6a577 1021 derr << "error registering admin socket commands" << dendl;
d2e6a577 1022 }
3efd9988 1023 delete asok_hook;
d2e6a577
FG
1024}
1025
1026template <typename I>
1027void ImageReplayer<I>::unregister_admin_socket_hook() {
28e407b8 1028 dout(15) << dendl;
d2e6a577 1029
3efd9988
FG
1030 AdminSocketHook *asok_hook = nullptr;
1031 {
9f95a23c 1032 std::lock_guard locker{m_lock};
3efd9988
FG
1033 std::swap(asok_hook, m_asok_hook);
1034 }
1035 delete asok_hook;
1036}
1037
1038template <typename I>
28e407b8 1039void ImageReplayer<I>::reregister_admin_socket_hook() {
9f95a23c
TL
1040 std::unique_lock locker{m_lock};
1041 if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) {
1042 m_local_image_name = m_bootstrap_request->get_local_image_name();
3efd9988 1043 }
9f95a23c
TL
1044
1045 auto image_spec = image_replayer::util::compute_image_spec(
1046 m_local_io_ctx, m_local_image_name);
1047 if (m_asok_hook != nullptr && m_image_spec == image_spec) {
1048 return;
1049 }
1050
1051 dout(15) << "old_image_spec=" << m_image_spec << ", "
1052 << "new_image_spec=" << image_spec << dendl;
1053 m_image_spec = image_spec;
1054
1055 if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
1056 // no need to re-register if stopping
1057 return;
1058 }
1059 locker.unlock();
1060
3efd9988
FG
1061 unregister_admin_socket_hook();
1062 register_admin_socket_hook();
7c673cae
FG
1063}
1064
1065template <typename I>
1066std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1067{
1068 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1069 << "/" << replayer.get_global_image_id() << "]";
1070 return os;
1071}
1072
1073} // namespace mirror
1074} // namespace rbd
1075
1076template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;