]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | // Copyright Oliver Kowalke 2013. | |
3 | // Distributed under the Boost Software License, Version 1.0. | |
4 | // (See accompanying file LICENSE_1_0.txt or copy at | |
5 | // http://www.boost.org/LICENSE_1_0.txt) | |
6 | ||
7 | #include "boost/fiber/scheduler.hpp" | |
8 | ||
9 | #include <chrono> | |
10 | #include <mutex> | |
11 | ||
12 | #include <boost/assert.hpp> | |
13 | ||
14 | #include "boost/fiber/algo/round_robin.hpp" | |
15 | #include "boost/fiber/context.hpp" | |
16 | #include "boost/fiber/exceptions.hpp" | |
17 | ||
18 | #ifdef BOOST_HAS_ABI_HEADERS | |
19 | # include BOOST_ABI_PREFIX | |
20 | #endif | |
21 | ||
22 | namespace boost { | |
23 | namespace fibers { | |
24 | ||
7c673cae FG |
25 | void |
26 | scheduler::release_terminated_() noexcept { | |
b32b8144 FG |
27 | while ( ! terminated_queue_.empty() ) { |
28 | context * ctx = & terminated_queue_.front(); | |
29 | terminated_queue_.pop_front(); | |
30 | BOOST_ASSERT( ctx->is_context( type::worker_context) ); | |
31 | BOOST_ASSERT( ! ctx->is_context( type::pinned_context) ); | |
32 | BOOST_ASSERT( this == ctx->get_scheduler() ); | |
33 | BOOST_ASSERT( ctx->is_resumable() ); | |
34 | BOOST_ASSERT( ! ctx->worker_is_linked() ); | |
7c673cae | 35 | BOOST_ASSERT( ! ctx->ready_is_linked() ); |
b32b8144 FG |
36 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) |
37 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
38 | #endif | |
7c673cae | 39 | BOOST_ASSERT( ! ctx->sleep_is_linked() ); |
b32b8144 FG |
40 | BOOST_ASSERT( ! ctx->wait_is_linked() ); |
41 | BOOST_ASSERT( ctx->wait_queue_.empty() ); | |
42 | BOOST_ASSERT( ctx->terminated_); | |
7c673cae FG |
43 | // if last reference, e.g. fiber::join() or fiber::detach() |
44 | // have been already called, this will call ~context(), | |
45 | // the context is automatically removeid from worker-queue | |
46 | intrusive_ptr_release( ctx); | |
47 | } | |
48 | } | |
49 | ||
b32b8144 | 50 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) |
7c673cae FG |
51 | void |
52 | scheduler::remote_ready2ready_() noexcept { | |
b32b8144 FG |
53 | remote_ready_queue_type tmp; |
54 | detail::spinlock_lock lk{ remote_ready_splk_ }; | |
55 | remote_ready_queue_.swap( tmp); | |
56 | lk.unlock(); | |
7c673cae | 57 | // get context from remote ready-queue |
b32b8144 FG |
58 | while ( ! tmp.empty() ) { |
59 | context * ctx = & tmp.front(); | |
60 | tmp.pop_front(); | |
61 | // ctx was signaled from remote (other thread) | |
62 | // ctx might have been already resumed because of | |
63 | // its wait-op. has been already timed out and | |
64 | // thus it was already pushed to the ready-queue | |
65 | if ( ! ctx->ready_is_linked() ) { | |
66 | // store context in local queues | |
67 | schedule( ctx); | |
68 | } | |
7c673cae | 69 | } |
7c673cae | 70 | } |
b32b8144 | 71 | #endif |
7c673cae FG |
72 | |
73 | void | |
74 | scheduler::sleep2ready_() noexcept { | |
75 | // move context which the deadline has reached | |
76 | // to ready-queue | |
77 | // sleep-queue is sorted (ascending) | |
b32b8144 FG |
78 | std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); |
79 | sleep_queue_type::iterator e = sleep_queue_.end(); | |
80 | for ( sleep_queue_type::iterator i = sleep_queue_.begin(); i != e;) { | |
7c673cae | 81 | context * ctx = & ( * i); |
b32b8144 | 82 | // dipatcher context must never be pushed to sleep-queue |
7c673cae | 83 | BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) ); |
b32b8144 | 84 | BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() ); |
7c673cae | 85 | BOOST_ASSERT( ! ctx->ready_is_linked() ); |
b32b8144 FG |
86 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) |
87 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
88 | #endif | |
89 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
7c673cae FG |
90 | // set fiber to state_ready if deadline was reached |
91 | if ( ctx->tp_ <= now) { | |
92 | // remove context from sleep-queue | |
93 | i = sleep_queue_.erase( i); | |
94 | // reset sleep-tp | |
95 | ctx->tp_ = (std::chrono::steady_clock::time_point::max)(); | |
11fdf7f2 | 96 | std::intptr_t prev = ctx->twstatus.exchange( -2); |
b32b8144 FG |
97 | if ( static_cast< std::intptr_t >( -1) == prev) { |
98 | // timed-wait op.: timeout after notify | |
99 | continue; | |
100 | } | |
101 | // prev == 0: no timed-wait op. | |
102 | // prev == <any>: timed-wait op., timeout before notify | |
11fdf7f2 TL |
103 | // store context in local queues |
104 | schedule( ctx); | |
7c673cae FG |
105 | } else { |
106 | break; // first context with now < deadline | |
107 | } | |
108 | } | |
109 | } | |
110 | ||
111 | scheduler::scheduler() noexcept : | |
112 | algo_{ new algo::round_robin() } { | |
113 | } | |
114 | ||
115 | scheduler::~scheduler() { | |
116 | BOOST_ASSERT( nullptr != main_ctx_); | |
117 | BOOST_ASSERT( nullptr != dispatcher_ctx_.get() ); | |
118 | BOOST_ASSERT( context::active() == main_ctx_); | |
119 | // signal dispatcher-context termination | |
120 | shutdown_ = true; | |
121 | // resume pending fibers | |
122 | // by joining dispatcher-context | |
123 | dispatcher_ctx_->join(); | |
124 | // no context' in worker-queue | |
125 | BOOST_ASSERT( worker_queue_.empty() ); | |
126 | BOOST_ASSERT( terminated_queue_.empty() ); | |
7c673cae FG |
127 | BOOST_ASSERT( sleep_queue_.empty() ); |
128 | // set active context to nullptr | |
129 | context::reset_active(); | |
130 | // deallocate dispatcher-context | |
131 | BOOST_ASSERT( ! dispatcher_ctx_->ready_is_linked() ); | |
132 | dispatcher_ctx_.reset(); | |
133 | // set main-context to nullptr | |
134 | main_ctx_ = nullptr; | |
135 | } | |
136 | ||
11fdf7f2 | 137 | boost::context::fiber |
7c673cae | 138 | scheduler::dispatch() noexcept { |
7c673cae FG |
139 | BOOST_ASSERT( context::active() == dispatcher_ctx_); |
140 | for (;;) { | |
7c673cae FG |
141 | if ( shutdown_) { |
142 | // notify sched-algorithm about termination | |
143 | algo_->notify(); | |
b32b8144 | 144 | if ( worker_queue_.empty() ) { |
7c673cae FG |
145 | break; |
146 | } | |
147 | } | |
148 | // release terminated context' | |
149 | release_terminated_(); | |
b32b8144 | 150 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) |
7c673cae FG |
151 | // get context' from remote ready-queue |
152 | remote_ready2ready_(); | |
b32b8144 | 153 | #endif |
7c673cae | 154 | // get sleeping context' |
11fdf7f2 | 155 | // must be called after remote_ready2ready_() |
7c673cae FG |
156 | sleep2ready_(); |
157 | // get next ready context | |
b32b8144 | 158 | context * ctx = algo_->pick_next(); |
7c673cae | 159 | if ( nullptr != ctx) { |
b32b8144 FG |
160 | BOOST_ASSERT( ctx->is_resumable() ); |
161 | BOOST_ASSERT( ! ctx->ready_is_linked() ); | |
162 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) | |
163 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
164 | #endif | |
165 | BOOST_ASSERT( ! ctx->sleep_is_linked() ); | |
166 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
167 | // no test for '! ctx->wait_is_linked()' because | |
168 | // context is registered in wait-queue of sync. primitives | |
169 | // via wait_for()/wait_until() | |
7c673cae FG |
170 | // push dispatcher-context to ready-queue |
171 | // so that ready-queue never becomes empty | |
172 | ctx->resume( dispatcher_ctx_.get() ); | |
173 | BOOST_ASSERT( context::active() == dispatcher_ctx_.get() ); | |
174 | } else { | |
175 | // no ready context, wait till signaled | |
176 | // set deadline to highest value | |
177 | std::chrono::steady_clock::time_point suspend_time = | |
178 | (std::chrono::steady_clock::time_point::max)(); | |
179 | // get lowest deadline from sleep-queue | |
b32b8144 | 180 | sleep_queue_type::iterator i = sleep_queue_.begin(); |
7c673cae FG |
181 | if ( sleep_queue_.end() != i) { |
182 | suspend_time = i->tp_; | |
183 | } | |
184 | // no ready context, wait till signaled | |
185 | algo_->suspend_until( suspend_time); | |
186 | } | |
187 | } | |
188 | // release termianted context' | |
189 | release_terminated_(); | |
190 | // return to main-context | |
7c673cae | 191 | return main_ctx_->suspend_with_cc(); |
7c673cae FG |
192 | } |
193 | ||
194 | void | |
b32b8144 | 195 | scheduler::schedule( context * ctx) noexcept { |
7c673cae | 196 | BOOST_ASSERT( nullptr != ctx); |
b32b8144 FG |
197 | BOOST_ASSERT( ! ctx->ready_is_linked() ); |
198 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) | |
199 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
200 | #endif | |
201 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
7c673cae FG |
202 | // remove context ctx from sleep-queue |
203 | // (might happen if blocked in timed_mutex::try_lock_until()) | |
204 | if ( ctx->sleep_is_linked() ) { | |
205 | // unlink it from sleep-queue | |
206 | ctx->sleep_unlink(); | |
207 | } | |
7c673cae FG |
208 | // push new context to ready-queue |
209 | algo_->awakened( ctx); | |
210 | } | |
211 | ||
b32b8144 | 212 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) |
7c673cae | 213 | void |
b32b8144 | 214 | scheduler::schedule_from_remote( context * ctx) noexcept { |
7c673cae | 215 | BOOST_ASSERT( nullptr != ctx); |
b32b8144 | 216 | // another thread might signal the main-context of this thread |
7c673cae FG |
217 | BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) ); |
218 | BOOST_ASSERT( this == ctx->get_scheduler() ); | |
b32b8144 FG |
219 | BOOST_ASSERT( ! ctx->ready_is_linked() ); |
220 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
221 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
222 | BOOST_ASSERT( ! ctx->wait_is_linked() ); | |
7c673cae | 223 | // protect for concurrent access |
b32b8144 FG |
224 | detail::spinlock_lock lk{ remote_ready_splk_ }; |
225 | BOOST_ASSERT( ! shutdown_); | |
226 | BOOST_ASSERT( nullptr != main_ctx_); | |
227 | BOOST_ASSERT( nullptr != dispatcher_ctx_.get() ); | |
7c673cae | 228 | // push new context to remote ready-queue |
b32b8144 | 229 | ctx->remote_ready_link( remote_ready_queue_); |
92f5a8d4 | 230 | lk.unlock(); |
7c673cae FG |
231 | // notify scheduler |
232 | algo_->notify(); | |
233 | } | |
b32b8144 | 234 | #endif |
7c673cae | 235 | |
11fdf7f2 | 236 | boost::context::fiber |
b32b8144 FG |
237 | scheduler::terminate( detail::spinlock_lock & lk, context * ctx) noexcept { |
238 | BOOST_ASSERT( nullptr != ctx); | |
239 | BOOST_ASSERT( context::active() == ctx); | |
240 | BOOST_ASSERT( this == ctx->get_scheduler() ); | |
241 | BOOST_ASSERT( ctx->is_context( type::worker_context) ); | |
242 | BOOST_ASSERT( ! ctx->is_context( type::pinned_context) ); | |
243 | BOOST_ASSERT( ! ctx->ready_is_linked() ); | |
244 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) | |
245 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
7c673cae | 246 | #endif |
b32b8144 FG |
247 | BOOST_ASSERT( ! ctx->sleep_is_linked() ); |
248 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
249 | BOOST_ASSERT( ! ctx->wait_is_linked() ); | |
250 | BOOST_ASSERT( ctx->wait_queue_.empty() ); | |
7c673cae | 251 | // store the terminated fiber in the terminated-queue |
b32b8144 FG |
252 | // the dispatcher-context will call |
253 | ctx->terminated_link( terminated_queue_); | |
254 | // remove from the worker-queue | |
255 | ctx->worker_unlink(); | |
256 | // release lock | |
257 | lk.unlock(); | |
7c673cae | 258 | // resume another fiber |
b32b8144 | 259 | return algo_->pick_next()->suspend_with_cc(); |
7c673cae FG |
260 | } |
261 | ||
262 | void | |
b32b8144 FG |
263 | scheduler::yield( context * ctx) noexcept { |
264 | BOOST_ASSERT( nullptr != ctx); | |
265 | BOOST_ASSERT( context::active() == ctx); | |
266 | BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) ); | |
267 | BOOST_ASSERT( ! ctx->ready_is_linked() ); | |
268 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) | |
269 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
270 | #endif | |
271 | BOOST_ASSERT( ! ctx->sleep_is_linked() ); | |
272 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
273 | BOOST_ASSERT( ! ctx->wait_is_linked() ); | |
7c673cae | 274 | // resume another fiber |
b32b8144 | 275 | algo_->pick_next()->resume( ctx); |
7c673cae FG |
276 | } |
277 | ||
278 | bool | |
b32b8144 | 279 | scheduler::wait_until( context * ctx, |
7c673cae | 280 | std::chrono::steady_clock::time_point const& sleep_tp) noexcept { |
b32b8144 FG |
281 | BOOST_ASSERT( nullptr != ctx); |
282 | BOOST_ASSERT( context::active() == ctx); | |
283 | BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) ); | |
284 | BOOST_ASSERT( ! ctx->ready_is_linked() ); | |
285 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) | |
286 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
287 | #endif | |
288 | BOOST_ASSERT( ! ctx->sleep_is_linked() ); | |
289 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
290 | BOOST_ASSERT( ! ctx->wait_is_linked() ); | |
291 | ctx->tp_ = sleep_tp; | |
292 | ctx->sleep_link( sleep_queue_); | |
7c673cae | 293 | // resume another context |
b32b8144 | 294 | algo_->pick_next()->resume(); |
7c673cae FG |
295 | // context has been resumed |
296 | // check if deadline has reached | |
297 | return std::chrono::steady_clock::now() < sleep_tp; | |
298 | } | |
299 | ||
300 | bool | |
b32b8144 | 301 | scheduler::wait_until( context * ctx, |
7c673cae FG |
302 | std::chrono::steady_clock::time_point const& sleep_tp, |
303 | detail::spinlock_lock & lk) noexcept { | |
b32b8144 FG |
304 | BOOST_ASSERT( nullptr != ctx); |
305 | BOOST_ASSERT( context::active() == ctx); | |
306 | BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) ); | |
307 | BOOST_ASSERT( ! ctx->ready_is_linked() ); | |
308 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) | |
309 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
310 | #endif | |
311 | BOOST_ASSERT( ! ctx->sleep_is_linked() ); | |
312 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
313 | // ctx->wait_is_linked() might return true | |
7c673cae | 314 | // if context was locked inside timed_mutex::try_lock_until() |
7c673cae | 315 | // push active context to sleep-queue |
b32b8144 FG |
316 | ctx->tp_ = sleep_tp; |
317 | ctx->sleep_link( sleep_queue_); | |
7c673cae | 318 | // resume another context |
b32b8144 | 319 | algo_->pick_next()->resume( lk); |
7c673cae FG |
320 | // context has been resumed |
321 | // check if deadline has reached | |
322 | return std::chrono::steady_clock::now() < sleep_tp; | |
323 | } | |
324 | ||
325 | void | |
326 | scheduler::suspend() noexcept { | |
327 | // resume another context | |
b32b8144 | 328 | algo_->pick_next()->resume(); |
7c673cae FG |
329 | } |
330 | ||
331 | void | |
332 | scheduler::suspend( detail::spinlock_lock & lk) noexcept { | |
333 | // resume another context | |
b32b8144 | 334 | algo_->pick_next()->resume( lk); |
7c673cae FG |
335 | } |
336 | ||
337 | bool | |
338 | scheduler::has_ready_fibers() const noexcept { | |
339 | return algo_->has_ready_fibers(); | |
340 | } | |
341 | ||
342 | void | |
b32b8144 | 343 | scheduler::set_algo( algo::algorithm::ptr_t algo) noexcept { |
7c673cae FG |
344 | // move remaining cotnext in current scheduler to new one |
345 | while ( algo_->has_ready_fibers() ) { | |
346 | algo->awakened( algo_->pick_next() ); | |
347 | } | |
348 | algo_ = std::move( algo); | |
349 | } | |
350 | ||
351 | void | |
b32b8144 FG |
352 | scheduler::attach_main_context( context * ctx) noexcept { |
353 | BOOST_ASSERT( nullptr != ctx); | |
7c673cae FG |
354 | // main-context represents the execution context created |
355 | // by the system, e.g. main()- or thread-context | |
356 | // should not be in worker-queue | |
b32b8144 | 357 | main_ctx_ = ctx; |
7c673cae FG |
358 | main_ctx_->scheduler_ = this; |
359 | } | |
360 | ||
361 | void | |
b32b8144 FG |
362 | scheduler::attach_dispatcher_context( intrusive_ptr< context > ctx) noexcept { |
363 | BOOST_ASSERT( ctx); | |
7c673cae FG |
364 | // dispatcher context has to handle |
365 | // - remote ready context' | |
366 | // - sleeping context' | |
367 | // - extern event-loops | |
368 | // - suspending the thread if ready-queue is empty (waiting on external event) | |
369 | // should not be in worker-queue | |
b32b8144 | 370 | dispatcher_ctx_.swap( ctx); |
7c673cae FG |
371 | // add dispatcher-context to ready-queue |
372 | // so it is the first element in the ready-queue | |
373 | // if the main context tries to suspend the first time | |
374 | // the dispatcher-context is resumed and | |
375 | // scheduler::dispatch() is executed | |
376 | dispatcher_ctx_->scheduler_ = this; | |
377 | algo_->awakened( dispatcher_ctx_.get() ); | |
378 | } | |
379 | ||
380 | void | |
381 | scheduler::attach_worker_context( context * ctx) noexcept { | |
382 | BOOST_ASSERT( nullptr != ctx); | |
b32b8144 | 383 | BOOST_ASSERT( nullptr == ctx->get_scheduler() ); |
7c673cae | 384 | BOOST_ASSERT( ! ctx->ready_is_linked() ); |
b32b8144 FG |
385 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) |
386 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
387 | #endif | |
7c673cae FG |
388 | BOOST_ASSERT( ! ctx->sleep_is_linked() ); |
389 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
390 | BOOST_ASSERT( ! ctx->wait_is_linked() ); | |
391 | BOOST_ASSERT( ! ctx->worker_is_linked() ); | |
7c673cae | 392 | ctx->worker_link( worker_queue_); |
b32b8144 FG |
393 | ctx->scheduler_ = this; |
394 | // an attached context must belong at least to worker-queue | |
7c673cae FG |
395 | } |
396 | ||
397 | void | |
398 | scheduler::detach_worker_context( context * ctx) noexcept { | |
399 | BOOST_ASSERT( nullptr != ctx); | |
400 | BOOST_ASSERT( ! ctx->ready_is_linked() ); | |
b32b8144 FG |
401 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) |
402 | BOOST_ASSERT( ! ctx->remote_ready_is_linked() ); | |
403 | #endif | |
7c673cae FG |
404 | BOOST_ASSERT( ! ctx->sleep_is_linked() ); |
405 | BOOST_ASSERT( ! ctx->terminated_is_linked() ); | |
406 | BOOST_ASSERT( ! ctx->wait_is_linked() ); | |
b32b8144 | 407 | BOOST_ASSERT( ctx->worker_is_linked() ); |
7c673cae FG |
408 | BOOST_ASSERT( ! ctx->is_context( type::pinned_context) ); |
409 | ctx->worker_unlink(); | |
b32b8144 | 410 | BOOST_ASSERT( ! ctx->worker_is_linked() ); |
7c673cae | 411 | ctx->scheduler_ = nullptr; |
b32b8144 | 412 | // a detached context must not belong to any queue |
7c673cae FG |
413 | } |
414 | ||
415 | }} | |
416 | ||
417 | #ifdef BOOST_HAS_ABI_HEADERS | |
418 | # include BOOST_ABI_SUFFIX | |
419 | #endif |