+ /// Disconnects receivers and wakes up all blocked senders.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ ///
+ /// # Safety
+ /// May only be called once upon dropping the last receiver. The
+ /// destruction of all other receivers must have been observed with acquire
+ /// ordering or stronger.
+ pub(crate) unsafe fn disconnect_receivers(&self) -> bool {
+ let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
+ let disconnected = if tail & self.mark_bit == 0 {
+ self.senders.disconnect();
+ true
+ } else {
+ false
+ };
+
+ self.discard_all_messages(tail);
+ disconnected
+ }
+
+ /// Discards all messages.
+ ///
+ /// `tail` should be the current (and therefore last) value of `tail`.
+ ///
+ /// # Panicking
+ /// If a destructor panics, the remaining messages are leaked, matching the
+ /// behaviour of the unbounded channel.
+ ///
+ /// # Safety
+ /// This method must only be called when dropping the last receiver. The
+ /// destruction of all other receivers must have been observed with acquire
+ /// ordering or stronger.
+ unsafe fn discard_all_messages(&self, tail: usize) {
+ debug_assert!(self.is_disconnected());
+
+ // Only receivers modify `head`, so since we are the last one,
+ // this value will not change and will not be observed (since
+ // no new messages can be sent after disconnection).
+ let mut head = self.head.load(Ordering::Relaxed);
+ let tail = tail & !self.mark_bit;
+
+ let backoff = Backoff::new();
+ loop {
+ // Deconstruct the head.
+ let index = head & (self.mark_bit - 1);
+ let lap = head & !(self.one_lap - 1);
+
+ // Inspect the corresponding slot.
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
+ let stamp = slot.stamp.load(Ordering::Acquire);
+
+ // If the stamp is ahead of the head by 1, we may drop the message.
+ if head + 1 == stamp {
+ head = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+ head + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
+ unsafe {
+ (*slot.msg.get()).assume_init_drop();
+ }
+ // If the tail equals the head, that means the channel is empty.
+ } else if tail == head {
+ return;
+ // Otherwise, a sender is about to write into the slot, so we need
+ // to wait for it to update the stamp.
+ } else {
+ backoff.spin_heavy();
+ }
+ }
+ }
+