1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "WorkQueue.h"
16 #include "include/compat.h"
17 #include "common/errno.h"
19 #define dout_subsys ceph_subsys_tp
21 #define dout_prefix *_dout << name << " "
24 ThreadPool::ThreadPool(CephContext
*cct_
, string nm
, string tn
, int n
, const char *option
)
25 : cct(cct_
), name(std::move(nm
)), thread_name(std::move(tn
)),
26 lockname(name
+ "::lock"),
27 _lock(lockname
.c_str()), // this should be safe due to declaration order
37 _thread_num_option
= option
;
39 _conf_keys
= new const char*[2];
40 _conf_keys
[0] = _thread_num_option
.c_str();
43 _conf_keys
= new const char*[1];
48 void ThreadPool::TPHandle::suspend_tp_timeout()
50 cct
->get_heartbeat_map()->clear_timeout(hb
);
53 void ThreadPool::TPHandle::reset_tp_timeout()
55 cct
->get_heartbeat_map()->reset_timeout(
56 hb
, grace
, suicide_grace
);
59 ThreadPool::~ThreadPool()
61 assert(_threads
.empty());
65 void ThreadPool::handle_conf_change(const struct md_config_t
*conf
,
66 const std::set
<std::string
> &changed
)
68 if (changed
.count(_thread_num_option
)) {
70 int r
= conf
->get_val(_thread_num_option
.c_str(), &buf
, -1);
84 void ThreadPool::worker(WorkThread
*wt
)
87 ldout(cct
,10) << "worker start" << dendl
;
90 ss
<< name
<< " thread " << (void *)pthread_self();
91 heartbeat_handle_d
*hb
= cct
->get_heartbeat_map()->add_worker(ss
.str(), pthread_self());
95 // manage dynamic thread pool
97 if (_threads
.size() > _num_threads
) {
98 ldout(cct
,1) << " worker shutting down; too many threads (" << _threads
.size() << " > " << _num_threads
<< ")" << dendl
;
100 _old_threads
.push_back(wt
);
104 if (!_pause
&& !work_queues
.empty()) {
106 int tries
= work_queues
.size();
109 next_work_queue
%= work_queues
.size();
110 wq
= work_queues
[next_work_queue
++];
112 void *item
= wq
->_void_dequeue();
115 ldout(cct
,12) << "worker wq " << wq
->name
<< " start processing " << item
116 << " (" << processing
<< " active)" << dendl
;
117 TPHandle
tp_handle(cct
, hb
, wq
->timeout_interval
, wq
->suicide_interval
);
118 tp_handle
.reset_tp_timeout();
120 wq
->_void_process(item
, tp_handle
);
122 wq
->_void_process_finish(item
);
124 ldout(cct
,15) << "worker wq " << wq
->name
<< " done processing " << item
125 << " (" << processing
<< " active)" << dendl
;
126 if (_pause
|| _draining
)
136 ldout(cct
,20) << "worker waiting" << dendl
;
137 cct
->get_heartbeat_map()->reset_timeout(
139 cct
->_conf
->threadpool_default_timeout
,
141 _cond
.WaitInterval(_lock
,
143 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
145 ldout(cct
,1) << "worker finish" << dendl
;
147 cct
->get_heartbeat_map()->remove_worker(hb
);
152 void ThreadPool::start_threads()
154 assert(_lock
.is_locked());
155 while (_threads
.size() < _num_threads
) {
156 WorkThread
*wt
= new WorkThread(this);
157 ldout(cct
, 10) << "start_threads creating and starting " << wt
<< dendl
;
160 int r
= wt
->set_ioprio(ioprio_class
, ioprio_priority
);
162 lderr(cct
) << " set_ioprio got " << cpp_strerror(r
) << dendl
;
164 wt
->create(thread_name
.c_str());
168 void ThreadPool::join_old_threads()
170 assert(_lock
.is_locked());
171 while (!_old_threads
.empty()) {
172 ldout(cct
, 10) << "join_old_threads joining and deleting " << _old_threads
.front() << dendl
;
173 _old_threads
.front()->join();
174 delete _old_threads
.front();
175 _old_threads
.pop_front();
179 void ThreadPool::start()
181 ldout(cct
,10) << "start" << dendl
;
183 if (_thread_num_option
.length()) {
184 ldout(cct
, 10) << " registering config observer on " << _thread_num_option
<< dendl
;
185 cct
->_conf
->add_observer(this);
191 ldout(cct
,15) << "started" << dendl
;
194 void ThreadPool::stop(bool clear_after
)
196 ldout(cct
,10) << "stop" << dendl
;
198 if (_thread_num_option
.length()) {
199 ldout(cct
, 10) << " unregistering config observer on " << _thread_num_option
<< dendl
;
200 cct
->_conf
->remove_observer(this);
208 for (set
<WorkThread
*>::iterator p
= _threads
.begin();
216 for (unsigned i
=0; i
<work_queues
.size(); i
++)
217 work_queues
[i
]->_clear();
220 ldout(cct
,15) << "stopped" << dendl
;
223 void ThreadPool::pause()
225 ldout(cct
,10) << "pause" << dendl
;
229 _wait_cond
.Wait(_lock
);
231 ldout(cct
,15) << "paused" << dendl
;
234 void ThreadPool::pause_new()
236 ldout(cct
,10) << "pause_new" << dendl
;
242 void ThreadPool::unpause()
244 ldout(cct
,10) << "unpause" << dendl
;
252 void ThreadPool::drain(WorkQueue_
* wq
)
254 ldout(cct
,10) << "drain" << dendl
;
257 while (processing
|| (wq
!= NULL
&& !wq
->_empty()))
258 _wait_cond
.Wait(_lock
);
263 void ThreadPool::set_ioprio(int cls
, int priority
)
265 Mutex::Locker
l(_lock
);
267 ioprio_priority
= priority
;
268 for (set
<WorkThread
*>::iterator p
= _threads
.begin();
271 ldout(cct
,10) << __func__
272 << " class " << cls
<< " priority " << priority
273 << " pid " << (*p
)->get_pid()
275 int r
= (*p
)->set_ioprio(cls
, priority
);
277 lderr(cct
) << " set_ioprio got " << cpp_strerror(r
) << dendl
;
281 ShardedThreadPool::ShardedThreadPool(CephContext
*pcct_
, string nm
, string tn
,
282 uint32_t pnum_threads
):
285 thread_name(std::move(tn
)),
286 lockname(name
+ "::lock"),
287 shardedpool_lock(lockname
.c_str()),
288 num_threads(pnum_threads
),
293 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index
)
296 ldout(cct
,10) << "worker start" << dendl
;
298 std::stringstream ss
;
299 ss
<< name
<< " thread " << (void *)pthread_self();
300 heartbeat_handle_d
*hb
= cct
->get_heartbeat_map()->add_worker(ss
.str(), pthread_self());
302 while (!stop_threads
) {
304 shardedpool_lock
.Lock();
307 while (pause_threads
) {
308 cct
->get_heartbeat_map()->reset_timeout(
310 wq
->timeout_interval
, wq
->suicide_interval
);
311 shardedpool_cond
.WaitInterval(shardedpool_lock
,
313 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
316 shardedpool_lock
.Unlock();
319 shardedpool_lock
.Lock();
320 if (wq
->is_shard_empty(thread_index
)) {
323 while (drain_threads
) {
324 cct
->get_heartbeat_map()->reset_timeout(
326 wq
->timeout_interval
, wq
->suicide_interval
);
327 shardedpool_cond
.WaitInterval(shardedpool_lock
,
329 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
333 shardedpool_lock
.Unlock();
336 cct
->get_heartbeat_map()->reset_timeout(
338 wq
->timeout_interval
, wq
->suicide_interval
);
339 wq
->_process(thread_index
, hb
);
343 ldout(cct
,10) << "sharded worker finish" << dendl
;
345 cct
->get_heartbeat_map()->remove_worker(hb
);
349 void ShardedThreadPool::start_threads()
351 assert(shardedpool_lock
.is_locked());
352 int32_t thread_index
= 0;
353 while (threads_shardedpool
.size() < num_threads
) {
355 WorkThreadSharded
*wt
= new WorkThreadSharded(this, thread_index
);
356 ldout(cct
, 10) << "start_threads creating and starting " << wt
<< dendl
;
357 threads_shardedpool
.push_back(wt
);
358 wt
->create(thread_name
.c_str());
363 void ShardedThreadPool::start()
365 ldout(cct
,10) << "start" << dendl
;
367 shardedpool_lock
.Lock();
369 shardedpool_lock
.Unlock();
370 ldout(cct
,15) << "started" << dendl
;
373 void ShardedThreadPool::stop()
375 ldout(cct
,10) << "stop" << dendl
;
378 wq
->return_waiting_threads();
379 for (vector
<WorkThreadSharded
*>::iterator p
= threads_shardedpool
.begin();
380 p
!= threads_shardedpool
.end();
385 threads_shardedpool
.clear();
386 ldout(cct
,15) << "stopped" << dendl
;
389 void ShardedThreadPool::pause()
391 ldout(cct
,10) << "pause" << dendl
;
392 shardedpool_lock
.Lock();
393 pause_threads
= true;
395 wq
->return_waiting_threads();
396 while (num_threads
!= num_paused
){
397 wait_cond
.Wait(shardedpool_lock
);
399 shardedpool_lock
.Unlock();
400 ldout(cct
,10) << "paused" << dendl
;
403 void ShardedThreadPool::pause_new()
405 ldout(cct
,10) << "pause_new" << dendl
;
406 shardedpool_lock
.Lock();
407 pause_threads
= true;
409 wq
->return_waiting_threads();
410 shardedpool_lock
.Unlock();
411 ldout(cct
,10) << "paused_new" << dendl
;
414 void ShardedThreadPool::unpause()
416 ldout(cct
,10) << "unpause" << dendl
;
417 shardedpool_lock
.Lock();
418 pause_threads
= false;
419 shardedpool_cond
.Signal();
420 shardedpool_lock
.Unlock();
421 ldout(cct
,10) << "unpaused" << dendl
;
424 void ShardedThreadPool::drain()
426 ldout(cct
,10) << "drain" << dendl
;
427 shardedpool_lock
.Lock();
428 drain_threads
= true;
430 wq
->return_waiting_threads();
431 while (num_threads
!= num_drained
) {
432 wait_cond
.Wait(shardedpool_lock
);
434 drain_threads
= false;
435 shardedpool_cond
.Signal();
436 shardedpool_lock
.Unlock();
437 ldout(cct
,10) << "drained" << dendl
;