]>
git.proxmox.com Git - rustc.git/blob - library/std/src/sync/mpsc/sync_tests.rs
3 use crate::sync
::mpmc
::SendTimeoutError
;
5 use crate::time
::Duration
;
7 pub fn stress_factor() -> usize {
8 match env
::var("RUST_TEST_STRESS") {
9 Ok(val
) => val
.parse().unwrap(),
16 let (tx
, rx
) = sync_channel
::<i32>(1);
18 assert_eq
!(rx
.recv().unwrap(), 1);
23 let (tx
, _rx
) = sync_channel
::<Box
<isize>>(1);
24 tx
.send(Box
::new(1)).unwrap();
29 let (tx
, rx
) = sync_channel
::<i32>(1);
31 assert_eq
!(rx
.recv().unwrap(), 1);
34 assert_eq
!(rx
.recv().unwrap(), 1);
39 let (tx
, rx
) = sync_channel
::<i32>(1);
40 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Err(RecvTimeoutError
::Timeout
));
42 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Ok(1));
47 let (tx
, _rx
) = sync_channel
::<i32>(1);
48 assert_eq
!(tx
.send_timeout(1, Duration
::from_millis(1)), Ok(()));
49 assert_eq
!(tx
.send_timeout(1, Duration
::from_millis(1)), Err(SendTimeoutError
::Timeout(1)));
54 let (tx
, rx
) = sync_channel
::<i32>(0);
55 let _t
= thread
::spawn(move || {
58 assert_eq
!(rx
.recv().unwrap(), 1);
62 fn smoke_port_gone() {
63 let (tx
, rx
) = sync_channel
::<i32>(0);
65 assert
!(tx
.send(1).is_err());
69 fn smoke_shared_port_gone2() {
70 let (tx
, rx
) = sync_channel
::<i32>(0);
74 assert
!(tx2
.send(1).is_err());
78 fn port_gone_concurrent() {
79 let (tx
, rx
) = sync_channel
::<i32>(0);
80 let _t
= thread
::spawn(move || {
83 while tx
.send(1).is_ok() {}
87 fn port_gone_concurrent_shared() {
88 let (tx
, rx
) = sync_channel
::<i32>(0);
90 let _t
= thread
::spawn(move || {
93 while tx
.send(1).is_ok() && tx2
.send(1).is_ok() {}
97 fn smoke_chan_gone() {
98 let (tx
, rx
) = sync_channel
::<i32>(0);
100 assert
!(rx
.recv().is_err());
104 fn smoke_chan_gone_shared() {
105 let (tx
, rx
) = sync_channel
::<()>(0);
106 let tx2
= tx
.clone();
109 assert
!(rx
.recv().is_err());
113 fn chan_gone_concurrent() {
114 let (tx
, rx
) = sync_channel
::<i32>(0);
115 thread
::spawn(move || {
119 while rx
.recv().is_ok() {}
124 let count
= if cfg
!(miri
) { 100 }
else { 10000 }
;
125 let (tx
, rx
) = sync_channel
::<i32>(0);
126 thread
::spawn(move || {
132 assert_eq
!(rx
.recv().unwrap(), 1);
137 fn stress_recv_timeout_two_threads() {
138 let count
= if cfg
!(miri
) { 100 }
else { 10000 }
;
139 let (tx
, rx
) = sync_channel
::<i32>(0);
141 thread
::spawn(move || {
147 let mut recv_count
= 0;
149 match rx
.recv_timeout(Duration
::from_millis(1)) {
154 Err(RecvTimeoutError
::Timeout
) => continue,
155 Err(RecvTimeoutError
::Disconnected
) => break,
159 assert_eq
!(recv_count
, count
);
163 fn stress_recv_timeout_shared() {
164 const AMT
: u32 = if cfg
!(miri
) { 100 }
else { 1000 }
;
165 const NTHREADS
: u32 = 8;
166 let (tx
, rx
) = sync_channel
::<i32>(0);
167 let (dtx
, drx
) = sync_channel
::<()>(0);
169 thread
::spawn(move || {
170 let mut recv_count
= 0;
172 match rx
.recv_timeout(Duration
::from_millis(10)) {
177 Err(RecvTimeoutError
::Timeout
) => continue,
178 Err(RecvTimeoutError
::Disconnected
) => break,
182 assert_eq
!(recv_count
, AMT
* NTHREADS
);
183 assert
!(rx
.try_recv().is_err());
185 dtx
.send(()).unwrap();
188 for _
in 0..NTHREADS
{
190 thread
::spawn(move || {
204 const AMT
: u32 = if cfg
!(miri
) { 100 }
else { 1000 }
;
205 const NTHREADS
: u32 = 8;
206 let (tx
, rx
) = sync_channel
::<i32>(0);
207 let (dtx
, drx
) = sync_channel
::<()>(0);
209 thread
::spawn(move || {
210 for _
in 0..AMT
* NTHREADS
{
211 assert_eq
!(rx
.recv().unwrap(), 1);
213 match rx
.try_recv() {
217 dtx
.send(()).unwrap();
220 for _
in 0..NTHREADS
{
222 thread
::spawn(move || {
233 fn oneshot_single_thread_close_port_first() {
234 // Simple test of closing without sending
235 let (_tx
, rx
) = sync_channel
::<i32>(0);
240 fn oneshot_single_thread_close_chan_first() {
241 // Simple test of closing without sending
242 let (tx
, _rx
) = sync_channel
::<i32>(0);
247 fn oneshot_single_thread_send_port_close() {
248 // Testing that the sender cleans up the payload if receiver is closed
249 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
251 assert
!(tx
.send(Box
::new(0)).is_err());
255 fn oneshot_single_thread_recv_chan_close() {
256 // Receiving on a closed chan will panic
257 let res
= thread
::spawn(move || {
258 let (tx
, rx
) = sync_channel
::<i32>(0);
264 assert
!(res
.is_err());
268 fn oneshot_single_thread_send_then_recv() {
269 let (tx
, rx
) = sync_channel
::<Box
<i32>>(1);
270 tx
.send(Box
::new(10)).unwrap();
271 assert
!(*rx
.recv().unwrap() == 10);
275 fn oneshot_single_thread_try_send_open() {
276 let (tx
, rx
) = sync_channel
::<i32>(1);
277 assert_eq
!(tx
.try_send(10), Ok(()));
278 assert
!(rx
.recv().unwrap() == 10);
282 fn oneshot_single_thread_try_send_closed() {
283 let (tx
, rx
) = sync_channel
::<i32>(0);
285 assert_eq
!(tx
.try_send(10), Err(TrySendError
::Disconnected(10)));
289 fn oneshot_single_thread_try_send_closed2() {
290 let (tx
, _rx
) = sync_channel
::<i32>(0);
291 assert_eq
!(tx
.try_send(10), Err(TrySendError
::Full(10)));
295 fn oneshot_single_thread_try_recv_open() {
296 let (tx
, rx
) = sync_channel
::<i32>(1);
297 tx
.send(10).unwrap();
298 assert
!(rx
.recv() == Ok(10));
302 fn oneshot_single_thread_try_recv_closed() {
303 let (tx
, rx
) = sync_channel
::<i32>(0);
305 assert
!(rx
.recv().is_err());
309 fn oneshot_single_thread_try_recv_closed_with_data() {
310 let (tx
, rx
) = sync_channel
::<i32>(1);
311 tx
.send(10).unwrap();
313 assert_eq
!(rx
.try_recv(), Ok(10));
314 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
318 fn oneshot_single_thread_peek_data() {
319 let (tx
, rx
) = sync_channel
::<i32>(1);
320 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
321 tx
.send(10).unwrap();
322 assert_eq
!(rx
.try_recv(), Ok(10));
326 fn oneshot_single_thread_peek_close() {
327 let (tx
, rx
) = sync_channel
::<i32>(0);
329 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
330 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
334 fn oneshot_single_thread_peek_open() {
335 let (_tx
, rx
) = sync_channel
::<i32>(0);
336 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
340 fn oneshot_multi_task_recv_then_send() {
341 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
342 let _t
= thread
::spawn(move || {
343 assert
!(*rx
.recv().unwrap() == 10);
346 tx
.send(Box
::new(10)).unwrap();
350 fn oneshot_multi_task_recv_then_close() {
351 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
352 let _t
= thread
::spawn(move || {
355 let res
= thread
::spawn(move || {
356 assert
!(*rx
.recv().unwrap() == 10);
359 assert
!(res
.is_err());
363 fn oneshot_multi_thread_close_stress() {
364 for _
in 0..stress_factor() {
365 let (tx
, rx
) = sync_channel
::<i32>(0);
366 let _t
= thread
::spawn(move || {
374 fn oneshot_multi_thread_send_close_stress() {
375 for _
in 0..stress_factor() {
376 let (tx
, rx
) = sync_channel
::<i32>(0);
377 let _t
= thread
::spawn(move || {
380 let _
= thread
::spawn(move || {
388 fn oneshot_multi_thread_recv_close_stress() {
389 for _
in 0..stress_factor() {
390 let (tx
, rx
) = sync_channel
::<i32>(0);
391 let _t
= thread
::spawn(move || {
392 let res
= thread
::spawn(move || {
396 assert
!(res
.is_err());
398 let _t
= thread
::spawn(move || {
399 thread
::spawn(move || {
407 fn oneshot_multi_thread_send_recv_stress() {
408 for _
in 0..stress_factor() {
409 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
410 let _t
= thread
::spawn(move || {
411 tx
.send(Box
::new(10)).unwrap();
413 assert
!(*rx
.recv().unwrap() == 10);
418 fn stream_send_recv_stress() {
419 for _
in 0..stress_factor() {
420 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
425 fn send(tx
: SyncSender
<Box
<i32>>, i
: i32) {
430 thread
::spawn(move || {
431 tx
.send(Box
::new(i
)).unwrap();
436 fn recv(rx
: Receiver
<Box
<i32>>, i
: i32) {
441 thread
::spawn(move || {
442 assert
!(*rx
.recv().unwrap() == i
);
451 let count
= if cfg
!(miri
) { 1000 }
else { 10000 }
;
452 // Regression test that we don't run out of stack in scheduler context
453 let (tx
, rx
) = sync_channel(count
);
455 tx
.send(()).unwrap();
463 fn shared_chan_stress() {
464 let (tx
, rx
) = sync_channel(0);
465 let total
= stress_factor() + 100;
468 thread
::spawn(move || {
469 tx
.send(()).unwrap();
479 fn test_nested_recv_iter() {
480 let (tx
, rx
) = sync_channel
::<i32>(0);
481 let (total_tx
, total_rx
) = sync_channel
::<i32>(0);
483 let _t
= thread
::spawn(move || {
488 total_tx
.send(acc
).unwrap();
495 assert_eq
!(total_rx
.recv().unwrap(), 6);
499 fn test_recv_iter_break() {
500 let (tx
, rx
) = sync_channel
::<i32>(0);
501 let (count_tx
, count_rx
) = sync_channel(0);
503 let _t
= thread
::spawn(move || {
512 count_tx
.send(count
).unwrap();
518 let _
= tx
.try_send(2);
520 assert_eq
!(count_rx
.recv().unwrap(), 4);
524 fn try_recv_states() {
525 let (tx1
, rx1
) = sync_channel
::<i32>(1);
526 let (tx2
, rx2
) = sync_channel
::<()>(1);
527 let (tx3
, rx3
) = sync_channel
::<()>(1);
528 let _t
= thread
::spawn(move || {
530 tx1
.send(1).unwrap();
531 tx3
.send(()).unwrap();
534 tx3
.send(()).unwrap();
537 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
538 tx2
.send(()).unwrap();
540 assert_eq
!(rx1
.try_recv(), Ok(1));
541 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
542 tx2
.send(()).unwrap();
544 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Disconnected
));
547 // This bug used to end up in a livelock inside of the Receiver destructor
548 // because the internal state of the Shared packet was corrupted
550 fn destroy_upgraded_shared_port_when_sender_still_active() {
551 let (tx
, rx
) = sync_channel
::<()>(0);
552 let (tx2
, rx2
) = sync_channel
::<()>(0);
553 let _t
= thread
::spawn(move || {
554 rx
.recv().unwrap(); // wait on a oneshot
555 drop(rx
); // destroy a shared
556 tx2
.send(()).unwrap();
558 // make sure the other thread has gone to sleep
563 // upgrade to a shared chan and send a message
568 // wait for the child thread to exit before we exit
574 let (tx
, rx
) = sync_channel
::<i32>(0);
575 let _t
= thread
::spawn(move || {
578 assert_eq
!(tx
.send(1), Ok(()));
583 let (tx
, rx
) = sync_channel
::<i32>(0);
584 let _t
= thread
::spawn(move || {
587 assert
!(tx
.send(1).is_err());
592 let (tx
, rx
) = sync_channel
::<i32>(1);
593 assert_eq
!(tx
.send(1), Ok(()));
594 let _t
= thread
::spawn(move || {
597 assert
!(tx
.send(1).is_err());
602 let (tx
, rx
) = sync_channel
::<i32>(0);
603 let tx2
= tx
.clone();
604 let (done
, donerx
) = channel();
605 let done2
= done
.clone();
606 let _t
= thread
::spawn(move || {
607 assert
!(tx
.send(1).is_err());
608 done
.send(()).unwrap();
610 let _t
= thread
::spawn(move || {
611 assert
!(tx2
.send(2).is_err());
612 done2
.send(()).unwrap();
615 donerx
.recv().unwrap();
616 donerx
.recv().unwrap();
621 let (tx
, _rx
) = sync_channel
::<i32>(0);
622 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Full(1)));
627 let (tx
, _rx
) = sync_channel
::<i32>(1);
628 assert_eq
!(tx
.try_send(1), Ok(()));
629 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Full(1)));
634 let (tx
, rx
) = sync_channel
::<i32>(1);
635 assert_eq
!(tx
.try_send(1), Ok(()));
637 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Disconnected(1)));
643 let (tx1
, rx1
) = sync_channel
::<()>(3);
644 let (tx2
, rx2
) = sync_channel
::<()>(3);
646 let _t
= thread
::spawn(move || {
648 tx2
.try_send(()).unwrap();
651 tx1
.try_send(()).unwrap();