1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
6 #include <condition_variable>
9 #include <boost/lockfree/queue.hpp>
10 #include <boost/optional.hpp>
11 #include <seastar/core/future.hh>
12 #include <seastar/core/gate.hh>
13 #include <seastar/core/reactor.hh>
14 #include <seastar/core/semaphore.hh>
15 #include <seastar/core/sharded.hh>
17 #if __cplusplus > 201703L
20 using std::counting_semaphore
;
23 #include "semaphore.h"
26 namespace crimson::os
{
29 virtual ~WorkItem() {}
30 virtual void process() = 0;
33 template<typename Func
>
34 struct Task final
: WorkItem
{
35 using T
= std::invoke_result_t
<Func
>;
36 using future_stored_type_t
=
37 std::conditional_t
<std::is_void_v
<T
>,
38 seastar::internal::future_stored_type_t
<>,
39 seastar::internal::future_stored_type_t
<T
>>;
40 using futurator_t
= seastar::futurize
<T
>;
42 explicit Task(Func
&& f
)
45 void process() override
{
47 if constexpr (std::is_void_v
<T
>) {
54 state
.set_exception(std::current_exception());
56 on_done
.write_side().signal(1);
58 typename
futurator_t::type
get_future() {
59 return on_done
.wait().then([this](size_t) {
61 return futurator_t::make_exception_future(state
.get_exception());
63 return futurator_t::from_tuple(state
.get_value());
69 seastar::future_state
<future_stored_type_t
> state
;
70 seastar::readable_eventfd on_done
;
74 seastar::semaphore free_slots
;
75 seastar::gate pending_tasks
;
76 explicit SubmitQueue(size_t num_free_slots
)
77 : free_slots(num_free_slots
)
79 seastar::future
<> stop() {
80 return pending_tasks
.close();
84 struct ShardedWorkQueue
{
86 WorkItem
* pop_front(std::chrono::milliseconds
& queue_max_wait
) {
87 if (sem
.try_acquire_for(queue_max_wait
)) {
89 WorkItem
* work_item
= nullptr;
90 [[maybe_unused
]] bool popped
= pending
.pop(work_item
);
101 void push_back(WorkItem
* work_item
) {
102 [[maybe_unused
]] bool pushed
= pending
.push(work_item
);
107 bool is_stopping() const {
110 std::atomic
<bool> stopping
= false;
111 static constexpr unsigned QUEUE_SIZE
= 128;
112 crimson::counting_semaphore
<QUEUE_SIZE
> sem
{0};
113 boost::lockfree::queue
<WorkItem
*> pending
{QUEUE_SIZE
};
116 /// an engine for scheduling non-seastar tasks from seastar fibers
120 * @param queue_sz the depth of pending queue. before a task is scheduled,
121 * it waits in this queue. we will round this number to
122 * multiple of the number of cores.
123 * @param n_threads the number of threads in this thread pool.
124 * @param cpu the CPU core to which this thread pool is assigned
125 * @note each @c Task has its own crimson::thread::Condition, which possesses
126 * an fd, so we should keep the size of queue under a reasonable limit.
128 ThreadPool(size_t n_threads
, size_t queue_sz
, std::vector
<uint64_t> cpus
);
130 seastar::future
<> start();
131 seastar::future
<> stop();
135 template<typename Func
, typename
...Args
>
136 auto submit(int shard
, Func
&& func
, Args
&&... args
) {
137 auto packaged
= [func
=std::move(func
),
138 args
=std::forward_as_tuple(args
...)] {
139 return std::apply(std::move(func
), std::move(args
));
141 return seastar::with_gate(submit_queue
.local().pending_tasks
,
142 [packaged
=std::move(packaged
), shard
, this] {
143 return local_free_slots().wait()
144 .then([packaged
=std::move(packaged
), shard
, this] {
145 auto task
= new Task
{std::move(packaged
)};
146 auto fut
= task
->get_future();
147 pending_queues
[shard
].push_back(task
);
148 return fut
.finally([task
, this] {
149 local_free_slots().signal();
156 template<typename Func
>
157 auto submit(Func
&& func
) {
158 return submit(::rand() % n_threads
, std::forward
<Func
>(func
));
162 void loop(std::chrono::milliseconds queue_max_wait
, size_t shard
);
163 bool is_stopping() const {
164 return stopping
.load(std::memory_order_relaxed
);
166 static void pin(const std::vector
<uint64_t>& cpus
);
167 static void block_sighup();
168 seastar::semaphore
& local_free_slots() {
169 return submit_queue
.local().free_slots
;
171 ThreadPool(const ThreadPool
&) = delete;
172 ThreadPool
& operator=(const ThreadPool
&) = delete;
176 std::atomic
<bool> stopping
= false;
177 std::vector
<std::thread
> threads
;
178 seastar::sharded
<SubmitQueue
> submit_queue
;
179 const size_t queue_size
;
180 std::vector
<ShardedWorkQueue
> pending_queues
;
183 } // namespace crimson::os