1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
4 use std
::time
::Duration
;
5 use tokio
::io
::{self, copy_bidirectional, AsyncReadExt, AsyncWriteExt}
;
6 use tokio
::net
::TcpStream
;
7 use tokio
::task
::JoinHandle
;
9 async
fn make_socketpair() -> (TcpStream
, TcpStream
) {
10 let listener
= tokio
::net
::TcpListener
::bind("127.0.0.1:0").await
.unwrap();
11 let addr
= listener
.local_addr().unwrap();
12 let connector
= TcpStream
::connect(addr
);
13 let acceptor
= listener
.accept();
15 let (c1
, c2
) = tokio
::join
!(connector
, acceptor
);
17 (c1
.unwrap(), c2
.unwrap().0)
20 async
fn block_write(s
: &mut TcpStream
) -> usize {
21 static BUF
: [u8; 2048] = [0; 2048];
26 result
= s
.write(&BUF
) => {
27 copied
+= result
.expect("write error")
29 _
= tokio
::time
::sleep(Duration
::from_millis(100)) => {
38 async
fn symmetric
<F
, Fut
>(mut cb
: F
)
40 F
: FnMut(JoinHandle
<io
::Result
<(u64, u64)>>, TcpStream
, TcpStream
) -> Fut
,
41 Fut
: std
::future
::Future
<Output
= ()>,
43 // We run the test twice, with streams passed to copy_bidirectional in
44 // different orders, in order to ensure that the two arguments are
47 let (a
, mut a1
) = make_socketpair().await
;
48 let (b
, mut b1
) = make_socketpair().await
;
50 let handle
= tokio
::spawn(async
move { copy_bidirectional(&mut a1, &mut b1).await }
);
51 cb(handle
, a
, b
).await
;
53 let (a
, mut a1
) = make_socketpair().await
;
54 let (b
, mut b1
) = make_socketpair().await
;
56 let handle
= tokio
::spawn(async
move { copy_bidirectional(&mut b1, &mut a1).await }
);
58 cb(handle
, b
, a
).await
;
62 async
fn test_basic_transfer() {
63 symmetric(|_handle
, mut a
, mut b
| async
move {
64 a
.write_all(b
"test").await
.unwrap();
66 b
.read_exact(&mut tmp
).await
.unwrap();
67 assert_eq
!(&tmp
[..], b
"test");
73 async
fn test_transfer_after_close() {
74 symmetric(|handle
, mut a
, mut b
| async
move {
75 AsyncWriteExt
::shutdown(&mut a
).await
.unwrap();
76 b
.read_to_end(&mut Vec
::new()).await
.unwrap();
78 b
.write_all(b
"quux").await
.unwrap();
80 a
.read_exact(&mut tmp
).await
.unwrap();
81 assert_eq
!(&tmp
[..], b
"quux");
83 // Once both are closed, we should have our handle back
86 assert_eq
!(handle
.await
.unwrap().unwrap(), (0, 4));
92 async
fn blocking_one_side_does_not_block_other() {
93 symmetric(|handle
, mut a
, mut b
| async
move {
94 block_write(&mut a
).await
;
96 b
.write_all(b
"quux").await
.unwrap();
98 a
.read_exact(&mut tmp
).await
.unwrap();
99 assert_eq
!(&tmp
[..], b
"quux");
101 AsyncWriteExt
::shutdown(&mut a
).await
.unwrap();
103 let mut buf
= Vec
::new();
104 b
.read_to_end(&mut buf
).await
.unwrap();
108 assert_eq
!(handle
.await
.unwrap().unwrap(), (buf
.len() as u64, 4));
114 async
fn immediate_exit_on_error() {
115 symmetric(|handle
, mut a
, mut b
| async
move {
116 block_write(&mut a
).await
;
118 // Fill up the b->copy->a path. We expect that this will _not_ drain
119 // before we exit the copy task.
120 let _bytes_written
= block_write(&mut b
).await
;
122 // Drop b. We should not wait for a to consume the data buffered in the
123 // copy loop, since b will be failing writes.
125 assert
!(handle
.await
.unwrap().is_err());