use std::convert::TryFrom;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::sync::atomic::{AtomicU32, Ordering};
-use std::sync::{mpsc, Arc, Mutex};
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread::JoinHandle;
pub struct Reactor {
epoll: Arc<Epoll>,
removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
- thread: JoinHandle<()>,
+ thread: Mutex<Option<JoinHandle<()>>>,
}
impl Reactor {
pub fn new() -> io::Result<Arc<Self>> {
let epoll = Arc::new(Epoll::new()?);
+ let this = Arc::new(Reactor {
+ epoll,
+ removed: Mutex::new(Vec::new()),
+ thread: Mutex::new(None),
+ });
+
let handle = std::thread::spawn({
- let epoll = Arc::clone(&epoll);
- move || Self::thread_main(epoll)
+ let this = Arc::clone(&this);
+ move || this.thread_main()
});
- Ok(Arc::new(Self {
- epoll,
- removed: Mutex::new(Vec::new()),
- thread: handle,
- }))
+ this.thread.lock().unwrap().replace(handle);
+
+ Ok(this)
}
- fn thread_main(epoll: Arc<Epoll>) {
+ fn thread_main(self: Arc<Self>) {
let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
loop {
- let count = match epoll.wait(&mut buf, None) {
+ let count = match self.epoll.wait(&mut buf, None) {
Ok(count) => count,
Err(err) => {
eprintln!("error in epoll loop: {}", err);
}
};
for i in 0..count {
- Self::handle_event(&buf[i]);
+ self.handle_event(&buf[i]);
}
}
}
- fn handle_event(event: &EpollEvent) {
+ fn handle_event(&self, event: &EpollEvent) {
let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
+ for (fd, _) in self.removed.lock().unwrap().iter() {
+ // This fd is already being dropped, don't touch it!
+ if *fd == registration.fd {
+ return;
+ }
+ }
+
if 0 != (event.events & EPOLLIN) {
//let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
if let Some(waker) = registration.read_waker.lock().unwrap().take() {