]> git.proxmox.com Git - rustc.git/blame - vendor/crossbeam-channel/tests/array.rs
New upstream version 1.65.0+dfsg1
[rustc.git] / vendor / crossbeam-channel / tests / array.rs
CommitLineData
1b1a35ee
XL
//! Tests for the array channel flavor.
2
1b1a35ee
XL
3use std::any::Any;
4use std::sync::atomic::AtomicUsize;
5use std::sync::atomic::Ordering;
6use std::thread;
7use std::time::Duration;
8
5869c6ff 9use crossbeam_channel::{bounded, select, Receiver};
1b1a35ee
XL
10use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12use crossbeam_utils::thread::scope;
13use rand::{thread_rng, Rng};
14
/// Shorthand for building a [`Duration`] of `ms` milliseconds.
///
/// Used throughout these tests to keep sleep and timeout values terse.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
18
/// Single-threaded sanity check on a `bounded(1)` channel: sent values come
/// back through `try_recv` and `recv`, and once drained the channel reports
/// `Empty` / `Timeout` rather than yielding a value.
#[test]
fn smoke() {
    let (s, r) = bounded(1);
    s.send(7).unwrap();
    assert_eq!(r.try_recv(), Ok(7));

    s.send(8).unwrap();
    assert_eq!(r.recv(), Ok(8));

    // `s` is still alive here, so the expected errors are Empty/Timeout,
    // not Disconnected.
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
}
31
/// Both halves of a `bounded(i)` channel must report `Some(i)` as their
/// capacity, for every `i` in `1..10`.
#[test]
fn capacity() {
    for i in 1..10 {
        let (s, r) = bounded::<()>(i);
        assert_eq!(s.capacity(), Some(i));
        assert_eq!(r.capacity(), Some(i));
    }
}
40
/// Walks a `bounded(2)` channel through 0 -> 1 -> 2 -> 1 queued messages and
/// checks that `len`, `is_empty` and `is_full` agree on both the sender and
/// the receiver at every step.
#[test]
fn len_empty_full() {
    let (s, r) = bounded(2);

    // Freshly created: empty, not full.
    assert_eq!(s.len(), 0);
    assert!(s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 0);
    assert!(r.is_empty());
    assert!(!r.is_full());

    s.send(()).unwrap();

    // One of two slots used: neither empty nor full.
    assert_eq!(s.len(), 1);
    assert!(!s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 1);
    assert!(!r.is_empty());
    assert!(!r.is_full());

    s.send(()).unwrap();

    // Both slots used: full.
    assert_eq!(s.len(), 2);
    assert!(!s.is_empty());
    assert!(s.is_full());
    assert_eq!(r.len(), 2);
    assert!(!r.is_empty());
    assert!(r.is_full());

    r.recv().unwrap();

    // One message removed: back to the half-filled state.
    assert_eq!(s.len(), 1);
    assert!(!s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 1);
    assert!(!r.is_empty());
    assert!(!r.is_full());
}
79
/// Checks the three outcomes of `try_recv` across two threads.
///
/// Timeline (relative to test start):
/// * 0.0 s - receiver sees `Empty`.
/// * 1.0 s - sender thread sends `7`, then returns; `s` was moved into that
///   closure, so it is dropped there.
/// * 1.5 s - receiver sees `Ok(7)`.
/// * 2.0 s - receiver sees `Disconnected`.
#[test]
fn try_recv() {
    let (s, r) = bounded(100);

    scope(|scope| {
        scope.spawn(move |_| {
            assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
            thread::sleep(ms(1500));
            assert_eq!(r.try_recv(), Ok(7));
            thread::sleep(ms(500));
            assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            s.send(7).unwrap();
        });
    })
    .unwrap();
}
99
/// Checks blocking `recv` across two threads.
///
/// The sender waits 1.5 s, sends `7`, `8`, `9` back to back (the channel has
/// room for 100) and returns, dropping `s`. The receiver must get the three
/// values in order, with sleeps in between, and then observe `RecvError`.
#[test]
fn recv() {
    let (s, r) = bounded(100);

    scope(|scope| {
        scope.spawn(move |_| {
            // Nothing is sent before the 1.5 s mark, so this call has to wait.
            assert_eq!(r.recv(), Ok(7));
            thread::sleep(ms(1000));
            assert_eq!(r.recv(), Ok(8));
            thread::sleep(ms(1000));
            assert_eq!(r.recv(), Ok(9));
            assert_eq!(r.recv(), Err(RecvError));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            s.send(7).unwrap();
            s.send(8).unwrap();
            s.send(9).unwrap();
        });
    })
    .unwrap();
}
122
/// Checks the three outcomes of `recv_timeout` across two threads.
///
/// The sender sends `7` at the 1.5 s mark and then returns, dropping `s`.
/// The receiver issues three consecutive 1 s waits and expects, in order:
/// `Timeout` (nothing sent yet), `Ok(7)`, and `Disconnected`.
#[test]
fn recv_timeout() {
    let (s, r) = bounded::<i32>(100);

    scope(|scope| {
        scope.spawn(move |_| {
            assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
            assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
            assert_eq!(
                r.recv_timeout(ms(1000)),
                Err(RecvTimeoutError::Disconnected)
            );
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            s.send(7).unwrap();
        });
    })
    .unwrap();
}
143
/// Checks the three outcomes of `try_send` on a `bounded(1)` channel.
///
/// Timeline (relative to test start):
/// * 0.0 s - sender: `1` is accepted, `2` is rejected with `Full(2)`.
/// * 1.0 s - receiver: takes `1`, sees `Empty`, then blocks in `recv`.
/// * 1.5 s - sender: `3` is accepted; the receiver gets it and returns,
///   dropping `r` (it was moved into that closure).
/// * 2.0 s - sender: `4` is rejected with `Disconnected(4)`.
#[test]
fn try_send() {
    let (s, r) = bounded(1);

    scope(|scope| {
        scope.spawn(move |_| {
            assert_eq!(s.try_send(1), Ok(()));
            assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
            thread::sleep(ms(1500));
            assert_eq!(s.try_send(3), Ok(()));
            thread::sleep(ms(500));
            assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4)));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            assert_eq!(r.try_recv(), Ok(1));
            assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
            assert_eq!(r.recv(), Ok(3));
        });
    })
    .unwrap();
}
166
/// Checks blocking `send` on a `bounded(1)` channel.
///
/// The sender pushes `7`, `8`, `9`, `10` with 1 s pauses in between, while
/// the receiver only starts draining at the 1.5 s mark and takes exactly
/// three values. Every `send` must eventually succeed.
///
/// Both closures borrow `s` / `r` (no `move`), so neither end is dropped
/// before the scope finishes.
#[test]
fn send() {
    let (s, r) = bounded(1);

    scope(|scope| {
        scope.spawn(|_| {
            s.send(7).unwrap();
            thread::sleep(ms(1000));
            // `7` still occupies the only slot until the receiver wakes up at 1.5 s.
            s.send(8).unwrap();
            thread::sleep(ms(1000));
            s.send(9).unwrap();
            thread::sleep(ms(1000));
            // Nothing in this test receives `10`.
            s.send(10).unwrap();
        });
        scope.spawn(|_| {
            thread::sleep(ms(1500));
            assert_eq!(r.recv(), Ok(7));
            assert_eq!(r.recv(), Ok(8));
            assert_eq!(r.recv(), Ok(9));
        });
    })
    .unwrap();
}
190
/// Checks the outcomes of `send_timeout` on a `bounded(2)` channel.
///
/// Timeline (relative to test start):
/// * 0.0 s - sender: `1` and `2` fill the channel.
/// * 0.5 s - sender: `3` gives up with `Timeout(3)`; nothing has been
///   received yet.
/// * 1.0 s - receiver: takes `1`.
/// * 1.5 s - sender: `4` is accepted.
/// * 2.0 s - receiver: takes `2` and `4`, then returns and drops `r`.
/// * 2.5 s - sender: a plain `send(5)` fails with `SendError(5)`.
#[test]
fn send_timeout() {
    let (s, r) = bounded(2);

    scope(|scope| {
        scope.spawn(move |_| {
            assert_eq!(s.send_timeout(1, ms(1000)), Ok(()));
            assert_eq!(s.send_timeout(2, ms(1000)), Ok(()));
            assert_eq!(
                s.send_timeout(3, ms(500)),
                Err(SendTimeoutError::Timeout(3))
            );
            thread::sleep(ms(1000));
            assert_eq!(s.send_timeout(4, ms(1000)), Ok(()));
            thread::sleep(ms(1000));
            assert_eq!(s.send(5), Err(SendError(5)));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            assert_eq!(r.recv(), Ok(1));
            thread::sleep(ms(1000));
            assert_eq!(r.recv(), Ok(2));
            assert_eq!(r.recv(), Ok(4));
        });
    })
    .unwrap();
}
218
/// After the receiver is dropped, every send variant must fail and hand the
/// message back inside the error, even though the channel (capacity 100,
/// three messages queued) still has free slots.
#[test]
fn send_after_disconnect() {
    let (s, r) = bounded(100);

    s.send(1).unwrap();
    s.send(2).unwrap();
    s.send(3).unwrap();

    drop(r);

    assert_eq!(s.send(4), Err(SendError(4)));
    assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
    assert_eq!(
        s.send_timeout(6, ms(500)),
        Err(SendTimeoutError::Disconnected(6))
    );
}
236
/// After the sender is dropped, messages that were already queued must still
/// be delivered in order; only once the queue is drained does `recv` return
/// `RecvError`.
#[test]
fn recv_after_disconnect() {
    let (s, r) = bounded(100);

    s.send(1).unwrap();
    s.send(2).unwrap();
    s.send(3).unwrap();

    drop(s);

    assert_eq!(r.recv(), Ok(1));
    assert_eq!(r.recv(), Ok(2));
    assert_eq!(r.recv(), Ok(3));
    assert_eq!(r.recv(), Err(RecvError));
}
252
/// Exercises `len()` in three phases:
///
/// 1. Repeated fill-50 / drain-50 rounds, checking the exact length after
///    every single operation.
/// 2. One fill to `CAP` (exact length checked per send) followed by a full
///    drain.
/// 3. A concurrent producer/consumer pair moving `COUNT` messages, where the
///    length can only be bounded (`<= CAP`) because the two threads race.
///
/// The channel must report length 0 between phases and at the end.
/// `COUNT` and `CAP` are reduced when compiled with `cfg(miri)`.
#[test]
fn len() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    #[cfg(miri)]
    const CAP: usize = 50;
    #[cfg(not(miri))]
    const CAP: usize = 1000;

    let (s, r) = bounded(CAP);

    assert_eq!(s.len(), 0);
    assert_eq!(r.len(), 0);

    // Phase 1: fill and drain 50 messages per round.
    for _ in 0..CAP / 10 {
        for i in 0..50 {
            s.send(i).unwrap();
            assert_eq!(s.len(), i + 1);
        }

        for i in 0..50 {
            r.recv().unwrap();
            assert_eq!(r.len(), 50 - i - 1);
        }
    }

    assert_eq!(s.len(), 0);
    assert_eq!(r.len(), 0);

    // Phase 2: fill to capacity, then drain completely.
    for i in 0..CAP {
        s.send(i).unwrap();
        assert_eq!(s.len(), i + 1);
    }

    for _ in 0..CAP {
        r.recv().unwrap();
    }

    assert_eq!(s.len(), 0);
    assert_eq!(r.len(), 0);

    // Phase 3: concurrent transfer; the closures borrow `s` and `r` so both
    // remain usable for the final checks below.
    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                assert_eq!(r.recv(), Ok(i));
                let len = r.len();
                assert!(len <= CAP);
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                s.send(i).unwrap();
                let len = s.len();
                assert!(len <= CAP);
            }
        });
    })
    .unwrap();

    assert_eq!(s.len(), 0);
    assert_eq!(r.len(), 0);
}
318
/// A sender blocked on a full `bounded(1)` channel must be woken up with
/// `SendError` when the receiver is dropped (here after 1 s), instead of
/// blocking forever.
#[test]
fn disconnect_wakes_sender() {
    let (s, r) = bounded(1);

    scope(|scope| {
        scope.spawn(move |_| {
            // First send fills the single slot.
            assert_eq!(s.send(()), Ok(()));
            // Second send finds the channel full; nothing ever receives, so it
            // can only finish through the disconnect.
            assert_eq!(s.send(()), Err(SendError(())));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(r);
        });
    })
    .unwrap();
}
335
/// A receiver blocked on an empty channel must be woken up with `RecvError`
/// when the sender is dropped (here after 1 s), instead of blocking forever.
#[test]
fn disconnect_wakes_receiver() {
    let (s, r) = bounded::<()>(1);

    scope(|scope| {
        scope.spawn(move |_| {
            // Nothing is ever sent, so this can only finish through the disconnect.
            assert_eq!(r.recv(), Err(RecvError));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(s);
        });
    })
    .unwrap();
}
351
/// Single-producer / single-consumer stress test over a `bounded(3)` channel.
///
/// The producer sends `0..COUNT` in order and returns, dropping `s`; the
/// consumer must receive exactly that sequence and then `RecvError`.
/// `COUNT` is reduced when compiled with `cfg(miri)`.
#[test]
fn spsc() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    let (s, r) = bounded(3);

    scope(|scope| {
        scope.spawn(move |_| {
            for i in 0..COUNT {
                assert_eq!(r.recv(), Ok(i));
            }
            assert_eq!(r.recv(), Err(RecvError));
        });
        scope.spawn(move |_| {
            for i in 0..COUNT {
                s.send(i).unwrap();
            }
        });
    })
    .unwrap();
}
376
/// Multi-producer / multi-consumer stress test over a `bounded(3)` channel.
///
/// `THREADS` producers each send every value in `0..COUNT`; `THREADS`
/// consumers each receive `COUNT` messages and bump a per-value counter.
/// At the end every value must have been seen exactly `THREADS` times, i.e.
/// no message was lost or duplicated.
/// `COUNT` is reduced when compiled with `cfg(miri)`.
#[test]
fn mpmc() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let (s, r) = bounded::<usize>(3);
    // One atomic counter per possible message value.
    let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

    scope(|scope| {
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    let n = r.recv().unwrap();
                    v[n].fetch_add(1, Ordering::SeqCst);
                }
            });
        }
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for i in 0..COUNT {
                    s.send(i).unwrap();
                }
            });
        }
    })
    .unwrap();

    for c in v {
        assert_eq!(c.load(Ordering::SeqCst), THREADS);
    }
}
411
/// Creates `COUNT` fresh `bounded(1)` channels and, for each, passes a single
/// message between two newly spawned threads. Both the `send` and the `recv`
/// must succeed every time.
/// `COUNT` is reduced when compiled with `cfg(miri)`.
#[test]
fn stress_oneshot() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    for _ in 0..COUNT {
        let (s, r) = bounded(1);

        scope(|scope| {
            scope.spawn(|_| r.recv().unwrap());
            scope.spawn(|_| s.send(0).unwrap());
        })
        .unwrap();
    }
}
429
/// Request/response ping-pong that exercises `try_iter()` and `iter()`.
///
/// The spawned thread drains whatever responses are currently available via
/// `try_iter`, summing them, and sends a new request whenever the drain ends
/// without reaching `COUNT`. The scope's own thread answers every request
/// with a `1`. Once the sum reaches `COUNT` the spawned thread returns,
/// dropping `request_s` and `response_r` (both moved into it), which lets the
/// answering loop terminate.
/// `COUNT` is reduced when compiled with `cfg(miri)`.
#[test]
fn stress_iter() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    let (request_s, request_r) = bounded(1);
    let (response_s, response_r) = bounded(1);

    scope(|scope| {
        scope.spawn(move |_| {
            let mut count = 0;
            loop {
                for x in response_r.try_iter() {
                    count += x;
                    if count == COUNT {
                        return;
                    }
                }
                request_s.send(()).unwrap();
            }
        });

        for _ in request_r.iter() {
            // A failed send means the other side is gone; stop answering.
            if response_s.send(1).is_err() {
                break;
            }
        }
    })
    .unwrap();
}
462
/// Stresses `send_timeout` / `recv_timeout` with short (10 ms) timeouts on a
/// `bounded(2)` channel.
///
/// Both threads pause for 50 ms before every even-numbered iteration and
/// simply retry their operation until it succeeds. Despite the retries, the
/// receiver must see `0..COUNT` exactly once each and in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 100;

    let (s, r) = bounded(2);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(50));
                }
                // Retry until the message is accepted.
                loop {
                    if let Ok(()) = s.send_timeout(i, ms(10)) {
                        break;
                    }
                }
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(50));
                }
                // Retry until a message arrives.
                loop {
                    if let Ok(x) = r.recv_timeout(ms(10)) {
                        assert_eq!(x, i);
                        break;
                    }
                }
            }
        });
    })
    .unwrap();
}
499
/// Verifies that every message is dropped exactly once, whether it was
/// received or was still queued when the channel went away.
///
/// Each run transfers a random number of `DropCounter` messages (`steps`)
/// between two threads, then queues a random number of extra messages
/// (`additional`, always fewer than the capacity of 50) that are never
/// received. The global `DROPS` counter must equal `steps` before the channel
/// is dropped and `steps + additional` afterwards.
/// `RUNS` and `STEPS` are reduced when compiled with `cfg(miri)`.
#[test]
fn drops() {
    #[cfg(miri)]
    const RUNS: usize = 10;
    #[cfg(not(miri))]
    const RUNS: usize = 100;
    #[cfg(miri)]
    const STEPS: usize = 100;
    #[cfg(not(miri))]
    const STEPS: usize = 10_000;

    // Counts how many `DropCounter` values have been dropped in the current run.
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut rng = thread_rng();

    for _ in 0..RUNS {
        let steps = rng.gen_range(0..STEPS);
        let additional = rng.gen_range(0..50);

        DROPS.store(0, Ordering::SeqCst);
        let (s, r) = bounded::<DropCounter>(50);

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..steps {
                    // The received value is discarded immediately, which drops it.
                    r.recv().unwrap();
                }
            });

            scope.spawn(|_| {
                for _ in 0..steps {
                    s.send(DropCounter).unwrap();
                }
            });
        })
        .unwrap();

        // These messages stay queued in the channel.
        for _ in 0..additional {
            s.send(DropCounter).unwrap();
        }

        assert_eq!(DROPS.load(Ordering::SeqCst), steps);
        drop(s);
        drop(r);
        assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
    }
}
556
/// `THREADS` threads share one `bounded(THREADS)` channel; each repeatedly
/// sends a message and then immediately calls `try_recv`.
///
/// Every thread puts a message in before taking one out, so the `unwrap` on
/// `try_recv` asserts that a completed `send` is always observable by a
/// subsequent non-blocking receive.
/// `COUNT` is reduced when compiled with `cfg(miri)`.
#[test]
fn linearizable() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let (s, r) = bounded(THREADS);

    scope(|scope| {
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    s.send(0).unwrap();
                    r.try_recv().unwrap();
                }
            });
        }
    })
    .unwrap();
}
579
/// `select!` over two receivers that are both always ready must not starve
/// either one.
///
/// Both channels are pre-filled with `COUNT` messages, then `COUNT` selects
/// are performed. Each arm must be chosen at least `COUNT / 2 / 2` times,
/// i.e. at least half of an even split.
/// `COUNT` is reduced when compiled with `cfg(miri)`.
#[test]
fn fairness() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = bounded::<()>(COUNT);

    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    // hits[i] counts how often arm i was selected.
    let mut hits = [0usize; 2];
    for _ in 0..COUNT {
        select! {
            recv(r1) -> _ => hits[0] += 1,
            recv(r2) -> _ => hits[1] += 1,
        }
    }
    assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
}
604
/// Same idea as a fairness check across channels, but with the *same*
/// receiver listed in five `select!` arms.
///
/// The channel is pre-filled with `COUNT` messages and `COUNT` selects are
/// performed; each of the five arms must be chosen at least `COUNT / 5 / 2`
/// times.
/// `COUNT` is reduced when compiled with `cfg(miri)`.
#[test]
fn fairness_duplicates() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s, r) = bounded::<()>(COUNT);

    for _ in 0..COUNT {
        s.send(()).unwrap();
    }

    // hits[i] counts how often arm i was selected.
    let mut hits = [0usize; 5];
    for _ in 0..COUNT {
        select! {
            recv(r) -> _ => hits[0] += 1,
            recv(r) -> _ => hits[1] += 1,
            recv(r) -> _ => hits[2] += 1,
            recv(r) -> _ => hits[3] += 1,
            recv(r) -> _ => hits[4] += 1,
        }
    }
    assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
}
630
/// Checks how `select!` treats the message expression of a `send` arm.
///
/// 1. On a full `bounded(1)` channel with a `default` arm, the `send` arm
///    cannot proceed. The message expression is `panic!()`, so the test only
///    passes if that expression is not evaluated when `default` is taken.
/// 2. On a `bounded(2)` channel holding one message, the message expression
///    itself receives from the same channel; this must complete without
///    getting stuck.
#[test]
fn recv_in_send() {
    let (s, _r) = bounded(1);
    s.send(()).unwrap();

    // `panic!()` makes the code after it unreachable in the compiler's eyes;
    // silence that lint for this intentional construct.
    #[allow(unreachable_code)]
    {
        select! {
            send(s, panic!()) -> _ => panic!(),
            default => {}
        }
    }

    let (s, r) = bounded(2);
    s.send(()).unwrap();

    select! {
        send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
    }
}
651
/// Sends channel endpoints through channels, `COUNT` times in a row.
///
/// On each iteration the producer creates a fresh `bounded(1)` channel, ships
/// its receiver (boxed as `dyn Any + Send` and wrapped in `Option` so it can
/// be moved out again) through the current channel, and switches to the new
/// sender. The consumer mirrors this: it receives the box, downcasts it,
/// takes the receiver out and continues on that one.
/// `COUNT` is reduced when compiled with `cfg(miri)`.
#[test]
fn channel_through_channel() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    let (s, r) = bounded::<T>(1);

    scope(|scope| {
        scope.spawn(move |_| {
            let mut s = s;

            for _ in 0..COUNT {
                let (new_s, new_r) = bounded(1);
                let new_r: T = Box::new(Some(new_r));

                s.send(new_r).unwrap();
                s = new_s;
            }
        });

        scope.spawn(move |_| {
            let mut r = r;

            for _ in 0..COUNT {
                // `downcast_mut` + `take` moves the receiver out of the box.
                r = r
                    .recv()
                    .unwrap()
                    .downcast_mut::<Option<Receiver<T>>>()
                    .unwrap()
                    .take()
                    .unwrap()
            }
        });
    })
    .unwrap();
}
5e7ed085
FG
692
/// Checks what happens to queued messages when the channel is dropped, both
/// in the normal case and when a message's destructor panics.
///
/// * `Msg1` sets its flag on drop and panics on a second drop (unless the
///   thread is already panicking). Dropping a channel holding two of them
///   must set both flags without panicking.
/// * `Msg2` sets its flag and then always panics with `"first drop"`.
///   Dropping a channel holding two of them must propagate that panic after
///   the first message is dropped, leaving the second message undropped.
#[test]
fn panic_on_drop() {
    struct Msg1<'a>(&'a mut bool);
    impl Drop for Msg1<'_> {
        fn drop(&mut self) {
            if *self.0 && !std::thread::panicking() {
                panic!("double drop");
            } else {
                *self.0 = true;
            }
        }
    }

    struct Msg2<'a>(&'a mut bool);
    impl Drop for Msg2<'_> {
        fn drop(&mut self) {
            if *self.0 {
                panic!("double drop");
            } else {
                *self.0 = true;
                panic!("first drop");
            }
        }
    }

    // normal
    let (s, r) = bounded(2);
    let (mut a, mut b) = (false, false);
    s.send(Msg1(&mut a)).unwrap();
    s.send(Msg1(&mut b)).unwrap();
    drop(s);
    drop(r);
    assert!(a);
    assert!(b);

    // panic on drop
    let (s, r) = bounded(2);
    let (mut a, mut b) = (false, false);
    s.send(Msg2(&mut a)).unwrap();
    s.send(Msg2(&mut b)).unwrap();
    drop(s);
    // Dropping the last handle drops the queued messages; catch the panic
    // raised by the first `Msg2` destructor so it can be inspected.
    let res = std::panic::catch_unwind(move || {
        drop(r);
    });
    assert_eq!(
        *res.unwrap_err().downcast_ref::<&str>().unwrap(),
        "first drop"
    );
    assert!(a);
    // Elements after the panicked element will leak.
    assert!(!b);
}