]> git.proxmox.com Git - pve-lxc-syscalld.git/commitdiff
switch back to tokio now that it's stable and packaged
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 21 Jan 2020 10:12:50 +0000 (11:12 +0100)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 21 Jan 2020 14:38:06 +0000 (15:38 +0100)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
14 files changed:
.cargo/config [new file with mode: 0644]
Cargo.toml
src/capability.rs
src/executor.rs [deleted file]
src/fork.rs
src/io/epoll.rs [deleted file]
src/io/mod.rs
src/io/pipe.rs
src/io/polled_fd.rs [new file with mode: 0644]
src/io/reactor.rs [deleted file]
src/io/seq_packet.rs
src/macros.rs
src/main.rs
src/tools.rs

diff --git a/.cargo/config b/.cargo/config
new file mode 100644 (file)
index 0000000..3b5b6e4
--- /dev/null
@@ -0,0 +1,5 @@
+[source]
+[source.debian-packages]
+directory = "/usr/share/cargo/registry"
+[source.crates-io]
+replace-with = "debian-packages"
index 4f177cb33a90beac02cf7e20fdc64d1f965348f6..06c265ba1c2a03a926a4a7789653ea734a0d54f2 100644 (file)
@@ -8,7 +8,9 @@ authors = [
 
 [dependencies]
 bitflags = "1.2"
-failure = { version = "0.1", default-features = false, features = ["std"] }
+failure = { version = "0.1", default-features = false, features = [ "std" ] }
 lazy_static = "1.4"
 libc = "0.2"
 nix = "0.16"
+mio = "0.6.21"
+tokio = { version = "0.2.9", features = [ "rt-threaded", "io-driver", "io-util" ] }
index 7c66e1a9e96b16e14748cc6a36d917c9becb438c..7b434de5c0ffa1fdf7923b02bca91b445bcd8cb3 100644 (file)
@@ -50,13 +50,14 @@ impl Capabilities {
 
     /// Change our process capabilities. This does not include the bounding set.
     pub fn capset(&self) -> io::Result<()> {
-        #![allow(dead_code)]
         // kernel abi:
+        #[allow(dead_code)]
         struct Header {
             version: u32,
             pid: c_int,
         }
 
+        #[allow(dead_code)]
         struct Data {
             effective: u32,
             permitted: u32,
diff --git a/src/executor.rs b/src/executor.rs
deleted file mode 100644 (file)
index ff29981..0000000
+++ /dev/null
@@ -1,237 +0,0 @@
-use std::cell::RefCell;
-use std::collections::VecDeque;
-use std::future::Future;
-use std::io;
-use std::pin::Pin;
-use std::sync::{Arc, Condvar, Mutex, Weak};
-use std::task::{Context, Poll};
-use std::thread::JoinHandle;
-
-type BoxFut = Box<dyn Future<Output = ()> + Send + 'static>;
-
-#[derive(Clone)]
-struct Task(Arc<TaskInner>);
-
-impl Task {
-    fn into_raw(this: Task) -> *const TaskInner {
-        Arc::into_raw(this.0)
-    }
-
-    unsafe fn from_raw(ptr: *const TaskInner) -> Self {
-        Self(Arc::from_raw(ptr))
-    }
-
-    fn wake(self) {
-        if let Some(queue) = self.0.queue.upgrade() {
-            queue.queue(self);
-        }
-    }
-
-    fn into_raw_waker(this: Task) -> std::task::RawWaker {
-        std::task::RawWaker::new(
-            Task::into_raw(this) as *const (),
-            &std::task::RawWakerVTable::new(
-                waker_clone_fn,
-                waker_wake_fn,
-                waker_wake_by_ref_fn,
-                waker_drop_fn,
-            ),
-        )
-    }
-}
-
-struct TaskInner {
-    future: Mutex<Option<BoxFut>>,
-    queue: Weak<TaskQueue>,
-}
-
-struct TaskQueue {
-    queue: Mutex<VecDeque<Task>>,
-    queue_cv: Condvar,
-}
-
-impl TaskQueue {
-    fn new() -> Self {
-        Self {
-            queue: Mutex::new(VecDeque::with_capacity(32)),
-            queue_cv: Condvar::new(),
-        }
-    }
-
-    fn new_task(self: Arc<TaskQueue>, future: BoxFut) {
-        let task = Task(Arc::new(TaskInner {
-            future: Mutex::new(Some(future)),
-            queue: Arc::downgrade(&self),
-        }));
-
-        self.queue(task);
-    }
-
-    fn queue(&self, task: Task) {
-        let mut queue = self.queue.lock().unwrap();
-        queue.push_back(task);
-        self.queue_cv.notify_one();
-    }
-
-    /// Blocks until a task is available
-    fn get_task(&self) -> Task {
-        let mut queue = self.queue.lock().unwrap();
-        loop {
-            if let Some(task) = queue.pop_front() {
-                return task;
-            } else {
-                queue = self.queue_cv.wait(queue).unwrap();
-            }
-        }
-    }
-}
-
-pub struct ThreadPool {
-    _threads: Mutex<Vec<JoinHandle<()>>>,
-    queue: Arc<TaskQueue>,
-}
-
-impl ThreadPool {
-    pub fn new() -> io::Result<Self> {
-        let count = num_cpus()?;
-
-        let queue = Arc::new(TaskQueue::new());
-
-        let mut threads = Vec::new();
-        for thread_id in 0..count {
-            threads.push(std::thread::spawn({
-                let queue = Arc::clone(&queue);
-                move || thread_main(queue, thread_id)
-            }));
-        }
-
-        Ok(Self {
-            _threads: Mutex::new(threads),
-            queue,
-        })
-    }
-
-    pub fn spawn_ok<T>(&self, future: T)
-    where
-        T: Future<Output = ()> + Send + 'static,
-    {
-        self.do_spawn(Box::new(future));
-    }
-
-    fn do_spawn(&self, future: BoxFut) {
-        Arc::clone(&self.queue).new_task(future);
-    }
-
-    pub fn run<R, T>(&self, future: T) -> R
-    where
-        T: Future<Output = R> + Send + 'static,
-        R: Send + 'static,
-    {
-        let mutex: Arc<Mutex<Option<R>>> = Arc::new(Mutex::new(None));
-        let cv = Arc::new(Condvar::new());
-        let mut guard = mutex.lock().unwrap();
-        self.spawn_ok({
-            let mutex = Arc::clone(&mutex);
-            let cv = Arc::clone(&cv);
-            async move {
-                let result = future.await;
-                *(mutex.lock().unwrap()) = Some(result);
-                cv.notify_all();
-            }
-        });
-        loop {
-            guard = cv.wait(guard).unwrap();
-            if let Some(result) = guard.take() {
-                return result;
-            }
-        }
-    }
-}
-
-thread_local! {
-    static CURRENT_QUEUE: RefCell<*const TaskQueue> = RefCell::new(std::ptr::null());
-    static CURRENT_TASK: RefCell<*const Task> = RefCell::new(std::ptr::null());
-}
-
-fn thread_main(task_queue: Arc<TaskQueue>, _thread_id: usize) {
-    CURRENT_QUEUE.with(|q| *q.borrow_mut() = task_queue.as_ref() as *const TaskQueue);
-
-    let local_waker = unsafe {
-        std::task::Waker::from_raw(std::task::RawWaker::new(
-            std::ptr::null(),
-            &std::task::RawWakerVTable::new(
-                local_waker_clone_fn,
-                local_waker_wake_fn,
-                local_waker_wake_fn,
-                local_waker_drop_fn,
-            ),
-        ))
-    };
-
-    let mut context = Context::from_waker(&local_waker);
-
-    loop {
-        let task: Task = task_queue.get_task();
-        let task: Pin<&Task> = Pin::new(&task);
-        let task = task.get_ref();
-        CURRENT_TASK.with(|c| *c.borrow_mut() = task as *const Task);
-
-        let mut task_future = task.0.future.lock().unwrap();
-        match task_future.take() {
-            Some(mut future) => {
-                //eprintln!("Thread {} has some work!", thread_id);
-                let pin = unsafe { Pin::new_unchecked(&mut *future) };
-                match pin.poll(&mut context) {
-                    Poll::Ready(()) => (), // done with that task
-                    Poll::Pending => {
-                        *task_future = Some(future);
-                    }
-                }
-            }
-            None => eprintln!("task polled after ready"),
-        }
-    }
-}
-
-unsafe fn local_waker_clone_fn(_: *const ()) -> std::task::RawWaker {
-    let task: Task = CURRENT_TASK.with(|t| Task::clone(&**t.borrow()));
-    Task::into_raw_waker(task)
-}
-
-unsafe fn local_waker_wake_fn(_: *const ()) {
-    let task: Task = CURRENT_TASK.with(|t| Task::clone(&**t.borrow()));
-    CURRENT_QUEUE.with(|q| (**q.borrow()).queue(task));
-}
-
-unsafe fn local_waker_drop_fn(_: *const ()) {}
-
-unsafe fn waker_clone_fn(this: *const ()) -> std::task::RawWaker {
-    let this = Task::from_raw(this as *const TaskInner);
-    let clone = this.clone();
-    let _ = Task::into_raw(this);
-    Task::into_raw_waker(clone)
-}
-
-unsafe fn waker_wake_fn(this: *const ()) {
-    let this = Task::from_raw(this as *const TaskInner);
-    this.wake();
-}
-
-unsafe fn waker_wake_by_ref_fn(this: *const ()) {
-    let this = Task::from_raw(this as *const TaskInner);
-    this.clone().wake();
-    let _ = Task::into_raw(this);
-}
-
-unsafe fn waker_drop_fn(this: *const ()) {
-    let _this = Task::from_raw(this as *const TaskInner);
-}
-
-fn num_cpus() -> io::Result<usize> {
-    let rc = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) };
-    if rc < 0 {
-        Err(io::Error::last_os_error())
-    } else {
-        Ok(rc as usize)
-    }
-}
index be87f6cc8306fcd1f02562c6055eda0965a12271..405be8b86869fc1afe07ee5f468b695a25751bf9 100644 (file)
@@ -9,6 +9,8 @@ use std::os::raw::c_int;
 use std::os::unix::io::{FromRawFd, IntoRawFd};
 use std::panic::UnwindSafe;
 
+use tokio::io::AsyncReadExt;
+
 use crate::io::pipe::{self, Pipe};
 use crate::syscall::SyscallStatus;
 use crate::tools::Fd;
diff --git a/src/io/epoll.rs b/src/io/epoll.rs
deleted file mode 100644 (file)
index 8750f10..0000000
+++ /dev/null
@@ -1,84 +0,0 @@
-use std::convert::TryFrom;
-use std::io;
-use std::os::raw::c_int;
-use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::time::Duration;
-
-use crate::error::io_err_other;
-use crate::tools::Fd;
-
-pub type EpollEvent = libc::epoll_event;
-
-pub const EPOLLIN: u32 = libc::EPOLLIN as u32;
-pub const EPOLLET: u32 = libc::EPOLLET as u32;
-pub const EPOLLOUT: u32 = libc::EPOLLOUT as u32;
-pub const EPOLLERR: u32 = libc::EPOLLERR as u32;
-pub const EPOLLHUP: u32 = libc::EPOLLHUP as u32;
-
-pub struct Epoll {
-    fd: Fd,
-}
-
-impl Epoll {
-    pub fn new() -> io::Result<Self> {
-        let fd = unsafe { Fd::from_raw_fd(c_try!(libc::epoll_create1(libc::EPOLL_CLOEXEC))) };
-        Ok(Self { fd })
-    }
-
-    pub fn add_file<T: AsRawFd>(&self, fd: &T, events: u32, data: u64) -> io::Result<()> {
-        self.add_fd(fd.as_raw_fd(), events, data)
-    }
-
-    pub fn modify_file<T: AsRawFd>(&self, fd: &T, events: u32, data: u64) -> io::Result<()> {
-        self.modify_fd(fd.as_raw_fd(), events, data)
-    }
-
-    pub fn remove_file<T: AsRawFd>(&self, fd: &T) -> io::Result<()> {
-        self.remove_fd(fd.as_raw_fd())
-    }
-
-    fn addmod_fd(&self, op: c_int, fd: RawFd, events: u32, data: u64) -> io::Result<()> {
-        let mut events = libc::epoll_event {
-            events,
-            r#u64: data,
-        };
-        c_try!(unsafe { libc::epoll_ctl(self.fd.as_raw_fd(), op, fd, &mut events) });
-        Ok(())
-    }
-
-    pub fn add_fd(&self, fd: RawFd, events: u32, data: u64) -> io::Result<()> {
-        self.addmod_fd(libc::EPOLL_CTL_ADD, fd, events, data)
-    }
-
-    pub fn modify_fd(&self, fd: RawFd, events: u32, data: u64) -> io::Result<()> {
-        self.addmod_fd(libc::EPOLL_CTL_MOD, fd, events, data)
-    }
-
-    pub fn remove_fd(&self, fd: RawFd) -> io::Result<()> {
-        c_try!(unsafe {
-            libc::epoll_ctl(
-                self.fd.as_raw_fd(),
-                libc::EPOLL_CTL_DEL,
-                fd,
-                std::ptr::null_mut(),
-            )
-        });
-        Ok(())
-    }
-
-    pub fn wait(
-        &self,
-        event_buf: &mut [EpollEvent],
-        timeout: Option<Duration>,
-    ) -> io::Result<usize> {
-        let millis = timeout
-            .map(|t| c_int::try_from(t.as_millis()))
-            .transpose()
-            .map_err(io_err_other)?
-            .unwrap_or(-1);
-        let epfd = self.fd.as_raw_fd();
-        let buf_len = c_int::try_from(event_buf.len()).map_err(io_err_other)?;
-        let rc = c_try!(unsafe { libc::epoll_wait(epfd, event_buf.as_mut_ptr(), buf_len, millis) });
-        Ok(rc as usize)
-    }
-}
index 941491584bfe6b0f80c7e9c3180045a50e6c1708..2cf0cb95f986a34ee8c2ce4fd087a575e6bfe1bf 100644 (file)
@@ -1,7 +1,6 @@
 pub mod cmsg;
-pub mod epoll;
 pub mod iovec;
 pub mod pipe;
-pub mod reactor;
+pub mod polled_fd;
 pub mod rw_traits;
 pub mod seq_packet;
index 33ee6ea8c13cdac00982168844eb2c5af145f01d..1335f1ac7bce767d6e3b78b6460110d0fbc2bcd4 100644 (file)
@@ -2,12 +2,14 @@ use std::convert::TryFrom;
 use std::io;
 use std::marker::PhantomData;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::pin::Pin;
 use std::task::{Context, Poll};
 
+use tokio::io::{AsyncRead, AsyncWrite};
+
 use crate::error::io_err_other;
-use crate::io::reactor::PolledFd;
+use crate::io::polled_fd::PolledFd;
 use crate::io::rw_traits;
-use crate::poll_fn::poll_fn;
 use crate::tools::Fd;
 
 pub use rw_traits::{Read, Write};
@@ -37,58 +39,53 @@ pub fn pipe() -> io::Result<(Pipe<rw_traits::Read>, Pipe<rw_traits::Write>)> {
     c_try!(unsafe { libc::pipe2(pfd.as_mut_ptr(), libc::O_CLOEXEC) });
 
     let (fd_in, fd_out) = unsafe { (Fd::from_raw_fd(pfd[0]), Fd::from_raw_fd(pfd[1])) };
-    let fd_in = PolledFd::new(fd_in)?;
-    let fd_out = PolledFd::new(fd_out)?;
 
     Ok((
         Pipe {
-            fd: fd_in,
+            fd: PolledFd::new(fd_in)?,
             _phantom: PhantomData,
         },
         Pipe {
-            fd: fd_out,
+            fd: PolledFd::new(fd_out)?,
             _phantom: PhantomData,
         },
     ))
 }
 
-impl<RW: rw_traits::HasRead> Pipe<RW> {
-    pub fn poll_read(&mut self, cx: &mut Context, data: &mut [u8]) -> Poll<io::Result<usize>> {
-        let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
-        let fd = self.fd.as_raw_fd();
+impl<RW: rw_traits::HasRead> AsyncRead for Pipe<RW> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
         self.fd.wrap_read(cx, || {
-            c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) })
+            let fd = self.as_raw_fd();
+            let size = libc::size_t::try_from(buf.len()).map_err(io_err_other)?;
+            c_result!(unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, size) })
                 .map(|res| res as usize)
         })
     }
-
-    pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
-        poll_fn(move |cx| self.poll_read(cx, data)).await
-    }
-
-    pub async fn read_exact(&mut self, mut data: &mut [u8]) -> io::Result<()> {
-        while !data.is_empty() {
-            match self.read(&mut data[..]).await {
-                Ok(got) => data = &mut data[got..],
-                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
-                Err(err) => return Err(err),
-            }
-        }
-        Ok(())
-    }
 }
 
-impl<RW: rw_traits::HasWrite> Pipe<RW> {
-    pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> {
-        let fd = self.fd.as_raw_fd();
-        let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
+impl<RW: rw_traits::HasWrite> AsyncWrite for Pipe<RW> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
         self.fd.wrap_write(cx, || {
-            c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) })
+            let fd = self.as_raw_fd();
+            let size = libc::size_t::try_from(buf.len()).map_err(io_err_other)?;
+            c_result!(unsafe { libc::write(fd, buf.as_ptr() as *const libc::c_void, size) })
                 .map(|res| res as usize)
         })
     }
 
-    pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
-        poll_fn(move |cx| self.poll_write(data, cx)).await
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(Ok(()))
     }
 }
diff --git a/src/io/polled_fd.rs b/src/io/polled_fd.rs
new file mode 100644 (file)
index 0000000..8a17d76
--- /dev/null
@@ -0,0 +1,134 @@
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::task::{Context, Poll};
+
+use mio::event::Evented;
+use mio::unix::EventedFd as MioEventedFd;
+use mio::Poll as MioPoll;
+use mio::{PollOpt, Ready, Token};
+use tokio::io::PollEvented;
+
+use crate::tools::Fd;
+
+#[repr(transparent)]
+pub struct EventedFd {
+    fd: Fd,
+}
+
+impl EventedFd {
+    #[inline]
+    pub fn new(fd: Fd) -> Self {
+        Self { fd }
+    }
+}
+
+impl AsRawFd for EventedFd {
+    #[inline]
+    fn as_raw_fd(&self) -> RawFd {
+        self.fd.as_raw_fd()
+    }
+}
+
+impl FromRawFd for EventedFd {
+    #[inline]
+    unsafe fn from_raw_fd(fd: RawFd) -> Self {
+        Self::new(Fd::from_raw_fd(fd))
+    }
+}
+
+impl IntoRawFd for EventedFd {
+    #[inline]
+    fn into_raw_fd(self) -> RawFd {
+        self.fd.into_raw_fd()
+    }
+}
+
+impl Evented for EventedFd {
+    fn register(
+        &self,
+        poll: &MioPoll,
+        token: Token,
+        interest: Ready,
+        opts: PollOpt,
+    ) -> io::Result<()> {
+        MioEventedFd(self.fd.as_ref()).register(poll, token, interest, opts)
+    }
+
+    fn reregister(
+        &self,
+        poll: &MioPoll,
+        token: Token,
+        interest: Ready,
+        opts: PollOpt,
+    ) -> io::Result<()> {
+        MioEventedFd(self.fd.as_ref()).reregister(poll, token, interest, opts)
+    }
+
+    fn deregister(&self, poll: &MioPoll) -> io::Result<()> {
+        MioEventedFd(self.fd.as_ref()).deregister(poll)
+    }
+}
+
+#[repr(transparent)]
+pub struct PolledFd {
+    fd: PollEvented<EventedFd>,
+}
+
+impl PolledFd {
+    pub fn new(fd: Fd) -> tokio::io::Result<Self> {
+        Ok(Self {
+            fd: PollEvented::new(EventedFd::new(fd))?,
+        })
+    }
+
+    pub fn wrap_read<T>(
+        &self,
+        cx: &mut Context,
+        func: impl FnOnce() -> io::Result<T>,
+    ) -> Poll<io::Result<T>> {
+        ready!(self.fd.poll_read_ready(cx, mio::Ready::readable()))?;
+        match func() {
+            Ok(out) => Poll::Ready(Ok(out)),
+            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+                self.fd.clear_read_ready(cx, mio::Ready::readable())?;
+                Poll::Pending
+            }
+            Err(err) => Poll::Ready(Err(err)),
+        }
+    }
+
+    pub fn wrap_write<T>(
+        &self,
+        cx: &mut Context,
+        func: impl FnOnce() -> io::Result<T>,
+    ) -> Poll<io::Result<T>> {
+        ready!(self.fd.poll_write_ready(cx))?;
+        match func() {
+            Ok(out) => Poll::Ready(Ok(out)),
+            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+                self.fd.clear_write_ready(cx)?;
+                Poll::Pending
+            }
+            Err(err) => Poll::Ready(Err(err)),
+        }
+    }
+}
+
+impl AsRawFd for PolledFd {
+    #[inline]
+    fn as_raw_fd(&self) -> RawFd {
+        self.fd.get_ref().as_raw_fd()
+    }
+}
+
+impl IntoRawFd for PolledFd {
+    #[inline]
+    fn into_raw_fd(self) -> RawFd {
+        // for the kind of resource we're managing it should always be possible to extract it from
+        // its driver
+        self.fd
+            .into_inner()
+            .expect("failed to remove polled file descriptor from reactor")
+            .into_raw_fd()
+    }
+}
diff --git a/src/io/reactor.rs b/src/io/reactor.rs
deleted file mode 100644 (file)
index a27ba3d..0000000
+++ /dev/null
@@ -1,227 +0,0 @@
-use std::io;
-use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, Once};
-use std::task::{Context, Poll, Waker};
-use std::thread::JoinHandle;
-
-use crate::error::io_err_other;
-use crate::io::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT};
-use crate::tools::Fd;
-
-static START: Once = Once::new();
-static mut REACTOR: Option<Arc<Reactor>> = None;
-
-pub fn default_reactor() -> Arc<Reactor> {
-    START.call_once(|| unsafe {
-        let reactor = Reactor::new().expect("setup main epoll reactor");
-        REACTOR = Some(reactor);
-    });
-    unsafe { Arc::clone(REACTOR.as_ref().unwrap()) }
-}
-
-pub struct Reactor {
-    epoll: Arc<Epoll>,
-    removed: Mutex<Vec<Box<RegistrationInner>>>,
-    thread: Mutex<Option<JoinHandle<()>>>,
-}
-
-impl Reactor {
-    pub fn new() -> io::Result<Arc<Self>> {
-        let epoll = Arc::new(Epoll::new()?);
-
-        let this = Arc::new(Reactor {
-            epoll,
-            removed: Mutex::new(Vec::new()),
-            thread: Mutex::new(None),
-        });
-
-        let handle = std::thread::spawn({
-            let this = Arc::clone(&this);
-            move || this.thread_main()
-        });
-
-        this.thread.lock().unwrap().replace(handle);
-
-        Ok(this)
-    }
-
-    fn thread_main(self: Arc<Self>) {
-        let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
-        loop {
-            let count = match self.epoll.wait(&mut buf, None) {
-                Ok(count) => count,
-                Err(err) => {
-                    eprintln!("error in epoll loop: {}", err);
-                    std::process::exit(1);
-                }
-            };
-            for i in 0..count {
-                self.handle_event(&buf[i]);
-            }
-            // After going through the events we can release memory associated with already closed
-            // file descriptors:
-            self.removed.lock().unwrap().clear();
-        }
-    }
-
-    fn handle_event(&self, event: &EpollEvent) {
-        let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
-        if registration.gone.load(Ordering::Acquire) {
-            return;
-        }
-
-        if 0 != (event.events & EPOLLIN) {
-            //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
-            if let Some(waker) = registration.read_waker.lock().unwrap().take() {
-                waker.wake();
-            }
-        }
-
-        if 0 != (event.events & EPOLLOUT) {
-            //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
-            if let Some(waker) = registration.write_waker.lock().unwrap().take() {
-                waker.wake();
-            }
-        }
-
-        if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
-            //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
-            if let Some(waker) = registration.read_waker.lock().unwrap().take() {
-                waker.wake();
-            }
-            if let Some(waker) = registration.write_waker.lock().unwrap().take() {
-                waker.wake();
-            }
-        }
-    }
-
-    pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
-        let mut inner = Box::new(RegistrationInner {
-            gone: AtomicBool::new(false),
-            reactor: Arc::clone(&self),
-            read_waker: Mutex::new(None),
-            write_waker: Mutex::new(None),
-        });
-
-        let inner_ptr = {
-            // type check/assertion
-            let inner_ptr: &mut RegistrationInner = &mut *inner;
-            // make raw pointer
-            inner_ptr as *mut RegistrationInner as usize as u64
-        };
-
-        self.epoll
-            .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?;
-
-        Ok(Registration { inner: Some(inner) })
-    }
-
-    fn deregister(&self, registration: Box<RegistrationInner>) {
-        self.removed.lock().unwrap().push(registration);
-    }
-}
-
-pub struct Registration {
-    // pin the data in memory because the other thread will access it
-    // ManuallyDrop::take is nightly only :<
-    inner: Option<Box<RegistrationInner>>,
-}
-
-impl Drop for Registration {
-    fn drop(&mut self) {
-        if let Some(inner) = self.inner.take() {
-            let reactor = Arc::clone(&inner.reactor);
-            inner.gone.store(true, Ordering::Release);
-            reactor.deregister(inner);
-        }
-    }
-}
-
-// This is accessed by the reactor
-struct RegistrationInner {
-    gone: AtomicBool,
-    reactor: Arc<Reactor>,
-    read_waker: Mutex<Option<Waker>>,
-    write_waker: Mutex<Option<Waker>>,
-}
-
-pub struct PolledFd {
-    fd: Fd,
-    registration: Registration,
-}
-
-impl AsRawFd for PolledFd {
-    #[inline]
-    fn as_raw_fd(&self) -> RawFd {
-        self.fd.as_raw_fd()
-    }
-}
-
-impl IntoRawFd for PolledFd {
-    fn into_raw_fd(mut self) -> RawFd {
-        let registration = self.registration.inner.take().unwrap();
-        registration
-            .reactor
-            .epoll
-            .remove_fd(self.as_raw_fd())
-            .expect("cannot remove PolledFd from epoll instance");
-        self.fd.into_raw_fd()
-    }
-}
-
-impl PolledFd {
-    pub fn new(mut fd: Fd) -> io::Result<Self> {
-        fd.set_nonblocking(true).map_err(io_err_other)?;
-        Self::new_with_reactor(fd, self::default_reactor())
-    }
-
-    pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
-        let registration = reactor.register(fd.as_raw_fd())?;
-        Ok(Self { fd, registration })
-    }
-
-    pub fn wrap_read<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
-    where
-        F: FnOnce() -> io::Result<T>,
-    {
-        let mut read_waker = self
-            .registration
-            .inner
-            .as_ref()
-            .unwrap()
-            .read_waker
-            .lock()
-            .unwrap();
-        match func() {
-            Ok(out) => Poll::Ready(Ok(out)),
-            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
-                *read_waker = Some(cx.waker().clone());
-                Poll::Pending
-            }
-            Err(err) => Poll::Ready(Err(err)),
-        }
-    }
-
-    pub fn wrap_write<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
-    where
-        F: FnOnce() -> io::Result<T>,
-    {
-        let mut write_waker = self
-            .registration
-            .inner
-            .as_ref()
-            .unwrap()
-            .write_waker
-            .lock()
-            .unwrap();
-        match func() {
-            Ok(out) => Poll::Ready(Ok(out)),
-            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
-                *write_waker = Some(cx.waker().clone());
-                Poll::Pending
-            }
-            Err(err) => Poll::Ready(Err(err)),
-        }
-    }
-}
index fee47cb5229631387b86c8c69b19e42575170323..aa5a62e82e33e3248e1a46bbb49815d6d6abe7e7 100644 (file)
@@ -6,7 +6,7 @@ use failure::Error;
 use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
 
 use crate::io::iovec::{IoVec, IoVecMut};
-use crate::io::reactor::PolledFd;
+use crate::io::polled_fd::PolledFd;
 use crate::poll_fn::poll_fn;
 use crate::tools::AssertSendSync;
 use crate::tools::Fd;
@@ -44,7 +44,7 @@ impl SeqPacketListener {
     }
 
     pub fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<SeqPacketSocket>> {
-        let fd = self.fd.as_raw_fd();
+        let fd = self.as_raw_fd();
         let res = self.fd.wrap_read(cx, || {
             c_result!(unsafe {
                 libc::accept4(fd, ptr::null_mut(), ptr::null_mut(), libc::SOCK_CLOEXEC)
index 40546610317ac1da62b67bfbbf9e2f29265504b8..5cc99223267f22bb4c7cabfc3b91b652824de0a1 100644 (file)
@@ -77,3 +77,13 @@ macro_rules! io_bail {
         return Err(::std::io::Error::new(::std::io::ErrorKind::Other, format!($($msg)*)));
     };
 }
+
+macro_rules! ready {
+    ($expr:expr) => {{
+        use std::task::Poll;
+        match $expr {
+            Poll::Ready(v) => v,
+            Poll::Pending => return Poll::Pending,
+        }
+    }};
+}
index 301a6ae651d335d2b3d30f55302883facad53f37..1874f6eb21bcca0645850a904884af2d7026b1e5 100644 (file)
@@ -11,7 +11,6 @@ pub mod apparmor;
 pub mod capability;
 pub mod client;
 pub mod error;
-pub mod executor;
 pub mod fork;
 pub mod io;
 pub mod lxcseccomp;
@@ -26,24 +25,14 @@ pub mod tools;
 
 use crate::io::seq_packet::SeqPacketListener;
 
-static mut EXECUTOR: *mut executor::ThreadPool = std::ptr::null_mut();
-
-pub fn executor() -> &'static executor::ThreadPool {
-    unsafe { &*EXECUTOR }
-}
-
 pub fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
-    executor().spawn_ok(fut)
+    tokio::spawn(fut);
 }
 
 fn main() {
-    let mut executor = executor::ThreadPool::new().expect("spawning worker threadpool");
-    unsafe {
-        EXECUTOR = &mut executor;
-    }
-    std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
+    let mut rt = tokio::runtime::Runtime::new().expect("failed to spawn tokio runtime");
 
-    if let Err(err) = executor.run(do_main()) {
+    if let Err(err) = rt.block_on(do_main()) {
         eprintln!("error: {}", err);
         std::process::exit(1);
     }
index 8ed3d50495cd77cb1d9a224b13ac8f389877bdc5..01a32b59e8c7fcf9269565337493db710d513d0d 100644 (file)
@@ -29,6 +29,13 @@ impl Fd {
     }
 }
 
+impl AsRef<RawFd> for Fd {
+    #[inline]
+    fn as_ref(&self) -> &RawFd {
+        &self.0
+    }
+}
+
 /// Byte vector utilities.
 pub mod vec {
     /// Create an uninitialized byte vector of a specific size.