]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2015 Cloudius Systems, Ltd. | |
21 | */ | |
22 | ||
23 | #pragma once | |
24 | ||
25 | #include <seastar/core/thread_impl.hh> | |
26 | #include <seastar/core/future.hh> | |
27 | #include <seastar/core/do_with.hh> | |
11fdf7f2 | 28 | #include <seastar/core/timer.hh> |
11fdf7f2 TL |
29 | #include <seastar/core/scheduling.hh> |
30 | #include <memory> | |
31 | #include <setjmp.h> | |
32 | #include <type_traits> | |
33 | #include <chrono> | |
34 | #include <seastar/util/std-compat.hh> | |
35 | #include <ucontext.h> | |
36 | #include <boost/intrusive/list.hpp> | |
37 | ||
38 | /// \defgroup thread-module Seastar threads | |
39 | /// | |
40 | /// Seastar threads provide an execution environment where blocking | |
41 | /// is tolerated; you can issue I/O, and wait for it in the same function, | |
42 | /// rather then establishing a callback to be called with \ref future<>::then(). | |
43 | /// | |
44 | /// Seastar threads are not the same as operating system threads: | |
45 | /// - seastar threads are cooperative; they are never preempted except | |
46 | /// at blocking points (see below) | |
47 | /// - seastar threads always run on the same core they were launched on | |
48 | /// | |
49 | /// Like other seastar code, seastar threads may not issue blocking system calls. | |
50 | /// | |
f67539c2 | 51 | /// A seastar thread blocking point is any function that returns a \ref future. |
11fdf7f2 TL |
52 | /// you block by calling \ref future<>::get(); this waits for the future to become |
53 | /// available, and in the meanwhile, other seastar threads and seastar non-threaded | |
54 | /// code may execute. | |
55 | /// | |
56 | /// Example: | |
57 | /// \code | |
58 | /// seastar::thread th([] { | |
59 | /// sleep(5s).get(); // blocking point | |
60 | /// }); | |
61 | /// \endcode | |
62 | /// | |
63 | /// An easy way to launch a thread and carry out some computation, and return a | |
64 | /// result from this execution is by using the \ref seastar::async() function. | |
65 | /// The result is returned as a future, so that non-threaded code can wait for | |
66 | /// the thread to terminate and yield a result. | |
67 | ||
68 | /// Seastar API namespace | |
69 | namespace seastar { | |
70 | ||
71 | /// \addtogroup thread-module | |
72 | /// @{ | |
73 | ||
74 | class thread; | |
75 | class thread_attributes; | |
11fdf7f2 TL |
76 | |
77 | /// Class that holds attributes controling the behavior of a thread. | |
78 | class thread_attributes { | |
79 | public: | |
f67539c2 TL |
80 | std::optional<seastar::scheduling_group> sched_group; |
81 | // For stack_size 0, a default value will be used (128KiB when writing this comment) | |
82 | size_t stack_size = 0; | |
11fdf7f2 TL |
83 | }; |
84 | ||
85 | ||
86 | /// \cond internal | |
87 | extern thread_local jmp_buf_link g_unthreaded_context; | |
88 | ||
89 | // Internal class holding thread state. We can't hold this in | |
90 | // \c thread itself because \c thread is movable, and we want pointers | |
91 | // to this state to be captured. | |
9f95a23c | 92 | class thread_context final : private task { |
11fdf7f2 TL |
93 | struct stack_deleter { |
94 | void operator()(char *ptr) const noexcept; | |
f67539c2 TL |
95 | int valgrind_id; |
96 | stack_deleter(int valgrind_id); | |
11fdf7f2 TL |
97 | }; |
98 | using stack_holder = std::unique_ptr<char[], stack_deleter>; | |
11fdf7f2 | 99 | |
f67539c2 | 100 | stack_holder _stack; |
9f95a23c | 101 | noncopyable_function<void ()> _func; |
11fdf7f2 | 102 | jmp_buf_link _context; |
11fdf7f2 TL |
103 | promise<> _done; |
104 | bool _joined = false; | |
11fdf7f2 TL |
105 | |
106 | boost::intrusive::list_member_hook<> _all_link; | |
107 | using all_thread_list = boost::intrusive::list<thread_context, | |
108 | boost::intrusive::member_hook<thread_context, boost::intrusive::list_member_hook<>, | |
109 | &thread_context::_all_link>, | |
110 | boost::intrusive::constant_time_size<false>>; | |
111 | ||
11fdf7f2 TL |
112 | static thread_local all_thread_list _all_threads; |
113 | private: | |
114 | static void s_main(int lo, int hi); // all parameters MUST be 'int' for makecontext | |
f67539c2 | 115 | void setup(size_t stack_size); |
11fdf7f2 | 116 | void main(); |
f67539c2 | 117 | stack_holder make_stack(size_t stack_size); |
9f95a23c | 118 | virtual void run_and_dispose() noexcept override; // from task class |
11fdf7f2 | 119 | public: |
9f95a23c | 120 | thread_context(thread_attributes attr, noncopyable_function<void ()> func); |
11fdf7f2 TL |
121 | ~thread_context(); |
122 | void switch_in(); | |
123 | void switch_out(); | |
124 | bool should_yield() const; | |
125 | void reschedule(); | |
126 | void yield(); | |
f67539c2 | 127 | task* waiting_task() noexcept override { return _done.waiting_task(); } |
11fdf7f2 TL |
128 | friend class thread; |
129 | friend void thread_impl::switch_in(thread_context*); | |
130 | friend void thread_impl::switch_out(thread_context*); | |
131 | friend scheduling_group thread_impl::sched_group(const thread_context*); | |
132 | }; | |
133 | ||
134 | /// \endcond | |
135 | ||
136 | ||
137 | /// \brief thread - stateful thread of execution | |
138 | /// | |
139 | /// Threads allow using seastar APIs in a blocking manner, | |
140 | /// by calling future::get() on a non-ready future. When | |
141 | /// this happens, the thread is put to sleep until the future | |
142 | /// becomes ready. | |
143 | class thread { | |
144 | std::unique_ptr<thread_context> _context; | |
145 | static thread_local thread* _current; | |
146 | public: | |
147 | /// \brief Constructs a \c thread object that does not represent a thread | |
148 | /// of execution. | |
149 | thread() = default; | |
150 | /// \brief Constructs a \c thread object that represents a thread of execution | |
151 | /// | |
152 | /// \param func Callable object to execute in thread. The callable is | |
153 | /// called immediately. | |
154 | template <typename Func> | |
155 | thread(Func func); | |
156 | /// \brief Constructs a \c thread object that represents a thread of execution | |
157 | /// | |
158 | /// \param attr Attributes describing the new thread. | |
159 | /// \param func Callable object to execute in thread. The callable is | |
160 | /// called immediately. | |
161 | template <typename Func> | |
162 | thread(thread_attributes attr, Func func); | |
163 | /// \brief Moves a thread object. | |
164 | thread(thread&& x) noexcept = default; | |
165 | /// \brief Move-assigns a thread object. | |
166 | thread& operator=(thread&& x) noexcept = default; | |
167 | /// \brief Destroys a \c thread object. | |
168 | /// | |
169 | /// The thread must not represent a running thread of execution (see join()). | |
170 | ~thread() { assert(!_context || _context->_joined); } | |
171 | /// \brief Waits for thread execution to terminate. | |
172 | /// | |
173 | /// Waits for thread execution to terminate, and marks the thread object as not | |
174 | /// representing a running thread of execution. | |
175 | future<> join(); | |
176 | /// \brief Voluntarily defer execution of current thread. | |
177 | /// | |
178 | /// Gives other threads/fibers a chance to run on current CPU. | |
179 | /// The current thread will resume execution promptly. | |
180 | static void yield(); | |
181 | /// \brief Checks whether this thread ought to call yield() now. | |
182 | /// | |
183 | /// Useful where we cannot call yield() immediately because we | |
184 | /// Need to take some cleanup action first. | |
185 | static bool should_yield(); | |
186 | ||
9f95a23c TL |
187 | /// \brief Yield if this thread ought to call yield() now. |
188 | /// | |
189 | /// Useful where a code does long running computation and does | |
190 | /// not want to hog cpu for more then its share | |
191 | static void maybe_yield(); | |
192 | ||
11fdf7f2 TL |
193 | static bool running_in_thread() { |
194 | return thread_impl::get() != nullptr; | |
195 | } | |
11fdf7f2 TL |
196 | }; |
197 | ||
198 | template <typename Func> | |
199 | inline | |
200 | thread::thread(thread_attributes attr, Func func) | |
9f95a23c | 201 | : _context(std::make_unique<thread_context>(std::move(attr), std::move(func))) { |
11fdf7f2 TL |
202 | } |
203 | ||
204 | template <typename Func> | |
205 | inline | |
206 | thread::thread(Func func) | |
207 | : thread(thread_attributes(), std::move(func)) { | |
208 | } | |
209 | ||
210 | inline | |
211 | future<> | |
212 | thread::join() { | |
213 | _context->_joined = true; | |
214 | return _context->_done.get_future(); | |
215 | } | |
216 | ||
217 | /// Executes a callable in a seastar thread. | |
218 | /// | |
219 | /// Runs a block of code in a threaded context, | |
220 | /// which allows it to block (using \ref future::get()). The | |
221 | /// result of the callable is returned as a future. | |
222 | /// | |
223 | /// \param attr a \ref thread_attributes instance | |
224 | /// \param func a callable to be executed in a thread | |
225 | /// \param args a parameter pack to be forwarded to \c func. | |
226 | /// \return whatever \c func returns, as a future. | |
227 | /// | |
228 | /// Example: | |
229 | /// \code | |
230 | /// future<int> compute_sum(int a, int b) { | |
231 | /// thread_attributes attr = {}; | |
9f95a23c | 232 | /// attr.sched_group = some_scheduling_group_ptr; |
11fdf7f2 TL |
233 | /// return seastar::async(attr, [a, b] { |
234 | /// // some blocking code: | |
235 | /// sleep(1s).get(); | |
236 | /// return a + b; | |
237 | /// }); | |
238 | /// } | |
239 | /// \endcode | |
240 | template <typename Func, typename... Args> | |
241 | inline | |
20effc67 | 242 | futurize_t<std::invoke_result_t<Func, Args...>> |
f67539c2 | 243 | async(thread_attributes attr, Func&& func, Args&&... args) noexcept { |
20effc67 | 244 | using return_type = std::invoke_result_t<Func, Args...>; |
11fdf7f2 TL |
245 | struct work { |
246 | thread_attributes attr; | |
247 | Func func; | |
248 | std::tuple<Args...> args; | |
1e59de90 TL |
249 | promise<return_type> pr{}; |
250 | thread th{}; | |
11fdf7f2 | 251 | }; |
f67539c2 TL |
252 | |
253 | try { | |
254 | auto wp = std::make_unique<work>(work{std::move(attr), std::forward<Func>(func), std::forward_as_tuple(std::forward<Args>(args)...)}); | |
255 | auto& w = *wp; | |
11fdf7f2 TL |
256 | auto ret = w.pr.get_future(); |
257 | w.th = thread(std::move(w.attr), [&w] { | |
258 | futurize<return_type>::apply(std::move(w.func), std::move(w.args)).forward_to(std::move(w.pr)); | |
259 | }); | |
260 | return w.th.join().then([ret = std::move(ret)] () mutable { | |
261 | return std::move(ret); | |
f67539c2 TL |
262 | }).finally([wp = std::move(wp)] {}); |
263 | } catch (...) { | |
264 | return futurize<return_type>::make_exception_future(std::current_exception()); | |
265 | } | |
11fdf7f2 TL |
266 | } |
267 | ||
268 | /// Executes a callable in a seastar thread. | |
269 | /// | |
270 | /// Runs a block of code in a threaded context, | |
271 | /// which allows it to block (using \ref future::get()). The | |
272 | /// result of the callable is returned as a future. | |
273 | /// | |
274 | /// \param func a callable to be executed in a thread | |
275 | /// \param args a parameter pack to be forwarded to \c func. | |
276 | /// \return whatever \c func returns, as a future. | |
277 | template <typename Func, typename... Args> | |
278 | inline | |
20effc67 | 279 | futurize_t<std::invoke_result_t<Func, Args...>> |
f67539c2 | 280 | async(Func&& func, Args&&... args) noexcept { |
11fdf7f2 TL |
281 | return async(thread_attributes{}, std::forward<Func>(func), std::forward<Args>(args)...); |
282 | } | |
283 | /// @} | |
284 | ||
285 | } |