]> git.proxmox.com Git - pve-lxc-syscalld.git/blob - src/reactor.rs
handle dropped fds
[pve-lxc-syscalld.git] / src / reactor.rs
1 use std::convert::TryFrom;
2 use std::io;
3 use std::os::unix::io::{AsRawFd, RawFd};
4 use std::sync::{Arc, Mutex};
5 use std::task::{Context, Poll, Waker};
6 use std::thread::JoinHandle;
7
8 use crate::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLHUP, EPOLLIN, EPOLLOUT};
9 use crate::error::io_err_other;
10 use crate::poll_fn::poll_fn;
11 use crate::tools::Fd;
12
/// Transparent wrapper that unconditionally marks its contents as [`Sync`].
///
/// The wrapped value is exposed through the public tuple field.
pub struct AssertSync<T>(pub T);

// SAFETY: nothing is checked here. Whoever wraps a value in `AssertSync` is
// responsible for making sure shared access from several threads is sound.
unsafe impl<T> Sync for AssertSync<T> {}
15
/// Readiness bit: input.
pub const READY_IN: u32 = 1 << 0;
/// Readiness bit: output.
pub const READY_OUT: u32 = 1 << 1;
/// Readiness bit: error.
pub const READY_ERR: u32 = 1 << 2;
19
20 pub struct Reactor {
21 epoll: Arc<Epoll>,
22 removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
23 thread: Mutex<Option<JoinHandle<()>>>,
24 }
25
26 impl Reactor {
27 pub fn new() -> io::Result<Arc<Self>> {
28 let epoll = Arc::new(Epoll::new()?);
29
30 let this = Arc::new(Reactor {
31 epoll,
32 removed: Mutex::new(Vec::new()),
33 thread: Mutex::new(None),
34 });
35
36 let handle = std::thread::spawn({
37 let this = Arc::clone(&this);
38 move || this.thread_main()
39 });
40
41 this.thread.lock().unwrap().replace(handle);
42
43 Ok(this)
44 }
45
46 fn thread_main(self: Arc<Self>) {
47 let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
48 loop {
49 let count = match self.epoll.wait(&mut buf, None) {
50 Ok(count) => count,
51 Err(err) => {
52 eprintln!("error in epoll loop: {}", err);
53 std::process::exit(1);
54 }
55 };
56 for i in 0..count {
57 self.handle_event(&buf[i]);
58 }
59 }
60 }
61
62 fn handle_event(&self, event: &EpollEvent) {
63 let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
64 for (fd, _) in self.removed.lock().unwrap().iter() {
65 // This fd is already being dropped, don't touch it!
66 if *fd == registration.fd {
67 return;
68 }
69 }
70
71 if 0 != (event.events & EPOLLIN) {
72 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
73 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
74 waker.wake();
75 }
76 }
77
78 if 0 != (event.events & EPOLLOUT) {
79 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
80 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
81 waker.wake();
82 }
83 }
84
85 if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
86 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
87 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
88 waker.wake();
89 }
90 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
91 waker.wake();
92 }
93 }
94 }
95
96 pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
97 let mut inner = Box::new(RegistrationInner {
98 fd,
99 //ready: AtomicU32::new(0),
100 reactor: Arc::clone(&self),
101 read_waker: Mutex::new(None),
102 write_waker: Mutex::new(None),
103 });
104
105 let inner_ptr = {
106 // type check/assertion
107 let inner_ptr: &mut RegistrationInner = &mut *inner;
108 // make raw pointer
109 inner_ptr as *mut RegistrationInner as usize as u64
110 };
111
112 self.epoll.add_fd(fd, EPOLLIN | EPOLLOUT, inner_ptr)?;
113
114 Ok(Registration { inner: Some(inner) })
115 }
116
117 fn deregister(&self, registration: Box<RegistrationInner>) {
118 self.removed
119 .lock()
120 .unwrap()
121 .push((registration.fd, registration));
122 }
123 }
124
125 pub struct Registration {
126 // pin the data in memory because the other thread will access it
127 // ManuallyDrop::take is nightly only :<
128 inner: Option<Box<RegistrationInner>>,
129 }
130
131 impl Drop for Registration {
132 fn drop(&mut self) {
133 let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor);
134 reactor.deregister(self.inner.take().unwrap());
135 }
136 }
137
138 // This is accessed by the reactor
139 struct RegistrationInner {
140 fd: RawFd,
141 //ready: AtomicU32,
142 reactor: Arc<Reactor>,
143 read_waker: Mutex<Option<Waker>>,
144 write_waker: Mutex<Option<Waker>>,
145 }
146
147 pub struct PolledFd {
148 fd: Fd,
149 registration: Registration,
150 }
151
152 impl PolledFd {
153 pub fn new(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
154 let registration = reactor.register(fd.as_raw_fd())?;
155 Ok(Self { fd, registration })
156 }
157 }
158
159 impl PolledFd {
160 pub fn poll_read(&mut self, data: &mut [u8], cx: &mut Context) -> Poll<io::Result<usize>> {
161 let fd = self.fd.as_raw_fd();
162 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
163 let mut read_waker = self
164 .registration
165 .inner
166 .as_ref()
167 .unwrap()
168 .read_waker
169 .lock()
170 .unwrap();
171 match c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }) {
172 Ok(got) => Poll::Ready(Ok(got as usize)),
173 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
174 *read_waker = Some(cx.waker().clone());
175 Poll::Pending
176 }
177 Err(err) => Poll::Ready(Err(err)),
178 }
179 }
180
181 pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
182 poll_fn(move |cx| self.poll_read(data, cx)).await
183 }
184
185 pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> {
186 let fd = self.fd.as_raw_fd();
187 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
188 let mut write_waker = self
189 .registration
190 .inner
191 .as_ref()
192 .unwrap()
193 .write_waker
194 .lock()
195 .unwrap();
196 match c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) }) {
197 Ok(got) => Poll::Ready(Ok(got as usize)),
198 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
199 *write_waker = Some(cx.waker().clone());
200 Poll::Pending
201 }
202 Err(err) => Poll::Ready(Err(err)),
203 }
204 }
205
206 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
207 poll_fn(move |cx| self.poll_write(data, cx)).await
208 }
209 }