*/
#include "ImageSyncThrottler.h"
-#include "ImageSync.h"
-#include "common/ceph_context.h"
+#include "common/Formatter.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
<< " " << __func__ << ": "
-using std::unique_ptr;
-using std::string;
-using std::set;
namespace rbd {
namespace mirror {
-template <typename ImageCtxT>
-struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
- ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
- PoolImageId m_local_pool_image_id;
- ImageSync<ImageCtxT> *m_sync = nullptr;
- Context *m_on_finish;
-
- C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
- const PoolImageId &local_pool_image_id, Context *on_finish)
- : m_sync_throttler(sync_throttler),
- m_local_pool_image_id(local_pool_image_id), m_on_finish(on_finish) {
- }
-
- void finish(int r) override {
- m_sync_throttler->handle_sync_finished(this);
- m_on_finish->complete(r);
- }
-};
-
template <typename I>
ImageSyncThrottler<I>::ImageSyncThrottler()
- : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
- m_lock("rbd::mirror::ImageSyncThrottler")
-{
- dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
- << dendl;
+ : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
+ this)),
+ m_max_concurrent_syncs(
+ g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs) {
+ dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
g_ceph_context->_conf->add_observer(this);
}
template <typename I>
ImageSyncThrottler<I>::~ImageSyncThrottler() {
- {
- Mutex::Locker l(m_lock);
- assert(m_sync_queue.empty());
- assert(m_inflight_syncs.empty());
- }
-
g_ceph_context->_conf->remove_observer(this);
+
+ Mutex::Locker locker(m_lock);
+ assert(m_inflight_ops.empty());
+ assert(m_queue.empty());
}
template <typename I>
-void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
- SafeTimer *timer, Mutex *timer_lock,
- const std::string &mirror_uuid,
- Journaler *journaler,
- MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue,
- Context *on_finish,
- ProgressContext *progress_ctx) {
- dout(20) << dendl;
+void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
+ dout(20) << "id=" << id << dendl;
- PoolImageId pool_image_id(local_image_ctx->md_ctx.get_id(),
- local_image_ctx->id);
- C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, pool_image_id,
- on_finish);
- sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
- remote_image_ctx, timer,
- timer_lock, mirror_uuid,
- journaler, client_meta,
- work_queue, sync_holder_ctx,
- progress_ctx);
- sync_holder_ctx->m_sync->get();
-
- bool start = false;
{
- Mutex::Locker l(m_lock);
-
- if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
- assert(m_inflight_syncs.count(pool_image_id) == 0);
- m_inflight_syncs[pool_image_id] = sync_holder_ctx;
- start = true;
- dout(10) << "ready to start image sync for local_image_id "
- << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
- << m_max_concurrent_syncs << "]" << dendl;
+ Mutex::Locker locker(m_lock);
+
+ if (m_inflight_ops.count(id) > 0) {
+ dout(20) << "duplicate for already started op " << id << dendl;
+ } else if (m_max_concurrent_syncs == 0 ||
+ m_inflight_ops.size() < m_max_concurrent_syncs) {
+ assert(m_queue.empty());
+ m_inflight_ops.insert(id);
+ dout(20) << "ready to start sync for " << id << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+ << dendl;
} else {
- m_sync_queue.push_front(sync_holder_ctx);
- dout(10) << "image sync for local_image_id " << local_image_ctx->id
- << " has been queued" << dendl;
+ m_queue.push_back(std::make_pair(id, on_start));
+ on_start = nullptr;
+ dout(20) << "image sync for " << id << " has been queued" << dendl;
}
}
- if (start) {
- sync_holder_ctx->m_sync->send();
+ if (on_start != nullptr) {
+ on_start->complete(0);
}
}
template <typename I>
-void ImageSyncThrottler<I>::cancel_sync(librados::IoCtx &local_io_ctx,
- const std::string local_image_id) {
- dout(20) << dendl;
-
- C_SyncHolder *sync_holder = nullptr;
- bool running_sync = true;
+bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
+ dout(20) << "id=" << id << dendl;
+ Context *on_start = nullptr;
{
- Mutex::Locker l(m_lock);
- if (m_inflight_syncs.empty()) {
- // no image sync currently running and neither waiting
- return;
- }
-
- PoolImageId local_pool_image_id(local_io_ctx.get_id(),
- local_image_id);
- auto it = m_inflight_syncs.find(local_pool_image_id);
- if (it != m_inflight_syncs.end()) {
- sync_holder = it->second;
- }
-
- if (!sync_holder) {
- for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
- if ((*it)->m_local_pool_image_id == local_pool_image_id) {
- sync_holder = (*it);
- m_sync_queue.erase(it);
- running_sync = false;
- break;
- }
+ Mutex::Locker locker(m_lock);
+ for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
+ if (it->first == id) {
+ on_start = it->second;
+ dout(20) << "canceled queued sync for " << id << dendl;
+ m_queue.erase(it);
+ break;
}
}
}
- if (sync_holder) {
- if (running_sync) {
- dout(10) << "canceled running image sync for local_image_id "
- << sync_holder->m_local_pool_image_id.second << dendl;
- sync_holder->m_sync->cancel();
- } else {
- dout(10) << "canceled waiting image sync for local_image_id "
- << sync_holder->m_local_pool_image_id.second << dendl;
- sync_holder->m_on_finish->complete(-ECANCELED);
- sync_holder->m_sync->put();
- delete sync_holder;
- }
+ if (on_start == nullptr) {
+ return false;
}
+
+ on_start->complete(-ECANCELED);
+ return true;
}
template <typename I>
-void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
- dout(20) << dendl;
+void ImageSyncThrottler<I>::finish_op(const std::string &id) {
+ dout(20) << "id=" << id << dendl;
- C_SyncHolder *next_sync_holder = nullptr;
+ if (cancel_op(id)) {
+ return;
+ }
+ Context *on_start = nullptr;
{
- Mutex::Locker l(m_lock);
- m_inflight_syncs.erase(sync_holder->m_local_pool_image_id);
-
- if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
- !m_sync_queue.empty()) {
- next_sync_holder = m_sync_queue.back();
- m_sync_queue.pop_back();
-
- assert(
- m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
- m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
- next_sync_holder;
- dout(10) << "ready to start image sync for local_image_id "
- << next_sync_holder->m_local_pool_image_id.second
- << " [" << m_inflight_syncs.size() << "/"
- << m_max_concurrent_syncs << "]" << dendl;
+ Mutex::Locker locker(m_lock);
+
+ m_inflight_ops.erase(id);
+
+ if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
+ auto pair = m_queue.front();
+ m_inflight_ops.insert(pair.first);
+ dout(20) << "ready to start sync for " << pair.first << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+ << dendl;
+ on_start= pair.second;
+ m_queue.pop_front();
}
+ }
- dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
- << "/" << m_max_concurrent_syncs << "]" << dendl;
+ if (on_start != nullptr) {
+ on_start->complete(0);
}
+}
- if (next_sync_holder) {
- next_sync_holder->m_sync->send();
+template <typename I>
+void ImageSyncThrottler<I>::drain(int r) {
+ dout(20) << dendl;
+
+ std::list<std::pair<std::string, Context *>> queue;
+ {
+ Mutex::Locker locker(m_lock);
+ std::swap(m_queue, queue);
+ m_inflight_ops.clear();
+ }
+
+ for (auto &pair : queue) {
+ pair.second->complete(r);
}
}
template <typename I>
void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
- dout(20) << " max=" << max << dendl;
+ dout(20) << "max=" << max << dendl;
- assert(max > 0);
-
- std::list<C_SyncHolder *> next_sync_holders;
+ std::list<Context *> ops;
{
- Mutex::Locker l(m_lock);
- this->m_max_concurrent_syncs = max;
-
- // Start waiting syncs in the case of available free slots
- while(m_inflight_syncs.size() < m_max_concurrent_syncs
- && !m_sync_queue.empty()) {
- C_SyncHolder *next_sync_holder = m_sync_queue.back();
- next_sync_holders.push_back(next_sync_holder);
- m_sync_queue.pop_back();
-
- assert(
- m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
- m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
- next_sync_holder;
-
- dout(10) << "ready to start image sync for local_image_id "
- << next_sync_holder->m_local_pool_image_id.second
- << " [" << m_inflight_syncs.size() << "/"
- << m_max_concurrent_syncs << "]" << dendl;
+ Mutex::Locker locker(m_lock);
+ m_max_concurrent_syncs = max;
+
+ // Start waiting ops in the case of available free slots
+ while ((m_max_concurrent_syncs == 0 ||
+ m_inflight_ops.size() < m_max_concurrent_syncs) &&
+ !m_queue.empty()) {
+ auto pair = m_queue.front();
+ m_inflight_ops.insert(pair.first);
+ dout(20) << "ready to start sync for " << pair.first << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+ << dendl;
+ ops.push_back(pair.second);
+ m_queue.pop_front();
}
}
- for (const auto& sync_holder : next_sync_holders) {
- sync_holder->m_sync->send();
+ for (const auto& ctx : ops) {
+ ctx->complete(0);
}
}
template <typename I>
-void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) {
- Mutex::Locker l(m_lock);
+void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
if (f) {
f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
- f->dump_int("running_syncs", m_inflight_syncs.size());
- f->dump_int("waiting_syncs", m_sync_queue.size());
+ f->dump_int("running_syncs", m_inflight_ops.size());
+ f->dump_int("waiting_syncs", m_queue.size());
f->flush(*ss);
} else {
*ss << "[ ";
*ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
- *ss << "running_syncs=" << m_inflight_syncs.size() << ", ";
- *ss << "waiting_syncs=" << m_sync_queue.size() << " ]";
+ *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
+ *ss << "waiting_syncs=" << m_queue.size() << " ]";
}
}
}
template <typename I>
-void ImageSyncThrottler<I>::handle_conf_change(
- const struct md_config_t *conf,
- const set<string> &changed) {
+void ImageSyncThrottler<I>::handle_conf_change(const struct md_config_t *conf,
+ const set<string> &changed) {
if (changed.count("rbd_mirror_concurrent_image_syncs")) {
set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
}