//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
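//!
//! A minimal usage sketch of that hand-off, assuming the public
//! `tokio::task::block_in_place` wrapper that forwards to this module on the
//! multi-threaded runtime:
//!
//! ```ignore
//! // Called from a worker thread of a multi-threaded runtime.
//! tokio::task::block_in_place(|| {
//!     // This worker's core is now stealable, so the pool keeps making
//!     // progress while this thread blocks.
//!     std::thread::sleep(std::time::Duration::from_millis(50));
//! });
//! ```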

use crate::coop;
use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::FastRand;

use std::cell::RefCell;
use std::time::Duration;

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to shared state
    shared: Arc<Shared>,

    /// Index of this worker's remote state in `Shared::remotes`
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u8,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for message passing patterns
    /// and helps to reduce latency; see the sketch after this struct.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Worker>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down
    is_shutdown: bool,

    /// Tasks owned by the core
    tasks: LinkedList<Task, <Task as Link>::Target>,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Fast random number generator.
    rand: FastRand,
}
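
// A hedged sketch of the `lifo_slot` optimization above, using the public
// `tokio::sync::oneshot` API (illustrative only, not part of this module):
// when `send` wakes the receiving task from a worker thread, that task lands
// in this worker's LIFO slot and is typically polled next, ahead of older
// entries in the run queue.
//
// ```ignore
// async fn ping_pong() {
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     tokio::spawn(async move {
//         tx.send("pong").unwrap(); // wakes `rx`'s task into the LIFO slot
//     });
//     let reply = rx.await.unwrap(); // resumed with low latency
// }
// ```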

/// State shared across all workers
pub(super) struct Shared {
    /// Per-worker remote state. All other workers have access to this, and it
    /// is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Submit work to the scheduler while **not** currently on a worker thread.
    inject: queue::Inject<Arc<Worker>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker, to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steal tasks from this worker.
    steal: queue::Steal<Arc<Worker>>,

    /// Transfers tasks to be released. Any worker may push tasks; only the
    /// owning worker pops.
    pending_drop: task::TransferStack<Arc<Worker>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// the task completes, it is returned. Otherwise, the worker will need to stop
/// processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Worker>>;

/// A notified task handle
type Notified = task::Notified<Arc<Worker>>;

// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
    let mut cores = vec![];
    let mut remotes = vec![];

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            tasks: LinkedList::new(),
            park: Some(park),
            rand: FastRand::new(seed()),
        }));

        remotes.push(Remote {
            steal,
            pending_drop: task::TransferStack::new(),
            unpark,
        });
    }

    let shared = Arc::new(Shared {
        remotes: remotes.into_boxed_slice(),
        inject: queue::Inject::new(),
        idle: Idle::new(size),
        shutdown_cores: Mutex::new(vec![]),
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            shared: shared.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (shared, launch)
}

pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset(coop::Budget);

    impl Drop for Reset {
        fn drop(&mut self) {
            CURRENT.with(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    let core = cx.worker.core.take();
                    let mut cx_core = cx.core.borrow_mut();
                    assert!(cx_core.is_none());
                    *cx_core = core;

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.0);
                }
            });
        }
    }

    let mut had_entered = false;

    CURRENT.with(|maybe_cx| {
        match (crate::runtime::enter::context(), maybe_cx.is_some()) {
            (EnterContext::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (EnterContext::Entered { allow_blocking }, false) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_blocking {
                    had_entered = true;
                    return;
                } else {
                    // This probably means we are on the basic_scheduler or in
                    // a LocalSet, where it is _not_ okay to block.
                    panic!("can call blocking only when running on the multi-threaded runtime");
                }
            }
            (EnterContext::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return;
            }
            (EnterContext::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return;
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return,
        };

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
    });

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset(coop::stop());

        crate::runtime::enter::exit(f)
    } else {
        f()
    }
}
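
// A hedged behavioral sketch of the context matrix above (illustrative only):
// on a current-thread runtime there is no worker core to hand off, so
// `block_in_place` takes the panicking arm rather than stalling the only
// thread.
//
// ```ignore
// let rt = tokio::runtime::Builder::new_current_thread()
//     .build()
//     .unwrap();
// rt.block_on(async {
//     // Panics: "can call blocking only when running on the multi-threaded runtime"
//     tokio::task::block_in_place(|| {});
// });
// ```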

/// After how many ticks is the global queue polled. This helps to ensure
/// fairness.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u8 = 61;
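
// For example: since `tick` is a `u8` that wraps at 256, the global inject
// queue is checked first on ticks 0, 61, 122, 183, and 244 of each wrap; see
// `Core::next_task` below.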

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    // Set the worker context.
    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

    let _enter = crate::runtime::enter(true);

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to
        // support using `?` to short circuit.
        assert!(cx.run(core).is_err());
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = self.park(core);
            }
        }

        core.pre_shutdown(&self.worker);

        // Signal shutdown
        self.worker.shared.shutdown(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            task.run();

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => return Err(()),
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => return Ok(core),
                };

                if coop::has_budget_remaining() {
                    // Run the LIFO task, then loop
                    *self.core.borrow_mut() = Some(core);
                    task.run();
                } else {
                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back(task, self.worker.inject());
                    return Ok(core);
                }
            }
        })
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % GLOBAL_POLL_INTERVAL == 0 {
            // Call `park` with a 0 timeout. This enables the I/O driver,
            // timer, ... to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);
        }

        core
    }

    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        core.transition_to_parked(&self.worker);

        while !core.is_shutdown {
            core = self.park_timeout(core, None);

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            if core.transition_from_parked(&self.worker) {
                return core;
            }
        }

        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(timeout).expect("park failed");
        } else {
            park.park().expect("park failed");
        }

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        // If there are tasks available to steal, notify a worker
        if core.run_queue.is_stealable() {
            self.worker.shared.notify_parked();
        }

        core
    }
}

impl Core {
    /// Increments the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Returns the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % GLOBAL_POLL_INTERVAL == 0 {
            worker.inject().pop().or_else(|| self.next_local_task())
        } else {
            self.next_local_task().or_else(|| worker.inject().pop())
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourselves! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.shared.remotes[i];
            if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
                return Some(task);
            }
        }

        // Fall back on checking the global queue
        worker.shared.inject.pop()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.shared.transition_worker_from_searching();
    }

    /// Prepares the worker state for parking
    fn transition_to_parked(&mut self, worker: &Worker) {
        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker
            .shared
            .idle
            .transition_worker_to_parked(worker.index, self.is_searching);

        // The worker is no longer searching. This update is made to the local
        // cache only.
        self.is_searching = false;

        if is_last_searcher {
            worker.shared.notify_if_work_pending();
        }
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot, then we must unpark regardless of
        // being notified
        if self.lifo_slot.is_some() {
            worker.shared.idle.unpark_worker_by_id(worker.index);
            self.is_searching = true;
            return true;
        }

        if worker.shared.idle.is_parked(worker.index) {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as freeing pending tasks and checking the
    /// pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.drain_pending_drop(worker);

        if !self.is_shutdown {
            // Check if the scheduler has been shut down
            self.is_shutdown = worker.inject().is_closed();
        }
    }

    // Signals all tasks to shut down, and waits for them to complete. Must run
    // before we enter the single-threaded phase of shutdown processing.
    fn pre_shutdown(&mut self, worker: &Worker) {
        // Signal to all tasks to shut down.
        for header in self.tasks.iter() {
            header.shutdown();
        }

        loop {
            self.drain_pending_drop(worker);

            if self.tasks.is_empty() {
                break;
            }

            // Wait until signalled
            let park = self.park.as_mut().expect("park missing");
            park.park().expect("park failed");
        }
    }

    // Shuts down the core
    fn shutdown(&mut self) {
        assert!(self.tasks.is_empty());

        // Take the parker
        let mut park = self.park.take().expect("park missing");

        // Drain the queue
        while self.next_local_task().is_some() {}

        park.shutdown();
    }

    fn drain_pending_drop(&mut self, worker: &Worker) {
        use std::mem::ManuallyDrop;

        for task in worker.remote().pending_drop.drain() {
            let task = ManuallyDrop::new(task);

            // safety: tasks are only pushed into the `pending_drop` stacks
            // that are associated with the list they are inserted into. When a
            // task is pushed into `pending_drop`, the ref-inc is skipped, so
            // we must not ref-dec here.
            //
            // See `bind` and `release` implementations.
            unsafe {
                self.tasks.remove(task.header().into());
            }
        }
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue
    fn inject(&self) -> &queue::Inject<Arc<Worker>> {
        &self.shared.inject
    }

    /// Returns a reference to this worker's remote data
    fn remote(&self) -> &Remote {
        &self.shared.remotes[self.index]
    }

    fn eq(&self, other: &Worker) -> bool {
        self.shared.ptr_eq(&other.shared) && self.index == other.index
    }
}

impl task::Schedule for Arc<Worker> {
    fn bind(task: Task) -> Arc<Worker> {
        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");

            // Track the task
            cx.core
                .borrow_mut()
                .as_mut()
                .expect("scheduler core missing")
                .tasks
                .push_front(task);

            // Return a clone of the worker
            cx.worker.clone()
        })
    }

    fn release(&self, task: &Task) -> Option<Task> {
        use std::ptr::NonNull;

        enum Immediate {
            // The task has been synchronously removed from the Core owned by
            // the current thread
            Removed(Option<Task>),
            // The task is owned by another thread, so we need to notify it to
            // clean up the task later.
            MaybeRemote,
        }

        let immediate = CURRENT.with(|maybe_cx| {
            let cx = match maybe_cx {
                Some(cx) => cx,
                None => return Immediate::MaybeRemote,
            };

            if !self.eq(&cx.worker) {
                // Task owned by another core, so we need to notify it.
                return Immediate::MaybeRemote;
            }

            let mut maybe_core = cx.core.borrow_mut();

            if let Some(core) = &mut *maybe_core {
                // Directly remove the task
                //
                // safety: the task is inserted in the list in `bind`.
                unsafe {
                    let ptr = NonNull::from(task.header());
                    return Immediate::Removed(core.tasks.remove(ptr));
                }
            }

            Immediate::MaybeRemote
        });

        // Checks if we were called from within a worker, allowing for
        // immediate removal of a scheduled task. Otherwise we have to go
        // through the slower process below where we remotely mark a task as
        // dropped.
        match immediate {
            Immediate::Removed(task) => return task,
            Immediate::MaybeRemote => (),
        };

        // Track the task to be released by the worker that owns it
        //
        // Safety: We get a new handle without incrementing the ref-count.
        // A ref-count is held by the "owned" linked list and it is only
        // ever removed from that list as part of the release process: this
        // method or popping the task from `pending_drop`. Thus, we can rely
        // on the ref-count held by the linked-list to keep the memory
        // alive.
        //
        // When the task is removed from the stack, it is forgotten instead
        // of dropped.
        let task = unsafe { Task::from_raw(task.header().into()) };

        self.remote().pending_drop.push(task);

        // The worker core has been handed off to another thread. In the
        // event that the scheduler is currently shutting down, the thread
        // that owns the task may be waiting on the release to complete
        // shutdown.
        if self.inject().is_closed() {
            self.remote().unpark.unpark();
        }

        None
    }

    fn schedule(&self, task: Notified) {
        // Because this is not a newly spawned task, if scheduling fails due to
        // the runtime shutting down, there is no special work that must happen
        // here.
        let _ = self.shared.schedule(task, false);
    }

    fn yield_now(&self, task: Notified) {
        // Because this is not a newly spawned task, if scheduling fails due to
        // the runtime shutting down, there is no special work that must happen
        // here.
        let _ = self.shared.schedule(task, true);
    }
}

impl Shared {
    pub(super) fn schedule(&self, task: Notified, is_yield: bool) -> Result<(), Notified> {
        CURRENT.with(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.shared) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return Ok(());
                    }
                }
            }

            // Otherwise, use the inject queue
            self.inject.push(task)?;
            self.notify_parked();
            Ok(())
        })
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue; see the
        // sketch after this function.
        let should_notify = if is_yield {
            core.run_queue.push_back(task, &self.inject);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue.push_back(prev, &self.inject);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked();
        }
    }
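
    // A hedged illustration of the `is_yield` distinction above (illustrative
    // only, not part of this module): a task that wakes itself while running,
    // e.g. via `cx.waker().wake_by_ref()`, is rescheduled through the yield
    // path, so it goes to the *back* of the local queue instead of the LIFO
    // slot and cannot monopolize the worker.
    //
    // ```ignore
    // use std::future::Future;
    // use std::pin::Pin;
    // use std::task::{Context, Poll};
    //
    // struct SelfWake { yielded: bool }
    //
    // impl Future for SelfWake {
    //     type Output = ();
    //
    //     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    //         if self.yielded {
    //             return Poll::Ready(());
    //         }
    //         self.yielded = true;
    //         cx.waker().wake_by_ref(); // self-wake: rescheduled via `yield_now`
    //         Poll::Pending             // other ready tasks run first
    //     }
    // }
    // ```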

    pub(super) fn close(&self) {
        if self.inject.close() {
            self.notify_all();
        }
    }

    fn notify_parked(&self) {
        if let Some(index) = self.idle.worker_to_notify() {
            self.remotes[index].unpark.unpark();
        }
    }

    fn notify_all(&self) {
        for remote in &self.remotes[..] {
            remote.unpark.unpark();
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked();
                return;
            }
        }

        if !self.inject.is_empty() {
            self.notify_parked();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has handed
    /// off its core for shutdown processing.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown(&self, core: Box<Core>) {
        let mut cores = self.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.remotes.len() {
            return;
        }

        for mut core in cores.drain(..) {
            core.shutdown();
        }

        // Drain the injection queue
        while let Some(task) = self.inject.pop() {
            task.shutdown();
        }
    }

    fn ptr_eq(&self, other: &Shared) -> bool {
        std::ptr::eq(self, other)
    }
}