]> git.proxmox.com Git - rustc.git/blob - library/std/src/sync/mpsc/tests.rs
New upstream version 1.48.0~beta.8+dfsg1
[rustc.git] / library / std / src / sync / mpsc / tests.rs
1 use super::*;
2 use crate::env;
3 use crate::thread;
4 use crate::time::{Duration, Instant};
5
6 pub fn stress_factor() -> usize {
7 match env::var("RUST_TEST_STRESS") {
8 Ok(val) => val.parse().unwrap(),
9 Err(..) => 1,
10 }
11 }
12
13 #[test]
14 fn smoke() {
15 let (tx, rx) = channel::<i32>();
16 tx.send(1).unwrap();
17 assert_eq!(rx.recv().unwrap(), 1);
18 }
19
20 #[test]
21 fn drop_full() {
22 let (tx, _rx) = channel::<Box<isize>>();
23 tx.send(box 1).unwrap();
24 }
25
26 #[test]
27 fn drop_full_shared() {
28 let (tx, _rx) = channel::<Box<isize>>();
29 drop(tx.clone());
30 drop(tx.clone());
31 tx.send(box 1).unwrap();
32 }
33
34 #[test]
35 fn smoke_shared() {
36 let (tx, rx) = channel::<i32>();
37 tx.send(1).unwrap();
38 assert_eq!(rx.recv().unwrap(), 1);
39 let tx = tx.clone();
40 tx.send(1).unwrap();
41 assert_eq!(rx.recv().unwrap(), 1);
42 }
43
44 #[test]
45 fn smoke_threads() {
46 let (tx, rx) = channel::<i32>();
47 let _t = thread::spawn(move || {
48 tx.send(1).unwrap();
49 });
50 assert_eq!(rx.recv().unwrap(), 1);
51 }
52
53 #[test]
54 fn smoke_port_gone() {
55 let (tx, rx) = channel::<i32>();
56 drop(rx);
57 assert!(tx.send(1).is_err());
58 }
59
60 #[test]
61 fn smoke_shared_port_gone() {
62 let (tx, rx) = channel::<i32>();
63 drop(rx);
64 assert!(tx.send(1).is_err())
65 }
66
67 #[test]
68 fn smoke_shared_port_gone2() {
69 let (tx, rx) = channel::<i32>();
70 drop(rx);
71 let tx2 = tx.clone();
72 drop(tx);
73 assert!(tx2.send(1).is_err());
74 }
75
76 #[test]
77 fn port_gone_concurrent() {
78 let (tx, rx) = channel::<i32>();
79 let _t = thread::spawn(move || {
80 rx.recv().unwrap();
81 });
82 while tx.send(1).is_ok() {}
83 }
84
85 #[test]
86 fn port_gone_concurrent_shared() {
87 let (tx, rx) = channel::<i32>();
88 let tx2 = tx.clone();
89 let _t = thread::spawn(move || {
90 rx.recv().unwrap();
91 });
92 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
93 }
94
95 #[test]
96 fn smoke_chan_gone() {
97 let (tx, rx) = channel::<i32>();
98 drop(tx);
99 assert!(rx.recv().is_err());
100 }
101
102 #[test]
103 fn smoke_chan_gone_shared() {
104 let (tx, rx) = channel::<()>();
105 let tx2 = tx.clone();
106 drop(tx);
107 drop(tx2);
108 assert!(rx.recv().is_err());
109 }
110
111 #[test]
112 fn chan_gone_concurrent() {
113 let (tx, rx) = channel::<i32>();
114 let _t = thread::spawn(move || {
115 tx.send(1).unwrap();
116 tx.send(1).unwrap();
117 });
118 while rx.recv().is_ok() {}
119 }
120
121 #[test]
122 fn stress() {
123 let (tx, rx) = channel::<i32>();
124 let t = thread::spawn(move || {
125 for _ in 0..10000 {
126 tx.send(1).unwrap();
127 }
128 });
129 for _ in 0..10000 {
130 assert_eq!(rx.recv().unwrap(), 1);
131 }
132 t.join().ok().expect("thread panicked");
133 }
134
135 #[test]
136 fn stress_shared() {
137 const AMT: u32 = 10000;
138 const NTHREADS: u32 = 8;
139 let (tx, rx) = channel::<i32>();
140
141 let t = thread::spawn(move || {
142 for _ in 0..AMT * NTHREADS {
143 assert_eq!(rx.recv().unwrap(), 1);
144 }
145 match rx.try_recv() {
146 Ok(..) => panic!(),
147 _ => {}
148 }
149 });
150
151 for _ in 0..NTHREADS {
152 let tx = tx.clone();
153 thread::spawn(move || {
154 for _ in 0..AMT {
155 tx.send(1).unwrap();
156 }
157 });
158 }
159 drop(tx);
160 t.join().ok().expect("thread panicked");
161 }
162
163 #[test]
164 fn send_from_outside_runtime() {
165 let (tx1, rx1) = channel::<()>();
166 let (tx2, rx2) = channel::<i32>();
167 let t1 = thread::spawn(move || {
168 tx1.send(()).unwrap();
169 for _ in 0..40 {
170 assert_eq!(rx2.recv().unwrap(), 1);
171 }
172 });
173 rx1.recv().unwrap();
174 let t2 = thread::spawn(move || {
175 for _ in 0..40 {
176 tx2.send(1).unwrap();
177 }
178 });
179 t1.join().ok().expect("thread panicked");
180 t2.join().ok().expect("thread panicked");
181 }
182
183 #[test]
184 fn recv_from_outside_runtime() {
185 let (tx, rx) = channel::<i32>();
186 let t = thread::spawn(move || {
187 for _ in 0..40 {
188 assert_eq!(rx.recv().unwrap(), 1);
189 }
190 });
191 for _ in 0..40 {
192 tx.send(1).unwrap();
193 }
194 t.join().ok().expect("thread panicked");
195 }
196
197 #[test]
198 fn no_runtime() {
199 let (tx1, rx1) = channel::<i32>();
200 let (tx2, rx2) = channel::<i32>();
201 let t1 = thread::spawn(move || {
202 assert_eq!(rx1.recv().unwrap(), 1);
203 tx2.send(2).unwrap();
204 });
205 let t2 = thread::spawn(move || {
206 tx1.send(1).unwrap();
207 assert_eq!(rx2.recv().unwrap(), 2);
208 });
209 t1.join().ok().expect("thread panicked");
210 t2.join().ok().expect("thread panicked");
211 }
212
213 #[test]
214 fn oneshot_single_thread_close_port_first() {
215 // Simple test of closing without sending
216 let (_tx, rx) = channel::<i32>();
217 drop(rx);
218 }
219
220 #[test]
221 fn oneshot_single_thread_close_chan_first() {
222 // Simple test of closing without sending
223 let (tx, _rx) = channel::<i32>();
224 drop(tx);
225 }
226
227 #[test]
228 fn oneshot_single_thread_send_port_close() {
229 // Testing that the sender cleans up the payload if receiver is closed
230 let (tx, rx) = channel::<Box<i32>>();
231 drop(rx);
232 assert!(tx.send(box 0).is_err());
233 }
234
235 #[test]
236 fn oneshot_single_thread_recv_chan_close() {
237 // Receiving on a closed chan will panic
238 let res = thread::spawn(move || {
239 let (tx, rx) = channel::<i32>();
240 drop(tx);
241 rx.recv().unwrap();
242 })
243 .join();
244 // What is our res?
245 assert!(res.is_err());
246 }
247
248 #[test]
249 fn oneshot_single_thread_send_then_recv() {
250 let (tx, rx) = channel::<Box<i32>>();
251 tx.send(box 10).unwrap();
252 assert!(*rx.recv().unwrap() == 10);
253 }
254
255 #[test]
256 fn oneshot_single_thread_try_send_open() {
257 let (tx, rx) = channel::<i32>();
258 assert!(tx.send(10).is_ok());
259 assert!(rx.recv().unwrap() == 10);
260 }
261
262 #[test]
263 fn oneshot_single_thread_try_send_closed() {
264 let (tx, rx) = channel::<i32>();
265 drop(rx);
266 assert!(tx.send(10).is_err());
267 }
268
269 #[test]
270 fn oneshot_single_thread_try_recv_open() {
271 let (tx, rx) = channel::<i32>();
272 tx.send(10).unwrap();
273 assert!(rx.recv() == Ok(10));
274 }
275
276 #[test]
277 fn oneshot_single_thread_try_recv_closed() {
278 let (tx, rx) = channel::<i32>();
279 drop(tx);
280 assert!(rx.recv().is_err());
281 }
282
283 #[test]
284 fn oneshot_single_thread_peek_data() {
285 let (tx, rx) = channel::<i32>();
286 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
287 tx.send(10).unwrap();
288 assert_eq!(rx.try_recv(), Ok(10));
289 }
290
291 #[test]
292 fn oneshot_single_thread_peek_close() {
293 let (tx, rx) = channel::<i32>();
294 drop(tx);
295 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
296 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
297 }
298
299 #[test]
300 fn oneshot_single_thread_peek_open() {
301 let (_tx, rx) = channel::<i32>();
302 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
303 }
304
305 #[test]
306 fn oneshot_multi_task_recv_then_send() {
307 let (tx, rx) = channel::<Box<i32>>();
308 let _t = thread::spawn(move || {
309 assert!(*rx.recv().unwrap() == 10);
310 });
311
312 tx.send(box 10).unwrap();
313 }
314
315 #[test]
316 fn oneshot_multi_task_recv_then_close() {
317 let (tx, rx) = channel::<Box<i32>>();
318 let _t = thread::spawn(move || {
319 drop(tx);
320 });
321 let res = thread::spawn(move || {
322 assert!(*rx.recv().unwrap() == 10);
323 })
324 .join();
325 assert!(res.is_err());
326 }
327
328 #[test]
329 fn oneshot_multi_thread_close_stress() {
330 for _ in 0..stress_factor() {
331 let (tx, rx) = channel::<i32>();
332 let _t = thread::spawn(move || {
333 drop(rx);
334 });
335 drop(tx);
336 }
337 }
338
339 #[test]
340 fn oneshot_multi_thread_send_close_stress() {
341 for _ in 0..stress_factor() {
342 let (tx, rx) = channel::<i32>();
343 let _t = thread::spawn(move || {
344 drop(rx);
345 });
346 let _ = thread::spawn(move || {
347 tx.send(1).unwrap();
348 })
349 .join();
350 }
351 }
352
353 #[test]
354 fn oneshot_multi_thread_recv_close_stress() {
355 for _ in 0..stress_factor() {
356 let (tx, rx) = channel::<i32>();
357 thread::spawn(move || {
358 let res = thread::spawn(move || {
359 rx.recv().unwrap();
360 })
361 .join();
362 assert!(res.is_err());
363 });
364 let _t = thread::spawn(move || {
365 thread::spawn(move || {
366 drop(tx);
367 });
368 });
369 }
370 }
371
372 #[test]
373 fn oneshot_multi_thread_send_recv_stress() {
374 for _ in 0..stress_factor() {
375 let (tx, rx) = channel::<Box<isize>>();
376 let _t = thread::spawn(move || {
377 tx.send(box 10).unwrap();
378 });
379 assert!(*rx.recv().unwrap() == 10);
380 }
381 }
382
383 #[test]
384 fn stream_send_recv_stress() {
385 for _ in 0..stress_factor() {
386 let (tx, rx) = channel();
387
388 send(tx, 0);
389 recv(rx, 0);
390
391 fn send(tx: Sender<Box<i32>>, i: i32) {
392 if i == 10 {
393 return;
394 }
395
396 thread::spawn(move || {
397 tx.send(box i).unwrap();
398 send(tx, i + 1);
399 });
400 }
401
402 fn recv(rx: Receiver<Box<i32>>, i: i32) {
403 if i == 10 {
404 return;
405 }
406
407 thread::spawn(move || {
408 assert!(*rx.recv().unwrap() == i);
409 recv(rx, i + 1);
410 });
411 }
412 }
413 }
414
415 #[test]
416 fn oneshot_single_thread_recv_timeout() {
417 let (tx, rx) = channel();
418 tx.send(()).unwrap();
419 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
420 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
421 tx.send(()).unwrap();
422 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
423 }
424
425 #[test]
426 fn stress_recv_timeout_two_threads() {
427 let (tx, rx) = channel();
428 let stress = stress_factor() + 100;
429 let timeout = Duration::from_millis(100);
430
431 thread::spawn(move || {
432 for i in 0..stress {
433 if i % 2 == 0 {
434 thread::sleep(timeout * 2);
435 }
436 tx.send(1usize).unwrap();
437 }
438 });
439
440 let mut recv_count = 0;
441 loop {
442 match rx.recv_timeout(timeout) {
443 Ok(n) => {
444 assert_eq!(n, 1usize);
445 recv_count += 1;
446 }
447 Err(RecvTimeoutError::Timeout) => continue,
448 Err(RecvTimeoutError::Disconnected) => break,
449 }
450 }
451
452 assert_eq!(recv_count, stress);
453 }
454
455 #[test]
456 fn recv_timeout_upgrade() {
457 let (tx, rx) = channel::<()>();
458 let timeout = Duration::from_millis(1);
459 let _tx_clone = tx.clone();
460
461 let start = Instant::now();
462 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
463 assert!(Instant::now() >= start + timeout);
464 }
465
466 #[test]
467 fn stress_recv_timeout_shared() {
468 let (tx, rx) = channel();
469 let stress = stress_factor() + 100;
470
471 for i in 0..stress {
472 let tx = tx.clone();
473 thread::spawn(move || {
474 thread::sleep(Duration::from_millis(i as u64 * 10));
475 tx.send(1usize).unwrap();
476 });
477 }
478
479 drop(tx);
480
481 let mut recv_count = 0;
482 loop {
483 match rx.recv_timeout(Duration::from_millis(10)) {
484 Ok(n) => {
485 assert_eq!(n, 1usize);
486 recv_count += 1;
487 }
488 Err(RecvTimeoutError::Timeout) => continue,
489 Err(RecvTimeoutError::Disconnected) => break,
490 }
491 }
492
493 assert_eq!(recv_count, stress);
494 }
495
496 #[test]
497 fn very_long_recv_timeout_wont_panic() {
498 let (tx, rx) = channel::<()>();
499 let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX)));
500 thread::sleep(Duration::from_secs(1));
501 assert!(tx.send(()).is_ok());
502 assert_eq!(join_handle.join().unwrap(), Ok(()));
503 }
504
505 #[test]
506 fn recv_a_lot() {
507 // Regression test that we don't run out of stack in scheduler context
508 let (tx, rx) = channel();
509 for _ in 0..10000 {
510 tx.send(()).unwrap();
511 }
512 for _ in 0..10000 {
513 rx.recv().unwrap();
514 }
515 }
516
517 #[test]
518 fn shared_recv_timeout() {
519 let (tx, rx) = channel();
520 let total = 5;
521 for _ in 0..total {
522 let tx = tx.clone();
523 thread::spawn(move || {
524 tx.send(()).unwrap();
525 });
526 }
527
528 for _ in 0..total {
529 rx.recv().unwrap();
530 }
531
532 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
533 tx.send(()).unwrap();
534 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
535 }
536
537 #[test]
538 fn shared_chan_stress() {
539 let (tx, rx) = channel();
540 let total = stress_factor() + 100;
541 for _ in 0..total {
542 let tx = tx.clone();
543 thread::spawn(move || {
544 tx.send(()).unwrap();
545 });
546 }
547
548 for _ in 0..total {
549 rx.recv().unwrap();
550 }
551 }
552
553 #[test]
554 fn test_nested_recv_iter() {
555 let (tx, rx) = channel::<i32>();
556 let (total_tx, total_rx) = channel::<i32>();
557
558 let _t = thread::spawn(move || {
559 let mut acc = 0;
560 for x in rx.iter() {
561 acc += x;
562 }
563 total_tx.send(acc).unwrap();
564 });
565
566 tx.send(3).unwrap();
567 tx.send(1).unwrap();
568 tx.send(2).unwrap();
569 drop(tx);
570 assert_eq!(total_rx.recv().unwrap(), 6);
571 }
572
573 #[test]
574 fn test_recv_iter_break() {
575 let (tx, rx) = channel::<i32>();
576 let (count_tx, count_rx) = channel();
577
578 let _t = thread::spawn(move || {
579 let mut count = 0;
580 for x in rx.iter() {
581 if count >= 3 {
582 break;
583 } else {
584 count += x;
585 }
586 }
587 count_tx.send(count).unwrap();
588 });
589
590 tx.send(2).unwrap();
591 tx.send(2).unwrap();
592 tx.send(2).unwrap();
593 let _ = tx.send(2);
594 drop(tx);
595 assert_eq!(count_rx.recv().unwrap(), 4);
596 }
597
598 #[test]
599 fn test_recv_try_iter() {
600 let (request_tx, request_rx) = channel();
601 let (response_tx, response_rx) = channel();
602
603 // Request `x`s until we have `6`.
604 let t = thread::spawn(move || {
605 let mut count = 0;
606 loop {
607 for x in response_rx.try_iter() {
608 count += x;
609 if count == 6 {
610 return count;
611 }
612 }
613 request_tx.send(()).unwrap();
614 }
615 });
616
617 for _ in request_rx.iter() {
618 if response_tx.send(2).is_err() {
619 break;
620 }
621 }
622
623 assert_eq!(t.join().unwrap(), 6);
624 }
625
626 #[test]
627 fn test_recv_into_iter_owned() {
628 let mut iter = {
629 let (tx, rx) = channel::<i32>();
630 tx.send(1).unwrap();
631 tx.send(2).unwrap();
632
633 rx.into_iter()
634 };
635 assert_eq!(iter.next().unwrap(), 1);
636 assert_eq!(iter.next().unwrap(), 2);
637 assert_eq!(iter.next().is_none(), true);
638 }
639
640 #[test]
641 fn test_recv_into_iter_borrowed() {
642 let (tx, rx) = channel::<i32>();
643 tx.send(1).unwrap();
644 tx.send(2).unwrap();
645 drop(tx);
646 let mut iter = (&rx).into_iter();
647 assert_eq!(iter.next().unwrap(), 1);
648 assert_eq!(iter.next().unwrap(), 2);
649 assert_eq!(iter.next().is_none(), true);
650 }
651
652 #[test]
653 fn try_recv_states() {
654 let (tx1, rx1) = channel::<i32>();
655 let (tx2, rx2) = channel::<()>();
656 let (tx3, rx3) = channel::<()>();
657 let _t = thread::spawn(move || {
658 rx2.recv().unwrap();
659 tx1.send(1).unwrap();
660 tx3.send(()).unwrap();
661 rx2.recv().unwrap();
662 drop(tx1);
663 tx3.send(()).unwrap();
664 });
665
666 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
667 tx2.send(()).unwrap();
668 rx3.recv().unwrap();
669 assert_eq!(rx1.try_recv(), Ok(1));
670 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
671 tx2.send(()).unwrap();
672 rx3.recv().unwrap();
673 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
674 }
675
676 // This bug used to end up in a livelock inside of the Receiver destructor
677 // because the internal state of the Shared packet was corrupted
678 #[test]
679 fn destroy_upgraded_shared_port_when_sender_still_active() {
680 let (tx, rx) = channel();
681 let (tx2, rx2) = channel();
682 let _t = thread::spawn(move || {
683 rx.recv().unwrap(); // wait on a oneshot
684 drop(rx); // destroy a shared
685 tx2.send(()).unwrap();
686 });
687 // make sure the other thread has gone to sleep
688 for _ in 0..5000 {
689 thread::yield_now();
690 }
691
692 // upgrade to a shared chan and send a message
693 let t = tx.clone();
694 drop(tx);
695 t.send(()).unwrap();
696
697 // wait for the child thread to exit before we exit
698 rx2.recv().unwrap();
699 }
700
701 #[test]
702 fn issue_32114() {
703 let (tx, _) = channel();
704 let _ = tx.send(123);
705 assert_eq!(tx.send(123), Err(SendError(123)));
706 }