1 use crate::loom
::cell
::UnsafeCell
;
2 use crate::loom
::future
::AtomicWaker
;
3 use crate::loom
::sync
::atomic
::AtomicUsize
;
4 use crate::loom
::sync
::Arc
;
5 use crate::sync
::mpsc
::list
;
6 use crate::sync
::notify
::Notify
;
10 use std
::sync
::atomic
::Ordering
::{AcqRel, Relaxed}
;
11 use std
::task
::Poll
::{Pending, Ready}
;
12 use std
::task
::{Context, Poll}
;
15 pub(crate) struct Tx
<T
, S
> {
16 inner
: Arc
<Chan
<T
, S
>>,
19 impl<T
, S
: fmt
::Debug
> fmt
::Debug
for Tx
<T
, S
> {
20 fn fmt(&self, fmt
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
21 fmt
.debug_struct("Tx").field("inner", &self.inner
).finish()
26 pub(crate) struct Rx
<T
, S
: Semaphore
> {
27 inner
: Arc
<Chan
<T
, S
>>,
30 impl<T
, S
: Semaphore
+ fmt
::Debug
> fmt
::Debug
for Rx
<T
, S
> {
31 fn fmt(&self, fmt
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
32 fmt
.debug_struct("Rx").field("inner", &self.inner
).finish()
36 pub(crate) trait Semaphore
{
37 fn is_idle(&self) -> bool
;
43 fn is_closed(&self) -> bool
;
47 /// Notifies all tasks listening for the receiver being dropped
48 notify_rx_closed
: Notify
,
50 /// Handle to the push half of the lock-free list.
53 /// Coordinates access to channel's capacity.
56 /// Receiver waker. Notified when a value is pushed into the channel.
57 rx_waker
: AtomicWaker
,
59 /// Tracks the number of outstanding sender handles.
61 /// When this drops to zero, the send half of the channel is closed.
62 tx_count
: AtomicUsize
,
64 /// Only accessed by `Rx` handle.
65 rx_fields
: UnsafeCell
<RxFields
<T
>>,
68 impl<T
, S
> fmt
::Debug
for Chan
<T
, S
>
72 fn fmt(&self, fmt
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
73 fmt
.debug_struct("Chan")
74 .field("tx", &self.tx
)
75 .field("semaphore", &self.semaphore
)
76 .field("rx_waker", &self.rx_waker
)
77 .field("tx_count", &self.tx_count
)
78 .field("rx_fields", &"...")
83 /// Fields only accessed by `Rx` handle.
85 /// Channel receiver. This field is only accessed by the `Receiver` type.
88 /// `true` if `Rx::close` is called.
92 impl<T
> fmt
::Debug
for RxFields
<T
> {
93 fn fmt(&self, fmt
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
94 fmt
.debug_struct("RxFields")
95 .field("list", &self.list
)
96 .field("rx_closed", &self.rx_closed
)
101 unsafe impl<T
: Send
, S
: Send
> Send
for Chan
<T
, S
> {}
102 unsafe impl<T
: Send
, S
: Sync
> Sync
for Chan
<T
, S
> {}
104 pub(crate) fn channel
<T
, S
: Semaphore
>(semaphore
: S
) -> (Tx
<T
, S
>, Rx
<T
, S
>) {
105 let (tx
, rx
) = list
::channel();
107 let chan
= Arc
::new(Chan
{
108 notify_rx_closed
: Notify
::new(),
111 rx_waker
: AtomicWaker
::new(),
112 tx_count
: AtomicUsize
::new(1),
113 rx_fields
: UnsafeCell
::new(RxFields
{
119 (Tx
::new(chan
.clone()), Rx
::new(chan
))
122 // ===== impl Tx =====
124 impl<T
, S
> Tx
<T
, S
> {
125 fn new(chan
: Arc
<Chan
<T
, S
>>) -> Tx
<T
, S
> {
129 pub(super) fn semaphore(&self) -> &S
{
130 &self.inner
.semaphore
133 /// Send a message and notify the receiver.
134 pub(crate) fn send(&self, value
: T
) {
135 self.inner
.send(value
);
138 /// Wake the receive half
139 pub(crate) fn wake_rx(&self) {
140 self.inner
.rx_waker
.wake();
143 /// Returns `true` if senders belong to the same channel.
144 pub(crate) fn same_channel(&self, other
: &Self) -> bool
{
145 Arc
::ptr_eq(&self.inner
, &other
.inner
)
149 impl<T
, S
: Semaphore
> Tx
<T
, S
> {
150 pub(crate) fn is_closed(&self) -> bool
{
151 self.inner
.semaphore
.is_closed()
154 pub(crate) async
fn closed(&self) {
155 // In order to avoid a race condition, we first request a notification,
156 // **then** check whether the semaphore is closed. If the semaphore is
157 // closed the notification request is dropped.
158 let notified
= self.inner
.notify_rx_closed
.notified();
160 if self.inner
.semaphore
.is_closed() {
167 impl<T
, S
> Clone
for Tx
<T
, S
> {
168 fn clone(&self) -> Tx
<T
, S
> {
169 // Using a Relaxed ordering here is sufficient as the caller holds a
170 // strong ref to `self`, preventing a concurrent decrement to zero.
171 self.inner
.tx_count
.fetch_add(1, Relaxed
);
174 inner
: self.inner
.clone(),
179 impl<T
, S
> Drop
for Tx
<T
, S
> {
181 if self.inner
.tx_count
.fetch_sub(1, AcqRel
) != 1 {
185 // Close the list, which sends a `Close` message
186 self.inner
.tx
.close();
188 // Notify the receiver
193 // ===== impl Rx =====
195 impl<T
, S
: Semaphore
> Rx
<T
, S
> {
196 fn new(chan
: Arc
<Chan
<T
, S
>>) -> Rx
<T
, S
> {
200 pub(crate) fn close(&mut self) {
201 self.inner
.rx_fields
.with_mut(|rx_fields_ptr
| {
202 let rx_fields
= unsafe { &mut *rx_fields_ptr }
;
204 if rx_fields
.rx_closed
{
208 rx_fields
.rx_closed
= true;
211 self.inner
.semaphore
.close();
212 self.inner
.notify_rx_closed
.notify_waiters();
215 /// Receive the next value
216 pub(crate) fn recv(&mut self, cx
: &mut Context
<'_
>) -> Poll
<Option
<T
>> {
217 use super::block
::Read
::*;
219 // Keep track of task budget
220 let coop
= ready
!(crate::coop
::poll_proceed(cx
));
222 self.inner
.rx_fields
.with_mut(|rx_fields_ptr
| {
223 let rx_fields
= unsafe { &mut *rx_fields_ptr }
;
225 macro_rules
! try_recv
{
227 match rx_fields
.list
.pop(&self.inner
.tx
) {
228 Some(Value(value
)) => {
229 self.inner
.semaphore
.add_permit();
230 coop
.made_progress();
231 return Ready(Some(value
));
234 // TODO: This check may not be required as it most
235 // likely can only return `true` at this point. A
236 // channel is closed when all tx handles are
237 // dropped. Dropping a tx handle releases memory,
238 // which ensures that if dropping the tx handle is
239 // visible, then all messages sent are also visible.
240 assert
!(self.inner
.semaphore
.is_idle());
241 coop
.made_progress();
244 None
=> {}
// fall through
251 self.inner
.rx_waker
.register_by_ref(cx
.waker());
253 // It is possible that a value was pushed between attempting to read
254 // and registering the task, so we have to check the channel a second time.
258 if rx_fields
.rx_closed
&& self.inner
.semaphore
.is_idle() {
259 coop
.made_progress();
268 impl<T
, S
: Semaphore
> Drop
for Rx
<T
, S
> {
270 use super::block
::Read
::Value
;
274 self.inner
.rx_fields
.with_mut(|rx_fields_ptr
| {
275 let rx_fields
= unsafe { &mut *rx_fields_ptr }
;
277 while let Some(Value(_
)) = rx_fields
.list
.pop(&self.inner
.tx
) {
278 self.inner
.semaphore
.add_permit();
284 // ===== impl Chan =====
286 impl<T
, S
> Chan
<T
, S
> {
287 fn send(&self, value
: T
) {
291 // Notify the rx task
292 self.rx_waker
.wake();
296 impl<T
, S
> Drop
for Chan
<T
, S
> {
298 use super::block
::Read
::Value
;
300 // Safety: the only owner of the rx fields is Chan, and being
301 // inside its own Drop means we're the last ones to touch it.
302 self.rx_fields
.with_mut(|rx_fields_ptr
| {
303 let rx_fields
= unsafe { &mut *rx_fields_ptr }
;
305 while let Some(Value(_
)) = rx_fields
.list
.pop(&self.tx
) {}
306 unsafe { rx_fields.list.free_blocks() }
;
311 // ===== impl Semaphore for (::Semaphore, capacity) =====
313 impl Semaphore
for (crate::sync
::batch_semaphore
::Semaphore
, usize) {
314 fn add_permit(&self) {
318 fn is_idle(&self) -> bool
{
319 self.0.available_permits() == self.1
326 fn is_closed(&self) -> bool
{
331 // ===== impl Semaphore for AtomicUsize =====
333 use std
::sync
::atomic
::Ordering
::{Acquire, Release}
;
336 impl Semaphore
for AtomicUsize
{
337 fn add_permit(&self) {
338 let prev
= self.fetch_sub(2, Release
);
341 // Something went wrong
346 fn is_idle(&self) -> bool
{
347 self.load(Acquire
) >> 1 == 0
351 self.fetch_or(1, Release
);
354 fn is_closed(&self) -> bool
{
355 self.load(Acquire
) & 1 == 1