]> git.proxmox.com Git - pve-lxc-syscalld.git/blame - src/reactor.rs
edge triggering is vital for us
[pve-lxc-syscalld.git] / src / reactor.rs
CommitLineData
a22aece0 1use std::convert::TryFrom;
8962159b 2use std::io;
a22aece0 3use std::os::unix::io::{AsRawFd, RawFd};
f757af32 4use std::sync::{Arc, Mutex};
a22aece0 5use std::task::{Context, Poll, Waker};
8962159b
WB
6use std::thread::JoinHandle;
7
a3afcf22 8use crate::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT};
a22aece0
WB
9use crate::error::io_err_other;
10use crate::poll_fn::poll_fn;
11use crate::tools::Fd;
8962159b 12
8962159b
WB
13pub struct Reactor {
14 epoll: Arc<Epoll>,
a22aece0 15 removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
f757af32 16 thread: Mutex<Option<JoinHandle<()>>>,
8962159b
WB
17}
18
19impl Reactor {
a22aece0 20 pub fn new() -> io::Result<Arc<Self>> {
8962159b
WB
21 let epoll = Arc::new(Epoll::new()?);
22
f757af32
WB
23 let this = Arc::new(Reactor {
24 epoll,
25 removed: Mutex::new(Vec::new()),
26 thread: Mutex::new(None),
27 });
28
8962159b 29 let handle = std::thread::spawn({
f757af32
WB
30 let this = Arc::clone(&this);
31 move || this.thread_main()
8962159b
WB
32 });
33
f757af32
WB
34 this.thread.lock().unwrap().replace(handle);
35
36 Ok(this)
8962159b
WB
37 }
38
f757af32 39 fn thread_main(self: Arc<Self>) {
a22aece0 40 let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
d1b1deab 41 loop {
f757af32 42 let count = match self.epoll.wait(&mut buf, None) {
a22aece0
WB
43 Ok(count) => count,
44 Err(err) => {
45 eprintln!("error in epoll loop: {}", err);
46 std::process::exit(1);
47 }
48 };
49 for i in 0..count {
f757af32 50 self.handle_event(&buf[i]);
a22aece0
WB
51 }
52 }
53 }
54
f757af32 55 fn handle_event(&self, event: &EpollEvent) {
a22aece0 56 let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
f757af32
WB
57 for (fd, _) in self.removed.lock().unwrap().iter() {
58 // This fd is already being dropped, don't touch it!
59 if *fd == registration.fd {
60 return;
61 }
62 }
63
a22aece0
WB
64 if 0 != (event.events & EPOLLIN) {
65 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
66 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
67 waker.wake();
68 }
69 }
70
71 if 0 != (event.events & EPOLLOUT) {
72 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
73 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
74 waker.wake();
75 }
76 }
77
78 if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
79 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
80 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
81 waker.wake();
82 }
83 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
84 waker.wake();
85 }
d1b1deab 86 }
8962159b 87 }
a22aece0
WB
88
89 pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
90 let mut inner = Box::new(RegistrationInner {
91 fd,
92 //ready: AtomicU32::new(0),
93 reactor: Arc::clone(&self),
94 read_waker: Mutex::new(None),
95 write_waker: Mutex::new(None),
96 });
97
98 let inner_ptr = {
99 // type check/assertion
100 let inner_ptr: &mut RegistrationInner = &mut *inner;
101 // make raw pointer
102 inner_ptr as *mut RegistrationInner as usize as u64
103 };
104
a3afcf22
WB
105 self.epoll
106 .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?;
a22aece0
WB
107
108 Ok(Registration { inner: Some(inner) })
109 }
110
111 fn deregister(&self, registration: Box<RegistrationInner>) {
112 self.removed
113 .lock()
114 .unwrap()
115 .push((registration.fd, registration));
116 }
117}
118
119pub struct Registration {
120 // pin the data in memory because the other thread will access it
121 // ManuallyDrop::take is nightly only :<
122 inner: Option<Box<RegistrationInner>>,
123}
124
125impl Drop for Registration {
126 fn drop(&mut self) {
127 let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor);
128 reactor.deregister(self.inner.take().unwrap());
129 }
130}
131
132// This is accessed by the reactor
133struct RegistrationInner {
134 fd: RawFd,
135 //ready: AtomicU32,
136 reactor: Arc<Reactor>,
137 read_waker: Mutex<Option<Waker>>,
138 write_waker: Mutex<Option<Waker>>,
139}
140
141pub struct PolledFd {
142 fd: Fd,
143 registration: Registration,
144}
145
146impl PolledFd {
147 pub fn new(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
148 let registration = reactor.register(fd.as_raw_fd())?;
149 Ok(Self { fd, registration })
150 }
151}
152
153impl PolledFd {
154 pub fn poll_read(&mut self, data: &mut [u8], cx: &mut Context) -> Poll<io::Result<usize>> {
155 let fd = self.fd.as_raw_fd();
156 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
157 let mut read_waker = self
158 .registration
159 .inner
160 .as_ref()
161 .unwrap()
162 .read_waker
163 .lock()
164 .unwrap();
165 match c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }) {
166 Ok(got) => Poll::Ready(Ok(got as usize)),
167 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
168 *read_waker = Some(cx.waker().clone());
169 Poll::Pending
170 }
171 Err(err) => Poll::Ready(Err(err)),
172 }
173 }
174
175 pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
176 poll_fn(move |cx| self.poll_read(data, cx)).await
177 }
beb2f986
WB
178
179 pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> {
180 let fd = self.fd.as_raw_fd();
181 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
182 let mut write_waker = self
183 .registration
184 .inner
185 .as_ref()
186 .unwrap()
187 .write_waker
188 .lock()
189 .unwrap();
190 match c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) }) {
191 Ok(got) => Poll::Ready(Ok(got as usize)),
192 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
193 *write_waker = Some(cx.waker().clone());
194 Poll::Pending
195 }
196 Err(err) => Poll::Ready(Err(err)),
197 }
198 }
199
200 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
201 poll_fn(move |cx| self.poll_write(data, cx)).await
202 }
8962159b 203}