use std::convert::TryFrom;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread::JoinHandle;
pub struct Reactor {
epoll: Arc<Epoll>,
- removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
+ removed: Mutex<Vec<Box<RegistrationInner>>>,
thread: Mutex<Option<JoinHandle<()>>>,
}
for i in 0..count {
self.handle_event(&buf[i]);
}
+ // After going through the events we can release memory associated with already closed
+ // file descriptors:
+ self.removed.lock().unwrap().clear();
}
}
fn handle_event(&self, event: &EpollEvent) {
let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
- for (fd, _) in self.removed.lock().unwrap().iter() {
- // This fd is already being dropped, don't touch it!
- if *fd == registration.fd {
- return;
- }
+ if registration.gone.load(Ordering::Acquire) {
+ return;
}
if 0 != (event.events & EPOLLIN) {
pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
let mut inner = Box::new(RegistrationInner {
- fd,
- //ready: AtomicU32::new(0),
+ gone: AtomicBool::new(false),
reactor: Arc::clone(&self),
read_waker: Mutex::new(None),
write_waker: Mutex::new(None),
}
fn deregister(&self, registration: Box<RegistrationInner>) {
- self.removed
- .lock()
- .unwrap()
- .push((registration.fd, registration));
+ self.removed.lock().unwrap().push(registration);
}
}
impl Drop for Registration {
fn drop(&mut self) {
- let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor);
+ let inner = self.inner.as_ref().unwrap();
+ let reactor = Arc::clone(&inner.reactor);
+ inner.gone.store(true, Ordering::Release);
reactor.deregister(self.inner.take().unwrap());
}
}
// This is accessed by the reactor
struct RegistrationInner {
- fd: RawFd,
- //ready: AtomicU32,
+ gone: AtomicBool,
reactor: Arc<Reactor>,
read_waker: Mutex<Option<Waker>>,
write_waker: Mutex<Option<Waker>>,
}
}
+impl Fd {
+ pub fn set_nonblocking(&self, nb: bool) -> std::io::Result<()> {
+ let fd = self.as_raw_fd();
+ let flags = c_try!(unsafe { libc::fcntl(fd, libc::F_GETFL) });
+ let flags = if nb {
+ flags | libc::O_NONBLOCK
+ } else {
+ flags & !libc::O_NONBLOCK
+ };
+ c_try!(unsafe { libc::fcntl(fd, libc::F_SETFL, flags) });
+ Ok(())
+ }
+}
+
/// Byte vector utilities.
pub mod vec {
/// Create an uninitialized byte vector of a specific size.