#include "include/compat.h"
#include "common/Formatter.h"
+#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "common/Timer.h"
-#include "common/WorkQueue.h"
#include "global/global_context.h"
#include "journal/Journaler.h"
-#include "journal/ReplayHandler.h"
-#include "journal/Settings.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/Journal.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
-#include "librbd/journal/Replay.h"
+#include "librbd/asio/ContextWQ.h"
+#include "ImageDeleter.h"
#include "ImageReplayer.h"
+#include "MirrorStatusUpdater.h"
#include "Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
-#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
-#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
-#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
-#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/StateBuilder.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
+#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
+#include <map>
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
<< __func__ << ": "
-using std::map;
-using std::string;
-using std::unique_ptr;
-using std::shared_ptr;
-using std::vector;
-
namespace rbd {
namespace mirror {
using librbd::util::create_context_callback;
-using librbd::util::create_rados_callback;
-using namespace rbd::mirror::image_replayer;
template <typename I>
std::ostream &operator<<(std::ostream &os,
namespace {
template <typename I>
-struct ReplayHandler : public ::journal::ReplayHandler {
- ImageReplayer<I> *replayer;
- ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
- void get() override {}
- void put() override {}
-
- void handle_entries_available() override {
- replayer->handle_replay_ready();
- }
- void handle_complete(int r) override {
- std::stringstream ss;
- if (r < 0) {
- ss << "replay completed with error: " << cpp_strerror(r);
- }
- replayer->handle_replay_complete(r, ss.str());
- }
-};
-
class ImageReplayerAdminSocketCommand {
public:
+ ImageReplayerAdminSocketCommand(const std::string &desc,
+ ImageReplayer<I> *replayer)
+ : desc(desc), replayer(replayer) {
+ }
virtual ~ImageReplayerAdminSocketCommand() {}
- virtual bool call(Formatter *f, stringstream *ss) = 0;
+ virtual int call(Formatter *f) = 0;
+
+ std::string desc;
+ ImageReplayer<I> *replayer;
+ bool registered = false;
};
template <typename I>
-class StatusCommand : public ImageReplayerAdminSocketCommand {
+class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit StatusCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->print_status(f, ss);
- return true;
+ explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
}
-private:
- ImageReplayer<I> *replayer;
+ int call(Formatter *f) override {
+ this->replayer->print_status(f);
+ return 0;
+ }
};
template <typename I>
-class StartCommand : public ImageReplayerAdminSocketCommand {
+class StartCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->start(nullptr, true);
- return true;
+ explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
}
-private:
- ImageReplayer<I> *replayer;
+ int call(Formatter *f) override {
+ this->replayer->start(nullptr, true);
+ return 0;
+ }
};
template <typename I>
-class StopCommand : public ImageReplayerAdminSocketCommand {
+class StopCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->stop(nullptr, true);
- return true;
+ explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
}
-private:
- ImageReplayer<I> *replayer;
+ int call(Formatter *f) override {
+ this->replayer->stop(nullptr, true);
+ return 0;
+ }
};
template <typename I>
-class RestartCommand : public ImageReplayerAdminSocketCommand {
+class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->restart();
- return true;
+ explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
}
-private:
- ImageReplayer<I> *replayer;
+ int call(Formatter *f) override {
+ this->replayer->restart();
+ return 0;
+ }
};
template <typename I>
-class FlushCommand : public ImageReplayerAdminSocketCommand {
+class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit FlushCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- C_SaferCond cond;
- replayer->flush(&cond);
- int r = cond.wait();
- if (r < 0) {
- *ss << "flush: " << cpp_strerror(r);
- return false;
- }
- return true;
+ explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
}
-private:
- ImageReplayer<I> *replayer;
+ int call(Formatter *f) override {
+ this->replayer->flush();
+ return 0;
+ }
};
template <typename I>
ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
ImageReplayer<I> *replayer)
: admin_socket(cct->get_admin_socket()),
- lock("ImageReplayerAdminSocketHook::lock " +
- replayer->get_global_image_id()) {
- std::string command;
- int r;
-
- command = "rbd mirror status " + name;
- r = admin_socket->register_command(command, command, this,
- "get status for rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StatusCommand<I>(replayer);
- }
-
- command = "rbd mirror start " + name;
- r = admin_socket->register_command(command, command, this,
- "start rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StartCommand<I>(replayer);
- }
-
- command = "rbd mirror stop " + name;
- r = admin_socket->register_command(command, command, this,
- "stop rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StopCommand<I>(replayer);
- }
-
- command = "rbd mirror restart " + name;
- r = admin_socket->register_command(command, command, this,
- "restart rbd mirror " + name);
- if (r == 0) {
- commands[command] = new RestartCommand<I>(replayer);
- }
-
- command = "rbd mirror flush " + name;
- r = admin_socket->register_command(command, command, this,
- "flush rbd mirror " + name);
- if (r == 0) {
- commands[command] = new FlushCommand<I>(replayer);
+ commands{{"rbd mirror flush " + name,
+ new FlushCommand<I>("flush rbd mirror " + name, replayer)},
+ {"rbd mirror restart " + name,
+ new RestartCommand<I>("restart rbd mirror " + name, replayer)},
+ {"rbd mirror start " + name,
+ new StartCommand<I>("start rbd mirror " + name, replayer)},
+ {"rbd mirror status " + name,
+ new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
+ {"rbd mirror stop " + name,
+ new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
+ }
+
+ int register_commands() {
+ for (auto &it : commands) {
+ int r = admin_socket->register_command(it.first, this,
+ it.second->desc);
+ if (r < 0) {
+ return r;
+ }
+ it.second->registered = true;
}
+ return 0;
}
~ImageReplayerAdminSocketHook() override {
- Mutex::Locker locker(lock);
- for (Commands::const_iterator i = commands.begin(); i != commands.end();
- ++i) {
- (void)admin_socket->unregister_command(i->first);
- delete i->second;
+ admin_socket->unregister_commands(this);
+ for (auto &it : commands) {
+ delete it.second;
}
commands.clear();
}
- bool call(std::string command, cmdmap_t& cmdmap, std::string format,
- bufferlist& out) override {
- Mutex::Locker locker(lock);
- Commands::const_iterator i = commands.find(command);
- assert(i != commands.end());
- Formatter *f = Formatter::create(format);
- stringstream ss;
- bool r = i->second->call(f, &ss);
- delete f;
- out.append(ss);
- return r;
+ int call(std::string_view command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out) override {
+ auto i = commands.find(command);
+ ceph_assert(i != commands.end());
+ return i->second->call(f);
}
private:
- typedef std::map<std::string, ImageReplayerAdminSocketCommand*> Commands;
+ typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
+ std::less<>> Commands;
AdminSocket *admin_socket;
- Mutex lock;
Commands commands;
};
-uint32_t calculate_replay_delay(const utime_t &event_time,
- int mirroring_replay_delay) {
- if (mirroring_replay_delay <= 0) {
- return 0;
- }
-
- utime_t now = ceph_clock_now();
- if (event_time + mirroring_replay_delay <= now) {
- return 0;
- }
-
- // ensure it is rounded up when converting to integer
- return (event_time + mirroring_replay_delay - now) + 1;
-}
-
} // anonymous namespace
template <typename I>
}
template <typename I>
-void ImageReplayer<I>::RemoteJournalerListener::handle_update(
- ::journal::JournalMetadata *) {
- FunctionContext *ctx = new FunctionContext([this](int r) {
- replayer->handle_remote_journal_metadata_updated();
- });
- replayer->m_threads->work_queue->queue(ctx, 0);
-}
+struct ImageReplayer<I>::ReplayerListener
+ : public image_replayer::ReplayerListener {
+ ImageReplayer<I>* image_replayer;
+
+ ReplayerListener(ImageReplayer<I>* image_replayer)
+ : image_replayer(image_replayer) {
+ }
+
+ void handle_notification() override {
+ image_replayer->handle_replayer_notification();
+ }
+};
template <typename I>
-ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
- shared_ptr<ImageDeleter> image_deleter,
- InstanceWatcher<I> *instance_watcher,
- RadosRef local,
- const std::string &local_mirror_uuid,
- int64_t local_pool_id,
- const std::string &global_image_id) :
- m_threads(threads),
- m_image_deleter(image_deleter),
+ImageReplayer<I>::ImageReplayer(
+ librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+ const std::string &global_image_id, Threads<I> *threads,
+ InstanceWatcher<I> *instance_watcher,
+ MirrorStatusUpdater<I>* local_status_updater,
+ journal::CacheManagerHandler *cache_manager_handler,
+ PoolMetaCache* pool_meta_cache) :
+ m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
+ m_global_image_id(global_image_id), m_threads(threads),
m_instance_watcher(instance_watcher),
- m_local(local),
- m_local_mirror_uuid(local_mirror_uuid),
- m_local_pool_id(local_pool_id),
- m_global_image_id(global_image_id),
- m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
- global_image_id),
+ m_local_status_updater(local_status_updater),
+ m_cache_manager_handler(cache_manager_handler),
+ m_pool_meta_cache(pool_meta_cache),
+ m_local_image_name(global_image_id),
+ m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
+ stringify(local_io_ctx.get_id()) + " " + global_image_id)),
m_progress_cxt(this),
- m_journal_listener(new JournalListener(this)),
- m_remote_listener(this)
+ m_replayer_listener(new ReplayerListener(this))
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
// name. When the image name becomes known on start the asok commands will be
// re-registered using "remote_pool_name/remote_image_name" name.
- std::string pool_name;
- int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
- if (r < 0) {
- derr << "error resolving local pool " << m_local_pool_id
- << ": " << cpp_strerror(r) << dendl;
- pool_name = stringify(m_local_pool_id);
- }
-
- m_name = pool_name + "/" + m_global_image_id;
- dout(20) << "registered asok hook: " << m_name << dendl;
- m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
- this);
+ m_image_spec = image_replayer::util::compute_image_spec(
+ local_io_ctx, global_image_id);
+ register_admin_socket_hook();
}
template <typename I>
ImageReplayer<I>::~ImageReplayer()
{
- assert(m_event_preprocessor == nullptr);
- assert(m_replay_status_formatter == nullptr);
- assert(m_local_image_ctx == nullptr);
- assert(m_local_replay == nullptr);
- assert(m_remote_journaler == nullptr);
- assert(m_replay_handler == nullptr);
- assert(m_on_start_finish == nullptr);
- assert(m_on_stop_finish == nullptr);
- assert(m_bootstrap_request == nullptr);
- assert(m_in_flight_status_updates == 0);
-
- delete m_journal_listener;
- delete m_asok_hook;
+ unregister_admin_socket_hook();
+ ceph_assert(m_state_builder == nullptr);
+ ceph_assert(m_on_start_finish == nullptr);
+ ceph_assert(m_on_stop_contexts.empty());
+ ceph_assert(m_bootstrap_request == nullptr);
+ ceph_assert(m_update_status_task == nullptr);
+ delete m_replayer_listener;
}
template <typename I>
-void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
- const std::string &image_id,
- librados::IoCtx &io_ctx) {
- Mutex::Locker locker(m_lock);
+image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
+ std::lock_guard locker{m_lock};
- RemoteImage remote_image(mirror_uuid, image_id, io_ctx);
- auto it = m_remote_images.find(remote_image);
- if (it == m_remote_images.end()) {
- m_remote_images.insert(remote_image);
+ if (!m_mirror_image_status_state) {
+ return image_replayer::HEALTH_STATE_OK;
+ } else if (*m_mirror_image_status_state ==
+ cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
+ *m_mirror_image_status_state ==
+ cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
+ return image_replayer::HEALTH_STATE_WARNING;
}
+ return image_replayer::HEALTH_STATE_ERROR;
}
template <typename I>
-void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
- const std::string &image_id,
- bool schedule_delete) {
- Mutex::Locker locker(m_lock);
- m_remote_images.erase({mirror_uuid, image_id});
-}
+void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
+ dout(10) << "peer=" << peer << dendl;
-template <typename I>
-bool ImageReplayer<I>::remote_images_empty() const {
- Mutex::Locker locker(m_lock);
- return m_remote_images.empty();
+ std::lock_guard locker{m_lock};
+ auto it = m_peers.find(peer);
+ if (it == m_peers.end()) {
+ m_peers.insert(peer);
+ }
}
template <typename I>
void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
- dout(20) << r << " " << desc << dendl;
+ dout(10) << "r=" << r << ", desc=" << desc << dendl;
- Mutex::Locker l(m_lock);
+ std::lock_guard l{m_lock};
m_last_r = r;
m_state_desc = desc;
}
template <typename I>
-void ImageReplayer<I>::start(Context *on_finish, bool manual)
+void ImageReplayer<I>::start(Context *on_finish, bool manual, bool restart)
{
- dout(20) << "on_finish=" << on_finish << dendl;
+ dout(10) << "on_finish=" << on_finish << dendl;
int r = 0;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (!is_stopped_()) {
derr << "already running" << dendl;
r = -EINVAL;
dout(5) << "stopped manually, ignoring start without manual flag"
<< dendl;
r = -EPERM;
+ } else if (restart && !m_restart_requested) {
+ dout(10) << "canceled restart" << dendl;
+ r = -ECANCELED;
} else {
m_state = STATE_STARTING;
m_last_r = 0;
m_state_desc.clear();
m_manual_stop = false;
+ m_delete_requested = false;
+ m_restart_requested = false;
+ m_status_removed = false;
if (on_finish != nullptr) {
- assert(m_on_start_finish == nullptr);
+ ceph_assert(m_on_start_finish == nullptr);
m_on_start_finish = on_finish;
}
- assert(m_on_stop_finish == nullptr);
+ ceph_assert(m_on_stop_contexts.empty());
}
}
return;
}
- r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
- if (r < 0) {
- derr << "error opening ioctx for local pool " << m_local_pool_id
- << ": " << cpp_strerror(r) << dendl;
- on_start_fail(r, "error opening local pool");
- return;
- }
-
- prepare_local_image();
+ bootstrap();
}
template <typename I>
-void ImageReplayer<I>::prepare_local_image() {
- dout(20) << dendl;
-
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
- auto req = PrepareLocalImageRequest<I>::create(
- m_local_ioctx, m_global_image_id, &m_local_image_id,
- &m_local_image_tag_owner, m_threads->work_queue, ctx);
- req->send();
-}
+void ImageReplayer<I>::bootstrap() {
+ dout(10) << dendl;
-template <typename I>
-void ImageReplayer<I>::handle_prepare_local_image(int r) {
- dout(20) << "r=" << r << dendl;
+ std::unique_lock locker{m_lock};
+ if (m_peers.empty()) {
+ locker.unlock();
- if (r == -ENOENT) {
- dout(20) << "local image does not exist" << dendl;
- } else if (r < 0) {
- on_start_fail(r, "error preparing local image for replay");
- return;
- } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
- dout(5) << "local image is primary" << dendl;
- on_start_fail(0, "local image is primary");
+ dout(5) << "no peer clusters" << dendl;
+ on_start_fail(-ENOENT, "no peer clusters");
return;
}
- // local image doesn't exist or is non-primary
- bootstrap();
-}
-
-template <typename I>
-void ImageReplayer<I>::bootstrap() {
- dout(20) << dendl;
+ // TODO need to support multiple remote images
+ ceph_assert(!m_peers.empty());
+ m_remote_image_peer = *m_peers.begin();
- if (m_remote_images.empty()) {
- on_start_fail(0, "waiting for primary remote image");
+ if (on_start_interrupted(m_lock)) {
return;
}
- // TODO bootstrap will need to support multiple remote images
- m_remote_image = *m_remote_images.begin();
+ ceph_assert(m_state_builder == nullptr);
+ auto ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
+ auto request = image_replayer::BootstrapRequest<I>::create(
+ m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
+ m_global_image_id, m_local_mirror_uuid,
+ m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
+ m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested,
+ ctx);
- CephContext *cct = static_cast<CephContext *>(m_local->cct());
- journal::Settings settings;
- settings.commit_interval = cct->_conf->rbd_mirror_journal_commit_age;
- settings.max_fetch_bytes = cct->_conf->rbd_mirror_journal_max_fetch_bytes;
-
- m_remote_journaler = new Journaler(m_threads->work_queue,
- m_threads->timer,
- &m_threads->timer_lock,
- m_remote_image.io_ctx,
- m_remote_image.image_id,
- m_local_mirror_uuid, settings);
-
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
-
- BootstrapRequest<I> *request = BootstrapRequest<I>::create(
- m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
- &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
- m_global_image_id, m_threads->work_queue, m_threads->timer,
- &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
- m_remote_journaler, &m_client_meta, ctx, &m_do_resync, &m_progress_cxt);
-
- {
- Mutex::Locker locker(m_lock);
- request->get();
- m_bootstrap_request = request;
- }
+ request->get();
+ m_bootstrap_request = request;
+ locker.unlock();
update_mirror_image_status(false, boost::none);
- reschedule_update_status_task(10);
-
request->send();
}
template <typename I>
void ImageReplayer<I>::handle_bootstrap(int r) {
- dout(20) << "r=" << r << dendl;
+ dout(10) << "r=" << r << dendl;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_bootstrap_request->put();
m_bootstrap_request = nullptr;
- if (m_local_image_ctx) {
- m_local_image_id = m_local_image_ctx->id;
- }
}
- if (r == -EREMOTEIO) {
- m_local_image_tag_owner = "";
- dout(5) << "remote image is non-primary or local image is primary" << dendl;
- on_start_fail(0, "remote image is non-primary or local image is primary");
+ if (on_start_interrupted()) {
+ return;
+ } else if (r == -ENOMSG) {
+ dout(5) << "local image is primary" << dendl;
+ on_start_fail(0, "local image is primary");
+ return;
+ } else if (r == -EREMOTEIO) {
+ dout(5) << "remote image is non-primary" << dendl;
+ on_start_fail(-EREMOTEIO, "remote image is non-primary");
return;
} else if (r == -EEXIST) {
- m_local_image_tag_owner = "";
on_start_fail(r, "split-brain detected");
return;
+ } else if (r == -ENOLINK) {
+ m_delete_requested = true;
+ on_start_fail(0, "remote image no longer exists");
+ return;
+ } else if (r == -ERESTART) {
+ on_start_fail(r, "image in transient state, try again");
+ return;
} else if (r < 0) {
on_start_fail(r, "error bootstrapping replay");
return;
- } else if (on_start_interrupted()) {
- return;
- }
-
- assert(m_local_journal == nullptr);
- {
- RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
- if (m_local_image_ctx->journal != nullptr) {
- m_local_journal = m_local_image_ctx->journal;
- m_local_journal->add_listener(m_journal_listener);
- }
- }
-
- if (m_local_journal == nullptr) {
- on_start_fail(-EINVAL, "error accessing local journal");
+ } else if (m_resync_requested) {
+ on_start_fail(0, "resync requested");
return;
}
- {
- Mutex::Locker locker(m_lock);
-
- if (m_do_resync) {
- Context *on_finish = m_on_start_finish;
- m_stopping_for_resync = true;
- FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
- if (r < 0) {
- if (on_finish) {
- on_finish->complete(r);
- }
- return;
- }
- resync_image(on_finish);
- });
- m_on_start_finish = ctx;
- }
-
- std::string name = m_local_ioctx.get_pool_name() + "/" +
- m_local_image_ctx->name;
- if (m_name != name) {
- m_name = name;
- if (m_asok_hook) {
- // Re-register asok commands using the new name.
- delete m_asok_hook;
- m_asok_hook = nullptr;
- }
- }
- if (!m_asok_hook) {
- dout(20) << "registered asok hook: " << m_name << dendl;
- m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
- this);
- }
- }
-
- update_mirror_image_status(false, boost::none);
- init_remote_journaler();
+ start_replay();
}
template <typename I>
-void ImageReplayer<I>::init_remote_journaler() {
- dout(20) << dendl;
-
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
- m_remote_journaler->init(ctx);
+void ImageReplayer<I>::start_replay() {
+ dout(10) << dendl;
+
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_replayer == nullptr);
+ m_replayer = m_state_builder->create_replayer(m_threads, m_instance_watcher,
+ m_local_mirror_uuid,
+ m_pool_meta_cache,
+ m_replayer_listener);
+
+ auto ctx = create_context_callback<
+ ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
+ m_replayer->init(ctx);
}
template <typename I>
-void ImageReplayer<I>::handle_init_remote_journaler(int r) {
- dout(20) << "r=" << r << dendl;
-
- if (r < 0) {
- derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
- on_start_fail(r, "error initializing remote journal");
- return;
- } else if (on_start_interrupted()) {
- return;
- }
-
- m_remote_journaler->add_listener(&m_remote_listener);
+void ImageReplayer<I>::handle_start_replay(int r) {
+ dout(10) << "r=" << r << dendl;
- cls::journal::Client client;
- r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
- if (r < 0) {
- derr << "error retrieving remote journal client: " << cpp_strerror(r)
- << dendl;
- on_start_fail(r, "error retrieving remote journal client");
+ if (on_start_interrupted()) {
return;
- }
-
- if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
- dout(5) << "client flagged disconnected, stopping image replay" << dendl;
- if (m_local_image_ctx->mirroring_resync_after_disconnect) {
- Mutex::Locker locker(m_lock);
- m_stopping_for_resync = true;
+ } else if (r < 0) {
+ std::string error_description = m_replayer->get_error_description();
+ if (r == -ENOTCONN && m_replayer->is_resync_requested()) {
+ std::unique_lock locker{m_lock};
+ m_resync_requested = true;
}
- on_start_fail(-ENOTCONN, "disconnected");
- return;
- }
-
- start_replay();
-}
-template <typename I>
-void ImageReplayer<I>::start_replay() {
- dout(20) << dendl;
-
- Context *start_ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
- m_local_journal->start_external_replay(&m_local_replay, start_ctx);
-}
+ // shut down not required if init failed
+ m_replayer->destroy();
+ m_replayer = nullptr;
-template <typename I>
-void ImageReplayer<I>::handle_start_replay(int r) {
- dout(20) << "r=" << r << dendl;
-
- if (r < 0) {
- assert(m_local_replay == nullptr);
- derr << "error starting external replay on local image "
- << m_local_image_id << ": " << cpp_strerror(r) << dendl;
- on_start_fail(r, "error starting replay on local image");
+ derr << "error starting replay: " << cpp_strerror(r) << dendl;
+ on_start_fail(r, error_description);
return;
}
- Context *on_finish(nullptr);
+ Context *on_finish = nullptr;
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STARTING);
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_state == STATE_STARTING);
m_state = STATE_REPLAYING;
std::swap(m_on_start_finish, on_finish);
- }
-
- m_event_preprocessor = EventPreprocessor<I>::create(
- *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
- &m_client_meta, m_threads->work_queue);
- m_replay_status_formatter =
- ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
-
- update_mirror_image_status(true, boost::none);
- reschedule_update_status_task(30);
- dout(20) << "start succeeded" << dendl;
- if (on_finish != nullptr) {
- dout(20) << "on finish complete, r=" << r << dendl;
- on_finish->complete(r);
+ std::unique_lock timer_locker{m_threads->timer_lock};
+ schedule_update_mirror_image_replay_status();
}
+ update_mirror_image_status(true, boost::none);
if (on_replay_interrupted()) {
+ if (on_finish != nullptr) {
+ on_finish->complete(r);
+ }
return;
}
- {
- CephContext *cct = static_cast<CephContext *>(m_local->cct());
- double poll_seconds = cct->_conf->rbd_mirror_journal_poll_age;
-
- Mutex::Locker locker(m_lock);
- m_replay_handler = new ReplayHandler<I>(this);
- m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
-
- dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
+ dout(10) << "start succeeded" << dendl;
+ if (on_finish != nullptr) {
+ dout(10) << "on finish complete, r=" << r << dendl;
+ on_finish->complete(r);
}
-
}
template <typename I>
void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
{
- dout(20) << "r=" << r << dendl;
- Context *ctx = new FunctionContext([this, r, desc](int _r) {
+ dout(10) << "r=" << r << ", desc=" << desc << dendl;
+ Context *ctx = new LambdaContext([this, r, desc](int _r) {
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STARTING);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_STARTING);
m_state = STATE_STOPPING;
- if (r < 0 && r != -ECANCELED) {
+ if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
derr << "start failed: " << cpp_strerror(r) << dendl;
} else {
- dout(20) << "start canceled" << dendl;
+ dout(10) << "start canceled" << dendl;
}
}
set_state_description(r, desc);
update_mirror_image_status(false, boost::none);
- reschedule_update_status_task(-1);
shut_down(r);
});
m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
-bool ImageReplayer<I>::on_start_interrupted()
-{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STARTING);
- if (m_on_stop_finish == nullptr) {
+bool ImageReplayer<I>::on_start_interrupted() {
+ std::lock_guard locker{m_lock};
+ return on_start_interrupted(m_lock);
+}
+
+template <typename I>
+bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(m_state == STATE_STARTING);
+ if (!m_stop_requested) {
return false;
}
- on_start_fail(-ECANCELED);
+ on_start_fail(-ECANCELED, "");
return true;
}
template <typename I>
-void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
- const std::string& desc)
+void ImageReplayer<I>::stop(Context *on_finish, bool manual, bool restart)
{
- dout(20) << "on_finish=" << on_finish << ", manual=" << manual
- << ", desc=" << desc << dendl;
+ dout(10) << "on_finish=" << on_finish << ", manual=" << manual
+ << ", restart=" << restart << dendl;
image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
bool shut_down_replay = false;
- bool running = true;
- bool canceled_task = false;
+ bool is_stopped = false;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (!is_running_()) {
- running = false;
+ if (manual && !m_manual_stop) {
+ dout(10) << "marking manual" << dendl;
+ m_manual_stop = true;
+ }
+ if (!restart && m_restart_requested) {
+ dout(10) << "canceling restart" << dendl;
+ m_restart_requested = false;
+ }
+ if (is_stopped_()) {
+ dout(10) << "already stopped" << dendl;
+ is_stopped = true;
+ } else {
+ dout(10) << "joining in-flight stop" << dendl;
+ if (on_finish != nullptr) {
+ m_on_stop_contexts.push_back(on_finish);
+ }
+ }
} else {
- if (!is_stopped_()) {
- if (m_state == STATE_STARTING) {
- dout(20) << "canceling start" << dendl;
- if (m_bootstrap_request) {
- bootstrap_request = m_bootstrap_request;
- bootstrap_request->get();
- }
- } else {
- dout(20) << "interrupting replay" << dendl;
- shut_down_replay = true;
- }
-
- assert(m_on_stop_finish == nullptr);
- std::swap(m_on_stop_finish, on_finish);
- m_stop_requested = true;
- m_manual_stop = manual;
-
- Mutex::Locker timer_locker(m_threads->timer_lock);
- if (m_delayed_preprocess_task != nullptr) {
- canceled_task = m_threads->timer->cancel_event(
- m_delayed_preprocess_task);
- assert(canceled_task);
- m_delayed_preprocess_task = nullptr;
+ if (m_state == STATE_STARTING) {
+ dout(10) << "canceling start" << dendl;
+ if (m_bootstrap_request != nullptr) {
+ bootstrap_request = m_bootstrap_request;
+ bootstrap_request->get();
}
+ } else {
+ dout(10) << "interrupting replay" << dendl;
+ shut_down_replay = true;
}
- }
- }
- // avoid holding lock since bootstrap request will update status
- if (bootstrap_request != nullptr) {
- bootstrap_request->cancel();
- bootstrap_request->put();
- }
-
- if (canceled_task) {
- m_event_replay_tracker.finish_op();
- on_replay_interrupted();
+ ceph_assert(m_on_stop_contexts.empty());
+ if (on_finish != nullptr) {
+ m_on_stop_contexts.push_back(on_finish);
+ }
+ m_stop_requested = true;
+ m_manual_stop = manual;
+ }
}
- if (!running) {
- dout(20) << "not running" << dendl;
+ if (is_stopped) {
if (on_finish) {
on_finish->complete(-EINVAL);
}
return;
}
+ // avoid holding lock since bootstrap request will update status
+ if (bootstrap_request != nullptr) {
+ dout(10) << "canceling bootstrap" << dendl;
+ bootstrap_request->cancel();
+ bootstrap_request->put();
+ }
+
if (shut_down_replay) {
- on_stop_journal_replay(r, desc);
- } else if (on_finish != nullptr) {
- on_finish->complete(0);
+ on_stop_journal_replay();
}
}
template <typename I>
void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
{
- dout(20) << "enter" << dendl;
+ dout(10) << dendl;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_state != STATE_REPLAYING) {
// might be invoked multiple times while stopping
return;
}
+
m_stop_requested = true;
m_state = STATE_STOPPING;
}
+ cancel_update_mirror_image_replay_status();
set_state_description(r, desc);
- update_mirror_image_status(false, boost::none);
- reschedule_update_status_task(-1);
+ update_mirror_image_status(true, boost::none);
shut_down(0);
}
template <typename I>
-void ImageReplayer<I>::handle_replay_ready()
+void ImageReplayer<I>::restart(Context *on_finish)
{
- dout(20) << "enter" << dendl;
- if (on_replay_interrupted()) {
- return;
- }
-
- if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
- return;
- }
-
- m_event_replay_tracker.start_op();
-
- m_lock.Lock();
- bool stopping = (m_state == STATE_STOPPING);
- m_lock.Unlock();
-
- if (stopping) {
- dout(10) << "stopping event replay" << dendl;
- m_event_replay_tracker.finish_op();
- return;
- }
-
- if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
- preprocess_entry();
- return;
+ {
+ std::lock_guard locker{m_lock};
+ m_restart_requested = true;
}
- replay_flush();
-}
-
-template <typename I>
-void ImageReplayer<I>::restart(Context *on_finish)
-{
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, on_finish](int r) {
if (r < 0) {
// Try start anyway.
}
- start(on_finish, true);
+ start(on_finish, true, true);
});
- stop(ctx);
+ stop(ctx, false, true);
}
template <typename I>
-void ImageReplayer<I>::flush(Context *on_finish)
+void ImageReplayer<I>::flush()
{
- dout(20) << "enter" << dendl;
+ C_SaferCond ctx;
{
- Mutex::Locker locker(m_lock);
- if (m_state == STATE_REPLAYING) {
- Context *ctx = new FunctionContext(
- [on_finish](int r) {
- if (on_finish != nullptr) {
- on_finish->complete(r);
- }
- });
- on_flush_local_replay_flush_start(ctx);
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
return;
}
- }
-
- if (on_finish) {
- on_finish->complete(0);
- }
-}
-
-template <typename I>
-void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
-{
- dout(20) << "enter" << dendl;
- FunctionContext *ctx = new FunctionContext(
- [this, on_flush](int r) {
- on_flush_local_replay_flush_finish(on_flush, r);
- });
-
- assert(m_lock.is_locked());
- assert(m_state == STATE_REPLAYING);
- m_local_replay->flush(ctx);
-}
-template <typename I>
-void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
- int r)
-{
- dout(20) << "r=" << r << dendl;
- if (r < 0) {
- derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
- on_flush->complete(r);
- return;
+ dout(10) << dendl;
+ ceph_assert(m_replayer != nullptr);
+ m_replayer->flush(&ctx);
}
- on_flush_flush_commit_position_start(on_flush);
-}
-
-template <typename I>
-void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
-{
- FunctionContext *ctx = new FunctionContext(
- [this, on_flush](int r) {
- on_flush_flush_commit_position_finish(on_flush, r);
- });
-
- m_remote_journaler->flush_commit_position(ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
- int r)
-{
- if (r < 0) {
- derr << "error flushing remote journal commit position: "
- << cpp_strerror(r) << dendl;
+ int r = ctx.wait();
+ if (r >= 0) {
+ update_mirror_image_status(false, boost::none);
}
-
- update_mirror_image_status(false, boost::none);
-
- dout(20) << "flush complete, r=" << r << dendl;
- on_flush->complete(r);
}
template <typename I>
{
bool shut_down;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
shut_down = m_stop_requested;
}
}
template <typename I>
-void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
+void ImageReplayer<I>::print_status(Formatter *f)
{
- dout(20) << "enter" << dendl;
-
- Mutex::Locker l(m_lock);
-
- if (f) {
- f->open_object_section("image_replayer");
- f->dump_string("name", m_name);
- f->dump_string("state", to_string(m_state));
- f->close_section();
- f->flush(*ss);
- } else {
- *ss << m_name << ": state: " << to_string(m_state);
- }
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
-{
- dout(20) << "r=" << r << dendl;
- if (r < 0) {
- derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
- set_state_description(r, error_desc);
- }
+ dout(10) << dendl;
- {
- Mutex::Locker locker(m_lock);
- m_stop_requested = true;
- }
- on_replay_interrupted();
-}
-
-template <typename I>
-void ImageReplayer<I>::replay_flush() {
- dout(20) << dendl;
-
- bool interrupted = false;
- {
- Mutex::Locker locker(m_lock);
- if (m_state != STATE_REPLAYING) {
- dout(20) << "replay interrupted" << dendl;
- interrupted = true;
- } else {
- m_state = STATE_REPLAY_FLUSHING;
- }
- }
-
- if (interrupted) {
- m_event_replay_tracker.finish_op();
- return;
- }
-
- // shut down the replay to flush all IO and ops and create a new
- // replayer to handle the new tag epoch
- Context *ctx = create_context_callback<
- ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
- ctx = new FunctionContext([this, ctx](int r) {
- m_local_image_ctx->journal->stop_external_replay();
- m_local_replay = nullptr;
-
- if (r < 0) {
- ctx->complete(r);
- return;
- }
-
- m_local_journal->start_external_replay(&m_local_replay, ctx);
- });
- m_local_replay->shut_down(false, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_replay_flush(int r) {
- dout(20) << "r=" << r << dendl;
-
- {
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_REPLAY_FLUSHING);
- m_state = STATE_REPLAYING;
- }
-
- if (r < 0) {
- derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
- m_event_replay_tracker.finish_op();
- handle_replay_complete(r, "replay flush encountered an error");
- return;
- } else if (on_replay_interrupted()) {
- m_event_replay_tracker.finish_op();
- return;
- }
-
- get_remote_tag();
-}
-
-template <typename I>
-void ImageReplayer<I>::get_remote_tag() {
- dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
+ std::lock_guard l{m_lock};
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
- m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
+ f->open_object_section("image_replayer");
+ f->dump_string("name", m_image_spec);
+ f->dump_string("state", to_string(m_state));
+ f->close_section();
}
template <typename I>
-void ImageReplayer<I>::handle_get_remote_tag(int r) {
- dout(20) << "r=" << r << dendl;
-
- if (r == 0) {
- try {
- bufferlist::iterator it = m_replay_tag.data.begin();
- ::decode(m_replay_tag_data, it);
- } catch (const buffer::error &err) {
- r = -EBADMSG;
- }
- }
-
- if (r < 0) {
- derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
- << cpp_strerror(r) << dendl;
- m_event_replay_tracker.finish_op();
- handle_replay_complete(r, "failed to retrieve remote tag");
+void ImageReplayer<I>::schedule_update_mirror_image_replay_status() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
+ if (m_state != STATE_REPLAYING) {
return;
}
- m_replay_tag_valid = true;
- dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
- << m_replay_tag_data << dendl;
+ dout(10) << dendl;
- allocate_local_tag();
+ // periodically update the replaying status even if nothing changes
+ // so that we can adjust our performance stats
+ ceph_assert(m_update_status_task == nullptr);
+ m_update_status_task = create_context_callback<
+ ImageReplayer<I>,
+ &ImageReplayer<I>::handle_update_mirror_image_replay_status>(this);
+ m_threads->timer->add_event_after(10, m_update_status_task);
}
template <typename I>
-void ImageReplayer<I>::allocate_local_tag() {
- dout(20) << dendl;
-
- std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
- if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
- mirror_uuid == m_local_mirror_uuid) {
- mirror_uuid = m_remote_image.mirror_uuid;
- } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
- dout(5) << "encountered image demotion: stopping" << dendl;
- Mutex::Locker locker(m_lock);
- m_stop_requested = true;
- }
+void ImageReplayer<I>::handle_update_mirror_image_replay_status(int r) {
+ dout(10) << dendl;
- librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
- if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
- predecessor.mirror_uuid = m_remote_image.mirror_uuid;
- } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
- predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
- }
+ ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
- dout(20) << "mirror_uuid=" << mirror_uuid << ", "
- << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
- << "replay_tag_tid=" << m_replay_tag_tid << ", "
- << "replay_tag_data=" << m_replay_tag_data << dendl;
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
- m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
-}
+ ceph_assert(m_update_status_task != nullptr);
+ m_update_status_task = nullptr;
-template <typename I>
-void ImageReplayer<I>::handle_allocate_local_tag(int r) {
- dout(20) << "r=" << r << dendl;
-
- if (r < 0) {
- derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
- m_event_replay_tracker.finish_op();
- handle_replay_complete(r, "failed to allocate journal tag");
- return;
- }
-
- preprocess_entry();
-}
-
-template <typename I>
-void ImageReplayer<I>::preprocess_entry() {
- dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
- << dendl;
-
- bufferlist data = m_replay_entry.get_data();
- bufferlist::iterator it = data.begin();
- int r = m_local_replay->decode(&it, &m_event_entry);
- if (r < 0) {
- derr << "failed to decode journal event" << dendl;
- m_event_replay_tracker.finish_op();
- handle_replay_complete(r, "failed to decode journal event");
- return;
- }
-
- uint32_t delay = calculate_replay_delay(
- m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
- if (delay == 0) {
- handle_preprocess_entry_ready(0);
- return;
- }
+ auto ctx = new LambdaContext([this](int) {
+ update_mirror_image_status(false, boost::none);
- dout(20) << "delaying replay by " << delay << " sec" << dendl;
+ std::unique_lock locker{m_lock};
+ std::unique_lock timer_locker{m_threads->timer_lock};
- Mutex::Locker timer_locker(m_threads->timer_lock);
- assert(m_delayed_preprocess_task == nullptr);
- m_delayed_preprocess_task = new FunctionContext(
- [this](int r) {
- assert(m_threads->timer_lock.is_locked());
- m_delayed_preprocess_task = nullptr;
- m_threads->work_queue->queue(
- create_context_callback<ImageReplayer,
- &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
+ schedule_update_mirror_image_replay_status();
+ m_in_flight_op_tracker.finish_op();
});
- m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
- dout(20) << "r=" << r << dendl;
- assert(r == 0);
-
- if (!m_event_preprocessor->is_required(m_event_entry)) {
- process_entry();
- return;
- }
-
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
- m_event_preprocessor->preprocess(&m_event_entry, ctx);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
- dout(20) << "r=" << r << dendl;
-
- if (r < 0) {
- m_event_replay_tracker.finish_op();
-
- if (r == -ECANCELED) {
- handle_replay_complete(0, "lost exclusive lock");
- } else {
- derr << "failed to preprocess journal event" << dendl;
- handle_replay_complete(r, "failed to preprocess journal event");
- }
- return;
- }
-
- process_entry();
-}
-
-template <typename I>
-void ImageReplayer<I>::process_entry() {
- dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
- << dendl;
-
- // stop replaying events if stop has been requested
- if (on_replay_interrupted()) {
- m_event_replay_tracker.finish_op();
- return;
- }
-
- Context *on_ready = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
- Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
-
- m_local_replay->process(m_event_entry, on_ready, on_commit);
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_process_entry_ready(int r) {
- dout(20) << dendl;
- assert(r == 0);
- // attempt to process the next event
- handle_replay_ready();
+ m_in_flight_op_tracker.start_op();
+ m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
-void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
- int r) {
- dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
- << dendl;
-
- if (r < 0) {
- derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
- handle_replay_complete(r, "failed to commit journal event");
- } else {
- assert(m_remote_journaler != nullptr);
- m_remote_journaler->committed(replay_entry);
- }
- m_event_replay_tracker.finish_op();
-}
+void ImageReplayer<I>::cancel_update_mirror_image_replay_status() {
+ std::unique_lock timer_locker{m_threads->timer_lock};
+ if (m_update_status_task != nullptr) {
+ dout(10) << dendl;
-template <typename I>
-bool ImageReplayer<I>::update_mirror_image_status(bool force,
- const OptionalState &state) {
- dout(20) << dendl;
- {
- Mutex::Locker locker(m_lock);
- if (!start_mirror_image_status_update(force, false)) {
- return false;
+ if (m_threads->timer->cancel_event(m_update_status_task)) {
+ m_update_status_task = nullptr;
}
}
-
- queue_mirror_image_status_update(state);
- return true;
}
template <typename I>
-bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
- bool restarting) {
- assert(m_lock.is_locked());
-
- if (!force && !is_stopped_()) {
- if (!is_running_()) {
- dout(20) << "shut down in-progress: ignoring update" << dendl;
- return false;
- } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
- dout(20) << "already sending update" << dendl;
- m_update_status_requested = true;
- return false;
- }
- }
+void ImageReplayer<I>::update_mirror_image_status(
+ bool force, const OptionalState &opt_state) {
+ dout(15) << "force=" << force << ", "
+ << "state=" << opt_state << dendl;
- dout(20) << dendl;
- ++m_in_flight_status_updates;
- return true;
-}
-
-template <typename I>
-void ImageReplayer<I>::finish_mirror_image_status_update() {
- Context *on_finish = nullptr;
{
- Mutex::Locker locker(m_lock);
- assert(m_in_flight_status_updates > 0);
- if (--m_in_flight_status_updates > 0) {
- dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
- << "updates" << dendl;
+ std::lock_guard locker{m_lock};
+ if (!force && !is_stopped_() && !is_running_()) {
+ dout(15) << "shut down in-progress: ignoring update" << dendl;
return;
}
-
- std::swap(on_finish, m_on_update_status_finish);
}
- dout(20) << dendl;
- if (on_finish != nullptr) {
- on_finish->complete(0);
- }
-}
-
-template <typename I>
-void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
- dout(20) << dendl;
- FunctionContext *ctx = new FunctionContext(
- [this, state](int r) {
- send_mirror_status_update(state);
+ m_in_flight_op_tracker.start_op();
+ auto ctx = new LambdaContext(
+ [this, force, opt_state](int r) {
+ set_mirror_image_status_update(force, opt_state);
});
m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
-void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
+void ImageReplayer<I>::set_mirror_image_status_update(
+ bool force, const OptionalState &opt_state) {
+ dout(15) << "force=" << force << ", "
+ << "state=" << opt_state << dendl;
+
+ reregister_admin_socket_hook();
+
State state;
std::string state_desc;
int last_r;
- bool bootstrapping;
bool stopping_replay;
+
+ auto mirror_image_status_state = boost::make_optional(
+ false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
+ image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
state = m_state;
state_desc = m_state_desc;
+ mirror_image_status_state = m_mirror_image_status_state;
last_r = m_last_r;
- bootstrapping = (m_bootstrap_request != nullptr);
- stopping_replay = (m_local_image_ctx != nullptr);
+ stopping_replay = (m_replayer != nullptr);
+
+ if (m_bootstrap_request != nullptr) {
+ bootstrap_request = m_bootstrap_request;
+ bootstrap_request->get();
+ }
+ }
+
+ bool syncing = false;
+ if (bootstrap_request != nullptr) {
+ syncing = bootstrap_request->is_syncing();
+ bootstrap_request->put();
+ bootstrap_request = nullptr;
}
if (opt_state) {
state = *opt_state;
}
- cls::rbd::MirrorImageStatus status;
+ cls::rbd::MirrorImageSiteStatus status;
status.up = true;
switch (state) {
case STATE_STARTING:
- if (bootstrapping) {
+ if (syncing) {
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
status.description = state_desc.empty() ? "syncing" : state_desc;
+ mirror_image_status_state = status.state;
} else {
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
status.description = "starting replay";
}
break;
case STATE_REPLAYING:
- case STATE_REPLAY_FLUSHING:
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
{
- Context *on_req_finish = new FunctionContext(
- [this](int r) {
- dout(20) << "replay status ready: r=" << r << dendl;
+ std::string desc;
+ auto on_req_finish = new LambdaContext(
+ [this, force](int r) {
+ dout(15) << "replay status ready: r=" << r << dendl;
if (r >= 0) {
- send_mirror_status_update(boost::none);
+ set_mirror_image_status_update(force, boost::none);
} else if (r == -EAGAIN) {
- // decrement in-flight status update counter
- handle_mirror_status_update(r);
+ m_in_flight_op_tracker.finish_op();
}
});
- std::string desc;
- if (!m_replay_status_formatter->get_or_send_update(&desc,
- on_req_finish)) {
- dout(20) << "waiting for replay status" << dendl;
+ ceph_assert(m_replayer != nullptr);
+ if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
+ dout(15) << "waiting for replay status" << dendl;
return;
}
+
status.description = "replaying, " + desc;
+ mirror_image_status_state = boost::make_optional(
+ false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
}
break;
case STATE_STOPPING:
if (stopping_replay) {
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
- status.description = "stopping replay";
+ status.description = state_desc.empty() ? "stopping replay" : state_desc;
break;
}
// FALLTHROUGH
case STATE_STOPPED:
- if (last_r < 0) {
+ if (last_r == -EREMOTEIO) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
+ status.description = state_desc;
+ mirror_image_status_state = status.state;
+ } else if (last_r < 0 && last_r != -ECANCELED) {
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
status.description = state_desc;
+ mirror_image_status_state = status.state;
} else {
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
status.description = state_desc.empty() ? "stopped" : state_desc;
+ mirror_image_status_state = boost::none;
}
break;
default:
- assert(!"invalid state");
+ ceph_assert(!"invalid state");
}
- dout(20) << "status=" << status << dendl;
- librados::ObjectWriteOperation op;
- librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
-
- librados::AioCompletion *aio_comp = create_rados_callback<
- ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
- int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
- assert(r == 0);
- aio_comp->release();
-}
-
-template <typename I>
-void ImageReplayer<I>::handle_mirror_status_update(int r) {
- dout(20) << "r=" << r << dendl;
-
- bool running = false;
- bool started = false;
{
- Mutex::Locker locker(m_lock);
- bool update_status_requested = false;
- std::swap(update_status_requested, m_update_status_requested);
-
- running = is_running_();
- if (running && update_status_requested) {
- started = start_mirror_image_status_update(false, true);
- }
+ std::lock_guard locker{m_lock};
+ m_mirror_image_status_state = mirror_image_status_state;
}
- // if a deferred update is available, send it -- otherwise reschedule
- // the timer task
- if (started) {
- queue_mirror_image_status_update(boost::none);
- } else if (running) {
- reschedule_update_status_task();
+ // prevent the status from ping-ponging when failed replays are restarted
+ if (mirror_image_status_state &&
+ *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
+ status.state = *mirror_image_status_state;
}
- // mark committed status update as no longer in-flight
- finish_mirror_image_status_update();
-}
-
-template <typename I>
-void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
- dout(20) << dendl;
-
- bool canceled_task = false;
- {
- Mutex::Locker locker(m_lock);
- Mutex::Locker timer_locker(m_threads->timer_lock);
-
- if (m_update_status_task) {
- canceled_task = m_threads->timer->cancel_event(m_update_status_task);
- m_update_status_task = nullptr;
- }
-
- if (new_interval > 0) {
- m_update_status_interval = new_interval;
- }
-
- bool restarting = (new_interval == 0 || canceled_task);
- if (new_interval >= 0 && is_running_() &&
- start_mirror_image_status_update(false, restarting)) {
- m_update_status_task = new FunctionContext(
- [this](int r) {
- assert(m_threads->timer_lock.is_locked());
- m_update_status_task = nullptr;
-
- queue_mirror_image_status_update(boost::none);
- });
- m_threads->timer->add_event_after(m_update_status_interval,
- m_update_status_task);
- }
+ dout(15) << "status=" << status << dendl;
+ m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
+ force);
+ if (m_remote_image_peer.mirror_status_updater != nullptr) {
+ m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
+ m_global_image_id, status, force);
}
- if (canceled_task) {
- dout(20) << "canceled task" << dendl;
- finish_mirror_image_status_update();
- }
+ m_in_flight_op_tracker.finish_op();
}
template <typename I>
void ImageReplayer<I>::shut_down(int r) {
- dout(20) << "r=" << r << dendl;
+ dout(10) << "r=" << r << dendl;
+
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STOPPING);
-
- // if status updates are in-flight, wait for them to complete
- // before proceeding
- if (m_in_flight_status_updates > 0) {
- if (m_on_update_status_finish == nullptr) {
- dout(20) << "waiting for in-flight status update" << dendl;
- m_on_update_status_finish = new FunctionContext(
- [this, r](int _r) {
- shut_down(r);
- });
- }
- return;
- }
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_STOPPING);
}
- // NOTE: it's important to ensure that the local image is fully
- // closed before attempting to close the remote journal in
- // case the remote cluster is unreachable
+ if (!m_in_flight_op_tracker.empty()) {
+ dout(15) << "waiting for in-flight operations to complete" << dendl;
+ m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
+ shut_down(r);
+ }));
+ return;
+ }
// chain the shut down sequence (reverse order)
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this, r](int _r) {
update_mirror_image_status(true, STATE_STOPPED);
handle_shut_down(r);
});
- // close the remote journal
- if (m_remote_journaler != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
- delete m_remote_journaler;
- m_remote_journaler = nullptr;
- ctx->complete(0);
- });
- ctx = new FunctionContext([this, ctx](int r) {
- m_remote_journaler->remove_listener(&m_remote_listener);
- m_remote_journaler->shut_down(ctx);
- });
- if (m_stopping_for_resync) {
- ctx = new FunctionContext([this, ctx](int r) {
- m_remote_journaler->unregister_client(ctx);
- });
- }
- }
-
- // stop the replay of remote journal events
- if (m_replay_handler != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
- delete m_replay_handler;
- m_replay_handler = nullptr;
-
- m_event_replay_tracker.wait_for_ops(ctx);
- });
- ctx = new FunctionContext([this, ctx](int r) {
- m_remote_journaler->stop_replay(ctx);
- });
- }
-
- // close the local image (release exclusive lock)
- if (m_local_image_ctx) {
- ctx = new FunctionContext([this, ctx](int r) {
- CloseImageRequest<I> *request = CloseImageRequest<I>::create(
- &m_local_image_ctx, ctx);
- request->send();
+ // destruct the state builder
+ if (m_state_builder != nullptr) {
+ ctx = new LambdaContext([this, ctx](int r) {
+ m_state_builder->close(ctx);
});
}
- // shut down event replay into the local image
- if (m_local_journal != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
- m_local_journal = nullptr;
- ctx->complete(0);
- });
- if (m_local_replay != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
- m_local_journal->stop_external_replay();
- m_local_replay = nullptr;
-
- EventPreprocessor<I>::destroy(m_event_preprocessor);
- m_event_preprocessor = nullptr;
- ctx->complete(0);
- });
- }
- ctx = new FunctionContext([this, ctx](int r) {
- // blocks if listener notification is in-progress
- m_local_journal->remove_listener(m_journal_listener);
- ctx->complete(0);
- });
- }
-
- // wait for all local in-flight replay events to complete
- ctx = new FunctionContext([this, ctx](int r) {
- if (r < 0) {
- derr << "error shutting down journal replay: " << cpp_strerror(r)
- << dendl;
- }
-
- m_event_replay_tracker.wait_for_ops(ctx);
+ // close the replayer
+ if (m_replayer != nullptr) {
+ ctx = new LambdaContext([this, ctx](int r) {
+ m_replayer->destroy();
+ m_replayer = nullptr;
+ ctx->complete(0);
+ });
+ ctx = new LambdaContext([this, ctx](int r) {
+ m_replayer->shut_down(ctx);
});
-
- // flush any local in-flight replay events
- if (m_local_replay != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
- m_local_replay->shut_down(true, ctx);
- });
}
m_threads->work_queue->queue(ctx, 0);
template <typename I>
void ImageReplayer<I>::handle_shut_down(int r) {
- reschedule_update_status_task(-1);
-
+ bool resync_requested = false;
+ bool delete_requested = false;
+ bool unregister_asok_hook = false;
{
- Mutex::Locker locker(m_lock);
-
- // if status updates are in-flight, wait for them to complete
- // before proceeding
- if (m_in_flight_status_updates > 0) {
- if (m_on_update_status_finish == nullptr) {
- dout(20) << "waiting for in-flight status update" << dendl;
- m_on_update_status_finish = new FunctionContext(
- [this, r](int _r) {
- handle_shut_down(r);
- });
- }
- return;
+ std::lock_guard locker{m_lock};
+
+ if (m_delete_requested && m_state_builder != nullptr &&
+ !m_state_builder->local_image_id.empty()) {
+ ceph_assert(m_state_builder->remote_image_id.empty());
+ dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
+ unregister_asok_hook = true;
+ std::swap(delete_requested, m_delete_requested);
+ m_delete_in_progress = true;
}
- if (m_stopping_for_resync) {
- m_image_deleter->schedule_image_delete(m_local,
- m_local_pool_id,
- m_global_image_id);
- m_stopping_for_resync = false;
+ std::swap(resync_requested, m_resync_requested);
+ if (!delete_requested && !resync_requested && m_last_r == -ENOENT &&
+ ((m_state_builder == nullptr) ||
+ (m_state_builder->local_image_id.empty() &&
+ m_state_builder->remote_image_id.empty()))) {
+ dout(0) << "mirror image no longer exists" << dendl;
+ unregister_asok_hook = true;
+ m_finished = true;
}
}
- dout(20) << "stop complete" << dendl;
- m_local_ioctx.close();
+ if (unregister_asok_hook) {
+ unregister_admin_socket_hook();
+ }
+
+ if (delete_requested || resync_requested) {
+ dout(5) << "moving image to trash" << dendl;
+ auto ctx = new LambdaContext([this, r](int) {
+ handle_shut_down(r);
+ });
+ ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
+ resync_requested, m_threads->work_queue, ctx);
+ return;
+ }
+
+ if (!m_in_flight_op_tracker.empty()) {
+ dout(15) << "waiting for in-flight operations to complete" << dendl;
+ m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
+ handle_shut_down(r);
+ }));
+ return;
+ }
+
+ if (!m_status_removed) {
+ auto ctx = new LambdaContext([this, r](int) {
+ m_status_removed = true;
+ handle_shut_down(r);
+ });
+ remove_image_status(m_delete_in_progress, ctx);
+ return;
+ }
- ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
- m_replay_status_formatter = nullptr;
+ if (m_state_builder != nullptr) {
+ m_state_builder->destroy();
+ m_state_builder = nullptr;
+ }
+ dout(10) << "stop complete" << dendl;
Context *on_start = nullptr;
- Context *on_stop = nullptr;
+ Contexts on_stop_contexts;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
std::swap(on_start, m_on_start_finish);
- std::swap(on_stop, m_on_stop_finish);
+ on_stop_contexts = std::move(m_on_stop_contexts);
m_stop_requested = false;
- assert(m_delayed_preprocess_task == nullptr);
- assert(m_state == STATE_STOPPING);
+ ceph_assert(m_state == STATE_STOPPING);
m_state = STATE_STOPPED;
}
if (on_start != nullptr) {
- dout(20) << "on start finish complete, r=" << r << dendl;
+ dout(10) << "on start finish complete, r=" << r << dendl;
on_start->complete(r);
r = 0;
}
- if (on_stop != nullptr) {
- dout(20) << "on stop finish complete, r=" << r << dendl;
- on_stop->complete(r);
+ for (auto ctx : on_stop_contexts) {
+ dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl;
+ ctx->complete(r);
}
}
template <typename I>
-void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
- dout(20) << dendl;
+void ImageReplayer<I>::handle_replayer_notification() {
+ dout(10) << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
+ // might be attempting to shut down
+ return;
+ }
- cls::journal::Client client;
{
- Mutex::Locker locker(m_lock);
- if (!is_running_()) {
- return;
+ // detect a rename of the local image
+ ceph_assert(m_state_builder != nullptr &&
+ m_state_builder->local_image_ctx != nullptr);
+ std::shared_lock image_locker{m_state_builder->local_image_ctx->image_lock};
+ if (m_local_image_name != m_state_builder->local_image_ctx->name) {
+ // will re-register with new name after next status update
+ dout(10) << "image renamed" << dendl;
+ m_local_image_name = m_state_builder->local_image_ctx->name;
}
+ }
- int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
- if (r < 0) {
- derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
- return;
- }
+ // replayer cannot be shut down while notification is in-flight
+ ceph_assert(m_replayer != nullptr);
+ locker.unlock();
+
+ if (m_replayer->is_resync_requested()) {
+ dout(10) << "resync requested" << dendl;
+ m_resync_requested = true;
+ on_stop_journal_replay(0, "resync requested");
+ return;
}
- if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
- dout(0) << "client flagged disconnected, stopping image replay" << dendl;
- stop(nullptr, false, -ENOTCONN, "disconnected");
+ if (!m_replayer->is_replaying()) {
+ auto error_code = m_replayer->get_error_code();
+ auto error_description = m_replayer->get_error_description();
+ dout(10) << "replay interrupted: "
+ << "r=" << error_code << ", "
+ << "error=" << error_description << dendl;
+ on_stop_journal_replay(error_code, error_description);
+ return;
}
+
+ update_mirror_image_status(false, {});
}
template <typename I>
return "Starting";
case ImageReplayer<I>::STATE_REPLAYING:
return "Replaying";
- case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
- return "ReplayFlushing";
case ImageReplayer<I>::STATE_STOPPING:
return "Stopping";
case ImageReplayer<I>::STATE_STOPPED:
}
template <typename I>
-void ImageReplayer<I>::resync_image(Context *on_finish) {
- dout(20) << dendl;
+void ImageReplayer<I>::register_admin_socket_hook() {
+ ImageReplayerAdminSocketHook<I> *asok_hook;
+ {
+ std::lock_guard locker{m_lock};
+ if (m_asok_hook != nullptr) {
+ return;
+ }
+
+ dout(15) << "registered asok hook: " << m_image_spec << dendl;
+ asok_hook = new ImageReplayerAdminSocketHook<I>(
+ g_ceph_context, m_image_spec, this);
+ int r = asok_hook->register_commands();
+ if (r == 0) {
+ m_asok_hook = asok_hook;
+ return;
+ }
+ derr << "error registering admin socket commands" << dendl;
+ }
+ delete asok_hook;
+}
+
+template <typename I>
+void ImageReplayer<I>::unregister_admin_socket_hook() {
+ dout(15) << dendl;
+ AdminSocketHook *asok_hook = nullptr;
{
- Mutex::Locker l(m_lock);
- m_stopping_for_resync = true;
+ std::lock_guard locker{m_lock};
+ std::swap(asok_hook, m_asok_hook);
}
+ delete asok_hook;
+}
+
+template <typename I>
+void ImageReplayer<I>::reregister_admin_socket_hook() {
+ std::unique_lock locker{m_lock};
+ if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) {
+ m_local_image_name = m_bootstrap_request->get_local_image_name();
+ }
+
+ auto image_spec = image_replayer::util::compute_image_spec(
+ m_local_io_ctx, m_local_image_name);
+ if (m_asok_hook != nullptr && m_image_spec == image_spec) {
+ return;
+ }
+
+ dout(15) << "old_image_spec=" << m_image_spec << ", "
+ << "new_image_spec=" << image_spec << dendl;
+ m_image_spec = image_spec;
- stop(on_finish);
+ if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
+ // no need to re-register if stopping
+ return;
+ }
+ locker.unlock();
+
+ unregister_admin_socket_hook();
+ register_admin_socket_hook();
+}
+
+template <typename I>
+void ImageReplayer<I>::remove_image_status(bool force, Context *on_finish)
+{
+ auto ctx = new LambdaContext([this, force, on_finish](int) {
+ remove_image_status_remote(force, on_finish);
+ });
+
+ if (m_local_status_updater->exists(m_global_image_id)) {
+ dout(15) << "removing local mirror image status" << dendl;
+ if (force) {
+ m_local_status_updater->remove_mirror_image_status(
+ m_global_image_id, true, ctx);
+ } else {
+ m_local_status_updater->remove_refresh_mirror_image_status(
+ m_global_image_id, ctx);
+ }
+ return;
+ }
+
+ ctx->complete(0);
+}
+
+template <typename I>
+void ImageReplayer<I>::remove_image_status_remote(bool force, Context *on_finish)
+{
+ if (m_remote_image_peer.mirror_status_updater != nullptr &&
+ m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) {
+ dout(15) << "removing remote mirror image status" << dendl;
+ if (force) {
+ m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
+ m_global_image_id, true, on_finish);
+ } else {
+ m_remote_image_peer.mirror_status_updater->remove_refresh_mirror_image_status(
+ m_global_image_id, on_finish);
+ }
+ return;
+ }
+ if (on_finish) {
+ on_finish->complete(0);
+ }
}
template <typename I>