]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-channel/tests/select.rs
New upstream version 1.65.0+dfsg1
[rustc.git] / vendor / crossbeam-channel / tests / select.rs
1 //! Tests for channel selection using the `Select` struct.
2
3 #![allow(clippy::drop_copy)]
4
5 use std::any::Any;
6 use std::cell::Cell;
7 use std::thread;
8 use std::time::{Duration, Instant};
9
10 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
11 use crossbeam_utils::thread::scope;
12
13 fn ms(ms: u64) -> Duration {
14 Duration::from_millis(ms)
15 }
16
17 #[test]
18 fn smoke1() {
19 let (s1, r1) = unbounded::<usize>();
20 let (s2, r2) = unbounded::<usize>();
21
22 s1.send(1).unwrap();
23
24 let mut sel = Select::new();
25 let oper1 = sel.recv(&r1);
26 let oper2 = sel.recv(&r2);
27 let oper = sel.select();
28 match oper.index() {
29 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
30 i if i == oper2 => panic!(),
31 _ => unreachable!(),
32 }
33
34 s2.send(2).unwrap();
35
36 let mut sel = Select::new();
37 let oper1 = sel.recv(&r1);
38 let oper2 = sel.recv(&r2);
39 let oper = sel.select();
40 match oper.index() {
41 i if i == oper1 => panic!(),
42 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
43 _ => unreachable!(),
44 }
45 }
46
47 #[test]
48 fn smoke2() {
49 let (_s1, r1) = unbounded::<i32>();
50 let (_s2, r2) = unbounded::<i32>();
51 let (_s3, r3) = unbounded::<i32>();
52 let (_s4, r4) = unbounded::<i32>();
53 let (s5, r5) = unbounded::<i32>();
54
55 s5.send(5).unwrap();
56
57 let mut sel = Select::new();
58 let oper1 = sel.recv(&r1);
59 let oper2 = sel.recv(&r2);
60 let oper3 = sel.recv(&r3);
61 let oper4 = sel.recv(&r4);
62 let oper5 = sel.recv(&r5);
63 let oper = sel.select();
64 match oper.index() {
65 i if i == oper1 => panic!(),
66 i if i == oper2 => panic!(),
67 i if i == oper3 => panic!(),
68 i if i == oper4 => panic!(),
69 i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)),
70 _ => unreachable!(),
71 }
72 }
73
74 #[test]
75 fn disconnected() {
76 let (s1, r1) = unbounded::<i32>();
77 let (s2, r2) = unbounded::<i32>();
78
79 scope(|scope| {
80 scope.spawn(|_| {
81 drop(s1);
82 thread::sleep(ms(500));
83 s2.send(5).unwrap();
84 });
85
86 let mut sel = Select::new();
87 let oper1 = sel.recv(&r1);
88 let oper2 = sel.recv(&r2);
89 let oper = sel.select_timeout(ms(1000));
90 match oper {
91 Err(_) => panic!(),
92 Ok(oper) => match oper.index() {
93 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
94 i if i == oper2 => panic!(),
95 _ => unreachable!(),
96 },
97 }
98
99 r2.recv().unwrap();
100 })
101 .unwrap();
102
103 let mut sel = Select::new();
104 let oper1 = sel.recv(&r1);
105 let oper2 = sel.recv(&r2);
106 let oper = sel.select_timeout(ms(1000));
107 match oper {
108 Err(_) => panic!(),
109 Ok(oper) => match oper.index() {
110 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
111 i if i == oper2 => panic!(),
112 _ => unreachable!(),
113 },
114 }
115
116 scope(|scope| {
117 scope.spawn(|_| {
118 thread::sleep(ms(500));
119 drop(s2);
120 });
121
122 let mut sel = Select::new();
123 let oper1 = sel.recv(&r2);
124 let oper = sel.select_timeout(ms(1000));
125 match oper {
126 Err(_) => panic!(),
127 Ok(oper) => match oper.index() {
128 i if i == oper1 => assert!(oper.recv(&r2).is_err()),
129 _ => unreachable!(),
130 },
131 }
132 })
133 .unwrap();
134 }
135
136 #[test]
137 fn default() {
138 let (s1, r1) = unbounded::<i32>();
139 let (s2, r2) = unbounded::<i32>();
140
141 let mut sel = Select::new();
142 let _oper1 = sel.recv(&r1);
143 let _oper2 = sel.recv(&r2);
144 let oper = sel.try_select();
145 match oper {
146 Err(_) => {}
147 Ok(_) => panic!(),
148 }
149
150 drop(s1);
151
152 let mut sel = Select::new();
153 let oper1 = sel.recv(&r1);
154 let oper2 = sel.recv(&r2);
155 let oper = sel.try_select();
156 match oper {
157 Err(_) => panic!(),
158 Ok(oper) => match oper.index() {
159 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
160 i if i == oper2 => panic!(),
161 _ => unreachable!(),
162 },
163 }
164
165 s2.send(2).unwrap();
166
167 let mut sel = Select::new();
168 let oper1 = sel.recv(&r2);
169 let oper = sel.try_select();
170 match oper {
171 Err(_) => panic!(),
172 Ok(oper) => match oper.index() {
173 i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)),
174 _ => unreachable!(),
175 },
176 }
177
178 let mut sel = Select::new();
179 let _oper1 = sel.recv(&r2);
180 let oper = sel.try_select();
181 match oper {
182 Err(_) => {}
183 Ok(_) => panic!(),
184 }
185
186 let mut sel = Select::new();
187 let oper = sel.try_select();
188 match oper {
189 Err(_) => {}
190 Ok(_) => panic!(),
191 }
192 }
193
194 #[test]
195 fn timeout() {
196 let (_s1, r1) = unbounded::<i32>();
197 let (s2, r2) = unbounded::<i32>();
198
199 scope(|scope| {
200 scope.spawn(|_| {
201 thread::sleep(ms(1500));
202 s2.send(2).unwrap();
203 });
204
205 let mut sel = Select::new();
206 let oper1 = sel.recv(&r1);
207 let oper2 = sel.recv(&r2);
208 let oper = sel.select_timeout(ms(1000));
209 match oper {
210 Err(_) => {}
211 Ok(oper) => match oper.index() {
212 i if i == oper1 => panic!(),
213 i if i == oper2 => panic!(),
214 _ => unreachable!(),
215 },
216 }
217
218 let mut sel = Select::new();
219 let oper1 = sel.recv(&r1);
220 let oper2 = sel.recv(&r2);
221 let oper = sel.select_timeout(ms(1000));
222 match oper {
223 Err(_) => panic!(),
224 Ok(oper) => match oper.index() {
225 i if i == oper1 => panic!(),
226 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
227 _ => unreachable!(),
228 },
229 }
230 })
231 .unwrap();
232
233 scope(|scope| {
234 let (s, r) = unbounded::<i32>();
235
236 scope.spawn(move |_| {
237 thread::sleep(ms(500));
238 drop(s);
239 });
240
241 let mut sel = Select::new();
242 let oper = sel.select_timeout(ms(1000));
243 match oper {
244 Err(_) => {
245 let mut sel = Select::new();
246 let oper1 = sel.recv(&r);
247 let oper = sel.try_select();
248 match oper {
249 Err(_) => panic!(),
250 Ok(oper) => match oper.index() {
251 i if i == oper1 => assert!(oper.recv(&r).is_err()),
252 _ => unreachable!(),
253 },
254 }
255 }
256 Ok(_) => unreachable!(),
257 }
258 })
259 .unwrap();
260 }
261
262 #[test]
263 fn default_when_disconnected() {
264 let (_, r) = unbounded::<i32>();
265
266 let mut sel = Select::new();
267 let oper1 = sel.recv(&r);
268 let oper = sel.try_select();
269 match oper {
270 Err(_) => panic!(),
271 Ok(oper) => match oper.index() {
272 i if i == oper1 => assert!(oper.recv(&r).is_err()),
273 _ => unreachable!(),
274 },
275 }
276
277 let (_, r) = unbounded::<i32>();
278
279 let mut sel = Select::new();
280 let oper1 = sel.recv(&r);
281 let oper = sel.select_timeout(ms(1000));
282 match oper {
283 Err(_) => panic!(),
284 Ok(oper) => match oper.index() {
285 i if i == oper1 => assert!(oper.recv(&r).is_err()),
286 _ => unreachable!(),
287 },
288 }
289
290 let (s, _) = bounded::<i32>(0);
291
292 let mut sel = Select::new();
293 let oper1 = sel.send(&s);
294 let oper = sel.try_select();
295 match oper {
296 Err(_) => panic!(),
297 Ok(oper) => match oper.index() {
298 i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
299 _ => unreachable!(),
300 },
301 }
302
303 let (s, _) = bounded::<i32>(0);
304
305 let mut sel = Select::new();
306 let oper1 = sel.send(&s);
307 let oper = sel.select_timeout(ms(1000));
308 match oper {
309 Err(_) => panic!(),
310 Ok(oper) => match oper.index() {
311 i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
312 _ => unreachable!(),
313 },
314 }
315 }
316
317 #[test]
318 fn default_only() {
319 let start = Instant::now();
320
321 let mut sel = Select::new();
322 let oper = sel.try_select();
323 assert!(oper.is_err());
324 let now = Instant::now();
325 assert!(now - start <= ms(50));
326
327 let start = Instant::now();
328 let mut sel = Select::new();
329 let oper = sel.select_timeout(ms(500));
330 assert!(oper.is_err());
331 let now = Instant::now();
332 assert!(now - start >= ms(450));
333 assert!(now - start <= ms(550));
334 }
335
336 #[test]
337 fn unblocks() {
338 let (s1, r1) = bounded::<i32>(0);
339 let (s2, r2) = bounded::<i32>(0);
340
341 scope(|scope| {
342 scope.spawn(|_| {
343 thread::sleep(ms(500));
344 s2.send(2).unwrap();
345 });
346
347 let mut sel = Select::new();
348 let oper1 = sel.recv(&r1);
349 let oper2 = sel.recv(&r2);
350 let oper = sel.select_timeout(ms(1000));
351 match oper {
352 Err(_) => panic!(),
353 Ok(oper) => match oper.index() {
354 i if i == oper1 => panic!(),
355 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
356 _ => unreachable!(),
357 },
358 }
359 })
360 .unwrap();
361
362 scope(|scope| {
363 scope.spawn(|_| {
364 thread::sleep(ms(500));
365 assert_eq!(r1.recv().unwrap(), 1);
366 });
367
368 let mut sel = Select::new();
369 let oper1 = sel.send(&s1);
370 let oper2 = sel.send(&s2);
371 let oper = sel.select_timeout(ms(1000));
372 match oper {
373 Err(_) => panic!(),
374 Ok(oper) => match oper.index() {
375 i if i == oper1 => oper.send(&s1, 1).unwrap(),
376 i if i == oper2 => panic!(),
377 _ => unreachable!(),
378 },
379 }
380 })
381 .unwrap();
382 }
383
384 #[test]
385 fn both_ready() {
386 let (s1, r1) = bounded(0);
387 let (s2, r2) = bounded(0);
388
389 scope(|scope| {
390 scope.spawn(|_| {
391 thread::sleep(ms(500));
392 s1.send(1).unwrap();
393 assert_eq!(r2.recv().unwrap(), 2);
394 });
395
396 for _ in 0..2 {
397 let mut sel = Select::new();
398 let oper1 = sel.recv(&r1);
399 let oper2 = sel.send(&s2);
400 let oper = sel.select();
401 match oper.index() {
402 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
403 i if i == oper2 => oper.send(&s2, 2).unwrap(),
404 _ => unreachable!(),
405 }
406 }
407 })
408 .unwrap();
409 }
410
411 #[test]
412 fn loop_try() {
413 const RUNS: usize = 20;
414
415 for _ in 0..RUNS {
416 let (s1, r1) = bounded::<i32>(0);
417 let (s2, r2) = bounded::<i32>(0);
418 let (s_end, r_end) = bounded::<()>(0);
419
420 scope(|scope| {
421 scope.spawn(|_| loop {
422 let mut done = false;
423
424 let mut sel = Select::new();
425 let oper1 = sel.send(&s1);
426 let oper = sel.try_select();
427 match oper {
428 Err(_) => {}
429 Ok(oper) => match oper.index() {
430 i if i == oper1 => {
431 let _ = oper.send(&s1, 1);
432 done = true;
433 }
434 _ => unreachable!(),
435 },
436 }
437 if done {
438 break;
439 }
440
441 let mut sel = Select::new();
442 let oper1 = sel.recv(&r_end);
443 let oper = sel.try_select();
444 match oper {
445 Err(_) => {}
446 Ok(oper) => match oper.index() {
447 i if i == oper1 => {
448 let _ = oper.recv(&r_end);
449 done = true;
450 }
451 _ => unreachable!(),
452 },
453 }
454 if done {
455 break;
456 }
457 });
458
459 scope.spawn(|_| loop {
460 if let Ok(x) = r2.try_recv() {
461 assert_eq!(x, 2);
462 break;
463 }
464
465 let mut done = false;
466 let mut sel = Select::new();
467 let oper1 = sel.recv(&r_end);
468 let oper = sel.try_select();
469 match oper {
470 Err(_) => {}
471 Ok(oper) => match oper.index() {
472 i if i == oper1 => {
473 let _ = oper.recv(&r_end);
474 done = true;
475 }
476 _ => unreachable!(),
477 },
478 }
479 if done {
480 break;
481 }
482 });
483
484 scope.spawn(|_| {
485 thread::sleep(ms(500));
486
487 let mut sel = Select::new();
488 let oper1 = sel.recv(&r1);
489 let oper2 = sel.send(&s2);
490 let oper = sel.select_timeout(ms(1000));
491 match oper {
492 Err(_) => {}
493 Ok(oper) => match oper.index() {
494 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
495 i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()),
496 _ => unreachable!(),
497 },
498 }
499
500 drop(s_end);
501 });
502 })
503 .unwrap();
504 }
505 }
506
507 #[test]
508 fn cloning1() {
509 scope(|scope| {
510 let (s1, r1) = unbounded::<i32>();
511 let (_s2, r2) = unbounded::<i32>();
512 let (s3, r3) = unbounded::<()>();
513
514 scope.spawn(move |_| {
515 r3.recv().unwrap();
516 drop(s1.clone());
517 assert!(r3.try_recv().is_err());
518 s1.send(1).unwrap();
519 r3.recv().unwrap();
520 });
521
522 s3.send(()).unwrap();
523
524 let mut sel = Select::new();
525 let oper1 = sel.recv(&r1);
526 let oper2 = sel.recv(&r2);
527 let oper = sel.select();
528 match oper.index() {
529 i if i == oper1 => drop(oper.recv(&r1)),
530 i if i == oper2 => drop(oper.recv(&r2)),
531 _ => unreachable!(),
532 }
533
534 s3.send(()).unwrap();
535 })
536 .unwrap();
537 }
538
539 #[test]
540 fn cloning2() {
541 let (s1, r1) = unbounded::<()>();
542 let (s2, r2) = unbounded::<()>();
543 let (_s3, _r3) = unbounded::<()>();
544
545 scope(|scope| {
546 scope.spawn(move |_| {
547 let mut sel = Select::new();
548 let oper1 = sel.recv(&r1);
549 let oper2 = sel.recv(&r2);
550 let oper = sel.select();
551 match oper.index() {
552 i if i == oper1 => panic!(),
553 i if i == oper2 => drop(oper.recv(&r2)),
554 _ => unreachable!(),
555 }
556 });
557
558 thread::sleep(ms(500));
559 drop(s1.clone());
560 s2.send(()).unwrap();
561 })
562 .unwrap();
563 }
564
565 #[test]
566 fn preflight1() {
567 let (s, r) = unbounded();
568 s.send(()).unwrap();
569
570 let mut sel = Select::new();
571 let oper1 = sel.recv(&r);
572 let oper = sel.select();
573 match oper.index() {
574 i if i == oper1 => drop(oper.recv(&r)),
575 _ => unreachable!(),
576 }
577 }
578
579 #[test]
580 fn preflight2() {
581 let (s, r) = unbounded();
582 drop(s.clone());
583 s.send(()).unwrap();
584 drop(s);
585
586 let mut sel = Select::new();
587 let oper1 = sel.recv(&r);
588 let oper = sel.select();
589 match oper.index() {
590 i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())),
591 _ => unreachable!(),
592 }
593
594 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
595 }
596
597 #[test]
598 fn preflight3() {
599 let (s, r) = unbounded();
600 drop(s.clone());
601 s.send(()).unwrap();
602 drop(s);
603 r.recv().unwrap();
604
605 let mut sel = Select::new();
606 let oper1 = sel.recv(&r);
607 let oper = sel.select();
608 match oper.index() {
609 i if i == oper1 => assert!(oper.recv(&r).is_err()),
610 _ => unreachable!(),
611 }
612 }
613
614 #[test]
615 fn duplicate_operations() {
616 let (s, r) = unbounded::<i32>();
617 let hit = vec![Cell::new(false); 4];
618
619 while hit.iter().map(|h| h.get()).any(|hit| !hit) {
620 let mut sel = Select::new();
621 let oper0 = sel.recv(&r);
622 let oper1 = sel.recv(&r);
623 let oper2 = sel.send(&s);
624 let oper3 = sel.send(&s);
625 let oper = sel.select();
626 match oper.index() {
627 i if i == oper0 => {
628 assert!(oper.recv(&r).is_ok());
629 hit[0].set(true);
630 }
631 i if i == oper1 => {
632 assert!(oper.recv(&r).is_ok());
633 hit[1].set(true);
634 }
635 i if i == oper2 => {
636 assert!(oper.send(&s, 0).is_ok());
637 hit[2].set(true);
638 }
639 i if i == oper3 => {
640 assert!(oper.send(&s, 0).is_ok());
641 hit[3].set(true);
642 }
643 _ => unreachable!(),
644 }
645 }
646 }
647
648 #[test]
649 fn nesting() {
650 let (s, r) = unbounded::<i32>();
651
652 let mut sel = Select::new();
653 let oper1 = sel.send(&s);
654 let oper = sel.select();
655 match oper.index() {
656 i if i == oper1 => {
657 assert!(oper.send(&s, 0).is_ok());
658
659 let mut sel = Select::new();
660 let oper1 = sel.recv(&r);
661 let oper = sel.select();
662 match oper.index() {
663 i if i == oper1 => {
664 assert_eq!(oper.recv(&r), Ok(0));
665
666 let mut sel = Select::new();
667 let oper1 = sel.send(&s);
668 let oper = sel.select();
669 match oper.index() {
670 i if i == oper1 => {
671 assert!(oper.send(&s, 1).is_ok());
672
673 let mut sel = Select::new();
674 let oper1 = sel.recv(&r);
675 let oper = sel.select();
676 match oper.index() {
677 i if i == oper1 => {
678 assert_eq!(oper.recv(&r), Ok(1));
679 }
680 _ => unreachable!(),
681 }
682 }
683 _ => unreachable!(),
684 }
685 }
686 _ => unreachable!(),
687 }
688 }
689 _ => unreachable!(),
690 }
691 }
692
693 #[test]
694 fn stress_recv() {
695 #[cfg(miri)]
696 const COUNT: usize = 50;
697 #[cfg(not(miri))]
698 const COUNT: usize = 10_000;
699
700 let (s1, r1) = unbounded();
701 let (s2, r2) = bounded(5);
702 let (s3, r3) = bounded(100);
703
704 scope(|scope| {
705 scope.spawn(|_| {
706 for i in 0..COUNT {
707 s1.send(i).unwrap();
708 r3.recv().unwrap();
709
710 s2.send(i).unwrap();
711 r3.recv().unwrap();
712 }
713 });
714
715 for i in 0..COUNT {
716 for _ in 0..2 {
717 let mut sel = Select::new();
718 let oper1 = sel.recv(&r1);
719 let oper2 = sel.recv(&r2);
720 let oper = sel.select();
721 match oper.index() {
722 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
723 ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)),
724 _ => unreachable!(),
725 }
726
727 s3.send(()).unwrap();
728 }
729 }
730 })
731 .unwrap();
732 }
733
734 #[test]
735 fn stress_send() {
736 #[cfg(miri)]
737 const COUNT: usize = 50;
738 #[cfg(not(miri))]
739 const COUNT: usize = 10_000;
740
741 let (s1, r1) = bounded(0);
742 let (s2, r2) = bounded(0);
743 let (s3, r3) = bounded(100);
744
745 scope(|scope| {
746 scope.spawn(|_| {
747 for i in 0..COUNT {
748 assert_eq!(r1.recv().unwrap(), i);
749 assert_eq!(r2.recv().unwrap(), i);
750 r3.recv().unwrap();
751 }
752 });
753
754 for i in 0..COUNT {
755 for _ in 0..2 {
756 let mut sel = Select::new();
757 let oper1 = sel.send(&s1);
758 let oper2 = sel.send(&s2);
759 let oper = sel.select();
760 match oper.index() {
761 ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()),
762 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
763 _ => unreachable!(),
764 }
765 }
766 s3.send(()).unwrap();
767 }
768 })
769 .unwrap();
770 }
771
772 #[test]
773 fn stress_mixed() {
774 #[cfg(miri)]
775 const COUNT: usize = 100;
776 #[cfg(not(miri))]
777 const COUNT: usize = 10_000;
778
779 let (s1, r1) = bounded(0);
780 let (s2, r2) = bounded(0);
781 let (s3, r3) = bounded(100);
782
783 scope(|scope| {
784 scope.spawn(|_| {
785 for i in 0..COUNT {
786 s1.send(i).unwrap();
787 assert_eq!(r2.recv().unwrap(), i);
788 r3.recv().unwrap();
789 }
790 });
791
792 for i in 0..COUNT {
793 for _ in 0..2 {
794 let mut sel = Select::new();
795 let oper1 = sel.recv(&r1);
796 let oper2 = sel.send(&s2);
797 let oper = sel.select();
798 match oper.index() {
799 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
800 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
801 _ => unreachable!(),
802 }
803 }
804 s3.send(()).unwrap();
805 }
806 })
807 .unwrap();
808 }
809
810 #[test]
811 fn stress_timeout_two_threads() {
812 const COUNT: usize = 20;
813
814 let (s, r) = bounded(2);
815
816 scope(|scope| {
817 scope.spawn(|_| {
818 for i in 0..COUNT {
819 if i % 2 == 0 {
820 thread::sleep(ms(500));
821 }
822
823 loop {
824 let mut sel = Select::new();
825 let oper1 = sel.send(&s);
826 let oper = sel.select_timeout(ms(100));
827 match oper {
828 Err(_) => {}
829 Ok(oper) => match oper.index() {
830 ix if ix == oper1 => {
831 assert!(oper.send(&s, i).is_ok());
832 break;
833 }
834 _ => unreachable!(),
835 },
836 }
837 }
838 }
839 });
840
841 scope.spawn(|_| {
842 for i in 0..COUNT {
843 if i % 2 == 0 {
844 thread::sleep(ms(500));
845 }
846
847 loop {
848 let mut sel = Select::new();
849 let oper1 = sel.recv(&r);
850 let oper = sel.select_timeout(ms(100));
851 match oper {
852 Err(_) => {}
853 Ok(oper) => match oper.index() {
854 ix if ix == oper1 => {
855 assert_eq!(oper.recv(&r), Ok(i));
856 break;
857 }
858 _ => unreachable!(),
859 },
860 }
861 }
862 }
863 });
864 })
865 .unwrap();
866 }
867
868 #[test]
869 fn send_recv_same_channel() {
870 let (s, r) = bounded::<i32>(0);
871 let mut sel = Select::new();
872 let oper1 = sel.send(&s);
873 let oper2 = sel.recv(&r);
874 let oper = sel.select_timeout(ms(100));
875 match oper {
876 Err(_) => {}
877 Ok(oper) => match oper.index() {
878 ix if ix == oper1 => panic!(),
879 ix if ix == oper2 => panic!(),
880 _ => unreachable!(),
881 },
882 }
883
884 let (s, r) = unbounded::<i32>();
885 let mut sel = Select::new();
886 let oper1 = sel.send(&s);
887 let oper2 = sel.recv(&r);
888 let oper = sel.select_timeout(ms(100));
889 match oper {
890 Err(_) => panic!(),
891 Ok(oper) => match oper.index() {
892 ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()),
893 ix if ix == oper2 => panic!(),
894 _ => unreachable!(),
895 },
896 }
897 }
898
899 #[test]
900 fn matching() {
901 const THREADS: usize = 44;
902
903 let (s, r) = &bounded::<usize>(0);
904
905 scope(|scope| {
906 for i in 0..THREADS {
907 scope.spawn(move |_| {
908 let mut sel = Select::new();
909 let oper1 = sel.recv(r);
910 let oper2 = sel.send(s);
911 let oper = sel.select();
912 match oper.index() {
913 ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
914 ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
915 _ => unreachable!(),
916 }
917 });
918 }
919 })
920 .unwrap();
921
922 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
923 }
924
925 #[test]
926 fn matching_with_leftover() {
927 const THREADS: usize = 55;
928
929 let (s, r) = &bounded::<usize>(0);
930
931 scope(|scope| {
932 for i in 0..THREADS {
933 scope.spawn(move |_| {
934 let mut sel = Select::new();
935 let oper1 = sel.recv(r);
936 let oper2 = sel.send(s);
937 let oper = sel.select();
938 match oper.index() {
939 ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
940 ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
941 _ => unreachable!(),
942 }
943 });
944 }
945 s.send(!0).unwrap();
946 })
947 .unwrap();
948
949 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
950 }
951
952 #[test]
953 fn channel_through_channel() {
954 #[cfg(miri)]
955 const COUNT: usize = 50;
956 #[cfg(not(miri))]
957 const COUNT: usize = 1000;
958
959 type T = Box<dyn Any + Send>;
960
961 for cap in 0..3 {
962 let (s, r) = bounded::<T>(cap);
963
964 scope(|scope| {
965 scope.spawn(move |_| {
966 let mut s = s;
967
968 for _ in 0..COUNT {
969 let (new_s, new_r) = bounded(cap);
970 let new_r: T = Box::new(Some(new_r));
971
972 {
973 let mut sel = Select::new();
974 let oper1 = sel.send(&s);
975 let oper = sel.select();
976 match oper.index() {
977 ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()),
978 _ => unreachable!(),
979 }
980 }
981
982 s = new_s;
983 }
984 });
985
986 scope.spawn(move |_| {
987 let mut r = r;
988
989 for _ in 0..COUNT {
990 let new = {
991 let mut sel = Select::new();
992 let oper1 = sel.recv(&r);
993 let oper = sel.select();
994 match oper.index() {
995 ix if ix == oper1 => oper
996 .recv(&r)
997 .unwrap()
998 .downcast_mut::<Option<Receiver<T>>>()
999 .unwrap()
1000 .take()
1001 .unwrap(),
1002 _ => unreachable!(),
1003 }
1004 };
1005 r = new;
1006 }
1007 });
1008 })
1009 .unwrap();
1010 }
1011 }
1012
1013 #[test]
1014 fn linearizable_try() {
1015 #[cfg(miri)]
1016 const COUNT: usize = 50;
1017 #[cfg(not(miri))]
1018 const COUNT: usize = 100_000;
1019
1020 for step in 0..2 {
1021 let (start_s, start_r) = bounded::<()>(0);
1022 let (end_s, end_r) = bounded::<()>(0);
1023
1024 let ((s1, r1), (s2, r2)) = if step == 0 {
1025 (bounded::<i32>(1), bounded::<i32>(1))
1026 } else {
1027 (unbounded::<i32>(), unbounded::<i32>())
1028 };
1029
1030 scope(|scope| {
1031 scope.spawn(|_| {
1032 for _ in 0..COUNT {
1033 start_s.send(()).unwrap();
1034
1035 s1.send(1).unwrap();
1036
1037 let mut sel = Select::new();
1038 let oper1 = sel.recv(&r1);
1039 let oper2 = sel.recv(&r2);
1040 let oper = sel.try_select();
1041 match oper {
1042 Err(_) => unreachable!(),
1043 Ok(oper) => match oper.index() {
1044 ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
1045 ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
1046 _ => unreachable!(),
1047 },
1048 }
1049
1050 end_s.send(()).unwrap();
1051 let _ = r2.try_recv();
1052 }
1053 });
1054
1055 for _ in 0..COUNT {
1056 start_r.recv().unwrap();
1057
1058 s2.send(1).unwrap();
1059 let _ = r1.try_recv();
1060
1061 end_r.recv().unwrap();
1062 }
1063 })
1064 .unwrap();
1065 }
1066 }
1067
1068 #[test]
1069 fn linearizable_timeout() {
1070 #[cfg(miri)]
1071 const COUNT: usize = 50;
1072 #[cfg(not(miri))]
1073 const COUNT: usize = 100_000;
1074
1075 for step in 0..2 {
1076 let (start_s, start_r) = bounded::<()>(0);
1077 let (end_s, end_r) = bounded::<()>(0);
1078
1079 let ((s1, r1), (s2, r2)) = if step == 0 {
1080 (bounded::<i32>(1), bounded::<i32>(1))
1081 } else {
1082 (unbounded::<i32>(), unbounded::<i32>())
1083 };
1084
1085 scope(|scope| {
1086 scope.spawn(|_| {
1087 for _ in 0..COUNT {
1088 start_s.send(()).unwrap();
1089
1090 s1.send(1).unwrap();
1091
1092 let mut sel = Select::new();
1093 let oper1 = sel.recv(&r1);
1094 let oper2 = sel.recv(&r2);
1095 let oper = sel.select_timeout(ms(0));
1096 match oper {
1097 Err(_) => unreachable!(),
1098 Ok(oper) => match oper.index() {
1099 ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
1100 ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
1101 _ => unreachable!(),
1102 },
1103 }
1104
1105 end_s.send(()).unwrap();
1106 let _ = r2.try_recv();
1107 }
1108 });
1109
1110 for _ in 0..COUNT {
1111 start_r.recv().unwrap();
1112
1113 s2.send(1).unwrap();
1114 let _ = r1.try_recv();
1115
1116 end_r.recv().unwrap();
1117 }
1118 })
1119 .unwrap();
1120 }
1121 }
1122
1123 #[test]
1124 fn fairness1() {
1125 #[cfg(miri)]
1126 const COUNT: usize = 50;
1127 #[cfg(not(miri))]
1128 const COUNT: usize = 10_000;
1129
1130 let (s1, r1) = bounded::<()>(COUNT);
1131 let (s2, r2) = unbounded::<()>();
1132
1133 for _ in 0..COUNT {
1134 s1.send(()).unwrap();
1135 s2.send(()).unwrap();
1136 }
1137
1138 let hits = vec![Cell::new(0usize); 4];
1139 for _ in 0..COUNT {
1140 let after = after(ms(0));
1141 let tick = tick(ms(0));
1142
1143 let mut sel = Select::new();
1144 let oper1 = sel.recv(&r1);
1145 let oper2 = sel.recv(&r2);
1146 let oper3 = sel.recv(&after);
1147 let oper4 = sel.recv(&tick);
1148 let oper = sel.select();
1149 match oper.index() {
1150 i if i == oper1 => {
1151 oper.recv(&r1).unwrap();
1152 hits[0].set(hits[0].get() + 1);
1153 }
1154 i if i == oper2 => {
1155 oper.recv(&r2).unwrap();
1156 hits[1].set(hits[1].get() + 1);
1157 }
1158 i if i == oper3 => {
1159 oper.recv(&after).unwrap();
1160 hits[2].set(hits[2].get() + 1);
1161 }
1162 i if i == oper4 => {
1163 oper.recv(&tick).unwrap();
1164 hits[3].set(hits[3].get() + 1);
1165 }
1166 _ => unreachable!(),
1167 }
1168 }
1169 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
1170 }
1171
1172 #[test]
1173 fn fairness2() {
1174 #[cfg(miri)]
1175 const COUNT: usize = 50;
1176 #[cfg(not(miri))]
1177 const COUNT: usize = 10_000;
1178
1179 let (s1, r1) = unbounded::<()>();
1180 let (s2, r2) = bounded::<()>(1);
1181 let (s3, r3) = bounded::<()>(0);
1182
1183 scope(|scope| {
1184 scope.spawn(|_| {
1185 for _ in 0..COUNT {
1186 let mut sel = Select::new();
1187 let mut oper1 = None;
1188 let mut oper2 = None;
1189 if s1.is_empty() {
1190 oper1 = Some(sel.send(&s1));
1191 }
1192 if s2.is_empty() {
1193 oper2 = Some(sel.send(&s2));
1194 }
1195 let oper3 = sel.send(&s3);
1196 let oper = sel.select();
1197 match oper.index() {
1198 i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
1199 i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
1200 i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
1201 _ => unreachable!(),
1202 }
1203 }
1204 });
1205
1206 let hits = vec![Cell::new(0usize); 3];
1207 for _ in 0..COUNT {
1208 let mut sel = Select::new();
1209 let oper1 = sel.recv(&r1);
1210 let oper2 = sel.recv(&r2);
1211 let oper3 = sel.recv(&r3);
1212 let oper = sel.select();
1213 match oper.index() {
1214 i if i == oper1 => {
1215 oper.recv(&r1).unwrap();
1216 hits[0].set(hits[0].get() + 1);
1217 }
1218 i if i == oper2 => {
1219 oper.recv(&r2).unwrap();
1220 hits[1].set(hits[1].get() + 1);
1221 }
1222 i if i == oper3 => {
1223 oper.recv(&r3).unwrap();
1224 hits[2].set(hits[2].get() + 1);
1225 }
1226 _ => unreachable!(),
1227 }
1228 }
1229 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
1230 })
1231 .unwrap();
1232 }
1233
1234 #[test]
1235 fn sync_and_clone() {
1236 const THREADS: usize = 20;
1237
1238 let (s, r) = &bounded::<usize>(0);
1239
1240 let mut sel = Select::new();
1241 let oper1 = sel.recv(r);
1242 let oper2 = sel.send(s);
1243 let sel = &sel;
1244
1245 scope(|scope| {
1246 for i in 0..THREADS {
1247 scope.spawn(move |_| {
1248 let mut sel = sel.clone();
1249 let oper = sel.select();
1250 match oper.index() {
1251 ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
1252 ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
1253 _ => unreachable!(),
1254 }
1255 });
1256 }
1257 })
1258 .unwrap();
1259
1260 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
1261 }
1262
1263 #[test]
1264 fn send_and_clone() {
1265 const THREADS: usize = 20;
1266
1267 let (s, r) = &bounded::<usize>(0);
1268
1269 let mut sel = Select::new();
1270 let oper1 = sel.recv(r);
1271 let oper2 = sel.send(s);
1272
1273 scope(|scope| {
1274 for i in 0..THREADS {
1275 let mut sel = sel.clone();
1276 scope.spawn(move |_| {
1277 let oper = sel.select();
1278 match oper.index() {
1279 ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
1280 ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
1281 _ => unreachable!(),
1282 }
1283 });
1284 }
1285 })
1286 .unwrap();
1287
1288 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
1289 }
1290
1291 #[test]
1292 fn reuse() {
1293 #[cfg(miri)]
1294 const COUNT: usize = 50;
1295 #[cfg(not(miri))]
1296 const COUNT: usize = 10_000;
1297
1298 let (s1, r1) = bounded(0);
1299 let (s2, r2) = bounded(0);
1300 let (s3, r3) = bounded(100);
1301
1302 scope(|scope| {
1303 scope.spawn(|_| {
1304 for i in 0..COUNT {
1305 s1.send(i).unwrap();
1306 assert_eq!(r2.recv().unwrap(), i);
1307 r3.recv().unwrap();
1308 }
1309 });
1310
1311 let mut sel = Select::new();
1312 let oper1 = sel.recv(&r1);
1313 let oper2 = sel.send(&s2);
1314
1315 for i in 0..COUNT {
1316 for _ in 0..2 {
1317 let oper = sel.select();
1318 match oper.index() {
1319 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
1320 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
1321 _ => unreachable!(),
1322 }
1323 }
1324 s3.send(()).unwrap();
1325 }
1326 })
1327 .unwrap();
1328 }