]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "WorkQueue.h" | |
16 | #include "include/compat.h" | |
17 | #include "common/errno.h" | |
18 | ||
19 | #define dout_subsys ceph_subsys_tp | |
20 | #undef dout_prefix | |
21 | #define dout_prefix *_dout << name << " " | |
22 | ||
23 | ||
// Construct a pool of n worker threads.  If 'option' is non-NULL it names a
// config key whose value controls the thread count at runtime; the
// NULL-terminated key list handed to the config-observer machinery is built
// here and released in the destructor.
ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
  : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
    lockname(name + "::lock"),
    _lock(lockname.c_str()),  // this should be safe due to declaration order
    _stop(false),
    _pause(0),        // pause() nesting count
    _draining(0),     // drain() nesting count
    ioprio_class(-1),     // -1: no explicit io priority set yet
    ioprio_priority(-1),
    _num_threads(n),
    processing(0)     // number of items currently being processed
{
  if (option) {
    _thread_num_option = option;
    // set up conf_keys: {option, NULL} so the observer watches the option
    _conf_keys = new const char*[2];
    _conf_keys[0] = _thread_num_option.c_str();
    _conf_keys[1] = NULL;
  } else {
    // no tunable option: key list is just the NULL terminator
    _conf_keys = new const char*[1];
    _conf_keys[0] = NULL;
  }
}
47 | ||
// Disarm the heartbeat timeout for this worker, e.g. while it blocks on
// something that may legitimately take arbitrarily long.
void ThreadPool::TPHandle::suspend_tp_timeout()
{
  cct->get_heartbeat_map()->clear_timeout(hb);
}
52 | ||
// (Re)arm this worker's heartbeat grace/suicide timeouts; called to signal
// that the worker is still making progress on its current item.
void ThreadPool::TPHandle::reset_tp_timeout()
{
  cct->get_heartbeat_map()->reset_timeout(
    hb, grace, suicide_grace);
}
58 | ||
// Destructor.  The pool must already have been stopped (stop() joins the
// workers and clears _threads); here we only free the config-key array
// allocated in the constructor.
ThreadPool::~ThreadPool()
{
  assert(_threads.empty());
  delete[] _conf_keys;
}
64 | ||
// Config-observer callback: apply a runtime change to the thread-count
// option this pool was constructed with (if any).
void ThreadPool::handle_conf_change(const struct md_config_t *conf,
                                    const std::set <std::string> &changed)
{
  if (changed.count(_thread_num_option)) {
    char *buf;
    int r = conf->get_val(_thread_num_option.c_str(), &buf, -1);
    assert(r >= 0);
    // NOTE(review): atoi() returns 0 on unparsable input, which would shrink
    // the pool to zero threads -- presumably the option value is validated
    // upstream; confirm.
    int v = atoi(buf);
    free(buf);
    if (v >= 0) {
      _lock.Lock();
      _num_threads = v;
      start_threads();     // grow immediately if v increased
      _cond.SignalAll();   // wake all workers so surplus ones can retire
      _lock.Unlock();
    }
  }
}
83 | ||
// Main loop of one worker thread.  Runs with _lock held except while a work
// item is actually being processed.  Each pass: retire any old threads,
// shrink the pool if over _num_threads, then round-robin over the work
// queues for an item; if none is found (or the pool is paused), sleep on
// _cond with a bounded wait so the heartbeat can be refreshed periodically.
void ThreadPool::worker(WorkThread *wt)
{
  _lock.Lock();
  ldout(cct,10) << "worker start" << dendl;

  // register this thread with the heartbeat map so a stuck worker is noticed
  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop) {

    // manage dynamic thread pool
    join_old_threads();
    if (_threads.size() > _num_threads) {
      // too many threads: this one retires; stop() or a later worker joins it
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      int tries = work_queues.size();   // visit each queue at most once
      bool did = false;
      while (tries--) {
        // round-robin starting point persists across passes in next_work_queue
        next_work_queue %= work_queues.size();
        wq = work_queues[next_work_queue++];

        void *item = wq->_void_dequeue();
        if (item) {
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
                        << " (" << processing << " active)" << dendl;
          TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
          tp_handle.reset_tp_timeout();
          // process the item without holding the pool lock
          _lock.Unlock();
          wq->_void_process(item, tp_handle);
          _lock.Lock();
          wq->_void_process_finish(item);
          processing--;
          ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
                        << " (" << processing << " active)" << dendl;
          // let pause()/drain() re-check their wait conditions
          if (_pause || _draining)
            _wait_cond.Signal();
          did = true;
          break;
        }
      }
      if (did)
        continue;   // found work; immediately look for more
    }

    // nothing to do: refresh heartbeat with the idle timeout, then sleep;
    // the bounded wait guarantees the heartbeat is refreshed even if no
    // signal arrives
    ldout(cct,20) << "worker waiting" << dendl;
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      cct->_conf->threadpool_default_timeout,
      0);
    _cond.WaitInterval(_lock,
      utime_t(
        cct->_conf->threadpool_empty_queue_max_wait, 0));
  }
  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

  _lock.Unlock();
}
151 | ||
152 | void ThreadPool::start_threads() | |
153 | { | |
154 | assert(_lock.is_locked()); | |
155 | while (_threads.size() < _num_threads) { | |
156 | WorkThread *wt = new WorkThread(this); | |
157 | ldout(cct, 10) << "start_threads creating and starting " << wt << dendl; | |
158 | _threads.insert(wt); | |
159 | ||
160 | int r = wt->set_ioprio(ioprio_class, ioprio_priority); | |
161 | if (r < 0) | |
162 | lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl; | |
163 | ||
164 | wt->create(thread_name.c_str()); | |
165 | } | |
166 | } | |
167 | ||
168 | void ThreadPool::join_old_threads() | |
169 | { | |
170 | assert(_lock.is_locked()); | |
171 | while (!_old_threads.empty()) { | |
172 | ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl; | |
173 | _old_threads.front()->join(); | |
174 | delete _old_threads.front(); | |
175 | _old_threads.pop_front(); | |
176 | } | |
177 | } | |
178 | ||
179 | void ThreadPool::start() | |
180 | { | |
181 | ldout(cct,10) << "start" << dendl; | |
182 | ||
183 | if (_thread_num_option.length()) { | |
184 | ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl; | |
185 | cct->_conf->add_observer(this); | |
186 | } | |
187 | ||
188 | _lock.Lock(); | |
189 | start_threads(); | |
190 | _lock.Unlock(); | |
191 | ldout(cct,15) << "started" << dendl; | |
192 | } | |
193 | ||
194 | void ThreadPool::stop(bool clear_after) | |
195 | { | |
196 | ldout(cct,10) << "stop" << dendl; | |
197 | ||
198 | if (_thread_num_option.length()) { | |
199 | ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl; | |
200 | cct->_conf->remove_observer(this); | |
201 | } | |
202 | ||
203 | _lock.Lock(); | |
204 | _stop = true; | |
205 | _cond.Signal(); | |
206 | join_old_threads(); | |
207 | _lock.Unlock(); | |
208 | for (set<WorkThread*>::iterator p = _threads.begin(); | |
209 | p != _threads.end(); | |
210 | ++p) { | |
211 | (*p)->join(); | |
212 | delete *p; | |
213 | } | |
214 | _threads.clear(); | |
215 | _lock.Lock(); | |
216 | for (unsigned i=0; i<work_queues.size(); i++) | |
217 | work_queues[i]->_clear(); | |
218 | _stop = false; | |
219 | _lock.Unlock(); | |
220 | ldout(cct,15) << "stopped" << dendl; | |
221 | } | |
222 | ||
223 | void ThreadPool::pause() | |
224 | { | |
225 | ldout(cct,10) << "pause" << dendl; | |
226 | _lock.Lock(); | |
227 | _pause++; | |
228 | while (processing) | |
229 | _wait_cond.Wait(_lock); | |
230 | _lock.Unlock(); | |
231 | ldout(cct,15) << "paused" << dendl; | |
232 | } | |
233 | ||
234 | void ThreadPool::pause_new() | |
235 | { | |
236 | ldout(cct,10) << "pause_new" << dendl; | |
237 | _lock.Lock(); | |
238 | _pause++; | |
239 | _lock.Unlock(); | |
240 | } | |
241 | ||
242 | void ThreadPool::unpause() | |
243 | { | |
244 | ldout(cct,10) << "unpause" << dendl; | |
245 | _lock.Lock(); | |
246 | assert(_pause > 0); | |
247 | _pause--; | |
248 | _cond.Signal(); | |
249 | _lock.Unlock(); | |
250 | } | |
251 | ||
252 | void ThreadPool::drain(WorkQueue_* wq) | |
253 | { | |
254 | ldout(cct,10) << "drain" << dendl; | |
255 | _lock.Lock(); | |
256 | _draining++; | |
257 | while (processing || (wq != NULL && !wq->_empty())) | |
258 | _wait_cond.Wait(_lock); | |
259 | _draining--; | |
260 | _lock.Unlock(); | |
261 | } | |
262 | ||
263 | void ThreadPool::set_ioprio(int cls, int priority) | |
264 | { | |
265 | Mutex::Locker l(_lock); | |
266 | ioprio_class = cls; | |
267 | ioprio_priority = priority; | |
268 | for (set<WorkThread*>::iterator p = _threads.begin(); | |
269 | p != _threads.end(); | |
270 | ++p) { | |
271 | ldout(cct,10) << __func__ | |
272 | << " class " << cls << " priority " << priority | |
273 | << " pid " << (*p)->get_pid() | |
274 | << dendl; | |
275 | int r = (*p)->set_ioprio(cls, priority); | |
276 | if (r < 0) | |
277 | lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl; | |
278 | } | |
279 | } | |
280 | ||
// Construct a sharded pool of pnum_threads workers.  The single sharded
// work queue (wq) is attached elsewhere before start(); it must be non-NULL
// by the time the workers run.
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
                                     uint32_t pnum_threads):
  cct(pcct_),
  name(std::move(nm)),
  thread_name(std::move(tn)),
  lockname(name + "::lock"),
  shardedpool_lock(lockname.c_str()),
  num_threads(pnum_threads),
  num_paused(0),    // count of workers currently parked by pause()
  num_drained(0),   // count of workers currently parked by drain()
  wq(NULL) {}
292 | ||
// Main loop of one sharded worker thread.  Unlike ThreadPool::worker, each
// thread services a fixed shard (thread_index) of the single attached work
// queue.  pause()/drain() coordinate with the workers through
// pause_threads/drain_threads and the num_paused/num_drained counters.
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
  assert(wq != NULL);
  ldout(cct,10) << "worker start" << dendl;

  // register with the heartbeat map so a stuck worker is noticed
  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!stop_threads) {
    if (pause_threads) {
      shardedpool_lock.Lock();
      ++num_paused;
      wait_cond.Signal();   // tell pause() one more worker is parked
      while (pause_threads) {
        // bounded wait so the heartbeat keeps being refreshed while paused
        cct->get_heartbeat_map()->reset_timeout(
          hb,
          wq->timeout_interval, wq->suicide_interval);
        shardedpool_cond.WaitInterval(shardedpool_lock,
          utime_t(
            cct->_conf->threadpool_empty_queue_max_wait, 0));
      }
      --num_paused;
      shardedpool_lock.Unlock();
    }
    if (drain_threads) {
      shardedpool_lock.Lock();
      // only park for drain once this worker's own shard is empty
      if (wq->is_shard_empty(thread_index)) {
        ++num_drained;
        wait_cond.Signal();   // tell drain() one more worker is parked
        while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
            wq->timeout_interval, wq->suicide_interval);
          shardedpool_cond.WaitInterval(shardedpool_lock,
            utime_t(
              cct->_conf->threadpool_empty_queue_max_wait, 0));
        }
        --num_drained;
      }
      shardedpool_lock.Unlock();
    }

    // normal operation: refresh heartbeat and process this shard; _process
    // is expected to block/return on its own when the shard is idle
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      wq->timeout_interval, wq->suicide_interval);
    wq->_process(thread_index, hb);

  }

  ldout(cct,10) << "sharded worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

}
348 | ||
349 | void ShardedThreadPool::start_threads() | |
350 | { | |
351 | assert(shardedpool_lock.is_locked()); | |
352 | int32_t thread_index = 0; | |
353 | while (threads_shardedpool.size() < num_threads) { | |
354 | ||
355 | WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index); | |
356 | ldout(cct, 10) << "start_threads creating and starting " << wt << dendl; | |
357 | threads_shardedpool.push_back(wt); | |
358 | wt->create(thread_name.c_str()); | |
359 | thread_index++; | |
360 | } | |
361 | } | |
362 | ||
363 | void ShardedThreadPool::start() | |
364 | { | |
365 | ldout(cct,10) << "start" << dendl; | |
366 | ||
367 | shardedpool_lock.Lock(); | |
368 | start_threads(); | |
369 | shardedpool_lock.Unlock(); | |
370 | ldout(cct,15) << "started" << dendl; | |
371 | } | |
372 | ||
373 | void ShardedThreadPool::stop() | |
374 | { | |
375 | ldout(cct,10) << "stop" << dendl; | |
376 | stop_threads = true; | |
377 | assert(wq != NULL); | |
378 | wq->return_waiting_threads(); | |
379 | for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin(); | |
380 | p != threads_shardedpool.end(); | |
381 | ++p) { | |
382 | (*p)->join(); | |
383 | delete *p; | |
384 | } | |
385 | threads_shardedpool.clear(); | |
386 | ldout(cct,15) << "stopped" << dendl; | |
387 | } | |
388 | ||
389 | void ShardedThreadPool::pause() | |
390 | { | |
391 | ldout(cct,10) << "pause" << dendl; | |
392 | shardedpool_lock.Lock(); | |
393 | pause_threads = true; | |
394 | assert(wq != NULL); | |
395 | wq->return_waiting_threads(); | |
396 | while (num_threads != num_paused){ | |
397 | wait_cond.Wait(shardedpool_lock); | |
398 | } | |
399 | shardedpool_lock.Unlock(); | |
400 | ldout(cct,10) << "paused" << dendl; | |
401 | } | |
402 | ||
403 | void ShardedThreadPool::pause_new() | |
404 | { | |
405 | ldout(cct,10) << "pause_new" << dendl; | |
406 | shardedpool_lock.Lock(); | |
407 | pause_threads = true; | |
408 | assert(wq != NULL); | |
409 | wq->return_waiting_threads(); | |
410 | shardedpool_lock.Unlock(); | |
411 | ldout(cct,10) << "paused_new" << dendl; | |
412 | } | |
413 | ||
414 | void ShardedThreadPool::unpause() | |
415 | { | |
416 | ldout(cct,10) << "unpause" << dendl; | |
417 | shardedpool_lock.Lock(); | |
418 | pause_threads = false; | |
419 | shardedpool_cond.Signal(); | |
420 | shardedpool_lock.Unlock(); | |
421 | ldout(cct,10) << "unpaused" << dendl; | |
422 | } | |
423 | ||
424 | void ShardedThreadPool::drain() | |
425 | { | |
426 | ldout(cct,10) << "drain" << dendl; | |
427 | shardedpool_lock.Lock(); | |
428 | drain_threads = true; | |
429 | assert(wq != NULL); | |
430 | wq->return_waiting_threads(); | |
431 | while (num_threads != num_drained) { | |
432 | wait_cond.Wait(shardedpool_lock); | |
433 | } | |
434 | drain_threads = false; | |
435 | shardedpool_cond.Signal(); | |
436 | shardedpool_lock.Unlock(); | |
437 | ldout(cct,10) << "drained" << dendl; | |
438 | } | |
439 |