]> git.proxmox.com Git - pve-lxc-syscalld.git/blame - src/reactor.rs
stuff
[pve-lxc-syscalld.git] / src / reactor.rs
CommitLineData
a22aece0 1use std::convert::TryFrom;
8962159b 2use std::io;
a22aece0
WB
3use std::os::unix::io::{AsRawFd, RawFd};
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::sync::{mpsc, Arc, Mutex};
6use std::task::{Context, Poll, Waker};
8962159b
WB
7use std::thread::JoinHandle;
8
a22aece0
WB
9use crate::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLHUP, EPOLLIN, EPOLLOUT};
10use crate::error::io_err_other;
11use crate::poll_fn::poll_fn;
12use crate::tools::Fd;
8962159b
WB
13
/// Wrapper which unconditionally marks the contained value as [`Sync`].
///
/// The compiler does not verify anything here: whoever constructs an `AssertSync` takes
/// over the responsibility of making sure that sharing `&T` between threads is sound.
pub struct AssertSync<T>(pub T);

// SAFETY: not checked by the compiler; this is purely an assertion made by the user of
// the wrapper (see the type level documentation).
unsafe impl<T> Sync for AssertSync<T> {}
16
a22aece0
WB
/// Readiness bit for "readable".
///
/// NOTE(review): in this file the `READY_*` bits are only referenced from commented-out
/// code, where this one is paired with `EPOLLIN`.
pub const READY_IN: u32 = 1 << 0;
/// Readiness bit for "writable" (paired with `EPOLLOUT` in the commented-out code).
pub const READY_OUT: u32 = 1 << 1;
/// Readiness bit for "error or hang-up" (paired with `EPOLLERR | EPOLLHUP` in the
/// commented-out code).
pub const READY_ERR: u32 = 1 << 2;
20
8962159b
WB
21pub struct Reactor {
22 epoll: Arc<Epoll>,
a22aece0 23 removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
8962159b
WB
24 thread: JoinHandle<()>,
25}
26
27impl Reactor {
a22aece0 28 pub fn new() -> io::Result<Arc<Self>> {
8962159b
WB
29 let epoll = Arc::new(Epoll::new()?);
30
8962159b
WB
31 let handle = std::thread::spawn({
32 let epoll = Arc::clone(&epoll);
a22aece0 33 move || Self::thread_main(epoll)
8962159b
WB
34 });
35
a22aece0 36 Ok(Arc::new(Self {
8962159b 37 epoll,
a22aece0 38 removed: Mutex::new(Vec::new()),
8962159b 39 thread: handle,
a22aece0 40 }))
8962159b
WB
41 }
42
a22aece0
WB
43 fn thread_main(epoll: Arc<Epoll>) {
44 let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
d1b1deab 45 loop {
a22aece0
WB
46 let count = match epoll.wait(&mut buf, None) {
47 Ok(count) => count,
48 Err(err) => {
49 eprintln!("error in epoll loop: {}", err);
50 std::process::exit(1);
51 }
52 };
53 for i in 0..count {
54 Self::handle_event(&buf[i]);
55 }
56 }
57 }
58
59 fn handle_event(event: &EpollEvent) {
60 let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
61 if 0 != (event.events & EPOLLIN) {
62 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
63 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
64 waker.wake();
65 }
66 }
67
68 if 0 != (event.events & EPOLLOUT) {
69 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
70 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
71 waker.wake();
72 }
73 }
74
75 if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
76 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
77 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
78 waker.wake();
79 }
80 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
81 waker.wake();
82 }
d1b1deab 83 }
8962159b 84 }
a22aece0
WB
85
86 pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
87 let mut inner = Box::new(RegistrationInner {
88 fd,
89 //ready: AtomicU32::new(0),
90 reactor: Arc::clone(&self),
91 read_waker: Mutex::new(None),
92 write_waker: Mutex::new(None),
93 });
94
95 let inner_ptr = {
96 // type check/assertion
97 let inner_ptr: &mut RegistrationInner = &mut *inner;
98 // make raw pointer
99 inner_ptr as *mut RegistrationInner as usize as u64
100 };
101
102 self.epoll.add_fd(fd, EPOLLIN | EPOLLOUT, inner_ptr)?;
103
104 Ok(Registration { inner: Some(inner) })
105 }
106
107 fn deregister(&self, registration: Box<RegistrationInner>) {
108 self.removed
109 .lock()
110 .unwrap()
111 .push((registration.fd, registration));
112 }
113}
114
115pub struct Registration {
116 // pin the data in memory because the other thread will access it
117 // ManuallyDrop::take is nightly only :<
118 inner: Option<Box<RegistrationInner>>,
119}
120
121impl Drop for Registration {
122 fn drop(&mut self) {
123 let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor);
124 reactor.deregister(self.inner.take().unwrap());
125 }
126}
127
128// This is accessed by the reactor
129struct RegistrationInner {
130 fd: RawFd,
131 //ready: AtomicU32,
132 reactor: Arc<Reactor>,
133 read_waker: Mutex<Option<Waker>>,
134 write_waker: Mutex<Option<Waker>>,
135}
136
137pub struct PolledFd {
138 fd: Fd,
139 registration: Registration,
140}
141
142impl PolledFd {
143 pub fn new(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
144 let registration = reactor.register(fd.as_raw_fd())?;
145 Ok(Self { fd, registration })
146 }
147}
148
149impl PolledFd {
150 pub fn poll_read(&mut self, data: &mut [u8], cx: &mut Context) -> Poll<io::Result<usize>> {
151 let fd = self.fd.as_raw_fd();
152 let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
153 let mut read_waker = self
154 .registration
155 .inner
156 .as_ref()
157 .unwrap()
158 .read_waker
159 .lock()
160 .unwrap();
161 match c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }) {
162 Ok(got) => Poll::Ready(Ok(got as usize)),
163 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
164 *read_waker = Some(cx.waker().clone());
165 Poll::Pending
166 }
167 Err(err) => Poll::Ready(Err(err)),
168 }
169 }
170
171 pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
172 poll_fn(move |cx| self.poll_read(data, cx)).await
173 }
8962159b 174}