]> git.proxmox.com Git - cargo.git/blobdiff - vendor/crossbeam-channel/src/flavors/list.rs
New upstream version 0.33.0
[cargo.git] / vendor / crossbeam-channel / src / flavors / list.rs
index a7cfc6c6436f22c265109ea195801423dd8dcdbe..fae0600597a018bf9d86fdab00f042d34769feda 100644 (file)
@@ -4,47 +4,56 @@ use std::cell::UnsafeCell;
 use std::marker::PhantomData;
 use std::mem::{self, ManuallyDrop};
 use std::ptr;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
 use std::time::Instant;
 
-use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared};
-use crossbeam_utils::CachePadded;
-
-use internal::channel::RecvNonblocking;
-use internal::context::Context;
-use internal::select::{Operation, Selected, SelectHandle, Token};
-use internal::utils::Backoff;
-use internal::waker::SyncWaker;
-
-// TODO: Allocate less memory in the beginning. Blocks should start small and grow exponentially.
-
-/// The maximum number of messages a block can hold.
-const BLOCK_CAP: usize = 32;
+use crossbeam_utils::{Backoff, CachePadded};
+
+use context::Context;
+use err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
+use select::{Operation, SelectHandle, Selected, Token};
+use waker::SyncWaker;
+
+// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
+// following changes by @kleimkuhler:
+//
+// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
+// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
+
+// Bits indicating the state of a slot:
+// * If a message has been written into the slot, `WRITE` is set.
+// * If a message has been read from the slot, `READ` is set.
+// * If the block is being destroyed, `DESTROY` is set.
+const WRITE: usize = 1;
+const READ: usize = 2;
+const DESTROY: usize = 4;
+
+// Each block covers one "lap" of indices.
+const LAP: usize = 32;
+// The maximum number of messages a block can hold.
+const BLOCK_CAP: usize = LAP - 1;
+// How many lower bits are reserved for metadata.
+const SHIFT: usize = 1;
+// Has two different purposes:
+// * If set in head, indicates that the block is not the last one.
+// * If set in tail, indicates that the channel is disconnected.
+const MARK_BIT: usize = 1;
 
 /// A slot in a block.
 struct Slot<T> {
     /// The message.
     msg: UnsafeCell<ManuallyDrop<T>>,
 
-    /// Equals `true` if the message is ready for reading.
-    ready: AtomicBool,
+    /// The state of the slot.
+    state: AtomicUsize,
 }
 
-/// The token type for the list flavor.
-pub struct ListToken {
-    /// Slot to read from or write to.
-    slot: *const u8,
-
-    /// Guard keeping alive the block that contains the slot.
-    guard: Option<Guard>,
-}
-
-impl Default for ListToken {
-    #[inline]
-    fn default() -> Self {
-        ListToken {
-            slot: ptr::null(),
-            guard: None,
+impl<T> Slot<T> {
+    /// Waits until a message is written into the slot.
+    fn wait_write(&self) {
+        let backoff = Backoff::new();
+        while self.state.load(Ordering::Acquire) & WRITE == 0 {
+            backoff.snooze();
         }
     }
 }
@@ -53,38 +62,79 @@ impl Default for ListToken {
 ///
 /// Each block in the list can hold up to `BLOCK_CAP` messages.
 struct Block<T> {
-    /// The start index of this block.
-    ///
-    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
-    start_index: usize,
-
     /// The next block in the linked list.
-    next: Atomic<Block<T>>,
+    next: AtomicPtr<Block<T>>,
 
     /// Slots for messages.
-    slots: [UnsafeCell<Slot<T>>; BLOCK_CAP],
+    slots: [Slot<T>; BLOCK_CAP],
 }
 
 impl<T> Block<T> {
-    /// Creates an empty block that starts at `start_index`.
-    fn new(start_index: usize) -> Block<T> {
-        Block {
-            start_index,
-            slots: unsafe { mem::zeroed() },
-            next: Atomic::null(),
+    /// Creates an empty block.
+    fn new() -> Block<T> {
+        unsafe { mem::zeroed() }
+    }
+
+    /// Waits until the next pointer is set.
+    fn wait_next(&self) -> *mut Block<T> {
+        let backoff = Backoff::new();
+        loop {
+            let next = self.next.load(Ordering::Acquire);
+            if !next.is_null() {
+                return next;
+            }
+            backoff.snooze();
+        }
+    }
+
+    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
+    unsafe fn destroy(this: *mut Block<T>, start: usize) {
+        // It is not necessary to set the `DESTROY bit in the last slot because that slot has begun
+        // destruction of the block.
+        for i in start..BLOCK_CAP - 1 {
+            let slot = (*this).slots.get_unchecked(i);
+
+            // Mark the `DESTROY` bit if a thread is still using the slot.
+            if slot.state.load(Ordering::Acquire) & READ == 0
+                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
+            {
+                // If a thread is still using the slot, it will continue destruction of the block.
+                return;
+            }
         }
+
+        // No thread is using the block, now it is safe to destroy it.
+        drop(Box::from_raw(this));
     }
 }
 
-/// Position in the channel (index and block).
-///
-/// This struct describes the current position of the head or the tail in a linked list.
+/// A position in a channel.
+#[derive(Debug)]
 struct Position<T> {
     /// The index in the channel.
     index: AtomicUsize,
 
     /// The block in the linked list.
-    block: Atomic<Block<T>>,
+    block: AtomicPtr<Block<T>>,
+}
+
+/// The token type for the list flavor.
+pub struct ListToken {
+    /// The block of slots.
+    block: *const u8,
+
+    /// The offset into the block.
+    offset: usize,
+}
+
+impl Default for ListToken {
+    #[inline]
+    fn default() -> Self {
+        ListToken {
+            block: ptr::null(),
+            offset: 0,
+        }
+    }
 }
 
 /// Unbounded channel implemented as a linked list.
@@ -101,39 +151,28 @@ pub struct Channel<T> {
     /// The tail of the channel.
     tail: CachePadded<Position<T>>,
 
-    /// Equals `true` when the channel is closed.
-    is_closed: AtomicBool,
-
-    /// Receivers waiting while the channel is empty and not closed.
+    /// Receivers waiting while the channel is empty and not disconnected.
     receivers: SyncWaker,
 
-    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
+    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
     _marker: PhantomData<T>,
 }
 
 impl<T> Channel<T> {
     /// Creates a new unbounded channel.
     pub fn new() -> Self {
-        let channel = Channel {
+        Channel {
             head: CachePadded::new(Position {
+                block: AtomicPtr::new(ptr::null_mut()),
                 index: AtomicUsize::new(0),
-                block: Atomic::null(),
             }),
             tail: CachePadded::new(Position {
+                block: AtomicPtr::new(ptr::null_mut()),
                 index: AtomicUsize::new(0),
-                block: Atomic::null(),
             }),
-            is_closed: AtomicBool::new(false),
             receivers: SyncWaker::new(),
             _marker: PhantomData,
-        };
-
-        // Allocate an empty block for the first batch of messages.
-        let block = unsafe { Owned::new(Block::new(0)).into_shared(epoch::unprotected()) };
-        channel.head.block.store(block, Ordering::Relaxed);
-        channel.tail.block.store(block, Ordering::Relaxed);
-
-        channel
+        }
     }
 
     /// Returns a receiver handle to the channel.
@@ -146,218 +185,264 @@ impl<T> Channel<T> {
         Sender(self)
     }
 
-    /// Writes a message into the channel.
-    pub fn write(&self, _token: &mut Token, msg: T) {
-        let guard = epoch::pin();
-        let mut backoff = Backoff::new();
+    /// Attempts to reserve a slot for sending a message.
+    fn start_send(&self, token: &mut Token) -> bool {
+        let backoff = Backoff::new();
+        let mut tail = self.tail.index.load(Ordering::Acquire);
+        let mut block = self.tail.block.load(Ordering::Acquire);
+        let mut next_block = None;
 
         loop {
-            // These two load operations don't have to be `SeqCst`. If they happen to retrieve
-            // stale values, the following CAS will fail or won't even be attempted.
-            let tail_ptr = self.tail.block.load(Ordering::Acquire, &guard);
-            let tail = unsafe { tail_ptr.deref() };
-            let tail_index = self.tail.index.load(Ordering::Relaxed);
-
-            // Calculate the index of the corresponding slot in the block.
-            let offset = tail_index.wrapping_sub(tail.start_index);
-
-            // Advance the current index one slot forward.
-            let new_index = tail_index.wrapping_add(1);
-
-            // A closure that installs a block following `tail` in case it hasn't been yet.
-            let install_next_block = || {
-                let current = tail.next.compare_and_set(
-                    Shared::null(),
-                    Owned::new(Block::new(tail.start_index.wrapping_add(BLOCK_CAP))),
-                    Ordering::AcqRel,
-                    &guard,
-                ).unwrap_or_else(|err| err.current);
-
-                let _ = self.tail.block.compare_and_set(
-                    tail_ptr,
-                    current,
-                    Ordering::Release,
-                    &guard,
-                );
-            };
-
-            // If `tail_index` is pointing into `tail`...
-            if offset < BLOCK_CAP {
-                // Try moving the tail index forward.
-                if self
-                    .tail
-                    .index
-                    .compare_exchange_weak(
-                        tail_index,
-                        new_index,
-                        Ordering::SeqCst,
-                        Ordering::Relaxed,
-                    )
-                    .is_ok()
-                {
-                    // If this was the last slot in the block, install a new block.
+            // Check if the channel is disconnected.
+            if tail & MARK_BIT != 0 {
+                token.list.block = ptr::null();
+                return true;
+            }
+
+            // Calculate the offset of the index into the block.
+            let offset = (tail >> SHIFT) % LAP;
+
+            // If we reached the end of the block, wait until the next one is installed.
+            if offset == BLOCK_CAP {
+                backoff.snooze();
+                tail = self.tail.index.load(Ordering::Acquire);
+                block = self.tail.block.load(Ordering::Acquire);
+                continue;
+            }
+
+            // If we're going to have to install the next block, allocate it in advance in order to
+            // make the wait for other threads as short as possible.
+            if offset + 1 == BLOCK_CAP && next_block.is_none() {
+                next_block = Some(Box::new(Block::<T>::new()));
+            }
+
+            // If this is the first message to be sent into the channel, we need to allocate the
+            // first block and install it.
+            if block.is_null() {
+                let new = Box::into_raw(Box::new(Block::<T>::new()));
+
+                if self.tail.block.compare_and_swap(block, new, Ordering::Release) == block {
+                    self.head.block.store(new, Ordering::Release);
+                    block = new;
+                } else {
+                    next_block = unsafe { Some(Box::from_raw(new)) };
+                    tail = self.tail.index.load(Ordering::Acquire);
+                    block = self.tail.block.load(Ordering::Acquire);
+                    continue;
+                }
+            }
+
+            let new_tail = tail + (1 << SHIFT);
+
+            // Try advancing the tail forward.
+            match self.tail.index
+                .compare_exchange_weak(
+                    tail,
+                    new_tail,
+                    Ordering::SeqCst,
+                    Ordering::Acquire,
+                )
+            {
+                Ok(_) => unsafe {
+                    // If we've reached the end of the block, install the next one.
                     if offset + 1 == BLOCK_CAP {
-                        install_next_block();
+                        let next_block = Box::into_raw(next_block.unwrap());
+                        self.tail.block.store(next_block, Ordering::Release);
+                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
+                        (*block).next.store(next_block, Ordering::Release);
                     }
 
-                    unsafe {
-                        let slot = tail.slots.get_unchecked(offset).get();
-                        (*slot).msg.get().write(ManuallyDrop::new(msg));
-                        (*slot).ready.store(true, Ordering::Release);
-                    }
-                    break;
+                    token.list.block = block as *const u8;
+                    token.list.offset = offset;
+                    return true;
+                }
+                Err(t) => {
+                    tail = t;
+                    block = self.tail.block.load(Ordering::Acquire);
+                    backoff.spin();
                 }
-
-                backoff.spin();
-            } else if offset == BLOCK_CAP {
-                // Help install the next block.
-                install_next_block();
             }
         }
+    }
+
+    /// Writes a message into the channel.
+    pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+        // If there is no slot, the channel is disconnected.
+        if token.list.block.is_null() {
+            return Err(msg);
+        }
 
-        self.receivers.wake_one();
+        // Write the message into the slot.
+        let block = token.list.block as *mut Block<T>;
+        let offset = token.list.offset;
+        let slot = (*block).slots.get_unchecked(offset);
+        slot.msg.get().write(ManuallyDrop::new(msg));
+        slot.state.fetch_or(WRITE, Ordering::Release);
+
+        // Wake a sleeping receiver.
+        self.receivers.notify();
+        Ok(())
     }
 
     /// Attempts to reserve a slot for receiving a message.
     fn start_recv(&self, token: &mut Token) -> bool {
-        let guard = epoch::pin();
-        let mut backoff = Backoff::new();
+        let backoff = Backoff::new();
+        let mut head = self.head.index.load(Ordering::Acquire);
+        let mut block = self.head.block.load(Ordering::Acquire);
 
         loop {
-            // Loading the head block doesn't have to be a `SeqCst` operation. If we get a stale
-            // value, the following CAS will fail or not even be attempted. Loading the head index
-            // must be `SeqCst` because we need the up-to-date value when checking whether the
-            // channel is empty.
-            let head_ptr = self.head.block.load(Ordering::Acquire, &guard);
-            let head = unsafe { head_ptr.deref() };
-            let head_index = self.head.index.load(Ordering::SeqCst);
-
-            // Calculate the index of the corresponding slot in the block.
-            let offset = head_index.wrapping_sub(head.start_index);
-
-            // Advance the current index one slot forward.
-            let new_index = head_index.wrapping_add(1);
-
-            // A closure that installs a block following `head` in case it hasn't been yet.
-            let install_next_block = || {
-                let current = head.next.compare_and_set(
-                    Shared::null(),
-                    Owned::new(Block::new(head.start_index.wrapping_add(BLOCK_CAP))),
-                    Ordering::AcqRel,
-                    &guard,
-                ).unwrap_or_else(|err| err.current);
-
-                let _ = self.head.block.compare_and_set(
-                    head_ptr,
-                    current,
-                    Ordering::Release,
-                    &guard,
-                );
-            };
-
-            // If `head_index` is pointing into `head`...
-            if offset < BLOCK_CAP {
-                let slot = unsafe { &*head.slots.get_unchecked(offset).get() };
-
-                // If this slot does not contain a message...
-                if !slot.ready.load(Ordering::Relaxed) {
-                    let tail_index = self.tail.index.load(Ordering::SeqCst);
-
-                    // If the tail equals the head, that means the channel is empty.
-                    if tail_index == head_index {
-                        // If the channel is closed...
-                        if self.is_closed() {
-                            // ...and still empty...
-                            if self.tail.index.load(Ordering::SeqCst) == tail_index {
-                                // ...then receive `None`.
-                                token.list.slot = ptr::null();
-                                return true;
-                            }
-                        } else {
-                            // Otherwise, the receive operation is not ready.
-                            return false;
-                        }
+            // Calculate the offset of the index into the block.
+            let offset = (head >> SHIFT) % LAP;
+
+            // If we reached the end of the block, wait until the next one is installed.
+            if offset == BLOCK_CAP {
+                backoff.snooze();
+                head = self.head.index.load(Ordering::Acquire);
+                block = self.head.block.load(Ordering::Acquire);
+                continue;
+            }
+
+            let mut new_head = head + (1 << SHIFT);
+
+            if new_head & MARK_BIT == 0 {
+                atomic::fence(Ordering::SeqCst);
+                let tail = self.tail.index.load(Ordering::Relaxed);
+
+                // If the tail equals the head, that means the channel is empty.
+                if head >> SHIFT == tail >> SHIFT {
+                    // If the channel is disconnected...
+                    if tail & MARK_BIT != 0 {
+                        // ...then receive an error.
+                        token.list.block = ptr::null();
+                        return true;
+                    } else {
+                        // Otherwise, the receive operation is not ready.
+                        return false;
                     }
                 }
 
-                // Try moving the head index forward.
-                if self
-                    .head
-                    .index
-                    .compare_exchange_weak(
-                        head_index,
-                        new_index,
-                        Ordering::SeqCst,
-                        Ordering::Relaxed,
-                    )
-                    .is_ok()
-                {
-                    // If this was the last slot in the block, install a new block and destroy the
-                    // old one.
+                // If head and tail are not in the same block, set `MARK_BIT` in head.
+                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
+                    new_head |= MARK_BIT;
+                }
+            }
+
+            // The block can be null here only if the first message is being sent into the channel.
+            // In that case, just wait until it gets initialized.
+            if block.is_null() {
+                backoff.snooze();
+                head = self.head.index.load(Ordering::Acquire);
+                block = self.head.block.load(Ordering::Acquire);
+                continue;
+            }
+
+            // Try moving the head index forward.
+            match self.head.index
+                .compare_exchange_weak(
+                    head,
+                    new_head,
+                    Ordering::SeqCst,
+                    Ordering::Acquire,
+                )
+            {
+                Ok(_) => unsafe {
+                    // If we've reached the end of the block, move to the next one.
                     if offset + 1 == BLOCK_CAP {
-                        install_next_block();
-                        unsafe {
-                            guard.defer_destroy(head_ptr);
+                        let next = (*block).wait_next();
+                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
+                        if !(*next).next.load(Ordering::Relaxed).is_null() {
+                            next_index |= MARK_BIT;
                         }
+
+                        self.head.block.store(next, Ordering::Release);
+                        self.head.index.store(next_index, Ordering::Release);
                     }
 
-                    token.list.slot = slot as *const Slot<T> as *const u8;
-                    break;
+                    token.list.block = block as *const u8;
+                    token.list.offset = offset;
+                    return true;
+                }
+                Err(h) => {
+                    head = h;
+                    block = self.head.block.load(Ordering::Acquire);
+                    backoff.spin();
                 }
-
-                backoff.spin();
-            } else if offset == BLOCK_CAP {
-                // Help install the next block.
-                install_next_block();
             }
         }
-
-        token.list.guard = Some(guard);
-        true
     }
 
     /// Reads a message from the channel.
-    pub unsafe fn read(&self, token: &mut Token) -> Option<T> {
-        if token.list.slot.is_null() {
-            // The channel is closed.
-            return None;
-        }
-
-        let slot = &*(token.list.slot as *const Slot<T>);
-        let _guard: Guard = token.list.guard.take().unwrap();
-
-        // Wait until the message becomes ready.
-        let mut backoff = Backoff::new();
-        while !slot.ready.load(Ordering::Acquire) {
-            backoff.snooze();
+    pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+        if token.list.block.is_null() {
+            // The channel is disconnected.
+            return Err(());
         }
 
         // Read the message.
+        let block = token.list.block as *mut Block<T>;
+        let offset = token.list.offset;
+        let slot = (*block).slots.get_unchecked(offset);
+        slot.wait_write();
         let m = slot.msg.get().read();
         let msg = ManuallyDrop::into_inner(m);
-        Some(msg)
+
+        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
+        // couldn't because we were busy reading from the slot.
+        if offset + 1 == BLOCK_CAP {
+            Block::destroy(block, 0);
+        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
+            Block::destroy(block, offset + 1);
+        }
+
+        Ok(msg)
+    }
+
+    /// Attempts to send a message into the channel.
+    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+        self.send(msg, None).map_err(|err| match err {
+            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
+            SendTimeoutError::Timeout(_) => unreachable!(),
+        })
     }
 
     /// Sends a message into the channel.
-    pub fn send(&self, msg: T) {
+    pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
+        let token = &mut Token::default();
+        assert!(self.start_send(token));
+        unsafe {
+            self.write(token, msg)
+                .map_err(SendTimeoutError::Disconnected)
+        }
+    }
+
+    /// Attempts to receive a message without blocking.
+    pub fn try_recv(&self) -> Result<T, TryRecvError> {
         let token = &mut Token::default();
-        self.write(token, msg);
+
+        if self.start_recv(token) {
+            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+        } else {
+            Err(TryRecvError::Empty)
+        }
     }
 
     /// Receives a message from the channel.
-    pub fn recv(&self) -> Option<T> {
+    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
         let token = &mut Token::default();
         loop {
             // Try receiving a message several times.
-            let mut backoff = Backoff::new();
+            let backoff = Backoff::new();
             loop {
                 if self.start_recv(token) {
                     unsafe {
-                        return self.read(token);
+                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                     }
                 }
-                if !backoff.snooze() {
+
+                if backoff.is_completed() {
                     break;
+                } else {
+                    backoff.snooze();
                 }
             }
 
@@ -367,36 +452,29 @@ impl<T> Channel<T> {
                 self.receivers.register(oper, cx);
 
                 // Has the channel become ready just now?
-                if !self.is_empty() || self.is_closed() {
+                if !self.is_empty() || self.is_disconnected() {
                     let _ = cx.try_select(Selected::Aborted);
                 }
 
                 // Block the current thread.
-                let sel = cx.wait_until(None);
+                let sel = cx.wait_until(deadline);
 
                 match sel {
                     Selected::Waiting => unreachable!(),
-                    Selected::Aborted | Selected::Closed => {
+                    Selected::Aborted | Selected::Disconnected => {
                         self.receivers.unregister(oper).unwrap();
-                        // If the channel was closed, we still have to check for remaining messages.
-                    },
-                    Selected::Operation(_) => {},
+                        // If the channel was disconnected, we still have to check for remaining
+                        // messages.
+                    }
+                    Selected::Operation(_) => {}
                 }
-            })
-        }
-    }
+            });
 
-    /// Attempts to receive a message without blocking.
-    pub fn recv_nonblocking(&self) -> RecvNonblocking<T> {
-        let token = &mut Token::default();
-
-        if self.start_recv(token) {
-            match unsafe { self.read(token) } {
-                None => RecvNonblocking::Closed,
-                Some(msg) => RecvNonblocking::Message(msg),
+            if let Some(d) = deadline {
+                if Instant::now() >= d {
+                    return Err(RecvTimeoutError::Timeout);
+                }
             }
-        } else {
-            RecvNonblocking::Empty
         }
     }
 
@@ -404,12 +482,35 @@ impl<T> Channel<T> {
     pub fn len(&self) -> usize {
         loop {
             // Load the tail index, then load the head index.
-            let tail_index = self.tail.index.load(Ordering::SeqCst);
-            let head_index = self.head.index.load(Ordering::SeqCst);
+            let mut tail = self.tail.index.load(Ordering::SeqCst);
+            let mut head = self.head.index.load(Ordering::SeqCst);
 
             // If the tail index didn't change, we've got consistent indices to work with.
-            if self.tail.index.load(Ordering::SeqCst) == tail_index {
-                return tail_index.wrapping_sub(head_index);
+            if self.tail.index.load(Ordering::SeqCst) == tail {
+                // Erase the lower bits.
+                tail &= !((1 << SHIFT) - 1);
+                head &= !((1 << SHIFT) - 1);
+
+                // Rotate indices so that head falls into the first block.
+                let lap = (head >> SHIFT) / LAP;
+                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
+                head = head.wrapping_sub((lap * LAP) << SHIFT);
+
+                // Remove the lower bits.
+                tail >>= SHIFT;
+                head >>= SHIFT;
+
+                // Fix up indices if they fall onto block ends.
+                if head == BLOCK_CAP {
+                    head = 0;
+                    tail -= LAP;
+                }
+                if tail == BLOCK_CAP {
+                    tail += 1;
+                }
+
+                // Return the difference minus the number of blocks between tail and head.
+                return tail - head - tail / LAP;
             }
         }
     }
@@ -419,22 +520,25 @@ impl<T> Channel<T> {
         None
     }
 
-    /// Closes the channel and wakes up all blocked receivers.
-    pub fn close(&self) {
-        assert!(!self.is_closed.swap(true, Ordering::SeqCst));
-        self.receivers.close();
+    /// Disconnects the channel and wakes up all blocked receivers.
+    pub fn disconnect(&self) {
+        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+        if tail & MARK_BIT == 0 {
+            self.receivers.disconnect();
+        }
     }
 
-    /// Returns `true` if the channel is closed.
-    pub fn is_closed(&self) -> bool {
-        self.is_closed.load(Ordering::SeqCst)
+    /// Returns `true` if the channel is disconnected.
+    pub fn is_disconnected(&self) -> bool {
+        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
     }
 
     /// Returns `true` if the channel is empty.
     pub fn is_empty(&self) -> bool {
-        let head_index = self.head.index.load(Ordering::SeqCst);
-        let tail_index = self.tail.index.load(Ordering::SeqCst);
-        head_index == tail_index
+        let head = self.head.index.load(Ordering::SeqCst);
+        let tail = self.tail.index.load(Ordering::SeqCst);
+        head >> SHIFT == tail >> SHIFT
     }
 
     /// Returns `true` if the channel is full.
@@ -445,34 +549,36 @@ impl<T> Channel<T> {
 
 impl<T> Drop for Channel<T> {
     fn drop(&mut self) {
-        // Get the tail and head indices.
-        let tail_index = self.tail.index.load(Ordering::Relaxed);
-        let mut head_index = self.head.index.load(Ordering::Relaxed);
-
-        unsafe {
-            let mut head_ptr = self.head.block.load(Ordering::Relaxed, epoch::unprotected());
-
-            // Manually drop all messages between `head_index` and `tail_index` and destroy the
-            // heap-allocated nodes along the way.
-            while head_index != tail_index {
-                let head = head_ptr.deref();
-                let offset = head_index.wrapping_sub(head.start_index);
+        let mut head = self.head.index.load(Ordering::Relaxed);
+        let mut tail = self.tail.index.load(Ordering::Relaxed);
+        let mut block = self.head.block.load(Ordering::Relaxed);
 
-                let slot = &mut *head.slots.get_unchecked(offset).get();
-                ManuallyDrop::drop(&mut (*slot).msg.get().read());
+        // Erase the lower bits.
+        head &= !((1 << SHIFT) - 1);
+        tail &= !((1 << SHIFT) - 1);
 
-                if offset + 1 == BLOCK_CAP {
-                    let next = head.next.load(Ordering::Relaxed, epoch::unprotected());
-                    drop(head_ptr.into_owned());
-                    head_ptr = next;
+        unsafe {
+            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+            while head != tail {
+                let offset = (head >> SHIFT) % LAP;
+
+                if offset < BLOCK_CAP {
+                    // Drop the message in the slot.
+                    let slot = (*block).slots.get_unchecked(offset);
+                    ManuallyDrop::drop(&mut *(*slot).msg.get());
+                } else {
+                    // Deallocate the block and move to the next one.
+                    let next = (*block).next.load(Ordering::Relaxed);
+                    drop(Box::from_raw(block));
+                    block = next;
                 }
 
-                head_index = head_index.wrapping_add(1);
+                head = head.wrapping_add(1 << SHIFT);
             }
 
-            // If there is one last remaining block in the end, destroy it.
-            if !head_ptr.is_null() {
-                drop(head_ptr.into_owned());
+            // Deallocate the last remaining block.
+            if !block.is_null() {
+                drop(Box::from_raw(block));
             }
         }
     }
@@ -485,11 +591,7 @@ pub struct Receiver<'a, T: 'a>(&'a Channel<T>);
 pub struct Sender<'a, T: 'a>(&'a Channel<T>);
 
 impl<'a, T> SelectHandle for Receiver<'a, T> {
-    fn try(&self, token: &mut Token) -> bool {
-        self.0.start_recv(token)
-    }
-
-    fn retry(&self, token: &mut Token) -> bool {
+    fn try_select(&self, token: &mut Token) -> bool {
         self.0.start_recv(token)
     }
 
@@ -497,9 +599,9 @@ impl<'a, T> SelectHandle for Receiver<'a, T> {
         None
     }
 
-    fn register(&self, _token: &mut Token, oper: Operation, cx: &Context) -> bool {
+    fn register(&self, oper: Operation, cx: &Context) -> bool {
         self.0.receivers.register(oper, cx);
-        self.0.is_empty() && !self.0.is_closed()
+        self.is_ready()
     }
 
     fn unregister(&self, oper: Operation) {
@@ -507,38 +609,49 @@ impl<'a, T> SelectHandle for Receiver<'a, T> {
     }
 
     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
-        self.0.start_recv(token)
+        self.try_select(token)
     }
 
-    fn state(&self) -> usize {
-        self.0.tail.index.load(Ordering::SeqCst)
+    fn is_ready(&self) -> bool {
+        !self.0.is_empty() || self.0.is_disconnected()
     }
-}
 
-impl<'a, T> SelectHandle for Sender<'a, T> {
-    fn try(&self, _token: &mut Token) -> bool {
-        true
+    fn watch(&self, oper: Operation, cx: &Context) -> bool {
+        self.0.receivers.watch(oper, cx);
+        self.is_ready()
     }
 
-    fn retry(&self, _token: &mut Token) -> bool {
-        true
+    fn unwatch(&self, oper: Operation) {
+        self.0.receivers.unwatch(oper);
+    }
+}
+
+impl<'a, T> SelectHandle for Sender<'a, T> {
+    fn try_select(&self, token: &mut Token) -> bool {
+        self.0.start_send(token)
     }
 
     fn deadline(&self) -> Option<Instant> {
         None
     }
 
-    fn register(&self, _token: &mut Token, _oper: Operation, _cx: &Context) -> bool {
-        false
+    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
+        self.is_ready()
     }
 
     fn unregister(&self, _oper: Operation) {}
 
-    fn accept(&self, _token: &mut Token, _cx: &Context) -> bool {
+    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+        self.try_select(token)
+    }
+
+    fn is_ready(&self) -> bool {
         true
     }
 
-    fn state(&self) -> usize {
-        self.0.head.index.load(Ordering::SeqCst)
+    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
+        self.is_ready()
     }
+
+    fn unwatch(&self, _oper: Operation) {}
 }