git.proxmox.com Git - rustc.git/blob - vendor/tokio/src/sync/mpsc/chan.rs
New upstream version 1.60.0+dfsg1
[rustc.git] / vendor / tokio / src / sync / mpsc / chan.rs
1 use crate::loom::cell::UnsafeCell;
2 use crate::loom::future::AtomicWaker;
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Arc;
5 use crate::sync::mpsc::list;
6 use crate::sync::notify::Notify;
7
8 use std::fmt;
9 use std::process;
10 use std::sync::atomic::Ordering::{AcqRel, Relaxed};
11 use std::task::Poll::{Pending, Ready};
12 use std::task::{Context, Poll};
13
14 /// Channel sender
15 pub(crate) struct Tx<T, S> {
16 inner: Arc<Chan<T, S>>,
17 }
18
19 impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
20 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
21 fmt.debug_struct("Tx").field("inner", &self.inner).finish()
22 }
23 }
24
25 /// Channel receiver
26 pub(crate) struct Rx<T, S: Semaphore> {
27 inner: Arc<Chan<T, S>>,
28 }
29
30 impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
31 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
32 fmt.debug_struct("Rx").field("inner", &self.inner).finish()
33 }
34 }
35
/// Capacity bookkeeping used by the channel.
///
/// The channel only relies on the four operations below; how permits are
/// handed out to senders is up to the implementor.
pub(crate) trait Semaphore {
    /// Returns `true` when no permits are outstanding, i.e. every message
    /// that was sent has been accounted for by `add_permit`.
    fn is_idle(&self) -> bool;

    /// Gives back one permit. The receiver calls this once per message it
    /// takes out of the list.
    fn add_permit(&self);

    /// Marks the semaphore as closed.
    fn close(&self);

    /// Returns `true` once `close` has been called.
    fn is_closed(&self) -> bool;
}
45
46 struct Chan<T, S> {
47 /// Notifies all tasks listening for the receiver being dropped
48 notify_rx_closed: Notify,
49
50 /// Handle to the push half of the lock-free list.
51 tx: list::Tx<T>,
52
53 /// Coordinates access to channel's capacity.
54 semaphore: S,
55
56 /// Receiver waker. Notified when a value is pushed into the channel.
57 rx_waker: AtomicWaker,
58
59 /// Tracks the number of outstanding sender handles.
60 ///
61 /// When this drops to zero, the send half of the channel is closed.
62 tx_count: AtomicUsize,
63
64 /// Only accessed by `Rx` handle.
65 rx_fields: UnsafeCell<RxFields<T>>,
66 }
67
68 impl<T, S> fmt::Debug for Chan<T, S>
69 where
70 S: fmt::Debug,
71 {
72 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
73 fmt.debug_struct("Chan")
74 .field("tx", &self.tx)
75 .field("semaphore", &self.semaphore)
76 .field("rx_waker", &self.rx_waker)
77 .field("tx_count", &self.tx_count)
78 .field("rx_fields", &"...")
79 .finish()
80 }
81 }
82
83 /// Fields only accessed by `Rx` handle.
84 struct RxFields<T> {
85 /// Channel receiver. This field is only accessed by the `Receiver` type.
86 list: list::Rx<T>,
87
88 /// `true` if `Rx::close` is called.
89 rx_closed: bool,
90 }
91
92 impl<T> fmt::Debug for RxFields<T> {
93 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
94 fmt.debug_struct("RxFields")
95 .field("list", &self.list)
96 .field("rx_closed", &self.rx_closed)
97 .finish()
98 }
99 }
100
101 unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
102 unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
103
104 pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
105 let (tx, rx) = list::channel();
106
107 let chan = Arc::new(Chan {
108 notify_rx_closed: Notify::new(),
109 tx,
110 semaphore,
111 rx_waker: AtomicWaker::new(),
112 tx_count: AtomicUsize::new(1),
113 rx_fields: UnsafeCell::new(RxFields {
114 list: rx,
115 rx_closed: false,
116 }),
117 });
118
119 (Tx::new(chan.clone()), Rx::new(chan))
120 }
121
122 // ===== impl Tx =====
123
124 impl<T, S> Tx<T, S> {
125 fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
126 Tx { inner: chan }
127 }
128
129 pub(super) fn semaphore(&self) -> &S {
130 &self.inner.semaphore
131 }
132
133 /// Send a message and notify the receiver.
134 pub(crate) fn send(&self, value: T) {
135 self.inner.send(value);
136 }
137
138 /// Wake the receive half
139 pub(crate) fn wake_rx(&self) {
140 self.inner.rx_waker.wake();
141 }
142
143 /// Returns `true` if senders belong to the same channel.
144 pub(crate) fn same_channel(&self, other: &Self) -> bool {
145 Arc::ptr_eq(&self.inner, &other.inner)
146 }
147 }
148
149 impl<T, S: Semaphore> Tx<T, S> {
150 pub(crate) fn is_closed(&self) -> bool {
151 self.inner.semaphore.is_closed()
152 }
153
154 pub(crate) async fn closed(&self) {
155 // In order to avoid a race condition, we first request a notification,
156 // **then** check whether the semaphore is closed. If the semaphore is
157 // closed the notification request is dropped.
158 let notified = self.inner.notify_rx_closed.notified();
159
160 if self.inner.semaphore.is_closed() {
161 return;
162 }
163 notified.await;
164 }
165 }
166
167 impl<T, S> Clone for Tx<T, S> {
168 fn clone(&self) -> Tx<T, S> {
169 // Using a Relaxed ordering here is sufficient as the caller holds a
170 // strong ref to `self`, preventing a concurrent decrement to zero.
171 self.inner.tx_count.fetch_add(1, Relaxed);
172
173 Tx {
174 inner: self.inner.clone(),
175 }
176 }
177 }
178
179 impl<T, S> Drop for Tx<T, S> {
180 fn drop(&mut self) {
181 if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
182 return;
183 }
184
185 // Close the list, which sends a `Close` message
186 self.inner.tx.close();
187
188 // Notify the receiver
189 self.wake_rx();
190 }
191 }
192
193 // ===== impl Rx =====
194
195 impl<T, S: Semaphore> Rx<T, S> {
196 fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
197 Rx { inner: chan }
198 }
199
200 pub(crate) fn close(&mut self) {
201 self.inner.rx_fields.with_mut(|rx_fields_ptr| {
202 let rx_fields = unsafe { &mut *rx_fields_ptr };
203
204 if rx_fields.rx_closed {
205 return;
206 }
207
208 rx_fields.rx_closed = true;
209 });
210
211 self.inner.semaphore.close();
212 self.inner.notify_rx_closed.notify_waiters();
213 }
214
215 /// Receive the next value
216 pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
217 use super::block::Read::*;
218
219 // Keep track of task budget
220 let coop = ready!(crate::coop::poll_proceed(cx));
221
222 self.inner.rx_fields.with_mut(|rx_fields_ptr| {
223 let rx_fields = unsafe { &mut *rx_fields_ptr };
224
225 macro_rules! try_recv {
226 () => {
227 match rx_fields.list.pop(&self.inner.tx) {
228 Some(Value(value)) => {
229 self.inner.semaphore.add_permit();
230 coop.made_progress();
231 return Ready(Some(value));
232 }
233 Some(Closed) => {
234 // TODO: This check may not be required as it most
235 // likely can only return `true` at this point. A
236 // channel is closed when all tx handles are
237 // dropped. Dropping a tx handle releases memory,
238 // which ensures that if dropping the tx handle is
239 // visible, then all messages sent are also visible.
240 assert!(self.inner.semaphore.is_idle());
241 coop.made_progress();
242 return Ready(None);
243 }
244 None => {} // fall through
245 }
246 };
247 }
248
249 try_recv!();
250
251 self.inner.rx_waker.register_by_ref(cx.waker());
252
253 // It is possible that a value was pushed between attempting to read
254 // and registering the task, so we have to check the channel a
255 // second time here.
256 try_recv!();
257
258 if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
259 coop.made_progress();
260 Ready(None)
261 } else {
262 Pending
263 }
264 })
265 }
266 }
267
268 impl<T, S: Semaphore> Drop for Rx<T, S> {
269 fn drop(&mut self) {
270 use super::block::Read::Value;
271
272 self.close();
273
274 self.inner.rx_fields.with_mut(|rx_fields_ptr| {
275 let rx_fields = unsafe { &mut *rx_fields_ptr };
276
277 while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
278 self.inner.semaphore.add_permit();
279 }
280 })
281 }
282 }
283
284 // ===== impl Chan =====
285
286 impl<T, S> Chan<T, S> {
287 fn send(&self, value: T) {
288 // Push the value
289 self.tx.push(value);
290
291 // Notify the rx task
292 self.rx_waker.wake();
293 }
294 }
295
296 impl<T, S> Drop for Chan<T, S> {
297 fn drop(&mut self) {
298 use super::block::Read::Value;
299
300 // Safety: the only owner of the rx fields is Chan, and eing
301 // inside its own Drop means we're the last ones to touch it.
302 self.rx_fields.with_mut(|rx_fields_ptr| {
303 let rx_fields = unsafe { &mut *rx_fields_ptr };
304
305 while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
306 unsafe { rx_fields.list.free_blocks() };
307 });
308 }
309 }
310
311 // ===== impl Semaphore for (::Semaphore, capacity) =====
312
313 impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
314 fn add_permit(&self) {
315 self.0.release(1)
316 }
317
318 fn is_idle(&self) -> bool {
319 self.0.available_permits() == self.1
320 }
321
322 fn close(&self) {
323 self.0.close();
324 }
325
326 fn is_closed(&self) -> bool {
327 self.0.is_closed()
328 }
329 }
330
331 // ===== impl Semaphore for AtomicUsize =====
332
333 use std::sync::atomic::Ordering::{Acquire, Release};
334 use std::usize;
335
336 impl Semaphore for AtomicUsize {
337 fn add_permit(&self) {
338 let prev = self.fetch_sub(2, Release);
339
340 if prev >> 1 == 0 {
341 // Something went wrong
342 process::abort();
343 }
344 }
345
346 fn is_idle(&self) -> bool {
347 self.load(Acquire) >> 1 == 0
348 }
349
350 fn close(&self) {
351 self.fetch_or(1, Release);
352 }
353
354 fn is_closed(&self) -> bool {
355 self.load(Acquire) & 1 == 1
356 }
357 }