]>
Commit | Line | Data |
---|---|---|
a22aece0 | 1 | use std::convert::TryFrom; |
8962159b | 2 | use std::io; |
a22aece0 WB |
3 | use std::os::unix::io::{AsRawFd, RawFd}; |
4 | use std::sync::atomic::{AtomicU32, Ordering}; | |
5 | use std::sync::{mpsc, Arc, Mutex}; | |
6 | use std::task::{Context, Poll, Waker}; | |
8962159b WB |
7 | use std::thread::JoinHandle; |
8 | ||
a22aece0 WB |
9 | use crate::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLHUP, EPOLLIN, EPOLLOUT}; |
10 | use crate::error::io_err_other; | |
11 | use crate::poll_fn::poll_fn; | |
12 | use crate::tools::Fd; | |
8962159b WB |
13 | |
/// Transparent-by-convention wrapper that marks any `T` as [`Sync`].
///
/// The wrapped value is reachable through the public `.0` field.
pub struct AssertSync<T>(pub T);

// SAFETY: nothing is checked here. This impl holds for every `T`, so whoever
// wraps a value is responsible for making sure it is really safe to share
// between threads.
unsafe impl<T> Sync for AssertSync<T> {}
16 | ||
a22aece0 WB |
/// Readiness bit for "readable".
///
/// NOTE(review): in the visible code this is only referenced from
/// commented-out `ready.fetch_or(..)` calls — confirm intended use.
pub const READY_IN: u32 = 0x1;
/// Readiness bit for "writable" (same caveat as [`READY_IN`]).
pub const READY_OUT: u32 = 0x2;
/// Readiness bit for "error / hang-up" (same caveat as [`READY_IN`]).
pub const READY_ERR: u32 = 0x4;
20 | ||
8962159b WB |
21 | pub struct Reactor { |
22 | epoll: Arc<Epoll>, | |
a22aece0 | 23 | removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>, |
8962159b WB |
24 | thread: JoinHandle<()>, |
25 | } | |
26 | ||
27 | impl Reactor { | |
a22aece0 | 28 | pub fn new() -> io::Result<Arc<Self>> { |
8962159b WB |
29 | let epoll = Arc::new(Epoll::new()?); |
30 | ||
8962159b WB |
31 | let handle = std::thread::spawn({ |
32 | let epoll = Arc::clone(&epoll); | |
a22aece0 | 33 | move || Self::thread_main(epoll) |
8962159b WB |
34 | }); |
35 | ||
a22aece0 | 36 | Ok(Arc::new(Self { |
8962159b | 37 | epoll, |
a22aece0 | 38 | removed: Mutex::new(Vec::new()), |
8962159b | 39 | thread: handle, |
a22aece0 | 40 | })) |
8962159b WB |
41 | } |
42 | ||
a22aece0 WB |
43 | fn thread_main(epoll: Arc<Epoll>) { |
44 | let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() }; | |
d1b1deab | 45 | loop { |
a22aece0 WB |
46 | let count = match epoll.wait(&mut buf, None) { |
47 | Ok(count) => count, | |
48 | Err(err) => { | |
49 | eprintln!("error in epoll loop: {}", err); | |
50 | std::process::exit(1); | |
51 | } | |
52 | }; | |
53 | for i in 0..count { | |
54 | Self::handle_event(&buf[i]); | |
55 | } | |
56 | } | |
57 | } | |
58 | ||
59 | fn handle_event(event: &EpollEvent) { | |
60 | let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) }; | |
61 | if 0 != (event.events & EPOLLIN) { | |
62 | //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel); | |
63 | if let Some(waker) = registration.read_waker.lock().unwrap().take() { | |
64 | waker.wake(); | |
65 | } | |
66 | } | |
67 | ||
68 | if 0 != (event.events & EPOLLOUT) { | |
69 | //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel); | |
70 | if let Some(waker) = registration.write_waker.lock().unwrap().take() { | |
71 | waker.wake(); | |
72 | } | |
73 | } | |
74 | ||
75 | if 0 != (event.events & (EPOLLERR | EPOLLHUP)) { | |
76 | //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel); | |
77 | if let Some(waker) = registration.read_waker.lock().unwrap().take() { | |
78 | waker.wake(); | |
79 | } | |
80 | if let Some(waker) = registration.write_waker.lock().unwrap().take() { | |
81 | waker.wake(); | |
82 | } | |
d1b1deab | 83 | } |
8962159b | 84 | } |
a22aece0 WB |
85 | |
86 | pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> { | |
87 | let mut inner = Box::new(RegistrationInner { | |
88 | fd, | |
89 | //ready: AtomicU32::new(0), | |
90 | reactor: Arc::clone(&self), | |
91 | read_waker: Mutex::new(None), | |
92 | write_waker: Mutex::new(None), | |
93 | }); | |
94 | ||
95 | let inner_ptr = { | |
96 | // type check/assertion | |
97 | let inner_ptr: &mut RegistrationInner = &mut *inner; | |
98 | // make raw pointer | |
99 | inner_ptr as *mut RegistrationInner as usize as u64 | |
100 | }; | |
101 | ||
102 | self.epoll.add_fd(fd, EPOLLIN | EPOLLOUT, inner_ptr)?; | |
103 | ||
104 | Ok(Registration { inner: Some(inner) }) | |
105 | } | |
106 | ||
107 | fn deregister(&self, registration: Box<RegistrationInner>) { | |
108 | self.removed | |
109 | .lock() | |
110 | .unwrap() | |
111 | .push((registration.fd, registration)); | |
112 | } | |
113 | } | |
114 | ||
115 | pub struct Registration { | |
116 | // pin the data in memory because the other thread will access it | |
117 | // ManuallyDrop::take is nightly only :< | |
118 | inner: Option<Box<RegistrationInner>>, | |
119 | } | |
120 | ||
121 | impl Drop for Registration { | |
122 | fn drop(&mut self) { | |
123 | let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor); | |
124 | reactor.deregister(self.inner.take().unwrap()); | |
125 | } | |
126 | } | |
127 | ||
128 | // This is accessed by the reactor | |
129 | struct RegistrationInner { | |
130 | fd: RawFd, | |
131 | //ready: AtomicU32, | |
132 | reactor: Arc<Reactor>, | |
133 | read_waker: Mutex<Option<Waker>>, | |
134 | write_waker: Mutex<Option<Waker>>, | |
135 | } | |
136 | ||
137 | pub struct PolledFd { | |
138 | fd: Fd, | |
139 | registration: Registration, | |
140 | } | |
141 | ||
142 | impl PolledFd { | |
143 | pub fn new(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> { | |
144 | let registration = reactor.register(fd.as_raw_fd())?; | |
145 | Ok(Self { fd, registration }) | |
146 | } | |
147 | } | |
148 | ||
149 | impl PolledFd { | |
150 | pub fn poll_read(&mut self, data: &mut [u8], cx: &mut Context) -> Poll<io::Result<usize>> { | |
151 | let fd = self.fd.as_raw_fd(); | |
152 | let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?; | |
153 | let mut read_waker = self | |
154 | .registration | |
155 | .inner | |
156 | .as_ref() | |
157 | .unwrap() | |
158 | .read_waker | |
159 | .lock() | |
160 | .unwrap(); | |
161 | match c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }) { | |
162 | Ok(got) => Poll::Ready(Ok(got as usize)), | |
163 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { | |
164 | *read_waker = Some(cx.waker().clone()); | |
165 | Poll::Pending | |
166 | } | |
167 | Err(err) => Poll::Ready(Err(err)), | |
168 | } | |
169 | } | |
170 | ||
171 | pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> { | |
172 | poll_fn(move |cx| self.poll_read(data, cx)).await | |
173 | } | |
8962159b | 174 | } |