1 use std
::convert
::TryFrom
;
3 use std
::os
::unix
::io
::{AsRawFd, RawFd}
;
4 use std
::sync
::{Arc, Mutex}
;
5 use std
::task
::{Context, Poll, Waker}
;
6 use std
::thread
::JoinHandle
;
8 use crate::epoll
::{Epoll, EpollEvent, EPOLLERR, EPOLLHUP, EPOLLIN, EPOLLOUT}
;
9 use crate::error
::io_err_other
;
10 use crate::poll_fn
::poll_fn
;
15 removed
: Mutex
<Vec
<(RawFd
, Box
<RegistrationInner
>)>>,
16 thread
: Mutex
<Option
<JoinHandle
<()>>>,
20 pub fn new() -> io
::Result
<Arc
<Self>> {
21 let epoll
= Arc
::new(Epoll
::new()?
);
23 let this
= Arc
::new(Reactor
{
25 removed
: Mutex
::new(Vec
::new()),
26 thread
: Mutex
::new(None
),
29 let handle
= std
::thread
::spawn({
30 let this
= Arc
::clone(&this
);
31 move || this
.thread_main()
34 this
.thread
.lock().unwrap().replace(handle
);
39 fn thread_main(self: Arc
<Self>) {
40 let mut buf
: [EpollEvent
; 16] = unsafe { std::mem::zeroed() }
;
42 let count
= match self.epoll
.wait(&mut buf
, None
) {
45 eprintln
!("error in epoll loop: {}", err
);
46 std
::process
::exit(1);
50 self.handle_event(&buf
[i
]);
55 fn handle_event(&self, event
: &EpollEvent
) {
56 let registration
= unsafe { &mut *(event.r#u64 as *mut RegistrationInner) }
;
57 for (fd
, _
) in self.removed
.lock().unwrap().iter() {
58 // This fd is already being dropped, don't touch it!
59 if *fd
== registration
.fd
{
64 if 0 != (event
.events
& EPOLLIN
) {
65 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
66 if let Some(waker
) = registration
.read_waker
.lock().unwrap().take() {
71 if 0 != (event
.events
& EPOLLOUT
) {
72 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
73 if let Some(waker
) = registration
.write_waker
.lock().unwrap().take() {
78 if 0 != (event
.events
& (EPOLLERR
| EPOLLHUP
)) {
79 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
80 if let Some(waker
) = registration
.read_waker
.lock().unwrap().take() {
83 if let Some(waker
) = registration
.write_waker
.lock().unwrap().take() {
89 pub fn register(self: Arc
<Self>, fd
: RawFd
) -> io
::Result
<Registration
> {
90 let mut inner
= Box
::new(RegistrationInner
{
92 //ready: AtomicU32::new(0),
93 reactor
: Arc
::clone(&self),
94 read_waker
: Mutex
::new(None
),
95 write_waker
: Mutex
::new(None
),
99 // type check/assertion
100 let inner_ptr
: &mut RegistrationInner
= &mut *inner
;
102 inner_ptr
as *mut RegistrationInner
as usize as u64
105 self.epoll
.add_fd(fd
, EPOLLIN
| EPOLLOUT
, inner_ptr
)?
;
107 Ok(Registration { inner: Some(inner) }
)
110 fn deregister(&self, registration
: Box
<RegistrationInner
>) {
114 .push((registration
.fd
, registration
));
118 pub struct Registration
{
119 // pin the data in memory because the other thread will access it
120 // ManuallyDrop::take is nightly only :<
121 inner
: Option
<Box
<RegistrationInner
>>,
124 impl Drop
for Registration
{
126 let reactor
= Arc
::clone(&self.inner
.as_ref().unwrap().reactor
);
127 reactor
.deregister(self.inner
.take().unwrap());
131 // This is accessed by the reactor
132 struct RegistrationInner
{
135 reactor
: Arc
<Reactor
>,
136 read_waker
: Mutex
<Option
<Waker
>>,
137 write_waker
: Mutex
<Option
<Waker
>>,
140 pub struct PolledFd
{
142 registration
: Registration
,
146 pub fn new(fd
: Fd
, reactor
: Arc
<Reactor
>) -> io
::Result
<Self> {
147 let registration
= reactor
.register(fd
.as_raw_fd())?
;
148 Ok(Self { fd, registration }
)
153 pub fn poll_read(&mut self, data
: &mut [u8], cx
: &mut Context
) -> Poll
<io
::Result
<usize>> {
154 let fd
= self.fd
.as_raw_fd();
155 let size
= libc
::size_t
::try_from(data
.len()).map_err(io_err_other
)?
;
156 let mut read_waker
= self
164 match c_result
!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }
) {
165 Ok(got
) => Poll
::Ready(Ok(got
as usize)),
166 Err(ref err
) if err
.kind() == io
::ErrorKind
::WouldBlock
=> {
167 *read_waker
= Some(cx
.waker().clone());
170 Err(err
) => Poll
::Ready(Err(err
)),
174 pub async
fn read(&mut self, data
: &mut [u8]) -> io
::Result
<usize> {
175 poll_fn(move |cx
| self.poll_read(data
, cx
)).await
178 pub fn poll_write(&mut self, data
: &[u8], cx
: &mut Context
) -> Poll
<io
::Result
<usize>> {
179 let fd
= self.fd
.as_raw_fd();
180 let size
= libc
::size_t
::try_from(data
.len()).map_err(io_err_other
)?
;
181 let mut write_waker
= self
189 match c_result
!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) }
) {
190 Ok(got
) => Poll
::Ready(Ok(got
as usize)),
191 Err(ref err
) if err
.kind() == io
::ErrorKind
::WouldBlock
=> {
192 *write_waker
= Some(cx
.waker().clone());
195 Err(err
) => Poll
::Ready(Err(err
)),
199 pub async
fn write(&mut self, data
: &[u8]) -> io
::Result
<usize> {
200 poll_fn(move |cx
| self.poll_write(data
, cx
)).await