]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/common/Finisher.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / common / Finisher.h
index cca3f81c884352509835216ade20856df0011715..f1060b0e4bf87b3a12057bbf313f7cf524f99e91 100644 (file)
 #define CEPH_FINISHER_H
 
 #include "include/Context.h"
+#include "include/common_fwd.h"
 #include "common/Thread.h"
 #include "common/ceph_mutex.h"
 #include "common/perf_counters.h"
 #include "common/Cond.h"
 
-class CephContext;
 
 /// Finisher queue length performance counter ID.
 enum {
@@ -46,18 +46,19 @@ class Finisher {
   bool        finisher_empty_wait; ///< True mean someone wait finisher empty.
 
   /// Queue for contexts for which complete(0) will be called.
-  vector<pair<Context*,int>> finisher_queue;
+  std::vector<std::pair<Context*,int>> finisher_queue;
+  std::vector<std::pair<Context*,int>> in_progress_queue;
 
-  string thread_name;
+  std::string thread_name;
 
   /// Performance counter for the finisher's queue length.
   /// Only active for named finishers.
   PerfCounters *logger;
-  
+
   void *finisher_thread_entry();
 
   struct FinisherThread : public Thread {
-    Finisher *fin;    
+    Finisher *fin;
     explicit FinisherThread(Finisher *f) : fin(f) {}
     void* entry() override { return fin->finisher_thread_entry(); }
   } finisher_thread;
@@ -66,50 +67,51 @@ class Finisher {
   /// Add a context to complete, optionally specifying a parameter for the complete function.
   void queue(Context *c, int r = 0) {
     std::unique_lock ul(finisher_lock);
-    if (finisher_queue.empty()) {
-      finisher_cond.notify_all();
+    bool was_empty = finisher_queue.empty();
+    finisher_queue.push_back(std::make_pair(c, r));
+    if (was_empty) {
+      finisher_cond.notify_one();
     }
-    finisher_queue.push_back(make_pair(c, r));
     if (logger)
       logger->inc(l_finisher_queue_len);
   }
 
-  void queue(list<Context*>& ls) {
+  void queue(std::list<Context*>& ls) {
     {
       std::unique_lock ul(finisher_lock);
       if (finisher_queue.empty()) {
        finisher_cond.notify_all();
       }
       for (auto i : ls) {
-       finisher_queue.push_back(make_pair(i, 0));
+       finisher_queue.push_back(std::make_pair(i, 0));
       }
       if (logger)
        logger->inc(l_finisher_queue_len, ls.size());
     }
     ls.clear();
   }
-  void queue(deque<Context*>& ls) {
+  void queue(std::deque<Context*>& ls) {
     {
       std::unique_lock ul(finisher_lock);
       if (finisher_queue.empty()) {
        finisher_cond.notify_all();
       }
       for (auto i : ls) {
-       finisher_queue.push_back(make_pair(i, 0));
+       finisher_queue.push_back(std::make_pair(i, 0));
       }
       if (logger)
        logger->inc(l_finisher_queue_len, ls.size());
     }
     ls.clear();
   }
-  void queue(vector<Context*>& ls) {
+  void queue(std::vector<Context*>& ls) {
     {
       std::unique_lock ul(finisher_lock);
       if (finisher_queue.empty()) {
        finisher_cond.notify_all();
       }
       for (auto i : ls) {
-       finisher_queue.push_back(make_pair(i, 0));
+       finisher_queue.push_back(std::make_pair(i, 0));
       }
       if (logger)
        logger->inc(l_finisher_queue_len, ls.size());
@@ -142,12 +144,12 @@ class Finisher {
     finisher_thread(this) {}
 
   /// Construct a named Finisher that logs its queue length.
-  Finisher(CephContext *cct_, string name, string tn) :
+  Finisher(CephContext *cct_, std::string name, std::string tn) :
     cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)),
     finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
     thread_name(tn), logger(0),
     finisher_thread(this) {
-    PerfCountersBuilder b(cct, string("finisher-") + name,
+    PerfCountersBuilder b(cct, std::string("finisher-") + name,
                          l_finisher_first, l_finisher_last);
     b.add_u64(l_finisher_queue_len, "queue_len");
     b.add_time_avg(l_finisher_complete_lat, "complete_latency");
@@ -189,28 +191,30 @@ public:
 };
 
 class ContextQueue {
-  list<Context *> q;
+  std::list<Context *> q;
   std::mutex q_mutex;
   ceph::mutex& mutex;
   ceph::condition_variable& cond;
+  std::atomic_bool q_empty = true;
 public:
   ContextQueue(ceph::mutex& mut,
               ceph::condition_variable& con)
     : mutex(mut), cond(con) {}
 
-  void queue(list<Context *>& ls) {
-    bool empty = false;
+  void queue(std::list<Context *>& ls) {
+    bool was_empty = false;
     {
       std::scoped_lock l(q_mutex);
       if (q.empty()) {
        q.swap(ls);
-       empty = true;
+       was_empty = true;
       } else {
        q.insert(q.end(), ls.begin(), ls.end());
       }
+      q_empty = q.empty();
     }
 
-    if (empty) {
+    if (was_empty) {
       std::scoped_lock l{mutex};
       cond.notify_all();
     }
@@ -218,17 +222,17 @@ public:
     ls.clear();
   }
 
-  void swap(list<Context *>& ls) {
+  void move_to(std::list<Context *>& ls) {
     ls.clear();
     std::scoped_lock l(q_mutex);
     if (!q.empty()) {
       q.swap(ls);
     }
+    q_empty = true;
   }
 
   bool empty() {
-    std::scoped_lock l(q_mutex);
-    return q.empty();
+    return q_empty;
   }
 };