4 use std
::cell
::UnsafeCell
;
5 use std
::os
::windows
::prelude
::*;
6 use std
::sync
::{Arc, Mutex}
;
7 use std
::sync
::atomic
::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}
;
8 use std
::time
::Duration
;
10 use lazycell
::AtomicLazyCell
;
14 use miow
::iocp
::{CompletionPort, CompletionStatus}
;
16 use event_imp
::{Event, Evented, Ready}
;
17 use poll
::{self, Poll}
;
18 use sys
::windows
::buffer_pool
::BufferPool
;
21 /// Each Selector has a globally unique(ish) ID associated with it. This ID
22 /// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
23 /// registered with the `Selector`. If a type that is previously associated with
24 /// a `Selector` attempts to register itself with a different `Selector`, the
25 /// operation will return with an error. This matches windows behavior.
26 static NEXT_ID
: AtomicUsize
= ATOMIC_USIZE_INIT
;
28 /// The guts of the Windows event loop, this is the struct which actually owns
29 /// a completion port.
31 /// Internally this is just an `Arc`, and this allows handing out references to
32 /// the internals to I/O handles registered on this selector. This is
33 /// required to schedule I/O operations independently of being inside the event
34 /// loop (e.g. when a call to `write` is seen we're not "in the event loop").
36 inner
: Arc
<SelectorInner
>,
39 struct SelectorInner
{
40 /// Unique identifier of the `Selector`
43 /// The actual completion port that's used to manage all I/O
46 /// A pool of buffers usable by this selector.
48 /// Primitives will take buffers from this pool to perform I/O operations,
49 /// and once complete they'll be put back in.
50 buffers
: Mutex
<BufferPool
>,
54 pub fn new() -> io
::Result
<Selector
> {
55 // offset by 1 to avoid choosing 0 as the id of a selector
56 let id
= NEXT_ID
.fetch_add(1, Ordering
::Relaxed
) + 1;
58 CompletionPort
::new(0).map(|cp
| {
60 inner
: Arc
::new(SelectorInner
{
63 buffers
: Mutex
::new(BufferPool
::new(256)),
72 timeout
: Option
<Duration
>) -> io
::Result
<bool
> {
73 trace
!("select; timeout={:?}", timeout
);
75 // Clear out the previous list of I/O events and get some more!
78 trace
!("polling IOCP");
79 let n
= match self.inner
.port
.get_many(&mut events
.statuses
, timeout
) {
80 Ok(statuses
) => statuses
.len(),
81 Err(ref e
) if e
.raw_os_error() == Some(WAIT_TIMEOUT
as i32) => 0,
82 Err(e
) => return Err(e
),
86 for status
in events
.statuses
[..n
].iter() {
87 // This should only ever happen from the awakener, and we should
88 // only ever have one awakener right now, so assert as such.
89 if status
.overlapped() as usize == 0 {
90 assert_eq
!(status
.token(), usize::from(awakener
));
95 let callback
= unsafe {
96 (*(status
.overlapped() as *mut Overlapped
)).callback
99 trace
!("select; -> got overlapped");
100 callback(status
.entry());
107 /// Gets a reference to the underlying `CompletionPort` structure.
108 pub fn port(&self) -> &CompletionPort
{
112 /// Gets a new reference to this selector, although all underlying data
113 /// structures will refer to the same completion port.
114 pub fn clone_ref(&self) -> Selector
{
115 Selector { inner: self.inner.clone() }
118 /// Return the `Selector`'s identifier
119 pub fn id(&self) -> usize {
125 fn identical(&self, other
: &SelectorInner
) -> bool
{
126 (self as *const SelectorInner
) == (other
as *const SelectorInner
)
130 // A registration is stored in each I/O object which keeps track of how it is
131 // associated with a `Selector` above.
133 // Once associated with a `Selector`, a registration can never be un-associated
134 // (due to IOCP requirements). This is actually implemented through the
135 // `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the
136 // level/edge/filtering business.
137 /// A `Binding` is embedded in all I/O objects associated with a `Poll`
140 /// Each registration keeps track of which selector the I/O object is
141 /// associated with, ensuring that implementations of `Evented` can be
142 /// conformant for the various methods on Windows.
144 /// If you're working with custom IOCP-enabled objects then you'll want to
145 /// ensure that one of these instances is stored in your object and used in the
146 /// implementation of `Evented`.
148 /// For more information about how to use this see the `windows` module
149 /// documentation in this crate.
151 selector
: AtomicLazyCell
<Arc
<SelectorInner
>>,
155 /// Creates a new blank binding ready to be inserted into an I/O
158 /// Won't actually do anything until associated with a `Poll` loop.
159 pub fn new() -> Binding
{
160 Binding { selector: AtomicLazyCell::new() }
163 /// Registers a new handle with the `Poll` specified, also assigning the
164 /// `token` specified.
166 /// This function is intended to be used as part of `Evented::register` for
167 /// custom IOCP objects. It will add the specified handle to the internal
168 /// IOCP object with the provided `token`. All future events generated by
169 /// the handle provided will be received by the `Poll`'s internal IOCP
174 /// This function is unsafe as the `Poll` instance has assumptions about
175 /// what the `OVERLAPPED` pointer used for each I/O operation looks like.
176 /// Specifically they must all be instances of the `Overlapped` type in
177 /// this crate. More information about this can be found on the
178 /// `windows` module in this crate.
179 pub unsafe fn register_handle(&self,
180 handle
: &AsRawHandle
,
182 poll
: &Poll
) -> io
::Result
<()> {
183 let selector
= poll
::selector(poll
);
185 // Ignore errors, we'll see them on the next line.
186 drop(self.selector
.fill(selector
.inner
.clone()));
187 self.check_same_selector(poll
)?
;
189 selector
.inner
.port
.add_handle(usize::from(token
), handle
)
192 /// Same as `register_handle` but for sockets.
193 pub unsafe fn register_socket(&self,
194 handle
: &AsRawSocket
,
196 poll
: &Poll
) -> io
::Result
<()> {
197 let selector
= poll
::selector(poll
);
198 drop(self.selector
.fill(selector
.inner
.clone()));
199 self.check_same_selector(poll
)?
;
200 selector
.inner
.port
.add_socket(usize::from(token
), handle
)
203 /// Reregisters the handle provided from the `Poll` provided.
205 /// This is intended to be used as part of `Evented::reregister` but note
206 /// that this function does not currently reregister the provided handle
207 /// with the `poll` specified. IOCP has a special binding for changing the
208 /// token which has not yet been implemented. Instead this function should
209 /// be used to assert that the call to `reregister` happened on the same
210 /// `Poll` that was passed in to `register`.
212 /// Eventually, though, the provided `handle` will be re-assigned to have
213 /// the token `token` on the given `poll` assuming that it's been
214 /// previously registered with it.
218 /// This function is unsafe for similar reasons to `register`. That is,
219 /// there may be pending I/O events and such which aren't handled correctly.
220 pub unsafe fn reregister_handle(&self,
221 _handle
: &AsRawHandle
,
223 poll
: &Poll
) -> io
::Result
<()> {
224 self.check_same_selector(poll
)
227 /// Same as `reregister_handle`, but for sockets.
228 pub unsafe fn reregister_socket(&self,
229 _socket
: &AsRawSocket
,
231 poll
: &Poll
) -> io
::Result
<()> {
232 self.check_same_selector(poll
)
235 /// Deregisters the handle provided from the `Poll` provided.
237 /// This is intended to be used as part of `Evented::deregister` but note
238 /// that this function does not currently deregister the provided handle
239 /// from the `poll` specified. IOCP has a special binding for that which has
240 /// not yet been implemented. Instead this function should be used to assert
241 /// that the call to `deregister` happened on the same `Poll` that was
242 /// passed in to `register`.
246 /// This function is unsafe for similar reasons to `register`. That is,
247 /// there may be pending I/O events and such which aren't handled correctly.
248 pub unsafe fn deregister_handle(&self,
249 _handle
: &AsRawHandle
,
250 poll
: &Poll
) -> io
::Result
<()> {
251 self.check_same_selector(poll
)
254 /// Same as `deregister_handle`, but for sockets.
255 pub unsafe fn deregister_socket(&self,
256 _socket
: &AsRawSocket
,
257 poll
: &Poll
) -> io
::Result
<()> {
258 self.check_same_selector(poll
)
261 fn check_same_selector(&self, poll
: &Poll
) -> io
::Result
<()> {
262 let selector
= poll
::selector(poll
);
263 match self.selector
.borrow() {
264 Some(prev
) if prev
.identical(&selector
.inner
) => Ok(()),
266 None
=> Err(other("socket already registered")),
271 impl fmt
::Debug
for Binding
{
272 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
273 f
.debug_struct("Binding")
278 /// Helper struct used for TCP and UDP which bundles a `binding` with a
279 /// `SetReadiness` handle.
280 pub struct ReadyBinding
{
282 readiness
: Option
<poll
::SetReadiness
>,
286 /// Creates a new blank binding ready to be inserted into an I/O object.
288 /// Won't actually do anything until associated with a `Selector` loop.
289 pub fn new() -> ReadyBinding
{
291 binding
: Binding
::new(),
296 /// Returns whether this binding has been associated with a selector
298 pub fn registered(&self) -> bool
{
299 self.readiness
.is_some()
302 /// Acquires a buffer with at least `size` capacity.
304 /// If associated with a selector, this will attempt to pull a buffer from
305 /// that buffer pool. If not associated with a selector, this will allocate
307 pub fn get_buffer(&self, size
: usize) -> Vec
<u8> {
308 match self.binding
.selector
.borrow() {
309 Some(i
) => i
.buffers
.lock().unwrap().get(size
),
310 None
=> Vec
::with_capacity(size
),
314 /// Returns a buffer to this binding.
316 /// If associated with a selector, this will push the buffer back into the
317 /// selector's pool of buffers. Otherwise this will just drop the buffer.
318 pub fn put_buffer(&self, buf
: Vec
<u8>) {
319 if let Some(i
) = self.binding
.selector
.borrow() {
320 i
.buffers
.lock().unwrap().put(buf
);
324 /// Sets the readiness of this I/O object to a particular `set`.
326 /// This is later used to fill out and respond to requests to `poll`. Note
327 /// that this is all implemented through the `SetReadiness` structure in the
329 pub fn set_readiness(&self, set
: Ready
) {
330 if let Some(ref i
) = self.readiness
{
331 trace
!("set readiness to {:?}", set
);
332 i
.set_readiness(set
).expect("event loop disappeared?");
336 /// Queries what the current readiness of this I/O object is.
338 /// This is what's being used to generate events returned by `poll`.
339 pub fn readiness(&self) -> Ready
{
340 match self.readiness
{
341 Some(ref i
) => i
.readiness(),
342 None
=> Ready
::empty(),
346 /// Implementation of the `Evented::register` function essentially.
348 /// Returns an error if we're already registered with another event loop,
349 /// and otherwise just reassociates ourselves with the event loop to
350 /// possibly change tokens.
351 pub fn register_socket(&mut self,
352 socket
: &AsRawSocket
,
357 registration
: &Mutex
<Option
<poll
::Registration
>>)
359 trace
!("register {:?} {:?}", token
, events
);
361 self.binding
.register_socket(socket
, token
, poll
)?
;
364 let (r
, s
) = poll
::new_registration(poll
, token
, events
, opts
);
365 self.readiness
= Some(s
);
366 *registration
.lock().unwrap() = Some(r
);
370 /// Implementation of `Evented::reregister` function.
371 pub fn reregister_socket(&mut self,
372 socket
: &AsRawSocket
,
377 registration
: &Mutex
<Option
<poll
::Registration
>>)
379 trace
!("reregister {:?} {:?}", token
, events
);
381 self.binding
.reregister_socket(socket
, token
, poll
)?
;
384 registration
.lock().unwrap()
386 .reregister(poll
, token
, events
, opts
)
389 /// Implementation of the `Evented::deregister` function.
391 /// Doesn't allow registration with another event loop, just shuts down
392 /// readiness notifications and such.
393 pub fn deregister(&mut self,
394 socket
: &AsRawSocket
,
396 registration
: &Mutex
<Option
<poll
::Registration
>>)
398 trace
!("deregistering");
400 self.binding
.deregister_socket(socket
, poll
)?
;
403 registration
.lock().unwrap()
409 fn other(s
: &str) -> io
::Error
{
410 io
::Error
::new(io
::ErrorKind
::Other
, s
)
415 /// Raw I/O event completions are filled in here by the call to `get_many`
416 /// on the completion port above. These are then processed to run callbacks
417 /// which figure out what to do after the event is done.
418 statuses
: Box
<[CompletionStatus
]>,
420 /// Literal events returned by `get` to the upwards `EventLoop`. This file
421 /// doesn't really modify this (except for the awakener), instead almost all
422 /// events are filled in by the `ReadinessQueue` from the `poll` module.
427 pub fn with_capacity(cap
: usize) -> Events
{
428 // Note that it's possible for the output `events` to grow beyond the
429 // capacity as it can also include deferred events, but that's certainly
430 // not the end of the world!
432 statuses
: vec
![CompletionStatus
::zero(); cap
].into_boxed_slice(),
433 events
: Vec
::with_capacity(cap
),
437 pub fn is_empty(&self) -> bool
{
438 self.events
.is_empty()
441 pub fn len(&self) -> usize {
445 pub fn capacity(&self) -> usize {
446 self.events
.capacity()
449 pub fn get(&self, idx
: usize) -> Option
<Event
> {
450 self.events
.get(idx
).map(|e
| *e
)
453 pub fn push_event(&mut self, event
: Event
) {
454 self.events
.push(event
);
457 pub fn clear(&mut self) {
458 self.events
.truncate(0);
462 macro_rules
! overlapped2arc
{
463 ($e
:expr
, $t
:ty
, $
($field
:ident
).+) => ({
464 let offset
= offset_of
!($t
, $
($field
).+);
465 debug_assert
!(offset
< mem
::size_of
::<$t
>());
466 FromRawArc
::from_raw(($e
as usize - offset
) as *mut $t
)
470 macro_rules
! offset_of
{
471 ($t
:ty
, $
($field
:ident
).+) => (
472 &(*(0 as *const $t
)).$
($field
).+ as *const _
as usize
476 // See sys::windows module docs for why this exists.
478 // The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are
479 // actually inside one of these structures so it can use the `Callback` stored
482 // We use repr(C) here to ensure that we can assume the overlapped pointer is
483 // at the start of the structure so we can just do a cast.
484 /// A wrapper around an internal instance of `miow::Overlapped` which is in
485 /// turn a wrapper around the Windows type `OVERLAPPED`.
487 /// This type is required to be used for all IOCP operations on handles that are
488 /// registered with an event loop. The event loop will receive notifications
489 /// over `OVERLAPPED` pointers that have completed, and it will cast that
490 /// pointer to a pointer to this structure and invoke the associated callback.
492 pub struct Overlapped
{
493 inner
: UnsafeCell
<miow
::Overlapped
>,
494 callback
: fn(&OVERLAPPED_ENTRY
),
498 /// Creates a new `Overlapped` which will invoke the provided `cb` callback
499 /// whenever it's triggered.
501 /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all
502 /// I/O operations that are registered with mio's event loop. When the I/O
503 /// operation associated with an `OVERLAPPED` pointer completes the event
504 /// loop will invoke the function pointer provided by `cb`.
505 pub fn new(cb
: fn(&OVERLAPPED_ENTRY
)) -> Overlapped
{
507 inner
: UnsafeCell
::new(miow
::Overlapped
::zero()),
512 /// Get the underlying `Overlapped` instance as a raw pointer.
514 /// This can be useful when only a shared borrow is held and the overlapped
515 /// pointer needs to be passed down to winapi.
516 pub fn as_mut_ptr(&self) -> *mut OVERLAPPED
{
518 (*self.inner
.get()).raw()
523 impl fmt
::Debug
for Overlapped
{
524 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
525 f
.debug_struct("Overlapped")
530 // Overlapped's APIs are marked as unsafe as they must be used with caution
531 // to ensure thread safety. The structure itself is safe to send across
532 // threads.
// Manual marker impls: `Overlapped` stores its inner value in an `UnsafeCell`,
// which is never `Sync` automatically, so thread-safety is asserted by hand.
// NOTE(review): soundness relies on callers synchronizing every access made
// through the raw pointer handed out by `as_mut_ptr` -- confirm at call sites.
533 unsafe impl Send
for Overlapped {}
534 unsafe impl Sync
for Overlapped {}