]> git.proxmox.com Git - rustc.git/blob - vendor/tokio/tests/io_async_fd.rs
New upstream version 1.60.0+dfsg1
[rustc.git] / vendor / tokio / tests / io_async_fd.rs
1 #![warn(rust_2018_idioms)]
2 #![cfg(all(unix, feature = "full"))]
3
4 use std::os::unix::io::{AsRawFd, RawFd};
5 use std::sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8 };
9 use std::time::Duration;
10 use std::{
11 future::Future,
12 io::{self, ErrorKind, Read, Write},
13 task::{Context, Waker},
14 };
15
16 use nix::unistd::{close, read, write};
17
18 use futures::{poll, FutureExt};
19
20 use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
21 use tokio_test::{assert_err, assert_pending};
22
23 struct TestWaker {
24 inner: Arc<TestWakerInner>,
25 waker: Waker,
26 }
27
28 #[derive(Default)]
29 struct TestWakerInner {
30 awoken: AtomicBool,
31 }
32
33 impl futures::task::ArcWake for TestWakerInner {
34 fn wake_by_ref(arc_self: &Arc<Self>) {
35 arc_self.awoken.store(true, Ordering::SeqCst);
36 }
37 }
38
39 impl TestWaker {
40 fn new() -> Self {
41 let inner: Arc<TestWakerInner> = Default::default();
42
43 Self {
44 inner: inner.clone(),
45 waker: futures::task::waker(inner),
46 }
47 }
48
49 fn awoken(&self) -> bool {
50 self.inner.awoken.swap(false, Ordering::SeqCst)
51 }
52
53 fn context(&self) -> Context<'_> {
54 Context::from_waker(&self.waker)
55 }
56 }
57
58 #[derive(Debug)]
59 struct FileDescriptor {
60 fd: RawFd,
61 }
62
63 impl AsRawFd for FileDescriptor {
64 fn as_raw_fd(&self) -> RawFd {
65 self.fd
66 }
67 }
68
69 impl Read for &FileDescriptor {
70 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
71 read(self.fd, buf).map_err(io::Error::from)
72 }
73 }
74
75 impl Read for FileDescriptor {
76 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
77 (self as &Self).read(buf)
78 }
79 }
80
81 impl Write for &FileDescriptor {
82 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
83 write(self.fd, buf).map_err(io::Error::from)
84 }
85
86 fn flush(&mut self) -> io::Result<()> {
87 Ok(())
88 }
89 }
90
91 impl Write for FileDescriptor {
92 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
93 (self as &Self).write(buf)
94 }
95
96 fn flush(&mut self) -> io::Result<()> {
97 (self as &Self).flush()
98 }
99 }
100
101 impl Drop for FileDescriptor {
102 fn drop(&mut self) {
103 let _ = close(self.fd);
104 }
105 }
106
107 fn set_nonblocking(fd: RawFd) {
108 use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
109
110 let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
111
112 if flags < 0 {
113 panic!(
114 "bad return value from fcntl(F_GETFL): {} ({:?})",
115 flags,
116 nix::Error::last()
117 );
118 }
119
120 let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
121
122 nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
123 }
124
125 fn socketpair() -> (FileDescriptor, FileDescriptor) {
126 use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
127
128 let (fd_a, fd_b) = socket::socketpair(
129 AddressFamily::Unix,
130 SockType::Stream,
131 None,
132 SockFlag::empty(),
133 )
134 .expect("socketpair");
135 let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
136
137 set_nonblocking(fds.0.fd);
138 set_nonblocking(fds.1.fd);
139
140 fds
141 }
142
143 fn drain(mut fd: &FileDescriptor) {
144 let mut buf = [0u8; 512];
145
146 loop {
147 match fd.read(&mut buf[..]) {
148 Err(e) if e.kind() == ErrorKind::WouldBlock => break,
149 Ok(0) => panic!("unexpected EOF"),
150 Err(e) => panic!("unexpected error: {:?}", e),
151 Ok(_) => continue,
152 }
153 }
154 }
155
156 #[tokio::test]
157 async fn initially_writable() {
158 let (a, b) = socketpair();
159
160 let afd_a = AsyncFd::new(a).unwrap();
161 let afd_b = AsyncFd::new(b).unwrap();
162
163 afd_a.writable().await.unwrap().clear_ready();
164 afd_b.writable().await.unwrap().clear_ready();
165
166 futures::select_biased! {
167 _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
168 _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
169 _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
170 }
171 }
172
173 #[tokio::test]
174 async fn reset_readable() {
175 let (a, mut b) = socketpair();
176
177 let afd_a = AsyncFd::new(a).unwrap();
178
179 let readable = afd_a.readable();
180 tokio::pin!(readable);
181
182 tokio::select! {
183 _ = readable.as_mut() => panic!(),
184 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
185 }
186
187 b.write_all(b"0").unwrap();
188
189 let mut guard = readable.await.unwrap();
190
191 guard
192 .try_io(|_| afd_a.get_ref().read(&mut [0]))
193 .unwrap()
194 .unwrap();
195
196 // `a` is not readable, but the reactor still thinks it is
197 // (because we have not observed a not-ready error yet)
198 afd_a.readable().await.unwrap().retain_ready();
199
200 // Explicitly clear the ready state
201 guard.clear_ready();
202
203 let readable = afd_a.readable();
204 tokio::pin!(readable);
205
206 tokio::select! {
207 _ = readable.as_mut() => panic!(),
208 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
209 }
210
211 b.write_all(b"0").unwrap();
212
213 // We can observe the new readable event
214 afd_a.readable().await.unwrap().clear_ready();
215 }
216
217 #[tokio::test]
218 async fn reset_writable() {
219 let (a, b) = socketpair();
220
221 let afd_a = AsyncFd::new(a).unwrap();
222
223 let mut guard = afd_a.writable().await.unwrap();
224
225 // Write until we get a WouldBlock. This also clears the ready state.
226 while guard
227 .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
228 .is_ok()
229 {}
230
231 // Writable state should be cleared now.
232 let writable = afd_a.writable();
233 tokio::pin!(writable);
234
235 tokio::select! {
236 _ = writable.as_mut() => panic!(),
237 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
238 }
239
240 // Read from the other side; we should become writable now.
241 drain(&b);
242
243 let _ = writable.await.unwrap();
244 }
245
246 #[derive(Debug)]
247 struct ArcFd<T>(Arc<T>);
248 impl<T: AsRawFd> AsRawFd for ArcFd<T> {
249 fn as_raw_fd(&self) -> RawFd {
250 self.0.as_raw_fd()
251 }
252 }
253
254 #[tokio::test]
255 async fn drop_closes() {
256 let (a, mut b) = socketpair();
257
258 let afd_a = AsyncFd::new(a).unwrap();
259
260 assert_eq!(
261 ErrorKind::WouldBlock,
262 b.read(&mut [0]).err().unwrap().kind()
263 );
264
265 std::mem::drop(afd_a);
266
267 assert_eq!(0, b.read(&mut [0]).unwrap());
268
269 // into_inner does not close the fd
270
271 let (a, mut b) = socketpair();
272 let afd_a = AsyncFd::new(a).unwrap();
273 let _a: FileDescriptor = afd_a.into_inner();
274
275 assert_eq!(
276 ErrorKind::WouldBlock,
277 b.read(&mut [0]).err().unwrap().kind()
278 );
279
280 // Drop closure behavior is delegated to the inner object
281 let (a, mut b) = socketpair();
282 let arc_fd = Arc::new(a);
283 let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
284 std::mem::drop(afd_a);
285
286 assert_eq!(
287 ErrorKind::WouldBlock,
288 b.read(&mut [0]).err().unwrap().kind()
289 );
290
291 std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
292 }
293
294 #[tokio::test]
295 async fn reregister() {
296 let (a, _b) = socketpair();
297
298 let afd_a = AsyncFd::new(a).unwrap();
299 let a = afd_a.into_inner();
300 AsyncFd::new(a).unwrap();
301 }
302
303 #[tokio::test]
304 async fn try_io() {
305 let (a, mut b) = socketpair();
306
307 b.write_all(b"0").unwrap();
308
309 let afd_a = AsyncFd::new(a).unwrap();
310
311 let mut guard = afd_a.readable().await.unwrap();
312
313 afd_a.get_ref().read_exact(&mut [0]).unwrap();
314
315 // Should not clear the readable state
316 let _ = guard.try_io(|_| Ok(()));
317
318 // Still readable...
319 let _ = afd_a.readable().await.unwrap();
320
321 // Should clear the readable state
322 let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));
323
324 // Assert not readable
325 let readable = afd_a.readable();
326 tokio::pin!(readable);
327
328 tokio::select! {
329 _ = readable.as_mut() => panic!(),
330 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
331 }
332
333 // Write something down b again and make sure we're reawoken
334 b.write_all(b"0").unwrap();
335 let _ = readable.await.unwrap();
336 }
337
338 #[tokio::test]
339 async fn multiple_waiters() {
340 let (a, mut b) = socketpair();
341 let afd_a = Arc::new(AsyncFd::new(a).unwrap());
342
343 let barrier = Arc::new(tokio::sync::Barrier::new(11));
344
345 let mut tasks = Vec::new();
346 for _ in 0..10 {
347 let afd_a = afd_a.clone();
348 let barrier = barrier.clone();
349
350 let f = async move {
351 let notify_barrier = async {
352 barrier.wait().await;
353 futures::future::pending::<()>().await;
354 };
355
356 futures::select_biased! {
357 guard = afd_a.readable().fuse() => {
358 tokio::task::yield_now().await;
359 guard.unwrap().clear_ready()
360 },
361 _ = notify_barrier.fuse() => unreachable!(),
362 }
363
364 std::mem::drop(afd_a);
365 };
366
367 tasks.push(tokio::spawn(f));
368 }
369
370 let mut all_tasks = futures::future::try_join_all(tasks);
371
372 tokio::select! {
373 r = std::pin::Pin::new(&mut all_tasks) => {
374 r.unwrap(); // propagate panic
375 panic!("Tasks exited unexpectedly")
376 },
377 _ = barrier.wait() => {}
378 };
379
380 b.write_all(b"0").unwrap();
381
382 all_tasks.await.unwrap();
383 }
384
385 #[tokio::test]
386 async fn poll_fns() {
387 let (a, b) = socketpair();
388 let afd_a = Arc::new(AsyncFd::new(a).unwrap());
389 let afd_b = Arc::new(AsyncFd::new(b).unwrap());
390
391 // Fill up the write side of A
392 while afd_a.get_ref().write(&[0; 512]).is_ok() {}
393
394 let waker = TestWaker::new();
395
396 assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
397
398 let afd_a_2 = afd_a.clone();
399 let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
400 let barrier_clone = r_barrier.clone();
401
402 let read_fut = tokio::spawn(async move {
403 // Move waker onto this task first
404 assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
405 .as_ref()
406 .poll_read_ready(cx))));
407 barrier_clone.wait().await;
408
409 let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
410 });
411
412 let afd_a_2 = afd_a.clone();
413 let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
414 let barrier_clone = w_barrier.clone();
415
416 let mut write_fut = tokio::spawn(async move {
417 // Move waker onto this task first
418 assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
419 .as_ref()
420 .poll_write_ready(cx))));
421 barrier_clone.wait().await;
422
423 let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
424 });
425
426 r_barrier.wait().await;
427 w_barrier.wait().await;
428
429 let readable = afd_a.readable();
430 tokio::pin!(readable);
431
432 tokio::select! {
433 _ = &mut readable => unreachable!(),
434 _ = tokio::task::yield_now() => {}
435 }
436
437 // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
438 afd_b.get_ref().write_all(b"0").unwrap();
439
440 let _ = tokio::join!(readable, read_fut);
441
442 // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
443 assert!(!waker.awoken());
444
445 // The writable side should not be awoken
446 tokio::select! {
447 _ = &mut write_fut => unreachable!(),
448 _ = tokio::time::sleep(Duration::from_millis(5)) => {}
449 }
450
451 // Make it writable now
452 drain(afd_b.get_ref());
453
454 // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
455 let _ = write_fut.await;
456 }
457
458 fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
459 let mut pinned = Box::pin(f);
460
461 assert_pending!(pinned
462 .as_mut()
463 .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
464
465 pinned
466 }
467
468 fn rt() -> tokio::runtime::Runtime {
469 tokio::runtime::Builder::new_current_thread()
470 .enable_all()
471 .build()
472 .unwrap()
473 }
474
475 #[test]
476 fn driver_shutdown_wakes_currently_pending() {
477 let rt = rt();
478
479 let (a, _b) = socketpair();
480 let afd_a = {
481 let _enter = rt.enter();
482 AsyncFd::new(a).unwrap()
483 };
484
485 let readable = assert_pending(afd_a.readable());
486
487 std::mem::drop(rt);
488
489 // The future was initialized **before** dropping the rt
490 assert_err!(futures::executor::block_on(readable));
491
492 // The future is initialized **after** dropping the rt.
493 assert_err!(futures::executor::block_on(afd_a.readable()));
494 }
495
496 #[test]
497 fn driver_shutdown_wakes_future_pending() {
498 let rt = rt();
499
500 let (a, _b) = socketpair();
501 let afd_a = {
502 let _enter = rt.enter();
503 AsyncFd::new(a).unwrap()
504 };
505
506 std::mem::drop(rt);
507
508 assert_err!(futures::executor::block_on(afd_a.readable()));
509 }
510
511 #[test]
512 fn driver_shutdown_wakes_pending_race() {
513 // TODO: make this a loom test
514 for _ in 0..100 {
515 let rt = rt();
516
517 let (a, _b) = socketpair();
518 let afd_a = {
519 let _enter = rt.enter();
520 AsyncFd::new(a).unwrap()
521 };
522
523 let _ = std::thread::spawn(move || std::mem::drop(rt));
524
525 // This may or may not return an error (but will be awoken)
526 let _ = futures::executor::block_on(afd_a.readable());
527
528 // However retrying will always return an error
529 assert_err!(futures::executor::block_on(afd_a.readable()));
530 }
531 }
532
533 async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
534 futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
535 }
536
537 async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
538 futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
539 }
540
541 #[test]
542 fn driver_shutdown_wakes_currently_pending_polls() {
543 let rt = rt();
544
545 let (a, _b) = socketpair();
546 let afd_a = {
547 let _enter = rt.enter();
548 AsyncFd::new(a).unwrap()
549 };
550
551 while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
552
553 let readable = assert_pending(poll_readable(&afd_a));
554 let writable = assert_pending(poll_writable(&afd_a));
555
556 std::mem::drop(rt);
557
558 // Attempting to poll readiness when the rt is dropped is an error
559 assert_err!(futures::executor::block_on(readable));
560 assert_err!(futures::executor::block_on(writable));
561 }
562
563 #[test]
564 fn driver_shutdown_wakes_poll() {
565 let rt = rt();
566
567 let (a, _b) = socketpair();
568 let afd_a = {
569 let _enter = rt.enter();
570 AsyncFd::new(a).unwrap()
571 };
572
573 std::mem::drop(rt);
574
575 assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
576 assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
577 }
578
579 #[test]
580 fn driver_shutdown_wakes_poll_race() {
581 // TODO: make this a loom test
582 for _ in 0..100 {
583 let rt = rt();
584
585 let (a, _b) = socketpair();
586 let afd_a = {
587 let _enter = rt.enter();
588 AsyncFd::new(a).unwrap()
589 };
590
591 while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
592
593 let _ = std::thread::spawn(move || std::mem::drop(rt));
594
595 // The poll variants will always return an error in this case
596 assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
597 assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
598 }
599 }