1 //! Tests for the array channel flavor.
4 use std
::sync
::atomic
::AtomicUsize
;
5 use std
::sync
::atomic
::Ordering
;
7 use std
::time
::Duration
;
9 use crossbeam_channel
::{bounded, select, Receiver}
;
10 use crossbeam_channel
::{RecvError, RecvTimeoutError, TryRecvError}
;
11 use crossbeam_channel
::{SendError, SendTimeoutError, TrySendError}
;
12 use crossbeam_utils
::thread
::scope
;
13 use rand
::{thread_rng, Rng}
;
15 fn ms(ms
: u64) -> Duration
{
16 Duration
::from_millis(ms
)
21 let (s
, r
) = bounded(1);
23 assert_eq
!(r
.try_recv(), Ok(7));
26 assert_eq
!(r
.recv(), Ok(8));
28 assert_eq
!(r
.try_recv(), Err(TryRecvError
::Empty
));
29 assert_eq
!(r
.recv_timeout(ms(1000)), Err(RecvTimeoutError
::Timeout
));
35 let (s
, r
) = bounded
::<()>(i
);
36 assert_eq
!(s
.capacity(), Some(i
));
37 assert_eq
!(r
.capacity(), Some(i
));
43 let (s
, r
) = bounded(2);
45 assert_eq
!(s
.len(), 0);
46 assert
!(s
.is_empty());
47 assert
!(!s
.is_full());
48 assert_eq
!(r
.len(), 0);
49 assert
!(r
.is_empty());
50 assert
!(!r
.is_full());
54 assert_eq
!(s
.len(), 1);
55 assert
!(!s
.is_empty());
56 assert
!(!s
.is_full());
57 assert_eq
!(r
.len(), 1);
58 assert
!(!r
.is_empty());
59 assert
!(!r
.is_full());
63 assert_eq
!(s
.len(), 2);
64 assert
!(!s
.is_empty());
66 assert_eq
!(r
.len(), 2);
67 assert
!(!r
.is_empty());
72 assert_eq
!(s
.len(), 1);
73 assert
!(!s
.is_empty());
74 assert
!(!s
.is_full());
75 assert_eq
!(r
.len(), 1);
76 assert
!(!r
.is_empty());
77 assert
!(!r
.is_full());
82 let (s
, r
) = bounded(100);
85 scope
.spawn(move |_
| {
86 assert_eq
!(r
.try_recv(), Err(TryRecvError
::Empty
));
87 thread
::sleep(ms(1500));
88 assert_eq
!(r
.try_recv(), Ok(7));
89 thread
::sleep(ms(500));
90 assert_eq
!(r
.try_recv(), Err(TryRecvError
::Disconnected
));
92 scope
.spawn(move |_
| {
93 thread
::sleep(ms(1000));
102 let (s
, r
) = bounded(100);
105 scope
.spawn(move |_
| {
106 assert_eq
!(r
.recv(), Ok(7));
107 thread
::sleep(ms(1000));
108 assert_eq
!(r
.recv(), Ok(8));
109 thread
::sleep(ms(1000));
110 assert_eq
!(r
.recv(), Ok(9));
111 assert_eq
!(r
.recv(), Err(RecvError
));
113 scope
.spawn(move |_
| {
114 thread
::sleep(ms(1500));
125 let (s
, r
) = bounded
::<i32>(100);
128 scope
.spawn(move |_
| {
129 assert_eq
!(r
.recv_timeout(ms(1000)), Err(RecvTimeoutError
::Timeout
));
130 assert_eq
!(r
.recv_timeout(ms(1000)), Ok(7));
132 r
.recv_timeout(ms(1000)),
133 Err(RecvTimeoutError
::Disconnected
)
136 scope
.spawn(move |_
| {
137 thread
::sleep(ms(1500));
146 let (s
, r
) = bounded(1);
149 scope
.spawn(move |_
| {
150 assert_eq
!(s
.try_send(1), Ok(()));
151 assert_eq
!(s
.try_send(2), Err(TrySendError
::Full(2)));
152 thread
::sleep(ms(1500));
153 assert_eq
!(s
.try_send(3), Ok(()));
154 thread
::sleep(ms(500));
155 assert_eq
!(s
.try_send(4), Err(TrySendError
::Disconnected(4)));
157 scope
.spawn(move |_
| {
158 thread
::sleep(ms(1000));
159 assert_eq
!(r
.try_recv(), Ok(1));
160 assert_eq
!(r
.try_recv(), Err(TryRecvError
::Empty
));
161 assert_eq
!(r
.recv(), Ok(3));
169 let (s
, r
) = bounded(1);
174 thread
::sleep(ms(1000));
176 thread
::sleep(ms(1000));
178 thread
::sleep(ms(1000));
182 thread
::sleep(ms(1500));
183 assert_eq
!(r
.recv(), Ok(7));
184 assert_eq
!(r
.recv(), Ok(8));
185 assert_eq
!(r
.recv(), Ok(9));
193 let (s
, r
) = bounded(2);
196 scope
.spawn(move |_
| {
197 assert_eq
!(s
.send_timeout(1, ms(1000)), Ok(()));
198 assert_eq
!(s
.send_timeout(2, ms(1000)), Ok(()));
200 s
.send_timeout(3, ms(500)),
201 Err(SendTimeoutError
::Timeout(3))
203 thread
::sleep(ms(1000));
204 assert_eq
!(s
.send_timeout(4, ms(1000)), Ok(()));
205 thread
::sleep(ms(1000));
206 assert_eq
!(s
.send(5), Err(SendError(5)));
208 scope
.spawn(move |_
| {
209 thread
::sleep(ms(1000));
210 assert_eq
!(r
.recv(), Ok(1));
211 thread
::sleep(ms(1000));
212 assert_eq
!(r
.recv(), Ok(2));
213 assert_eq
!(r
.recv(), Ok(4));
220 fn send_after_disconnect() {
221 let (s
, r
) = bounded(100);
229 assert_eq
!(s
.send(4), Err(SendError(4)));
230 assert_eq
!(s
.try_send(5), Err(TrySendError
::Disconnected(5)));
232 s
.send_timeout(6, ms(500)),
233 Err(SendTimeoutError
::Disconnected(6))
238 fn recv_after_disconnect() {
239 let (s
, r
) = bounded(100);
247 assert_eq
!(r
.recv(), Ok(1));
248 assert_eq
!(r
.recv(), Ok(2));
249 assert_eq
!(r
.recv(), Ok(3));
250 assert_eq
!(r
.recv(), Err(RecvError
));
256 const COUNT
: usize = 50;
258 const COUNT
: usize = 25_000;
260 const CAP
: usize = 50;
262 const CAP
: usize = 1000;
264 let (s
, r
) = bounded(CAP
);
266 assert_eq
!(s
.len(), 0);
267 assert_eq
!(r
.len(), 0);
269 for _
in 0..CAP
/ 10 {
272 assert_eq
!(s
.len(), i
+ 1);
277 assert_eq
!(r
.len(), 50 - i
- 1);
281 assert_eq
!(s
.len(), 0);
282 assert_eq
!(r
.len(), 0);
286 assert_eq
!(s
.len(), i
+ 1);
293 assert_eq
!(s
.len(), 0);
294 assert_eq
!(r
.len(), 0);
299 assert_eq
!(r
.recv(), Ok(i
));
315 assert_eq
!(s
.len(), 0);
316 assert_eq
!(r
.len(), 0);
320 fn disconnect_wakes_sender() {
321 let (s
, r
) = bounded(1);
324 scope
.spawn(move |_
| {
325 assert_eq
!(s
.send(()), Ok(()));
326 assert_eq
!(s
.send(()), Err(SendError(())));
328 scope
.spawn(move |_
| {
329 thread
::sleep(ms(1000));
337 fn disconnect_wakes_receiver() {
338 let (s
, r
) = bounded
::<()>(1);
341 scope
.spawn(move |_
| {
342 assert_eq
!(r
.recv(), Err(RecvError
));
344 scope
.spawn(move |_
| {
345 thread
::sleep(ms(1000));
355 const COUNT
: usize = 100;
357 const COUNT
: usize = 100_000;
359 let (s
, r
) = bounded(3);
362 scope
.spawn(move |_
| {
364 assert_eq
!(r
.recv(), Ok(i
));
366 assert_eq
!(r
.recv(), Err(RecvError
));
368 scope
.spawn(move |_
| {
380 const COUNT
: usize = 100;
382 const COUNT
: usize = 25_000;
383 const THREADS
: usize = 4;
385 let (s
, r
) = bounded
::<usize>(3);
386 let v
= (0..COUNT
).map(|_
| AtomicUsize
::new(0)).collect
::<Vec
<_
>>();
389 for _
in 0..THREADS
{
392 let n
= r
.recv().unwrap();
393 v
[n
].fetch_add(1, Ordering
::SeqCst
);
397 for _
in 0..THREADS
{
408 assert_eq
!(c
.load(Ordering
::SeqCst
), THREADS
);
413 fn stress_oneshot() {
415 const COUNT
: usize = 100;
417 const COUNT
: usize = 10_000;
420 let (s
, r
) = bounded(1);
423 scope
.spawn(|_
| r
.recv().unwrap());
424 scope
.spawn(|_
| s
.send(0).unwrap());
433 const COUNT
: usize = 100;
435 const COUNT
: usize = 100_000;
437 let (request_s
, request_r
) = bounded(1);
438 let (response_s
, response_r
) = bounded(1);
441 scope
.spawn(move |_
| {
444 for x
in response_r
.try_iter() {
450 request_s
.send(()).unwrap();
454 for _
in request_r
.iter() {
455 if response_s
.send(1).is_err() {
464 fn stress_timeout_two_threads() {
465 const COUNT
: usize = 100;
467 let (s
, r
) = bounded(2);
473 thread
::sleep(ms(50));
476 if let Ok(()) = s
.send_timeout(i
, ms(10)) {
486 thread
::sleep(ms(50));
489 if let Ok(x
) = r
.recv_timeout(ms(10)) {
503 const RUNS
: usize = 10;
505 const RUNS
: usize = 100;
507 const STEPS
: usize = 100;
509 const STEPS
: usize = 10_000;
511 static DROPS
: AtomicUsize
= AtomicUsize
::new(0);
513 #[derive(Debug, PartialEq)]
516 impl Drop
for DropCounter
{
518 DROPS
.fetch_add(1, Ordering
::SeqCst
);
522 let mut rng
= thread_rng();
525 let steps
= rng
.gen_range(0..STEPS
);
526 let additional
= rng
.gen_range(0..50);
528 DROPS
.store(0, Ordering
::SeqCst
);
529 let (s
, r
) = bounded
::<DropCounter
>(50);
536 std
::thread
::yield_now(); // https://github.com/rust-lang/miri/issues/1388
542 s
.send(DropCounter
).unwrap();
544 std
::thread
::yield_now(); // https://github.com/rust-lang/miri/issues/1388
550 for _
in 0..additional
{
551 s
.send(DropCounter
).unwrap();
554 assert_eq
!(DROPS
.load(Ordering
::SeqCst
), steps
);
557 assert_eq
!(DROPS
.load(Ordering
::SeqCst
), steps
+ additional
);
564 const COUNT
: usize = 50;
566 const COUNT
: usize = 25_000;
567 const THREADS
: usize = 4;
569 let (s
, r
) = bounded(THREADS
);
572 for _
in 0..THREADS
{
576 r
.try_recv().unwrap();
587 const COUNT
: usize = 100;
589 const COUNT
: usize = 10_000;
591 let (s1
, r1
) = bounded
::<()>(COUNT
);
592 let (s2
, r2
) = bounded
::<()>(COUNT
);
595 s1
.send(()).unwrap();
596 s2
.send(()).unwrap();
599 let mut hits
= [0usize
; 2];
602 recv(r1
) -> _
=> hits
[0] += 1,
603 recv(r2
) -> _
=> hits
[1] += 1,
606 assert
!(hits
.iter().all(|x
| *x
>= COUNT
/ hits
.len() / 2));
610 fn fairness_duplicates() {
612 const COUNT
: usize = 100;
614 const COUNT
: usize = 10_000;
616 let (s
, r
) = bounded
::<()>(COUNT
);
622 let mut hits
= [0usize
; 5];
625 recv(r
) -> _
=> hits
[0] += 1,
626 recv(r
) -> _
=> hits
[1] += 1,
627 recv(r
) -> _
=> hits
[2] += 1,
628 recv(r
) -> _
=> hits
[3] += 1,
629 recv(r
) -> _
=> hits
[4] += 1,
632 assert
!(hits
.iter().all(|x
| *x
>= COUNT
/ hits
.len() / 2));
637 let (s
, _r
) = bounded(1);
640 #[allow(unreachable_code)]
643 send(s
, panic
!()) -> _
=> panic
!(),
648 let (s
, r
) = bounded(2);
652 send(s
, assert_eq
!(r
.recv(), Ok(()))) -> _
=> {}
657 fn channel_through_channel() {
659 const COUNT
: usize = 100;
661 const COUNT
: usize = 1000;
663 type T
= Box
<dyn Any
+ Send
>;
665 let (s
, r
) = bounded
::<T
>(1);
668 scope
.spawn(move |_
| {
672 let (new_s
, new_r
) = bounded(1);
673 let new_r
: T
= Box
::new(Some(new_r
));
675 s
.send(new_r
).unwrap();
680 scope
.spawn(move |_
| {
687 .downcast_mut
::<Option
<Receiver
<T
>>>()
699 struct Msg1
<'a
>(&'a
mut bool
);
700 impl Drop
for Msg1
<'_
> {
702 if *self.0 && !std
::thread
::panicking() {
703 panic
!("double drop");
710 struct Msg2
<'a
>(&'a
mut bool
);
711 impl Drop
for Msg2
<'_
> {
714 panic
!("double drop");
717 panic
!("first drop");
723 let (s
, r
) = bounded(2);
724 let (mut a
, mut b
) = (false, false);
725 s
.send(Msg1(&mut a
)).unwrap();
726 s
.send(Msg1(&mut b
)).unwrap();
733 let (s
, r
) = bounded(2);
734 let (mut a
, mut b
) = (false, false);
735 s
.send(Msg2(&mut a
)).unwrap();
736 s
.send(Msg2(&mut b
)).unwrap();
738 let res
= std
::panic
::catch_unwind(move || {
742 *res
.unwrap_err().downcast_ref
::<&str>().unwrap(),
746 // Elements after the panicked element will leak.