]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/fiber/src/scheduler.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / fiber / src / scheduler.cpp
1
2 // Copyright Oliver Kowalke 2013.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 #include "boost/fiber/scheduler.hpp"
8
9 #include <chrono>
10 #include <mutex>
11
12 #include <boost/assert.hpp>
13
14 #include "boost/fiber/algo/round_robin.hpp"
15 #include "boost/fiber/context.hpp"
16 #include "boost/fiber/exceptions.hpp"
17
18 #ifdef BOOST_HAS_ABI_HEADERS
19 # include BOOST_ABI_PREFIX
20 #endif
21
22 namespace boost {
23 namespace fibers {
24
25 void
26 scheduler::release_terminated_() noexcept {
27 while ( ! terminated_queue_.empty() ) {
28 context * ctx = & terminated_queue_.front();
29 terminated_queue_.pop_front();
30 BOOST_ASSERT( ctx->is_context( type::worker_context) );
31 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
32 BOOST_ASSERT( this == ctx->get_scheduler() );
33 BOOST_ASSERT( ctx->is_resumable() );
34 BOOST_ASSERT( ! ctx->worker_is_linked() );
35 BOOST_ASSERT( ! ctx->ready_is_linked() );
36 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
37 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
38 #endif
39 BOOST_ASSERT( ! ctx->sleep_is_linked() );
40 BOOST_ASSERT( ! ctx->wait_is_linked() );
41 BOOST_ASSERT( ctx->wait_queue_.empty() );
42 BOOST_ASSERT( ctx->terminated_);
43 // if last reference, e.g. fiber::join() or fiber::detach()
44 // have been already called, this will call ~context(),
45 // the context is automatically removeid from worker-queue
46 intrusive_ptr_release( ctx);
47 }
48 }
49
50 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
51 void
52 scheduler::remote_ready2ready_() noexcept {
53 remote_ready_queue_type tmp;
54 detail::spinlock_lock lk{ remote_ready_splk_ };
55 remote_ready_queue_.swap( tmp);
56 lk.unlock();
57 // get context from remote ready-queue
58 while ( ! tmp.empty() ) {
59 context * ctx = & tmp.front();
60 tmp.pop_front();
61 // ctx was signaled from remote (other thread)
62 // ctx might have been already resumed because of
63 // its wait-op. has been already timed out and
64 // thus it was already pushed to the ready-queue
65 if ( ! ctx->ready_is_linked() ) {
66 // store context in local queues
67 schedule( ctx);
68 }
69 }
70 }
71 #endif
72
73 void
74 scheduler::sleep2ready_() noexcept {
75 // move context which the deadline has reached
76 // to ready-queue
77 // sleep-queue is sorted (ascending)
78 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
79 sleep_queue_type::iterator e = sleep_queue_.end();
80 for ( sleep_queue_type::iterator i = sleep_queue_.begin(); i != e;) {
81 context * ctx = & ( * i);
82 // dipatcher context must never be pushed to sleep-queue
83 BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
84 BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() );
85 BOOST_ASSERT( ! ctx->ready_is_linked() );
86 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
87 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
88 #endif
89 BOOST_ASSERT( ! ctx->terminated_is_linked() );
90 // set fiber to state_ready if deadline was reached
91 if ( ctx->tp_ <= now) {
92 // remove context from sleep-queue
93 i = sleep_queue_.erase( i);
94 // reset sleep-tp
95 ctx->tp_ = (std::chrono::steady_clock::time_point::max)();
96 std::intptr_t prev = ctx->twstatus.exchange( -1);
97 if ( static_cast< std::intptr_t >( -1) == prev) {
98 // timed-wait op.: timeout after notify
99 continue;
100 }
101 // prev == 0: no timed-wait op.
102 // prev == <any>: timed-wait op., timeout before notify
103 // push new context to ready-queue
104 algo_->awakened( ctx);
105 } else {
106 break; // first context with now < deadline
107 }
108 }
109 }
110
111 scheduler::scheduler() noexcept :
112 algo_{ new algo::round_robin() } {
113 }
114
115 scheduler::~scheduler() {
116 BOOST_ASSERT( nullptr != main_ctx_);
117 BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
118 BOOST_ASSERT( context::active() == main_ctx_);
119 // signal dispatcher-context termination
120 shutdown_ = true;
121 // resume pending fibers
122 // by joining dispatcher-context
123 dispatcher_ctx_->join();
124 // no context' in worker-queue
125 BOOST_ASSERT( worker_queue_.empty() );
126 BOOST_ASSERT( terminated_queue_.empty() );
127 BOOST_ASSERT( sleep_queue_.empty() );
128 // set active context to nullptr
129 context::reset_active();
130 // deallocate dispatcher-context
131 BOOST_ASSERT( ! dispatcher_ctx_->ready_is_linked() );
132 dispatcher_ctx_.reset();
133 // set main-context to nullptr
134 main_ctx_ = nullptr;
135 }
136
137 boost::context::continuation
138 scheduler::dispatch() noexcept {
139 BOOST_ASSERT( context::active() == dispatcher_ctx_);
140 for (;;) {
141 if ( shutdown_) {
142 // notify sched-algorithm about termination
143 algo_->notify();
144 if ( worker_queue_.empty() ) {
145 break;
146 }
147 }
148 // release terminated context'
149 release_terminated_();
150 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
151 // get context' from remote ready-queue
152 remote_ready2ready_();
153 #endif
154 // get sleeping context'
155 sleep2ready_();
156 // get next ready context
157 context * ctx = algo_->pick_next();
158 if ( nullptr != ctx) {
159 BOOST_ASSERT( ctx->is_resumable() );
160 BOOST_ASSERT( ! ctx->ready_is_linked() );
161 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
162 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
163 #endif
164 BOOST_ASSERT( ! ctx->sleep_is_linked() );
165 BOOST_ASSERT( ! ctx->terminated_is_linked() );
166 // no test for '! ctx->wait_is_linked()' because
167 // context is registered in wait-queue of sync. primitives
168 // via wait_for()/wait_until()
169 // push dispatcher-context to ready-queue
170 // so that ready-queue never becomes empty
171 ctx->resume( dispatcher_ctx_.get() );
172 BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
173 } else {
174 // no ready context, wait till signaled
175 // set deadline to highest value
176 std::chrono::steady_clock::time_point suspend_time =
177 (std::chrono::steady_clock::time_point::max)();
178 // get lowest deadline from sleep-queue
179 sleep_queue_type::iterator i = sleep_queue_.begin();
180 if ( sleep_queue_.end() != i) {
181 suspend_time = i->tp_;
182 }
183 // no ready context, wait till signaled
184 algo_->suspend_until( suspend_time);
185 }
186 }
187 // release termianted context'
188 release_terminated_();
189 // return to main-context
190 return main_ctx_->suspend_with_cc();
191 }
192
193 void
194 scheduler::schedule( context * ctx) noexcept {
195 BOOST_ASSERT( nullptr != ctx);
196 BOOST_ASSERT( ! ctx->ready_is_linked() );
197 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
198 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
199 #endif
200 BOOST_ASSERT( ! ctx->terminated_is_linked() );
201 BOOST_ASSERT( ! ctx->wait_is_linked() );
202 // remove context ctx from sleep-queue
203 // (might happen if blocked in timed_mutex::try_lock_until())
204 if ( ctx->sleep_is_linked() ) {
205 // unlink it from sleep-queue
206 ctx->sleep_unlink();
207 }
208 // push new context to ready-queue
209 algo_->awakened( ctx);
210 }
211
212 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
213 void
214 scheduler::schedule_from_remote( context * ctx) noexcept {
215 BOOST_ASSERT( nullptr != ctx);
216 // another thread might signal the main-context of this thread
217 BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
218 BOOST_ASSERT( this == ctx->get_scheduler() );
219 BOOST_ASSERT( ! ctx->ready_is_linked() );
220 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
221 BOOST_ASSERT( ! ctx->terminated_is_linked() );
222 BOOST_ASSERT( ! ctx->wait_is_linked() );
223 // protect for concurrent access
224 detail::spinlock_lock lk{ remote_ready_splk_ };
225 BOOST_ASSERT( ! shutdown_);
226 BOOST_ASSERT( nullptr != main_ctx_);
227 BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
228 // push new context to remote ready-queue
229 ctx->remote_ready_link( remote_ready_queue_);
230 // notify scheduler
231 algo_->notify();
232 }
233 #endif
234
235 boost::context::continuation
236 scheduler::terminate( detail::spinlock_lock & lk, context * ctx) noexcept {
237 BOOST_ASSERT( nullptr != ctx);
238 BOOST_ASSERT( context::active() == ctx);
239 BOOST_ASSERT( this == ctx->get_scheduler() );
240 BOOST_ASSERT( ctx->is_context( type::worker_context) );
241 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
242 BOOST_ASSERT( ! ctx->ready_is_linked() );
243 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
244 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
245 #endif
246 BOOST_ASSERT( ! ctx->sleep_is_linked() );
247 BOOST_ASSERT( ! ctx->terminated_is_linked() );
248 BOOST_ASSERT( ! ctx->wait_is_linked() );
249 BOOST_ASSERT( ctx->wait_queue_.empty() );
250 // store the terminated fiber in the terminated-queue
251 // the dispatcher-context will call
252 ctx->terminated_link( terminated_queue_);
253 // remove from the worker-queue
254 ctx->worker_unlink();
255 // release lock
256 lk.unlock();
257 // resume another fiber
258 return algo_->pick_next()->suspend_with_cc();
259 }
260
261 void
262 scheduler::yield( context * ctx) noexcept {
263 BOOST_ASSERT( nullptr != ctx);
264 BOOST_ASSERT( context::active() == ctx);
265 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
266 BOOST_ASSERT( ! ctx->ready_is_linked() );
267 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
268 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
269 #endif
270 BOOST_ASSERT( ! ctx->sleep_is_linked() );
271 BOOST_ASSERT( ! ctx->terminated_is_linked() );
272 BOOST_ASSERT( ! ctx->wait_is_linked() );
273 // resume another fiber
274 algo_->pick_next()->resume( ctx);
275 }
276
277 bool
278 scheduler::wait_until( context * ctx,
279 std::chrono::steady_clock::time_point const& sleep_tp) noexcept {
280 BOOST_ASSERT( nullptr != ctx);
281 BOOST_ASSERT( context::active() == ctx);
282 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
283 BOOST_ASSERT( ! ctx->ready_is_linked() );
284 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
285 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
286 #endif
287 BOOST_ASSERT( ! ctx->sleep_is_linked() );
288 BOOST_ASSERT( ! ctx->terminated_is_linked() );
289 BOOST_ASSERT( ! ctx->wait_is_linked() );
290 ctx->tp_ = sleep_tp;
291 ctx->sleep_link( sleep_queue_);
292 // resume another context
293 algo_->pick_next()->resume();
294 // context has been resumed
295 // check if deadline has reached
296 return std::chrono::steady_clock::now() < sleep_tp;
297 }
298
299 bool
300 scheduler::wait_until( context * ctx,
301 std::chrono::steady_clock::time_point const& sleep_tp,
302 detail::spinlock_lock & lk) noexcept {
303 BOOST_ASSERT( nullptr != ctx);
304 BOOST_ASSERT( context::active() == ctx);
305 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
306 BOOST_ASSERT( ! ctx->ready_is_linked() );
307 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
308 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
309 #endif
310 BOOST_ASSERT( ! ctx->sleep_is_linked() );
311 BOOST_ASSERT( ! ctx->terminated_is_linked() );
312 // ctx->wait_is_linked() might return true
313 // if context was locked inside timed_mutex::try_lock_until()
314 // push active context to sleep-queue
315 ctx->tp_ = sleep_tp;
316 ctx->sleep_link( sleep_queue_);
317 // resume another context
318 algo_->pick_next()->resume( lk);
319 // context has been resumed
320 // check if deadline has reached
321 return std::chrono::steady_clock::now() < sleep_tp;
322 }
323
324 void
325 scheduler::suspend() noexcept {
326 // resume another context
327 algo_->pick_next()->resume();
328 }
329
330 void
331 scheduler::suspend( detail::spinlock_lock & lk) noexcept {
332 // resume another context
333 algo_->pick_next()->resume( lk);
334 }
335
336 bool
337 scheduler::has_ready_fibers() const noexcept {
338 return algo_->has_ready_fibers();
339 }
340
341 void
342 scheduler::set_algo( algo::algorithm::ptr_t algo) noexcept {
343 // move remaining cotnext in current scheduler to new one
344 while ( algo_->has_ready_fibers() ) {
345 algo->awakened( algo_->pick_next() );
346 }
347 algo_ = std::move( algo);
348 }
349
350 void
351 scheduler::attach_main_context( context * ctx) noexcept {
352 BOOST_ASSERT( nullptr != ctx);
353 // main-context represents the execution context created
354 // by the system, e.g. main()- or thread-context
355 // should not be in worker-queue
356 main_ctx_ = ctx;
357 main_ctx_->scheduler_ = this;
358 }
359
360 void
361 scheduler::attach_dispatcher_context( intrusive_ptr< context > ctx) noexcept {
362 BOOST_ASSERT( ctx);
363 // dispatcher context has to handle
364 // - remote ready context'
365 // - sleeping context'
366 // - extern event-loops
367 // - suspending the thread if ready-queue is empty (waiting on external event)
368 // should not be in worker-queue
369 dispatcher_ctx_.swap( ctx);
370 // add dispatcher-context to ready-queue
371 // so it is the first element in the ready-queue
372 // if the main context tries to suspend the first time
373 // the dispatcher-context is resumed and
374 // scheduler::dispatch() is executed
375 dispatcher_ctx_->scheduler_ = this;
376 algo_->awakened( dispatcher_ctx_.get() );
377 }
378
379 void
380 scheduler::attach_worker_context( context * ctx) noexcept {
381 BOOST_ASSERT( nullptr != ctx);
382 BOOST_ASSERT( nullptr == ctx->get_scheduler() );
383 BOOST_ASSERT( ! ctx->ready_is_linked() );
384 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
385 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
386 #endif
387 BOOST_ASSERT( ! ctx->sleep_is_linked() );
388 BOOST_ASSERT( ! ctx->terminated_is_linked() );
389 BOOST_ASSERT( ! ctx->wait_is_linked() );
390 BOOST_ASSERT( ! ctx->worker_is_linked() );
391 ctx->worker_link( worker_queue_);
392 ctx->scheduler_ = this;
393 // an attached context must belong at least to worker-queue
394 }
395
396 void
397 scheduler::detach_worker_context( context * ctx) noexcept {
398 BOOST_ASSERT( nullptr != ctx);
399 BOOST_ASSERT( ! ctx->ready_is_linked() );
400 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
401 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
402 #endif
403 BOOST_ASSERT( ! ctx->sleep_is_linked() );
404 BOOST_ASSERT( ! ctx->terminated_is_linked() );
405 BOOST_ASSERT( ! ctx->wait_is_linked() );
406 BOOST_ASSERT( ctx->worker_is_linked() );
407 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
408 ctx->worker_unlink();
409 BOOST_ASSERT( ! ctx->worker_is_linked() );
410 ctx->scheduler_ = nullptr;
411 // a detached context must not belong to any queue
412 }
413
414 }}
415
416 #ifdef BOOST_HAS_ABI_HEADERS
417 # include BOOST_ABI_SUFFIX
418 #endif