]> git.proxmox.com Git - pve-lxc-syscalld.git/commitdiff
handle dropped fds
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Wed, 30 Oct 2019 11:34:27 +0000 (12:34 +0100)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Wed, 30 Oct 2019 11:34:27 +0000 (12:34 +0100)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
src/reactor.rs

index cd95f7ba3d9c7c34c68af48f93b30afd45354a74..0bc33f82cf91822fbfaf96bfcbd0b97078dcaadf 100644 (file)
@@ -1,8 +1,7 @@
 use std::convert::TryFrom;
 use std::io;
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::sync::atomic::{AtomicU32, Ordering};
-use std::sync::{mpsc, Arc, Mutex};
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll, Waker};
 use std::thread::JoinHandle;
 
@@ -21,29 +20,33 @@ pub const READY_ERR: u32 = 0b100;
 pub struct Reactor {
     epoll: Arc<Epoll>,
     removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
-    thread: JoinHandle<()>,
+    thread: Mutex<Option<JoinHandle<()>>>,
 }
 
 impl Reactor {
     pub fn new() -> io::Result<Arc<Self>> {
         let epoll = Arc::new(Epoll::new()?);
 
+        let this = Arc::new(Reactor {
+            epoll,
+            removed: Mutex::new(Vec::new()),
+            thread: Mutex::new(None),
+        });
+
         let handle = std::thread::spawn({
-            let epoll = Arc::clone(&epoll);
-            move || Self::thread_main(epoll)
+            let this = Arc::clone(&this);
+            move || this.thread_main()
         });
 
-        Ok(Arc::new(Self {
-            epoll,
-            removed: Mutex::new(Vec::new()),
-            thread: handle,
-        }))
+        this.thread.lock().unwrap().replace(handle);
+
+        Ok(this)
     }
 
-    fn thread_main(epoll: Arc<Epoll>) {
+    fn thread_main(self: Arc<Self>) {
         let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
         loop {
-            let count = match epoll.wait(&mut buf, None) {
+            let count = match self.epoll.wait(&mut buf, None) {
                 Ok(count) => count,
                 Err(err) => {
                     eprintln!("error in epoll loop: {}", err);
@@ -51,13 +54,20 @@ impl Reactor {
                 }
             };
             for i in 0..count {
-                Self::handle_event(&buf[i]);
+                self.handle_event(&buf[i]);
             }
         }
     }
 
-    fn handle_event(event: &EpollEvent) {
+    fn handle_event(&self, event: &EpollEvent) {
         let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
+        for (fd, _) in self.removed.lock().unwrap().iter() {
+            // This fd is already being dropped, don't touch it!
+            if *fd == registration.fd {
+                return;
+            }
+        }
+
         if 0 != (event.events & EPOLLIN) {
             //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
             if let Some(waker) = registration.read_waker.lock().unwrap().take() {