]> git.proxmox.com Git - rustc.git/blob - vendor/miow/src/iocp.rs
New upstream version 1.50.0+dfsg1
[rustc.git] / vendor / miow / src / iocp.rs
1 //! Bindings to IOCP, I/O Completion Ports
2
3 use std::cmp;
4 use std::fmt;
5 use std::io;
6 use std::mem;
7 use std::os::windows::io::*;
8 use std::time::Duration;
9
10 use crate::handle::Handle;
11 use crate::Overlapped;
12 use winapi::shared::basetsd::*;
13 use winapi::shared::ntdef::*;
14 use winapi::um::handleapi::*;
15 use winapi::um::ioapiset::*;
16 use winapi::um::minwinbase::*;
17
18 /// A handle to an Windows I/O Completion Port.
19 #[derive(Debug)]
20 pub struct CompletionPort {
21 handle: Handle,
22 }
23
24 /// A status message received from an I/O completion port.
25 ///
26 /// These statuses can be created via the `new` or `empty` constructors and then
27 /// provided to a completion port, or they are read out of a completion port.
28 /// The fields of each status are read through its accessor methods.
29 #[derive(Clone, Copy)]
30 pub struct CompletionStatus(OVERLAPPED_ENTRY);
31
32 impl fmt::Debug for CompletionStatus {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
35 }
36 }
37
38 unsafe impl Send for CompletionStatus {}
39 unsafe impl Sync for CompletionStatus {}
40
41 impl CompletionPort {
42 /// Creates a new I/O completion port with the specified concurrency value.
43 ///
44 /// The number of threads given corresponds to the level of concurrency
45 /// allowed for threads associated with this port. Consult the Windows
46 /// documentation for more information about this value.
47 pub fn new(threads: u32) -> io::Result<CompletionPort> {
48 let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _, 0, threads) };
49 if ret.is_null() {
50 Err(io::Error::last_os_error())
51 } else {
52 Ok(CompletionPort {
53 handle: Handle::new(ret),
54 })
55 }
56 }
57
58 /// Associates a new `HANDLE` to this I/O completion port.
59 ///
60 /// This function will associate the given handle to this port with the
61 /// given `token` to be returned in status messages whenever it receives a
62 /// notification.
63 ///
64 /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
65 /// trait can be provided to this function, such as `std::fs::File` and
66 /// friends.
67 pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
68 self._add(token, t.as_raw_handle())
69 }
70
71 /// Associates a new `SOCKET` to this I/O completion port.
72 ///
73 /// This function will associate the given socket to this port with the
74 /// given `token` to be returned in status messages whenever it receives a
75 /// notification.
76 ///
77 /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
78 /// trait can be provided to this function, such as `std::net::TcpStream`
79 /// and friends.
80 pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
81 self._add(token, t.as_raw_socket() as HANDLE)
82 }
83
84 fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> {
85 assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
86 let ret =
87 unsafe { CreateIoCompletionPort(handle, self.handle.raw(), token as ULONG_PTR, 0) };
88 if ret.is_null() {
89 Err(io::Error::last_os_error())
90 } else {
91 debug_assert_eq!(ret, self.handle.raw());
92 Ok(())
93 }
94 }
95
96 /// Dequeue a completion status from this I/O completion port.
97 ///
98 /// This function will associate the calling thread with this completion
99 /// port and then wait for a status message to become available. The precise
100 /// semantics on when this function returns depends on the concurrency value
101 /// specified when the port was created.
102 ///
103 /// A timeout can optionally be specified to this function. If `None` is
104 /// provided this function will not time out, and otherwise it will time out
105 /// after the specified duration has passed.
106 ///
107 /// On success this will return the status message which was dequeued from
108 /// this completion port.
109 pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> {
110 let mut bytes = 0;
111 let mut token = 0;
112 let mut overlapped = 0 as *mut _;
113 let timeout = crate::dur2ms(timeout);
114 let ret = unsafe {
115 GetQueuedCompletionStatus(
116 self.handle.raw(),
117 &mut bytes,
118 &mut token,
119 &mut overlapped,
120 timeout,
121 )
122 };
123 crate::cvt(ret).map(|_| {
124 CompletionStatus(OVERLAPPED_ENTRY {
125 dwNumberOfBytesTransferred: bytes,
126 lpCompletionKey: token,
127 lpOverlapped: overlapped,
128 Internal: 0,
129 })
130 })
131 }
132
133 /// Dequeues a number of completion statuses from this I/O completion port.
134 ///
135 /// This function is the same as `get` except that it may return more than
136 /// one status. A buffer of "zero" statuses is provided (the contents are
137 /// not read) and then on success this function will return a sub-slice of
138 /// statuses which represent those which were dequeued from this port. This
139 /// function does not wait to fill up the entire list of statuses provided.
140 ///
141 /// Like with `get`, a timeout may be specified for this operation.
142 pub fn get_many<'a>(
143 &self,
144 list: &'a mut [CompletionStatus],
145 timeout: Option<Duration>,
146 ) -> io::Result<&'a mut [CompletionStatus]> {
147 debug_assert_eq!(
148 mem::size_of::<CompletionStatus>(),
149 mem::size_of::<OVERLAPPED_ENTRY>()
150 );
151 let mut removed = 0;
152 let timeout = crate::dur2ms(timeout);
153 let len = cmp::min(list.len(), <ULONG>::max_value() as usize) as ULONG;
154 let ret = unsafe {
155 GetQueuedCompletionStatusEx(
156 self.handle.raw(),
157 list.as_ptr() as *mut _,
158 len,
159 &mut removed,
160 timeout,
161 FALSE as i32,
162 )
163 };
164 match crate::cvt(ret) {
165 Ok(_) => Ok(&mut list[..removed as usize]),
166 Err(e) => Err(e),
167 }
168 }
169
170 /// Posts a new completion status onto this I/O completion port.
171 ///
172 /// This function will post the given status, with custom parameters, to the
173 /// port. Threads blocked in `get` or `get_many` will eventually receive
174 /// this status.
175 pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
176 let ret = unsafe {
177 PostQueuedCompletionStatus(
178 self.handle.raw(),
179 status.0.dwNumberOfBytesTransferred,
180 status.0.lpCompletionKey,
181 status.0.lpOverlapped,
182 )
183 };
184 crate::cvt(ret).map(|_| ())
185 }
186 }
187
188 impl AsRawHandle for CompletionPort {
189 fn as_raw_handle(&self) -> HANDLE {
190 self.handle.raw()
191 }
192 }
193
194 impl FromRawHandle for CompletionPort {
195 unsafe fn from_raw_handle(handle: HANDLE) -> CompletionPort {
196 CompletionPort {
197 handle: Handle::new(handle),
198 }
199 }
200 }
201
202 impl IntoRawHandle for CompletionPort {
203 fn into_raw_handle(self) -> HANDLE {
204 self.handle.into_raw()
205 }
206 }
207
208 impl CompletionStatus {
209 /// Creates a new completion status with the provided parameters.
210 ///
211 /// This function is useful when creating a status to send to a port with
212 /// the `post` method. The parameters are opaquely passed through and not
213 /// interpreted by the system at all.
214 pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> CompletionStatus {
215 assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
216 CompletionStatus(OVERLAPPED_ENTRY {
217 dwNumberOfBytesTransferred: bytes,
218 lpCompletionKey: token as ULONG_PTR,
219 lpOverlapped: overlapped as *mut _,
220 Internal: 0,
221 })
222 }
223
224 /// Creates a new borrowed completion status from the borrowed
225 /// `OVERLAPPED_ENTRY` argument provided.
226 ///
227 /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
228 /// returning the wrapped structure.
229 pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus {
230 unsafe { &*(entry as *const _ as *const _) }
231 }
232
233 /// Creates a new "zero" completion status.
234 ///
235 /// This function is useful when creating a stack buffer or vector of
236 /// completion statuses to be passed to the `get_many` function.
237 pub fn zero() -> CompletionStatus {
238 CompletionStatus::new(0, 0, 0 as *mut _)
239 }
240
241 /// Returns the number of bytes that were transferred for the I/O operation
242 /// associated with this completion status.
243 pub fn bytes_transferred(&self) -> u32 {
244 self.0.dwNumberOfBytesTransferred
245 }
246
247 /// Returns the completion key value associated with the file handle whose
248 /// I/O operation has completed.
249 ///
250 /// A completion key is a per-handle key that is specified when it is added
251 /// to an I/O completion port via `add_handle` or `add_socket`.
252 pub fn token(&self) -> usize {
253 self.0.lpCompletionKey as usize
254 }
255
256 /// Returns a pointer to the `Overlapped` structure that was specified when
257 /// the I/O operation was started.
258 pub fn overlapped(&self) -> *mut OVERLAPPED {
259 self.0.lpOverlapped
260 }
261
262 /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
263 pub fn entry(&self) -> &OVERLAPPED_ENTRY {
264 &self.0
265 }
266 }
267
268 #[cfg(test)]
269 mod tests {
270 use std::mem;
271 use std::time::Duration;
272
273 use winapi::shared::basetsd::*;
274 use winapi::shared::winerror::*;
275
276 use crate::iocp::{CompletionPort, CompletionStatus};
277
278 #[test]
279 fn is_send_sync() {
280 fn is_send_sync<T: Send + Sync>() {}
281 is_send_sync::<CompletionPort>();
282 }
283
284 #[test]
285 fn token_right_size() {
286 assert_eq!(mem::size_of::<usize>(), mem::size_of::<ULONG_PTR>());
287 }
288
289 #[test]
290 fn timeout() {
291 let c = CompletionPort::new(1).unwrap();
292 let err = c.get(Some(Duration::from_millis(1))).unwrap_err();
293 assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32));
294 }
295
296 #[test]
297 fn get() {
298 let c = CompletionPort::new(1).unwrap();
299 c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
300 let s = c.get(None).unwrap();
301 assert_eq!(s.bytes_transferred(), 1);
302 assert_eq!(s.token(), 2);
303 assert_eq!(s.overlapped(), 3 as *mut _);
304 }
305
306 #[test]
307 fn get_many() {
308 let c = CompletionPort::new(1).unwrap();
309
310 c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
311 c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();
312
313 let mut s = vec![CompletionStatus::zero(); 4];
314 {
315 let s = c.get_many(&mut s, None).unwrap();
316 assert_eq!(s.len(), 2);
317 assert_eq!(s[0].bytes_transferred(), 1);
318 assert_eq!(s[0].token(), 2);
319 assert_eq!(s[0].overlapped(), 3 as *mut _);
320 assert_eq!(s[1].bytes_transferred(), 4);
321 assert_eq!(s[1].token(), 5);
322 assert_eq!(s[1].overlapped(), 6 as *mut _);
323 }
324 assert_eq!(s[2].bytes_transferred(), 0);
325 assert_eq!(s[2].token(), 0);
326 assert_eq!(s[2].overlapped(), 0 as *mut _);
327 }
328 }