]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | #pragma once | |
4 | ||
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <mutex>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
#include <boost/lockfree/queue.hpp>
#include <boost/optional.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
16 | ||
f67539c2 | 17 | namespace crimson::os { |
11fdf7f2 TL |
18 | |
/// Type-erased unit of work.
///
/// Concrete work items derive from this interface and implement process();
/// the virtual destructor allows them to be destroyed through a WorkItem*.
struct WorkItem {
  virtual ~WorkItem() = default;

  /// Execute the work carried by this item.
  virtual void process() = 0;
};
23 | ||
9f95a23c | 24 | template<typename Func> |
11fdf7f2 | 25 | struct Task final : WorkItem { |
9f95a23c | 26 | using T = std::invoke_result_t<Func>; |
f67539c2 TL |
27 | using future_stored_type_t = |
28 | std::conditional_t<std::is_void_v<T>, | |
29 | seastar::internal::future_stored_type_t<>, | |
30 | seastar::internal::future_stored_type_t<T>>; | |
9f95a23c | 31 | using futurator_t = seastar::futurize<T>; |
11fdf7f2 TL |
32 | public: |
33 | explicit Task(Func&& f) | |
34 | : func(std::move(f)) | |
35 | {} | |
36 | void process() override { | |
37 | try { | |
9f95a23c TL |
38 | if constexpr (std::is_void_v<T>) { |
39 | func(); | |
40 | state.set(); | |
41 | } else { | |
42 | state.set(func()); | |
43 | } | |
11fdf7f2 TL |
44 | } catch (...) { |
45 | state.set_exception(std::current_exception()); | |
46 | } | |
9f95a23c | 47 | on_done.write_side().signal(1); |
11fdf7f2 | 48 | } |
9f95a23c TL |
49 | typename futurator_t::type get_future() { |
50 | return on_done.wait().then([this](size_t) { | |
51 | if (state.failed()) { | |
52 | return futurator_t::make_exception_future(state.get_exception()); | |
53 | } else { | |
54 | return futurator_t::from_tuple(state.get_value()); | |
55 | } | |
11fdf7f2 TL |
56 | }); |
57 | } | |
9f95a23c TL |
58 | private: |
59 | Func func; | |
f67539c2 | 60 | seastar::future_state<future_stored_type_t> state; |
9f95a23c | 61 | seastar::readable_eventfd on_done; |
11fdf7f2 TL |
62 | }; |
63 | ||
64 | struct SubmitQueue { | |
65 | seastar::semaphore free_slots; | |
66 | seastar::gate pending_tasks; | |
67 | explicit SubmitQueue(size_t num_free_slots) | |
68 | : free_slots(num_free_slots) | |
69 | {} | |
70 | seastar::future<> stop() { | |
71 | return pending_tasks.close(); | |
72 | } | |
73 | }; | |
74 | ||
75 | /// an engine for scheduling non-seastar tasks from seastar fibers | |
76 | class ThreadPool { | |
77 | std::atomic<bool> stopping = false; | |
78 | std::mutex mutex; | |
79 | std::condition_variable cond; | |
80 | std::vector<std::thread> threads; | |
81 | seastar::sharded<SubmitQueue> submit_queue; | |
82 | const size_t queue_size; | |
83 | boost::lockfree::queue<WorkItem*> pending; | |
84 | ||
f67539c2 | 85 | void loop(std::chrono::milliseconds queue_max_wait); |
11fdf7f2 TL |
86 | bool is_stopping() const { |
87 | return stopping.load(std::memory_order_relaxed); | |
88 | } | |
89 | static void pin(unsigned cpu_id); | |
90 | seastar::semaphore& local_free_slots() { | |
91 | return submit_queue.local().free_slots; | |
92 | } | |
93 | ThreadPool(const ThreadPool&) = delete; | |
94 | ThreadPool& operator=(const ThreadPool&) = delete; | |
95 | public: | |
96 | /** | |
97 | * @param queue_sz the depth of pending queue. before a task is scheduled, | |
98 | * it waits in this queue. we will round this number to | |
99 | * multiple of the number of cores. | |
100 | * @param n_threads the number of threads in this thread pool. | |
101 | * @param cpu the CPU core to which this thread pool is assigned | |
9f95a23c | 102 | * @note each @c Task has its own crimson::thread::Condition, which possesses |
f67539c2 | 103 | * an fd, so we should keep the size of queue under a reasonable limit. |
11fdf7f2 | 104 | */ |
f67539c2 | 105 | ThreadPool(size_t n_threads, size_t queue_sz, long cpu); |
11fdf7f2 TL |
106 | ~ThreadPool(); |
107 | seastar::future<> start(); | |
108 | seastar::future<> stop(); | |
109 | template<typename Func, typename...Args> | |
110 | auto submit(Func&& func, Args&&... args) { | |
111 | auto packaged = [func=std::move(func), | |
112 | args=std::forward_as_tuple(args...)] { | |
113 | return std::apply(std::move(func), std::move(args)); | |
114 | }; | |
115 | return seastar::with_gate(submit_queue.local().pending_tasks, | |
116 | [packaged=std::move(packaged), this] { | |
117 | return local_free_slots().wait() | |
118 | .then([packaged=std::move(packaged), this] { | |
119 | auto task = new Task{std::move(packaged)}; | |
120 | auto fut = task->get_future(); | |
121 | pending.push(task); | |
122 | cond.notify_one(); | |
123 | return fut.finally([task, this] { | |
124 | local_free_slots().signal(); | |
125 | delete task; | |
126 | }); | |
127 | }); | |
128 | }); | |
129 | } | |
130 | }; | |
131 | ||
f67539c2 | 132 | } // namespace crimson::os |