]>
Commit | Line | Data |
---|---|---|
a22aece0 | 1 | use std::convert::TryFrom; |
8962159b | 2 | use std::io; |
a22aece0 | 3 | use std::os::unix::io::{AsRawFd, RawFd}; |
f757af32 | 4 | use std::sync::{Arc, Mutex}; |
a22aece0 | 5 | use std::task::{Context, Poll, Waker}; |
8962159b WB |
6 | use std::thread::JoinHandle; |
7 | ||
a3afcf22 | 8 | use crate::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT}; |
a22aece0 WB |
9 | use crate::error::io_err_other; |
10 | use crate::poll_fn::poll_fn; | |
11 | use crate::tools::Fd; | |
8962159b | 12 | |
8962159b WB |
13 | pub struct Reactor { |
14 | epoll: Arc<Epoll>, | |
a22aece0 | 15 | removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>, |
f757af32 | 16 | thread: Mutex<Option<JoinHandle<()>>>, |
8962159b WB |
17 | } |
18 | ||
19 | impl Reactor { | |
a22aece0 | 20 | pub fn new() -> io::Result<Arc<Self>> { |
8962159b WB |
21 | let epoll = Arc::new(Epoll::new()?); |
22 | ||
f757af32 WB |
23 | let this = Arc::new(Reactor { |
24 | epoll, | |
25 | removed: Mutex::new(Vec::new()), | |
26 | thread: Mutex::new(None), | |
27 | }); | |
28 | ||
8962159b | 29 | let handle = std::thread::spawn({ |
f757af32 WB |
30 | let this = Arc::clone(&this); |
31 | move || this.thread_main() | |
8962159b WB |
32 | }); |
33 | ||
f757af32 WB |
34 | this.thread.lock().unwrap().replace(handle); |
35 | ||
36 | Ok(this) | |
8962159b WB |
37 | } |
38 | ||
f757af32 | 39 | fn thread_main(self: Arc<Self>) { |
a22aece0 | 40 | let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() }; |
d1b1deab | 41 | loop { |
f757af32 | 42 | let count = match self.epoll.wait(&mut buf, None) { |
a22aece0 WB |
43 | Ok(count) => count, |
44 | Err(err) => { | |
45 | eprintln!("error in epoll loop: {}", err); | |
46 | std::process::exit(1); | |
47 | } | |
48 | }; | |
49 | for i in 0..count { | |
f757af32 | 50 | self.handle_event(&buf[i]); |
a22aece0 WB |
51 | } |
52 | } | |
53 | } | |
54 | ||
f757af32 | 55 | fn handle_event(&self, event: &EpollEvent) { |
a22aece0 | 56 | let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) }; |
f757af32 WB |
57 | for (fd, _) in self.removed.lock().unwrap().iter() { |
58 | // This fd is already being dropped, don't touch it! | |
59 | if *fd == registration.fd { | |
60 | return; | |
61 | } | |
62 | } | |
63 | ||
a22aece0 WB |
64 | if 0 != (event.events & EPOLLIN) { |
65 | //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel); | |
66 | if let Some(waker) = registration.read_waker.lock().unwrap().take() { | |
67 | waker.wake(); | |
68 | } | |
69 | } | |
70 | ||
71 | if 0 != (event.events & EPOLLOUT) { | |
72 | //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel); | |
73 | if let Some(waker) = registration.write_waker.lock().unwrap().take() { | |
74 | waker.wake(); | |
75 | } | |
76 | } | |
77 | ||
78 | if 0 != (event.events & (EPOLLERR | EPOLLHUP)) { | |
79 | //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel); | |
80 | if let Some(waker) = registration.read_waker.lock().unwrap().take() { | |
81 | waker.wake(); | |
82 | } | |
83 | if let Some(waker) = registration.write_waker.lock().unwrap().take() { | |
84 | waker.wake(); | |
85 | } | |
d1b1deab | 86 | } |
8962159b | 87 | } |
a22aece0 WB |
88 | |
89 | pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> { | |
90 | let mut inner = Box::new(RegistrationInner { | |
91 | fd, | |
92 | //ready: AtomicU32::new(0), | |
93 | reactor: Arc::clone(&self), | |
94 | read_waker: Mutex::new(None), | |
95 | write_waker: Mutex::new(None), | |
96 | }); | |
97 | ||
98 | let inner_ptr = { | |
99 | // type check/assertion | |
100 | let inner_ptr: &mut RegistrationInner = &mut *inner; | |
101 | // make raw pointer | |
102 | inner_ptr as *mut RegistrationInner as usize as u64 | |
103 | }; | |
104 | ||
a3afcf22 WB |
105 | self.epoll |
106 | .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?; | |
a22aece0 WB |
107 | |
108 | Ok(Registration { inner: Some(inner) }) | |
109 | } | |
110 | ||
111 | fn deregister(&self, registration: Box<RegistrationInner>) { | |
112 | self.removed | |
113 | .lock() | |
114 | .unwrap() | |
115 | .push((registration.fd, registration)); | |
116 | } | |
117 | } | |
118 | ||
119 | pub struct Registration { | |
120 | // pin the data in memory because the other thread will access it | |
121 | // ManuallyDrop::take is nightly only :< | |
122 | inner: Option<Box<RegistrationInner>>, | |
123 | } | |
124 | ||
125 | impl Drop for Registration { | |
126 | fn drop(&mut self) { | |
127 | let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor); | |
128 | reactor.deregister(self.inner.take().unwrap()); | |
129 | } | |
130 | } | |
131 | ||
132 | // This is accessed by the reactor | |
133 | struct RegistrationInner { | |
134 | fd: RawFd, | |
135 | //ready: AtomicU32, | |
136 | reactor: Arc<Reactor>, | |
137 | read_waker: Mutex<Option<Waker>>, | |
138 | write_waker: Mutex<Option<Waker>>, | |
139 | } | |
140 | ||
141 | pub struct PolledFd { | |
142 | fd: Fd, | |
143 | registration: Registration, | |
144 | } | |
145 | ||
146 | impl PolledFd { | |
147 | pub fn new(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> { | |
148 | let registration = reactor.register(fd.as_raw_fd())?; | |
149 | Ok(Self { fd, registration }) | |
150 | } | |
151 | } | |
152 | ||
153 | impl PolledFd { | |
154 | pub fn poll_read(&mut self, data: &mut [u8], cx: &mut Context) -> Poll<io::Result<usize>> { | |
155 | let fd = self.fd.as_raw_fd(); | |
156 | let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?; | |
157 | let mut read_waker = self | |
158 | .registration | |
159 | .inner | |
160 | .as_ref() | |
161 | .unwrap() | |
162 | .read_waker | |
163 | .lock() | |
164 | .unwrap(); | |
165 | match c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }) { | |
166 | Ok(got) => Poll::Ready(Ok(got as usize)), | |
167 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { | |
168 | *read_waker = Some(cx.waker().clone()); | |
169 | Poll::Pending | |
170 | } | |
171 | Err(err) => Poll::Ready(Err(err)), | |
172 | } | |
173 | } | |
174 | ||
175 | pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> { | |
176 | poll_fn(move |cx| self.poll_read(data, cx)).await | |
177 | } | |
beb2f986 WB |
178 | |
179 | pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> { | |
180 | let fd = self.fd.as_raw_fd(); | |
181 | let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?; | |
182 | let mut write_waker = self | |
183 | .registration | |
184 | .inner | |
185 | .as_ref() | |
186 | .unwrap() | |
187 | .write_waker | |
188 | .lock() | |
189 | .unwrap(); | |
190 | match c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) }) { | |
191 | Ok(got) => Poll::Ready(Ok(got as usize)), | |
192 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { | |
193 | *write_waker = Some(cx.waker().clone()); | |
194 | Poll::Pending | |
195 | } | |
196 | Err(err) => Poll::Ready(Err(err)), | |
197 | } | |
198 | } | |
199 | ||
200 | pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> { | |
201 | poll_fn(move |cx| self.poll_write(data, cx)).await | |
202 | } | |
8962159b | 203 | } |