2 // Copyright Oliver Kowalke 2013.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 #include "boost/fiber/scheduler.hpp"
12 #include <boost/assert.hpp>
14 #include "boost/fiber/algo/round_robin.hpp"
15 #include "boost/fiber/context.hpp"
16 #include "boost/fiber/exceptions.hpp"
18 #ifdef BOOST_HAS_ABI_HEADERS
19 # include BOOST_ABI_PREFIX
26 scheduler::release_terminated_() noexcept
{
27 while ( ! terminated_queue_
.empty() ) {
28 context
* ctx
= & terminated_queue_
.front();
29 terminated_queue_
.pop_front();
30 BOOST_ASSERT( ctx
->is_context( type::worker_context
) );
31 BOOST_ASSERT( ! ctx
->is_context( type::pinned_context
) );
32 BOOST_ASSERT( this == ctx
->get_scheduler() );
33 BOOST_ASSERT( ctx
->is_resumable() );
34 BOOST_ASSERT( ! ctx
->worker_is_linked() );
35 BOOST_ASSERT( ! ctx
->ready_is_linked() );
36 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
37 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
39 BOOST_ASSERT( ! ctx
->sleep_is_linked() );
40 BOOST_ASSERT( ! ctx
->wait_is_linked() );
41 BOOST_ASSERT( ctx
->wait_queue_
.empty() );
42 BOOST_ASSERT( ctx
->terminated_
);
43 // if last reference, e.g. fiber::join() or fiber::detach()
44 // have been already called, this will call ~context(),
45 // the context is automatically removeid from worker-queue
46 intrusive_ptr_release( ctx
);
50 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
52 scheduler::remote_ready2ready_() noexcept
{
53 remote_ready_queue_type tmp
;
54 detail::spinlock_lock lk
{ remote_ready_splk_
};
55 remote_ready_queue_
.swap( tmp
);
57 // get context from remote ready-queue
58 while ( ! tmp
.empty() ) {
59 context
* ctx
= & tmp
.front();
61 // ctx was signaled from remote (other thread)
62 // ctx might have been already resumed because of
63 // its wait-op. has been already timed out and
64 // thus it was already pushed to the ready-queue
65 if ( ! ctx
->ready_is_linked() ) {
66 // store context in local queues
74 scheduler::sleep2ready_() noexcept
{
75 // move context which the deadline has reached
77 // sleep-queue is sorted (ascending)
78 std::chrono::steady_clock::time_point now
= std::chrono::steady_clock::now();
79 sleep_queue_type::iterator e
= sleep_queue_
.end();
80 for ( sleep_queue_type::iterator i
= sleep_queue_
.begin(); i
!= e
;) {
81 context
* ctx
= & ( * i
);
82 // dipatcher context must never be pushed to sleep-queue
83 BOOST_ASSERT( ! ctx
->is_context( type::dispatcher_context
) );
84 BOOST_ASSERT( main_ctx_
== ctx
|| ctx
->worker_is_linked() );
85 BOOST_ASSERT( ! ctx
->ready_is_linked() );
86 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
87 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
89 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
90 // set fiber to state_ready if deadline was reached
91 if ( ctx
->tp_
<= now
) {
92 // remove context from sleep-queue
93 i
= sleep_queue_
.erase( i
);
95 ctx
->tp_
= (std::chrono::steady_clock::time_point::max
)();
96 std::intptr_t prev
= ctx
->twstatus
.exchange( -1);
97 if ( static_cast< std::intptr_t >( -1) == prev
) {
98 // timed-wait op.: timeout after notify
101 // prev == 0: no timed-wait op.
102 // prev == <any>: timed-wait op., timeout before notify
103 // push new context to ready-queue
104 algo_
->awakened( ctx
);
106 break; // first context with now < deadline
111 scheduler::scheduler() noexcept
:
112 algo_
{ new algo::round_robin() } {
115 scheduler::~scheduler() {
116 BOOST_ASSERT( nullptr != main_ctx_
);
117 BOOST_ASSERT( nullptr != dispatcher_ctx_
.get() );
118 BOOST_ASSERT( context::active() == main_ctx_
);
119 // signal dispatcher-context termination
121 // resume pending fibers
122 // by joining dispatcher-context
123 dispatcher_ctx_
->join();
124 // no context' in worker-queue
125 BOOST_ASSERT( worker_queue_
.empty() );
126 BOOST_ASSERT( terminated_queue_
.empty() );
127 BOOST_ASSERT( sleep_queue_
.empty() );
128 // set active context to nullptr
129 context::reset_active();
130 // deallocate dispatcher-context
131 BOOST_ASSERT( ! dispatcher_ctx_
->ready_is_linked() );
132 dispatcher_ctx_
.reset();
133 // set main-context to nullptr
137 boost::context::continuation
138 scheduler::dispatch() noexcept
{
139 BOOST_ASSERT( context::active() == dispatcher_ctx_
);
142 // notify sched-algorithm about termination
144 if ( worker_queue_
.empty() ) {
148 // release terminated context'
149 release_terminated_();
150 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
151 // get context' from remote ready-queue
152 remote_ready2ready_();
154 // get sleeping context'
156 // get next ready context
157 context
* ctx
= algo_
->pick_next();
158 if ( nullptr != ctx
) {
159 BOOST_ASSERT( ctx
->is_resumable() );
160 BOOST_ASSERT( ! ctx
->ready_is_linked() );
161 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
162 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
164 BOOST_ASSERT( ! ctx
->sleep_is_linked() );
165 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
166 // no test for '! ctx->wait_is_linked()' because
167 // context is registered in wait-queue of sync. primitives
168 // via wait_for()/wait_until()
169 // push dispatcher-context to ready-queue
170 // so that ready-queue never becomes empty
171 ctx
->resume( dispatcher_ctx_
.get() );
172 BOOST_ASSERT( context::active() == dispatcher_ctx_
.get() );
174 // no ready context, wait till signaled
175 // set deadline to highest value
176 std::chrono::steady_clock::time_point suspend_time
=
177 (std::chrono::steady_clock::time_point::max
)();
178 // get lowest deadline from sleep-queue
179 sleep_queue_type::iterator i
= sleep_queue_
.begin();
180 if ( sleep_queue_
.end() != i
) {
181 suspend_time
= i
->tp_
;
183 // no ready context, wait till signaled
184 algo_
->suspend_until( suspend_time
);
187 // release termianted context'
188 release_terminated_();
189 // return to main-context
190 return main_ctx_
->suspend_with_cc();
194 scheduler::schedule( context
* ctx
) noexcept
{
195 BOOST_ASSERT( nullptr != ctx
);
196 BOOST_ASSERT( ! ctx
->ready_is_linked() );
197 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
198 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
200 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
201 BOOST_ASSERT( ! ctx
->wait_is_linked() );
202 // remove context ctx from sleep-queue
203 // (might happen if blocked in timed_mutex::try_lock_until())
204 if ( ctx
->sleep_is_linked() ) {
205 // unlink it from sleep-queue
208 // push new context to ready-queue
209 algo_
->awakened( ctx
);
212 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
214 scheduler::schedule_from_remote( context
* ctx
) noexcept
{
215 BOOST_ASSERT( nullptr != ctx
);
216 // another thread might signal the main-context of this thread
217 BOOST_ASSERT( ! ctx
->is_context( type::dispatcher_context
) );
218 BOOST_ASSERT( this == ctx
->get_scheduler() );
219 BOOST_ASSERT( ! ctx
->ready_is_linked() );
220 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
221 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
222 BOOST_ASSERT( ! ctx
->wait_is_linked() );
223 // protect for concurrent access
224 detail::spinlock_lock lk
{ remote_ready_splk_
};
225 BOOST_ASSERT( ! shutdown_
);
226 BOOST_ASSERT( nullptr != main_ctx_
);
227 BOOST_ASSERT( nullptr != dispatcher_ctx_
.get() );
228 // push new context to remote ready-queue
229 ctx
->remote_ready_link( remote_ready_queue_
);
235 boost::context::continuation
236 scheduler::terminate( detail::spinlock_lock
& lk
, context
* ctx
) noexcept
{
237 BOOST_ASSERT( nullptr != ctx
);
238 BOOST_ASSERT( context::active() == ctx
);
239 BOOST_ASSERT( this == ctx
->get_scheduler() );
240 BOOST_ASSERT( ctx
->is_context( type::worker_context
) );
241 BOOST_ASSERT( ! ctx
->is_context( type::pinned_context
) );
242 BOOST_ASSERT( ! ctx
->ready_is_linked() );
243 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
244 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
246 BOOST_ASSERT( ! ctx
->sleep_is_linked() );
247 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
248 BOOST_ASSERT( ! ctx
->wait_is_linked() );
249 BOOST_ASSERT( ctx
->wait_queue_
.empty() );
250 // store the terminated fiber in the terminated-queue
251 // the dispatcher-context will call
252 ctx
->terminated_link( terminated_queue_
);
253 // remove from the worker-queue
254 ctx
->worker_unlink();
257 // resume another fiber
258 return algo_
->pick_next()->suspend_with_cc();
262 scheduler::yield( context
* ctx
) noexcept
{
263 BOOST_ASSERT( nullptr != ctx
);
264 BOOST_ASSERT( context::active() == ctx
);
265 BOOST_ASSERT( ctx
->is_context( type::worker_context
) || ctx
->is_context( type::main_context
) );
266 BOOST_ASSERT( ! ctx
->ready_is_linked() );
267 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
268 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
270 BOOST_ASSERT( ! ctx
->sleep_is_linked() );
271 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
272 BOOST_ASSERT( ! ctx
->wait_is_linked() );
273 // resume another fiber
274 algo_
->pick_next()->resume( ctx
);
278 scheduler::wait_until( context
* ctx
,
279 std::chrono::steady_clock::time_point
const& sleep_tp
) noexcept
{
280 BOOST_ASSERT( nullptr != ctx
);
281 BOOST_ASSERT( context::active() == ctx
);
282 BOOST_ASSERT( ctx
->is_context( type::worker_context
) || ctx
->is_context( type::main_context
) );
283 BOOST_ASSERT( ! ctx
->ready_is_linked() );
284 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
285 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
287 BOOST_ASSERT( ! ctx
->sleep_is_linked() );
288 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
289 BOOST_ASSERT( ! ctx
->wait_is_linked() );
291 ctx
->sleep_link( sleep_queue_
);
292 // resume another context
293 algo_
->pick_next()->resume();
294 // context has been resumed
295 // check if deadline has reached
296 return std::chrono::steady_clock::now() < sleep_tp
;
300 scheduler::wait_until( context
* ctx
,
301 std::chrono::steady_clock::time_point
const& sleep_tp
,
302 detail::spinlock_lock
& lk
) noexcept
{
303 BOOST_ASSERT( nullptr != ctx
);
304 BOOST_ASSERT( context::active() == ctx
);
305 BOOST_ASSERT( ctx
->is_context( type::worker_context
) || ctx
->is_context( type::main_context
) );
306 BOOST_ASSERT( ! ctx
->ready_is_linked() );
307 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
308 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
310 BOOST_ASSERT( ! ctx
->sleep_is_linked() );
311 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
312 // ctx->wait_is_linked() might return true
313 // if context was locked inside timed_mutex::try_lock_until()
314 // push active context to sleep-queue
316 ctx
->sleep_link( sleep_queue_
);
317 // resume another context
318 algo_
->pick_next()->resume( lk
);
319 // context has been resumed
320 // check if deadline has reached
321 return std::chrono::steady_clock::now() < sleep_tp
;
325 scheduler::suspend() noexcept
{
326 // resume another context
327 algo_
->pick_next()->resume();
331 scheduler::suspend( detail::spinlock_lock
& lk
) noexcept
{
332 // resume another context
333 algo_
->pick_next()->resume( lk
);
337 scheduler::has_ready_fibers() const noexcept
{
338 return algo_
->has_ready_fibers();
342 scheduler::set_algo( algo::algorithm::ptr_t algo
) noexcept
{
343 // move remaining cotnext in current scheduler to new one
344 while ( algo_
->has_ready_fibers() ) {
345 algo
->awakened( algo_
->pick_next() );
347 algo_
= std::move( algo
);
351 scheduler::attach_main_context( context
* ctx
) noexcept
{
352 BOOST_ASSERT( nullptr != ctx
);
353 // main-context represents the execution context created
354 // by the system, e.g. main()- or thread-context
355 // should not be in worker-queue
357 main_ctx_
->scheduler_
= this;
361 scheduler::attach_dispatcher_context( intrusive_ptr
< context
> ctx
) noexcept
{
363 // dispatcher context has to handle
364 // - remote ready context'
365 // - sleeping context'
366 // - extern event-loops
367 // - suspending the thread if ready-queue is empty (waiting on external event)
368 // should not be in worker-queue
369 dispatcher_ctx_
.swap( ctx
);
370 // add dispatcher-context to ready-queue
371 // so it is the first element in the ready-queue
372 // if the main context tries to suspend the first time
373 // the dispatcher-context is resumed and
374 // scheduler::dispatch() is executed
375 dispatcher_ctx_
->scheduler_
= this;
376 algo_
->awakened( dispatcher_ctx_
.get() );
380 scheduler::attach_worker_context( context
* ctx
) noexcept
{
381 BOOST_ASSERT( nullptr != ctx
);
382 BOOST_ASSERT( nullptr == ctx
->get_scheduler() );
383 BOOST_ASSERT( ! ctx
->ready_is_linked() );
384 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
385 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
387 BOOST_ASSERT( ! ctx
->sleep_is_linked() );
388 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
389 BOOST_ASSERT( ! ctx
->wait_is_linked() );
390 BOOST_ASSERT( ! ctx
->worker_is_linked() );
391 ctx
->worker_link( worker_queue_
);
392 ctx
->scheduler_
= this;
393 // an attached context must belong at least to worker-queue
397 scheduler::detach_worker_context( context
* ctx
) noexcept
{
398 BOOST_ASSERT( nullptr != ctx
);
399 BOOST_ASSERT( ! ctx
->ready_is_linked() );
400 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
401 BOOST_ASSERT( ! ctx
->remote_ready_is_linked() );
403 BOOST_ASSERT( ! ctx
->sleep_is_linked() );
404 BOOST_ASSERT( ! ctx
->terminated_is_linked() );
405 BOOST_ASSERT( ! ctx
->wait_is_linked() );
406 BOOST_ASSERT( ctx
->worker_is_linked() );
407 BOOST_ASSERT( ! ctx
->is_context( type::pinned_context
) );
408 ctx
->worker_unlink();
409 BOOST_ASSERT( ! ctx
->worker_is_linked() );
410 ctx
->scheduler_
= nullptr;
411 // a detached context must not belong to any queue
416 #ifdef BOOST_HAS_ABI_HEADERS
417 # include BOOST_ABI_SUFFIX