]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/WorkQueue.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / common / WorkQueue.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
31f18b77 15#include "WorkQueue.h"
7c673cae 16#include "include/compat.h"
7c673cae 17#include "common/errno.h"
7c673cae
FG
18
19#define dout_subsys ceph_subsys_tp
20#undef dout_prefix
21#define dout_prefix *_dout << name << " "
22
23
// Construct a thread pool. No worker threads are created here; call
// start() to spawn them. If `option` names a config key, that key is
// recorded so the pool can resize itself when the option changes
// (see handle_conf_change()).
ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
  : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
    lockname(name + "::lock"),
    _lock(ceph::make_mutex(lockname)),  // this should be safe due to declaration order
    _stop(false),
    _pause(0),
    _draining(0),
    _num_threads(n),
    processing(0)
{
  if (option) {
    _thread_num_option = option;
    // set up conf_keys: NULL-terminated array holding the one tracked key.
    // The pointer aliases _thread_num_option's buffer, which lives as long
    // as this object does.
    _conf_keys = new const char*[2];
    _conf_keys[0] = _thread_num_option.c_str();
    _conf_keys[1] = NULL;
  } else {
    // no tracked option: the array is just the NULL terminator
    _conf_keys = new const char*[1];
    _conf_keys[0] = NULL;
  }
}
45
46void ThreadPool::TPHandle::suspend_tp_timeout()
47{
48 cct->get_heartbeat_map()->clear_timeout(hb);
49}
50
51void ThreadPool::TPHandle::reset_tp_timeout()
52{
53 cct->get_heartbeat_map()->reset_timeout(
54 hb, grace, suicide_grace);
55}
56
ThreadPool::~ThreadPool()
{
  // stop() must have been called first: live workers reference *this.
  ceph_assert(_threads.empty());
  delete[] _conf_keys;
}
62
11fdf7f2 63void ThreadPool::handle_conf_change(const ConfigProxy& conf,
7c673cae
FG
64 const std::set <std::string> &changed)
65{
66 if (changed.count(_thread_num_option)) {
67 char *buf;
11fdf7f2
TL
68 int r = conf.get_val(_thread_num_option.c_str(), &buf, -1);
69 ceph_assert(r >= 0);
7c673cae
FG
70 int v = atoi(buf);
71 free(buf);
72 if (v >= 0) {
11fdf7f2 73 _lock.lock();
7c673cae
FG
74 _num_threads = v;
75 start_threads();
11fdf7f2
TL
76 _cond.notify_all();
77 _lock.unlock();
7c673cae
FG
78 }
79 }
80}
81
// Main loop for one pool worker thread. Holds _lock except while actually
// processing an item. Registers itself with the heartbeat map so hung
// workers can be detected.
void ThreadPool::worker(WorkThread *wt)
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop) {

    // manage dynamic thread pool: reap retired workers, and retire
    // ourselves if the pool has shrunk below the current thread count
    join_old_threads();
    if (_threads.size() > _num_threads) {
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      // move ourselves to _old_threads; a surviving worker (or stop())
      // will join and delete us via join_old_threads()
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      // give each registered queue up to two chances per pass, round-robin
      int tries = 2 * work_queues.size();
      bool did = false;
      while (tries--) {
        next_work_queue %= work_queues.size();
        wq = work_queues[next_work_queue++];

        void *item = wq->_void_dequeue();
        if (item) {
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
                        << " (" << processing << " active)" << dendl;
          TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
          tp_handle.reset_tp_timeout();
          // drop the pool lock while running the (possibly slow) work item
          ul.unlock();
          wq->_void_process(item, tp_handle);
          ul.lock();
          wq->_void_process_finish(item);
          processing--;
          ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
                        << " (" << processing << " active)" << dendl;
          // pause()/drain() block until processing hits zero; wake them
          if (_pause || _draining)
            _wait_cond.notify_all();
          did = true;
          break;
        }
      }
      if (did)
        continue;  // found work this pass; look for more immediately
    }

    // nothing to do: keep the heartbeat happy, then sleep until signalled
    // or the empty-queue wait expires
    ldout(cct,20) << "worker waiting" << dendl;
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      cct->_conf->threadpool_default_timeout,
      0);
    auto wait = std::chrono::seconds(
      cct->_conf->threadpool_empty_queue_max_wait);
    _cond.wait_for(ul, wait);
  }
  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);
}
147
148void ThreadPool::start_threads()
149{
11fdf7f2 150 ceph_assert(ceph_mutex_is_locked(_lock));
7c673cae
FG
151 while (_threads.size() < _num_threads) {
152 WorkThread *wt = new WorkThread(this);
153 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
154 _threads.insert(wt);
155
7c673cae
FG
156 wt->create(thread_name.c_str());
157 }
158}
159
160void ThreadPool::join_old_threads()
161{
11fdf7f2 162 ceph_assert(ceph_mutex_is_locked(_lock));
7c673cae
FG
163 while (!_old_threads.empty()) {
164 ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
165 _old_threads.front()->join();
166 delete _old_threads.front();
167 _old_threads.pop_front();
168 }
169}
170
171void ThreadPool::start()
172{
173 ldout(cct,10) << "start" << dendl;
174
175 if (_thread_num_option.length()) {
176 ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
11fdf7f2 177 cct->_conf.add_observer(this);
7c673cae
FG
178 }
179
11fdf7f2 180 _lock.lock();
7c673cae 181 start_threads();
11fdf7f2 182 _lock.unlock();
7c673cae
FG
183 ldout(cct,15) << "started" << dendl;
184}
185
// Shut the pool down: unregister the config observer, signal all workers
// to exit, join them, then clear every registered work queue.
// NOTE(review): the clear_after parameter is unused here — queues are
// always cleared; confirm against the header's declared default.
void ThreadPool::stop(bool clear_after)
{
  ldout(cct,10) << "stop" << dendl;

  if (_thread_num_option.length()) {
    ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
    cct->_conf.remove_observer(this);
  }

  _lock.lock();
  _stop = true;
  _cond.notify_all();
  join_old_threads();
  _lock.unlock();
  // join with _lock dropped: workers must reacquire _lock to observe
  // _stop and leave their loop, so joining under the lock would deadlock
  for (set<WorkThread*>::iterator p = _threads.begin();
       p != _threads.end();
       ++p) {
    (*p)->join();
    delete *p;
  }
  _threads.clear();
  _lock.lock();
  for (unsigned i=0; i<work_queues.size(); i++)
    work_queues[i]->_clear();
  _stop = false;  // allow a later start() to reuse this pool
  _lock.unlock();
  ldout(cct,15) << "stopped" << dendl;
}
214
// Pause the pool and wait for all in-flight work items to finish.
// _pause is a counter, so pause()/unpause() pairs may nest.
void ThreadPool::pause()
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "pause" << dendl;
  _pause++;
  // workers signal _wait_cond as each item completes while _pause is set
  while (processing) {
    _wait_cond.wait(ul);
  }
  ldout(cct,15) << "paused" << dendl;
}
225
226void ThreadPool::pause_new()
227{
228 ldout(cct,10) << "pause_new" << dendl;
11fdf7f2 229 _lock.lock();
7c673cae 230 _pause++;
11fdf7f2 231 _lock.unlock();
7c673cae
FG
232}
233
234void ThreadPool::unpause()
235{
236 ldout(cct,10) << "unpause" << dendl;
11fdf7f2
TL
237 _lock.lock();
238 ceph_assert(_pause > 0);
7c673cae 239 _pause--;
11fdf7f2
TL
240 _cond.notify_all();
241 _lock.unlock();
7c673cae
FG
242}
243
// Block until no items are being processed and, if a specific queue is
// given, until that queue is also empty. Unlike pause(), workers keep
// consuming work while we wait — we only wait for it all to finish.
void ThreadPool::drain(WorkQueue_* wq)
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "drain" << dendl;
  _draining++;  // makes workers signal _wait_cond after each item
  while (processing || (wq != NULL && !wq->_empty())) {
    _wait_cond.wait(ul);
  }
  _draining--;
}
254
// Construct a sharded thread pool: a fixed set of threads, each bound to
// one shard of a single ShardedWQ. Threads are created by start(); the
// queue is attached separately (wq starts NULL).
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
                                     uint32_t pnum_threads):
  cct(pcct_),
  name(std::move(nm)),
  thread_name(std::move(tn)),
  lockname(name + "::lock"),
  shardedpool_lock(ceph::make_mutex(lockname)),
  num_threads(pnum_threads),
  num_paused(0),   // threads currently parked in the pause loop
  num_drained(0),  // threads currently parked in the drain loop
  wq(NULL) {}
266
// Main loop for one sharded worker. Repeatedly processes its own shard,
// parking itself when pause or drain is requested. The pause/drain
// rendezvous: each parked thread bumps a counter and notifies wait_cond
// so pause()/drain() can tell when every thread has stopped.
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
  ceph_assert(wq != NULL);
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!stop_threads) {
    if (pause_threads) {
      std::unique_lock ul(shardedpool_lock);
      ++num_paused;
      wait_cond.notify_all();  // tell pause() one more thread is parked
      while (pause_threads) {
        // keep the heartbeat alive while parked
        cct->get_heartbeat_map()->reset_timeout(
          hb,
          wq->timeout_interval, wq->suicide_interval);
        shardedpool_cond.wait_for(
          ul,
          std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
      }
      --num_paused;
    }
    if (drain_threads) {
      std::unique_lock ul(shardedpool_lock);
      // only park once our own shard has no more work
      if (wq->is_shard_empty(thread_index)) {
        ++num_drained;
        wait_cond.notify_all();  // tell drain() one more shard is empty
        while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
            wq->timeout_interval, wq->suicide_interval);
          shardedpool_cond.wait_for(
            ul,
            std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
        }
        --num_drained;
      }
    }

    // normal path: re-arm the heartbeat and process our shard once
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      wq->timeout_interval, wq->suicide_interval);
    wq->_process(thread_index, hb);

  }

  ldout(cct,10) << "sharded worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

}
320
321void ShardedThreadPool::start_threads()
322{
11fdf7f2 323 ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
7c673cae
FG
324 int32_t thread_index = 0;
325 while (threads_shardedpool.size() < num_threads) {
326
327 WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
328 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
329 threads_shardedpool.push_back(wt);
330 wt->create(thread_name.c_str());
331 thread_index++;
332 }
333}
334
335void ShardedThreadPool::start()
336{
337 ldout(cct,10) << "start" << dendl;
338
11fdf7f2 339 shardedpool_lock.lock();
7c673cae 340 start_threads();
11fdf7f2 341 shardedpool_lock.unlock();
7c673cae
FG
342 ldout(cct,15) << "started" << dendl;
343}
344
345void ShardedThreadPool::stop()
346{
347 ldout(cct,10) << "stop" << dendl;
31f18b77 348 stop_threads = true;
11fdf7f2 349 ceph_assert(wq != NULL);
7c673cae
FG
350 wq->return_waiting_threads();
351 for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
352 p != threads_shardedpool.end();
353 ++p) {
354 (*p)->join();
355 delete *p;
356 }
357 threads_shardedpool.clear();
358 ldout(cct,15) << "stopped" << dendl;
359}
360
// Pause the sharded pool and wait until every worker has parked itself
// in its pause loop (num_paused reaches num_threads).
void ShardedThreadPool::pause()
{
  std::unique_lock ul(shardedpool_lock);
  ldout(cct,10) << "pause" << dendl;
  pause_threads = true;
  ceph_assert(wq != NULL);
  // kick threads blocked waiting for work so they notice pause_threads
  wq->return_waiting_threads();
  while (num_threads != num_paused){
    wait_cond.wait(ul);
  }
  ldout(cct,10) << "paused" << dendl;
}
373
374void ShardedThreadPool::pause_new()
375{
376 ldout(cct,10) << "pause_new" << dendl;
11fdf7f2 377 shardedpool_lock.lock();
31f18b77 378 pause_threads = true;
11fdf7f2 379 ceph_assert(wq != NULL);
7c673cae 380 wq->return_waiting_threads();
11fdf7f2 381 shardedpool_lock.unlock();
7c673cae
FG
382 ldout(cct,10) << "paused_new" << dendl;
383}
384
385void ShardedThreadPool::unpause()
386{
387 ldout(cct,10) << "unpause" << dendl;
11fdf7f2 388 shardedpool_lock.lock();
31f18b77 389 pause_threads = false;
11fdf7f2
TL
390 wq->stop_return_waiting_threads();
391 shardedpool_cond.notify_all();
392 shardedpool_lock.unlock();
7c673cae
FG
393 ldout(cct,10) << "unpaused" << dendl;
394}
395
// Drain the sharded pool: wait until every worker reports its shard
// empty (num_drained reaches num_threads), then release them all.
void ShardedThreadPool::drain()
{
  std::unique_lock ul(shardedpool_lock);
  ldout(cct,10) << "drain" << dendl;
  drain_threads = true;
  ceph_assert(wq != NULL);
  // kick threads blocked waiting for work so they notice drain_threads
  wq->return_waiting_threads();
  while (num_threads != num_drained) {
    wait_cond.wait(ul);
  }
  drain_threads = false;
  wq->stop_return_waiting_threads();
  shardedpool_cond.notify_all();  // release workers parked in the drain loop
  ldout(cct,10) << "drained" << dendl;
}
411