#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/journal/Replay.h"
+#include "ImageDeleter.h"
#include "ImageReplayer.h"
#include "Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
#define dout_context g_ceph_context
}
};
+template <typename I>
class ImageReplayerAdminSocketCommand {
public:
+ ImageReplayerAdminSocketCommand(const std::string &desc,
+ ImageReplayer<I> *replayer)
+ : desc(desc), replayer(replayer) {
+ }
virtual ~ImageReplayerAdminSocketCommand() {}
virtual bool call(Formatter *f, stringstream *ss) = 0;
+
+ std::string desc;
+ ImageReplayer<I> *replayer;
+ bool registered = false;
};
template <typename I>
-class StatusCommand : public ImageReplayerAdminSocketCommand {
+class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit StatusCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+ explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
bool call(Formatter *f, stringstream *ss) override {
- replayer->print_status(f, ss);
+ this->replayer->print_status(f, ss);
return true;
}
-
-private:
- ImageReplayer<I> *replayer;
};
template <typename I>
-class StartCommand : public ImageReplayerAdminSocketCommand {
+class StartCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+ explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
bool call(Formatter *f, stringstream *ss) override {
- replayer->start(nullptr, true);
+ this->replayer->start(nullptr, true);
return true;
}
-
-private:
- ImageReplayer<I> *replayer;
};
template <typename I>
-class StopCommand : public ImageReplayerAdminSocketCommand {
+class StopCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+ explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
bool call(Formatter *f, stringstream *ss) override {
- replayer->stop(nullptr, true);
+ this->replayer->stop(nullptr, true);
return true;
}
-
-private:
- ImageReplayer<I> *replayer;
};
template <typename I>
-class RestartCommand : public ImageReplayerAdminSocketCommand {
+class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+ explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
bool call(Formatter *f, stringstream *ss) override {
- replayer->restart();
+ this->replayer->restart();
return true;
}
-
-private:
- ImageReplayer<I> *replayer;
};
template <typename I>
-class FlushCommand : public ImageReplayerAdminSocketCommand {
+class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
public:
- explicit FlushCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+ explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
+ : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
+ }
bool call(Formatter *f, stringstream *ss) override {
- C_SaferCond cond;
- replayer->flush(&cond);
- int r = cond.wait();
- if (r < 0) {
- *ss << "flush: " << cpp_strerror(r);
- return false;
- }
+ this->replayer->flush();
return true;
}
-
-private:
- ImageReplayer<I> *replayer;
};
template <typename I>
ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
ImageReplayer<I> *replayer)
: admin_socket(cct->get_admin_socket()),
- lock("ImageReplayerAdminSocketHook::lock " +
- replayer->get_global_image_id()) {
- std::string command;
- int r;
-
- command = "rbd mirror status " + name;
- r = admin_socket->register_command(command, command, this,
- "get status for rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StatusCommand<I>(replayer);
- }
-
- command = "rbd mirror start " + name;
- r = admin_socket->register_command(command, command, this,
- "start rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StartCommand<I>(replayer);
- }
-
- command = "rbd mirror stop " + name;
- r = admin_socket->register_command(command, command, this,
- "stop rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StopCommand<I>(replayer);
- }
-
- command = "rbd mirror restart " + name;
- r = admin_socket->register_command(command, command, this,
- "restart rbd mirror " + name);
- if (r == 0) {
- commands[command] = new RestartCommand<I>(replayer);
- }
-
- command = "rbd mirror flush " + name;
- r = admin_socket->register_command(command, command, this,
- "flush rbd mirror " + name);
- if (r == 0) {
- commands[command] = new FlushCommand<I>(replayer);
+ commands{{"rbd mirror flush " + name,
+ new FlushCommand<I>("flush rbd mirror " + name, replayer)},
+ {"rbd mirror restart " + name,
+ new RestartCommand<I>("restart rbd mirror " + name, replayer)},
+ {"rbd mirror start " + name,
+ new StartCommand<I>("start rbd mirror " + name, replayer)},
+ {"rbd mirror status " + name,
+ new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
+ {"rbd mirror stop " + name,
+ new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
+ }
+
+ int register_commands() {
+ for (auto &it : commands) {
+ int r = admin_socket->register_command(it.first, it.first, this,
+ it.second->desc);
+ if (r < 0) {
+ return r;
+ }
+ it.second->registered = true;
}
+ return 0;
}
~ImageReplayerAdminSocketHook() override {
- Mutex::Locker locker(lock);
- for (Commands::const_iterator i = commands.begin(); i != commands.end();
- ++i) {
- (void)admin_socket->unregister_command(i->first);
- delete i->second;
+ for (auto &it : commands) {
+ if (it.second->registered) {
+ admin_socket->unregister_command(it.first);
+ }
+ delete it.second;
}
commands.clear();
}
bool call(std::string command, cmdmap_t& cmdmap, std::string format,
bufferlist& out) override {
- Mutex::Locker locker(lock);
- Commands::const_iterator i = commands.find(command);
+ auto i = commands.find(command);
assert(i != commands.end());
Formatter *f = Formatter::create(format);
stringstream ss;
}
private:
- typedef std::map<std::string, ImageReplayerAdminSocketCommand*> Commands;
+ typedef std::map<std::string, ImageReplayerAdminSocketCommand<I> *> Commands;
AdminSocket *admin_socket;
- Mutex lock;
Commands commands;
};
}
template <typename I>
-ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
+ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
ImageDeleter<I>* image_deleter,
InstanceWatcher<I> *instance_watcher,
RadosRef local,
m_local(local),
m_local_mirror_uuid(local_mirror_uuid),
m_local_pool_id(local_pool_id),
- m_global_image_id(global_image_id),
+ m_global_image_id(global_image_id), m_local_image_name(global_image_id),
m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
global_image_id),
m_progress_cxt(this),
}
m_name = pool_name + "/" + m_global_image_id;
- dout(20) << "registered asok hook: " << m_name << dendl;
- m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
- this);
+ register_admin_socket_hook();
}
template <typename I>
ImageReplayer<I>::~ImageReplayer()
{
+ unregister_admin_socket_hook();
assert(m_event_preprocessor == nullptr);
assert(m_replay_status_formatter == nullptr);
assert(m_local_image_ctx == nullptr);
assert(m_in_flight_status_updates == 0);
delete m_journal_listener;
- delete m_asok_hook;
}
template <typename I>
}
template <typename I>
-void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
- const std::string &image_id,
- librados::IoCtx &io_ctx) {
+void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
+ librados::IoCtx &io_ctx) {
Mutex::Locker locker(m_lock);
-
- RemoteImage remote_image(mirror_uuid, image_id, io_ctx);
- auto it = m_remote_images.find(remote_image);
- if (it == m_remote_images.end()) {
- m_remote_images.insert(remote_image);
+ auto it = m_peers.find({peer_uuid});
+ if (it == m_peers.end()) {
+ m_peers.insert({peer_uuid, io_ctx});
}
}
-template <typename I>
-void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
- const std::string &image_id,
- bool schedule_delete) {
- Mutex::Locker locker(m_lock);
- m_remote_images.erase({mirror_uuid, image_id});
-}
-
-template <typename I>
-bool ImageReplayer<I>::remote_images_empty() const {
- Mutex::Locker locker(m_lock);
- return m_remote_images.empty();
-}
-
template <typename I>
void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
dout(20) << r << " " << desc << dendl;
m_last_r = 0;
m_state_desc.clear();
m_manual_stop = false;
+ m_delete_requested = false;
if (on_finish != nullptr) {
assert(m_on_start_finish == nullptr);
return;
}
+ wait_for_deletion();
+}
+
+template <typename I>
+void ImageReplayer<I>::wait_for_deletion() {
+ dout(20) << dendl;
+
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
+ m_image_deleter->wait_for_scheduled_deletion(
+ m_local_pool_id, m_global_image_id, ctx, false);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_wait_for_deletion(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ if (r == -ECANCELED) {
+ on_start_fail(0, "");
+ return;
+ } else if (r < 0) {
+ on_start_fail(r, "error waiting for image deletion");
+ return;
+ }
+
prepare_local_image();
}
void ImageReplayer<I>::prepare_local_image() {
dout(20) << dendl;
+ m_local_image_id = "";
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
auto req = PrepareLocalImageRequest<I>::create(
- m_local_ioctx, m_global_image_id, &m_local_image_id,
+ m_local_ioctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
&m_local_image_tag_owner, m_threads->work_queue, ctx);
req->send();
}
} else if (r < 0) {
on_start_fail(r, "error preparing local image for replay");
return;
- } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
- dout(5) << "local image is primary" << dendl;
- on_start_fail(0, "local image is primary");
- return;
+ } else {
+ reregister_admin_socket_hook();
}
// local image doesn't exist or is non-primary
- bootstrap();
+ prepare_remote_image();
}
template <typename I>
-void ImageReplayer<I>::bootstrap() {
+void ImageReplayer<I>::prepare_remote_image() {
dout(20) << dendl;
+ if (m_peers.empty()) {
+ // technically nothing to bootstrap, but it handles the status update
+ bootstrap();
+ return;
+ }
+
+ // TODO need to support multiple remote images
+ assert(!m_peers.empty());
+ m_remote_image = {*m_peers.begin()};
+
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
+ auto req = PrepareRemoteImageRequest<I>::create(
+ m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
+ m_local_image_id, &m_remote_image.mirror_uuid, &m_remote_image.image_id,
+ &m_remote_journaler, &m_client_state, &m_client_meta, ctx);
+ req->send();
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_prepare_remote_image(int r) {
+ dout(20) << "r=" << r << dendl;
- if (m_remote_images.empty()) {
- on_start_fail(-EREMOTEIO, "waiting for primary remote image");
+ assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
+ if (r < 0 && !m_local_image_id.empty() &&
+ m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+ // local image is primary -- fall-through
+ } else if (r == -ENOENT) {
+ dout(20) << "remote image does not exist" << dendl;
+
+ // TODO need to support multiple remote images
+ if (!m_local_image_id.empty() &&
+ m_local_image_tag_owner == m_remote_image.mirror_uuid) {
+ // local image exists and is non-primary and linked to the missing
+ // remote image
+
+ m_delete_requested = true;
+ on_start_fail(0, "remote image no longer exists");
+ } else {
+ on_start_fail(-ENOENT, "remote image does not exist");
+ }
+ return;
+ } else if (r < 0) {
+ on_start_fail(r, "error retrieving remote image id");
return;
}
- // TODO bootstrap will need to support multiple remote images
- m_remote_image = *m_remote_images.begin();
+ bootstrap();
+}
- CephContext *cct = static_cast<CephContext *>(m_local->cct());
- journal::Settings settings;
- settings.commit_interval = cct->_conf->rbd_mirror_journal_commit_age;
- settings.max_fetch_bytes = cct->_conf->rbd_mirror_journal_max_fetch_bytes;
+template <typename I>
+void ImageReplayer<I>::bootstrap() {
+ dout(20) << dendl;
- m_remote_journaler = new Journaler(m_threads->work_queue,
- m_threads->timer,
- &m_threads->timer_lock,
- m_remote_image.io_ctx,
- m_remote_image.image_id,
- m_local_mirror_uuid, settings);
+ if (!m_local_image_id.empty() &&
+ m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+ dout(5) << "local image is primary" << dendl;
+ on_start_fail(0, "local image is primary");
+ return;
+ } else if (m_peers.empty()) {
+ dout(5) << "no peer clusters" << dendl;
+ on_start_fail(-ENOENT, "no peer clusters");
+ return;
+ }
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
&m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
m_global_image_id, m_threads->work_queue, m_threads->timer,
&m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
- m_remote_journaler, &m_client_meta, ctx, &m_do_resync, &m_progress_cxt);
+ m_remote_journaler, &m_client_state, &m_client_meta, ctx,
+ &m_resync_requested, &m_progress_cxt);
{
Mutex::Locker locker(m_lock);
return;
} else if (on_start_interrupted()) {
return;
+ } else if (m_resync_requested) {
+ on_start_fail(0, "resync requested");
+ return;
}
assert(m_local_journal == nullptr);
return;
}
- {
- Mutex::Locker locker(m_lock);
-
- if (m_do_resync) {
- Context *on_finish = m_on_start_finish;
- m_stopping_for_resync = true;
- FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
- if (r < 0) {
- if (on_finish) {
- on_finish->complete(r);
- }
- return;
- }
- resync_image(on_finish);
- });
- m_on_start_finish = ctx;
- }
-
- std::string name = m_local_ioctx.get_pool_name() + "/" +
- m_local_image_ctx->name;
- if (m_name != name) {
- m_name = name;
- if (m_asok_hook) {
- // Re-register asok commands using the new name.
- delete m_asok_hook;
- m_asok_hook = nullptr;
- }
- }
- if (!m_asok_hook) {
- dout(20) << "registered asok hook: " << m_name << dendl;
- m_asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
- this);
- }
- }
-
update_mirror_image_status(false, boost::none);
init_remote_journaler();
}
return;
}
- if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+ dout(5) << "image_id=" << m_local_image_id << ", "
+ << "client_meta.image_id=" << m_client_meta.image_id << ", "
+ << "client.state=" << client.state << dendl;
+ if (m_client_meta.image_id == m_local_image_id &&
+ client.state != cls::journal::CLIENT_STATE_CONNECTED) {
dout(5) << "client flagged disconnected, stopping image replay" << dendl;
if (m_local_image_ctx->mirroring_resync_after_disconnect) {
- Mutex::Locker locker(m_lock);
- m_stopping_for_resync = true;
+ m_resync_requested = true;
+ on_start_fail(-ENOTCONN, "disconnected: automatic resync");
+ } else {
+ on_start_fail(-ENOTCONN, "disconnected");
}
- on_start_fail(-ENOTCONN, "disconnected");
return;
}
update_mirror_image_status(true, boost::none);
reschedule_update_status_task(30);
- dout(20) << "start succeeded" << dendl;
- if (on_finish != nullptr) {
- dout(20) << "on finish complete, r=" << r << dendl;
- on_finish->complete(r);
- }
-
if (on_replay_interrupted()) {
return;
}
{
CephContext *cct = static_cast<CephContext *>(m_local->cct());
- double poll_seconds = cct->_conf->rbd_mirror_journal_poll_age;
+ double poll_seconds = cct->_conf->get_val<double>(
+ "rbd_mirror_journal_poll_age");
Mutex::Locker locker(m_lock);
m_replay_handler = new ReplayHandler<I>(this);
dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
}
+ dout(20) << "start succeeded" << dendl;
+ if (on_finish != nullptr) {
+ dout(20) << "on finish complete, r=" << r << dendl;
+ on_finish->complete(r);
+ }
}
template <typename I>
Mutex::Locker locker(m_lock);
assert(m_state == STATE_STARTING);
m_state = STATE_STOPPING;
- if (r < 0 && r != -ECANCELED && r != -EREMOTEIO) {
+ if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
derr << "start failed: " << cpp_strerror(r) << dendl;
} else {
dout(20) << "start canceled" << dendl;
dout(20) << "on_finish=" << on_finish << ", manual=" << manual
<< ", desc=" << desc << dendl;
+ m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
+
image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
bool shut_down_replay = false;
bool running = true;
- bool canceled_task = false;
{
Mutex::Locker locker(m_lock);
std::swap(m_on_stop_finish, on_finish);
m_stop_requested = true;
m_manual_stop = manual;
-
- Mutex::Locker timer_locker(m_threads->timer_lock);
- if (m_delayed_preprocess_task != nullptr) {
- canceled_task = m_threads->timer->cancel_event(
- m_delayed_preprocess_task);
- assert(canceled_task);
- m_delayed_preprocess_task = nullptr;
- }
}
}
}
bootstrap_request->put();
}
- if (canceled_task) {
- m_event_replay_tracker.finish_op();
- on_replay_interrupted();
- }
-
if (!running) {
dout(20) << "not running" << dendl;
if (on_finish) {
template <typename I>
void ImageReplayer<I>::allocate_local_tag() {
- dout(20) << dendl;
+ dout(15) << dendl;
std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
- if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
- mirror_uuid == m_local_mirror_uuid) {
+ if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
mirror_uuid = m_remote_image.mirror_uuid;
+ } else if (mirror_uuid == m_local_mirror_uuid) {
+ mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
} else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
- dout(5) << "encountered image demotion: stopping" << dendl;
- Mutex::Locker locker(m_lock);
- m_stop_requested = true;
+ // handle possible edge condition where daemon can failover and
+ // the local image has already been promoted/demoted
+ auto local_tag_data = m_local_journal->get_tag_data();
+ if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
+ (local_tag_data.predecessor.commit_valid &&
+ local_tag_data.predecessor.mirror_uuid ==
+ librbd::Journal<>::LOCAL_MIRROR_UUID)) {
+ dout(15) << "skipping stale demotion event" << dendl;
+ handle_process_entry_safe(m_replay_entry, 0);
+ handle_replay_ready();
+ return;
+ } else {
+ dout(5) << "encountered image demotion: stopping" << dendl;
+ Mutex::Locker locker(m_lock);
+ m_stop_requested = true;
+ }
}
librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
}
- dout(20) << "mirror_uuid=" << mirror_uuid << ", "
- << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
- << "replay_tag_tid=" << m_replay_tag_tid << ", "
- << "replay_tag_data=" << m_replay_tag_data << dendl;
+ dout(15) << "mirror_uuid=" << mirror_uuid << ", "
+ << "predecessor=" << predecessor << ", "
+ << "replay_tag_tid=" << m_replay_tag_tid << dendl;
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
template <typename I>
void ImageReplayer<I>::handle_allocate_local_tag(int r) {
- dout(20) << "r=" << r << dendl;
+ dout(15) << "r=" << r << ", "
+ << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
if (r < 0) {
derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
dout(20) << dendl;
assert(r == 0);
+ bool update_status = false;
+ {
+ RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
+ if (m_local_image_name != m_local_image_ctx->name) {
+ m_local_image_name = m_local_image_ctx->name;
+ update_status = true;
+ }
+ }
+
+ if (update_status) {
+ reschedule_update_status_task(0);
+ }
+
// attempt to process the next event
handle_replay_ready();
}
template <typename I>
void ImageReplayer<I>::finish_mirror_image_status_update() {
+ reregister_admin_socket_hook();
+
Context *on_finish = nullptr;
{
Mutex::Locker locker(m_lock);
template <typename I>
void ImageReplayer<I>::shut_down(int r) {
dout(20) << "r=" << r << dendl;
+
+ bool canceled_delayed_preprocess_task = false;
+ {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ if (m_delayed_preprocess_task != nullptr) {
+ canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
+ m_delayed_preprocess_task);
+ assert(canceled_delayed_preprocess_task);
+ m_delayed_preprocess_task = nullptr;
+ }
+ }
+ if (canceled_delayed_preprocess_task) {
+ // wake up sleeping replay
+ m_event_replay_tracker.finish_op();
+ }
+
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_STOPPING);
m_remote_journaler->remove_listener(&m_remote_listener);
m_remote_journaler->shut_down(ctx);
});
- if (m_stopping_for_resync) {
- ctx = new FunctionContext([this, ctx](int r) {
- m_remote_journaler->unregister_client(ctx);
- });
- }
}
// stop the replay of remote journal events
void ImageReplayer<I>::handle_shut_down(int r) {
reschedule_update_status_task(-1);
+ bool unregister_asok_hook = false;
{
Mutex::Locker locker(m_lock);
return;
}
- if (m_stopping_for_resync) {
+ bool delete_requested = false;
+ if (m_delete_requested && !m_local_image_id.empty()) {
+ assert(m_remote_image.image_id.empty());
+ dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
+ delete_requested = true;
+ }
+ if (delete_requested || m_resync_requested) {
m_image_deleter->schedule_image_delete(m_local,
m_local_pool_id,
m_global_image_id,
- true);
- m_stopping_for_resync = false;
+ m_resync_requested);
+
+ m_local_image_id = "";
+ m_resync_requested = false;
+ if (m_delete_requested) {
+ unregister_asok_hook = true;
+ m_delete_requested = false;
+ }
+ } else if (m_last_r == -ENOENT &&
+ m_local_image_id.empty() && m_remote_image.image_id.empty()) {
+ dout(0) << "mirror image no longer exists" << dendl;
+ unregister_asok_hook = true;
+ m_finished = true;
}
}
+ if (unregister_asok_hook) {
+ unregister_admin_socket_hook();
+ }
+
dout(20) << "stop complete" << dendl;
m_local_ioctx.close();
void ImageReplayer<I>::resync_image(Context *on_finish) {
dout(20) << dendl;
+ m_resync_requested = true;
+ stop(on_finish);
+}
+
+template <typename I>
+void ImageReplayer<I>::register_admin_socket_hook() {
+ ImageReplayerAdminSocketHook<I> *asok_hook;
{
- Mutex::Locker l(m_lock);
- m_stopping_for_resync = true;
+ Mutex::Locker locker(m_lock);
+ if (m_asok_hook != nullptr) {
+ return;
+ }
+
+ dout(15) << "registered asok hook: " << m_name << dendl;
+ asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
+ this);
+ int r = asok_hook->register_commands();
+ if (r == 0) {
+ m_asok_hook = asok_hook;
+ return;
+ }
+ derr << "error registering admin socket commands" << dendl;
}
+ delete asok_hook;
+}
- stop(on_finish);
+template <typename I>
+void ImageReplayer<I>::unregister_admin_socket_hook() {
+ dout(15) << dendl;
+
+ AdminSocketHook *asok_hook = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ std::swap(asok_hook, m_asok_hook);
+ }
+ delete asok_hook;
+}
+
+template <typename I>
+void ImageReplayer<I>::reregister_admin_socket_hook() {
+ {
+ Mutex::Locker locker(m_lock);
+ auto name = m_local_ioctx.get_pool_name() + "/" + m_local_image_name;
+ if (m_name == name) {
+ return;
+ }
+ m_name = name;
+ }
+ unregister_admin_socket_hook();
+ register_admin_socket_hook();
}
template <typename I>