]> git.proxmox.com Git - pve-lxc-syscalld.git/blob - src/reactor.rs
use default reactor
[pve-lxc-syscalld.git] / src / reactor.rs
1 use std::convert::TryFrom;
2 use std::io;
3 use std::os::unix::io::{AsRawFd, RawFd};
4 use std::sync::atomic::{AtomicBool, Ordering};
5 use std::sync::{Arc, Mutex, Once};
6 use std::task::{Context, Poll, Waker};
7 use std::thread::JoinHandle;
8
9 use crate::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT};
10 use crate::error::io_err_other;
11 use crate::poll_fn::poll_fn;
12 use crate::tools::Fd;
13
14 static START: Once = Once::new();
15 static mut REACTOR: Option<Arc<Reactor>> = None;
16
17 pub fn default() -> Arc<Reactor> {
18 START.call_once(|| unsafe {
19 let reactor = Reactor::new().expect("setup main epoll reactor");
20 REACTOR = Some(reactor);
21 });
22 unsafe { Arc::clone(REACTOR.as_ref().unwrap()) }
23 }
24
25 pub struct Reactor {
26 epoll: Arc<Epoll>,
27 removed: Mutex<Vec<Box<RegistrationInner>>>,
28 thread: Mutex<Option<JoinHandle<()>>>,
29 }
30
31 impl Reactor {
32 pub fn new() -> io::Result<Arc<Self>> {
33 let epoll = Arc::new(Epoll::new()?);
34
35 let this = Arc::new(Reactor {
36 epoll,
37 removed: Mutex::new(Vec::new()),
38 thread: Mutex::new(None),
39 });
40
41 let handle = std::thread::spawn({
42 let this = Arc::clone(&this);
43 move || this.thread_main()
44 });
45
46 this.thread.lock().unwrap().replace(handle);
47
48 Ok(this)
49 }
50
51 fn thread_main(self: Arc<Self>) {
52 let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
53 loop {
54 let count = match self.epoll.wait(&mut buf, None) {
55 Ok(count) => count,
56 Err(err) => {
57 eprintln!("error in epoll loop: {}", err);
58 std::process::exit(1);
59 }
60 };
61 for i in 0..count {
62 self.handle_event(&buf[i]);
63 }
64 // After going through the events we can release memory associated with already closed
65 // file descriptors:
66 self.removed.lock().unwrap().clear();
67 }
68 }
69
70 fn handle_event(&self, event: &EpollEvent) {
71 let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
72 if registration.gone.load(Ordering::Acquire) {
73 return;
74 }
75
76 if 0 != (event.events & EPOLLIN) {
77 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
78 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
79 waker.wake();
80 }
81 }
82
83 if 0 != (event.events & EPOLLOUT) {
84 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
85 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
86 waker.wake();
87 }
88 }
89
90 if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
91 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
92 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
93 waker.wake();
94 }
95 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
96 waker.wake();
97 }
98 }
99 }
100
101 pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
102 let mut inner = Box::new(RegistrationInner {
103 gone: AtomicBool::new(false),
104 reactor: Arc::clone(&self),
105 read_waker: Mutex::new(None),
106 write_waker: Mutex::new(None),
107 });
108
109 let inner_ptr = {
110 // type check/assertion
111 let inner_ptr: &mut RegistrationInner = &mut *inner;
112 // make raw pointer
113 inner_ptr as *mut RegistrationInner as usize as u64
114 };
115
116 self.epoll
117 .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?;
118
119 Ok(Registration { inner: Some(inner) })
120 }
121
122 fn deregister(&self, registration: Box<RegistrationInner>) {
123 self.removed.lock().unwrap().push(registration);
124 }
125 }
126
127 pub struct Registration {
128 // pin the data in memory because the other thread will access it
129 // ManuallyDrop::take is nightly only :<
130 inner: Option<Box<RegistrationInner>>,
131 }
132
133 impl Drop for Registration {
134 fn drop(&mut self) {
135 let inner = self.inner.as_ref().unwrap();
136 let reactor = Arc::clone(&inner.reactor);
137 inner.gone.store(true, Ordering::Release);
138 reactor.deregister(self.inner.take().unwrap());
139 }
140 }
141
142 // This is accessed by the reactor
143 struct RegistrationInner {
144 gone: AtomicBool,
145 reactor: Arc<Reactor>,
146 read_waker: Mutex<Option<Waker>>,
147 write_waker: Mutex<Option<Waker>>,
148 }
149
150 pub struct PolledFd {
151 fd: Fd,
152 registration: Registration,
153 }
154
155 impl PolledFd {
156 pub fn new(fd: Fd) -> io::Result<Self> {
157 Self::new_with_reactor(fd, crate::reactor::default())
158 }
159
160 pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
161 let registration = reactor.register(fd.as_raw_fd())?;
162 Ok(Self { fd, registration })
163 }
164 }
165
166 impl PolledFd {
167 pub fn poll_read(&mut self, data: &mut [u8], cx: &mut Context) -> Poll<io::Result<usize>> {
168 let fd = self.fd.as_raw_fd();
169 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
170 let mut read_waker = self
171 .registration
172 .inner
173 .as_ref()
174 .unwrap()
175 .read_waker
176 .lock()
177 .unwrap();
178 match c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }) {
179 Ok(got) => Poll::Ready(Ok(got as usize)),
180 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
181 *read_waker = Some(cx.waker().clone());
182 Poll::Pending
183 }
184 Err(err) => Poll::Ready(Err(err)),
185 }
186 }
187
188 pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
189 poll_fn(move |cx| self.poll_read(data, cx)).await
190 }
191
192 pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> {
193 let fd = self.fd.as_raw_fd();
194 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
195 let mut write_waker = self
196 .registration
197 .inner
198 .as_ref()
199 .unwrap()
200 .write_waker
201 .lock()
202 .unwrap();
203 match c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) }) {
204 Ok(got) => Poll::Ready(Ok(got as usize)),
205 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
206 *write_waker = Some(cx.waker().clone());
207 Poll::Pending
208 }
209 Err(err) => Poll::Ready(Err(err)),
210 }
211 }
212
213 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
214 poll_fn(move |cx| self.poll_write(data, cx)).await
215 }
216 }