// run-pass
// compile-flags:--test
// ignore-emscripten

use std::sync::mpsc::channel;
use std::sync::mpsc::TryRecvError;
use std::sync::mpsc::RecvError;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use std::thread;
use std::time::Duration;


/// Simple thread synchronization utility
struct Barrier {
    // Not using mutex/condvar for precision
    shared: Arc<AtomicUsize>,
    count: usize,
}

impl Barrier {
    fn new(count: usize) -> Vec<Barrier> {
        let shared = Arc::new(AtomicUsize::new(0));
        (0..count).map(|_| Barrier { shared: shared.clone(), count }).collect()
    }

    fn new2() -> (Barrier, Barrier) {
        let mut v = Barrier::new(2);
        (v.pop().unwrap(), v.pop().unwrap())
    }

    /// Returns when `count` threads enter `wait`
    fn wait(self) {
        self.shared.fetch_add(1, Ordering::SeqCst);
        while self.shared.load(Ordering::SeqCst) != self.count {
            // Busy-wait for precision, but yield on SGX, where a spin
            // loop that never yields can starve other threads.
            #[cfg(target_env = "sgx")]
            thread::yield_now();
        }
    }
}
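
// A minimal usage sketch for `Barrier` (illustrative only; the test name
// `barrier_rendezvous` is made up and not part of the original suite): two
// threads rendezvous in `wait`, so after joining, the main thread must
// observe the store performed by the spawned thread.
#[test]
fn barrier_rendezvous() {
    let (a, b) = Barrier::new2();
    let flag = Arc::new(AtomicUsize::new(0));
    let flag2 = flag.clone();
    let th = thread::spawn(move || {
        a.wait();
        flag2.store(1, Ordering::SeqCst);
    });
    b.wait(); // returns only once both threads have entered `wait`
    th.join().unwrap();
    assert_eq!(1, flag.load(Ordering::SeqCst));
}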


fn shared_close_sender_does_not_lose_messages_iter() {
    let (tb, rb) = Barrier::new2();

    let (tx, rx) = channel();
    // Cloning the sender upgrades the channel to the shared,
    // multi-producer flavor, which is the code path under test.
    let _ = tx.clone();

    thread::spawn(move || {
        tb.wait();
        thread::sleep(Duration::from_micros(1));
        tx.send(17).expect("send");
        drop(tx);
    });

    let i = rx.into_iter();
    rb.wait();
    // Make sure the iterator doesn't report disconnection before
    // yielding the element that was sent.
    assert_eq!(vec![17], i.collect::<Vec<_>>());
}

#[test]
fn shared_close_sender_does_not_lose_messages() {
    for _ in 0..10000 {
        shared_close_sender_does_not_lose_messages_iter();
    }
}
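
// Hedged illustration of the shared upgrade the test above depends on
// (the test name `shared_upgrade_smoke` is made up, not part of the
// original suite): cloning a `Sender` switches the channel to its shared
// flavor, and messages already sent must still be delivered even after
// every sender has been dropped.
#[test]
fn shared_upgrade_smoke() {
    let (tx, rx) = channel();
    let tx2 = tx.clone(); // upgrade to shared
    tx.send(1).expect("send");
    tx2.send(2).expect("send");
    drop(tx);
    drop(tx2);
    // Both messages survive disconnection, in send order.
    assert_eq!(vec![1, 2], rx.into_iter().collect::<Vec<_>>());
}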


// https://github.com/rust-lang/rust/issues/39364
fn concurrent_recv_timeout_and_upgrade_iter() {
    // 1 us
    let sleep = Duration::new(0, 1_000);

    let (a, b) = Barrier::new2();
    let (tx, rx) = channel();
    let th = thread::spawn(move || {
        a.wait();
        loop {
            match rx.recv_timeout(sleep) {
                Ok(_) => {
                    break;
                },
                Err(_) => {},
            }
        }
    });
    b.wait();
    thread::sleep(sleep);
    // The clone upgrades the channel to the shared flavor while the other
    // thread may be blocked in `recv_timeout`; this is the race reported
    // in issue #39364.
    tx.clone().send(()).expect("send");
    th.join().unwrap();
}

#[test]
fn concurrent_recv_timeout_and_upgrade() {
    // FIXME: fix and enable
    if true { return }

    // At the moment of writing, this test fails like this:
    // thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
    //   left: `4561387584`,
    //  right: `0`', libstd/sync/mpsc/shared.rs:253:13

    for _ in 0..10000 {
        concurrent_recv_timeout_and_upgrade_iter();
    }
}
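
// A sketch (the test name `recv_timeout_error_cases` is made up, not part
// of the original suite) separating the two error cases that the `Err(_)`
// arm above folds together: `recv_timeout` returns `Timeout` while a
// sender is still alive, and `Disconnected` once every sender is dropped.
#[test]
fn recv_timeout_error_cases() {
    let (tx, rx) = channel::<()>();
    let one_us = Duration::from_micros(1);
    assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
    drop(tx);
    assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
}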


fn concurrent_writes_iter() {
    const THREADS: usize = 4;
    const PER_THR: usize = 100;

    // One barrier per sender thread, plus one for the main thread.
    let mut bs = Barrier::new(THREADS + 1);
    let (tx, rx) = channel();

    let mut threads = Vec::new();
    for j in 0..THREADS {
        let tx = tx.clone();
        let b = bs.pop().unwrap();
        threads.push(thread::spawn(move || {
            b.wait();
            for i in 0..PER_THR {
                // Encode (thread, index) so values from different threads
                // occupy disjoint ranges (PER_THR < 1000).
                tx.send(j * 1000 + i).expect("send");
            }
        }));
    }

    let b = bs.pop().unwrap();
    b.wait();

    let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
    v.sort();

    // After sorting, each thread's messages form a contiguous block, and
    // within a block every index must be present exactly once.
    for j in 0..THREADS {
        for i in 0..PER_THR {
            assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
        }
    }

    for t in threads {
        t.join().unwrap();
    }

    let one_us = Duration::new(0, 1000);

    // A sender (`tx`) is still alive, so an empty channel reports
    // `Empty`/`Timeout` rather than disconnection.
    assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
    assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());

    drop(tx);

    // With every sender gone, the same calls report disconnection.
    assert_eq!(RecvError, rx.recv().unwrap_err());
    assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
    assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
}

#[test]
fn concurrent_writes() {
    for _ in 0..100 {
        concurrent_writes_iter();
    }
}