// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
+#include "include/Context.h"
#include "common/ceph_json.h"
#include "rgw_coroutine.h"
}
};
-RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
+RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct),
timer(cct, lock)
{
timer.init();
RGWCompletionManager::~RGWCompletionManager()
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
timer.cancel_all_events();
timer.shutdown();
}
void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
_complete(cn, io_id, user_info);
}
void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (cn) {
cns.insert(cn);
}
void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (cn) {
cns.erase(cn);
}
return;
}
complete_reqs.push_back(io_completion{io_id, user_info});
- cond.Signal();
+ cond.notify_all();
}
int RGWCompletionManager::get_next(io_completion *io)
{
- Mutex::Locker l(lock);
+ std::unique_lock l{lock};
while (complete_reqs.empty()) {
if (going_down) {
return -ECANCELED;
}
- cond.Wait(lock);
+ cond.wait(l);
}
*io = complete_reqs.front();
complete_reqs_set.erase(io->io_id);
bool RGWCompletionManager::try_get_next(io_completion *io)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (complete_reqs.empty()) {
return false;
}
void RGWCompletionManager::go_down()
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
for (auto cn : cns) {
cn->unregister();
}
going_down = true;
- cond.Signal();
+ cond.notify_all();
}
void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
ceph_assert(waiters.find(opaque) == waiters.end());
waiters[opaque] = user_info;
timer.add_event_after(interval, new WaitContext(this, opaque));
void RGWCompletionManager::wakeup(void *opaque)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
_wakeup(opaque);
}
stringstream& RGWCoroutine::Status::set_status()
{
- RWLock::WLocker l(lock);
+ std::unique_lock l{lock};
string s = status.str();
status.str(string());
if (!timestamp.is_zero()) {
RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
io_id(_io_id),
- user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
- c = librados::Rados::aio_create_completion((void *)this, NULL,
- _aio_completion_notifier_cb);
+ user_data(_user_data), registered(true) {
+ c = librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb);
}
RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
RGWCompletionManager::io_completion& io, int *blocked_count)
{
- ceph_assert(lock.is_wlocked());
+ ceph_assert(ceph_mutex_is_wlocked(lock));
RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
if (context_stacks.find(stack) == context_stacks.end()) {
return;
void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
{
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
_schedule(env, stack);
}
void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
{
- ceph_assert(lock.is_wlocked());
+ ceph_assert(ceph_mutex_is_wlocked(lock));
if (!stack->is_scheduled) {
env->scheduled_stacks->push_back(stack);
stack->set_is_scheduled(true);
uint64_t run_context = ++run_context_count;
- lock.get_write();
+ lock.lock();
set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
list<RGWCoroutinesStack *> scheduled_stacks;
for (auto& st : stacks) {
ret = stack->operate(&env);
- lock.get_write();
+ lock.lock();
stack->set_is_scheduled(false);
if (ret < 0) {
while (blocked_count - interval_wait_count >= ops_window) {
lock.unlock();
ret = completion_mgr->get_next(&io);
- lock.get_write();
+ lock.lock();
if (ret < 0) {
ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
}
while (scheduled_stacks.empty() && blocked_count > 0) {
lock.unlock();
ret = completion_mgr->get_next(&io);
- lock.get_write();
+ lock.lock();
if (ret < 0) {
ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
}
}
void RGWCoroutinesManager::dump(Formatter *f) const {
- RWLock::RLocker rl(lock);
+ std::shared_lock rl{lock};
f->open_array_section("run_contexts");
for (auto& i : run_contexts) {
void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
{
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
if (managers.find(mgr) == managers.end()) {
managers.insert(mgr);
get();
void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
{
- RWLock::WLocker wl(lock);
+ std::unique_lock wl{lock};
if (managers.find(mgr) != managers.end()) {
managers.erase(mgr);
put();
{
AdminSocket *admin_socket = cct->get_admin_socket();
if (!admin_command.empty()) {
- admin_socket->unregister_command(admin_command);
+ admin_socket->unregister_commands(this);
}
}
{
AdminSocket *admin_socket = cct->get_admin_socket();
if (!admin_command.empty()) {
- admin_socket->unregister_command(admin_command);
+ admin_socket->unregister_commands(this);
}
admin_command = command;
- int r = admin_socket->register_command(admin_command, admin_command, this,
+ int r = admin_socket->register_command(admin_command, this,
"dump current coroutines stack state");
if (r < 0) {
lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
return 0;
}
-bool RGWCoroutinesManagerRegistry::call(std::string_view command,
- const cmdmap_t& cmdmap,
- std::string_view format,
- bufferlist& out) {
- RWLock::RLocker rl(lock);
- stringstream ss;
- JSONFormatter f;
- ::encode_json("cr_managers", *this, &f);
- f.flush(ss);
- out.append(ss);
- return true;
+int RGWCoroutinesManagerRegistry::call(std::string_view command,
+ const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& ss,
+ bufferlist& out) {
+ std::shared_lock rl{lock};
+ ::encode_json("cr_managers", *this, f);
+ return 0;
}
void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {