2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
20 * Copyright (C) 2015 Cloudius Systems, Ltd.
25 #include <seastar/core/thread_impl.hh>
26 #include <seastar/core/future.hh>
27 #include <seastar/core/do_with.hh>
28 #include <seastar/core/future-util.hh>
29 #include <seastar/core/timer.hh>
30 #include <seastar/core/reactor.hh>
31 #include <seastar/core/scheduling.hh>
34 #include <type_traits>
36 #include <seastar/util/std-compat.hh>
38 #include <boost/intrusive/list.hpp>
40 /// \defgroup thread-module Seastar threads
42 /// Seastar threads provide an execution environment where blocking
43 /// is tolerated; you can issue I/O, and wait for it in the same function,
44 /// rather than establishing a callback to be called with \ref future<>::then().
46 /// Seastar threads are not the same as operating system threads:
47 /// - seastar threads are cooperative; they are never preempted except
48 /// at blocking points (see below)
49 /// - seastar threads always run on the same core they were launched on
51 /// Like other seastar code, seastar threads may not issue blocking system calls.
53 /// A seastar thread blocking point is any function that returns a \ref future<>.
54 /// You block by calling \ref future<>::get(); this waits for the future to become
55 /// available, and in the meanwhile, other seastar threads and seastar non-threaded
60 /// seastar::thread th([] {
61 /// sleep(5s).get(); // blocking point
65 /// An easy way to launch a thread and carry out some computation, and return a
66 /// result from this execution is by using the \ref seastar::async() function.
67 /// The result is returned as a future, so that non-threaded code can wait for
68 /// the thread to terminate and yield a result.
70 /// Seastar API namespace
73 /// \addtogroup thread-module
77 class thread_attributes;
78 class thread_scheduling_group;
80 /// Class that holds attributes controlling the behavior of a thread.
81 class thread_attributes {
83 thread_scheduling_group* scheduling_group = nullptr; // FIXME: remove
84 compat::optional<seastar::scheduling_group> sched_group;
89 extern thread_local jmp_buf_link g_unthreaded_context;
91 // Internal class holding thread state. We can't hold this in
92 // \c thread itself because \c thread is movable, and we want pointers
93 // to this state to be captured.
94 class thread_context {
95 struct stack_deleter {
96 void operator()(char *ptr) const noexcept;
98 using stack_holder = std::unique_ptr<char[], stack_deleter>;
99 static constexpr size_t base_stack_size = 128*1024;
101 thread_attributes _attr;
102 #ifdef SEASTAR_THREAD_STACK_GUARDS
103 const size_t _stack_size;
105 static constexpr size_t _stack_size = base_stack_size;
107 stack_holder _stack{make_stack()};
108 std::function<void ()> _func;
109 jmp_buf_link _context;
110 scheduling_group _scheduling_group;
112 bool _joined = false;
113 timer<> _sched_timer{[this] { reschedule(); }};
114 compat::optional<promise<>> _sched_promise;
116 boost::intrusive::list_member_hook<> _preempted_link;
117 using preempted_thread_list = boost::intrusive::list<thread_context,
118 boost::intrusive::member_hook<thread_context, boost::intrusive::list_member_hook<>,
119 &thread_context::_preempted_link>,
120 boost::intrusive::constant_time_size<false>>;
122 boost::intrusive::list_member_hook<> _all_link;
123 using all_thread_list = boost::intrusive::list<thread_context,
124 boost::intrusive::member_hook<thread_context, boost::intrusive::list_member_hook<>,
125 &thread_context::_all_link>,
126 boost::intrusive::constant_time_size<false>>;
128 static thread_local preempted_thread_list _preempted_threads;
129 static thread_local all_thread_list _all_threads;
131 static void s_main(int lo, int hi); // all parameters MUST be 'int' for makecontext
134 stack_holder make_stack();
136 thread_context(thread_attributes attr, std::function<void ()> func);
140 bool should_yield() const;
144 friend void thread_impl::switch_in(thread_context*);
145 friend void thread_impl::switch_out(thread_context*);
146 friend scheduling_group thread_impl::sched_group(const thread_context*);
152 /// \brief thread - stateful thread of execution
154 /// Threads allow using seastar APIs in a blocking manner,
155 /// by calling future::get() on a non-ready future. When
156 /// this happens, the thread is put to sleep until the future
159 std::unique_ptr<thread_context> _context;
160 static thread_local thread* _current;
162 /// \brief Constructs a \c thread object that does not represent a thread
165 /// \brief Constructs a \c thread object that represents a thread of execution
167 /// \param func Callable object to execute in thread. The callable is
168 /// called immediately.
169 template <typename Func>
171 /// \brief Constructs a \c thread object that represents a thread of execution
173 /// \param attr Attributes describing the new thread.
174 /// \param func Callable object to execute in thread. The callable is
175 /// called immediately.
176 template <typename Func>
177 thread(thread_attributes attr, Func func);
178 /// \brief Moves a thread object.
179 thread(thread&& x) noexcept = default;
180 /// \brief Move-assigns a thread object.
181 thread& operator=(thread&& x) noexcept = default;
182 /// \brief Destroys a \c thread object.
184 /// The thread must not represent a running thread of execution (see join()).
185 ~thread() { assert(!_context || _context->_joined); }
186 /// \brief Waits for thread execution to terminate.
188 /// Waits for thread execution to terminate, and marks the thread object as not
189 /// representing a running thread of execution.
191 /// \brief Voluntarily defer execution of current thread.
193 /// Gives other threads/fibers a chance to run on current CPU.
194 /// The current thread will resume execution promptly.
196 /// \brief Checks whether this thread ought to call yield() now.
198 /// Useful where we cannot call yield() immediately because we
199 /// need to take some cleanup action first.
200 static bool should_yield();
202 static bool running_in_thread() {
203 return thread_impl::get() != nullptr;
206 friend class reactor;
207 // To be used by seastar reactor only.
208 static bool try_run_one_yielded_thread();
211 /// An instance of this class can be used to assign a thread to a particular scheduling group.
212 /// Threads can share the same scheduling group if they hold a pointer to the same instance
215 /// All threads that belong to a scheduling group will have a time granularity defined by \c period,
216 /// and can specify a fraction \c usage of that period that indicates the maximum amount of time they
217 /// expect to run. \c usage is expected to be a number between 0 and 1 for this to have any effect.
218 /// Numbers greater than 1 are allowed for simplicity, but they have the same meaning as 1, i.e.
219 /// "the whole period".
221 /// Note that this is not a preemptive runtime, and a thread will not exit the CPU unless it is scheduled out.
222 /// In that case, \c usage will not be enforced and the thread will simply run until it loses the CPU.
223 /// This can happen when a thread waits on a future that is not ready, or when it voluntarily calls yield().
225 /// Unlike a thread that is not part of a scheduling group, which puts itself at the back
226 /// of the runqueue every time it yields, a thread that is part of a scheduling group will only yield if
227 /// it has exhausted its \c usage at the call to yield. Therefore, threads in a scheduling group can and
228 /// should yield often.
230 /// After those events, if the thread has already run for more than its fraction, it will be scheduled to
231 /// run again only after \c period completes, unless there are no other tasks to run (the system is
233 class thread_scheduling_group {
234 std::chrono::nanoseconds _period;
235 std::chrono::nanoseconds _quota;
236 std::chrono::time_point<thread_clock> _this_period_ends = {};
237 std::chrono::time_point<thread_clock> _this_run_start = {};
238 std::chrono::nanoseconds _this_period_remain = {};
240 /// \brief Constructs a \c thread_scheduling_group object
242 /// \param period a duration representing the period
243 /// \param usage which fraction of the \c period to assign for the scheduling group. Expected between 0 and 1.
244 thread_scheduling_group(std::chrono::nanoseconds period, float usage);
245 /// \brief changes the current maximum usage per period
247 /// \param new_usage The new fraction of the \c period (expected to be between 0 and 1) during which to run
248 void update_usage(float new_usage) {
249 _quota = std::chrono::duration_cast<std::chrono::nanoseconds>(new_usage * _period);
252 void account_start();
254 compat::optional<thread_clock::time_point> next_scheduling_point() const;
255 friend class thread_context;
258 template <typename Func>
260 thread::thread(thread_attributes attr, Func func)
261 : _context(std::make_unique<thread_context>(std::move(attr), func)) {
264 template <typename Func>
266 thread::thread(Func func)
267 : thread(thread_attributes(), std::move(func)) {
273 _context->_joined = true;
274 return _context->_done.get_future();
277 /// Executes a callable in a seastar thread.
279 /// Runs a block of code in a threaded context,
280 /// which allows it to block (using \ref future::get()). The
281 /// result of the callable is returned as a future.
283 /// \param attr a \ref thread_attributes instance
284 /// \param func a callable to be executed in a thread
285 /// \param args a parameter pack to be forwarded to \c func.
286 /// \return whatever \c func returns, as a future.
290 /// future<int> compute_sum(int a, int b) {
291 /// thread_attributes attr = {};
292 /// attr.scheduling_group = some_scheduling_group_ptr;
293 /// return seastar::async(attr, [a, b] {
294 /// // some blocking code:
300 template <typename Func, typename... Args>
302 futurize_t<std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>>
303 async(thread_attributes attr, Func&& func, Args&&... args) {
304 using return_type = std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>;
306 thread_attributes attr;
308 std::tuple<Args...> args;
309 promise<return_type> pr;
312 return do_with(work{std::move(attr), std::forward<Func>(func), std::forward_as_tuple(std::forward<Args>(args)...)}, [] (work& w) mutable {
313 auto ret = w.pr.get_future();
314 w.th = thread(std::move(w.attr), [&w] {
315 futurize<return_type>::apply(std::move(w.func), std::move(w.args)).forward_to(std::move(w.pr));
317 return w.th.join().then([ret = std::move(ret)] () mutable {
318 return std::move(ret);
323 /// Executes a callable in a seastar thread.
325 /// Runs a block of code in a threaded context,
326 /// which allows it to block (using \ref future::get()). The
327 /// result of the callable is returned as a future.
329 /// \param func a callable to be executed in a thread
330 /// \param args a parameter pack to be forwarded to \c func.
331 /// \return whatever \c func returns, as a future.
332 template <typename Func, typename... Args>
334 futurize_t<std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>>
335 async(Func&& func, Args&&... args) {
336 return async(thread_attributes{}, std::forward<Func>(func), std::forward<Args>(args)...);