1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_WORKQUEUE_H
16 #define CEPH_WORKQUEUE_H
18 #if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
33 #include "common/ceph_mutex.h"
34 #include "include/unordered_map.h"
35 #include "common/config_obs.h"
36 #include "common/HeartbeatMap.h"
37 #include "common/Thread.h"
38 #include "include/common_fwd.h"
39 #include "include/Context.h"
40 #include "common/HBHandle.h"
43 /// Pool of threads that share work submitted to multiple work queues.
// NOTE(review): corrupted extract — the upstream file's own line numbers are
// fused into the text and many interior lines are missing. Code is left
// byte-identical; only comments are added.
// ThreadPool observes config changes (md_config_obs_t) so thread counts can
// be adjusted at runtime — see handle_conf_change below.
44 class ThreadPool
: public md_config_obs_t
{
// Base name given to the worker threads.
48 std::string thread_name
;
// Signalled when new work is queued; workers block on this.
51 ceph::condition_variable _cond
;
// Presumably signalled when drain/idle is reached — TODO confirm, the
// waiting code is not visible in this extract.
55 ceph::condition_variable _wait_cond
;
// Handle passed to work-queue _process() callbacks so a long-running item
// can keep the heartbeat alive (reset) or pause it (suspend).
58 class TPHandle
: public HBHandle
{
59 friend class ThreadPool
;
// Non-owning heartbeat handle, managed by the HeartbeatMap.
61 ceph::heartbeat_handle_d
*hb
;
// Interval after which an unresponsive worker is considered suicidal.
63 ceph::timespan suicide_grace
;
// Constructor. Also initializes cct and grace, whose member declarations
// are among the lines missing from this extract.
67 ceph::heartbeat_handle_d
*hb
,
69 ceph::timespan suicide_grace
)
70 : cct(cct
), hb(hb
), grace(grace
), suicide_grace(suicide_grace
) {}
// Re-arm the heartbeat timeout for the calling worker.
71 void reset_tp_timeout() override final
;
// Suspend the heartbeat timeout (e.g. while intentionally blocked);
// presumably paired with reset_tp_timeout — verify in the .cc.
72 void suspend_tp_timeout() override final
;
76 /// Basic interface to a work queue used by the worker threads.
// NOTE(review): the "struct WorkQueue_ {" line itself is missing from this
// corrupted extract; members below belong to it. Code left byte-identical.
// How long an item may run before the heartbeat is flagged unhealthy.
79 ceph::timespan timeout_interval
;
// How long before the daemon aborts (suicide) on a stuck item.
80 ceph::timespan suicide_interval
;
81 WorkQueue_(std::string n
, ceph::timespan ti
, ceph::timespan sti
)
82 : name(std::move(n
)), timeout_interval(ti
), suicide_interval(sti
)
84 virtual ~WorkQueue_() {}
85 /// Remove all work items from the queue.
86 virtual void _clear() = 0;
87 /// Check whether there is anything to do.
88 virtual bool _empty() = 0;
89 /// Get the next work item to process.
90 virtual void *_void_dequeue() = 0;
91 /** @brief Process the work item.
92 * This function will be called several times in parallel
93 * and must therefore be thread-safe. */
94 virtual void _void_process(void *item
, TPHandle
&handle
) = 0;
95 /** @brief Synchronously finish processing a work item.
96 * This function is called after _void_process with the global thread pool lock held,
97 * so at most one copy will execute simultaneously for a given thread pool.
98 * It can be used for non-thread-safe finalization. */
99 virtual void _void_process_finish(void *) = 0;
102 // track thread pool size changes
103 unsigned _num_threads
;
// Config option name controlling the thread count for this pool.
104 std::string _thread_num_option
;
// NULL-terminated key list returned to the config observer framework.
105 const char **_conf_keys
;
// md_config_obs_t interface: which config keys this pool watches.
107 const char **get_tracked_conf_keys() const override
{
// md_config_obs_t interface: react to a watched key changing
// (presumably resizes the pool — body not visible here).
110 void handle_conf_change(const ConfigProxy
& conf
,
111 const std::set
<std::string
> &changed
) override
;
114 /** @brief Templated by-value work queue.
115 * Skeleton implementation of a queue that processes items submitted by value.
116 * This is useful if the items are single primitive values or very small objects
117 * (a few bytes). The queue will automatically add itself to the thread pool on
118 * construction and remove itself on destruction. */
// NOTE(review): corrupted extract — several body lines are missing below;
// code left byte-identical, comments only.
119 template<typename T
, typename U
= T
>
120 class WorkQueueVal
: public WorkQueue_
{
// Protects to_process / to_finish (items are staged here, not returned
// through the void* dequeue pointer).
121 ceph::mutex _lock
= ceph::make_mutex("WorkQueueVal::_lock");
// Items waiting to be processed by _void_process.
123 std::list
<U
> to_process
;
// Items processed and awaiting serialized finalization.
124 std::list
<U
> to_finish
;
125 virtual void _enqueue(T
) = 0;
126 virtual void _enqueue_front(T
) = 0;
127 bool _empty() override
= 0;
128 virtual U
_dequeue() = 0;
129 virtual void _process_finish(U
) {}
// Moves the next item into to_process under _lock; the returned pointer is
// a dummy (the real item travels via to_process, see the comment below).
131 void *_void_dequeue() override
{
133 std::lock_guard
l(_lock
);
137 to_process
.push_back(u
);
139 return ((void*)1); // Not used
// Pops the staged item and hands it (by value) to _process; the finished
// item is parked on to_finish for _void_process_finish.
141 void _void_process(void *, TPHandle
&handle
) override
{
143 ceph_assert(!to_process
.empty());
144 U u
= to_process
.front();
145 to_process
.pop_front();
151 to_finish
.push_back(u
);
// Runs with the pool lock held (see WorkQueue_ docs): serialized finish.
155 void _void_process_finish(void *) override
{
157 ceph_assert(!to_finish
.empty());
158 U u
= to_finish
.front();
159 to_finish
.pop_front();
165 void _clear() override
{}
// Registers with the owning pool on construction (deregisters in dtor).
168 WorkQueueVal(std::string n
,
172 : WorkQueue_(std::move(n
), ti
, sti
), pool(p
) {
173 pool
->add_work_queue(this);
175 ~WorkQueueVal() override
{
176 pool
->remove_work_queue(this);
// Enqueue under the pool lock and wake one worker.
179 std::lock_guard
l(pool
->_lock
);
181 pool
->_cond
.notify_one();
// Same as queue() but the item jumps the line.
183 void queue_front(T item
) {
184 std::lock_guard
l(pool
->_lock
);
185 _enqueue_front(item
);
186 pool
->_cond
.notify_one();
// Subclass hook: actually process one item (may run in parallel).
198 virtual void _process(U u
, TPHandle
&) = 0;
201 /** @brief Template by-pointer work queue.
202 * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
203 * This is useful when the work item are large or include dynamically allocated memory. The queue
204 * will automatically add itself to the thread pool on construction and remove itself on
// NOTE(review): corrupted extract — the template<class T> line, closing
// braces and several body lines are missing; code left byte-identical.
207 class WorkQueue
: public WorkQueue_
{
210 /// Add a work item to the queue.
211 virtual bool _enqueue(T
*) = 0;
212 /// Dequeue a previously submitted work item.
213 virtual void _dequeue(T
*) = 0;
214 /// Dequeue a work item and return the original submitted pointer.
215 virtual T
*_dequeue() = 0;
216 virtual void _process_finish(T
*) {}
218 // implementation of virtual methods from WorkQueue_
219 void *_void_dequeue() override
{
220 return (void *)_dequeue();
222 void _void_process(void *p
, TPHandle
&handle
) override
{
223 _process(static_cast<T
*>(p
), handle
);
225 void _void_process_finish(void *p
) override
{
226 _process_finish(static_cast<T
*>(p
));
230 /// Process a work item. Called from the worker threads.
231 virtual void _process(T
*t
, TPHandle
&) = 0;
// Registers with the owning pool on construction (deregisters in dtor).
234 WorkQueue(std::string n
,
235 ceph::timespan ti
, ceph::timespan sti
,
237 : WorkQueue_(std::move(n
), ti
, sti
), pool(p
) {
238 pool
->add_work_queue(this);
240 ~WorkQueue() override
{
241 pool
->remove_work_queue(this);
// Enqueue an item and wake a worker; returns _enqueue's result
// (presumably false when the item was already queued — verify in callers).
244 bool queue(T
*item
) {
246 bool r
= _enqueue(item
);
247 pool
->_cond
.notify_one();
248 pool
->_lock
.unlock();
// Remove a previously queued item (the lock/unlock pairing around these
// bodies is only partially visible in this extract).
251 void dequeue(T
*item
) {
254 pool
->_lock
.unlock();
259 pool
->_lock
.unlock();
268 /// wake up the thread pool (without lock held)
272 /// wake up the thread pool (with lock already held)
// NOTE(review): the wake()/_wake() definitions those two comments describe
// are missing from this corrupted extract. Code left byte-identical below.
// Pointer work queue variant that tracks in-flight items (m_processing) so
// drain can distinguish "empty" from "still processing".
286 class PointerWQ
: public WorkQueue_
{
288 ~PointerWQ() override
{
289 m_pool
->remove_work_queue(this);
290 ceph_assert(m_processing
== 0);
294 // if this queue is empty and not processing, don't wait for other
295 // queues to finish processing
296 std::lock_guard
l(m_pool
->_lock
);
297 if (m_processing
== 0 && m_items
.empty()) {
// Append an item under the pool lock and wake one worker.
303 void queue(T
*item
) {
304 std::lock_guard
l(m_pool
->_lock
);
305 m_items
.push_back(item
);
306 m_pool
->_cond
.notify_one();
309 std::lock_guard
l(m_pool
->_lock
);
// Unlike WorkQueue/WorkQueueVal, registration is explicit here via
// register_work_queue() rather than in the constructor.
313 PointerWQ(std::string n
,
314 ceph::timespan ti
, ceph::timespan sti
,
316 : WorkQueue_(std::move(n
), ti
, sti
), m_pool(p
), m_processing(0) {
318 void register_work_queue() {
319 m_pool
->add_work_queue(this);
// All _-prefixed overrides assert the pool lock is already held.
321 void _clear() override
{
322 ceph_assert(ceph_mutex_is_locked(m_pool
->_lock
));
325 bool _empty() override
{
326 ceph_assert(ceph_mutex_is_locked(m_pool
->_lock
));
327 return m_items
.empty();
329 void *_void_dequeue() override
{
330 ceph_assert(ceph_mutex_is_locked(m_pool
->_lock
));
331 if (m_items
.empty()) {
336 T
*item
= m_items
.front();
340 void _void_process(void *item
, ThreadPool::TPHandle
&handle
) override
{
341 process(reinterpret_cast<T
*>(item
));
// Runs under the pool lock; decrements the in-flight counter
// (decrement line itself is missing from this extract).
343 void _void_process_finish(void *item
) override
{
344 ceph_assert(ceph_mutex_is_locked(m_pool
->_lock
));
345 ceph_assert(m_processing
> 0);
349 virtual void process(T
*item
) = 0;
// Manually mark the current item finished (takes the lock itself).
350 void process_finish() {
351 std::lock_guard
locker(m_pool
->_lock
);
352 _void_process_finish(nullptr);
// Peek helper: requires the pool lock to be held by the caller.
356 ceph_assert(ceph_mutex_is_locked(m_pool
->_lock
));
357 if (m_items
.empty()) {
360 return m_items
.front();
// Put an in-flight item back at the head of the queue.
362 void requeue_front(T
*item
) {
363 std::lock_guard
pool_locker(m_pool
->_lock
);
364 _void_process_finish(nullptr);
365 m_items
.push_front(item
);
// Put an in-flight item back at the tail of the queue.
367 void requeue_back(T
*item
) {
368 std::lock_guard
pool_locker(m_pool
->_lock
);
369 _void_process_finish(nullptr);
370 m_items
.push_back(item
);
373 std::lock_guard
pool_locker(m_pool
->_lock
);
374 m_pool
->_cond
.notify_one();
// Expose the shared pool lock so callers can do multi-step operations.
376 ceph::mutex
&get_pool_lock() {
377 return m_pool
->_lock
;
381 std::list
<T
*> m_items
;
// Count of items dequeued but not yet finished.
382 uint32_t m_processing
;
// ThreadPool members: registered queues, serviced round-robin via
// next_work_queue.
385 std::vector
<WorkQueue_
*> work_queues
;
386 int next_work_queue
= 0;
// Worker thread: entry() presumably calls pool->worker() — its body lines
// are missing from this corrupted extract. Code left byte-identical.
390 struct WorkThread
: public Thread
{
392 // cppcheck-suppress noExplicitConstructor
393 WorkThread(ThreadPool
*p
) : pool(p
) {}
394 void *entry() override
{
// Active worker threads.
400 std::set
<WorkThread
*> _threads
;
401 std::list
<WorkThread
*> _old_threads
; ///< need to be joined
// Spawn threads up to the configured count.
404 void start_threads();
// Join and reap threads retired by a shrink.
405 void join_old_threads();
// Main loop run by each WorkThread.
406 virtual void worker(WorkThread
*wt
);
// option, when non-NULL, presumably names the config key that controls the
// pool size (see _thread_num_option above) — verify in the .cc.
409 ThreadPool(CephContext
*cct_
, std::string nm
, std::string tn
, int n
, const char *option
= NULL
);
410 ~ThreadPool() override
;
412 /// return number of threads currently running
413 int get_num_threads() {
414 std::lock_guard
l(_lock
);
418 /// assign a work queue to this thread pool
419 void add_work_queue(WorkQueue_
* wq
) {
420 std::lock_guard
l(_lock
);
421 work_queues
.push_back(wq
);
423 /// remove a work queue from this thread pool
// Finds wq, shifts the remaining entries left by one, then shrinks the
// vector — preserves queue order without std::erase.
424 void remove_work_queue(WorkQueue_
* wq
) {
425 std::lock_guard
l(_lock
);
427 while (work_queues
[i
] != wq
)
429 for (i
++; i
< work_queues
.size(); i
++)
430 work_queues
[i
-1] = work_queues
[i
];
431 ceph_assert(i
== work_queues
.size());
432 work_queues
.resize(i
-1);
435 /// take thread pool lock
439 /// release thread pool lock
444 /// wait for a kick on this thread pool
// std::adopt_lock: caller already holds _lock (via the lock() above);
// ownership is handed to the unique_lock for the wait.
445 void wait(ceph::condition_variable
&c
) {
446 std::unique_lock
l(_lock
, std::adopt_lock
);
450 /// wake up a waiter (with lock already held)
454 /// wake up a waiter (without lock held)
456 std::lock_guard
l(_lock
);
460 std::unique_lock
l(_lock
, std::adopt_lock
);
464 /// start thread pool thread
466 /// stop thread pool thread
467 void stop(bool clear_after
=true);
468 /// pause thread pool (if it not already paused)
470 /// pause initiation of new work
472 /// resume work in thread pool. must match each pause() call 1:1 to resume.
474 /** @brief Wait until work completes.
475 * If the parameter is NULL, blocks until all threads are idle.
476 * If it is not NULL, blocks until the given work queue does not have
477 * any items left to process. */
478 void drain(WorkQueue_
* wq
= 0);
// NOTE(review): the "class GenContextWQ :" line is missing from this
// corrupted extract; this is its base-class clause. A by-value work queue
// of GenContext<TPHandle&>* callbacks. Code left byte-identical.
482 public ThreadPool::WorkQueueVal
<GenContext
<ThreadPool::TPHandle
&>*> {
483 std::list
<GenContext
<ThreadPool::TPHandle
&>*> _queue
;
// suicide interval is hard-wired to 10x the timeout interval.
485 GenContextWQ(const std::string
&name
, ceph::timespan ti
, ThreadPool
*tp
)
486 : ThreadPool::WorkQueueVal
<
487 GenContext
<ThreadPool::TPHandle
&>*>(name
, ti
, ti
*10, tp
) {}
489 void _enqueue(GenContext
<ThreadPool::TPHandle
&> *c
) override
{
492 void _enqueue_front(GenContext
<ThreadPool::TPHandle
&> *c
) override
{
493 _queue
.push_front(c
);
495 bool _empty() override
{
496 return _queue
.empty();
498 GenContext
<ThreadPool::TPHandle
&> *_dequeue() override
{
499 ceph_assert(!_queue
.empty());
500 GenContext
<ThreadPool::TPHandle
&> *c
= _queue
.front();
// Presumably completes c with the handle; the body lines are missing.
504 void _process(GenContext
<ThreadPool::TPHandle
&> *c
,
505 ThreadPool::TPHandle
&tp
) override
{
// Context adapter: when completed, re-queues the wrapped GenContext onto a
// GenContextWQ (ctor init-list and finish body are missing from extract).
510 class C_QueueInWQ
: public Context
{
512 GenContext
<ThreadPool::TPHandle
&> *c
;
514 C_QueueInWQ(GenContextWQ
*wq
, GenContext
<ThreadPool::TPHandle
&> *c
)
516 void finish(int) override
{
521 /// Work queue that asynchronously completes contexts (executes callbacks).
// NOTE(review): corrupted extract — closing braces and some body lines are
// missing; code left byte-identical, comments only.
523 class ContextWQ
: public ThreadPool::PointerWQ
<Context
> {
// suicide interval disabled (zero); registers itself immediately.
525 ContextWQ(const std::string
&name
, ceph::timespan ti
, ThreadPool
*tp
)
526 : ThreadPool::PointerWQ
<Context
>(name
, ti
, ceph::timespan::zero(), tp
) {
527 this->register_work_queue();
// Queue a context; the completion code (result) is stashed in
// m_context_results and passed to ctx->complete() in process().
530 void queue(Context
*ctx
, int result
= 0) {
532 std::lock_guard
locker(m_lock
);
533 m_context_results
[ctx
] = result
;
535 ThreadPool::PointerWQ
<Context
>::queue(ctx
);
538 void _clear() override
{
539 ThreadPool::PointerWQ
<Context
>::_clear();
541 std::lock_guard
locker(m_lock
);
542 m_context_results
.clear();
// Look up the stashed result for this context (erasing the entry), then
// complete the callback with it.
545 void process(Context
*ctx
) override
{
548 std::lock_guard
locker(m_lock
);
549 ceph::unordered_map
<Context
*, int>::iterator it
=
550 m_context_results
.find(ctx
);
551 if (it
!= m_context_results
.end()) {
553 m_context_results
.erase(it
);
556 ctx
->complete(result
);
// m_lock guards only m_context_results (independent of the pool lock).
559 ceph::mutex m_lock
= ceph::make_mutex("ContextWQ::m_lock");
560 ceph::unordered_map
<Context
*, int> m_context_results
;
// Thread pool where each thread services its own shard of a single work
// queue (one queue, indexed by thread_index), avoiding a global queue lock.
// NOTE(review): corrupted extract — many lines are missing; code left
// byte-identical, comments only.
563 class ShardedThreadPool
{
567 std::string thread_name
;
568 std::string lockname
;
569 ceph::mutex shardedpool_lock
;
// Signalled to coordinate pause/drain handshakes with the workers.
570 ceph::condition_variable shardedpool_cond
;
571 ceph::condition_variable wait_cond
;
572 uint32_t num_threads
;
// Lock-free control flags polled by worker threads.
574 std::atomic
<bool> stop_threads
= { false };
575 std::atomic
<bool> pause_threads
= { false };
576 std::atomic
<bool> drain_threads
= { false };
// Count of threads that have acknowledged a drain/pause request.
579 uint32_t num_drained
;
// Type-erased interface the pool drives; one shard per thread index.
583 class BaseShardedWQ
{
586 ceph::timespan timeout_interval
, suicide_interval
;
587 BaseShardedWQ(ceph::timespan ti
, ceph::timespan sti
)
588 :timeout_interval(ti
), suicide_interval(sti
) {}
589 virtual ~BaseShardedWQ() {}
// Process items from the shard owned by thread_index.
591 virtual void _process(uint32_t thread_index
, ceph::heartbeat_handle_d
*hb
) = 0;
// Wake threads parked inside the queue so they can observe pause/stop.
592 virtual void return_waiting_threads() = 0;
593 virtual void stop_return_waiting_threads() = 0;
594 virtual bool is_shard_empty(uint32_t thread_index
) = 0;
// Typed sharded queue; items are moved in (rvalue-reference enqueue).
597 template <typename T
>
598 class ShardedWQ
: public BaseShardedWQ
{
600 ShardedThreadPool
* sharded_pool
;
603 virtual void _enqueue(T
&&) = 0;
604 virtual void _enqueue_front(T
&&) = 0;
608 ShardedWQ(ceph::timespan ti
,
609 ceph::timespan sti
, ShardedThreadPool
* tp
)
610 : BaseShardedWQ(ti
, sti
), sharded_pool(tp
) {
613 ~ShardedWQ() override
{}
// queue()/queue_front() take ownership of the item by move.
615 void queue(T
&& item
) {
616 _enqueue(std::move(item
));
618 void queue_front(T
&& item
) {
619 _enqueue_front(std::move(item
));
// Presumably drain_queue(): delegates to the owning pool's drain.
622 sharded_pool
->drain();
// Worker thread bound to a fixed shard index for its lifetime.
631 struct WorkThreadSharded
: public Thread
{
632 ShardedThreadPool
*pool
;
633 uint32_t thread_index
;
634 WorkThreadSharded(ShardedThreadPool
*p
, uint32_t pthread_index
): pool(p
),
635 thread_index(pthread_index
) {}
636 void *entry() override
{
637 pool
->shardedthreadpool_worker(thread_index
);
642 std::vector
<WorkThreadSharded
*> threads_shardedpool
;
643 void start_threads();
// Per-thread main loop: drives wq->_process(thread_index, hb).
644 void shardedthreadpool_worker(uint32_t thread_index
);
// Attach the single sharded work queue this pool services.
645 void set_wq(BaseShardedWQ
* swq
) {
653 ShardedThreadPool(CephContext
*cct_
, std::string nm
, std::string tn
, uint32_t pnum_threads
);
655 ~ShardedThreadPool(){};
657 /// start thread pool thread
659 /// stop thread pool thread
661 /// pause thread pool (if it not already paused)
663 /// pause initiation of new work
665 /// resume work in thread pool. must match each pause() call 1:1 to resume.
667 /// wait for all work to complete