1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_WORKQUEUE_H
16 #define CEPH_WORKQUEUE_H
19 #include "include/unordered_map.h"
20 #include "common/HeartbeatMap.h"
26 /// Pool of threads that share work submitted to multiple work queues.
27 class ThreadPool
: public md_config_obs_t
{
// NOTE(review): many interior lines of this class are missing from this chunk
// (access specifiers, several members). Comments below describe only what is visible.
// I/O priority class/level for worker threads; see set_ioprio() further down.
38 int ioprio_class
, ioprio_priority
;
// NOTE(review): the class header for the nested handle type (TPHandle in upstream
// Ceph) is not visible here; the members below appear to belong to it — confirm.
42 friend class ThreadPool
;
// Heartbeat handle used to report this worker thread's liveness to the HeartbeatMap.
44 heartbeat_handle_d
*hb
;
50 heartbeat_handle_d
*hb
,
// Member-initializing constructor: stores the CephContext, heartbeat handle, and
// the warn (grace) / kill (suicide_grace) timeout periods.
53 : cct(cct
), hb(hb
), grace(grace
), suicide_grace(suicide_grace
) {}
// Re-arm the heartbeat timeout so the thread is not flagged as hung/suicidal.
54 void reset_tp_timeout();
// Suspend timeout checking while the thread blocks on legitimate long waits.
55 void suspend_tp_timeout();
59 /// Basic interface to a work queue used by the worker threads.
// NOTE(review): the struct/class header line for WorkQueue_ is missing from this chunk.
// Per-item processing budget: exceeding timeout_interval triggers a health warning;
// exceeding suicide_interval is treated as a hang (presumably fatal — confirm upstream).
62 time_t timeout_interval
, suicide_interval
;
// Constructor takes the queue name by value and moves it into place.
63 WorkQueue_(string n
, time_t ti
, time_t sti
)
64 : name(std::move(n
)), timeout_interval(ti
), suicide_interval(sti
)
66 virtual ~WorkQueue_() {}
67 /// Remove all work items from the queue.
68 virtual void _clear() = 0;
69 /// Check whether there is anything to do.
70 virtual bool _empty() = 0;
71 /// Get the next work item to process.
72 virtual void *_void_dequeue() = 0;
73 /** @brief Process the work item.
74 * This function will be called several times in parallel
75 * and must therefore be thread-safe. */
76 virtual void _void_process(void *item
, TPHandle
&handle
) = 0;
77 /** @brief Synchronously finish processing a work item.
78 * This function is called after _void_process with the global thread pool lock held,
79 * so at most one copy will execute simultaneously for a given thread pool.
80 * It can be used for non-thread-safe finalization. */
81 virtual void _void_process_finish(void *) = 0;
84 // track thread pool size changes
85 unsigned _num_threads
;
86 string _thread_num_option
;
87 const char **_conf_keys
;
// md_config_obs_t interface: the set of config keys this pool observes
// (the thread-count option named by _thread_num_option).
89 const char **get_tracked_conf_keys() const override
{
// md_config_obs_t interface: react to a runtime change of an observed key
// (presumably resizes the thread pool — implementation not visible here).
92 void handle_conf_change(const struct md_config_t
*conf
,
93 const std::set
<std::string
> &changed
) override
;
96 /** @brief Work queue that processes several submitted items at once.
97 * The queue will automatically add itself to the thread pool on construction
98 * and remove itself on destruction. */
100 class BatchWorkQueue
: public WorkQueue_
{
// Subclass hooks: enqueue/dequeue individual items or drain into a list.
103 virtual bool _enqueue(T
*) = 0;
104 virtual void _dequeue(T
*) = 0;
105 virtual void _dequeue(list
<T
*> *) = 0;
106 virtual void _process_finish(const list
<T
*> &) {}
108 // virtual methods from WorkQueue_ below
// Collect a batch of items into a heap-allocated list; ownership of the list
// passes to _void_process/_void_process_finish, which deletes it.
109 void *_void_dequeue() override
{
110 list
<T
*> *out(new list
<T
*>);
119 void _void_process(void *p
, TPHandle
&handle
) override
{
120 _process(*((list
<T
*>*)p
), handle
);
// Finish hook: forward to the typed _process_finish, then free the batch list
// allocated in _void_dequeue.
122 void _void_process_finish(void *p
) override
{
123 _process_finish(*(list
<T
*>*)p
);
124 delete (list
<T
*> *)p
;
// Process a whole batch of items. Implemented by the subclass.
128 virtual void _process(const list
<T
*> &items
, TPHandle
&handle
) = 0;
// Registers itself with the owning pool on construction...
131 BatchWorkQueue(string n
, time_t ti
, time_t sti
, ThreadPool
* p
)
132 : WorkQueue_(std::move(n
), ti
, sti
), pool(p
) {
133 pool
->add_work_queue(this);
// ...and deregisters on destruction.
135 ~BatchWorkQueue() override
{
136 pool
->remove_work_queue(this);
// Submit one item and wake a worker.
// NOTE(review): the matching pool->_lock.Lock() line is missing from this chunk
// (only the Unlock is visible) — the original takes the pool lock around _enqueue.
139 bool queue(T
*item
) {
141 bool r
= _enqueue(item
);
142 pool
->_cond
.SignalOne();
143 pool
->_lock
.Unlock();
// Remove a previously queued item (lock-acquisition lines missing from this chunk).
146 void dequeue(T
*item
) {
149 pool
->_lock
.Unlock();
154 pool
->_lock
.Unlock();
175 /** @brief Templated by-value work queue.
176 * Skeleton implementation of a queue that processes items submitted by value.
177 * This is useful if the items are single primitive values or very small objects
178 * (a few bytes). The queue will automatically add itself to the thread pool on
179 * construction and remove itself on destruction. */
180 template<typename T
, typename U
= T
>
181 class WorkQueueVal
: public WorkQueue_
{
// Subclass hooks: T is the submitted type, U the internally processed type.
186 virtual void _enqueue(T
) = 0;
187 virtual void _enqueue_front(T
) = 0;
188 bool _empty() override
= 0;
189 virtual U
_dequeue() = 0;
190 virtual void _process_finish(U
) {}
// Pull the next value into to_process under the queue's own _lock; the returned
// pointer is a dummy token — the real item travels via the to_process list.
192 void *_void_dequeue() override
{
194 Mutex::Locker
l(_lock
);
198 to_process
.push_back(u
);
200 return ((void*)1); // Not used
// Pop the front of to_process, process it, and stash it on to_finish for the
// finish phase (which runs with the pool lock held).
202 void _void_process(void *, TPHandle
&handle
) override
{
204 assert(!to_process
.empty());
205 U u
= to_process
.front();
206 to_process
.pop_front();
212 to_finish
.push_back(u
);
// Finish phase: pop from to_finish and hand to the typed _process_finish hook.
216 void _void_process_finish(void *) override
{
218 assert(!to_finish
.empty());
219 U u
= to_finish
.front();
220 to_finish
.pop_front();
226 void _clear() override
{}
// Self-registering constructor: adds this queue to the pool.
229 WorkQueueVal(string n
, time_t ti
, time_t sti
, ThreadPool
*p
)
230 : WorkQueue_(std::move(n
), ti
, sti
), _lock("WorkQueueVal::lock"), pool(p
) {
231 pool
->add_work_queue(this);
233 ~WorkQueueVal() override
{
234 pool
->remove_work_queue(this);
// queue(): enqueue under the pool lock and wake one worker
// (the _enqueue(item) line itself is missing from this chunk).
237 Mutex::Locker
l(pool
->_lock
);
239 pool
->_cond
.SignalOne();
// Enqueue at the front of the queue (priority submit) and wake one worker.
241 void queue_front(T item
) {
242 Mutex::Locker
l(pool
->_lock
);
243 _enqueue_front(item
);
244 pool
->_cond
.SignalOne();
// Process a single value. Implemented by the subclass.
256 virtual void _process(U u
, TPHandle
&) = 0;
259 /** @brief Template by-pointer work queue.
260 * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
261 * This is useful when the work item are large or include dynamically allocated memory. The queue
262 * will automatically add itself to the thread pool on construction and remove itself on
265 class WorkQueue
: public WorkQueue_
{
268 /// Add a work item to the queue.
269 virtual bool _enqueue(T
*) = 0;
270 /// Dequeue a previously submitted work item.
271 virtual void _dequeue(T
*) = 0;
272 /// Dequeue a work item and return the original submitted pointer.
273 virtual T
*_dequeue() = 0;
274 virtual void _process_finish(T
*) {}
276 // implementation of virtual methods from WorkQueue_
// The raw pointer itself is the opaque work token.
277 void *_void_dequeue() override
{
278 return (void *)_dequeue();
280 void _void_process(void *p
, TPHandle
&handle
) override
{
281 _process(static_cast<T
*>(p
), handle
);
283 void _void_process_finish(void *p
) override
{
284 _process_finish(static_cast<T
*>(p
));
288 /// Process a work item. Called from the worker threads.
289 virtual void _process(T
*t
, TPHandle
&) = 0;
// Self-registering constructor, mirroring BatchWorkQueue/WorkQueueVal.
292 WorkQueue(string n
, time_t ti
, time_t sti
, ThreadPool
* p
)
293 : WorkQueue_(std::move(n
), ti
, sti
), pool(p
) {
294 pool
->add_work_queue(this);
296 ~WorkQueue() override
{
297 pool
->remove_work_queue(this);
// Submit one item and wake a worker.
// NOTE(review): the matching pool->_lock.Lock() and "return r" lines are missing
// from this chunk — only the unlock side is visible.
300 bool queue(T
*item
) {
302 bool r
= _enqueue(item
);
303 pool
->_cond
.SignalOne();
304 pool
->_lock
.Unlock();
// Remove a previously queued item (lock-acquisition lines missing from this chunk).
307 void dequeue(T
*item
) {
310 pool
->_lock
.Unlock();
315 pool
->_lock
.Unlock();
// NOTE(review): the two doc lines below belong to ThreadPool wake-up helpers whose
// code lines are missing from this chunk.
328 /// wake up the thread pool (without lock held)
332 /// wake up the thread pool (with lock already held)
// Pointer-based work queue that, unlike WorkQueue, requires an explicit
// register_work_queue() call instead of self-registering in the constructor.
346 class PointerWQ
: public WorkQueue_
{
// Destructor deregisters from the pool; nothing may still be in flight.
348 ~PointerWQ() override
{
349 m_pool
->remove_work_queue(this);
350 assert(m_processing
== 0);
354 // if this queue is empty and not processing, don't wait for other
355 // queues to finish processing
356 Mutex::Locker
l(m_pool
->_lock
);
357 if (m_processing
== 0 && m_items
.empty()) {
// Submit one item under the pool lock and wake one worker.
363 void queue(T
*item
) {
364 Mutex::Locker
l(m_pool
->_lock
);
365 m_items
.push_back(item
);
366 m_pool
->_cond
.SignalOne();
369 Mutex::Locker
l(m_pool
->_lock
);
// Constructor stores the pool but does NOT register; see register_work_queue().
373 PointerWQ(string n
, time_t ti
, time_t sti
, ThreadPool
* p
)
374 : WorkQueue_(std::move(n
), ti
, sti
), m_pool(p
), m_processing(0) {
// Explicit registration step, deferred so subclasses can finish construction first.
376 void register_work_queue() {
377 m_pool
->add_work_queue(this);
// All _-prefixed overrides below assert the pool lock is already held by the caller.
379 void _clear() override
{
380 assert(m_pool
->_lock
.is_locked());
383 bool _empty() override
{
384 assert(m_pool
->_lock
.is_locked());
385 return m_items
.empty();
387 void *_void_dequeue() override
{
388 assert(m_pool
->_lock
.is_locked());
389 if (m_items
.empty()) {
394 T
*item
= m_items
.front();
398 void _void_process(void *item
, ThreadPool::TPHandle
&handle
) override
{
399 process(reinterpret_cast<T
*>(item
));
// Finish phase: decrement the in-flight count (m_processing) under the pool lock.
401 void _void_process_finish(void *item
) override
{
402 assert(m_pool
->_lock
.is_locked());
403 assert(m_processing
> 0);
407 virtual void process(T
*item
) = 0;
// Manually complete an item's finish phase from outside a worker thread.
408 void process_finish() {
409 Mutex::Locker
locker(m_pool
->_lock
);
410 _void_process_finish(nullptr);
// Peek at the front item (caller must hold the pool lock; lines partly missing).
414 assert(m_pool
->_lock
.is_locked());
415 if (m_items
.empty()) {
418 return m_items
.front();
// Put an item back at the head of the queue after aborting its processing.
420 void requeue(T
*item
) {
421 Mutex::Locker
pool_locker(m_pool
->_lock
);
422 _void_process_finish(nullptr);
423 m_items
.push_front(item
);
426 Mutex::Locker
pool_locker(m_pool
->_lock
);
427 m_pool
->_cond
.SignalOne();
// Expose the pool lock so callers can coordinate with queue operations.
429 Mutex
&get_pool_lock() {
430 return m_pool
->_lock
;
434 std::list
<T
*> m_items
;
// Number of items currently being processed by worker threads.
435 uint32_t m_processing
;
// Registered queues; workers round-robin over them starting at next_work_queue.
438 vector
<WorkQueue_
*> work_queues
;
439 int next_work_queue
= 0;
// One worker thread of the pool; entry() runs the pool's worker loop.
443 struct WorkThread
: public Thread
{
445 // cppcheck-suppress noExplicitConstructor
446 WorkThread(ThreadPool
*p
) : pool(p
) {}
447 void *entry() override
{
// Live threads, plus retired threads awaiting join after a pool shrink.
453 set
<WorkThread
*> _threads
;
454 list
<WorkThread
*> _old_threads
; ///< need to be joined
457 void start_threads();
458 void join_old_threads();
459 void worker(WorkThread
*wt
);
// option, if given, names a config key controlling the thread count.
462 ThreadPool(CephContext
*cct_
, string nm
, string tn
, int n
, const char *option
= NULL
);
463 ~ThreadPool() override
;
465 /// return number of threads currently running
466 int get_num_threads() {
467 Mutex::Locker
l(_lock
);
471 /// assign a work queue to this thread pool
472 void add_work_queue(WorkQueue_
* wq
) {
473 Mutex::Locker
l(_lock
);
474 work_queues
.push_back(wq
);
476 /// remove a work queue from this thread pool
// Finds wq, shifts the remaining entries left by one, then shrinks the vector —
// preserving round-robin order of the other queues.
477 void remove_work_queue(WorkQueue_
* wq
) {
478 Mutex::Locker
l(_lock
);
480 while (work_queues
[i
] != wq
)
482 for (i
++; i
< work_queues
.size(); i
++)
483 work_queues
[i
-1] = work_queues
[i
];
484 assert(i
== work_queues
.size());
485 work_queues
.resize(i
-1);
488 /// take thread pool lock
492 /// release thread pool lock
497 /// wait for a kick on this thread pool
502 /// wake up a waiter (with lock already held)
506 /// wake up a waiter (without lock held)
508 Mutex::Locker
l(_lock
);
515 /// start thread pool thread
517 /// stop thread pool thread
518 void stop(bool clear_after
=true);
519 /// pause thread pool (if it not already paused)
521 /// pause initiation of new work
523 /// resume work in thread pool. must match each pause() call 1:1 to resume.
525 /** @brief Wait until work completes.
526 * If the parameter is NULL, blocks until all threads are idle.
527 * If it is not NULL, blocks until the given work queue does not have
528 * any items left to process. */
529 void drain(WorkQueue_
* wq
= 0);
// Set the I/O priority (class + level) applied to the pool's worker threads.
532 void set_ioprio(int cls
, int priority
);
// NOTE(review): the "class GenContextWQ :" header line is missing from this chunk;
// this is a by-value work queue of GenContext<TPHandle&>* callbacks.
536 public ThreadPool::WorkQueueVal
<GenContext
<ThreadPool::TPHandle
&>*> {
537 list
<GenContext
<ThreadPool::TPHandle
&>*> _queue
;
// suicide interval is fixed at 10x the warn interval (ti*10).
539 GenContextWQ(const string
&name
, time_t ti
, ThreadPool
*tp
)
540 : ThreadPool::WorkQueueVal
<
541 GenContext
<ThreadPool::TPHandle
&>*>(name
, ti
, ti
*10, tp
) {}
543 void _enqueue(GenContext
<ThreadPool::TPHandle
&> *c
) override
{
546 void _enqueue_front(GenContext
<ThreadPool::TPHandle
&> *c
) override
{
547 _queue
.push_front(c
);
549 bool _empty() override
{
550 return _queue
.empty();
// Pop the front context (pop_front line missing from this chunk).
552 GenContext
<ThreadPool::TPHandle
&> *_dequeue() override
{
553 assert(!_queue
.empty());
554 GenContext
<ThreadPool::TPHandle
&> *c
= _queue
.front();
// Run the dequeued context (presumably c->complete(tp) — body not visible).
558 void _process(GenContext
<ThreadPool::TPHandle
&> *c
,
559 ThreadPool::TPHandle
&tp
) override
{
// Adapter Context whose finish() forwards its wrapped GenContext onto a GenContextWQ.
564 class C_QueueInWQ
: public Context
{
566 GenContext
<ThreadPool::TPHandle
&> *c
;
568 C_QueueInWQ(GenContextWQ
*wq
, GenContext
<ThreadPool::TPHandle
&> *c
)
570 void finish(int) override
{
575 /// Work queue that asynchronously completes contexts (executes callbacks).
577 class ContextWQ
: public ThreadPool::PointerWQ
<Context
> {
// suicide interval is 0 (disabled); registers itself immediately, unlike the
// base PointerWQ which leaves registration to the caller.
579 ContextWQ(const string
&name
, time_t ti
, ThreadPool
*tp
)
580 : ThreadPool::PointerWQ
<Context
>(name
, ti
, 0, tp
),
581 m_lock("ContextWQ::m_lock") {
582 this->register_work_queue();
// Queue a context together with the result code it should be completed with.
// The result is stashed in m_context_results under m_lock before enqueueing.
585 void queue(Context
*ctx
, int result
= 0) {
587 Mutex::Locker
locker(m_lock
);
588 m_context_results
[ctx
] = result
;
590 ThreadPool::PointerWQ
<Context
>::queue(ctx
);
// Clearing the queue also drops the pending per-context result codes.
593 void _clear() override
{
594 ThreadPool::PointerWQ
<Context
>::_clear();
596 Mutex::Locker
locker(m_lock
);
597 m_context_results
.clear();
// Look up (and erase) the stored result for ctx, then complete it with that result.
600 void process(Context
*ctx
) override
{
603 Mutex::Locker
locker(m_lock
);
604 ceph::unordered_map
<Context
*, int>::iterator it
=
605 m_context_results
.find(ctx
);
606 if (it
!= m_context_results
.end()) {
608 m_context_results
.erase(it
);
611 ctx
->complete(result
);
// Pending result code per queued context, guarded by m_lock.
615 ceph::unordered_map
<Context
*, int> m_context_results
;
// Thread pool where each worker owns a shard of the queue, reducing lock contention.
// NOTE(review): this class continues past the end of this chunk.
618 class ShardedThreadPool
{
624 Mutex shardedpool_lock
;
625 Cond shardedpool_cond
;
627 uint32_t num_threads
;
// Control flags polled by the workers; atomics so they can be set without the lock.
629 std::atomic
<bool> stop_threads
= { false };
630 std::atomic
<bool> pause_threads
= { false };
631 std::atomic
<bool> drain_threads
= { false };
// Count of workers that have acknowledged a drain request.
634 uint32_t num_drained
;
// Type-erased base for sharded work queues.
638 class BaseShardedWQ
{
641 time_t timeout_interval
, suicide_interval
;
642 BaseShardedWQ(time_t ti
, time_t sti
):timeout_interval(ti
), suicide_interval(sti
) {}
643 virtual ~BaseShardedWQ() {}
// Process work for one shard, reporting liveness via hb.
645 virtual void _process(uint32_t thread_index
, heartbeat_handle_d
*hb
) = 0;
646 virtual void return_waiting_threads() = 0;
647 virtual bool is_shard_empty(uint32_t thread_index
) = 0;
// Typed sharded queue; T is the submitted work-item type.
650 template <typename T
>
651 class ShardedWQ
: public BaseShardedWQ
{
653 ShardedThreadPool
* sharded_pool
;
656 virtual void _enqueue(T
) = 0;
657 virtual void _enqueue_front(T
) = 0;
661 ShardedWQ(time_t ti
, time_t sti
, ShardedThreadPool
* tp
): BaseShardedWQ(ti
, sti
),
665 ~ShardedWQ() override
{}
// Priority submit: place the item at the front of its shard.
670 void queue_front(T item
) {
671 _enqueue_front(item
);
674 sharded_pool
->drain();
// Worker thread bound to a fixed shard index.
683 struct WorkThreadSharded
: public Thread
{
684 ShardedThreadPool
*pool
;
685 uint32_t thread_index
;
686 WorkThreadSharded(ShardedThreadPool
*p
, uint32_t pthread_index
): pool(p
),
687 thread_index(pthread_index
) {}
688 void *entry() override
{
689 pool
->shardedthreadpool_worker(thread_index
);
694 vector
<WorkThreadSharded
*> threads_shardedpool
;
695 void start_threads();
696 void shardedthreadpool_worker(uint32_t thread_index
);
// Attach the (single) sharded work queue the pool services.
697 void set_wq(BaseShardedWQ
* swq
) {
705 ShardedThreadPool(CephContext
*cct_
, string nm
, string tn
, uint32_t pnum_threads
);
707 ~ShardedThreadPool(){};
709 /// start thread pool thread
711 /// stop thread pool thread
713 /// pause thread pool (if it not already paused)
715 /// pause initiation of new work
717 /// resume work in thread pool. must match each pause() call 1:1 to resume.
719 /// wait for all work to complete