]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/thread/ThreadPool.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / thread / ThreadPool.h
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3#pragma once
4
5#include <atomic>
6#include <condition_variable>
7#include <tuple>
8#include <type_traits>
9#include <boost/lockfree/queue.hpp>
10#include <boost/optional.hpp>
11#include <seastar/core/future.hh>
12#include <seastar/core/gate.hh>
9f95a23c 13#include <seastar/core/reactor.hh>
11fdf7f2
TL
14#include <seastar/core/semaphore.hh>
15#include <seastar/core/sharded.hh>
16
9f95a23c 17namespace crimson::thread {
11fdf7f2
TL
18
19struct WorkItem {
20 virtual ~WorkItem() {}
21 virtual void process() = 0;
22};
23
9f95a23c 24template<typename Func>
11fdf7f2 25struct Task final : WorkItem {
9f95a23c
TL
26 using T = std::invoke_result_t<Func>;
27 using future_state_t = std::conditional_t<std::is_void_v<T>,
28 seastar::future_state<>,
29 seastar::future_state<T>>;
30 using futurator_t = seastar::futurize<T>;
11fdf7f2
TL
31public:
32 explicit Task(Func&& f)
33 : func(std::move(f))
34 {}
35 void process() override {
36 try {
9f95a23c
TL
37 if constexpr (std::is_void_v<T>) {
38 func();
39 state.set();
40 } else {
41 state.set(func());
42 }
11fdf7f2
TL
43 } catch (...) {
44 state.set_exception(std::current_exception());
45 }
9f95a23c 46 on_done.write_side().signal(1);
11fdf7f2 47 }
9f95a23c
TL
48 typename futurator_t::type get_future() {
49 return on_done.wait().then([this](size_t) {
50 if (state.failed()) {
51 return futurator_t::make_exception_future(state.get_exception());
52 } else {
53 return futurator_t::from_tuple(state.get_value());
54 }
11fdf7f2
TL
55 });
56 }
9f95a23c
TL
57private:
58 Func func;
59 future_state_t state;
60 seastar::readable_eventfd on_done;
11fdf7f2
TL
61};
62
63struct SubmitQueue {
64 seastar::semaphore free_slots;
65 seastar::gate pending_tasks;
66 explicit SubmitQueue(size_t num_free_slots)
67 : free_slots(num_free_slots)
68 {}
69 seastar::future<> stop() {
70 return pending_tasks.close();
71 }
72};
73
74/// an engine for scheduling non-seastar tasks from seastar fibers
75class ThreadPool {
76 std::atomic<bool> stopping = false;
77 std::mutex mutex;
78 std::condition_variable cond;
79 std::vector<std::thread> threads;
80 seastar::sharded<SubmitQueue> submit_queue;
81 const size_t queue_size;
82 boost::lockfree::queue<WorkItem*> pending;
83
84 void loop();
85 bool is_stopping() const {
86 return stopping.load(std::memory_order_relaxed);
87 }
88 static void pin(unsigned cpu_id);
89 seastar::semaphore& local_free_slots() {
90 return submit_queue.local().free_slots;
91 }
92 ThreadPool(const ThreadPool&) = delete;
93 ThreadPool& operator=(const ThreadPool&) = delete;
94public:
95 /**
96 * @param queue_sz the depth of pending queue. before a task is scheduled,
97 * it waits in this queue. we will round this number to
98 * multiple of the number of cores.
99 * @param n_threads the number of threads in this thread pool.
100 * @param cpu the CPU core to which this thread pool is assigned
9f95a23c 101 * @note each @c Task has its own crimson::thread::Condition, which possesses
11fdf7f2
TL
102 * possesses an fd, so we should keep the size of queue under a reasonable
103 * limit.
104 */
105 ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
106 ~ThreadPool();
107 seastar::future<> start();
108 seastar::future<> stop();
109 template<typename Func, typename...Args>
110 auto submit(Func&& func, Args&&... args) {
111 auto packaged = [func=std::move(func),
112 args=std::forward_as_tuple(args...)] {
113 return std::apply(std::move(func), std::move(args));
114 };
115 return seastar::with_gate(submit_queue.local().pending_tasks,
116 [packaged=std::move(packaged), this] {
117 return local_free_slots().wait()
118 .then([packaged=std::move(packaged), this] {
119 auto task = new Task{std::move(packaged)};
120 auto fut = task->get_future();
121 pending.push(task);
122 cond.notify_one();
123 return fut.finally([task, this] {
124 local_free_slots().signal();
125 delete task;
126 });
127 });
128 });
129 }
130};
131
9f95a23c 132} // namespace crimson::thread