]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/fiber/src/scheduler.cpp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / libs / fiber / src / scheduler.cpp
1
2 // Copyright Oliver Kowalke 2013.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 #include "boost/fiber/scheduler.hpp"
8
9 #include <chrono>
10 #include <mutex>
11
12 #include <boost/assert.hpp>
13
14 #include "boost/fiber/algo/round_robin.hpp"
15 #include "boost/fiber/context.hpp"
16 #include "boost/fiber/exceptions.hpp"
17
18 #ifdef BOOST_HAS_ABI_HEADERS
19 # include BOOST_ABI_PREFIX
20 #endif
21
22 namespace boost {
23 namespace fibers {
24
25 void
26 scheduler::release_terminated_() noexcept {
27 while ( ! terminated_queue_.empty() ) {
28 context * ctx = & terminated_queue_.front();
29 terminated_queue_.pop_front();
30 BOOST_ASSERT( ctx->is_context( type::worker_context) );
31 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
32 BOOST_ASSERT( this == ctx->get_scheduler() );
33 BOOST_ASSERT( ctx->is_resumable() );
34 BOOST_ASSERT( ! ctx->worker_is_linked() );
35 BOOST_ASSERT( ! ctx->ready_is_linked() );
36 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
37 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
38 #endif
39 BOOST_ASSERT( ! ctx->sleep_is_linked() );
40 BOOST_ASSERT( ctx->wait_queue_.empty() );
41 BOOST_ASSERT( ctx->terminated_);
42 // if last reference, e.g. fiber::join() or fiber::detach()
43 // have been already called, this will call ~context(),
44 // the context is automatically removeid from worker-queue
45 intrusive_ptr_release( ctx);
46 }
47 }
48
49 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
50 void
51 scheduler::remote_ready2ready_() noexcept {
52 remote_ready_queue_type tmp;
53 detail::spinlock_lock lk{ remote_ready_splk_ };
54 remote_ready_queue_.swap( tmp);
55 lk.unlock();
56 // get context from remote ready-queue
57 while ( ! tmp.empty() ) {
58 context * ctx = & tmp.front();
59 tmp.pop_front();
60 // store context in local queues
61 schedule( ctx);
62 }
63 }
64 #endif
65
66 void
67 scheduler::sleep2ready_() noexcept {
68 // move context which the deadline has reached
69 // to ready-queue
70 // sleep-queue is sorted (ascending)
71 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
72 sleep_queue_type::iterator e = sleep_queue_.end();
73 for ( sleep_queue_type::iterator i = sleep_queue_.begin(); i != e;) {
74 context * ctx = & ( * i);
75 // dispatcher context must never be pushed to sleep-queue
76 BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
77 BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() );
78 BOOST_ASSERT( ! ctx->ready_is_linked() );
79 // remote_ready_hook_ can be linked in that point in case when the ctx
80 // has been signaled concurrently when sleep2ready_ is called. In that
81 // case sleep_waker_.wake() is just no-op, because sleep_waker_ is
82 // outdated
83 BOOST_ASSERT( ! ctx->terminated_is_linked() );
84 // set fiber to state_ready if deadline was reached
85 if ( ctx->tp_ <= now) {
86 // remove context from sleep-queue
87 i = sleep_queue_.erase( i);
88 // reset sleep-tp
89 ctx->tp_ = (std::chrono::steady_clock::time_point::max)();
90 ctx->sleep_waker_.wake();
91 } else {
92 break; // first context with now < deadline
93 }
94 }
95 }
96
97 scheduler::scheduler() noexcept :
98 algo_{ new algo::round_robin() } {
99 }
100
101 scheduler::~scheduler() {
102 BOOST_ASSERT( nullptr != main_ctx_);
103 BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
104 BOOST_ASSERT( context::active() == main_ctx_);
105 // signal dispatcher-context termination
106 shutdown_ = true;
107 // resume pending fibers
108 // by resuming dispatcher-context
109 context::active()->suspend();
110 // no context' in worker-queue
111 BOOST_ASSERT( worker_queue_.empty() );
112 BOOST_ASSERT( terminated_queue_.empty() );
113 BOOST_ASSERT( sleep_queue_.empty() );
114 // set active context to nullptr
115 context::reset_active();
116 // deallocate dispatcher-context
117 BOOST_ASSERT( ! dispatcher_ctx_->ready_is_linked() );
118 dispatcher_ctx_.reset();
119 // set main-context to nullptr
120 main_ctx_ = nullptr;
121 }
122
123 boost::context::fiber
124 scheduler::dispatch() noexcept {
125 BOOST_ASSERT( context::active() == dispatcher_ctx_);
126 for (;;) {
127 if ( shutdown_) {
128 // notify sched-algorithm about termination
129 algo_->notify();
130 if ( worker_queue_.empty() ) {
131 break;
132 }
133 }
134 // release terminated context'
135 release_terminated_();
136 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
137 // get context' from remote ready-queue
138 remote_ready2ready_();
139 #endif
140 // get sleeping context'
141 // must be called after remote_ready2ready_()
142 sleep2ready_();
143 // get next ready context
144 context * ctx = algo_->pick_next();
145 if ( nullptr != ctx) {
146 BOOST_ASSERT( ctx->is_resumable() );
147 BOOST_ASSERT( ! ctx->ready_is_linked() );
148 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
149 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
150 #endif
151 BOOST_ASSERT( ! ctx->sleep_is_linked() );
152 BOOST_ASSERT( ! ctx->terminated_is_linked() );
153 // push dispatcher-context to ready-queue
154 // so that ready-queue never becomes empty
155 ctx->resume( dispatcher_ctx_.get() );
156 BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
157 } else {
158 // no ready context, wait till signaled
159 // set deadline to highest value
160 std::chrono::steady_clock::time_point suspend_time =
161 (std::chrono::steady_clock::time_point::max)();
162 // get lowest deadline from sleep-queue
163 sleep_queue_type::iterator i = sleep_queue_.begin();
164 if ( sleep_queue_.end() != i) {
165 suspend_time = i->tp_;
166 }
167 // no ready context, wait till signaled
168 algo_->suspend_until( suspend_time);
169 }
170 }
171 // release termianted context'
172 release_terminated_();
173 // return to main-context
174 return main_ctx_->suspend_with_cc();
175 }
176
177 void
178 scheduler::schedule( context * ctx) noexcept {
179 BOOST_ASSERT( nullptr != ctx);
180 BOOST_ASSERT( ! ctx->ready_is_linked() );
181 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
182 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
183 #endif
184 BOOST_ASSERT( ! ctx->terminated_is_linked() );
185 // remove context ctx from sleep-queue
186 // (might happen if blocked in timed_mutex::try_lock_until())
187 if ( ctx->sleep_is_linked() ) {
188 // unlink it from sleep-queue
189 ctx->sleep_unlink();
190 }
191 // push new context to ready-queue
192 algo_->awakened( ctx);
193 }
194
195 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
196 void
197 scheduler::schedule_from_remote( context * ctx) noexcept {
198 BOOST_ASSERT( nullptr != ctx);
199 // another thread might signal the main-context of this thread
200 BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
201 BOOST_ASSERT( this == ctx->get_scheduler() );
202 BOOST_ASSERT( ! ctx->ready_is_linked() );
203 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
204 BOOST_ASSERT( ! ctx->terminated_is_linked() );
205 // protect for concurrent access
206 detail::spinlock_lock lk{ remote_ready_splk_ };
207 BOOST_ASSERT( ! shutdown_);
208 BOOST_ASSERT( nullptr != main_ctx_);
209 BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
210 // push new context to remote ready-queue
211 ctx->remote_ready_link( remote_ready_queue_);
212 lk.unlock();
213 // notify scheduler
214 algo_->notify();
215 }
216 #endif
217
218 boost::context::fiber
219 scheduler::terminate( detail::spinlock_lock & lk, context * ctx) noexcept {
220 BOOST_ASSERT( nullptr != ctx);
221 BOOST_ASSERT( context::active() == ctx);
222 BOOST_ASSERT( this == ctx->get_scheduler() );
223 BOOST_ASSERT( ctx->is_context( type::worker_context) );
224 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
225 BOOST_ASSERT( ! ctx->ready_is_linked() );
226 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
227 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
228 #endif
229 BOOST_ASSERT( ! ctx->sleep_is_linked() );
230 BOOST_ASSERT( ! ctx->terminated_is_linked() );
231 BOOST_ASSERT( ctx->wait_queue_.empty() );
232 // store the terminated fiber in the terminated-queue
233 // the dispatcher-context will call
234 ctx->terminated_link( terminated_queue_);
235 // remove from the worker-queue
236 ctx->worker_unlink();
237 // release lock
238 lk.unlock();
239 // resume another fiber
240 return algo_->pick_next()->suspend_with_cc();
241 }
242
243 void
244 scheduler::yield( context * ctx) noexcept {
245 BOOST_ASSERT( nullptr != ctx);
246 BOOST_ASSERT( context::active() == ctx);
247 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
248 BOOST_ASSERT( ! ctx->ready_is_linked() );
249 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
250 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
251 #endif
252 BOOST_ASSERT( ! ctx->sleep_is_linked() );
253 BOOST_ASSERT( ! ctx->terminated_is_linked() );
254 // resume another fiber
255 algo_->pick_next()->resume( ctx);
256 }
257
258 bool
259 scheduler::wait_until( context * ctx,
260 std::chrono::steady_clock::time_point const& sleep_tp) noexcept {
261 BOOST_ASSERT( nullptr != ctx);
262 BOOST_ASSERT( context::active() == ctx);
263 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
264 BOOST_ASSERT( ! ctx->ready_is_linked() );
265 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
266 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
267 #endif
268 BOOST_ASSERT( ! ctx->sleep_is_linked() );
269 BOOST_ASSERT( ! ctx->terminated_is_linked() );
270 ctx->sleep_waker_ = ctx->create_waker();
271 ctx->tp_ = sleep_tp;
272 ctx->sleep_link( sleep_queue_);
273 // resume another context
274 algo_->pick_next()->resume();
275 // context has been resumed
276 // check if deadline has reached
277 return std::chrono::steady_clock::now() < sleep_tp;
278 }
279
280 bool
281 scheduler::wait_until( context * ctx,
282 std::chrono::steady_clock::time_point const& sleep_tp,
283 detail::spinlock_lock & lk,
284 waker && w) noexcept {
285 BOOST_ASSERT( nullptr != ctx);
286 BOOST_ASSERT( context::active() == ctx);
287 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
288 BOOST_ASSERT( ! ctx->ready_is_linked() );
289 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
290 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
291 #endif
292 BOOST_ASSERT( ! ctx->sleep_is_linked() );
293 BOOST_ASSERT( ! ctx->terminated_is_linked() );
294 // push active context to sleep-queue
295 ctx->sleep_waker_ = std::move( w);
296 ctx->tp_ = sleep_tp;
297 ctx->sleep_link( sleep_queue_);
298 // resume another context
299 algo_->pick_next()->resume( lk);
300 // context has been resumed
301 // check if deadline has reached
302 return std::chrono::steady_clock::now() < sleep_tp;
303 }
304
305 void
306 scheduler::suspend() noexcept {
307 // resume another context
308 algo_->pick_next()->resume();
309 }
310
311 void
312 scheduler::suspend( detail::spinlock_lock & lk) noexcept {
313 // resume another context
314 algo_->pick_next()->resume( lk);
315 }
316
317 bool
318 scheduler::has_ready_fibers() const noexcept {
319 return algo_->has_ready_fibers();
320 }
321
322 void
323 scheduler::set_algo( algo::algorithm::ptr_t algo) noexcept {
324 // move remaining context in current scheduler to new one
325 while ( algo_->has_ready_fibers() ) {
326 algo->awakened( algo_->pick_next() );
327 }
328 algo_ = std::move( algo);
329 }
330
331 void
332 scheduler::attach_main_context( context * ctx) noexcept {
333 BOOST_ASSERT( nullptr != ctx);
334 // main-context represents the execution context created
335 // by the system, e.g. main()- or thread-context
336 // should not be in worker-queue
337 main_ctx_ = ctx;
338 main_ctx_->scheduler_ = this;
339 }
340
341 void
342 scheduler::attach_dispatcher_context( intrusive_ptr< context > ctx) noexcept {
343 BOOST_ASSERT( ctx);
344 // dispatcher context has to handle
345 // - remote ready context'
346 // - sleeping context'
347 // - extern event-loops
348 // - suspending the thread if ready-queue is empty (waiting on external event)
349 // should not be in worker-queue
350 dispatcher_ctx_.swap( ctx);
351 // add dispatcher-context to ready-queue
352 // so it is the first element in the ready-queue
353 // if the main context tries to suspend the first time
354 // the dispatcher-context is resumed and
355 // scheduler::dispatch() is executed
356 dispatcher_ctx_->scheduler_ = this;
357 algo_->awakened( dispatcher_ctx_.get() );
358 }
359
360 void
361 scheduler::attach_worker_context( context * ctx) noexcept {
362 BOOST_ASSERT( nullptr != ctx);
363 BOOST_ASSERT( nullptr == ctx->get_scheduler() );
364 BOOST_ASSERT( ! ctx->ready_is_linked() );
365 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
366 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
367 #endif
368 BOOST_ASSERT( ! ctx->sleep_is_linked() );
369 BOOST_ASSERT( ! ctx->terminated_is_linked() );
370 BOOST_ASSERT( ! ctx->worker_is_linked() );
371 ctx->worker_link( worker_queue_);
372 ctx->scheduler_ = this;
373 // an attached context must belong at least to worker-queue
374 }
375
376 void
377 scheduler::detach_worker_context( context * ctx) noexcept {
378 BOOST_ASSERT( nullptr != ctx);
379 BOOST_ASSERT( ! ctx->ready_is_linked() );
380 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
381 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
382 #endif
383 BOOST_ASSERT( ! ctx->sleep_is_linked() );
384 BOOST_ASSERT( ! ctx->terminated_is_linked() );
385 BOOST_ASSERT( ctx->worker_is_linked() );
386 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
387 ctx->worker_unlink();
388 BOOST_ASSERT( ! ctx->worker_is_linked() );
389 ctx->scheduler_ = nullptr;
390 // a detached context must not belong to any queue
391 }
392
393 }}
394
395 #ifdef BOOST_HAS_ABI_HEADERS
396 # include BOOST_ABI_SUFFIX
397 #endif