]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/WorkQueue.cc
update sources to v12.1.0
[ceph.git] / ceph / src / common / WorkQueue.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
31f18b77 15#include "WorkQueue.h"
7c673cae 16#include "include/compat.h"
7c673cae 17#include "common/errno.h"
7c673cae
FG
18
19#define dout_subsys ceph_subsys_tp
20#undef dout_prefix
21#define dout_prefix *_dout << name << " "
22
23
24ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
25 : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
26 lockname(name + "::lock"),
27 _lock(lockname.c_str()), // this should be safe due to declaration order
28 _stop(false),
29 _pause(0),
30 _draining(0),
31 ioprio_class(-1),
32 ioprio_priority(-1),
33 _num_threads(n),
34 processing(0)
35{
36 if (option) {
37 _thread_num_option = option;
38 // set up conf_keys
39 _conf_keys = new const char*[2];
40 _conf_keys[0] = _thread_num_option.c_str();
41 _conf_keys[1] = NULL;
42 } else {
43 _conf_keys = new const char*[1];
44 _conf_keys[0] = NULL;
45 }
46}
47
48void ThreadPool::TPHandle::suspend_tp_timeout()
49{
50 cct->get_heartbeat_map()->clear_timeout(hb);
51}
52
53void ThreadPool::TPHandle::reset_tp_timeout()
54{
55 cct->get_heartbeat_map()->reset_timeout(
56 hb, grace, suicide_grace);
57}
58
59ThreadPool::~ThreadPool()
60{
61 assert(_threads.empty());
62 delete[] _conf_keys;
63}
64
65void ThreadPool::handle_conf_change(const struct md_config_t *conf,
66 const std::set <std::string> &changed)
67{
68 if (changed.count(_thread_num_option)) {
69 char *buf;
70 int r = conf->get_val(_thread_num_option.c_str(), &buf, -1);
71 assert(r >= 0);
72 int v = atoi(buf);
73 free(buf);
74 if (v >= 0) {
75 _lock.Lock();
76 _num_threads = v;
77 start_threads();
78 _cond.SignalAll();
79 _lock.Unlock();
80 }
81 }
82}
83
84void ThreadPool::worker(WorkThread *wt)
85{
86 _lock.Lock();
87 ldout(cct,10) << "worker start" << dendl;
88
89 std::stringstream ss;
90 char name[16] = {0};
91 ceph_pthread_getname(pthread_self(), name, sizeof(name));
92 ss << name << " thread " << name;
93 heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
94
95 while (!_stop) {
96
97 // manage dynamic thread pool
98 join_old_threads();
99 if (_threads.size() > _num_threads) {
100 ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
101 _threads.erase(wt);
102 _old_threads.push_back(wt);
103 break;
104 }
105
106 if (!_pause && !work_queues.empty()) {
107 WorkQueue_* wq;
108 int tries = work_queues.size();
109 bool did = false;
110 while (tries--) {
111 next_work_queue %= work_queues.size();
112 wq = work_queues[next_work_queue++];
113
114 void *item = wq->_void_dequeue();
115 if (item) {
116 processing++;
117 ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
118 << " (" << processing << " active)" << dendl;
119 TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
120 tp_handle.reset_tp_timeout();
121 _lock.Unlock();
122 wq->_void_process(item, tp_handle);
123 _lock.Lock();
124 wq->_void_process_finish(item);
125 processing--;
126 ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
127 << " (" << processing << " active)" << dendl;
128 if (_pause || _draining)
129 _wait_cond.Signal();
130 did = true;
131 break;
132 }
133 }
134 if (did)
135 continue;
136 }
137
138 ldout(cct,20) << "worker waiting" << dendl;
139 cct->get_heartbeat_map()->reset_timeout(
140 hb,
141 cct->_conf->threadpool_default_timeout,
142 0);
143 _cond.WaitInterval(_lock,
144 utime_t(
145 cct->_conf->threadpool_empty_queue_max_wait, 0));
146 }
147 ldout(cct,1) << "worker finish" << dendl;
148
149 cct->get_heartbeat_map()->remove_worker(hb);
150
151 _lock.Unlock();
152}
153
154void ThreadPool::start_threads()
155{
156 assert(_lock.is_locked());
157 while (_threads.size() < _num_threads) {
158 WorkThread *wt = new WorkThread(this);
159 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
160 _threads.insert(wt);
161
162 int r = wt->set_ioprio(ioprio_class, ioprio_priority);
163 if (r < 0)
164 lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
165
166 wt->create(thread_name.c_str());
167 }
168}
169
170void ThreadPool::join_old_threads()
171{
172 assert(_lock.is_locked());
173 while (!_old_threads.empty()) {
174 ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
175 _old_threads.front()->join();
176 delete _old_threads.front();
177 _old_threads.pop_front();
178 }
179}
180
181void ThreadPool::start()
182{
183 ldout(cct,10) << "start" << dendl;
184
185 if (_thread_num_option.length()) {
186 ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
187 cct->_conf->add_observer(this);
188 }
189
190 _lock.Lock();
191 start_threads();
192 _lock.Unlock();
193 ldout(cct,15) << "started" << dendl;
194}
195
196void ThreadPool::stop(bool clear_after)
197{
198 ldout(cct,10) << "stop" << dendl;
199
200 if (_thread_num_option.length()) {
201 ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
202 cct->_conf->remove_observer(this);
203 }
204
205 _lock.Lock();
206 _stop = true;
207 _cond.Signal();
208 join_old_threads();
209 _lock.Unlock();
210 for (set<WorkThread*>::iterator p = _threads.begin();
211 p != _threads.end();
212 ++p) {
213 (*p)->join();
214 delete *p;
215 }
216 _threads.clear();
217 _lock.Lock();
218 for (unsigned i=0; i<work_queues.size(); i++)
219 work_queues[i]->_clear();
220 _stop = false;
221 _lock.Unlock();
222 ldout(cct,15) << "stopped" << dendl;
223}
224
225void ThreadPool::pause()
226{
227 ldout(cct,10) << "pause" << dendl;
228 _lock.Lock();
229 _pause++;
230 while (processing)
231 _wait_cond.Wait(_lock);
232 _lock.Unlock();
233 ldout(cct,15) << "paused" << dendl;
234}
235
236void ThreadPool::pause_new()
237{
238 ldout(cct,10) << "pause_new" << dendl;
239 _lock.Lock();
240 _pause++;
241 _lock.Unlock();
242}
243
244void ThreadPool::unpause()
245{
246 ldout(cct,10) << "unpause" << dendl;
247 _lock.Lock();
248 assert(_pause > 0);
249 _pause--;
250 _cond.Signal();
251 _lock.Unlock();
252}
253
254void ThreadPool::drain(WorkQueue_* wq)
255{
256 ldout(cct,10) << "drain" << dendl;
257 _lock.Lock();
258 _draining++;
259 while (processing || (wq != NULL && !wq->_empty()))
260 _wait_cond.Wait(_lock);
261 _draining--;
262 _lock.Unlock();
263}
264
265void ThreadPool::set_ioprio(int cls, int priority)
266{
267 Mutex::Locker l(_lock);
268 ioprio_class = cls;
269 ioprio_priority = priority;
270 for (set<WorkThread*>::iterator p = _threads.begin();
271 p != _threads.end();
272 ++p) {
273 ldout(cct,10) << __func__
274 << " class " << cls << " priority " << priority
275 << " pid " << (*p)->get_pid()
276 << dendl;
277 int r = (*p)->set_ioprio(cls, priority);
278 if (r < 0)
279 lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
280 }
281}
282
283ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
284 uint32_t pnum_threads):
285 cct(pcct_),
286 name(std::move(nm)),
287 thread_name(std::move(tn)),
288 lockname(name + "::lock"),
289 shardedpool_lock(lockname.c_str()),
290 num_threads(pnum_threads),
7c673cae
FG
291 num_paused(0),
292 num_drained(0),
293 wq(NULL) {}
294
295void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
296{
297 assert(wq != NULL);
298 ldout(cct,10) << "worker start" << dendl;
299
300 std::stringstream ss;
301 char name[16] = {0};
302 ceph_pthread_getname(pthread_self(), name, sizeof(name));
303 ss << name << " thread " << name;
304 heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
305
31f18b77
FG
306 while (!stop_threads) {
307 if (pause_threads) {
7c673cae
FG
308 shardedpool_lock.Lock();
309 ++num_paused;
310 wait_cond.Signal();
31f18b77 311 while (pause_threads) {
7c673cae 312 cct->get_heartbeat_map()->reset_timeout(
31f18b77
FG
313 hb,
314 wq->timeout_interval, wq->suicide_interval);
7c673cae 315 shardedpool_cond.WaitInterval(shardedpool_lock,
31f18b77 316 utime_t(
7c673cae
FG
317 cct->_conf->threadpool_empty_queue_max_wait, 0));
318 }
319 --num_paused;
320 shardedpool_lock.Unlock();
321 }
31f18b77 322 if (drain_threads) {
7c673cae
FG
323 shardedpool_lock.Lock();
324 if (wq->is_shard_empty(thread_index)) {
325 ++num_drained;
326 wait_cond.Signal();
31f18b77 327 while (drain_threads) {
7c673cae
FG
328 cct->get_heartbeat_map()->reset_timeout(
329 hb,
330 wq->timeout_interval, wq->suicide_interval);
331 shardedpool_cond.WaitInterval(shardedpool_lock,
332 utime_t(
333 cct->_conf->threadpool_empty_queue_max_wait, 0));
334 }
335 --num_drained;
336 }
337 shardedpool_lock.Unlock();
338 }
339
340 cct->get_heartbeat_map()->reset_timeout(
341 hb,
342 wq->timeout_interval, wq->suicide_interval);
343 wq->_process(thread_index, hb);
344
345 }
346
347 ldout(cct,10) << "sharded worker finish" << dendl;
348
349 cct->get_heartbeat_map()->remove_worker(hb);
350
351}
352
353void ShardedThreadPool::start_threads()
354{
355 assert(shardedpool_lock.is_locked());
356 int32_t thread_index = 0;
357 while (threads_shardedpool.size() < num_threads) {
358
359 WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
360 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
361 threads_shardedpool.push_back(wt);
362 wt->create(thread_name.c_str());
363 thread_index++;
364 }
365}
366
367void ShardedThreadPool::start()
368{
369 ldout(cct,10) << "start" << dendl;
370
371 shardedpool_lock.Lock();
372 start_threads();
373 shardedpool_lock.Unlock();
374 ldout(cct,15) << "started" << dendl;
375}
376
377void ShardedThreadPool::stop()
378{
379 ldout(cct,10) << "stop" << dendl;
31f18b77 380 stop_threads = true;
7c673cae
FG
381 assert(wq != NULL);
382 wq->return_waiting_threads();
383 for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
384 p != threads_shardedpool.end();
385 ++p) {
386 (*p)->join();
387 delete *p;
388 }
389 threads_shardedpool.clear();
390 ldout(cct,15) << "stopped" << dendl;
391}
392
393void ShardedThreadPool::pause()
394{
395 ldout(cct,10) << "pause" << dendl;
396 shardedpool_lock.Lock();
31f18b77 397 pause_threads = true;
7c673cae
FG
398 assert(wq != NULL);
399 wq->return_waiting_threads();
400 while (num_threads != num_paused){
401 wait_cond.Wait(shardedpool_lock);
402 }
403 shardedpool_lock.Unlock();
404 ldout(cct,10) << "paused" << dendl;
405}
406
407void ShardedThreadPool::pause_new()
408{
409 ldout(cct,10) << "pause_new" << dendl;
410 shardedpool_lock.Lock();
31f18b77 411 pause_threads = true;
7c673cae
FG
412 assert(wq != NULL);
413 wq->return_waiting_threads();
414 shardedpool_lock.Unlock();
415 ldout(cct,10) << "paused_new" << dendl;
416}
417
418void ShardedThreadPool::unpause()
419{
420 ldout(cct,10) << "unpause" << dendl;
421 shardedpool_lock.Lock();
31f18b77 422 pause_threads = false;
7c673cae
FG
423 shardedpool_cond.Signal();
424 shardedpool_lock.Unlock();
425 ldout(cct,10) << "unpaused" << dendl;
426}
427
428void ShardedThreadPool::drain()
429{
430 ldout(cct,10) << "drain" << dendl;
431 shardedpool_lock.Lock();
31f18b77 432 drain_threads = true;
7c673cae
FG
433 assert(wq != NULL);
434 wq->return_waiting_threads();
435 while (num_threads != num_drained) {
436 wait_cond.Wait(shardedpool_lock);
437 }
31f18b77 438 drain_threads = false;
7c673cae
FG
439 shardedpool_cond.Signal();
440 shardedpool_lock.Unlock();
441 ldout(cct,10) << "drained" << dendl;
442}
443