From: Wolfgang Bumiller Date: Tue, 21 Jan 2020 10:12:50 +0000 (+0100) Subject: switch back to tokio now that it's stable and packaged X-Git-Url: https://git.proxmox.com/?a=commitdiff_plain;h=5bd0c5620f6f1339a5cfec8318fb525d457a7f61;p=pve-lxc-syscalld.git switch back to tokio now that it's stable and packaged Signed-off-by: Wolfgang Bumiller --- diff --git a/.cargo/config b/.cargo/config new file mode 100644 index 0000000..3b5b6e4 --- /dev/null +++ b/.cargo/config @@ -0,0 +1,5 @@ +[source] +[source.debian-packages] +directory = "/usr/share/cargo/registry" +[source.crates-io] +replace-with = "debian-packages" diff --git a/Cargo.toml b/Cargo.toml index 4f177cb..06c265b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,9 @@ authors = [ [dependencies] bitflags = "1.2" -failure = { version = "0.1", default-features = false, features = ["std"] } +failure = { version = "0.1", default-features = false, features = [ "std" ] } lazy_static = "1.4" libc = "0.2" nix = "0.16" +mio = "0.6.21" +tokio = { version = "0.2.9", features = [ "rt-threaded", "io-driver", "io-util" ] } diff --git a/src/capability.rs b/src/capability.rs index 7c66e1a..7b434de 100644 --- a/src/capability.rs +++ b/src/capability.rs @@ -50,13 +50,14 @@ impl Capabilities { /// Change our process capabilities. This does not include the bounding set. pub fn capset(&self) -> io::Result<()> { - #![allow(dead_code)] // kernel abi: + #[allow(dead_code)] struct Header { version: u32, pid: c_int, } + #[allow(dead_code)] struct Data { effective: u32, permitted: u32, diff --git a/src/executor.rs b/src/executor.rs deleted file mode 100644 index ff29981..0000000 --- a/src/executor.rs +++ /dev/null @@ -1,237 +0,0 @@ -use std::cell::RefCell; -use std::collections::VecDeque; -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::sync::{Arc, Condvar, Mutex, Weak}; -use std::task::{Context, Poll}; -use std::thread::JoinHandle; - -type BoxFut = Box + Send + 'static>; - -#[derive(Clone)] -struct Task(Arc); - -impl Task { - fn into_raw(this: Task) -> *const TaskInner { - Arc::into_raw(this.0) - } - - unsafe fn from_raw(ptr: *const TaskInner) -> Self { - Self(Arc::from_raw(ptr)) - } - - fn wake(self) { - if let Some(queue) = self.0.queue.upgrade() { - queue.queue(self); - } - } - - fn into_raw_waker(this: Task) -> std::task::RawWaker { - std::task::RawWaker::new( - Task::into_raw(this) as *const (), - &std::task::RawWakerVTable::new( - waker_clone_fn, - waker_wake_fn, - waker_wake_by_ref_fn, - waker_drop_fn, - ), - ) - } -} - -struct TaskInner { - future: Mutex>, - queue: Weak, -} - -struct TaskQueue { - queue: Mutex>, - queue_cv: Condvar, -} - -impl TaskQueue { - fn new() -> Self { - Self { - queue: Mutex::new(VecDeque::with_capacity(32)), - queue_cv: Condvar::new(), - } - } - - fn new_task(self: Arc, future: BoxFut) { - let task = Task(Arc::new(TaskInner { - future: Mutex::new(Some(future)), - queue: Arc::downgrade(&self), - })); - - self.queue(task); - } - - fn queue(&self, task: Task) { - let mut queue = self.queue.lock().unwrap(); - queue.push_back(task); - self.queue_cv.notify_one(); - } - - /// Blocks until a task is available - fn get_task(&self) -> Task { - let mut queue = self.queue.lock().unwrap(); - loop { - if let Some(task) = queue.pop_front() { - return task; - } else { - queue = self.queue_cv.wait(queue).unwrap(); - } - } - } -} - -pub struct ThreadPool { - _threads: Mutex>>, - queue: Arc, -} - -impl ThreadPool { - pub fn new() -> io::Result { - let count = num_cpus()?; - - let queue = Arc::new(TaskQueue::new()); - - let mut threads = Vec::new(); - for thread_id in 0..count { - threads.push(std::thread::spawn({ - let queue = Arc::clone(&queue); - move || thread_main(queue, thread_id) - })); - } - - Ok(Self { - _threads: Mutex::new(threads), - queue, - }) - } - - pub fn spawn_ok(&self, future: T) - where - T: Future + Send + 'static, - { - self.do_spawn(Box::new(future)); - } - - fn do_spawn(&self, future: BoxFut) { - Arc::clone(&self.queue).new_task(future); - } - - pub fn run(&self, future: T) -> R - where - T: Future + Send + 'static, - R: Send + 'static, - { - let mutex: Arc>> = Arc::new(Mutex::new(None)); - let cv = Arc::new(Condvar::new()); - let mut guard = mutex.lock().unwrap(); - self.spawn_ok({ - let mutex = Arc::clone(&mutex); - let cv = Arc::clone(&cv); - async move { - let result = future.await; - *(mutex.lock().unwrap()) = Some(result); - cv.notify_all(); - } - }); - loop { - guard = cv.wait(guard).unwrap(); - if let Some(result) = guard.take() { - return result; - } - } - } -} - -thread_local! { - static CURRENT_QUEUE: RefCell<*const TaskQueue> = RefCell::new(std::ptr::null()); - static CURRENT_TASK: RefCell<*const Task> = RefCell::new(std::ptr::null()); -} - -fn thread_main(task_queue: Arc, _thread_id: usize) { - CURRENT_QUEUE.with(|q| *q.borrow_mut() = task_queue.as_ref() as *const TaskQueue); - - let local_waker = unsafe { - std::task::Waker::from_raw(std::task::RawWaker::new( - std::ptr::null(), - &std::task::RawWakerVTable::new( - local_waker_clone_fn, - local_waker_wake_fn, - local_waker_wake_fn, - local_waker_drop_fn, - ), - )) - }; - - let mut context = Context::from_waker(&local_waker); - - loop { - let task: Task = task_queue.get_task(); - let task: Pin<&Task> = Pin::new(&task); - let task = task.get_ref(); - CURRENT_TASK.with(|c| *c.borrow_mut() = task as *const Task); - - let mut task_future = task.0.future.lock().unwrap(); - match task_future.take() { - Some(mut future) => { - //eprintln!("Thread {} has some work!", thread_id); - let pin = unsafe { Pin::new_unchecked(&mut *future) }; - match pin.poll(&mut context) { - Poll::Ready(()) => (), // done with that task - Poll::Pending => { - *task_future = Some(future); - } - } - } - None => eprintln!("task polled after ready"), - } - } -} - -unsafe fn local_waker_clone_fn(_: *const ()) -> std::task::RawWaker { - let task: Task = CURRENT_TASK.with(|t| Task::clone(&**t.borrow())); - Task::into_raw_waker(task) -} - -unsafe fn local_waker_wake_fn(_: *const ()) { - let task: Task = CURRENT_TASK.with(|t| Task::clone(&**t.borrow())); - CURRENT_QUEUE.with(|q| (**q.borrow()).queue(task)); -} - -unsafe fn local_waker_drop_fn(_: *const ()) {} - -unsafe fn waker_clone_fn(this: *const ()) -> std::task::RawWaker { - let this = Task::from_raw(this as *const TaskInner); - let clone = this.clone(); - let _ = Task::into_raw(this); - Task::into_raw_waker(clone) -} - -unsafe fn waker_wake_fn(this: *const ()) { - let this = Task::from_raw(this as *const TaskInner); - this.wake(); -} - -unsafe fn waker_wake_by_ref_fn(this: *const ()) { - let this = Task::from_raw(this as *const TaskInner); - this.clone().wake(); - let _ = Task::into_raw(this); -} - -unsafe fn waker_drop_fn(this: *const ()) { - let _this = Task::from_raw(this as *const TaskInner); -} - -fn num_cpus() -> io::Result { - let rc = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }; - if rc < 0 { - Err(io::Error::last_os_error()) - } else { - Ok(rc as usize) - } -} diff --git a/src/fork.rs b/src/fork.rs index be87f6c..405be8b 100644 --- a/src/fork.rs +++ b/src/fork.rs @@ -9,6 +9,8 @@ use std::os::raw::c_int; use std::os::unix::io::{FromRawFd, IntoRawFd}; use std::panic::UnwindSafe; +use tokio::io::AsyncReadExt; + use crate::io::pipe::{self, Pipe}; use crate::syscall::SyscallStatus; use crate::tools::Fd; diff --git a/src/io/epoll.rs b/src/io/epoll.rs deleted file mode 100644 index 8750f10..0000000 --- a/src/io/epoll.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::convert::TryFrom; -use std::io; -use std::os::raw::c_int; -use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; -use std::time::Duration; - -use crate::error::io_err_other; -use crate::tools::Fd; - -pub type EpollEvent = libc::epoll_event; - -pub const EPOLLIN: u32 = libc::EPOLLIN as u32; -pub const EPOLLET: u32 = libc::EPOLLET as u32; -pub const EPOLLOUT: u32 = libc::EPOLLOUT as u32; -pub const EPOLLERR: u32 = libc::EPOLLERR as u32; -pub const EPOLLHUP: u32 = libc::EPOLLHUP as u32; - -pub struct Epoll { - fd: Fd, -} - -impl Epoll { - pub fn new() -> io::Result { - let fd = unsafe { Fd::from_raw_fd(c_try!(libc::epoll_create1(libc::EPOLL_CLOEXEC))) }; - Ok(Self { fd }) - } - - pub fn add_file(&self, fd: &T, events: u32, data: u64) -> io::Result<()> { - self.add_fd(fd.as_raw_fd(), events, data) - } - - pub fn modify_file(&self, fd: &T, events: u32, data: u64) -> io::Result<()> { - self.modify_fd(fd.as_raw_fd(), events, data) - } - - pub fn remove_file(&self, fd: &T) -> io::Result<()> { - self.remove_fd(fd.as_raw_fd()) - } - - fn addmod_fd(&self, op: c_int, fd: RawFd, events: u32, data: u64) -> io::Result<()> { - let mut events = libc::epoll_event { - events, - r#u64: data, - }; - c_try!(unsafe { libc::epoll_ctl(self.fd.as_raw_fd(), op, fd, &mut events) }); - Ok(()) - } - - pub fn add_fd(&self, fd: RawFd, events: u32, data: u64) -> io::Result<()> { - self.addmod_fd(libc::EPOLL_CTL_ADD, fd, events, data) - } - - pub fn modify_fd(&self, fd: RawFd, events: u32, data: u64) -> io::Result<()> { - self.addmod_fd(libc::EPOLL_CTL_MOD, fd, events, data) - } - - pub fn remove_fd(&self, fd: RawFd) -> io::Result<()> { - c_try!(unsafe { - libc::epoll_ctl( - self.fd.as_raw_fd(), - libc::EPOLL_CTL_DEL, - fd, - std::ptr::null_mut(), - ) - }); - Ok(()) - } - - pub fn wait( - &self, - event_buf: &mut [EpollEvent], - timeout: Option, - ) -> io::Result { - let millis = timeout - .map(|t| c_int::try_from(t.as_millis())) - .transpose() - .map_err(io_err_other)? - .unwrap_or(-1); - let epfd = self.fd.as_raw_fd(); - let buf_len = c_int::try_from(event_buf.len()).map_err(io_err_other)?; - let rc = c_try!(unsafe { libc::epoll_wait(epfd, event_buf.as_mut_ptr(), buf_len, millis) }); - Ok(rc as usize) - } -} diff --git a/src/io/mod.rs b/src/io/mod.rs index 9414915..2cf0cb9 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,7 +1,6 @@ pub mod cmsg; -pub mod epoll; pub mod iovec; pub mod pipe; -pub mod reactor; +pub mod polled_fd; pub mod rw_traits; pub mod seq_packet; diff --git a/src/io/pipe.rs b/src/io/pipe.rs index 33ee6ea..1335f1a 100644 --- a/src/io/pipe.rs +++ b/src/io/pipe.rs @@ -2,12 +2,14 @@ use std::convert::TryFrom; use std::io; use std::marker::PhantomData; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::pin::Pin; use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite}; + use crate::error::io_err_other; -use crate::io::reactor::PolledFd; +use crate::io::polled_fd::PolledFd; use crate::io::rw_traits; -use crate::poll_fn::poll_fn; use crate::tools::Fd; pub use rw_traits::{Read, Write}; @@ -37,58 +39,53 @@ pub fn pipe() -> io::Result<(Pipe, Pipe)> { c_try!(unsafe { libc::pipe2(pfd.as_mut_ptr(), libc::O_CLOEXEC) }); let (fd_in, fd_out) = unsafe { (Fd::from_raw_fd(pfd[0]), Fd::from_raw_fd(pfd[1])) }; - let fd_in = PolledFd::new(fd_in)?; - let fd_out = PolledFd::new(fd_out)?; Ok(( Pipe { - fd: fd_in, + fd: PolledFd::new(fd_in)?, _phantom: PhantomData, }, Pipe { - fd: fd_out, + fd: PolledFd::new(fd_out)?, _phantom: PhantomData, }, )) } -impl Pipe { - pub fn poll_read(&mut self, cx: &mut Context, data: &mut [u8]) -> Poll> { - let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?; - let fd = self.fd.as_raw_fd(); +impl AsyncRead for Pipe { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { self.fd.wrap_read(cx, || { - c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }) + let fd = self.as_raw_fd(); + let size = libc::size_t::try_from(buf.len()).map_err(io_err_other)?; + c_result!(unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, size) }) .map(|res| res as usize) }) } - - pub async fn read(&mut self, data: &mut [u8]) -> io::Result { - poll_fn(move |cx| self.poll_read(cx, data)).await - } - - pub async fn read_exact(&mut self, mut data: &mut [u8]) -> io::Result<()> { - while !data.is_empty() { - match self.read(&mut data[..]).await { - Ok(got) => data = &mut data[got..], - Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, - Err(err) => return Err(err), - } - } - Ok(()) - } } -impl Pipe { - pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll> { - let fd = self.fd.as_raw_fd(); - let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?; +impl AsyncWrite for Pipe { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { self.fd.wrap_write(cx, || { - c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) }) + let fd = self.as_raw_fd(); + let size = libc::size_t::try_from(buf.len()).map_err(io_err_other)?; + c_result!(unsafe { libc::write(fd, buf.as_ptr() as *const libc::c_void, size) }) .map(|res| res as usize) }) } - pub async fn write(&mut self, data: &[u8]) -> io::Result { - poll_fn(move |cx| self.poll_write(data, cx)).await + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } diff --git a/src/io/polled_fd.rs b/src/io/polled_fd.rs new file mode 100644 index 0000000..8a17d76 --- /dev/null +++ b/src/io/polled_fd.rs @@ -0,0 +1,134 @@ +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::task::{Context, Poll}; + +use mio::event::Evented; +use mio::unix::EventedFd as MioEventedFd; +use mio::Poll as MioPoll; +use mio::{PollOpt, Ready, Token}; +use tokio::io::PollEvented; + +use crate::tools::Fd; + +#[repr(transparent)] +pub struct EventedFd { + fd: Fd, +} + +impl EventedFd { + #[inline] + pub fn new(fd: Fd) -> Self { + Self { fd } + } +} + +impl AsRawFd for EventedFd { + #[inline] + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } +} + +impl FromRawFd for EventedFd { + #[inline] + unsafe fn from_raw_fd(fd: RawFd) -> Self { + Self::new(Fd::from_raw_fd(fd)) + } +} + +impl IntoRawFd for EventedFd { + #[inline] + fn into_raw_fd(self) -> RawFd { + self.fd.into_raw_fd() + } +} + +impl Evented for EventedFd { + fn register( + &self, + poll: &MioPoll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + MioEventedFd(self.fd.as_ref()).register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &MioPoll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + MioEventedFd(self.fd.as_ref()).reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &MioPoll) -> io::Result<()> { + MioEventedFd(self.fd.as_ref()).deregister(poll) + } +} + +#[repr(transparent)] +pub struct PolledFd { + fd: PollEvented, +} + +impl PolledFd { + pub fn new(fd: Fd) -> tokio::io::Result { + Ok(Self { + fd: PollEvented::new(EventedFd::new(fd))?, + }) + } + + pub fn wrap_read( + &self, + cx: &mut Context, + func: impl FnOnce() -> io::Result, + ) -> Poll> { + ready!(self.fd.poll_read_ready(cx, mio::Ready::readable()))?; + match func() { + Ok(out) => Poll::Ready(Ok(out)), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.fd.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + } + + pub fn wrap_write( + &self, + cx: &mut Context, + func: impl FnOnce() -> io::Result, + ) -> Poll> { + ready!(self.fd.poll_write_ready(cx))?; + match func() { + Ok(out) => Poll::Ready(Ok(out)), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.fd.clear_write_ready(cx)?; + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + } +} + +impl AsRawFd for PolledFd { + #[inline] + fn as_raw_fd(&self) -> RawFd { + self.fd.get_ref().as_raw_fd() + } +} + +impl IntoRawFd for PolledFd { + #[inline] + fn into_raw_fd(self) -> RawFd { + // for the kind of resource we're managing it should always be possible to extract it from + // its driver + self.fd + .into_inner() + .expect("failed to remove polled file descriptor from reactor") + .into_raw_fd() + } +} diff --git a/src/io/reactor.rs b/src/io/reactor.rs deleted file mode 100644 index a27ba3d..0000000 --- a/src/io/reactor.rs +++ /dev/null @@ -1,227 +0,0 @@ -use std::io; -use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, Once}; -use std::task::{Context, Poll, Waker}; -use std::thread::JoinHandle; - -use crate::error::io_err_other; -use crate::io::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT}; -use crate::tools::Fd; - -static START: Once = Once::new(); -static mut REACTOR: Option> = None; - -pub fn default_reactor() -> Arc { - START.call_once(|| unsafe { - let reactor = Reactor::new().expect("setup main epoll reactor"); - REACTOR = Some(reactor); - }); - unsafe { Arc::clone(REACTOR.as_ref().unwrap()) } -} - -pub struct Reactor { - epoll: Arc, - removed: Mutex>>, - thread: Mutex>>, -} - -impl Reactor { - pub fn new() -> io::Result> { - let epoll = Arc::new(Epoll::new()?); - - let this = Arc::new(Reactor { - epoll, - removed: Mutex::new(Vec::new()), - thread: Mutex::new(None), - }); - - let handle = std::thread::spawn({ - let this = Arc::clone(&this); - move || this.thread_main() - }); - - this.thread.lock().unwrap().replace(handle); - - Ok(this) - } - - fn thread_main(self: Arc) { - let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() }; - loop { - let count = match self.epoll.wait(&mut buf, None) { - Ok(count) => count, - Err(err) => { - eprintln!("error in epoll loop: {}", err); - std::process::exit(1); - } - }; - for i in 0..count { - self.handle_event(&buf[i]); - } - // After going through the events we can release memory associated with already closed - // file descriptors: - self.removed.lock().unwrap().clear(); - } - } - - fn handle_event(&self, event: &EpollEvent) { - let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) }; - if registration.gone.load(Ordering::Acquire) { - return; - } - - if 0 != (event.events & EPOLLIN) { - //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel); - if let Some(waker) = registration.read_waker.lock().unwrap().take() { - waker.wake(); - } - } - - if 0 != (event.events & EPOLLOUT) { - //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel); - if let Some(waker) = registration.write_waker.lock().unwrap().take() { - waker.wake(); - } - } - - if 0 != (event.events & (EPOLLERR | EPOLLHUP)) { - //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel); - if let Some(waker) = registration.read_waker.lock().unwrap().take() { - waker.wake(); - } - if let Some(waker) = registration.write_waker.lock().unwrap().take() { - waker.wake(); - } - } - } - - pub fn register(self: Arc, fd: RawFd) -> io::Result { - let mut inner = Box::new(RegistrationInner { - gone: AtomicBool::new(false), - reactor: Arc::clone(&self), - read_waker: Mutex::new(None), - write_waker: Mutex::new(None), - }); - - let inner_ptr = { - // type check/assertion - let inner_ptr: &mut RegistrationInner = &mut *inner; - // make raw pointer - inner_ptr as *mut RegistrationInner as usize as u64 - }; - - self.epoll - .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?; - - Ok(Registration { inner: Some(inner) }) - } - - fn deregister(&self, registration: Box) { - self.removed.lock().unwrap().push(registration); - } -} - -pub struct Registration { - // pin the data in memory because the other thread will access it - // ManuallyDrop::take is nightly only :< - inner: Option>, -} - -impl Drop for Registration { - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - let reactor = Arc::clone(&inner.reactor); - inner.gone.store(true, Ordering::Release); - reactor.deregister(inner); - } - } -} - -// This is accessed by the reactor -struct RegistrationInner { - gone: AtomicBool, - reactor: Arc, - read_waker: Mutex>, - write_waker: Mutex>, -} - -pub struct PolledFd { - fd: Fd, - registration: Registration, -} - -impl AsRawFd for PolledFd { - #[inline] - fn as_raw_fd(&self) -> RawFd { - self.fd.as_raw_fd() - } -} - -impl IntoRawFd for PolledFd { - fn into_raw_fd(mut self) -> RawFd { - let registration = self.registration.inner.take().unwrap(); - registration - .reactor - .epoll - .remove_fd(self.as_raw_fd()) - .expect("cannot remove PolledFd from epoll instance"); - self.fd.into_raw_fd() - } -} - -impl PolledFd { - pub fn new(mut fd: Fd) -> io::Result { - fd.set_nonblocking(true).map_err(io_err_other)?; - Self::new_with_reactor(fd, self::default_reactor()) - } - - pub fn new_with_reactor(fd: Fd, reactor: Arc) -> io::Result { - let registration = reactor.register(fd.as_raw_fd())?; - Ok(Self { fd, registration }) - } - - pub fn wrap_read(&self, cx: &mut Context, func: F) -> Poll> - where - F: FnOnce() -> io::Result, - { - let mut read_waker = self - .registration - .inner - .as_ref() - .unwrap() - .read_waker - .lock() - .unwrap(); - match func() { - Ok(out) => Poll::Ready(Ok(out)), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - *read_waker = Some(cx.waker().clone()); - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - } - - pub fn wrap_write(&self, cx: &mut Context, func: F) -> Poll> - where - F: FnOnce() -> io::Result, - { - let mut write_waker = self - .registration - .inner - .as_ref() - .unwrap() - .write_waker - .lock() - .unwrap(); - match func() { - Ok(out) => Poll::Ready(Ok(out)), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - *write_waker = Some(cx.waker().clone()); - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - } -} diff --git a/src/io/seq_packet.rs b/src/io/seq_packet.rs index fee47cb..aa5a62e 100644 --- a/src/io/seq_packet.rs +++ b/src/io/seq_packet.rs @@ -6,7 +6,7 @@ use failure::Error; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; use crate::io::iovec::{IoVec, IoVecMut}; -use crate::io::reactor::PolledFd; +use crate::io::polled_fd::PolledFd; use crate::poll_fn::poll_fn; use crate::tools::AssertSendSync; use crate::tools::Fd; @@ -44,7 +44,7 @@ impl SeqPacketListener { } pub fn poll_accept(&mut self, cx: &mut Context) -> Poll> { - let fd = self.fd.as_raw_fd(); + let fd = self.as_raw_fd(); let res = self.fd.wrap_read(cx, || { c_result!(unsafe { libc::accept4(fd, ptr::null_mut(), ptr::null_mut(), libc::SOCK_CLOEXEC) diff --git a/src/macros.rs b/src/macros.rs index 4054661..5cc9922 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -77,3 +77,13 @@ macro_rules! io_bail { return Err(::std::io::Error::new(::std::io::ErrorKind::Other, format!($($msg)*))); }; } + +macro_rules! ready { + ($expr:expr) => {{ + use std::task::Poll; + match $expr { + Poll::Ready(v) => v, + Poll::Pending => return Poll::Pending, + } + }}; +} diff --git a/src/main.rs b/src/main.rs index 301a6ae..1874f6e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,6 @@ pub mod apparmor; pub mod capability; pub mod client; pub mod error; -pub mod executor; pub mod fork; pub mod io; pub mod lxcseccomp; @@ -26,24 +25,14 @@ pub mod tools; use crate::io::seq_packet::SeqPacketListener; -static mut EXECUTOR: *mut executor::ThreadPool = std::ptr::null_mut(); - -pub fn executor() -> &'static executor::ThreadPool { - unsafe { &*EXECUTOR } -} - pub fn spawn(fut: impl Future + Send + 'static) { - executor().spawn_ok(fut) + tokio::spawn(fut); } fn main() { - let mut executor = executor::ThreadPool::new().expect("spawning worker threadpool"); - unsafe { - EXECUTOR = &mut executor; - } - std::sync::atomic::fence(std::sync::atomic::Ordering::Release); + let mut rt = tokio::runtime::Runtime::new().expect("failed to spawn tokio runtime"); - if let Err(err) = executor.run(do_main()) { + if let Err(err) = rt.block_on(do_main()) { eprintln!("error: {}", err); std::process::exit(1); } diff --git a/src/tools.rs b/src/tools.rs index 8ed3d50..01a32b5 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -29,6 +29,13 @@ impl Fd { } } +impl AsRef for Fd { + #[inline] + fn as_ref(&self) -> &RawFd { + &self.0 + } +} + /// Byte vector utilities. pub mod vec { /// Create an uninitialized byte vector of a specific size.