]>
Commit | Line | Data |
---|---|---|
8962159b | 1 | use std::io; |
8dd26985 | 2 | use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; |
9ebd1972 | 3 | use std::sync::atomic::{AtomicBool, Ordering}; |
0e1eba91 | 4 | use std::sync::{Arc, Mutex, Once}; |
a22aece0 | 5 | use std::task::{Context, Poll, Waker}; |
8962159b WB |
6 | use std::thread::JoinHandle; |
7 | ||
944fd4f5 | 8 | use crate::error::io_err_other; |
c84b9055 | 9 | use crate::io::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT}; |
a22aece0 | 10 | use crate::tools::Fd; |
8962159b | 11 | |
0e1eba91 WB |
12 | static START: Once = Once::new(); |
13 | static mut REACTOR: Option<Arc<Reactor>> = None; | |
14 | ||
8dd26985 | 15 | pub fn default_reactor() -> Arc<Reactor> { |
0e1eba91 WB |
16 | START.call_once(|| unsafe { |
17 | let reactor = Reactor::new().expect("setup main epoll reactor"); | |
18 | REACTOR = Some(reactor); | |
19 | }); | |
20 | unsafe { Arc::clone(REACTOR.as_ref().unwrap()) } | |
21 | } | |
22 | ||
8962159b WB |
23 | pub struct Reactor { |
24 | epoll: Arc<Epoll>, | |
9ebd1972 | 25 | removed: Mutex<Vec<Box<RegistrationInner>>>, |
f757af32 | 26 | thread: Mutex<Option<JoinHandle<()>>>, |
8962159b WB |
27 | } |
28 | ||
29 | impl Reactor { | |
a22aece0 | 30 | pub fn new() -> io::Result<Arc<Self>> { |
8962159b WB |
31 | let epoll = Arc::new(Epoll::new()?); |
32 | ||
f757af32 WB |
33 | let this = Arc::new(Reactor { |
34 | epoll, | |
35 | removed: Mutex::new(Vec::new()), | |
36 | thread: Mutex::new(None), | |
37 | }); | |
38 | ||
8962159b | 39 | let handle = std::thread::spawn({ |
f757af32 WB |
40 | let this = Arc::clone(&this); |
41 | move || this.thread_main() | |
8962159b WB |
42 | }); |
43 | ||
f757af32 WB |
44 | this.thread.lock().unwrap().replace(handle); |
45 | ||
46 | Ok(this) | |
8962159b WB |
47 | } |
48 | ||
f757af32 | 49 | fn thread_main(self: Arc<Self>) { |
a22aece0 | 50 | let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() }; |
d1b1deab | 51 | loop { |
f757af32 | 52 | let count = match self.epoll.wait(&mut buf, None) { |
a22aece0 WB |
53 | Ok(count) => count, |
54 | Err(err) => { | |
55 | eprintln!("error in epoll loop: {}", err); | |
56 | std::process::exit(1); | |
57 | } | |
58 | }; | |
59 | for i in 0..count { | |
f757af32 | 60 | self.handle_event(&buf[i]); |
a22aece0 | 61 | } |
9ebd1972 WB |
62 | // After going through the events we can release memory associated with already closed |
63 | // file descriptors: | |
64 | self.removed.lock().unwrap().clear(); | |
a22aece0 WB |
65 | } |
66 | } | |
67 | ||
f757af32 | 68 | fn handle_event(&self, event: &EpollEvent) { |
a22aece0 | 69 | let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) }; |
9ebd1972 WB |
70 | if registration.gone.load(Ordering::Acquire) { |
71 | return; | |
f757af32 WB |
72 | } |
73 | ||
a22aece0 WB |
74 | if 0 != (event.events & EPOLLIN) { |
75 | //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel); | |
76 | if let Some(waker) = registration.read_waker.lock().unwrap().take() { | |
77 | waker.wake(); | |
78 | } | |
79 | } | |
80 | ||
81 | if 0 != (event.events & EPOLLOUT) { | |
82 | //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel); | |
83 | if let Some(waker) = registration.write_waker.lock().unwrap().take() { | |
84 | waker.wake(); | |
85 | } | |
86 | } | |
87 | ||
88 | if 0 != (event.events & (EPOLLERR | EPOLLHUP)) { | |
89 | //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel); | |
90 | if let Some(waker) = registration.read_waker.lock().unwrap().take() { | |
91 | waker.wake(); | |
92 | } | |
93 | if let Some(waker) = registration.write_waker.lock().unwrap().take() { | |
94 | waker.wake(); | |
95 | } | |
d1b1deab | 96 | } |
8962159b | 97 | } |
a22aece0 WB |
98 | |
99 | pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> { | |
100 | let mut inner = Box::new(RegistrationInner { | |
9ebd1972 | 101 | gone: AtomicBool::new(false), |
a22aece0 WB |
102 | reactor: Arc::clone(&self), |
103 | read_waker: Mutex::new(None), | |
104 | write_waker: Mutex::new(None), | |
105 | }); | |
106 | ||
107 | let inner_ptr = { | |
108 | // type check/assertion | |
109 | let inner_ptr: &mut RegistrationInner = &mut *inner; | |
110 | // make raw pointer | |
111 | inner_ptr as *mut RegistrationInner as usize as u64 | |
112 | }; | |
113 | ||
a3afcf22 WB |
114 | self.epoll |
115 | .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?; | |
a22aece0 WB |
116 | |
117 | Ok(Registration { inner: Some(inner) }) | |
118 | } | |
119 | ||
120 | fn deregister(&self, registration: Box<RegistrationInner>) { | |
9ebd1972 | 121 | self.removed.lock().unwrap().push(registration); |
a22aece0 WB |
122 | } |
123 | } | |
124 | ||
125 | pub struct Registration { | |
126 | // pin the data in memory because the other thread will access it | |
127 | // ManuallyDrop::take is nightly only :< | |
128 | inner: Option<Box<RegistrationInner>>, | |
129 | } | |
130 | ||
131 | impl Drop for Registration { | |
132 | fn drop(&mut self) { | |
8dd26985 WB |
133 | if let Some(inner) = self.inner.take() { |
134 | let reactor = Arc::clone(&inner.reactor); | |
135 | inner.gone.store(true, Ordering::Release); | |
136 | reactor.deregister(inner); | |
137 | } | |
a22aece0 WB |
138 | } |
139 | } | |
140 | ||
141 | // This is accessed by the reactor | |
142 | struct RegistrationInner { | |
9ebd1972 | 143 | gone: AtomicBool, |
a22aece0 WB |
144 | reactor: Arc<Reactor>, |
145 | read_waker: Mutex<Option<Waker>>, | |
146 | write_waker: Mutex<Option<Waker>>, | |
147 | } | |
148 | ||
149 | pub struct PolledFd { | |
150 | fd: Fd, | |
151 | registration: Registration, | |
152 | } | |
153 | ||
f4c53643 WB |
154 | impl AsRawFd for PolledFd { |
155 | #[inline] | |
156 | fn as_raw_fd(&self) -> RawFd { | |
157 | self.fd.as_raw_fd() | |
158 | } | |
159 | } | |
160 | ||
8dd26985 WB |
161 | impl IntoRawFd for PolledFd { |
162 | fn into_raw_fd(mut self) -> RawFd { | |
163 | let registration = self.registration.inner.take().unwrap(); | |
164 | registration | |
165 | .reactor | |
166 | .epoll | |
167 | .remove_fd(self.as_raw_fd()) | |
168 | .expect("cannot remove PolledFd from epoll instance"); | |
169 | self.fd.into_raw_fd() | |
170 | } | |
171 | } | |
172 | ||
a22aece0 | 173 | impl PolledFd { |
1282264a | 174 | pub fn new(mut fd: Fd) -> io::Result<Self> { |
944fd4f5 | 175 | fd.set_nonblocking(true).map_err(io_err_other)?; |
8dd26985 | 176 | Self::new_with_reactor(fd, self::default_reactor()) |
ca7f6ba0 WB |
177 | } |
178 | ||
179 | pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> { | |
a22aece0 WB |
180 | let registration = reactor.register(fd.as_raw_fd())?; |
181 | Ok(Self { fd, registration }) | |
182 | } | |
a22aece0 | 183 | |
8dd26985 | 184 | pub fn wrap_read<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>> |
86b83867 WB |
185 | where |
186 | F: FnOnce() -> io::Result<T>, | |
187 | { | |
a22aece0 WB |
188 | let mut read_waker = self |
189 | .registration | |
190 | .inner | |
191 | .as_ref() | |
192 | .unwrap() | |
193 | .read_waker | |
194 | .lock() | |
195 | .unwrap(); | |
86b83867 WB |
196 | match func() { |
197 | Ok(out) => Poll::Ready(Ok(out)), | |
a22aece0 WB |
198 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { |
199 | *read_waker = Some(cx.waker().clone()); | |
200 | Poll::Pending | |
201 | } | |
202 | Err(err) => Poll::Ready(Err(err)), | |
203 | } | |
204 | } | |
205 | ||
8dd26985 | 206 | pub fn wrap_write<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>> |
86b83867 WB |
207 | where |
208 | F: FnOnce() -> io::Result<T>, | |
209 | { | |
beb2f986 WB |
210 | let mut write_waker = self |
211 | .registration | |
212 | .inner | |
213 | .as_ref() | |
214 | .unwrap() | |
215 | .write_waker | |
216 | .lock() | |
217 | .unwrap(); | |
86b83867 WB |
218 | match func() { |
219 | Ok(out) => Poll::Ready(Ok(out)), | |
beb2f986 WB |
220 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { |
221 | *write_waker = Some(cx.waker().clone()); | |
222 | Poll::Pending | |
223 | } | |
224 | Err(err) => Poll::Ready(Err(err)), | |
225 | } | |
226 | } | |
86b83867 | 227 | } |