]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_coroutine.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_coroutine.cc
index 1ccefc2daba54b827400e63ce0e0f5d951159a6f..4c658ca30ac7c079f87a4daf5e686b47522701d5 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
+#include "include/Context.h"
 #include "common/ceph_json.h"
 #include "rgw_coroutine.h"
 
@@ -23,7 +24,7 @@ public:
   }
 };
 
-RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
+RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct),
                                             timer(cct, lock)
 {
   timer.init();
@@ -31,20 +32,20 @@ RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock(
 
 RGWCompletionManager::~RGWCompletionManager()
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   timer.cancel_all_events();
   timer.shutdown();
 }
 
 void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   _complete(cn, io_id, user_info);
 }
 
 void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (cn) {
     cns.insert(cn);
   }
@@ -52,7 +53,7 @@ void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier
 
 void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (cn) {
     cns.erase(cn);
   }
@@ -69,17 +70,17 @@ void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_
     return;
   }
   complete_reqs.push_back(io_completion{io_id, user_info});
-  cond.Signal();
+  cond.notify_all();
 }
 
 int RGWCompletionManager::get_next(io_completion *io)
 {
-  Mutex::Locker l(lock);
+  std::unique_lock l{lock};
   while (complete_reqs.empty()) {
     if (going_down) {
       return -ECANCELED;
     }
-    cond.Wait(lock);
+    cond.wait(l);
   }
   *io = complete_reqs.front();
   complete_reqs_set.erase(io->io_id);
@@ -89,7 +90,7 @@ int RGWCompletionManager::get_next(io_completion *io)
 
 bool RGWCompletionManager::try_get_next(io_completion *io)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (complete_reqs.empty()) {
     return false;
   }
@@ -101,17 +102,17 @@ bool RGWCompletionManager::try_get_next(io_completion *io)
 
 void RGWCompletionManager::go_down()
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   for (auto cn : cns) {
     cn->unregister();
   }
   going_down = true;
-  cond.Signal();
+  cond.notify_all();
 }
 
 void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   ceph_assert(waiters.find(opaque) == waiters.end());
   waiters[opaque] = user_info;
   timer.add_event_after(interval, new WaitContext(this, opaque));
@@ -119,7 +120,7 @@ void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval,
 
 void RGWCompletionManager::wakeup(void *opaque)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   _wakeup(opaque);
 }
 
@@ -176,7 +177,7 @@ void RGWCoroutine::StatusItem::dump(Formatter *f) const {
 
 stringstream& RGWCoroutine::Status::set_status()
 {
-  RWLock::WLocker l(lock);
+  std::unique_lock l{lock};
   string s = status.str();
   status.str(string());
   if (!timestamp.is_zero()) {
@@ -437,9 +438,8 @@ static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
 
 RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
                                                                          io_id(_io_id),
-                                                                         user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
-  c = librados::Rados::aio_create_completion((void *)this, NULL,
-                                            _aio_completion_notifier_cb);
+                                                                         user_data(_user_data), registered(true) {
+  c = librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb);
 }
 
 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
@@ -532,7 +532,7 @@ bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
 void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
                                                   RGWCompletionManager::io_completion& io, int *blocked_count)
 {
-  ceph_assert(lock.is_wlocked());
+  ceph_assert(ceph_mutex_is_wlocked(lock));
   RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
   if (context_stacks.find(stack) == context_stacks.end()) {
     return;
@@ -558,13 +558,13 @@ void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& con
 
 void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
 {
-  RWLock::WLocker wl(lock);
+  std::unique_lock wl{lock};
   _schedule(env, stack);
 }
 
 void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
 {
-  ceph_assert(lock.is_wlocked());
+  ceph_assert(ceph_mutex_is_wlocked(lock));
   if (!stack->is_scheduled) {
     env->scheduled_stacks->push_back(stack);
     stack->set_is_scheduled(true);
@@ -594,7 +594,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
 
   uint64_t run_context = ++run_context_count;
 
-  lock.get_write();
+  lock.lock();
   set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
   list<RGWCoroutinesStack *> scheduled_stacks;
   for (auto& st : stacks) {
@@ -622,7 +622,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
 
     ret = stack->operate(&env);
 
-    lock.get_write();
+    lock.lock();
 
     stack->set_is_scheduled(false);
     if (ret < 0) {
@@ -691,7 +691,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
     while (blocked_count - interval_wait_count >= ops_window) {
       lock.unlock();
       ret = completion_mgr->get_next(&io);
-      lock.get_write();
+      lock.lock();
       if (ret < 0) {
        ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
       }
@@ -702,7 +702,7 @@ next:
     while (scheduled_stacks.empty() && blocked_count > 0) {
       lock.unlock();
       ret = completion_mgr->get_next(&io);
-      lock.get_write();
+      lock.lock();
       if (ret < 0) {
         ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
       }
@@ -779,7 +779,7 @@ RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCo
 }
 
 void RGWCoroutinesManager::dump(Formatter *f) const {
-  RWLock::RLocker rl(lock);
+  std::shared_lock rl{lock};
 
   f->open_array_section("run_contexts");
   for (auto& i : run_contexts) {
@@ -811,7 +811,7 @@ string RGWCoroutinesManager::get_id()
 
 void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
 {
-  RWLock::WLocker wl(lock);
+  std::unique_lock wl{lock};
   if (managers.find(mgr) == managers.end()) {
     managers.insert(mgr);
     get();
@@ -820,7 +820,7 @@ void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
 
 void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
 {
-  RWLock::WLocker wl(lock);
+  std::unique_lock wl{lock};
   if (managers.find(mgr) != managers.end()) {
     managers.erase(mgr);
     put();
@@ -831,7 +831,7 @@ RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
 {
   AdminSocket *admin_socket = cct->get_admin_socket();
   if (!admin_command.empty()) {
-    admin_socket->unregister_command(admin_command);
+    admin_socket->unregister_commands(this);
   }
 }
 
@@ -839,10 +839,10 @@ int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
 {
   AdminSocket *admin_socket = cct->get_admin_socket();
   if (!admin_command.empty()) {
-    admin_socket->unregister_command(admin_command);
+    admin_socket->unregister_commands(this);
   }
   admin_command = command;
-  int r = admin_socket->register_command(admin_command, admin_command, this,
+  int r = admin_socket->register_command(admin_command, this,
                                     "dump current coroutines stack state");
   if (r < 0) {
     lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
@@ -851,17 +851,14 @@ int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
   return 0;
 }
 
-bool RGWCoroutinesManagerRegistry::call(std::string_view command,
-                                        const cmdmap_t& cmdmap,
-                                        std::string_view format,
-                                        bufferlist& out) {
-  RWLock::RLocker rl(lock);
-  stringstream ss;
-  JSONFormatter f;
-  ::encode_json("cr_managers", *this, &f);
-  f.flush(ss);
-  out.append(ss);
-  return true;
+int RGWCoroutinesManagerRegistry::call(std::string_view command,
+                                      const cmdmap_t& cmdmap,
+                                      Formatter *f,
+                                      std::ostream& ss,
+                                      bufferlist& out) {
+  std::shared_lock rl{lock};
+  ::encode_json("cr_managers", *this, f);
+  return 0;
 }
 
 void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {