]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/WorkQueue.cc
update sources to v12.1.2
[ceph.git] / ceph / src / common / WorkQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "WorkQueue.h"
16 #include "include/compat.h"
17 #include "common/errno.h"
18
19 #define dout_subsys ceph_subsys_tp
20 #undef dout_prefix
21 #define dout_prefix *_dout << name << " "
22
23
24 ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
25 : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
26 lockname(name + "::lock"),
27 _lock(lockname.c_str()), // this should be safe due to declaration order
28 _stop(false),
29 _pause(0),
30 _draining(0),
31 ioprio_class(-1),
32 ioprio_priority(-1),
33 _num_threads(n),
34 processing(0)
35 {
36 if (option) {
37 _thread_num_option = option;
38 // set up conf_keys
39 _conf_keys = new const char*[2];
40 _conf_keys[0] = _thread_num_option.c_str();
41 _conf_keys[1] = NULL;
42 } else {
43 _conf_keys = new const char*[1];
44 _conf_keys[0] = NULL;
45 }
46 }
47
48 void ThreadPool::TPHandle::suspend_tp_timeout()
49 {
50 cct->get_heartbeat_map()->clear_timeout(hb);
51 }
52
53 void ThreadPool::TPHandle::reset_tp_timeout()
54 {
55 cct->get_heartbeat_map()->reset_timeout(
56 hb, grace, suicide_grace);
57 }
58
59 ThreadPool::~ThreadPool()
60 {
61 assert(_threads.empty());
62 delete[] _conf_keys;
63 }
64
65 void ThreadPool::handle_conf_change(const struct md_config_t *conf,
66 const std::set <std::string> &changed)
67 {
68 if (changed.count(_thread_num_option)) {
69 char *buf;
70 int r = conf->get_val(_thread_num_option.c_str(), &buf, -1);
71 assert(r >= 0);
72 int v = atoi(buf);
73 free(buf);
74 if (v >= 0) {
75 _lock.Lock();
76 _num_threads = v;
77 start_threads();
78 _cond.SignalAll();
79 _lock.Unlock();
80 }
81 }
82 }
83
84 void ThreadPool::worker(WorkThread *wt)
85 {
86 _lock.Lock();
87 ldout(cct,10) << "worker start" << dendl;
88
89 std::stringstream ss;
90 ss << name << " thread " << (void *)pthread_self();
91 heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
92
93 while (!_stop) {
94
95 // manage dynamic thread pool
96 join_old_threads();
97 if (_threads.size() > _num_threads) {
98 ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
99 _threads.erase(wt);
100 _old_threads.push_back(wt);
101 break;
102 }
103
104 if (!_pause && !work_queues.empty()) {
105 WorkQueue_* wq;
106 int tries = work_queues.size();
107 bool did = false;
108 while (tries--) {
109 next_work_queue %= work_queues.size();
110 wq = work_queues[next_work_queue++];
111
112 void *item = wq->_void_dequeue();
113 if (item) {
114 processing++;
115 ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
116 << " (" << processing << " active)" << dendl;
117 TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
118 tp_handle.reset_tp_timeout();
119 _lock.Unlock();
120 wq->_void_process(item, tp_handle);
121 _lock.Lock();
122 wq->_void_process_finish(item);
123 processing--;
124 ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
125 << " (" << processing << " active)" << dendl;
126 if (_pause || _draining)
127 _wait_cond.Signal();
128 did = true;
129 break;
130 }
131 }
132 if (did)
133 continue;
134 }
135
136 ldout(cct,20) << "worker waiting" << dendl;
137 cct->get_heartbeat_map()->reset_timeout(
138 hb,
139 cct->_conf->threadpool_default_timeout,
140 0);
141 _cond.WaitInterval(_lock,
142 utime_t(
143 cct->_conf->threadpool_empty_queue_max_wait, 0));
144 }
145 ldout(cct,1) << "worker finish" << dendl;
146
147 cct->get_heartbeat_map()->remove_worker(hb);
148
149 _lock.Unlock();
150 }
151
152 void ThreadPool::start_threads()
153 {
154 assert(_lock.is_locked());
155 while (_threads.size() < _num_threads) {
156 WorkThread *wt = new WorkThread(this);
157 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
158 _threads.insert(wt);
159
160 int r = wt->set_ioprio(ioprio_class, ioprio_priority);
161 if (r < 0)
162 lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
163
164 wt->create(thread_name.c_str());
165 }
166 }
167
168 void ThreadPool::join_old_threads()
169 {
170 assert(_lock.is_locked());
171 while (!_old_threads.empty()) {
172 ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
173 _old_threads.front()->join();
174 delete _old_threads.front();
175 _old_threads.pop_front();
176 }
177 }
178
179 void ThreadPool::start()
180 {
181 ldout(cct,10) << "start" << dendl;
182
183 if (_thread_num_option.length()) {
184 ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
185 cct->_conf->add_observer(this);
186 }
187
188 _lock.Lock();
189 start_threads();
190 _lock.Unlock();
191 ldout(cct,15) << "started" << dendl;
192 }
193
194 void ThreadPool::stop(bool clear_after)
195 {
196 ldout(cct,10) << "stop" << dendl;
197
198 if (_thread_num_option.length()) {
199 ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
200 cct->_conf->remove_observer(this);
201 }
202
203 _lock.Lock();
204 _stop = true;
205 _cond.Signal();
206 join_old_threads();
207 _lock.Unlock();
208 for (set<WorkThread*>::iterator p = _threads.begin();
209 p != _threads.end();
210 ++p) {
211 (*p)->join();
212 delete *p;
213 }
214 _threads.clear();
215 _lock.Lock();
216 for (unsigned i=0; i<work_queues.size(); i++)
217 work_queues[i]->_clear();
218 _stop = false;
219 _lock.Unlock();
220 ldout(cct,15) << "stopped" << dendl;
221 }
222
223 void ThreadPool::pause()
224 {
225 ldout(cct,10) << "pause" << dendl;
226 _lock.Lock();
227 _pause++;
228 while (processing)
229 _wait_cond.Wait(_lock);
230 _lock.Unlock();
231 ldout(cct,15) << "paused" << dendl;
232 }
233
234 void ThreadPool::pause_new()
235 {
236 ldout(cct,10) << "pause_new" << dendl;
237 _lock.Lock();
238 _pause++;
239 _lock.Unlock();
240 }
241
242 void ThreadPool::unpause()
243 {
244 ldout(cct,10) << "unpause" << dendl;
245 _lock.Lock();
246 assert(_pause > 0);
247 _pause--;
248 _cond.Signal();
249 _lock.Unlock();
250 }
251
252 void ThreadPool::drain(WorkQueue_* wq)
253 {
254 ldout(cct,10) << "drain" << dendl;
255 _lock.Lock();
256 _draining++;
257 while (processing || (wq != NULL && !wq->_empty()))
258 _wait_cond.Wait(_lock);
259 _draining--;
260 _lock.Unlock();
261 }
262
263 void ThreadPool::set_ioprio(int cls, int priority)
264 {
265 Mutex::Locker l(_lock);
266 ioprio_class = cls;
267 ioprio_priority = priority;
268 for (set<WorkThread*>::iterator p = _threads.begin();
269 p != _threads.end();
270 ++p) {
271 ldout(cct,10) << __func__
272 << " class " << cls << " priority " << priority
273 << " pid " << (*p)->get_pid()
274 << dendl;
275 int r = (*p)->set_ioprio(cls, priority);
276 if (r < 0)
277 lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
278 }
279 }
280
281 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
282 uint32_t pnum_threads):
283 cct(pcct_),
284 name(std::move(nm)),
285 thread_name(std::move(tn)),
286 lockname(name + "::lock"),
287 shardedpool_lock(lockname.c_str()),
288 num_threads(pnum_threads),
289 num_paused(0),
290 num_drained(0),
291 wq(NULL) {}
292
293 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
294 {
295 assert(wq != NULL);
296 ldout(cct,10) << "worker start" << dendl;
297
298 std::stringstream ss;
299 ss << name << " thread " << (void *)pthread_self();
300 heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
301
302 while (!stop_threads) {
303 if (pause_threads) {
304 shardedpool_lock.Lock();
305 ++num_paused;
306 wait_cond.Signal();
307 while (pause_threads) {
308 cct->get_heartbeat_map()->reset_timeout(
309 hb,
310 wq->timeout_interval, wq->suicide_interval);
311 shardedpool_cond.WaitInterval(shardedpool_lock,
312 utime_t(
313 cct->_conf->threadpool_empty_queue_max_wait, 0));
314 }
315 --num_paused;
316 shardedpool_lock.Unlock();
317 }
318 if (drain_threads) {
319 shardedpool_lock.Lock();
320 if (wq->is_shard_empty(thread_index)) {
321 ++num_drained;
322 wait_cond.Signal();
323 while (drain_threads) {
324 cct->get_heartbeat_map()->reset_timeout(
325 hb,
326 wq->timeout_interval, wq->suicide_interval);
327 shardedpool_cond.WaitInterval(shardedpool_lock,
328 utime_t(
329 cct->_conf->threadpool_empty_queue_max_wait, 0));
330 }
331 --num_drained;
332 }
333 shardedpool_lock.Unlock();
334 }
335
336 cct->get_heartbeat_map()->reset_timeout(
337 hb,
338 wq->timeout_interval, wq->suicide_interval);
339 wq->_process(thread_index, hb);
340
341 }
342
343 ldout(cct,10) << "sharded worker finish" << dendl;
344
345 cct->get_heartbeat_map()->remove_worker(hb);
346
347 }
348
349 void ShardedThreadPool::start_threads()
350 {
351 assert(shardedpool_lock.is_locked());
352 int32_t thread_index = 0;
353 while (threads_shardedpool.size() < num_threads) {
354
355 WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
356 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
357 threads_shardedpool.push_back(wt);
358 wt->create(thread_name.c_str());
359 thread_index++;
360 }
361 }
362
363 void ShardedThreadPool::start()
364 {
365 ldout(cct,10) << "start" << dendl;
366
367 shardedpool_lock.Lock();
368 start_threads();
369 shardedpool_lock.Unlock();
370 ldout(cct,15) << "started" << dendl;
371 }
372
373 void ShardedThreadPool::stop()
374 {
375 ldout(cct,10) << "stop" << dendl;
376 stop_threads = true;
377 assert(wq != NULL);
378 wq->return_waiting_threads();
379 for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
380 p != threads_shardedpool.end();
381 ++p) {
382 (*p)->join();
383 delete *p;
384 }
385 threads_shardedpool.clear();
386 ldout(cct,15) << "stopped" << dendl;
387 }
388
389 void ShardedThreadPool::pause()
390 {
391 ldout(cct,10) << "pause" << dendl;
392 shardedpool_lock.Lock();
393 pause_threads = true;
394 assert(wq != NULL);
395 wq->return_waiting_threads();
396 while (num_threads != num_paused){
397 wait_cond.Wait(shardedpool_lock);
398 }
399 shardedpool_lock.Unlock();
400 ldout(cct,10) << "paused" << dendl;
401 }
402
403 void ShardedThreadPool::pause_new()
404 {
405 ldout(cct,10) << "pause_new" << dendl;
406 shardedpool_lock.Lock();
407 pause_threads = true;
408 assert(wq != NULL);
409 wq->return_waiting_threads();
410 shardedpool_lock.Unlock();
411 ldout(cct,10) << "paused_new" << dendl;
412 }
413
414 void ShardedThreadPool::unpause()
415 {
416 ldout(cct,10) << "unpause" << dendl;
417 shardedpool_lock.Lock();
418 pause_threads = false;
419 shardedpool_cond.Signal();
420 shardedpool_lock.Unlock();
421 ldout(cct,10) << "unpaused" << dendl;
422 }
423
424 void ShardedThreadPool::drain()
425 {
426 ldout(cct,10) << "drain" << dendl;
427 shardedpool_lock.Lock();
428 drain_threads = true;
429 assert(wq != NULL);
430 wq->return_waiting_threads();
431 while (num_threads != num_drained) {
432 wait_cond.Wait(shardedpool_lock);
433 }
434 drain_threads = false;
435 shardedpool_cond.Signal();
436 shardedpool_lock.Unlock();
437 ldout(cct,10) << "drained" << dendl;
438 }
439