1 //! A scheduler is initialized with a fixed number of workers. Each worker is
2 //! driven by a thread. Each worker has a "core" which contains data such as the
3 //! run queue and other state. When `block_in_place` is called, the worker's
4 //! "core" is handed off to a new thread allowing the scheduler to continue to
5 //! make progress while the originating thread blocks.
8 use crate::loom
::rand
::seed
;
9 use crate::loom
::sync
::{Arc, Mutex}
;
10 use crate::park
::{Park, Unpark}
;
12 use crate::runtime
::enter
::EnterContext
;
13 use crate::runtime
::park
::{Parker, Unparker}
;
14 use crate::runtime
::thread_pool
::{AtomicCell, Idle}
;
15 use crate::runtime
::{queue, task}
;
16 use crate::util
::linked_list
::{Link, LinkedList}
;
17 use crate::util
::FastRand
;
19 use std
::cell
::RefCell
;
20 use std
::time
::Duration
;
22 /// A scheduler worker
23 pub(super) struct Worker
{
24 /// Reference to shared state
27 /// Index holding this worker's remote state
30 /// Used to hand-off a worker's core to another thread.
31 core
: AtomicCell
<Core
>,
36 /// Used to schedule bookkeeping tasks every so often.
39 /// When a task is scheduled from a worker, it is stored in this slot. The
40 /// worker will check this slot for a task **before** checking the run
41 /// queue. This effectively results in the **last** scheduled task to be run
42 /// next (LIFO). This is an optimization for message passing patterns and
43 /// helps to reduce latency.
44 lifo_slot
: Option
<Notified
>,
46 /// The worker-local run queue.
47 run_queue
: queue
::Local
<Arc
<Worker
>>,
49 /// True if the worker is currently searching for more work. Searching
50 /// involves attempting to steal from other workers.
53 /// True if the scheduler is being shutdown
56 /// Tasks owned by the core
57 tasks
: LinkedList
<Task
, <Task
as Link
>::Target
>,
61 /// Stored in an `Option` as the parker is added / removed to make the
62 /// borrow checker happy.
65 /// Fast random number generator.
69 /// State shared across all workers
70 pub(super) struct Shared
{
71 /// Per-worker remote state. All other workers have access to this and is
72 /// how they communicate between each other.
73 remotes
: Box
<[Remote
]>,
75 /// Submit work to the scheduler while **not** currently on a worker thread.
76 inject
: queue
::Inject
<Arc
<Worker
>>,
78 /// Coordinates idle workers
81 /// Cores that have observed the shutdown signal
83 /// The core is **not** placed back in the worker to avoid it from being
84 /// stolen by a thread that was spawned as part of `block_in_place`.
85 #[allow(clippy::vec_box)] // we're moving an already-boxed value
86 shutdown_cores
: Mutex
<Vec
<Box
<Core
>>>,
89 /// Used to communicate with a worker from other threads.
91 /// Steal tasks from this worker.
92 steal
: queue
::Steal
<Arc
<Worker
>>,
94 /// Transfers tasks to be released. Any worker pushes tasks, only the owning
96 pending_drop
: task
::TransferStack
<Arc
<Worker
>>,
98 /// Unparks the associated worker thread
102 /// Thread-local context
108 core
: RefCell
<Option
<Box
<Core
>>>,
111 /// Starts the workers
112 pub(crate) struct Launch(Vec
<Arc
<Worker
>>);
114 /// Running a task may consume the core. If the core is still available when
115 /// running the task completes, it is returned. Otherwise, the worker will need
116 /// to stop processing.
117 type RunResult
= Result
<Box
<Core
>, ()>;
120 type Task
= task
::Task
<Arc
<Worker
>>;
122 /// A notified task handle
123 type Notified
= task
::Notified
<Arc
<Worker
>>;
125 // Tracks thread-local state
126 scoped_thread_local
!(static CURRENT
: Context
);
128 pub(super) fn create(size
: usize, park
: Parker
) -> (Arc
<Shared
>, Launch
) {
129 let mut cores
= vec
![];
130 let mut remotes
= vec
![];
132 // Create the local queues
134 let (steal
, run_queue
) = queue
::local();
136 let park
= park
.clone();
137 let unpark
= park
.unpark();
139 cores
.push(Box
::new(Core
{
145 tasks
: LinkedList
::new(),
147 rand
: FastRand
::new(seed()),
150 remotes
.push(Remote
{
152 pending_drop
: task
::TransferStack
::new(),
157 let shared
= Arc
::new(Shared
{
158 remotes
: remotes
.into_boxed_slice(),
159 inject
: queue
::Inject
::new(),
160 idle
: Idle
::new(size
),
161 shutdown_cores
: Mutex
::new(vec
![]),
164 let mut launch
= Launch(vec
![]);
166 for (index
, core
) in cores
.drain(..).enumerate() {
167 launch
.0.push(Arc
::new(Worker
{
168 shared
: shared
.clone(),
170 core
: AtomicCell
::new(Some(core
)),
177 pub(crate) fn block_in_place
<F
, R
>(f
: F
) -> R
181 // Try to steal the worker core back
182 struct Reset(coop
::Budget
);
184 impl Drop
for Reset
{
186 CURRENT
.with(|maybe_cx
| {
187 if let Some(cx
) = maybe_cx
{
188 let core
= cx
.worker
.core
.take();
189 let mut cx_core
= cx
.core
.borrow_mut();
190 assert
!(cx_core
.is_none());
193 // Reset the task budget as we are re-entering the
201 let mut had_entered
= false;
203 CURRENT
.with(|maybe_cx
| {
204 match (crate::runtime
::enter
::context(), maybe_cx
.is_some()) {
205 (EnterContext
::Entered { .. }
, true) => {
206 // We are on a thread pool runtime thread, so we just need to set up blocking.
209 (EnterContext
::Entered { allow_blocking }
, false) => {
210 // We are on an executor, but _not_ on the thread pool.
211 // That is _only_ okay if we are in a thread pool runtime's block_on method:
216 // This probably means we are on the basic_scheduler or in a LocalSet,
217 // where it is _not_ okay to block.
218 panic
!("can call blocking only when running on the multi-threaded runtime");
221 (EnterContext
::NotEntered
, true) => {
222 // This is a nested call to block_in_place (we already exited).
223 // All the necessary setup has already been done.
226 (EnterContext
::NotEntered
, false) => {
227 // We are outside of the tokio runtime, so blocking is fine.
228 // We can also skip all of the thread pool blocking setup steps.
233 let cx
= maybe_cx
.expect("no .is_some() == false cases above should lead here");
235 // Get the worker core. If none is set, then blocking is fine!
236 let core
= match cx
.core
.borrow_mut().take() {
241 // The parker should be set here
242 assert
!(core
.park
.is_some());
244 // In order to block, the core must be sent to another thread for
247 // First, move the core back into the worker's shared core slot.
248 cx
.worker
.core
.set(core
);
250 // Next, clone the worker handle and send it to a new thread for
253 // Once the blocking task is done executing, we will attempt to
254 // steal the core back.
255 let worker
= cx
.worker
.clone();
256 runtime
::spawn_blocking(move || run(worker
));
260 // Unset the current task's budget. Blocking sections are not
261 // constrained by task budgets.
262 let _reset
= Reset(coop
::stop());
264 crate::runtime
::enter
::exit(f
)
/// After how many ticks the global (inject) queue is polled. This helps to
/// ensure that tasks pushed from outside the workers are not starved by
/// local work.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u8 = 61;
277 pub(crate) fn launch(mut self) {
278 for worker
in self.0.drain(..) {
279 runtime
::spawn_blocking(move || run(worker
));
284 fn run(worker
: Arc
<Worker
>) {
285 // Acquire a core. If this fails, then another thread is running this
286 // worker and there is nothing further to do.
287 let core
= match worker
.core
.take() {
292 // Set the worker context.
295 core
: RefCell
::new(None
),
298 let _enter
= crate::runtime
::enter(true);
300 CURRENT
.set(&cx
, || {
301 // This should always be an error. It only returns a `Result` to support
302 // using `?` to short circuit.
303 assert
!(cx
.run(core
).is_err());
308 fn run(&self, mut core
: Box
<Core
>) -> RunResult
{
309 while !core
.is_shutdown
{
310 // Increment the tick
313 // Run maintenance, if needed
314 core
= self.maintenance(core
);
316 // First, check work available to the current worker.
317 if let Some(task
) = core
.next_task(&self.worker
) {
318 core
= self.run_task(task
, core
)?
;
322 // There is no more **local** work to process, try to steal work
323 // from other workers.
324 if let Some(task
) = core
.steal_work(&self.worker
) {
325 core
= self.run_task(task
, core
)?
;
328 core
= self.park(core
);
332 core
.pre_shutdown(&self.worker
);
335 self.worker
.shared
.shutdown(core
);
339 fn run_task(&self, task
: Notified
, mut core
: Box
<Core
>) -> RunResult
{
340 // Make sure the worker is not in the **searching** state. This enables
341 // another idle worker to try to steal work.
342 core
.transition_from_searching(&self.worker
);
344 // Make the core available to the runtime context
345 *self.core
.borrow_mut() = Some(core
);
351 // As long as there is budget remaining and a task exists in the
352 // `lifo_slot`, then keep running.
354 // Check if we still have the core. If not, the core was stolen
355 // by another worker.
356 let mut core
= match self.core
.borrow_mut().take() {
358 None
=> return Err(()),
361 // Check for a task in the LIFO slot
362 let task
= match core
.lifo_slot
.take() {
364 None
=> return Ok(core
),
367 if coop
::has_budget_remaining() {
368 // Run the LIFO task, then loop
369 *self.core
.borrow_mut() = Some(core
);
372 // Not enough budget left to run the LIFO task, push it to
373 // the back of the queue and return.
374 core
.run_queue
.push_back(task
, self.worker
.inject());
381 fn maintenance(&self, mut core
: Box
<Core
>) -> Box
<Core
> {
382 if core
.tick
% GLOBAL_POLL_INTERVAL
== 0 {
383 // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
384 // to run without actually putting the thread to sleep.
385 core
= self.park_timeout(core
, Some(Duration
::from_millis(0)));
387 // Run regularly scheduled maintenance
388 core
.maintenance(&self.worker
);
394 fn park(&self, mut core
: Box
<Core
>) -> Box
<Core
> {
395 core
.transition_to_parked(&self.worker
);
397 while !core
.is_shutdown
{
398 core
= self.park_timeout(core
, None
);
400 // Run regularly scheduled maintenance
401 core
.maintenance(&self.worker
);
403 if core
.transition_from_parked(&self.worker
) {
411 fn park_timeout(&self, mut core
: Box
<Core
>, duration
: Option
<Duration
>) -> Box
<Core
> {
412 // Take the parker out of core
413 let mut park
= core
.park
.take().expect("park missing");
415 // Store `core` in context
416 *self.core
.borrow_mut() = Some(core
);
419 if let Some(timeout
) = duration
{
420 park
.park_timeout(timeout
).expect("park failed");
422 park
.park().expect("park failed");
425 // Remove `core` from context
426 core
= self.core
.borrow_mut().take().expect("core missing");
428 // Place `park` back in `core`
429 core
.park
= Some(park
);
431 // If there are tasks available to steal, notify a worker
432 if core
.run_queue
.is_stealable() {
433 self.worker
.shared
.notify_parked();
441 /// Increment the tick
443 self.tick
= self.tick
.wrapping_add(1);
446 /// Return the next notified task available to this worker.
447 fn next_task(&mut self, worker
: &Worker
) -> Option
<Notified
> {
448 if self.tick
% GLOBAL_POLL_INTERVAL
== 0 {
449 worker
.inject().pop().or_else(|| self.next_local_task())
451 self.next_local_task().or_else(|| worker
.inject().pop())
455 fn next_local_task(&mut self) -> Option
<Notified
> {
456 self.lifo_slot
.take().or_else(|| self.run_queue
.pop())
459 fn steal_work(&mut self, worker
: &Worker
) -> Option
<Notified
> {
460 if !self.transition_to_searching(worker
) {
464 let num
= worker
.shared
.remotes
.len();
465 // Start from a random worker
466 let start
= self.rand
.fastrand_n(num
as u32) as usize;
469 let i
= (start
+ i
) % num
;
471 // Don't steal from ourself! We know we don't have work.
472 if i
== worker
.index
{
476 let target
= &worker
.shared
.remotes
[i
];
477 if let Some(task
) = target
.steal
.steal_into(&mut self.run_queue
) {
482 // Fallback on checking the global queue
483 worker
.shared
.inject
.pop()
486 fn transition_to_searching(&mut self, worker
: &Worker
) -> bool
{
487 if !self.is_searching
{
488 self.is_searching
= worker
.shared
.idle
.transition_worker_to_searching();
494 fn transition_from_searching(&mut self, worker
: &Worker
) {
495 if !self.is_searching
{
499 self.is_searching
= false;
500 worker
.shared
.transition_worker_from_searching();
503 /// Prepare the worker state for parking
504 fn transition_to_parked(&mut self, worker
: &Worker
) {
505 // When the final worker transitions **out** of searching to parked, it
506 // must check all the queues one last time in case work materialized
507 // between the last work scan and transitioning out of searching.
508 let is_last_searcher
= worker
511 .transition_worker_to_parked(worker
.index
, self.is_searching
);
513 // The worker is no longer searching. Setting this is the local cache
515 self.is_searching
= false;
517 if is_last_searcher
{
518 worker
.shared
.notify_if_work_pending();
522 /// Returns `true` if the transition happened.
523 fn transition_from_parked(&mut self, worker
: &Worker
) -> bool
{
524 // If a task is in the lifo slot, then we must unpark regardless of
526 if self.lifo_slot
.is_some() {
527 worker
.shared
.idle
.unpark_worker_by_id(worker
.index
);
528 self.is_searching
= true;
532 if worker
.shared
.idle
.is_parked(worker
.index
) {
536 // When unparked, the worker is in the searching state.
537 self.is_searching
= true;
541 /// Runs maintenance work such as free pending tasks and check the pool's
543 fn maintenance(&mut self, worker
: &Worker
) {
544 self.drain_pending_drop(worker
);
546 if !self.is_shutdown
{
547 // Check if the scheduler has been shutdown
548 self.is_shutdown
= worker
.inject().is_closed();
552 // Signals all tasks to shut down, and waits for them to complete. Must run
553 // before we enter the single-threaded phase of shutdown processing.
554 fn pre_shutdown(&mut self, worker
: &Worker
) {
555 // Signal to all tasks to shut down.
556 for header
in self.tasks
.iter() {
561 self.drain_pending_drop(worker
);
563 if self.tasks
.is_empty() {
567 // Wait until signalled
568 let park
= self.park
.as_mut().expect("park missing");
569 park
.park().expect("park failed");
574 fn shutdown(&mut self) {
575 assert
!(self.tasks
.is_empty());
578 let mut park
= self.park
.take().expect("park missing");
581 while self.next_local_task().is_some() {}
586 fn drain_pending_drop(&mut self, worker
: &Worker
) {
587 use std
::mem
::ManuallyDrop
;
589 for task
in worker
.remote().pending_drop
.drain() {
590 let task
= ManuallyDrop
::new(task
);
592 // safety: tasks are only pushed into the `pending_drop` stacks that
593 // are associated with the list they are inserted into. When a task
594 // is pushed into `pending_drop`, the ref-inc is skipped, so we must
597 // See `bind` and `release` implementations.
599 self.tasks
.remove(task
.header().into());
606 /// Returns a reference to the scheduler's injection queue
607 fn inject(&self) -> &queue
::Inject
<Arc
<Worker
>> {
611 /// Return a reference to this worker's remote data
612 fn remote(&self) -> &Remote
{
613 &self.shared
.remotes
[self.index
]
616 fn eq(&self, other
: &Worker
) -> bool
{
617 self.shared
.ptr_eq(&other
.shared
) && self.index
== other
.index
621 impl task
::Schedule
for Arc
<Worker
> {
622 fn bind(task
: Task
) -> Arc
<Worker
> {
623 CURRENT
.with(|maybe_cx
| {
624 let cx
= maybe_cx
.expect("scheduler context missing");
630 .expect("scheduler core missing")
634 // Return a clone of the worker
639 fn release(&self, task
: &Task
) -> Option
<Task
> {
640 use std
::ptr
::NonNull
;
643 // Task has been synchronously removed from the Core owned by the
645 Removed(Option
<Task
>),
646 // Task is owned by another thread, so we need to notify it to clean
647 // up the task later.
651 let immediate
= CURRENT
.with(|maybe_cx
| {
652 let cx
= match maybe_cx
{
654 None
=> return Immediate
::MaybeRemote
,
657 if !self.eq(&cx
.worker
) {
658 // Task owned by another core, so we need to notify it.
659 return Immediate
::MaybeRemote
;
662 let mut maybe_core
= cx
.core
.borrow_mut();
664 if let Some(core
) = &mut *maybe_core
{
665 // Directly remove the task
667 // safety: the task is inserted in the list in `bind`.
669 let ptr
= NonNull
::from(task
.header());
670 return Immediate
::Removed(core
.tasks
.remove(ptr
));
674 Immediate
::MaybeRemote
677 // Checks if we were called from within a worker, allowing for immediate
678 // removal of a scheduled task. Else we have to go through the slower
679 // process below where we remotely mark a task as dropped.
681 Immediate
::Removed(task
) => return task
,
682 Immediate
::MaybeRemote
=> (),
685 // Track the task to be released by the worker that owns it
687 // Safety: We get a new handle without incrementing the ref-count.
688 // A ref-count is held by the "owned" linked list and it is only
689 // ever removed from that list as part of the release process: this
690 // method or popping the task from `pending_drop`. Thus, we can rely
691 // on the ref-count held by the linked-list to keep the memory
694 // When the task is removed from the stack, it is forgotten instead
696 let task
= unsafe { Task::from_raw(task.header().into()) }
;
698 self.remote().pending_drop
.push(task
);
700 // The worker core has been handed off to another thread. In the
701 // event that the scheduler is currently shutting down, the thread
702 // that owns the task may be waiting on the release to complete
704 if self.inject().is_closed() {
705 self.remote().unpark
.unpark();
711 fn schedule(&self, task
: Notified
) {
712 // Because this is not a newly spawned task, if scheduling fails due to
713 // the runtime shutting down, there is no special work that must happen
715 let _
= self.shared
.schedule(task
, false);
718 fn yield_now(&self, task
: Notified
) {
719 // Because this is not a newly spawned task, if scheduling fails due to
720 // the runtime shutting down, there is no special work that must happen
722 let _
= self.shared
.schedule(task
, true);
727 pub(super) fn schedule(&self, task
: Notified
, is_yield
: bool
) -> Result
<(), Notified
> {
728 CURRENT
.with(|maybe_cx
| {
729 if let Some(cx
) = maybe_cx
{
730 // Make sure the task is part of the **current** scheduler.
731 if self.ptr_eq(&cx
.worker
.shared
) {
732 // And the current thread still holds a core
733 if let Some(core
) = cx
.core
.borrow_mut().as_mut() {
734 self.schedule_local(core
, task
, is_yield
);
740 // Otherwise, use the inject queue
741 self.inject
.push(task
)?
;
742 self.notify_parked();
747 fn schedule_local(&self, core
: &mut Core
, task
: Notified
, is_yield
: bool
) {
748 // Spawning from the worker thread. If scheduling a "yield" then the
749 // task must always be pushed to the back of the queue, enabling other
750 // tasks to be executed. If **not** a yield, then there is more
751 // flexibility and the task may go to the front of the queue.
752 let should_notify
= if is_yield
{
753 core
.run_queue
.push_back(task
, &self.inject
);
756 // Push to the LIFO slot
757 let prev
= core
.lifo_slot
.take();
758 let ret
= prev
.is_some();
760 if let Some(prev
) = prev
{
761 core
.run_queue
.push_back(prev
, &self.inject
);
764 core
.lifo_slot
= Some(task
);
769 // Only notify if not currently parked. If `park` is `None`, then the
770 // scheduling is from a resource driver. As notifications often come in
771 // batches, the notification is delayed until the park is complete.
772 if should_notify
&& core
.park
.is_some() {
773 self.notify_parked();
777 pub(super) fn close(&self) {
778 if self.inject
.close() {
783 fn notify_parked(&self) {
784 if let Some(index
) = self.idle
.worker_to_notify() {
785 self.remotes
[index
].unpark
.unpark();
789 fn notify_all(&self) {
790 for remote
in &self.remotes
[..] {
791 remote
.unpark
.unpark();
795 fn notify_if_work_pending(&self) {
796 for remote
in &self.remotes
[..] {
797 if !remote
.steal
.is_empty() {
798 self.notify_parked();
803 if !self.inject
.is_empty() {
804 self.notify_parked();
808 fn transition_worker_from_searching(&self) {
809 if self.idle
.transition_worker_from_searching() {
810 // We are the final searching worker. Because work was found, we
811 // need to notify another worker.
812 self.notify_parked();
816 /// Signals that a worker has observed the shutdown signal and has replaced
817 /// its core back into its handle.
819 /// If all workers have reached this point, the final cleanup is performed.
820 fn shutdown(&self, core
: Box
<Core
>) {
821 let mut cores
= self.shutdown_cores
.lock();
824 if cores
.len() != self.remotes
.len() {
828 for mut core
in cores
.drain(..) {
832 // Drain the injection queue
833 while let Some(task
) = self.inject
.pop() {
838 fn ptr_eq(&self, other
: &Shared
) -> bool
{
839 std
::ptr
::eq(self, other
)