//! Unbounded channel implemented as a linked list.
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};
use maybe_uninit::MaybeUninit;

use context::Context;
use err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use select::{Operation, SelectHandle, Selected, Token};
use waker::SyncWaker;
// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
43 /// A slot in a block.
46 msg
: UnsafeCell
<MaybeUninit
<T
>>,
48 /// The state of the slot.
53 /// Waits until a message is written into the slot.
54 fn wait_write(&self) {
55 let backoff
= Backoff
::new();
56 while self.state
.load(Ordering
::Acquire
) & WRITE
== 0 {
62 /// A block in a linked list.
64 /// Each block in the list can hold up to `BLOCK_CAP` messages.
66 /// The next block in the linked list.
67 next
: AtomicPtr
<Block
<T
>>,
69 /// Slots for messages.
70 slots
: [Slot
<T
>; BLOCK_CAP
],
74 /// Creates an empty block.
75 fn new() -> Block
<T
> {
76 // SAFETY: This is safe because:
77 // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
78 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
79 // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
80 // holds a MaybeUninit.
81 // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
82 unsafe { MaybeUninit::zeroed().assume_init() }
85 /// Waits until the next pointer is set.
86 fn wait_next(&self) -> *mut Block
<T
> {
87 let backoff
= Backoff
::new();
89 let next
= self.next
.load(Ordering
::Acquire
);
97 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
98 unsafe fn destroy(this
: *mut Block
<T
>, start
: usize) {
99 // It is not necessary to set the `DESTROY bit in the last slot because that slot has begun
100 // destruction of the block.
101 for i
in start
..BLOCK_CAP
- 1 {
102 let slot
= (*this
).slots
.get_unchecked(i
);
104 // Mark the `DESTROY` bit if a thread is still using the slot.
105 if slot
.state
.load(Ordering
::Acquire
) & READ
== 0
106 && slot
.state
.fetch_or(DESTROY
, Ordering
::AcqRel
) & READ
== 0
108 // If a thread is still using the slot, it will continue destruction of the block.
113 // No thread is using the block, now it is safe to destroy it.
114 drop(Box
::from_raw(this
));
118 /// A position in a channel.
121 /// The index in the channel.
124 /// The block in the linked list.
125 block
: AtomicPtr
<Block
<T
>>,
/// The token type for the list flavor.
#[derive(Debug)]
pub struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        // A null block means "no slot reserved / channel disconnected".
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}
148 /// Unbounded channel implemented as a linked list.
150 /// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
151 /// represented as numbers of type `usize` and wrap on overflow.
153 /// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
154 /// improve cache efficiency.
155 pub struct Channel
<T
> {
156 /// The head of the channel.
157 head
: CachePadded
<Position
<T
>>,
159 /// The tail of the channel.
160 tail
: CachePadded
<Position
<T
>>,
162 /// Receivers waiting while the channel is empty and not disconnected.
163 receivers
: SyncWaker
,
165 /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
166 _marker
: PhantomData
<T
>,
170 /// Creates a new unbounded channel.
171 pub fn new() -> Self {
173 head
: CachePadded
::new(Position
{
174 block
: AtomicPtr
::new(ptr
::null_mut()),
175 index
: AtomicUsize
::new(0),
177 tail
: CachePadded
::new(Position
{
178 block
: AtomicPtr
::new(ptr
::null_mut()),
179 index
: AtomicUsize
::new(0),
181 receivers
: SyncWaker
::new(),
182 _marker
: PhantomData
,
186 /// Returns a receiver handle to the channel.
187 pub fn receiver(&self) -> Receiver
<T
> {
191 /// Returns a sender handle to the channel.
192 pub fn sender(&self) -> Sender
<T
> {
196 /// Attempts to reserve a slot for sending a message.
197 fn start_send(&self, token
: &mut Token
) -> bool
{
198 let backoff
= Backoff
::new();
199 let mut tail
= self.tail
.index
.load(Ordering
::Acquire
);
200 let mut block
= self.tail
.block
.load(Ordering
::Acquire
);
201 let mut next_block
= None
;
204 // Check if the channel is disconnected.
205 if tail
& MARK_BIT
!= 0 {
206 token
.list
.block
= ptr
::null();
210 // Calculate the offset of the index into the block.
211 let offset
= (tail
>> SHIFT
) % LAP
;
213 // If we reached the end of the block, wait until the next one is installed.
214 if offset
== BLOCK_CAP
{
216 tail
= self.tail
.index
.load(Ordering
::Acquire
);
217 block
= self.tail
.block
.load(Ordering
::Acquire
);
221 // If we're going to have to install the next block, allocate it in advance in order to
222 // make the wait for other threads as short as possible.
223 if offset
+ 1 == BLOCK_CAP
&& next_block
.is_none() {
224 next_block
= Some(Box
::new(Block
::<T
>::new()));
227 // If this is the first message to be sent into the channel, we need to allocate the
228 // first block and install it.
230 let new
= Box
::into_raw(Box
::new(Block
::<T
>::new()));
235 .compare_and_swap(block
, new
, Ordering
::Release
)
238 self.head
.block
.store(new
, Ordering
::Release
);
241 next_block
= unsafe { Some(Box::from_raw(new)) }
;
242 tail
= self.tail
.index
.load(Ordering
::Acquire
);
243 block
= self.tail
.block
.load(Ordering
::Acquire
);
248 let new_tail
= tail
+ (1 << SHIFT
);
250 // Try advancing the tail forward.
251 match self.tail
.index
.compare_exchange_weak(
258 // If we've reached the end of the block, install the next one.
259 if offset
+ 1 == BLOCK_CAP
{
260 let next_block
= Box
::into_raw(next_block
.unwrap());
261 self.tail
.block
.store(next_block
, Ordering
::Release
);
262 self.tail
.index
.fetch_add(1 << SHIFT
, Ordering
::Release
);
263 (*block
).next
.store(next_block
, Ordering
::Release
);
266 token
.list
.block
= block
as *const u8;
267 token
.list
.offset
= offset
;
272 block
= self.tail
.block
.load(Ordering
::Acquire
);
279 /// Writes a message into the channel.
280 pub unsafe fn write(&self, token
: &mut Token
, msg
: T
) -> Result
<(), T
> {
281 // If there is no slot, the channel is disconnected.
282 if token
.list
.block
.is_null() {
286 // Write the message into the slot.
287 let block
= token
.list
.block
as *mut Block
<T
>;
288 let offset
= token
.list
.offset
;
289 let slot
= (*block
).slots
.get_unchecked(offset
);
290 slot
.msg
.get().write(MaybeUninit
::new(msg
));
291 slot
.state
.fetch_or(WRITE
, Ordering
::Release
);
293 // Wake a sleeping receiver.
294 self.receivers
.notify();
298 /// Attempts to reserve a slot for receiving a message.
299 fn start_recv(&self, token
: &mut Token
) -> bool
{
300 let backoff
= Backoff
::new();
301 let mut head
= self.head
.index
.load(Ordering
::Acquire
);
302 let mut block
= self.head
.block
.load(Ordering
::Acquire
);
305 // Calculate the offset of the index into the block.
306 let offset
= (head
>> SHIFT
) % LAP
;
308 // If we reached the end of the block, wait until the next one is installed.
309 if offset
== BLOCK_CAP
{
311 head
= self.head
.index
.load(Ordering
::Acquire
);
312 block
= self.head
.block
.load(Ordering
::Acquire
);
316 let mut new_head
= head
+ (1 << SHIFT
);
318 if new_head
& MARK_BIT
== 0 {
319 atomic
::fence(Ordering
::SeqCst
);
320 let tail
= self.tail
.index
.load(Ordering
::Relaxed
);
322 // If the tail equals the head, that means the channel is empty.
323 if head
>> SHIFT
== tail
>> SHIFT
{
324 // If the channel is disconnected...
325 if tail
& MARK_BIT
!= 0 {
326 // ...then receive an error.
327 token
.list
.block
= ptr
::null();
330 // Otherwise, the receive operation is not ready.
335 // If head and tail are not in the same block, set `MARK_BIT` in head.
336 if (head
>> SHIFT
) / LAP
!= (tail
>> SHIFT
) / LAP
{
337 new_head
|= MARK_BIT
;
341 // The block can be null here only if the first message is being sent into the channel.
342 // In that case, just wait until it gets initialized.
345 head
= self.head
.index
.load(Ordering
::Acquire
);
346 block
= self.head
.block
.load(Ordering
::Acquire
);
350 // Try moving the head index forward.
351 match self.head
.index
.compare_exchange_weak(
358 // If we've reached the end of the block, move to the next one.
359 if offset
+ 1 == BLOCK_CAP
{
360 let next
= (*block
).wait_next();
361 let mut next_index
= (new_head
& !MARK_BIT
).wrapping_add(1 << SHIFT
);
362 if !(*next
).next
.load(Ordering
::Relaxed
).is_null() {
363 next_index
|= MARK_BIT
;
366 self.head
.block
.store(next
, Ordering
::Release
);
367 self.head
.index
.store(next_index
, Ordering
::Release
);
370 token
.list
.block
= block
as *const u8;
371 token
.list
.offset
= offset
;
376 block
= self.head
.block
.load(Ordering
::Acquire
);
383 /// Reads a message from the channel.
384 pub unsafe fn read(&self, token
: &mut Token
) -> Result
<T
, ()> {
385 if token
.list
.block
.is_null() {
386 // The channel is disconnected.
391 let block
= token
.list
.block
as *mut Block
<T
>;
392 let offset
= token
.list
.offset
;
393 let slot
= (*block
).slots
.get_unchecked(offset
);
395 let msg
= slot
.msg
.get().read().assume_init();
397 // Destroy the block if we've reached the end, or if another thread wanted to destroy but
398 // couldn't because we were busy reading from the slot.
399 if offset
+ 1 == BLOCK_CAP
{
400 Block
::destroy(block
, 0);
401 } else if slot
.state
.fetch_or(READ
, Ordering
::AcqRel
) & DESTROY
!= 0 {
402 Block
::destroy(block
, offset
+ 1);
408 /// Attempts to send a message into the channel.
409 pub fn try_send(&self, msg
: T
) -> Result
<(), TrySendError
<T
>> {
410 self.send(msg
, None
).map_err(|err
| match err
{
411 SendTimeoutError
::Disconnected(msg
) => TrySendError
::Disconnected(msg
),
412 SendTimeoutError
::Timeout(_
) => unreachable
!(),
416 /// Sends a message into the channel.
417 pub fn send(&self, msg
: T
, _deadline
: Option
<Instant
>) -> Result
<(), SendTimeoutError
<T
>> {
418 let token
= &mut Token
::default();
419 assert
!(self.start_send(token
));
421 self.write(token
, msg
)
422 .map_err(SendTimeoutError
::Disconnected
)
426 /// Attempts to receive a message without blocking.
427 pub fn try_recv(&self) -> Result
<T
, TryRecvError
> {
428 let token
= &mut Token
::default();
430 if self.start_recv(token
) {
431 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
433 Err(TryRecvError
::Empty
)
437 /// Receives a message from the channel.
438 pub fn recv(&self, deadline
: Option
<Instant
>) -> Result
<T
, RecvTimeoutError
> {
439 let token
= &mut Token
::default();
441 // Try receiving a message several times.
442 let backoff
= Backoff
::new();
444 if self.start_recv(token
) {
446 return self.read(token
).map_err(|_
| RecvTimeoutError
::Disconnected
);
450 if backoff
.is_completed() {
457 if let Some(d
) = deadline
{
458 if Instant
::now() >= d
{
459 return Err(RecvTimeoutError
::Timeout
);
463 // Prepare for blocking until a sender wakes us up.
465 let oper
= Operation
::hook(token
);
466 self.receivers
.register(oper
, cx
);
468 // Has the channel become ready just now?
469 if !self.is_empty() || self.is_disconnected() {
470 let _
= cx
.try_select(Selected
::Aborted
);
473 // Block the current thread.
474 let sel
= cx
.wait_until(deadline
);
477 Selected
::Waiting
=> unreachable
!(),
478 Selected
::Aborted
| Selected
::Disconnected
=> {
479 self.receivers
.unregister(oper
).unwrap();
480 // If the channel was disconnected, we still have to check for remaining
483 Selected
::Operation(_
) => {}
489 /// Returns the current number of messages inside the channel.
490 pub fn len(&self) -> usize {
492 // Load the tail index, then load the head index.
493 let mut tail
= self.tail
.index
.load(Ordering
::SeqCst
);
494 let mut head
= self.head
.index
.load(Ordering
::SeqCst
);
496 // If the tail index didn't change, we've got consistent indices to work with.
497 if self.tail
.index
.load(Ordering
::SeqCst
) == tail
{
498 // Erase the lower bits.
499 tail
&= !((1 << SHIFT
) - 1);
500 head
&= !((1 << SHIFT
) - 1);
502 // Rotate indices so that head falls into the first block.
503 let lap
= (head
>> SHIFT
) / LAP
;
504 tail
= tail
.wrapping_sub((lap
* LAP
) << SHIFT
);
505 head
= head
.wrapping_sub((lap
* LAP
) << SHIFT
);
507 // Remove the lower bits.
511 // Fix up indices if they fall onto block ends.
512 if head
== BLOCK_CAP
{
516 if tail
== BLOCK_CAP
{
520 // Return the difference minus the number of blocks between tail and head.
521 return tail
- head
- tail
/ LAP
;
526 /// Returns the capacity of the channel.
527 pub fn capacity(&self) -> Option
<usize> {
531 /// Disconnects the channel and wakes up all blocked receivers.
533 /// Returns `true` if this call disconnected the channel.
534 pub fn disconnect(&self) -> bool
{
535 let tail
= self.tail
.index
.fetch_or(MARK_BIT
, Ordering
::SeqCst
);
537 if tail
& MARK_BIT
== 0 {
538 self.receivers
.disconnect();
545 /// Returns `true` if the channel is disconnected.
546 pub fn is_disconnected(&self) -> bool
{
547 self.tail
.index
.load(Ordering
::SeqCst
) & MARK_BIT
!= 0
550 /// Returns `true` if the channel is empty.
551 pub fn is_empty(&self) -> bool
{
552 let head
= self.head
.index
.load(Ordering
::SeqCst
);
553 let tail
= self.tail
.index
.load(Ordering
::SeqCst
);
554 head
>> SHIFT
== tail
>> SHIFT
557 /// Returns `true` if the channel is full.
558 pub fn is_full(&self) -> bool
{
563 impl<T
> Drop
for Channel
<T
> {
565 let mut head
= self.head
.index
.load(Ordering
::Relaxed
);
566 let mut tail
= self.tail
.index
.load(Ordering
::Relaxed
);
567 let mut block
= self.head
.block
.load(Ordering
::Relaxed
);
569 // Erase the lower bits.
570 head
&= !((1 << SHIFT
) - 1);
571 tail
&= !((1 << SHIFT
) - 1);
574 // Drop all messages between head and tail and deallocate the heap-allocated blocks.
576 let offset
= (head
>> SHIFT
) % LAP
;
578 if offset
< BLOCK_CAP
{
579 // Drop the message in the slot.
580 let slot
= (*block
).slots
.get_unchecked(offset
);
581 let p
= &mut *slot
.msg
.get();
582 p
.as_mut_ptr().drop_in_place();
584 // Deallocate the block and move to the next one.
585 let next
= (*block
).next
.load(Ordering
::Relaxed
);
586 drop(Box
::from_raw(block
));
590 head
= head
.wrapping_add(1 << SHIFT
);
593 // Deallocate the last remaining block.
594 if !block
.is_null() {
595 drop(Box
::from_raw(block
));
601 /// Receiver handle to a channel.
602 pub struct Receiver
<'a
, T
: 'a
>(&'a Channel
<T
>);
604 /// Sender handle to a channel.
605 pub struct Sender
<'a
, T
: 'a
>(&'a Channel
<T
>);
607 impl<'a
, T
> SelectHandle
for Receiver
<'a
, T
> {
608 fn try_select(&self, token
: &mut Token
) -> bool
{
609 self.0.start_recv(token
)
612 fn deadline(&self) -> Option
<Instant
> {
616 fn register(&self, oper
: Operation
, cx
: &Context
) -> bool
{
617 self.0.receivers
.register(oper
, cx
);
621 fn unregister(&self, oper
: Operation
) {
622 self.0.receivers
.unregister(oper
);
625 fn accept(&self, token
: &mut Token
, _cx
: &Context
) -> bool
{
626 self.try_select(token
)
629 fn is_ready(&self) -> bool
{
630 !self.0.is_empty
() || self.0.is_disconnected
()
633 fn watch(&self, oper
: Operation
, cx
: &Context
) -> bool
{
634 self.0.receivers
.watch(oper
, cx
);
638 fn unwatch(&self, oper
: Operation
) {
639 self.0.receivers
.unwatch(oper
);
643 impl<'a
, T
> SelectHandle
for Sender
<'a
, T
> {
644 fn try_select(&self, token
: &mut Token
) -> bool
{
645 self.0.start_send(token
)
648 fn deadline(&self) -> Option
<Instant
> {
652 fn register(&self, _oper
: Operation
, _cx
: &Context
) -> bool
{
656 fn unregister(&self, _oper
: Operation
) {}
658 fn accept(&self, token
: &mut Token
, _cx
: &Context
) -> bool
{
659 self.try_select(token
)
662 fn is_ready(&self) -> bool
{
666 fn watch(&self, _oper
: Operation
, _cx
: &Context
) -> bool
{
670 fn unwatch(&self, _oper
: Operation
) {}