2 // compile-flags:--test
5 use std
::sync
::mpsc
::channel
;
6 use std
::sync
::mpsc
::TryRecvError
;
7 use std
::sync
::mpsc
::RecvError
;
8 use std
::sync
::mpsc
::RecvTimeoutError
;
10 use std
::sync
::atomic
::AtomicUsize
;
11 use std
::sync
::atomic
::Ordering
;
14 use std
::time
::Duration
;
17 /// Simple thread synchronization utility
19 // Not using mutex/condvar for precision
20 shared
: Arc
<AtomicUsize
>,
25 fn new(count
: usize) -> Vec
<Barrier
> {
26 let shared
= Arc
::new(AtomicUsize
::new(0));
27 (0..count
).map(|_
| Barrier { shared: shared.clone(), count: count }
).collect()
30 fn new2() -> (Barrier
, Barrier
) {
31 let mut v
= Barrier
::new(2);
32 (v
.pop().unwrap(), v
.pop().unwrap())
35 /// Returns when `count` threads enter `wait`
37 self.shared
.fetch_add(1, Ordering
::SeqCst
);
38 while self.shared
.load(Ordering
::SeqCst
) != self.count
{
39 #[cfg(target_env = "sgx")]
46 fn shared_close_sender_does_not_lose_messages_iter() {
47 let (tb
, rb
) = Barrier
::new2();
49 let (tx
, rx
) = channel();
50 let _
= tx
.clone(); // convert to shared
52 thread
::spawn(move || {
54 thread
::sleep(Duration
::from_micros(1));
55 tx
.send(17).expect("send");
59 let i
= rx
.into_iter();
61 // Make sure it doesn't return disconnected before returning an element
62 assert_eq
!(vec
![17], i
.collect
::<Vec
<_
>>());
66 fn shared_close_sender_does_not_lose_messages() {
68 shared_close_sender_does_not_lose_messages_iter();
73 // https://github.com/rust-lang/rust/issues/39364
74 fn concurrent_recv_timeout_and_upgrade_iter() {
76 let sleep
= Duration
::new(0, 1_000);
78 let (a
, b
) = Barrier
::new2();
79 let (tx
, rx
) = channel();
80 let th
= thread
::spawn(move || {
83 match rx
.recv_timeout(sleep
) {
93 tx
.clone().send(()).expect("send");
98 fn concurrent_recv_timeout_and_upgrade() {
99 // FIXME: fix and enable
102 // at the moment of writing this test fails like this:
103 // thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
104 // left: `4561387584`,
105 // right: `0`', libstd/sync/mpsc/shared.rs:253:13
108 concurrent_recv_timeout_and_upgrade_iter();
113 fn concurrent_writes_iter() {
114 const THREADS
: usize = 4;
115 const PER_THR
: usize = 100;
117 let mut bs
= Barrier
::new(THREADS
+ 1);
118 let (tx
, rx
) = channel();
120 let mut threads
= Vec
::new();
121 for j
in 0..THREADS
{
123 let b
= bs
.pop().unwrap();
124 threads
.push(thread
::spawn(move || {
126 for i
in 0..PER_THR
{
127 tx
.send(j
* 1000 + i
).expect("send");
132 let b
= bs
.pop().unwrap();
135 let mut v
: Vec
<_
> = rx
.iter().take(THREADS
* PER_THR
).collect();
138 for j
in 0..THREADS
{
139 for i
in 0..PER_THR
{
140 assert_eq
!(j
* 1000 + i
, v
[j
* PER_THR
+ i
]);
148 let one_us
= Duration
::new(0, 1000);
150 assert_eq
!(TryRecvError
::Empty
, rx
.try_recv().unwrap_err());
151 assert_eq
!(RecvTimeoutError
::Timeout
, rx
.recv_timeout(one_us
).unwrap_err());
155 assert_eq
!(RecvError
, rx
.recv().unwrap_err());
156 assert_eq
!(RecvTimeoutError
::Disconnected
, rx
.recv_timeout(one_us
).unwrap_err());
157 assert_eq
!(TryRecvError
::Disconnected
, rx
.try_recv().unwrap_err());
161 fn concurrent_writes() {
163 concurrent_writes_iter();