]> git.proxmox.com Git - rustc.git/blob - src/vendor/miow/src/iocp.rs
New upstream version 1.31.0+dfsg1
[rustc.git] / src / vendor / miow / src / iocp.rs
1 //! Bindings to IOCP, I/O Completion Ports
2
3 use std::cmp;
4 use std::fmt;
5 use std::io;
6 use std::mem;
7 use std::os::windows::io::*;
8 use std::time::Duration;
9
10 use handle::Handle;
11 use winapi::shared::basetsd::*;
12 use winapi::shared::ntdef::*;
13 use winapi::um::minwinbase::*;
14 use winapi::um::handleapi::*;
15 use winapi::um::ioapiset::*;
16 use Overlapped;
17
18 /// A handle to an Windows I/O Completion Port.
19 #[derive(Debug)]
20 pub struct CompletionPort {
21 handle: Handle,
22 }
23
24 /// A status message received from an I/O completion port.
25 ///
26 /// These statuses can be created via the `new` or `empty` constructors and then
27 /// provided to a completion port, or they are read out of a completion port.
28 /// The fields of each status are read through its accessor methods.
29 #[derive(Clone, Copy)]
30 pub struct CompletionStatus(OVERLAPPED_ENTRY);
31
32 impl fmt::Debug for CompletionStatus {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
35 }
36 }
37
38 unsafe impl Send for CompletionStatus {}
39 unsafe impl Sync for CompletionStatus {}
40
41 impl CompletionPort {
42 /// Creates a new I/O completion port with the specified concurrency value.
43 ///
44 /// The number of threads given corresponds to the level of concurrency
45 /// allowed for threads associated with this port. Consult the Windows
46 /// documentation for more information about this value.
47 pub fn new(threads: u32) -> io::Result<CompletionPort> {
48 let ret = unsafe {
49 CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _,
50 0, threads)
51 };
52 if ret.is_null() {
53 Err(io::Error::last_os_error())
54 } else {
55 Ok(CompletionPort { handle: Handle::new(ret) })
56 }
57 }
58
59 /// Associates a new `HANDLE` to this I/O completion port.
60 ///
61 /// This function will associate the given handle to this port with the
62 /// given `token` to be returned in status messages whenever it receives a
63 /// notification.
64 ///
65 /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
66 /// trait can be provided to this function, such as `std::fs::File` and
67 /// friends.
68 pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize,
69 t: &T) -> io::Result<()> {
70 self._add(token, t.as_raw_handle())
71 }
72
73 /// Associates a new `SOCKET` to this I/O completion port.
74 ///
75 /// This function will associate the given socket to this port with the
76 /// given `token` to be returned in status messages whenever it receives a
77 /// notification.
78 ///
79 /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
80 /// trait can be provided to this function, such as `std::net::TcpStream`
81 /// and friends.
82 pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize,
83 t: &T) -> io::Result<()> {
84 self._add(token, t.as_raw_socket() as HANDLE)
85 }
86
87 fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> {
88 assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
89 let ret = unsafe {
90 CreateIoCompletionPort(handle, self.handle.raw(),
91 token as ULONG_PTR, 0)
92 };
93 if ret.is_null() {
94 Err(io::Error::last_os_error())
95 } else {
96 debug_assert_eq!(ret, self.handle.raw());
97 Ok(())
98 }
99 }
100
101 /// Dequeue a completion status from this I/O completion port.
102 ///
103 /// This function will associate the calling thread with this completion
104 /// port and then wait for a status message to become available. The precise
105 /// semantics on when this function returns depends on the concurrency value
106 /// specified when the port was created.
107 ///
108 /// A timeout can optionally be specified to this function. If `None` is
109 /// provided this function will not time out, and otherwise it will time out
110 /// after the specified duration has passed.
111 ///
112 /// On success this will return the status message which was dequeued from
113 /// this completion port.
114 pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> {
115 let mut bytes = 0;
116 let mut token = 0;
117 let mut overlapped = 0 as *mut _;
118 let timeout = ::dur2ms(timeout);
119 let ret = unsafe {
120 GetQueuedCompletionStatus(self.handle.raw(),
121 &mut bytes,
122 &mut token,
123 &mut overlapped,
124 timeout)
125 };
126 ::cvt(ret).map(|_| {
127 CompletionStatus(OVERLAPPED_ENTRY {
128 dwNumberOfBytesTransferred: bytes,
129 lpCompletionKey: token,
130 lpOverlapped: overlapped,
131 Internal: 0,
132 })
133 })
134 }
135
136 /// Dequeues a number of completion statuses from this I/O completion port.
137 ///
138 /// This function is the same as `get` except that it may return more than
139 /// one status. A buffer of "zero" statuses is provided (the contents are
140 /// not read) and then on success this function will return a sub-slice of
141 /// statuses which represent those which were dequeued from this port. This
142 /// function does not wait to fill up the entire list of statuses provided.
143 ///
144 /// Like with `get`, a timeout may be specified for this operation.
145 pub fn get_many<'a>(&self,
146 list: &'a mut [CompletionStatus],
147 timeout: Option<Duration>)
148 -> io::Result<&'a mut [CompletionStatus]>
149 {
150 debug_assert_eq!(mem::size_of::<CompletionStatus>(),
151 mem::size_of::<OVERLAPPED_ENTRY>());
152 let mut removed = 0;
153 let timeout = ::dur2ms(timeout);
154 let len = cmp::min(list.len(), <ULONG>::max_value() as usize) as ULONG;
155 let ret = unsafe {
156 GetQueuedCompletionStatusEx(self.handle.raw(),
157 list.as_ptr() as *mut _,
158 len,
159 &mut removed,
160 timeout,
161 FALSE as i32)
162 };
163 match ::cvt(ret) {
164 Ok(_) => Ok(&mut list[..removed as usize]),
165 Err(e) => Err(e),
166 }
167 }
168
169 /// Posts a new completion status onto this I/O completion port.
170 ///
171 /// This function will post the given status, with custom parameters, to the
172 /// port. Threads blocked in `get` or `get_many` will eventually receive
173 /// this status.
174 pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
175 let ret = unsafe {
176 PostQueuedCompletionStatus(self.handle.raw(),
177 status.0.dwNumberOfBytesTransferred,
178 status.0.lpCompletionKey,
179 status.0.lpOverlapped)
180 };
181 ::cvt(ret).map(|_| ())
182 }
183 }
184
185 impl AsRawHandle for CompletionPort {
186 fn as_raw_handle(&self) -> HANDLE {
187 self.handle.raw()
188 }
189 }
190
191 impl FromRawHandle for CompletionPort {
192 unsafe fn from_raw_handle(handle: HANDLE) -> CompletionPort {
193 CompletionPort { handle: Handle::new(handle) }
194 }
195 }
196
197 impl IntoRawHandle for CompletionPort {
198 fn into_raw_handle(self) -> HANDLE {
199 self.handle.into_raw()
200 }
201 }
202
203 impl CompletionStatus {
204 /// Creates a new completion status with the provided parameters.
205 ///
206 /// This function is useful when creating a status to send to a port with
207 /// the `post` method. The parameters are opaquely passed through and not
208 /// interpreted by the system at all.
209 pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped)
210 -> CompletionStatus {
211 assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
212 CompletionStatus(OVERLAPPED_ENTRY {
213 dwNumberOfBytesTransferred: bytes,
214 lpCompletionKey: token as ULONG_PTR,
215 lpOverlapped: overlapped as *mut _,
216 Internal: 0,
217 })
218 }
219
220 /// Creates a new borrowed completion status from the borrowed
221 /// `OVERLAPPED_ENTRY` argument provided.
222 ///
223 /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
224 /// returning the wrapped structure.
225 pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus {
226 unsafe { &*(entry as *const _ as *const _) }
227 }
228
229 /// Creates a new "zero" completion status.
230 ///
231 /// This function is useful when creating a stack buffer or vector of
232 /// completion statuses to be passed to the `get_many` function.
233 pub fn zero() -> CompletionStatus {
234 CompletionStatus::new(0, 0, 0 as *mut _)
235 }
236
237 /// Returns the number of bytes that were transferred for the I/O operation
238 /// associated with this completion status.
239 pub fn bytes_transferred(&self) -> u32 {
240 self.0.dwNumberOfBytesTransferred
241 }
242
243 /// Returns the completion key value associated with the file handle whose
244 /// I/O operation has completed.
245 ///
246 /// A completion key is a per-handle key that is specified when it is added
247 /// to an I/O completion port via `add_handle` or `add_socket`.
248 pub fn token(&self) -> usize {
249 self.0.lpCompletionKey as usize
250 }
251
252 /// Returns a pointer to the `Overlapped` structure that was specified when
253 /// the I/O operation was started.
254 pub fn overlapped(&self) -> *mut OVERLAPPED {
255 self.0.lpOverlapped
256 }
257
258 /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
259 pub fn entry(&self) -> &OVERLAPPED_ENTRY {
260 &self.0
261 }
262 }
263
264 #[cfg(test)]
265 mod tests {
266 use std::mem;
267 use std::time::Duration;
268
269 use winapi::shared::basetsd::*;
270 use winapi::shared::winerror::*;
271
272 use iocp::{CompletionPort, CompletionStatus};
273
274 #[test]
275 fn is_send_sync() {
276 fn is_send_sync<T: Send + Sync>() {}
277 is_send_sync::<CompletionPort>();
278 }
279
280 #[test]
281 fn token_right_size() {
282 assert_eq!(mem::size_of::<usize>(), mem::size_of::<ULONG_PTR>());
283 }
284
285 #[test]
286 fn timeout() {
287 let c = CompletionPort::new(1).unwrap();
288 let err = c.get(Some(Duration::from_millis(1))).unwrap_err();
289 assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32));
290 }
291
292 #[test]
293 fn get() {
294 let c = CompletionPort::new(1).unwrap();
295 c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
296 let s = c.get(None).unwrap();
297 assert_eq!(s.bytes_transferred(), 1);
298 assert_eq!(s.token(), 2);
299 assert_eq!(s.overlapped(), 3 as *mut _);
300 }
301
302 #[test]
303 fn get_many() {
304 let c = CompletionPort::new(1).unwrap();
305
306 c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
307 c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();
308
309 let mut s = vec![CompletionStatus::zero(); 4];
310 {
311 let s = c.get_many(&mut s, None).unwrap();
312 assert_eq!(s.len(), 2);
313 assert_eq!(s[0].bytes_transferred(), 1);
314 assert_eq!(s[0].token(), 2);
315 assert_eq!(s[0].overlapped(), 3 as *mut _);
316 assert_eq!(s[1].bytes_transferred(), 4);
317 assert_eq!(s[1].token(), 5);
318 assert_eq!(s[1].overlapped(), 6 as *mut _);
319 }
320 assert_eq!(s[2].bytes_transferred(), 0);
321 assert_eq!(s[2].token(), 0);
322 assert_eq!(s[2].overlapped(), 0 as *mut _);
323 }
324 }