]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
31f18b77 | 15 | #include "WorkQueue.h" |
7c673cae | 16 | #include "include/compat.h" |
7c673cae | 17 | #include "common/errno.h" |
7c673cae FG |
18 | |
19 | #define dout_subsys ceph_subsys_tp | |
20 | #undef dout_prefix | |
21 | #define dout_prefix *_dout << name << " " | |
22 | ||
23 | ||
24 | ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option) | |
25 | : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)), | |
26 | lockname(name + "::lock"), | |
27 | _lock(lockname.c_str()), // this should be safe due to declaration order | |
28 | _stop(false), | |
29 | _pause(0), | |
30 | _draining(0), | |
31 | ioprio_class(-1), | |
32 | ioprio_priority(-1), | |
33 | _num_threads(n), | |
34 | processing(0) | |
35 | { | |
36 | if (option) { | |
37 | _thread_num_option = option; | |
38 | // set up conf_keys | |
39 | _conf_keys = new const char*[2]; | |
40 | _conf_keys[0] = _thread_num_option.c_str(); | |
41 | _conf_keys[1] = NULL; | |
42 | } else { | |
43 | _conf_keys = new const char*[1]; | |
44 | _conf_keys[0] = NULL; | |
45 | } | |
46 | } | |
47 | ||
48 | void ThreadPool::TPHandle::suspend_tp_timeout() | |
49 | { | |
50 | cct->get_heartbeat_map()->clear_timeout(hb); | |
51 | } | |
52 | ||
53 | void ThreadPool::TPHandle::reset_tp_timeout() | |
54 | { | |
55 | cct->get_heartbeat_map()->reset_timeout( | |
56 | hb, grace, suicide_grace); | |
57 | } | |
58 | ||
59 | ThreadPool::~ThreadPool() | |
60 | { | |
61 | assert(_threads.empty()); | |
62 | delete[] _conf_keys; | |
63 | } | |
64 | ||
65 | void ThreadPool::handle_conf_change(const struct md_config_t *conf, | |
66 | const std::set <std::string> &changed) | |
67 | { | |
68 | if (changed.count(_thread_num_option)) { | |
69 | char *buf; | |
70 | int r = conf->get_val(_thread_num_option.c_str(), &buf, -1); | |
71 | assert(r >= 0); | |
72 | int v = atoi(buf); | |
73 | free(buf); | |
74 | if (v >= 0) { | |
75 | _lock.Lock(); | |
76 | _num_threads = v; | |
77 | start_threads(); | |
78 | _cond.SignalAll(); | |
79 | _lock.Unlock(); | |
80 | } | |
81 | } | |
82 | } | |
83 | ||
/*
 * Main loop for a pool worker thread.  Runs with _lock held except while a
 * work item is actually being processed.  The loop: reaps old threads,
 * exits if the pool has shrunk, round-robins across the registered work
 * queues for an item, and otherwise waits on _cond with a bounded timeout.
 */
void ThreadPool::worker(WorkThread *wt)
{
  _lock.Lock();
  ldout(cct,10) << "worker start" << dendl;

  // NOTE(review): this local buffer shadows ThreadPool::name, so dout_prefix
  // prints the pthread name for the remainder of this function — presumably
  // intentional, but confirm.
  std::stringstream ss;
  char name[16] = {0};
  ceph_pthread_getname(pthread_self(), name, sizeof(name));
  ss << name << " thread " << name;
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop) {

    // manage dynamic thread pool: reap threads that already exited, and
    // retire ourselves if the pool is now over its configured size.
    join_old_threads();
    if (_threads.size() > _num_threads) {
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      // give each registered queue at most one chance per pass
      int tries = work_queues.size();
      bool did = false;
      while (tries--) {
	// round-robin across the queues
	next_work_queue %= work_queues.size();
	wq = work_queues[next_work_queue++];

	void *item = wq->_void_dequeue();
	if (item) {
	  processing++;
	  ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
			<< " (" << processing << " active)" << dendl;
	  TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
	  tp_handle.reset_tp_timeout();
	  // run the item without holding the pool lock
	  _lock.Unlock();
	  wq->_void_process(item, tp_handle);
	  _lock.Lock();
	  wq->_void_process_finish(item);
	  processing--;
	  ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
			<< " (" << processing << " active)" << dendl;
	  // wake pause()/drain() waiters that are watching `processing`
	  if (_pause || _draining)
	    _wait_cond.Signal();
	  did = true;
	  break;
	}
      }
      if (did)
	continue;  // got work this pass; look for more immediately
    }

    // idle: fall back to the default heartbeat timeout and sleep until
    // signalled or the empty-queue wait interval expires
    ldout(cct,20) << "worker waiting" << dendl;
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      cct->_conf->threadpool_default_timeout,
      0);
    _cond.WaitInterval(_lock,
      utime_t(
	cct->_conf->threadpool_empty_queue_max_wait, 0));
  }
  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

  _lock.Unlock();
}
153 | ||
154 | void ThreadPool::start_threads() | |
155 | { | |
156 | assert(_lock.is_locked()); | |
157 | while (_threads.size() < _num_threads) { | |
158 | WorkThread *wt = new WorkThread(this); | |
159 | ldout(cct, 10) << "start_threads creating and starting " << wt << dendl; | |
160 | _threads.insert(wt); | |
161 | ||
162 | int r = wt->set_ioprio(ioprio_class, ioprio_priority); | |
163 | if (r < 0) | |
164 | lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl; | |
165 | ||
166 | wt->create(thread_name.c_str()); | |
167 | } | |
168 | } | |
169 | ||
170 | void ThreadPool::join_old_threads() | |
171 | { | |
172 | assert(_lock.is_locked()); | |
173 | while (!_old_threads.empty()) { | |
174 | ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl; | |
175 | _old_threads.front()->join(); | |
176 | delete _old_threads.front(); | |
177 | _old_threads.pop_front(); | |
178 | } | |
179 | } | |
180 | ||
181 | void ThreadPool::start() | |
182 | { | |
183 | ldout(cct,10) << "start" << dendl; | |
184 | ||
185 | if (_thread_num_option.length()) { | |
186 | ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl; | |
187 | cct->_conf->add_observer(this); | |
188 | } | |
189 | ||
190 | _lock.Lock(); | |
191 | start_threads(); | |
192 | _lock.Unlock(); | |
193 | ldout(cct,15) << "started" << dendl; | |
194 | } | |
195 | ||
196 | void ThreadPool::stop(bool clear_after) | |
197 | { | |
198 | ldout(cct,10) << "stop" << dendl; | |
199 | ||
200 | if (_thread_num_option.length()) { | |
201 | ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl; | |
202 | cct->_conf->remove_observer(this); | |
203 | } | |
204 | ||
205 | _lock.Lock(); | |
206 | _stop = true; | |
207 | _cond.Signal(); | |
208 | join_old_threads(); | |
209 | _lock.Unlock(); | |
210 | for (set<WorkThread*>::iterator p = _threads.begin(); | |
211 | p != _threads.end(); | |
212 | ++p) { | |
213 | (*p)->join(); | |
214 | delete *p; | |
215 | } | |
216 | _threads.clear(); | |
217 | _lock.Lock(); | |
218 | for (unsigned i=0; i<work_queues.size(); i++) | |
219 | work_queues[i]->_clear(); | |
220 | _stop = false; | |
221 | _lock.Unlock(); | |
222 | ldout(cct,15) << "stopped" << dendl; | |
223 | } | |
224 | ||
225 | void ThreadPool::pause() | |
226 | { | |
227 | ldout(cct,10) << "pause" << dendl; | |
228 | _lock.Lock(); | |
229 | _pause++; | |
230 | while (processing) | |
231 | _wait_cond.Wait(_lock); | |
232 | _lock.Unlock(); | |
233 | ldout(cct,15) << "paused" << dendl; | |
234 | } | |
235 | ||
236 | void ThreadPool::pause_new() | |
237 | { | |
238 | ldout(cct,10) << "pause_new" << dendl; | |
239 | _lock.Lock(); | |
240 | _pause++; | |
241 | _lock.Unlock(); | |
242 | } | |
243 | ||
244 | void ThreadPool::unpause() | |
245 | { | |
246 | ldout(cct,10) << "unpause" << dendl; | |
247 | _lock.Lock(); | |
248 | assert(_pause > 0); | |
249 | _pause--; | |
250 | _cond.Signal(); | |
251 | _lock.Unlock(); | |
252 | } | |
253 | ||
254 | void ThreadPool::drain(WorkQueue_* wq) | |
255 | { | |
256 | ldout(cct,10) << "drain" << dendl; | |
257 | _lock.Lock(); | |
258 | _draining++; | |
259 | while (processing || (wq != NULL && !wq->_empty())) | |
260 | _wait_cond.Wait(_lock); | |
261 | _draining--; | |
262 | _lock.Unlock(); | |
263 | } | |
264 | ||
265 | void ThreadPool::set_ioprio(int cls, int priority) | |
266 | { | |
267 | Mutex::Locker l(_lock); | |
268 | ioprio_class = cls; | |
269 | ioprio_priority = priority; | |
270 | for (set<WorkThread*>::iterator p = _threads.begin(); | |
271 | p != _threads.end(); | |
272 | ++p) { | |
273 | ldout(cct,10) << __func__ | |
274 | << " class " << cls << " priority " << priority | |
275 | << " pid " << (*p)->get_pid() | |
276 | << dendl; | |
277 | int r = (*p)->set_ioprio(cls, priority); | |
278 | if (r < 0) | |
279 | lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl; | |
280 | } | |
281 | } | |
282 | ||
// Construct a sharded thread pool with pnum_threads workers, each serving
// one shard index.  The work queue pointer starts NULL and is attached
// elsewhere; stop()/pause()/drain() assert it is set before use.
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
				     uint32_t pnum_threads):
  cct(pcct_),
  name(std::move(nm)),
  thread_name(std::move(tn)),
  lockname(name + "::lock"),
  shardedpool_lock(lockname.c_str()),
  num_threads(pnum_threads),
  num_paused(0),
  num_drained(0),
  wq(NULL) {}
294 | ||
/*
 * Main loop for one sharded-pool worker.  Each iteration honours the
 * pause/drain handshakes (counting itself in num_paused/num_drained and
 * parking on shardedpool_cond until released), then resets its heartbeat
 * and processes its own shard via wq->_process().
 */
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
  assert(wq != NULL);
  ldout(cct,10) << "worker start" << dendl;

  // NOTE(review): this local buffer shadows ShardedThreadPool::name, so
  // dout_prefix prints the pthread name for the rest of this function —
  // presumably intentional, but confirm.
  std::stringstream ss;
  char name[16] = {0};
  ceph_pthread_getname(pthread_self(), name, sizeof(name));
  ss << name << " thread " << name;
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!stop_threads) {
    if (pause_threads) {
      shardedpool_lock.Lock();
      ++num_paused;
      wait_cond.Signal();  // let pause() observe the updated num_paused
      while (pause_threads) {
	// keep the heartbeat alive while parked
	cct->get_heartbeat_map()->reset_timeout(
	  hb,
	  wq->timeout_interval, wq->suicide_interval);
	shardedpool_cond.WaitInterval(shardedpool_lock,
	  utime_t(
	    cct->_conf->threadpool_empty_queue_max_wait, 0));
      }
      --num_paused;
      shardedpool_lock.Unlock();
    }
    if (drain_threads) {
      shardedpool_lock.Lock();
      // only count this thread as drained once its own shard is empty
      if (wq->is_shard_empty(thread_index)) {
	++num_drained;
	wait_cond.Signal();  // let drain() observe the updated num_drained
	while (drain_threads) {
	  cct->get_heartbeat_map()->reset_timeout(
	    hb,
	    wq->timeout_interval, wq->suicide_interval);
	  shardedpool_cond.WaitInterval(shardedpool_lock,
	    utime_t(
	      cct->_conf->threadpool_empty_queue_max_wait, 0));
	}
	--num_drained;
      }
      shardedpool_lock.Unlock();
    }

    // normal operation: arm the heartbeat and process this shard
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      wq->timeout_interval, wq->suicide_interval);
    wq->_process(thread_index, hb);

  }

  ldout(cct,10) << "sharded worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

}
352 | ||
353 | void ShardedThreadPool::start_threads() | |
354 | { | |
355 | assert(shardedpool_lock.is_locked()); | |
356 | int32_t thread_index = 0; | |
357 | while (threads_shardedpool.size() < num_threads) { | |
358 | ||
359 | WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index); | |
360 | ldout(cct, 10) << "start_threads creating and starting " << wt << dendl; | |
361 | threads_shardedpool.push_back(wt); | |
362 | wt->create(thread_name.c_str()); | |
363 | thread_index++; | |
364 | } | |
365 | } | |
366 | ||
367 | void ShardedThreadPool::start() | |
368 | { | |
369 | ldout(cct,10) << "start" << dendl; | |
370 | ||
371 | shardedpool_lock.Lock(); | |
372 | start_threads(); | |
373 | shardedpool_lock.Unlock(); | |
374 | ldout(cct,15) << "started" << dendl; | |
375 | } | |
376 | ||
377 | void ShardedThreadPool::stop() | |
378 | { | |
379 | ldout(cct,10) << "stop" << dendl; | |
31f18b77 | 380 | stop_threads = true; |
7c673cae FG |
381 | assert(wq != NULL); |
382 | wq->return_waiting_threads(); | |
383 | for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin(); | |
384 | p != threads_shardedpool.end(); | |
385 | ++p) { | |
386 | (*p)->join(); | |
387 | delete *p; | |
388 | } | |
389 | threads_shardedpool.clear(); | |
390 | ldout(cct,15) << "stopped" << dendl; | |
391 | } | |
392 | ||
393 | void ShardedThreadPool::pause() | |
394 | { | |
395 | ldout(cct,10) << "pause" << dendl; | |
396 | shardedpool_lock.Lock(); | |
31f18b77 | 397 | pause_threads = true; |
7c673cae FG |
398 | assert(wq != NULL); |
399 | wq->return_waiting_threads(); | |
400 | while (num_threads != num_paused){ | |
401 | wait_cond.Wait(shardedpool_lock); | |
402 | } | |
403 | shardedpool_lock.Unlock(); | |
404 | ldout(cct,10) << "paused" << dendl; | |
405 | } | |
406 | ||
407 | void ShardedThreadPool::pause_new() | |
408 | { | |
409 | ldout(cct,10) << "pause_new" << dendl; | |
410 | shardedpool_lock.Lock(); | |
31f18b77 | 411 | pause_threads = true; |
7c673cae FG |
412 | assert(wq != NULL); |
413 | wq->return_waiting_threads(); | |
414 | shardedpool_lock.Unlock(); | |
415 | ldout(cct,10) << "paused_new" << dendl; | |
416 | } | |
417 | ||
418 | void ShardedThreadPool::unpause() | |
419 | { | |
420 | ldout(cct,10) << "unpause" << dendl; | |
421 | shardedpool_lock.Lock(); | |
31f18b77 | 422 | pause_threads = false; |
7c673cae FG |
423 | shardedpool_cond.Signal(); |
424 | shardedpool_lock.Unlock(); | |
425 | ldout(cct,10) << "unpaused" << dendl; | |
426 | } | |
427 | ||
428 | void ShardedThreadPool::drain() | |
429 | { | |
430 | ldout(cct,10) << "drain" << dendl; | |
431 | shardedpool_lock.Lock(); | |
31f18b77 | 432 | drain_threads = true; |
7c673cae FG |
433 | assert(wq != NULL); |
434 | wq->return_waiting_threads(); | |
435 | while (num_threads != num_drained) { | |
436 | wait_cond.Wait(shardedpool_lock); | |
437 | } | |
31f18b77 | 438 | drain_threads = false; |
7c673cae FG |
439 | shardedpool_cond.Signal(); |
440 | shardedpool_lock.Unlock(); | |
441 | ldout(cct,10) << "drained" << dendl; | |
442 | } | |
443 |