// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#include "Mutex.h"
#include "Cond.h"
#include "Thread.h"
#include "include/unordered_map.h"
#include "common/config_obs.h"
#include "common/HeartbeatMap.h"

class CephContext;

/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
  CephContext *cct;
  string name;
  string thread_name;
  string lockname;
  Mutex _lock;
  Cond _cond;
  bool _stop;
  int _pause;
  int _draining;
  Cond _wait_cond;
  int ioprio_class, ioprio_priority;

public:
  class TPHandle {
    friend class ThreadPool;
    CephContext *cct;
    heartbeat_handle_d *hb;
    time_t grace;
    time_t suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      heartbeat_handle_d *hb,
      time_t grace,
      time_t suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout();
    void suspend_tp_timeout();
  };
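  // Illustrative use of TPHandle (not part of this header): a
  // long-running _process() can call reset_tp_timeout() between
  // sub-steps so the heartbeat code does not flag the thread as stuck.
  // Batch and do_op() are hypothetical.
  //
  //   void _process(Batch *b, TPHandle &handle) override {
  //     for (auto &op : b->ops) {
  //       handle.reset_tp_timeout();  // restart the grace period
  //       do_op(op);
  //     }
  //   }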
private:

  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    string name;
    time_t timeout_interval, suicide_interval;
    WorkQueue_(string n, time_t ti, time_t sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };
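  // Condensed sketch of how the pool's worker threads drive this
  // interface (the real loop lives in WorkQueue.cc; this paraphrases it):
  //
  //   _lock.Lock();
  //   void *item = wq->_void_dequeue();      // under the pool lock
  //   if (item) {
  //     _lock.Unlock();
  //     wq->_void_process(item, tp_handle);  // parallel, lock released
  //     _lock.Lock();
  //     wq->_void_process_finish(item);      // serialized, lock re-held
  //   }
  //   _lock.Unlock();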

  // track thread pool size changes
  unsigned _num_threads;
  string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const struct md_config_t *conf,
			  const std::set<std::string> &changed) override;

public:
  /** @brief Work queue that processes several submitted items at once.
   * The queue will automatically add itself to the thread pool on construction
   * and remove itself on destruction. */
  template<class T>
  class BatchWorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    virtual bool _enqueue(T *) = 0;
    virtual void _dequeue(T *) = 0;
    virtual void _dequeue(list<T*> *) = 0;
    virtual void _process_finish(const list<T*> &) {}

    // virtual methods from WorkQueue_ below
    void *_void_dequeue() override {
      list<T*> *out(new list<T*>);
      _dequeue(out);
      if (!out->empty()) {
	return (void *)out;
      } else {
	delete out;
	return 0;
      }
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(*((list<T*>*)p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(*(list<T*>*)p);
      delete (list<T*> *)p;
    }

  protected:
    virtual void _process(const list<T*> &items, TPHandle &handle) = 0;

  public:
    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~BatchWorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.Lock();
      bool r = _enqueue(item);
      pool->_cond.SignalOne();
      pool->_lock.Unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.Lock();
      _dequeue(item);
      pool->_lock.Unlock();
    }
    void clear() {
      pool->_lock.Lock();
      _clear();
      pool->_lock.Unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    void wake() {
      pool->wake();
    }
    void _wake() {
      pool->_wake();
    }
    void drain() {
      pool->drain(this);
    }

  };
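  // Illustrative BatchWorkQueue subclass (Item and handle() are
  // hypothetical): the subclass owns the container, and the batch
  // _dequeue hands everything currently queued to one worker.
  //
  //   struct ItemBatchWQ : public ThreadPool::BatchWorkQueue<Item> {
  //     list<Item*> q;
  //     ItemBatchWQ(ThreadPool *tp)
  //       : ThreadPool::BatchWorkQueue<Item>("ItemBatchWQ", 60, 0, tp) {}
  //     bool _enqueue(Item *i) override { q.push_back(i); return true; }
  //     void _dequeue(Item *i) override { q.remove(i); }
  //     void _dequeue(list<Item*> *out) override { out->swap(q); }
  //     bool _empty() override { return q.empty(); }
  //     void _clear() override { q.clear(); }
  //     void _process(const list<Item*> &items, TPHandle &h) override {
  //       for (Item *i : items) { h.reset_tp_timeout(); handle(i); }
  //     }
  //   };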

  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small objects
   * (a few bytes). The queue will automatically add itself to the thread pool on
   * construction and remove itself on destruction. */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    Mutex _lock;
    ThreadPool *pool;
    list<U> to_process;
    list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
	Mutex::Locker l(_lock);
	if (_empty())
	  return 0;
	U u = _dequeue();
	to_process.push_back(u);
      }
      return ((void*)1); // Not used
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.Lock();
      assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.Unlock();

      _process(u, handle);

      _lock.Lock();
      to_finish.push_back(u);
      _lock.Unlock();
    }

    void _void_process_finish(void *) override {
      _lock.Lock();
      assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.Unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      Mutex::Locker l(pool->_lock);
      _enqueue(item);
      pool->_cond.SignalOne();
    }
    void queue_front(T item) {
      Mutex::Locker l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.SignalOne();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };
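  // For a concrete WorkQueueVal implementation, see GenContextWQ near the
  // bottom of this header: it keeps GenContext pointers by value in a list
  // and completes each one in _process().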

  /** @brief Templated by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
   * This is useful when the work items are large or include dynamically allocated memory. The queue
   * will automatically add itself to the thread pool on construction and remove itself on
   * destruction. */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.Lock();
      bool r = _enqueue(item);
      pool->_cond.SignalOne();
      pool->_lock.Unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.Lock();
      _dequeue(item);
      pool->_lock.Unlock();
    }
    void clear() {
      pool->_lock.Lock();
      _clear();
      pool->_lock.Unlock();
    }

    Mutex &get_lock() {
      return pool->_lock;
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }

  };
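  // Illustrative WorkQueue<T> subclass (Job and Job::run() are
  // hypothetical): the subclass again owns the container; the pool only
  // sees opaque pointers through the WorkQueue_ interface.
  //
  //   struct JobWQ : public ThreadPool::WorkQueue<Job> {
  //     list<Job*> q;
  //     JobWQ(ThreadPool *tp)
  //       : ThreadPool::WorkQueue<Job>("JobWQ", 60, 0, tp) {}
  //     bool _enqueue(Job *j) override { q.push_back(j); return true; }
  //     void _dequeue(Job *j) override { q.remove(j); }
  //     Job *_dequeue() override {
  //       if (q.empty()) return NULL;
  //       Job *j = q.front(); q.pop_front(); return j;
  //     }
  //     bool _empty() override { return q.empty(); }
  //     void _clear() override { q.clear(); }
  //     void _process(Job *j, TPHandle &) override { j->run(); }
  //   };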

  template<typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      assert(m_processing == 0);
    }
    void drain() {
      {
	// if this queue is empty and not processing, don't wait for other
	// queues to finish processing
	Mutex::Locker l(m_pool->_lock);
	if (m_processing == 0 && m_items.empty()) {
	  return;
	}
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      Mutex::Locker l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.SignalOne();
    }
    bool empty() {
      Mutex::Locker l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void _clear() override {
      assert(m_pool->_lock.is_locked());
      m_items.clear();
    }
    bool _empty() override {
      assert(m_pool->_lock.is_locked());
      return m_items.empty();
    }
    void *_void_dequeue() override {
      assert(m_pool->_lock.is_locked());
      if (m_items.empty()) {
	return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      assert(m_pool->_lock.is_locked());
      assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      Mutex::Locker locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      assert(m_pool->_lock.is_locked());
      if (m_items.empty()) {
	return NULL;
      }
      return m_items.front();
    }
    void requeue(T *item) {
      Mutex::Locker pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void signal() {
      Mutex::Locker pool_locker(m_pool->_lock);
      m_pool->_cond.SignalOne();
    }
    Mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
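  // Unlike the skeleton queues above, PointerWQ supplies the item
  // container and the in-flight bookkeeping itself, so subclasses
  // normally implement only process(). ContextWQ near the bottom of
  // this header is the concrete in-tree example.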
private:
  vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;


  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  set<WorkThread*> _threads;
  list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    Mutex::Locker l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.Lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.Unlock();
  }

  /// wait for a kick on this thread pool
  void wait(Cond &c) {
    c.Wait(_lock);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.Signal();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    Mutex::Locker l(_lock);
    _cond.Signal();
  }
  void _wait() {
    _cond.Wait(_lock);
  }

  /// start thread pool threads
  void start();
  /// stop thread pool threads
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);

  /// set io priority
  void set_ioprio(int cls, int priority);
};
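// A minimal lifecycle sketch (illustrative; JobWQ is a hypothetical
// WorkQueue<T> subclass like the one sketched above). Queues register
// themselves with the pool in their constructors, so the usual order is:
// construct pool, construct queue, start, queue work, drain, stop.
//
//   ThreadPool tp(cct, "tp_example", "tp_example_t", 4 /* threads */);
//   JobWQ wq(&tp);      // registers itself via add_work_queue()
//   tp.start();
//   wq.queue(new Job);  // wakes a worker via the pool's condition variable
//   wq.drain();         // block until nothing is queued or in flight
//   tp.stop();          // join the worker threads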

class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
	GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
		ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};

class C_QueueInWQ : public Context {
  GenContextWQ *wq;
  GenContext<ThreadPool::TPHandle&> *c;
public:
  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle&> *c)
    : wq(wq), c(c) {}
  void finish(int) override {
    wq->queue(c);
  }
};
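// Illustrative use (gen_wq and gc are hypothetical): C_QueueInWQ bridges
// a Context completion into a GenContextWQ, so code that can only take a
// Context* can still hand work to the thread pool.
//
//   GenContext<ThreadPool::TPHandle&> *gc = ...;  // work to run in the pool
//   Context *fin = new C_QueueInWQ(&gen_wq, gc);
//   fin->complete(0);  // queues gc; a worker later calls gc->complete(handle)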

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
      m_lock("ContextWQ::m_lock") {
    tp->add_work_queue(this);
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      Mutex::Locker locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    Mutex::Locker locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      Mutex::Locker locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
	m_context_results.find(ctx);
      if (it != m_context_results.end()) {
	result = it->second;
	m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  Mutex m_lock;
  ceph::unordered_map<Context*, int> m_context_results;
};
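// Illustrative usage sketch (assumes the FunctionContext helper from
// include/Context.h): a non-zero result queued alongside a context is
// stashed in m_context_results and delivered when a worker completes it.
//
//   ContextWQ ctx_wq("ctx_wq", 60, &tp);
//   ctx_wq.queue(new FunctionContext([](int r) {
//     // r == -EIO here, running on a ThreadPool worker thread
//   }), -EIO);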

class ShardedThreadPool {

  CephContext *cct;
  string name;
  string thread_name;
  string lockname;
  Mutex shardedpool_lock;
  Cond shardedpool_cond;
  Cond wait_cond;
  uint32_t num_threads;
  atomic_t stop_threads;
  atomic_t pause_threads;
  atomic_t drain_threads;
  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {

  public:
    time_t timeout_interval, suicide_interval;
    BaseShardedWQ(time_t ti, time_t sti) : timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0;
    virtual void return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ : public BaseShardedWQ {

    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;

  public:
    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp)
      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T item) {
      _enqueue(item);
    }
    void queue_front(T item) {
      _enqueue_front(item);
    }
    void drain() {
      sharded_pool->drain();
    }

  };

private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index)
      : pool(p), thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }

public:

  ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);

  ~ShardedThreadPool() {}

  /// start thread pool threads
  void start();
  /// stop thread pool threads
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /// wait for all work to complete
  void drain();

};
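// Illustrative sketch (OpItem, hash_of(), and the shard layout are all
// hypothetical): a ShardedWQ subclass typically owns one lock-protected
// queue per worker thread and routes items to a shard by hash, so enqueue
// contention is spread across shards instead of one pool-wide lock.
//
//   struct OpShardedWQ : public ShardedThreadPool::ShardedWQ<OpItem*> {
//     struct Shard { Mutex lock; Cond cond; list<OpItem*> q; };
//     vector<Shard*> shards;  // one per pool thread
//     void _enqueue(OpItem *op) override {
//       Shard *s = shards[hash_of(op) % shards.size()];
//       Mutex::Locker l(s->lock);
//       s->q.push_back(op);
//       s->cond.SignalOne();
//     }
//     void _process(uint32_t ti, heartbeat_handle_d *hb) override {
//       // pop from shards[ti] and execute, waiting on the shard's cond
//       // when empty; return_waiting_threads() must wake such waiters
//       // so stop()/pause() can make progress
//     }
//     // _enqueue_front(), return_waiting_threads(), is_shard_empty() ...
//   };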


#endif