]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-channel/tests/golang.rs
New upstream version 1.60.0+dfsg1
[rustc.git] / vendor / crossbeam-channel / tests / golang.rs
1 //! Tests copied from Go and manually rewritten in Rust.
2 //!
3 //! Source:
4 //! - https://github.com/golang/go
5 //!
6 //! Copyright & License:
7 //! - Copyright (c) 2009 The Go Authors
8 //! - https://golang.org/AUTHORS
9 //! - https://golang.org/LICENSE
10 //! - https://golang.org/PATENTS
11
12 #![allow(clippy::mutex_atomic, clippy::redundant_clone)]
13
14 use std::alloc::{GlobalAlloc, Layout, System};
15 use std::any::Any;
16 use std::cell::Cell;
17 use std::collections::HashMap;
18 use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
19 use std::sync::{Arc, Condvar, Mutex};
20 use std::thread;
21 use std::time::Duration;
22
23 use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender};
24
25 fn ms(ms: u64) -> Duration {
26 Duration::from_millis(ms)
27 }
28
29 struct Chan<T> {
30 inner: Arc<Mutex<ChanInner<T>>>,
31 }
32
33 struct ChanInner<T> {
34 s: Option<Sender<T>>,
35 r: Receiver<T>,
36 }
37
38 impl<T> Clone for Chan<T> {
39 fn clone(&self) -> Chan<T> {
40 Chan {
41 inner: self.inner.clone(),
42 }
43 }
44 }
45
46 impl<T> Chan<T> {
47 fn send(&self, msg: T) {
48 let s = self
49 .inner
50 .lock()
51 .unwrap()
52 .s
53 .as_ref()
54 .expect("sending into closed channel")
55 .clone();
56 let _ = s.send(msg);
57 }
58
59 fn try_recv(&self) -> Option<T> {
60 let r = self.inner.lock().unwrap().r.clone();
61 r.try_recv().ok()
62 }
63
64 fn recv(&self) -> Option<T> {
65 let r = self.inner.lock().unwrap().r.clone();
66 r.recv().ok()
67 }
68
69 fn close(&self) {
70 self.inner
71 .lock()
72 .unwrap()
73 .s
74 .take()
75 .expect("channel already closed");
76 }
77
78 fn rx(&self) -> Receiver<T> {
79 self.inner.lock().unwrap().r.clone()
80 }
81
82 fn tx(&self) -> Sender<T> {
83 match self.inner.lock().unwrap().s.as_ref() {
84 None => {
85 let (s, r) = bounded(0);
86 std::mem::forget(r);
87 s
88 }
89 Some(s) => s.clone(),
90 }
91 }
92 }
93
94 impl<T> Iterator for Chan<T> {
95 type Item = T;
96
97 fn next(&mut self) -> Option<Self::Item> {
98 self.recv()
99 }
100 }
101
102 impl<'a, T> IntoIterator for &'a Chan<T> {
103 type Item = T;
104 type IntoIter = Chan<T>;
105
106 fn into_iter(self) -> Self::IntoIter {
107 self.clone()
108 }
109 }
110
111 fn make<T>(cap: usize) -> Chan<T> {
112 let (s, r) = bounded(cap);
113 Chan {
114 inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
115 }
116 }
117
118 fn make_unbounded<T>() -> Chan<T> {
119 let (s, r) = unbounded();
120 Chan {
121 inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
122 }
123 }
124 #[derive(Clone)]
125 struct WaitGroup(Arc<WaitGroupInner>);
126
127 struct WaitGroupInner {
128 cond: Condvar,
129 count: Mutex<i32>,
130 }
131
132 impl WaitGroup {
133 fn new() -> WaitGroup {
134 WaitGroup(Arc::new(WaitGroupInner {
135 cond: Condvar::new(),
136 count: Mutex::new(0),
137 }))
138 }
139
140 fn add(&self, delta: i32) {
141 let mut count = self.0.count.lock().unwrap();
142 *count += delta;
143 assert!(*count >= 0);
144 self.0.cond.notify_all();
145 }
146
147 fn done(&self) {
148 self.add(-1);
149 }
150
151 fn wait(&self) {
152 let mut count = self.0.count.lock().unwrap();
153 while *count > 0 {
154 count = self.0.cond.wait(count).unwrap();
155 }
156 }
157 }
158
159 struct Defer<F: FnOnce()> {
160 f: Option<Box<F>>,
161 }
162
163 impl<F: FnOnce()> Drop for Defer<F> {
164 fn drop(&mut self) {
165 let f = self.f.take().unwrap();
166 let mut f = Some(f);
167 let mut f = move || f.take().unwrap()();
168 f();
169 }
170 }
171
172 struct Counter;
173
174 static ALLOCATED: AtomicUsize = AtomicUsize::new(0);
175 unsafe impl GlobalAlloc for Counter {
176 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
177 let ret = System.alloc(layout);
178 if !ret.is_null() {
179 ALLOCATED.fetch_add(layout.size(), SeqCst);
180 }
181 ret
182 }
183
184 unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
185 System.dealloc(ptr, layout);
186 ALLOCATED.fetch_sub(layout.size(), SeqCst);
187 }
188 }
189
190 #[global_allocator]
191 static A: Counter = Counter;
192
193 macro_rules! defer {
194 ($body:expr) => {
195 let _defer = Defer {
196 f: Some(Box::new(|| $body)),
197 };
198 };
199 }
200
201 macro_rules! go {
202 (@parse ref $v:ident, $($tail:tt)*) => {{
203 let ref $v = $v;
204 go!(@parse $($tail)*)
205 }};
206 (@parse move $v:ident, $($tail:tt)*) => {{
207 let $v = $v;
208 go!(@parse $($tail)*)
209 }};
210 (@parse $v:ident, $($tail:tt)*) => {{
211 let $v = $v.clone();
212 go!(@parse $($tail)*)
213 }};
214 (@parse $body:expr) => {
215 ::std::thread::spawn(move || {
216 let res = ::std::panic::catch_unwind(::std::panic::AssertUnwindSafe(|| {
217 $body
218 }));
219 if res.is_err() {
220 eprintln!("goroutine panicked: {:?}", res);
221 ::std::process::abort();
222 }
223 })
224 };
225 (@parse $($tail:tt)*) => {
226 compile_error!("invalid `go!` syntax")
227 };
228 ($($tail:tt)*) => {{
229 go!(@parse $($tail)*)
230 }};
231 }
232
233 // https://github.com/golang/go/blob/master/test/chan/doubleselect.go
234 mod doubleselect {
235 use super::*;
236
237 #[cfg(miri)]
238 const ITERATIONS: i32 = 100;
239 #[cfg(not(miri))]
240 const ITERATIONS: i32 = 10_000;
241
242 fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) {
243 defer! { c1.close() }
244 defer! { c2.close() }
245 defer! { c3.close() }
246 defer! { c4.close() }
247
248 for i in 0..n {
249 select! {
250 send(c1.tx(), i) -> _ => {}
251 send(c2.tx(), i) -> _ => {}
252 send(c3.tx(), i) -> _ => {}
253 send(c4.tx(), i) -> _ => {}
254 }
255 }
256 }
257
258 fn mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>) {
259 for v in inp {
260 out.send(v);
261 }
262 done.send(true);
263 }
264
265 fn recver(inp: Chan<i32>) {
266 let mut seen = HashMap::new();
267
268 for v in &inp {
269 if seen.contains_key(&v) {
270 panic!("got duplicate value for {}", v);
271 }
272 seen.insert(v, true);
273 }
274 }
275
276 #[test]
277 fn main() {
278 let c1 = make::<i32>(0);
279 let c2 = make::<i32>(0);
280 let c3 = make::<i32>(0);
281 let c4 = make::<i32>(0);
282 let done = make::<bool>(0);
283 let cmux = make::<i32>(0);
284
285 go!(c1, c2, c3, c4, sender(ITERATIONS, c1, c2, c3, c4));
286 go!(cmux, c1, done, mux(cmux, c1, done));
287 go!(cmux, c2, done, mux(cmux, c2, done));
288 go!(cmux, c3, done, mux(cmux, c3, done));
289 go!(cmux, c4, done, mux(cmux, c4, done));
290 go!(done, cmux, {
291 done.recv();
292 done.recv();
293 done.recv();
294 done.recv();
295 cmux.close();
296 });
297 recver(cmux);
298 }
299 }
300
301 // https://github.com/golang/go/blob/master/test/chan/fifo.go
302 mod fifo {
303 use super::*;
304
305 const N: i32 = 10;
306
307 #[test]
308 fn asynch_fifo() {
309 let ch = make::<i32>(N as usize);
310 for i in 0..N {
311 ch.send(i);
312 }
313 for i in 0..N {
314 if ch.recv() != Some(i) {
315 panic!("bad receive");
316 }
317 }
318 }
319
320 fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) {
321 inp.recv();
322 if ch.recv() != Some(val) {
323 panic!("{}", val);
324 }
325 out.send(1);
326 }
327
328 #[test]
329 fn synch_fifo() {
330 let ch = make::<i32>(0);
331 let mut inp = make::<i32>(0);
332 let start = inp.clone();
333
334 for i in 0..N {
335 let out = make::<i32>(0);
336 go!(ch, i, inp, out, chain(ch, i, inp, out));
337 inp = out;
338 }
339
340 start.send(0);
341 for i in 0..N {
342 ch.send(i);
343 }
344 inp.recv();
345 }
346 }
347
348 // https://github.com/golang/go/blob/master/test/chan/goroutines.go
349 mod goroutines {
350 use super::*;
351
352 fn f(left: Chan<i32>, right: Chan<i32>) {
353 left.send(right.recv().unwrap());
354 }
355
356 #[test]
357 fn main() {
358 let n = 100i32;
359
360 let leftmost = make::<i32>(0);
361 let mut right = leftmost.clone();
362 let mut left = leftmost.clone();
363
364 for _ in 0..n {
365 right = make::<i32>(0);
366 go!(left, right, f(left, right));
367 left = right.clone();
368 }
369
370 go!(right, right.send(1));
371 leftmost.recv().unwrap();
372 }
373 }
374
375 // https://github.com/golang/go/blob/master/test/chan/nonblock.go
376 mod nonblock {
377 use super::*;
378
379 fn i32receiver(c: Chan<i32>, strobe: Chan<bool>) {
380 if c.recv().unwrap() != 123 {
381 panic!("i32 value");
382 }
383 strobe.send(true);
384 }
385
386 fn i32sender(c: Chan<i32>, strobe: Chan<bool>) {
387 c.send(234);
388 strobe.send(true);
389 }
390
391 fn i64receiver(c: Chan<i64>, strobe: Chan<bool>) {
392 if c.recv().unwrap() != 123456 {
393 panic!("i64 value");
394 }
395 strobe.send(true);
396 }
397
398 fn i64sender(c: Chan<i64>, strobe: Chan<bool>) {
399 c.send(234567);
400 strobe.send(true);
401 }
402
403 fn breceiver(c: Chan<bool>, strobe: Chan<bool>) {
404 if !c.recv().unwrap() {
405 panic!("b value");
406 }
407 strobe.send(true);
408 }
409
410 fn bsender(c: Chan<bool>, strobe: Chan<bool>) {
411 c.send(true);
412 strobe.send(true);
413 }
414
415 fn sreceiver(c: Chan<String>, strobe: Chan<bool>) {
416 if c.recv().unwrap() != "hello" {
417 panic!("x value");
418 }
419 strobe.send(true);
420 }
421
422 fn ssender(c: Chan<String>, strobe: Chan<bool>) {
423 c.send("hello again".to_string());
424 strobe.send(true);
425 }
426
427 const MAX_TRIES: usize = 10000; // Up to 100ms per test.
428
429 #[test]
430 fn main() {
431 let ticker = tick(Duration::new(0, 10_000)); // 10 us
432 let sleep = || {
433 ticker.recv().unwrap();
434 ticker.recv().unwrap();
435 thread::yield_now();
436 thread::yield_now();
437 thread::yield_now();
438 };
439
440 let sync = make::<bool>(0);
441
442 for buffer in 0..2 {
443 let c32 = make::<i32>(buffer);
444 let c64 = make::<i64>(buffer);
445 let cb = make::<bool>(buffer);
446 let cs = make::<String>(buffer);
447
448 select! {
449 recv(c32.rx()) -> _ => panic!("blocked i32sender"),
450 default => {}
451 }
452
453 select! {
454 recv(c64.rx()) -> _ => panic!("blocked i64sender"),
455 default => {}
456 }
457
458 select! {
459 recv(cb.rx()) -> _ => panic!("blocked bsender"),
460 default => {}
461 }
462
463 select! {
464 recv(cs.rx()) -> _ => panic!("blocked ssender"),
465 default => {}
466 }
467
468 go!(c32, sync, i32receiver(c32, sync));
469 let mut r#try = 0;
470 loop {
471 select! {
472 send(c32.tx(), 123) -> _ => break,
473 default => {
474 r#try += 1;
475 if r#try > MAX_TRIES {
476 println!("i32receiver buffer={}", buffer);
477 panic!("fail")
478 }
479 sleep();
480 }
481 }
482 }
483 sync.recv();
484 go!(c32, sync, i32sender(c32, sync));
485 if buffer > 0 {
486 sync.recv();
487 }
488 let mut r#try = 0;
489 loop {
490 select! {
491 recv(c32.rx()) -> v => {
492 if v != Ok(234) {
493 panic!("i32sender value");
494 }
495 break;
496 }
497 default => {
498 r#try += 1;
499 if r#try > MAX_TRIES {
500 println!("i32sender buffer={}", buffer);
501 panic!("fail");
502 }
503 sleep();
504 }
505 }
506 }
507 if buffer == 0 {
508 sync.recv();
509 }
510
511 go!(c64, sync, i64receiver(c64, sync));
512 let mut r#try = 0;
513 loop {
514 select! {
515 send(c64.tx(), 123456) -> _ => break,
516 default => {
517 r#try += 1;
518 if r#try > MAX_TRIES {
519 println!("i64receiver buffer={}", buffer);
520 panic!("fail")
521 }
522 sleep();
523 }
524 }
525 }
526 sync.recv();
527 go!(c64, sync, i64sender(c64, sync));
528 if buffer > 0 {
529 sync.recv();
530 }
531 let mut r#try = 0;
532 loop {
533 select! {
534 recv(c64.rx()) -> v => {
535 if v != Ok(234567) {
536 panic!("i64sender value");
537 }
538 break;
539 }
540 default => {
541 r#try += 1;
542 if r#try > MAX_TRIES {
543 println!("i64sender buffer={}", buffer);
544 panic!("fail");
545 }
546 sleep();
547 }
548 }
549 }
550 if buffer == 0 {
551 sync.recv();
552 }
553
554 go!(cb, sync, breceiver(cb, sync));
555 let mut r#try = 0;
556 loop {
557 select! {
558 send(cb.tx(), true) -> _ => break,
559 default => {
560 r#try += 1;
561 if r#try > MAX_TRIES {
562 println!("breceiver buffer={}", buffer);
563 panic!("fail")
564 }
565 sleep();
566 }
567 }
568 }
569 sync.recv();
570 go!(cb, sync, bsender(cb, sync));
571 if buffer > 0 {
572 sync.recv();
573 }
574 let mut r#try = 0;
575 loop {
576 select! {
577 recv(cb.rx()) -> v => {
578 if v != Ok(true) {
579 panic!("bsender value");
580 }
581 break;
582 }
583 default => {
584 r#try += 1;
585 if r#try > MAX_TRIES {
586 println!("bsender buffer={}", buffer);
587 panic!("fail");
588 }
589 sleep();
590 }
591 }
592 }
593 if buffer == 0 {
594 sync.recv();
595 }
596
597 go!(cs, sync, sreceiver(cs, sync));
598 let mut r#try = 0;
599 loop {
600 select! {
601 send(cs.tx(), "hello".to_string()) -> _ => break,
602 default => {
603 r#try += 1;
604 if r#try > MAX_TRIES {
605 println!("sreceiver buffer={}", buffer);
606 panic!("fail")
607 }
608 sleep();
609 }
610 }
611 }
612 sync.recv();
613 go!(cs, sync, ssender(cs, sync));
614 if buffer > 0 {
615 sync.recv();
616 }
617 let mut r#try = 0;
618 loop {
619 select! {
620 recv(cs.rx()) -> v => {
621 if v != Ok("hello again".to_string()) {
622 panic!("ssender value");
623 }
624 break;
625 }
626 default => {
627 r#try += 1;
628 if r#try > MAX_TRIES {
629 println!("ssender buffer={}", buffer);
630 panic!("fail");
631 }
632 sleep();
633 }
634 }
635 }
636 if buffer == 0 {
637 sync.recv();
638 }
639 }
640 }
641 }
642
643 // https://github.com/golang/go/blob/master/test/chan/select.go
644 mod select {
645 use super::*;
646
647 #[test]
648 fn main() {
649 let shift = Cell::new(0);
650 let counter = Cell::new(0);
651
652 let get_value = || {
653 counter.set(counter.get() + 1);
654 1 << shift.get()
655 };
656
657 let send = |mut a: Option<&Chan<u32>>, mut b: Option<&Chan<u32>>| {
658 let mut i = 0;
659 let never = make::<u32>(0);
660 loop {
661 let nil1 = never.tx();
662 let nil2 = never.tx();
663 let v1 = get_value();
664 let v2 = get_value();
665 select! {
666 send(a.map(|c| c.tx()).unwrap_or(nil1), v1) -> _ => {
667 i += 1;
668 a = None;
669 }
670 send(b.map(|c| c.tx()).unwrap_or(nil2), v2) -> _ => {
671 i += 1;
672 b = None;
673 }
674 default => break,
675 }
676 shift.set(shift.get() + 1);
677 }
678 i
679 };
680
681 let a = make::<u32>(1);
682 let b = make::<u32>(1);
683
684 assert_eq!(send(Some(&a), Some(&b)), 2);
685
686 let av = a.recv().unwrap();
687 let bv = b.recv().unwrap();
688 assert_eq!(av | bv, 3);
689
690 assert_eq!(send(Some(&a), None), 1);
691 assert_eq!(counter.get(), 10);
692 }
693 }
694
695 // https://github.com/golang/go/blob/master/test/chan/select2.go
696 mod select2 {
697 use super::*;
698
699 #[cfg(miri)]
700 const N: i32 = 1000;
701 #[cfg(not(miri))]
702 const N: i32 = 100000;
703
704 #[test]
705 fn main() {
706 fn sender(c: &Chan<i32>, n: i32) {
707 for _ in 0..n {
708 c.send(1);
709 }
710 }
711
712 fn receiver(c: &Chan<i32>, dummy: &Chan<i32>, n: i32) {
713 for _ in 0..n {
714 select! {
715 recv(c.rx()) -> _ => {}
716 recv(dummy.rx()) -> _ => {
717 panic!("dummy");
718 }
719 }
720 }
721 }
722
723 let c = make_unbounded::<i32>();
724 let dummy = make_unbounded::<i32>();
725
726 ALLOCATED.store(0, SeqCst);
727
728 go!(c, sender(&c, N));
729 receiver(&c, &dummy, N);
730
731 let alloc = ALLOCATED.load(SeqCst);
732
733 go!(c, sender(&c, N));
734 receiver(&c, &dummy, N);
735
736 assert!(
737 !(ALLOCATED.load(SeqCst) > alloc
738 && (ALLOCATED.load(SeqCst) - alloc) > (N as usize + 10000))
739 )
740 }
741 }
742
743 // https://github.com/golang/go/blob/master/test/chan/select3.go
744 mod select3 {
745 // TODO
746 }
747
748 // https://github.com/golang/go/blob/master/test/chan/select4.go
749 mod select4 {
750 use super::*;
751
752 #[test]
753 fn main() {
754 let c = make::<i32>(1);
755 let c1 = make::<i32>(0);
756 c.send(42);
757 select! {
758 recv(c1.rx()) -> _ => panic!("BUG"),
759 recv(c.rx()) -> v => assert_eq!(v, Ok(42)),
760 }
761 }
762 }
763
764 // https://github.com/golang/go/blob/master/test/chan/select6.go
765 mod select6 {
766 use super::*;
767
768 #[test]
769 fn main() {
770 let c1 = make::<bool>(0);
771 let c2 = make::<bool>(0);
772 let c3 = make::<bool>(0);
773
774 go!(c1, c1.recv());
775 go!(c1, c2, c3, {
776 select! {
777 recv(c1.rx()) -> _ => panic!("dummy"),
778 recv(c2.rx()) -> _ => c3.send(true),
779 }
780 c1.recv();
781 });
782 go!(c2, c2.send(true));
783
784 c3.recv();
785 c1.send(true);
786 c1.send(true);
787 }
788 }
789
790 // https://github.com/golang/go/blob/master/test/chan/select7.go
791 mod select7 {
792 use super::*;
793
794 fn recv1(c: Chan<i32>) {
795 c.recv().unwrap();
796 }
797
798 fn recv2(c: Chan<i32>) {
799 select! {
800 recv(c.rx()) -> _ => ()
801 }
802 }
803
804 fn recv3(c: Chan<i32>) {
805 let c2 = make::<i32>(1);
806 select! {
807 recv(c.rx()) -> _ => (),
808 recv(c2.rx()) -> _ => ()
809 }
810 }
811
812 fn send1(recv: fn(Chan<i32>)) {
813 let c = make::<i32>(1);
814 go!(c, recv(c));
815 thread::yield_now();
816 c.send(1);
817 }
818
819 fn send2(recv: fn(Chan<i32>)) {
820 let c = make::<i32>(1);
821 go!(c, recv(c));
822 thread::yield_now();
823 select! {
824 send(c.tx(), 1) -> _ => ()
825 }
826 }
827
828 fn send3(recv: fn(Chan<i32>)) {
829 let c = make::<i32>(1);
830 go!(c, recv(c));
831 thread::yield_now();
832 let c2 = make::<i32>(1);
833 select! {
834 send(c.tx(), 1) -> _ => (),
835 send(c2.tx(), 1) -> _ => ()
836 }
837 }
838
839 #[test]
840 fn main() {
841 send1(recv1);
842 send2(recv1);
843 send3(recv1);
844 send1(recv2);
845 send2(recv2);
846 send3(recv2);
847 send1(recv3);
848 send2(recv3);
849 send3(recv3);
850 }
851 }
852
853 // https://github.com/golang/go/blob/master/test/chan/sieve1.go
854 mod sieve1 {
855 use super::*;
856
857 fn generate(ch: Chan<i32>) {
858 let mut i = 2;
859 loop {
860 ch.send(i);
861 i += 1;
862 }
863 }
864
865 fn filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32) {
866 for i in in_ch {
867 if i % prime != 0 {
868 out_ch.send(i);
869 }
870 }
871 }
872
873 fn sieve(primes: Chan<i32>) {
874 let mut ch = make::<i32>(1);
875 go!(ch, generate(ch));
876 loop {
877 let prime = ch.recv().unwrap();
878 primes.send(prime);
879
880 let ch1 = make::<i32>(1);
881 go!(ch, ch1, prime, filter(ch, ch1, prime));
882 ch = ch1;
883 }
884 }
885
886 #[test]
887 fn main() {
888 let primes = make::<i32>(1);
889 go!(primes, sieve(primes));
890
891 let a = [
892 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83,
893 89, 97,
894 ];
895 for item in a.iter() {
896 let x = primes.recv().unwrap();
897 if x != *item {
898 println!("{} != {}", x, item);
899 panic!("fail");
900 }
901 }
902 }
903 }
904
905 // https://github.com/golang/go/blob/master/test/chan/zerosize.go
906 mod zerosize {
907 use super::*;
908
909 #[test]
910 fn zero_size_struct() {
911 struct ZeroSize;
912 let _ = make::<ZeroSize>(0);
913 }
914
915 #[test]
916 fn zero_size_array() {
917 let _ = make::<[u8; 0]>(0);
918 }
919 }
920
921 // https://github.com/golang/go/blob/master/src/runtime/chan_test.go
922 mod chan_test {
923 use super::*;
924
925 #[test]
926 fn test_chan() {
927 #[cfg(miri)]
928 const N: i32 = 20;
929 #[cfg(not(miri))]
930 const N: i32 = 200;
931
932 for cap in 0..N {
933 {
934 // Ensure that receive from empty chan blocks.
935 let c = make::<i32>(cap as usize);
936
937 let recv1 = Arc::new(Mutex::new(false));
938 go!(c, recv1, {
939 c.recv();
940 *recv1.lock().unwrap() = true;
941 });
942
943 let recv2 = Arc::new(Mutex::new(false));
944 go!(c, recv2, {
945 c.recv();
946 *recv2.lock().unwrap() = true;
947 });
948
949 thread::sleep(ms(1));
950
951 if *recv1.lock().unwrap() || *recv2.lock().unwrap() {
952 panic!();
953 }
954
955 // Ensure that non-blocking receive does not block.
956 select! {
957 recv(c.rx()) -> _ => panic!(),
958 default => {}
959 }
960 select! {
961 recv(c.rx()) -> _ => panic!(),
962 default => {}
963 }
964
965 c.send(0);
966 c.send(0);
967 }
968
969 {
970 // Ensure that send to full chan blocks.
971 let c = make::<i32>(cap as usize);
972 for i in 0..cap {
973 c.send(i);
974 }
975
976 let sent = Arc::new(Mutex::new(0));
977 go!(sent, c, {
978 c.send(0);
979 *sent.lock().unwrap() = 1;
980 });
981
982 thread::sleep(ms(1));
983
984 if *sent.lock().unwrap() != 0 {
985 panic!();
986 }
987
988 // Ensure that non-blocking send does not block.
989 select! {
990 send(c.tx(), 0) -> _ => panic!(),
991 default => {}
992 }
993 c.recv();
994 }
995
996 {
997 // Ensure that we receive 0 from closed chan.
998 let c = make::<i32>(cap as usize);
999 for i in 0..cap {
1000 c.send(i);
1001 }
1002 c.close();
1003
1004 for i in 0..cap {
1005 let v = c.recv();
1006 if v != Some(i) {
1007 panic!();
1008 }
1009 }
1010
1011 if c.recv() != None {
1012 panic!();
1013 }
1014 if c.try_recv() != None {
1015 panic!();
1016 }
1017 }
1018
1019 {
1020 // Ensure that close unblocks receive.
1021 let c = make::<i32>(cap as usize);
1022 let done = make::<bool>(0);
1023
1024 go!(c, done, {
1025 let v = c.try_recv();
1026 done.send(v.is_none());
1027 });
1028
1029 thread::sleep(ms(1));
1030 c.close();
1031
1032 if !done.recv().unwrap() {
1033 panic!();
1034 }
1035 }
1036
1037 {
1038 // Send 100 integers,
1039 // ensure that we receive them non-corrupted in FIFO order.
1040 let c = make::<i32>(cap as usize);
1041 go!(c, {
1042 for i in 0..100 {
1043 c.send(i);
1044 }
1045 });
1046 for i in 0..100 {
1047 if c.recv() != Some(i) {
1048 panic!();
1049 }
1050 }
1051
1052 // Same, but using recv2.
1053 go!(c, {
1054 for i in 0..100 {
1055 c.send(i);
1056 }
1057 });
1058 for i in 0..100 {
1059 if c.recv() != Some(i) {
1060 panic!();
1061 }
1062 }
1063 }
1064 }
1065 }
1066
1067 #[test]
1068 fn test_nonblock_recv_race() {
1069 #[cfg(miri)]
1070 const N: usize = 100;
1071 #[cfg(not(miri))]
1072 const N: usize = 1000;
1073
1074 for _ in 0..N {
1075 let c = make::<i32>(1);
1076 c.send(1);
1077
1078 let t = go!(c, {
1079 select! {
1080 recv(c.rx()) -> _ => {}
1081 default => panic!("chan is not ready"),
1082 }
1083 });
1084
1085 c.close();
1086 c.recv();
1087 t.join().unwrap();
1088 }
1089 }
1090
1091 #[test]
1092 fn test_nonblock_select_race() {
1093 #[cfg(miri)]
1094 const N: usize = 100;
1095 #[cfg(not(miri))]
1096 const N: usize = 1000;
1097
1098 let done = make::<bool>(1);
1099 for _ in 0..N {
1100 let c1 = make::<i32>(1);
1101 let c2 = make::<i32>(1);
1102 c1.send(1);
1103
1104 go!(c1, c2, done, {
1105 select! {
1106 recv(c1.rx()) -> _ => {}
1107 recv(c2.rx()) -> _ => {}
1108 default => {
1109 done.send(false);
1110 return;
1111 }
1112 }
1113 done.send(true);
1114 });
1115
1116 c2.send(1);
1117 select! {
1118 recv(c1.rx()) -> _ => {}
1119 default => {}
1120 }
1121 if !done.recv().unwrap() {
1122 panic!("no chan is ready");
1123 }
1124 }
1125 }
1126
1127 #[test]
1128 fn test_nonblock_select_race2() {
1129 #[cfg(miri)]
1130 const N: usize = 100;
1131 #[cfg(not(miri))]
1132 const N: usize = 1000;
1133
1134 let done = make::<bool>(1);
1135 for _ in 0..N {
1136 let c1 = make::<i32>(1);
1137 let c2 = make::<i32>(0);
1138 c1.send(1);
1139
1140 go!(c1, c2, done, {
1141 select! {
1142 recv(c1.rx()) -> _ => {}
1143 recv(c2.rx()) -> _ => {}
1144 default => {
1145 done.send(false);
1146 return;
1147 }
1148 }
1149 done.send(true);
1150 });
1151
1152 c2.close();
1153 select! {
1154 recv(c1.rx()) -> _ => {}
1155 default => {}
1156 }
1157 if !done.recv().unwrap() {
1158 panic!("no chan is ready");
1159 }
1160 }
1161 }
1162
1163 #[test]
1164 fn test_self_select() {
1165 // Ensure that send/recv on the same chan in select
1166 // does not crash nor deadlock.
1167
1168 #[cfg(miri)]
1169 const N: usize = 100;
1170 #[cfg(not(miri))]
1171 const N: usize = 1000;
1172
1173 for &cap in &[0, 10] {
1174 let wg = WaitGroup::new();
1175 wg.add(2);
1176 let c = make::<i32>(cap);
1177
1178 for p in 0..2 {
1179 let p = p;
1180 go!(wg, p, c, {
1181 defer! { wg.done() }
1182 for i in 0..N {
1183 if p == 0 || i % 2 == 0 {
1184 select! {
1185 send(c.tx(), p) -> _ => {}
1186 recv(c.rx()) -> v => {
1187 if cap == 0 && v.ok() == Some(p) {
1188 panic!("self receive");
1189 }
1190 }
1191 }
1192 } else {
1193 select! {
1194 recv(c.rx()) -> v => {
1195 if cap == 0 && v.ok() == Some(p) {
1196 panic!("self receive");
1197 }
1198 }
1199 send(c.tx(), p) -> _ => {}
1200 }
1201 }
1202 }
1203 });
1204 }
1205 wg.wait();
1206 }
1207 }
1208
1209 #[test]
1210 fn test_select_stress() {
1211 #[cfg(miri)]
1212 const N: usize = 100;
1213 #[cfg(not(miri))]
1214 const N: usize = 10000;
1215
1216 let c = vec![
1217 make::<i32>(0),
1218 make::<i32>(0),
1219 make::<i32>(2),
1220 make::<i32>(3),
1221 ];
1222
1223 // There are 4 goroutines that send N values on each of the chans,
1224 // + 4 goroutines that receive N values on each of the chans,
1225 // + 1 goroutine that sends N values on each of the chans in a single select,
1226 // + 1 goroutine that receives N values on each of the chans in a single select.
1227 // All these sends, receives and selects interact chaotically at runtime,
1228 // but we are careful that this whole construct does not deadlock.
1229 let wg = WaitGroup::new();
1230 wg.add(10);
1231
1232 for k in 0..4 {
1233 go!(k, c, wg, {
1234 for _ in 0..N {
1235 c[k].send(0);
1236 }
1237 wg.done();
1238 });
1239 go!(k, c, wg, {
1240 for _ in 0..N {
1241 c[k].recv();
1242 }
1243 wg.done();
1244 });
1245 }
1246
1247 go!(c, wg, {
1248 let mut n = [0; 4];
1249 let mut c1 = c.iter().map(|c| Some(c.rx().clone())).collect::<Vec<_>>();
1250
1251 for _ in 0..4 * N {
1252 let index = {
1253 let mut sel = Select::new();
1254 let mut opers = [!0; 4];
1255 for &i in &[3, 2, 0, 1] {
1256 if let Some(c) = &c1[i] {
1257 opers[i] = sel.recv(c);
1258 }
1259 }
1260
1261 let oper = sel.select();
1262 let mut index = !0;
1263 for i in 0..4 {
1264 if opers[i] == oper.index() {
1265 index = i;
1266 let _ = oper.recv(c1[i].as_ref().unwrap());
1267 break;
1268 }
1269 }
1270 index
1271 };
1272
1273 n[index] += 1;
1274 if n[index] == N {
1275 c1[index] = None;
1276 }
1277 }
1278 wg.done();
1279 });
1280
1281 go!(c, wg, {
1282 let mut n = [0; 4];
1283 let mut c1 = c.iter().map(|c| Some(c.tx().clone())).collect::<Vec<_>>();
1284
1285 for _ in 0..4 * N {
1286 let index = {
1287 let mut sel = Select::new();
1288 let mut opers = [!0; 4];
1289 for &i in &[0, 1, 2, 3] {
1290 if let Some(c) = &c1[i] {
1291 opers[i] = sel.send(c);
1292 }
1293 }
1294
1295 let oper = sel.select();
1296 let mut index = !0;
1297 for i in 0..4 {
1298 if opers[i] == oper.index() {
1299 index = i;
1300 let _ = oper.send(c1[i].as_ref().unwrap(), 0);
1301 break;
1302 }
1303 }
1304 index
1305 };
1306
1307 n[index] += 1;
1308 if n[index] == N {
1309 c1[index] = None;
1310 }
1311 }
1312 wg.done();
1313 });
1314
1315 wg.wait();
1316 }
1317
1318 #[test]
1319 fn test_select_fairness() {
1320 #[cfg(miri)]
1321 const TRIALS: usize = 100;
1322 #[cfg(not(miri))]
1323 const TRIALS: usize = 10000;
1324
1325 let c1 = make::<u8>(TRIALS + 1);
1326 let c2 = make::<u8>(TRIALS + 1);
1327
1328 for _ in 0..TRIALS + 1 {
1329 c1.send(1);
1330 c2.send(2);
1331 }
1332
1333 let c3 = make::<u8>(0);
1334 let c4 = make::<u8>(0);
1335 let out = make::<u8>(0);
1336 let done = make::<u8>(0);
1337 let wg = WaitGroup::new();
1338
1339 wg.add(1);
1340 go!(wg, c1, c2, c3, c4, out, done, {
1341 defer! { wg.done() };
1342 loop {
1343 let b;
1344 select! {
1345 recv(c3.rx()) -> m => b = m.unwrap(),
1346 recv(c4.rx()) -> m => b = m.unwrap(),
1347 recv(c1.rx()) -> m => b = m.unwrap(),
1348 recv(c2.rx()) -> m => b = m.unwrap(),
1349 }
1350 select! {
1351 send(out.tx(), b) -> _ => {}
1352 recv(done.rx()) -> _ => return,
1353 }
1354 }
1355 });
1356
1357 let (mut cnt1, mut cnt2) = (0, 0);
1358 for _ in 0..TRIALS {
1359 match out.recv() {
1360 Some(1) => cnt1 += 1,
1361 Some(2) => cnt2 += 1,
1362 b => panic!("unexpected value {:?} on channel", b),
1363 }
1364 }
1365
1366 // If the select in the goroutine is fair,
1367 // cnt1 and cnt2 should be about the same value.
1368 // With 10,000 trials, the expected margin of error at
1369 // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)).
1370
1371 let r = cnt1 as f64 / TRIALS as f64;
1372 let e = (r - 0.5).abs();
1373
1374 if e > 4.4172 / (2.0 * (TRIALS as f64).sqrt()) {
1375 panic!(
1376 "unfair select: in {} trials, results were {}, {}",
1377 TRIALS, cnt1, cnt2,
1378 );
1379 }
1380
1381 done.close();
1382 wg.wait();
1383 }
1384
1385 #[test]
1386 fn test_chan_send_interface() {
1387 struct Mt;
1388
1389 let c = make::<Box<dyn Any>>(1);
1390 c.send(Box::new(Mt));
1391
1392 select! {
1393 send(c.tx(), Box::new(Mt)) -> _ => {}
1394 default => {}
1395 }
1396
1397 select! {
1398 send(c.tx(), Box::new(Mt)) -> _ => {}
1399 send(c.tx(), Box::new(Mt)) -> _ => {}
1400 default => {}
1401 }
1402 }
1403
1404 #[test]
1405 fn test_pseudo_random_send() {
1406 #[cfg(miri)]
1407 const N: usize = 20;
1408 #[cfg(not(miri))]
1409 const N: usize = 100;
1410
1411 for cap in 0..N {
1412 let c = make::<i32>(cap);
1413 let l = Arc::new(Mutex::new(vec![0i32; N]));
1414 let done = make::<bool>(0);
1415
1416 go!(c, done, l, {
1417 let mut l = l.lock().unwrap();
1418 for i in 0..N {
1419 thread::yield_now();
1420 l[i] = c.recv().unwrap();
1421 }
1422 done.send(true);
1423 });
1424
1425 for _ in 0..N {
1426 select! {
1427 send(c.tx(), 1) -> _ => {}
1428 send(c.tx(), 0) -> _ => {}
1429 }
1430 }
1431 done.recv();
1432
1433 let mut n0 = 0;
1434 let mut n1 = 0;
1435 for &i in l.lock().unwrap().iter() {
1436 n0 += (i + 1) % 2;
1437 n1 += i;
1438 }
1439
1440 if n0 <= N as i32 / 10 || n1 <= N as i32 / 10 {
1441 panic!(
1442 "Want pseudorandom, got {} zeros and {} ones (chan cap {})",
1443 n0, n1, cap,
1444 );
1445 }
1446 }
1447 }
1448
1449 #[test]
1450 fn test_multi_consumer() {
1451 const NWORK: usize = 23;
1452 #[cfg(miri)]
1453 const NITER: usize = 100;
1454 #[cfg(not(miri))]
1455 const NITER: usize = 271828;
1456
1457 let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31];
1458
1459 let q = make::<i32>(NWORK * 3);
1460 let r = make::<i32>(NWORK * 3);
1461
1462 let wg = WaitGroup::new();
1463 for i in 0..NWORK {
1464 wg.add(1);
1465 let w = i;
1466 go!(q, r, wg, pn, {
1467 for v in &q {
1468 if pn[w % pn.len()] == v {
1469 thread::yield_now();
1470 }
1471 r.send(v);
1472 }
1473 wg.done();
1474 });
1475 }
1476
1477 let expect = Arc::new(Mutex::new(0));
1478 go!(q, r, expect, wg, pn, {
1479 for i in 0..NITER {
1480 let v = pn[i % pn.len()];
1481 *expect.lock().unwrap() += v;
1482 q.send(v);
1483 }
1484 q.close();
1485 wg.wait();
1486 r.close();
1487 });
1488
1489 let mut n = 0;
1490 let mut s = 0;
1491 for v in &r {
1492 n += 1;
1493 s += v;
1494 }
1495
1496 if n != NITER || s != *expect.lock().unwrap() {
1497 panic!();
1498 }
1499 }
1500
1501 #[test]
1502 fn test_select_duplicate_channel() {
1503 // This test makes sure we can queue a G on
1504 // the same channel multiple times.
1505 let c = make::<i32>(0);
1506 let d = make::<i32>(0);
1507 let e = make::<i32>(0);
1508
1509 go!(c, d, e, {
1510 select! {
1511 recv(c.rx()) -> _ => {}
1512 recv(d.rx()) -> _ => {}
1513 recv(e.rx()) -> _ => {}
1514 }
1515 e.send(9);
1516 });
1517 thread::sleep(ms(1));
1518
1519 go!(c, c.recv());
1520 thread::sleep(ms(1));
1521
1522 d.send(7);
1523 e.recv();
1524 c.send(8);
1525 }
1526 }
1527
1528 // https://github.com/golang/go/blob/master/test/closedchan.go
1529 mod closedchan {
1530 // TODO
1531 }
1532
1533 // https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go
1534 mod chanbarrier_test {
1535 // TODO
1536 }
1537
1538 // https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go
1539 mod race_chan_test {
1540 // TODO
1541 }
1542
1543 // https://github.com/golang/go/blob/master/test/ken/chan.go
1544 mod chan {
1545 // TODO
1546 }
1547
1548 // https://github.com/golang/go/blob/master/test/ken/chan1.go
1549 mod chan1 {
1550 use super::*;
1551
1552 // sent messages
1553 #[cfg(miri)]
1554 const N: usize = 100;
1555 #[cfg(not(miri))]
1556 const N: usize = 1000;
1557 // receiving "goroutines"
1558 const M: usize = 10;
1559 // channel buffering
1560 const W: usize = 2;
1561
1562 fn r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>) {
1563 loop {
1564 select! {
1565 recv(c.rx()) -> rr => {
1566 let r = rr.unwrap();
1567 let mut data = h.lock().unwrap();
1568 if data[r] != 1 {
1569 println!("r\nm={}\nr={}\nh={}\n", m, r, data[r]);
1570 panic!("fail")
1571 }
1572 data[r] = 2;
1573 }
1574 }
1575 }
1576 }
1577
1578 fn s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>) {
1579 for n in 0..N {
1580 let r = n;
1581 let mut data = h.lock().unwrap();
1582 if data[r] != 0 {
1583 println!("s");
1584 panic!("fail");
1585 }
1586 data[r] = 1;
1587 // https://github.com/crossbeam-rs/crossbeam/pull/615#discussion_r550281094
1588 drop(data);
1589 c.send(r);
1590 }
1591 }
1592
1593 #[test]
1594 fn main() {
1595 let h = Arc::new(Mutex::new([0usize; N]));
1596 let c = make::<usize>(W);
1597 for m in 0..M {
1598 go!(c, h, {
1599 r(c, m, h);
1600 });
1601 thread::yield_now();
1602 }
1603 thread::yield_now();
1604 thread::yield_now();
1605 s(c, h);
1606 }
1607 }