]> git.proxmox.com Git - pve-lxc-syscalld.git/blob - src/reactor.rs
c731178657ba9579a3720ddabc6747c599c5ff84
[pve-lxc-syscalld.git] / src / reactor.rs
1 use std::convert::TryFrom;
2 use std::io;
3 use std::os::unix::io::{AsRawFd, RawFd};
4 use std::sync::{Arc, Mutex};
5 use std::task::{Context, Poll, Waker};
6 use std::thread::JoinHandle;
7
8 use crate::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLHUP, EPOLLIN, EPOLLOUT};
9 use crate::error::io_err_other;
10 use crate::poll_fn::poll_fn;
11 use crate::tools::Fd;
12
13 pub struct Reactor {
14 epoll: Arc<Epoll>,
15 removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
16 thread: Mutex<Option<JoinHandle<()>>>,
17 }
18
19 impl Reactor {
20 pub fn new() -> io::Result<Arc<Self>> {
21 let epoll = Arc::new(Epoll::new()?);
22
23 let this = Arc::new(Reactor {
24 epoll,
25 removed: Mutex::new(Vec::new()),
26 thread: Mutex::new(None),
27 });
28
29 let handle = std::thread::spawn({
30 let this = Arc::clone(&this);
31 move || this.thread_main()
32 });
33
34 this.thread.lock().unwrap().replace(handle);
35
36 Ok(this)
37 }
38
39 fn thread_main(self: Arc<Self>) {
40 let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
41 loop {
42 let count = match self.epoll.wait(&mut buf, None) {
43 Ok(count) => count,
44 Err(err) => {
45 eprintln!("error in epoll loop: {}", err);
46 std::process::exit(1);
47 }
48 };
49 for i in 0..count {
50 self.handle_event(&buf[i]);
51 }
52 }
53 }
54
55 fn handle_event(&self, event: &EpollEvent) {
56 let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
57 for (fd, _) in self.removed.lock().unwrap().iter() {
58 // This fd is already being dropped, don't touch it!
59 if *fd == registration.fd {
60 return;
61 }
62 }
63
64 if 0 != (event.events & EPOLLIN) {
65 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
66 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
67 waker.wake();
68 }
69 }
70
71 if 0 != (event.events & EPOLLOUT) {
72 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
73 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
74 waker.wake();
75 }
76 }
77
78 if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
79 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
80 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
81 waker.wake();
82 }
83 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
84 waker.wake();
85 }
86 }
87 }
88
89 pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
90 let mut inner = Box::new(RegistrationInner {
91 fd,
92 //ready: AtomicU32::new(0),
93 reactor: Arc::clone(&self),
94 read_waker: Mutex::new(None),
95 write_waker: Mutex::new(None),
96 });
97
98 let inner_ptr = {
99 // type check/assertion
100 let inner_ptr: &mut RegistrationInner = &mut *inner;
101 // make raw pointer
102 inner_ptr as *mut RegistrationInner as usize as u64
103 };
104
105 self.epoll.add_fd(fd, EPOLLIN | EPOLLOUT, inner_ptr)?;
106
107 Ok(Registration { inner: Some(inner) })
108 }
109
110 fn deregister(&self, registration: Box<RegistrationInner>) {
111 self.removed
112 .lock()
113 .unwrap()
114 .push((registration.fd, registration));
115 }
116 }
117
118 pub struct Registration {
119 // pin the data in memory because the other thread will access it
120 // ManuallyDrop::take is nightly only :<
121 inner: Option<Box<RegistrationInner>>,
122 }
123
124 impl Drop for Registration {
125 fn drop(&mut self) {
126 let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor);
127 reactor.deregister(self.inner.take().unwrap());
128 }
129 }
130
131 // This is accessed by the reactor
132 struct RegistrationInner {
133 fd: RawFd,
134 //ready: AtomicU32,
135 reactor: Arc<Reactor>,
136 read_waker: Mutex<Option<Waker>>,
137 write_waker: Mutex<Option<Waker>>,
138 }
139
140 pub struct PolledFd {
141 fd: Fd,
142 registration: Registration,
143 }
144
145 impl PolledFd {
146 pub fn new(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
147 let registration = reactor.register(fd.as_raw_fd())?;
148 Ok(Self { fd, registration })
149 }
150 }
151
152 impl PolledFd {
153 pub fn poll_read(&mut self, data: &mut [u8], cx: &mut Context) -> Poll<io::Result<usize>> {
154 let fd = self.fd.as_raw_fd();
155 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
156 let mut read_waker = self
157 .registration
158 .inner
159 .as_ref()
160 .unwrap()
161 .read_waker
162 .lock()
163 .unwrap();
164 match c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }) {
165 Ok(got) => Poll::Ready(Ok(got as usize)),
166 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
167 *read_waker = Some(cx.waker().clone());
168 Poll::Pending
169 }
170 Err(err) => Poll::Ready(Err(err)),
171 }
172 }
173
174 pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
175 poll_fn(move |cx| self.poll_read(data, cx)).await
176 }
177
178 pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> {
179 let fd = self.fd.as_raw_fd();
180 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
181 let mut write_waker = self
182 .registration
183 .inner
184 .as_ref()
185 .unwrap()
186 .write_waker
187 .lock()
188 .unwrap();
189 match c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) }) {
190 Ok(got) => Poll::Ready(Ok(got as usize)),
191 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
192 *write_waker = Some(cx.waker().clone());
193 Poll::Pending
194 }
195 Err(err) => Poll::Ready(Err(err)),
196 }
197 }
198
199 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
200 poll_fn(move |cx| self.poll_write(data, cx)).await
201 }
202 }