]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | // vim: ts=8 sw=2 smarttab expandtab | |
11fdf7f2 TL |
3 | #pragma once |
4 | ||
5 | #include <atomic> | |
6 | #include <condition_variable> | |
7 | #include <tuple> | |
8 | #include <type_traits> | |
9 | #include <boost/lockfree/queue.hpp> | |
10 | #include <boost/optional.hpp> | |
11 | #include <seastar/core/future.hh> | |
12 | #include <seastar/core/gate.hh> | |
9f95a23c | 13 | #include <seastar/core/reactor.hh> |
1e59de90 | 14 | #include <seastar/core/resource.hh> |
11fdf7f2 TL |
15 | #include <seastar/core/semaphore.hh> |
16 | #include <seastar/core/sharded.hh> | |
17 | ||
20effc67 TL |
18 | #if __cplusplus > 201703L |
19 | #include <semaphore> | |
20 | namespace crimson { | |
21 | using std::counting_semaphore; | |
22 | } | |
23 | #else | |
24 | #include "semaphore.h" | |
25 | #endif | |
26 | ||
f67539c2 | 27 | namespace crimson::os { |
11fdf7f2 TL |
28 | |
/// Abstract unit of work.
///
/// Concrete items implement process(); the virtual destructor makes it safe
/// to destroy an item through a WorkItem pointer.
struct WorkItem {
  virtual ~WorkItem() = default;

  /// Run the work represented by this item.
  virtual void process() = 0;
};
33 | ||
9f95a23c | 34 | template<typename Func> |
11fdf7f2 | 35 | struct Task final : WorkItem { |
9f95a23c | 36 | using T = std::invoke_result_t<Func>; |
f67539c2 TL |
37 | using future_stored_type_t = |
38 | std::conditional_t<std::is_void_v<T>, | |
20effc67 TL |
39 | seastar::internal::future_stored_type_t<>, |
40 | seastar::internal::future_stored_type_t<T>>; | |
9f95a23c | 41 | using futurator_t = seastar::futurize<T>; |
11fdf7f2 TL |
42 | public: |
43 | explicit Task(Func&& f) | |
44 | : func(std::move(f)) | |
45 | {} | |
46 | void process() override { | |
47 | try { | |
9f95a23c TL |
48 | if constexpr (std::is_void_v<T>) { |
49 | func(); | |
50 | state.set(); | |
51 | } else { | |
52 | state.set(func()); | |
53 | } | |
11fdf7f2 TL |
54 | } catch (...) { |
55 | state.set_exception(std::current_exception()); | |
56 | } | |
9f95a23c | 57 | on_done.write_side().signal(1); |
11fdf7f2 | 58 | } |
9f95a23c TL |
59 | typename futurator_t::type get_future() { |
60 | return on_done.wait().then([this](size_t) { | |
61 | if (state.failed()) { | |
20effc67 | 62 | return futurator_t::make_exception_future(state.get_exception()); |
9f95a23c | 63 | } else { |
20effc67 | 64 | return futurator_t::from_tuple(state.get_value()); |
9f95a23c | 65 | } |
11fdf7f2 TL |
66 | }); |
67 | } | |
9f95a23c TL |
68 | private: |
69 | Func func; | |
f67539c2 | 70 | seastar::future_state<future_stored_type_t> state; |
9f95a23c | 71 | seastar::readable_eventfd on_done; |
11fdf7f2 TL |
72 | }; |
73 | ||
74 | struct SubmitQueue { | |
75 | seastar::semaphore free_slots; | |
76 | seastar::gate pending_tasks; | |
77 | explicit SubmitQueue(size_t num_free_slots) | |
78 | : free_slots(num_free_slots) | |
79 | {} | |
80 | seastar::future<> stop() { | |
81 | return pending_tasks.close(); | |
82 | } | |
83 | }; | |
84 | ||
20effc67 TL |
85 | struct ShardedWorkQueue { |
86 | public: | |
87 | WorkItem* pop_front(std::chrono::milliseconds& queue_max_wait) { | |
88 | if (sem.try_acquire_for(queue_max_wait)) { | |
89 | if (!is_stopping()) { | |
90 | WorkItem* work_item = nullptr; | |
91 | [[maybe_unused]] bool popped = pending.pop(work_item); | |
92 | assert(popped); | |
93 | return work_item; | |
94 | } | |
95 | } | |
96 | return nullptr; | |
11fdf7f2 | 97 | } |
20effc67 TL |
98 | void stop() { |
99 | stopping = true; | |
100 | sem.release(); | |
11fdf7f2 | 101 | } |
20effc67 TL |
102 | void push_back(WorkItem* work_item) { |
103 | [[maybe_unused]] bool pushed = pending.push(work_item); | |
104 | assert(pushed); | |
105 | sem.release(); | |
106 | } | |
107 | private: | |
108 | bool is_stopping() const { | |
109 | return stopping; | |
110 | } | |
111 | std::atomic<bool> stopping = false; | |
112 | static constexpr unsigned QUEUE_SIZE = 128; | |
113 | crimson::counting_semaphore<QUEUE_SIZE> sem{0}; | |
114 | boost::lockfree::queue<WorkItem*> pending{QUEUE_SIZE}; | |
115 | }; | |
116 | ||
117 | /// an engine for scheduling non-seastar tasks from seastar fibers | |
118 | class ThreadPool { | |
11fdf7f2 TL |
119 | public: |
120 | /** | |
121 | * @param queue_sz the depth of pending queue. before a task is scheduled, | |
122 | * it waits in this queue. we will round this number to | |
123 | * multiple of the number of cores. | |
124 | * @param n_threads the number of threads in this thread pool. | |
125 | * @param cpu the CPU core to which this thread pool is assigned | |
9f95a23c | 126 | * @note each @c Task has its own crimson::thread::Condition, which possesses |
f67539c2 | 127 | * an fd, so we should keep the size of queue under a reasonable limit. |
11fdf7f2 | 128 | */ |
1e59de90 | 129 | ThreadPool(size_t n_threads, size_t queue_sz, const std::optional<seastar::resource::cpuset>& cpus); |
11fdf7f2 TL |
130 | ~ThreadPool(); |
131 | seastar::future<> start(); | |
132 | seastar::future<> stop(); | |
20effc67 TL |
133 | size_t size() { |
134 | return n_threads; | |
135 | } | |
11fdf7f2 | 136 | template<typename Func, typename...Args> |
20effc67 | 137 | auto submit(int shard, Func&& func, Args&&... args) { |
11fdf7f2 TL |
138 | auto packaged = [func=std::move(func), |
139 | args=std::forward_as_tuple(args...)] { | |
140 | return std::apply(std::move(func), std::move(args)); | |
141 | }; | |
142 | return seastar::with_gate(submit_queue.local().pending_tasks, | |
20effc67 | 143 | [packaged=std::move(packaged), shard, this] { |
11fdf7f2 | 144 | return local_free_slots().wait() |
20effc67 | 145 | .then([packaged=std::move(packaged), shard, this] { |
11fdf7f2 TL |
146 | auto task = new Task{std::move(packaged)}; |
147 | auto fut = task->get_future(); | |
20effc67 | 148 | pending_queues[shard].push_back(task); |
11fdf7f2 TL |
149 | return fut.finally([task, this] { |
150 | local_free_slots().signal(); | |
151 | delete task; | |
152 | }); | |
153 | }); | |
154 | }); | |
155 | } | |
20effc67 TL |
156 | |
157 | template<typename Func> | |
158 | auto submit(Func&& func) { | |
159 | return submit(::rand() % n_threads, std::forward<Func>(func)); | |
160 | } | |
161 | ||
162 | private: | |
163 | void loop(std::chrono::milliseconds queue_max_wait, size_t shard); | |
164 | bool is_stopping() const { | |
165 | return stopping.load(std::memory_order_relaxed); | |
166 | } | |
1e59de90 | 167 | static void pin(const seastar::resource::cpuset& cpus); |
20effc67 TL |
168 | static void block_sighup(); |
169 | seastar::semaphore& local_free_slots() { | |
170 | return submit_queue.local().free_slots; | |
171 | } | |
172 | ThreadPool(const ThreadPool&) = delete; | |
173 | ThreadPool& operator=(const ThreadPool&) = delete; | |
174 | ||
175 | private: | |
176 | size_t n_threads; | |
177 | std::atomic<bool> stopping = false; | |
178 | std::vector<std::thread> threads; | |
179 | seastar::sharded<SubmitQueue> submit_queue; | |
180 | const size_t queue_size; | |
181 | std::vector<ShardedWorkQueue> pending_queues; | |
11fdf7f2 TL |
182 | }; |
183 | ||
f67539c2 | 184 | } // namespace crimson::os |