1 #![warn(rust_2018_idioms)]
2 #![cfg(all(unix, feature = "full"))]
4 use std
::os
::unix
::io
::{AsRawFd, RawFd}
;
6 atomic
::{AtomicBool, Ordering}
,
9 use std
::time
::Duration
;
12 io
::{self, ErrorKind, Read, Write}
,
13 task
::{Context, Waker}
,
16 use nix
::unistd
::{close, read, write}
;
18 use futures
::{poll, FutureExt}
;
20 use tokio
::io
::unix
::{AsyncFd, AsyncFdReadyGuard}
;
21 use tokio_test
::{assert_err, assert_pending}
;
24 inner
: Arc
<TestWakerInner
>,
29 struct TestWakerInner
{
33 impl futures
::task
::ArcWake
for TestWakerInner
{
34 fn wake_by_ref(arc_self
: &Arc
<Self>) {
35 arc_self
.awoken
.store(true, Ordering
::SeqCst
);
41 let inner
: Arc
<TestWakerInner
> = Default
::default();
45 waker
: futures
::task
::waker(inner
),
49 fn awoken(&self) -> bool
{
50 self.inner
.awoken
.swap(false, Ordering
::SeqCst
)
53 fn context(&self) -> Context
<'_
> {
54 Context
::from_waker(&self.waker
)
59 struct FileDescriptor
{
63 impl AsRawFd
for FileDescriptor
{
64 fn as_raw_fd(&self) -> RawFd
{
69 impl Read
for &FileDescriptor
{
70 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
71 read(self.fd
, buf
).map_err(io
::Error
::from
)
75 impl Read
for FileDescriptor
{
76 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
77 (self as &Self).read(buf
)
81 impl Write
for &FileDescriptor
{
82 fn write(&mut self, buf
: &[u8]) -> io
::Result
<usize> {
83 write(self.fd
, buf
).map_err(io
::Error
::from
)
86 fn flush(&mut self) -> io
::Result
<()> {
91 impl Write
for FileDescriptor
{
92 fn write(&mut self, buf
: &[u8]) -> io
::Result
<usize> {
93 (self as &Self).write(buf
)
96 fn flush(&mut self) -> io
::Result
<()> {
97 (self as &Self).flush()
101 impl Drop
for FileDescriptor
{
103 let _
= close(self.fd
);
107 fn set_nonblocking(fd
: RawFd
) {
108 use nix
::fcntl
::{OFlag, F_GETFL, F_SETFL}
;
110 let flags
= nix
::fcntl
::fcntl(fd
, F_GETFL
).expect("fcntl(F_GETFD)");
114 "bad return value from fcntl(F_GETFL): {} ({:?})",
120 let flags
= OFlag
::from_bits_truncate(flags
) | OFlag
::O_NONBLOCK
;
122 nix
::fcntl
::fcntl(fd
, F_SETFL(flags
)).expect("fcntl(F_SETFD)");
125 fn socketpair() -> (FileDescriptor
, FileDescriptor
) {
126 use nix
::sys
::socket
::{self, AddressFamily, SockFlag, SockType}
;
128 let (fd_a
, fd_b
) = socket
::socketpair(
134 .expect("socketpair");
135 let fds
= (FileDescriptor { fd: fd_a }
, FileDescriptor { fd: fd_b }
);
137 set_nonblocking(fds
.0.fd
);
138 set_nonblocking(fds
.1.fd
);
143 fn drain(mut fd
: &FileDescriptor
) {
144 let mut buf
= [0u8; 512];
147 match fd
.read(&mut buf
[..]) {
148 Err(e
) if e
.kind() == ErrorKind
::WouldBlock
=> break,
149 Ok(0) => panic
!("unexpected EOF"),
150 Err(e
) => panic
!("unexpected error: {:?}", e
),
157 async
fn initially_writable() {
158 let (a
, b
) = socketpair();
160 let afd_a
= AsyncFd
::new(a
).unwrap();
161 let afd_b
= AsyncFd
::new(b
).unwrap();
163 afd_a
.writable().await
.unwrap().clear_ready();
164 afd_b
.writable().await
.unwrap().clear_ready();
166 futures
::select_biased
! {
167 _
= tokio
::time
::sleep(Duration
::from_millis(10)).fuse() => {}
,
168 _
= afd_a
.readable().fuse() => panic
!("Unexpected readable state"),
169 _
= afd_b
.readable().fuse() => panic
!("Unexpected readable state"),
174 async
fn reset_readable() {
175 let (a
, mut b
) = socketpair();
177 let afd_a
= AsyncFd
::new(a
).unwrap();
179 let readable
= afd_a
.readable();
180 tokio
::pin
!(readable
);
183 _
= readable
.as_mut() => panic
!(),
184 _
= tokio
::time
::sleep(Duration
::from_millis(10)) => {}
187 b
.write_all(b
"0").unwrap();
189 let mut guard
= readable
.await
.unwrap();
192 .try_io(|_
| afd_a
.get_ref().read(&mut [0]))
196 // `a` is not readable, but the reactor still thinks it is
197 // (because we have not observed a not-ready error yet)
198 afd_a
.readable().await
.unwrap().retain_ready();
200 // Explicitly clear the ready state
203 let readable
= afd_a
.readable();
204 tokio
::pin
!(readable
);
207 _
= readable
.as_mut() => panic
!(),
208 _
= tokio
::time
::sleep(Duration
::from_millis(10)) => {}
211 b
.write_all(b
"0").unwrap();
213 // We can observe the new readable event
214 afd_a
.readable().await
.unwrap().clear_ready();
218 async
fn reset_writable() {
219 let (a
, b
) = socketpair();
221 let afd_a
= AsyncFd
::new(a
).unwrap();
223 let mut guard
= afd_a
.writable().await
.unwrap();
225 // Write until we get a WouldBlock. This also clears the ready state.
227 .try_io(|_
| afd_a
.get_ref().write(&[0; 512][..]))
231 // Writable state should be cleared now.
232 let writable
= afd_a
.writable();
233 tokio
::pin
!(writable
);
236 _
= writable
.as_mut() => panic
!(),
237 _
= tokio
::time
::sleep(Duration
::from_millis(10)) => {}
240 // Read from the other side; we should become writable now.
243 let _
= writable
.await
.unwrap();
/// Newtype delegating `AsRawFd` through an `Arc`, to test that dropping an
/// `AsyncFd` does not close the fd when the inner object is shared.
struct ArcFd<T>(Arc<T>);
impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}
255 async
fn drop_closes() {
256 let (a
, mut b
) = socketpair();
258 let afd_a
= AsyncFd
::new(a
).unwrap();
261 ErrorKind
::WouldBlock
,
262 b
.read(&mut [0]).err().unwrap().kind()
265 std
::mem
::drop(afd_a
);
267 assert_eq
!(0, b
.read(&mut [0]).unwrap());
269 // into_inner does not close the fd
271 let (a
, mut b
) = socketpair();
272 let afd_a
= AsyncFd
::new(a
).unwrap();
273 let _a
: FileDescriptor
= afd_a
.into_inner();
276 ErrorKind
::WouldBlock
,
277 b
.read(&mut [0]).err().unwrap().kind()
280 // Drop closure behavior is delegated to the inner object
281 let (a
, mut b
) = socketpair();
282 let arc_fd
= Arc
::new(a
);
283 let afd_a
= AsyncFd
::new(ArcFd(arc_fd
.clone())).unwrap();
284 std
::mem
::drop(afd_a
);
287 ErrorKind
::WouldBlock
,
288 b
.read(&mut [0]).err().unwrap().kind()
291 std
::mem
::drop(arc_fd
); // suppress unnecessary clone clippy warning
295 async
fn reregister() {
296 let (a
, _b
) = socketpair();
298 let afd_a
= AsyncFd
::new(a
).unwrap();
299 let a
= afd_a
.into_inner();
300 AsyncFd
::new(a
).unwrap();
305 let (a
, mut b
) = socketpair();
307 b
.write_all(b
"0").unwrap();
309 let afd_a
= AsyncFd
::new(a
).unwrap();
311 let mut guard
= afd_a
.readable().await
.unwrap();
313 afd_a
.get_ref().read_exact(&mut [0]).unwrap();
315 // Should not clear the readable state
316 let _
= guard
.try_io(|_
| Ok(()));
319 let _
= afd_a
.readable().await
.unwrap();
321 // Should clear the readable state
322 let _
= guard
.try_io(|_
| io
::Result
::<()>::Err(ErrorKind
::WouldBlock
.into()));
324 // Assert not readable
325 let readable
= afd_a
.readable();
326 tokio
::pin
!(readable
);
329 _
= readable
.as_mut() => panic
!(),
330 _
= tokio
::time
::sleep(Duration
::from_millis(10)) => {}
333 // Write something down b again and make sure we're reawoken
334 b
.write_all(b
"0").unwrap();
335 let _
= readable
.await
.unwrap();
339 async
fn multiple_waiters() {
340 let (a
, mut b
) = socketpair();
341 let afd_a
= Arc
::new(AsyncFd
::new(a
).unwrap());
343 let barrier
= Arc
::new(tokio
::sync
::Barrier
::new(11));
345 let mut tasks
= Vec
::new();
347 let afd_a
= afd_a
.clone();
348 let barrier
= barrier
.clone();
351 let notify_barrier
= async
{
352 barrier
.wait().await
;
353 futures
::future
::pending
::<()>().await
;
356 futures
::select_biased
! {
357 guard
= afd_a
.readable().fuse() => {
358 tokio
::task
::yield_now().await
;
359 guard
.unwrap().clear_ready()
361 _
= notify_barrier
.fuse() => unreachable
!(),
364 std
::mem
::drop(afd_a
);
367 tasks
.push(tokio
::spawn(f
));
370 let mut all_tasks
= futures
::future
::try_join_all(tasks
);
373 r
= std
::pin
::Pin
::new(&mut all_tasks
) => {
374 r
.unwrap(); // propagate panic
375 panic
!("Tasks exited unexpectedly")
377 _
= barrier
.wait() => {}
380 b
.write_all(b
"0").unwrap();
382 all_tasks
.await
.unwrap();
386 async
fn poll_fns() {
387 let (a
, b
) = socketpair();
388 let afd_a
= Arc
::new(AsyncFd
::new(a
).unwrap());
389 let afd_b
= Arc
::new(AsyncFd
::new(b
).unwrap());
391 // Fill up the write side of A
392 while afd_a
.get_ref().write(&[0; 512]).is_ok() {}
394 let waker
= TestWaker
::new();
396 assert_pending
!(afd_a
.as_ref().poll_read_ready(&mut waker
.context()));
398 let afd_a_2
= afd_a
.clone();
399 let r_barrier
= Arc
::new(tokio
::sync
::Barrier
::new(2));
400 let barrier_clone
= r_barrier
.clone();
402 let read_fut
= tokio
::spawn(async
move {
403 // Move waker onto this task first
404 assert_pending
!(poll
!(futures
::future
::poll_fn(|cx
| afd_a_2
406 .poll_read_ready(cx
))));
407 barrier_clone
.wait().await
;
409 let _
= futures
::future
::poll_fn(|cx
| afd_a_2
.as_ref().poll_read_ready(cx
)).await
;
412 let afd_a_2
= afd_a
.clone();
413 let w_barrier
= Arc
::new(tokio
::sync
::Barrier
::new(2));
414 let barrier_clone
= w_barrier
.clone();
416 let mut write_fut
= tokio
::spawn(async
move {
417 // Move waker onto this task first
418 assert_pending
!(poll
!(futures
::future
::poll_fn(|cx
| afd_a_2
420 .poll_write_ready(cx
))));
421 barrier_clone
.wait().await
;
423 let _
= futures
::future
::poll_fn(|cx
| afd_a_2
.as_ref().poll_write_ready(cx
)).await
;
426 r_barrier
.wait().await
;
427 w_barrier
.wait().await
;
429 let readable
= afd_a
.readable();
430 tokio
::pin
!(readable
);
433 _
= &mut readable
=> unreachable
!(),
434 _
= tokio
::task
::yield_now() => {}
437 // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
438 afd_b
.get_ref().write_all(b
"0").unwrap();
440 let _
= tokio
::join
!(readable
, read_fut
);
442 // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
443 assert
!(!waker
.awoken());
445 // The writable side should not be awoken
447 _
= &mut write_fut
=> unreachable
!(),
448 _
= tokio
::time
::sleep(Duration
::from_millis(5)) => {}
451 // Make it writable now
452 drain(afd_b
.get_ref());
454 // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
455 let _
= write_fut
.await
;
458 fn assert_pending
<T
: std
::fmt
::Debug
, F
: Future
<Output
= T
>>(f
: F
) -> std
::pin
::Pin
<Box
<F
>> {
459 let mut pinned
= Box
::pin(f
);
461 assert_pending
!(pinned
463 .poll(&mut Context
::from_waker(futures
::task
::noop_waker_ref())));
468 fn rt() -> tokio
::runtime
::Runtime
{
469 tokio
::runtime
::Builder
::new_current_thread()
/// Dropping the runtime wakes a readiness future created before the drop,
/// and it resolves to an error; new futures error too.
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}
/// A readiness future created after the runtime is dropped errors immediately.
#[test]
fn driver_shutdown_wakes_future_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(afd_a.readable()));
}
/// Races runtime shutdown (on another thread) against a pending readiness
/// wait; repeated to widen the race window.
#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}
533 async
fn poll_readable
<T
: AsRawFd
>(fd
: &AsyncFd
<T
>) -> std
::io
::Result
<AsyncFdReadyGuard
<'_
, T
>> {
534 futures
::future
::poll_fn(|cx
| fd
.poll_read_ready(cx
)).await
537 async
fn poll_writable
<T
: AsRawFd
>(fd
: &AsyncFd
<T
>) -> std
::io
::Result
<AsyncFdReadyGuard
<'_
, T
>> {
538 futures
::future
::poll_fn(|cx
| fd
.poll_write_ready(cx
)).await
/// Dropping the runtime wakes in-flight `poll_read_ready`/`poll_write_ready`
/// futures, which then resolve to errors.
#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}
/// After runtime shutdown, fresh poll-based readiness checks error.
#[test]
fn driver_shutdown_wakes_poll() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
    assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}
/// Races runtime shutdown against poll-based readiness; the poll variants
/// must always error once the driver is gone. Repeated to widen the window.
#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}