#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
+#include "ServiceDaemon.h"
#include "Threads.h"
#define dout_context g_ceph_context
namespace {
+const std::string SERVICE_DAEMON_LEADER_KEY("leader");
+const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
+const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
+
class PoolReplayerAdminSocketCommand {
public:
PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
} // anonymous namespace
PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
- std::shared_ptr<ImageDeleter> image_deleter,
+ ServiceDaemon<librbd::ImageCtx>* service_daemon,
+ ImageDeleter<>* image_deleter,
int64_t local_pool_id, const peer_t &peer,
const std::vector<const char*> &args) :
m_threads(threads),
+ m_service_daemon(service_daemon),
m_image_deleter(image_deleter),
- m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
+ m_local_pool_id(local_pool_id),
m_peer(peer),
m_args(args),
- m_local_pool_id(local_pool_id),
+ m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
m_local_pool_watcher_listener(this, true),
m_remote_pool_watcher_listener(this, false),
- m_asok_hook(nullptr),
m_pool_replayer_thread(this),
m_leader_listener(this)
{
PoolReplayer::~PoolReplayer()
{
delete m_asok_hook;
-
- m_stopping = true;
- {
- Mutex::Locker l(m_lock);
- m_cond.Signal();
- }
- if (m_pool_replayer_thread.is_started()) {
- m_pool_replayer_thread.join();
- }
- if (m_leader_watcher) {
- m_leader_watcher->shut_down();
- }
- if (m_instance_watcher) {
- m_instance_watcher->shut_down();
- }
- if (m_instance_replayer) {
- m_instance_replayer->shut_down();
- }
-
- assert(!m_local_pool_watcher);
- assert(!m_remote_pool_watcher);
+ shut_down();
}
bool PoolReplayer::is_blacklisted() const {
return m_leader_watcher && m_leader_watcher->is_leader();
}
-int PoolReplayer::init()
+bool PoolReplayer::is_running() const {
+ return m_pool_replayer_thread.is_started();
+}
+
+void PoolReplayer::init()
{
- dout(20) << "replaying for " << m_peer << dendl;
+ assert(!m_pool_replayer_thread.is_started());
+ // reset state
+ m_stopping = false;
+ m_blacklisted = false;
+
+ dout(20) << "replaying for " << m_peer << dendl;
int r = init_rados(g_ceph_context->_conf->cluster,
g_ceph_context->_conf->name.to_str(),
"local cluster", &m_local_rados);
if (r < 0) {
- return r;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to connect to local cluster");
+ return;
}
r = init_rados(m_peer.cluster_name, m_peer.client_name,
std::string("remote peer ") + stringify(m_peer),
&m_remote_rados);
if (r < 0) {
- return r;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to connect to remote cluster");
+ return;
}
r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
if (r < 0) {
derr << "error accessing local pool " << m_local_pool_id << ": "
<< cpp_strerror(r) << dendl;
- return r;
+ return;
}
std::string local_mirror_uuid;
if (r < 0) {
derr << "failed to retrieve local mirror uuid from pool "
<< m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
- return r;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to query local mirror uuid");
+ return;
}
r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
if (r < 0) {
derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
<< ": " << cpp_strerror(r) << dendl;
- return r;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
+ "unable to access remote pool");
+ return;
}
dout(20) << "connected to " << m_peer << dendl;
- m_instance_replayer.reset(
- InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados,
- local_mirror_uuid, m_local_pool_id));
+ m_instance_replayer.reset(InstanceReplayer<>::create(
+ m_threads, m_service_daemon, m_image_deleter, m_local_rados,
+ local_mirror_uuid, m_local_pool_id));
m_instance_replayer->init();
m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
- m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
- m_threads->work_queue,
- m_instance_replayer.get()));
+ m_instance_watcher.reset(InstanceWatcher<>::create(
+ m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
r = m_instance_watcher->init();
if (r < 0) {
derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
- return r;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to initialize instance messenger object");
+ return;
}
m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
&m_leader_listener));
-
r = m_leader_watcher->init();
if (r < 0) {
derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
- return r;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to initialize leader messenger object");
+ return;
+ }
+
+ if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
+ m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
+ m_callout_id = service_daemon::CALLOUT_ID_NONE;
}
m_pool_replayer_thread.create("pool replayer");
+}
- return 0;
+void PoolReplayer::shut_down() {
+ m_stopping = true;
+ {
+ Mutex::Locker l(m_lock);
+ m_cond.Signal();
+ }
+ if (m_pool_replayer_thread.is_started()) {
+ m_pool_replayer_thread.join();
+ }
+ if (m_leader_watcher) {
+ m_leader_watcher->shut_down();
+ m_leader_watcher.reset();
+ }
+ if (m_instance_watcher) {
+ m_instance_watcher->shut_down();
+ m_instance_watcher.reset();
+ }
+ if (m_instance_replayer) {
+ m_instance_replayer->shut_down();
+ m_instance_replayer.reset();
+ }
+
+ assert(!m_local_pool_watcher);
+ assert(!m_remote_pool_watcher);
+ m_local_rados.reset();
+ m_remote_rados.reset();
}
int PoolReplayer::init_rados(const std::string &cluster_name,
break;
}
- m_cond.WaitInterval(m_lock, utime_t(1, 0));
+ if (!m_stopping) {
+ m_cond.WaitInterval(m_lock, utime_t(1, 0));
+ }
}
}
f->dump_string("local_cluster_admin_socket",
reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
- admin_socket);
+ get_val<std::string>("admin_socket"));
f->dump_string("remote_cluster_admin_socket",
reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
- admin_socket);
+ get_val<std::string>("admin_socket"));
f->open_object_section("sync_throttler");
m_instance_watcher->print_sync_status(f, ss);
return;
}
- if (m_initial_mirror_image_ids.find(mirror_uuid) ==
- m_initial_mirror_image_ids.end() &&
- m_initial_mirror_image_ids.size() < 2) {
- m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;
-
- if (m_initial_mirror_image_ids.size() == 2) {
- dout(10) << "local and remote pools refreshed" << dendl;
-
- // both local and remote initial pool listing received. derive
- // removal notifications for the remote pool
- auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
- auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
- for (auto &local_image_id : local_image_ids) {
- if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
- removed_image_ids.emplace(local_image_id.global_id, "");
- }
- }
- local_image_ids.clear();
- remote_image_ids.clear();
- }
- }
-
- if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
- m_instance_replayer->remove_peer(m_peer.uuid);
- m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
- m_peer.uuid = mirror_uuid;
+ m_service_daemon->add_or_update_attribute(
+ m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
+ m_local_pool_watcher->get_image_count());
+ if (m_remote_pool_watcher) {
+ m_service_daemon->add_or_update_attribute(
+ m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
+ m_remote_pool_watcher->get_image_count());
}
m_update_op_tracker.start_op();
// for now always send to myself (the leader)
std::string &instance_id = m_instance_watcher->get_instance_id();
m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
- mirror_uuid, image_id.id,
gather_ctx->new_sub());
}
- for (auto &image_id : removed_image_ids) {
- // for now always send to myself (the leader)
- std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
- mirror_uuid, image_id.id, true,
- gather_ctx->new_sub());
+ if (!mirror_uuid.empty()) {
+ for (auto &image_id : removed_image_ids) {
+ // for now always send to myself (the leader)
+ std::string &instance_id = m_instance_watcher->get_instance_id();
+ m_instance_watcher->notify_peer_image_removed(instance_id,
+ image_id.global_id,
+ mirror_uuid,
+ gather_ctx->new_sub());
+ }
}
gather_ctx->activate();
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
dout(20) << dendl;
+ m_service_daemon->add_or_update_attribute(m_local_pool_id,
+ SERVICE_DAEMON_LEADER_KEY, true);
m_instance_watcher->handle_acquire_leader();
init_local_pool_watcher(on_finish);
}
void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
dout(20) << dendl;
+ m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
m_instance_watcher->handle_release_leader();
shut_down_pool_watchers(on_finish);
}
assert(!m_local_pool_watcher);
m_local_pool_watcher.reset(new PoolWatcher<>(
m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
- m_initial_mirror_image_ids.clear();
// ensure the initial set of local images is up-to-date
// after acquiring the leader role