]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/fiber/src/scheduler.cpp
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / boost / libs / fiber / src / scheduler.cpp
CommitLineData
7c673cae
FG
1
2// Copyright Oliver Kowalke 2013.
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#include "boost/fiber/scheduler.hpp"
8
9#include <chrono>
10#include <mutex>
11
12#include <boost/assert.hpp>
13
14#include "boost/fiber/algo/round_robin.hpp"
15#include "boost/fiber/context.hpp"
16#include "boost/fiber/exceptions.hpp"
17
18#ifdef BOOST_HAS_ABI_HEADERS
19# include BOOST_ABI_PREFIX
20#endif
21
22namespace boost {
23namespace fibers {
24
7c673cae
FG
25void
26scheduler::release_terminated_() noexcept {
b32b8144
FG
27 while ( ! terminated_queue_.empty() ) {
28 context * ctx = & terminated_queue_.front();
29 terminated_queue_.pop_front();
30 BOOST_ASSERT( ctx->is_context( type::worker_context) );
31 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
32 BOOST_ASSERT( this == ctx->get_scheduler() );
33 BOOST_ASSERT( ctx->is_resumable() );
34 BOOST_ASSERT( ! ctx->worker_is_linked() );
7c673cae 35 BOOST_ASSERT( ! ctx->ready_is_linked() );
b32b8144
FG
36#if ! defined(BOOST_FIBERS_NO_ATOMICS)
37 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
38#endif
7c673cae 39 BOOST_ASSERT( ! ctx->sleep_is_linked() );
b32b8144
FG
40 BOOST_ASSERT( ! ctx->wait_is_linked() );
41 BOOST_ASSERT( ctx->wait_queue_.empty() );
42 BOOST_ASSERT( ctx->terminated_);
7c673cae
FG
43 // if last reference, e.g. fiber::join() or fiber::detach()
44 // have been already called, this will call ~context(),
45 // the context is automatically removeid from worker-queue
46 intrusive_ptr_release( ctx);
47 }
48}
49
b32b8144 50#if ! defined(BOOST_FIBERS_NO_ATOMICS)
7c673cae
FG
51void
52scheduler::remote_ready2ready_() noexcept {
b32b8144
FG
53 remote_ready_queue_type tmp;
54 detail::spinlock_lock lk{ remote_ready_splk_ };
55 remote_ready_queue_.swap( tmp);
56 lk.unlock();
7c673cae 57 // get context from remote ready-queue
b32b8144
FG
58 while ( ! tmp.empty() ) {
59 context * ctx = & tmp.front();
60 tmp.pop_front();
61 // ctx was signaled from remote (other thread)
62 // ctx might have been already resumed because of
63 // its wait-op. has been already timed out and
64 // thus it was already pushed to the ready-queue
65 if ( ! ctx->ready_is_linked() ) {
66 // store context in local queues
67 schedule( ctx);
68 }
7c673cae 69 }
7c673cae 70}
b32b8144 71#endif
7c673cae
FG
72
73void
74scheduler::sleep2ready_() noexcept {
75 // move context which the deadline has reached
76 // to ready-queue
77 // sleep-queue is sorted (ascending)
b32b8144
FG
78 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
79 sleep_queue_type::iterator e = sleep_queue_.end();
80 for ( sleep_queue_type::iterator i = sleep_queue_.begin(); i != e;) {
7c673cae 81 context * ctx = & ( * i);
b32b8144 82 // dipatcher context must never be pushed to sleep-queue
7c673cae 83 BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
b32b8144 84 BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() );
7c673cae 85 BOOST_ASSERT( ! ctx->ready_is_linked() );
b32b8144
FG
86#if ! defined(BOOST_FIBERS_NO_ATOMICS)
87 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
88#endif
89 BOOST_ASSERT( ! ctx->terminated_is_linked() );
7c673cae
FG
90 // set fiber to state_ready if deadline was reached
91 if ( ctx->tp_ <= now) {
92 // remove context from sleep-queue
93 i = sleep_queue_.erase( i);
94 // reset sleep-tp
95 ctx->tp_ = (std::chrono::steady_clock::time_point::max)();
11fdf7f2 96 std::intptr_t prev = ctx->twstatus.exchange( -2);
b32b8144
FG
97 if ( static_cast< std::intptr_t >( -1) == prev) {
98 // timed-wait op.: timeout after notify
99 continue;
100 }
101 // prev == 0: no timed-wait op.
102 // prev == <any>: timed-wait op., timeout before notify
11fdf7f2
TL
103 // store context in local queues
104 schedule( ctx);
7c673cae
FG
105 } else {
106 break; // first context with now < deadline
107 }
108 }
109}
110
111scheduler::scheduler() noexcept :
112 algo_{ new algo::round_robin() } {
113}
114
115scheduler::~scheduler() {
116 BOOST_ASSERT( nullptr != main_ctx_);
117 BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
118 BOOST_ASSERT( context::active() == main_ctx_);
119 // signal dispatcher-context termination
120 shutdown_ = true;
121 // resume pending fibers
122 // by joining dispatcher-context
123 dispatcher_ctx_->join();
124 // no context' in worker-queue
125 BOOST_ASSERT( worker_queue_.empty() );
126 BOOST_ASSERT( terminated_queue_.empty() );
7c673cae
FG
127 BOOST_ASSERT( sleep_queue_.empty() );
128 // set active context to nullptr
129 context::reset_active();
130 // deallocate dispatcher-context
131 BOOST_ASSERT( ! dispatcher_ctx_->ready_is_linked() );
132 dispatcher_ctx_.reset();
133 // set main-context to nullptr
134 main_ctx_ = nullptr;
135}
136
11fdf7f2 137boost::context::fiber
7c673cae 138scheduler::dispatch() noexcept {
7c673cae
FG
139 BOOST_ASSERT( context::active() == dispatcher_ctx_);
140 for (;;) {
7c673cae
FG
141 if ( shutdown_) {
142 // notify sched-algorithm about termination
143 algo_->notify();
b32b8144 144 if ( worker_queue_.empty() ) {
7c673cae
FG
145 break;
146 }
147 }
148 // release terminated context'
149 release_terminated_();
b32b8144 150#if ! defined(BOOST_FIBERS_NO_ATOMICS)
7c673cae
FG
151 // get context' from remote ready-queue
152 remote_ready2ready_();
b32b8144 153#endif
7c673cae 154 // get sleeping context'
11fdf7f2 155 // must be called after remote_ready2ready_()
7c673cae
FG
156 sleep2ready_();
157 // get next ready context
b32b8144 158 context * ctx = algo_->pick_next();
7c673cae 159 if ( nullptr != ctx) {
b32b8144
FG
160 BOOST_ASSERT( ctx->is_resumable() );
161 BOOST_ASSERT( ! ctx->ready_is_linked() );
162#if ! defined(BOOST_FIBERS_NO_ATOMICS)
163 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
164#endif
165 BOOST_ASSERT( ! ctx->sleep_is_linked() );
166 BOOST_ASSERT( ! ctx->terminated_is_linked() );
167 // no test for '! ctx->wait_is_linked()' because
168 // context is registered in wait-queue of sync. primitives
169 // via wait_for()/wait_until()
7c673cae
FG
170 // push dispatcher-context to ready-queue
171 // so that ready-queue never becomes empty
172 ctx->resume( dispatcher_ctx_.get() );
173 BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
174 } else {
175 // no ready context, wait till signaled
176 // set deadline to highest value
177 std::chrono::steady_clock::time_point suspend_time =
178 (std::chrono::steady_clock::time_point::max)();
179 // get lowest deadline from sleep-queue
b32b8144 180 sleep_queue_type::iterator i = sleep_queue_.begin();
7c673cae
FG
181 if ( sleep_queue_.end() != i) {
182 suspend_time = i->tp_;
183 }
184 // no ready context, wait till signaled
185 algo_->suspend_until( suspend_time);
186 }
187 }
188 // release termianted context'
189 release_terminated_();
190 // return to main-context
7c673cae 191 return main_ctx_->suspend_with_cc();
7c673cae
FG
192}
193
194void
b32b8144 195scheduler::schedule( context * ctx) noexcept {
7c673cae 196 BOOST_ASSERT( nullptr != ctx);
b32b8144
FG
197 BOOST_ASSERT( ! ctx->ready_is_linked() );
198#if ! defined(BOOST_FIBERS_NO_ATOMICS)
199 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
200#endif
201 BOOST_ASSERT( ! ctx->terminated_is_linked() );
7c673cae
FG
202 // remove context ctx from sleep-queue
203 // (might happen if blocked in timed_mutex::try_lock_until())
204 if ( ctx->sleep_is_linked() ) {
205 // unlink it from sleep-queue
206 ctx->sleep_unlink();
207 }
7c673cae
FG
208 // push new context to ready-queue
209 algo_->awakened( ctx);
210}
211
b32b8144 212#if ! defined(BOOST_FIBERS_NO_ATOMICS)
7c673cae 213void
b32b8144 214scheduler::schedule_from_remote( context * ctx) noexcept {
7c673cae 215 BOOST_ASSERT( nullptr != ctx);
b32b8144 216 // another thread might signal the main-context of this thread
7c673cae
FG
217 BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
218 BOOST_ASSERT( this == ctx->get_scheduler() );
b32b8144
FG
219 BOOST_ASSERT( ! ctx->ready_is_linked() );
220 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
221 BOOST_ASSERT( ! ctx->terminated_is_linked() );
222 BOOST_ASSERT( ! ctx->wait_is_linked() );
7c673cae 223 // protect for concurrent access
b32b8144
FG
224 detail::spinlock_lock lk{ remote_ready_splk_ };
225 BOOST_ASSERT( ! shutdown_);
226 BOOST_ASSERT( nullptr != main_ctx_);
227 BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
7c673cae 228 // push new context to remote ready-queue
b32b8144 229 ctx->remote_ready_link( remote_ready_queue_);
92f5a8d4 230 lk.unlock();
7c673cae
FG
231 // notify scheduler
232 algo_->notify();
233}
b32b8144 234#endif
7c673cae 235
11fdf7f2 236boost::context::fiber
b32b8144
FG
237scheduler::terminate( detail::spinlock_lock & lk, context * ctx) noexcept {
238 BOOST_ASSERT( nullptr != ctx);
239 BOOST_ASSERT( context::active() == ctx);
240 BOOST_ASSERT( this == ctx->get_scheduler() );
241 BOOST_ASSERT( ctx->is_context( type::worker_context) );
242 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
243 BOOST_ASSERT( ! ctx->ready_is_linked() );
244#if ! defined(BOOST_FIBERS_NO_ATOMICS)
245 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
7c673cae 246#endif
b32b8144
FG
247 BOOST_ASSERT( ! ctx->sleep_is_linked() );
248 BOOST_ASSERT( ! ctx->terminated_is_linked() );
249 BOOST_ASSERT( ! ctx->wait_is_linked() );
250 BOOST_ASSERT( ctx->wait_queue_.empty() );
7c673cae 251 // store the terminated fiber in the terminated-queue
b32b8144
FG
252 // the dispatcher-context will call
253 ctx->terminated_link( terminated_queue_);
254 // remove from the worker-queue
255 ctx->worker_unlink();
256 // release lock
257 lk.unlock();
7c673cae 258 // resume another fiber
b32b8144 259 return algo_->pick_next()->suspend_with_cc();
7c673cae
FG
260}
261
262void
b32b8144
FG
263scheduler::yield( context * ctx) noexcept {
264 BOOST_ASSERT( nullptr != ctx);
265 BOOST_ASSERT( context::active() == ctx);
266 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
267 BOOST_ASSERT( ! ctx->ready_is_linked() );
268#if ! defined(BOOST_FIBERS_NO_ATOMICS)
269 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
270#endif
271 BOOST_ASSERT( ! ctx->sleep_is_linked() );
272 BOOST_ASSERT( ! ctx->terminated_is_linked() );
273 BOOST_ASSERT( ! ctx->wait_is_linked() );
7c673cae 274 // resume another fiber
b32b8144 275 algo_->pick_next()->resume( ctx);
7c673cae
FG
276}
277
278bool
b32b8144 279scheduler::wait_until( context * ctx,
7c673cae 280 std::chrono::steady_clock::time_point const& sleep_tp) noexcept {
b32b8144
FG
281 BOOST_ASSERT( nullptr != ctx);
282 BOOST_ASSERT( context::active() == ctx);
283 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
284 BOOST_ASSERT( ! ctx->ready_is_linked() );
285#if ! defined(BOOST_FIBERS_NO_ATOMICS)
286 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
287#endif
288 BOOST_ASSERT( ! ctx->sleep_is_linked() );
289 BOOST_ASSERT( ! ctx->terminated_is_linked() );
290 BOOST_ASSERT( ! ctx->wait_is_linked() );
291 ctx->tp_ = sleep_tp;
292 ctx->sleep_link( sleep_queue_);
7c673cae 293 // resume another context
b32b8144 294 algo_->pick_next()->resume();
7c673cae
FG
295 // context has been resumed
296 // check if deadline has reached
297 return std::chrono::steady_clock::now() < sleep_tp;
298}
299
300bool
b32b8144 301scheduler::wait_until( context * ctx,
7c673cae
FG
302 std::chrono::steady_clock::time_point const& sleep_tp,
303 detail::spinlock_lock & lk) noexcept {
b32b8144
FG
304 BOOST_ASSERT( nullptr != ctx);
305 BOOST_ASSERT( context::active() == ctx);
306 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
307 BOOST_ASSERT( ! ctx->ready_is_linked() );
308#if ! defined(BOOST_FIBERS_NO_ATOMICS)
309 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
310#endif
311 BOOST_ASSERT( ! ctx->sleep_is_linked() );
312 BOOST_ASSERT( ! ctx->terminated_is_linked() );
313 // ctx->wait_is_linked() might return true
7c673cae 314 // if context was locked inside timed_mutex::try_lock_until()
7c673cae 315 // push active context to sleep-queue
b32b8144
FG
316 ctx->tp_ = sleep_tp;
317 ctx->sleep_link( sleep_queue_);
7c673cae 318 // resume another context
b32b8144 319 algo_->pick_next()->resume( lk);
7c673cae
FG
320 // context has been resumed
321 // check if deadline has reached
322 return std::chrono::steady_clock::now() < sleep_tp;
323}
324
325void
326scheduler::suspend() noexcept {
327 // resume another context
b32b8144 328 algo_->pick_next()->resume();
7c673cae
FG
329}
330
331void
332scheduler::suspend( detail::spinlock_lock & lk) noexcept {
333 // resume another context
b32b8144 334 algo_->pick_next()->resume( lk);
7c673cae
FG
335}
336
337bool
338scheduler::has_ready_fibers() const noexcept {
339 return algo_->has_ready_fibers();
340}
341
342void
b32b8144 343scheduler::set_algo( algo::algorithm::ptr_t algo) noexcept {
7c673cae
FG
344 // move remaining cotnext in current scheduler to new one
345 while ( algo_->has_ready_fibers() ) {
346 algo->awakened( algo_->pick_next() );
347 }
348 algo_ = std::move( algo);
349}
350
351void
b32b8144
FG
352scheduler::attach_main_context( context * ctx) noexcept {
353 BOOST_ASSERT( nullptr != ctx);
7c673cae
FG
354 // main-context represents the execution context created
355 // by the system, e.g. main()- or thread-context
356 // should not be in worker-queue
b32b8144 357 main_ctx_ = ctx;
7c673cae
FG
358 main_ctx_->scheduler_ = this;
359}
360
361void
b32b8144
FG
362scheduler::attach_dispatcher_context( intrusive_ptr< context > ctx) noexcept {
363 BOOST_ASSERT( ctx);
7c673cae
FG
364 // dispatcher context has to handle
365 // - remote ready context'
366 // - sleeping context'
367 // - extern event-loops
368 // - suspending the thread if ready-queue is empty (waiting on external event)
369 // should not be in worker-queue
b32b8144 370 dispatcher_ctx_.swap( ctx);
7c673cae
FG
371 // add dispatcher-context to ready-queue
372 // so it is the first element in the ready-queue
373 // if the main context tries to suspend the first time
374 // the dispatcher-context is resumed and
375 // scheduler::dispatch() is executed
376 dispatcher_ctx_->scheduler_ = this;
377 algo_->awakened( dispatcher_ctx_.get() );
378}
379
380void
381scheduler::attach_worker_context( context * ctx) noexcept {
382 BOOST_ASSERT( nullptr != ctx);
b32b8144 383 BOOST_ASSERT( nullptr == ctx->get_scheduler() );
7c673cae 384 BOOST_ASSERT( ! ctx->ready_is_linked() );
b32b8144
FG
385#if ! defined(BOOST_FIBERS_NO_ATOMICS)
386 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
387#endif
7c673cae
FG
388 BOOST_ASSERT( ! ctx->sleep_is_linked() );
389 BOOST_ASSERT( ! ctx->terminated_is_linked() );
390 BOOST_ASSERT( ! ctx->wait_is_linked() );
391 BOOST_ASSERT( ! ctx->worker_is_linked() );
7c673cae 392 ctx->worker_link( worker_queue_);
b32b8144
FG
393 ctx->scheduler_ = this;
394 // an attached context must belong at least to worker-queue
7c673cae
FG
395}
396
397void
398scheduler::detach_worker_context( context * ctx) noexcept {
399 BOOST_ASSERT( nullptr != ctx);
400 BOOST_ASSERT( ! ctx->ready_is_linked() );
b32b8144
FG
401#if ! defined(BOOST_FIBERS_NO_ATOMICS)
402 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
403#endif
7c673cae
FG
404 BOOST_ASSERT( ! ctx->sleep_is_linked() );
405 BOOST_ASSERT( ! ctx->terminated_is_linked() );
406 BOOST_ASSERT( ! ctx->wait_is_linked() );
b32b8144 407 BOOST_ASSERT( ctx->worker_is_linked() );
7c673cae
FG
408 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
409 ctx->worker_unlink();
b32b8144 410 BOOST_ASSERT( ! ctx->worker_is_linked() );
7c673cae 411 ctx->scheduler_ = nullptr;
b32b8144 412 // a detached context must not belong to any queue
7c673cae
FG
413}
414
415}}
416
417#ifdef BOOST_HAS_ABI_HEADERS
418# include BOOST_ABI_SUFFIX
419#endif