//! Ideally, each instance of a concurrent data structure would have its own queue, which gets
//! fully destroyed as soon as the data structure is dropped.
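//!
//! A sketch of that per-instance usage through the public `Collector` API (illustrative only,
//! not part of this change):
//!
//! ```
//! use crossbeam_epoch::Collector;
//!
//! let collector = Collector::new();  // a garbage queue owned by one data structure
//! let handle = collector.register(); // each thread registers itself as a participant
//! let guard = handle.pin();          // pin before touching memory shared with other threads
//! drop(guard);                       // unpin so the global epoch can advance
//! ```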
-use core::cell::{Cell, UnsafeCell};
+use crate::primitive::cell::UnsafeCell;
+use crate::primitive::sync::atomic;
+use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
-use core::sync::atomic;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};
use crate::sync::queue::Queue;
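// Under `cfg(crossbeam_loom)` the `primitive` module re-exports loom's checked types;
// otherwise it wraps the plain `core` types behind loom's closure-based `UnsafeCell`
// interface, roughly like this (a sketch, not the exact shim):
//
//     pub(crate) struct UnsafeCell<T>(core::cell::UnsafeCell<T>);
//
//     impl<T> UnsafeCell<T> {
//         pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
//             f(self.0.get())
//         }
//
//         pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
//             f(self.0.get())
//         }
//     }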
/// Maximum number of objects a bag can contain.
-#[cfg(not(feature = "sanitize"))]
+#[cfg(not(crossbeam_sanitize))]
const MAX_OBJECTS: usize = 62;
-#[cfg(feature = "sanitize")]
+#[cfg(crossbeam_sanitize)]
const MAX_OBJECTS: usize = 4;
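// Note: `crossbeam_sanitize` is a compiler `--cfg`, not a Cargo feature, so it is enabled by
// the build environment rather than by downstream crates, e.g. (assumed CI invocation):
//
//     RUSTFLAGS="--cfg crossbeam_sanitize" cargo test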
/// A bag of deferred functions.
-pub struct Bag {
+pub(crate) struct Bag {
/// Stashed objects.
deferreds: [Deferred; MAX_OBJECTS],
    len: usize,
}
impl Bag {
/// Returns a new, empty bag.
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
Self::default()
}
/// Returns `true` if the bag is empty.
- pub fn is_empty(&self) -> bool {
+ pub(crate) fn is_empty(&self) -> bool {
self.len == 0
}
/// # Safety
///
/// It should be safe for another thread to execute the given function.
- pub unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
+ pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
        if self.len < MAX_OBJECTS {
            self.deferreds[self.len] = deferred;
            self.len += 1;
            Ok(())
        } else {
            Err(deferred)
        }
    }
}
impl Default for Bag {
    #[rustfmt::skip]
fn default() -> Self {
// TODO: [no_op; MAX_OBJECTS] syntax blocked by https://github.com/rust-lang/rust/issues/49147
- #[cfg(not(feature = "sanitize"))]
+ #[cfg(not(crossbeam_sanitize))]
return Bag {
len: 0,
deferreds: [
Deferred::new(no_op_func),
],
};
- #[cfg(feature = "sanitize")]
+ #[cfg(crossbeam_sanitize)]
return Bag {
len: 0,
            deferreds: [
                Deferred::new(no_op_func),
            ],
        };
    }
}
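// An illustrative sketch of the `Bag` lifecycle using the crate-internal API above
// (`Deferred::new` wraps an `FnOnce()`):
//
//     let mut bag = Bag::new();
//     let boxed = Box::new(0u64);
//     // Safety: the closure only drops data it owns, so any thread may execute it.
//     let deferred = Deferred::new(move || drop(boxed));
//     unsafe { bag.try_push(deferred) }.expect("a fresh bag has spare capacity");
//     assert!(!bag.is_empty());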
/// The global data for a garbage collector.
-pub struct Global {
+pub(crate) struct Global {
/// The intrusive linked list of `Local`s.
locals: List<Local>,
    /// Creates new global data for garbage collection.
#[inline]
- pub fn new() -> Self {
+ pub(crate) fn new() -> Self {
Self {
locals: List::new(),
queue: Queue::new(),
}
/// Pushes the bag into the global queue and replaces the bag with a new empty bag.
- pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
+ pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
let bag = mem::replace(bag, Bag::new());
atomic::fence(Ordering::SeqCst);
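        // The elided remainder seals the replaced bag with the current global epoch and
        // pushes it onto the global queue, roughly (a sketch of the omitted lines):
        //
        //     let epoch = self.epoch.load(Ordering::Relaxed);
        //     self.queue.push(bag.seal(epoch), guard);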
    /// `pin()` rarely calls `collect()`, so we want the compiler to place the call on a cold
    /// path. In other words, we want the compiler to optimize branching for the case when
/// `collect()` is not called.
#[cold]
- pub fn collect(&self, guard: &Guard) {
+ pub(crate) fn collect(&self, guard: &Guard) {
let global_epoch = self.try_advance(guard);
- let steps = if cfg!(feature = "sanitize") {
+ let steps = if cfg!(crossbeam_sanitize) {
usize::max_value()
} else {
Self::COLLECT_STEPS
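        // Each step pops at most one bag that is at least two epochs old and drops it,
        // running its deferred functions; under `crossbeam_sanitize` all expired bags are
        // drained eagerly. Roughly (predicate and types assumed):
        //
        //     for _ in 0..steps {
        //         match self.queue.try_pop_if(
        //             &|bag: &SealedBag| bag.is_expired(global_epoch),
        //             guard,
        //         ) {
        //             None => break,
        //             Some(sealed_bag) => drop(sealed_bag),
        //         }
        //     }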
///
/// `try_advance()` is annotated `#[cold]` because it is rarely called.
#[cold]
- pub fn try_advance(&self, guard: &Guard) -> Epoch {
+ pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
let global_epoch = self.epoch.load(Ordering::Relaxed);
atomic::fence(Ordering::SeqCst);
}
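    // The advancement rule: the epoch may move forward only when every pinned participant
    // is pinned in `global_epoch`. A sketch of the check that sits between the fence above
    // and the store of the successor epoch (iteration/error handling assumed):
    //
    //     for local in self.locals.iter(guard) {
    //         // (entries deleted concurrently abort the attempt; handling elided)
    //         let local_epoch = local.epoch.load(Ordering::Relaxed);
    //         if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
    //             return global_epoch; // a participant lags behind; cannot advance yet
    //         }
    //     }
    //     atomic::fence(Ordering::Acquire);
    //     let new_epoch = global_epoch.successor();
    //     self.epoch.store(new_epoch, Ordering::Release);
    //     new_epoch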
/// Participant for garbage collection.
-pub struct Local {
+pub(crate) struct Local {
/// A node in the intrusive linked list of `Local`s.
entry: Entry,
// Make sure `Local` is less than or equal to 2048 bytes.
// https://github.com/crossbeam-rs/crossbeam/issues/551
+#[cfg(not(crossbeam_sanitize))] // `crossbeam_sanitize` reduces the size of `Local`
#[test]
fn local_size() {
- assert!(core::mem::size_of::<Local>() <= 2048, "An allocation of `Local` should be <= 2048 bytes.");
+ assert!(
+ core::mem::size_of::<Local>() <= 2048,
+ "An allocation of `Local` should be <= 2048 bytes."
+ );
}
impl Local {
const PINNINGS_BETWEEN_COLLECT: usize = 128;
/// Registers a new `Local` in the provided `Global`.
- pub fn register(collector: &Collector) -> LocalHandle {
+ pub(crate) fn register(collector: &Collector) -> LocalHandle {
unsafe {
// Since we dereference no pointers in this block, it is safe to use `unprotected`.
/// Returns a reference to the `Global` in which this `Local` resides.
#[inline]
- pub fn global(&self) -> &Global {
+ pub(crate) fn global(&self) -> &Global {
&self.collector().global
}
/// Returns a reference to the `Collector` in which this `Local` resides.
#[inline]
- pub fn collector(&self) -> &Collector {
- unsafe { &**self.collector.get() }
+ pub(crate) fn collector(&self) -> &Collector {
+ self.collector.with(|c| unsafe { &**c })
}
/// Returns `true` if the current participant is pinned.
#[inline]
- pub fn is_pinned(&self) -> bool {
+ pub(crate) fn is_pinned(&self) -> bool {
self.guard_count.get() > 0
}
/// # Safety
///
/// It should be safe for another thread to execute the given function.
- pub unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
- let bag = &mut *self.bag.get();
+ pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
+ let bag = self.bag.with_mut(|b| &mut *b);
        while let Err(d) = bag.try_push(deferred) {
            self.global().push_bag(bag, guard);
            deferred = d;
        }
}
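    // How garbage normally reaches `defer`: through a pinned guard via the public API,
    // e.g. (illustrative; `Ordering` as imported above):
    //
    //     let a = crossbeam_epoch::Atomic::new(7);
    //     let guard = crossbeam_epoch::pin();
    //     let p = a.swap(crossbeam_epoch::Shared::null(), Ordering::AcqRel, &guard);
    //     // Safety: after the swap, no new reader can obtain `p`.
    //     unsafe { guard.defer_destroy(p) }; // ends up in the local bag via `defer`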
- pub fn flush(&self, guard: &Guard) {
- let bag = unsafe { &mut *self.bag.get() };
+ pub(crate) fn flush(&self, guard: &Guard) {
+ let bag = self.bag.with_mut(|b| unsafe { &mut *b });
if !bag.is_empty() {
self.global().push_bag(bag, guard);
/// Pins the `Local`.
#[inline]
- pub fn pin(&self) -> Guard {
+ pub(crate) fn pin(&self) -> Guard {
let guard = Guard { local: self };
let guard_count = self.guard_count.get();
            // On x86 architectures there are two different ways of executing a `SeqCst` fence.
//
// 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
- // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg`
+ // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
// instruction.
//
            // Both instructions have the effect of a full barrier, but benchmarks have shown
            // that the second one makes pinning faster in this particular case, and
            // experimental evidence suggests that this
// works fine. Using inline assembly would be a viable (and correct) alternative,
// but alas, that is not possible on stable Rust.
let current = Epoch::starting();
- let previous = self
- .epoch
- .compare_and_swap(current, new_epoch, Ordering::SeqCst);
- debug_assert_eq!(current, previous, "participant was expected to be unpinned");
+ let res = self.epoch.compare_exchange(
+ current,
+ new_epoch,
+ Ordering::SeqCst,
+ Ordering::SeqCst,
+ );
+ debug_assert!(res.is_ok(), "participant was expected to be unpinned");
// We add a compiler fence to make it less likely for LLVM to do something wrong
// here. Formally, this is not enough to get rid of data races; practically,
// it should go a long way.
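            // On non-x86 targets the elided `else` branch takes the portable route instead,
            // roughly:
            //
            //     self.epoch.store(new_epoch, Ordering::Relaxed);
            //     atomic::fence(Ordering::SeqCst);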
/// Unpins the `Local`.
#[inline]
- pub fn unpin(&self) {
+ pub(crate) fn unpin(&self) {
let guard_count = self.guard_count.get();
self.guard_count.set(guard_count - 1);
/// Unpins and then pins the `Local`.
#[inline]
- pub fn repin(&self) {
+ pub(crate) fn repin(&self) {
let guard_count = self.guard_count.get();
// Update the local epoch only if there's only one guard.
/// Increments the handle count.
#[inline]
- pub fn acquire_handle(&self) {
+ pub(crate) fn acquire_handle(&self) {
let handle_count = self.handle_count.get();
debug_assert!(handle_count >= 1);
self.handle_count.set(handle_count + 1);
/// Decrements the handle count.
#[inline]
- pub fn release_handle(&self) {
+ pub(crate) fn release_handle(&self) {
let guard_count = self.guard_count.get();
let handle_count = self.handle_count.get();
debug_assert!(handle_count >= 1);
// Pin and move the local bag into the global queue. It's important that `push_bag`
// doesn't defer destruction on any new garbage.
let guard = &self.pin();
- self.global().push_bag(&mut *self.bag.get(), guard);
+ self.global()
+ .push_bag(self.bag.with_mut(|b| &mut *b), guard);
}
// Revert the handle count back to zero.
self.handle_count.set(0);
// Take the reference to the `Global` out of this `Local`. Since we're not protected
// by a guard at this time, it's crucial that the reference is read before marking the
// `Local` as deleted.
- let collector: Collector = ptr::read(&*(*self.collector.get()));
+ let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
// Mark this node in the linked list as deleted.
self.entry.delete(unprotected());
}
}
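// Lifecycle of the two counters above, seen through the public API (illustrative):
//
//     let collector = Collector::new();
//     {
//         let handle = collector.register(); // a `Local` is allocated; handle_count = 1
//         let guard = handle.pin();          // guard_count = 1
//         drop(guard);                       // guard_count = 0
//     } // handle_count reaches 0: the local bag is flushed into the global queue
//       // and the `Local` is marked deleted in the list.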
-#[cfg(test)]
+#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};