]>
git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/thread/ThreadPool.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <condition_variable>
9 #include <boost/lockfree/queue.hpp>
10 #include <boost/optional.hpp>
11 #include <seastar/core/future.hh>
12 #include <seastar/core/gate.hh>
13 #include <seastar/core/reactor.hh>
14 #include <seastar/core/semaphore.hh>
15 #include <seastar/core/sharded.hh>
17 namespace crimson::thread
{
// Virtual destructor, so a derived work item can be destroyed correctly
// through a pointer to WorkItem.
20 virtual ~WorkItem() {}
// Pure virtual: every concrete work item must supply its own process();
// Task provides the override in this file.
21 virtual void process() = 0;
24 template<typename Func
>
25 struct Task final
: WorkItem
{
26 using T
= std::invoke_result_t
<Func
>;
27 using future_state_t
= std::conditional_t
<std::is_void_v
<T
>,
28 seastar::future_state
<>,
29 seastar::future_state
<T
>>;
30 using futurator_t
= seastar::futurize
<T
>;
32 explicit Task(Func
&& f
)
35 void process() override
{
37 if constexpr (std::is_void_v
<T
>) {
44 state
.set_exception(std::current_exception());
46 on_done
.write_side().signal(1);
48 typename
futurator_t::type
get_future() {
49 return on_done
.wait().then([this](size_t) {
51 return futurator_t::make_exception_future(state
.get_exception());
53 return futurator_t::from_tuple(state
.get_value());
60 seastar::readable_eventfd on_done
;
64 seastar::semaphore free_slots
;
65 seastar::gate pending_tasks
;
66 explicit SubmitQueue(size_t num_free_slots
)
67 : free_slots(num_free_slots
)
69 seastar::future
<> stop() {
70 return pending_tasks
.close();
74 /// an engine for scheduling non-seastar tasks from seastar fibers
76 std::atomic
<bool> stopping
= false;
78 std::condition_variable cond
;
79 std::vector
<std::thread
> threads
;
80 seastar::sharded
<SubmitQueue
> submit_queue
;
81 const size_t queue_size
;
82 boost::lockfree::queue
<WorkItem
*> pending
;
85 bool is_stopping() const {
86 return stopping
.load(std::memory_order_relaxed
);
88 static void pin(unsigned cpu_id
);
89 seastar::semaphore
& local_free_slots() {
90 return submit_queue
.local().free_slots
;
// A ThreadPool cannot be copied: both the copy constructor and the copy
// assignment operator are deleted.
92 ThreadPool(const ThreadPool
&) = delete;
93 ThreadPool
& operator=(const ThreadPool
&) = delete;
96 * @param queue_sz the depth of pending queue. before a task is scheduled,
97 * it waits in this queue. we will round this number to a
98 * multiple of the number of cores.
99 * @param n_threads the number of threads in this thread pool.
100 * @param cpu the CPU core to which this thread pool is assigned
101 * @note each @c Task has its own crimson::thread::Condition, which possesses
102 * an fd, so we should keep the size of queue under a reasonable
105 ThreadPool(size_t n_threads
, size_t queue_sz
, unsigned cpu
);
107 seastar::future
<> start();
108 seastar::future
<> stop();
109 template<typename Func
, typename
...Args
>
110 auto submit(Func
&& func
, Args
&&... args
) {
111 auto packaged
= [func
=std::move(func
),
112 args
=std::forward_as_tuple(args
...)] {
113 return std::apply(std::move(func
), std::move(args
));
115 return seastar::with_gate(submit_queue
.local().pending_tasks
,
116 [packaged
=std::move(packaged
), this] {
117 return local_free_slots().wait()
118 .then([packaged
=std::move(packaged
), this] {
119 auto task
= new Task
{std::move(packaged
)};
120 auto fut
= task
->get_future();
123 return fut
.finally([task
, this] {
124 local_free_slots().signal();
132 } // namespace crimson::thread