]> git.proxmox.com Git - rustc.git/blame - vendor/crossbeam-channel/tests/array.rs
New upstream version 1.43.0+dfsg1
[rustc.git] / vendor / crossbeam-channel / tests / array.rs
CommitLineData
dfeec247
XL
1//! Tests for the array channel flavor.
2
3#[macro_use]
4extern crate crossbeam_channel;
5extern crate crossbeam_utils;
6extern crate rand;
7
74b04a01 8use std::any::Any;
dfeec247
XL
9use std::sync::atomic::AtomicUsize;
10use std::sync::atomic::Ordering;
11use std::thread;
12use std::time::Duration;
13
74b04a01 14use crossbeam_channel::{bounded, Receiver};
dfeec247
XL
15use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
16use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
17use crossbeam_utils::thread::scope;
18use rand::{thread_rng, Rng};
19
20fn ms(ms: u64) -> Duration {
21 Duration::from_millis(ms)
22}
23
24#[test]
25fn smoke() {
26 let (s, r) = bounded(1);
27 s.send(7).unwrap();
28 assert_eq!(r.try_recv(), Ok(7));
29
30 s.send(8).unwrap();
31 assert_eq!(r.recv(), Ok(8));
32
33 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
34 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
35}
36
37#[test]
38fn capacity() {
39 for i in 1..10 {
40 let (s, r) = bounded::<()>(i);
41 assert_eq!(s.capacity(), Some(i));
42 assert_eq!(r.capacity(), Some(i));
43 }
44}
45
46#[test]
47fn len_empty_full() {
48 let (s, r) = bounded(2);
49
50 assert_eq!(s.len(), 0);
51 assert_eq!(s.is_empty(), true);
52 assert_eq!(s.is_full(), false);
53 assert_eq!(r.len(), 0);
54 assert_eq!(r.is_empty(), true);
55 assert_eq!(r.is_full(), false);
56
57 s.send(()).unwrap();
58
59 assert_eq!(s.len(), 1);
60 assert_eq!(s.is_empty(), false);
61 assert_eq!(s.is_full(), false);
62 assert_eq!(r.len(), 1);
63 assert_eq!(r.is_empty(), false);
64 assert_eq!(r.is_full(), false);
65
66 s.send(()).unwrap();
67
68 assert_eq!(s.len(), 2);
69 assert_eq!(s.is_empty(), false);
70 assert_eq!(s.is_full(), true);
71 assert_eq!(r.len(), 2);
72 assert_eq!(r.is_empty(), false);
73 assert_eq!(r.is_full(), true);
74
75 r.recv().unwrap();
76
77 assert_eq!(s.len(), 1);
78 assert_eq!(s.is_empty(), false);
79 assert_eq!(s.is_full(), false);
80 assert_eq!(r.len(), 1);
81 assert_eq!(r.is_empty(), false);
82 assert_eq!(r.is_full(), false);
83}
84
85#[test]
86fn try_recv() {
87 let (s, r) = bounded(100);
88
89 scope(|scope| {
90 scope.spawn(move |_| {
91 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
92 thread::sleep(ms(1500));
93 assert_eq!(r.try_recv(), Ok(7));
94 thread::sleep(ms(500));
95 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
96 });
97 scope.spawn(move |_| {
98 thread::sleep(ms(1000));
99 s.send(7).unwrap();
100 });
74b04a01
XL
101 })
102 .unwrap();
dfeec247
XL
103}
104
105#[test]
106fn recv() {
107 let (s, r) = bounded(100);
108
109 scope(|scope| {
110 scope.spawn(move |_| {
111 assert_eq!(r.recv(), Ok(7));
112 thread::sleep(ms(1000));
113 assert_eq!(r.recv(), Ok(8));
114 thread::sleep(ms(1000));
115 assert_eq!(r.recv(), Ok(9));
116 assert_eq!(r.recv(), Err(RecvError));
117 });
118 scope.spawn(move |_| {
119 thread::sleep(ms(1500));
120 s.send(7).unwrap();
121 s.send(8).unwrap();
122 s.send(9).unwrap();
123 });
74b04a01
XL
124 })
125 .unwrap();
dfeec247
XL
126}
127
128#[test]
129fn recv_timeout() {
130 let (s, r) = bounded::<i32>(100);
131
132 scope(|scope| {
133 scope.spawn(move |_| {
134 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
135 assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
136 assert_eq!(
137 r.recv_timeout(ms(1000)),
138 Err(RecvTimeoutError::Disconnected)
139 );
140 });
141 scope.spawn(move |_| {
142 thread::sleep(ms(1500));
143 s.send(7).unwrap();
144 });
74b04a01
XL
145 })
146 .unwrap();
dfeec247
XL
147}
148
149#[test]
150fn try_send() {
151 let (s, r) = bounded(1);
152
153 scope(|scope| {
154 scope.spawn(move |_| {
155 assert_eq!(s.try_send(1), Ok(()));
156 assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
157 thread::sleep(ms(1500));
158 assert_eq!(s.try_send(3), Ok(()));
159 thread::sleep(ms(500));
160 assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4)));
161 });
162 scope.spawn(move |_| {
163 thread::sleep(ms(1000));
164 assert_eq!(r.try_recv(), Ok(1));
165 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
166 assert_eq!(r.recv(), Ok(3));
167 });
74b04a01
XL
168 })
169 .unwrap();
dfeec247
XL
170}
171
172#[test]
173fn send() {
174 let (s, r) = bounded(1);
175
176 scope(|scope| {
177 scope.spawn(|_| {
178 s.send(7).unwrap();
179 thread::sleep(ms(1000));
180 s.send(8).unwrap();
181 thread::sleep(ms(1000));
182 s.send(9).unwrap();
183 thread::sleep(ms(1000));
184 s.send(10).unwrap();
185 });
186 scope.spawn(|_| {
187 thread::sleep(ms(1500));
188 assert_eq!(r.recv(), Ok(7));
189 assert_eq!(r.recv(), Ok(8));
190 assert_eq!(r.recv(), Ok(9));
191 });
74b04a01
XL
192 })
193 .unwrap();
dfeec247
XL
194}
195
196#[test]
197fn send_timeout() {
198 let (s, r) = bounded(2);
199
200 scope(|scope| {
201 scope.spawn(move |_| {
202 assert_eq!(s.send_timeout(1, ms(1000)), Ok(()));
203 assert_eq!(s.send_timeout(2, ms(1000)), Ok(()));
204 assert_eq!(
205 s.send_timeout(3, ms(500)),
206 Err(SendTimeoutError::Timeout(3))
207 );
208 thread::sleep(ms(1000));
209 assert_eq!(s.send_timeout(4, ms(1000)), Ok(()));
210 thread::sleep(ms(1000));
211 assert_eq!(s.send(5), Err(SendError(5)));
212 });
213 scope.spawn(move |_| {
214 thread::sleep(ms(1000));
215 assert_eq!(r.recv(), Ok(1));
216 thread::sleep(ms(1000));
217 assert_eq!(r.recv(), Ok(2));
218 assert_eq!(r.recv(), Ok(4));
219 });
74b04a01
XL
220 })
221 .unwrap();
dfeec247
XL
222}
223
224#[test]
225fn send_after_disconnect() {
226 let (s, r) = bounded(100);
227
228 s.send(1).unwrap();
229 s.send(2).unwrap();
230 s.send(3).unwrap();
231
232 drop(r);
233
234 assert_eq!(s.send(4), Err(SendError(4)));
235 assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
236 assert_eq!(
237 s.send_timeout(6, ms(500)),
238 Err(SendTimeoutError::Disconnected(6))
239 );
240}
241
242#[test]
243fn recv_after_disconnect() {
244 let (s, r) = bounded(100);
245
246 s.send(1).unwrap();
247 s.send(2).unwrap();
248 s.send(3).unwrap();
249
250 drop(s);
251
252 assert_eq!(r.recv(), Ok(1));
253 assert_eq!(r.recv(), Ok(2));
254 assert_eq!(r.recv(), Ok(3));
255 assert_eq!(r.recv(), Err(RecvError));
256}
257
258#[test]
259fn len() {
260 const COUNT: usize = 25_000;
261 const CAP: usize = 1000;
262
263 let (s, r) = bounded(CAP);
264
265 assert_eq!(s.len(), 0);
266 assert_eq!(r.len(), 0);
267
268 for _ in 0..CAP / 10 {
269 for i in 0..50 {
270 s.send(i).unwrap();
271 assert_eq!(s.len(), i + 1);
272 }
273
274 for i in 0..50 {
275 r.recv().unwrap();
276 assert_eq!(r.len(), 50 - i - 1);
277 }
278 }
279
280 assert_eq!(s.len(), 0);
281 assert_eq!(r.len(), 0);
282
283 for i in 0..CAP {
284 s.send(i).unwrap();
285 assert_eq!(s.len(), i + 1);
286 }
287
288 for _ in 0..CAP {
289 r.recv().unwrap();
290 }
291
292 assert_eq!(s.len(), 0);
293 assert_eq!(r.len(), 0);
294
295 scope(|scope| {
296 scope.spawn(|_| {
297 for i in 0..COUNT {
298 assert_eq!(r.recv(), Ok(i));
299 let len = r.len();
300 assert!(len <= CAP);
301 }
302 });
303
304 scope.spawn(|_| {
305 for i in 0..COUNT {
306 s.send(i).unwrap();
307 let len = s.len();
308 assert!(len <= CAP);
309 }
310 });
74b04a01
XL
311 })
312 .unwrap();
dfeec247
XL
313
314 assert_eq!(s.len(), 0);
315 assert_eq!(r.len(), 0);
316}
317
318#[test]
319fn disconnect_wakes_sender() {
320 let (s, r) = bounded(1);
321
322 scope(|scope| {
323 scope.spawn(move |_| {
324 assert_eq!(s.send(()), Ok(()));
325 assert_eq!(s.send(()), Err(SendError(())));
326 });
327 scope.spawn(move |_| {
328 thread::sleep(ms(1000));
329 drop(r);
330 });
74b04a01
XL
331 })
332 .unwrap();
dfeec247
XL
333}
334
335#[test]
336fn disconnect_wakes_receiver() {
337 let (s, r) = bounded::<()>(1);
338
339 scope(|scope| {
340 scope.spawn(move |_| {
341 assert_eq!(r.recv(), Err(RecvError));
342 });
343 scope.spawn(move |_| {
344 thread::sleep(ms(1000));
345 drop(s);
346 });
74b04a01
XL
347 })
348 .unwrap();
dfeec247
XL
349}
350
351#[test]
352fn spsc() {
353 const COUNT: usize = 100_000;
354
355 let (s, r) = bounded(3);
356
357 scope(|scope| {
358 scope.spawn(move |_| {
359 for i in 0..COUNT {
360 assert_eq!(r.recv(), Ok(i));
361 }
362 assert_eq!(r.recv(), Err(RecvError));
363 });
364 scope.spawn(move |_| {
365 for i in 0..COUNT {
366 s.send(i).unwrap();
367 }
368 });
74b04a01
XL
369 })
370 .unwrap();
dfeec247
XL
371}
372
373#[test]
374fn mpmc() {
375 const COUNT: usize = 25_000;
376 const THREADS: usize = 4;
377
378 let (s, r) = bounded::<usize>(3);
379 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
380
381 scope(|scope| {
382 for _ in 0..THREADS {
383 scope.spawn(|_| {
384 for _ in 0..COUNT {
385 let n = r.recv().unwrap();
386 v[n].fetch_add(1, Ordering::SeqCst);
387 }
388 });
389 }
390 for _ in 0..THREADS {
391 scope.spawn(|_| {
392 for i in 0..COUNT {
393 s.send(i).unwrap();
394 }
395 });
396 }
74b04a01
XL
397 })
398 .unwrap();
dfeec247
XL
399
400 for c in v {
401 assert_eq!(c.load(Ordering::SeqCst), THREADS);
402 }
403}
404
405#[test]
406fn stress_oneshot() {
407 const COUNT: usize = 10_000;
408
409 for _ in 0..COUNT {
410 let (s, r) = bounded(1);
411
412 scope(|scope| {
413 scope.spawn(|_| r.recv().unwrap());
414 scope.spawn(|_| s.send(0).unwrap());
74b04a01
XL
415 })
416 .unwrap();
dfeec247
XL
417 }
418}
419
420#[test]
421fn stress_iter() {
422 const COUNT: usize = 100_000;
423
424 let (request_s, request_r) = bounded(1);
425 let (response_s, response_r) = bounded(1);
426
427 scope(|scope| {
428 scope.spawn(move |_| {
429 let mut count = 0;
430 loop {
431 for x in response_r.try_iter() {
432 count += x;
433 if count == COUNT {
434 return;
435 }
436 }
437 request_s.send(()).unwrap();
438 }
439 });
440
441 for _ in request_r.iter() {
442 if response_s.send(1).is_err() {
443 break;
444 }
445 }
74b04a01
XL
446 })
447 .unwrap();
dfeec247
XL
448}
449
450#[test]
451fn stress_timeout_two_threads() {
452 const COUNT: usize = 100;
453
454 let (s, r) = bounded(2);
455
456 scope(|scope| {
457 scope.spawn(|_| {
458 for i in 0..COUNT {
459 if i % 2 == 0 {
460 thread::sleep(ms(50));
461 }
462 loop {
463 if let Ok(()) = s.send_timeout(i, ms(10)) {
464 break;
465 }
466 }
467 }
468 });
469
470 scope.spawn(|_| {
471 for i in 0..COUNT {
472 if i % 2 == 0 {
473 thread::sleep(ms(50));
474 }
475 loop {
476 if let Ok(x) = r.recv_timeout(ms(10)) {
477 assert_eq!(x, i);
478 break;
479 }
480 }
481 }
482 });
74b04a01
XL
483 })
484 .unwrap();
dfeec247
XL
485}
486
487#[test]
488fn drops() {
489 const RUNS: usize = 100;
490
491 static DROPS: AtomicUsize = AtomicUsize::new(0);
492
493 #[derive(Debug, PartialEq)]
494 struct DropCounter;
495
496 impl Drop for DropCounter {
497 fn drop(&mut self) {
498 DROPS.fetch_add(1, Ordering::SeqCst);
499 }
500 }
501
502 let mut rng = thread_rng();
503
504 for _ in 0..RUNS {
505 let steps = rng.gen_range(0, 10_000);
506 let additional = rng.gen_range(0, 50);
507
508 DROPS.store(0, Ordering::SeqCst);
509 let (s, r) = bounded::<DropCounter>(50);
510
511 scope(|scope| {
512 scope.spawn(|_| {
513 for _ in 0..steps {
514 r.recv().unwrap();
515 }
516 });
517
518 scope.spawn(|_| {
519 for _ in 0..steps {
520 s.send(DropCounter).unwrap();
521 }
522 });
74b04a01
XL
523 })
524 .unwrap();
dfeec247
XL
525
526 for _ in 0..additional {
527 s.send(DropCounter).unwrap();
528 }
529
530 assert_eq!(DROPS.load(Ordering::SeqCst), steps);
531 drop(s);
532 drop(r);
533 assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
534 }
535}
536
537#[test]
538fn linearizable() {
539 const COUNT: usize = 25_000;
540 const THREADS: usize = 4;
541
542 let (s, r) = bounded(THREADS);
543
544 scope(|scope| {
545 for _ in 0..THREADS {
546 scope.spawn(|_| {
547 for _ in 0..COUNT {
548 s.send(0).unwrap();
549 r.try_recv().unwrap();
550 }
551 });
552 }
74b04a01
XL
553 })
554 .unwrap();
dfeec247
XL
555}
556
557#[test]
558fn fairness() {
559 const COUNT: usize = 10_000;
560
561 let (s1, r1) = bounded::<()>(COUNT);
562 let (s2, r2) = bounded::<()>(COUNT);
563
564 for _ in 0..COUNT {
565 s1.send(()).unwrap();
566 s2.send(()).unwrap();
567 }
568
569 let mut hits = [0usize; 2];
570 for _ in 0..COUNT {
571 select! {
572 recv(r1) -> _ => hits[0] += 1,
573 recv(r2) -> _ => hits[1] += 1,
574 }
575 }
576 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
577}
578
579#[test]
580fn fairness_duplicates() {
581 const COUNT: usize = 10_000;
582
583 let (s, r) = bounded::<()>(COUNT);
584
585 for _ in 0..COUNT {
586 s.send(()).unwrap();
587 }
588
589 let mut hits = [0usize; 5];
590 for _ in 0..COUNT {
591 select! {
592 recv(r) -> _ => hits[0] += 1,
593 recv(r) -> _ => hits[1] += 1,
594 recv(r) -> _ => hits[2] += 1,
595 recv(r) -> _ => hits[3] += 1,
596 recv(r) -> _ => hits[4] += 1,
597 }
598 }
599 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
600}
601
602#[test]
603fn recv_in_send() {
604 let (s, _r) = bounded(1);
605 s.send(()).unwrap();
606
607 #[allow(unreachable_code)]
608 {
609 select! {
610 send(s, panic!()) -> _ => panic!(),
611 default => {}
612 }
613 }
614
615 let (s, r) = bounded(2);
616 s.send(()).unwrap();
617
618 select! {
619 send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
620 }
621}
74b04a01
XL
622
623#[test]
624fn channel_through_channel() {
625 const COUNT: usize = 1000;
626
627 type T = Box<dyn Any + Send>;
628
629 let (s, r) = bounded::<T>(1);
630
631 scope(|scope| {
632 scope.spawn(move |_| {
633 let mut s = s;
634
635 for _ in 0..COUNT {
636 let (new_s, new_r) = bounded(1);
637 let new_r: T = Box::new(Some(new_r));
638
639 s.send(new_r).unwrap();
640 s = new_s;
641 }
642 });
643
644 scope.spawn(move |_| {
645 let mut r = r;
646
647 for _ in 0..COUNT {
648 r = r
649 .recv()
650 .unwrap()
651 .downcast_mut::<Option<Receiver<T>>>()
652 .unwrap()
653 .take()
654 .unwrap()
655 }
656 });
657 })
658 .unwrap();
659}