1 use std
::convert
::TryFrom
;
3 use std
::os
::unix
::io
::{AsRawFd, RawFd}
;
4 use std
::sync
::atomic
::{AtomicBool, Ordering}
;
5 use std
::sync
::{Arc, Mutex, Once}
;
6 use std
::task
::{Context, Poll, Waker}
;
7 use std
::thread
::JoinHandle
;
9 use crate::epoll
::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT}
;
10 use crate::error
::io_err_other
;
11 use crate::poll_fn
::poll_fn
;
// Guards the one-time initialization of `REACTOR` below.
static START: Once = Once::new();

// Process-wide default reactor, created lazily by `default()`.
// SAFETY: written exactly once inside `START.call_once`; only read afterwards.
static mut REACTOR: Option<Arc<Reactor>> = None;
17 pub fn default() -> Arc
<Reactor
> {
18 START
.call_once(|| unsafe {
19 let reactor
= Reactor
::new().expect("setup main epoll reactor");
20 REACTOR
= Some(reactor
);
22 unsafe { Arc::clone(REACTOR.as_ref().unwrap()) }
    // Registrations dropped by their owners; the reactor thread frees them
    // after it has finished dispatching the current event batch (presumably
    // to avoid a use-after-free while events are in flight — see
    // `thread_main`/`deregister`).
    removed: Mutex<Vec<Box<RegistrationInner>>>,
    // Join handle of the background epoll-wait thread spawned in `new()`.
    thread: Mutex<Option<JoinHandle<()>>>,
32 pub fn new() -> io
::Result
<Arc
<Self>> {
33 let epoll
= Arc
::new(Epoll
::new()?
);
35 let this
= Arc
::new(Reactor
{
37 removed
: Mutex
::new(Vec
::new()),
38 thread
: Mutex
::new(None
),
41 let handle
= std
::thread
::spawn({
42 let this
= Arc
::clone(&this
);
43 move || this
.thread_main()
46 this
.thread
.lock().unwrap().replace(handle
);
51 fn thread_main(self: Arc
<Self>) {
52 let mut buf
: [EpollEvent
; 16] = unsafe { std::mem::zeroed() }
;
54 let count
= match self.epoll
.wait(&mut buf
, None
) {
57 eprintln
!("error in epoll loop: {}", err
);
58 std
::process
::exit(1);
62 self.handle_event(&buf
[i
]);
64 // After going through the events we can release memory associated with already closed
66 self.removed
.lock().unwrap().clear();
70 fn handle_event(&self, event
: &EpollEvent
) {
71 let registration
= unsafe { &mut *(event.r#u64 as *mut RegistrationInner) }
;
72 if registration
.gone
.load(Ordering
::Acquire
) {
76 if 0 != (event
.events
& EPOLLIN
) {
77 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
78 if let Some(waker
) = registration
.read_waker
.lock().unwrap().take() {
83 if 0 != (event
.events
& EPOLLOUT
) {
84 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
85 if let Some(waker
) = registration
.write_waker
.lock().unwrap().take() {
90 if 0 != (event
.events
& (EPOLLERR
| EPOLLHUP
)) {
91 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
92 if let Some(waker
) = registration
.read_waker
.lock().unwrap().take() {
95 if let Some(waker
) = registration
.write_waker
.lock().unwrap().take() {
101 pub fn register(self: Arc
<Self>, fd
: RawFd
) -> io
::Result
<Registration
> {
102 let mut inner
= Box
::new(RegistrationInner
{
103 gone
: AtomicBool
::new(false),
104 reactor
: Arc
::clone(&self),
105 read_waker
: Mutex
::new(None
),
106 write_waker
: Mutex
::new(None
),
110 // type check/assertion
111 let inner_ptr
: &mut RegistrationInner
= &mut *inner
;
113 inner_ptr
as *mut RegistrationInner
as usize as u64
117 .add_fd(fd
, EPOLLIN
| EPOLLOUT
| EPOLLET
, inner_ptr
)?
;
119 Ok(Registration { inner: Some(inner) }
)
122 fn deregister(&self, registration
: Box
<RegistrationInner
>) {
123 self.removed
.lock().unwrap().push(registration
);
127 pub struct Registration
{
128 // pin the data in memory because the other thread will access it
129 // ManuallyDrop::take is nightly only :<
130 inner
: Option
<Box
<RegistrationInner
>>,
133 impl Drop
for Registration
{
135 let inner
= self.inner
.as_ref().unwrap();
136 let reactor
= Arc
::clone(&inner
.reactor
);
137 inner
.gone
.store(true, Ordering
::Release
);
138 reactor
.deregister(self.inner
.take().unwrap());
142 // This is accessed by the reactor
143 struct RegistrationInner
{
145 reactor
: Arc
<Reactor
>,
146 read_waker
: Mutex
<Option
<Waker
>>,
147 write_waker
: Mutex
<Option
<Waker
>>,
150 pub struct PolledFd
{
152 registration
: Registration
,
156 pub fn new(fd
: Fd
) -> io
::Result
<Self> {
157 Self::new_with_reactor(fd
, crate::reactor
::default())
160 pub fn new_with_reactor(fd
: Fd
, reactor
: Arc
<Reactor
>) -> io
::Result
<Self> {
161 let registration
= reactor
.register(fd
.as_raw_fd())?
;
162 Ok(Self { fd, registration }
)
167 pub fn poll_read(&mut self, data
: &mut [u8], cx
: &mut Context
) -> Poll
<io
::Result
<usize>> {
168 let fd
= self.fd
.as_raw_fd();
169 let size
= libc
::size_t
::try_from(data
.len()).map_err(io_err_other
)?
;
170 let mut read_waker
= self
178 match c_result
!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) }
) {
179 Ok(got
) => Poll
::Ready(Ok(got
as usize)),
180 Err(ref err
) if err
.kind() == io
::ErrorKind
::WouldBlock
=> {
181 *read_waker
= Some(cx
.waker().clone());
184 Err(err
) => Poll
::Ready(Err(err
)),
188 pub async
fn read(&mut self, data
: &mut [u8]) -> io
::Result
<usize> {
189 poll_fn(move |cx
| self.poll_read(data
, cx
)).await
192 pub fn poll_write(&mut self, data
: &[u8], cx
: &mut Context
) -> Poll
<io
::Result
<usize>> {
193 let fd
= self.fd
.as_raw_fd();
194 let size
= libc
::size_t
::try_from(data
.len()).map_err(io_err_other
)?
;
195 let mut write_waker
= self
203 match c_result
!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) }
) {
204 Ok(got
) => Poll
::Ready(Ok(got
as usize)),
205 Err(ref err
) if err
.kind() == io
::ErrorKind
::WouldBlock
=> {
206 *write_waker
= Some(cx
.waker().clone());
209 Err(err
) => Poll
::Ready(Err(err
)),
213 pub async
fn write(&mut self, data
: &[u8]) -> io
::Result
<usize> {
214 poll_fn(move |cx
| self.poll_write(data
, cx
)).await