1 use futures
::channel
::mpsc
;
2 use futures
::executor
::block_on
;
3 use futures
::future
::Future
;
4 use futures
::sink
::SinkExt
;
5 use futures
::stream
::StreamExt
;
6 use futures
::task
::{Context, Poll}
;
8 use std
::sync
::{Arc, Weak}
;
10 use std
::time
::{Duration, Instant}
;
14 let (mut sender
, receiver
) = mpsc
::channel(1);
16 let t
= thread
::spawn(move || while let Ok(()) = block_on(sender
.send(42)) {}
);
18 // `receiver` must be dropped before the join: `sender` only stops sending once `receiver` is gone.
19 block_on(receiver
.take(3).for_each(|_
| futures
::future
::ready(())));
25 fn multiple_senders_disconnect() {
27 let (mut tx1
, mut rx
) = mpsc
::channel(1);
28 let (tx2
, mut tx3
, mut tx4
) = (tx1
.clone(), tx1
.clone(), tx1
.clone());
30 // disconnect, dropping and Sink::poll_close should all close this sender but leave the
31 // channel open for other senders
34 block_on(tx3
.close()).unwrap();
36 assert
!(tx1
.is_closed());
37 assert
!(tx3
.is_closed());
38 assert
!(!tx4
.is_closed());
40 block_on(tx4
.send(5)).unwrap();
41 assert_eq
!(block_on(rx
.next()), Some(5));
43 // dropping the final sender will close the channel
45 assert_eq
!(block_on(rx
.next()), None
);
49 let (mut tx1
, mut rx
) = mpsc
::unbounded();
50 let (tx2
, mut tx3
, mut tx4
) = (tx1
.clone(), tx1
.clone(), tx1
.clone());
52 // disconnect, dropping and Sink::poll_close should all close this sender but leave the
53 // channel open for other senders
56 block_on(tx3
.close()).unwrap();
58 assert
!(tx1
.is_closed());
59 assert
!(tx3
.is_closed());
60 assert
!(!tx4
.is_closed());
62 block_on(tx4
.send(5)).unwrap();
63 assert_eq
!(block_on(rx
.next()), Some(5));
65 // dropping the final sender will close the channel
67 assert_eq
!(block_on(rx
.next()), None
);
72 fn multiple_senders_close_channel() {
74 let (mut tx1
, mut rx
) = mpsc
::channel(1);
75 let mut tx2
= tx1
.clone();
77 // close_channel should shut down the whole channel
80 assert
!(tx1
.is_closed());
81 assert
!(tx2
.is_closed());
83 let err
= block_on(tx2
.send(5)).unwrap_err();
84 assert
!(err
.is_disconnected());
86 assert_eq
!(block_on(rx
.next()), None
);
90 let (tx1
, mut rx
) = mpsc
::unbounded();
91 let mut tx2
= tx1
.clone();
93 // close_channel should shut down the whole channel
96 assert
!(tx1
.is_closed());
97 assert
!(tx2
.is_closed());
99 let err
= block_on(tx2
.send(5)).unwrap_err();
100 assert
!(err
.is_disconnected());
102 assert_eq
!(block_on(rx
.next()), None
);
107 fn single_receiver_drop_closes_channel_and_drains() {
109 let ref_count
= Arc
::new(0);
110 let weak_ref
= Arc
::downgrade(&ref_count
);
112 let (sender
, receiver
) = mpsc
::unbounded();
113 sender
.unbounded_send(ref_count
).expect("failed to send");
115 // Verify that the sent message is still live.
116 assert
!(weak_ref
.upgrade().is_some());
120 // The sender should know the channel is closed.
121 assert
!(sender
.is_closed());
123 // Verify that the sent message has been dropped.
124 assert
!(weak_ref
.upgrade().is_none());
128 let ref_count
= Arc
::new(0);
129 let weak_ref
= Arc
::downgrade(&ref_count
);
131 let (mut sender
, receiver
) = mpsc
::channel(1);
132 sender
.try_send(ref_count
).expect("failed to send");
134 // Verify that the sent message is still live.
135 assert
!(weak_ref
.upgrade().is_some());
139 // The sender should know the channel is closed.
140 assert
!(sender
.is_closed());
142 // Verify that the sent message has been dropped.
143 assert
!(weak_ref
.upgrade().is_none());
144 assert
!(sender
.is_closed());
148 // Stress test that `try_send()`s occurring concurrently with receiver
149 // close/drops don't appear as successful sends.
150 #[cfg_attr(miri, ignore)] // Miri is too slow
152 fn stress_try_send_as_receiver_closes() {
153 const AMT
: usize = 10000;
154 // To provide variable timing characteristics (in the hopes of
155 // reproducing the collision that leads to a race), we busy-re-poll
156 // the test MPSC receiver a variable number of times before actually
157 // stopping. We vary this countdown between 1 and the following
159 const MAX_COUNTDOWN
: usize = 20;
160 // When we detect that a successfully sent item is still in the
161 // queue after a disconnect, we spin for up to `SPIN_TIMEOUT_S` seconds to confirm that
162 // it is a persistent condition and not a concurrency illusion.
163 const SPIN_TIMEOUT_S
: u64 = 10;
164 const SPIN_SLEEP_MS
: u64 = 10;
166 rx
: mpsc
::Receiver
<Arc
<()>>,
167 // The number of times to query `rx` before dropping it.
171 command_rx
: mpsc
::Receiver
<TestRx
>,
172 test_rx
: Option
<mpsc
::Receiver
<Arc
<()>>>,
176 /// Create a new TestTask
177 fn new() -> (TestTask
, mpsc
::Sender
<TestRx
>) {
178 let (command_tx
, command_rx
) = mpsc
::channel
::<TestRx
>(0);
183 countdown
: 0, // 0 means no countdown is in progress.
189 impl Future
for TestTask
{
192 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<Self::Output
> {
193 // Poll the test channel, if one is present.
194 if let Some(rx
) = &mut self.test_rx
{
195 if let Poll
::Ready(v
) = rx
.poll_next_unpin(cx
) {
196 let _
= v
.expect("test finished unexpectedly!");
199 // Busy-poll until the countdown is finished.
200 cx
.waker().wake_by_ref();
202 // Accept any newly submitted MPSC channels for testing.
203 match self.command_rx
.poll_next_unpin(cx
) {
204 Poll
::Ready(Some(TestRx { rx, poll_count }
)) => {
205 self.test_rx
= Some(rx
);
206 self.countdown
= poll_count
;
207 cx
.waker().wake_by_ref();
209 Poll
::Ready(None
) => return Poll
::Ready(()),
212 if self.countdown
== 0 {
213 // Countdown complete -- drop the Receiver.
219 let (f
, mut cmd_tx
) = TestTask
::new();
220 let bg
= thread
::spawn(move || block_on(f
));
222 let (mut test_tx
, rx
) = mpsc
::channel(0);
223 let poll_count
= i
% MAX_COUNTDOWN
;
224 cmd_tx
.try_send(TestRx { rx, poll_count }
).unwrap();
225 let mut prev_weak
: Option
<Weak
<()>> = None
;
226 let mut attempted_sends
= 0;
227 let mut successful_sends
= 0;
229 // Create a test item.
230 let item
= Arc
::new(());
231 let weak
= Arc
::downgrade(&item
);
232 match test_tx
.try_send(item
) {
234 prev_weak
= Some(weak
);
235 successful_sends
+= 1;
237 Err(ref e
) if e
.is_full() => {}
238 Err(ref e
) if e
.is_disconnected() => {
239 // Test for evidence of the race condition.
240 if let Some(prev_weak
) = prev_weak
{
241 if prev_weak
.upgrade().is_some() {
242 // The previously sent item is still allocated.
243 // However, there appears to be some aspect of the
244 // concurrency that can legitimately cause the Arc
245 // to be momentarily valid. Spin for up to `SPIN_TIMEOUT_S` seconds
246 // waiting for the previously sent item to be
248 let t0
= Instant
::now();
251 if prev_weak
.upgrade().is_none() {
255 t0
.elapsed() < Duration
::from_secs(SPIN_TIMEOUT_S
),
256 "item not dropped on iteration {} after \
257 {} sends ({} successful). spin=({})",
264 thread
::sleep(Duration
::from_millis(SPIN_SLEEP_MS
));
270 Err(ref e
) => panic
!("unexpected error: {}", e
),
272 attempted_sends
+= 1;
276 bg
.join().expect("background thread join");
280 fn unbounded_try_next_after_none() {
281 let (tx
, mut rx
) = mpsc
::unbounded
::<String
>();
282 // Drop the sender, close the channel.
284 // Receive the end of channel.
285 assert_eq
!(Ok(None
), rx
.try_next().map_err(|_
| ()));
286 // None received, check we can call `try_next` again.
287 assert_eq
!(Ok(None
), rx
.try_next().map_err(|_
| ()));
291 fn bounded_try_next_after_none() {
292 let (tx
, mut rx
) = mpsc
::channel
::<String
>(17);
293 // Drop the sender, close the channel.
295 // Receive the end of channel.
296 assert_eq
!(Ok(None
), rx
.try_next().map_err(|_
| ()));
297 // None received, check we can call `try_next` again.
298 assert_eq
!(Ok(None
), rx
.try_next().map_err(|_
| ()));