]>
git.proxmox.com Git - rustc.git/blob - library/std/src/sync/mpsc/tests.rs
4 use crate::time
::{Duration, Instant}
;
6 pub fn stress_factor() -> usize {
7 match env
::var("RUST_TEST_STRESS") {
8 Ok(val
) => val
.parse().unwrap(),
15 let (tx
, rx
) = channel
::<i32>();
17 assert_eq
!(rx
.recv().unwrap(), 1);
22 let (tx
, _rx
) = channel
::<Box
<isize>>();
23 tx
.send(Box
::new(1)).unwrap();
27 fn drop_full_shared() {
28 let (tx
, _rx
) = channel
::<Box
<isize>>();
31 tx
.send(Box
::new(1)).unwrap();
36 let (tx
, rx
) = channel
::<i32>();
38 assert_eq
!(rx
.recv().unwrap(), 1);
41 assert_eq
!(rx
.recv().unwrap(), 1);
46 let (tx
, rx
) = channel
::<i32>();
47 let _t
= thread
::spawn(move || {
50 assert_eq
!(rx
.recv().unwrap(), 1);
54 fn smoke_port_gone() {
55 let (tx
, rx
) = channel
::<i32>();
57 assert
!(tx
.send(1).is_err());
61 fn smoke_shared_port_gone() {
62 let (tx
, rx
) = channel
::<i32>();
64 assert
!(tx
.send(1).is_err())
68 fn smoke_shared_port_gone2() {
69 let (tx
, rx
) = channel
::<i32>();
73 assert
!(tx2
.send(1).is_err());
77 fn port_gone_concurrent() {
78 let (tx
, rx
) = channel
::<i32>();
79 let _t
= thread
::spawn(move || {
82 while tx
.send(1).is_ok() {}
86 fn port_gone_concurrent_shared() {
87 let (tx
, rx
) = channel
::<i32>();
89 let _t
= thread
::spawn(move || {
92 while tx
.send(1).is_ok() && tx2
.send(1).is_ok() {}
96 fn smoke_chan_gone() {
97 let (tx
, rx
) = channel
::<i32>();
99 assert
!(rx
.recv().is_err());
103 fn smoke_chan_gone_shared() {
104 let (tx
, rx
) = channel
::<()>();
105 let tx2
= tx
.clone();
108 assert
!(rx
.recv().is_err());
112 fn chan_gone_concurrent() {
113 let (tx
, rx
) = channel
::<i32>();
114 let _t
= thread
::spawn(move || {
118 while rx
.recv().is_ok() {}
123 let (tx
, rx
) = channel
::<i32>();
124 let t
= thread
::spawn(move || {
130 assert_eq
!(rx
.recv().unwrap(), 1);
132 t
.join().ok().expect("thread panicked");
137 const AMT
: u32 = 10000;
138 const NTHREADS
: u32 = 8;
139 let (tx
, rx
) = channel
::<i32>();
141 let t
= thread
::spawn(move || {
142 for _
in 0..AMT
* NTHREADS
{
143 assert_eq
!(rx
.recv().unwrap(), 1);
145 match rx
.try_recv() {
151 for _
in 0..NTHREADS
{
153 thread
::spawn(move || {
160 t
.join().ok().expect("thread panicked");
164 fn send_from_outside_runtime() {
165 let (tx1
, rx1
) = channel
::<()>();
166 let (tx2
, rx2
) = channel
::<i32>();
167 let t1
= thread
::spawn(move || {
168 tx1
.send(()).unwrap();
170 assert_eq
!(rx2
.recv().unwrap(), 1);
174 let t2
= thread
::spawn(move || {
176 tx2
.send(1).unwrap();
179 t1
.join().ok().expect("thread panicked");
180 t2
.join().ok().expect("thread panicked");
184 fn recv_from_outside_runtime() {
185 let (tx
, rx
) = channel
::<i32>();
186 let t
= thread
::spawn(move || {
188 assert_eq
!(rx
.recv().unwrap(), 1);
194 t
.join().ok().expect("thread panicked");
199 let (tx1
, rx1
) = channel
::<i32>();
200 let (tx2
, rx2
) = channel
::<i32>();
201 let t1
= thread
::spawn(move || {
202 assert_eq
!(rx1
.recv().unwrap(), 1);
203 tx2
.send(2).unwrap();
205 let t2
= thread
::spawn(move || {
206 tx1
.send(1).unwrap();
207 assert_eq
!(rx2
.recv().unwrap(), 2);
209 t1
.join().ok().expect("thread panicked");
210 t2
.join().ok().expect("thread panicked");
214 fn oneshot_single_thread_close_port_first() {
215 // Simple test of closing without sending
216 let (_tx
, rx
) = channel
::<i32>();
221 fn oneshot_single_thread_close_chan_first() {
222 // Simple test of closing without sending
223 let (tx
, _rx
) = channel
::<i32>();
228 fn oneshot_single_thread_send_port_close() {
229 // Testing that the sender cleans up the payload if receiver is closed
230 let (tx
, rx
) = channel
::<Box
<i32>>();
232 assert
!(tx
.send(Box
::new(0)).is_err());
236 fn oneshot_single_thread_recv_chan_close() {
237 // Receiving on a closed chan will panic
238 let res
= thread
::spawn(move || {
239 let (tx
, rx
) = channel
::<i32>();
245 assert
!(res
.is_err());
249 fn oneshot_single_thread_send_then_recv() {
250 let (tx
, rx
) = channel
::<Box
<i32>>();
251 tx
.send(Box
::new(10)).unwrap();
252 assert
!(*rx
.recv().unwrap() == 10);
256 fn oneshot_single_thread_try_send_open() {
257 let (tx
, rx
) = channel
::<i32>();
258 assert
!(tx
.send(10).is_ok());
259 assert
!(rx
.recv().unwrap() == 10);
263 fn oneshot_single_thread_try_send_closed() {
264 let (tx
, rx
) = channel
::<i32>();
266 assert
!(tx
.send(10).is_err());
270 fn oneshot_single_thread_try_recv_open() {
271 let (tx
, rx
) = channel
::<i32>();
272 tx
.send(10).unwrap();
273 assert
!(rx
.recv() == Ok(10));
277 fn oneshot_single_thread_try_recv_closed() {
278 let (tx
, rx
) = channel
::<i32>();
280 assert
!(rx
.recv().is_err());
284 fn oneshot_single_thread_peek_data() {
285 let (tx
, rx
) = channel
::<i32>();
286 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
287 tx
.send(10).unwrap();
288 assert_eq
!(rx
.try_recv(), Ok(10));
292 fn oneshot_single_thread_peek_close() {
293 let (tx
, rx
) = channel
::<i32>();
295 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
296 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
300 fn oneshot_single_thread_peek_open() {
301 let (_tx
, rx
) = channel
::<i32>();
302 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
306 fn oneshot_multi_task_recv_then_send() {
307 let (tx
, rx
) = channel
::<Box
<i32>>();
308 let _t
= thread
::spawn(move || {
309 assert
!(*rx
.recv().unwrap() == 10);
312 tx
.send(Box
::new(10)).unwrap();
316 fn oneshot_multi_task_recv_then_close() {
317 let (tx
, rx
) = channel
::<Box
<i32>>();
318 let _t
= thread
::spawn(move || {
321 let res
= thread
::spawn(move || {
322 assert
!(*rx
.recv().unwrap() == 10);
325 assert
!(res
.is_err());
329 fn oneshot_multi_thread_close_stress() {
330 for _
in 0..stress_factor() {
331 let (tx
, rx
) = channel
::<i32>();
332 let _t
= thread
::spawn(move || {
340 fn oneshot_multi_thread_send_close_stress() {
341 for _
in 0..stress_factor() {
342 let (tx
, rx
) = channel
::<i32>();
343 let _t
= thread
::spawn(move || {
346 let _
= thread
::spawn(move || {
354 fn oneshot_multi_thread_recv_close_stress() {
355 for _
in 0..stress_factor() {
356 let (tx
, rx
) = channel
::<i32>();
357 thread
::spawn(move || {
358 let res
= thread
::spawn(move || {
362 assert
!(res
.is_err());
364 let _t
= thread
::spawn(move || {
365 thread
::spawn(move || {
373 fn oneshot_multi_thread_send_recv_stress() {
374 for _
in 0..stress_factor() {
375 let (tx
, rx
) = channel
::<Box
<isize>>();
376 let _t
= thread
::spawn(move || {
377 tx
.send(Box
::new(10)).unwrap();
379 assert
!(*rx
.recv().unwrap() == 10);
384 fn stream_send_recv_stress() {
385 for _
in 0..stress_factor() {
386 let (tx
, rx
) = channel();
391 fn send(tx
: Sender
<Box
<i32>>, i
: i32) {
396 thread
::spawn(move || {
397 tx
.send(Box
::new(i
)).unwrap();
402 fn recv(rx
: Receiver
<Box
<i32>>, i
: i32) {
407 thread
::spawn(move || {
408 assert
!(*rx
.recv().unwrap() == i
);
416 fn oneshot_single_thread_recv_timeout() {
417 let (tx
, rx
) = channel();
418 tx
.send(()).unwrap();
419 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Ok(()));
420 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Err(RecvTimeoutError
::Timeout
));
421 tx
.send(()).unwrap();
422 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Ok(()));
426 fn stress_recv_timeout_two_threads() {
427 let (tx
, rx
) = channel();
428 let stress
= stress_factor() + 100;
429 let timeout
= Duration
::from_millis(100);
431 thread
::spawn(move || {
434 thread
::sleep(timeout
* 2);
436 tx
.send(1usize
).unwrap();
440 let mut recv_count
= 0;
442 match rx
.recv_timeout(timeout
) {
444 assert_eq
!(n
, 1usize
);
447 Err(RecvTimeoutError
::Timeout
) => continue,
448 Err(RecvTimeoutError
::Disconnected
) => break,
452 assert_eq
!(recv_count
, stress
);
456 fn recv_timeout_upgrade() {
457 let (tx
, rx
) = channel
::<()>();
458 let timeout
= Duration
::from_millis(1);
459 let _tx_clone
= tx
.clone();
461 let start
= Instant
::now();
462 assert_eq
!(rx
.recv_timeout(timeout
), Err(RecvTimeoutError
::Timeout
));
463 assert
!(Instant
::now() >= start
+ timeout
);
467 fn stress_recv_timeout_shared() {
468 let (tx
, rx
) = channel();
469 let stress
= stress_factor() + 100;
473 thread
::spawn(move || {
474 thread
::sleep(Duration
::from_millis(i
as u64 * 10));
475 tx
.send(1usize
).unwrap();
481 let mut recv_count
= 0;
483 match rx
.recv_timeout(Duration
::from_millis(10)) {
485 assert_eq
!(n
, 1usize
);
488 Err(RecvTimeoutError
::Timeout
) => continue,
489 Err(RecvTimeoutError
::Disconnected
) => break,
493 assert_eq
!(recv_count
, stress
);
497 fn very_long_recv_timeout_wont_panic() {
498 let (tx
, rx
) = channel
::<()>();
499 let join_handle
= thread
::spawn(move || rx
.recv_timeout(Duration
::from_secs(u64::MAX
)));
500 thread
::sleep(Duration
::from_secs(1));
501 assert
!(tx
.send(()).is_ok());
502 assert_eq
!(join_handle
.join().unwrap(), Ok(()));
507 // Regression test that we don't run out of stack in scheduler context
508 let (tx
, rx
) = channel();
510 tx
.send(()).unwrap();
518 fn shared_recv_timeout() {
519 let (tx
, rx
) = channel();
523 thread
::spawn(move || {
524 tx
.send(()).unwrap();
532 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Err(RecvTimeoutError
::Timeout
));
533 tx
.send(()).unwrap();
534 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Ok(()));
538 fn shared_chan_stress() {
539 let (tx
, rx
) = channel();
540 let total
= stress_factor() + 100;
543 thread
::spawn(move || {
544 tx
.send(()).unwrap();
554 fn test_nested_recv_iter() {
555 let (tx
, rx
) = channel
::<i32>();
556 let (total_tx
, total_rx
) = channel
::<i32>();
558 let _t
= thread
::spawn(move || {
563 total_tx
.send(acc
).unwrap();
570 assert_eq
!(total_rx
.recv().unwrap(), 6);
574 fn test_recv_iter_break() {
575 let (tx
, rx
) = channel
::<i32>();
576 let (count_tx
, count_rx
) = channel();
578 let _t
= thread
::spawn(move || {
587 count_tx
.send(count
).unwrap();
595 assert_eq
!(count_rx
.recv().unwrap(), 4);
599 fn test_recv_try_iter() {
600 let (request_tx
, request_rx
) = channel();
601 let (response_tx
, response_rx
) = channel();
603 // Request `x`s until we have `6`.
604 let t
= thread
::spawn(move || {
607 for x
in response_rx
.try_iter() {
613 request_tx
.send(()).unwrap();
617 for _
in request_rx
.iter() {
618 if response_tx
.send(2).is_err() {
623 assert_eq
!(t
.join().unwrap(), 6);
627 fn test_recv_into_iter_owned() {
629 let (tx
, rx
) = channel
::<i32>();
635 assert_eq
!(iter
.next().unwrap(), 1);
636 assert_eq
!(iter
.next().unwrap(), 2);
637 assert_eq
!(iter
.next().is_none(), true);
641 fn test_recv_into_iter_borrowed() {
642 let (tx
, rx
) = channel
::<i32>();
646 let mut iter
= (&rx
).into_iter();
647 assert_eq
!(iter
.next().unwrap(), 1);
648 assert_eq
!(iter
.next().unwrap(), 2);
649 assert_eq
!(iter
.next().is_none(), true);
653 fn try_recv_states() {
654 let (tx1
, rx1
) = channel
::<i32>();
655 let (tx2
, rx2
) = channel
::<()>();
656 let (tx3
, rx3
) = channel
::<()>();
657 let _t
= thread
::spawn(move || {
659 tx1
.send(1).unwrap();
660 tx3
.send(()).unwrap();
663 tx3
.send(()).unwrap();
666 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
667 tx2
.send(()).unwrap();
669 assert_eq
!(rx1
.try_recv(), Ok(1));
670 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
671 tx2
.send(()).unwrap();
673 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Disconnected
));
676 // This bug used to end up in a livelock inside of the Receiver destructor
677 // because the internal state of the Shared packet was corrupted
679 fn destroy_upgraded_shared_port_when_sender_still_active() {
680 let (tx
, rx
) = channel();
681 let (tx2
, rx2
) = channel();
682 let _t
= thread
::spawn(move || {
683 rx
.recv().unwrap(); // wait on a oneshot
684 drop(rx
); // destroy a shared
685 tx2
.send(()).unwrap();
687 // make sure the other thread has gone to sleep
692 // upgrade to a shared chan and send a message
697 // wait for the child thread to exit before we exit
703 let (tx
, _
) = channel();
704 let _
= tx
.send(123);
705 assert_eq
!(tx
.send(123), Err(SendError(123)));