]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/WorkQueue.cc
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / common / WorkQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "WorkQueue.h"
16 #include "include/compat.h"
17 #include "common/errno.h"
18
19 #define dout_subsys ceph_subsys_tp
20 #undef dout_prefix
21 #define dout_prefix *_dout << name << " "
22
23 ThreadPool::ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option)
24 : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
25 lockname(name + "::lock"),
26 _lock(ceph::make_mutex(lockname)), // this should be safe due to declaration order
27 _stop(false),
28 _pause(0),
29 _draining(0),
30 _num_threads(n),
31 processing(0)
32 {
33 if (option) {
34 _thread_num_option = option;
35 // set up conf_keys
36 _conf_keys = new const char*[2];
37 _conf_keys[0] = _thread_num_option.c_str();
38 _conf_keys[1] = NULL;
39 } else {
40 _conf_keys = new const char*[1];
41 _conf_keys[0] = NULL;
42 }
43 }
44
// Clear this worker's heartbeat timeout, e.g. while the thread is about
// to block on something that should not count as being stuck.
void ThreadPool::TPHandle::suspend_tp_timeout()
{
  cct->get_heartbeat_map()->clear_timeout(hb);
}
49
// Re-arm the heartbeat timeout (and suicide timeout) for this worker,
// using the grace values captured when the handle was created.
void ThreadPool::TPHandle::reset_tp_timeout()
{
  cct->get_heartbeat_map()->reset_timeout(
    hb, grace, suicide_grace);
}
55
// All worker threads must already have been joined (via stop()) before
// the pool is destroyed.
ThreadPool::~ThreadPool()
{
  ceph_assert(_threads.empty());
  delete[] _conf_keys;
}
61
62 void ThreadPool::handle_conf_change(const ConfigProxy& conf,
63 const std::set <std::string> &changed)
64 {
65 if (changed.count(_thread_num_option)) {
66 char *buf;
67 int r = conf.get_val(_thread_num_option.c_str(), &buf, -1);
68 ceph_assert(r >= 0);
69 int v = atoi(buf);
70 free(buf);
71 if (v >= 0) {
72 _lock.lock();
73 _num_threads = v;
74 start_threads();
75 _cond.notify_all();
76 _lock.unlock();
77 }
78 }
79 }
80
// Main loop run by each pool thread.  Holds _lock except while actually
// processing an item; registers itself with the heartbeat map so a stuck
// worker can be detected.
void ThreadPool::worker(WorkThread *wt)
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop) {

    // manage dynamic thread pool
    join_old_threads();
    if (_threads.size() > _num_threads) {
      // pool was shrunk: retire this thread; it parks itself on
      // _old_threads to be joined by another worker or by stop().
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (work_queues.empty()) {
      ldout(cct, 10) << "worker no work queues" << dendl;
    } else if (!_pause) {
      // round-robin over the registered queues; allow up to two full
      // passes before giving up and waiting.
      WorkQueue_* wq;
      int tries = 2 * work_queues.size();
      bool did = false;
      while (tries--) {
	next_work_queue %= work_queues.size();
	wq = work_queues[next_work_queue++];

	void *item = wq->_void_dequeue();
	if (item) {
	  processing++;
	  ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
			<< " (" << processing << " active)" << dendl;
	  // drop the pool lock while processing so other workers can run
	  ul.unlock();
	  TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
	  tp_handle.reset_tp_timeout();
	  wq->_void_process(item, tp_handle);
	  ul.lock();
	  wq->_void_process_finish(item);
	  processing--;
	  ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
			<< " (" << processing << " active)" << dendl;
	  // wake pause()/drain() callers waiting for processing to quiesce
	  if (_pause || _draining)
	    _wait_cond.notify_all();
	  did = true;
	  break;
	}
      }
      if (did)
	continue;
    }

    // nothing to do: relax the heartbeat timeout and wait for work
    ldout(cct,20) << "worker waiting" << dendl;
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      ceph::make_timespan(cct->_conf->threadpool_default_timeout),
      ceph::make_timespan(0));
    auto wait = std::chrono::seconds(
      cct->_conf->threadpool_empty_queue_max_wait);
    _cond.wait_for(ul, wait);
  }
  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);
}
148
149 void ThreadPool::start_threads()
150 {
151 ceph_assert(ceph_mutex_is_locked(_lock));
152 while (_threads.size() < _num_threads) {
153 WorkThread *wt = new WorkThread(this);
154 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
155 _threads.insert(wt);
156
157 wt->create(thread_name.c_str());
158 }
159 }
160
161 void ThreadPool::join_old_threads()
162 {
163 ceph_assert(ceph_mutex_is_locked(_lock));
164 while (!_old_threads.empty()) {
165 ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
166 _old_threads.front()->join();
167 delete _old_threads.front();
168 _old_threads.pop_front();
169 }
170 }
171
172 void ThreadPool::start()
173 {
174 ldout(cct,10) << "start" << dendl;
175
176 if (_thread_num_option.length()) {
177 ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
178 cct->_conf.add_observer(this);
179 }
180
181 _lock.lock();
182 start_threads();
183 _lock.unlock();
184 ldout(cct,15) << "started" << dendl;
185 }
186
187 void ThreadPool::stop(bool clear_after)
188 {
189 ldout(cct,10) << "stop" << dendl;
190
191 if (_thread_num_option.length()) {
192 ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
193 cct->_conf.remove_observer(this);
194 }
195
196 _lock.lock();
197 _stop = true;
198 _cond.notify_all();
199 join_old_threads();
200 _lock.unlock();
201 for (auto p = _threads.begin(); p != _threads.end(); ++p) {
202 (*p)->join();
203 delete *p;
204 }
205 _threads.clear();
206 _lock.lock();
207 for (unsigned i=0; i<work_queues.size(); i++)
208 work_queues[i]->_clear();
209 _stop = false;
210 _lock.unlock();
211 ldout(cct,15) << "stopped" << dendl;
212 }
213
214 void ThreadPool::pause()
215 {
216 std::unique_lock ul(_lock);
217 ldout(cct,10) << "pause" << dendl;
218 _pause++;
219 while (processing) {
220 _wait_cond.wait(ul);
221 }
222 ldout(cct,15) << "paused" << dendl;
223 }
224
225 void ThreadPool::pause_new()
226 {
227 ldout(cct,10) << "pause_new" << dendl;
228 _lock.lock();
229 _pause++;
230 _lock.unlock();
231 }
232
233 void ThreadPool::unpause()
234 {
235 ldout(cct,10) << "unpause" << dendl;
236 _lock.lock();
237 ceph_assert(_pause > 0);
238 _pause--;
239 _cond.notify_all();
240 _lock.unlock();
241 }
242
243 void ThreadPool::drain(WorkQueue_* wq)
244 {
245 std::unique_lock ul(_lock);
246 ldout(cct,10) << "drain" << dendl;
247 _draining++;
248 while (processing || (wq != NULL && !wq->_empty())) {
249 _wait_cond.wait(ul);
250 }
251 _draining--;
252 }
253
// Construct a sharded pool; the work queue (wq) is attached later by
// the owner before start() is called.
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, std::string nm, std::string tn,
				     uint32_t pnum_threads):
  cct(pcct_),
  name(std::move(nm)),
  thread_name(std::move(tn)),
  lockname(name + "::lock"),
  shardedpool_lock(ceph::make_mutex(lockname)),
  num_threads(pnum_threads),
  num_paused(0),
  num_drained(0),
  wq(NULL) {}
265
// Main loop for a sharded-pool worker: honors pause/drain requests,
// keeps the heartbeat timeout fresh, and otherwise processes its own
// shard of the attached work queue.
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
  ceph_assert(wq != NULL);
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  auto hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!stop_threads) {
    if (pause_threads) {
      std::unique_lock ul(shardedpool_lock);
      ++num_paused;
      // tell pause() one more thread has parked
      wait_cond.notify_all();
      while (pause_threads) {
	// keep resetting the timeout so the idle thread isn't flagged stuck
	cct->get_heartbeat_map()->reset_timeout(
	  hb,
	  wq->timeout_interval,
	  wq->suicide_interval);
	shardedpool_cond.wait_for(
	  ul,
	  std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
      }
      --num_paused;
    }
    if (drain_threads) {
      std::unique_lock ul(shardedpool_lock);
      // only park for drain once this thread's own shard is empty
      if (wq->is_shard_empty(thread_index)) {
	++num_drained;
	wait_cond.notify_all();
	while (drain_threads) {
	  cct->get_heartbeat_map()->reset_timeout(
	    hb,
	    wq->timeout_interval,
	    wq->suicide_interval);
	  shardedpool_cond.wait_for(
	    ul,
	    std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
	}
	--num_drained;
      }
    }

    // normal operation: re-arm the heartbeat and process our shard
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      wq->timeout_interval,
      wq->suicide_interval);
    wq->_process(thread_index, hb);

  }

  ldout(cct,10) << "sharded worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

}
322
323 void ShardedThreadPool::start_threads()
324 {
325 ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
326 int32_t thread_index = 0;
327 while (threads_shardedpool.size() < num_threads) {
328
329 WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
330 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
331 threads_shardedpool.push_back(wt);
332 wt->create(thread_name.c_str());
333 thread_index++;
334 }
335 }
336
337 void ShardedThreadPool::start()
338 {
339 ldout(cct,10) << "start" << dendl;
340
341 shardedpool_lock.lock();
342 start_threads();
343 shardedpool_lock.unlock();
344 ldout(cct,15) << "started" << dendl;
345 }
346
347 void ShardedThreadPool::stop()
348 {
349 ldout(cct,10) << "stop" << dendl;
350 stop_threads = true;
351 ceph_assert(wq != NULL);
352 wq->return_waiting_threads();
353 for (auto p = threads_shardedpool.begin();
354 p != threads_shardedpool.end();
355 ++p) {
356 (*p)->join();
357 delete *p;
358 }
359 threads_shardedpool.clear();
360 ldout(cct,15) << "stopped" << dendl;
361 }
362
363 void ShardedThreadPool::pause()
364 {
365 std::unique_lock ul(shardedpool_lock);
366 ldout(cct,10) << "pause" << dendl;
367 pause_threads = true;
368 ceph_assert(wq != NULL);
369 wq->return_waiting_threads();
370 while (num_threads != num_paused){
371 wait_cond.wait(ul);
372 }
373 ldout(cct,10) << "paused" << dendl;
374 }
375
376 void ShardedThreadPool::pause_new()
377 {
378 ldout(cct,10) << "pause_new" << dendl;
379 shardedpool_lock.lock();
380 pause_threads = true;
381 ceph_assert(wq != NULL);
382 wq->return_waiting_threads();
383 shardedpool_lock.unlock();
384 ldout(cct,10) << "paused_new" << dendl;
385 }
386
387 void ShardedThreadPool::unpause()
388 {
389 ldout(cct,10) << "unpause" << dendl;
390 shardedpool_lock.lock();
391 pause_threads = false;
392 wq->stop_return_waiting_threads();
393 shardedpool_cond.notify_all();
394 shardedpool_lock.unlock();
395 ldout(cct,10) << "unpaused" << dendl;
396 }
397
398 void ShardedThreadPool::drain()
399 {
400 std::unique_lock ul(shardedpool_lock);
401 ldout(cct,10) << "drain" << dendl;
402 drain_threads = true;
403 ceph_assert(wq != NULL);
404 wq->return_waiting_threads();
405 while (num_threads != num_drained) {
406 wait_cond.wait(ul);
407 }
408 drain_threads = false;
409 wq->stop_return_waiting_threads();
410 shardedpool_cond.notify_all();
411 ldout(cct,10) << "drained" << dendl;
412 }
413