use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop};
use std::ptr;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;
-use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared};
-use crossbeam_utils::CachePadded;
-
-use internal::channel::RecvNonblocking;
-use internal::context::Context;
-use internal::select::{Operation, Selected, SelectHandle, Token};
-use internal::utils::Backoff;
-use internal::waker::SyncWaker;
-
-// TODO: Allocate less memory in the beginning. Blocks should start small and grow exponentially.
-
-/// The maximum number of messages a block can hold.
-const BLOCK_CAP: usize = 32;
+use crossbeam_utils::{Backoff, CachePadded};
+
+use context::Context;
+use err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
+use select::{Operation, SelectHandle, Selected, Token};
+use waker::SyncWaker;
+
+// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
+// following changes by @kleimkuhler:
+//
+// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
+// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
+
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
//
// Note: this is the single low bit reserved by `SHIFT`, so real indices
// always advance in steps of `1 << SHIFT`.
const MARK_BIT: usize = 1;
/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<ManuallyDrop<T>>,

    /// The state of the slot, a combination of the `WRITE`, `READ` and
    /// `DESTROY` bits.
    state: AtomicUsize,
}
-/// The token type for the list flavor.
-pub struct ListToken {
- /// Slot to read from or write to.
- slot: *const u8,
-
- /// Guard keeping alive the block that contains the slot.
- guard: Option<Guard>,
-}
-
-impl Default for ListToken {
- #[inline]
- fn default() -> Self {
- ListToken {
- slot: ptr::null(),
- guard: None,
impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        // Spin (and eventually yield) until the sender sets the `WRITE` bit.
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}
impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // Zeroing is valid here: `next` becomes a null pointer, slot states
        // become 0 (neither `WRITE` nor `READ` set), and the messages are
        // only ever read after a sender has written them and set `WRITE`.
        unsafe { mem::zeroed() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            // The sender that advanced the tail past this block installs
            // `next` shortly; spin until it appears.
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    ///
    /// # Safety
    ///
    /// `this` must point to a heap-allocated block that is no longer
    /// reachable by new senders or receivers; slots before `start` must
    /// already have been read.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}
+
/// The token type for the list flavor.
pub struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        // A null block means no slot has been reserved (or, once an
        // operation completes, that the channel was disconnected).
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}
/// Unbounded channel implemented as a linked list.
/// The tail of the channel.
tail: CachePadded<Position<T>>,
- /// Equals `true` when the channel is closed.
- is_closed: AtomicBool,
-
- /// Receivers waiting while the channel is empty and not closed.
+ /// Receivers waiting while the channel is empty and not disconnected.
receivers: SyncWaker,
- /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
+ /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
_marker: PhantomData<T>,
}
impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                // The first block is allocated lazily by the first send.
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }
/// Returns a receiver handle to the channel.
Sender(self)
}
    /// Attempts to reserve a slot for sending a message.
    ///
    /// Always returns `true`: either a slot was reserved (recorded in
    /// `token`), or the channel is disconnected (recorded as a null block
    /// pointer in `token`).
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self.tail.block.compare_and_swap(block, new, Ordering::Release) == block {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    // Another sender won the race; free our allocation (it may
                    // still be reused as `next_block`) and retry.
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index
                .compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Acquire,
                )
            {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        // Skip the sentinel index at the block boundary.
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                }
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
+
    /// Writes a message into the channel.
    ///
    /// Returns `Err(msg)` if the channel is disconnected.
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by a preceding `start_send`.
    pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(ManuallyDrop::new(msg));
        // Publish the message by setting the `WRITE` bit.
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }
    /// Attempts to reserve a slot for receiving a message.
    ///
    /// Returns `true` when a slot was reserved or the channel is empty and
    /// disconnected (null block in `token`); returns `false` when the channel
    /// is merely empty.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                // Order this head read before the tail read below.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index
                .compare_exchange_weak(
                    head,
                    new_head,
                    Ordering::SeqCst,
                    Ordering::Acquire,
                )
            {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        // Skip the sentinel index at the block boundary, and
                        // carry `MARK_BIT` forward if yet another block exists.
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                }
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
    /// Reads a message from the channel.
    ///
    /// Returns `Err(())` if the channel is disconnected.
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by a preceding successful `start_recv`.
    pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        // The sender may not have finished writing yet - wait for `WRITE`.
        slot.wait_write();
        let m = slot.msg.get().read();
        let msg = ManuallyDrop::into_inner(m);

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }
+
+ /// Attempts to send a message into the channel.
+ pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ self.send(msg, None).map_err(|err| match err {
+ SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
+ SendTimeoutError::Timeout(_) => unreachable!(),
+ })
}
    /// Sends a message into the channel.
    ///
    /// The channel is unbounded, so sending never blocks and `_deadline` is
    /// ignored; the only possible error is disconnection.
    pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        // `start_send` never fails for this flavor - the assert documents
        // that invariant.
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }
+
+ /// Attempts to receive a message without blocking.
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
- self.write(token, msg);
+
+ if self.start_recv(token) {
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else {
+ Err(TryRecvError::Empty)
+ }
}
/// Receives a message from the channel.
- pub fn recv(&self) -> Option<T> {
+ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
loop {
// Try receiving a message several times.
- let mut backoff = Backoff::new();
+ let backoff = Backoff::new();
loop {
if self.start_recv(token) {
unsafe {
- return self.read(token);
+ return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
}
}
- if !backoff.snooze() {
+
+ if backoff.is_completed() {
break;
+ } else {
+ backoff.snooze();
}
}
self.receivers.register(oper, cx);
// Has the channel become ready just now?
- if !self.is_empty() || self.is_closed() {
+ if !self.is_empty() || self.is_disconnected() {
let _ = cx.try_select(Selected::Aborted);
}
// Block the current thread.
- let sel = cx.wait_until(None);
+ let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
- Selected::Aborted | Selected::Closed => {
+ Selected::Aborted | Selected::Disconnected => {
self.receivers.unregister(oper).unwrap();
- // If the channel was closed, we still have to check for remaining messages.
- },
- Selected::Operation(_) => {},
+ // If the channel was disconnected, we still have to check for remaining
+ // messages.
+ }
+ Selected::Operation(_) => {}
}
- })
- }
- }
+ });
- /// Attempts to receive a message without blocking.
- pub fn recv_nonblocking(&self) -> RecvNonblocking<T> {
- let token = &mut Token::default();
-
- if self.start_recv(token) {
- match unsafe { self.read(token) } {
- None => RecvNonblocking::Closed,
- Some(msg) => RecvNonblocking::Message(msg),
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(RecvTimeoutError::Timeout);
+ }
}
- } else {
- RecvNonblocking::Empty
}
}
    /// Returns the current number of messages inside the channel.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Fix up indices if they fall onto block ends (the sentinel
                // index at offset `BLOCK_CAP` never holds a message).
                if head == BLOCK_CAP {
                    head = 0;
                    tail -= LAP;
                }
                if tail == BLOCK_CAP {
                    tail += 1;
                }

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
None
}
    /// Disconnects the channel and wakes up all blocked receivers.
    pub fn disconnect(&self) {
        // Set the mark bit in the tail index; only the first caller observes
        // it clear and therefore wakes the sleeping receivers exactly once.
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
        }
    }
- /// Returns `true` if the channel is closed.
- pub fn is_closed(&self) -> bool {
- self.is_closed.load(Ordering::SeqCst)
+ /// Returns `true` if the channel is disconnected.
+ pub fn is_disconnected(&self) -> bool {
+ self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
}
/// Returns `true` if the channel is empty.
pub fn is_empty(&self) -> bool {
- let head_index = self.head.index.load(Ordering::SeqCst);
- let tail_index = self.tail.index.load(Ordering::SeqCst);
- head_index == tail_index
+ let head = self.head.index.load(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::SeqCst);
+ head >> SHIFT == tail >> SHIFT
}
/// Returns `true` if the channel is full.
impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        // Exclusive access (`&mut self`), so relaxed loads suffice here.
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    ManuallyDrop::drop(&mut *(*slot).msg.get());
                } else {
                    // The sentinel index at the block boundary:
                    // deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
/// A send handle borrowing the channel; used through `SelectHandle`.
pub struct Sender<'a, T: 'a>(&'a Channel<T>);
impl<'a, T> SelectHandle for Receiver<'a, T> {
    // Attempt to reserve a slot for a receive operation.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }
None
}
    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        // Report whether the channel became ready while registering.
        self.is_ready()
    }
fn unregister(&self, oper: Operation) {
}
    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        // Accepting is the same as an ordinary selection attempt.
        self.try_select(token)
    }
    fn is_ready(&self) -> bool {
        // A receive can proceed when there is a message, or when the channel
        // is disconnected (the error is delivered immediately).
        !self.0.is_empty() || self.0.is_disconnected()
    }
-}
-impl<'a, T> SelectHandle for Sender<'a, T> {
- fn try(&self, _token: &mut Token) -> bool {
- true
    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        // Report readiness observed after installing the watcher.
        self.is_ready()
    }
- fn retry(&self, _token: &mut Token) -> bool {
- true
    // Remove a previously installed watcher.
    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}
+
impl<'a, T> SelectHandle for Sender<'a, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        // Reserving a send slot in an unbounded channel always succeeds.
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        // Senders never block, so there is nothing to register.
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        // An unbounded channel is always ready for sending.
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}