#include "common/errno.h"
#include "librbd/ImageCtx.h"
#include "Mirror.h"
+#include "ServiceDaemon.h"
#include "Threads.h"
-#include "ImageSync.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
{
cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
m_threads, "rbd_mirror::threads");
+ m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
}
Mirror::~Mirror()
return r;
}
- m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock));
+ r = m_service_daemon->init();
+ if (r < 0) {
+ derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
+ return r;
+ }
- m_image_deleter.reset(new ImageDeleter(m_threads->work_queue,
- m_threads->timer,
- &m_threads->timer_lock));
+ m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
+ m_service_daemon.get()));
+ m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue,
+ m_threads->timer,
+ &m_threads->timer_lock,
+ m_service_daemon.get()));
return r;
}
for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
auto &peer = it->first.second;
auto pool_peer_it = pool_peers.find(it->first.first);
- if (it->second->is_blacklisted()) {
- derr << "removing blacklisted pool replayer for " << peer << dendl;
- // TODO: make async
- it = m_pool_replayers.erase(it);
- } else if (pool_peer_it == pool_peers.end() ||
- pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
+ if (pool_peer_it == pool_peers.end() ||
+ pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
dout(20) << "removing pool replayer for " << peer << dendl;
// TODO: make async
+ it->second->shut_down();
it = m_pool_replayers.erase(it);
} else {
++it;
for (auto &kv : pool_peers) {
for (auto &peer : kv.second) {
PoolPeer pool_peer(kv.first, peer);
- if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
+
+ auto pool_replayers_it = m_pool_replayers.find(pool_peer);
+ if (pool_replayers_it != m_pool_replayers.end()) {
+ auto& pool_replayer = pool_replayers_it->second;
+ if (pool_replayer->is_blacklisted()) {
+ derr << "restarting blacklisted pool replayer for " << peer << dendl;
+ // TODO: make async
+ pool_replayer->shut_down();
+ pool_replayer->init();
+ } else if (!pool_replayer->is_running()) {
+ derr << "restarting failed pool replayer for " << peer << dendl;
+ // TODO: make async
+ pool_replayer->shut_down();
+ pool_replayer->init();
+ }
+ } else {
dout(20) << "starting pool replayer for " << peer << dendl;
unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
- m_threads, m_image_deleter, kv.first, peer, m_args));
+ m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first,
+ peer, m_args));
- // TODO: make async, and retry connecting within pool replayer
- int r = pool_replayer->init();
- if (r < 0) {
- continue;
- }
+ // TODO: make async
+ pool_replayer->init();
m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
}
}