]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/os/alienstore/thread_pool.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / os / alienstore / thread_pool.h
CommitLineData
20effc67
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2// vim: ts=8 sw=2 smarttab expandtab
11fdf7f2
TL
3#pragma once
4
5#include <atomic>
6#include <condition_variable>
7#include <tuple>
8#include <type_traits>
9#include <boost/lockfree/queue.hpp>
10#include <boost/optional.hpp>
11#include <seastar/core/future.hh>
12#include <seastar/core/gate.hh>
9f95a23c 13#include <seastar/core/reactor.hh>
1e59de90 14#include <seastar/core/resource.hh>
11fdf7f2
TL
15#include <seastar/core/semaphore.hh>
16#include <seastar/core/sharded.hh>
17
20effc67
TL
18#if __cplusplus > 201703L
19#include <semaphore>
20namespace crimson {
21 using std::counting_semaphore;
22}
23#else
24#include "semaphore.h"
25#endif
26
f67539c2 27namespace crimson::os {
11fdf7f2
TL
28
/// Abstract unit of work: implementations supply the actual job in process().
struct WorkItem {
  // Defaulted virtual destructor so that derived items can be destroyed
  // through a WorkItem pointer.
  virtual ~WorkItem() = default;
  /// Execute the work carried by this item.
  virtual void process() = 0;
};
33
9f95a23c 34template<typename Func>
11fdf7f2 35struct Task final : WorkItem {
9f95a23c 36 using T = std::invoke_result_t<Func>;
f67539c2
TL
37 using future_stored_type_t =
38 std::conditional_t<std::is_void_v<T>,
20effc67
TL
39 seastar::internal::future_stored_type_t<>,
40 seastar::internal::future_stored_type_t<T>>;
9f95a23c 41 using futurator_t = seastar::futurize<T>;
11fdf7f2
TL
42public:
43 explicit Task(Func&& f)
44 : func(std::move(f))
45 {}
46 void process() override {
47 try {
9f95a23c
TL
48 if constexpr (std::is_void_v<T>) {
49 func();
50 state.set();
51 } else {
52 state.set(func());
53 }
11fdf7f2
TL
54 } catch (...) {
55 state.set_exception(std::current_exception());
56 }
9f95a23c 57 on_done.write_side().signal(1);
11fdf7f2 58 }
9f95a23c
TL
59 typename futurator_t::type get_future() {
60 return on_done.wait().then([this](size_t) {
61 if (state.failed()) {
20effc67 62 return futurator_t::make_exception_future(state.get_exception());
9f95a23c 63 } else {
20effc67 64 return futurator_t::from_tuple(state.get_value());
9f95a23c 65 }
11fdf7f2
TL
66 });
67 }
9f95a23c
TL
68private:
69 Func func;
f67539c2 70 seastar::future_state<future_stored_type_t> state;
9f95a23c 71 seastar::readable_eventfd on_done;
11fdf7f2
TL
72};
73
74struct SubmitQueue {
75 seastar::semaphore free_slots;
76 seastar::gate pending_tasks;
77 explicit SubmitQueue(size_t num_free_slots)
78 : free_slots(num_free_slots)
79 {}
80 seastar::future<> stop() {
81 return pending_tasks.close();
82 }
83};
84
20effc67
TL
85struct ShardedWorkQueue {
86public:
87 WorkItem* pop_front(std::chrono::milliseconds& queue_max_wait) {
88 if (sem.try_acquire_for(queue_max_wait)) {
89 if (!is_stopping()) {
90 WorkItem* work_item = nullptr;
91 [[maybe_unused]] bool popped = pending.pop(work_item);
92 assert(popped);
93 return work_item;
94 }
95 }
96 return nullptr;
11fdf7f2 97 }
20effc67
TL
98 void stop() {
99 stopping = true;
100 sem.release();
11fdf7f2 101 }
20effc67
TL
102 void push_back(WorkItem* work_item) {
103 [[maybe_unused]] bool pushed = pending.push(work_item);
104 assert(pushed);
105 sem.release();
106 }
107private:
108 bool is_stopping() const {
109 return stopping;
110 }
111 std::atomic<bool> stopping = false;
112 static constexpr unsigned QUEUE_SIZE = 128;
113 crimson::counting_semaphore<QUEUE_SIZE> sem{0};
114 boost::lockfree::queue<WorkItem*> pending{QUEUE_SIZE};
115};
116
117/// an engine for scheduling non-seastar tasks from seastar fibers
118class ThreadPool {
11fdf7f2
TL
119public:
120 /**
121 * @param queue_sz the depth of pending queue. before a task is scheduled,
122 * it waits in this queue. we will round this number to
123 * multiple of the number of cores.
124 * @param n_threads the number of threads in this thread pool.
125 * @param cpu the CPU core to which this thread pool is assigned
9f95a23c 126 * @note each @c Task has its own crimson::thread::Condition, which possesses
f67539c2 127 * an fd, so we should keep the size of queue under a reasonable limit.
11fdf7f2 128 */
1e59de90 129 ThreadPool(size_t n_threads, size_t queue_sz, const std::optional<seastar::resource::cpuset>& cpus);
11fdf7f2
TL
130 ~ThreadPool();
131 seastar::future<> start();
132 seastar::future<> stop();
20effc67
TL
133 size_t size() {
134 return n_threads;
135 }
11fdf7f2 136 template<typename Func, typename...Args>
20effc67 137 auto submit(int shard, Func&& func, Args&&... args) {
11fdf7f2
TL
138 auto packaged = [func=std::move(func),
139 args=std::forward_as_tuple(args...)] {
140 return std::apply(std::move(func), std::move(args));
141 };
142 return seastar::with_gate(submit_queue.local().pending_tasks,
20effc67 143 [packaged=std::move(packaged), shard, this] {
11fdf7f2 144 return local_free_slots().wait()
20effc67 145 .then([packaged=std::move(packaged), shard, this] {
11fdf7f2
TL
146 auto task = new Task{std::move(packaged)};
147 auto fut = task->get_future();
20effc67 148 pending_queues[shard].push_back(task);
11fdf7f2
TL
149 return fut.finally([task, this] {
150 local_free_slots().signal();
151 delete task;
152 });
153 });
154 });
155 }
20effc67
TL
156
157 template<typename Func>
158 auto submit(Func&& func) {
159 return submit(::rand() % n_threads, std::forward<Func>(func));
160 }
161
162private:
163 void loop(std::chrono::milliseconds queue_max_wait, size_t shard);
164 bool is_stopping() const {
165 return stopping.load(std::memory_order_relaxed);
166 }
1e59de90 167 static void pin(const seastar::resource::cpuset& cpus);
20effc67
TL
168 static void block_sighup();
169 seastar::semaphore& local_free_slots() {
170 return submit_queue.local().free_slots;
171 }
172 ThreadPool(const ThreadPool&) = delete;
173 ThreadPool& operator=(const ThreadPool&) = delete;
174
175private:
176 size_t n_threads;
177 std::atomic<bool> stopping = false;
178 std::vector<std::thread> threads;
179 seastar::sharded<SubmitQueue> submit_queue;
180 const size_t queue_size;
181 std::vector<ShardedWorkQueue> pending_queues;
11fdf7f2
TL
182};
183
f67539c2 184} // namespace crimson::os