// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#include "Cond.h"
#include "include/unordered_map.h"
#include "common/HeartbeatMap.h"

#include <atomic>

class CephContext;

/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
  CephContext *cct;
  string name;
  string thread_name;
  string lockname;
  Mutex _lock;
  Cond _cond;
  bool _stop;
  int _pause;
  int _draining;
  Cond _wait_cond;
  int ioprio_class, ioprio_priority;

public:
  class TPHandle {
    friend class ThreadPool;
    CephContext *cct;
    heartbeat_handle_d *hb;
    time_t grace;
    time_t suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      heartbeat_handle_d *hb,
      time_t grace,
      time_t suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout();
    void suspend_tp_timeout();
  };
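
  /* Illustrative usage sketch (not part of the original interface): a
   * long-running _process() implementation can ping its TPHandle between
   * sub-steps so the heartbeat grace period is measured per step rather than
   * across the whole item.  The Item type and do_chunk() helper below are
   * hypothetical.
   *
   *   void MyWQ::_process(Item *item, ThreadPool::TPHandle &handle) {
   *     for (auto &chunk : item->chunks) {
   *       handle.reset_tp_timeout();  // restart the heartbeat grace period
   *       do_chunk(chunk);            // hypothetical per-chunk work
   *     }
   *   }
   */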
private:

  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    string name;
    time_t timeout_interval, suicide_interval;
    WorkQueue_(string n, time_t ti, time_t sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };

  // track thread pool size changes
  unsigned _num_threads;
  string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const struct md_config_t *conf,
                          const std::set <std::string> &changed) override;

public:
  /** @brief Work queue that processes several submitted items at once.
   * The queue will automatically add itself to the thread pool on construction
   * and remove itself on destruction. */
  template<class T>
  class BatchWorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    virtual bool _enqueue(T *) = 0;
    virtual void _dequeue(T *) = 0;
    virtual void _dequeue(list<T*> *) = 0;
    virtual void _process_finish(const list<T*> &) {}

    // virtual methods from WorkQueue_ below
    void *_void_dequeue() override {
      list<T*> *out(new list<T*>);
      _dequeue(out);
      if (!out->empty()) {
        return (void *)out;
      } else {
        delete out;
        return 0;
      }
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(*((list<T*>*)p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(*(list<T*>*)p);
      delete (list<T*> *)p;
    }

  protected:
    virtual void _process(const list<T*> &items, TPHandle &handle) = 0;

  public:
    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~BatchWorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.Lock();
      bool r = _enqueue(item);
      pool->_cond.SignalOne();
      pool->_lock.Unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.Lock();
      _dequeue(item);
      pool->_lock.Unlock();
    }
    void clear() {
      pool->_lock.Lock();
      _clear();
      pool->_lock.Unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    void wake() {
      pool->wake();
    }
    void _wake() {
      pool->_wake();
    }
    void drain() {
      pool->drain(this);
    }

  };
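
  /* Illustrative usage sketch (not part of the original interface): a minimal
   * BatchWorkQueue subclass backed by a plain list.  The Foo item type and
   * handle_batch() helper are hypothetical; the overrides mirror the pure
   * virtuals declared above.
   *
   *   struct FooBatchWQ : public ThreadPool::BatchWorkQueue<Foo> {
   *     list<Foo*> items;
   *     FooBatchWQ(time_t ti, time_t sti, ThreadPool *tp)
   *       : ThreadPool::BatchWorkQueue<Foo>("FooBatchWQ", ti, sti, tp) {}
   *     bool _enqueue(Foo *f) override { items.push_back(f); return true; }
   *     void _dequeue(Foo *f) override { items.remove(f); }
   *     void _dequeue(list<Foo*> *out) override { out->swap(items); }
   *     bool _empty() override { return items.empty(); }
   *     void _clear() override { items.clear(); }
   *     void _process(const list<Foo*> &batch,
   *                   ThreadPool::TPHandle &handle) override {
   *       handle_batch(batch);  // hypothetical: handle the whole batch at once
   *     }
   *   };
   */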

  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small objects
   * (a few bytes).  The queue will automatically add itself to the thread pool on
   * construction and remove itself on destruction. */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    Mutex _lock;
    ThreadPool *pool;
    list<U> to_process;
    list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
        Mutex::Locker l(_lock);
        if (_empty())
          return 0;
        U u = _dequeue();
        to_process.push_back(u);
      }
      return ((void*)1); // Not used
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.Lock();
      assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.Unlock();

      _process(u, handle);

      _lock.Lock();
      to_finish.push_back(u);
      _lock.Unlock();
    }

    void _void_process_finish(void *) override {
      _lock.Lock();
      assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.Unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      Mutex::Locker l(pool->_lock);
      _enqueue(item);
      pool->_cond.SignalOne();
    }
    void queue_front(T item) {
      Mutex::Locker l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.SignalOne();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };
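
  /* Illustrative usage sketch (not part of the original interface): a minimal
   * WorkQueueVal subclass that processes plain ints; GenContextWQ further
   * below is an in-tree example of the same pattern.  The run() helper is
   * hypothetical.
   *
   *   struct IntWQ : public ThreadPool::WorkQueueVal<int> {
   *     list<int> q;
   *     IntWQ(time_t ti, time_t sti, ThreadPool *tp)
   *       : ThreadPool::WorkQueueVal<int>("IntWQ", ti, sti, tp) {}
   *     void _enqueue(int i) override { q.push_back(i); }
   *     void _enqueue_front(int i) override { q.push_front(i); }
   *     bool _empty() override { return q.empty(); }
   *     int _dequeue() override { int i = q.front(); q.pop_front(); return i; }
   *     void _process(int i, ThreadPool::TPHandle &) override { run(i); }
   *   };
   */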

  /** @brief Templated by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
   * This is useful when the work items are large or include dynamically allocated memory.  The
   * queue will automatically add itself to the thread pool on construction and remove itself on
   * destruction. */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.Lock();
      bool r = _enqueue(item);
      pool->_cond.SignalOne();
      pool->_lock.Unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.Lock();
      _dequeue(item);
      pool->_lock.Unlock();
    }
    void clear() {
      pool->_lock.Lock();
      _clear();
      pool->_lock.Unlock();
    }

    Mutex &get_lock() {
      return pool->_lock;
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }

  };
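
  /* Illustrative usage sketch (not part of the original interface): a minimal
   * by-pointer WorkQueue subclass.  The Request type and handle_request()
   * helper are hypothetical.
   *
   *   struct RequestWQ : public ThreadPool::WorkQueue<Request> {
   *     list<Request*> q;
   *     RequestWQ(time_t ti, time_t sti, ThreadPool *tp)
   *       : ThreadPool::WorkQueue<Request>("RequestWQ", ti, sti, tp) {}
   *     bool _enqueue(Request *r) override { q.push_back(r); return true; }
   *     void _dequeue(Request *r) override { q.remove(r); }
   *     Request *_dequeue() override {
   *       if (q.empty())
   *         return NULL;
   *       Request *r = q.front();
   *       q.pop_front();
   *       return r;
   *     }
   *     bool _empty() override { return q.empty(); }
   *     void _clear() override { q.clear(); }
   *     void _process(Request *r, ThreadPool::TPHandle &) override {
   *       handle_request(r);  // hypothetical per-item work
   *     }
   *   };
   */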

  template<typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      assert(m_processing == 0);
    }
    void drain() {
      {
        // if this queue is empty and not processing, don't wait for other
        // queues to finish processing
        Mutex::Locker l(m_pool->_lock);
        if (m_processing == 0 && m_items.empty()) {
          return;
        }
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      Mutex::Locker l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.SignalOne();
    }
    bool empty() {
      Mutex::Locker l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void _clear() override {
      assert(m_pool->_lock.is_locked());
      m_items.clear();
    }
    bool _empty() override {
      assert(m_pool->_lock.is_locked());
      return m_items.empty();
    }
    void *_void_dequeue() override {
      assert(m_pool->_lock.is_locked());
      if (m_items.empty()) {
        return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      assert(m_pool->_lock.is_locked());
      assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      Mutex::Locker locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      assert(m_pool->_lock.is_locked());
      if (m_items.empty()) {
        return NULL;
      }
      return m_items.front();
    }
    void requeue(T *item) {
      Mutex::Locker pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void signal() {
      Mutex::Locker pool_locker(m_pool->_lock);
      m_pool->_cond.SignalOne();
    }
    Mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
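
  /* Illustrative usage sketch (not part of the original interface): unlike the
   * queue templates above, PointerWQ already stores the items itself and does
   * not register with the pool in its constructor, so a subclass only
   * implements process() and calls add_work_queue() itself (ContextWQ below
   * does exactly that).  The Job type and run() method are hypothetical.
   *
   *   struct JobWQ : public ThreadPool::PointerWQ<Job> {
   *     JobWQ(time_t ti, ThreadPool *tp)
   *       : ThreadPool::PointerWQ<Job>("JobWQ", ti, 0, tp) {
   *       tp->add_work_queue(this);
   *     }
   *     void process(Job *j) override {
   *       j->run();  // hypothetical per-item work
   *     }
   *   };
   */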
private:
  vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;


  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  set<WorkThread*> _threads;
  list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    Mutex::Locker l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.Lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.Unlock();
  }

  /// wait for a kick on this thread pool
  void wait(Cond &c) {
    c.Wait(_lock);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.Signal();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    Mutex::Locker l(_lock);
    _cond.Signal();
  }
  void _wait() {
    _cond.Wait(_lock);
  }

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);

  /// set io priority
  void set_ioprio(int cls, int priority);
};
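
/* Illustrative usage sketch (not part of the original interface): typical
 * ThreadPool lifecycle.  The "my_thread_count" option name and the RequestWQ
 * type (see the hypothetical WorkQueue<Request> sketch above) are assumptions;
 * only the ThreadPool calls themselves come from the interface above.
 *
 *   ThreadPool tp(cct, "mypool", "tp_mypool", 4, "my_thread_count");
 *   RequestWQ wq(30, 300, &tp);  // 30s heartbeat timeout, 300s suicide timeout
 *   tp.start();
 *   wq.queue(req);               // req: Request*, processed by a worker thread
 *   wq.drain();                  // wait until the queue has no items left
 *   tp.stop();                   // join the worker threads
 */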

class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
        GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
                ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};
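
/* Illustrative usage sketch (not part of the original interface): queueing a
 * custom GenContext on a GenContextWQ.  C_Ping is hypothetical; it assumes the
 * usual GenContext pattern of overriding finish() and letting complete()
 * delete the object.
 *
 *   struct C_Ping : public GenContext<ThreadPool::TPHandle&> {
 *     void finish(ThreadPool::TPHandle &handle) override {
 *       handle.reset_tp_timeout();
 *     }
 *   };
 *
 *   GenContextWQ gen_wq("gen_wq", 60, &tp);
 *   gen_wq.queue(new C_Ping);  // completed (and deleted) on a worker thread
 */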

class C_QueueInWQ : public Context {
  GenContextWQ *wq;
  GenContext<ThreadPool::TPHandle&> *c;
public:
  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c)
    : wq(wq), c(c) {}
  void finish(int) override {
    wq->queue(c);
  }
};

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
      m_lock("ContextWQ::m_lock") {
    tp->add_work_queue(this);
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      Mutex::Locker locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    Mutex::Locker locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      Mutex::Locker locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
        m_context_results.find(ctx);
      if (it != m_context_results.end()) {
        result = it->second;
        m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  Mutex m_lock;
  ceph::unordered_map<Context*, int> m_context_results;
};
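
/* Illustrative usage sketch (not part of the original interface): ContextWQ
 * completes plain Contexts on the pool's worker threads, optionally with a
 * non-zero result code.  C_NotifyDone is hypothetical.
 *
 *   ContextWQ ctx_wq("ctx_wq", 60, &tp);
 *   ctx_wq.queue(new C_NotifyDone);        // ctx->complete(0) on a worker
 *   ctx_wq.queue(new C_NotifyDone, -EIO);  // ctx->complete(-EIO) on a worker
 */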

class ShardedThreadPool {

  CephContext *cct;
  string name;
  string thread_name;
  string lockname;
  Mutex shardedpool_lock;
  Cond shardedpool_cond;
  Cond wait_cond;
  uint32_t num_threads;

  std::atomic<bool> stop_threads = { false };
  std::atomic<bool> pause_threads = { false };
  std::atomic<bool> drain_threads = { false };

  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {

  public:
    time_t timeout_interval, suicide_interval;
    BaseShardedWQ(time_t ti, time_t sti) : timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0;
    virtual void return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ : public BaseShardedWQ {

    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;

  public:
    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp)
      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T item) {
      _enqueue(item);
    }
    void queue_front(T item) {
      _enqueue_front(item);
    }
    void drain() {
      sharded_pool->drain();
    }

  };

private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index)
      : pool(p), thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }

public:

  ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);

  ~ShardedThreadPool() {}

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /// wait for all work to complete
  void drain();

};
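
/* Illustrative usage sketch (not part of the original interface): a ShardedWQ
 * implementation typically keeps one queue and lock per shard so that worker
 * threads rarely contend; OSD::ShardedOpWQ is the in-tree example of this
 * pattern.  The OpItem type below is hypothetical and only the override
 * signatures, taken from BaseShardedWQ/ShardedWQ above, are shown.
 *
 *   struct OpShardedWQ : public ShardedThreadPool::ShardedWQ<OpItem> {
 *     OpShardedWQ(time_t ti, time_t sti, ShardedThreadPool *tp)
 *       : ShardedThreadPool::ShardedWQ<OpItem>(ti, sti, tp) {}
 *     void _enqueue(OpItem item) override;        // push onto the item's shard
 *     void _enqueue_front(OpItem item) override;  // requeue at the shard's front
 *     void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
 *     void return_waiting_threads() override;     // wake threads blocked on shards
 *     bool is_shard_empty(uint32_t thread_index) override;
 *   };
 */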

#endif