]> git.proxmox.com Git - rustc.git/blob - library/std/src/sync/mpsc/sync_tests.rs
New upstream version 1.67.1+dfsg1
[rustc.git] / library / std / src / sync / mpsc / sync_tests.rs
1 use super::*;
2 use crate::env;
3 use crate::sync::mpmc::SendTimeoutError;
4 use crate::thread;
5 use crate::time::Duration;
6
7 pub fn stress_factor() -> usize {
8 match env::var("RUST_TEST_STRESS") {
9 Ok(val) => val.parse().unwrap(),
10 Err(..) => 1,
11 }
12 }
13
14 #[test]
15 fn smoke() {
16 let (tx, rx) = sync_channel::<i32>(1);
17 tx.send(1).unwrap();
18 assert_eq!(rx.recv().unwrap(), 1);
19 }
20
21 #[test]
22 fn drop_full() {
23 let (tx, _rx) = sync_channel::<Box<isize>>(1);
24 tx.send(Box::new(1)).unwrap();
25 }
26
27 #[test]
28 fn smoke_shared() {
29 let (tx, rx) = sync_channel::<i32>(1);
30 tx.send(1).unwrap();
31 assert_eq!(rx.recv().unwrap(), 1);
32 let tx = tx.clone();
33 tx.send(1).unwrap();
34 assert_eq!(rx.recv().unwrap(), 1);
35 }
36
37 #[test]
38 fn recv_timeout() {
39 let (tx, rx) = sync_channel::<i32>(1);
40 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
41 tx.send(1).unwrap();
42 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
43 }
44
45 #[test]
46 fn send_timeout() {
47 let (tx, _rx) = sync_channel::<i32>(1);
48 assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(()));
49 assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Err(SendTimeoutError::Timeout(1)));
50 }
51
52 #[test]
53 fn smoke_threads() {
54 let (tx, rx) = sync_channel::<i32>(0);
55 let _t = thread::spawn(move || {
56 tx.send(1).unwrap();
57 });
58 assert_eq!(rx.recv().unwrap(), 1);
59 }
60
61 #[test]
62 fn smoke_port_gone() {
63 let (tx, rx) = sync_channel::<i32>(0);
64 drop(rx);
65 assert!(tx.send(1).is_err());
66 }
67
68 #[test]
69 fn smoke_shared_port_gone2() {
70 let (tx, rx) = sync_channel::<i32>(0);
71 drop(rx);
72 let tx2 = tx.clone();
73 drop(tx);
74 assert!(tx2.send(1).is_err());
75 }
76
77 #[test]
78 fn port_gone_concurrent() {
79 let (tx, rx) = sync_channel::<i32>(0);
80 let _t = thread::spawn(move || {
81 rx.recv().unwrap();
82 });
83 while tx.send(1).is_ok() {}
84 }
85
86 #[test]
87 fn port_gone_concurrent_shared() {
88 let (tx, rx) = sync_channel::<i32>(0);
89 let tx2 = tx.clone();
90 let _t = thread::spawn(move || {
91 rx.recv().unwrap();
92 });
93 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
94 }
95
96 #[test]
97 fn smoke_chan_gone() {
98 let (tx, rx) = sync_channel::<i32>(0);
99 drop(tx);
100 assert!(rx.recv().is_err());
101 }
102
103 #[test]
104 fn smoke_chan_gone_shared() {
105 let (tx, rx) = sync_channel::<()>(0);
106 let tx2 = tx.clone();
107 drop(tx);
108 drop(tx2);
109 assert!(rx.recv().is_err());
110 }
111
112 #[test]
113 fn chan_gone_concurrent() {
114 let (tx, rx) = sync_channel::<i32>(0);
115 thread::spawn(move || {
116 tx.send(1).unwrap();
117 tx.send(1).unwrap();
118 });
119 while rx.recv().is_ok() {}
120 }
121
122 #[test]
123 fn stress() {
124 let count = if cfg!(miri) { 100 } else { 10000 };
125 let (tx, rx) = sync_channel::<i32>(0);
126 thread::spawn(move || {
127 for _ in 0..count {
128 tx.send(1).unwrap();
129 }
130 });
131 for _ in 0..count {
132 assert_eq!(rx.recv().unwrap(), 1);
133 }
134 }
135
136 #[test]
137 fn stress_recv_timeout_two_threads() {
138 let count = if cfg!(miri) { 100 } else { 10000 };
139 let (tx, rx) = sync_channel::<i32>(0);
140
141 thread::spawn(move || {
142 for _ in 0..count {
143 tx.send(1).unwrap();
144 }
145 });
146
147 let mut recv_count = 0;
148 loop {
149 match rx.recv_timeout(Duration::from_millis(1)) {
150 Ok(v) => {
151 assert_eq!(v, 1);
152 recv_count += 1;
153 }
154 Err(RecvTimeoutError::Timeout) => continue,
155 Err(RecvTimeoutError::Disconnected) => break,
156 }
157 }
158
159 assert_eq!(recv_count, count);
160 }
161
162 #[test]
163 fn stress_recv_timeout_shared() {
164 const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
165 const NTHREADS: u32 = 8;
166 let (tx, rx) = sync_channel::<i32>(0);
167 let (dtx, drx) = sync_channel::<()>(0);
168
169 thread::spawn(move || {
170 let mut recv_count = 0;
171 loop {
172 match rx.recv_timeout(Duration::from_millis(10)) {
173 Ok(v) => {
174 assert_eq!(v, 1);
175 recv_count += 1;
176 }
177 Err(RecvTimeoutError::Timeout) => continue,
178 Err(RecvTimeoutError::Disconnected) => break,
179 }
180 }
181
182 assert_eq!(recv_count, AMT * NTHREADS);
183 assert!(rx.try_recv().is_err());
184
185 dtx.send(()).unwrap();
186 });
187
188 for _ in 0..NTHREADS {
189 let tx = tx.clone();
190 thread::spawn(move || {
191 for _ in 0..AMT {
192 tx.send(1).unwrap();
193 }
194 });
195 }
196
197 drop(tx);
198
199 drx.recv().unwrap();
200 }
201
202 #[test]
203 fn stress_shared() {
204 const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
205 const NTHREADS: u32 = 8;
206 let (tx, rx) = sync_channel::<i32>(0);
207 let (dtx, drx) = sync_channel::<()>(0);
208
209 thread::spawn(move || {
210 for _ in 0..AMT * NTHREADS {
211 assert_eq!(rx.recv().unwrap(), 1);
212 }
213 match rx.try_recv() {
214 Ok(..) => panic!(),
215 _ => {}
216 }
217 dtx.send(()).unwrap();
218 });
219
220 for _ in 0..NTHREADS {
221 let tx = tx.clone();
222 thread::spawn(move || {
223 for _ in 0..AMT {
224 tx.send(1).unwrap();
225 }
226 });
227 }
228 drop(tx);
229 drx.recv().unwrap();
230 }
231
232 #[test]
233 fn oneshot_single_thread_close_port_first() {
234 // Simple test of closing without sending
235 let (_tx, rx) = sync_channel::<i32>(0);
236 drop(rx);
237 }
238
239 #[test]
240 fn oneshot_single_thread_close_chan_first() {
241 // Simple test of closing without sending
242 let (tx, _rx) = sync_channel::<i32>(0);
243 drop(tx);
244 }
245
246 #[test]
247 fn oneshot_single_thread_send_port_close() {
248 // Testing that the sender cleans up the payload if receiver is closed
249 let (tx, rx) = sync_channel::<Box<i32>>(0);
250 drop(rx);
251 assert!(tx.send(Box::new(0)).is_err());
252 }
253
254 #[test]
255 fn oneshot_single_thread_recv_chan_close() {
256 // Receiving on a closed chan will panic
257 let res = thread::spawn(move || {
258 let (tx, rx) = sync_channel::<i32>(0);
259 drop(tx);
260 rx.recv().unwrap();
261 })
262 .join();
263 // What is our res?
264 assert!(res.is_err());
265 }
266
267 #[test]
268 fn oneshot_single_thread_send_then_recv() {
269 let (tx, rx) = sync_channel::<Box<i32>>(1);
270 tx.send(Box::new(10)).unwrap();
271 assert!(*rx.recv().unwrap() == 10);
272 }
273
274 #[test]
275 fn oneshot_single_thread_try_send_open() {
276 let (tx, rx) = sync_channel::<i32>(1);
277 assert_eq!(tx.try_send(10), Ok(()));
278 assert!(rx.recv().unwrap() == 10);
279 }
280
281 #[test]
282 fn oneshot_single_thread_try_send_closed() {
283 let (tx, rx) = sync_channel::<i32>(0);
284 drop(rx);
285 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
286 }
287
288 #[test]
289 fn oneshot_single_thread_try_send_closed2() {
290 let (tx, _rx) = sync_channel::<i32>(0);
291 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
292 }
293
294 #[test]
295 fn oneshot_single_thread_try_recv_open() {
296 let (tx, rx) = sync_channel::<i32>(1);
297 tx.send(10).unwrap();
298 assert!(rx.recv() == Ok(10));
299 }
300
301 #[test]
302 fn oneshot_single_thread_try_recv_closed() {
303 let (tx, rx) = sync_channel::<i32>(0);
304 drop(tx);
305 assert!(rx.recv().is_err());
306 }
307
308 #[test]
309 fn oneshot_single_thread_try_recv_closed_with_data() {
310 let (tx, rx) = sync_channel::<i32>(1);
311 tx.send(10).unwrap();
312 drop(tx);
313 assert_eq!(rx.try_recv(), Ok(10));
314 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
315 }
316
317 #[test]
318 fn oneshot_single_thread_peek_data() {
319 let (tx, rx) = sync_channel::<i32>(1);
320 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
321 tx.send(10).unwrap();
322 assert_eq!(rx.try_recv(), Ok(10));
323 }
324
325 #[test]
326 fn oneshot_single_thread_peek_close() {
327 let (tx, rx) = sync_channel::<i32>(0);
328 drop(tx);
329 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
330 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
331 }
332
333 #[test]
334 fn oneshot_single_thread_peek_open() {
335 let (_tx, rx) = sync_channel::<i32>(0);
336 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
337 }
338
339 #[test]
340 fn oneshot_multi_task_recv_then_send() {
341 let (tx, rx) = sync_channel::<Box<i32>>(0);
342 let _t = thread::spawn(move || {
343 assert!(*rx.recv().unwrap() == 10);
344 });
345
346 tx.send(Box::new(10)).unwrap();
347 }
348
349 #[test]
350 fn oneshot_multi_task_recv_then_close() {
351 let (tx, rx) = sync_channel::<Box<i32>>(0);
352 let _t = thread::spawn(move || {
353 drop(tx);
354 });
355 let res = thread::spawn(move || {
356 assert!(*rx.recv().unwrap() == 10);
357 })
358 .join();
359 assert!(res.is_err());
360 }
361
362 #[test]
363 fn oneshot_multi_thread_close_stress() {
364 for _ in 0..stress_factor() {
365 let (tx, rx) = sync_channel::<i32>(0);
366 let _t = thread::spawn(move || {
367 drop(rx);
368 });
369 drop(tx);
370 }
371 }
372
373 #[test]
374 fn oneshot_multi_thread_send_close_stress() {
375 for _ in 0..stress_factor() {
376 let (tx, rx) = sync_channel::<i32>(0);
377 let _t = thread::spawn(move || {
378 drop(rx);
379 });
380 let _ = thread::spawn(move || {
381 tx.send(1).unwrap();
382 })
383 .join();
384 }
385 }
386
387 #[test]
388 fn oneshot_multi_thread_recv_close_stress() {
389 for _ in 0..stress_factor() {
390 let (tx, rx) = sync_channel::<i32>(0);
391 let _t = thread::spawn(move || {
392 let res = thread::spawn(move || {
393 rx.recv().unwrap();
394 })
395 .join();
396 assert!(res.is_err());
397 });
398 let _t = thread::spawn(move || {
399 thread::spawn(move || {
400 drop(tx);
401 });
402 });
403 }
404 }
405
406 #[test]
407 fn oneshot_multi_thread_send_recv_stress() {
408 for _ in 0..stress_factor() {
409 let (tx, rx) = sync_channel::<Box<i32>>(0);
410 let _t = thread::spawn(move || {
411 tx.send(Box::new(10)).unwrap();
412 });
413 assert!(*rx.recv().unwrap() == 10);
414 }
415 }
416
417 #[test]
418 fn stream_send_recv_stress() {
419 for _ in 0..stress_factor() {
420 let (tx, rx) = sync_channel::<Box<i32>>(0);
421
422 send(tx, 0);
423 recv(rx, 0);
424
425 fn send(tx: SyncSender<Box<i32>>, i: i32) {
426 if i == 10 {
427 return;
428 }
429
430 thread::spawn(move || {
431 tx.send(Box::new(i)).unwrap();
432 send(tx, i + 1);
433 });
434 }
435
436 fn recv(rx: Receiver<Box<i32>>, i: i32) {
437 if i == 10 {
438 return;
439 }
440
441 thread::spawn(move || {
442 assert!(*rx.recv().unwrap() == i);
443 recv(rx, i + 1);
444 });
445 }
446 }
447 }
448
449 #[test]
450 fn recv_a_lot() {
451 let count = if cfg!(miri) { 1000 } else { 10000 };
452 // Regression test that we don't run out of stack in scheduler context
453 let (tx, rx) = sync_channel(count);
454 for _ in 0..count {
455 tx.send(()).unwrap();
456 }
457 for _ in 0..count {
458 rx.recv().unwrap();
459 }
460 }
461
462 #[test]
463 fn shared_chan_stress() {
464 let (tx, rx) = sync_channel(0);
465 let total = stress_factor() + 100;
466 for _ in 0..total {
467 let tx = tx.clone();
468 thread::spawn(move || {
469 tx.send(()).unwrap();
470 });
471 }
472
473 for _ in 0..total {
474 rx.recv().unwrap();
475 }
476 }
477
478 #[test]
479 fn test_nested_recv_iter() {
480 let (tx, rx) = sync_channel::<i32>(0);
481 let (total_tx, total_rx) = sync_channel::<i32>(0);
482
483 let _t = thread::spawn(move || {
484 let mut acc = 0;
485 for x in rx.iter() {
486 acc += x;
487 }
488 total_tx.send(acc).unwrap();
489 });
490
491 tx.send(3).unwrap();
492 tx.send(1).unwrap();
493 tx.send(2).unwrap();
494 drop(tx);
495 assert_eq!(total_rx.recv().unwrap(), 6);
496 }
497
498 #[test]
499 fn test_recv_iter_break() {
500 let (tx, rx) = sync_channel::<i32>(0);
501 let (count_tx, count_rx) = sync_channel(0);
502
503 let _t = thread::spawn(move || {
504 let mut count = 0;
505 for x in rx.iter() {
506 if count >= 3 {
507 break;
508 } else {
509 count += x;
510 }
511 }
512 count_tx.send(count).unwrap();
513 });
514
515 tx.send(2).unwrap();
516 tx.send(2).unwrap();
517 tx.send(2).unwrap();
518 let _ = tx.try_send(2);
519 drop(tx);
520 assert_eq!(count_rx.recv().unwrap(), 4);
521 }
522
523 #[test]
524 fn try_recv_states() {
525 let (tx1, rx1) = sync_channel::<i32>(1);
526 let (tx2, rx2) = sync_channel::<()>(1);
527 let (tx3, rx3) = sync_channel::<()>(1);
528 let _t = thread::spawn(move || {
529 rx2.recv().unwrap();
530 tx1.send(1).unwrap();
531 tx3.send(()).unwrap();
532 rx2.recv().unwrap();
533 drop(tx1);
534 tx3.send(()).unwrap();
535 });
536
537 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
538 tx2.send(()).unwrap();
539 rx3.recv().unwrap();
540 assert_eq!(rx1.try_recv(), Ok(1));
541 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
542 tx2.send(()).unwrap();
543 rx3.recv().unwrap();
544 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
545 }
546
547 // This bug used to end up in a livelock inside of the Receiver destructor
548 // because the internal state of the Shared packet was corrupted
549 #[test]
550 fn destroy_upgraded_shared_port_when_sender_still_active() {
551 let (tx, rx) = sync_channel::<()>(0);
552 let (tx2, rx2) = sync_channel::<()>(0);
553 let _t = thread::spawn(move || {
554 rx.recv().unwrap(); // wait on a oneshot
555 drop(rx); // destroy a shared
556 tx2.send(()).unwrap();
557 });
558 // make sure the other thread has gone to sleep
559 for _ in 0..5000 {
560 thread::yield_now();
561 }
562
563 // upgrade to a shared chan and send a message
564 let t = tx.clone();
565 drop(tx);
566 t.send(()).unwrap();
567
568 // wait for the child thread to exit before we exit
569 rx2.recv().unwrap();
570 }
571
572 #[test]
573 fn send1() {
574 let (tx, rx) = sync_channel::<i32>(0);
575 let _t = thread::spawn(move || {
576 rx.recv().unwrap();
577 });
578 assert_eq!(tx.send(1), Ok(()));
579 }
580
581 #[test]
582 fn send2() {
583 let (tx, rx) = sync_channel::<i32>(0);
584 let _t = thread::spawn(move || {
585 drop(rx);
586 });
587 assert!(tx.send(1).is_err());
588 }
589
590 #[test]
591 fn send3() {
592 let (tx, rx) = sync_channel::<i32>(1);
593 assert_eq!(tx.send(1), Ok(()));
594 let _t = thread::spawn(move || {
595 drop(rx);
596 });
597 assert!(tx.send(1).is_err());
598 }
599
600 #[test]
601 fn send4() {
602 let (tx, rx) = sync_channel::<i32>(0);
603 let tx2 = tx.clone();
604 let (done, donerx) = channel();
605 let done2 = done.clone();
606 let _t = thread::spawn(move || {
607 assert!(tx.send(1).is_err());
608 done.send(()).unwrap();
609 });
610 let _t = thread::spawn(move || {
611 assert!(tx2.send(2).is_err());
612 done2.send(()).unwrap();
613 });
614 drop(rx);
615 donerx.recv().unwrap();
616 donerx.recv().unwrap();
617 }
618
619 #[test]
620 fn try_send1() {
621 let (tx, _rx) = sync_channel::<i32>(0);
622 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
623 }
624
625 #[test]
626 fn try_send2() {
627 let (tx, _rx) = sync_channel::<i32>(1);
628 assert_eq!(tx.try_send(1), Ok(()));
629 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
630 }
631
632 #[test]
633 fn try_send3() {
634 let (tx, rx) = sync_channel::<i32>(1);
635 assert_eq!(tx.try_send(1), Ok(()));
636 drop(rx);
637 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
638 }
639
640 #[test]
641 fn issue_15761() {
642 fn repro() {
643 let (tx1, rx1) = sync_channel::<()>(3);
644 let (tx2, rx2) = sync_channel::<()>(3);
645
646 let _t = thread::spawn(move || {
647 rx1.recv().unwrap();
648 tx2.try_send(()).unwrap();
649 });
650
651 tx1.try_send(()).unwrap();
652 rx2.recv().unwrap();
653 }
654
655 for _ in 0..100 {
656 repro()
657 }
658 }