]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
11fdf7f2 6#include "common/admin_socket.h"
7c673cae
FG
7#include "common/debug.h"
8#include "common/errno.h"
9#include "include/stringify.h"
10#include "cls/rbd/cls_rbd_client.h"
11#include "common/Timer.h"
7c673cae
FG
12#include "global/global_context.h"
13#include "journal/Journaler.h"
7c673cae
FG
14#include "librbd/ExclusiveLock.h"
15#include "librbd/ImageCtx.h"
16#include "librbd/ImageState.h"
17#include "librbd/Journal.h"
18#include "librbd/Operations.h"
19#include "librbd/Utils.h"
f67539c2 20#include "librbd/asio/ContextWQ.h"
d2e6a577 21#include "ImageDeleter.h"
7c673cae 22#include "ImageReplayer.h"
9f95a23c 23#include "MirrorStatusUpdater.h"
7c673cae
FG
24#include "Threads.h"
25#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
9f95a23c
TL
26#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27#include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28#include "tools/rbd_mirror/image_replayer/Utils.h"
29#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
31#include <map>
7c673cae
FG
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_rbd_mirror
35#undef dout_prefix
36#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
7c673cae
FG
39namespace rbd {
40namespace mirror {
41
42using librbd::util::create_context_callback;
7c673cae
FG
43
44template <typename I>
45std::ostream &operator<<(std::ostream &os,
46 const typename ImageReplayer<I>::State &state);
47
48namespace {
49
3efd9988 50template <typename I>
7c673cae
FG
51class ImageReplayerAdminSocketCommand {
52public:
3efd9988
FG
53 ImageReplayerAdminSocketCommand(const std::string &desc,
54 ImageReplayer<I> *replayer)
55 : desc(desc), replayer(replayer) {
56 }
7c673cae 57 virtual ~ImageReplayerAdminSocketCommand() {}
9f95a23c 58 virtual int call(Formatter *f) = 0;
3efd9988
FG
59
60 std::string desc;
61 ImageReplayer<I> *replayer;
62 bool registered = false;
7c673cae
FG
63};
64
65template <typename I>
3efd9988 66class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 67public:
3efd9988
FG
68 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
69 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
70 }
7c673cae 71
9f95a23c
TL
72 int call(Formatter *f) override {
73 this->replayer->print_status(f);
74 return 0;
7c673cae 75 }
7c673cae
FG
76};
77
78template <typename I>
3efd9988 79class StartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 80public:
3efd9988
FG
81 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
82 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
83 }
7c673cae 84
9f95a23c 85 int call(Formatter *f) override {
3efd9988 86 this->replayer->start(nullptr, true);
9f95a23c 87 return 0;
7c673cae 88 }
7c673cae
FG
89};
90
91template <typename I>
3efd9988 92class StopCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 93public:
3efd9988
FG
94 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
95 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
96 }
7c673cae 97
9f95a23c 98 int call(Formatter *f) override {
3efd9988 99 this->replayer->stop(nullptr, true);
9f95a23c 100 return 0;
7c673cae 101 }
7c673cae
FG
102};
103
104template <typename I>
3efd9988 105class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 106public:
3efd9988
FG
107 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
108 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
109 }
7c673cae 110
9f95a23c 111 int call(Formatter *f) override {
3efd9988 112 this->replayer->restart();
9f95a23c 113 return 0;
7c673cae 114 }
7c673cae
FG
115};
116
117template <typename I>
3efd9988 118class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 119public:
3efd9988
FG
120 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
121 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
122 }
7c673cae 123
9f95a23c 124 int call(Formatter *f) override {
28e407b8 125 this->replayer->flush();
9f95a23c 126 return 0;
7c673cae 127 }
7c673cae
FG
128};
129
130template <typename I>
131class ImageReplayerAdminSocketHook : public AdminSocketHook {
132public:
133 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77 134 ImageReplayer<I> *replayer)
3efd9988
FG
135 : admin_socket(cct->get_admin_socket()),
136 commands{{"rbd mirror flush " + name,
137 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
138 {"rbd mirror restart " + name,
139 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
140 {"rbd mirror start " + name,
141 new StartCommand<I>("start rbd mirror " + name, replayer)},
142 {"rbd mirror status " + name,
143 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
144 {"rbd mirror stop " + name,
145 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
d2e6a577
FG
146 }
147
148 int register_commands() {
3efd9988 149 for (auto &it : commands) {
9f95a23c 150 int r = admin_socket->register_command(it.first, this,
3efd9988
FG
151 it.second->desc);
152 if (r < 0) {
153 return r;
154 }
155 it.second->registered = true;
7c673cae 156 }
d2e6a577 157 return 0;
7c673cae
FG
158 }
159
160 ~ImageReplayerAdminSocketHook() override {
9f95a23c 161 admin_socket->unregister_commands(this);
3efd9988 162 for (auto &it : commands) {
3efd9988 163 delete it.second;
7c673cae 164 }
31f18b77 165 commands.clear();
7c673cae
FG
166 }
167
9f95a23c 168 int call(std::string_view command, const cmdmap_t& cmdmap,
39ae355f 169 const bufferlist&,
9f95a23c
TL
170 Formatter *f,
171 std::ostream& errss,
172 bufferlist& out) override {
3efd9988 173 auto i = commands.find(command);
11fdf7f2 174 ceph_assert(i != commands.end());
9f95a23c 175 return i->second->call(f);
7c673cae
FG
176 }
177
178private:
11fdf7f2
TL
179 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
180 std::less<>> Commands;
7c673cae
FG
181
182 AdminSocket *admin_socket;
183 Commands commands;
184};
185
7c673cae
FG
186} // anonymous namespace
187
188template <typename I>
189void ImageReplayer<I>::BootstrapProgressContext::update_progress(
190 const std::string &description, bool flush)
191{
192 const std::string desc = "bootstrapping, " + description;
193 replayer->set_state_description(0, desc);
194 if (flush) {
195 replayer->update_mirror_image_status(false, boost::none);
196 }
197}
198
199template <typename I>
9f95a23c
TL
200struct ImageReplayer<I>::ReplayerListener
201 : public image_replayer::ReplayerListener {
202 ImageReplayer<I>* image_replayer;
203
204 ReplayerListener(ImageReplayer<I>* image_replayer)
205 : image_replayer(image_replayer) {
206 }
207
208 void handle_notification() override {
209 image_replayer->handle_replayer_notification();
210 }
211};
7c673cae
FG
212
213template <typename I>
9f95a23c
TL
214ImageReplayer<I>::ImageReplayer(
215 librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
216 const std::string &global_image_id, Threads<I> *threads,
217 InstanceWatcher<I> *instance_watcher,
218 MirrorStatusUpdater<I>* local_status_updater,
219 journal::CacheManagerHandler *cache_manager_handler,
220 PoolMetaCache* pool_meta_cache) :
221 m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
222 m_global_image_id(global_image_id), m_threads(threads),
31f18b77 223 m_instance_watcher(instance_watcher),
9f95a23c
TL
224 m_local_status_updater(local_status_updater),
225 m_cache_manager_handler(cache_manager_handler),
226 m_pool_meta_cache(pool_meta_cache),
227 m_local_image_name(global_image_id),
228 m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
229 stringify(local_io_ctx.get_id()) + " " + global_image_id)),
7c673cae 230 m_progress_cxt(this),
9f95a23c 231 m_replayer_listener(new ReplayerListener(this))
7c673cae
FG
232{
233 // Register asok commands using a temporary "remote_pool_name/global_image_id"
234 // name. When the image name becomes known on start the asok commands will be
235 // re-registered using "remote_pool_name/remote_image_name" name.
236
9f95a23c
TL
237 m_image_spec = image_replayer::util::compute_image_spec(
238 local_io_ctx, global_image_id);
d2e6a577 239 register_admin_socket_hook();
7c673cae
FG
240}
241
242template <typename I>
243ImageReplayer<I>::~ImageReplayer()
244{
d2e6a577 245 unregister_admin_socket_hook();
9f95a23c 246 ceph_assert(m_state_builder == nullptr);
11fdf7f2 247 ceph_assert(m_on_start_finish == nullptr);
20effc67 248 ceph_assert(m_on_stop_contexts.empty());
11fdf7f2 249 ceph_assert(m_bootstrap_request == nullptr);
1911f103 250 ceph_assert(m_update_status_task == nullptr);
9f95a23c 251 delete m_replayer_listener;
7c673cae
FG
252}
253
c07f9fc5
FG
254template <typename I>
255image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
9f95a23c 256 std::lock_guard locker{m_lock};
c07f9fc5
FG
257
258 if (!m_mirror_image_status_state) {
259 return image_replayer::HEALTH_STATE_OK;
260 } else if (*m_mirror_image_status_state ==
261 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
262 *m_mirror_image_status_state ==
263 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
264 return image_replayer::HEALTH_STATE_WARNING;
265 }
266 return image_replayer::HEALTH_STATE_ERROR;
267}
268
7c673cae 269template <typename I>
9f95a23c
TL
270void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
271 dout(10) << "peer=" << peer << dendl;
272
273 std::lock_guard locker{m_lock};
274 auto it = m_peers.find(peer);
d2e6a577 275 if (it == m_peers.end()) {
9f95a23c 276 m_peers.insert(peer);
7c673cae
FG
277 }
278}
279
7c673cae
FG
280template <typename I>
281void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
9f95a23c 282 dout(10) << "r=" << r << ", desc=" << desc << dendl;
7c673cae 283
9f95a23c 284 std::lock_guard l{m_lock};
7c673cae
FG
285 m_last_r = r;
286 m_state_desc = desc;
287}
288
289template <typename I>
e306af50 290void ImageReplayer<I>::start(Context *on_finish, bool manual, bool restart)
7c673cae 291{
11fdf7f2 292 dout(10) << "on_finish=" << on_finish << dendl;
7c673cae
FG
293
294 int r = 0;
295 {
9f95a23c 296 std::lock_guard locker{m_lock};
7c673cae
FG
297 if (!is_stopped_()) {
298 derr << "already running" << dendl;
299 r = -EINVAL;
300 } else if (m_manual_stop && !manual) {
301 dout(5) << "stopped manually, ignoring start without manual flag"
302 << dendl;
303 r = -EPERM;
e306af50
TL
304 } else if (restart && !m_restart_requested) {
305 dout(10) << "canceled restart" << dendl;
306 r = -ECANCELED;
7c673cae
FG
307 } else {
308 m_state = STATE_STARTING;
309 m_last_r = 0;
310 m_state_desc.clear();
311 m_manual_stop = false;
d2e6a577 312 m_delete_requested = false;
e306af50 313 m_restart_requested = false;
a4b75251 314 m_status_removed = false;
7c673cae
FG
315
316 if (on_finish != nullptr) {
11fdf7f2 317 ceph_assert(m_on_start_finish == nullptr);
7c673cae
FG
318 m_on_start_finish = on_finish;
319 }
20effc67 320 ceph_assert(m_on_stop_contexts.empty());
7c673cae
FG
321 }
322 }
323
324 if (r < 0) {
325 if (on_finish) {
326 on_finish->complete(r);
327 }
328 return;
329 }
330
9f95a23c 331 bootstrap();
7c673cae
FG
332}
333
334template <typename I>
9f95a23c 335void ImageReplayer<I>::bootstrap() {
11fdf7f2 336 dout(10) << dendl;
7c673cae 337
9f95a23c 338 std::unique_lock locker{m_lock};
b32b8144 339 if (m_peers.empty()) {
9f95a23c
TL
340 locker.unlock();
341
342 dout(5) << "no peer clusters" << dendl;
343 on_start_fail(-ENOENT, "no peer clusters");
b32b8144
FG
344 return;
345 }
7c673cae 346
d2e6a577 347 // TODO need to support multiple remote images
11fdf7f2 348 ceph_assert(!m_peers.empty());
9f95a23c 349 m_remote_image_peer = *m_peers.begin();
7c673cae 350
9f95a23c 351 if (on_start_interrupted(m_lock)) {
b32b8144
FG
352 return;
353 }
7c673cae 354
9f95a23c
TL
355 ceph_assert(m_state_builder == nullptr);
356 auto ctx = create_context_callback<
11fdf7f2 357 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
9f95a23c
TL
358 auto request = image_replayer::BootstrapRequest<I>::create(
359 m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
360 m_global_image_id, m_local_mirror_uuid,
361 m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
362 m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested,
363 ctx);
7c673cae 364
9f95a23c
TL
365 request->get();
366 m_bootstrap_request = request;
367 locker.unlock();
7c673cae 368
9f95a23c 369 update_mirror_image_status(false, boost::none);
7c673cae
FG
370 request->send();
371}
372
373template <typename I>
374void ImageReplayer<I>::handle_bootstrap(int r) {
11fdf7f2 375 dout(10) << "r=" << r << dendl;
7c673cae 376 {
9f95a23c 377 std::lock_guard locker{m_lock};
7c673cae
FG
378 m_bootstrap_request->put();
379 m_bootstrap_request = nullptr;
7c673cae
FG
380 }
381
11fdf7f2
TL
382 if (on_start_interrupted()) {
383 return;
9f95a23c
TL
384 } else if (r == -ENOMSG) {
385 dout(5) << "local image is primary" << dendl;
386 on_start_fail(0, "local image is primary");
387 return;
11fdf7f2 388 } else if (r == -EREMOTEIO) {
2a845540
TL
389 dout(5) << "remote image is not primary" << dendl;
390 on_start_fail(-EREMOTEIO, "remote image is not primary");
7c673cae
FG
391 return;
392 } else if (r == -EEXIST) {
7c673cae
FG
393 on_start_fail(r, "split-brain detected");
394 return;
9f95a23c
TL
395 } else if (r == -ENOLINK) {
396 m_delete_requested = true;
397 on_start_fail(0, "remote image no longer exists");
398 return;
a4b75251
TL
399 } else if (r == -ERESTART) {
400 on_start_fail(r, "image in transient state, try again");
401 return;
7c673cae
FG
402 } else if (r < 0) {
403 on_start_fail(r, "error bootstrapping replay");
404 return;
d2e6a577
FG
405 } else if (m_resync_requested) {
406 on_start_fail(0, "resync requested");
407 return;
7c673cae
FG
408 }
409
9f95a23c 410 start_replay();
7c673cae
FG
411}
412
413template <typename I>
9f95a23c 414void ImageReplayer<I>::start_replay() {
11fdf7f2 415 dout(10) << dendl;
7c673cae 416
9f95a23c
TL
417 std::unique_lock locker{m_lock};
418 ceph_assert(m_replayer == nullptr);
419 m_replayer = m_state_builder->create_replayer(m_threads, m_instance_watcher,
420 m_local_mirror_uuid,
421 m_pool_meta_cache,
422 m_replayer_listener);
423
424 auto ctx = create_context_callback<
425 ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
426 m_replayer->init(ctx);
7c673cae
FG
427}
428
429template <typename I>
9f95a23c 430void ImageReplayer<I>::handle_start_replay(int r) {
11fdf7f2 431 dout(10) << "r=" << r << dendl;
7c673cae 432
11fdf7f2
TL
433 if (on_start_interrupted()) {
434 return;
435 } else if (r < 0) {
9f95a23c
TL
436 std::string error_description = m_replayer->get_error_description();
437 if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
438 std::unique_lock locker{m_lock};
d2e6a577 439 m_resync_requested = true;
7c673cae 440 }
7c673cae 441
9f95a23c
TL
442 // shut down not required if init failed
443 m_replayer->destroy();
444 m_replayer = nullptr;
7c673cae 445
9f95a23c
TL
446 derr << "error starting replay: " << cpp_strerror(r) << dendl;
447 on_start_fail(r, error_description);
7c673cae
FG
448 return;
449 }
450
9f95a23c 451 Context *on_finish = nullptr;
7c673cae 452 {
9f95a23c 453 std::unique_lock locker{m_lock};
11fdf7f2 454 ceph_assert(m_state == STATE_STARTING);
7c673cae
FG
455 m_state = STATE_REPLAYING;
456 std::swap(m_on_start_finish, on_finish);
1911f103
TL
457
458 std::unique_lock timer_locker{m_threads->timer_lock};
459 schedule_update_mirror_image_replay_status();
7c673cae
FG
460 }
461
7c673cae 462 update_mirror_image_status(true, boost::none);
7c673cae 463 if (on_replay_interrupted()) {
9f95a23c
TL
464 if (on_finish != nullptr) {
465 on_finish->complete(r);
466 }
7c673cae
FG
467 return;
468 }
469
11fdf7f2 470 dout(10) << "start succeeded" << dendl;
d2e6a577 471 if (on_finish != nullptr) {
11fdf7f2 472 dout(10) << "on finish complete, r=" << r << dendl;
d2e6a577
FG
473 on_finish->complete(r);
474 }
7c673cae
FG
475}
476
477template <typename I>
478void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
479{
9f95a23c
TL
480 dout(10) << "r=" << r << ", desc=" << desc << dendl;
481 Context *ctx = new LambdaContext([this, r, desc](int _r) {
7c673cae 482 {
9f95a23c 483 std::lock_guard locker{m_lock};
11fdf7f2 484 ceph_assert(m_state == STATE_STARTING);
7c673cae 485 m_state = STATE_STOPPING;
d2e6a577 486 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
7c673cae
FG
487 derr << "start failed: " << cpp_strerror(r) << dendl;
488 } else {
11fdf7f2 489 dout(10) << "start canceled" << dendl;
7c673cae
FG
490 }
491 }
492
493 set_state_description(r, desc);
9f95a23c 494 update_mirror_image_status(false, boost::none);
7c673cae
FG
495 shut_down(r);
496 });
497 m_threads->work_queue->queue(ctx, 0);
498}
499
500template <typename I>
11fdf7f2 501bool ImageReplayer<I>::on_start_interrupted() {
9f95a23c 502 std::lock_guard locker{m_lock};
11fdf7f2
TL
503 return on_start_interrupted(m_lock);
504}
505
506template <typename I>
9f95a23c
TL
507bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
508 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
509 ceph_assert(m_state == STATE_STARTING);
510 if (!m_stop_requested) {
7c673cae
FG
511 return false;
512 }
513
11fdf7f2 514 on_start_fail(-ECANCELED, "");
7c673cae
FG
515 return true;
516}
517
/// Request that the replayer stop.
///
/// @param on_finish optional completion. It receives -EINVAL if already
///        stopped; otherwise it is queued in m_on_stop_contexts and completed
///        by handle_shut_down().
/// @param manual    record the stop as manual (blocks non-manual starts)
/// @param restart   true when issued by restart(); a non-restart stop
///                  withdraws any pending restart request
template <typename I>
void ImageReplayer<I>::stop(Context *on_finish, bool manual, bool restart)
{
  dout(10) << "on_finish=" << on_finish << ", manual=" << manual
           << ", restart=" << restart << dendl;

  image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
  bool shut_down_replay = false;
  bool is_stopped = false;
  {
    std::lock_guard locker{m_lock};

    if (!is_running_()) {
      // Not running: either fully stopped or (presumably) a stop is already
      // in flight. Only update the manual/restart flags here.
      if (manual && !m_manual_stop) {
        dout(10) << "marking manual" << dendl;
        m_manual_stop = true;
      }
      if (!restart && m_restart_requested) {
        dout(10) << "canceling restart" << dendl;
        m_restart_requested = false;
      }
      if (is_stopped_()) {
        dout(10) << "already stopped" << dendl;
        is_stopped = true;
      } else {
        // piggy-back on the stop already in progress
        dout(10) << "joining in-flight stop" << dendl;
        if (on_finish != nullptr) {
          m_on_stop_contexts.push_back(on_finish);
        }
      }
    } else {
      if (m_state == STATE_STARTING) {
        // take a ref so the bootstrap can be canceled outside the lock
        dout(10) << "canceling start" << dendl;
        if (m_bootstrap_request != nullptr) {
          bootstrap_request = m_bootstrap_request;
          bootstrap_request->get();
        }
      } else {
        dout(10) << "interrupting replay" << dendl;
        shut_down_replay = true;
      }

      ceph_assert(m_on_stop_contexts.empty());
      if (on_finish != nullptr) {
        m_on_stop_contexts.push_back(on_finish);
      }
      m_stop_requested = true;
      m_manual_stop = manual;
    }
  }

  if (is_stopped) {
    if (on_finish) {
      on_finish->complete(-EINVAL);
    }
    return;
  }

  // avoid holding lock since bootstrap request will update status
  if (bootstrap_request != nullptr) {
    dout(10) << "canceling bootstrap" << dendl;
    bootstrap_request->cancel();
    bootstrap_request->put();
  }

  if (shut_down_replay) {
    on_stop_journal_replay();
  }
}
587
588template <typename I>
589void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
590{
11fdf7f2 591 dout(10) << dendl;
7c673cae
FG
592
593 {
9f95a23c 594 std::lock_guard locker{m_lock};
7c673cae
FG
595 if (m_state != STATE_REPLAYING) {
596 // might be invoked multiple times while stopping
597 return;
598 }
9f95a23c 599
7c673cae
FG
600 m_stop_requested = true;
601 m_state = STATE_STOPPING;
602 }
603
1911f103 604 cancel_update_mirror_image_replay_status();
7c673cae 605 set_state_description(r, desc);
a8e16298 606 update_mirror_image_status(true, boost::none);
7c673cae
FG
607 shut_down(0);
608}
609
7c673cae
FG
610template <typename I>
611void ImageReplayer<I>::restart(Context *on_finish)
612{
e306af50
TL
613 {
614 std::lock_guard locker{m_lock};
615 m_restart_requested = true;
616 }
617
9f95a23c 618 auto ctx = new LambdaContext(
7c673cae
FG
619 [this, on_finish](int r) {
620 if (r < 0) {
621 // Try start anyway.
622 }
e306af50 623 start(on_finish, true, true);
7c673cae 624 });
e306af50 625 stop(ctx, false, true);
7c673cae
FG
626}
627
628template <typename I>
81eedcae 629void ImageReplayer<I>::flush()
7c673cae 630{
81eedcae 631 C_SaferCond ctx;
7c673cae 632
9f95a23c
TL
633 {
634 std::unique_lock locker{m_lock};
635 if (m_state != STATE_REPLAYING) {
636 return;
637 }
7c673cae 638
9f95a23c
TL
639 dout(10) << dendl;
640 ceph_assert(m_replayer != nullptr);
641 m_replayer->flush(&ctx);
81eedcae
TL
642 }
643
9f95a23c
TL
644 int r = ctx.wait();
645 if (r >= 0) {
646 update_mirror_image_status(false, boost::none);
7c673cae 647 }
7c673cae
FG
648}
649
650template <typename I>
651bool ImageReplayer<I>::on_replay_interrupted()
652{
653 bool shut_down;
654 {
9f95a23c 655 std::lock_guard locker{m_lock};
7c673cae
FG
656 shut_down = m_stop_requested;
657 }
658
659 if (shut_down) {
660 on_stop_journal_replay();
661 }
662 return shut_down;
663}
664
665template <typename I>
9f95a23c 666void ImageReplayer<I>::print_status(Formatter *f)
7c673cae 667{
11fdf7f2 668 dout(10) << dendl;
7c673cae 669
9f95a23c 670 std::lock_guard l{m_lock};
7c673cae 671
9f95a23c
TL
672 f->open_object_section("image_replayer");
673 f->dump_string("name", m_image_spec);
674 f->dump_string("state", to_string(m_state));
675 f->close_section();
7c673cae
FG
676}
677
1911f103
TL
678template <typename I>
679void ImageReplayer<I>::schedule_update_mirror_image_replay_status() {
680 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
681 ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
682 if (m_state != STATE_REPLAYING) {
683 return;
684 }
685
686 dout(10) << dendl;
687
688 // periodically update the replaying status even if nothing changes
689 // so that we can adjust our performance stats
690 ceph_assert(m_update_status_task == nullptr);
691 m_update_status_task = create_context_callback<
692 ImageReplayer<I>,
693 &ImageReplayer<I>::handle_update_mirror_image_replay_status>(this);
694 m_threads->timer->add_event_after(10, m_update_status_task);
695}
696
697template <typename I>
698void ImageReplayer<I>::handle_update_mirror_image_replay_status(int r) {
699 dout(10) << dendl;
700
cd265ab1
TL
701 ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
702
703 ceph_assert(m_update_status_task != nullptr);
704 m_update_status_task = nullptr;
705
1911f103
TL
706 auto ctx = new LambdaContext([this](int) {
707 update_mirror_image_status(false, boost::none);
708
cd265ab1
TL
709 std::unique_lock locker{m_lock};
710 std::unique_lock timer_locker{m_threads->timer_lock};
1911f103 711
cd265ab1 712 schedule_update_mirror_image_replay_status();
1911f103
TL
713 m_in_flight_op_tracker.finish_op();
714 });
715
716 m_in_flight_op_tracker.start_op();
717 m_threads->work_queue->queue(ctx, 0);
718}
719
720template <typename I>
721void ImageReplayer<I>::cancel_update_mirror_image_replay_status() {
722 std::unique_lock timer_locker{m_threads->timer_lock};
723 if (m_update_status_task != nullptr) {
724 dout(10) << dendl;
725
726 if (m_threads->timer->cancel_event(m_update_status_task)) {
727 m_update_status_task = nullptr;
728 }
729 }
730}
731
7c673cae 732template <typename I>
9f95a23c
TL
733void ImageReplayer<I>::update_mirror_image_status(
734 bool force, const OptionalState &opt_state) {
735 dout(15) << "force=" << force << ", "
736 << "state=" << opt_state << dendl;
7c673cae
FG
737
738 {
9f95a23c
TL
739 std::lock_guard locker{m_lock};
740 if (!force && !is_stopped_() && !is_running_()) {
741 dout(15) << "shut down in-progress: ignoring update" << dendl;
28e407b8 742 return;
28e407b8
AA
743 }
744 }
745
9f95a23c
TL
746 m_in_flight_op_tracker.start_op();
747 auto ctx = new LambdaContext(
748 [this, force, opt_state](int r) {
749 set_mirror_image_status_update(force, opt_state);
11fdf7f2
TL
750 });
751 m_threads->work_queue->queue(ctx, 0);
7c673cae
FG
752}
753
/// Build the mirror image site status from the current state and publish it
/// to the local (and, if present, remote peer) status updater.
///
/// Runs with an in-flight op started by update_mirror_image_status(). The op
/// is finished at the end of this function. In the REPLAYING case the
/// replayer's status may not be ready yet; the function then returns early
/// and the op is finished by the callback (directly on -EAGAIN, or by
/// re-entering this function on success).
template <typename I>
void ImageReplayer<I>::set_mirror_image_status_update(
    bool force, const OptionalState &opt_state) {
  dout(15) << "force=" << force << ", "
           << "state=" << opt_state << dendl;

  reregister_admin_socket_hook();

  State state;
  std::string state_desc;
  int last_r;
  bool stopping_replay;

  // empty optional of the status-state enum type
  auto mirror_image_status_state = boost::make_optional(
    false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
  image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
  {
    // snapshot everything needed under the lock
    std::lock_guard locker{m_lock};
    state = m_state;
    state_desc = m_state_desc;
    mirror_image_status_state = m_mirror_image_status_state;
    last_r = m_last_r;
    stopping_replay = (m_replayer != nullptr);

    if (m_bootstrap_request != nullptr) {
      bootstrap_request = m_bootstrap_request;
      bootstrap_request->get();
    }
  }

  // query the bootstrap request outside the lock
  bool syncing = false;
  if (bootstrap_request != nullptr) {
    syncing = bootstrap_request->is_syncing();
    bootstrap_request->put();
    bootstrap_request = nullptr;
  }

  // an explicitly supplied state overrides the snapshot
  if (opt_state) {
    state = *opt_state;
  }

  cls::rbd::MirrorImageSiteStatus status;
  status.up = true;
  switch (state) {
  case STATE_STARTING:
    if (syncing) {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
      status.description = state_desc.empty() ? "syncing" : state_desc;
      mirror_image_status_state = status.state;
    } else {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
      status.description = "starting replay";
    }
    break;
  case STATE_REPLAYING:
    status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
    {
      std::string desc;
      // Completion for an asynchronous replay-status request: on success
      // retry the whole update (which finishes the in-flight op); on
      // -EAGAIN just finish the op.
      auto on_req_finish = new LambdaContext(
        [this, force](int r) {
          dout(15) << "replay status ready: r=" << r << dendl;
          if (r >= 0) {
            set_mirror_image_status_update(force, boost::none);
          } else if (r == -EAGAIN) {
            m_in_flight_op_tracker.finish_op();
          }
        });

      ceph_assert(m_replayer != nullptr);
      if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
        // the in-flight op is now owned by on_req_finish
        dout(15) << "waiting for replay status" << dendl;
        return;
      }

      status.description = "replaying, " + desc;
      // replaying clears any remembered status state (set it empty)
      mirror_image_status_state = boost::make_optional(
        false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
    }
    break;
  case STATE_STOPPING:
    if (stopping_replay) {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
      status.description = state_desc.empty() ? "stopping replay" : state_desc;
      break;
    }
    // FALLTHROUGH
  case STATE_STOPPED:
    if (last_r == -EREMOTEIO) {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
      status.description = state_desc;
      mirror_image_status_state = status.state;
    } else if (last_r < 0 && last_r != -ECANCELED) {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
      status.description = state_desc;
      mirror_image_status_state = status.state;
    } else {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
      status.description = state_desc.empty() ? "stopped" : state_desc;
      mirror_image_status_state = boost::none;
    }
    break;
  default:
    ceph_assert(!"invalid state");
  }

  {
    // remember the state for get_health_state() and the next update
    std::lock_guard locker{m_lock};
    m_mirror_image_status_state = mirror_image_status_state;
  }

  // prevent the status from ping-ponging when failed replays are restarted
  if (mirror_image_status_state &&
      *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
    status.state = *mirror_image_status_state;
  }

  dout(15) << "status=" << status << dendl;
  m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
                                                  force);
  if (m_remote_image_peer.mirror_status_updater != nullptr) {
    m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
      m_global_image_id, status, force);
  }

  m_in_flight_op_tracker.finish_op();
}
880
/// Tear down the replayer and state builder, then hand off to
/// handle_shut_down(). Requires m_state == STATE_STOPPING. Re-invokes itself
/// after any in-flight operations drain.
template <typename I>
void ImageReplayer<I>::shut_down(int r) {
  dout(10) << "r=" << r << dendl;

  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_STOPPING);
  }

  if (!m_in_flight_op_tracker.empty()) {
    dout(15) << "waiting for in-flight operations to complete" << dendl;
    m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
        shut_down(r);
      }));
    return;
  }

  // chain the shut down sequence (reverse order)
  // Executed order: replayer shut_down -> replayer destroy ->
  // state builder close -> publish STOPPED + handle_shut_down(r).
  Context *ctx = new LambdaContext(
    [this, r](int _r) {
      update_mirror_image_status(true, STATE_STOPPED);
      handle_shut_down(r);
    });

  // close the state builder (it is destroyed later in handle_shut_down())
  if (m_state_builder != nullptr) {
    ctx = new LambdaContext([this, ctx](int r) {
        m_state_builder->close(ctx);
      });
  }

  // close the replayer
  if (m_replayer != nullptr) {
    ctx = new LambdaContext([this, ctx](int r) {
        m_replayer->destroy();
        m_replayer = nullptr;
        ctx->complete(0);
      });
    ctx = new LambdaContext([this, ctx](int r) {
        m_replayer->shut_down(ctx);
      });
  }

  m_threads->work_queue->queue(ctx, 0);
}
926
927template <typename I>
928void ImageReplayer<I>::handle_shut_down(int r) {
11fdf7f2
TL
929 bool resync_requested = false;
930 bool delete_requested = false;
3efd9988 931 bool unregister_asok_hook = false;
7c673cae 932 {
9f95a23c 933 std::lock_guard locker{m_lock};
7c673cae 934
9f95a23c
TL
935 if (m_delete_requested && m_state_builder != nullptr &&
936 !m_state_builder->local_image_id.empty()) {
937 ceph_assert(m_state_builder->remote_image_id.empty());
d2e6a577 938 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
11fdf7f2
TL
939 unregister_asok_hook = true;
940 std::swap(delete_requested, m_delete_requested);
a4b75251 941 m_delete_in_progress = true;
d2e6a577 942 }
d2e6a577 943
11fdf7f2 944 std::swap(resync_requested, m_resync_requested);
9f95a23c
TL
945 if (!delete_requested && !resync_requested && m_last_r == -ENOENT &&
946 ((m_state_builder == nullptr) ||
947 (m_state_builder->local_image_id.empty() &&
948 m_state_builder->remote_image_id.empty()))) {
d2e6a577 949 dout(0) << "mirror image no longer exists" << dendl;
3efd9988 950 unregister_asok_hook = true;
d2e6a577 951 m_finished = true;
7c673cae
FG
952 }
953 }
954
3efd9988
FG
955 if (unregister_asok_hook) {
956 unregister_admin_socket_hook();
957 }
958
11fdf7f2
TL
959 if (delete_requested || resync_requested) {
960 dout(5) << "moving image to trash" << dendl;
9f95a23c 961 auto ctx = new LambdaContext([this, r](int) {
11fdf7f2
TL
962 handle_shut_down(r);
963 });
9f95a23c 964 ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
11fdf7f2
TL
965 resync_requested, m_threads->work_queue, ctx);
966 return;
967 }
7c673cae 968
9f95a23c
TL
969 if (!m_in_flight_op_tracker.empty()) {
970 dout(15) << "waiting for in-flight operations to complete" << dendl;
971 m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
972 handle_shut_down(r);
973 }));
974 return;
975 }
7c673cae 976
a4b75251 977 if (!m_status_removed) {
9f95a23c 978 auto ctx = new LambdaContext([this, r](int) {
a4b75251
TL
979 m_status_removed = true;
980 handle_shut_down(r);
981 });
982 remove_image_status(m_delete_in_progress, ctx);
9f95a23c
TL
983 return;
984 }
985
986 if (m_state_builder != nullptr) {
987 m_state_builder->destroy();
988 m_state_builder = nullptr;
989 }
990
991 dout(10) << "stop complete" << dendl;
7c673cae 992 Context *on_start = nullptr;
20effc67 993 Contexts on_stop_contexts;
7c673cae 994 {
9f95a23c 995 std::lock_guard locker{m_lock};
7c673cae 996 std::swap(on_start, m_on_start_finish);
20effc67 997 on_stop_contexts = std::move(m_on_stop_contexts);
7c673cae 998 m_stop_requested = false;
11fdf7f2 999 ceph_assert(m_state == STATE_STOPPING);
7c673cae
FG
1000 m_state = STATE_STOPPED;
1001 }
1002
1003 if (on_start != nullptr) {
11fdf7f2 1004 dout(10) << "on start finish complete, r=" << r << dendl;
7c673cae
FG
1005 on_start->complete(r);
1006 r = 0;
1007 }
20effc67
TL
1008 for (auto ctx : on_stop_contexts) {
1009 dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl;
1010 ctx->complete(r);
7c673cae
FG
1011 }
1012}
1013
1014template <typename I>
9f95a23c
TL
1015void ImageReplayer<I>::handle_replayer_notification() {
1016 dout(10) << dendl;
1017
1018 std::unique_lock locker{m_lock};
1019 if (m_state != STATE_REPLAYING) {
1020 // might be attempting to shut down
1021 return;
1022 }
7c673cae 1023
7c673cae 1024 {
9f95a23c
TL
1025 // detect a rename of the local image
1026 ceph_assert(m_state_builder != nullptr &&
1027 m_state_builder->local_image_ctx != nullptr);
1028 std::shared_lock image_locker{m_state_builder->local_image_ctx->image_lock};
1029 if (m_local_image_name != m_state_builder->local_image_ctx->name) {
1030 // will re-register with new name after next status update
1031 dout(10) << "image renamed" << dendl;
1032 m_local_image_name = m_state_builder->local_image_ctx->name;
7c673cae 1033 }
9f95a23c 1034 }
7c673cae 1035
9f95a23c
TL
1036 // replayer cannot be shut down while notification is in-flight
1037 ceph_assert(m_replayer != nullptr);
1038 locker.unlock();
1039
1040 if (m_replayer->is_resync_requested()) {
1041 dout(10) << "resync requested" << dendl;
1042 m_resync_requested = true;
1043 on_stop_journal_replay(0, "resync requested");
1044 return;
7c673cae
FG
1045 }
1046
9f95a23c
TL
1047 if (!m_replayer->is_replaying()) {
1048 auto error_code = m_replayer->get_error_code();
1049 auto error_description = m_replayer->get_error_description();
1050 dout(10) << "replay interrupted: "
1051 << "r=" << error_code << ", "
1052 << "error=" << error_description << dendl;
1053 on_stop_journal_replay(error_code, error_description);
1054 return;
7c673cae 1055 }
9f95a23c
TL
1056
1057 update_mirror_image_status(false, {});
7c673cae
FG
1058}
1059
1060template <typename I>
1061std::string ImageReplayer<I>::to_string(const State state) {
1062 switch (state) {
1063 case ImageReplayer<I>::STATE_STARTING:
1064 return "Starting";
1065 case ImageReplayer<I>::STATE_REPLAYING:
1066 return "Replaying";
7c673cae
FG
1067 case ImageReplayer<I>::STATE_STOPPING:
1068 return "Stopping";
1069 case ImageReplayer<I>::STATE_STOPPED:
1070 return "Stopped";
1071 default:
1072 break;
1073 }
1074 return "Unknown(" + stringify(state) + ")";
1075}
1076
d2e6a577
FG
1077template <typename I>
1078void ImageReplayer<I>::register_admin_socket_hook() {
3efd9988
FG
1079 ImageReplayerAdminSocketHook<I> *asok_hook;
1080 {
9f95a23c 1081 std::lock_guard locker{m_lock};
3efd9988
FG
1082 if (m_asok_hook != nullptr) {
1083 return;
1084 }
7c673cae 1085
9f95a23c
TL
1086 dout(15) << "registered asok hook: " << m_image_spec << dendl;
1087 asok_hook = new ImageReplayerAdminSocketHook<I>(
1088 g_ceph_context, m_image_spec, this);
3efd9988
FG
1089 int r = asok_hook->register_commands();
1090 if (r == 0) {
1091 m_asok_hook = asok_hook;
1092 return;
1093 }
d2e6a577 1094 derr << "error registering admin socket commands" << dendl;
d2e6a577 1095 }
3efd9988 1096 delete asok_hook;
d2e6a577
FG
1097}
1098
1099template <typename I>
1100void ImageReplayer<I>::unregister_admin_socket_hook() {
28e407b8 1101 dout(15) << dendl;
d2e6a577 1102
3efd9988
FG
1103 AdminSocketHook *asok_hook = nullptr;
1104 {
9f95a23c 1105 std::lock_guard locker{m_lock};
3efd9988
FG
1106 std::swap(asok_hook, m_asok_hook);
1107 }
1108 delete asok_hook;
1109}
1110
1111template <typename I>
28e407b8 1112void ImageReplayer<I>::reregister_admin_socket_hook() {
9f95a23c
TL
1113 std::unique_lock locker{m_lock};
1114 if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) {
1115 m_local_image_name = m_bootstrap_request->get_local_image_name();
3efd9988 1116 }
9f95a23c
TL
1117
1118 auto image_spec = image_replayer::util::compute_image_spec(
1119 m_local_io_ctx, m_local_image_name);
1120 if (m_asok_hook != nullptr && m_image_spec == image_spec) {
1121 return;
1122 }
1123
1124 dout(15) << "old_image_spec=" << m_image_spec << ", "
1125 << "new_image_spec=" << image_spec << dendl;
1126 m_image_spec = image_spec;
1127
1128 if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
1129 // no need to re-register if stopping
1130 return;
1131 }
1132 locker.unlock();
1133
3efd9988
FG
1134 unregister_admin_socket_hook();
1135 register_admin_socket_hook();
7c673cae
FG
1136}
1137
a4b75251
TL
1138template <typename I>
1139void ImageReplayer<I>::remove_image_status(bool force, Context *on_finish)
1140{
1141 auto ctx = new LambdaContext([this, force, on_finish](int) {
1142 remove_image_status_remote(force, on_finish);
1143 });
1144
1145 if (m_local_status_updater->exists(m_global_image_id)) {
1146 dout(15) << "removing local mirror image status" << dendl;
1147 if (force) {
1148 m_local_status_updater->remove_mirror_image_status(
1149 m_global_image_id, true, ctx);
1150 } else {
1151 m_local_status_updater->remove_refresh_mirror_image_status(
1152 m_global_image_id, ctx);
1153 }
1154 return;
1155 }
1156
1157 ctx->complete(0);
1158}
1159
1160template <typename I>
1161void ImageReplayer<I>::remove_image_status_remote(bool force, Context *on_finish)
1162{
1163 if (m_remote_image_peer.mirror_status_updater != nullptr &&
1164 m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) {
1165 dout(15) << "removing remote mirror image status" << dendl;
1166 if (force) {
1167 m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
1168 m_global_image_id, true, on_finish);
1169 } else {
1170 m_remote_image_peer.mirror_status_updater->remove_refresh_mirror_image_status(
1171 m_global_image_id, on_finish);
1172 }
1173 return;
1174 }
1175 if (on_finish) {
1176 on_finish->complete(0);
1177 }
1178}
1179
7c673cae
FG
1180template <typename I>
1181std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1182{
1183 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1184 << "/" << replayer.get_global_image_id() << "]";
1185 return os;
1186}
1187
1188} // namespace mirror
1189} // namespace rbd
1190
1191template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;