]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/WorkQueue.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / common / WorkQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "include/compat.h"
16
17 #include <sstream>
18
19 #include "include/types.h"
20 #include "include/utime.h"
21 #include "common/errno.h"
22 #include "WorkQueue.h"
23
24 #include "common/config.h"
25 #include "common/HeartbeatMap.h"
26
27 #define dout_subsys ceph_subsys_tp
28 #undef dout_prefix
29 #define dout_prefix *_dout << name << " "
30
31
32 ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
33 : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
34 lockname(name + "::lock"),
35 _lock(lockname.c_str()), // this should be safe due to declaration order
36 _stop(false),
37 _pause(0),
38 _draining(0),
39 ioprio_class(-1),
40 ioprio_priority(-1),
41 _num_threads(n),
42 processing(0)
43 {
44 if (option) {
45 _thread_num_option = option;
46 // set up conf_keys
47 _conf_keys = new const char*[2];
48 _conf_keys[0] = _thread_num_option.c_str();
49 _conf_keys[1] = NULL;
50 } else {
51 _conf_keys = new const char*[1];
52 _conf_keys[0] = NULL;
53 }
54 }
55
56 void ThreadPool::TPHandle::suspend_tp_timeout()
57 {
58 cct->get_heartbeat_map()->clear_timeout(hb);
59 }
60
61 void ThreadPool::TPHandle::reset_tp_timeout()
62 {
63 cct->get_heartbeat_map()->reset_timeout(
64 hb, grace, suicide_grace);
65 }
66
ThreadPool::~ThreadPool()
{
  // stop() must have been called first: no worker threads may remain.
  assert(_threads.empty());
  delete[] _conf_keys;
}
72
73 void ThreadPool::handle_conf_change(const struct md_config_t *conf,
74 const std::set <std::string> &changed)
75 {
76 if (changed.count(_thread_num_option)) {
77 char *buf;
78 int r = conf->get_val(_thread_num_option.c_str(), &buf, -1);
79 assert(r >= 0);
80 int v = atoi(buf);
81 free(buf);
82 if (v >= 0) {
83 _lock.Lock();
84 _num_threads = v;
85 start_threads();
86 _cond.SignalAll();
87 _lock.Unlock();
88 }
89 }
90 }
91
void ThreadPool::worker(WorkThread *wt)
{
  // Main loop for one pool thread.  Invariant: _lock is held everywhere in
  // this function except around the _void_process() call itself.
  _lock.Lock();
  ldout(cct,10) << "worker start" << dendl;

  // Register this thread with the heartbeat map so stalls can be detected.
  // NOTE(review): the local 'name' (the pthread name) shadows the pool's
  // 'name' member from this point on, so both halves of this string -- and
  // the dout_prefix of later log lines in this function -- print the thread
  // name rather than the pool name; presumably intentional, but confirm.
  std::stringstream ss;
  char name[16] = {0};
  ceph_pthread_getname(pthread_self(), name, sizeof(name));
  ss << name << " thread " << name;
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop) {

    // manage dynamic thread pool: reap threads that already retired, and
    // retire ourselves if the pool has been shrunk below the live count
    join_old_threads();
    if (_threads.size() > _num_threads) {
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      // round-robin over the registered queues, at most one full pass
      int tries = work_queues.size();
      bool did = false;
      while (tries--) {
	next_work_queue %= work_queues.size();
	wq = work_queues[next_work_queue++];

	void *item = wq->_void_dequeue();
	if (item) {
	  processing++;
	  ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
			<< " (" << processing << " active)" << dendl;
	  TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
	  tp_handle.reset_tp_timeout();
	  // drop the pool lock for the (potentially long-running) work itself
	  _lock.Unlock();
	  wq->_void_process(item, tp_handle);
	  _lock.Lock();
	  wq->_void_process_finish(item);
	  processing--;
	  ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
			<< " (" << processing << " active)" << dendl;
	  // let pause()/drain() re-evaluate their wait conditions
	  if (_pause || _draining)
	    _wait_cond.Signal();
	  did = true;
	  break;
	}
      }
      if (did)
	continue;
    }

    // nothing dequeued: relax the heartbeat deadline to the idle default and
    // wait with a timeout, so _stop / shrink requests are noticed even
    // without an explicit signal
    ldout(cct,20) << "worker waiting" << dendl;
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      cct->_conf->threadpool_default_timeout,
      0);
    _cond.WaitInterval(_lock,
      utime_t(
	cct->_conf->threadpool_empty_queue_max_wait, 0));
  }
  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

  _lock.Unlock();
}
161
162 void ThreadPool::start_threads()
163 {
164 assert(_lock.is_locked());
165 while (_threads.size() < _num_threads) {
166 WorkThread *wt = new WorkThread(this);
167 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
168 _threads.insert(wt);
169
170 int r = wt->set_ioprio(ioprio_class, ioprio_priority);
171 if (r < 0)
172 lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
173
174 wt->create(thread_name.c_str());
175 }
176 }
177
178 void ThreadPool::join_old_threads()
179 {
180 assert(_lock.is_locked());
181 while (!_old_threads.empty()) {
182 ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
183 _old_threads.front()->join();
184 delete _old_threads.front();
185 _old_threads.pop_front();
186 }
187 }
188
189 void ThreadPool::start()
190 {
191 ldout(cct,10) << "start" << dendl;
192
193 if (_thread_num_option.length()) {
194 ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
195 cct->_conf->add_observer(this);
196 }
197
198 _lock.Lock();
199 start_threads();
200 _lock.Unlock();
201 ldout(cct,15) << "started" << dendl;
202 }
203
204 void ThreadPool::stop(bool clear_after)
205 {
206 ldout(cct,10) << "stop" << dendl;
207
208 if (_thread_num_option.length()) {
209 ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
210 cct->_conf->remove_observer(this);
211 }
212
213 _lock.Lock();
214 _stop = true;
215 _cond.Signal();
216 join_old_threads();
217 _lock.Unlock();
218 for (set<WorkThread*>::iterator p = _threads.begin();
219 p != _threads.end();
220 ++p) {
221 (*p)->join();
222 delete *p;
223 }
224 _threads.clear();
225 _lock.Lock();
226 for (unsigned i=0; i<work_queues.size(); i++)
227 work_queues[i]->_clear();
228 _stop = false;
229 _lock.Unlock();
230 ldout(cct,15) << "stopped" << dendl;
231 }
232
233 void ThreadPool::pause()
234 {
235 ldout(cct,10) << "pause" << dendl;
236 _lock.Lock();
237 _pause++;
238 while (processing)
239 _wait_cond.Wait(_lock);
240 _lock.Unlock();
241 ldout(cct,15) << "paused" << dendl;
242 }
243
244 void ThreadPool::pause_new()
245 {
246 ldout(cct,10) << "pause_new" << dendl;
247 _lock.Lock();
248 _pause++;
249 _lock.Unlock();
250 }
251
252 void ThreadPool::unpause()
253 {
254 ldout(cct,10) << "unpause" << dendl;
255 _lock.Lock();
256 assert(_pause > 0);
257 _pause--;
258 _cond.Signal();
259 _lock.Unlock();
260 }
261
262 void ThreadPool::drain(WorkQueue_* wq)
263 {
264 ldout(cct,10) << "drain" << dendl;
265 _lock.Lock();
266 _draining++;
267 while (processing || (wq != NULL && !wq->_empty()))
268 _wait_cond.Wait(_lock);
269 _draining--;
270 _lock.Unlock();
271 }
272
273 void ThreadPool::set_ioprio(int cls, int priority)
274 {
275 Mutex::Locker l(_lock);
276 ioprio_class = cls;
277 ioprio_priority = priority;
278 for (set<WorkThread*>::iterator p = _threads.begin();
279 p != _threads.end();
280 ++p) {
281 ldout(cct,10) << __func__
282 << " class " << cls << " priority " << priority
283 << " pid " << (*p)->get_pid()
284 << dendl;
285 int r = (*p)->set_ioprio(cls, priority);
286 if (r < 0)
287 lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
288 }
289 }
290
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
				     uint32_t pnum_threads):
  cct(pcct_),
  name(std::move(nm)),
  thread_name(std::move(tn)),
  lockname(name + "::lock"),
  shardedpool_lock(lockname.c_str()),  // safe: 'lockname' is declared (hence initialized) first
  num_threads(pnum_threads),
  stop_threads(0),
  pause_threads(0),
  drain_threads(0),
  num_paused(0),
  num_drained(0),
  wq(NULL) {}
305
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
  // Main loop for one sharded-pool thread; each thread services the fixed
  // shard of the single registered work queue given by thread_index.
  assert(wq != NULL);
  ldout(cct,10) << "worker start" << dendl;

  // Register with the heartbeat map.  NOTE(review): the local 'name' (the
  // pthread name) shadows the pool's 'name' member, so both halves of this
  // string are the thread name -- presumably intentional, but confirm.
  std::stringstream ss;
  char name[16] = {0};
  ceph_pthread_getname(pthread_self(), name, sizeof(name));
  ss << name << " thread " << name;
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!stop_threads.read()) {
    // pause requested: park here (announcing via wait_cond so pause() can
    // count us) until the flag is cleared
    if(pause_threads.read()) {
      shardedpool_lock.Lock();
      ++num_paused;
      wait_cond.Signal();
      while(pause_threads.read()) {
	// keep the heartbeat fresh while parked
	cct->get_heartbeat_map()->reset_timeout(
	  hb,
	  wq->timeout_interval, wq->suicide_interval);
	shardedpool_cond.WaitInterval(shardedpool_lock,
	  utime_t(
	    cct->_conf->threadpool_empty_queue_max_wait, 0));
      }
      --num_paused;
      shardedpool_lock.Unlock();
    }
    // drain requested: park only once our own shard has gone empty
    if (drain_threads.read()) {
      shardedpool_lock.Lock();
      if (wq->is_shard_empty(thread_index)) {
	++num_drained;
	wait_cond.Signal();
	while (drain_threads.read()) {
	  cct->get_heartbeat_map()->reset_timeout(
	    hb,
	    wq->timeout_interval, wq->suicide_interval);
	  shardedpool_cond.WaitInterval(shardedpool_lock,
	    utime_t(
	      cct->_conf->threadpool_empty_queue_max_wait, 0));
	}
	--num_drained;
      }
      shardedpool_lock.Unlock();
    }

    // normal operation: arm the heartbeat and process work from our shard
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      wq->timeout_interval, wq->suicide_interval);
    wq->_process(thread_index, hb);

  }

  ldout(cct,10) << "sharded worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

}
363
364 void ShardedThreadPool::start_threads()
365 {
366 assert(shardedpool_lock.is_locked());
367 int32_t thread_index = 0;
368 while (threads_shardedpool.size() < num_threads) {
369
370 WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
371 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
372 threads_shardedpool.push_back(wt);
373 wt->create(thread_name.c_str());
374 thread_index++;
375 }
376 }
377
378 void ShardedThreadPool::start()
379 {
380 ldout(cct,10) << "start" << dendl;
381
382 shardedpool_lock.Lock();
383 start_threads();
384 shardedpool_lock.Unlock();
385 ldout(cct,15) << "started" << dendl;
386 }
387
388 void ShardedThreadPool::stop()
389 {
390 ldout(cct,10) << "stop" << dendl;
391 stop_threads.set(1);
392 assert(wq != NULL);
393 wq->return_waiting_threads();
394 for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
395 p != threads_shardedpool.end();
396 ++p) {
397 (*p)->join();
398 delete *p;
399 }
400 threads_shardedpool.clear();
401 ldout(cct,15) << "stopped" << dendl;
402 }
403
404 void ShardedThreadPool::pause()
405 {
406 ldout(cct,10) << "pause" << dendl;
407 shardedpool_lock.Lock();
408 pause_threads.set(1);
409 assert(wq != NULL);
410 wq->return_waiting_threads();
411 while (num_threads != num_paused){
412 wait_cond.Wait(shardedpool_lock);
413 }
414 shardedpool_lock.Unlock();
415 ldout(cct,10) << "paused" << dendl;
416 }
417
418 void ShardedThreadPool::pause_new()
419 {
420 ldout(cct,10) << "pause_new" << dendl;
421 shardedpool_lock.Lock();
422 pause_threads.set(1);
423 assert(wq != NULL);
424 wq->return_waiting_threads();
425 shardedpool_lock.Unlock();
426 ldout(cct,10) << "paused_new" << dendl;
427 }
428
429 void ShardedThreadPool::unpause()
430 {
431 ldout(cct,10) << "unpause" << dendl;
432 shardedpool_lock.Lock();
433 pause_threads.set(0);
434 shardedpool_cond.Signal();
435 shardedpool_lock.Unlock();
436 ldout(cct,10) << "unpaused" << dendl;
437 }
438
439 void ShardedThreadPool::drain()
440 {
441 ldout(cct,10) << "drain" << dendl;
442 shardedpool_lock.Lock();
443 drain_threads.set(1);
444 assert(wq != NULL);
445 wq->return_waiting_threads();
446 while (num_threads != num_drained) {
447 wait_cond.Wait(shardedpool_lock);
448 }
449 drain_threads.set(0);
450 shardedpool_cond.Signal();
451 shardedpool_lock.Unlock();
452 ldout(cct,10) << "drained" << dendl;
453 }
454