1 use crate::sync
::broadcast
;
2 use crate::sync
::broadcast
::error
::RecvError
::{Closed, Lagged}
;
4 use loom
::future
::block_on
;
7 use tokio_test
::{assert_err, assert_ok}
;
// Fragment of a broadcast test: two threads send on the same channel while
// the current thread receives.
// NOTE(review): the leading numbers skip values (14 -> 16 -> 18, 28 -> 35, ...),
// so the enclosing function header and several statements appear to be
// missing from this listing — confirm against the complete file.
//
// The channel is created with `broadcast::channel(2)`. The sender is wrapped
// in an `Arc` and that `Arc` is cloned, so `tx1` and `tx2` refer to the same
// underlying sender and can each be moved into their own thread.
12 let (tx1
, mut rx
) = broadcast
::channel(2);
13 let tx1
= Arc
::new(tx1
);
14 let tx2
= tx1
.clone();
// First sender thread: every one of its three sends is asserted to be `Ok`.
16 let th1
= thread
::spawn(move || {
18 assert_ok
!(tx1
.send("one"));
19 assert_ok
!(tx1
.send("two"));
20 assert_ok
!(tx1
.send("three"));
// Second sender thread: same pattern through the cloned handle `tx2`.
24 let th2
= thread
::spawn(move || {
26 assert_ok
!(tx2
.send("eins"));
27 assert_ok
!(tx2
.send("zwei"));
28 assert_ok
!(tx2
.send("drei"));
// Receiving side: the result of `rx.recv().await` is matched. Only the
// `Lagged(n)` arm is visible here; it adds `n` to the counter `num`.
// NOTE(review): `num`'s declaration and the other arms are not shown.
35 match rx
.recv().await
{
38 Err(Lagged(n
)) => num
+= n
as usize,
// Both sender threads must join successfully (`join()` asserted `Ok`).
44 assert_ok
!(th1
.join());
45 assert_ok
!(th2
.join());
// Fragment of a broadcast test with one sender and two receivers, each
// receiver running in its own thread.
// NOTE(review): the enclosing function header and some statements appear to
// be missing from this listing (the leading numbers skip values) — confirm
// against the complete file.
49 // An `Arc` is used as the value in order to detect memory leaks.
// The element type is spelled out explicitly as `Arc<&'static str>`; the
// second receiver is obtained from the sender via `subscribe()`.
53 let (tx
, mut rx1
) = broadcast
::channel
::<Arc
<&'
static str>>(16);
54 let mut rx2
= tx
.subscribe();
// Receiver thread 1: expects "hello", then "world" (comparing through the
// dereferenced `Arc`), and then expects the third `recv()` to be an error.
56 let th1
= thread
::spawn(move || {
58 let v
= assert_ok
!(rx1
.recv().await
);
59 assert_eq
!(*v
, "hello");
61 let v
= assert_ok
!(rx1
.recv().await
);
62 assert_eq
!(*v
, "world");
// NOTE(review): the arms of this match on the error value are not shown.
64 match assert_err
!(rx1
.recv().await
) {
// Receiver thread 2: identical expectations, using the subscribed `rx2`.
71 let th2
= thread
::spawn(move || {
73 let v
= assert_ok
!(rx2
.recv().await
);
74 assert_eq
!(*v
, "hello");
76 let v
= assert_ok
!(rx2
.recv().await
);
77 assert_eq
!(*v
, "world");
// NOTE(review): the arms of this match on the error value are not shown.
79 match assert_err
!(rx2
.recv().await
) {
// Sending side: two `Arc`-wrapped values are sent; each send is asserted `Ok`.
86 assert_ok
!(tx
.send(Arc
::new("hello")));
87 assert_ok
!(tx
.send(Arc
::new("world")));
// Both receiver threads must join successfully.
90 assert_ok
!(th1
.join());
91 assert_ok
!(th2
.join());
// Fragment of a broadcast test in which three values are sent on a channel
// created with `broadcast::channel(2)` while two receivers run in threads.
// NOTE(review): the enclosing function header, the loop headers and some
// match arms appear to be missing from this listing — confirm against the
// complete file.
98 let (tx
, mut rx1
) = broadcast
::channel(2);
99 let mut rx2
= tx
.subscribe();
// Receiver thread 1: matches on each `recv()` result. `Closed` leaves the
// enclosing loop via `break`; `Lagged(n)` adds `n` to the counter `num`.
101 let th1
= thread
::spawn(move || {
106 match rx1
.recv().await
{
108 Err(Closed
) => break,
109 Err(Lagged(n
)) => num
+= n
as usize,
// Receiver thread 2: same handling, using the subscribed receiver `rx2`.
117 let th2
= thread
::spawn(move || {
122 match rx2
.recv().await
{
124 Err(Closed
) => break,
125 Err(Lagged(n
)) => num
+= n
as usize,
// Sending side: three sends, each asserted `Ok`.
133 assert_ok
!(tx
.send("one"));
134 assert_ok
!(tx
.send("two"));
135 assert_ok
!(tx
.send("three"));
// Both receiver threads must join successfully.
139 assert_ok
!(th1
.join());
140 assert_ok
!(th2
.join());
// Fragment of a broadcast test on a channel created with
// `broadcast::channel(16)`: `rx1` is read in one thread, and a second
// receiver `rx2` is subscribed.
// NOTE(review): the enclosing function header and the body of `th2` appear
// to be missing from this listing, so how `rx2` is used cannot be seen here —
// confirm against the complete file.
147 let (tx
, mut rx1
) = broadcast
::channel(16);
148 let rx2
= tx
.subscribe();
// Receiver thread: expects exactly "one", "two", "three" in that order, and
// then expects the fourth `recv()` to be an error.
150 let th1
= thread
::spawn(move || {
152 let v
= assert_ok
!(rx1
.recv().await
);
153 assert_eq
!(v
, "one");
155 let v
= assert_ok
!(rx1
.recv().await
);
156 assert_eq
!(v
, "two");
158 let v
= assert_ok
!(rx1
.recv().await
);
159 assert_eq
!(v
, "three");
// NOTE(review): the arms of this match on the error value are not shown.
161 match assert_err
!(rx1
.recv().await
) {
// Second thread; its closure body is not visible in this listing.
168 let th2
= thread
::spawn(move || {
// Sending side: three sends, each asserted `Ok`.
172 assert_ok
!(tx
.send("one"));
173 assert_ok
!(tx
.send("two"));
174 assert_ok
!(tx
.send("three"));
// Both threads must join successfully.
177 assert_ok
!(th1
.join());
178 assert_ok
!(th2
.join());
// Test body passed to `loom::model`: a channel created with
// `broadcast::channel(1)` is used with two sender handles and two receivers,
// one thread sending and another draining a receiver.
// NOTE(review): the leading numbers skip values (187 -> 190, 194 -> 200,
// 201 -> 204), so some statements and the closing braces appear to be missing
// from this listing — confirm against the complete file.
183 fn drop_multiple_rx_with_overflow() {
184 loom
::model(move || {
185 // It is essential to have multiple senders and receivers in this test case.
186 let (tx
, mut rx
) = broadcast
::channel(1);
// The second receiver is bound to `_rx2` and not otherwise used in the
// visible lines.
187 let _rx2
= tx
.subscribe();
// A cloned sender is moved into the first thread; the result of its `send`
// is deliberately discarded with `let _ =`.
190 let tx2
= tx
.clone();
191 let th1
= thread
::spawn(move || {
194 let _
= tx2
.send(());
// The second thread keeps calling `rx.recv()` until it returns something
// other than `Ok`, ignoring the received values.
200 let th2
= thread
::spawn(move || {
201 block_on(async { while let Ok(_) = rx.recv().await {}
});
// Both threads must join successfully.
204 assert_ok
!(th1
.join());
205 assert_ok
!(th2
.join());