2 use std
::os
::unix
::io
::{AsRawFd, IntoRawFd, RawFd}
;
3 use std
::sync
::atomic
::{AtomicBool, Ordering}
;
4 use std
::sync
::{Arc, Mutex, Once}
;
5 use std
::task
::{Context, Poll, Waker}
;
6 use std
::thread
::JoinHandle
;
8 use crate::error
::io_err_other
;
9 use crate::io
::epoll
::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT}
;
12 static START
: Once
= Once
::new();
13 static mut REACTOR
: Option
<Arc
<Reactor
>> = None
;
15 pub fn default_reactor() -> Arc
<Reactor
> {
16 START
.call_once(|| unsafe {
17 let reactor
= Reactor
::new().expect("setup main epoll reactor");
18 REACTOR
= Some(reactor
);
20 unsafe { Arc::clone(REACTOR.as_ref().unwrap()) }
25 removed
: Mutex
<Vec
<Box
<RegistrationInner
>>>,
26 thread
: Mutex
<Option
<JoinHandle
<()>>>,
30 pub fn new() -> io
::Result
<Arc
<Self>> {
31 let epoll
= Arc
::new(Epoll
::new()?
);
33 let this
= Arc
::new(Reactor
{
35 removed
: Mutex
::new(Vec
::new()),
36 thread
: Mutex
::new(None
),
39 let handle
= std
::thread
::spawn({
40 let this
= Arc
::clone(&this
);
41 move || this
.thread_main()
44 this
.thread
.lock().unwrap().replace(handle
);
49 fn thread_main(self: Arc
<Self>) {
50 let mut buf
: [EpollEvent
; 16] = unsafe { std::mem::zeroed() }
;
52 let count
= match self.epoll
.wait(&mut buf
, None
) {
55 eprintln
!("error in epoll loop: {}", err
);
56 std
::process
::exit(1);
60 self.handle_event(&buf
[i
]);
62 // After going through the events we can release memory associated with already closed
64 self.removed
.lock().unwrap().clear();
68 fn handle_event(&self, event
: &EpollEvent
) {
69 let registration
= unsafe { &mut *(event.r#u64 as *mut RegistrationInner) }
;
70 if registration
.gone
.load(Ordering
::Acquire
) {
74 if 0 != (event
.events
& EPOLLIN
) {
75 //let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
76 if let Some(waker
) = registration
.read_waker
.lock().unwrap().take() {
81 if 0 != (event
.events
& EPOLLOUT
) {
82 //let _prev = registration.ready.fetch_or(READY_OUT, Ordering::AcqRel);
83 if let Some(waker
) = registration
.write_waker
.lock().unwrap().take() {
88 if 0 != (event
.events
& (EPOLLERR
| EPOLLHUP
)) {
89 //let _prev = registration.ready.fetch_or(READY_ERR, Ordering::AcqRel);
90 if let Some(waker
) = registration
.read_waker
.lock().unwrap().take() {
93 if let Some(waker
) = registration
.write_waker
.lock().unwrap().take() {
99 pub fn register(self: Arc
<Self>, fd
: RawFd
) -> io
::Result
<Registration
> {
100 let mut inner
= Box
::new(RegistrationInner
{
101 gone
: AtomicBool
::new(false),
102 reactor
: Arc
::clone(&self),
103 read_waker
: Mutex
::new(None
),
104 write_waker
: Mutex
::new(None
),
108 // type check/assertion
109 let inner_ptr
: &mut RegistrationInner
= &mut *inner
;
111 inner_ptr
as *mut RegistrationInner
as usize as u64
115 .add_fd(fd
, EPOLLIN
| EPOLLOUT
| EPOLLET
, inner_ptr
)?
;
117 Ok(Registration { inner: Some(inner) }
)
120 fn deregister(&self, registration
: Box
<RegistrationInner
>) {
121 self.removed
.lock().unwrap().push(registration
);
125 pub struct Registration
{
126 // pin the data in memory because the other thread will access it
127 // ManuallyDrop::take is nightly only :<
128 inner
: Option
<Box
<RegistrationInner
>>,
131 impl Drop
for Registration
{
133 if let Some(inner
) = self.inner
.take() {
134 let reactor
= Arc
::clone(&inner
.reactor
);
135 inner
.gone
.store(true, Ordering
::Release
);
136 reactor
.deregister(inner
);
141 // This is accessed by the reactor
142 struct RegistrationInner
{
144 reactor
: Arc
<Reactor
>,
145 read_waker
: Mutex
<Option
<Waker
>>,
146 write_waker
: Mutex
<Option
<Waker
>>,
149 pub struct PolledFd
{
151 registration
: Registration
,
154 impl AsRawFd
for PolledFd
{
156 fn as_raw_fd(&self) -> RawFd
{
161 impl IntoRawFd
for PolledFd
{
162 fn into_raw_fd(mut self) -> RawFd
{
163 let registration
= self.registration
.inner
.take().unwrap();
167 .remove_fd(self.as_raw_fd())
168 .expect("cannot remove PolledFd from epoll instance");
169 self.fd
.into_raw_fd()
174 pub fn new(mut fd
: Fd
) -> io
::Result
<Self> {
175 fd
.set_nonblocking(true).map_err(io_err_other
)?
;
176 Self::new_with_reactor(fd
, self::default_reactor())
179 pub fn new_with_reactor(fd
: Fd
, reactor
: Arc
<Reactor
>) -> io
::Result
<Self> {
180 let registration
= reactor
.register(fd
.as_raw_fd())?
;
181 Ok(Self { fd, registration }
)
184 pub fn wrap_read
<T
, F
>(&self, cx
: &mut Context
, func
: F
) -> Poll
<io
::Result
<T
>>
186 F
: FnOnce() -> io
::Result
<T
>,
188 let mut read_waker
= self
197 Ok(out
) => Poll
::Ready(Ok(out
)),
198 Err(ref err
) if err
.kind() == io
::ErrorKind
::WouldBlock
=> {
199 *read_waker
= Some(cx
.waker().clone());
202 Err(err
) => Poll
::Ready(Err(err
)),
206 pub fn wrap_write
<T
, F
>(&self, cx
: &mut Context
, func
: F
) -> Poll
<io
::Result
<T
>>
208 F
: FnOnce() -> io
::Result
<T
>,
210 let mut write_waker
= self
219 Ok(out
) => Poll
::Ready(Ok(out
)),
220 Err(ref err
) if err
.kind() == io
::ErrorKind
::WouldBlock
=> {
221 *write_waker
= Some(cx
.waker().clone());
224 Err(err
) => Poll
::Ready(Err(err
)),