]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-channel/tests/array.rs
New upstream version 1.64.0+dfsg1
[rustc.git] / vendor / crossbeam-channel / tests / array.rs
1 //! Tests for the array channel flavor.
2
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14
15 fn ms(ms: u64) -> Duration {
16 Duration::from_millis(ms)
17 }
18
19 #[test]
20 fn smoke() {
21 let (s, r) = bounded(1);
22 s.send(7).unwrap();
23 assert_eq!(r.try_recv(), Ok(7));
24
25 s.send(8).unwrap();
26 assert_eq!(r.recv(), Ok(8));
27
28 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
29 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
30 }
31
32 #[test]
33 fn capacity() {
34 for i in 1..10 {
35 let (s, r) = bounded::<()>(i);
36 assert_eq!(s.capacity(), Some(i));
37 assert_eq!(r.capacity(), Some(i));
38 }
39 }
40
41 #[test]
42 fn len_empty_full() {
43 let (s, r) = bounded(2);
44
45 assert_eq!(s.len(), 0);
46 assert!(s.is_empty());
47 assert!(!s.is_full());
48 assert_eq!(r.len(), 0);
49 assert!(r.is_empty());
50 assert!(!r.is_full());
51
52 s.send(()).unwrap();
53
54 assert_eq!(s.len(), 1);
55 assert!(!s.is_empty());
56 assert!(!s.is_full());
57 assert_eq!(r.len(), 1);
58 assert!(!r.is_empty());
59 assert!(!r.is_full());
60
61 s.send(()).unwrap();
62
63 assert_eq!(s.len(), 2);
64 assert!(!s.is_empty());
65 assert!(s.is_full());
66 assert_eq!(r.len(), 2);
67 assert!(!r.is_empty());
68 assert!(r.is_full());
69
70 r.recv().unwrap();
71
72 assert_eq!(s.len(), 1);
73 assert!(!s.is_empty());
74 assert!(!s.is_full());
75 assert_eq!(r.len(), 1);
76 assert!(!r.is_empty());
77 assert!(!r.is_full());
78 }
79
80 #[test]
81 fn try_recv() {
82 let (s, r) = bounded(100);
83
84 scope(|scope| {
85 scope.spawn(move |_| {
86 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
87 thread::sleep(ms(1500));
88 assert_eq!(r.try_recv(), Ok(7));
89 thread::sleep(ms(500));
90 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
91 });
92 scope.spawn(move |_| {
93 thread::sleep(ms(1000));
94 s.send(7).unwrap();
95 });
96 })
97 .unwrap();
98 }
99
100 #[test]
101 fn recv() {
102 let (s, r) = bounded(100);
103
104 scope(|scope| {
105 scope.spawn(move |_| {
106 assert_eq!(r.recv(), Ok(7));
107 thread::sleep(ms(1000));
108 assert_eq!(r.recv(), Ok(8));
109 thread::sleep(ms(1000));
110 assert_eq!(r.recv(), Ok(9));
111 assert_eq!(r.recv(), Err(RecvError));
112 });
113 scope.spawn(move |_| {
114 thread::sleep(ms(1500));
115 s.send(7).unwrap();
116 s.send(8).unwrap();
117 s.send(9).unwrap();
118 });
119 })
120 .unwrap();
121 }
122
123 #[test]
124 fn recv_timeout() {
125 let (s, r) = bounded::<i32>(100);
126
127 scope(|scope| {
128 scope.spawn(move |_| {
129 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
130 assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
131 assert_eq!(
132 r.recv_timeout(ms(1000)),
133 Err(RecvTimeoutError::Disconnected)
134 );
135 });
136 scope.spawn(move |_| {
137 thread::sleep(ms(1500));
138 s.send(7).unwrap();
139 });
140 })
141 .unwrap();
142 }
143
144 #[test]
145 fn try_send() {
146 let (s, r) = bounded(1);
147
148 scope(|scope| {
149 scope.spawn(move |_| {
150 assert_eq!(s.try_send(1), Ok(()));
151 assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
152 thread::sleep(ms(1500));
153 assert_eq!(s.try_send(3), Ok(()));
154 thread::sleep(ms(500));
155 assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4)));
156 });
157 scope.spawn(move |_| {
158 thread::sleep(ms(1000));
159 assert_eq!(r.try_recv(), Ok(1));
160 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
161 assert_eq!(r.recv(), Ok(3));
162 });
163 })
164 .unwrap();
165 }
166
167 #[test]
168 fn send() {
169 let (s, r) = bounded(1);
170
171 scope(|scope| {
172 scope.spawn(|_| {
173 s.send(7).unwrap();
174 thread::sleep(ms(1000));
175 s.send(8).unwrap();
176 thread::sleep(ms(1000));
177 s.send(9).unwrap();
178 thread::sleep(ms(1000));
179 s.send(10).unwrap();
180 });
181 scope.spawn(|_| {
182 thread::sleep(ms(1500));
183 assert_eq!(r.recv(), Ok(7));
184 assert_eq!(r.recv(), Ok(8));
185 assert_eq!(r.recv(), Ok(9));
186 });
187 })
188 .unwrap();
189 }
190
191 #[test]
192 fn send_timeout() {
193 let (s, r) = bounded(2);
194
195 scope(|scope| {
196 scope.spawn(move |_| {
197 assert_eq!(s.send_timeout(1, ms(1000)), Ok(()));
198 assert_eq!(s.send_timeout(2, ms(1000)), Ok(()));
199 assert_eq!(
200 s.send_timeout(3, ms(500)),
201 Err(SendTimeoutError::Timeout(3))
202 );
203 thread::sleep(ms(1000));
204 assert_eq!(s.send_timeout(4, ms(1000)), Ok(()));
205 thread::sleep(ms(1000));
206 assert_eq!(s.send(5), Err(SendError(5)));
207 });
208 scope.spawn(move |_| {
209 thread::sleep(ms(1000));
210 assert_eq!(r.recv(), Ok(1));
211 thread::sleep(ms(1000));
212 assert_eq!(r.recv(), Ok(2));
213 assert_eq!(r.recv(), Ok(4));
214 });
215 })
216 .unwrap();
217 }
218
219 #[test]
220 fn send_after_disconnect() {
221 let (s, r) = bounded(100);
222
223 s.send(1).unwrap();
224 s.send(2).unwrap();
225 s.send(3).unwrap();
226
227 drop(r);
228
229 assert_eq!(s.send(4), Err(SendError(4)));
230 assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
231 assert_eq!(
232 s.send_timeout(6, ms(500)),
233 Err(SendTimeoutError::Disconnected(6))
234 );
235 }
236
237 #[test]
238 fn recv_after_disconnect() {
239 let (s, r) = bounded(100);
240
241 s.send(1).unwrap();
242 s.send(2).unwrap();
243 s.send(3).unwrap();
244
245 drop(s);
246
247 assert_eq!(r.recv(), Ok(1));
248 assert_eq!(r.recv(), Ok(2));
249 assert_eq!(r.recv(), Ok(3));
250 assert_eq!(r.recv(), Err(RecvError));
251 }
252
253 #[test]
254 fn len() {
255 #[cfg(miri)]
256 const COUNT: usize = 50;
257 #[cfg(not(miri))]
258 const COUNT: usize = 25_000;
259 #[cfg(miri)]
260 const CAP: usize = 50;
261 #[cfg(not(miri))]
262 const CAP: usize = 1000;
263
264 let (s, r) = bounded(CAP);
265
266 assert_eq!(s.len(), 0);
267 assert_eq!(r.len(), 0);
268
269 for _ in 0..CAP / 10 {
270 for i in 0..50 {
271 s.send(i).unwrap();
272 assert_eq!(s.len(), i + 1);
273 }
274
275 for i in 0..50 {
276 r.recv().unwrap();
277 assert_eq!(r.len(), 50 - i - 1);
278 }
279 }
280
281 assert_eq!(s.len(), 0);
282 assert_eq!(r.len(), 0);
283
284 for i in 0..CAP {
285 s.send(i).unwrap();
286 assert_eq!(s.len(), i + 1);
287 }
288
289 for _ in 0..CAP {
290 r.recv().unwrap();
291 }
292
293 assert_eq!(s.len(), 0);
294 assert_eq!(r.len(), 0);
295
296 scope(|scope| {
297 scope.spawn(|_| {
298 for i in 0..COUNT {
299 assert_eq!(r.recv(), Ok(i));
300 let len = r.len();
301 assert!(len <= CAP);
302 }
303 });
304
305 scope.spawn(|_| {
306 for i in 0..COUNT {
307 s.send(i).unwrap();
308 let len = s.len();
309 assert!(len <= CAP);
310 }
311 });
312 })
313 .unwrap();
314
315 assert_eq!(s.len(), 0);
316 assert_eq!(r.len(), 0);
317 }
318
319 #[test]
320 fn disconnect_wakes_sender() {
321 let (s, r) = bounded(1);
322
323 scope(|scope| {
324 scope.spawn(move |_| {
325 assert_eq!(s.send(()), Ok(()));
326 assert_eq!(s.send(()), Err(SendError(())));
327 });
328 scope.spawn(move |_| {
329 thread::sleep(ms(1000));
330 drop(r);
331 });
332 })
333 .unwrap();
334 }
335
336 #[test]
337 fn disconnect_wakes_receiver() {
338 let (s, r) = bounded::<()>(1);
339
340 scope(|scope| {
341 scope.spawn(move |_| {
342 assert_eq!(r.recv(), Err(RecvError));
343 });
344 scope.spawn(move |_| {
345 thread::sleep(ms(1000));
346 drop(s);
347 });
348 })
349 .unwrap();
350 }
351
352 #[test]
353 fn spsc() {
354 #[cfg(miri)]
355 const COUNT: usize = 100;
356 #[cfg(not(miri))]
357 const COUNT: usize = 100_000;
358
359 let (s, r) = bounded(3);
360
361 scope(|scope| {
362 scope.spawn(move |_| {
363 for i in 0..COUNT {
364 assert_eq!(r.recv(), Ok(i));
365 }
366 assert_eq!(r.recv(), Err(RecvError));
367 });
368 scope.spawn(move |_| {
369 for i in 0..COUNT {
370 s.send(i).unwrap();
371 }
372 });
373 })
374 .unwrap();
375 }
376
377 #[test]
378 fn mpmc() {
379 #[cfg(miri)]
380 const COUNT: usize = 100;
381 #[cfg(not(miri))]
382 const COUNT: usize = 25_000;
383 const THREADS: usize = 4;
384
385 let (s, r) = bounded::<usize>(3);
386 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
387
388 scope(|scope| {
389 for _ in 0..THREADS {
390 scope.spawn(|_| {
391 for _ in 0..COUNT {
392 let n = r.recv().unwrap();
393 v[n].fetch_add(1, Ordering::SeqCst);
394 }
395 });
396 }
397 for _ in 0..THREADS {
398 scope.spawn(|_| {
399 for i in 0..COUNT {
400 s.send(i).unwrap();
401 }
402 });
403 }
404 })
405 .unwrap();
406
407 for c in v {
408 assert_eq!(c.load(Ordering::SeqCst), THREADS);
409 }
410 }
411
412 #[test]
413 fn stress_oneshot() {
414 #[cfg(miri)]
415 const COUNT: usize = 100;
416 #[cfg(not(miri))]
417 const COUNT: usize = 10_000;
418
419 for _ in 0..COUNT {
420 let (s, r) = bounded(1);
421
422 scope(|scope| {
423 scope.spawn(|_| r.recv().unwrap());
424 scope.spawn(|_| s.send(0).unwrap());
425 })
426 .unwrap();
427 }
428 }
429
430 #[test]
431 fn stress_iter() {
432 #[cfg(miri)]
433 const COUNT: usize = 100;
434 #[cfg(not(miri))]
435 const COUNT: usize = 100_000;
436
437 let (request_s, request_r) = bounded(1);
438 let (response_s, response_r) = bounded(1);
439
440 scope(|scope| {
441 scope.spawn(move |_| {
442 let mut count = 0;
443 loop {
444 for x in response_r.try_iter() {
445 count += x;
446 if count == COUNT {
447 return;
448 }
449 }
450 request_s.send(()).unwrap();
451 }
452 });
453
454 for _ in request_r.iter() {
455 if response_s.send(1).is_err() {
456 break;
457 }
458 }
459 })
460 .unwrap();
461 }
462
463 #[test]
464 fn stress_timeout_two_threads() {
465 const COUNT: usize = 100;
466
467 let (s, r) = bounded(2);
468
469 scope(|scope| {
470 scope.spawn(|_| {
471 for i in 0..COUNT {
472 if i % 2 == 0 {
473 thread::sleep(ms(50));
474 }
475 loop {
476 if let Ok(()) = s.send_timeout(i, ms(10)) {
477 break;
478 }
479 }
480 }
481 });
482
483 scope.spawn(|_| {
484 for i in 0..COUNT {
485 if i % 2 == 0 {
486 thread::sleep(ms(50));
487 }
488 loop {
489 if let Ok(x) = r.recv_timeout(ms(10)) {
490 assert_eq!(x, i);
491 break;
492 }
493 }
494 }
495 });
496 })
497 .unwrap();
498 }
499
500 #[test]
501 fn drops() {
502 #[cfg(miri)]
503 const RUNS: usize = 10;
504 #[cfg(not(miri))]
505 const RUNS: usize = 100;
506 #[cfg(miri)]
507 const STEPS: usize = 100;
508 #[cfg(not(miri))]
509 const STEPS: usize = 10_000;
510
511 static DROPS: AtomicUsize = AtomicUsize::new(0);
512
513 #[derive(Debug, PartialEq)]
514 struct DropCounter;
515
516 impl Drop for DropCounter {
517 fn drop(&mut self) {
518 DROPS.fetch_add(1, Ordering::SeqCst);
519 }
520 }
521
522 let mut rng = thread_rng();
523
524 for _ in 0..RUNS {
525 let steps = rng.gen_range(0..STEPS);
526 let additional = rng.gen_range(0..50);
527
528 DROPS.store(0, Ordering::SeqCst);
529 let (s, r) = bounded::<DropCounter>(50);
530
531 scope(|scope| {
532 scope.spawn(|_| {
533 for _ in 0..steps {
534 r.recv().unwrap();
535 #[cfg(miri)]
536 std::thread::yield_now(); // https://github.com/rust-lang/miri/issues/1388
537 }
538 });
539
540 scope.spawn(|_| {
541 for _ in 0..steps {
542 s.send(DropCounter).unwrap();
543 #[cfg(miri)]
544 std::thread::yield_now(); // https://github.com/rust-lang/miri/issues/1388
545 }
546 });
547 })
548 .unwrap();
549
550 for _ in 0..additional {
551 s.send(DropCounter).unwrap();
552 }
553
554 assert_eq!(DROPS.load(Ordering::SeqCst), steps);
555 drop(s);
556 drop(r);
557 assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
558 }
559 }
560
561 #[test]
562 fn linearizable() {
563 #[cfg(miri)]
564 const COUNT: usize = 50;
565 #[cfg(not(miri))]
566 const COUNT: usize = 25_000;
567 const THREADS: usize = 4;
568
569 let (s, r) = bounded(THREADS);
570
571 scope(|scope| {
572 for _ in 0..THREADS {
573 scope.spawn(|_| {
574 for _ in 0..COUNT {
575 s.send(0).unwrap();
576 r.try_recv().unwrap();
577 }
578 });
579 }
580 })
581 .unwrap();
582 }
583
584 #[test]
585 fn fairness() {
586 #[cfg(miri)]
587 const COUNT: usize = 100;
588 #[cfg(not(miri))]
589 const COUNT: usize = 10_000;
590
591 let (s1, r1) = bounded::<()>(COUNT);
592 let (s2, r2) = bounded::<()>(COUNT);
593
594 for _ in 0..COUNT {
595 s1.send(()).unwrap();
596 s2.send(()).unwrap();
597 }
598
599 let mut hits = [0usize; 2];
600 for _ in 0..COUNT {
601 select! {
602 recv(r1) -> _ => hits[0] += 1,
603 recv(r2) -> _ => hits[1] += 1,
604 }
605 }
606 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
607 }
608
609 #[test]
610 fn fairness_duplicates() {
611 #[cfg(miri)]
612 const COUNT: usize = 100;
613 #[cfg(not(miri))]
614 const COUNT: usize = 10_000;
615
616 let (s, r) = bounded::<()>(COUNT);
617
618 for _ in 0..COUNT {
619 s.send(()).unwrap();
620 }
621
622 let mut hits = [0usize; 5];
623 for _ in 0..COUNT {
624 select! {
625 recv(r) -> _ => hits[0] += 1,
626 recv(r) -> _ => hits[1] += 1,
627 recv(r) -> _ => hits[2] += 1,
628 recv(r) -> _ => hits[3] += 1,
629 recv(r) -> _ => hits[4] += 1,
630 }
631 }
632 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
633 }
634
635 #[test]
636 fn recv_in_send() {
637 let (s, _r) = bounded(1);
638 s.send(()).unwrap();
639
640 #[allow(unreachable_code)]
641 {
642 select! {
643 send(s, panic!()) -> _ => panic!(),
644 default => {}
645 }
646 }
647
648 let (s, r) = bounded(2);
649 s.send(()).unwrap();
650
651 select! {
652 send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
653 }
654 }
655
656 #[test]
657 fn channel_through_channel() {
658 #[cfg(miri)]
659 const COUNT: usize = 100;
660 #[cfg(not(miri))]
661 const COUNT: usize = 1000;
662
663 type T = Box<dyn Any + Send>;
664
665 let (s, r) = bounded::<T>(1);
666
667 scope(|scope| {
668 scope.spawn(move |_| {
669 let mut s = s;
670
671 for _ in 0..COUNT {
672 let (new_s, new_r) = bounded(1);
673 let new_r: T = Box::new(Some(new_r));
674
675 s.send(new_r).unwrap();
676 s = new_s;
677 }
678 });
679
680 scope.spawn(move |_| {
681 let mut r = r;
682
683 for _ in 0..COUNT {
684 r = r
685 .recv()
686 .unwrap()
687 .downcast_mut::<Option<Receiver<T>>>()
688 .unwrap()
689 .take()
690 .unwrap()
691 }
692 });
693 })
694 .unwrap();
695 }
696
697 #[test]
698 fn panic_on_drop() {
699 struct Msg1<'a>(&'a mut bool);
700 impl Drop for Msg1<'_> {
701 fn drop(&mut self) {
702 if *self.0 && !std::thread::panicking() {
703 panic!("double drop");
704 } else {
705 *self.0 = true;
706 }
707 }
708 }
709
710 struct Msg2<'a>(&'a mut bool);
711 impl Drop for Msg2<'_> {
712 fn drop(&mut self) {
713 if *self.0 {
714 panic!("double drop");
715 } else {
716 *self.0 = true;
717 panic!("first drop");
718 }
719 }
720 }
721
722 // normal
723 let (s, r) = bounded(2);
724 let (mut a, mut b) = (false, false);
725 s.send(Msg1(&mut a)).unwrap();
726 s.send(Msg1(&mut b)).unwrap();
727 drop(s);
728 drop(r);
729 assert!(a);
730 assert!(b);
731
732 // panic on drop
733 let (s, r) = bounded(2);
734 let (mut a, mut b) = (false, false);
735 s.send(Msg2(&mut a)).unwrap();
736 s.send(Msg2(&mut b)).unwrap();
737 drop(s);
738 let res = std::panic::catch_unwind(move || {
739 drop(r);
740 });
741 assert_eq!(
742 *res.unwrap_err().downcast_ref::<&str>().unwrap(),
743 "first drop"
744 );
745 assert!(a);
746 // Elements after the panicked element will leak.
747 assert!(!b);
748 }