1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "WorkQueue.h"
16 #include "include/compat.h"
17 #include "common/errno.h"
19 #define dout_subsys ceph_subsys_tp
21 #define dout_prefix *_dout << name << " "
// Constructs a thread pool.
//   cct_   - context stored in `cct`.
//   nm     - pool name, moved into `name`; also used to build the lock name
//            "<name>::lock".
//   tn     - name handed to worker threads, moved into `thread_name`.
//   n      - NOTE(review): its use is not visible in this listing (presumably
//            the initial thread count) - confirm against the full source.
//   option - name of a config option; copied into `_thread_num_option` on the
//            visible path below.
// NOTE(review): the embedded numbering jumps from 27 to 37, so the remaining
// member initializers and the condition selecting between the two
// `_conf_keys` allocations are not shown here.
24 ThreadPool::ThreadPool(CephContext
*cct_
, string nm
, string tn
, int n
, const char *option
)
25 : cct(cct_
), name(std::move(nm
)), thread_name(std::move(tn
)),
26 lockname(name
+ "::lock"),
27 _lock(lockname
.c_str()), // this should be safe due to declaration order
// Remember which config option this pool is associated with.
37 _thread_num_option
= option
;
// Two-slot key array; slot 0 points into _thread_num_option's own buffer, so
// it is only valid while that string is alive and unmodified.
// NOTE(review): this is a raw new[]; the matching delete[] is not visible in
// this listing - confirm it is released.
39 _conf_keys
= new const char*[2];
40 _conf_keys
[0] = _thread_num_option
.c_str();
// Alternative path: single-slot key array (its contents are assigned on a
// line that is not visible here).
43 _conf_keys
= new const char*[1];
// Clears the heartbeat timeout registered for this handle's heartbeat entry
// (`hb`) in the context's heartbeat map.
48 void ThreadPool::TPHandle::suspend_tp_timeout()
50 cct
->get_heartbeat_map()->clear_timeout(hb
);
// Resets the heartbeat timeout for this handle's heartbeat entry (`hb`),
// passing the handle's `grace` and `suicide_grace` values to the heartbeat map.
53 void ThreadPool::TPHandle::reset_tp_timeout()
55 cct
->get_heartbeat_map()->reset_timeout(
56 hb
, grace
, suicide_grace
);
// Destructor. Asserts that `_threads` is empty, i.e. no worker threads may
// still be registered when the pool is destroyed.
// NOTE(review): one source line after the assert is missing from this listing
// (the numbering skips 62); presumably cleanup of `_conf_keys` - confirm.
59 ThreadPool::~ThreadPool()
61 assert(_threads
.empty());
// Config-change callback.
//   conf    - configuration object the new value is read from.
//   changed - set of option names that changed.
// Only reacts when `changed` contains `_thread_num_option`; in that case the
// option's value is fetched through conf->get_val() into `buf`.
// NOTE(review): the declaration of `buf` and everything done with the fetched
// value (and with the return code `r`) is not visible in this listing.
65 void ThreadPool::handle_conf_change(const struct md_config_t
*conf
,
66 const std::set
<std::string
> &changed
)
68 if (changed
.count(_thread_num_option
)) {
70 int r
= conf
->get_val(_thread_num_option
.c_str(), &buf
, -1);
// Body executed by a pool worker thread.
//   wt - the WorkThread object representing the calling thread; it is pushed
//        onto `_old_threads` when the pool has more threads than wanted.
// Visible flow: register a heartbeat entry for this thread, then (in a loop
// whose header is not shown) either retire the thread, pick a work queue and
// process one item from it, or wait on `_cond`; finally unregister the
// heartbeat entry.
// NOTE(review): this listing omits many source lines (locking, local
// declarations, loop headers, the `processing` counter updates), so the
// comments below describe only the statements that are visible.
84 void ThreadPool::worker(WorkThread
*wt
)
87 ldout(cct
,10) << "worker start" << dendl
;
// Fetch the calling thread's name into the `name` buffer.
91 ceph_pthread_getname(pthread_self(), name
, sizeof(name
));
// NOTE(review): `name` is streamed twice. If the buffer filled above is a
// local that shadows the pool's `name` member, both occurrences print the
// thread name rather than "<pool> thread <thread>" - confirm intent.
92 ss
<< name
<< " thread " << name
;
// Register this thread with the heartbeat map; `hb` is used for every timeout
// reset below and is removed again at the end of the function.
93 heartbeat_handle_d
*hb
= cct
->get_heartbeat_map()->add_worker(ss
.str(), pthread_self());
97 // manage dynamic thread pool
// More threads exist than `_num_threads` allows: log it and hand this thread
// object to `_old_threads` (join_old_threads() later joins and deletes it).
99 if (_threads
.size() > _num_threads
) {
100 ldout(cct
,1) << " worker shutting down; too many threads (" << _threads
.size() << " > " << _num_threads
<< ")" << dendl
;
102 _old_threads
.push_back(wt
);
// Only look for work when not paused and at least one queue is registered.
106 if (!_pause
&& !work_queues
.empty()) {
// NOTE(review): `tries` presumably bounds how many queues are probed per
// pass; the loop that uses it is not visible here.
108 int tries
= work_queues
.size();
// Wrap the cursor into range, select that queue, then advance the cursor.
111 next_work_queue
%= work_queues
.size();
112 wq
= work_queues
[next_work_queue
++];
// Take one item from the selected queue (type-erased as void*).
114 void *item
= wq
->_void_dequeue();
117 ldout(cct
,12) << "worker wq " << wq
->name
<< " start processing " << item
118 << " (" << processing
<< " active)" << dendl
;
// Build a handle carrying the queue's timeout_interval / suicide_interval and
// reset the heartbeat timeout with them before processing starts.
119 TPHandle
tp_handle(cct
, hb
, wq
->timeout_interval
, wq
->suicide_interval
);
120 tp_handle
.reset_tp_timeout();
// Process the item, then run the queue's post-processing hook for it.
122 wq
->_void_process(item
, tp_handle
);
124 wq
->_void_process_finish(item
);
126 ldout(cct
,15) << "worker wq " << wq
->name
<< " done processing " << item
127 << " (" << processing
<< " active)" << dendl
;
// NOTE(review): the statement guarded by this condition is not visible;
// presumably it wakes threads blocked in pause()/drain() - confirm.
128 if (_pause
|| _draining
)
// Nothing to do: reset the heartbeat timeout (the visible argument is
// threadpool_default_timeout) and wait on `_cond`, releasing `_lock`, for at
// most threadpool_empty_queue_max_wait.
138 ldout(cct
,20) << "worker waiting" << dendl
;
139 cct
->get_heartbeat_map()->reset_timeout(
141 cct
->_conf
->threadpool_default_timeout
,
143 _cond
.WaitInterval(_lock
,
145 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
147 ldout(cct
,1) << "worker finish" << dendl
;
// Unregister this thread's heartbeat entry before returning.
149 cct
->get_heartbeat_map()->remove_worker(hb
);
// Creates worker threads until `_threads.size()` reaches `_num_threads`.
// Precondition (asserted): the caller holds `_lock`.
// Each new WorkThread gets the pool's ioprio_class / ioprio_priority applied
// and is then started under `thread_name`.
// NOTE(review): the insertion of `wt` into `_threads` and the condition
// guarding the error log are on lines not visible in this listing.
154 void ThreadPool::start_threads()
156 assert(_lock
.is_locked());
157 while (_threads
.size() < _num_threads
) {
158 WorkThread
*wt
= new WorkThread(this);
159 ldout(cct
, 10) << "start_threads creating and starting " << wt
<< dendl
;
// Apply the pool's I/O priority settings to the new thread object.
162 int r
= wt
->set_ioprio(ioprio_class
, ioprio_priority
);
// Error path: report the set_ioprio result as text via cpp_strerror().
164 lderr(cct
) << " set_ioprio got " << cpp_strerror(r
) << dendl
;
// Launch the thread using the pool's configured thread name.
166 wt
->create(thread_name
.c_str());
// Reaps threads queued in `_old_threads`: each entry is joined, deleted and
// removed from the front of the list until the list is empty.
// Precondition (asserted): the caller holds `_lock`.
170 void ThreadPool::join_old_threads()
172 assert(_lock
.is_locked());
173 while (!_old_threads
.empty()) {
174 ldout(cct
, 10) << "join_old_threads joining and deleting " << _old_threads
.front() << dendl
;
// Join first so the thread has finished before its object is destroyed.
175 _old_threads
.front()->join();
176 delete _old_threads
.front();
177 _old_threads
.pop_front();
// Starts the pool. When a thread-count option name is set
// (`_thread_num_option` is non-empty) the pool registers itself as a config
// observer so it receives handle_conf_change() callbacks.
// NOTE(review): the lines that actually launch the worker threads are not
// visible in this listing (numbering skips 188-192).
181 void ThreadPool::start()
183 ldout(cct
,10) << "start" << dendl
;
185 if (_thread_num_option
.length()) {
186 ldout(cct
, 10) << " registering config observer on " << _thread_num_option
<< dendl
;
187 cct
->_conf
->add_observer(this);
193 ldout(cct
,15) << "started" << dendl
;
// Stops the pool.
//   clear_after - NOTE(review): presumably controls whether the work queues
//                 are cleared once the threads are gone; the condition using
//                 it is not visible in this listing - confirm.
// Visible steps: unregister the config observer (only if an option name is
// set), iterate over `_threads` (loop body not visible), call _clear() on
// every registered work queue, and log completion.
196 void ThreadPool::stop(bool clear_after
)
198 ldout(cct
,10) << "stop" << dendl
;
// Mirror of start(): drop the config observer registered there.
200 if (_thread_num_option
.length()) {
201 ldout(cct
, 10) << " unregistering config observer on " << _thread_num_option
<< dendl
;
202 cct
->_conf
->remove_observer(this);
// Walk every worker thread; what is done per thread is not shown here.
210 for (set
<WorkThread
*>::iterator p
= _threads
.begin();
// Clear every work queue.
218 for (unsigned i
=0; i
<work_queues
.size(); i
++)
219 work_queues
[i
]->_clear();
222 ldout(cct
,15) << "stopped" << dendl
;
// Pauses the pool and blocks on `_wait_cond` (releasing `_lock` while
// waiting) before logging "paused".
// NOTE(review): the lock acquisition, the `_pause` update and the loop
// condition around the wait are on lines not visible in this listing.
225 void ThreadPool::pause()
227 ldout(cct
,10) << "pause" << dendl
;
231 _wait_cond
.Wait(_lock
);
233 ldout(cct
,15) << "paused" << dendl
;
// Logs "pause_new".
// NOTE(review): the rest of the body is not visible in this listing; unlike
// pause(), no wait on `_wait_cond` appears among the visible lines.
236 void ThreadPool::pause_new()
238 ldout(cct
,10) << "pause_new" << dendl
;
// Logs "unpause".
// NOTE(review): the statements that undo the pause and wake the workers are
// not visible in this listing.
244 void ThreadPool::unpause()
246 ldout(cct
,10) << "unpause" << dendl
;
// Blocks until the pool has drained.
//   wq - optional work queue; when non-NULL the wait also continues until
//        that queue reports _empty().
// Waits on `_wait_cond` (releasing `_lock` while waiting) for as long as
// `processing` is non-zero or the given queue still has items.
// NOTE(review): the lock acquisition and the `_draining` bookkeeping are on
// lines not visible in this listing.
254 void ThreadPool::drain(WorkQueue_
* wq
)
256 ldout(cct
,10) << "drain" << dendl
;
259 while (processing
|| (wq
!= NULL
&& !wq
->_empty()))
260 _wait_cond
.Wait(_lock
);
// Sets the I/O priority used by the pool's threads.
//   cls      - I/O priority class, forwarded to each thread's set_ioprio().
//   priority - I/O priority value; stored in `ioprio_priority`.
// Holds `_lock` for the whole call through a scoped Mutex::Locker, then
// applies the new settings to every thread in `_threads`.
// NOTE(review): the store of `cls` (presumably into `ioprio_class`) and the
// condition guarding the error log are on lines not visible in this listing.
265 void ThreadPool::set_ioprio(int cls
, int priority
)
267 Mutex::Locker
l(_lock
);
269 ioprio_priority
= priority
;
// Apply the settings to each existing worker thread.
270 for (set
<WorkThread
*>::iterator p
= _threads
.begin();
273 ldout(cct
,10) << __func__
274 << " class " << cls
<< " priority " << priority
275 << " pid " << (*p
)->get_pid()
277 int r
= (*p
)->set_ioprio(cls
, priority
);
// Error path: report the set_ioprio result as text via cpp_strerror().
279 lderr(cct
) << " set_ioprio got " << cpp_strerror(r
) << dendl
;
// Constructs a sharded thread pool.
//   pcct_        - context; NOTE(review): its initializer is not visible here.
//   nm           - pool name; NOTE(review): its initializer is not visible
//                  here, but `name` is used below to build "<name>::lock".
//   tn           - name handed to worker threads, moved into `thread_name`.
//   pnum_threads - number of worker threads, stored in `num_threads`.
// NOTE(review): the numbering skips 285-286 and everything after 290, so the
// remaining member initializers and the constructor body are not shown.
283 ShardedThreadPool::ShardedThreadPool(CephContext
*pcct_
, string nm
, string tn
,
284 uint32_t pnum_threads
):
287 thread_name(std::move(tn
)),
288 lockname(name
+ "::lock"),
289 shardedpool_lock(lockname
.c_str()),
290 num_threads(pnum_threads
),
// Body executed by a sharded-pool worker thread.
//   thread_index - index of this worker; passed to wq->is_shard_empty() and
//                  wq->_process() to identify the shard it serves.
// Visible flow: register a heartbeat entry, then loop until `stop_threads`
// is set. Inside the loop the thread parks while `pause_threads` is set,
// parks while `drain_threads` is set and its shard is empty, and otherwise
// resets its heartbeat timeout and calls wq->_process(). The heartbeat entry
// is removed on exit.
// NOTE(review): this listing omits source lines (the outer `if` guards, the
// num_paused / num_drained updates and the wait_cond signalling), so only the
// visible statements are described.
295 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index
)
298 ldout(cct
,10) << "worker start" << dendl
;
300 std::stringstream ss
;
// Fetch the calling thread's name into the `name` buffer.
302 ceph_pthread_getname(pthread_self(), name
, sizeof(name
));
// NOTE(review): `name` is streamed twice. If the buffer filled above is a
// local that shadows the pool's `name` member, both occurrences print the
// thread name - confirm intent.
303 ss
<< name
<< " thread " << name
;
// Register this thread with the heartbeat map; `hb` is removed at the end.
304 heartbeat_handle_d
*hb
= cct
->get_heartbeat_map()->add_worker(ss
.str(), pthread_self());
306 while (!stop_threads
) {
// Pause handling: under shardedpool_lock, keep the heartbeat fresh and wait
// on shardedpool_cond in bounded intervals until pause_threads is cleared.
308 shardedpool_lock
.Lock();
311 while (pause_threads
) {
312 cct
->get_heartbeat_map()->reset_timeout(
314 wq
->timeout_interval
, wq
->suicide_interval
);
315 shardedpool_cond
.WaitInterval(shardedpool_lock
,
317 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
320 shardedpool_lock
.Unlock();
// Drain handling: only a worker whose shard is empty parks here, again
// refreshing the heartbeat and waiting in bounded intervals.
323 shardedpool_lock
.Lock();
324 if (wq
->is_shard_empty(thread_index
)) {
327 while (drain_threads
) {
328 cct
->get_heartbeat_map()->reset_timeout(
330 wq
->timeout_interval
, wq
->suicide_interval
);
331 shardedpool_cond
.WaitInterval(shardedpool_lock
,
333 cct
->_conf
->threadpool_empty_queue_max_wait
, 0));
337 shardedpool_lock
.Unlock();
// Normal path: reset the heartbeat timeout with the queue's intervals, then
// let the queue process work for this thread's shard.
340 cct
->get_heartbeat_map()->reset_timeout(
342 wq
->timeout_interval
, wq
->suicide_interval
);
343 wq
->_process(thread_index
, hb
);
347 ldout(cct
,10) << "sharded worker finish" << dendl
;
// Unregister this thread's heartbeat entry before returning.
349 cct
->get_heartbeat_map()->remove_worker(hb
);
// Creates worker threads until `threads_shardedpool.size()` reaches
// `num_threads`; each thread is recorded in `threads_shardedpool` and started
// under `thread_name`.
// Precondition (asserted): the caller holds `shardedpool_lock`.
// NOTE(review): `thread_index` is a signed int32_t here while
// shardedthreadpool_worker() takes a uint32_t index - harmless for the
// non-negative values used, but the types could be aligned. The increment of
// `thread_index` is on a line not visible in this listing.
353 void ShardedThreadPool::start_threads()
355 assert(shardedpool_lock
.is_locked());
356 int32_t thread_index
= 0;
357 while (threads_shardedpool
.size() < num_threads
) {
359 WorkThreadSharded
*wt
= new WorkThreadSharded(this, thread_index
);
360 ldout(cct
, 10) << "start_threads creating and starting " << wt
<< dendl
;
// Record the thread before launching it.
361 threads_shardedpool
.push_back(wt
);
362 wt
->create(thread_name
.c_str());
// Starts the sharded pool: takes `shardedpool_lock`, releases it again and
// logs "started".
// NOTE(review): the statement executed while the lock is held (numbering
// skips 372; presumably the call that launches the threads) is not visible
// in this listing.
367 void ShardedThreadPool::start()
369 ldout(cct
,10) << "start" << dendl
;
371 shardedpool_lock
.Lock();
373 shardedpool_lock
.Unlock();
374 ldout(cct
,15) << "started" << dendl
;
// Stops the sharded pool: asks the work queue to release threads waiting
// inside it, iterates over every worker thread, empties
// `threads_shardedpool` and logs "stopped".
// NOTE(review): the setting of the stop flag and the per-thread loop body
// (presumably join and delete) are on lines not visible in this listing.
377 void ShardedThreadPool::stop()
379 ldout(cct
,10) << "stop" << dendl
;
382 wq
->return_waiting_threads();
383 for (vector
<WorkThreadSharded
*>::iterator p
= threads_shardedpool
.begin();
384 p
!= threads_shardedpool
.end();
389 threads_shardedpool
.clear();
390 ldout(cct
,15) << "stopped" << dendl
;
// Pauses the sharded pool and blocks until every worker has parked.
// Under `shardedpool_lock`: sets `pause_threads`, asks the work queue to
// release threads waiting inside it, then waits on `wait_cond` until
// `num_paused` equals `num_threads`.
393 void ShardedThreadPool::pause()
395 ldout(cct
,10) << "pause" << dendl
;
396 shardedpool_lock
.Lock();
397 pause_threads
= true;
// Release workers waiting inside the queue so they can observe the flag.
399 wq
->return_waiting_threads();
// Wait until all workers have reported themselves paused.
400 while (num_threads
!= num_paused
){
401 wait_cond
.Wait(shardedpool_lock
);
403 shardedpool_lock
.Unlock();
404 ldout(cct
,10) << "paused" << dendl
;
// Requests a pause without waiting for the workers to park.
// Under `shardedpool_lock`: sets `pause_threads` and asks the work queue to
// release threads waiting inside it; unlike pause() there is no wait on
// `wait_cond` among the visible statements.
407 void ShardedThreadPool::pause_new()
409 ldout(cct
,10) << "pause_new" << dendl
;
410 shardedpool_lock
.Lock();
411 pause_threads
= true;
413 wq
->return_waiting_threads();
414 shardedpool_lock
.Unlock();
415 ldout(cct
,10) << "paused_new" << dendl
;
// Resumes the sharded pool: under `shardedpool_lock`, clears `pause_threads`
// and signals `shardedpool_cond`.
// NOTE(review): Signal() is issued once, while several workers may be waiting
// on `shardedpool_cond`; the remaining workers appear to rely on their bounded
// WaitInterval() to notice the cleared flag. Confirm whether a broadcast is
// intended here.
418 void ShardedThreadPool::unpause()
420 ldout(cct
,10) << "unpause" << dendl
;
421 shardedpool_lock
.Lock();
422 pause_threads
= false;
423 shardedpool_cond
.Signal();
424 shardedpool_lock
.Unlock();
425 ldout(cct
,10) << "unpaused" << dendl
;
// Blocks until every worker has drained its shard.
// Under `shardedpool_lock`: sets `drain_threads`, asks the work queue to
// release threads waiting inside it, waits on `wait_cond` until `num_drained`
// equals `num_threads`, then clears `drain_threads` and signals
// `shardedpool_cond` so parked workers can resume.
// NOTE(review): Signal() is issued once, while several workers may be waiting
// on `shardedpool_cond`; the remaining workers appear to rely on their bounded
// WaitInterval() to notice the cleared flag. Confirm whether a broadcast is
// intended here.
428 void ShardedThreadPool::drain()
430 ldout(cct
,10) << "drain" << dendl
;
431 shardedpool_lock
.Lock();
432 drain_threads
= true;
434 wq
->return_waiting_threads();
// Wait until all workers have reported themselves drained.
435 while (num_threads
!= num_drained
) {
436 wait_cond
.Wait(shardedpool_lock
);
438 drain_threads
= false;
439 shardedpool_cond
.Signal();
440 shardedpool_lock
.Unlock();
441 ldout(cct
,10) << "drained" << dendl
;