]> git.proxmox.com Git - pve-lxc-syscalld.git/blob - src/io/reactor.rs
a27ba3d1e084d206b79a8a840633b969d0f62002
[pve-lxc-syscalld.git] / src / io / reactor.rs
1 use std::io;
2 use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
3 use std::sync::atomic::{AtomicBool, Ordering};
4 use std::sync::{Arc, Mutex, Once};
5 use std::task::{Context, Poll, Waker};
6 use std::thread::JoinHandle;
7
8 use crate::error::io_err_other;
9 use crate::io::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT};
10 use crate::tools::Fd;
11
12 static START: Once = Once::new();
13 static mut REACTOR: Option<Arc<Reactor>> = None;
14
15 pub fn default_reactor() -> Arc<Reactor> {
16 START.call_once(|| unsafe {
17 let reactor = Reactor::new().expect("setup main epoll reactor");
18 REACTOR = Some(reactor);
19 });
20 unsafe { Arc::clone(REACTOR.as_ref().unwrap()) }
21 }
22
23 pub struct Reactor {
24 epoll: Arc<Epoll>,
25 removed: Mutex<Vec<Box<RegistrationInner>>>,
26 thread: Mutex<Option<JoinHandle<()>>>,
27 }
28
29 impl Reactor {
30 pub fn new() -> io::Result<Arc<Self>> {
31 let epoll = Arc::new(Epoll::new()?);
32
33 let this = Arc::new(Reactor {
34 epoll,
35 removed: Mutex::new(Vec::new()),
36 thread: Mutex::new(None),
37 });
38
39 let handle = std::thread::spawn({
40 let this = Arc::clone(&this);
41 move || this.thread_main()
42 });
43
44 this.thread.lock().unwrap().replace(handle);
45
46 Ok(this)
47 }
48
49 fn thread_main(self: Arc<Self>) {
50 let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
51 loop {
52 let count = match self.epoll.wait(&mut buf, None) {
53 Ok(count) => count,
54 Err(err) => {
55 eprintln!("error in epoll loop: {}", err);
56 std::process::exit(1);
57 }
58 };
59 for i in 0..count {
60 self.handle_event(&buf[i]);
61 }
62 // After going through the events we can release memory associated with already closed
63 // file descriptors:
64 self.removed.lock().unwrap().clear();
65 }
66 }
67
68 fn handle_event(&self, event: &EpollEvent) {
69 let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
70 if registration.gone.load(Ordering::Acquire) {
71 return;
72 }
73
74 if 0 != (event.events & EPOLLIN) {
75 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
76 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
77 waker.wake();
78 }
79 }
80
81 if 0 != (event.events & EPOLLOUT) {
82 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
83 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
84 waker.wake();
85 }
86 }
87
88 if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
89 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
90 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
91 waker.wake();
92 }
93 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
94 waker.wake();
95 }
96 }
97 }
98
99 pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
100 let mut inner = Box::new(RegistrationInner {
101 gone: AtomicBool::new(false),
102 reactor: Arc::clone(&self),
103 read_waker: Mutex::new(None),
104 write_waker: Mutex::new(None),
105 });
106
107 let inner_ptr = {
108 // type check/assertion
109 let inner_ptr: &mut RegistrationInner = &mut *inner;
110 // make raw pointer
111 inner_ptr as *mut RegistrationInner as usize as u64
112 };
113
114 self.epoll
115 .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?;
116
117 Ok(Registration { inner: Some(inner) })
118 }
119
120 fn deregister(&self, registration: Box<RegistrationInner>) {
121 self.removed.lock().unwrap().push(registration);
122 }
123 }
124
125 pub struct Registration {
126 // pin the data in memory because the other thread will access it
127 // ManuallyDrop::take is nightly only :<
128 inner: Option<Box<RegistrationInner>>,
129 }
130
131 impl Drop for Registration {
132 fn drop(&mut self) {
133 if let Some(inner) = self.inner.take() {
134 let reactor = Arc::clone(&inner.reactor);
135 inner.gone.store(true, Ordering::Release);
136 reactor.deregister(inner);
137 }
138 }
139 }
140
141 // This is accessed by the reactor
142 struct RegistrationInner {
143 gone: AtomicBool,
144 reactor: Arc<Reactor>,
145 read_waker: Mutex<Option<Waker>>,
146 write_waker: Mutex<Option<Waker>>,
147 }
148
149 pub struct PolledFd {
150 fd: Fd,
151 registration: Registration,
152 }
153
154 impl AsRawFd for PolledFd {
155 #[inline]
156 fn as_raw_fd(&self) -> RawFd {
157 self.fd.as_raw_fd()
158 }
159 }
160
161 impl IntoRawFd for PolledFd {
162 fn into_raw_fd(mut self) -> RawFd {
163 let registration = self.registration.inner.take().unwrap();
164 registration
165 .reactor
166 .epoll
167 .remove_fd(self.as_raw_fd())
168 .expect("cannot remove PolledFd from epoll instance");
169 self.fd.into_raw_fd()
170 }
171 }
172
173 impl PolledFd {
174 pub fn new(mut fd: Fd) -> io::Result<Self> {
175 fd.set_nonblocking(true).map_err(io_err_other)?;
176 Self::new_with_reactor(fd, self::default_reactor())
177 }
178
179 pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
180 let registration = reactor.register(fd.as_raw_fd())?;
181 Ok(Self { fd, registration })
182 }
183
184 pub fn wrap_read<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
185 where
186 F: FnOnce() -> io::Result<T>,
187 {
188 let mut read_waker = self
189 .registration
190 .inner
191 .as_ref()
192 .unwrap()
193 .read_waker
194 .lock()
195 .unwrap();
196 match func() {
197 Ok(out) => Poll::Ready(Ok(out)),
198 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
199 *read_waker = Some(cx.waker().clone());
200 Poll::Pending
201 }
202 Err(err) => Poll::Ready(Err(err)),
203 }
204 }
205
206 pub fn wrap_write<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
207 where
208 F: FnOnce() -> io::Result<T>,
209 {
210 let mut write_waker = self
211 .registration
212 .inner
213 .as_ref()
214 .unwrap()
215 .write_waker
216 .lock()
217 .unwrap();
218 match func() {
219 Ok(out) => Poll::Ready(Ok(out)),
220 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
221 *write_waker = Some(cx.waker().clone());
222 Poll::Pending
223 }
224 Err(err) => Poll::Ready(Err(err)),
225 }
226 }
227 }