1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "WorkQueue.h"
16 #include "include/compat.h"
17 #include "common/errno.h"
19 #define dout_subsys ceph_subsys_tp
21 #define dout_prefix *_dout << name << " "
23 ThreadPool::ThreadPool(CephContext
*cct_
, std::string nm
, std::string tn
, int n
, const char *option
)
24 : cct(cct_
), name(std::move(nm
)), thread_name(std::move(tn
)),
25 lockname(name
+ "::lock"),
26 _lock(ceph::make_mutex(lockname
)), // this should be safe due to declaration order
34 _thread_num_option
= option
;
36 _conf_keys
= new const char*[2];
37 _conf_keys
[0] = _thread_num_option
.c_str();
40 _conf_keys
= new const char*[1];
45 void ThreadPool::TPHandle::suspend_tp_timeout()
47 cct
->get_heartbeat_map()->clear_timeout(hb
);
50 void ThreadPool::TPHandle::reset_tp_timeout()
52 cct
->get_heartbeat_map()->reset_timeout(
53 hb
, grace
, suicide_grace
);
56 ThreadPool::~ThreadPool()
58 ceph_assert(_threads
.empty());
62 void ThreadPool::handle_conf_change(const ConfigProxy
& conf
,
63 const std::set
<std::string
> &changed
)
65 if (changed
.count(_thread_num_option
)) {
67 int r
= conf
.get_val(_thread_num_option
.c_str(), &buf
, -1);
81 void ThreadPool::worker(WorkThread
*wt
)
83 std::unique_lock
ul(_lock
);
84 ldout(cct
,10) << "worker start" << dendl
;
87 ss
<< name
<< " thread " << (void *)pthread_self();
88 auto hb
= cct
->get_heartbeat_map()->add_worker(ss
.str(), pthread_self());
92 // manage dynamic thread pool
94 if (_threads
.size() > _num_threads
) {
95 ldout(cct
,1) << " worker shutting down; too many threads (" << _threads
.size() << " > " << _num_threads
<< ")" << dendl
;
97 _old_threads
.push_back(wt
);
101 if (work_queues
.empty()) {
102 ldout(cct
, 10) << "worker no work queues" << dendl
;
103 } else if (!_pause
) {
105 int tries
= 2 * work_queues
.size();
108 next_work_queue
%= work_queues
.size();
109 wq
= work_queues
[next_work_queue
++];
111 void *item
= wq
->_void_dequeue();
114 ldout(cct
,12) << "worker wq " << wq
->name
<< " start processing " << item
115 << " (" << processing
<< " active)" << dendl
;
117 TPHandle
tp_handle(cct
, hb
, wq
->timeout_interval
, wq
->suicide_interval
);
118 tp_handle
.reset_tp_timeout();
119 wq
->_void_process(item
, tp_handle
);
121 wq
->_void_process_finish(item
);
123 ldout(cct
,15) << "worker wq " << wq
->name
<< " done processing " << item
124 << " (" << processing
<< " active)" << dendl
;
125 if (_pause
|| _draining
)
126 _wait_cond
.notify_all();
135 ldout(cct
,20) << "worker waiting" << dendl
;
136 cct
->get_heartbeat_map()->reset_timeout(
138 ceph::make_timespan(cct
->_conf
->threadpool_default_timeout
),
139 ceph::make_timespan(0));
140 auto wait
= std::chrono::seconds(
141 cct
->_conf
->threadpool_empty_queue_max_wait
);
142 _cond
.wait_for(ul
, wait
);
144 ldout(cct
,1) << "worker finish" << dendl
;
146 cct
->get_heartbeat_map()->remove_worker(hb
);
149 void ThreadPool::start_threads()
151 ceph_assert(ceph_mutex_is_locked(_lock
));
152 while (_threads
.size() < _num_threads
) {
153 WorkThread
*wt
= new WorkThread(this);
154 ldout(cct
, 10) << "start_threads creating and starting " << wt
<< dendl
;
157 wt
->create(thread_name
.c_str());
161 void ThreadPool::join_old_threads()
163 ceph_assert(ceph_mutex_is_locked(_lock
));
164 while (!_old_threads
.empty()) {
165 ldout(cct
, 10) << "join_old_threads joining and deleting " << _old_threads
.front() << dendl
;
166 _old_threads
.front()->join();
167 delete _old_threads
.front();
168 _old_threads
.pop_front();
172 void ThreadPool::start()
174 ldout(cct
,10) << "start" << dendl
;
176 if (_thread_num_option
.length()) {
177 ldout(cct
, 10) << " registering config observer on " << _thread_num_option
<< dendl
;
178 cct
->_conf
.add_observer(this);
184 ldout(cct
,15) << "started" << dendl
;
187 void ThreadPool::stop(bool clear_after
)
189 ldout(cct
,10) << "stop" << dendl
;
191 if (_thread_num_option
.length()) {
192 ldout(cct
, 10) << " unregistering config observer on " << _thread_num_option
<< dendl
;
193 cct
->_conf
.remove_observer(this);
201 for (auto p
= _threads
.begin(); p
!= _threads
.end(); ++p
) {
207 for (unsigned i
=0; i
<work_queues
.size(); i
++)
208 work_queues
[i
]->_clear();
211 ldout(cct
,15) << "stopped" << dendl
;
214 void ThreadPool::pause()
216 std::unique_lock
ul(_lock
);
217 ldout(cct
,10) << "pause" << dendl
;
222 ldout(cct
,15) << "paused" << dendl
;
225 void ThreadPool::pause_new()
227 ldout(cct
,10) << "pause_new" << dendl
;
233 void ThreadPool::unpause()
235 ldout(cct
,10) << "unpause" << dendl
;
237 ceph_assert(_pause
> 0);
243 void ThreadPool::drain(WorkQueue_
* wq
)
245 std::unique_lock
ul(_lock
);
246 ldout(cct
,10) << "drain" << dendl
;
248 while (processing
|| (wq
!= NULL
&& !wq
->_empty())) {
254 ShardedThreadPool::ShardedThreadPool(CephContext
*pcct_
, std::string nm
, std::string tn
,
255 uint32_t pnum_threads
):
258 thread_name(std::move(tn
)),
259 lockname(name
+ "::lock"),
260 shardedpool_lock(ceph::make_mutex(lockname
)),
261 num_threads(pnum_threads
),
266 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index
)
268 ceph_assert(wq
!= NULL
);
269 ldout(cct
,10) << "worker start" << dendl
;
271 std::stringstream ss
;
272 ss
<< name
<< " thread " << (void *)pthread_self();
273 auto hb
= cct
->get_heartbeat_map()->add_worker(ss
.str(), pthread_self());
275 while (!stop_threads
) {
277 std::unique_lock
ul(shardedpool_lock
);
279 wait_cond
.notify_all();
280 while (pause_threads
) {
281 cct
->get_heartbeat_map()->reset_timeout(
283 wq
->timeout_interval
,
284 wq
->suicide_interval
);
285 shardedpool_cond
.wait_for(
287 std::chrono::seconds(cct
->_conf
->threadpool_empty_queue_max_wait
));
292 std::unique_lock
ul(shardedpool_lock
);
293 if (wq
->is_shard_empty(thread_index
)) {
295 wait_cond
.notify_all();
296 while (drain_threads
) {
297 cct
->get_heartbeat_map()->reset_timeout(
299 wq
->timeout_interval
,
300 wq
->suicide_interval
);
301 shardedpool_cond
.wait_for(
303 std::chrono::seconds(cct
->_conf
->threadpool_empty_queue_max_wait
));
309 cct
->get_heartbeat_map()->reset_timeout(
311 wq
->timeout_interval
,
312 wq
->suicide_interval
);
313 wq
->_process(thread_index
, hb
);
317 ldout(cct
,10) << "sharded worker finish" << dendl
;
319 cct
->get_heartbeat_map()->remove_worker(hb
);
323 void ShardedThreadPool::start_threads()
325 ceph_assert(ceph_mutex_is_locked(shardedpool_lock
));
326 int32_t thread_index
= 0;
327 while (threads_shardedpool
.size() < num_threads
) {
329 WorkThreadSharded
*wt
= new WorkThreadSharded(this, thread_index
);
330 ldout(cct
, 10) << "start_threads creating and starting " << wt
<< dendl
;
331 threads_shardedpool
.push_back(wt
);
332 wt
->create(thread_name
.c_str());
337 void ShardedThreadPool::start()
339 ldout(cct
,10) << "start" << dendl
;
341 shardedpool_lock
.lock();
343 shardedpool_lock
.unlock();
344 ldout(cct
,15) << "started" << dendl
;
347 void ShardedThreadPool::stop()
349 ldout(cct
,10) << "stop" << dendl
;
351 ceph_assert(wq
!= NULL
);
352 wq
->return_waiting_threads();
353 for (auto p
= threads_shardedpool
.begin();
354 p
!= threads_shardedpool
.end();
359 threads_shardedpool
.clear();
360 ldout(cct
,15) << "stopped" << dendl
;
363 void ShardedThreadPool::pause()
365 std::unique_lock
ul(shardedpool_lock
);
366 ldout(cct
,10) << "pause" << dendl
;
367 pause_threads
= true;
368 ceph_assert(wq
!= NULL
);
369 wq
->return_waiting_threads();
370 while (num_threads
!= num_paused
){
373 ldout(cct
,10) << "paused" << dendl
;
376 void ShardedThreadPool::pause_new()
378 ldout(cct
,10) << "pause_new" << dendl
;
379 shardedpool_lock
.lock();
380 pause_threads
= true;
381 ceph_assert(wq
!= NULL
);
382 wq
->return_waiting_threads();
383 shardedpool_lock
.unlock();
384 ldout(cct
,10) << "paused_new" << dendl
;
387 void ShardedThreadPool::unpause()
389 ldout(cct
,10) << "unpause" << dendl
;
390 shardedpool_lock
.lock();
391 pause_threads
= false;
392 wq
->stop_return_waiting_threads();
393 shardedpool_cond
.notify_all();
394 shardedpool_lock
.unlock();
395 ldout(cct
,10) << "unpaused" << dendl
;
398 void ShardedThreadPool::drain()
400 std::unique_lock
ul(shardedpool_lock
);
401 ldout(cct
,10) << "drain" << dendl
;
402 drain_threads
= true;
403 ceph_assert(wq
!= NULL
);
404 wq
->return_waiting_threads();
405 while (num_threads
!= num_drained
) {
408 drain_threads
= false;
409 wq
->stop_return_waiting_threads();
410 shardedpool_cond
.notify_all();
411 ldout(cct
,10) << "drained" << dendl
;