#include "common/debug.h"
#include "common/Timer.h"
#include "common/admin_socket.h"
-#include "common/RWLock.h"
#include "rgw_common.h"
#include "rgw_http_client_types.h"
rgw_io_id io_id;
void *user_info;
};
- list<io_completion> complete_reqs;
- set<rgw_io_id> complete_reqs_set;
+ std::list<io_completion> complete_reqs;
+ std::set<rgw_io_id> complete_reqs_set;
using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
- set<NotifierRef> cns;
+ std::set<NotifierRef> cns;
ceph::mutex lock = ceph::make_mutex("RGWCompletionManager::lock");
ceph::condition_variable cond;
std::atomic<bool> going_down = { false };
- map<void *, void *> waiters;
+ std::map<void *, void *> waiters;
class WaitContext;
struct RGWCoroutinesEnv {
uint64_t run_context;
RGWCoroutinesManager *manager;
- list<RGWCoroutinesStack *> *scheduled_stacks;
+ std::list<RGWCoroutinesStack *> *scheduled_stacks;
RGWCoroutinesStack *stack;
RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
};
struct rgw_spawned_stacks {
- vector<RGWCoroutinesStack *> entries;
+ std::vector<RGWCoroutinesStack *> entries;
rgw_spawned_stacks() {}
}
void inherit(rgw_spawned_stacks *source) {
- for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin();
- iter != source->entries.end(); ++iter) {
- add_pending(*iter);
+ for (auto* entry : source->entries) {
+ add_pending(entry);
}
source->entries.clear();
}
struct StatusItem {
utime_t timestamp;
- string status;
+ std::string status;
- StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {}
+ StatusItem(utime_t& t, const std::string& s) : timestamp(t), status(s) {}
void dump(Formatter *f) const;
};
int max_history;
utime_t timestamp;
- stringstream status;
+ std::stringstream status;
explicit Status(CephContext *_cct) : cct(_cct), max_history(MAX_COROUTINE_HISTORY) {}
- deque<StatusItem> history;
+ std::deque<StatusItem> history;
- stringstream& set_status();
+ std::stringstream& set_status();
} status;
- stringstream description;
+ std::stringstream description;
protected:
bool _yield_ret;
rgw_spawned_stacks spawned;
- stringstream error_stream;
+ std::stringstream error_stream;
int set_state(int s, int ret = 0) {
retcode = ret;
void set_io_blocked(bool flag);
void reset_description() {
- description.str(string());
+ description.str(std::string());
}
- stringstream& set_description() {
+ std::stringstream& set_description() {
return description;
}
- stringstream& set_status() {
+ std::stringstream& set_status() {
return status.set_status();
}
- stringstream& set_status(const string& s) {
- stringstream& status = set_status();
+ std::stringstream& set_status(const std::string& s) {
+ std::stringstream& status = set_status();
status << s;
return status;
}
- virtual int operate_wrapper() {
- return operate();
+ virtual int operate_wrapper(const DoutPrefixProvider *dpp) {
+ return operate(dpp);
}
public:
RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
- ~RGWCoroutine() override;
+ virtual ~RGWCoroutine() override;
- virtual int operate() = 0;
+ virtual int operate(const DoutPrefixProvider *dpp) = 0;
bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
bool is_error() { return (state == RGWCoroutine_Error); }
- stringstream& log_error() { return error_stream; }
- string error_str() {
+ std::stringstream& log_error() { return error_stream; }
+ std::string error_str() {
return error_stream.str();
}
void wait_for_child();
- virtual string to_str() const;
+ virtual std::string to_str() const;
RGWCoroutinesStack *get_stack() const {
return stack;
void io_complete(const rgw_io_id& io_id);
};
-ostream& operator<<(ostream& out, const RGWCoroutine& cr);
+std::ostream& operator<<(std::ostream& out, const RGWCoroutine& cr);
#define yield_until_true(x) \
do { \
template <class T>
class RGWConsumerCR : public RGWCoroutine {
- list<T> product;
+ std::list<T> product;
public:
explicit RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
}
void receive(const T& p, bool wakeup = true);
- void receive(list<T>& l, bool wakeup = true);
+ void receive(std::list<T>& l, bool wakeup = true);
};
class RGWCoroutinesStack : public RefCountedObject {
RGWCoroutinesManager *ops_mgr;
- list<RGWCoroutine *> ops;
- list<RGWCoroutine *>::iterator pos;
+ std::list<RGWCoroutine *> ops;
+ std::list<RGWCoroutine *>::iterator pos;
rgw_spawned_stacks spawned;
RGWCoroutinesStack *preallocated_stack{nullptr};
- set<RGWCoroutinesStack *> blocked_by_stack;
- set<RGWCoroutinesStack *> blocking_stacks;
+ std::set<RGWCoroutinesStack *> blocked_by_stack;
+ std::set<RGWCoroutinesStack *> blocking_stacks;
- map<int64_t, rgw_io_id> io_finish_ids;
+ std::map<int64_t, rgw_io_id> io_finish_ids;
rgw_io_id io_blocked_id;
bool done_flag;
return id;
}
- int operate(RGWCoroutinesEnv *env);
+ int operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *env);
bool is_done() {
return done_flag;
return retcode;
}
- string error_str();
+ std::string error_str();
void call(RGWCoroutine *next_op);
RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
};
template <class T>
-void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
+void RGWConsumerCR<T>::receive(std::list<T>& l, bool wakeup)
{
product.splice(product.end(), l);
if (wakeup) {
class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
CephContext *cct;
- set<RGWCoroutinesManager *> managers;
+ std::set<RGWCoroutinesManager *> managers;
ceph::shared_mutex lock =
ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
- string admin_command;
+ std::string admin_command;
public:
explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct) {}
- ~RGWCoroutinesManagerRegistry() override;
+ virtual ~RGWCoroutinesManagerRegistry() override;
void add(RGWCoroutinesManager *mgr);
void remove(RGWCoroutinesManager *mgr);
- int hook_to_admin_command(const string& command);
+ int hook_to_admin_command(const std::string& command);
int call(std::string_view command, const cmdmap_t& cmdmap,
Formatter *f,
std::ostream& ss,
std::atomic<bool> going_down = { false };
std::atomic<int64_t> run_context_count = { 0 };
- map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
+ std::map<uint64_t, std::set<RGWCoroutinesStack *> > run_contexts;
std::atomic<int64_t> max_io_id = { 0 };
std::atomic<uint64_t> max_stack_id = { 0 };
RGWIOIDProvider io_id_provider;
- void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
+ void handle_unblocked_stack(std::set<RGWCoroutinesStack *>& context_stacks, std::list<RGWCoroutinesStack *>& scheduled_stacks,
RGWCompletionManager::io_completion& io, int *waiting_count);
protected:
RGWCompletionManager *completion_mgr;
int ops_window;
- string id;
+ std::string id;
void put_completion_notifier(RGWAioCompletionNotifier *cn);
public:
cr_registry->add(this);
}
}
- virtual ~RGWCoroutinesManager() {
- stop();
- completion_mgr->put();
- if (cr_registry) {
- cr_registry->remove(this);
- }
- }
+ virtual ~RGWCoroutinesManager();
- int run(list<RGWCoroutinesStack *>& ops);
- int run(RGWCoroutine *op);
+ int run(const DoutPrefixProvider *dpp, std::list<RGWCoroutinesStack *>& ops);
+ int run(const DoutPrefixProvider *dpp, RGWCoroutine *op);
void stop() {
bool expected = false;
if (going_down.compare_exchange_strong(expected, true)) {
void set_sleeping(RGWCoroutine *cr, bool flag);
void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
- virtual string get_id();
+ virtual std::string get_id();
void dump(Formatter *f) const;
RGWIOIDProvider& get_io_id_provider() {
class RGWSimpleCoroutine : public RGWCoroutine {
bool called_cleanup;
- int operate() override;
+ int operate(const DoutPrefixProvider *dpp) override;
int state_init();
- int state_send_request();
+ int state_send_request(const DoutPrefixProvider *dpp);
int state_request_complete();
int state_all_complete();
~RGWSimpleCoroutine() override;
virtual int init() { return 0; }
- virtual int send_request() = 0;
+ virtual int send_request(const DoutPrefixProvider *dpp) = 0;
virtual int request_complete() = 0;
virtual int finish() { return 0; }
virtual void request_cleanup() {}