]>
Commit | Line | Data |
---|---|---|
2b03887a FG |
1 | use crate::cell::Cell; |
2 | use crate::sync as public; | |
3 | use crate::sync::atomic::{ | |
4 | AtomicU32, | |
5 | Ordering::{Acquire, Relaxed, Release}, | |
6 | }; | |
7 | use crate::sys::futex::{futex_wait, futex_wake_all}; | |
8 | ||
// On some platforms the operating system is kind enough to manage the queue
// of waiting threads for us. A single atomic value is then sufficient, and it
// is always in one of these 5 states:

/// Initialization has not been run so far, and no thread is using the Once.
const INCOMPLETE: u32 = 0;
/// An earlier attempt to initialize the Once panicked, which left the Once
/// poisoned. No other thread is accessing this Once at the moment.
const POISONED: u32 = 1;
/// A thread is in the middle of running initialization. Because it may
/// succeed, every thread arriving later has to wait for it to finish.
const RUNNING: u32 = 2;
/// A thread is in the middle of running initialization, and other threads
/// are waiting for it to finish.
const QUEUED: u32 = 3;
/// Initialization has finished; every later call should return immediately.
const COMPLETE: u32 = 4;

// A thread that has to wait sets the state to QUEUED and then calls
// `futex_wait` on the state variable. When the running thread is done, it
// wakes all of the waiting threads with `futex_wake_all`.
30 | ||
31 | pub struct OnceState { | |
32 | poisoned: bool, | |
33 | set_state_to: Cell<u32>, | |
34 | } | |
35 | ||
36 | impl OnceState { | |
37 | #[inline] | |
38 | pub fn is_poisoned(&self) -> bool { | |
39 | self.poisoned | |
40 | } | |
41 | ||
42 | #[inline] | |
43 | pub fn poison(&self) { | |
44 | self.set_state_to.set(POISONED); | |
45 | } | |
46 | } | |
47 | ||
48 | struct CompletionGuard<'a> { | |
49 | state: &'a AtomicU32, | |
50 | set_state_on_drop_to: u32, | |
51 | } | |
52 | ||
53 | impl<'a> Drop for CompletionGuard<'a> { | |
54 | fn drop(&mut self) { | |
55 | // Use release ordering to propagate changes to all threads checking | |
56 | // up on the Once. `futex_wake_all` does its own synchronization, hence | |
57 | // we do not need `AcqRel`. | |
58 | if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED { | |
59 | futex_wake_all(&self.state); | |
60 | } | |
61 | } | |
62 | } | |
63 | ||
64 | pub struct Once { | |
65 | state: AtomicU32, | |
66 | } | |
67 | ||
68 | impl Once { | |
69 | #[inline] | |
70 | pub const fn new() -> Once { | |
71 | Once { state: AtomicU32::new(INCOMPLETE) } | |
72 | } | |
73 | ||
74 | #[inline] | |
75 | pub fn is_completed(&self) -> bool { | |
76 | // Use acquire ordering to make all initialization changes visible to the | |
77 | // current thread. | |
78 | self.state.load(Acquire) == COMPLETE | |
79 | } | |
80 | ||
81 | // This uses FnMut to match the API of the generic implementation. As this | |
82 | // implementation is quite light-weight, it is generic over the closure and | |
83 | // so avoids the cost of dynamic dispatch. | |
84 | #[cold] | |
85 | #[track_caller] | |
86 | pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) { | |
87 | let mut state = self.state.load(Acquire); | |
88 | loop { | |
89 | match state { | |
90 | POISONED if !ignore_poisoning => { | |
91 | // Panic to propagate the poison. | |
92 | panic!("Once instance has previously been poisoned"); | |
93 | } | |
94 | INCOMPLETE | POISONED => { | |
95 | // Try to register the current thread as the one running. | |
96 | if let Err(new) = | |
97 | self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire) | |
98 | { | |
99 | state = new; | |
100 | continue; | |
101 | } | |
102 | // `waiter_queue` will manage other waiting threads, and | |
103 | // wake them up on drop. | |
104 | let mut waiter_queue = | |
105 | CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED }; | |
106 | // Run the function, letting it know if we're poisoned or not. | |
107 | let f_state = public::OnceState { | |
108 | inner: OnceState { | |
109 | poisoned: state == POISONED, | |
110 | set_state_to: Cell::new(COMPLETE), | |
111 | }, | |
112 | }; | |
113 | f(&f_state); | |
114 | waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get(); | |
115 | return; | |
116 | } | |
117 | RUNNING | QUEUED => { | |
118 | // Set the state to QUEUED if it is not already. | |
119 | if state == RUNNING | |
120 | && let Err(new) = self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire) | |
121 | { | |
122 | state = new; | |
123 | continue; | |
124 | } | |
125 | ||
126 | futex_wait(&self.state, QUEUED, None); | |
127 | state = self.state.load(Acquire); | |
128 | } | |
129 | COMPLETE => return, | |
130 | _ => unreachable!("state is never set to invalid values"), | |
131 | } | |
132 | } | |
133 | } | |
134 | } |