]> git.proxmox.com Git - pve-lxc-syscalld.git/blobdiff - src/io/reactor.rs
switch back to tokio now that it's stable and packaged
[pve-lxc-syscalld.git] / src / io / reactor.rs
diff --git a/src/io/reactor.rs b/src/io/reactor.rs
deleted file mode 100644 (file)
index a27ba3d..0000000
+++ /dev/null
@@ -1,227 +0,0 @@
-use std::io;
-use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, Once};
-use std::task::{Context, Poll, Waker};
-use std::thread::JoinHandle;
-
-use crate::error::io_err_other;
-use crate::io::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT};
-use crate::tools::Fd;
-
-static START: Once = Once::new();
-static mut REACTOR: Option<Arc<Reactor>> = None;
-
-pub fn default_reactor() -> Arc<Reactor> {
-    START.call_once(|| unsafe {
-        let reactor = Reactor::new().expect("setup main epoll reactor");
-        REACTOR = Some(reactor);
-    });
-    unsafe { Arc::clone(REACTOR.as_ref().unwrap()) }
-}
-
-pub struct Reactor {
-    epoll: Arc<Epoll>,
-    removed: Mutex<Vec<Box<RegistrationInner>>>,
-    thread: Mutex<Option<JoinHandle<()>>>,
-}
-
-impl Reactor {
-    pub fn new() -> io::Result<Arc<Self>> {
-        let epoll = Arc::new(Epoll::new()?);
-
-        let this = Arc::new(Reactor {
-            epoll,
-            removed: Mutex::new(Vec::new()),
-            thread: Mutex::new(None),
-        });
-
-        let handle = std::thread::spawn({
-            let this = Arc::clone(&this);
-            move || this.thread_main()
-        });
-
-        this.thread.lock().unwrap().replace(handle);
-
-        Ok(this)
-    }
-
-    fn thread_main(self: Arc<Self>) {
-        let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
-        loop {
-            let count = match self.epoll.wait(&mut buf, None) {
-                Ok(count) => count,
-                Err(err) => {
-                    eprintln!("error in epoll loop: {}", err);
-                    std::process::exit(1);
-                }
-            };
-            for i in 0..count {
-                self.handle_event(&buf[i]);
-            }
-            // After going through the events we can release memory associated with already closed
-            // file descriptors:
-            self.removed.lock().unwrap().clear();
-        }
-    }
-
-    fn handle_event(&self, event: &EpollEvent) {
-        let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
-        if registration.gone.load(Ordering::Acquire) {
-            return;
-        }
-
-        if 0 != (event.events & EPOLLIN) {
-            //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
-            if let Some(waker) = registration.read_waker.lock().unwrap().take() {
-                waker.wake();
-            }
-        }
-
-        if 0 != (event.events & EPOLLOUT) {
-            //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
-            if let Some(waker) = registration.write_waker.lock().unwrap().take() {
-                waker.wake();
-            }
-        }
-
-        if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
-            //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
-            if let Some(waker) = registration.read_waker.lock().unwrap().take() {
-                waker.wake();
-            }
-            if let Some(waker) = registration.write_waker.lock().unwrap().take() {
-                waker.wake();
-            }
-        }
-    }
-
-    pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
-        let mut inner = Box::new(RegistrationInner {
-            gone: AtomicBool::new(false),
-            reactor: Arc::clone(&self),
-            read_waker: Mutex::new(None),
-            write_waker: Mutex::new(None),
-        });
-
-        let inner_ptr = {
-            // type check/assertion
-            let inner_ptr: &mut RegistrationInner = &mut *inner;
-            // make raw pointer
-            inner_ptr as *mut RegistrationInner as usize as u64
-        };
-
-        self.epoll
-            .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?;
-
-        Ok(Registration { inner: Some(inner) })
-    }
-
-    fn deregister(&self, registration: Box<RegistrationInner>) {
-        self.removed.lock().unwrap().push(registration);
-    }
-}
-
-pub struct Registration {
-    // pin the data in memory because the other thread will access it
-    // ManuallyDrop::take is nightly only :<
-    inner: Option<Box<RegistrationInner>>,
-}
-
-impl Drop for Registration {
-    fn drop(&mut self) {
-        if let Some(inner) = self.inner.take() {
-            let reactor = Arc::clone(&inner.reactor);
-            inner.gone.store(true, Ordering::Release);
-            reactor.deregister(inner);
-        }
-    }
-}
-
-// This is accessed by the reactor
-struct RegistrationInner {
-    gone: AtomicBool,
-    reactor: Arc<Reactor>,
-    read_waker: Mutex<Option<Waker>>,
-    write_waker: Mutex<Option<Waker>>,
-}
-
-pub struct PolledFd {
-    fd: Fd,
-    registration: Registration,
-}
-
-impl AsRawFd for PolledFd {
-    #[inline]
-    fn as_raw_fd(&self) -> RawFd {
-        self.fd.as_raw_fd()
-    }
-}
-
-impl IntoRawFd for PolledFd {
-    fn into_raw_fd(mut self) -> RawFd {
-        let registration = self.registration.inner.take().unwrap();
-        registration
-            .reactor
-            .epoll
-            .remove_fd(self.as_raw_fd())
-            .expect("cannot remove PolledFd from epoll instance");
-        self.fd.into_raw_fd()
-    }
-}
-
-impl PolledFd {
-    pub fn new(mut fd: Fd) -> io::Result<Self> {
-        fd.set_nonblocking(true).map_err(io_err_other)?;
-        Self::new_with_reactor(fd, self::default_reactor())
-    }
-
-    pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
-        let registration = reactor.register(fd.as_raw_fd())?;
-        Ok(Self { fd, registration })
-    }
-
-    pub fn wrap_read<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
-    where
-        F: FnOnce() -> io::Result<T>,
-    {
-        let mut read_waker = self
-            .registration
-            .inner
-            .as_ref()
-            .unwrap()
-            .read_waker
-            .lock()
-            .unwrap();
-        match func() {
-            Ok(out) => Poll::Ready(Ok(out)),
-            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
-                *read_waker = Some(cx.waker().clone());
-                Poll::Pending
-            }
-            Err(err) => Poll::Ready(Err(err)),
-        }
-    }
-
-    pub fn wrap_write<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
-    where
-        F: FnOnce() -> io::Result<T>,
-    {
-        let mut write_waker = self
-            .registration
-            .inner
-            .as_ref()
-            .unwrap()
-            .write_waker
-            .lock()
-            .unwrap();
-        match func() {
-            Ok(out) => Poll::Ready(Ok(out)),
-            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
-                *write_waker = Some(cx.waker().clone());
-                Poll::Pending
-            }
-            Err(err) => Poll::Ready(Err(err)),
-        }
-    }
-}