]> git.proxmox.com Git - pve-lxc-syscalld.git/blame - src/io/reactor.rs
also use pidfd_open for explicit pids
[pve-lxc-syscalld.git] / src / io / reactor.rs
CommitLineData
8962159b 1use std::io;
8dd26985 2use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
9ebd1972 3use std::sync::atomic::{AtomicBool, Ordering};
0e1eba91 4use std::sync::{Arc, Mutex, Once};
a22aece0 5use std::task::{Context, Poll, Waker};
8962159b
WB
6use std::thread::JoinHandle;
7
944fd4f5 8use crate::error::io_err_other;
c84b9055 9use crate::io::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT};
a22aece0 10use crate::tools::Fd;
8962159b 11
0e1eba91
WB
12static START: Once = Once::new();
13static mut REACTOR: Option<Arc<Reactor>> = None;
14
8dd26985 15pub fn default_reactor() -> Arc<Reactor> {
0e1eba91
WB
16 START.call_once(|| unsafe {
17 let reactor = Reactor::new().expect("setup main epoll reactor");
18 REACTOR = Some(reactor);
19 });
20 unsafe { Arc::clone(REACTOR.as_ref().unwrap()) }
21}
22
8962159b
WB
23pub struct Reactor {
24 epoll: Arc<Epoll>,
9ebd1972 25 removed: Mutex<Vec<Box<RegistrationInner>>>,
f757af32 26 thread: Mutex<Option<JoinHandle<()>>>,
8962159b
WB
27}
28
29impl Reactor {
a22aece0 30 pub fn new() -> io::Result<Arc<Self>> {
8962159b
WB
31 let epoll = Arc::new(Epoll::new()?);
32
f757af32
WB
33 let this = Arc::new(Reactor {
34 epoll,
35 removed: Mutex::new(Vec::new()),
36 thread: Mutex::new(None),
37 });
38
8962159b 39 let handle = std::thread::spawn({
f757af32
WB
40 let this = Arc::clone(&this);
41 move || this.thread_main()
8962159b
WB
42 });
43
f757af32
WB
44 this.thread.lock().unwrap().replace(handle);
45
46 Ok(this)
8962159b
WB
47 }
48
f757af32 49 fn thread_main(self: Arc<Self>) {
a22aece0 50 let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
d1b1deab 51 loop {
f757af32 52 let count = match self.epoll.wait(&mut buf, None) {
a22aece0
WB
53 Ok(count) => count,
54 Err(err) => {
55 eprintln!("error in epoll loop: {}", err);
56 std::process::exit(1);
57 }
58 };
59 for i in 0..count {
f757af32 60 self.handle_event(&buf[i]);
a22aece0 61 }
9ebd1972
WB
62 // After going through the events we can release memory associated with already closed
63 // file descriptors:
64 self.removed.lock().unwrap().clear();
a22aece0
WB
65 }
66 }
67
f757af32 68 fn handle_event(&self, event: &EpollEvent) {
a22aece0 69 let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
9ebd1972
WB
70 if registration.gone.load(Ordering::Acquire) {
71 return;
f757af32
WB
72 }
73
a22aece0
WB
74 if 0 != (event.events & EPOLLIN) {
75 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
76 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
77 waker.wake();
78 }
79 }
80
81 if 0 != (event.events & EPOLLOUT) {
82 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
83 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
84 waker.wake();
85 }
86 }
87
88 if 0 != (event.events & (EPOLLERR | EPOLLHUP)) {
89 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
90 if let Some(waker) = registration.read_waker.lock().unwrap().take() {
91 waker.wake();
92 }
93 if let Some(waker) = registration.write_waker.lock().unwrap().take() {
94 waker.wake();
95 }
d1b1deab 96 }
8962159b 97 }
a22aece0
WB
98
99 pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
100 let mut inner = Box::new(RegistrationInner {
9ebd1972 101 gone: AtomicBool::new(false),
a22aece0
WB
102 reactor: Arc::clone(&self),
103 read_waker: Mutex::new(None),
104 write_waker: Mutex::new(None),
105 });
106
107 let inner_ptr = {
108 // type check/assertion
109 let inner_ptr: &mut RegistrationInner = &mut *inner;
110 // make raw pointer
111 inner_ptr as *mut RegistrationInner as usize as u64
112 };
113
a3afcf22
WB
114 self.epoll
115 .add_fd(fd, EPOLLIN | EPOLLOUT | EPOLLET, inner_ptr)?;
a22aece0
WB
116
117 Ok(Registration { inner: Some(inner) })
118 }
119
120 fn deregister(&self, registration: Box<RegistrationInner>) {
9ebd1972 121 self.removed.lock().unwrap().push(registration);
a22aece0
WB
122 }
123}
124
125pub struct Registration {
126 // pin the data in memory because the other thread will access it
127 // ManuallyDrop::take is nightly only :<
128 inner: Option<Box<RegistrationInner>>,
129}
130
131impl Drop for Registration {
132 fn drop(&mut self) {
8dd26985
WB
133 if let Some(inner) = self.inner.take() {
134 let reactor = Arc::clone(&inner.reactor);
135 inner.gone.store(true, Ordering::Release);
136 reactor.deregister(inner);
137 }
a22aece0
WB
138 }
139}
140
141// This is accessed by the reactor
142struct RegistrationInner {
9ebd1972 143 gone: AtomicBool,
a22aece0
WB
144 reactor: Arc<Reactor>,
145 read_waker: Mutex<Option<Waker>>,
146 write_waker: Mutex<Option<Waker>>,
147}
148
149pub struct PolledFd {
150 fd: Fd,
151 registration: Registration,
152}
153
f4c53643
WB
154impl AsRawFd for PolledFd {
155 #[inline]
156 fn as_raw_fd(&self) -> RawFd {
157 self.fd.as_raw_fd()
158 }
159}
160
8dd26985
WB
161impl IntoRawFd for PolledFd {
162 fn into_raw_fd(mut self) -> RawFd {
163 let registration = self.registration.inner.take().unwrap();
164 registration
165 .reactor
166 .epoll
167 .remove_fd(self.as_raw_fd())
168 .expect("cannot remove PolledFd from epoll instance");
169 self.fd.into_raw_fd()
170 }
171}
172
a22aece0 173impl PolledFd {
1282264a 174 pub fn new(mut fd: Fd) -> io::Result<Self> {
944fd4f5 175 fd.set_nonblocking(true).map_err(io_err_other)?;
8dd26985 176 Self::new_with_reactor(fd, self::default_reactor())
ca7f6ba0
WB
177 }
178
179 pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
a22aece0
WB
180 let registration = reactor.register(fd.as_raw_fd())?;
181 Ok(Self { fd, registration })
182 }
a22aece0 183
8dd26985 184 pub fn wrap_read<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
86b83867
WB
185 where
186 F: FnOnce() -> io::Result<T>,
187 {
a22aece0
WB
188 let mut read_waker = self
189 .registration
190 .inner
191 .as_ref()
192 .unwrap()
193 .read_waker
194 .lock()
195 .unwrap();
86b83867
WB
196 match func() {
197 Ok(out) => Poll::Ready(Ok(out)),
a22aece0
WB
198 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
199 *read_waker = Some(cx.waker().clone());
200 Poll::Pending
201 }
202 Err(err) => Poll::Ready(Err(err)),
203 }
204 }
205
8dd26985 206 pub fn wrap_write<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
86b83867
WB
207 where
208 F: FnOnce() -> io::Result<T>,
209 {
beb2f986
WB
210 let mut write_waker = self
211 .registration
212 .inner
213 .as_ref()
214 .unwrap()
215 .write_waker
216 .lock()
217 .unwrap();
86b83867
WB
218 match func() {
219 Ok(out) => Poll::Ready(Ok(out)),
beb2f986
WB
220 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
221 *write_waker = Some(cx.waker().clone());
222 Poll::Pending
223 }
224 Err(err) => Poll::Ready(Err(err)),
225 }
226 }
86b83867 227}