#define CEPH_FINISHER_H
#include "include/Context.h"
+#include "include/common_fwd.h"
#include "common/Thread.h"
#include "common/ceph_mutex.h"
#include "common/perf_counters.h"
#include "common/Cond.h"
-class CephContext;
/// Finisher queue length performance counter ID.
enum {
bool finisher_empty_wait; ///< True mean someone wait finisher empty.
/// Queue for contexts for which complete(0) will be called.
- vector<pair<Context*,int>> finisher_queue;
+ std::vector<std::pair<Context*,int>> finisher_queue;
+ std::vector<std::pair<Context*,int>> in_progress_queue;
- string thread_name;
+ std::string thread_name;
/// Performance counter for the finisher's queue length.
/// Only active for named finishers.
PerfCounters *logger;
-
+
void *finisher_thread_entry();
struct FinisherThread : public Thread {
- Finisher *fin;
+ Finisher *fin;
explicit FinisherThread(Finisher *f) : fin(f) {}
void* entry() override { return fin->finisher_thread_entry(); }
} finisher_thread;
/// Add a context to complete, optionally specifying a parameter for the complete function.
void queue(Context *c, int r = 0) {
std::unique_lock ul(finisher_lock);
- if (finisher_queue.empty()) {
- finisher_cond.notify_all();
+ bool was_empty = finisher_queue.empty();
+ finisher_queue.push_back(std::make_pair(c, r));
+ if (was_empty) {
+ finisher_cond.notify_one();
}
- finisher_queue.push_back(make_pair(c, r));
if (logger)
logger->inc(l_finisher_queue_len);
}
- void queue(list<Context*>& ls) {
+ void queue(std::list<Context*>& ls) {
{
std::unique_lock ul(finisher_lock);
if (finisher_queue.empty()) {
finisher_cond.notify_all();
}
for (auto i : ls) {
- finisher_queue.push_back(make_pair(i, 0));
+ finisher_queue.push_back(std::make_pair(i, 0));
}
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
}
ls.clear();
}
- void queue(deque<Context*>& ls) {
+ void queue(std::deque<Context*>& ls) {
{
std::unique_lock ul(finisher_lock);
if (finisher_queue.empty()) {
finisher_cond.notify_all();
}
for (auto i : ls) {
- finisher_queue.push_back(make_pair(i, 0));
+ finisher_queue.push_back(std::make_pair(i, 0));
}
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
}
ls.clear();
}
- void queue(vector<Context*>& ls) {
+ void queue(std::vector<Context*>& ls) {
{
std::unique_lock ul(finisher_lock);
if (finisher_queue.empty()) {
finisher_cond.notify_all();
}
for (auto i : ls) {
- finisher_queue.push_back(make_pair(i, 0));
+ finisher_queue.push_back(std::make_pair(i, 0));
}
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
finisher_thread(this) {}
/// Construct a named Finisher that logs its queue length.
- Finisher(CephContext *cct_, string name, string tn) :
+ Finisher(CephContext *cct_, std::string name, std::string tn) :
cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)),
finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
thread_name(tn), logger(0),
finisher_thread(this) {
- PerfCountersBuilder b(cct, string("finisher-") + name,
+ PerfCountersBuilder b(cct, std::string("finisher-") + name,
l_finisher_first, l_finisher_last);
b.add_u64(l_finisher_queue_len, "queue_len");
b.add_time_avg(l_finisher_complete_lat, "complete_latency");
};
class ContextQueue {
- list<Context *> q;
+ std::list<Context *> q;
std::mutex q_mutex;
ceph::mutex& mutex;
ceph::condition_variable& cond;
+ std::atomic_bool q_empty = true;
public:
ContextQueue(ceph::mutex& mut,
ceph::condition_variable& con)
: mutex(mut), cond(con) {}
- void queue(list<Context *>& ls) {
- bool empty = false;
+ void queue(std::list<Context *>& ls) {
+ bool was_empty = false;
{
std::scoped_lock l(q_mutex);
if (q.empty()) {
q.swap(ls);
- empty = true;
+ was_empty = true;
} else {
q.insert(q.end(), ls.begin(), ls.end());
}
+ q_empty = q.empty();
}
- if (empty) {
+ if (was_empty) {
std::scoped_lock l{mutex};
cond.notify_all();
}
ls.clear();
}
- void swap(list<Context *>& ls) {
+ void move_to(std::list<Context *>& ls) {
ls.clear();
std::scoped_lock l(q_mutex);
if (!q.empty()) {
q.swap(ls);
}
+ q_empty = true;
}
bool empty() {
- std::scoped_lock l(q_mutex);
- return q.empty();
+ return q_empty;
}
};