// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "WorkQueue.h"
#include "include/compat.h"
#include "common/errno.h"

#define dout_subsys ceph_subsys_tp
#undef dout_prefix
#define dout_prefix *_dout << name << " "

ThreadPool::ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option)
  : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
    lockname(name + "::lock"),
    _lock(ceph::make_mutex(lockname)), // this should be safe due to declaration order
    _stop(false),
    _pause(0),
    _draining(0),
    _num_threads(n),
    processing(0)
{
  if (option) {
    _thread_num_option = option;
    // set up conf_keys
    _conf_keys = new const char*[2];
    _conf_keys[0] = _thread_num_option.c_str();
    _conf_keys[1] = NULL;
  } else {
    _conf_keys = new const char*[1];
    _conf_keys[0] = NULL;
  }
}

void ThreadPool::TPHandle::suspend_tp_timeout()
{
  cct->get_heartbeat_map()->clear_timeout(hb);
}

void ThreadPool::TPHandle::reset_tp_timeout()
{
  cct->get_heartbeat_map()->reset_timeout(
    hb, grace, suicide_grace);
}

ThreadPool::~ThreadPool()
{
  ceph_assert(_threads.empty());
  delete[] _conf_keys;
}

void ThreadPool::handle_conf_change(const ConfigProxy& conf,
                                    const std::set <std::string> &changed)
{
  if (changed.count(_thread_num_option)) {
    char *buf;
    int r = conf.get_val(_thread_num_option.c_str(), &buf, -1);
    ceph_assert(r >= 0);
    int v = atoi(buf);
    free(buf);
    if (v >= 0) {
      // adopt the new thread count and wake all workers; start_threads()
      // grows the pool, while surplus threads retire themselves in worker()
      _lock.lock();
      _num_threads = v;
      start_threads();
      _cond.notify_all();
      _lock.unlock();
    }
  }
}

void ThreadPool::worker(WorkThread *wt)
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop) {

    // manage dynamic thread pool
    join_old_threads();
    if (_threads.size() > _num_threads) {
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (work_queues.empty()) {
      ldout(cct, 10) << "worker no work queues" << dendl;
    } else if (!_pause) {
      // poll the registered work queues round-robin, allowing up to two
      // full passes before giving up and waiting
      WorkQueue_* wq;
      int tries = 2 * work_queues.size();
      bool did = false;
      while (tries--) {
        next_work_queue %= work_queues.size();
        wq = work_queues[next_work_queue++];

        void *item = wq->_void_dequeue();
        if (item) {
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
                        << " (" << processing << " active)" << dendl;
          // drop the pool lock while the item is processed
          ul.unlock();
          TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
          tp_handle.reset_tp_timeout();
          wq->_void_process(item, tp_handle);
          ul.lock();
          wq->_void_process_finish(item);
          processing--;
          ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
                        << " (" << processing << " active)" << dendl;
          if (_pause || _draining)
            _wait_cond.notify_all();
          did = true;
          break;
        }
      }
      if (did)
        continue;
    }

    // nothing to process: keep the heartbeat fresh and wait to be signalled
    ldout(cct,20) << "worker waiting" << dendl;
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      ceph::make_timespan(cct->_conf->threadpool_default_timeout),
      ceph::make_timespan(0));
    auto wait = std::chrono::seconds(
      cct->_conf->threadpool_empty_queue_max_wait);
    _cond.wait_for(ul, wait);
  }
  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);
}

void ThreadPool::start_threads()
{
  ceph_assert(ceph_mutex_is_locked(_lock));
  while (_threads.size() < _num_threads) {
    WorkThread *wt = new WorkThread(this);
    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
    _threads.insert(wt);

    wt->create(thread_name.c_str());
  }
}

void ThreadPool::join_old_threads()
{
  ceph_assert(ceph_mutex_is_locked(_lock));
  while (!_old_threads.empty()) {
    ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
    _old_threads.front()->join();
    delete _old_threads.front();
    _old_threads.pop_front();
  }
}

void ThreadPool::start()
{
  ldout(cct,10) << "start" << dendl;

  if (_thread_num_option.length()) {
    ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
    cct->_conf.add_observer(this);
  }

  _lock.lock();
  start_threads();
  _lock.unlock();
  ldout(cct,15) << "started" << dendl;
}

void ThreadPool::stop(bool clear_after)
{
  ldout(cct,10) << "stop" << dendl;

  if (_thread_num_option.length()) {
    ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
    cct->_conf.remove_observer(this);
  }

  _lock.lock();
  _stop = true;
  _cond.notify_all();
  join_old_threads();
  _lock.unlock();
  for (auto p = _threads.begin(); p != _threads.end(); ++p) {
    (*p)->join();
    delete *p;
  }
  _threads.clear();
  _lock.lock();
  for (unsigned i=0; i<work_queues.size(); i++)
    work_queues[i]->_clear();
  _stop = false;
  _lock.unlock();
  ldout(cct,15) << "stopped" << dendl;
}

void ThreadPool::pause()
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "pause" << dendl;
  _pause++;
  // wait for all in-flight items to finish before returning
  while (processing) {
    _wait_cond.wait(ul);
  }
  ldout(cct,15) << "paused" << dendl;
}

void ThreadPool::pause_new()
{
  ldout(cct,10) << "pause_new" << dendl;
  // unlike pause(), do not wait for in-flight items to finish
  _lock.lock();
  _pause++;
  _lock.unlock();
}

void ThreadPool::unpause()
{
  ldout(cct,10) << "unpause" << dendl;
  _lock.lock();
  ceph_assert(_pause > 0);
  _pause--;
  _cond.notify_all();
  _lock.unlock();
}

void ThreadPool::drain(WorkQueue_* wq)
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "drain" << dendl;
  _draining++;
  // wait until nothing is in flight and, if a queue was given, it is empty
  while (processing || (wq != NULL && !wq->_empty())) {
    _wait_cond.wait(ul);
  }
  _draining--;
}
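
/*
 * Illustrative sketch (not part of the original source, not compiled here):
 * the ThreadPool lifecycle as implemented above. It assumes a CephContext*
 * `cct` is available and that a concrete work queue (see the WorkQueue
 * helpers declared in WorkQueue.h) is registered with the pool before items
 * are queued; the registration API lives in the header, not in this file.
 *
 *   ThreadPool tp(cct, "tp_example", "tp_example_t", 4, nullptr);
 *   tp.start();         // spawns the worker threads via start_threads()
 *   // ... register a work queue and enqueue items ...
 *   tp.pause();         // blocks until in-flight items finish
 *   tp.unpause();       // lets workers resume dequeueing
 *   tp.drain(nullptr);  // waits until nothing is being processed
 *   tp.stop(false);     // joins workers and clears the registered queues
 */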

ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, std::string nm, std::string tn,
                                     uint32_t pnum_threads):
  cct(pcct_),
  name(std::move(nm)),
  thread_name(std::move(tn)),
  lockname(name + "::lock"),
  shardedpool_lock(ceph::make_mutex(lockname)),
  num_threads(pnum_threads),
  num_paused(0),
  num_drained(0),
  wq(NULL) {}

void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
  ceph_assert(wq != NULL);
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!stop_threads) {
    // park here while the pool is paused, keeping the heartbeat alive
    if (pause_threads) {
      std::unique_lock ul(shardedpool_lock);
      ++num_paused;
      wait_cond.notify_all();
      while (pause_threads) {
        cct->get_heartbeat_map()->reset_timeout(
          hb,
          wq->timeout_interval,
          wq->suicide_interval);
        shardedpool_cond.wait_for(
          ul,
          std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
      }
      --num_paused;
    }
    // during a drain, wait here once this thread's shard has no queued work
    if (drain_threads) {
      std::unique_lock ul(shardedpool_lock);
      if (wq->is_shard_empty(thread_index)) {
        ++num_drained;
        wait_cond.notify_all();
        while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
            wq->timeout_interval,
            wq->suicide_interval);
          shardedpool_cond.wait_for(
            ul,
            std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
        }
        --num_drained;
      }
    }

    // process work for this thread's shard
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      wq->timeout_interval,
      wq->suicide_interval);
    wq->_process(thread_index, hb);

  }

  ldout(cct,10) << "sharded worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

}

void ShardedThreadPool::start_threads()
{
  ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
  int32_t thread_index = 0;
  while (threads_shardedpool.size() < num_threads) {

    WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
    threads_shardedpool.push_back(wt);
    wt->create(thread_name.c_str());
    thread_index++;
  }
}

void ShardedThreadPool::start()
{
  ldout(cct,10) << "start" << dendl;

  shardedpool_lock.lock();
  start_threads();
  shardedpool_lock.unlock();
  ldout(cct,15) << "started" << dendl;
}

void ShardedThreadPool::stop()
{
  ldout(cct,10) << "stop" << dendl;
  stop_threads = true;
  ceph_assert(wq != NULL);
  wq->return_waiting_threads();
  for (auto p = threads_shardedpool.begin();
       p != threads_shardedpool.end();
       ++p) {
    (*p)->join();
    delete *p;
  }
  threads_shardedpool.clear();
  ldout(cct,15) << "stopped" << dendl;
}

void ShardedThreadPool::pause()
{
  std::unique_lock ul(shardedpool_lock);
  ldout(cct,10) << "pause" << dendl;
  pause_threads = true;
  ceph_assert(wq != NULL);
  wq->return_waiting_threads();
  while (num_threads != num_paused) {
    wait_cond.wait(ul);
  }
  ldout(cct,10) << "paused" << dendl;
}

void ShardedThreadPool::pause_new()
{
  ldout(cct,10) << "pause_new" << dendl;
  shardedpool_lock.lock();
  pause_threads = true;
  ceph_assert(wq != NULL);
  wq->return_waiting_threads();
  shardedpool_lock.unlock();
  ldout(cct,10) << "paused_new" << dendl;
}

void ShardedThreadPool::unpause()
{
  ldout(cct,10) << "unpause" << dendl;
  shardedpool_lock.lock();
  pause_threads = false;
  wq->stop_return_waiting_threads();
  shardedpool_cond.notify_all();
  shardedpool_lock.unlock();
  ldout(cct,10) << "unpaused" << dendl;
}

void ShardedThreadPool::drain()
{
  std::unique_lock ul(shardedpool_lock);
  ldout(cct,10) << "drain" << dendl;
  drain_threads = true;
  ceph_assert(wq != NULL);
  wq->return_waiting_threads();
  while (num_threads != num_drained) {
    wait_cond.wait(ul);
  }
  drain_threads = false;
  wq->stop_return_waiting_threads();
  shardedpool_cond.notify_all();
  ldout(cct,10) << "drained" << dendl;
}
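
/*
 * Illustrative sketch (not part of the original source, not compiled here):
 * the ShardedThreadPool lifecycle as implemented above. A sharded work queue
 * implementation (declared in WorkQueue.h) must be attached so that `wq` is
 * non-null before stop()/pause()/drain() are used; the attachment API is in
 * the header and is only assumed here.
 *
 *   ShardedThreadPool stp(cct, "stp_example", "stp_example_t", 8);
 *   // ... attach the sharded work queue ...
 *   stp.start();    // one WorkThreadSharded per shard index
 *   stp.pause();    // waits until every worker reports itself paused
 *   stp.unpause();
 *   stp.drain();    // waits until every worker sees its shard empty
 *   stp.stop();     // asks waiting workers to return, then joins them
 */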