*
*/
+#include "WorkQueue.h"
#include "include/compat.h"
-
-#include <sstream>
-
-#include "include/types.h"
-#include "include/utime.h"
#include "common/errno.h"
-#include "WorkQueue.h"
-
-#include "common/config.h"
-#include "common/HeartbeatMap.h"
#define dout_subsys ceph_subsys_tp
#undef dout_prefix
lockname(name + "::lock"),
shardedpool_lock(lockname.c_str()),
num_threads(pnum_threads),
- stop_threads(0),
- pause_threads(0),
- drain_threads(0),
num_paused(0),
num_drained(0),
wq(NULL) {}
ss << name << " thread " << name;
heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
- while (!stop_threads.read()) {
- if(pause_threads.read()) {
+ while (!stop_threads) {
+ if (pause_threads) {
shardedpool_lock.Lock();
++num_paused;
wait_cond.Signal();
- while(pause_threads.read()) {
+ while (pause_threads) {
cct->get_heartbeat_map()->reset_timeout(
- hb,
- wq->timeout_interval, wq->suicide_interval);
+ hb,
+ wq->timeout_interval, wq->suicide_interval);
shardedpool_cond.WaitInterval(shardedpool_lock,
- utime_t(
+ utime_t(
cct->_conf->threadpool_empty_queue_max_wait, 0));
}
--num_paused;
shardedpool_lock.Unlock();
}
- if (drain_threads.read()) {
+ if (drain_threads) {
shardedpool_lock.Lock();
if (wq->is_shard_empty(thread_index)) {
++num_drained;
wait_cond.Signal();
- while (drain_threads.read()) {
+ while (drain_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
wq->timeout_interval, wq->suicide_interval);
void ShardedThreadPool::stop()
{
ldout(cct,10) << "stop" << dendl;
- stop_threads.set(1);
+ stop_threads = true;
assert(wq != NULL);
wq->return_waiting_threads();
for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
{
ldout(cct,10) << "pause" << dendl;
shardedpool_lock.Lock();
- pause_threads.set(1);
+ pause_threads = true;
assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_paused){
{
ldout(cct,10) << "pause_new" << dendl;
shardedpool_lock.Lock();
- pause_threads.set(1);
+ pause_threads = true;
assert(wq != NULL);
wq->return_waiting_threads();
shardedpool_lock.Unlock();
{
ldout(cct,10) << "unpause" << dendl;
shardedpool_lock.Lock();
- pause_threads.set(0);
+ pause_threads = false;
shardedpool_cond.Signal();
shardedpool_lock.Unlock();
ldout(cct,10) << "unpaused" << dendl;
{
ldout(cct,10) << "drain" << dendl;
shardedpool_lock.Lock();
- drain_threads.set(1);
+ drain_threads = true;
assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_drained) {
wait_cond.Wait(shardedpool_lock);
}
- drain_threads.set(0);
+ drain_threads = false;
shardedpool_cond.Signal();
shardedpool_lock.Unlock();
ldout(cct,10) << "drained" << dendl;