]> git.proxmox.com Git - rustc.git/blob - vendor/mio/src/sys/windows/selector.rs
New upstream version 1.46.0~beta.2+dfsg1
[rustc.git] / vendor / mio / src / sys / windows / selector.rs
1 #![allow(deprecated)]
2
3 use std::{fmt, io};
4 use std::cell::UnsafeCell;
5 use std::os::windows::prelude::*;
6 use std::sync::{Arc, Mutex};
7 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
8 use std::time::Duration;
9
10 use lazycell::AtomicLazyCell;
11
12 use winapi::*;
13 use miow;
14 use miow::iocp::{CompletionPort, CompletionStatus};
15
16 use event_imp::{Event, Evented, Ready};
17 use poll::{self, Poll};
18 use sys::windows::buffer_pool::BufferPool;
19 use {Token, PollOpt};
20
/// Counter from which every `Selector` draws its globally unique(ish) ID.
///
/// `TcpStream`, `TcpListener` and friends remember the ID of the `Selector`
/// they were first registered with. If such a type is later registered with
/// a different `Selector`, the operation returns an error. This matches
/// windows behavior.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
27
28 /// The guts of the Windows event loop, this is the struct which actually owns
29 /// a completion port.
30 ///
31 /// Internally this is just an `Arc`, and this allows handing out references to
32 /// the internals to I/O handles registered on this selector. This is
33 /// required to schedule I/O operations independently of being inside the event
34 /// loop (e.g. when a call to `write` is seen we're not "in the event loop").
35 pub struct Selector {
36 inner: Arc<SelectorInner>,
37 }
38
39 struct SelectorInner {
40 /// Unique identifier of the `Selector`
41 id: usize,
42
43 /// The actual completion port that's used to manage all I/O
44 port: CompletionPort,
45
46 /// A pool of buffers usable by this selector.
47 ///
48 /// Primitives will take buffers from this pool to perform I/O operations,
49 /// and once complete they'll be put back in.
50 buffers: Mutex<BufferPool>,
51 }
52
53 impl Selector {
54 pub fn new() -> io::Result<Selector> {
55 // offset by 1 to avoid choosing 0 as the id of a selector
56 let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
57
58 CompletionPort::new(0).map(|cp| {
59 Selector {
60 inner: Arc::new(SelectorInner {
61 id: id,
62 port: cp,
63 buffers: Mutex::new(BufferPool::new(256)),
64 }),
65 }
66 })
67 }
68
69 pub fn select(&self,
70 events: &mut Events,
71 awakener: Token,
72 timeout: Option<Duration>) -> io::Result<bool> {
73 trace!("select; timeout={:?}", timeout);
74
75 // Clear out the previous list of I/O events and get some more!
76 events.clear();
77
78 trace!("polling IOCP");
79 let n = match self.inner.port.get_many(&mut events.statuses, timeout) {
80 Ok(statuses) => statuses.len(),
81 Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0,
82 Err(e) => return Err(e),
83 };
84
85 let mut ret = false;
86 for status in events.statuses[..n].iter() {
87 // This should only ever happen from the awakener, and we should
88 // only ever have one awakener right now, so assert as such.
89 if status.overlapped() as usize == 0 {
90 assert_eq!(status.token(), usize::from(awakener));
91 ret = true;
92 continue;
93 }
94
95 let callback = unsafe {
96 (*(status.overlapped() as *mut Overlapped)).callback
97 };
98
99 trace!("select; -> got overlapped");
100 callback(status.entry());
101 }
102
103 trace!("returning");
104 Ok(ret)
105 }
106
107 /// Gets a reference to the underlying `CompletionPort` structure.
108 pub fn port(&self) -> &CompletionPort {
109 &self.inner.port
110 }
111
112 /// Gets a new reference to this selector, although all underlying data
113 /// structures will refer to the same completion port.
114 pub fn clone_ref(&self) -> Selector {
115 Selector { inner: self.inner.clone() }
116 }
117
118 /// Return the `Selector`'s identifier
119 pub fn id(&self) -> usize {
120 self.inner.id
121 }
122 }
123
124 impl SelectorInner {
125 fn identical(&self, other: &SelectorInner) -> bool {
126 (self as *const SelectorInner) == (other as *const SelectorInner)
127 }
128 }
129
130 // A registration is stored in each I/O object which keeps track of how it is
131 // associated with a `Selector` above.
132 //
133 // Once associated with a `Selector`, a registration can never be un-associated
134 // (due to IOCP requirements). This is actually implemented through the
135 // `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the
136 // level/edge/filtering business.
137 /// A `Binding` is embedded in all I/O objects associated with a `Poll`
138 /// object.
139 ///
140 /// Each registration keeps track of which selector the I/O object is
141 /// associated with, ensuring that implementations of `Evented` can be
142 /// conformant for the various methods on Windows.
143 ///
144 /// If you're working with custom IOCP-enabled objects then you'll want to
145 /// ensure that one of these instances is stored in your object and used in the
146 /// implementation of `Evented`.
147 ///
148 /// For more information about how to use this see the `windows` module
149 /// documentation in this crate.
150 pub struct Binding {
151 selector: AtomicLazyCell<Arc<SelectorInner>>,
152 }
153
154 impl Binding {
155 /// Creates a new blank binding ready to be inserted into an I/O
156 /// object.
157 ///
158 /// Won't actually do anything until associated with a `Poll` loop.
159 pub fn new() -> Binding {
160 Binding { selector: AtomicLazyCell::new() }
161 }
162
163 /// Registers a new handle with the `Poll` specified, also assigning the
164 /// `token` specified.
165 ///
166 /// This function is intended to be used as part of `Evented::register` for
167 /// custom IOCP objects. It will add the specified handle to the internal
168 /// IOCP object with the provided `token`. All future events generated by
169 /// the handled provided will be received by the `Poll`'s internal IOCP
170 /// object.
171 ///
172 /// # Unsafety
173 ///
174 /// This function is unsafe as the `Poll` instance has assumptions about
175 /// what the `OVERLAPPED` pointer used for each I/O operation looks like.
176 /// Specifically they must all be instances of the `Overlapped` type in
177 /// this crate. More information about this can be found on the
178 /// `windows` module in this crate.
179 pub unsafe fn register_handle(&self,
180 handle: &AsRawHandle,
181 token: Token,
182 poll: &Poll) -> io::Result<()> {
183 let selector = poll::selector(poll);
184
185 // Ignore errors, we'll see them on the next line.
186 drop(self.selector.fill(selector.inner.clone()));
187 self.check_same_selector(poll)?;
188
189 selector.inner.port.add_handle(usize::from(token), handle)
190 }
191
192 /// Same as `register_handle` but for sockets.
193 pub unsafe fn register_socket(&self,
194 handle: &AsRawSocket,
195 token: Token,
196 poll: &Poll) -> io::Result<()> {
197 let selector = poll::selector(poll);
198 drop(self.selector.fill(selector.inner.clone()));
199 self.check_same_selector(poll)?;
200 selector.inner.port.add_socket(usize::from(token), handle)
201 }
202
203 /// Reregisters the handle provided from the `Poll` provided.
204 ///
205 /// This is intended to be used as part of `Evented::reregister` but note
206 /// that this function does not currently reregister the provided handle
207 /// with the `poll` specified. IOCP has a special binding for changing the
208 /// token which has not yet been implemented. Instead this function should
209 /// be used to assert that the call to `reregister` happened on the same
210 /// `Poll` that was passed into to `register`.
211 ///
212 /// Eventually, though, the provided `handle` will be re-assigned to have
213 /// the token `token` on the given `poll` assuming that it's been
214 /// previously registered with it.
215 ///
216 /// # Unsafety
217 ///
218 /// This function is unsafe for similar reasons to `register`. That is,
219 /// there may be pending I/O events and such which aren't handled correctly.
220 pub unsafe fn reregister_handle(&self,
221 _handle: &AsRawHandle,
222 _token: Token,
223 poll: &Poll) -> io::Result<()> {
224 self.check_same_selector(poll)
225 }
226
227 /// Same as `reregister_handle`, but for sockets.
228 pub unsafe fn reregister_socket(&self,
229 _socket: &AsRawSocket,
230 _token: Token,
231 poll: &Poll) -> io::Result<()> {
232 self.check_same_selector(poll)
233 }
234
235 /// Deregisters the handle provided from the `Poll` provided.
236 ///
237 /// This is intended to be used as part of `Evented::deregister` but note
238 /// that this function does not currently deregister the provided handle
239 /// from the `poll` specified. IOCP has a special binding for that which has
240 /// not yet been implemented. Instead this function should be used to assert
241 /// that the call to `deregister` happened on the same `Poll` that was
242 /// passed into to `register`.
243 ///
244 /// # Unsafety
245 ///
246 /// This function is unsafe for similar reasons to `register`. That is,
247 /// there may be pending I/O events and such which aren't handled correctly.
248 pub unsafe fn deregister_handle(&self,
249 _handle: &AsRawHandle,
250 poll: &Poll) -> io::Result<()> {
251 self.check_same_selector(poll)
252 }
253
254 /// Same as `deregister_handle`, but for sockets.
255 pub unsafe fn deregister_socket(&self,
256 _socket: &AsRawSocket,
257 poll: &Poll) -> io::Result<()> {
258 self.check_same_selector(poll)
259 }
260
261 fn check_same_selector(&self, poll: &Poll) -> io::Result<()> {
262 let selector = poll::selector(poll);
263 match self.selector.borrow() {
264 Some(prev) if prev.identical(&selector.inner) => Ok(()),
265 Some(_) |
266 None => Err(other("socket already registered")),
267 }
268 }
269 }
270
271 impl fmt::Debug for Binding {
272 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
273 f.debug_struct("Binding")
274 .finish()
275 }
276 }
277
278 /// Helper struct used for TCP and UDP which bundles a `binding` with a
279 /// `SetReadiness` handle.
280 pub struct ReadyBinding {
281 binding: Binding,
282 readiness: Option<poll::SetReadiness>,
283 }
284
285 impl ReadyBinding {
286 /// Creates a new blank binding ready to be inserted into an I/O object.
287 ///
288 /// Won't actually do anything until associated with an `Selector` loop.
289 pub fn new() -> ReadyBinding {
290 ReadyBinding {
291 binding: Binding::new(),
292 readiness: None,
293 }
294 }
295
296 /// Returns whether this binding has been associated with a selector
297 /// yet.
298 pub fn registered(&self) -> bool {
299 self.readiness.is_some()
300 }
301
302 /// Acquires a buffer with at least `size` capacity.
303 ///
304 /// If associated with a selector, this will attempt to pull a buffer from
305 /// that buffer pool. If not associated with a selector, this will allocate
306 /// a fresh buffer.
307 pub fn get_buffer(&self, size: usize) -> Vec<u8> {
308 match self.binding.selector.borrow() {
309 Some(i) => i.buffers.lock().unwrap().get(size),
310 None => Vec::with_capacity(size),
311 }
312 }
313
314 /// Returns a buffer to this binding.
315 ///
316 /// If associated with a selector, this will push the buffer back into the
317 /// selector's pool of buffers. Otherwise this will just drop the buffer.
318 pub fn put_buffer(&self, buf: Vec<u8>) {
319 if let Some(i) = self.binding.selector.borrow() {
320 i.buffers.lock().unwrap().put(buf);
321 }
322 }
323
324 /// Sets the readiness of this I/O object to a particular `set`.
325 ///
326 /// This is later used to fill out and respond to requests to `poll`. Note
327 /// that this is all implemented through the `SetReadiness` structure in the
328 /// `poll` module.
329 pub fn set_readiness(&self, set: Ready) {
330 if let Some(ref i) = self.readiness {
331 trace!("set readiness to {:?}", set);
332 i.set_readiness(set).expect("event loop disappeared?");
333 }
334 }
335
336 /// Queries what the current readiness of this I/O object is.
337 ///
338 /// This is what's being used to generate events returned by `poll`.
339 pub fn readiness(&self) -> Ready {
340 match self.readiness {
341 Some(ref i) => i.readiness(),
342 None => Ready::empty(),
343 }
344 }
345
346 /// Implementation of the `Evented::register` function essentially.
347 ///
348 /// Returns an error if we're already registered with another event loop,
349 /// and otherwise just reassociates ourselves with the event loop to
350 /// possible change tokens.
351 pub fn register_socket(&mut self,
352 socket: &AsRawSocket,
353 poll: &Poll,
354 token: Token,
355 events: Ready,
356 opts: PollOpt,
357 registration: &Mutex<Option<poll::Registration>>)
358 -> io::Result<()> {
359 trace!("register {:?} {:?}", token, events);
360 unsafe {
361 self.binding.register_socket(socket, token, poll)?;
362 }
363
364 let (r, s) = poll::new_registration(poll, token, events, opts);
365 self.readiness = Some(s);
366 *registration.lock().unwrap() = Some(r);
367 Ok(())
368 }
369
370 /// Implementation of `Evented::reregister` function.
371 pub fn reregister_socket(&mut self,
372 socket: &AsRawSocket,
373 poll: &Poll,
374 token: Token,
375 events: Ready,
376 opts: PollOpt,
377 registration: &Mutex<Option<poll::Registration>>)
378 -> io::Result<()> {
379 trace!("reregister {:?} {:?}", token, events);
380 unsafe {
381 self.binding.reregister_socket(socket, token, poll)?;
382 }
383
384 registration.lock().unwrap()
385 .as_mut().unwrap()
386 .reregister(poll, token, events, opts)
387 }
388
389 /// Implementation of the `Evented::deregister` function.
390 ///
391 /// Doesn't allow registration with another event loop, just shuts down
392 /// readiness notifications and such.
393 pub fn deregister(&mut self,
394 socket: &AsRawSocket,
395 poll: &Poll,
396 registration: &Mutex<Option<poll::Registration>>)
397 -> io::Result<()> {
398 trace!("deregistering");
399 unsafe {
400 self.binding.deregister_socket(socket, poll)?;
401 }
402
403 registration.lock().unwrap()
404 .as_ref().unwrap()
405 .deregister(poll)
406 }
407 }
408
/// Builds an `io::Error` of kind `Other` whose message is `s`.
fn other(s: &str) -> io::Error {
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, s)
}
412
413 #[derive(Debug)]
414 pub struct Events {
415 /// Raw I/O event completions are filled in here by the call to `get_many`
416 /// on the completion port above. These are then processed to run callbacks
417 /// which figure out what to do after the event is done.
418 statuses: Box<[CompletionStatus]>,
419
420 /// Literal events returned by `get` to the upwards `EventLoop`. This file
421 /// doesn't really modify this (except for the awakener), instead almost all
422 /// events are filled in by the `ReadinessQueue` from the `poll` module.
423 events: Vec<Event>,
424 }
425
426 impl Events {
427 pub fn with_capacity(cap: usize) -> Events {
428 // Note that it's possible for the output `events` to grow beyond the
429 // capacity as it can also include deferred events, but that's certainly
430 // not the end of the world!
431 Events {
432 statuses: vec![CompletionStatus::zero(); cap].into_boxed_slice(),
433 events: Vec::with_capacity(cap),
434 }
435 }
436
437 pub fn is_empty(&self) -> bool {
438 self.events.is_empty()
439 }
440
441 pub fn len(&self) -> usize {
442 self.events.len()
443 }
444
445 pub fn capacity(&self) -> usize {
446 self.events.capacity()
447 }
448
449 pub fn get(&self, idx: usize) -> Option<Event> {
450 self.events.get(idx).map(|e| *e)
451 }
452
453 pub fn push_event(&mut self, event: Event) {
454 self.events.push(event);
455 }
456
457 pub fn clear(&mut self) {
458 self.events.truncate(0);
459 }
460 }
461
// Recovers a `FromRawArc` to the containing `$container` from a pointer to
// one of its fields.
//
// `$ptr` is the address of the field named by `$path`; subtracting the
// field's offset yields the address of the containing value, which is then
// passed to `FromRawArc::from_raw`. `mem` and `FromRawArc` must be in scope
// at the expansion site.
macro_rules! overlapped2arc {
    ($ptr:expr, $container:ty, $($path:ident).+) => ({
        let field_offset = offset_of!($container, $($path).+);
        debug_assert!(field_offset < mem::size_of::<$container>());
        let base = $ptr as usize - field_offset;
        FromRawArc::from_raw(base as *mut $container)
    })
}
469
// Evaluates to the offset, as a `usize`, of the field `$path` within
// `$container`.
//
// The value is obtained by taking the address of the field through a null
// `*const $container`. The expansion dereferences a raw pointer, so it has
// to appear inside an `unsafe` context.
macro_rules! offset_of {
    ($container:ty, $($path:ident).+) => (
        &(*(0 as *const $container)).$($path).+ as *const _ as usize
    )
}
475
476 // See sys::windows module docs for why this exists.
477 //
478 // The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are
479 // actually inside one of these structures so it can use the `Callback` stored
480 // right after it.
481 //
482 // We use repr(C) here to ensure that we can assume the overlapped pointer is
483 // at the start of the structure so we can just do a cast.
484 /// A wrapper around an internal instance over `miow::Overlapped` which is in
485 /// turn a wrapper around the Windows type `OVERLAPPED`.
486 ///
487 /// This type is required to be used for all IOCP operations on handles that are
488 /// registered with an event loop. The event loop will receive notifications
489 /// over `OVERLAPPED` pointers that have completed, and it will cast that
490 /// pointer to a pointer to this structure and invoke the associated callback.
491 #[repr(C)]
492 pub struct Overlapped {
493 inner: UnsafeCell<miow::Overlapped>,
494 callback: fn(&OVERLAPPED_ENTRY),
495 }
496
497 impl Overlapped {
498 /// Creates a new `Overlapped` which will invoke the provided `cb` callback
499 /// whenever it's triggered.
500 ///
501 /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all
502 /// I/O operations that are registered with mio's event loop. When the I/O
503 /// operation associated with an `OVERLAPPED` pointer completes the event
504 /// loop will invoke the function pointer provided by `cb`.
505 pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped {
506 Overlapped {
507 inner: UnsafeCell::new(miow::Overlapped::zero()),
508 callback: cb,
509 }
510 }
511
512 /// Get the underlying `Overlapped` instance as a raw pointer.
513 ///
514 /// This can be useful when only a shared borrow is held and the overlapped
515 /// pointer needs to be passed down to winapi.
516 pub fn as_mut_ptr(&self) -> *mut OVERLAPPED {
517 unsafe {
518 (*self.inner.get()).raw()
519 }
520 }
521 }
522
523 impl fmt::Debug for Overlapped {
524 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
525 f.debug_struct("Overlapped")
526 .finish()
527 }
528 }
529
530 // Overlapped's APIs are marked as unsafe Overlapped's APIs are marked as
531 // unsafe as they must be used with caution to ensure thread safety. The
532 // structure itself is safe to send across threads.
533 unsafe impl Send for Overlapped {}
534 unsafe impl Sync for Overlapped {}