// vim: ts=8 sw=2 smarttab
#include "include/stringify.h"
+#include "common/Cond.h"
#include "common/Timer.h"
#include "common/debug.h"
#include "common/errno.h"
+#include "common/WorkQueue.h"
#include "librbd/Utils.h"
#include "ImageReplayer.h"
#include "InstanceReplayer.h"
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
+ librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
Threads<I> *threads, ServiceDaemon<I>* service_daemon,
- ImageDeleter<I>* image_deleter, RadosRef local_rados,
- const std::string &local_mirror_uuid, int64_t local_pool_id)
- : m_threads(threads), m_service_daemon(service_daemon),
- m_image_deleter(image_deleter), m_local_rados(local_rados),
- m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
- m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+ MirrorStatusUpdater<I>* local_status_updater,
+ journal::CacheManagerHandler *cache_manager_handler,
+ PoolMetaCache* pool_meta_cache)
+ : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
+ m_threads(threads), m_service_daemon(service_daemon),
+ m_local_status_updater(local_status_updater),
+ m_cache_manager_handler(cache_manager_handler),
+ m_pool_meta_cache(pool_meta_cache),
+ m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
+ stringify(local_io_ctx.get_id()))) {
}
template <typename I>
InstanceReplayer<I>::~InstanceReplayer() {
- assert(m_image_state_check_task == nullptr);
- assert(m_async_op_tracker.empty());
- assert(m_image_replayers.empty());
+ ceph_assert(m_image_state_check_task == nullptr);
+ ceph_assert(m_async_op_tracker.empty());
+ ceph_assert(m_image_replayers.empty());
+}
+
+template <typename I>
+bool InstanceReplayer<I>::is_blacklisted() const {
+ std::lock_guard locker{m_lock};
+ return m_blacklisted;
}
template <typename I>
template <typename I>
void InstanceReplayer<I>::init(Context *on_finish) {
- dout(20) << dendl;
+ dout(10) << dendl;
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this, on_finish] (int r) {
{
- Mutex::Locker timer_locker(m_threads->timer_lock);
+ std::lock_guard timer_locker{m_threads->timer_lock};
schedule_image_state_check_task();
}
on_finish->complete(0);
C_SaferCond shut_down_ctx;
shut_down(&shut_down_ctx);
int r = shut_down_ctx.wait();
- assert(r == 0);
+ ceph_assert(r == 0);
}
template <typename I>
void InstanceReplayer<I>::shut_down(Context *on_finish) {
- dout(20) << dendl;
+ dout(10) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
- assert(m_on_shut_down == nullptr);
+ ceph_assert(m_on_shut_down == nullptr);
m_on_shut_down = on_finish;
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this] (int r) {
cancel_image_state_check_task();
wait_for_ops();
}
template <typename I>
-void InstanceReplayer<I>::add_peer(std::string mirror_uuid,
- librados::IoCtx io_ctx) {
- dout(20) << mirror_uuid << dendl;
-
- Mutex::Locker locker(m_lock);
- auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second;
- assert(result);
-}
-
-template <typename I>
-void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) {
- dout(20) << mirror_uuid << dendl;
+void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
+ dout(10) << "peer=" << peer << dendl;
- Mutex::Locker locker(m_lock);
- auto result = m_peers.erase(Peer(mirror_uuid));
- assert(result > 0);
+ std::lock_guard locker{m_lock};
+ auto result = m_peers.insert(peer).second;
+ ceph_assert(result);
}
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
- dout(20) << dendl;
+ dout(10) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
it = m_image_replayers.erase(it)) {
auto image_replayer = it->second;
auto ctx = gather_ctx->new_sub();
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[image_replayer, ctx] (int r) {
image_replayer->destroy();
ctx->complete(0);
template <typename I>
void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
const std::string &global_image_id,
- const std::string &peer_mirror_uuid,
- const std::string &peer_image_id,
Context *on_finish) {
- dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
- << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+ dout(10) << "global_image_id=" << global_image_id << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
- assert(m_on_shut_down == nullptr);
+ ceph_assert(m_on_shut_down == nullptr);
auto it = m_image_replayers.find(global_image_id);
-
if (it == m_image_replayers.end()) {
auto image_replayer = ImageReplayer<I>::create(
- m_threads, m_image_deleter, instance_watcher, m_local_rados,
- m_local_mirror_uuid, m_local_pool_id, global_image_id);
+ m_local_io_ctx, m_local_mirror_uuid, global_image_id,
+ m_threads, instance_watcher, m_local_status_updater,
+ m_cache_manager_handler, m_pool_meta_cache);
- dout(20) << global_image_id << ": creating replayer " << image_replayer
+ dout(10) << global_image_id << ": creating replayer " << image_replayer
<< dendl;
it = m_image_replayers.insert(std::make_pair(global_image_id,
image_replayer)).first;
- }
-
- auto image_replayer = it->second;
- if (!peer_mirror_uuid.empty()) {
- auto iter = m_peers.find(Peer(peer_mirror_uuid));
- assert(iter != m_peers.end());
- auto io_ctx = iter->io_ctx;
- image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
+ // TODO only a single peer is currently supported
+ ceph_assert(m_peers.size() == 1);
+ auto peer = *m_peers.begin();
+ image_replayer->add_peer(peer);
+ start_image_replayer(image_replayer);
+ } else {
+ // A duplicate acquire notification implies (1) connection hiccup or
+ // (2) new leader election. For the second case, restart the replayer to
+ // detect if the image has been deleted while the leader was offline
+ auto& image_replayer = it->second;
+ image_replayer->set_finished(false);
+ image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
}
- start_image_replayer(image_replayer);
m_threads->work_queue->queue(on_finish, 0);
}
template <typename I>
void InstanceReplayer<I>::release_image(const std::string &global_image_id,
- const std::string &peer_mirror_uuid,
- const std::string &peer_image_id,
- bool schedule_delete,
Context *on_finish) {
- dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
- << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+ dout(10) << "global_image_id=" << global_image_id << dendl;
- Mutex::Locker locker(m_lock);
-
- assert(m_on_shut_down == nullptr);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_on_shut_down == nullptr);
auto it = m_image_replayers.find(global_image_id);
-
if (it == m_image_replayers.end()) {
- dout(20) << global_image_id << ": not found" << dendl;
+ dout(5) << global_image_id << ": not found" << dendl;
m_threads->work_queue->queue(on_finish, 0);
return;
}
auto image_replayer = it->second;
- if (!peer_mirror_uuid.empty()) {
- image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
- schedule_delete);
- }
-
- if (!image_replayer->remote_images_empty()) {
- dout(20) << global_image_id << ": still has peer images" << dendl;
- m_threads->work_queue->queue(on_finish, 0);
- return;
- }
-
m_image_replayers.erase(it);
- on_finish = new FunctionContext(
+ on_finish = new LambdaContext(
[image_replayer, on_finish] (int r) {
image_replayer->destroy();
on_finish->complete(0);
});
-
- if (schedule_delete) {
- on_finish = new FunctionContext(
- [this, image_replayer, on_finish] (int r) {
- auto global_image_id = image_replayer->get_global_image_id();
- m_image_deleter->schedule_image_delete(
- m_local_rados, m_local_pool_id, global_image_id, false);
- on_finish->complete(0);
- });
- }
-
stop_image_replayer(image_replayer, on_finish);
}
template <typename I>
-void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
- dout(20) << dendl;
+void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish) {
+ dout(10) << "global_image_id=" << global_image_id << ", "
+ << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
- if (!f) {
- return;
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_on_shut_down == nullptr);
+
+ auto it = m_image_replayers.find(global_image_id);
+ if (it != m_image_replayers.end()) {
+ // TODO only a single peer is currently supported, therefore
+ // we can just interrupt the current image replayer and
+ // it will eventually detect that the peer image is missing and
+ // determine if a delete propagation is required.
+ auto image_replayer = it->second;
+ image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
}
+ m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::print_status(Formatter *f) {
+ dout(10) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
f->open_array_section("image_replayers");
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
- image_replayer->print_status(f, ss);
+ image_replayer->print_status(f);
}
f->close_section();
}
template <typename I>
void InstanceReplayer<I>::start()
{
- dout(20) << dendl;
+ dout(10) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_manual_stop = false;
+ auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+ auto gather_ctx = new C_Gather(
+ cct, new C_TrackedOp(m_async_op_tracker, nullptr));
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
- image_replayer->start(nullptr, true);
+ image_replayer->start(gather_ctx->new_sub(), true);
}
+
+ gather_ctx->activate();
}
template <typename I>
void InstanceReplayer<I>::stop()
{
- dout(20) << dendl;
+ stop(nullptr);
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop(Context *on_finish)
+{
+ dout(10) << dendl;
+
+ if (on_finish == nullptr) {
+ on_finish = new C_TrackedOp(m_async_op_tracker, on_finish);
+ } else {
+ on_finish = new LambdaContext(
+ [this, on_finish] (int r) {
+ m_async_op_tracker.wait_for_ops(on_finish);
+ });
+ }
- Mutex::Locker locker(m_lock);
+ auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+ auto gather_ctx = new C_Gather(cct, on_finish);
+ {
+ std::lock_guard locker{m_lock};
- m_manual_stop = true;
+ m_manual_stop = true;
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
- image_replayer->stop(nullptr, true);
+ for (auto &kv : m_image_replayers) {
+ auto &image_replayer = kv.second;
+ image_replayer->stop(gather_ctx->new_sub(), true);
+ }
}
+
+ gather_ctx->activate();
}
template <typename I>
void InstanceReplayer<I>::restart()
{
- dout(20) << dendl;
+ dout(10) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_manual_stop = false;
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
- image_replayer->restart();
+ image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
}
}
template <typename I>
void InstanceReplayer<I>::flush()
{
- dout(20) << "enter" << dendl;
+ dout(10) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
template <typename I>
void InstanceReplayer<I>::start_image_replayer(
ImageReplayer<I> *image_replayer) {
- assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
std::string global_image_id = image_replayer->get_global_image_id();
- dout(20) << "global_image_id=" << global_image_id << dendl;
-
if (!image_replayer->is_stopped()) {
return;
} else if (image_replayer->is_blacklisted()) {
- derr << "blacklisted detected during image replay" << dendl;
+ derr << "global_image_id=" << global_image_id << ": blacklisted detected "
+ << "during image replay" << dendl;
+ m_blacklisted = true;
+ return;
+ } else if (image_replayer->is_finished()) {
+ // TODO temporary until policy integrated
+ dout(5) << "removing image replayer for global_image_id="
+ << global_image_id << dendl;
+ m_image_replayers.erase(image_replayer->get_global_image_id());
+ image_replayer->destroy();
+ return;
+ } else if (m_manual_stop) {
return;
}
- FunctionContext *ctx = new FunctionContext(
- [this, global_image_id] (int r) {
- dout(20) << "image deleter result: r=" << r << ", "
- << "global_image_id=" << global_image_id << dendl;
-
- Mutex::Locker locker(m_lock);
- m_async_op_tracker.finish_op();
-
- if (r == -ESTALE || r == -ECANCELED) {
- return;
- }
-
- auto it = m_image_replayers.find(global_image_id);
- if (it == m_image_replayers.end()) {
- return;
- }
-
- auto image_replayer = it->second;
- if (r >= 0) {
- image_replayer->start(nullptr, false);
- } else {
- start_image_replayer(image_replayer);
- }
- });
-
- m_async_op_tracker.start_op();
- m_image_deleter->wait_for_scheduled_deletion(
- m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
+ dout(10) << "global_image_id=" << global_image_id << dendl;
+ image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
}
template <typename I>
void InstanceReplayer<I>::queue_start_image_replayers() {
- dout(20) << dendl;
+ dout(10) << dendl;
Context *ctx = create_context_callback<
InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
template <typename I>
void InstanceReplayer<I>::start_image_replayers(int r) {
- dout(20) << dendl;
+ dout(10) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_on_shut_down != nullptr) {
return;
}
- size_t image_count = 0;
- size_t warning_count = 0;
- size_t error_count = 0;
- for (auto &it : m_image_replayers) {
+ uint64_t image_count = 0;
+ uint64_t warning_count = 0;
+ uint64_t error_count = 0;
+ for (auto it = m_image_replayers.begin();
+ it != m_image_replayers.end();) {
+ auto current_it(it);
+ ++it;
+
++image_count;
- auto health_state = it.second->get_health_state();
+ auto health_state = current_it->second->get_health_state();
if (health_state == image_replayer::HEALTH_STATE_WARNING) {
++warning_count;
} else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
++error_count;
}
- start_image_replayer(it.second);
+ start_image_replayer(current_it->second);
}
- m_service_daemon->add_or_update_attribute(
- m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
- m_service_daemon->add_or_update_attribute(
- m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
- m_service_daemon->add_or_update_attribute(
- m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
+ m_service_daemon->add_or_update_namespace_attribute(
+ m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+ SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
+ m_service_daemon->add_or_update_namespace_attribute(
+ m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+ SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
+ m_service_daemon->add_or_update_namespace_attribute(
+ m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+ SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
m_async_op_tracker.finish_op();
}
template <typename I>
void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
Context *on_finish) {
- dout(20) << image_replayer << " global_image_id="
+ dout(10) << image_replayer << " global_image_id="
<< image_replayer->get_global_image_id() << ", on_finish="
<< on_finish << dendl;
m_async_op_tracker.start_op();
Context *ctx = create_async_context_callback(
- m_threads->work_queue, new FunctionContext(
+ m_threads->work_queue, new LambdaContext(
[this, image_replayer, on_finish] (int r) {
stop_image_replayer(image_replayer, on_finish);
m_async_op_tracker.finish_op();
image_replayer->stop(ctx, false);
} else {
int after = 1;
- dout(20) << "scheduling image replayer " << image_replayer << " stop after "
+ dout(10) << "scheduling image replayer " << image_replayer << " stop after "
<< after << " sec (task " << ctx << ")" << dendl;
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[this, after, ctx] (int r) {
- Mutex::Locker timer_locker(m_threads->timer_lock);
+ std::lock_guard timer_locker{m_threads->timer_lock};
m_threads->timer->add_event_after(after, ctx);
});
m_threads->work_queue->queue(ctx, 0);
template <typename I>
void InstanceReplayer<I>::wait_for_ops() {
- dout(20) << dendl;
+ dout(10) << dendl;
Context *ctx = create_context_callback<
InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
template <typename I>
void InstanceReplayer<I>::handle_wait_for_ops(int r) {
- dout(20) << "r=" << r << dendl;
+ dout(10) << "r=" << r << dendl;
- assert(r == 0);
+ ceph_assert(r == 0);
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
stop_image_replayers();
}
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
- dout(20) << dendl;
+ dout(10) << dendl;
- assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
Context *ctx = create_async_context_callback(
m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
template <typename I>
void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
- dout(20) << "r=" << r << dendl;
+ dout(10) << "r=" << r << dendl;
- assert(r == 0);
+ ceph_assert(r == 0);
Context *on_finish = nullptr;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
for (auto &it : m_image_replayers) {
- assert(it.second->is_stopped());
+ ceph_assert(it.second->is_stopped());
it.second->destroy();
}
m_image_replayers.clear();
- assert(m_on_shut_down != nullptr);
+ ceph_assert(m_on_shut_down != nullptr);
std::swap(on_finish, m_on_shut_down);
}
on_finish->complete(r);
template <typename I>
void InstanceReplayer<I>::cancel_image_state_check_task() {
- Mutex::Locker timer_locker(m_threads->timer_lock);
+ std::lock_guard timer_locker{m_threads->timer_lock};
if (m_image_state_check_task == nullptr) {
return;
}
- dout(20) << m_image_state_check_task << dendl;
+ dout(10) << m_image_state_check_task << dendl;
bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
- assert(canceled);
+ ceph_assert(canceled);
m_image_state_check_task = nullptr;
}
template <typename I>
void InstanceReplayer<I>::schedule_image_state_check_task() {
- assert(m_threads->timer_lock.is_locked());
- assert(m_image_state_check_task == nullptr);
+ ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+ ceph_assert(m_image_state_check_task == nullptr);
- m_image_state_check_task = new FunctionContext(
+ m_image_state_check_task = new LambdaContext(
[this](int r) {
- assert(m_threads->timer_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
m_image_state_check_task = nullptr;
schedule_image_state_check_task();
queue_start_image_replayers();
});
- int after = g_ceph_context->_conf->rbd_mirror_image_state_check_interval;
+ auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+ int after = cct->_conf.get_val<uint64_t>(
+ "rbd_mirror_image_state_check_interval");
- dout(20) << "scheduling image state check after " << after << " sec (task "
+ dout(10) << "scheduling image state check after " << after << " sec (task "
<< m_image_state_check_task << ")" << dendl;
m_threads->timer->add_event_after(after, m_image_state_check_task);
}