]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
31f18b77 | 15 | #include "WorkQueue.h" |
7c673cae | 16 | #include "include/compat.h" |
7c673cae | 17 | #include "common/errno.h" |
7c673cae FG |
18 | |
19 | #define dout_subsys ceph_subsys_tp | |
20 | #undef dout_prefix | |
21 | #define dout_prefix *_dout << name << " " | |
22 | ||
f67539c2 | 23 | ThreadPool::ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option) |
7c673cae FG |
24 | : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)), |
25 | lockname(name + "::lock"), | |
11fdf7f2 | 26 | _lock(ceph::make_mutex(lockname)), // this should be safe due to declaration order |
7c673cae FG |
27 | _stop(false), |
28 | _pause(0), | |
29 | _draining(0), | |
7c673cae FG |
30 | _num_threads(n), |
31 | processing(0) | |
32 | { | |
33 | if (option) { | |
34 | _thread_num_option = option; | |
35 | // set up conf_keys | |
36 | _conf_keys = new const char*[2]; | |
37 | _conf_keys[0] = _thread_num_option.c_str(); | |
38 | _conf_keys[1] = NULL; | |
39 | } else { | |
40 | _conf_keys = new const char*[1]; | |
41 | _conf_keys[0] = NULL; | |
42 | } | |
43 | } | |
44 | ||
45 | void ThreadPool::TPHandle::suspend_tp_timeout() | |
46 | { | |
47 | cct->get_heartbeat_map()->clear_timeout(hb); | |
48 | } | |
49 | ||
50 | void ThreadPool::TPHandle::reset_tp_timeout() | |
51 | { | |
52 | cct->get_heartbeat_map()->reset_timeout( | |
53 | hb, grace, suicide_grace); | |
54 | } | |
55 | ||
56 | ThreadPool::~ThreadPool() | |
57 | { | |
11fdf7f2 | 58 | ceph_assert(_threads.empty()); |
7c673cae FG |
59 | delete[] _conf_keys; |
60 | } | |
61 | ||
11fdf7f2 | 62 | void ThreadPool::handle_conf_change(const ConfigProxy& conf, |
7c673cae FG |
63 | const std::set <std::string> &changed) |
64 | { | |
65 | if (changed.count(_thread_num_option)) { | |
66 | char *buf; | |
11fdf7f2 TL |
67 | int r = conf.get_val(_thread_num_option.c_str(), &buf, -1); |
68 | ceph_assert(r >= 0); | |
7c673cae FG |
69 | int v = atoi(buf); |
70 | free(buf); | |
71 | if (v >= 0) { | |
11fdf7f2 | 72 | _lock.lock(); |
7c673cae FG |
73 | _num_threads = v; |
74 | start_threads(); | |
11fdf7f2 TL |
75 | _cond.notify_all(); |
76 | _lock.unlock(); | |
7c673cae FG |
77 | } |
78 | } | |
79 | } | |
80 | ||
81 | void ThreadPool::worker(WorkThread *wt) | |
82 | { | |
11fdf7f2 | 83 | std::unique_lock ul(_lock); |
7c673cae | 84 | ldout(cct,10) << "worker start" << dendl; |
f67539c2 | 85 | |
7c673cae | 86 | std::stringstream ss; |
c07f9fc5 | 87 | ss << name << " thread " << (void *)pthread_self(); |
f67539c2 | 88 | auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self()); |
7c673cae FG |
89 | |
90 | while (!_stop) { | |
91 | ||
92 | // manage dynamic thread pool | |
93 | join_old_threads(); | |
94 | if (_threads.size() > _num_threads) { | |
95 | ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl; | |
96 | _threads.erase(wt); | |
97 | _old_threads.push_back(wt); | |
98 | break; | |
99 | } | |
100 | ||
20effc67 TL |
101 | if (work_queues.empty()) { |
102 | ldout(cct, 10) << "worker no work queues" << dendl; | |
103 | } else if (!_pause) { | |
7c673cae | 104 | WorkQueue_* wq; |
11fdf7f2 | 105 | int tries = 2 * work_queues.size(); |
7c673cae FG |
106 | bool did = false; |
107 | while (tries--) { | |
108 | next_work_queue %= work_queues.size(); | |
109 | wq = work_queues[next_work_queue++]; | |
110 | ||
111 | void *item = wq->_void_dequeue(); | |
112 | if (item) { | |
113 | processing++; | |
114 | ldout(cct,12) << "worker wq " << wq->name << " start processing " << item | |
115 | << " (" << processing << " active)" << dendl; | |
9f95a23c | 116 | ul.unlock(); |
7c673cae FG |
117 | TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval); |
118 | tp_handle.reset_tp_timeout(); | |
7c673cae | 119 | wq->_void_process(item, tp_handle); |
11fdf7f2 | 120 | ul.lock(); |
7c673cae FG |
121 | wq->_void_process_finish(item); |
122 | processing--; | |
123 | ldout(cct,15) << "worker wq " << wq->name << " done processing " << item | |
124 | << " (" << processing << " active)" << dendl; | |
125 | if (_pause || _draining) | |
11fdf7f2 | 126 | _wait_cond.notify_all(); |
7c673cae FG |
127 | did = true; |
128 | break; | |
129 | } | |
130 | } | |
131 | if (did) | |
132 | continue; | |
133 | } | |
134 | ||
135 | ldout(cct,20) << "worker waiting" << dendl; | |
136 | cct->get_heartbeat_map()->reset_timeout( | |
137 | hb, | |
f67539c2 TL |
138 | ceph::make_timespan(cct->_conf->threadpool_default_timeout), |
139 | ceph::make_timespan(0)); | |
11fdf7f2 TL |
140 | auto wait = std::chrono::seconds( |
141 | cct->_conf->threadpool_empty_queue_max_wait); | |
142 | _cond.wait_for(ul, wait); | |
7c673cae FG |
143 | } |
144 | ldout(cct,1) << "worker finish" << dendl; | |
145 | ||
146 | cct->get_heartbeat_map()->remove_worker(hb); | |
7c673cae FG |
147 | } |
148 | ||
149 | void ThreadPool::start_threads() | |
150 | { | |
11fdf7f2 | 151 | ceph_assert(ceph_mutex_is_locked(_lock)); |
7c673cae FG |
152 | while (_threads.size() < _num_threads) { |
153 | WorkThread *wt = new WorkThread(this); | |
154 | ldout(cct, 10) << "start_threads creating and starting " << wt << dendl; | |
155 | _threads.insert(wt); | |
156 | ||
7c673cae FG |
157 | wt->create(thread_name.c_str()); |
158 | } | |
159 | } | |
160 | ||
161 | void ThreadPool::join_old_threads() | |
162 | { | |
11fdf7f2 | 163 | ceph_assert(ceph_mutex_is_locked(_lock)); |
7c673cae FG |
164 | while (!_old_threads.empty()) { |
165 | ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl; | |
166 | _old_threads.front()->join(); | |
167 | delete _old_threads.front(); | |
168 | _old_threads.pop_front(); | |
169 | } | |
170 | } | |
171 | ||
172 | void ThreadPool::start() | |
173 | { | |
174 | ldout(cct,10) << "start" << dendl; | |
175 | ||
176 | if (_thread_num_option.length()) { | |
177 | ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl; | |
11fdf7f2 | 178 | cct->_conf.add_observer(this); |
7c673cae FG |
179 | } |
180 | ||
11fdf7f2 | 181 | _lock.lock(); |
7c673cae | 182 | start_threads(); |
11fdf7f2 | 183 | _lock.unlock(); |
7c673cae FG |
184 | ldout(cct,15) << "started" << dendl; |
185 | } | |
186 | ||
187 | void ThreadPool::stop(bool clear_after) | |
188 | { | |
189 | ldout(cct,10) << "stop" << dendl; | |
190 | ||
191 | if (_thread_num_option.length()) { | |
192 | ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl; | |
11fdf7f2 | 193 | cct->_conf.remove_observer(this); |
7c673cae FG |
194 | } |
195 | ||
11fdf7f2 | 196 | _lock.lock(); |
7c673cae | 197 | _stop = true; |
11fdf7f2 | 198 | _cond.notify_all(); |
7c673cae | 199 | join_old_threads(); |
11fdf7f2 | 200 | _lock.unlock(); |
f67539c2 | 201 | for (auto p = _threads.begin(); p != _threads.end(); ++p) { |
7c673cae FG |
202 | (*p)->join(); |
203 | delete *p; | |
204 | } | |
205 | _threads.clear(); | |
11fdf7f2 | 206 | _lock.lock(); |
7c673cae FG |
207 | for (unsigned i=0; i<work_queues.size(); i++) |
208 | work_queues[i]->_clear(); | |
209 | _stop = false; | |
11fdf7f2 | 210 | _lock.unlock(); |
7c673cae FG |
211 | ldout(cct,15) << "stopped" << dendl; |
212 | } | |
213 | ||
214 | void ThreadPool::pause() | |
215 | { | |
11fdf7f2 | 216 | std::unique_lock ul(_lock); |
7c673cae | 217 | ldout(cct,10) << "pause" << dendl; |
7c673cae | 218 | _pause++; |
11fdf7f2 TL |
219 | while (processing) { |
220 | _wait_cond.wait(ul); | |
221 | } | |
7c673cae FG |
222 | ldout(cct,15) << "paused" << dendl; |
223 | } | |
224 | ||
225 | void ThreadPool::pause_new() | |
226 | { | |
227 | ldout(cct,10) << "pause_new" << dendl; | |
11fdf7f2 | 228 | _lock.lock(); |
7c673cae | 229 | _pause++; |
11fdf7f2 | 230 | _lock.unlock(); |
7c673cae FG |
231 | } |
232 | ||
233 | void ThreadPool::unpause() | |
234 | { | |
235 | ldout(cct,10) << "unpause" << dendl; | |
11fdf7f2 TL |
236 | _lock.lock(); |
237 | ceph_assert(_pause > 0); | |
7c673cae | 238 | _pause--; |
11fdf7f2 TL |
239 | _cond.notify_all(); |
240 | _lock.unlock(); | |
7c673cae FG |
241 | } |
242 | ||
243 | void ThreadPool::drain(WorkQueue_* wq) | |
244 | { | |
11fdf7f2 | 245 | std::unique_lock ul(_lock); |
7c673cae | 246 | ldout(cct,10) << "drain" << dendl; |
7c673cae | 247 | _draining++; |
11fdf7f2 TL |
248 | while (processing || (wq != NULL && !wq->_empty())) { |
249 | _wait_cond.wait(ul); | |
7c673cae | 250 | } |
11fdf7f2 | 251 | _draining--; |
7c673cae FG |
252 | } |
253 | ||
f67539c2 TL |
254 | ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, std::string nm, std::string tn, |
255 | uint32_t pnum_threads): | |
7c673cae FG |
256 | cct(pcct_), |
257 | name(std::move(nm)), | |
258 | thread_name(std::move(tn)), | |
259 | lockname(name + "::lock"), | |
11fdf7f2 | 260 | shardedpool_lock(ceph::make_mutex(lockname)), |
7c673cae | 261 | num_threads(pnum_threads), |
7c673cae FG |
262 | num_paused(0), |
263 | num_drained(0), | |
264 | wq(NULL) {} | |
265 | ||
266 | void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index) | |
267 | { | |
11fdf7f2 | 268 | ceph_assert(wq != NULL); |
7c673cae FG |
269 | ldout(cct,10) << "worker start" << dendl; |
270 | ||
271 | std::stringstream ss; | |
c07f9fc5 | 272 | ss << name << " thread " << (void *)pthread_self(); |
f67539c2 | 273 | auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self()); |
7c673cae | 274 | |
31f18b77 FG |
275 | while (!stop_threads) { |
276 | if (pause_threads) { | |
11fdf7f2 | 277 | std::unique_lock ul(shardedpool_lock); |
7c673cae | 278 | ++num_paused; |
11fdf7f2 | 279 | wait_cond.notify_all(); |
31f18b77 | 280 | while (pause_threads) { |
7c673cae | 281 | cct->get_heartbeat_map()->reset_timeout( |
31f18b77 | 282 | hb, |
f67539c2 TL |
283 | wq->timeout_interval, |
284 | wq->suicide_interval); | |
11fdf7f2 TL |
285 | shardedpool_cond.wait_for( |
286 | ul, | |
287 | std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait)); | |
7c673cae FG |
288 | } |
289 | --num_paused; | |
7c673cae | 290 | } |
31f18b77 | 291 | if (drain_threads) { |
11fdf7f2 | 292 | std::unique_lock ul(shardedpool_lock); |
7c673cae FG |
293 | if (wq->is_shard_empty(thread_index)) { |
294 | ++num_drained; | |
11fdf7f2 | 295 | wait_cond.notify_all(); |
31f18b77 | 296 | while (drain_threads) { |
7c673cae FG |
297 | cct->get_heartbeat_map()->reset_timeout( |
298 | hb, | |
f67539c2 TL |
299 | wq->timeout_interval, |
300 | wq->suicide_interval); | |
11fdf7f2 TL |
301 | shardedpool_cond.wait_for( |
302 | ul, | |
303 | std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait)); | |
7c673cae FG |
304 | } |
305 | --num_drained; | |
306 | } | |
7c673cae FG |
307 | } |
308 | ||
309 | cct->get_heartbeat_map()->reset_timeout( | |
310 | hb, | |
f67539c2 TL |
311 | wq->timeout_interval, |
312 | wq->suicide_interval); | |
7c673cae FG |
313 | wq->_process(thread_index, hb); |
314 | ||
315 | } | |
316 | ||
317 | ldout(cct,10) << "sharded worker finish" << dendl; | |
318 | ||
319 | cct->get_heartbeat_map()->remove_worker(hb); | |
320 | ||
321 | } | |
322 | ||
323 | void ShardedThreadPool::start_threads() | |
324 | { | |
11fdf7f2 | 325 | ceph_assert(ceph_mutex_is_locked(shardedpool_lock)); |
7c673cae FG |
326 | int32_t thread_index = 0; |
327 | while (threads_shardedpool.size() < num_threads) { | |
328 | ||
329 | WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index); | |
330 | ldout(cct, 10) << "start_threads creating and starting " << wt << dendl; | |
331 | threads_shardedpool.push_back(wt); | |
332 | wt->create(thread_name.c_str()); | |
333 | thread_index++; | |
334 | } | |
335 | } | |
336 | ||
337 | void ShardedThreadPool::start() | |
338 | { | |
339 | ldout(cct,10) << "start" << dendl; | |
340 | ||
11fdf7f2 | 341 | shardedpool_lock.lock(); |
7c673cae | 342 | start_threads(); |
11fdf7f2 | 343 | shardedpool_lock.unlock(); |
7c673cae FG |
344 | ldout(cct,15) << "started" << dendl; |
345 | } | |
346 | ||
347 | void ShardedThreadPool::stop() | |
348 | { | |
349 | ldout(cct,10) << "stop" << dendl; | |
31f18b77 | 350 | stop_threads = true; |
11fdf7f2 | 351 | ceph_assert(wq != NULL); |
7c673cae | 352 | wq->return_waiting_threads(); |
f67539c2 | 353 | for (auto p = threads_shardedpool.begin(); |
7c673cae FG |
354 | p != threads_shardedpool.end(); |
355 | ++p) { | |
356 | (*p)->join(); | |
357 | delete *p; | |
358 | } | |
359 | threads_shardedpool.clear(); | |
360 | ldout(cct,15) << "stopped" << dendl; | |
361 | } | |
362 | ||
363 | void ShardedThreadPool::pause() | |
364 | { | |
11fdf7f2 | 365 | std::unique_lock ul(shardedpool_lock); |
7c673cae | 366 | ldout(cct,10) << "pause" << dendl; |
31f18b77 | 367 | pause_threads = true; |
11fdf7f2 | 368 | ceph_assert(wq != NULL); |
7c673cae FG |
369 | wq->return_waiting_threads(); |
370 | while (num_threads != num_paused){ | |
11fdf7f2 | 371 | wait_cond.wait(ul); |
7c673cae | 372 | } |
7c673cae FG |
373 | ldout(cct,10) << "paused" << dendl; |
374 | } | |
375 | ||
376 | void ShardedThreadPool::pause_new() | |
377 | { | |
378 | ldout(cct,10) << "pause_new" << dendl; | |
11fdf7f2 | 379 | shardedpool_lock.lock(); |
31f18b77 | 380 | pause_threads = true; |
11fdf7f2 | 381 | ceph_assert(wq != NULL); |
7c673cae | 382 | wq->return_waiting_threads(); |
11fdf7f2 | 383 | shardedpool_lock.unlock(); |
7c673cae FG |
384 | ldout(cct,10) << "paused_new" << dendl; |
385 | } | |
386 | ||
387 | void ShardedThreadPool::unpause() | |
388 | { | |
389 | ldout(cct,10) << "unpause" << dendl; | |
11fdf7f2 | 390 | shardedpool_lock.lock(); |
31f18b77 | 391 | pause_threads = false; |
11fdf7f2 TL |
392 | wq->stop_return_waiting_threads(); |
393 | shardedpool_cond.notify_all(); | |
394 | shardedpool_lock.unlock(); | |
7c673cae FG |
395 | ldout(cct,10) << "unpaused" << dendl; |
396 | } | |
397 | ||
398 | void ShardedThreadPool::drain() | |
399 | { | |
11fdf7f2 | 400 | std::unique_lock ul(shardedpool_lock); |
7c673cae | 401 | ldout(cct,10) << "drain" << dendl; |
31f18b77 | 402 | drain_threads = true; |
11fdf7f2 | 403 | ceph_assert(wq != NULL); |
7c673cae FG |
404 | wq->return_waiting_threads(); |
405 | while (num_threads != num_drained) { | |
11fdf7f2 | 406 | wait_cond.wait(ul); |
7c673cae | 407 | } |
31f18b77 | 408 | drain_threads = false; |
11fdf7f2 TL |
409 | wq->stop_return_waiting_threads(); |
410 | shardedpool_cond.notify_all(); | |
7c673cae FG |
411 | ldout(cct,10) << "drained" << dendl; |
412 | } | |
413 |