]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc
update sources to v12.1.0
[ceph.git] / ceph / src / tools / rbd_mirror / ImageSyncThrottler.cc
index 9d3123c15dfb85b6cd55abf8c3137cb1fd7b1e45..e5d08cea5f48e8203e450878e2946a836ad44d49 100644 (file)
  */
 
 #include "ImageSyncThrottler.h"
-#include "ImageSync.h"
-#include "common/ceph_context.h"
+#include "common/Formatter.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
 #undef dout_prefix
 #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
                            << " " << __func__ << ": "
-using std::unique_ptr;
-using std::string;
-using std::set;
 
 namespace rbd {
 namespace mirror {
 
-template <typename ImageCtxT>
-struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
-  ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
-  PoolImageId m_local_pool_image_id;
-  ImageSync<ImageCtxT> *m_sync = nullptr;
-  Context *m_on_finish;
-
-  C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
-               const PoolImageId &local_pool_image_id, Context *on_finish)
-    : m_sync_throttler(sync_throttler),
-      m_local_pool_image_id(local_pool_image_id), m_on_finish(on_finish) {
-  }
-
-  void finish(int r) override {
-    m_sync_throttler->handle_sync_finished(this);
-    m_on_finish->complete(r);
-  }
-};
-
 template <typename I>
 ImageSyncThrottler<I>::ImageSyncThrottler()
-  : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
-    m_lock("rbd::mirror::ImageSyncThrottler")
-{
-  dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
-           << dendl;
+  : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
+                                          this)),
+    m_max_concurrent_syncs(
+      g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs) {
+  dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
   g_ceph_context->_conf->add_observer(this);
 }
 
 template <typename I>
 ImageSyncThrottler<I>::~ImageSyncThrottler() {
-  {
-    Mutex::Locker l(m_lock);
-    assert(m_sync_queue.empty());
-    assert(m_inflight_syncs.empty());
-  }
-
   g_ceph_context->_conf->remove_observer(this);
+
+  Mutex::Locker locker(m_lock);
+  assert(m_inflight_ops.empty());
+  assert(m_queue.empty());
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
-                                       SafeTimer *timer, Mutex *timer_lock,
-                                       const std::string &mirror_uuid,
-                                       Journaler *journaler,
-                                       MirrorPeerClientMeta *client_meta,
-                                       ContextWQ *work_queue,
-                                       Context *on_finish,
-                                       ProgressContext *progress_ctx) {
-  dout(20) << dendl;
+void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
+  dout(20) << "id=" << id << dendl;
 
-  PoolImageId pool_image_id(local_image_ctx->md_ctx.get_id(),
-                            local_image_ctx->id);
-  C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, pool_image_id,
-                                                   on_finish);
-  sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
-                                                 remote_image_ctx, timer,
-                                                 timer_lock, mirror_uuid,
-                                                 journaler, client_meta,
-                                                 work_queue, sync_holder_ctx,
-                                                 progress_ctx);
-  sync_holder_ctx->m_sync->get();
-
-  bool start = false;
   {
-    Mutex::Locker l(m_lock);
-
-    if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
-      assert(m_inflight_syncs.count(pool_image_id) == 0);
-      m_inflight_syncs[pool_image_id] = sync_holder_ctx;
-      start = true;
-      dout(10) << "ready to start image sync for local_image_id "
-               << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
-               << m_max_concurrent_syncs << "]" << dendl;
+    Mutex::Locker locker(m_lock);
+
+    if (m_inflight_ops.count(id) > 0) {
+      dout(20) << "duplicate for already started op " << id << dendl;
+    } else if (m_max_concurrent_syncs == 0 ||
+               m_inflight_ops.size() < m_max_concurrent_syncs) {
+      assert(m_queue.empty());
+      m_inflight_ops.insert(id);
+      dout(20) << "ready to start sync for " << id << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+               << dendl;
     } else {
-      m_sync_queue.push_front(sync_holder_ctx);
-      dout(10) << "image sync for local_image_id " << local_image_ctx->id
-               << " has been queued" << dendl;
+      m_queue.push_back(std::make_pair(id, on_start));
+      on_start = nullptr;
+      dout(20) << "image sync for " << id << " has been queued" << dendl;
     }
   }
 
-  if (start) {
-    sync_holder_ctx->m_sync->send();
+  if (on_start != nullptr) {
+    on_start->complete(0);
   }
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::cancel_sync(librados::IoCtx &local_io_ctx,
-                                        const std::string local_image_id) {
-  dout(20) << dendl;
-
-  C_SyncHolder *sync_holder = nullptr;
-  bool running_sync = true;
+bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
+  dout(20) << "id=" << id << dendl;
 
+  Context *on_start = nullptr;
   {
-    Mutex::Locker l(m_lock);
-    if (m_inflight_syncs.empty()) {
-      // no image sync currently running and neither waiting
-      return;
-    }
-
-    PoolImageId local_pool_image_id(local_io_ctx.get_id(),
-                                    local_image_id);
-    auto it = m_inflight_syncs.find(local_pool_image_id);
-    if (it != m_inflight_syncs.end()) {
-      sync_holder = it->second;
-    }
-
-    if (!sync_holder) {
-      for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
-        if ((*it)->m_local_pool_image_id == local_pool_image_id) {
-          sync_holder = (*it);
-          m_sync_queue.erase(it);
-          running_sync = false;
-          break;
-        }
+    Mutex::Locker locker(m_lock);
+    for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
+      if (it->first == id) {
+        on_start = it->second;
+        dout(20) << "canceled queued sync for " << id << dendl;
+        m_queue.erase(it);
+        break;
       }
     }
   }
 
-  if (sync_holder) {
-    if (running_sync) {
-      dout(10) << "canceled running image sync for local_image_id "
-               << sync_holder->m_local_pool_image_id.second << dendl;
-      sync_holder->m_sync->cancel();
-    } else {
-      dout(10) << "canceled waiting image sync for local_image_id "
-               << sync_holder->m_local_pool_image_id.second << dendl;
-      sync_holder->m_on_finish->complete(-ECANCELED);
-      sync_holder->m_sync->put();
-      delete sync_holder;
-    }
+  if (on_start == nullptr) {
+    return false;
   }
+
+  on_start->complete(-ECANCELED);
+  return true;
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
-  dout(20) << dendl;
+void ImageSyncThrottler<I>::finish_op(const std::string &id) {
+  dout(20) << "id=" << id << dendl;
 
-  C_SyncHolder *next_sync_holder = nullptr;
+  if (cancel_op(id)) {
+    return;
+  }
 
+  Context *on_start = nullptr;
   {
-    Mutex::Locker l(m_lock);
-    m_inflight_syncs.erase(sync_holder->m_local_pool_image_id);
-
-    if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
-        !m_sync_queue.empty()) {
-      next_sync_holder = m_sync_queue.back();
-      m_sync_queue.pop_back();
-
-      assert(
-        m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
-      m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
-        next_sync_holder;
-      dout(10) << "ready to start image sync for local_image_id "
-               << next_sync_holder->m_local_pool_image_id.second
-               << " [" << m_inflight_syncs.size() << "/"
-               << m_max_concurrent_syncs << "]" << dendl;
+    Mutex::Locker locker(m_lock);
+
+    m_inflight_ops.erase(id);
+
+    if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
+      auto pair = m_queue.front();
+      m_inflight_ops.insert(pair.first);
+      dout(20) << "ready to start sync for " << pair.first << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+               << dendl;
+      on_start= pair.second;
+      m_queue.pop_front();
     }
+  }
 
-    dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
-             << "/" << m_max_concurrent_syncs << "]" << dendl;
+  if (on_start != nullptr) {
+    on_start->complete(0);
   }
+}
 
-  if (next_sync_holder) {
-    next_sync_holder->m_sync->send();
+template <typename I>
+void ImageSyncThrottler<I>::drain(int r) {
+  dout(20) << dendl;
+
+  std::list<std::pair<std::string, Context *>> queue;
+  {
+    Mutex::Locker locker(m_lock);
+    std::swap(m_queue, queue);
+    m_inflight_ops.clear();
+  }
+
+  for (auto &pair : queue) {
+    pair.second->complete(r);
   }
 }
 
 template <typename I>
 void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
-  dout(20) << " max=" << max << dendl;
+  dout(20) << "max=" << max << dendl;
 
-  assert(max > 0);
-
-  std::list<C_SyncHolder *> next_sync_holders;
+  std::list<Context *> ops;
   {
-    Mutex::Locker l(m_lock);
-    this->m_max_concurrent_syncs = max;
-
-    // Start waiting syncs in the case of available free slots
-    while(m_inflight_syncs.size() < m_max_concurrent_syncs
-          && !m_sync_queue.empty()) {
-        C_SyncHolder *next_sync_holder = m_sync_queue.back();
-        next_sync_holders.push_back(next_sync_holder);
-        m_sync_queue.pop_back();
-
-        assert(
-          m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
-        m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
-          next_sync_holder;
-
-        dout(10) << "ready to start image sync for local_image_id "
-                 << next_sync_holder->m_local_pool_image_id.second
-                 << " [" << m_inflight_syncs.size() << "/"
-                 << m_max_concurrent_syncs << "]" << dendl;
+    Mutex::Locker locker(m_lock);
+    m_max_concurrent_syncs = max;
+
+    // Start waiting ops in the case of available free slots
+    while ((m_max_concurrent_syncs == 0 ||
+            m_inflight_ops.size() < m_max_concurrent_syncs) &&
+           !m_queue.empty()) {
+      auto pair = m_queue.front();
+      m_inflight_ops.insert(pair.first);
+      dout(20) << "ready to start sync for " << pair.first << " ["
+               << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+               << dendl;
+      ops.push_back(pair.second);
+      m_queue.pop_front();
     }
   }
 
-  for (const auto& sync_holder : next_sync_holders) {
-    sync_holder->m_sync->send();
+  for (const auto& ctx : ops) {
+    ctx->complete(0);
   }
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) {
-  Mutex::Locker l(m_lock);
+void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
 
   if (f) {
     f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
-    f->dump_int("running_syncs", m_inflight_syncs.size());
-    f->dump_int("waiting_syncs", m_sync_queue.size());
+    f->dump_int("running_syncs", m_inflight_ops.size());
+    f->dump_int("waiting_syncs", m_queue.size());
     f->flush(*ss);
   } else {
     *ss << "[ ";
     *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
-    *ss << "running_syncs=" << m_inflight_syncs.size() << ", ";
-    *ss << "waiting_syncs=" << m_sync_queue.size() << " ]";
+    *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
+    *ss << "waiting_syncs=" << m_queue.size() << " ]";
   }
 }
 
@@ -259,9 +202,8 @@ const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
 }
 
 template <typename I>
-void ImageSyncThrottler<I>::handle_conf_change(
-                                              const struct md_config_t *conf,
-                                              const set<string> &changed) {
+void ImageSyncThrottler<I>::handle_conf_change(const struct md_config_t *conf,
+                                      const set<string> &changed) {
   if (changed.count("rbd_mirror_concurrent_image_syncs")) {
     set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
   }