// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
15 #ifndef CEPH_WORKQUEUE_H
16 #define CEPH_WORKQUEUE_H
19 #include "include/unordered_map.h"
20 #include "common/HeartbeatMap.h"
26 /// Pool of threads that share work submitted to multiple work queues.
27 class ThreadPool
: public md_config_obs_t
{
38 int ioprio_class
, ioprio_priority
;
42 friend class ThreadPool
;
44 heartbeat_handle_d
*hb
;
50 heartbeat_handle_d
*hb
,
53 : cct(cct
), hb(hb
), grace(grace
), suicide_grace(suicide_grace
) {}
54 void reset_tp_timeout();
55 void suspend_tp_timeout();
59 /// Basic interface to a work queue used by the worker threads.
62 time_t timeout_interval
, suicide_interval
;
63 WorkQueue_(string n
, time_t ti
, time_t sti
)
64 : name(std::move(n
)), timeout_interval(ti
), suicide_interval(sti
)
66 virtual ~WorkQueue_() {}
67 /// Remove all work items from the queue.
68 virtual void _clear() = 0;
69 /// Check whether there is anything to do.
70 virtual bool _empty() = 0;
71 /// Get the next work item to process.
72 virtual void *_void_dequeue() = 0;
73 /** @brief Process the work item.
74 * This function will be called several times in parallel
75 * and must therefore be thread-safe. */
76 virtual void _void_process(void *item
, TPHandle
&handle
) = 0;
77 /** @brief Synchronously finish processing a work item.
78 * This function is called after _void_process with the global thread pool lock held,
79 * so at most one copy will execute simultaneously for a given thread pool.
80 * It can be used for non-thread-safe finalization. */
81 virtual void _void_process_finish(void *) = 0;
84 // track thread pool size changes
85 unsigned _num_threads
;
86 string _thread_num_option
;
87 const char **_conf_keys
;
89 const char **get_tracked_conf_keys() const override
{
92 void handle_conf_change(const struct md_config_t
*conf
,
93 const std::set
<std::string
> &changed
) override
;
96 /** @brief Work queue that processes several submitted items at once.
97 * The queue will automatically add itself to the thread pool on construction
98 * and remove itself on destruction. */
100 class BatchWorkQueue
: public WorkQueue_
{
103 virtual bool _enqueue(T
*) = 0;
104 virtual void _dequeue(T
*) = 0;
105 virtual void _dequeue(list
<T
*> *) = 0;
106 virtual void _process_finish(const list
<T
*> &) {}
108 // virtual methods from WorkQueue_ below
109 void *_void_dequeue() override
{
110 list
<T
*> *out(new list
<T
*>);
119 void _void_process(void *p
, TPHandle
&handle
) override
{
120 _process(*((list
<T
*>*)p
), handle
);
122 void _void_process_finish(void *p
) override
{
123 _process_finish(*(list
<T
*>*)p
);
124 delete (list
<T
*> *)p
;
128 virtual void _process(const list
<T
*> &items
, TPHandle
&handle
) = 0;
131 BatchWorkQueue(string n
, time_t ti
, time_t sti
, ThreadPool
* p
)
132 : WorkQueue_(std::move(n
), ti
, sti
), pool(p
) {
133 pool
->add_work_queue(this);
135 ~BatchWorkQueue() override
{
136 pool
->remove_work_queue(this);
139 bool queue(T
*item
) {
141 bool r
= _enqueue(item
);
142 pool
->_cond
.SignalOne();
143 pool
->_lock
.Unlock();
146 void dequeue(T
*item
) {
149 pool
->_lock
.Unlock();
154 pool
->_lock
.Unlock();
175 /** @brief Templated by-value work queue.
176 * Skeleton implementation of a queue that processes items submitted by value.
177 * This is useful if the items are single primitive values or very small objects
178 * (a few bytes). The queue will automatically add itself to the thread pool on
179 * construction and remove itself on destruction. */
180 template<typename T
, typename U
= T
>
181 class WorkQueueVal
: public WorkQueue_
{
186 virtual void _enqueue(T
) = 0;
187 virtual void _enqueue_front(T
) = 0;
188 bool _empty() override
= 0;
189 virtual U
_dequeue() = 0;
190 virtual void _process_finish(U
) {}
192 void *_void_dequeue() override
{
194 Mutex::Locker
l(_lock
);
198 to_process
.push_back(u
);
200 return ((void*)1); // Not used
202 void _void_process(void *, TPHandle
&handle
) override
{
204 assert(!to_process
.empty());
205 U u
= to_process
.front();
206 to_process
.pop_front();
212 to_finish
.push_back(u
);
216 void _void_process_finish(void *) override
{
218 assert(!to_finish
.empty());
219 U u
= to_finish
.front();
220 to_finish
.pop_front();
226 void _clear() override
{}
229 WorkQueueVal(string n
, time_t ti
, time_t sti
, ThreadPool
*p
)
230 : WorkQueue_(std::move(n
), ti
, sti
), _lock("WorkQueueVal::lock"), pool(p
) {
231 pool
->add_work_queue(this);
233 ~WorkQueueVal() override
{
234 pool
->remove_work_queue(this);
237 Mutex::Locker
l(pool
->_lock
);
239 pool
->_cond
.SignalOne();
241 void queue_front(T item
) {
242 Mutex::Locker
l(pool
->_lock
);
243 _enqueue_front(item
);
244 pool
->_cond
.SignalOne();
256 virtual void _process(U u
, TPHandle
&) = 0;
/** @brief Templated by-pointer work queue.
 * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
 * This is useful when the work items are large or include dynamically allocated memory. The queue
 * will automatically add itself to the thread pool on construction and remove itself on
 * destruction. */
265 class WorkQueue
: public WorkQueue_
{
268 /// Add a work item to the queue.
269 virtual bool _enqueue(T
*) = 0;
270 /// Dequeue a previously submitted work item.
271 virtual void _dequeue(T
*) = 0;
272 /// Dequeue a work item and return the original submitted pointer.
273 virtual T
*_dequeue() = 0;
274 virtual void _process_finish(T
*) {}
276 // implementation of virtual methods from WorkQueue_
277 void *_void_dequeue() override
{
278 return (void *)_dequeue();
280 void _void_process(void *p
, TPHandle
&handle
) override
{
281 _process(static_cast<T
*>(p
), handle
);
283 void _void_process_finish(void *p
) override
{
284 _process_finish(static_cast<T
*>(p
));
288 /// Process a work item. Called from the worker threads.
289 virtual void _process(T
*t
, TPHandle
&) = 0;
292 WorkQueue(string n
, time_t ti
, time_t sti
, ThreadPool
* p
)
293 : WorkQueue_(std::move(n
), ti
, sti
), pool(p
) {
294 pool
->add_work_queue(this);
296 ~WorkQueue() override
{
297 pool
->remove_work_queue(this);
300 bool queue(T
*item
) {
302 bool r
= _enqueue(item
);
303 pool
->_cond
.SignalOne();
304 pool
->_lock
.Unlock();
307 void dequeue(T
*item
) {
310 pool
->_lock
.Unlock();
315 pool
->_lock
.Unlock();
328 /// wake up the thread pool (without lock held)
332 /// wake up the thread pool (with lock already held)
346 class PointerWQ
: public WorkQueue_
{
348 ~PointerWQ() override
{
349 m_pool
->remove_work_queue(this);
350 assert(m_processing
== 0);
354 // if this queue is empty and not processing, don't wait for other
355 // queues to finish processing
356 Mutex::Locker
l(m_pool
->_lock
);
357 if (m_processing
== 0 && m_items
.empty()) {
363 void queue(T
*item
) {
364 Mutex::Locker
l(m_pool
->_lock
);
365 m_items
.push_back(item
);
366 m_pool
->_cond
.SignalOne();
369 Mutex::Locker
l(m_pool
->_lock
);
373 PointerWQ(string n
, time_t ti
, time_t sti
, ThreadPool
* p
)
374 : WorkQueue_(std::move(n
), ti
, sti
), m_pool(p
), m_processing(0) {
376 void _clear() override
{
377 assert(m_pool
->_lock
.is_locked());
380 bool _empty() override
{
381 assert(m_pool
->_lock
.is_locked());
382 return m_items
.empty();
384 void *_void_dequeue() override
{
385 assert(m_pool
->_lock
.is_locked());
386 if (m_items
.empty()) {
391 T
*item
= m_items
.front();
395 void _void_process(void *item
, ThreadPool::TPHandle
&handle
) override
{
396 process(reinterpret_cast<T
*>(item
));
398 void _void_process_finish(void *item
) override
{
399 assert(m_pool
->_lock
.is_locked());
400 assert(m_processing
> 0);
404 virtual void process(T
*item
) = 0;
405 void process_finish() {
406 Mutex::Locker
locker(m_pool
->_lock
);
407 _void_process_finish(nullptr);
411 assert(m_pool
->_lock
.is_locked());
412 if (m_items
.empty()) {
415 return m_items
.front();
417 void requeue(T
*item
) {
418 Mutex::Locker
pool_locker(m_pool
->_lock
);
419 _void_process_finish(nullptr);
420 m_items
.push_front(item
);
423 Mutex::Locker
pool_locker(m_pool
->_lock
);
424 m_pool
->_cond
.SignalOne();
426 Mutex
&get_pool_lock() {
427 return m_pool
->_lock
;
431 std::list
<T
*> m_items
;
432 uint32_t m_processing
;
435 vector
<WorkQueue_
*> work_queues
;
436 int next_work_queue
= 0;
440 struct WorkThread
: public Thread
{
442 // cppcheck-suppress noExplicitConstructor
443 WorkThread(ThreadPool
*p
) : pool(p
) {}
444 void *entry() override
{
450 set
<WorkThread
*> _threads
;
451 list
<WorkThread
*> _old_threads
; ///< need to be joined
454 void start_threads();
455 void join_old_threads();
456 void worker(WorkThread
*wt
);
459 ThreadPool(CephContext
*cct_
, string nm
, string tn
, int n
, const char *option
= NULL
);
460 ~ThreadPool() override
;
462 /// return number of threads currently running
463 int get_num_threads() {
464 Mutex::Locker
l(_lock
);
468 /// assign a work queue to this thread pool
469 void add_work_queue(WorkQueue_
* wq
) {
470 Mutex::Locker
l(_lock
);
471 work_queues
.push_back(wq
);
473 /// remove a work queue from this thread pool
474 void remove_work_queue(WorkQueue_
* wq
) {
475 Mutex::Locker
l(_lock
);
477 while (work_queues
[i
] != wq
)
479 for (i
++; i
< work_queues
.size(); i
++)
480 work_queues
[i
-1] = work_queues
[i
];
481 assert(i
== work_queues
.size());
482 work_queues
.resize(i
-1);
485 /// take thread pool lock
489 /// release thread pool lock
494 /// wait for a kick on this thread pool
499 /// wake up a waiter (with lock already held)
503 /// wake up a waiter (without lock held)
505 Mutex::Locker
l(_lock
);
512 /// start thread pool thread
514 /// stop thread pool thread
515 void stop(bool clear_after
=true);
516 /// pause thread pool (if it not already paused)
518 /// pause initiation of new work
520 /// resume work in thread pool. must match each pause() call 1:1 to resume.
522 /** @brief Wait until work completes.
523 * If the parameter is NULL, blocks until all threads are idle.
524 * If it is not NULL, blocks until the given work queue does not have
525 * any items left to process. */
526 void drain(WorkQueue_
* wq
= 0);
529 void set_ioprio(int cls
, int priority
);
533 public ThreadPool::WorkQueueVal
<GenContext
<ThreadPool::TPHandle
&>*> {
534 list
<GenContext
<ThreadPool::TPHandle
&>*> _queue
;
536 GenContextWQ(const string
&name
, time_t ti
, ThreadPool
*tp
)
537 : ThreadPool::WorkQueueVal
<
538 GenContext
<ThreadPool::TPHandle
&>*>(name
, ti
, ti
*10, tp
) {}
540 void _enqueue(GenContext
<ThreadPool::TPHandle
&> *c
) override
{
543 void _enqueue_front(GenContext
<ThreadPool::TPHandle
&> *c
) override
{
544 _queue
.push_front(c
);
546 bool _empty() override
{
547 return _queue
.empty();
549 GenContext
<ThreadPool::TPHandle
&> *_dequeue() override
{
550 assert(!_queue
.empty());
551 GenContext
<ThreadPool::TPHandle
&> *c
= _queue
.front();
555 void _process(GenContext
<ThreadPool::TPHandle
&> *c
,
556 ThreadPool::TPHandle
&tp
) override
{
561 class C_QueueInWQ
: public Context
{
563 GenContext
<ThreadPool::TPHandle
&> *c
;
565 C_QueueInWQ(GenContextWQ
*wq
, GenContext
<ThreadPool::TPHandle
&> *c
)
567 void finish(int) override
{
572 /// Work queue that asynchronously completes contexts (executes callbacks).
574 class ContextWQ
: public ThreadPool::PointerWQ
<Context
> {
576 ContextWQ(const string
&name
, time_t ti
, ThreadPool
*tp
)
577 : ThreadPool::PointerWQ
<Context
>(name
, ti
, 0, tp
),
578 m_lock("ContextWQ::m_lock") {
579 tp
->add_work_queue(this);
582 void queue(Context
*ctx
, int result
= 0) {
584 Mutex::Locker
locker(m_lock
);
585 m_context_results
[ctx
] = result
;
587 ThreadPool::PointerWQ
<Context
>::queue(ctx
);
590 void _clear() override
{
591 ThreadPool::PointerWQ
<Context
>::_clear();
593 Mutex::Locker
locker(m_lock
);
594 m_context_results
.clear();
597 void process(Context
*ctx
) override
{
600 Mutex::Locker
locker(m_lock
);
601 ceph::unordered_map
<Context
*, int>::iterator it
=
602 m_context_results
.find(ctx
);
603 if (it
!= m_context_results
.end()) {
605 m_context_results
.erase(it
);
608 ctx
->complete(result
);
612 ceph::unordered_map
<Context
*, int> m_context_results
;
615 class ShardedThreadPool
{
621 Mutex shardedpool_lock
;
622 Cond shardedpool_cond
;
624 uint32_t num_threads
;
626 std::atomic
<bool> stop_threads
= { false };
627 std::atomic
<bool> pause_threads
= { false };
628 std::atomic
<bool> drain_threads
= { false };
631 uint32_t num_drained
;
635 class BaseShardedWQ
{
638 time_t timeout_interval
, suicide_interval
;
639 BaseShardedWQ(time_t ti
, time_t sti
):timeout_interval(ti
), suicide_interval(sti
) {}
640 virtual ~BaseShardedWQ() {}
642 virtual void _process(uint32_t thread_index
, heartbeat_handle_d
*hb
) = 0;
643 virtual void return_waiting_threads() = 0;
644 virtual bool is_shard_empty(uint32_t thread_index
) = 0;
647 template <typename T
>
648 class ShardedWQ
: public BaseShardedWQ
{
650 ShardedThreadPool
* sharded_pool
;
653 virtual void _enqueue(T
) = 0;
654 virtual void _enqueue_front(T
) = 0;
658 ShardedWQ(time_t ti
, time_t sti
, ShardedThreadPool
* tp
): BaseShardedWQ(ti
, sti
),
662 ~ShardedWQ() override
{}
667 void queue_front(T item
) {
668 _enqueue_front(item
);
671 sharded_pool
->drain();
680 struct WorkThreadSharded
: public Thread
{
681 ShardedThreadPool
*pool
;
682 uint32_t thread_index
;
683 WorkThreadSharded(ShardedThreadPool
*p
, uint32_t pthread_index
): pool(p
),
684 thread_index(pthread_index
) {}
685 void *entry() override
{
686 pool
->shardedthreadpool_worker(thread_index
);
691 vector
<WorkThreadSharded
*> threads_shardedpool
;
692 void start_threads();
693 void shardedthreadpool_worker(uint32_t thread_index
);
694 void set_wq(BaseShardedWQ
* swq
) {
702 ShardedThreadPool(CephContext
*cct_
, string nm
, string tn
, uint32_t pnum_threads
);
704 ~ShardedThreadPool(){};
706 /// start thread pool thread
708 /// stop thread pool thread
710 /// pause thread pool (if it not already paused)
712 /// pause initiation of new work
714 /// resume work in thread pool. must match each pause() call 1:1 to resume.
716 /// wait for all work to complete