--- /dev/null
+[source]
+[source.debian-packages]
+directory = "/usr/share/cargo/registry"
+[source.crates-io]
+replace-with = "debian-packages"
[dependencies]
bitflags = "1.2"
-failure = { version = "0.1", default-features = false, features = ["std"] }
+failure = { version = "0.1", default-features = false, features = [ "std" ] }
lazy_static = "1.4"
libc = "0.2"
nix = "0.16"
+mio = "0.6.21"
+tokio = { version = "0.2.9", features = [ "rt-threaded", "io-driver", "io-util" ] }
/// Change our process capabilities. This does not include the bounding set.
pub fn capset(&self) -> io::Result<()> {
- #![allow(dead_code)]
// kernel abi:
+ #[allow(dead_code)]
struct Header {
version: u32,
pid: c_int,
}
+ #[allow(dead_code)]
struct Data {
effective: u32,
permitted: u32,
+++ /dev/null
-use std::cell::RefCell;
-use std::collections::VecDeque;
-use std::future::Future;
-use std::io;
-use std::pin::Pin;
-use std::sync::{Arc, Condvar, Mutex, Weak};
-use std::task::{Context, Poll};
-use std::thread::JoinHandle;
-
-type BoxFut = Box<dyn Future<Output = ()> + Send + 'static>;
-
-#[derive(Clone)]
-struct Task(Arc<TaskInner>);
-
-impl Task {
- fn into_raw(this: Task) -> *const TaskInner {
- Arc::into_raw(this.0)
- }
-
- unsafe fn from_raw(ptr: *const TaskInner) -> Self {
- Self(Arc::from_raw(ptr))
- }
-
- fn wake(self) {
- if let Some(queue) = self.0.queue.upgrade() {
- queue.queue(self);
- }
- }
-
- fn into_raw_waker(this: Task) -> std::task::RawWaker {
- std::task::RawWaker::new(
- Task::into_raw(this) as *const (),
- &std::task::RawWakerVTable::new(
- waker_clone_fn,
- waker_wake_fn,
- waker_wake_by_ref_fn,
- waker_drop_fn,
- ),
- )
- }
-}
-
-struct TaskInner {
- future: Mutex<Option<BoxFut>>,
- queue: Weak<TaskQueue>,
-}
-
-struct TaskQueue {
- queue: Mutex<VecDeque<Task>>,
- queue_cv: Condvar,
-}
-
-impl TaskQueue {
- fn new() -> Self {
- Self {
- queue: Mutex::new(VecDeque::with_capacity(32)),
- queue_cv: Condvar::new(),
- }
- }
-
- fn new_task(self: Arc<TaskQueue>, future: BoxFut) {
- let task = Task(Arc::new(TaskInner {
- future: Mutex::new(Some(future)),
- queue: Arc::downgrade(&self),
- }));
-
- self.queue(task);
- }
-
- fn queue(&self, task: Task) {
- let mut queue = self.queue.lock().unwrap();
- queue.push_back(task);
- self.queue_cv.notify_one();
- }
-
- /// Blocks until a task is available
- fn get_task(&self) -> Task {
- let mut queue = self.queue.lock().unwrap();
- loop {
- if let Some(task) = queue.pop_front() {
- return task;
- } else {
- queue = self.queue_cv.wait(queue).unwrap();
- }
- }
- }
-}
-
-pub struct ThreadPool {
- _threads: Mutex<Vec<JoinHandle<()>>>,
- queue: Arc<TaskQueue>,
-}
-
-impl ThreadPool {
- pub fn new() -> io::Result<Self> {
- let count = num_cpus()?;
-
- let queue = Arc::new(TaskQueue::new());
-
- let mut threads = Vec::new();
- for thread_id in 0..count {
- threads.push(std::thread::spawn({
- let queue = Arc::clone(&queue);
- move || thread_main(queue, thread_id)
- }));
- }
-
- Ok(Self {
- _threads: Mutex::new(threads),
- queue,
- })
- }
-
- pub fn spawn_ok<T>(&self, future: T)
- where
- T: Future<Output = ()> + Send + 'static,
- {
- self.do_spawn(Box::new(future));
- }
-
- fn do_spawn(&self, future: BoxFut) {
- Arc::clone(&self.queue).new_task(future);
- }
-
- pub fn run<R, T>(&self, future: T) -> R
- where
- T: Future<Output = R> + Send + 'static,
- R: Send + 'static,
- {
- let mutex: Arc<Mutex<Option<R>>> = Arc::new(Mutex::new(None));
- let cv = Arc::new(Condvar::new());
- let mut guard = mutex.lock().unwrap();
- self.spawn_ok({
- let mutex = Arc::clone(&mutex);
- let cv = Arc::clone(&cv);
- async move {
- let result = future.await;
- *(mutex.lock().unwrap()) = Some(result);
- cv.notify_all();
- }
- });
- loop {
- guard = cv.wait(guard).unwrap();
- if let Some(result) = guard.take() {
- return result;
- }
- }
- }
-}
-
-thread_local! {
- static CURRENT_QUEUE: RefCell<*const TaskQueue> = RefCell::new(std::ptr::null());
- static CURRENT_TASK: RefCell<*const Task> = RefCell::new(std::ptr::null());
-}
-
-fn thread_main(task_queue: Arc<TaskQueue>, _thread_id: usize) {
- CURRENT_QUEUE.with(|q| *q.borrow_mut() = task_queue.as_ref() as *const TaskQueue);
-
- let local_waker = unsafe {
- std::task::Waker::from_raw(std::task::RawWaker::new(
- std::ptr::null(),
- &std::task::RawWakerVTable::new(
- local_waker_clone_fn,
- local_waker_wake_fn,
- local_waker_wake_fn,
- local_waker_drop_fn,
- ),
- ))
- };
-
- let mut context = Context::from_waker(&local_waker);
-
- loop {
- let task: Task = task_queue.get_task();
- let task: Pin<&Task> = Pin::new(&task);
- let task = task.get_ref();
- CURRENT_TASK.with(|c| *c.borrow_mut() = task as *const Task);
-
- let mut task_future = task.0.future.lock().unwrap();
- match task_future.take() {
- Some(mut future) => {
- //eprintln!("Thread {} has some work!", thread_id);
- let pin = unsafe { Pin::new_unchecked(&mut *future) };
- match pin.poll(&mut context) {
- Poll::Ready(()) => (), // done with that task
- Poll::Pending => {
- *task_future = Some(future);
- }
- }
- }
- None => eprintln!("task polled after ready"),
- }
- }
-}
-
-unsafe fn local_waker_clone_fn(_: *const ()) -> std::task::RawWaker {
- let task: Task = CURRENT_TASK.with(|t| Task::clone(&**t.borrow()));
- Task::into_raw_waker(task)
-}
-
-unsafe fn local_waker_wake_fn(_: *const ()) {
- let task: Task = CURRENT_TASK.with(|t| Task::clone(&**t.borrow()));
- CURRENT_QUEUE.with(|q| (**q.borrow()).queue(task));
-}
-
-unsafe fn local_waker_drop_fn(_: *const ()) {}
-
-unsafe fn waker_clone_fn(this: *const ()) -> std::task::RawWaker {
- let this = Task::from_raw(this as *const TaskInner);
- let clone = this.clone();
- let _ = Task::into_raw(this);
- Task::into_raw_waker(clone)
-}
-
-unsafe fn waker_wake_fn(this: *const ()) {
- let this = Task::from_raw(this as *const TaskInner);
- this.wake();
-}
-
-unsafe fn waker_wake_by_ref_fn(this: *const ()) {
- let this = Task::from_raw(this as *const TaskInner);
- this.clone().wake();
- let _ = Task::into_raw(this);
-}
-
-unsafe fn waker_drop_fn(this: *const ()) {
- let _this = Task::from_raw(this as *const TaskInner);
-}
-
-fn num_cpus() -> io::Result<usize> {
- let rc = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) };
- if rc < 0 {
- Err(io::Error::last_os_error())
- } else {
- Ok(rc as usize)
- }
-}
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::panic::UnwindSafe;
+use tokio::io::AsyncReadExt;
+
use crate::io::pipe::{self, Pipe};
use crate::syscall::SyscallStatus;
use crate::tools::Fd;
+++ /dev/null
-use std::convert::TryFrom;
-use std::io;
-use std::os::raw::c_int;
-use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::time::Duration;
-
-use crate::error::io_err_other;
-use crate::tools::Fd;
-
-pub type EpollEvent = libc::epoll_event;
-
-pub const EPOLLIN: u32 = libc::EPOLLIN as u32;
-pub const EPOLLET: u32 = libc::EPOLLET as u32;
-pub const EPOLLOUT: u32 = libc::EPOLLOUT as u32;
-pub const EPOLLERR: u32 = libc::EPOLLERR as u32;
-pub const EPOLLHUP: u32 = libc::EPOLLHUP as u32;
-
-pub struct Epoll {
- fd: Fd,
-}
-
-impl Epoll {
- pub fn new() -> io::Result<Self> {
- let fd = unsafe { Fd::from_raw_fd(c_try!(libc::epoll_create1(libc::EPOLL_CLOEXEC))) };
- Ok(Self { fd })
- }
-
- pub fn add_file<T: AsRawFd>(&self, fd: &T, events: u32, data: u64) -> io::Result<()> {
- self.add_fd(fd.as_raw_fd(), events, data)
- }
-
- pub fn modify_file<T: AsRawFd>(&self, fd: &T, events: u32, data: u64) -> io::Result<()> {
- self.modify_fd(fd.as_raw_fd(), events, data)
- }
-
- pub fn remove_file<T: AsRawFd>(&self, fd: &T) -> io::Result<()> {
- self.remove_fd(fd.as_raw_fd())
- }
-
- fn addmod_fd(&self, op: c_int, fd: RawFd, events: u32, data: u64) -> io::Result<()> {
- let mut events = libc::epoll_event {
- events,
- r#u64: data,
- };
- c_try!(unsafe { libc::epoll_ctl(self.fd.as_raw_fd(), op, fd, &mut events) });
- Ok(())
- }
-
- pub fn add_fd(&self, fd: RawFd, events: u32, data: u64) -> io::Result<()> {
- self.addmod_fd(libc::EPOLL_CTL_ADD, fd, events, data)
- }
-
- pub fn modify_fd(&self, fd: RawFd, events: u32, data: u64) -> io::Result<()> {
- self.addmod_fd(libc::EPOLL_CTL_MOD, fd, events, data)
- }
-
- pub fn remove_fd(&self, fd: RawFd) -> io::Result<()> {
- c_try!(unsafe {
- libc::epoll_ctl(
- self.fd.as_raw_fd(),
- libc::EPOLL_CTL_DEL,
- fd,
- std::ptr::null_mut(),
- )
- });
- Ok(())
- }
-
- pub fn wait(
- &self,
- event_buf: &mut [EpollEvent],
- timeout: Option<Duration>,
- ) -> io::Result<usize> {
- let millis = timeout
- .map(|t| c_int::try_from(t.as_millis()))
- .transpose()
- .map_err(io_err_other)?
- .unwrap_or(-1);
- let epfd = self.fd.as_raw_fd();
- let buf_len = c_int::try_from(event_buf.len()).map_err(io_err_other)?;
- let rc = c_try!(unsafe { libc::epoll_wait(epfd, event_buf.as_mut_ptr(), buf_len, millis) });
- Ok(rc as usize)
- }
-}
pub mod cmsg;
-pub mod epoll;
pub mod iovec;
pub mod pipe;
-pub mod reactor;
+pub mod polled_fd;
pub mod rw_traits;
pub mod seq_packet;
use std::io;
use std::marker::PhantomData;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::pin::Pin;
use std::task::{Context, Poll};
+use tokio::io::{AsyncRead, AsyncWrite};
+
use crate::error::io_err_other;
-use crate::io::reactor::PolledFd;
+use crate::io::polled_fd::PolledFd;
use crate::io::rw_traits;
-use crate::poll_fn::poll_fn;
use crate::tools::Fd;
pub use rw_traits::{Read, Write};
c_try!(unsafe { libc::pipe2(pfd.as_mut_ptr(), libc::O_CLOEXEC) });
let (fd_in, fd_out) = unsafe { (Fd::from_raw_fd(pfd[0]), Fd::from_raw_fd(pfd[1])) };
- let fd_in = PolledFd::new(fd_in)?;
- let fd_out = PolledFd::new(fd_out)?;
Ok((
Pipe {
- fd: fd_in,
+ fd: PolledFd::new(fd_in)?,
_phantom: PhantomData,
},
Pipe {
- fd: fd_out,
+ fd: PolledFd::new(fd_out)?,
_phantom: PhantomData,
},
))
}
-impl<RW: rw_traits::HasRead> Pipe<RW> {
- pub fn poll_read(&mut self, cx: &mut Context, data: &mut [u8]) -> Poll<io::Result<usize>> {
- let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
- let fd = self.fd.as_raw_fd();
+impl<RW: rw_traits::HasRead> AsyncRead for Pipe<RW> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
self.fd.wrap_read(cx, || {
- c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) })
+ let fd = self.as_raw_fd();
+ let size = libc::size_t::try_from(buf.len()).map_err(io_err_other)?;
+ c_result!(unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, size) })
.map(|res| res as usize)
})
}
-
- pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
- poll_fn(move |cx| self.poll_read(cx, data)).await
- }
-
- pub async fn read_exact(&mut self, mut data: &mut [u8]) -> io::Result<()> {
- while !data.is_empty() {
- match self.read(&mut data[..]).await {
- Ok(got) => data = &mut data[got..],
- Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
- Err(err) => return Err(err),
- }
- }
- Ok(())
- }
}
-impl<RW: rw_traits::HasWrite> Pipe<RW> {
- pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> {
- let fd = self.fd.as_raw_fd();
- let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
+impl<RW: rw_traits::HasWrite> AsyncWrite for Pipe<RW> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
self.fd.wrap_write(cx, || {
- c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) })
+ let fd = self.as_raw_fd();
+ let size = libc::size_t::try_from(buf.len()).map_err(io_err_other)?;
+ c_result!(unsafe { libc::write(fd, buf.as_ptr() as *const libc::c_void, size) })
.map(|res| res as usize)
})
}
- pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
- poll_fn(move |cx| self.poll_write(data, cx)).await
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
}
}
--- /dev/null
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::task::{Context, Poll};
+
+use mio::event::Evented;
+use mio::unix::EventedFd as MioEventedFd;
+use mio::Poll as MioPoll;
+use mio::{PollOpt, Ready, Token};
+use tokio::io::PollEvented;
+
+use crate::tools::Fd;
+
+#[repr(transparent)]
+pub struct EventedFd {
+ fd: Fd,
+}
+
+impl EventedFd {
+ #[inline]
+ pub fn new(fd: Fd) -> Self {
+ Self { fd }
+ }
+}
+
+impl AsRawFd for EventedFd {
+ #[inline]
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd.as_raw_fd()
+ }
+}
+
+impl FromRawFd for EventedFd {
+ #[inline]
+ unsafe fn from_raw_fd(fd: RawFd) -> Self {
+ Self::new(Fd::from_raw_fd(fd))
+ }
+}
+
+impl IntoRawFd for EventedFd {
+ #[inline]
+ fn into_raw_fd(self) -> RawFd {
+ self.fd.into_raw_fd()
+ }
+}
+
+impl Evented for EventedFd {
+ fn register(
+ &self,
+ poll: &MioPoll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ MioEventedFd(self.fd.as_ref()).register(poll, token, interest, opts)
+ }
+
+ fn reregister(
+ &self,
+ poll: &MioPoll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ MioEventedFd(self.fd.as_ref()).reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &MioPoll) -> io::Result<()> {
+ MioEventedFd(self.fd.as_ref()).deregister(poll)
+ }
+}
+
+#[repr(transparent)]
+pub struct PolledFd {
+ fd: PollEvented<EventedFd>,
+}
+
+impl PolledFd {
+ pub fn new(fd: Fd) -> tokio::io::Result<Self> {
+ Ok(Self {
+ fd: PollEvented::new(EventedFd::new(fd))?,
+ })
+ }
+
+ pub fn wrap_read<T>(
+ &self,
+ cx: &mut Context,
+ func: impl FnOnce() -> io::Result<T>,
+ ) -> Poll<io::Result<T>> {
+ ready!(self.fd.poll_read_ready(cx, mio::Ready::readable()))?;
+ match func() {
+ Ok(out) => Poll::Ready(Ok(out)),
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ self.fd.clear_read_ready(cx, mio::Ready::readable())?;
+ Poll::Pending
+ }
+ Err(err) => Poll::Ready(Err(err)),
+ }
+ }
+
+ pub fn wrap_write<T>(
+ &self,
+ cx: &mut Context,
+ func: impl FnOnce() -> io::Result<T>,
+ ) -> Poll<io::Result<T>> {
+ ready!(self.fd.poll_write_ready(cx))?;
+ match func() {
+ Ok(out) => Poll::Ready(Ok(out)),
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ self.fd.clear_write_ready(cx)?;
+ Poll::Pending
+ }
+ Err(err) => Poll::Ready(Err(err)),
+ }
+ }
+}
+
+impl AsRawFd for PolledFd {
+ #[inline]
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd.get_ref().as_raw_fd()
+ }
+}
+
+impl IntoRawFd for PolledFd {
+ #[inline]
+ fn into_raw_fd(self) -> RawFd {
+ // for the kind of resource we're managing it should always be possible to extract it from
+ // its driver
+ self.fd
+ .into_inner()
+ .expect("failed to remove polled file descriptor from reactor")
+ .into_raw_fd()
+ }
+}
+++ /dev/null
-use std::io;
-use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, Once};
-use std::task::{Context, Poll, Waker};
-use std::thread::JoinHandle;
-
-use crate::error::io_err_other;
-use crate::io::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT};
-use crate::tools::Fd;
-
-static START: Once = Once::new();
-static mut REACTOR: Option<Arc<Reactor>> = None;
-
-pub fn default_reactor() -> Arc<Reactor> {
- START.call_once(|| unsafe {
- let reactor = Reactor::new().expect("setup main epoll reactor");
- REACTOR = Some(reactor);
- });
- unsafe { Arc::clone(REACTOR.as_ref().unwrap()) }
-}
-
-pub struct Reactor {
- epoll: Arc<Epoll>,
- removed: Mutex<Vec<Box<RegistrationInner>>>,
- thread: Mutex<Option<JoinHandle<()>>>,
-}
-
-impl Reactor {
- pub fn new() -> io::Result<Arc<Self>> {
- let epoll = Arc::new(Epoll::new()?);
-
- let this = Arc::new(Reactor {
- epoll,
- removed: Mutex::new(Vec::new()),
- thread: Mutex::new(None),
- });
-
- let handle = std::thread::spawn({
- let this = Arc::clone(&this);
- move || this.thread_main()
- });
-
- this.thread.lock().unwrap().replace(handle);
-
- Ok(this)
- }
-
- fn thread_main(self: Arc<Self>) {
- let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
- loop {
- let count = match self.epoll.wait(&mut buf, None) {
- Ok(count) => count,
- Err(err) => {
- eprintln!("error in epoll loop: {}", err);
- std::process::exit(1);
- }
- };
- for i in 0..count {
- self.handle_event(&buf[i]);
- }
- // After going through the events we can release memory associated with already closed
- // file descriptors:
- self.removed.lock().unwrap().clear();
- }
- }
-
- fn handle_event(&self, event: &EpollEvent) {
- let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
- if registration.gone.load(Ordering::Acquire) {
- return;
- }
-
- if 0 != (event.events & EPOLLIN) {
- //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
- if let Some(waker) = registration.read_waker.lock().unwrap().take() {
- waker.wake();
- }
- }
-
- if 0 != (event.events & EPOLLOUT) {
- //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
- if let Some(waker) = registration.write_waker.lock().unwrap().take() {
- waker.wake();
- }
- }
-
- if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
- //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
- if let Some(waker) = registration.read_waker.lock().unwrap().take() {
- waker.wake();
- }
- if let Some(waker) = registration.write_waker.lock().unwrap().take() {
- waker.wake();
- }
- }
- }
-
- pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
- let mut inner = Box::new(RegistrationInner {
- gone: AtomicBool::new(false),
- reactor: Arc::clone(&self),
- read_waker: Mutex::new(None),
- write_waker: Mutex::new(None),
- });
-
- let inner_ptr = {
- // type check/assertion
- let inner_ptr: &mut RegistrationInner = &mut *inner;
- // make raw pointer
- inner_ptr as *mut RegistrationInner as usize as u64
- };
-
- self.epoll
- .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?;
-
- Ok(Registration { inner: Some(inner) })
- }
-
- fn deregister(&self, registration: Box<RegistrationInner>) {
- self.removed.lock().unwrap().push(registration);
- }
-}
-
-pub struct Registration {
- // pin the data in memory because the other thread will access it
- // ManuallyDrop::take is nightly only :<
- inner: Option<Box<RegistrationInner>>,
-}
-
-impl Drop for Registration {
- fn drop(&mut self) {
- if let Some(inner) = self.inner.take() {
- let reactor = Arc::clone(&inner.reactor);
- inner.gone.store(true, Ordering::Release);
- reactor.deregister(inner);
- }
- }
-}
-
-// This is accessed by the reactor
-struct RegistrationInner {
- gone: AtomicBool,
- reactor: Arc<Reactor>,
- read_waker: Mutex<Option<Waker>>,
- write_waker: Mutex<Option<Waker>>,
-}
-
-pub struct PolledFd {
- fd: Fd,
- registration: Registration,
-}
-
-impl AsRawFd for PolledFd {
- #[inline]
- fn as_raw_fd(&self) -> RawFd {
- self.fd.as_raw_fd()
- }
-}
-
-impl IntoRawFd for PolledFd {
- fn into_raw_fd(mut self) -> RawFd {
- let registration = self.registration.inner.take().unwrap();
- registration
- .reactor
- .epoll
- .remove_fd(self.as_raw_fd())
- .expect("cannot remove PolledFd from epoll instance");
- self.fd.into_raw_fd()
- }
-}
-
-impl PolledFd {
- pub fn new(mut fd: Fd) -> io::Result<Self> {
- fd.set_nonblocking(true).map_err(io_err_other)?;
- Self::new_with_reactor(fd, self::default_reactor())
- }
-
- pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
- let registration = reactor.register(fd.as_raw_fd())?;
- Ok(Self { fd, registration })
- }
-
- pub fn wrap_read<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
- where
- F: FnOnce() -> io::Result<T>,
- {
- let mut read_waker = self
- .registration
- .inner
- .as_ref()
- .unwrap()
- .read_waker
- .lock()
- .unwrap();
- match func() {
- Ok(out) => Poll::Ready(Ok(out)),
- Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
- *read_waker = Some(cx.waker().clone());
- Poll::Pending
- }
- Err(err) => Poll::Ready(Err(err)),
- }
- }
-
- pub fn wrap_write<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
- where
- F: FnOnce() -> io::Result<T>,
- {
- let mut write_waker = self
- .registration
- .inner
- .as_ref()
- .unwrap()
- .write_waker
- .lock()
- .unwrap();
- match func() {
- Ok(out) => Poll::Ready(Ok(out)),
- Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
- *write_waker = Some(cx.waker().clone());
- Poll::Pending
- }
- Err(err) => Poll::Ready(Err(err)),
- }
- }
-}
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use crate::io::iovec::{IoVec, IoVecMut};
-use crate::io::reactor::PolledFd;
+use crate::io::polled_fd::PolledFd;
use crate::poll_fn::poll_fn;
use crate::tools::AssertSendSync;
use crate::tools::Fd;
}
pub fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<SeqPacketSocket>> {
- let fd = self.fd.as_raw_fd();
+ let fd = self.as_raw_fd();
let res = self.fd.wrap_read(cx, || {
c_result!(unsafe {
libc::accept4(fd, ptr::null_mut(), ptr::null_mut(), libc::SOCK_CLOEXEC)
return Err(::std::io::Error::new(::std::io::ErrorKind::Other, format!($($msg)*)));
};
}
+
+macro_rules! ready {
+ ($expr:expr) => {{
+ use std::task::Poll;
+ match $expr {
+ Poll::Ready(v) => v,
+ Poll::Pending => return Poll::Pending,
+ }
+ }};
+}
pub mod capability;
pub mod client;
pub mod error;
-pub mod executor;
pub mod fork;
pub mod io;
pub mod lxcseccomp;
use crate::io::seq_packet::SeqPacketListener;
-static mut EXECUTOR: *mut executor::ThreadPool = std::ptr::null_mut();
-
-pub fn executor() -> &'static executor::ThreadPool {
- unsafe { &*EXECUTOR }
-}
-
pub fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
- executor().spawn_ok(fut)
+ tokio::spawn(fut);
}
fn main() {
- let mut executor = executor::ThreadPool::new().expect("spawning worker threadpool");
- unsafe {
- EXECUTOR = &mut executor;
- }
- std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
+ let mut rt = tokio::runtime::Runtime::new().expect("failed to spawn tokio runtime");
- if let Err(err) = executor.run(do_main()) {
+ if let Err(err) = rt.block_on(do_main()) {
eprintln!("error: {}", err);
std::process::exit(1);
}
}
}
+impl AsRef<RawFd> for Fd {
+ #[inline]
+ fn as_ref(&self) -> &RawFd {
+ &self.0
+ }
+}
+
/// Byte vector utilities.
pub mod vec {
/// Create an uninitialized byte vector of a specific size.