]>
Commit | Line | Data |
---|---|---|
1b1a35ee XL |
1 | //! Tests for the array channel flavor. |
2 | ||
1b1a35ee XL |
3 | use std::any::Any; |
4 | use std::sync::atomic::AtomicUsize; | |
5 | use std::sync::atomic::Ordering; | |
6 | use std::thread; | |
7 | use std::time::Duration; | |
8 | ||
5869c6ff | 9 | use crossbeam_channel::{bounded, select, Receiver}; |
1b1a35ee XL |
10 | use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError}; |
11 | use crossbeam_channel::{SendError, SendTimeoutError, TrySendError}; | |
12 | use crossbeam_utils::thread::scope; | |
13 | use rand::{thread_rng, Rng}; | |
14 | ||
15 | fn ms(ms: u64) -> Duration { | |
16 | Duration::from_millis(ms) | |
17 | } | |
18 | ||
19 | #[test] | |
20 | fn smoke() { | |
21 | let (s, r) = bounded(1); | |
22 | s.send(7).unwrap(); | |
23 | assert_eq!(r.try_recv(), Ok(7)); | |
24 | ||
25 | s.send(8).unwrap(); | |
26 | assert_eq!(r.recv(), Ok(8)); | |
27 | ||
28 | assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
29 | assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); | |
30 | } | |
31 | ||
32 | #[test] | |
33 | fn capacity() { | |
34 | for i in 1..10 { | |
35 | let (s, r) = bounded::<()>(i); | |
36 | assert_eq!(s.capacity(), Some(i)); | |
37 | assert_eq!(r.capacity(), Some(i)); | |
38 | } | |
39 | } | |
40 | ||
41 | #[test] | |
42 | fn len_empty_full() { | |
43 | let (s, r) = bounded(2); | |
44 | ||
45 | assert_eq!(s.len(), 0); | |
46 | assert_eq!(s.is_empty(), true); | |
47 | assert_eq!(s.is_full(), false); | |
48 | assert_eq!(r.len(), 0); | |
49 | assert_eq!(r.is_empty(), true); | |
50 | assert_eq!(r.is_full(), false); | |
51 | ||
52 | s.send(()).unwrap(); | |
53 | ||
54 | assert_eq!(s.len(), 1); | |
55 | assert_eq!(s.is_empty(), false); | |
56 | assert_eq!(s.is_full(), false); | |
57 | assert_eq!(r.len(), 1); | |
58 | assert_eq!(r.is_empty(), false); | |
59 | assert_eq!(r.is_full(), false); | |
60 | ||
61 | s.send(()).unwrap(); | |
62 | ||
63 | assert_eq!(s.len(), 2); | |
64 | assert_eq!(s.is_empty(), false); | |
65 | assert_eq!(s.is_full(), true); | |
66 | assert_eq!(r.len(), 2); | |
67 | assert_eq!(r.is_empty(), false); | |
68 | assert_eq!(r.is_full(), true); | |
69 | ||
70 | r.recv().unwrap(); | |
71 | ||
72 | assert_eq!(s.len(), 1); | |
73 | assert_eq!(s.is_empty(), false); | |
74 | assert_eq!(s.is_full(), false); | |
75 | assert_eq!(r.len(), 1); | |
76 | assert_eq!(r.is_empty(), false); | |
77 | assert_eq!(r.is_full(), false); | |
78 | } | |
79 | ||
80 | #[test] | |
81 | fn try_recv() { | |
82 | let (s, r) = bounded(100); | |
83 | ||
84 | scope(|scope| { | |
85 | scope.spawn(move |_| { | |
86 | assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
87 | thread::sleep(ms(1500)); | |
88 | assert_eq!(r.try_recv(), Ok(7)); | |
89 | thread::sleep(ms(500)); | |
90 | assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); | |
91 | }); | |
92 | scope.spawn(move |_| { | |
93 | thread::sleep(ms(1000)); | |
94 | s.send(7).unwrap(); | |
95 | }); | |
96 | }) | |
97 | .unwrap(); | |
98 | } | |
99 | ||
100 | #[test] | |
101 | fn recv() { | |
102 | let (s, r) = bounded(100); | |
103 | ||
104 | scope(|scope| { | |
105 | scope.spawn(move |_| { | |
106 | assert_eq!(r.recv(), Ok(7)); | |
107 | thread::sleep(ms(1000)); | |
108 | assert_eq!(r.recv(), Ok(8)); | |
109 | thread::sleep(ms(1000)); | |
110 | assert_eq!(r.recv(), Ok(9)); | |
111 | assert_eq!(r.recv(), Err(RecvError)); | |
112 | }); | |
113 | scope.spawn(move |_| { | |
114 | thread::sleep(ms(1500)); | |
115 | s.send(7).unwrap(); | |
116 | s.send(8).unwrap(); | |
117 | s.send(9).unwrap(); | |
118 | }); | |
119 | }) | |
120 | .unwrap(); | |
121 | } | |
122 | ||
123 | #[test] | |
124 | fn recv_timeout() { | |
125 | let (s, r) = bounded::<i32>(100); | |
126 | ||
127 | scope(|scope| { | |
128 | scope.spawn(move |_| { | |
129 | assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); | |
130 | assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); | |
131 | assert_eq!( | |
132 | r.recv_timeout(ms(1000)), | |
133 | Err(RecvTimeoutError::Disconnected) | |
134 | ); | |
135 | }); | |
136 | scope.spawn(move |_| { | |
137 | thread::sleep(ms(1500)); | |
138 | s.send(7).unwrap(); | |
139 | }); | |
140 | }) | |
141 | .unwrap(); | |
142 | } | |
143 | ||
144 | #[test] | |
145 | fn try_send() { | |
146 | let (s, r) = bounded(1); | |
147 | ||
148 | scope(|scope| { | |
149 | scope.spawn(move |_| { | |
150 | assert_eq!(s.try_send(1), Ok(())); | |
151 | assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); | |
152 | thread::sleep(ms(1500)); | |
153 | assert_eq!(s.try_send(3), Ok(())); | |
154 | thread::sleep(ms(500)); | |
155 | assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4))); | |
156 | }); | |
157 | scope.spawn(move |_| { | |
158 | thread::sleep(ms(1000)); | |
159 | assert_eq!(r.try_recv(), Ok(1)); | |
160 | assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
161 | assert_eq!(r.recv(), Ok(3)); | |
162 | }); | |
163 | }) | |
164 | .unwrap(); | |
165 | } | |
166 | ||
167 | #[test] | |
168 | fn send() { | |
169 | let (s, r) = bounded(1); | |
170 | ||
171 | scope(|scope| { | |
172 | scope.spawn(|_| { | |
173 | s.send(7).unwrap(); | |
174 | thread::sleep(ms(1000)); | |
175 | s.send(8).unwrap(); | |
176 | thread::sleep(ms(1000)); | |
177 | s.send(9).unwrap(); | |
178 | thread::sleep(ms(1000)); | |
179 | s.send(10).unwrap(); | |
180 | }); | |
181 | scope.spawn(|_| { | |
182 | thread::sleep(ms(1500)); | |
183 | assert_eq!(r.recv(), Ok(7)); | |
184 | assert_eq!(r.recv(), Ok(8)); | |
185 | assert_eq!(r.recv(), Ok(9)); | |
186 | }); | |
187 | }) | |
188 | .unwrap(); | |
189 | } | |
190 | ||
191 | #[test] | |
192 | fn send_timeout() { | |
193 | let (s, r) = bounded(2); | |
194 | ||
195 | scope(|scope| { | |
196 | scope.spawn(move |_| { | |
197 | assert_eq!(s.send_timeout(1, ms(1000)), Ok(())); | |
198 | assert_eq!(s.send_timeout(2, ms(1000)), Ok(())); | |
199 | assert_eq!( | |
200 | s.send_timeout(3, ms(500)), | |
201 | Err(SendTimeoutError::Timeout(3)) | |
202 | ); | |
203 | thread::sleep(ms(1000)); | |
204 | assert_eq!(s.send_timeout(4, ms(1000)), Ok(())); | |
205 | thread::sleep(ms(1000)); | |
206 | assert_eq!(s.send(5), Err(SendError(5))); | |
207 | }); | |
208 | scope.spawn(move |_| { | |
209 | thread::sleep(ms(1000)); | |
210 | assert_eq!(r.recv(), Ok(1)); | |
211 | thread::sleep(ms(1000)); | |
212 | assert_eq!(r.recv(), Ok(2)); | |
213 | assert_eq!(r.recv(), Ok(4)); | |
214 | }); | |
215 | }) | |
216 | .unwrap(); | |
217 | } | |
218 | ||
219 | #[test] | |
220 | fn send_after_disconnect() { | |
221 | let (s, r) = bounded(100); | |
222 | ||
223 | s.send(1).unwrap(); | |
224 | s.send(2).unwrap(); | |
225 | s.send(3).unwrap(); | |
226 | ||
227 | drop(r); | |
228 | ||
229 | assert_eq!(s.send(4), Err(SendError(4))); | |
230 | assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5))); | |
231 | assert_eq!( | |
232 | s.send_timeout(6, ms(500)), | |
233 | Err(SendTimeoutError::Disconnected(6)) | |
234 | ); | |
235 | } | |
236 | ||
237 | #[test] | |
238 | fn recv_after_disconnect() { | |
239 | let (s, r) = bounded(100); | |
240 | ||
241 | s.send(1).unwrap(); | |
242 | s.send(2).unwrap(); | |
243 | s.send(3).unwrap(); | |
244 | ||
245 | drop(s); | |
246 | ||
247 | assert_eq!(r.recv(), Ok(1)); | |
248 | assert_eq!(r.recv(), Ok(2)); | |
249 | assert_eq!(r.recv(), Ok(3)); | |
250 | assert_eq!(r.recv(), Err(RecvError)); | |
251 | } | |
252 | ||
253 | #[test] | |
254 | fn len() { | |
255 | const COUNT: usize = 25_000; | |
256 | const CAP: usize = 1000; | |
257 | ||
258 | let (s, r) = bounded(CAP); | |
259 | ||
260 | assert_eq!(s.len(), 0); | |
261 | assert_eq!(r.len(), 0); | |
262 | ||
263 | for _ in 0..CAP / 10 { | |
264 | for i in 0..50 { | |
265 | s.send(i).unwrap(); | |
266 | assert_eq!(s.len(), i + 1); | |
267 | } | |
268 | ||
269 | for i in 0..50 { | |
270 | r.recv().unwrap(); | |
271 | assert_eq!(r.len(), 50 - i - 1); | |
272 | } | |
273 | } | |
274 | ||
275 | assert_eq!(s.len(), 0); | |
276 | assert_eq!(r.len(), 0); | |
277 | ||
278 | for i in 0..CAP { | |
279 | s.send(i).unwrap(); | |
280 | assert_eq!(s.len(), i + 1); | |
281 | } | |
282 | ||
283 | for _ in 0..CAP { | |
284 | r.recv().unwrap(); | |
285 | } | |
286 | ||
287 | assert_eq!(s.len(), 0); | |
288 | assert_eq!(r.len(), 0); | |
289 | ||
290 | scope(|scope| { | |
291 | scope.spawn(|_| { | |
292 | for i in 0..COUNT { | |
293 | assert_eq!(r.recv(), Ok(i)); | |
294 | let len = r.len(); | |
295 | assert!(len <= CAP); | |
296 | } | |
297 | }); | |
298 | ||
299 | scope.spawn(|_| { | |
300 | for i in 0..COUNT { | |
301 | s.send(i).unwrap(); | |
302 | let len = s.len(); | |
303 | assert!(len <= CAP); | |
304 | } | |
305 | }); | |
306 | }) | |
307 | .unwrap(); | |
308 | ||
309 | assert_eq!(s.len(), 0); | |
310 | assert_eq!(r.len(), 0); | |
311 | } | |
312 | ||
313 | #[test] | |
314 | fn disconnect_wakes_sender() { | |
315 | let (s, r) = bounded(1); | |
316 | ||
317 | scope(|scope| { | |
318 | scope.spawn(move |_| { | |
319 | assert_eq!(s.send(()), Ok(())); | |
320 | assert_eq!(s.send(()), Err(SendError(()))); | |
321 | }); | |
322 | scope.spawn(move |_| { | |
323 | thread::sleep(ms(1000)); | |
324 | drop(r); | |
325 | }); | |
326 | }) | |
327 | .unwrap(); | |
328 | } | |
329 | ||
330 | #[test] | |
331 | fn disconnect_wakes_receiver() { | |
332 | let (s, r) = bounded::<()>(1); | |
333 | ||
334 | scope(|scope| { | |
335 | scope.spawn(move |_| { | |
336 | assert_eq!(r.recv(), Err(RecvError)); | |
337 | }); | |
338 | scope.spawn(move |_| { | |
339 | thread::sleep(ms(1000)); | |
340 | drop(s); | |
341 | }); | |
342 | }) | |
343 | .unwrap(); | |
344 | } | |
345 | ||
346 | #[test] | |
347 | fn spsc() { | |
348 | const COUNT: usize = 100_000; | |
349 | ||
350 | let (s, r) = bounded(3); | |
351 | ||
352 | scope(|scope| { | |
353 | scope.spawn(move |_| { | |
354 | for i in 0..COUNT { | |
355 | assert_eq!(r.recv(), Ok(i)); | |
356 | } | |
357 | assert_eq!(r.recv(), Err(RecvError)); | |
358 | }); | |
359 | scope.spawn(move |_| { | |
360 | for i in 0..COUNT { | |
361 | s.send(i).unwrap(); | |
362 | } | |
363 | }); | |
364 | }) | |
365 | .unwrap(); | |
366 | } | |
367 | ||
368 | #[test] | |
369 | fn mpmc() { | |
370 | const COUNT: usize = 25_000; | |
371 | const THREADS: usize = 4; | |
372 | ||
373 | let (s, r) = bounded::<usize>(3); | |
374 | let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); | |
375 | ||
376 | scope(|scope| { | |
377 | for _ in 0..THREADS { | |
378 | scope.spawn(|_| { | |
379 | for _ in 0..COUNT { | |
380 | let n = r.recv().unwrap(); | |
381 | v[n].fetch_add(1, Ordering::SeqCst); | |
382 | } | |
383 | }); | |
384 | } | |
385 | for _ in 0..THREADS { | |
386 | scope.spawn(|_| { | |
387 | for i in 0..COUNT { | |
388 | s.send(i).unwrap(); | |
389 | } | |
390 | }); | |
391 | } | |
392 | }) | |
393 | .unwrap(); | |
394 | ||
395 | for c in v { | |
396 | assert_eq!(c.load(Ordering::SeqCst), THREADS); | |
397 | } | |
398 | } | |
399 | ||
400 | #[test] | |
401 | fn stress_oneshot() { | |
402 | const COUNT: usize = 10_000; | |
403 | ||
404 | for _ in 0..COUNT { | |
405 | let (s, r) = bounded(1); | |
406 | ||
407 | scope(|scope| { | |
408 | scope.spawn(|_| r.recv().unwrap()); | |
409 | scope.spawn(|_| s.send(0).unwrap()); | |
410 | }) | |
411 | .unwrap(); | |
412 | } | |
413 | } | |
414 | ||
415 | #[test] | |
416 | fn stress_iter() { | |
417 | const COUNT: usize = 100_000; | |
418 | ||
419 | let (request_s, request_r) = bounded(1); | |
420 | let (response_s, response_r) = bounded(1); | |
421 | ||
422 | scope(|scope| { | |
423 | scope.spawn(move |_| { | |
424 | let mut count = 0; | |
425 | loop { | |
426 | for x in response_r.try_iter() { | |
427 | count += x; | |
428 | if count == COUNT { | |
429 | return; | |
430 | } | |
431 | } | |
432 | request_s.send(()).unwrap(); | |
433 | } | |
434 | }); | |
435 | ||
436 | for _ in request_r.iter() { | |
437 | if response_s.send(1).is_err() { | |
438 | break; | |
439 | } | |
440 | } | |
441 | }) | |
442 | .unwrap(); | |
443 | } | |
444 | ||
445 | #[test] | |
446 | fn stress_timeout_two_threads() { | |
447 | const COUNT: usize = 100; | |
448 | ||
449 | let (s, r) = bounded(2); | |
450 | ||
451 | scope(|scope| { | |
452 | scope.spawn(|_| { | |
453 | for i in 0..COUNT { | |
454 | if i % 2 == 0 { | |
455 | thread::sleep(ms(50)); | |
456 | } | |
457 | loop { | |
458 | if let Ok(()) = s.send_timeout(i, ms(10)) { | |
459 | break; | |
460 | } | |
461 | } | |
462 | } | |
463 | }); | |
464 | ||
465 | scope.spawn(|_| { | |
466 | for i in 0..COUNT { | |
467 | if i % 2 == 0 { | |
468 | thread::sleep(ms(50)); | |
469 | } | |
470 | loop { | |
471 | if let Ok(x) = r.recv_timeout(ms(10)) { | |
472 | assert_eq!(x, i); | |
473 | break; | |
474 | } | |
475 | } | |
476 | } | |
477 | }); | |
478 | }) | |
479 | .unwrap(); | |
480 | } | |
481 | ||
482 | #[test] | |
483 | fn drops() { | |
484 | const RUNS: usize = 100; | |
485 | ||
486 | static DROPS: AtomicUsize = AtomicUsize::new(0); | |
487 | ||
488 | #[derive(Debug, PartialEq)] | |
489 | struct DropCounter; | |
490 | ||
491 | impl Drop for DropCounter { | |
492 | fn drop(&mut self) { | |
493 | DROPS.fetch_add(1, Ordering::SeqCst); | |
494 | } | |
495 | } | |
496 | ||
497 | let mut rng = thread_rng(); | |
498 | ||
499 | for _ in 0..RUNS { | |
500 | let steps = rng.gen_range(0, 10_000); | |
501 | let additional = rng.gen_range(0, 50); | |
502 | ||
503 | DROPS.store(0, Ordering::SeqCst); | |
504 | let (s, r) = bounded::<DropCounter>(50); | |
505 | ||
506 | scope(|scope| { | |
507 | scope.spawn(|_| { | |
508 | for _ in 0..steps { | |
509 | r.recv().unwrap(); | |
510 | } | |
511 | }); | |
512 | ||
513 | scope.spawn(|_| { | |
514 | for _ in 0..steps { | |
515 | s.send(DropCounter).unwrap(); | |
516 | } | |
517 | }); | |
518 | }) | |
519 | .unwrap(); | |
520 | ||
521 | for _ in 0..additional { | |
522 | s.send(DropCounter).unwrap(); | |
523 | } | |
524 | ||
525 | assert_eq!(DROPS.load(Ordering::SeqCst), steps); | |
526 | drop(s); | |
527 | drop(r); | |
528 | assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); | |
529 | } | |
530 | } | |
531 | ||
532 | #[test] | |
533 | fn linearizable() { | |
534 | const COUNT: usize = 25_000; | |
535 | const THREADS: usize = 4; | |
536 | ||
537 | let (s, r) = bounded(THREADS); | |
538 | ||
539 | scope(|scope| { | |
540 | for _ in 0..THREADS { | |
541 | scope.spawn(|_| { | |
542 | for _ in 0..COUNT { | |
543 | s.send(0).unwrap(); | |
544 | r.try_recv().unwrap(); | |
545 | } | |
546 | }); | |
547 | } | |
548 | }) | |
549 | .unwrap(); | |
550 | } | |
551 | ||
552 | #[test] | |
553 | fn fairness() { | |
554 | const COUNT: usize = 10_000; | |
555 | ||
556 | let (s1, r1) = bounded::<()>(COUNT); | |
557 | let (s2, r2) = bounded::<()>(COUNT); | |
558 | ||
559 | for _ in 0..COUNT { | |
560 | s1.send(()).unwrap(); | |
561 | s2.send(()).unwrap(); | |
562 | } | |
563 | ||
564 | let mut hits = [0usize; 2]; | |
565 | for _ in 0..COUNT { | |
566 | select! { | |
567 | recv(r1) -> _ => hits[0] += 1, | |
568 | recv(r2) -> _ => hits[1] += 1, | |
569 | } | |
570 | } | |
571 | assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); | |
572 | } | |
573 | ||
574 | #[test] | |
575 | fn fairness_duplicates() { | |
576 | const COUNT: usize = 10_000; | |
577 | ||
578 | let (s, r) = bounded::<()>(COUNT); | |
579 | ||
580 | for _ in 0..COUNT { | |
581 | s.send(()).unwrap(); | |
582 | } | |
583 | ||
584 | let mut hits = [0usize; 5]; | |
585 | for _ in 0..COUNT { | |
586 | select! { | |
587 | recv(r) -> _ => hits[0] += 1, | |
588 | recv(r) -> _ => hits[1] += 1, | |
589 | recv(r) -> _ => hits[2] += 1, | |
590 | recv(r) -> _ => hits[3] += 1, | |
591 | recv(r) -> _ => hits[4] += 1, | |
592 | } | |
593 | } | |
594 | assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); | |
595 | } | |
596 | ||
597 | #[test] | |
598 | fn recv_in_send() { | |
599 | let (s, _r) = bounded(1); | |
600 | s.send(()).unwrap(); | |
601 | ||
602 | #[allow(unreachable_code)] | |
603 | { | |
604 | select! { | |
605 | send(s, panic!()) -> _ => panic!(), | |
606 | default => {} | |
607 | } | |
608 | } | |
609 | ||
610 | let (s, r) = bounded(2); | |
611 | s.send(()).unwrap(); | |
612 | ||
613 | select! { | |
614 | send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {} | |
615 | } | |
616 | } | |
617 | ||
618 | #[test] | |
619 | fn channel_through_channel() { | |
620 | const COUNT: usize = 1000; | |
621 | ||
622 | type T = Box<dyn Any + Send>; | |
623 | ||
624 | let (s, r) = bounded::<T>(1); | |
625 | ||
626 | scope(|scope| { | |
627 | scope.spawn(move |_| { | |
628 | let mut s = s; | |
629 | ||
630 | for _ in 0..COUNT { | |
631 | let (new_s, new_r) = bounded(1); | |
632 | let new_r: T = Box::new(Some(new_r)); | |
633 | ||
634 | s.send(new_r).unwrap(); | |
635 | s = new_s; | |
636 | } | |
637 | }); | |
638 | ||
639 | scope.spawn(move |_| { | |
640 | let mut r = r; | |
641 | ||
642 | for _ in 0..COUNT { | |
643 | r = r | |
644 | .recv() | |
645 | .unwrap() | |
646 | .downcast_mut::<Option<Receiver<T>>>() | |
647 | .unwrap() | |
648 | .take() | |
649 | .unwrap() | |
650 | } | |
651 | }); | |
652 | }) | |
653 | .unwrap(); | |
654 | } |