]>
Commit | Line | Data |
---|---|---|
2b03887a FG |
1 | use crate::cell::Cell; |
2 | use crate::sync as public; | |
3 | use crate::sync::atomic::{ | |
4 | AtomicU32, | |
5 | Ordering::{Acquire, Relaxed, Release}, | |
6 | }; | |
9ffffee4 | 7 | use crate::sync::once::ExclusiveState; |
2b03887a FG |
8 | use crate::sys::futex::{futex_wait, futex_wake_all}; |
9 | ||
10 | // On some platforms, the OS is very nice and handles the waiter queue for us. | |
11 | // This means we only need one atomic value with 5 states: | |
12 | ||
13 | /// No initialization has run yet, and no thread is currently using the Once. | |
14 | const INCOMPLETE: u32 = 0; | |
15 | /// Some thread has previously attempted to initialize the Once, but it panicked, | |
16 | /// so the Once is now poisoned. There are no other threads currently accessing | |
17 | /// this Once. | |
18 | const POISONED: u32 = 1; | |
19 | /// Some thread is currently attempting to run initialization. It may succeed, | |
20 | /// so all future threads need to wait for it to finish. | |
21 | const RUNNING: u32 = 2; | |
22 | /// Some thread is currently attempting to run initialization and there are threads | |
23 | /// waiting for it to finish. | |
24 | const QUEUED: u32 = 3; | |
25 | /// Initialization has completed and all future calls should finish immediately. | |
26 | const COMPLETE: u32 = 4; | |
27 | ||
28 | // Threads wait by setting the state to QUEUED and calling `futex_wait` on the state | |
29 | // variable. When the running thread finishes, it will wake all waiting threads using | |
30 | // `futex_wake_all`. | |
31 | ||
32 | pub struct OnceState { | |
33 | poisoned: bool, | |
34 | set_state_to: Cell<u32>, | |
35 | } | |
36 | ||
37 | impl OnceState { | |
38 | #[inline] | |
39 | pub fn is_poisoned(&self) -> bool { | |
40 | self.poisoned | |
41 | } | |
42 | ||
43 | #[inline] | |
44 | pub fn poison(&self) { | |
45 | self.set_state_to.set(POISONED); | |
46 | } | |
47 | } | |
48 | ||
49 | struct CompletionGuard<'a> { | |
50 | state: &'a AtomicU32, | |
51 | set_state_on_drop_to: u32, | |
52 | } | |
53 | ||
54 | impl<'a> Drop for CompletionGuard<'a> { | |
55 | fn drop(&mut self) { | |
56 | // Use release ordering to propagate changes to all threads checking | |
57 | // up on the Once. `futex_wake_all` does its own synchronization, hence | |
58 | // we do not need `AcqRel`. | |
59 | if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED { | |
60 | futex_wake_all(&self.state); | |
61 | } | |
62 | } | |
63 | } | |
64 | ||
65 | pub struct Once { | |
66 | state: AtomicU32, | |
67 | } | |
68 | ||
69 | impl Once { | |
70 | #[inline] | |
71 | pub const fn new() -> Once { | |
72 | Once { state: AtomicU32::new(INCOMPLETE) } | |
73 | } | |
74 | ||
75 | #[inline] | |
76 | pub fn is_completed(&self) -> bool { | |
77 | // Use acquire ordering to make all initialization changes visible to the | |
78 | // current thread. | |
79 | self.state.load(Acquire) == COMPLETE | |
80 | } | |
81 | ||
9ffffee4 FG |
82 | #[inline] |
83 | pub(crate) fn state(&mut self) -> ExclusiveState { | |
84 | match *self.state.get_mut() { | |
85 | INCOMPLETE => ExclusiveState::Incomplete, | |
86 | POISONED => ExclusiveState::Poisoned, | |
87 | COMPLETE => ExclusiveState::Complete, | |
88 | _ => unreachable!("invalid Once state"), | |
89 | } | |
90 | } | |
91 | ||
2b03887a FG |
92 | // This uses FnMut to match the API of the generic implementation. As this |
93 | // implementation is quite light-weight, it is generic over the closure and | |
94 | // so avoids the cost of dynamic dispatch. | |
95 | #[cold] | |
96 | #[track_caller] | |
97 | pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) { | |
98 | let mut state = self.state.load(Acquire); | |
99 | loop { | |
100 | match state { | |
101 | POISONED if !ignore_poisoning => { | |
102 | // Panic to propagate the poison. | |
103 | panic!("Once instance has previously been poisoned"); | |
104 | } | |
105 | INCOMPLETE | POISONED => { | |
106 | // Try to register the current thread as the one running. | |
107 | if let Err(new) = | |
108 | self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire) | |
109 | { | |
110 | state = new; | |
111 | continue; | |
112 | } | |
113 | // `waiter_queue` will manage other waiting threads, and | |
114 | // wake them up on drop. | |
115 | let mut waiter_queue = | |
116 | CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED }; | |
117 | // Run the function, letting it know if we're poisoned or not. | |
118 | let f_state = public::OnceState { | |
119 | inner: OnceState { | |
120 | poisoned: state == POISONED, | |
121 | set_state_to: Cell::new(COMPLETE), | |
122 | }, | |
123 | }; | |
124 | f(&f_state); | |
125 | waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get(); | |
126 | return; | |
127 | } | |
128 | RUNNING | QUEUED => { | |
129 | // Set the state to QUEUED if it is not already. | |
130 | if state == RUNNING | |
131 | && let Err(new) = self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire) | |
132 | { | |
133 | state = new; | |
134 | continue; | |
135 | } | |
136 | ||
137 | futex_wait(&self.state, QUEUED, None); | |
138 | state = self.state.load(Acquire); | |
139 | } | |
140 | COMPLETE => return, | |
141 | _ => unreachable!("state is never set to invalid values"), | |
142 | } | |
143 | } | |
144 | } | |
145 | } |