]> git.proxmox.com Git - pve-lxc-syscalld.git/commitdiff
switch to io_uring
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Thu, 17 Oct 2019 10:30:02 +0000 (12:30 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Thu, 17 Oct 2019 12:08:40 +0000 (14:08 +0200)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
Cargo.toml
rust-toolchain
src/capability.rs
src/client.rs
src/fork.rs
src/lxcseccomp.rs
src/main.rs
src/nsfd.rs
src/pidfd.rs
src/socket.rs [deleted file]
src/tools.rs

index d33f076658dcd363ffcb3b8f9cc14b9174e468f6..4e5ff6b454c0fe223de14c421618d9fa8dcfe7f0 100644 (file)
@@ -10,10 +10,12 @@ authors = [
 bitflags = "1.1"
 errno = "0.2"
 failure = "0.1"
+futures-preview = "0.3.0-alpha"
 lazy_static = "1.3"
 libc = "0.2"
-mio = "0.6"
-nix = "0.14"
-futures-preview = "0.3.0-alpha"
-tokio = "0.2.0-alpha.6"
-tokio-net = "0.2.0-alpha.6"
+nix = "0.15"
+
+[dependencies.io-uring]
+version = "0.1"
+path = "../io-uring"
+features = ["runtime"]
index bf867e0ae5b6c08df1118a2ece970677bc479f1b..65b2df87f7df3aeedef04be96703e55ac19c2cfb 100644 (file)
@@ -1 +1 @@
-nightly
+beta
index 301ee2e5b8a45c02c837d3206035171e3f2df692..6b0c4168427f3c33d2a8808ae0f049c6793a78ac 100644 (file)
@@ -1,8 +1,6 @@
 use std::io;
 use std::os::raw::c_ulong;
 
-use crate::{c_call, io_format_err};
-
 bitflags::bitflags! {
     pub struct SecureBits: c_ulong {
         const NOROOT                        = 0b0_0000_0001;
index 268083c76b335e094b5e5da2dc78506c15ed51b0..f3a0e9a009bcc36ff05db6e284c53763749c9ad4 100644 (file)
@@ -4,15 +4,15 @@ use failure::Error;
 use nix::errno::Errno;
 
 use crate::lxcseccomp::ProxyMessageBuffer;
-use crate::socket::AsyncSeqPacketSocket;
 use crate::syscall::{self, Syscall, SyscallStatus};
+use io_uring::socket::SeqPacketSocket;
 
 pub struct Client {
-    socket: AsyncSeqPacketSocket,
+    socket: SeqPacketSocket,
 }
 
 impl Client {
-    pub fn new(socket: AsyncSeqPacketSocket) -> Arc<Self> {
+    pub fn new(socket: SeqPacketSocket) -> Arc<Self> {
         Arc::new(Self { socket })
     }
 
@@ -44,7 +44,7 @@ impl Client {
 
             // Note: our spawned tasks here must not access our socket, as we cannot guarantee
             // they'll be woken up if another task errors into `wrap_error()`.
-            tokio::spawn(self.clone().wrap_error(self.clone().__handle_syscall(msg)));
+            crate::spawn(self.clone().wrap_error(self.clone().__handle_syscall(msg)));
         }
     }
 
index 590ebab1a3f3b5ada37173b500c40412d5b3eb81..732cce7e03e3d5ed1dc8415ae15e7dba8675b2cc 100644 (file)
@@ -8,14 +8,10 @@ use std::io;
 use std::os::raw::c_int;
 use std::os::unix::io::{FromRawFd, IntoRawFd};
 use std::panic::UnwindSafe;
-use std::pin::Pin;
-use std::task::{Context, Poll};
 
-use tokio::io::{AsyncRead, AsyncReadExt};
+use io_uring::socket::pipe::{self, Pipe};
 
 use crate::syscall::SyscallStatus;
-use crate::tools::Fd;
-use crate::{c_call, c_try};
 
 pub async fn forking_syscall<F>(func: F) -> io::Result<SyscallStatus>
 where
@@ -31,7 +27,7 @@ pub struct Fork {
     pid: Option<libc::pid_t>,
     // FIXME: abuse! tokio-fs is not updated to futures@0.3 yet, but a TcpStream does the same
     // thing as a file when it's already open anyway...
-    out: crate::tools::GenericStream,
+    out: Pipe<pipe::Read>,
 }
 
 impl Drop for Fork {
@@ -54,13 +50,11 @@ impl Fork {
     where
         F: FnOnce() -> io::Result<SyscallStatus> + UnwindSafe,
     {
-        let mut pipe: [c_int; 2] = [0, 0];
-        c_try!(unsafe { libc::pipe2(pipe.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) });
-        let (pipe_r, pipe_w) = (Fd(pipe[0]), Fd(pipe[1]));
+        let (pipe_r, pipe_w) = pipe::pipe_default()?;
 
         let pid = c_try!(unsafe { libc::fork() });
         if pid == 0 {
-            std::mem::drop(pipe_r);
+            drop(pipe_r);
             let mut pipe_w = unsafe { std::fs::File::from_raw_fd(pipe_w.into_raw_fd()) };
 
             let _ = std::panic::catch_unwind(move || {
@@ -99,11 +93,7 @@ impl Fork {
                 libc::_exit(-1);
             }
         }
-
-        let pipe_r = match crate::tools::GenericStream::from_fd(pipe_r) {
-            Ok(o) => o,
-            Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
-        };
+        drop(pipe_w);
 
         Ok(Self {
             pid: Some(pid),
@@ -143,7 +133,7 @@ impl Fork {
                 std::mem::size_of::<Data>(),
             )
         };
-        self.read_exact(dataslice).await?;
+        self.out.read_exact(dataslice).await?;
         //self.read_exact(unsafe {
         //    std::slice::from_raw_parts_mut(
         //        &mut data as *mut Data as *mut u8,
@@ -160,18 +150,3 @@ impl Fork {
         }
     }
 }
-
-// default impl will work
-impl AsyncRead for Fork {
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context,
-        buf: &mut [u8],
-    ) -> Poll<io::Result<usize>> {
-        unsafe { self.map_unchecked_mut(|this| &mut this.out) }.poll_read(cx, buf)
-    }
-
-    unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool {
-        false
-    }
-}
index c1c14970fac03310170ec9fe9dc104590a400935..7d822c31f0b7a835f0e59f4c1abf3ef560bafca5 100644 (file)
@@ -8,13 +8,13 @@ use std::os::unix::io::RawFd;
 use std::{io, mem};
 
 use failure::{bail, format_err, Error};
+use io_uring::socket::SeqPacketSocket;
 use lazy_static::lazy_static;
 use libc::pid_t;
 use nix::errno::Errno;
 
 use crate::pidfd::PidFd;
 use crate::seccomp::{SeccompNotif, SeccompNotifResp, SeccompNotifSizes};
-use crate::socket::AsyncSeqPacketSocket;
 use crate::tools::{Fd, FromFd, IoVec, IoVecMut};
 
 /// Seccomp notification proxy message sent by the lxc monitor.
@@ -114,7 +114,7 @@ impl ProxyMessageBuffer {
     }
 
     /// Returns None on EOF.
-    pub async fn recv(&mut self, socket: &AsyncSeqPacketSocket) -> Result<bool, Error> {
+    pub async fn recv(&mut self, socket: &SeqPacketSocket) -> Result<bool, Error> {
         self.reset();
 
         unsafe {
@@ -174,13 +174,17 @@ impl ProxyMessageBuffer {
     }
 
     /// Send the current data as response.
-    pub async fn respond(&mut self, socket: &AsyncSeqPacketSocket) -> io::Result<()> {
+    pub async fn respond(&mut self, socket: &SeqPacketSocket) -> io::Result<()> {
         let iov = [
             unsafe { io_vec(&self.proxy_msg) },
             unsafe { io_vec(&self.seccomp_notif) },
             unsafe { io_vec(&self.seccomp_resp) },
         ];
-        socket.sendmsg_vectored(&iov).await
+        let len = iov.iter().map(|e| e.len()).sum();
+        if socket.sendmsg_vectored(&iov, &[]).await? != len {
+            io_bail!("truncated message?");
+        }
+        Ok(())
     }
 
     #[inline]
index c993e05b0a616754d55b3dfdc1c09143042739e4..5e26a747a96c170a9a021ee6fa6888fe88bcf17e 100644 (file)
@@ -1,8 +1,12 @@
+use std::future::Future;
 use std::io;
 
 use failure::{bail, format_err, Error};
 use nix::sys::socket::SockAddr;
 
+#[macro_use]
+mod macros;
+
 pub mod apparmor;
 pub mod capability;
 pub mod client;
@@ -11,17 +15,31 @@ pub mod lxcseccomp;
 pub mod nsfd;
 pub mod pidfd;
 pub mod seccomp;
-pub mod socket;
 pub mod sys_mknod;
 pub mod sys_quotactl;
 pub mod syscall;
 pub mod tools;
 
-use socket::SeqPacketListener;
+use io_uring::socket::SeqPacketListener;
+
+static mut EXECUTOR: *mut futures::executor::ThreadPool = std::ptr::null_mut();
+
+pub fn executor() -> &'static futures::executor::ThreadPool {
+    unsafe { &*EXECUTOR }
+}
+
+pub fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
+    executor().spawn_ok(fut)
+}
+
+fn main() {
+    let mut executor = futures::executor::ThreadPool::new().expect("spawning worker threadpool");
+    unsafe {
+        EXECUTOR = &mut executor;
+    }
+    std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
 
-#[tokio::main]
-async fn main() {
-    if let Err(err) = do_main().await {
+    if let Err(err) = executor.run(do_main()) {
         eprintln!("error: {}", err);
         std::process::exit(1);
     }
@@ -41,11 +59,11 @@ async fn do_main() -> Result<(), Error> {
     let address =
         SockAddr::new_unix(socket_path.as_os_str()).expect("cannot create struct sockaddr_un?");
 
-    let mut listener = SeqPacketListener::bind(&address)
+    let mut listener = SeqPacketListener::bind_default(&address)
         .map_err(|e| format_err!("failed to create listening socket: {}", e))?;
     loop {
         let client = listener.accept().await?;
         let client = client::Client::new(client);
-        tokio::spawn(client.main());
+        spawn(client.main());
     }
 }
index 3a772b7bf6596be32e0012f207055788c97e3bb1..4a3c7dc83f28244c82b97112107ae3831e2f77d1 100644 (file)
@@ -4,8 +4,6 @@ use std::marker::PhantomData;
 use std::os::raw::c_int;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
 
-use crate::{c_try, file_descriptor_type};
-
 pub mod ns_type {
     pub trait NsType {
         const TYPE: libc::c_int;
index 7d585d06eef78ee7c8b771c9d7492e503f1d9256..e2b8bcb963b7c5b97fe6d99c91502a2b9a766389 100644 (file)
@@ -10,12 +10,11 @@ use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
 use failure::{bail, Error};
 use libc::pid_t;
 
-use crate::c_try;
 use crate::nsfd::{ns_type, NsFd};
 use crate::tools::Fd;
 
 pub struct PidFd(RawFd, pid_t);
-crate::file_descriptor_impl!(PidFd);
+file_descriptor_impl!(PidFd);
 
 #[derive(Default)]
 pub struct Uids {
diff --git a/src/socket.rs b/src/socket.rs
deleted file mode 100644 (file)
index 5b66c60..0000000
+++ /dev/null
@@ -1,267 +0,0 @@
-use std::convert::TryFrom;
-use std::os::raw::c_void;
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
-use std::task::Context;
-use std::task::Poll;
-use std::{io, mem, ptr};
-
-use failure::{bail, Error};
-use futures::future::poll_fn;
-use futures::ready;
-use nix::sys::socket::{AddressFamily, SockAddr, SockFlag, SockType};
-
-use crate::tools::{vec, Fd, IoVec, IoVecMut};
-
-pub struct SeqPacketSocket(Fd);
-
-impl FromRawFd for SeqPacketSocket {
-    unsafe fn from_raw_fd(fd: RawFd) -> Self {
-        Self(Fd(fd))
-    }
-}
-
-impl SeqPacketSocket {
-    fn fd(&self) -> RawFd {
-        (self.0).0
-    }
-
-    pub fn recv_fds_vectored(
-        &self,
-        iov: &mut [IoVecMut],
-        num_fds: usize,
-    ) -> io::Result<(usize, Vec<Fd>)> {
-        let fdlist_size = u32::try_from(mem::size_of::<RawFd>() * num_fds)
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("size error: {}", e)))?;
-
-        let mut cmsgbuf = unsafe { vec::uninitialized(libc::CMSG_SPACE(fdlist_size) as usize) };
-        unsafe {
-            ptr::write_bytes(cmsgbuf.as_mut_ptr(), 0xff, cmsgbuf.len());
-        }
-
-        let mut msg: libc::msghdr = unsafe { mem::zeroed() };
-        msg.msg_iov = iov.as_mut_ptr() as *mut _ as *mut libc::iovec;
-        msg.msg_iovlen = iov.len();
-        msg.msg_controllen = cmsgbuf.len();
-        msg.msg_control = cmsgbuf.as_mut_ptr() as *mut c_void;
-        let _ = &cmsgbuf; // from now on we only use raw pointer stuff
-
-        let received = unsafe { libc::recvmsg(self.fd(), &mut msg, libc::MSG_CMSG_CLOEXEC) };
-        if received < 0 {
-            return Err(io::Error::last_os_error());
-        }
-
-        let mut out_fds = Vec::with_capacity(num_fds);
-        let mut cmsg_ptr = unsafe { libc::CMSG_FIRSTHDR(&msg) };
-        while !cmsg_ptr.is_null() {
-            let cmsg: &libc::cmsghdr = unsafe { &*cmsg_ptr };
-            if cmsg.cmsg_type == libc::SCM_RIGHTS
-                && cmsg.cmsg_len == unsafe { libc::CMSG_LEN(fdlist_size) as usize }
-                && cmsg.cmsg_level == libc::SOL_SOCKET
-            {
-                unsafe {
-                    #[allow(clippy::cast_ptr_alignment)]
-                    let mut data_ptr = libc::CMSG_DATA(cmsg_ptr) as *const RawFd;
-                    for _ in 0..num_fds {
-                        out_fds.push(Fd(ptr::read_unaligned(data_ptr)));
-                        data_ptr = data_ptr.add(1);
-                    }
-                }
-                break;
-            }
-            cmsg_ptr = unsafe { libc::CMSG_NXTHDR(&msg, cmsg_ptr) };
-        }
-
-        Ok((received as usize, out_fds))
-    }
-
-    /// Send a message via `sendmsg(2)`.
-    ///
-    /// Note that short writes are silently treated as success, since this is a `SOCK_SEQPACKET`,
-    /// so neither continuing nor repeating a partial messages makes all that much sense.
-    pub fn sendmsg_vectored(&self, iov: &[IoVec]) -> io::Result<()> {
-        let mut msg: libc::msghdr = unsafe { mem::zeroed() };
-        msg.msg_iov = iov.as_ptr() as *const libc::iovec as *mut libc::iovec;
-        msg.msg_iovlen = iov.len();
-
-        let sent = unsafe { libc::sendmsg(self.fd(), &msg, libc::MSG_NOSIGNAL) };
-        if sent < 0 {
-            return Err(io::Error::last_os_error());
-        }
-
-        // XXX: what to do with short writes? we're a SEQPACKET socket...
-
-        Ok(())
-    }
-
-    fn as_fd(&self) -> &Fd {
-        &self.0
-    }
-
-    /// Shutdown parts of the socket.
-    #[inline]
-    pub fn shutdown(&self, how: nix::sys::socket::Shutdown) -> nix::Result<()> {
-        nix::sys::socket::shutdown(self.as_raw_fd(), how)
-    }
-}
-
-impl AsRawFd for SeqPacketSocket {
-    fn as_raw_fd(&self) -> RawFd {
-        self.fd()
-    }
-}
-
-pub struct SeqPacketListener {
-    fd: Fd,
-    registration: tokio_net::driver::Registration,
-}
-
-impl Drop for SeqPacketListener {
-    fn drop(&mut self) {
-        if let Err(err) = self.registration.deregister(&self.fd) {
-            eprintln!("failed to deregister I/O resource with reactor: {}", err);
-        }
-    }
-}
-
-impl SeqPacketListener {
-    pub fn bind(address: &SockAddr) -> Result<Self, Error> {
-        let fd = Fd(nix::sys::socket::socket(
-            AddressFamily::Unix,
-            SockType::SeqPacket,
-            SockFlag::SOCK_CLOEXEC | SockFlag::SOCK_NONBLOCK,
-            None,
-        )?);
-
-        nix::sys::socket::bind(fd.as_raw_fd(), &address)?;
-        nix::sys::socket::listen(fd.as_raw_fd(), 16)?;
-
-        let registration = tokio_net::driver::Registration::new();
-        if !registration.register(&fd)? {
-            bail!("duplicate file descriptor registration?");
-        }
-
-        Ok(Self { fd, registration })
-    }
-
-    pub fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<AsyncSeqPacketSocket>> {
-        let fd = loop {
-            match nix::sys::socket::accept4(
-                self.fd.as_raw_fd(),
-                SockFlag::SOCK_CLOEXEC | SockFlag::SOCK_NONBLOCK,
-            ) {
-                Ok(fd) => break Fd(fd),
-                Err(err) => match err.as_errno() {
-                    Some(nix::errno::Errno::EAGAIN) => {
-                        match ready!(self.registration.poll_read_ready(cx)) {
-                            Ok(_) => continue,
-                            Err(err) => return Poll::Ready(Err(err)),
-                        }
-                    }
-                    Some(other) => {
-                        return Poll::Ready(Err(io::Error::from_raw_os_error(other as _)));
-                    }
-                    None => {
-                        return Poll::Ready(Err(io::Error::new(
-                            io::ErrorKind::Other,
-                            "unexpected non-OS error in nix::sys::socket::accept4()",
-                        )));
-                    }
-                },
-            };
-        };
-
-        Poll::Ready(match AsyncSeqPacketSocket::new(fd) {
-            Ok(c) => Ok(c),
-            Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
-        })
-    }
-
-    pub async fn accept(&mut self) -> io::Result<AsyncSeqPacketSocket> {
-        poll_fn(|cx| self.poll_accept(cx)).await
-    }
-}
-
-// Do I care about having it as a stream?
-//#[must_use = "streams do nothing unless polled"]
-//pub struct SeqPacketIncoming {
-//}
-
-pub struct AsyncSeqPacketSocket {
-    socket: SeqPacketSocket,
-    registration: tokio_net::driver::Registration,
-}
-
-impl Drop for AsyncSeqPacketSocket {
-    fn drop(&mut self) {
-        if let Err(err) = self.registration.deregister(self.socket.as_fd()) {
-            eprintln!("failed to deregister I/O resource with reactor: {}", err);
-        }
-    }
-}
-
-impl AsyncSeqPacketSocket {
-    pub fn new(fd: Fd) -> Result<Self, Error> {
-        let registration = tokio_net::driver::Registration::new();
-        if !registration.register(&fd)? {
-            bail!("duplicate file descriptor registration?");
-        }
-
-        Ok(Self {
-            socket: unsafe { SeqPacketSocket::from_raw_fd(fd.into_raw_fd()) },
-            registration,
-        })
-    }
-
-    /// Shutdown parts of the socket.
-    #[inline]
-    pub fn shutdown(&self, how: nix::sys::socket::Shutdown) -> nix::Result<()> {
-        self.socket.shutdown(how)
-    }
-
-    pub fn poll_recv_fds_vectored(
-        &self,
-        iov: &mut [IoVecMut],
-        num_fds: usize,
-        cx: &mut Context,
-    ) -> Poll<io::Result<(usize, Vec<Fd>)>> {
-        loop {
-            match self.socket.recv_fds_vectored(iov, num_fds) {
-                Ok(res) => break Poll::Ready(Ok(res)),
-                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
-                    match ready!(self.registration.poll_read_ready(cx)) {
-                        Ok(_) => continue,
-                        Err(err) => break Poll::Ready(Err(err)),
-                    }
-                }
-                Err(err) => break Poll::Ready(Err(err)),
-            }
-        }
-    }
-
-    pub async fn recv_fds_vectored(
-        &self,
-        iov: &mut [IoVecMut<'_>],
-        num_fds: usize,
-    ) -> io::Result<(usize, Vec<Fd>)> {
-        poll_fn(move |cx| self.poll_recv_fds_vectored(iov, num_fds, cx)).await
-    }
-
-    pub fn poll_sendmsg_vectored(&self, data: &[IoVec], cx: &mut Context) -> Poll<io::Result<()>> {
-        loop {
-            match self.socket.sendmsg_vectored(data) {
-                Ok(()) => break Poll::Ready(Ok(())),
-                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
-                    match ready!(self.registration.poll_write_ready(cx)) {
-                        Ok(_) => continue,
-                        Err(err) => break Poll::Ready(Err(err)),
-                    }
-                }
-                Err(err) => break Poll::Ready(Err(err)),
-            }
-        }
-    }
-
-    pub async fn sendmsg_vectored(&self, data: &[IoVec<'_>]) -> io::Result<()> {
-        poll_fn(move |cx| self.poll_sendmsg_vectored(data, cx)).await
-    }
-}
index 032ca941d9a541a73818bb200098643e1ea07c3f..4cd50b0b92cc49a8c4ee83711955a21c0a0b62b9 100644 (file)
@@ -3,61 +3,9 @@
 //! Note that this should stay small, otherwise we should introduce a dependency on our `proxmox`
 //! crate as that's where we have all this stuff usually...
 
-use std::io;
-use std::marker::PhantomData;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
-use std::pin::Pin;
-use std::task::{Context, Poll};
 
-use futures::ready;
-use mio::unix::EventedFd;
-use mio::{PollOpt, Ready, Token};
-use tokio::io::{AsyncRead, AsyncWrite};
-
-#[macro_export]
-macro_rules! file_descriptor_type {
-    ($type:ident) => {
-        #[repr(transparent)]
-        pub struct $type(RawFd);
-
-        crate::file_descriptor_impl!($type);
-
-        impl FromRawFd for $type {
-            unsafe fn from_raw_fd(fd: RawFd) -> Self {
-                Self(fd)
-            }
-        }
-    };
-}
-
-#[macro_export]
-macro_rules! file_descriptor_impl {
-    ($type:ty) => {
-        impl Drop for $type {
-            fn drop(&mut self) {
-                if self.0 >= 0 {
-                    unsafe {
-                        libc::close(self.0);
-                    }
-                }
-            }
-        }
-
-        impl AsRawFd for $type {
-            fn as_raw_fd(&self) -> RawFd {
-                self.0
-            }
-        }
-
-        impl IntoRawFd for $type {
-            fn into_raw_fd(mut self) -> RawFd {
-                let fd = self.0;
-                self.0 = -libc::EBADF;
-                fd
-            }
-        }
-    };
-}
+pub use io_uring::iovec::{IoVec, IoVecMut};
 
 /// Guard a raw file descriptor with a drop handler. This is mostly useful when access to an owned
 /// `RawFd` is required without the corresponding handler object (such as when only the file
@@ -73,156 +21,6 @@ impl FromRawFd for Fd {
     }
 }
 
-impl mio::Evented for Fd {
-    fn register(
-        &self,
-        poll: &mio::Poll,
-        token: Token,
-        interest: Ready,
-        opts: PollOpt,
-    ) -> io::Result<()> {
-        poll.register(&EventedFd(&self.0), token, interest, opts)
-    }
-
-    fn reregister(
-        &self,
-        poll: &mio::Poll,
-        token: Token,
-        interest: Ready,
-        opts: PollOpt,
-    ) -> io::Result<()> {
-        poll.reregister(&EventedFd(&self.0), token, interest, opts)
-    }
-
-    fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
-        poll.deregister(&EventedFd(&self.0))
-    }
-}
-
-pub struct AsyncFd {
-    fd: Fd,
-    registration: tokio_net::driver::Registration,
-}
-
-impl Drop for AsyncFd {
-    fn drop(&mut self) {
-        if let Err(err) = self.registration.deregister(&self.fd) {
-            eprintln!("failed to deregister I/O resource with reactor: {}", err);
-        }
-    }
-}
-
-impl AsyncFd {
-    pub fn new(fd: Fd) -> io::Result<Self> {
-        let registration = tokio_net::driver::Registration::new();
-        if !registration.register(&fd)? {
-            return Err(io::Error::new(
-                io::ErrorKind::Other,
-                "duplicate file descriptor registration?",
-            ));
-        }
-
-        Ok(Self { fd, registration })
-    }
-
-    pub fn poll_read_ready(&self, cx: &mut Context) -> Poll<io::Result<mio::Ready>> {
-        self.registration.poll_read_ready(cx)
-    }
-
-    pub fn poll_write_ready(&self, cx: &mut Context) -> Poll<io::Result<mio::Ready>> {
-        self.registration.poll_write_ready(cx)
-    }
-}
-
-impl AsRawFd for AsyncFd {
-    fn as_raw_fd(&self) -> RawFd {
-        self.fd.as_raw_fd()
-    }
-}
-
-// At the time of writing, tokio-fs in master was disabled as it wasn't updated to futures@0.3 yet.
-pub struct GenericStream(Option<AsyncFd>);
-
-impl GenericStream {
-    pub fn from_fd(fd: Fd) -> io::Result<Self> {
-        AsyncFd::new(fd).map(|fd| Self(Some(fd)))
-    }
-
-    fn raw_fd(&self) -> RawFd {
-        self.0
-            .as_ref()
-            .map(|fd| fd.as_raw_fd())
-            .unwrap_or(-libc::EBADF)
-    }
-
-    pub fn poll_read_ready(&self, cx: &mut Context) -> Poll<io::Result<mio::Ready>> {
-        match self.0 {
-            Some(ref fd) => fd.poll_read_ready(cx),
-            None => Poll::Ready(Err(io::ErrorKind::InvalidInput.into())),
-        }
-    }
-
-    pub fn poll_write_ready(&self, cx: &mut Context) -> Poll<io::Result<mio::Ready>> {
-        match self.0 {
-            Some(ref fd) => fd.poll_write_ready(cx),
-            None => Poll::Ready(Err(io::ErrorKind::InvalidInput.into())),
-        }
-    }
-}
-
-impl AsyncRead for GenericStream {
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context,
-        buf: &mut [u8],
-    ) -> Poll<io::Result<usize>> {
-        loop {
-            let res = unsafe { libc::read(self.raw_fd(), buf.as_mut_ptr() as *mut _, buf.len()) };
-            if res >= 0 {
-                return Poll::Ready(Ok(res as usize));
-            }
-
-            let err = io::Error::last_os_error();
-            if err.kind() == io::ErrorKind::WouldBlock {
-                match ready!(self.poll_read_ready(cx)) {
-                    Ok(_) => continue,
-                    Err(err) => return Poll::Ready(Err(err)),
-                }
-            }
-            return Poll::Ready(Err(err));
-        }
-    }
-}
-
-impl AsyncWrite for GenericStream {
-    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
-        loop {
-            let res = unsafe { libc::write(self.raw_fd(), buf.as_ptr() as *const _, buf.len()) };
-            if res >= 0 {
-                return Poll::Ready(Ok(res as usize));
-            }
-
-            let err = io::Error::last_os_error();
-            if err.kind() == io::ErrorKind::WouldBlock {
-                match ready!(self.poll_write_ready(cx)) {
-                    Ok(_) => continue,
-                    Err(err) => return Poll::Ready(Err(err)),
-                }
-            }
-            return Poll::Ready(Err(err));
-        }
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
-        std::mem::drop(self.get_mut().0.take());
-        Poll::Ready(Ok(()))
-    }
-}
-
 /// Byte vector utilities.
 pub mod vec {
     /// Create an uninitialized byte vector of a specific size.
@@ -243,66 +41,6 @@ pub mod vec {
     }
 }
 
-/// The standard IoSlice does not implement Send and Sync. These types do.
-pub struct IoVec<'a> {
-    _iov: libc::iovec,
-    _phantom: PhantomData<&'a [u8]>,
-}
-
-unsafe impl Send for IoVec<'_> {}
-unsafe impl Sync for IoVec<'_> {}
-
-impl IoVec<'_> {
-    pub fn new(slice: &[u8]) -> Self {
-        Self {
-            _iov: libc::iovec {
-                iov_base: slice.as_ptr() as *mut libc::c_void,
-                iov_len: slice.len(),
-            },
-            _phantom: PhantomData,
-        }
-    }
-}
-
-pub struct IoVecMut<'a> {
-    _iov: libc::iovec,
-    _phantom: PhantomData<&'a [u8]>,
-}
-
-unsafe impl Send for IoVecMut<'_> {}
-unsafe impl Sync for IoVecMut<'_> {}
-
-impl IoVecMut<'_> {
-    pub fn new(slice: &mut [u8]) -> Self {
-        Self {
-            _iov: libc::iovec {
-                iov_base: slice.as_mut_ptr() as *mut libc::c_void,
-                iov_len: slice.len(),
-            },
-            _phantom: PhantomData,
-        }
-    }
-}
-
-#[macro_export]
-macro_rules! c_call {
-    ($expr:expr) => {{
-        let res = $expr;
-        if res == -1 {
-            Err(::std::io::Error::last_os_error())
-        } else {
-            Ok::<_, ::std::io::Error>(res)
-        }
-    }};
-}
-
-#[macro_export]
-macro_rules! c_try {
-    ($expr:expr) => {
-        crate::c_call!($expr)?
-    };
-}
-
 pub trait FromFd {
     fn from_fd(fd: Fd) -> Self;
 }
@@ -312,17 +50,3 @@ impl<T: FromRawFd> FromFd for T {
         unsafe { Self::from_raw_fd(fd.into_raw_fd()) }
     }
 }
-
-#[macro_export]
-macro_rules! io_format_err {
-    ($($msg:tt)*) => {
-        ::std::io::Error::new(::std::io::ErrorKind::Other, format!($($msg)*))
-    };
-}
-
-#[macro_export]
-macro_rules! io_bail {
-    ($($msg:tt)*) => {
-        return Err(::std::io::Error::new(::std::io::ErrorKind::Other, format!($($msg)*)));
-    };
-}