-// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
//! A "once initialization" primitive
//!
//! This primitive is meant to be used to run one-time initialization. An
// initialization closure panics, the Once enters a "poisoned" state which means
// that all future calls will immediately panic as well.
//
-// So to implement this, one might first reach for a `StaticMutex`, but those
-// unfortunately need to be deallocated (e.g. call `destroy()`) to free memory
-// on all OSes (some of the BSDs allocate memory for mutexes). It also gets a
-// lot harder with poisoning to figure out when the mutex needs to be
-// deallocated because it's not after the closure finishes, but after the first
-// successful closure finishes.
+// So to implement this, one might first reach for a `Mutex`, but those cannot
+// be put into a `static`. It also gets a lot harder with poisoning to figure
+// out when the mutex needs to be deallocated because it's not after the closure
+// finishes, but after the first successful closure finishes.
//
// All in all, this is instead implemented with atomics and lock-free
// operations! Whee! Each `Once` has one word of atomic state, and this state is
//
// You'll find a few more details in the implementation, but that's the gist of
// it!
-
-use fmt;
-use marker;
-use ptr;
-use sync::atomic::{AtomicUsize, AtomicBool, Ordering};
-use thread::{self, Thread};
+//
+// Atomic orderings:
+// When running `Once` we deal with multiple atomics:
+// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
+// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
+// result of the `Once`, and (3) for synchronizing `Waiter` nodes.
+// - At the end of the `call_inner` function we have to make sure the result
+// of the `Once` is acquired. So every load which can be the only one to
+// load COMPLETED must have at least Acquire ordering, which means all
+// three of them.
+// - `WaiterQueue::Drop` is the only place that may store COMPLETED, and
+// must do so with Release ordering to make the result available.
+// - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and
+// needs to make the nodes available with Release ordering. The load in
+// its `compare_and_swap` can be Relaxed because it only has to compare
+// the atomic, not to read other data.
+// - `WaiterQueue::Drop` must see the `Waiter` nodes, so it must load
+// `state_and_queue` with Acquire ordering.
+// - There is just one store where `state_and_queue` is used only as a
+// state flag, without having to synchronize data: switching the state
+// from INCOMPLETE to RUNNING in `call_inner`. This store can be Relaxed,
+// but the read has to be Acquire because of the requirements mentioned
+// above.
+// * `Waiter.signaled` is both used as a flag, and to protect a field with
+// interior mutability in `Waiter`. `Waiter.thread` is changed in
+// `WaiterQueue::Drop` which then sets `signaled` with Release ordering.
+// After `wait` loads `signaled` with Acquire and sees it is true, it needs to
+// see the changes to drop the `Waiter` struct correctly.
+// * There is one place where the two atomics `Once.state_and_queue` and
+// `Waiter.signaled` come together, and might be reordered by the compiler or
+// processor. Because both use Acquire ordering such a reordering is not
+// allowed, so no need for SeqCst.
+
+use crate::cell::Cell;
+use crate::fmt;
+use crate::marker;
+use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use crate::thread::{self, Thread};
/// A synchronization primitive which can be used to run a one-time global
/// initialization. Useful for one-time initialization for FFI or related
-/// functionality. This type can only be constructed with the [`ONCE_INIT`]
-/// value or the equivalent [`Once::new`] constructor.
+/// functionality. This type can only be constructed with the [`Once::new`]
+/// constructor.
///
-/// [`ONCE_INIT`]: constant.ONCE_INIT.html
/// [`Once::new`]: struct.Once.html#method.new
///
/// # Examples
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Once {
- // This `state` word is actually an encoded version of just a pointer to a
- // `Waiter`, so we add the `PhantomData` appropriately.
- state: AtomicUsize,
- _marker: marker::PhantomData<*mut Waiter>,
+ // `state_and_queue` is actually a pointer to a `Waiter` with extra state
+ // bits, so we add the `PhantomData` appropriately.
+ state_and_queue: AtomicUsize,
+ _marker: marker::PhantomData<*const Waiter>,
}
// The `PhantomData` of a raw pointer removes these two auto traits, but we
/// static START: Once = ONCE_INIT;
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
+#[rustc_deprecated(
+ since = "1.38.0",
+ reason = "the `new` function is now preferred",
+ suggestion = "Once::new()"
+)]
pub const ONCE_INIT: Once = Once::new();
-// Four states that a Once can be in, encoded into the lower bits of `state` in
-// the Once structure.
+// Four states that a Once can be in, encoded into the lower bits of
+// `state_and_queue` in the Once structure.
const INCOMPLETE: usize = 0x0;
const POISONED: usize = 0x1;
const RUNNING: usize = 0x2;
// this is in the RUNNING state.
const STATE_MASK: usize = 0x3;
-// Representation of a node in the linked list of waiters in the RUNNING state.
+// Representation of a node in the linked list of waiters, used while in the
+// RUNNING state.
+// Note: `Waiter` can't hold a mutable pointer to the next thread, because then
+// `wait` would both hand out a mutable reference to its `Waiter` node, and keep
+// a shared reference to check `signaled`. Instead we hold shared references and
+// use interior mutability.
+#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
struct Waiter {
- thread: Option<Thread>,
+ thread: Cell<Option<Thread>>,
signaled: AtomicBool,
- next: *mut Waiter,
+ next: *const Waiter,
}
-// Helper struct used to clean up after a closure call with a `Drop`
-// implementation to also run on panic.
-struct Finish<'a> {
- panicked: bool,
- me: &'a Once,
+// Head of a linked list of waiters.
+// Every node is a struct on the stack of a waiting thread.
+// Will wake up the waiters when it gets dropped, i.e. also on panic.
+struct WaiterQueue<'a> {
+ state_and_queue: &'a AtomicUsize,
+ set_state_on_drop_to: usize,
}
impl Once {
/// Creates a new `Once` value.
#[stable(feature = "once_new", since = "1.2.0")]
+ #[rustc_const_stable(feature = "const_once_new", since = "1.32.0")]
pub const fn new() -> Once {
- Once {
- state: AtomicUsize::new(INCOMPLETE),
- _marker: marker::PhantomData,
- }
+ Once { state_and_queue: AtomicUsize::new(INCOMPLETE), _marker: marker::PhantomData }
}
/// Performs an initialization routine once and only once. The given closure
/// happens-before relation between the closure and code executing after the
/// return).
///
+ /// If the given closure recursively invokes `call_once` on the same `Once`
+ /// instance the exact behavior is not specified, allowed outcomes are
+ /// a panic or a deadlock.
+ ///
/// # Examples
///
/// ```
/// static INIT: Once = Once::new();
///
/// // Accessing a `static mut` is unsafe much of the time, but if we do so
- /// // in a synchronized fashion (e.g. write once or read all) then we're
+ /// // in a synchronized fashion (e.g., write once or read all) then we're
/// // good to go!
/// //
/// // This function will only call `expensive_computation` once, and will
///
/// [poison]: struct.Mutex.html#poisoning
#[stable(feature = "rust1", since = "1.0.0")]
- pub fn call_once<F>(&self, f: F) where F: FnOnce() {
- // Fast path, just see if we've completed initialization.
- // An `Acquire` load is enough because that makes all the initialization
- // operations visible to us. The cold path uses SeqCst consistently
- // because the performance difference really does not matter there,
- // and SeqCst minimizes the chances of something going wrong.
- if self.state.load(Ordering::Acquire) == COMPLETE {
- return
+ pub fn call_once<F>(&self, f: F)
+ where
+ F: FnOnce(),
+ {
+ // Fast path check
+ if self.is_completed() {
+ return;
}
let mut f = Some(f);
/// Performs the same function as [`call_once`] except ignores poisoning.
///
- /// Unlike [`call_once`], if this `Once` has been poisoned (i.e. a previous
+ /// Unlike [`call_once`], if this `Once` has been poisoned (i.e., a previous
/// call to `call_once` or `call_once_force` caused a panic), calling
/// `call_once_force` will still invoke the closure `f` and will _not_
/// result in an immediate panic. If `f` panics, the `Once` will remain
/// in a poison state. If `f` does _not_ panic, the `Once` will no
/// longer be in a poison state and all future calls to `call_once` or
- /// `call_one_force` will no-op.
+ /// `call_once_force` will be no-ops.
///
/// The closure `f` is yielded a [`OnceState`] structure which can be used
/// to query the poison status of the `Once`.
/// INIT.call_once(|| {});
/// ```
#[unstable(feature = "once_poison", issue = "33577")]
- pub fn call_once_force<F>(&self, f: F) where F: FnOnce(&OnceState) {
- // same as above, just with a different parameter to `call_inner`.
- // An `Acquire` load is enough because that makes all the initialization
- // operations visible to us. The cold path uses SeqCst consistently
- // because the performance difference really does not matter there,
- // and SeqCst minimizes the chances of something going wrong.
- if self.state.load(Ordering::Acquire) == COMPLETE {
- return
+ pub fn call_once_force<F>(&self, f: F)
+ where
+ F: FnOnce(&OnceState),
+ {
+ // Fast path check
+ if self.is_completed() {
+ return;
}
let mut f = Some(f);
- self.call_inner(true, &mut |p| {
- f.take().unwrap()(&OnceState { poisoned: p })
- });
+ self.call_inner(true, &mut |p| f.take().unwrap()(&OnceState { poisoned: p }));
+ }
+
+ /// Returns `true` if some `call_once` call has completed
+ /// successfully. Specifically, `is_completed` will return false in
+ /// the following situations:
+ /// * `call_once` was not called at all,
+ /// * `call_once` was called, but has not yet completed,
+ /// * the `Once` instance is poisoned
+ ///
+ /// This function returning `false` does not mean that `Once` has not been
+ /// executed. For example, it may have been executed in the time between
+ /// when `is_completed` starts executing and when it returns, in which case
+ /// the `false` return value would be stale (but still permissible).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Once;
+ ///
+ /// static INIT: Once = Once::new();
+ ///
+ /// assert_eq!(INIT.is_completed(), false);
+ /// INIT.call_once(|| {
+ /// assert_eq!(INIT.is_completed(), false);
+ /// });
+ /// assert_eq!(INIT.is_completed(), true);
+ /// ```
+ ///
+ /// ```
+ /// use std::sync::Once;
+ /// use std::thread;
+ ///
+ /// static INIT: Once = Once::new();
+ ///
+ /// assert_eq!(INIT.is_completed(), false);
+ /// let handle = thread::spawn(|| {
+ /// INIT.call_once(|| panic!());
+ /// });
+ /// assert!(handle.join().is_err());
+ /// assert_eq!(INIT.is_completed(), false);
+ /// ```
+ #[stable(feature = "once_is_completed", since = "1.43.0")]
+ #[inline]
+ pub fn is_completed(&self) -> bool {
+ // An `Acquire` load is enough because that makes all the initialization
+ // operations visible to us, and, this being a fast path, weaker
+ // ordering helps with performance. This `Acquire` synchronizes with
+ // `Release` operations on the slow path.
+ self.state_and_queue.load(Ordering::Acquire) == COMPLETE
}
// This is a non-generic function to reduce the monomorphization cost of
// currently no way to take an `FnOnce` and call it via virtual dispatch
// without some allocation overhead.
#[cold]
- fn call_inner(&self,
- ignore_poisoning: bool,
- init: &mut dyn FnMut(bool)) {
- let mut state = self.state.load(Ordering::SeqCst);
-
- 'outer: loop {
- match state {
- // If we're complete, then there's nothing to do, we just
- // jettison out as we shouldn't run the closure.
- COMPLETE => return,
-
- // If we're poisoned and we're not in a mode to ignore
- // poisoning, then we panic here to propagate the poison.
+ fn call_inner(&self, ignore_poisoning: bool, init: &mut dyn FnMut(bool)) {
+ let mut state_and_queue = self.state_and_queue.load(Ordering::Acquire);
+ loop {
+ match state_and_queue {
+ COMPLETE => break,
POISONED if !ignore_poisoning => {
+ // Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}
-
- // Otherwise if we see a poisoned or otherwise incomplete state
- // we will attempt to move ourselves into the RUNNING state. If
- // we succeed, then the queue of waiters starts at null (all 0
- // bits).
- POISONED |
- INCOMPLETE => {
- let old = self.state.compare_and_swap(state, RUNNING,
- Ordering::SeqCst);
- if old != state {
- state = old;
- continue
+ POISONED | INCOMPLETE => {
+ // Try to register this thread as the one RUNNING.
+ let old = self.state_and_queue.compare_and_swap(
+ state_and_queue,
+ RUNNING,
+ Ordering::Acquire,
+ );
+ if old != state_and_queue {
+ state_and_queue = old;
+ continue;
}
-
- // Run the initialization routine, letting it know if we're
- // poisoned or not. The `Finish` struct is then dropped, and
- // the `Drop` implementation here is responsible for waking
- // up other waiters both in the normal return and panicking
- // case.
- let mut complete = Finish {
- panicked: true,
- me: self,
+ // `waiter_queue` will manage other waiting threads, and
+ // wake them up on drop.
+ let mut waiter_queue = WaiterQueue {
+ state_and_queue: &self.state_and_queue,
+ set_state_on_drop_to: POISONED,
};
- init(state == POISONED);
- complete.panicked = false;
- return
+ // Run the initialization function, letting it know if we're
+ // poisoned or not.
+ init(state_and_queue == POISONED);
+ waiter_queue.set_state_on_drop_to = COMPLETE;
+ break;
}
-
- // All other values we find should correspond to the RUNNING
- // state with an encoded waiter list in the more significant
- // bits. We attempt to enqueue ourselves by moving us to the
- // head of the list and bail out if we ever see a state that's
- // not RUNNING.
_ => {
- assert!(state & STATE_MASK == RUNNING);
- let mut node = Waiter {
- thread: Some(thread::current()),
- signaled: AtomicBool::new(false),
- next: ptr::null_mut(),
- };
- let me = &mut node as *mut Waiter as usize;
- assert!(me & STATE_MASK == 0);
-
- while state & STATE_MASK == RUNNING {
- node.next = (state & !STATE_MASK) as *mut Waiter;
- let old = self.state.compare_and_swap(state,
- me | RUNNING,
- Ordering::SeqCst);
- if old != state {
- state = old;
- continue
- }
-
- // Once we've enqueued ourselves, wait in a loop.
- // Afterwards reload the state and continue with what we
- // were doing from before.
- while !node.signaled.load(Ordering::SeqCst) {
- thread::park();
- }
- state = self.state.load(Ordering::SeqCst);
- continue 'outer
- }
+ // All other values must be RUNNING with possibly a
+ // pointer to the waiter queue in the more significant bits.
+ assert!(state_and_queue & STATE_MASK == RUNNING);
+ wait(&self.state_and_queue, state_and_queue);
+ state_and_queue = self.state_and_queue.load(Ordering::Acquire);
}
}
}
}
}
+fn wait(state_and_queue: &AtomicUsize, mut current_state: usize) {
+ // Note: the following code was carefully written to avoid creating a
+ // mutable reference to `node` that gets aliased.
+ loop {
+ // Don't queue this thread if the status is no longer running,
+ // otherwise we will not be woken up.
+ if current_state & STATE_MASK != RUNNING {
+ return;
+ }
+
+ // Create the node for our current thread.
+ let node = Waiter {
+ thread: Cell::new(Some(thread::current())),
+ signaled: AtomicBool::new(false),
+ next: (current_state & !STATE_MASK) as *const Waiter,
+ };
+ let me = &node as *const Waiter as usize;
+
+ // Try to slide in the node at the head of the linked list, making sure
+ // that another thread didn't just replace the head of the linked list.
+ let old = state_and_queue.compare_and_swap(current_state, me | RUNNING, Ordering::Release);
+ if old != current_state {
+ current_state = old;
+ continue;
+ }
+
+ // We have enqueued ourselves, now lets wait.
+ // It is important not to return before being signaled, otherwise we
+ // would drop our `Waiter` node and leave a hole in the linked list
+ // (and a dangling reference). Guard against spurious wakeups by
+ // reparking ourselves until we are signaled.
+ while !node.signaled.load(Ordering::Acquire) {
+ // If the managing thread happens to signal and unpark us before we
+ // can park ourselves, the result could be this thread never gets
+ // unparked. Luckily `park` comes with the guarantee that if it got
+ // an `unpark` just before on an unparked thread it does not park.
+ thread::park();
+ }
+ break;
+ }
+}
+
#[stable(feature = "std_debug", since = "1.16.0")]
impl fmt::Debug for Once {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Once { .. }")
}
}
-impl<'a> Drop for Finish<'a> {
+impl Drop for WaiterQueue<'_> {
fn drop(&mut self) {
- // Swap out our state with however we finished. We should only ever see
- // an old state which was RUNNING.
- let queue = if self.panicked {
- self.me.state.swap(POISONED, Ordering::SeqCst)
- } else {
- self.me.state.swap(COMPLETE, Ordering::SeqCst)
- };
- assert_eq!(queue & STATE_MASK, RUNNING);
+ // Swap out our state with however we finished.
+ let state_and_queue =
+ self.state_and_queue.swap(self.set_state_on_drop_to, Ordering::AcqRel);
+
+ // We should only ever see an old state which was RUNNING.
+ assert_eq!(state_and_queue & STATE_MASK, RUNNING);
- // Decode the RUNNING to a list of waiters, then walk that entire list
- // and wake them up. Note that it is crucial that after we store `true`
- // in the node it can be free'd! As a result we load the `thread` to
- // signal ahead of time and then unpark it after the store.
+ // Walk the entire linked list of waiters and wake them up (in lifo
+ // order, last to register is first to wake up).
unsafe {
- let mut queue = (queue & !STATE_MASK) as *mut Waiter;
+ // Right after setting `node.signaled = true` the other thread may
+ // free `node` if there happens to be a spurious wakeup.
+ // So we have to take out the `thread` field and copy the pointer to
+ // `next` first.
+ let mut queue = (state_and_queue & !STATE_MASK) as *const Waiter;
while !queue.is_null() {
let next = (*queue).next;
- let thread = (*queue).thread.take().unwrap();
- (*queue).signaled.store(true, Ordering::SeqCst);
- thread.unpark();
+ let thread = (*queue).thread.replace(None).unwrap();
+ (*queue).signaled.store(true, Ordering::Release);
+ // ^- FIXME (maybe): This is another case of issue #55005
+ // `store()` has a potentially dangling ref to `signaled`.
queue = next;
+ thread.unpark();
}
}
}
}
impl OnceState {
- /// Returns whether the associated [`Once`] was poisoned prior to the
+ /// Returns `true` if the associated [`Once`] was poisoned prior to the
/// invocation of the closure passed to [`call_once_force`].
///
/// [`call_once_force`]: struct.Once.html#method.call_once_force
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
- use panic;
- use sync::mpsc::channel;
- use thread;
use super::Once;
+ use crate::panic;
+ use crate::sync::mpsc::channel;
+ use crate::thread;
#[test]
fn smoke_once() {
let (tx, rx) = channel();
for _ in 0..10 {
let tx = tx.clone();
- thread::spawn(move|| {
- for _ in 0..4 { thread::yield_now() }
+ thread::spawn(move || {
+ for _ in 0..4 {
+ thread::yield_now()
+ }
unsafe {
O.call_once(|| {
assert!(!RUN);
assert!(t1.join().is_ok());
assert!(t2.join().is_ok());
-
}
}