#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
+#include "librbd/api/Config.h"
#include "librbd/api/Mirror.h"
+#include "ImageMap.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
namespace rbd {
namespace mirror {
+using ::operator<<;
+
namespace {
+const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
{"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};
+template <typename I>
class PoolReplayerAdminSocketCommand {
public:
- PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
+ PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer)
: pool_replayer(pool_replayer) {
}
virtual ~PoolReplayerAdminSocketCommand() {}
virtual bool call(Formatter *f, stringstream *ss) = 0;
protected:
- PoolReplayer *pool_replayer;
+ PoolReplayer<I> *pool_replayer;
};
-class StatusCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class StatusCommand : public PoolReplayerAdminSocketCommand<I> {
public:
- explicit StatusCommand(PoolReplayer *pool_replayer)
- : PoolReplayerAdminSocketCommand(pool_replayer) {
+ explicit StatusCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
}
bool call(Formatter *f, stringstream *ss) override {
- pool_replayer->print_status(f, ss);
+ this->pool_replayer->print_status(f, ss);
return true;
}
};
-class StartCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class StartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
- explicit StartCommand(PoolReplayer *pool_replayer)
- : PoolReplayerAdminSocketCommand(pool_replayer) {
+ explicit StartCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
}
bool call(Formatter *f, stringstream *ss) override {
- pool_replayer->start();
+ this->pool_replayer->start();
return true;
}
};
-class StopCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class StopCommand : public PoolReplayerAdminSocketCommand<I> {
public:
- explicit StopCommand(PoolReplayer *pool_replayer)
- : PoolReplayerAdminSocketCommand(pool_replayer) {
+ explicit StopCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
}
bool call(Formatter *f, stringstream *ss) override {
- pool_replayer->stop(true);
+ this->pool_replayer->stop(true);
return true;
}
};
-class RestartCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class RestartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
- explicit RestartCommand(PoolReplayer *pool_replayer)
- : PoolReplayerAdminSocketCommand(pool_replayer) {
+ explicit RestartCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
}
bool call(Formatter *f, stringstream *ss) override {
- pool_replayer->restart();
+ this->pool_replayer->restart();
return true;
}
};
-class FlushCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class FlushCommand : public PoolReplayerAdminSocketCommand<I> {
public:
- explicit FlushCommand(PoolReplayer *pool_replayer)
- : PoolReplayerAdminSocketCommand(pool_replayer) {
+ explicit FlushCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
}
bool call(Formatter *f, stringstream *ss) override {
- pool_replayer->flush();
+ this->pool_replayer->flush();
return true;
}
};
-class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
+template <typename I>
+class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> {
public:
- explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
- : PoolReplayerAdminSocketCommand(pool_replayer) {
+ explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
}
bool call(Formatter *f, stringstream *ss) override {
- pool_replayer->release_leader();
+ this->pool_replayer->release_leader();
return true;
}
};
+template <typename I>
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
- PoolReplayer *pool_replayer)
+ PoolReplayer<I> *pool_replayer)
: admin_socket(cct->get_admin_socket()) {
std::string command;
int r;
r = admin_socket->register_command(command, command, this,
"get status for rbd mirror " + name);
if (r == 0) {
- commands[command] = new StatusCommand(pool_replayer);
+ commands[command] = new StatusCommand<I>(pool_replayer);
}
command = "rbd mirror start " + name;
r = admin_socket->register_command(command, command, this,
"start rbd mirror " + name);
if (r == 0) {
- commands[command] = new StartCommand(pool_replayer);
+ commands[command] = new StartCommand<I>(pool_replayer);
}
command = "rbd mirror stop " + name;
r = admin_socket->register_command(command, command, this,
"stop rbd mirror " + name);
if (r == 0) {
- commands[command] = new StopCommand(pool_replayer);
+ commands[command] = new StopCommand<I>(pool_replayer);
}
command = "rbd mirror restart " + name;
r = admin_socket->register_command(command, command, this,
"restart rbd mirror " + name);
if (r == 0) {
- commands[command] = new RestartCommand(pool_replayer);
+ commands[command] = new RestartCommand<I>(pool_replayer);
}
command = "rbd mirror flush " + name;
r = admin_socket->register_command(command, command, this,
"flush rbd mirror " + name);
if (r == 0) {
- commands[command] = new FlushCommand(pool_replayer);
+ commands[command] = new FlushCommand<I>(pool_replayer);
}
command = "rbd mirror leader release " + name;
r = admin_socket->register_command(command, command, this,
"release rbd mirror leader " + name);
if (r == 0) {
- commands[command] = new LeaderReleaseCommand(pool_replayer);
+ commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
}
}
~PoolReplayerAdminSocketHook() override {
- for (Commands::const_iterator i = commands.begin(); i != commands.end();
- ++i) {
+ for (auto i = commands.begin(); i != commands.end(); ++i) {
(void)admin_socket->unregister_command(i->first);
delete i->second;
}
}
- bool call(std::string command, cmdmap_t& cmdmap, std::string format,
- bufferlist& out) override {
- Commands::const_iterator i = commands.find(command);
- assert(i != commands.end());
+ bool call(std::string_view command, const cmdmap_t& cmdmap,
+ std::string_view format, bufferlist& out) override {
+ auto i = commands.find(command);
+ ceph_assert(i != commands.end());
Formatter *f = Formatter::create(format);
stringstream ss;
bool r = i->second->call(f, &ss);
}
private:
- typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;
+ typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*,
+ std::less<>> Commands;
AdminSocket *admin_socket;
Commands commands;
} // anonymous namespace
-PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
- ServiceDaemon<librbd::ImageCtx>* service_daemon,
- ImageDeleter<>* image_deleter,
- int64_t local_pool_id, const peer_t &peer,
- const std::vector<const char*> &args) :
+template <typename I>
+PoolReplayer<I>::PoolReplayer(Threads<I> *threads,
+ ServiceDaemon<I>* service_daemon,
+ int64_t local_pool_id, const PeerSpec &peer,
+ const std::vector<const char*> &args) :
m_threads(threads),
m_service_daemon(service_daemon),
- m_image_deleter(image_deleter),
m_local_pool_id(local_pool_id),
m_peer(peer),
m_args(args),
m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
m_local_pool_watcher_listener(this, true),
m_remote_pool_watcher_listener(this, false),
+ m_image_map_listener(this),
m_pool_replayer_thread(this),
m_leader_listener(this)
{
}
-PoolReplayer::~PoolReplayer()
+template <typename I>
+PoolReplayer<I>::~PoolReplayer()
{
delete m_asok_hook;
shut_down();
}
-bool PoolReplayer::is_blacklisted() const {
+template <typename I>
+bool PoolReplayer<I>::is_blacklisted() const {
Mutex::Locker locker(m_lock);
return m_blacklisted;
}
-bool PoolReplayer::is_leader() const {
+template <typename I>
+bool PoolReplayer<I>::is_leader() const {
Mutex::Locker locker(m_lock);
return m_leader_watcher && m_leader_watcher->is_leader();
}
-bool PoolReplayer::is_running() const {
+template <typename I>
+bool PoolReplayer<I>::is_running() const {
return m_pool_replayer_thread.is_started();
}
-void PoolReplayer::init()
+template <typename I>
+void PoolReplayer<I>::init()
{
- assert(!m_pool_replayer_thread.is_started());
+ ceph_assert(!m_pool_replayer_thread.is_started());
// reset state
m_stopping = false;
m_blacklisted = false;
- dout(20) << "replaying for " << m_peer << dendl;
+ dout(10) << "replaying for " << m_peer << dendl;
int r = init_rados(g_ceph_context->_conf->cluster,
g_ceph_context->_conf->name.to_str(),
- "local cluster", &m_local_rados, false);
+ "", "", "local cluster", &m_local_rados, false);
if (r < 0) {
m_callout_id = m_service_daemon->add_or_update_callout(
m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
}
r = init_rados(m_peer.cluster_name, m_peer.client_name,
+ m_peer.mon_host, m_peer.key,
std::string("remote peer ") + stringify(m_peer),
&m_remote_rados, true);
if (r < 0) {
return;
}
+ auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+ librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);
+
std::string local_mirror_uuid;
r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
&local_mirror_uuid);
return;
}
- dout(20) << "connected to " << m_peer << dendl;
+ dout(10) << "connected to " << m_peer << dendl;
- m_instance_replayer.reset(InstanceReplayer<>::create(
- m_threads, m_service_daemon, m_image_deleter, m_local_rados,
- local_mirror_uuid, m_local_pool_id));
+ m_instance_replayer.reset(InstanceReplayer<I>::create(
+ m_threads, m_service_daemon, m_local_rados, local_mirror_uuid,
+ m_local_pool_id));
m_instance_replayer->init();
m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
- m_instance_watcher.reset(InstanceWatcher<>::create(
+ m_instance_watcher.reset(InstanceWatcher<I>::create(
m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
r = m_instance_watcher->init();
if (r < 0) {
"unable to initialize instance messenger object");
return;
}
+ m_service_daemon->add_or_update_attribute(
+ m_local_pool_id, SERVICE_DAEMON_INSTANCE_ID_KEY,
+ m_instance_watcher->get_instance_id());
- m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
- &m_leader_listener));
+ m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
+ &m_leader_listener));
r = m_leader_watcher->init();
if (r < 0) {
derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
m_pool_replayer_thread.create("pool replayer");
}
-void PoolReplayer::shut_down() {
+template <typename I>
+void PoolReplayer<I>::shut_down() {
m_stopping = true;
{
Mutex::Locker l(m_lock);
m_instance_watcher.reset();
m_instance_replayer.reset();
- assert(!m_local_pool_watcher);
- assert(!m_remote_pool_watcher);
+ ceph_assert(!m_image_map);
+ ceph_assert(!m_image_deleter);
+ ceph_assert(!m_local_pool_watcher);
+ ceph_assert(!m_remote_pool_watcher);
m_local_rados.reset();
m_remote_rados.reset();
}
-int PoolReplayer::init_rados(const std::string &cluster_name,
- const std::string &client_name,
- const std::string &description,
- RadosRef *rados_ref,
- bool strip_cluster_overrides) {
+template <typename I>
+int PoolReplayer<I>::init_rados(const std::string &cluster_name,
+ const std::string &client_name,
+ const std::string &mon_host,
+ const std::string &key,
+ const std::string &description,
+ RadosRef *rados_ref,
+ bool strip_cluster_overrides) {
rados_ref->reset(new librados::Rados());
// NOTE: manually bootstrap a CephContext here instead of via
cct->_conf->cluster = cluster_name;
// librados::Rados::conf_read_file
- int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
- if (r < 0) {
+ int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
+ if (r < 0 && r != -ENOENT) {
derr << "could not read ceph conf for " << description << ": "
<< cpp_strerror(r) << dendl;
cct->put();
// remote peer connections shouldn't apply cluster-specific
// configuration settings
for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
- config_values[key] = cct->_conf->get_val<std::string>(key);
+ config_values[key] = cct->_conf.get_val<std::string>(key);
}
}
- cct->_conf->parse_env();
+ cct->_conf.parse_env(cct->get_module_type());
// librados::Rados::conf_parse_env
std::vector<const char*> args;
- env_to_vec(args, nullptr);
- r = cct->_conf->parse_argv(args);
+ r = cct->_conf.parse_argv(args);
if (r < 0) {
derr << "could not parse environment for " << description << ":"
<< cpp_strerror(r) << dendl;
cct->put();
return r;
}
+ cct->_conf.parse_env(cct->get_module_type());
if (!m_args.empty()) {
// librados::Rados::conf_parse_argv
args = m_args;
- r = cct->_conf->parse_argv(args);
+ r = cct->_conf.parse_argv(args);
if (r < 0) {
derr << "could not parse command line args for " << description << ": "
<< cpp_strerror(r) << dendl;
// remote peer connections shouldn't apply cluster-specific
// configuration settings
for (auto& pair : config_values) {
- auto value = cct->_conf->get_val<std::string>(pair.first);
+ auto value = cct->_conf.get_val<std::string>(pair.first);
if (pair.second != value) {
dout(0) << "reverting global config option override: "
<< pair.first << ": " << value << " -> " << pair.second
<< dendl;
- cct->_conf->set_val_or_die(pair.first, pair.second);
+ cct->_conf.set_val_or_die(pair.first, pair.second);
}
}
}
if (!g_ceph_context->_conf->admin_socket.empty()) {
- cct->_conf->set_val_or_die("admin_socket",
+ cct->_conf.set_val_or_die("admin_socket",
"$run_dir/$name.$pid.$cluster.$cctid.asok");
}
+ if (!mon_host.empty()) {
+ r = cct->_conf.set_val("mon_host", mon_host);
+ if (r < 0) {
+ derr << "failed to set mon_host config for " << description << ": "
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+ }
+
+ if (!key.empty()) {
+ r = cct->_conf.set_val("key", key);
+ if (r < 0) {
+ derr << "failed to set key config for " << description << ": "
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+ }
+
// disable unnecessary librbd cache
- cct->_conf->set_val_or_die("rbd_cache", "false");
- cct->_conf->apply_changes(nullptr);
- cct->_conf->complain_about_parse_errors(cct);
+ cct->_conf.set_val_or_die("rbd_cache", "false");
+ cct->_conf.apply_changes(nullptr);
+ cct->_conf.complain_about_parse_errors(cct);
r = (*rados_ref)->init_with_context(cct);
- assert(r == 0);
+ ceph_assert(r == 0);
cct->put();
r = (*rados_ref)->connect();
return 0;
}
-void PoolReplayer::run()
+template <typename I>
+void PoolReplayer<I>::run()
{
dout(20) << "enter" << dendl;
m_asok_hook_name = asok_hook_name;
delete m_asok_hook;
- m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
- m_asok_hook_name, this);
+ m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context,
+ m_asok_hook_name, this);
}
Mutex::Locker locker(m_lock);
m_instance_replayer->stop();
}
-void PoolReplayer::print_status(Formatter *f, stringstream *ss)
+template <typename I>
+void PoolReplayer<I>::print_status(Formatter *f, stringstream *ss)
{
dout(20) << "enter" << dendl;
f->dump_stream("peer") << m_peer;
f->dump_string("instance_id", m_instance_watcher->get_instance_id());
+ std::string state("running");
+ if (m_manual_stop) {
+ state = "stopped (manual)";
+ } else if (m_stopping) {
+ state = "stopped";
+ }
+ f->dump_string("state", state);
+
std::string leader_instance_id;
m_leader_watcher->get_leader_instance_id(&leader_instance_id);
f->dump_string("leader_instance_id", leader_instance_id);
}
f->dump_string("local_cluster_admin_socket",
- reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
+ reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf.
get_val<std::string>("admin_socket"));
f->dump_string("remote_cluster_admin_socket",
- reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
+ reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf.
get_val<std::string>("admin_socket"));
f->open_object_section("sync_throttler");
m_instance_replayer->print_status(f, ss);
+ if (m_image_deleter) {
+ f->open_object_section("image_deleter");
+ m_image_deleter->print_status(f, ss);
+ f->close_section();
+ }
+
f->close_section();
f->flush(*ss);
}
-void PoolReplayer::start()
+template <typename I>
+void PoolReplayer<I>::start()
{
dout(20) << "enter" << dendl;
return;
}
+ m_manual_stop = false;
m_instance_replayer->start();
}
-void PoolReplayer::stop(bool manual)
+template <typename I>
+void PoolReplayer<I>::stop(bool manual)
{
dout(20) << "enter: manual=" << manual << dendl;
return;
}
+ m_manual_stop = true;
m_instance_replayer->stop();
}
-void PoolReplayer::restart()
+template <typename I>
+void PoolReplayer<I>::restart()
{
dout(20) << "enter" << dendl;
m_instance_replayer->restart();
}
-void PoolReplayer::flush()
+template <typename I>
+void PoolReplayer<I>::flush()
{
dout(20) << "enter" << dendl;
m_instance_replayer->flush();
}
-void PoolReplayer::release_leader()
+template <typename I>
+void PoolReplayer<I>::release_leader()
{
dout(20) << "enter" << dendl;
m_leader_watcher->release_leader();
}
-void PoolReplayer::handle_update(const std::string &mirror_uuid,
- ImageIds &&added_image_ids,
- ImageIds &&removed_image_ids) {
+template <typename I>
+void PoolReplayer<I>::handle_update(const std::string &mirror_uuid,
+ ImageIds &&added_image_ids,
+ ImageIds &&removed_image_ids) {
if (m_stopping) {
return;
}
m_remote_pool_watcher->get_image_count());
}
- m_update_op_tracker.start_op();
- Context *ctx = new FunctionContext([this](int r) {
- dout(20) << "complete handle_update: r=" << r << dendl;
- m_update_op_tracker.finish_op();
- });
-
- C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
-
- for (auto &image_id : added_image_ids) {
- // for now always send to myself (the leader)
- std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
- gather_ctx->new_sub());
+ std::set<std::string> added_global_image_ids;
+ for (auto& image_id : added_image_ids) {
+ added_global_image_ids.insert(image_id.global_id);
}
- if (!mirror_uuid.empty()) {
- for (auto &image_id : removed_image_ids) {
- // for now always send to myself (the leader)
- std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_peer_image_removed(instance_id,
- image_id.global_id,
- mirror_uuid,
- gather_ctx->new_sub());
- }
+ std::set<std::string> removed_global_image_ids;
+ for (auto& image_id : removed_image_ids) {
+ removed_global_image_ids.insert(image_id.global_id);
}
- gather_ctx->activate();
+ m_image_map->update_images(mirror_uuid,
+ std::move(added_global_image_ids),
+ std::move(removed_global_image_ids));
}
-void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
- dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
+ dout(10) << dendl;
m_service_daemon->add_or_update_attribute(m_local_pool_id,
SERVICE_DAEMON_LEADER_KEY, true);
m_instance_watcher->handle_acquire_leader();
- init_local_pool_watcher(on_finish);
+ init_image_map(on_finish);
}
-void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
- dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
+ dout(10) << dendl;
- m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
+ m_service_daemon->remove_attribute(m_local_pool_id,
+ SERVICE_DAEMON_LEADER_KEY);
m_instance_watcher->handle_release_leader();
- shut_down_pool_watchers(on_finish);
+ shut_down_image_deleter(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::init_image_map(Context *on_finish) {
+ dout(5) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_image_map);
+ m_image_map.reset(ImageMap<I>::create(m_local_io_ctx, m_threads,
+ m_instance_watcher->get_instance_id(),
+ m_image_map_listener));
+
+ auto ctx = new FunctionContext([this, on_finish](int r) {
+ handle_init_image_map(r, on_finish);
+ });
+ m_image_map->init(create_async_context_callback(
+ m_threads->work_queue, ctx));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_image_map(int r, Context *on_finish) {
+ dout(5) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "failed to init image map: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_image_map(on_finish);
+ return;
+ }
+
+ init_local_pool_watcher(on_finish);
}
-void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
- dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::init_local_pool_watcher(Context *on_finish) {
+ dout(10) << dendl;
Mutex::Locker locker(m_lock);
- assert(!m_local_pool_watcher);
- m_local_pool_watcher.reset(new PoolWatcher<>(
+ ceph_assert(!m_local_pool_watcher);
+ m_local_pool_watcher.reset(PoolWatcher<I>::create(
m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
// ensure the initial set of local images is up-to-date
m_threads->work_queue, ctx));
}
-void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
- dout(20) << "r=" << r << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_init_local_pool_watcher(
+ int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
if (r < 0) {
derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
- on_finish->complete(r);
+ on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_pool_watchers(on_finish);
return;
}
init_remote_pool_watcher(on_finish);
}
-void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
- dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::init_remote_pool_watcher(Context *on_finish) {
+ dout(10) << dendl;
Mutex::Locker locker(m_lock);
- assert(!m_remote_pool_watcher);
- m_remote_pool_watcher.reset(new PoolWatcher<>(
+ ceph_assert(!m_remote_pool_watcher);
+ m_remote_pool_watcher.reset(PoolWatcher<I>::create(
m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
+
+ auto ctx = new FunctionContext([this, on_finish](int r) {
+ handle_init_remote_pool_watcher(r, on_finish);
+ });
m_remote_pool_watcher->init(create_async_context_callback(
+ m_threads->work_queue, ctx));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_remote_pool_watcher(
+ int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
+ if (r == -ENOENT) {
+ // Technically nothing to do since the other side doesn't
+ // have mirroring enabled. Eventually the remote pool watcher will
+ // detect images (if mirroring is enabled), so no point propagating
+ // an error which would just busy-spin the state machines.
+ dout(0) << "remote peer does not have mirroring configured" << dendl;
+ } else if (r < 0) {
+ derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_pool_watchers(on_finish);
+ return;
+ }
+
+ init_image_deleter(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::init_image_deleter(Context *on_finish) {
+ dout(10) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_image_deleter);
+
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ handle_init_image_deleter(r, on_finish);
+ });
+ m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
+ m_service_daemon));
+ m_image_deleter->init(create_async_context_callback(
m_threads->work_queue, on_finish));
+}
+template <typename I>
+void PoolReplayer<I>::handle_init_image_deleter(int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_image_deleter(on_finish);
+ return;
+ }
+
+ on_finish->complete(0);
+
+ Mutex::Locker locker(m_lock);
m_cond.Signal();
}
-void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
- dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::shut_down_image_deleter(Context* on_finish) {
+ dout(10) << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_image_deleter) {
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_image_deleter(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+ m_image_deleter->shut_down(ctx);
+ return;
+ }
+ }
+ shut_down_pool_watchers(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_image_deleter(
+ int r, Context* on_finish) {
+ dout(10) << "r=" << r << dendl;
{
Mutex::Locker locker(m_lock);
- if (m_local_pool_watcher) {
+ ceph_assert(m_image_deleter);
+ m_image_deleter.reset();
+ }
+
+ shut_down_pool_watchers(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down_pool_watchers(Context *on_finish) {
+ dout(10) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_local_pool_watcher) {
Context *ctx = new FunctionContext([this, on_finish](int r) {
handle_shut_down_pool_watchers(r, on_finish);
});
on_finish->complete(0);
}
-void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
- dout(20) << "r=" << r << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_pool_watchers(
+ int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
{
Mutex::Locker locker(m_lock);
- assert(m_local_pool_watcher);
+ ceph_assert(m_local_pool_watcher);
m_local_pool_watcher.reset();
if (m_remote_pool_watcher) {
wait_for_update_ops(on_finish);
}
-void PoolReplayer::wait_for_update_ops(Context *on_finish) {
- dout(20) << dendl;
+template <typename I>
+void PoolReplayer<I>::wait_for_update_ops(Context *on_finish) {
+ dout(10) << dendl;
Mutex::Locker locker(m_lock);
m_update_op_tracker.wait_for_ops(ctx);
}
-void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
- dout(20) << "r=" << r << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_wait_for_update_ops(int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
+ ceph_assert(r == 0);
- assert(r == 0);
+ shut_down_image_map(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down_image_map(Context *on_finish) {
+ dout(5) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_image_map) {
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_image_map(r, on_finish);
+ });
+ m_image_map->shut_down(create_async_context_callback(
+ m_threads->work_queue, on_finish));
+ return;
+ }
+ }
+
+ on_finish->complete(0);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) {
+ dout(5) << "r=" << r << dendl;
+ if (r < 0 && r != -EBLACKLISTED) {
+ derr << "failed to shut down image map: " << cpp_strerror(r) << dendl;
+ }
Mutex::Locker locker(m_lock);
+ ceph_assert(m_image_map);
+ m_image_map.reset();
+
m_instance_replayer->release_all(on_finish);
}
-void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
- dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
+template <typename I>
+void PoolReplayer<I>::handle_update_leader(
+ const std::string &leader_instance_id) {
+ dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
m_instance_watcher->handle_update_leader(leader_instance_id);
}
+template <typename I>
+void PoolReplayer<I>::handle_acquire_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_image_acquire(instance_id, global_image_id,
+ on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_release_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_image_release(instance_id, global_image_id,
+ on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
+ const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ ceph_assert(!mirror_uuid.empty());
+ dout(5) << "mirror_uuid=" << mirror_uuid << ", "
+ << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id,
+ mirror_uuid, on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_instances_added(const InstanceIds &instance_ids) {
+ dout(5) << "instance_ids=" << instance_ids << dendl;
+ Mutex::Locker locker(m_lock);
+ if (!m_leader_watcher->is_leader()) {
+ return;
+ }
+
+ ceph_assert(m_image_map);
+ m_image_map->update_instances_added(instance_ids);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_instances_removed(
+ const InstanceIds &instance_ids) {
+ dout(5) << "instance_ids=" << instance_ids << dendl;
+ Mutex::Locker locker(m_lock);
+ if (!m_leader_watcher->is_leader()) {
+ return;
+ }
+
+ ceph_assert(m_image_map);
+ m_image_map->update_instances_removed(instance_ids);
+}
+
} // namespace mirror
} // namespace rbd
+
+template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;