X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Ftools%2Frbd_mirror%2FInstanceReplayer.cc;h=4ef838fa43ebbfcd1671d07231b01b153c7d035a;hb=e306af509c4d4816a1f73b17a825ea5186fa0030;hp=d2426d0d0e433974bb8cea966d89dc875cde7f08;hpb=7c673caec407dd16107e56e4b51a6d00f021315c;p=ceph.git

diff --git a/ceph/src/tools/rbd_mirror/InstanceReplayer.cc b/ceph/src/tools/rbd_mirror/InstanceReplayer.cc
index d2426d0d0..4ef838fa4 100644
--- a/ceph/src/tools/rbd_mirror/InstanceReplayer.cc
+++ b/ceph/src/tools/rbd_mirror/InstanceReplayer.cc
@@ -2,12 +2,15 @@
 // vim: ts=8 sw=2 smarttab

 #include "include/stringify.h"
+#include "common/Cond.h"
 #include "common/Timer.h"
 #include "common/debug.h"
 #include "common/errno.h"
+#include "common/WorkQueue.h"
 #include "librbd/Utils.h"
 #include "ImageReplayer.h"
 #include "InstanceReplayer.h"
+#include "ServiceDaemon.h"
 #include "Threads.h"

 #define dout_context g_ceph_context
@@ -19,25 +22,44 @@
 namespace rbd {
 namespace mirror {

+namespace {
+
+const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
+const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
+const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
+
+} // anonymous namespace
+
 using librbd::util::create_async_context_callback;
 using librbd::util::create_context_callback;

 template <typename I>
 InstanceReplayer<I>::InstanceReplayer(
-    Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
-    ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
-    const std::string &local_mirror_uuid, int64_t local_pool_id)
-  : m_threads(threads), m_image_deleter(image_deleter),
-    m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
-    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
-    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
+    MirrorStatusUpdater<I>* local_status_updater,
+    journal::CacheManagerHandler *cache_manager_handler,
+    PoolMetaCache* pool_meta_cache)
+  : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
+    m_threads(threads), m_service_daemon(service_daemon),
+    m_local_status_updater(local_status_updater),
+    m_cache_manager_handler(cache_manager_handler),
+    m_pool_meta_cache(pool_meta_cache),
+    m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
+                            stringify(local_io_ctx.get_id()))) {
 }

 template <typename I>
 InstanceReplayer<I>::~InstanceReplayer() {
-  assert(m_image_state_check_task == nullptr);
-  assert(m_async_op_tracker.empty());
-  assert(m_image_replayers.empty());
+  ceph_assert(m_image_state_check_task == nullptr);
+  ceph_assert(m_async_op_tracker.empty());
+  ceph_assert(m_image_replayers.empty());
+}
+
+template <typename I>
+bool InstanceReplayer<I>::is_blacklisted() const {
+  std::lock_guard locker{m_lock};
+  return m_blacklisted;
 }

 template <typename I>
@@ -49,12 +71,12 @@ int InstanceReplayer<I>::init() {

 template <typename I>
 void InstanceReplayer<I>::init(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;

-  Context *ctx = new FunctionContext(
+  Context *ctx = new LambdaContext(
     [this, on_finish] (int r) {
       {
-        Mutex::Locker timer_locker(m_threads->timer_lock);
+        std::lock_guard timer_locker{m_threads->timer_lock};
         schedule_image_state_check_task();
       }
       on_finish->complete(0);
@@ -68,19 +90,19 @@ void InstanceReplayer<I>::shut_down() {
   C_SaferCond shut_down_ctx;
   shut_down(&shut_down_ctx);
   int r = shut_down_ctx.wait();
-  assert(r == 0);
+  ceph_assert(r == 0);
 }

 template <typename I>
 void InstanceReplayer<I>::shut_down(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;

-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};

-  assert(m_on_shut_down == nullptr);
+  ceph_assert(m_on_shut_down == nullptr);
   m_on_shut_down = on_finish;

-  Context *ctx = new FunctionContext(
+  Context *ctx = new LambdaContext(
     [this] (int r) {
       cancel_image_state_check_task();
       wait_for_ops();
@@ -90,36 +112,26 @@ void InstanceReplayer<I>::shut_down(Context *on_finish) {
 }

 template <typename I>
-void InstanceReplayer<I>::add_peer(std::string mirror_uuid,
-                                   librados::IoCtx io_ctx) {
-  dout(20) << mirror_uuid << dendl;
-
-  Mutex::Locker locker(m_lock);
-  auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second;
-  assert(result);
-}
-
-template <typename I>
-void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) {
-  dout(20) << mirror_uuid << dendl;
+void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
+  dout(10) << "peer=" << peer << dendl;

-  Mutex::Locker locker(m_lock);
-  auto result = m_peers.erase(Peer(mirror_uuid));
-  assert(result > 0);
+  std::lock_guard locker{m_lock};
+  auto result = m_peers.insert(peer).second;
+  ceph_assert(result);
 }

 template <typename I>
 void InstanceReplayer<I>::release_all(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;

-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};

   C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
   for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
        it = m_image_replayers.erase(it)) {
     auto image_replayer = it->second;
     auto ctx = gather_ctx->new_sub();
-    ctx = new FunctionContext(
+    ctx = new LambdaContext(
       [image_replayer, ctx] (int r) {
         image_replayer->destroy();
         ctx->complete(0);
@@ -130,112 +142,103 @@ void InstanceReplayer<I>::release_all(Context *on_finish) {
 }

 template <typename I>
-void InstanceReplayer<I>::acquire_image(const std::string &global_image_id,
-                                        const std::string &peer_mirror_uuid,
-                                        const std::string &peer_image_id,
+void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
+                                        const std::string &global_image_id,
                                         Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
-           << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+  dout(10) << "global_image_id=" << global_image_id << dendl;

-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};

-  assert(m_on_shut_down == nullptr);
+  ceph_assert(m_on_shut_down == nullptr);

   auto it = m_image_replayers.find(global_image_id);
-
   if (it == m_image_replayers.end()) {
     auto image_replayer = ImageReplayer<I>::create(
-      m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
-      m_local_mirror_uuid, m_local_pool_id, global_image_id);
+      m_local_io_ctx, m_local_mirror_uuid, global_image_id,
+      m_threads, instance_watcher, m_local_status_updater,
+      m_cache_manager_handler, m_pool_meta_cache);

-    dout(20) << global_image_id << ": creating replayer " << image_replayer
+    dout(10) << global_image_id << ": creating replayer " << image_replayer
              << dendl;

     it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                  image_replayer)).first;
-  }

-  auto image_replayer = it->second;
-  if (!peer_mirror_uuid.empty()) {
-    auto iter = m_peers.find(Peer(peer_mirror_uuid));
-    assert(iter != m_peers.end());
-    auto io_ctx = iter->io_ctx;
-
-    image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
+    // TODO only a single peer is currently supported
+    ceph_assert(m_peers.size() == 1);
+    auto peer = *m_peers.begin();
+    image_replayer->add_peer(peer);
+    start_image_replayer(image_replayer);
+  } else {
+    // A duplicate acquire notification implies (1) connection hiccup or
+    // (2) new leader election. For the second case, restart the replayer to
+    // detect if the image has been deleted while the leader was offline
+    auto& image_replayer = it->second;
+    image_replayer->set_finished(false);
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }

-  start_image_replayer(image_replayer);
   m_threads->work_queue->queue(on_finish, 0);
 }

 template <typename I>
 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
-                                        const std::string &peer_mirror_uuid,
-                                        const std::string &peer_image_id,
-                                        bool schedule_delete,
                                         Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
-           << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+  dout(10) << "global_image_id=" << global_image_id << dendl;

-  Mutex::Locker locker(m_lock);
-
-  assert(m_on_shut_down == nullptr);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_shut_down == nullptr);

   auto it = m_image_replayers.find(global_image_id);
-
   if (it == m_image_replayers.end()) {
-    dout(20) << global_image_id << ": not found" << dendl;
+    dout(5) << global_image_id << ": not found" << dendl;
     m_threads->work_queue->queue(on_finish, 0);
     return;
   }

   auto image_replayer = it->second;
-  if (!peer_mirror_uuid.empty()) {
-    image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
-                                        schedule_delete);
-  }
-
-  if (!image_replayer->remote_images_empty()) {
-    dout(20) << global_image_id << ": still has peer images" << dendl;
-    m_threads->work_queue->queue(on_finish, 0);
-    return;
-  }
-
   m_image_replayers.erase(it);

-  on_finish = new FunctionContext(
+  on_finish = new LambdaContext(
     [image_replayer, on_finish] (int r) {
       image_replayer->destroy();
       on_finish->complete(0);
     });
-
-  if (schedule_delete) {
-    on_finish = new FunctionContext(
-      [this, image_replayer, on_finish] (int r) {
-        auto global_image_id = image_replayer->get_global_image_id();
-        m_image_deleter->schedule_image_delete(
-          m_local_rados, m_local_pool_id, global_image_id);
-        on_finish->complete(0);
-      });
-  }
-
   stop_image_replayer(image_replayer, on_finish);
 }

 template <typename I>
-void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
-  dout(20) << dendl;
+void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
+                                            const std::string &peer_mirror_uuid,
+                                            Context *on_finish) {
+  dout(10) << "global_image_id=" << global_image_id << ", "
+           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;

-  if (!f) {
-    return;
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_shut_down == nullptr);
+
+  auto it = m_image_replayers.find(global_image_id);
+  if (it != m_image_replayers.end()) {
+    // TODO only a single peer is currently supported, therefore
+    // we can just interrupt the current image replayer and
+    // it will eventually detect that the peer image is missing and
+    // determine if a delete propagation is required.
+    auto image_replayer = it->second;
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::print_status(Formatter *f) {
+  dout(10) << dendl;

-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};

   f->open_array_section("image_replayers");

   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->print_status(f, ss);
+    image_replayer->print_status(f);
   }

   f->close_section();
 }
@@ -243,54 +246,80 @@ void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {

 template <typename I>
 void InstanceReplayer<I>::start()
 {
-  dout(20) << dendl;
+  dout(10) << dendl;

-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};

   m_manual_stop = false;

+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  auto gather_ctx = new C_Gather(
+    cct, new C_TrackedOp(m_async_op_tracker, nullptr));
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->start(nullptr, true);
+    image_replayer->start(gather_ctx->new_sub(), true);
   }
+
+  gather_ctx->activate();
 }

 template <typename I>
 void InstanceReplayer<I>::stop()
 {
-  dout(20) << dendl;
+  stop(nullptr);
+}

-  Mutex::Locker locker(m_lock);
+template <typename I>
+void InstanceReplayer<I>::stop(Context *on_finish)
+{
+  dout(10) << dendl;

-  m_manual_stop = true;
+  if (on_finish == nullptr) {
+    on_finish = new C_TrackedOp(m_async_op_tracker, on_finish);
+  } else {
+    on_finish = new LambdaContext(
+      [this, on_finish] (int r) {
+        m_async_op_tracker.wait_for_ops(on_finish);
+      });
+  }

-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
-    image_replayer->stop(nullptr, true);
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  auto gather_ctx = new C_Gather(cct, on_finish);
+  {
+    std::lock_guard locker{m_lock};
+
+    m_manual_stop = true;
+
+    for (auto &kv : m_image_replayers) {
+      auto &image_replayer = kv.second;
+      image_replayer->stop(gather_ctx->new_sub(), true);
     }
+  }
+
+  gather_ctx->activate();
 }

 template <typename I>
 void InstanceReplayer<I>::restart()
 {
-  dout(20) << dendl;
+  dout(10) << dendl;

-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};

   m_manual_stop = false;

   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->restart();
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
 }

 template <typename I>
 void InstanceReplayer<I>::flush()
 {
-  dout(20) << "enter" << dendl;
+  dout(10) << dendl;

-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};

   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
@@ -301,73 +330,86 @@ void InstanceReplayer<I>::flush()
 template <typename I>
 void InstanceReplayer<I>::start_image_replayer(
     ImageReplayer<I> *image_replayer) {
-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));

   std::string global_image_id = image_replayer->get_global_image_id();
-  dout(20) << "global_image_id=" << global_image_id << dendl;
-
   if (!image_replayer->is_stopped()) {
     return;
   } else if (image_replayer->is_blacklisted()) {
-    derr << "blacklisted detected during image replay" << dendl;
+    derr << "global_image_id=" << global_image_id << ": blacklisted detected "
+         << "during image replay" << dendl;
+    m_blacklisted = true;
+    return;
+  } else if (image_replayer->is_finished()) {
+    // TODO temporary until policy integrated
+    dout(5) << "removing image replayer for global_image_id="
+            << global_image_id << dendl;
+    m_image_replayers.erase(image_replayer->get_global_image_id());
+    image_replayer->destroy();
+    return;
+  } else if (m_manual_stop) {
     return;
   }

-  FunctionContext *ctx = new FunctionContext(
-    [this, global_image_id] (int r) {
-      dout(20) << "image deleter result: r=" << r << ", "
-               << "global_image_id=" << global_image_id << dendl;
-
-      Mutex::Locker locker(m_lock);
-      m_async_op_tracker.finish_op();
-
-      if (r == -ESTALE || r == -ECANCELED) {
-        return;
-      }
-
-      auto it = m_image_replayers.find(global_image_id);
-      if (it == m_image_replayers.end()) {
-        return;
-      }
+  dout(10) << "global_image_id=" << global_image_id << dendl;
+  image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
+}

-      auto image_replayer = it->second;
-      if (r >= 0) {
-        image_replayer->start(nullptr, false);
-      } else {
-        start_image_replayer(image_replayer);
-      }
-    });
+
+template <typename I>
+void InstanceReplayer<I>::queue_start_image_replayers() {
+  dout(10) << dendl;

+  Context *ctx = create_context_callback<
+    InstanceReplayer<I>, &InstanceReplayer<I>::start_image_replayers>(this);
   m_async_op_tracker.start_op();
-  m_image_deleter->wait_for_scheduled_deletion(
-    m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
+  m_threads->work_queue->queue(ctx, 0);
 }

 template <typename I>
-void InstanceReplayer<I>::start_image_replayers() {
-  dout(20) << dendl;
+void InstanceReplayer<I>::start_image_replayers(int r) {
+  dout(10) << dendl;

-  Context *ctx = new FunctionContext(
-    [this] (int r) {
-      Mutex::Locker locker(m_lock);
-      m_async_op_tracker.finish_op();
-      if (m_on_shut_down != nullptr) {
-        return;
-      }
-      for (auto &it : m_image_replayers) {
-        start_image_replayer(it.second);
-      }
-    });
+  std::lock_guard locker{m_lock};
+  if (m_on_shut_down != nullptr) {
+    return;
+  }

-  m_async_op_tracker.start_op();
-  m_threads->work_queue->queue(ctx, 0);
-}
+  uint64_t image_count = 0;
+  uint64_t warning_count = 0;
+  uint64_t error_count = 0;
+  for (auto it = m_image_replayers.begin();
+       it != m_image_replayers.end();) {
+    auto current_it(it);
+    ++it;
+
+    ++image_count;
+    auto health_state = current_it->second->get_health_state();
+    if (health_state == image_replayer::HEALTH_STATE_WARNING) {
+      ++warning_count;
+    } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
+      ++error_count;
+    }
+
+    start_image_replayer(current_it->second);
+  }
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
+
+  m_async_op_tracker.finish_op();
+}

 template <typename I>
 void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                               Context *on_finish) {
-  dout(20) << image_replayer << " global_image_id="
+  dout(10) << image_replayer << " global_image_id="
            << image_replayer->get_global_image_id() << ", on_finish="
            << on_finish << dendl;
@@ -378,7 +420,7 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,

   m_async_op_tracker.start_op();
   Context *ctx = create_async_context_callback(
-    m_threads->work_queue, new FunctionContext(
+    m_threads->work_queue, new LambdaContext(
       [this, image_replayer, on_finish] (int r) {
         stop_image_replayer(image_replayer, on_finish);
         m_async_op_tracker.finish_op();
@@ -388,11 +430,11 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
     image_replayer->stop(ctx, false);
   } else {
     int after = 1;
-    dout(20) << "scheduling image replayer " << image_replayer << " stop after "
+    dout(10) << "scheduling image replayer " << image_replayer << " stop after "
              << after << " sec (task " << ctx << ")" << dendl;
-    ctx = new FunctionContext(
+    ctx = new LambdaContext(
       [this, after, ctx] (int r) {
-        Mutex::Locker timer_locker(m_threads->timer_lock);
+        std::lock_guard timer_locker{m_threads->timer_lock};
         m_threads->timer->add_event_after(after, ctx);
       });
     m_threads->work_queue->queue(ctx, 0);
@@ -401,7 +443,7 @@

 template <typename I>
 void InstanceReplayer<I>::wait_for_ops() {
-  dout(20) << dendl;
+  dout(10) << dendl;

   Context *ctx = create_context_callback<
     InstanceReplayer<I>, &InstanceReplayer<I>::handle_wait_for_ops>(this);
@@ -411,19 +453,19 @@

 template <typename I>
 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
-  dout(20) << "r=" << r << dendl;
+  dout(10) << "r=" << r << dendl;

-  assert(r == 0);
+  ceph_assert(r == 0);

-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   stop_image_replayers();
 }

 template <typename I>
 void InstanceReplayer<I>::stop_image_replayers() {
-  dout(20) << dendl;
+  dout(10) << dendl;

-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));

   Context *ctx = create_async_context_callback(
     m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
@@ -438,21 +480,21 @@ void InstanceReplayer<I>::stop_image_replayers() {

 template <typename I>
 void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
-  dout(20) << "r=" << r << dendl;
+  dout(10) << "r=" << r << dendl;

-  assert(r == 0);
+  ceph_assert(r == 0);

   Context *on_finish = nullptr;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};

     for (auto &it : m_image_replayers) {
-      assert(it.second->is_stopped());
+      ceph_assert(it.second->is_stopped());
       it.second->destroy();
     }
     m_image_replayers.clear();

-    assert(m_on_shut_down != nullptr);
+    ceph_assert(m_on_shut_down != nullptr);
     std::swap(on_finish, m_on_shut_down);
   }
   on_finish->complete(r);
@@ -460,35 +502,36 @@ void InstanceReplayer<I>::handle_stop_image_replayers(int r) {

 template <typename I>
 void InstanceReplayer<I>::cancel_image_state_check_task() {
-  Mutex::Locker timer_locker(m_threads->timer_lock);
+  std::lock_guard timer_locker{m_threads->timer_lock};

   if (m_image_state_check_task == nullptr) {
     return;
   }

-  dout(20) << m_image_state_check_task << dendl;
+  dout(10) << m_image_state_check_task << dendl;
   bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
-  assert(canceled);
+  ceph_assert(canceled);
   m_image_state_check_task = nullptr;
 }

 template <typename I>
 void InstanceReplayer<I>::schedule_image_state_check_task() {
-  assert(m_threads->timer_lock.is_locked());
-  assert(m_image_state_check_task == nullptr);
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(m_image_state_check_task == nullptr);

-  m_image_state_check_task = new FunctionContext(
+  m_image_state_check_task = new LambdaContext(
     [this](int r) {
-      assert(m_threads->timer_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
       m_image_state_check_task = nullptr;
       schedule_image_state_check_task();
-      start_image_replayers();
+      queue_start_image_replayers();
     });

-  int after =
-    max(1, g_ceph_context->_conf->rbd_mirror_image_state_check_interval);
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  int after = cct->_conf.get_val<uint64_t>(
    "rbd_mirror_image_state_check_interval");

-  dout(20) << "scheduling image state check after " << after << " sec (task "
+  dout(10) << "scheduling image state check after " << after << " sec (task "
            << m_image_state_check_task << ")" << dendl;

   m_threads->timer->add_event_after(after, m_image_state_check_task);
 }
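
Note: the reworked start()/stop()/release_all() paths above all lean on the same completion-gathering pattern — a C_Gather hands one sub-context to each ImageReplayer<I>, and the caller's on_finish only runs once every sub-context has completed (stop() additionally waits for the AsyncOpTracker to drain via C_TrackedOp). The following is a minimal, self-contained sketch of just the gather part in plain standard C++; it is illustrative only, and Gather and Replayer below are hypothetical stand-ins rather than the Ceph classes used in the diff.

// Minimal sketch (not from the Ceph tree): gather-style completion as used by
// the reworked InstanceReplayer<I>::stop(), reduced to plain standard C++.
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

class Gather {
public:
  explicit Gather(std::function<void()> on_finish)
    : on_finish_(std::move(on_finish)) {}

  // Hand out one sub-completion per outstanding operation.
  std::function<void()> new_sub() {
    ++pending_;
    return [this] { complete_sub(); };
  }

  // Called once every sub-completion has been handed out; the final callback
  // fires only after activate() has been called AND all subs have completed.
  void activate() {
    activated_ = true;
    maybe_finish();
  }

private:
  void complete_sub() {
    --pending_;
    maybe_finish();
  }

  void maybe_finish() {
    if (activated_ && pending_ == 0 && on_finish_) {
      auto cb = std::move(on_finish_);
      on_finish_ = nullptr;
      cb();
    }
  }

  std::function<void()> on_finish_;
  int pending_ = 0;
  bool activated_ = false;
};

struct Replayer {
  // Pretend stop() is asynchronous: remember the completion and let the
  // caller fire it later.
  void stop(std::function<void()> on_stop) {
    pending_stop = std::move(on_stop);
  }
  std::function<void()> pending_stop;
};

int main() {
  std::vector<Replayer> replayers(3);

  Gather gather([] { std::cout << "all replayers stopped\n"; });
  for (auto& r : replayers) {
    r.stop(gather.new_sub());   // one sub-completion per replayer
  }
  gather.activate();            // nothing fires until every sub completes

  // Simulate the asynchronous stop completions arriving afterwards.
  for (auto& r : replayers) {
    r.pending_stop();
  }
  return 0;
}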