From 9ebd1972ebfee6abe21275738c927865a00ddaf2 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Wed, 30 Oct 2019 13:42:29 +0100 Subject: [PATCH] stuff Signed-off-by: Wolfgang Bumiller --- src/reactor.rs | 28 +++++++++++++--------------- src/tools.rs | 14 ++++++++++++++ 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index a20807f..2bb5b60 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -1,6 +1,7 @@ use std::convert::TryFrom; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::thread::JoinHandle; @@ -12,7 +13,7 @@ use crate::tools::Fd; pub struct Reactor { epoll: Arc, - removed: Mutex)>>, + removed: Mutex>>, thread: Mutex>>, } @@ -49,16 +50,16 @@ impl Reactor { for i in 0..count { self.handle_event(&buf[i]); } + // After going through the events we can release memory associated with already closed + // file descriptors: + self.removed.lock().unwrap().clear(); } } fn handle_event(&self, event: &EpollEvent) { let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) }; - for (fd, _) in self.removed.lock().unwrap().iter() { - // This fd is already being dropped, don't touch it! - if *fd == registration.fd { - return; - } + if registration.gone.load(Ordering::Acquire) { + return; } if 0 != (event.events & EPOLLIN) { @@ -88,8 +89,7 @@ impl Reactor { pub fn register(self: Arc, fd: RawFd) -> io::Result { let mut inner = Box::new(RegistrationInner { - fd, - //ready: AtomicU32::new(0), + gone: AtomicBool::new(false), reactor: Arc::clone(&self), read_waker: Mutex::new(None), write_waker: Mutex::new(None), @@ -109,10 +109,7 @@ impl Reactor { } fn deregister(&self, registration: Box) { - self.removed - .lock() - .unwrap() - .push((registration.fd, registration)); + self.removed.lock().unwrap().push(registration); } } @@ -124,15 +121,16 @@ pub struct Registration { impl Drop for Registration { fn drop(&mut self) { - let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor); + let inner = self.inner.as_ref().unwrap(); + let reactor = Arc::clone(&inner.reactor); + inner.gone.store(true, Ordering::Release); reactor.deregister(self.inner.take().unwrap()); } } // This is accessed by the reactor struct RegistrationInner { - fd: RawFd, - //ready: AtomicU32, + gone: AtomicBool, reactor: Arc, read_waker: Mutex>, write_waker: Mutex>, diff --git a/src/tools.rs b/src/tools.rs index 1ae193a..3d88335 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -21,6 +21,20 @@ impl FromRawFd for Fd { } } +impl Fd { + pub fn set_nonblocking(&self, nb: bool) -> std::io::Result<()> { + let fd = self.as_raw_fd(); + let flags = c_try!(unsafe { libc::fcntl(fd, libc::F_GETFL) }); + let flags = if nb { + flags | libc::O_NONBLOCK + } else { + flags & !libc::O_NONBLOCK + }; + c_try!(unsafe { libc::fcntl(fd, libc::F_SETFL, flags) }); + Ok(()) + } +} + /// Byte vector utilities. pub mod vec { /// Create an uninitialized byte vector of a specific size. -- 2.39.5