]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/os/alienstore/thread_pool.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / os / alienstore / thread_pool.h
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3#pragma once
4
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <memory>
#include <mutex>
#include <new>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
#include <boost/lockfree/queue.hpp>
#include <boost/optional.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
16
f67539c2 17namespace crimson::os {
11fdf7f2
TL
18
/// Type-erased unit of work: concrete items implement process().
/// The destructor is virtual so that an item can be destroyed through a
/// WorkItem pointer.
struct WorkItem {
  virtual void process() = 0;
  virtual ~WorkItem() = default;
};
23
9f95a23c 24template<typename Func>
11fdf7f2 25struct Task final : WorkItem {
9f95a23c 26 using T = std::invoke_result_t<Func>;
f67539c2
TL
27 using future_stored_type_t =
28 std::conditional_t<std::is_void_v<T>,
29 seastar::internal::future_stored_type_t<>,
30 seastar::internal::future_stored_type_t<T>>;
9f95a23c 31 using futurator_t = seastar::futurize<T>;
11fdf7f2
TL
32public:
33 explicit Task(Func&& f)
34 : func(std::move(f))
35 {}
36 void process() override {
37 try {
9f95a23c
TL
38 if constexpr (std::is_void_v<T>) {
39 func();
40 state.set();
41 } else {
42 state.set(func());
43 }
11fdf7f2
TL
44 } catch (...) {
45 state.set_exception(std::current_exception());
46 }
9f95a23c 47 on_done.write_side().signal(1);
11fdf7f2 48 }
9f95a23c
TL
49 typename futurator_t::type get_future() {
50 return on_done.wait().then([this](size_t) {
51 if (state.failed()) {
52 return futurator_t::make_exception_future(state.get_exception());
53 } else {
54 return futurator_t::from_tuple(state.get_value());
55 }
11fdf7f2
TL
56 });
57 }
9f95a23c
TL
58private:
59 Func func;
f67539c2 60 seastar::future_state<future_stored_type_t> state;
9f95a23c 61 seastar::readable_eventfd on_done;
11fdf7f2
TL
62};
63
64struct SubmitQueue {
65 seastar::semaphore free_slots;
66 seastar::gate pending_tasks;
67 explicit SubmitQueue(size_t num_free_slots)
68 : free_slots(num_free_slots)
69 {}
70 seastar::future<> stop() {
71 return pending_tasks.close();
72 }
73};
74
75/// an engine for scheduling non-seastar tasks from seastar fibers
76class ThreadPool {
77 std::atomic<bool> stopping = false;
78 std::mutex mutex;
79 std::condition_variable cond;
80 std::vector<std::thread> threads;
81 seastar::sharded<SubmitQueue> submit_queue;
82 const size_t queue_size;
83 boost::lockfree::queue<WorkItem*> pending;
84
f67539c2 85 void loop(std::chrono::milliseconds queue_max_wait);
11fdf7f2
TL
86 bool is_stopping() const {
87 return stopping.load(std::memory_order_relaxed);
88 }
89 static void pin(unsigned cpu_id);
90 seastar::semaphore& local_free_slots() {
91 return submit_queue.local().free_slots;
92 }
93 ThreadPool(const ThreadPool&) = delete;
94 ThreadPool& operator=(const ThreadPool&) = delete;
95public:
96 /**
97 * @param queue_sz the depth of pending queue. before a task is scheduled,
98 * it waits in this queue. we will round this number to
99 * multiple of the number of cores.
100 * @param n_threads the number of threads in this thread pool.
101 * @param cpu the CPU core to which this thread pool is assigned
9f95a23c 102 * @note each @c Task has its own crimson::thread::Condition, which possesses
f67539c2 103 * an fd, so we should keep the size of queue under a reasonable limit.
11fdf7f2 104 */
f67539c2 105 ThreadPool(size_t n_threads, size_t queue_sz, long cpu);
11fdf7f2
TL
106 ~ThreadPool();
107 seastar::future<> start();
108 seastar::future<> stop();
109 template<typename Func, typename...Args>
110 auto submit(Func&& func, Args&&... args) {
111 auto packaged = [func=std::move(func),
112 args=std::forward_as_tuple(args...)] {
113 return std::apply(std::move(func), std::move(args));
114 };
115 return seastar::with_gate(submit_queue.local().pending_tasks,
116 [packaged=std::move(packaged), this] {
117 return local_free_slots().wait()
118 .then([packaged=std::move(packaged), this] {
119 auto task = new Task{std::move(packaged)};
120 auto fut = task->get_future();
121 pending.push(task);
122 cond.notify_one();
123 return fut.finally([task, this] {
124 local_free_slots().signal();
125 delete task;
126 });
127 });
128 });
129 }
130};
131
f67539c2 132} // namespace crimson::os