// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#include "Cond.h"
#include "include/unordered_map.h"
#include "common/HeartbeatMap.h"

#include <atomic>

class CephContext;

/// Pool of threads that share work submitted to multiple work queues.
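///
/// Typical lifecycle (a minimal sketch; the pool name, thread name, and
/// thread count are illustrative values, and cct is assumed to be a valid
/// CephContext*):
///
/// @code
/// ThreadPool tp(cct, "tp_example", "tp_example_t", 4);
/// tp.start();
/// // ... attach work queues to &tp and queue items ...
/// tp.drain();   // wait for all queued work to finish
/// tp.stop();
/// @endcode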
class ThreadPool : public md_config_obs_t {
  CephContext *cct;
  string name;
  string thread_name;
  string lockname;
  Mutex _lock;
  Cond _cond;
  bool _stop;
  int _pause;
  int _draining;
  Cond _wait_cond;
  int ioprio_class, ioprio_priority;

public:
  /** @brief Handle passed to work item processing callbacks.
   * Lets a long-running work item reset or suspend the heartbeat
   * timeout so its worker thread is not flagged as hung. */
  class TPHandle {
    friend class ThreadPool;
    CephContext *cct;
    heartbeat_handle_d *hb;
    time_t grace;
    time_t suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      heartbeat_handle_d *hb,
      time_t grace,
      time_t suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout();
    void suspend_tp_timeout();
  };
private:

  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    string name;
    time_t timeout_interval, suicide_interval;
    WorkQueue_(string n, time_t ti, time_t sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };

  // track thread pool size changes
  unsigned _num_threads;
  string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const struct md_config_t *conf,
			  const std::set <std::string> &changed) override;

public:
  /** @brief Work queue that processes several submitted items at once.
   * The queue will automatically add itself to the thread pool on construction
   * and remove itself on destruction. */
  template<class T>
  class BatchWorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    virtual bool _enqueue(T *) = 0;
    virtual void _dequeue(T *) = 0;
    virtual void _dequeue(list<T*> *) = 0;
    virtual void _process_finish(const list<T*> &) {}

    // virtual methods from WorkQueue_ below
    void *_void_dequeue() override {
      list<T*> *out(new list<T*>);
      _dequeue(out);
      if (!out->empty()) {
	return (void *)out;
      } else {
	delete out;
	return 0;
      }
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(*((list<T*>*)p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(*(list<T*>*)p);
      delete (list<T*> *)p;
    }

  protected:
    virtual void _process(const list<T*> &items, TPHandle &handle) = 0;

  public:
    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~BatchWorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.Lock();
      bool r = _enqueue(item);
      pool->_cond.SignalOne();
      pool->_lock.Unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.Lock();
      _dequeue(item);
      pool->_lock.Unlock();
    }
    void clear() {
      pool->_lock.Lock();
      _clear();
      pool->_lock.Unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    void wake() {
      pool->wake();
    }
    void _wake() {
      pool->_wake();
    }
    void drain() {
      pool->drain(this);
    }

  };
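  // A minimal subclass sketch (hypothetical: "Item", "ItemBatchWQ", and the
  // list<Item*> backing store are illustrative assumptions, not part of the
  // interface):
  //
  //   struct ItemBatchWQ : public ThreadPool::BatchWorkQueue<Item> {
  //     list<Item*> items;
  //     ItemBatchWQ(time_t ti, ThreadPool *tp)
  //       : ThreadPool::BatchWorkQueue<Item>("ItemBatchWQ", ti, 0, tp) {}
  //     bool _enqueue(Item *i) override { items.push_back(i); return true; }
  //     void _dequeue(Item *i) override { items.remove(i); }
  //     void _dequeue(list<Item*> *out) override { out->swap(items); }
  //     bool _empty() override { return items.empty(); }
  //     void _clear() override { items.clear(); }
  //     void _process(const list<Item*> &batch, TPHandle &h) override {
  //       for (Item *i : batch) { /* do work, h.reset_tp_timeout() if slow */ }
  //     }
  //   };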

  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small objects
   * (a few bytes). The queue will automatically add itself to the thread pool on
   * construction and remove itself on destruction. */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    Mutex _lock;
    ThreadPool *pool;
    list<U> to_process;
    list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
	Mutex::Locker l(_lock);
	if (_empty())
	  return 0;
	U u = _dequeue();
	to_process.push_back(u);
      }
      // the pointer value is ignored; any non-null value signals that
      // an item was dequeued into to_process
      return ((void*)1);
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.Lock();
      assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.Unlock();

      _process(u, handle);

      _lock.Lock();
      to_finish.push_back(u);
      _lock.Unlock();
    }

    void _void_process_finish(void *) override {
      _lock.Lock();
      assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.Unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      Mutex::Locker l(pool->_lock);
      _enqueue(item);
      pool->_cond.SignalOne();
    }
    void queue_front(T item) {
      Mutex::Locker l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.SignalOne();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };
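  // GenContextWQ at the end of this header is a concrete WorkQueueVal
  // subclass. Usage sketch (hypothetical "IntWQ" subclass that forwards the
  // constructor arguments and stores ints in a list; tp as in the ThreadPool
  // example above):
  //
  //   IntWQ wq("IntWQ", 10, 0, &tp);
  //   wq.queue(42);   // the item is copied into the queue
  //   wq.drain();     // wait until it has been processed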

  /** @brief Templated by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
   * This is useful when the work items are large or include dynamically allocated memory. The queue
   * will automatically add itself to the thread pool on construction and remove itself on
   * destruction. */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.Lock();
      bool r = _enqueue(item);
      pool->_cond.SignalOne();
      pool->_lock.Unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.Lock();
      _dequeue(item);
      pool->_lock.Unlock();
    }
    void clear() {
      pool->_lock.Lock();
      _clear();
      pool->_lock.Unlock();
    }

    Mutex &get_lock() {
      return pool->_lock;
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }

  };
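  // A minimal subclass sketch (hypothetical: "Job", "JobWQ", and the
  // list<Job*> backing store are illustrative assumptions):
  //
  //   struct JobWQ : public ThreadPool::WorkQueue<Job> {
  //     list<Job*> jobs;
  //     JobWQ(time_t ti, ThreadPool *tp)
  //       : ThreadPool::WorkQueue<Job>("JobWQ", ti, 0, tp) {}
  //     bool _enqueue(Job *j) override { jobs.push_back(j); return true; }
  //     void _dequeue(Job *j) override { jobs.remove(j); }
  //     Job *_dequeue() override {
  //       if (jobs.empty())
  //         return NULL;
  //       Job *j = jobs.front();
  //       jobs.pop_front();
  //       return j;
  //     }
  //     bool _empty() override { return jobs.empty(); }
  //     void _clear() override { jobs.clear(); }
  //     void _process(Job *j, TPHandle &) override { /* do work */ }
  //   };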

  /** @brief By-pointer work queue that does not add itself to the thread
   * pool automatically: subclasses must call register_work_queue() after
   * construction. */
  template<typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      assert(m_processing == 0);
    }
    void drain() {
      {
	// if this queue is empty and not processing, don't wait for other
	// queues to finish processing
	Mutex::Locker l(m_pool->_lock);
	if (m_processing == 0 && m_items.empty()) {
	  return;
	}
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      Mutex::Locker l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.SignalOne();
    }
    bool empty() {
      Mutex::Locker l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void register_work_queue() {
      m_pool->add_work_queue(this);
    }
    void _clear() override {
      assert(m_pool->_lock.is_locked());
      m_items.clear();
    }
    bool _empty() override {
      assert(m_pool->_lock.is_locked());
      return m_items.empty();
    }
    void *_void_dequeue() override {
      assert(m_pool->_lock.is_locked());
      if (m_items.empty()) {
	return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      assert(m_pool->_lock.is_locked());
      assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      Mutex::Locker locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      assert(m_pool->_lock.is_locked());
      if (m_items.empty()) {
	return NULL;
      }
      return m_items.front();
    }
    void requeue(T *item) {
      Mutex::Locker pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void signal() {
      Mutex::Locker pool_locker(m_pool->_lock);
      m_pool->_cond.SignalOne();
    }
    Mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
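  // ContextWQ at the end of this header is a concrete PointerWQ subclass;
  // note how its constructor calls register_work_queue() explicitly.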
private:
  vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;


  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  set<WorkThread*> _threads;
  list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    Mutex::Locker l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.Lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.Unlock();
  }

  /// wait for a kick on this thread pool
  void wait(Cond &c) {
    c.Wait(_lock);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.Signal();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    Mutex::Locker l(_lock);
    _cond.Signal();
  }
  void _wait() {
    _cond.Wait(_lock);
  }

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool; must match each pause() call 1:1
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);

  /// set io priority
  void set_ioprio(int cls, int priority);
};

class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
	GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
		ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};
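// Usage sketch (illustrative; assumes a running ThreadPool tp as above and
// a GenContext<ThreadPool::TPHandle&>* gc pointing at a concrete subclass):
//
//   GenContextWQ gwq("gcwq", 10, &tp);
//   gwq.queue(gc);   // gc->complete(handle) later runs on a pool thread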

class C_QueueInWQ : public Context {
  GenContextWQ *wq;
  GenContext<ThreadPool::TPHandle&> *c;
public:
  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle&> *c)
    : wq(wq), c(c) {}
  void finish(int) override {
    wq->queue(c);
  }
};

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
      m_lock("ContextWQ::m_lock") {
    this->register_work_queue();
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      Mutex::Locker locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    Mutex::Locker locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      Mutex::Locker locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
	m_context_results.find(ctx);
      if (it != m_context_results.end()) {
	result = it->second;
	m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  Mutex m_lock;
  ceph::unordered_map<Context*, int> m_context_results;
};
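// Usage sketch (illustrative; assumes a running ThreadPool tp as above and
// a Context* ctx pointing at a concrete subclass):
//
//   ContextWQ cwq("cwq", 10, &tp);
//   cwq.queue(ctx, -EAGAIN);  // ctx->complete(-EAGAIN) runs on a pool thread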

/** @brief Thread pool that drives a single sharded work queue.
 * Each worker thread is dedicated to one shard of the queue, identified
 * by its thread index. */
class ShardedThreadPool {

  CephContext *cct;
  string name;
  string thread_name;
  string lockname;
  Mutex shardedpool_lock;
  Cond shardedpool_cond;
  Cond wait_cond;
  uint32_t num_threads;

  std::atomic<bool> stop_threads = { false };
  std::atomic<bool> pause_threads = { false };
  std::atomic<bool> drain_threads = { false };

  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {
  public:
    time_t timeout_interval, suicide_interval;
    BaseShardedWQ(time_t ti, time_t sti)
      : timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0;
    virtual void return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ : public BaseShardedWQ {
    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;

  public:
    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp)
      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T item) {
      _enqueue(item);
    }
    void queue_front(T item) {
      _enqueue_front(item);
    }
    void drain() {
      sharded_pool->drain();
    }

  };

private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index)
      : pool(p), thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }

public:

  ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);

  ~ShardedThreadPool() {}

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool; must match each pause() call 1:1
  void unpause();
  /// wait for all work to complete
  void drain();

};
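// A minimal ShardedWQ subclass sketch (hypothetical: "Op", "OpWQ", the
// per-shard list-plus-mutex layout, and the modulo mapping of threads to
// shards are illustrative assumptions; _enqueue_front,
// return_waiting_threads, and is_shard_empty would need analogous
// implementations):
//
//   struct OpWQ : public ShardedThreadPool::ShardedWQ<Op*> {
//     struct Shard {
//       Mutex lock;
//       list<Op*> ops;
//       Shard() : lock("OpWQ::Shard::lock") {}
//     };
//     vector<Shard> shards;
//     OpWQ(time_t ti, uint32_t n, ShardedThreadPool *tp)
//       : ShardedThreadPool::ShardedWQ<Op*>(ti, ti*10, tp), shards(n) {}
//     void _enqueue(Op *op) override {
//       Shard &s = shards[op->get_hash() % shards.size()];
//       Mutex::Locker l(s.lock);
//       s.ops.push_back(op);
//     }
//     void _process(uint32_t thread_index, heartbeat_handle_d *hb) override {
//       Shard &s = shards[thread_index % shards.size()];
//       // pop one op under s.lock (waiting on a Cond if empty) and run it
//     }
//   };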


#endif