]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/common/WorkQueue.cc
update sources to v12.1.0
[ceph.git] / ceph / src / common / WorkQueue.cc
index ec5f8a1476d1f0cbe5a7928c444f94cc9d9576f9..1d26723128a873ccf56a2871afa7c89c735fca45 100644 (file)
  * 
  */
 
+#include "WorkQueue.h"
 #include "include/compat.h"
-
-#include <sstream>
-
-#include "include/types.h"
-#include "include/utime.h"
 #include "common/errno.h"
-#include "WorkQueue.h"
-
-#include "common/config.h"
-#include "common/HeartbeatMap.h"
 
 #define dout_subsys ceph_subsys_tp
 #undef dout_prefix
@@ -296,9 +288,6 @@ ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
   lockname(name + "::lock"),
   shardedpool_lock(lockname.c_str()),
   num_threads(pnum_threads),
-  stop_threads(0),
-  pause_threads(0),
-  drain_threads(0),
   num_paused(0),
   num_drained(0),
   wq(NULL) {}
@@ -314,28 +303,28 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
   ss << name << " thread " << name;
   heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
 
-  while (!stop_threads.read()) {
-    if(pause_threads.read()) {
+  while (!stop_threads) {
+    if (pause_threads) {
       shardedpool_lock.Lock();
       ++num_paused;
       wait_cond.Signal();
-      while(pause_threads.read()) {
+      while (pause_threads) {
        cct->get_heartbeat_map()->reset_timeout(
-        hb,
-        wq->timeout_interval, wq->suicide_interval);
+               hb,
+               wq->timeout_interval, wq->suicide_interval);
        shardedpool_cond.WaitInterval(shardedpool_lock,
-        utime_t(
+          utime_t(
           cct->_conf->threadpool_empty_queue_max_wait, 0));
       }
       --num_paused;
       shardedpool_lock.Unlock();
     }
-    if (drain_threads.read()) {
+    if (drain_threads) {
       shardedpool_lock.Lock();
       if (wq->is_shard_empty(thread_index)) {
         ++num_drained;
         wait_cond.Signal();
-        while (drain_threads.read()) {
+        while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
            wq->timeout_interval, wq->suicide_interval);
@@ -388,7 +377,7 @@ void ShardedThreadPool::start()
 void ShardedThreadPool::stop()
 {
   ldout(cct,10) << "stop" << dendl;
-  stop_threads.set(1);
+  stop_threads = true;
   assert(wq != NULL);
   wq->return_waiting_threads();
   for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
@@ -405,7 +394,7 @@ void ShardedThreadPool::pause()
 {
   ldout(cct,10) << "pause" << dendl;
   shardedpool_lock.Lock();
-  pause_threads.set(1);
+  pause_threads = true;
   assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_paused){
@@ -419,7 +408,7 @@ void ShardedThreadPool::pause_new()
 {
   ldout(cct,10) << "pause_new" << dendl;
   shardedpool_lock.Lock();
-  pause_threads.set(1);
+  pause_threads = true;
   assert(wq != NULL);
   wq->return_waiting_threads();
   shardedpool_lock.Unlock();
@@ -430,7 +419,7 @@ void ShardedThreadPool::unpause()
 {
   ldout(cct,10) << "unpause" << dendl;
   shardedpool_lock.Lock();
-  pause_threads.set(0);
+  pause_threads = false;
   shardedpool_cond.Signal();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "unpaused" << dendl;
@@ -440,13 +429,13 @@ void ShardedThreadPool::drain()
 {
   ldout(cct,10) << "drain" << dendl;
   shardedpool_lock.Lock();
-  drain_threads.set(1);
+  drain_threads = true;
   assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_drained) {
     wait_cond.Wait(shardedpool_lock);
   }
-  drain_threads.set(0);
+  drain_threads = false;
   shardedpool_cond.Signal();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "drained" << dendl;