]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.cc
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5#include "common/Formatter.h"
11fdf7f2 6#include "common/admin_socket.h"
7c673cae
FG
7#include "common/debug.h"
8#include "common/errno.h"
9#include "include/stringify.h"
10#include "cls/rbd/cls_rbd_client.h"
11#include "common/Timer.h"
12#include "common/WorkQueue.h"
13#include "global/global_context.h"
14#include "journal/Journaler.h"
7c673cae
FG
15#include "librbd/ExclusiveLock.h"
16#include "librbd/ImageCtx.h"
17#include "librbd/ImageState.h"
18#include "librbd/Journal.h"
19#include "librbd/Operations.h"
20#include "librbd/Utils.h"
d2e6a577 21#include "ImageDeleter.h"
7c673cae 22#include "ImageReplayer.h"
9f95a23c 23#include "MirrorStatusUpdater.h"
7c673cae
FG
24#include "Threads.h"
25#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
9f95a23c
TL
26#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27#include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28#include "tools/rbd_mirror/image_replayer/Utils.h"
29#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
31#include <map>
7c673cae
FG
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_rbd_mirror
35#undef dout_prefix
36#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37 << __func__ << ": "
38
11fdf7f2
TL
39extern PerfCounters *g_perf_counters;
40
7c673cae
FG
41namespace rbd {
42namespace mirror {
43
44using librbd::util::create_context_callback;
7c673cae
FG
45
46template <typename I>
47std::ostream &operator<<(std::ostream &os,
48 const typename ImageReplayer<I>::State &state);
49
50namespace {
51
3efd9988 52template <typename I>
7c673cae
FG
53class ImageReplayerAdminSocketCommand {
54public:
3efd9988
FG
55 ImageReplayerAdminSocketCommand(const std::string &desc,
56 ImageReplayer<I> *replayer)
57 : desc(desc), replayer(replayer) {
58 }
7c673cae 59 virtual ~ImageReplayerAdminSocketCommand() {}
9f95a23c 60 virtual int call(Formatter *f) = 0;
3efd9988
FG
61
62 std::string desc;
63 ImageReplayer<I> *replayer;
64 bool registered = false;
7c673cae
FG
65};
66
67template <typename I>
3efd9988 68class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 69public:
3efd9988
FG
70 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
71 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
72 }
7c673cae 73
9f95a23c
TL
74 int call(Formatter *f) override {
75 this->replayer->print_status(f);
76 return 0;
7c673cae 77 }
7c673cae
FG
78};
79
80template <typename I>
3efd9988 81class StartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 82public:
3efd9988
FG
83 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
84 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
85 }
7c673cae 86
9f95a23c 87 int call(Formatter *f) override {
3efd9988 88 this->replayer->start(nullptr, true);
9f95a23c 89 return 0;
7c673cae 90 }
7c673cae
FG
91};
92
93template <typename I>
3efd9988 94class StopCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 95public:
3efd9988
FG
96 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
97 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
98 }
7c673cae 99
9f95a23c 100 int call(Formatter *f) override {
3efd9988 101 this->replayer->stop(nullptr, true);
9f95a23c 102 return 0;
7c673cae 103 }
7c673cae
FG
104};
105
106template <typename I>
3efd9988 107class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 108public:
3efd9988
FG
109 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
110 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
111 }
7c673cae 112
9f95a23c 113 int call(Formatter *f) override {
3efd9988 114 this->replayer->restart();
9f95a23c 115 return 0;
7c673cae 116 }
7c673cae
FG
117};
118
119template <typename I>
3efd9988 120class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
7c673cae 121public:
3efd9988
FG
122 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
123 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
124 }
7c673cae 125
9f95a23c 126 int call(Formatter *f) override {
28e407b8 127 this->replayer->flush();
9f95a23c 128 return 0;
7c673cae 129 }
7c673cae
FG
130};
131
132template <typename I>
133class ImageReplayerAdminSocketHook : public AdminSocketHook {
134public:
135 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
31f18b77 136 ImageReplayer<I> *replayer)
3efd9988
FG
137 : admin_socket(cct->get_admin_socket()),
138 commands{{"rbd mirror flush " + name,
139 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
140 {"rbd mirror restart " + name,
141 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
142 {"rbd mirror start " + name,
143 new StartCommand<I>("start rbd mirror " + name, replayer)},
144 {"rbd mirror status " + name,
145 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
146 {"rbd mirror stop " + name,
147 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
d2e6a577
FG
148 }
149
150 int register_commands() {
3efd9988 151 for (auto &it : commands) {
9f95a23c 152 int r = admin_socket->register_command(it.first, this,
3efd9988
FG
153 it.second->desc);
154 if (r < 0) {
155 return r;
156 }
157 it.second->registered = true;
7c673cae 158 }
d2e6a577 159 return 0;
7c673cae
FG
160 }
161
162 ~ImageReplayerAdminSocketHook() override {
9f95a23c 163 admin_socket->unregister_commands(this);
3efd9988 164 for (auto &it : commands) {
3efd9988 165 delete it.second;
7c673cae 166 }
31f18b77 167 commands.clear();
7c673cae
FG
168 }
169
9f95a23c
TL
170 int call(std::string_view command, const cmdmap_t& cmdmap,
171 Formatter *f,
172 std::ostream& errss,
173 bufferlist& out) override {
3efd9988 174 auto i = commands.find(command);
11fdf7f2 175 ceph_assert(i != commands.end());
9f95a23c 176 return i->second->call(f);
7c673cae
FG
177 }
178
179private:
11fdf7f2
TL
180 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
181 std::less<>> Commands;
7c673cae
FG
182
183 AdminSocket *admin_socket;
184 Commands commands;
185};
186
7c673cae
FG
187} // anonymous namespace
188
189template <typename I>
190void ImageReplayer<I>::BootstrapProgressContext::update_progress(
191 const std::string &description, bool flush)
192{
193 const std::string desc = "bootstrapping, " + description;
194 replayer->set_state_description(0, desc);
195 if (flush) {
196 replayer->update_mirror_image_status(false, boost::none);
197 }
198}
199
200template <typename I>
9f95a23c
TL
201struct ImageReplayer<I>::ReplayerListener
202 : public image_replayer::ReplayerListener {
203 ImageReplayer<I>* image_replayer;
204
205 ReplayerListener(ImageReplayer<I>* image_replayer)
206 : image_replayer(image_replayer) {
207 }
208
209 void handle_notification() override {
210 image_replayer->handle_replayer_notification();
211 }
212};
7c673cae
FG
213
214template <typename I>
9f95a23c
TL
215ImageReplayer<I>::ImageReplayer(
216 librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
217 const std::string &global_image_id, Threads<I> *threads,
218 InstanceWatcher<I> *instance_watcher,
219 MirrorStatusUpdater<I>* local_status_updater,
220 journal::CacheManagerHandler *cache_manager_handler,
221 PoolMetaCache* pool_meta_cache) :
222 m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
223 m_global_image_id(global_image_id), m_threads(threads),
31f18b77 224 m_instance_watcher(instance_watcher),
9f95a23c
TL
225 m_local_status_updater(local_status_updater),
226 m_cache_manager_handler(cache_manager_handler),
227 m_pool_meta_cache(pool_meta_cache),
228 m_local_image_name(global_image_id),
229 m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
230 stringify(local_io_ctx.get_id()) + " " + global_image_id)),
7c673cae 231 m_progress_cxt(this),
9f95a23c 232 m_replayer_listener(new ReplayerListener(this))
7c673cae
FG
233{
234 // Register asok commands using a temporary "remote_pool_name/global_image_id"
235 // name. When the image name becomes known on start the asok commands will be
236 // re-registered using "remote_pool_name/remote_image_name" name.
237
9f95a23c
TL
238 m_image_spec = image_replayer::util::compute_image_spec(
239 local_io_ctx, global_image_id);
d2e6a577 240 register_admin_socket_hook();
7c673cae
FG
241}
242
243template <typename I>
244ImageReplayer<I>::~ImageReplayer()
245{
d2e6a577 246 unregister_admin_socket_hook();
9f95a23c 247 ceph_assert(m_state_builder == nullptr);
11fdf7f2
TL
248 ceph_assert(m_on_start_finish == nullptr);
249 ceph_assert(m_on_stop_finish == nullptr);
250 ceph_assert(m_bootstrap_request == nullptr);
1911f103 251 ceph_assert(m_update_status_task == nullptr);
9f95a23c 252 delete m_replayer_listener;
7c673cae
FG
253}
254
c07f9fc5
FG
255template <typename I>
256image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
9f95a23c 257 std::lock_guard locker{m_lock};
c07f9fc5
FG
258
259 if (!m_mirror_image_status_state) {
260 return image_replayer::HEALTH_STATE_OK;
261 } else if (*m_mirror_image_status_state ==
262 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
263 *m_mirror_image_status_state ==
264 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
265 return image_replayer::HEALTH_STATE_WARNING;
266 }
267 return image_replayer::HEALTH_STATE_ERROR;
268}
269
7c673cae 270template <typename I>
9f95a23c
TL
271void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
272 dout(10) << "peer=" << peer << dendl;
273
274 std::lock_guard locker{m_lock};
275 auto it = m_peers.find(peer);
d2e6a577 276 if (it == m_peers.end()) {
9f95a23c 277 m_peers.insert(peer);
7c673cae
FG
278 }
279}
280
7c673cae
FG
281template <typename I>
282void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
9f95a23c 283 dout(10) << "r=" << r << ", desc=" << desc << dendl;
7c673cae 284
9f95a23c 285 std::lock_guard l{m_lock};
7c673cae
FG
286 m_last_r = r;
287 m_state_desc = desc;
288}
289
290template <typename I>
e306af50 291void ImageReplayer<I>::start(Context *on_finish, bool manual, bool restart)
7c673cae 292{
11fdf7f2 293 dout(10) << "on_finish=" << on_finish << dendl;
7c673cae
FG
294
295 int r = 0;
296 {
9f95a23c 297 std::lock_guard locker{m_lock};
7c673cae
FG
298 if (!is_stopped_()) {
299 derr << "already running" << dendl;
300 r = -EINVAL;
301 } else if (m_manual_stop && !manual) {
302 dout(5) << "stopped manually, ignoring start without manual flag"
303 << dendl;
304 r = -EPERM;
e306af50
TL
305 } else if (restart && !m_restart_requested) {
306 dout(10) << "canceled restart" << dendl;
307 r = -ECANCELED;
7c673cae
FG
308 } else {
309 m_state = STATE_STARTING;
310 m_last_r = 0;
311 m_state_desc.clear();
312 m_manual_stop = false;
d2e6a577 313 m_delete_requested = false;
e306af50 314 m_restart_requested = false;
7c673cae
FG
315
316 if (on_finish != nullptr) {
11fdf7f2 317 ceph_assert(m_on_start_finish == nullptr);
7c673cae
FG
318 m_on_start_finish = on_finish;
319 }
11fdf7f2 320 ceph_assert(m_on_stop_finish == nullptr);
7c673cae
FG
321 }
322 }
323
324 if (r < 0) {
325 if (on_finish) {
326 on_finish->complete(r);
327 }
328 return;
329 }
330
9f95a23c 331 bootstrap();
7c673cae
FG
332}
333
334template <typename I>
9f95a23c 335void ImageReplayer<I>::bootstrap() {
11fdf7f2 336 dout(10) << dendl;
7c673cae 337
9f95a23c 338 std::unique_lock locker{m_lock};
b32b8144 339 if (m_peers.empty()) {
9f95a23c
TL
340 locker.unlock();
341
342 dout(5) << "no peer clusters" << dendl;
343 on_start_fail(-ENOENT, "no peer clusters");
b32b8144
FG
344 return;
345 }
7c673cae 346
d2e6a577 347 // TODO need to support multiple remote images
11fdf7f2 348 ceph_assert(!m_peers.empty());
9f95a23c 349 m_remote_image_peer = *m_peers.begin();
7c673cae 350
9f95a23c 351 if (on_start_interrupted(m_lock)) {
b32b8144
FG
352 return;
353 }
7c673cae 354
9f95a23c
TL
355 ceph_assert(m_state_builder == nullptr);
356 auto ctx = create_context_callback<
11fdf7f2 357 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
9f95a23c
TL
358 auto request = image_replayer::BootstrapRequest<I>::create(
359 m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
360 m_global_image_id, m_local_mirror_uuid,
361 m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
362 m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested,
363 ctx);
7c673cae 364
9f95a23c
TL
365 request->get();
366 m_bootstrap_request = request;
367 locker.unlock();
7c673cae 368
9f95a23c 369 update_mirror_image_status(false, boost::none);
7c673cae
FG
370 request->send();
371}
372
373template <typename I>
374void ImageReplayer<I>::handle_bootstrap(int r) {
11fdf7f2 375 dout(10) << "r=" << r << dendl;
7c673cae 376 {
9f95a23c 377 std::lock_guard locker{m_lock};
7c673cae
FG
378 m_bootstrap_request->put();
379 m_bootstrap_request = nullptr;
7c673cae
FG
380 }
381
11fdf7f2
TL
382 if (on_start_interrupted()) {
383 return;
9f95a23c
TL
384 } else if (r == -ENOMSG) {
385 dout(5) << "local image is primary" << dendl;
386 on_start_fail(0, "local image is primary");
387 return;
11fdf7f2 388 } else if (r == -EREMOTEIO) {
c07f9fc5
FG
389 dout(5) << "remote image is non-primary" << dendl;
390 on_start_fail(-EREMOTEIO, "remote image is non-primary");
7c673cae
FG
391 return;
392 } else if (r == -EEXIST) {
7c673cae
FG
393 on_start_fail(r, "split-brain detected");
394 return;
9f95a23c
TL
395 } else if (r == -ENOLINK) {
396 m_delete_requested = true;
397 on_start_fail(0, "remote image no longer exists");
398 return;
7c673cae
FG
399 } else if (r < 0) {
400 on_start_fail(r, "error bootstrapping replay");
401 return;
d2e6a577
FG
402 } else if (m_resync_requested) {
403 on_start_fail(0, "resync requested");
404 return;
7c673cae
FG
405 }
406
9f95a23c 407 start_replay();
7c673cae
FG
408}
409
410template <typename I>
9f95a23c 411void ImageReplayer<I>::start_replay() {
11fdf7f2 412 dout(10) << dendl;
7c673cae 413
9f95a23c
TL
414 std::unique_lock locker{m_lock};
415 ceph_assert(m_replayer == nullptr);
416 m_replayer = m_state_builder->create_replayer(m_threads, m_instance_watcher,
417 m_local_mirror_uuid,
418 m_pool_meta_cache,
419 m_replayer_listener);
420
421 auto ctx = create_context_callback<
422 ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
423 m_replayer->init(ctx);
7c673cae
FG
424}
425
426template <typename I>
9f95a23c 427void ImageReplayer<I>::handle_start_replay(int r) {
11fdf7f2 428 dout(10) << "r=" << r << dendl;
7c673cae 429
11fdf7f2
TL
430 if (on_start_interrupted()) {
431 return;
432 } else if (r < 0) {
9f95a23c
TL
433 std::string error_description = m_replayer->get_error_description();
434 if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
435 std::unique_lock locker{m_lock};
d2e6a577 436 m_resync_requested = true;
7c673cae 437 }
7c673cae 438
9f95a23c
TL
439 // shut down not required if init failed
440 m_replayer->destroy();
441 m_replayer = nullptr;
7c673cae 442
9f95a23c
TL
443 derr << "error starting replay: " << cpp_strerror(r) << dendl;
444 on_start_fail(r, error_description);
7c673cae
FG
445 return;
446 }
447
9f95a23c 448 Context *on_finish = nullptr;
7c673cae 449 {
9f95a23c 450 std::unique_lock locker{m_lock};
11fdf7f2 451 ceph_assert(m_state == STATE_STARTING);
7c673cae
FG
452 m_state = STATE_REPLAYING;
453 std::swap(m_on_start_finish, on_finish);
1911f103
TL
454
455 std::unique_lock timer_locker{m_threads->timer_lock};
456 schedule_update_mirror_image_replay_status();
7c673cae
FG
457 }
458
7c673cae 459 update_mirror_image_status(true, boost::none);
7c673cae 460 if (on_replay_interrupted()) {
9f95a23c
TL
461 if (on_finish != nullptr) {
462 on_finish->complete(r);
463 }
7c673cae
FG
464 return;
465 }
466
11fdf7f2 467 dout(10) << "start succeeded" << dendl;
d2e6a577 468 if (on_finish != nullptr) {
11fdf7f2 469 dout(10) << "on finish complete, r=" << r << dendl;
d2e6a577
FG
470 on_finish->complete(r);
471 }
7c673cae
FG
472}
473
474template <typename I>
475void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
476{
9f95a23c
TL
477 dout(10) << "r=" << r << ", desc=" << desc << dendl;
478 Context *ctx = new LambdaContext([this, r, desc](int _r) {
7c673cae 479 {
9f95a23c 480 std::lock_guard locker{m_lock};
11fdf7f2 481 ceph_assert(m_state == STATE_STARTING);
7c673cae 482 m_state = STATE_STOPPING;
d2e6a577 483 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
7c673cae
FG
484 derr << "start failed: " << cpp_strerror(r) << dendl;
485 } else {
11fdf7f2 486 dout(10) << "start canceled" << dendl;
7c673cae
FG
487 }
488 }
489
490 set_state_description(r, desc);
9f95a23c 491 update_mirror_image_status(false, boost::none);
7c673cae
FG
492 shut_down(r);
493 });
494 m_threads->work_queue->queue(ctx, 0);
495}
496
497template <typename I>
11fdf7f2 498bool ImageReplayer<I>::on_start_interrupted() {
9f95a23c 499 std::lock_guard locker{m_lock};
11fdf7f2
TL
500 return on_start_interrupted(m_lock);
501}
502
503template <typename I>
9f95a23c
TL
504bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
505 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
506 ceph_assert(m_state == STATE_STARTING);
507 if (!m_stop_requested) {
7c673cae
FG
508 return false;
509 }
510
11fdf7f2 511 on_start_fail(-ECANCELED, "");
7c673cae
FG
512 return true;
513}
514
515template <typename I>
e306af50 516void ImageReplayer<I>::stop(Context *on_finish, bool manual, bool restart)
7c673cae 517{
11fdf7f2 518 dout(10) << "on_finish=" << on_finish << ", manual=" << manual
e306af50 519 << ", restart=" << restart << dendl;
7c673cae
FG
520
521 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
522 bool shut_down_replay = false;
523 bool running = true;
7c673cae 524 {
9f95a23c 525 std::lock_guard locker{m_lock};
7c673cae 526
e306af50
TL
527 if (restart) {
528 m_restart_requested = true;
529 }
530
7c673cae
FG
531 if (!is_running_()) {
532 running = false;
e306af50
TL
533 if (!restart && m_restart_requested) {
534 dout(10) << "canceling restart" << dendl;
535 m_restart_requested = false;
536 }
7c673cae
FG
537 } else {
538 if (!is_stopped_()) {
539 if (m_state == STATE_STARTING) {
11fdf7f2
TL
540 dout(10) << "canceling start" << dendl;
541 if (m_bootstrap_request != nullptr) {
7c673cae
FG
542 bootstrap_request = m_bootstrap_request;
543 bootstrap_request->get();
544 }
545 } else {
11fdf7f2 546 dout(10) << "interrupting replay" << dendl;
7c673cae
FG
547 shut_down_replay = true;
548 }
549
11fdf7f2 550 ceph_assert(m_on_stop_finish == nullptr);
7c673cae
FG
551 std::swap(m_on_stop_finish, on_finish);
552 m_stop_requested = true;
553 m_manual_stop = manual;
7c673cae
FG
554 }
555 }
556 }
557
558 // avoid holding lock since bootstrap request will update status
559 if (bootstrap_request != nullptr) {
11fdf7f2 560 dout(10) << "canceling bootstrap" << dendl;
7c673cae
FG
561 bootstrap_request->cancel();
562 bootstrap_request->put();
563 }
564
7c673cae
FG
565 if (!running) {
566 dout(20) << "not running" << dendl;
567 if (on_finish) {
568 on_finish->complete(-EINVAL);
569 }
570 return;
571 }
572
573 if (shut_down_replay) {
e306af50 574 on_stop_journal_replay();
7c673cae
FG
575 } else if (on_finish != nullptr) {
576 on_finish->complete(0);
577 }
578}
579
580template <typename I>
581void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
582{
11fdf7f2 583 dout(10) << dendl;
7c673cae
FG
584
585 {
9f95a23c 586 std::lock_guard locker{m_lock};
7c673cae
FG
587 if (m_state != STATE_REPLAYING) {
588 // might be invoked multiple times while stopping
589 return;
590 }
9f95a23c 591
7c673cae
FG
592 m_stop_requested = true;
593 m_state = STATE_STOPPING;
594 }
595
1911f103 596 cancel_update_mirror_image_replay_status();
7c673cae 597 set_state_description(r, desc);
a8e16298 598 update_mirror_image_status(true, boost::none);
7c673cae
FG
599 shut_down(0);
600}
601
7c673cae
FG
602template <typename I>
603void ImageReplayer<I>::restart(Context *on_finish)
604{
e306af50
TL
605 {
606 std::lock_guard locker{m_lock};
607 m_restart_requested = true;
608 }
609
9f95a23c 610 auto ctx = new LambdaContext(
7c673cae
FG
611 [this, on_finish](int r) {
612 if (r < 0) {
613 // Try start anyway.
614 }
e306af50 615 start(on_finish, true, true);
7c673cae 616 });
e306af50 617 stop(ctx, false, true);
7c673cae
FG
618}
619
620template <typename I>
81eedcae 621void ImageReplayer<I>::flush()
7c673cae 622{
81eedcae 623 C_SaferCond ctx;
7c673cae 624
9f95a23c
TL
625 {
626 std::unique_lock locker{m_lock};
627 if (m_state != STATE_REPLAYING) {
628 return;
629 }
7c673cae 630
9f95a23c
TL
631 dout(10) << dendl;
632 ceph_assert(m_replayer != nullptr);
633 m_replayer->flush(&ctx);
81eedcae
TL
634 }
635
9f95a23c
TL
636 int r = ctx.wait();
637 if (r >= 0) {
638 update_mirror_image_status(false, boost::none);
7c673cae 639 }
7c673cae
FG
640}
641
642template <typename I>
643bool ImageReplayer<I>::on_replay_interrupted()
644{
645 bool shut_down;
646 {
9f95a23c 647 std::lock_guard locker{m_lock};
7c673cae
FG
648 shut_down = m_stop_requested;
649 }
650
651 if (shut_down) {
652 on_stop_journal_replay();
653 }
654 return shut_down;
655}
656
657template <typename I>
9f95a23c 658void ImageReplayer<I>::print_status(Formatter *f)
7c673cae 659{
11fdf7f2 660 dout(10) << dendl;
7c673cae 661
9f95a23c 662 std::lock_guard l{m_lock};
7c673cae 663
9f95a23c
TL
664 f->open_object_section("image_replayer");
665 f->dump_string("name", m_image_spec);
666 f->dump_string("state", to_string(m_state));
667 f->close_section();
7c673cae
FG
668}
669
1911f103
TL
670template <typename I>
671void ImageReplayer<I>::schedule_update_mirror_image_replay_status() {
672 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
673 ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
674 if (m_state != STATE_REPLAYING) {
675 return;
676 }
677
678 dout(10) << dendl;
679
680 // periodically update the replaying status even if nothing changes
681 // so that we can adjust our performance stats
682 ceph_assert(m_update_status_task == nullptr);
683 m_update_status_task = create_context_callback<
684 ImageReplayer<I>,
685 &ImageReplayer<I>::handle_update_mirror_image_replay_status>(this);
686 m_threads->timer->add_event_after(10, m_update_status_task);
687}
688
689template <typename I>
690void ImageReplayer<I>::handle_update_mirror_image_replay_status(int r) {
691 dout(10) << dendl;
692
cd265ab1
TL
693 ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
694
695 ceph_assert(m_update_status_task != nullptr);
696 m_update_status_task = nullptr;
697
1911f103
TL
698 auto ctx = new LambdaContext([this](int) {
699 update_mirror_image_status(false, boost::none);
700
cd265ab1
TL
701 std::unique_lock locker{m_lock};
702 std::unique_lock timer_locker{m_threads->timer_lock};
1911f103 703
cd265ab1 704 schedule_update_mirror_image_replay_status();
1911f103
TL
705 m_in_flight_op_tracker.finish_op();
706 });
707
708 m_in_flight_op_tracker.start_op();
709 m_threads->work_queue->queue(ctx, 0);
710}
711
712template <typename I>
713void ImageReplayer<I>::cancel_update_mirror_image_replay_status() {
714 std::unique_lock timer_locker{m_threads->timer_lock};
715 if (m_update_status_task != nullptr) {
716 dout(10) << dendl;
717
718 if (m_threads->timer->cancel_event(m_update_status_task)) {
719 m_update_status_task = nullptr;
720 }
721 }
722}
723
7c673cae 724template <typename I>
9f95a23c
TL
725void ImageReplayer<I>::update_mirror_image_status(
726 bool force, const OptionalState &opt_state) {
727 dout(15) << "force=" << force << ", "
728 << "state=" << opt_state << dendl;
7c673cae
FG
729
730 {
9f95a23c
TL
731 std::lock_guard locker{m_lock};
732 if (!force && !is_stopped_() && !is_running_()) {
733 dout(15) << "shut down in-progress: ignoring update" << dendl;
28e407b8 734 return;
28e407b8
AA
735 }
736 }
737
9f95a23c
TL
738 m_in_flight_op_tracker.start_op();
739 auto ctx = new LambdaContext(
740 [this, force, opt_state](int r) {
741 set_mirror_image_status_update(force, opt_state);
11fdf7f2
TL
742 });
743 m_threads->work_queue->queue(ctx, 0);
7c673cae
FG
744}
745
746template <typename I>
9f95a23c
TL
747void ImageReplayer<I>::set_mirror_image_status_update(
748 bool force, const OptionalState &opt_state) {
749 dout(15) << "force=" << force << ", "
750 << "state=" << opt_state << dendl;
7c673cae 751
28e407b8
AA
752 reregister_admin_socket_hook();
753
7c673cae
FG
754 State state;
755 std::string state_desc;
756 int last_r;
7c673cae 757 bool stopping_replay;
c07f9fc5 758
9f95a23c
TL
759 auto mirror_image_status_state = boost::make_optional(
760 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
c07f9fc5 761 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
7c673cae 762 {
9f95a23c 763 std::lock_guard locker{m_lock};
7c673cae
FG
764 state = m_state;
765 state_desc = m_state_desc;
c07f9fc5 766 mirror_image_status_state = m_mirror_image_status_state;
7c673cae 767 last_r = m_last_r;
9f95a23c 768 stopping_replay = (m_replayer != nullptr);
c07f9fc5
FG
769
770 if (m_bootstrap_request != nullptr) {
771 bootstrap_request = m_bootstrap_request;
772 bootstrap_request->get();
773 }
774 }
775
776 bool syncing = false;
777 if (bootstrap_request != nullptr) {
778 syncing = bootstrap_request->is_syncing();
779 bootstrap_request->put();
780 bootstrap_request = nullptr;
7c673cae
FG
781 }
782
783 if (opt_state) {
784 state = *opt_state;
785 }
786
9f95a23c 787 cls::rbd::MirrorImageSiteStatus status;
7c673cae
FG
788 status.up = true;
789 switch (state) {
790 case STATE_STARTING:
c07f9fc5 791 if (syncing) {
7c673cae
FG
792 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
793 status.description = state_desc.empty() ? "syncing" : state_desc;
c07f9fc5 794 mirror_image_status_state = status.state;
7c673cae
FG
795 } else {
796 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
797 status.description = "starting replay";
798 }
799 break;
800 case STATE_REPLAYING:
7c673cae
FG
801 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
802 {
9f95a23c
TL
803 std::string desc;
804 auto on_req_finish = new LambdaContext(
805 [this, force](int r) {
11fdf7f2 806 dout(15) << "replay status ready: r=" << r << dendl;
7c673cae 807 if (r >= 0) {
9f95a23c 808 set_mirror_image_status_update(force, boost::none);
7c673cae 809 } else if (r == -EAGAIN) {
9f95a23c 810 m_in_flight_op_tracker.finish_op();
7c673cae
FG
811 }
812 });
813
9f95a23c
TL
814 ceph_assert(m_replayer != nullptr);
815 if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
11fdf7f2 816 dout(15) << "waiting for replay status" << dendl;
7c673cae
FG
817 return;
818 }
9f95a23c 819
7c673cae 820 status.description = "replaying, " + desc;
11fdf7f2
TL
821 mirror_image_status_state = boost::make_optional(
822 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
7c673cae
FG
823 }
824 break;
825 case STATE_STOPPING:
826 if (stopping_replay) {
827 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
a8e16298 828 status.description = state_desc.empty() ? "stopping replay" : state_desc;
7c673cae
FG
829 break;
830 }
831 // FALLTHROUGH
832 case STATE_STOPPED:
c07f9fc5
FG
833 if (last_r == -EREMOTEIO) {
834 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
835 status.description = state_desc;
836 mirror_image_status_state = status.state;
9f95a23c 837 } else if (last_r < 0 && last_r != -ECANCELED) {
7c673cae
FG
838 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
839 status.description = state_desc;
c07f9fc5 840 mirror_image_status_state = status.state;
7c673cae
FG
841 } else {
842 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
843 status.description = state_desc.empty() ? "stopped" : state_desc;
c07f9fc5 844 mirror_image_status_state = boost::none;
7c673cae
FG
845 }
846 break;
847 default:
11fdf7f2 848 ceph_assert(!"invalid state");
7c673cae
FG
849 }
850
c07f9fc5 851 {
9f95a23c 852 std::lock_guard locker{m_lock};
c07f9fc5
FG
853 m_mirror_image_status_state = mirror_image_status_state;
854 }
855
856 // prevent the status from ping-ponging when failed replays are restarted
857 if (mirror_image_status_state &&
858 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
859 status.state = *mirror_image_status_state;
860 }
861
11fdf7f2 862 dout(15) << "status=" << status << dendl;
9f95a23c
TL
863 m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
864 force);
865 if (m_remote_image_peer.mirror_status_updater != nullptr) {
866 m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
867 m_global_image_id, status, force);
7c673cae
FG
868 }
869
9f95a23c 870 m_in_flight_op_tracker.finish_op();
7c673cae
FG
871}
872
873template <typename I>
874void ImageReplayer<I>::shut_down(int r) {
11fdf7f2 875 dout(10) << "r=" << r << dendl;
3efd9988 876
3efd9988 877 {
9f95a23c 878 std::lock_guard locker{m_lock};
11fdf7f2 879 ceph_assert(m_state == STATE_STOPPING);
7c673cae
FG
880 }
881
9f95a23c
TL
882 if (!m_in_flight_op_tracker.empty()) {
883 dout(15) << "waiting for in-flight operations to complete" << dendl;
884 m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
885 shut_down(r);
886 }));
887 return;
888 }
31f18b77 889
7c673cae 890 // chain the shut down sequence (reverse order)
9f95a23c 891 Context *ctx = new LambdaContext(
7c673cae 892 [this, r](int _r) {
9f95a23c 893 update_mirror_image_status(true, STATE_STOPPED);
7c673cae
FG
894 handle_shut_down(r);
895 });
31f18b77 896
9f95a23c
TL
897 // destruct the state builder
898 if (m_state_builder != nullptr) {
899 ctx = new LambdaContext([this, ctx](int r) {
900 m_state_builder->close(ctx);
31f18b77
FG
901 });
902 }
903
9f95a23c
TL
904 // close the replayer
905 if (m_replayer != nullptr) {
906 ctx = new LambdaContext([this, ctx](int r) {
907 m_replayer->destroy();
908 m_replayer = nullptr;
909 ctx->complete(0);
910 });
911 ctx = new LambdaContext([this, ctx](int r) {
912 m_replayer->shut_down(ctx);
31f18b77 913 });
7c673cae 914 }
31f18b77 915
7c673cae
FG
916 m_threads->work_queue->queue(ctx, 0);
917}
918
919template <typename I>
920void ImageReplayer<I>::handle_shut_down(int r) {
11fdf7f2
TL
921 bool resync_requested = false;
922 bool delete_requested = false;
3efd9988 923 bool unregister_asok_hook = false;
7c673cae 924 {
9f95a23c 925 std::lock_guard locker{m_lock};
7c673cae 926
9f95a23c
TL
927 if (m_delete_requested && m_state_builder != nullptr &&
928 !m_state_builder->local_image_id.empty()) {
929 ceph_assert(m_state_builder->remote_image_id.empty());
d2e6a577 930 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
11fdf7f2
TL
931 unregister_asok_hook = true;
932 std::swap(delete_requested, m_delete_requested);
d2e6a577 933 }
d2e6a577 934
11fdf7f2 935 std::swap(resync_requested, m_resync_requested);
9f95a23c
TL
936 if (!delete_requested && !resync_requested && m_last_r == -ENOENT &&
937 ((m_state_builder == nullptr) ||
938 (m_state_builder->local_image_id.empty() &&
939 m_state_builder->remote_image_id.empty()))) {
d2e6a577 940 dout(0) << "mirror image no longer exists" << dendl;
3efd9988 941 unregister_asok_hook = true;
d2e6a577 942 m_finished = true;
7c673cae
FG
943 }
944 }
945
3efd9988
FG
946 if (unregister_asok_hook) {
947 unregister_admin_socket_hook();
948 }
949
11fdf7f2
TL
950 if (delete_requested || resync_requested) {
951 dout(5) << "moving image to trash" << dendl;
9f95a23c 952 auto ctx = new LambdaContext([this, r](int) {
11fdf7f2
TL
953 handle_shut_down(r);
954 });
9f95a23c 955 ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
11fdf7f2
TL
956 resync_requested, m_threads->work_queue, ctx);
957 return;
958 }
7c673cae 959
9f95a23c
TL
960 if (!m_in_flight_op_tracker.empty()) {
961 dout(15) << "waiting for in-flight operations to complete" << dendl;
962 m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
963 handle_shut_down(r);
964 }));
965 return;
966 }
7c673cae 967
9f95a23c
TL
968 if (m_local_status_updater->exists(m_global_image_id)) {
969 dout(15) << "removing local mirror image status" << dendl;
970 auto ctx = new LambdaContext([this, r](int) {
971 handle_shut_down(r);
972 });
973 m_local_status_updater->remove_mirror_image_status(m_global_image_id, ctx);
974 return;
975 }
976
977 if (m_remote_image_peer.mirror_status_updater != nullptr &&
978 m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) {
979 dout(15) << "removing remote mirror image status" << dendl;
980 auto ctx = new LambdaContext([this, r](int) {
981 handle_shut_down(r);
982 });
983 m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
984 m_global_image_id, ctx);
985 return;
986 }
987
988 if (m_state_builder != nullptr) {
989 m_state_builder->destroy();
990 m_state_builder = nullptr;
991 }
992
993 dout(10) << "stop complete" << dendl;
7c673cae
FG
994 Context *on_start = nullptr;
995 Context *on_stop = nullptr;
996 {
9f95a23c 997 std::lock_guard locker{m_lock};
7c673cae
FG
998 std::swap(on_start, m_on_start_finish);
999 std::swap(on_stop, m_on_stop_finish);
1000 m_stop_requested = false;
11fdf7f2 1001 ceph_assert(m_state == STATE_STOPPING);
7c673cae
FG
1002 m_state = STATE_STOPPED;
1003 }
1004
1005 if (on_start != nullptr) {
11fdf7f2 1006 dout(10) << "on start finish complete, r=" << r << dendl;
7c673cae
FG
1007 on_start->complete(r);
1008 r = 0;
1009 }
1010 if (on_stop != nullptr) {
11fdf7f2 1011 dout(10) << "on stop finish complete, r=" << r << dendl;
7c673cae
FG
1012 on_stop->complete(r);
1013 }
1014}
1015
1016template <typename I>
9f95a23c
TL
1017void ImageReplayer<I>::handle_replayer_notification() {
1018 dout(10) << dendl;
1019
1020 std::unique_lock locker{m_lock};
1021 if (m_state != STATE_REPLAYING) {
1022 // might be attempting to shut down
1023 return;
1024 }
7c673cae 1025
7c673cae 1026 {
9f95a23c
TL
1027 // detect a rename of the local image
1028 ceph_assert(m_state_builder != nullptr &&
1029 m_state_builder->local_image_ctx != nullptr);
1030 std::shared_lock image_locker{m_state_builder->local_image_ctx->image_lock};
1031 if (m_local_image_name != m_state_builder->local_image_ctx->name) {
1032 // will re-register with new name after next status update
1033 dout(10) << "image renamed" << dendl;
1034 m_local_image_name = m_state_builder->local_image_ctx->name;
7c673cae 1035 }
9f95a23c 1036 }
7c673cae 1037
9f95a23c
TL
1038 // replayer cannot be shut down while notification is in-flight
1039 ceph_assert(m_replayer != nullptr);
1040 locker.unlock();
1041
1042 if (m_replayer->is_resync_requested()) {
1043 dout(10) << "resync requested" << dendl;
1044 m_resync_requested = true;
1045 on_stop_journal_replay(0, "resync requested");
1046 return;
7c673cae
FG
1047 }
1048
9f95a23c
TL
1049 if (!m_replayer->is_replaying()) {
1050 auto error_code = m_replayer->get_error_code();
1051 auto error_description = m_replayer->get_error_description();
1052 dout(10) << "replay interrupted: "
1053 << "r=" << error_code << ", "
1054 << "error=" << error_description << dendl;
1055 on_stop_journal_replay(error_code, error_description);
1056 return;
7c673cae 1057 }
9f95a23c
TL
1058
1059 update_mirror_image_status(false, {});
7c673cae
FG
1060}
1061
1062template <typename I>
1063std::string ImageReplayer<I>::to_string(const State state) {
1064 switch (state) {
1065 case ImageReplayer<I>::STATE_STARTING:
1066 return "Starting";
1067 case ImageReplayer<I>::STATE_REPLAYING:
1068 return "Replaying";
7c673cae
FG
1069 case ImageReplayer<I>::STATE_STOPPING:
1070 return "Stopping";
1071 case ImageReplayer<I>::STATE_STOPPED:
1072 return "Stopped";
1073 default:
1074 break;
1075 }
1076 return "Unknown(" + stringify(state) + ")";
1077}
1078
d2e6a577
FG
1079template <typename I>
1080void ImageReplayer<I>::register_admin_socket_hook() {
3efd9988
FG
1081 ImageReplayerAdminSocketHook<I> *asok_hook;
1082 {
9f95a23c 1083 std::lock_guard locker{m_lock};
3efd9988
FG
1084 if (m_asok_hook != nullptr) {
1085 return;
1086 }
7c673cae 1087
9f95a23c
TL
1088 dout(15) << "registered asok hook: " << m_image_spec << dendl;
1089 asok_hook = new ImageReplayerAdminSocketHook<I>(
1090 g_ceph_context, m_image_spec, this);
3efd9988
FG
1091 int r = asok_hook->register_commands();
1092 if (r == 0) {
1093 m_asok_hook = asok_hook;
1094 return;
1095 }
d2e6a577 1096 derr << "error registering admin socket commands" << dendl;
d2e6a577 1097 }
3efd9988 1098 delete asok_hook;
d2e6a577
FG
1099}
1100
1101template <typename I>
1102void ImageReplayer<I>::unregister_admin_socket_hook() {
28e407b8 1103 dout(15) << dendl;
d2e6a577 1104
3efd9988
FG
1105 AdminSocketHook *asok_hook = nullptr;
1106 {
9f95a23c 1107 std::lock_guard locker{m_lock};
3efd9988
FG
1108 std::swap(asok_hook, m_asok_hook);
1109 }
1110 delete asok_hook;
1111}
1112
1113template <typename I>
28e407b8 1114void ImageReplayer<I>::reregister_admin_socket_hook() {
9f95a23c
TL
1115 std::unique_lock locker{m_lock};
1116 if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) {
1117 m_local_image_name = m_bootstrap_request->get_local_image_name();
3efd9988 1118 }
9f95a23c
TL
1119
1120 auto image_spec = image_replayer::util::compute_image_spec(
1121 m_local_io_ctx, m_local_image_name);
1122 if (m_asok_hook != nullptr && m_image_spec == image_spec) {
1123 return;
1124 }
1125
1126 dout(15) << "old_image_spec=" << m_image_spec << ", "
1127 << "new_image_spec=" << image_spec << dendl;
1128 m_image_spec = image_spec;
1129
1130 if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
1131 // no need to re-register if stopping
1132 return;
1133 }
1134 locker.unlock();
1135
3efd9988
FG
1136 unregister_admin_socket_hook();
1137 register_admin_socket_hook();
7c673cae
FG
1138}
1139
1140template <typename I>
1141std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1142{
1143 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1144 << "/" << replayer.get_global_image_id() << "]";
1145 return os;
1146}
1147
1148} // namespace mirror
1149} // namespace rbd
1150
1151template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;