]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-deque-0.7.3/tests/injector.rs
New upstream version 1.55.0+dfsg1
[rustc.git] / vendor / crossbeam-deque-0.7.3 / tests / injector.rs
1 extern crate crossbeam_deque as deque;
2 extern crate crossbeam_utils as utils;
3 extern crate rand;
4
5 use std::sync::atomic::Ordering::SeqCst;
6 use std::sync::atomic::{AtomicBool, AtomicUsize};
7 use std::sync::{Arc, Mutex};
8
9 use deque::Steal::{Empty, Success};
10 use deque::{Injector, Worker};
11 use rand::Rng;
12 use utils::thread::scope;
13
14 #[test]
15 fn smoke() {
16 let q = Injector::new();
17 assert_eq!(q.steal(), Empty);
18
19 q.push(1);
20 q.push(2);
21 assert_eq!(q.steal(), Success(1));
22 assert_eq!(q.steal(), Success(2));
23 assert_eq!(q.steal(), Empty);
24
25 q.push(3);
26 assert_eq!(q.steal(), Success(3));
27 assert_eq!(q.steal(), Empty);
28 }
29
30 #[test]
31 fn is_empty() {
32 let q = Injector::new();
33 assert!(q.is_empty());
34
35 q.push(1);
36 assert!(!q.is_empty());
37 q.push(2);
38 assert!(!q.is_empty());
39
40 let _ = q.steal();
41 assert!(!q.is_empty());
42 let _ = q.steal();
43 assert!(q.is_empty());
44
45 q.push(3);
46 assert!(!q.is_empty());
47 let _ = q.steal();
48 assert!(q.is_empty());
49 }
50
51 #[test]
52 fn spsc() {
53 const COUNT: usize = 100_000;
54
55 let q = Injector::new();
56
57 scope(|scope| {
58 scope.spawn(|_| {
59 for i in 0..COUNT {
60 loop {
61 if let Success(v) = q.steal() {
62 assert_eq!(i, v);
63 break;
64 }
65 }
66 }
67
68 assert_eq!(q.steal(), Empty);
69 });
70
71 for i in 0..COUNT {
72 q.push(i);
73 }
74 })
75 .unwrap();
76 }
77
78 #[test]
79 fn mpmc() {
80 const COUNT: usize = 25_000;
81 const THREADS: usize = 4;
82
83 let q = Injector::new();
84 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
85
86 scope(|scope| {
87 for _ in 0..THREADS {
88 scope.spawn(|_| {
89 for i in 0..COUNT {
90 q.push(i);
91 }
92 });
93 }
94
95 for _ in 0..THREADS {
96 scope.spawn(|_| {
97 for _ in 0..COUNT {
98 loop {
99 if let Success(n) = q.steal() {
100 v[n].fetch_add(1, SeqCst);
101 break;
102 }
103 }
104 }
105 });
106 }
107 })
108 .unwrap();
109
110 for c in v {
111 assert_eq!(c.load(SeqCst), THREADS);
112 }
113 }
114
115 #[test]
116 fn stampede() {
117 const THREADS: usize = 8;
118 const COUNT: usize = 50_000;
119
120 let q = Injector::new();
121
122 for i in 0..COUNT {
123 q.push(Box::new(i + 1));
124 }
125 let remaining = Arc::new(AtomicUsize::new(COUNT));
126
127 scope(|scope| {
128 for _ in 0..THREADS {
129 let remaining = remaining.clone();
130 let q = &q;
131
132 scope.spawn(move |_| {
133 let mut last = 0;
134 while remaining.load(SeqCst) > 0 {
135 if let Success(x) = q.steal() {
136 assert!(last < *x);
137 last = *x;
138 remaining.fetch_sub(1, SeqCst);
139 }
140 }
141 });
142 }
143
144 let mut last = 0;
145 while remaining.load(SeqCst) > 0 {
146 if let Success(x) = q.steal() {
147 assert!(last < *x);
148 last = *x;
149 remaining.fetch_sub(1, SeqCst);
150 }
151 }
152 })
153 .unwrap();
154 }
155
156 #[test]
157 fn stress() {
158 const THREADS: usize = 8;
159 const COUNT: usize = 50_000;
160
161 let q = Injector::new();
162 let done = Arc::new(AtomicBool::new(false));
163 let hits = Arc::new(AtomicUsize::new(0));
164
165 scope(|scope| {
166 for _ in 0..THREADS {
167 let done = done.clone();
168 let hits = hits.clone();
169 let q = &q;
170
171 scope.spawn(move |_| {
172 let w2 = Worker::new_fifo();
173
174 while !done.load(SeqCst) {
175 if let Success(_) = q.steal() {
176 hits.fetch_add(1, SeqCst);
177 }
178
179 let _ = q.steal_batch(&w2);
180
181 if let Success(_) = q.steal_batch_and_pop(&w2) {
182 hits.fetch_add(1, SeqCst);
183 }
184
185 while let Some(_) = w2.pop() {
186 hits.fetch_add(1, SeqCst);
187 }
188 }
189 });
190 }
191
192 let mut rng = rand::thread_rng();
193 let mut expected = 0;
194 while expected < COUNT {
195 if rng.gen_range(0, 3) == 0 {
196 while let Success(_) = q.steal() {
197 hits.fetch_add(1, SeqCst);
198 }
199 } else {
200 q.push(expected);
201 expected += 1;
202 }
203 }
204
205 while hits.load(SeqCst) < COUNT {
206 while let Success(_) = q.steal() {
207 hits.fetch_add(1, SeqCst);
208 }
209 }
210 done.store(true, SeqCst);
211 })
212 .unwrap();
213 }
214
215 #[test]
216 fn no_starvation() {
217 const THREADS: usize = 8;
218 const COUNT: usize = 50_000;
219
220 let q = Injector::new();
221 let done = Arc::new(AtomicBool::new(false));
222 let mut all_hits = Vec::new();
223
224 scope(|scope| {
225 for _ in 0..THREADS {
226 let done = done.clone();
227 let hits = Arc::new(AtomicUsize::new(0));
228 all_hits.push(hits.clone());
229 let q = &q;
230
231 scope.spawn(move |_| {
232 let w2 = Worker::new_fifo();
233
234 while !done.load(SeqCst) {
235 if let Success(_) = q.steal() {
236 hits.fetch_add(1, SeqCst);
237 }
238
239 let _ = q.steal_batch(&w2);
240
241 if let Success(_) = q.steal_batch_and_pop(&w2) {
242 hits.fetch_add(1, SeqCst);
243 }
244
245 while let Some(_) = w2.pop() {
246 hits.fetch_add(1, SeqCst);
247 }
248 }
249 });
250 }
251
252 let mut rng = rand::thread_rng();
253 let mut my_hits = 0;
254 loop {
255 for i in 0..rng.gen_range(0, COUNT) {
256 if rng.gen_range(0, 3) == 0 && my_hits == 0 {
257 while let Success(_) = q.steal() {
258 my_hits += 1;
259 }
260 } else {
261 q.push(i);
262 }
263 }
264
265 if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) {
266 break;
267 }
268 }
269 done.store(true, SeqCst);
270 })
271 .unwrap();
272 }
273
274 #[test]
275 fn destructors() {
276 const THREADS: usize = 8;
277 const COUNT: usize = 50_000;
278 const STEPS: usize = 1000;
279
280 struct Elem(usize, Arc<Mutex<Vec<usize>>>);
281
282 impl Drop for Elem {
283 fn drop(&mut self) {
284 self.1.lock().unwrap().push(self.0);
285 }
286 }
287
288 let q = Injector::new();
289 let dropped = Arc::new(Mutex::new(Vec::new()));
290 let remaining = Arc::new(AtomicUsize::new(COUNT));
291
292 for i in 0..COUNT {
293 q.push(Elem(i, dropped.clone()));
294 }
295
296 scope(|scope| {
297 for _ in 0..THREADS {
298 let remaining = remaining.clone();
299 let q = &q;
300
301 scope.spawn(move |_| {
302 let w2 = Worker::new_fifo();
303 let mut cnt = 0;
304
305 while cnt < STEPS {
306 if let Success(_) = q.steal() {
307 cnt += 1;
308 remaining.fetch_sub(1, SeqCst);
309 }
310
311 let _ = q.steal_batch(&w2);
312
313 if let Success(_) = q.steal_batch_and_pop(&w2) {
314 cnt += 1;
315 remaining.fetch_sub(1, SeqCst);
316 }
317
318 while let Some(_) = w2.pop() {
319 cnt += 1;
320 remaining.fetch_sub(1, SeqCst);
321 }
322 }
323 });
324 }
325
326 for _ in 0..STEPS {
327 if let Success(_) = q.steal() {
328 remaining.fetch_sub(1, SeqCst);
329 }
330 }
331 })
332 .unwrap();
333
334 let rem = remaining.load(SeqCst);
335 assert!(rem > 0);
336
337 {
338 let mut v = dropped.lock().unwrap();
339 assert_eq!(v.len(), COUNT - rem);
340 v.clear();
341 }
342
343 drop(q);
344
345 {
346 let mut v = dropped.lock().unwrap();
347 assert_eq!(v.len(), rem);
348 v.sort();
349 for pair in v.windows(2) {
350 assert_eq!(pair[0] + 1, pair[1]);
351 }
352 }
353 }