1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/compat.h"
19 #include "include/types.h"
20 #include "include/utime.h"
21 #include "common/errno.h"
22 #include "WorkQueue.h"
24 #include "common/config.h"
25 #include "common/HeartbeatMap.h"
27 #define dout_subsys ceph_subsys_tp
29 #define dout_prefix *_dout << name << " "
// Constructs a ThreadPool.
//   cct_   - context pointer, stored in cct.
//   nm     - pool name; moved into name and used to build lockname.
//   tn     - moved into thread_name.
//   n      - not referenced in the visible lines.
//            NOTE(review): presumably the initial thread count -- confirm.
//   option - copied into _thread_num_option.
// NOTE(review): this extract skips original lines 36-44, 46, 49-50 and 52+,
// so the rest of the initializer list and the branch structure around the
// two _conf_keys allocations below are not visible here.
32 ThreadPool::ThreadPool(CephContext
*cct_
, string nm
, string tn
, int n
, const char *option
)
33 : cct(cct_
), name(std::move(nm
)), thread_name(std::move(tn
)),
// lockname is derived from the already-initialized name member.
34 lockname(name
+ "::lock"),
// _lock is handed lockname's character buffer, so lockname must be
// initialized before _lock.
35 _lock(lockname
.c_str()), // this should be safe due to declaration order
45 _thread_num_option
= option
;
// Two-slot key array; slot 0 points at the option name held by
// _thread_num_option.
// NOTE(review): the assignment to slot 1 is not visible in this extract.
47 _conf_keys
= new const char*[2];
48 _conf_keys
[0] = _thread_num_option
.c_str();
// Alternative one-slot key array.
// NOTE(review): presumably the "no option given" branch -- confirm.
51 _conf_keys
= new const char*[1];
56 void ThreadPool::TPHandle::suspend_tp_timeout()
58 cct
->get_heartbeat_map()->clear_timeout(hb
);
61 void ThreadPool::TPHandle::reset_tp_timeout()
63 cct
->get_heartbeat_map()->reset_timeout(
64 hb
, grace
, suicide_grace
);
// Destructor. Asserts that no worker threads remain in _threads.
// NOTE(review): _conf_keys is allocated with new[] in the constructor; its
// release is not visible here (original line 70 is missing from this
// extract) -- confirm it is freed with delete[].
67 ThreadPool::~ThreadPool()
69 assert(_threads
.empty());
// Configuration-change callback.
//   conf    - configuration object the new value is read from.
//   changed - names of the options that changed.
// Only acts when the pool's thread-count option (_thread_num_option) is
// among the changed names.
73 void ThreadPool::handle_conf_change(const struct md_config_t
*conf
,
74 const std::set
<std::string
> &changed
)
76 if (changed
.count(_thread_num_option
)) {
// Read the option's current value into buf; r holds get_val()'s return code.
// NOTE(review): the declaration of buf and everything that consumes buf and
// r (original lines 79-90) are missing from this extract.
78 int r
= conf
->get_val(_thread_num_option
.c_str(), &buf
, -1);
// Body run by one pool worker thread; wt is that thread's WorkThread object.
// NOTE(review): this extract omits many original lines (locking calls, loop
// headers, counters, braces). The comments below describe only the
// statements that are visible.
92 void ThreadPool::worker(WorkThread
*wt
)
95 ldout(cct
,10) << "worker start" << dendl
;
// Build the label under which this thread is registered with the heartbeat
// map.
// NOTE(review): `name` is passed to ceph_pthread_getname() together with
// sizeof(name), so it looks like a local buffer shadowing the pool's `name`
// member (its declaration is not visible). If so, the label reads
// "<thread> thread <thread>" and never includes the pool name -- confirm.
99 ceph_pthread_getname(pthread_self(), name
, sizeof(name
));
100 ss
<< name
<< " thread " << name
;
101 heartbeat_handle_d
*hb
= cct
->get_heartbeat_map()->add_worker(ss
.str(), pthread_self());
105 // manage dynamic thread pool
// More threads exist than _num_threads allows: log it and queue this
// thread's object on _old_threads (join_old_threads() later joins and
// deletes entries on that list).
107 if (_threads
.size() > _num_threads
) {
108 ldout(cct
,1) << " worker shutting down; too many threads (" << _threads
.size() << " > " << _num_threads
<< ")" << dendl
;
110 _old_threads
.push_back(wt
);
// Only look for work when the pool is not paused and has queues registered.
114 if (!_pause
&& !work_queues
.empty()) {
// Visit the queues in rotation: `tries` starts at the queue count, and
// next_work_queue is wrapped to a valid index and then advanced.
116 int tries
= work_queues
.size();
119 next_work_queue
%= work_queues
.size();
120 wq
= work_queues
[next_work_queue
++];
122 void *item
= wq
->_void_dequeue();
125 ldout(cct
,12) << "worker wq " << wq
->name
<< " start processing " << item
126 << " (" << processing
<< " active)" << dendl
;
// The handle carries this queue's timeout_interval and suicide_interval;
// reset_tp_timeout() applies them to hb before the item is processed.
127 TPHandle
tp_handle(cct
, hb
, wq
->timeout_interval
, wq
->suicide_interval
);
128 tp_handle
.reset_tp_timeout();
130 wq
->_void_process(item
, tp_handle
);
132 wq
->_void_process_finish(item
);
134 ldout(cct
,15) << "worker wq " << wq
->name
<< " done processing " << item
135 << " (" << processing
<< " active)" << dendl
;
// NOTE(review): the statement guarded by this condition is not visible;
// presumably it wakes waiters in pause()/drain() via _wait_cond -- confirm.
136 if (_pause
|| _draining
)
// Idle path: reset this thread's heartbeat timeout using
// threadpool_default_timeout, then wait on _cond for an interval taken from
// threadpool_empty_queue_max_wait.
146 ldout(cct
,20) << "worker waiting" << dendl
;
147 cct
->get_heartbeat_map()->reset_timeout(
149 cct
->_conf
->threadpool_default_timeout
,
151 _cond
.WaitInterval(_lock
,
153 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
155 ldout(cct
,1) << "worker finish" << dendl
;
// Remove this thread's entry from the heartbeat map before returning.
157 cct
->get_heartbeat_map()->remove_worker(hb
);
// Creates WorkThread objects until _threads holds _num_threads entries.
// The caller must already hold _lock (asserted).
// NOTE(review): original lines 168-169, 171, 173 and 175-176 are missing
// from this extract, including the insertion into _threads and the
// condition guarding the error log.
162 void ThreadPool::start_threads()
164 assert(_lock
.is_locked());
165 while (_threads
.size() < _num_threads
) {
166 WorkThread
*wt
= new WorkThread(this);
167 ldout(cct
, 10) << "start_threads creating and starting " << wt
<< dendl
;
// Apply the pool's stored I/O priority class and priority to the new thread.
170 int r
= wt
->set_ioprio(ioprio_class
, ioprio_priority
);
172 lderr(cct
) << " set_ioprio got " << cpp_strerror(r
) << dendl
;
// Start the thread under the pool's configured thread_name.
174 wt
->create(thread_name
.c_str());
178 void ThreadPool::join_old_threads()
180 assert(_lock
.is_locked());
181 while (!_old_threads
.empty()) {
182 ldout(cct
, 10) << "join_old_threads joining and deleting " << _old_threads
.front() << dendl
;
183 _old_threads
.front()->join();
184 delete _old_threads
.front();
185 _old_threads
.pop_front();
// Starts the pool. If a thread-count option name was configured
// (_thread_num_option is non-empty), registers this object as a config
// observer on cct->_conf. Logs "start" on entry and "started" at the end.
// NOTE(review): original lines 196-200 are missing from this extract;
// presumably that is where the worker threads are created -- confirm.
189 void ThreadPool::start()
191 ldout(cct
,10) << "start" << dendl
;
193 if (_thread_num_option
.length()) {
194 ldout(cct
, 10) << " registering config observer on " << _thread_num_option
<< dendl
;
195 cct
->_conf
->add_observer(this);
201 ldout(cct
,15) << "started" << dendl
;
// Stops the pool. Logs "stop" on entry and "stopped" at the end.
//   clear_after - not referenced in the visible lines.
//                 NOTE(review): presumably gates the _clear() loop below --
//                 confirm.
// NOTE(review): original lines 211-217, 219-225 and 228-229 are missing from
// this extract, so the body of the loop over _threads is not shown.
204 void ThreadPool::stop(bool clear_after
)
206 ldout(cct
,10) << "stop" << dendl
;
// Undo the config-observer registration made in start() when an option name
// was configured.
208 if (_thread_num_option
.length()) {
209 ldout(cct
, 10) << " unregistering config observer on " << _thread_num_option
<< dendl
;
210 cct
->_conf
->remove_observer(this);
// Iterate over every worker thread object in _threads.
218 for (set
<WorkThread
*>::iterator p
= _threads
.begin();
// Call _clear() on every registered work queue.
226 for (unsigned i
=0; i
<work_queues
.size(); i
++)
227 work_queues
[i
]->_clear();
230 ldout(cct
,15) << "stopped" << dendl
;
// Pauses the pool. Logs "pause", waits on _wait_cond (with _lock), then logs
// "paused".
// NOTE(review): original lines 236-238 and 240 are missing from this
// extract, so the locking, the _pause update and the loop condition around
// the wait are not visible.
233 void ThreadPool::pause()
235 ldout(cct
,10) << "pause" << dendl
;
239 _wait_cond
.Wait(_lock
);
241 ldout(cct
,15) << "paused" << dendl
;
// Logs "pause_new".
// NOTE(review): the rest of the body (original lines 247-250) is missing
// from this extract; presumably it sets the pause state without waiting for
// in-flight work -- confirm.
244 void ThreadPool::pause_new()
246 ldout(cct
,10) << "pause_new" << dendl
;
// Logs "unpause".
// NOTE(review): the rest of the body (original lines 255-260) is missing
// from this extract; presumably it reverses pause()/pause_new() and wakes
// the workers -- confirm.
252 void ThreadPool::unpause()
254 ldout(cct
,10) << "unpause" << dendl
;
// Blocks until the pool has drained.
//   wq - optional queue to wait on; may be NULL.
// Waits on _wait_cond while any item is still being processed (processing
// is non-zero) or, when wq is given, while wq is not empty.
// NOTE(review): original lines 265-266 and 269-271 are missing from this
// extract, so the locking and any _draining updates are not visible.
262 void ThreadPool::drain(WorkQueue_
* wq
)
264 ldout(cct
,10) << "drain" << dendl
;
267 while (processing
|| (wq
!= NULL
&& !wq
->_empty()))
268 _wait_cond
.Wait(_lock
);
// Records a new I/O priority and applies it to every existing worker thread.
//   cls      - I/O priority class, passed to each thread's set_ioprio().
//   priority - I/O priority, stored in ioprio_priority and passed on.
// _lock is held for the whole call through the scoped Mutex::Locker.
// NOTE(review): original lines 276, 279-280, 284 and 286 are missing from
// this extract, including the condition guarding the error log; presumably
// line 276 stores cls in ioprio_class -- confirm.
273 void ThreadPool::set_ioprio(int cls
, int priority
)
275 Mutex::Locker
l(_lock
);
277 ioprio_priority
= priority
;
278 for (set
<WorkThread
*>::iterator p
= _threads
.begin();
281 ldout(cct
,10) << __func__
282 << " class " << cls
<< " priority " << priority
283 << " pid " << (*p
)->get_pid()
285 int r
= (*p
)->set_ioprio(cls
, priority
);
287 lderr(cct
) << " set_ioprio got " << cpp_strerror(r
) << dendl
;
// Constructs a ShardedThreadPool.
//   pcct_        - context pointer; its use is not visible in this extract.
//   nm           - pool name; its use is not visible in this extract.
//   tn           - moved into thread_name.
//   pnum_threads - stored in num_threads.
// NOTE(review): original lines 293-294 and 299+ are missing from this
// extract, so the first initializers, the remaining ones and the
// constructor body are not shown.
291 ShardedThreadPool::ShardedThreadPool(CephContext
*pcct_
, string nm
, string tn
,
292 uint32_t pnum_threads
):
295 thread_name(std::move(tn
)),
// lockname is built from name, and shardedpool_lock is handed lockname's
// character buffer, so both depend on member initialization order.
296 lockname(name
+ "::lock"),
297 shardedpool_lock(lockname
.c_str()),
298 num_threads(pnum_threads
),
// Body run by one sharded worker thread; thread_index identifies the thread
// and is passed to the work queue's is_shard_empty() and _process().
// NOTE(review): this extract omits several original lines (320-321, 324,
// 327, 329-330, 336-337, 340, 343, 345-347, 352 and braces). The comments
// below describe only the statements that are visible.
306 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index
)
309 ldout(cct
,10) << "worker start" << dendl
;
311 std::stringstream ss
;
// Build the label under which this thread is registered with the heartbeat
// map.
// NOTE(review): `name` is passed to ceph_pthread_getname() together with
// sizeof(name), so it looks like a local buffer (declaration not visible);
// if so, the label reads "<thread> thread <thread>" -- confirm.
313 ceph_pthread_getname(pthread_self(), name
, sizeof(name
));
314 ss
<< name
<< " thread " << name
;
315 heartbeat_handle_d
*hb
= cct
->get_heartbeat_map()->add_worker(ss
.str(), pthread_self());
// Run until stop_threads is set.
317 while (!stop_threads
.read()) {
// Pause handling: take shardedpool_lock and, while pause_threads stays set,
// keep resetting this thread's heartbeat timeout and waiting on
// shardedpool_cond for an interval from threadpool_empty_queue_max_wait.
// NOTE(review): the lines that presumably count this thread as paused and
// notify pause() are not visible -- confirm.
318 if(pause_threads
.read()) {
319 shardedpool_lock
.Lock();
322 while(pause_threads
.read()) {
323 cct
->get_heartbeat_map()->reset_timeout(
325 wq
->timeout_interval
, wq
->suicide_interval
);
326 shardedpool_cond
.WaitInterval(shardedpool_lock
,
328 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
331 shardedpool_lock
.Unlock();
// Drain handling: under shardedpool_lock, if this thread's shard is empty,
// wait in the same way for as long as drain_threads stays set.
// NOTE(review): the lines that presumably count this thread as drained and
// notify drain() are not visible -- confirm.
333 if (drain_threads
.read()) {
334 shardedpool_lock
.Lock();
335 if (wq
->is_shard_empty(thread_index
)) {
338 while (drain_threads
.read()) {
339 cct
->get_heartbeat_map()->reset_timeout(
341 wq
->timeout_interval
, wq
->suicide_interval
);
342 shardedpool_cond
.WaitInterval(shardedpool_lock
,
344 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
348 shardedpool_lock
.Unlock();
// Normal path: reset the heartbeat timeout from the queue's intervals, then
// let the queue process work for this thread's shard.
351 cct
->get_heartbeat_map()->reset_timeout(
353 wq
->timeout_interval
, wq
->suicide_interval
);
354 wq
->_process(thread_index
, hb
);
358 ldout(cct
,10) << "sharded worker finish" << dendl
;
// Remove this thread's entry from the heartbeat map before returning.
360 cct
->get_heartbeat_map()->remove_worker(hb
);
// Creates and starts WorkThreadSharded objects until threads_shardedpool
// holds num_threads entries. The caller must already hold shardedpool_lock
// (asserted).
// NOTE(review): original lines 369 and 374-376 are missing from this
// extract; no increment of thread_index is visible, though each thread is
// presumably meant to get a distinct index -- confirm.
364 void ShardedThreadPool::start_threads()
366 assert(shardedpool_lock
.is_locked());
367 int32_t thread_index
= 0;
368 while (threads_shardedpool
.size() < num_threads
) {
370 WorkThreadSharded
*wt
= new WorkThreadSharded(this, thread_index
);
371 ldout(cct
, 10) << "start_threads creating and starting " << wt
<< dendl
;
372 threads_shardedpool
.push_back(wt
);
// Start the thread under the pool's configured thread_name.
373 wt
->create(thread_name
.c_str());
// Starts the sharded pool. Logs "start", takes and releases
// shardedpool_lock, then logs "started".
// NOTE(review): original line 383, between Lock() and Unlock(), is missing
// from this extract; presumably it calls start_threads(), which asserts the
// lock is held -- confirm.
378 void ShardedThreadPool::start()
380 ldout(cct
,10) << "start" << dendl
;
382 shardedpool_lock
.Lock();
384 shardedpool_lock
.Unlock();
385 ldout(cct
,15) << "started" << dendl
;
// Stops the sharded pool. Logs "stop" on entry and "stopped" at the end.
// NOTE(review): original lines 391-392 and 396-399 are missing from this
// extract, so the stop-flag update and the body of the loop over the
// threads are not visible.
388 void ShardedThreadPool::stop()
390 ldout(cct
,10) << "stop" << dendl
;
// Ask the work queue to release any threads currently waiting inside it.
393 wq
->return_waiting_threads();
// Visit every worker thread object.
394 for (vector
<WorkThreadSharded
*>::iterator p
= threads_shardedpool
.begin();
395 p
!= threads_shardedpool
.end();
// Empty the thread list once the loop is done.
400 threads_shardedpool
.clear();
401 ldout(cct
,15) << "stopped" << dendl
;
// Pauses the sharded pool and waits until every worker has paused.
// Under shardedpool_lock: sets pause_threads, asks the work queue to release
// waiting threads, then waits on wait_cond until num_paused equals
// num_threads. Logs "pause" on entry and "paused" after unlocking.
// NOTE(review): original line 409 is missing from this extract.
404 void ShardedThreadPool::pause()
406 ldout(cct
,10) << "pause" << dendl
;
407 shardedpool_lock
.Lock();
408 pause_threads
.set(1);
410 wq
->return_waiting_threads();
411 while (num_threads
!= num_paused
){
412 wait_cond
.Wait(shardedpool_lock
);
414 shardedpool_lock
.Unlock();
415 ldout(cct
,10) << "paused" << dendl
;
// Requests a pause without waiting for the workers to reach it.
// Under shardedpool_lock: sets pause_threads and asks the work queue to
// release waiting threads. Unlike pause(), the visible lines contain no wait
// on wait_cond. Logs "pause_new" on entry and "paused_new" after unlocking.
// NOTE(review): original line 423 is missing from this extract.
418 void ShardedThreadPool::pause_new()
420 ldout(cct
,10) << "pause_new" << dendl
;
421 shardedpool_lock
.Lock();
422 pause_threads
.set(1);
424 wq
->return_waiting_threads();
425 shardedpool_lock
.Unlock();
426 ldout(cct
,10) << "paused_new" << dendl
;
429 void ShardedThreadPool::unpause()
431 ldout(cct
,10) << "unpause" << dendl
;
432 shardedpool_lock
.Lock();
433 pause_threads
.set(0);
434 shardedpool_cond
.Signal();
435 shardedpool_lock
.Unlock();
436 ldout(cct
,10) << "unpaused" << dendl
;
// Drains the sharded pool and waits until every worker reports drained.
// Under shardedpool_lock: sets drain_threads, asks the work queue to release
// waiting threads, and waits on wait_cond until num_drained equals
// num_threads. It then clears drain_threads and signals shardedpool_cond,
// the condition the workers wait on while draining. Logs "drain" on entry
// and "drained" after unlocking.
// NOTE(review): original line 444 is missing from this extract.
439 void ShardedThreadPool::drain()
441 ldout(cct
,10) << "drain" << dendl
;
442 shardedpool_lock
.Lock();
443 drain_threads
.set(1);
445 wq
->return_waiting_threads();
446 while (num_threads
!= num_drained
) {
447 wait_cond
.Wait(shardedpool_lock
);
449 drain_threads
.set(0);
450 shardedpool_cond
.Signal();
451 shardedpool_lock
.Unlock();
452 ldout(cct
,10) << "drained" << dendl
;