]> git.proxmox.com Git - rustc.git/blob - src/vendor/miow/src/iocp.rs
New upstream version 1.23.0+dfsg1
[rustc.git] / src / vendor / miow / src / iocp.rs
1 //! Bindings to IOCP, I/O Completion Ports
2
3 use std::cmp;
4 use std::io;
5 use std::mem;
6 use std::os::windows::io::*;
7 use std::time::Duration;
8
9 use handle::Handle;
10 use winapi::*;
11 use kernel32::*;
12 use Overlapped;
13
14 /// A handle to an Windows I/O Completion Port.
15 #[derive(Debug)]
16 pub struct CompletionPort {
17 handle: Handle,
18 }
19
20 /// A status message received from an I/O completion port.
21 ///
22 /// These statuses can be created via the `new` or `empty` constructors and then
23 /// provided to a completion port, or they are read out of a completion port.
24 /// The fields of each status are read through its accessor methods.
25 #[derive(Clone, Copy, Debug)]
26 pub struct CompletionStatus(OVERLAPPED_ENTRY);
27
28 unsafe impl Send for CompletionStatus {}
29 unsafe impl Sync for CompletionStatus {}
30
31 impl CompletionPort {
32 /// Creates a new I/O completion port with the specified concurrency value.
33 ///
34 /// The number of threads given corresponds to the level of concurrency
35 /// allowed for threads associated with this port. Consult the Windows
36 /// documentation for more information about this value.
37 pub fn new(threads: u32) -> io::Result<CompletionPort> {
38 let ret = unsafe {
39 CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _,
40 0, threads)
41 };
42 if ret.is_null() {
43 Err(io::Error::last_os_error())
44 } else {
45 Ok(CompletionPort { handle: Handle::new(ret) })
46 }
47 }
48
49 /// Associates a new `HANDLE` to this I/O completion port.
50 ///
51 /// This function will associate the given handle to this port with the
52 /// given `token` to be returned in status messages whenever it receives a
53 /// notification.
54 ///
55 /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
56 /// trait can be provided to this function, such as `std::fs::File` and
57 /// friends.
58 pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize,
59 t: &T) -> io::Result<()> {
60 self._add(token, t.as_raw_handle())
61 }
62
63 /// Associates a new `SOCKET` to this I/O completion port.
64 ///
65 /// This function will associate the given socket to this port with the
66 /// given `token` to be returned in status messages whenever it receives a
67 /// notification.
68 ///
69 /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
70 /// trait can be provided to this function, such as `std::net::TcpStream`
71 /// and friends.
72 pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize,
73 t: &T) -> io::Result<()> {
74 self._add(token, t.as_raw_socket() as HANDLE)
75 }
76
77 fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> {
78 assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
79 let ret = unsafe {
80 CreateIoCompletionPort(handle, self.handle.raw(),
81 token as ULONG_PTR, 0)
82 };
83 if ret.is_null() {
84 Err(io::Error::last_os_error())
85 } else {
86 debug_assert_eq!(ret, self.handle.raw());
87 Ok(())
88 }
89 }
90
91 /// Dequeue a completion status from this I/O completion port.
92 ///
93 /// This function will associate the calling thread with this completion
94 /// port and then wait for a status message to become available. The precise
95 /// semantics on when this function returns depends on the concurrency value
96 /// specified when the port was created.
97 ///
98 /// A timeout can optionally be specified to this function. If `None` is
99 /// provided this function will not time out, and otherwise it will time out
100 /// after the specified duration has passed.
101 ///
102 /// On success this will return the status message which was dequeued from
103 /// this completion port.
104 pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> {
105 let mut bytes = 0;
106 let mut token = 0;
107 let mut overlapped = 0 as *mut _;
108 let timeout = ::dur2ms(timeout);
109 let ret = unsafe {
110 GetQueuedCompletionStatus(self.handle.raw(),
111 &mut bytes,
112 &mut token,
113 &mut overlapped,
114 timeout)
115 };
116 ::cvt(ret).map(|_| {
117 CompletionStatus(OVERLAPPED_ENTRY {
118 dwNumberOfBytesTransferred: bytes,
119 lpCompletionKey: token,
120 lpOverlapped: overlapped,
121 Internal: 0,
122 })
123 })
124 }
125
126 /// Dequeues a number of completion statuses from this I/O completion port.
127 ///
128 /// This function is the same as `get` except that it may return more than
129 /// one status. A buffer of "zero" statuses is provided (the contents are
130 /// not read) and then on success this function will return a sub-slice of
131 /// statuses which represent those which were dequeued from this port. This
132 /// function does not wait to fill up the entire list of statuses provided.
133 ///
134 /// Like with `get`, a timeout may be specified for this operation.
135 pub fn get_many<'a>(&self,
136 list: &'a mut [CompletionStatus],
137 timeout: Option<Duration>)
138 -> io::Result<&'a mut [CompletionStatus]>
139 {
140 debug_assert_eq!(mem::size_of::<CompletionStatus>(),
141 mem::size_of::<OVERLAPPED_ENTRY>());
142 let mut removed = 0;
143 let timeout = ::dur2ms(timeout);
144 let len = cmp::min(list.len(), <ULONG>::max_value() as usize) as ULONG;
145 let ret = unsafe {
146 GetQueuedCompletionStatusEx(self.handle.raw(),
147 list.as_ptr() as *mut _,
148 len,
149 &mut removed,
150 timeout,
151 FALSE)
152 };
153 match ::cvt(ret) {
154 Ok(_) => Ok(&mut list[..removed as usize]),
155 Err(e) => Err(e),
156 }
157 }
158
159 /// Posts a new completion status onto this I/O completion port.
160 ///
161 /// This function will post the given status, with custom parameters, to the
162 /// port. Threads blocked in `get` or `get_many` will eventually receive
163 /// this status.
164 pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
165 let ret = unsafe {
166 PostQueuedCompletionStatus(self.handle.raw(),
167 status.0.dwNumberOfBytesTransferred,
168 status.0.lpCompletionKey,
169 status.0.lpOverlapped)
170 };
171 ::cvt(ret).map(|_| ())
172 }
173 }
174
175 impl AsRawHandle for CompletionPort {
176 fn as_raw_handle(&self) -> HANDLE {
177 self.handle.raw()
178 }
179 }
180
181 impl FromRawHandle for CompletionPort {
182 unsafe fn from_raw_handle(handle: HANDLE) -> CompletionPort {
183 CompletionPort { handle: Handle::new(handle) }
184 }
185 }
186
187 impl IntoRawHandle for CompletionPort {
188 fn into_raw_handle(self) -> HANDLE {
189 self.handle.into_raw()
190 }
191 }
192
193 impl CompletionStatus {
194 /// Creates a new completion status with the provided parameters.
195 ///
196 /// This function is useful when creating a status to send to a port with
197 /// the `post` method. The parameters are opaquely passed through and not
198 /// interpreted by the system at all.
199 pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped)
200 -> CompletionStatus {
201 assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
202 CompletionStatus(OVERLAPPED_ENTRY {
203 dwNumberOfBytesTransferred: bytes,
204 lpCompletionKey: token as ULONG_PTR,
205 lpOverlapped: overlapped as *mut _,
206 Internal: 0,
207 })
208 }
209
210 /// Creates a new borrowed completion status from the borrowed
211 /// `OVERLAPPED_ENTRY` argument provided.
212 ///
213 /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
214 /// returning the wrapped structure.
215 pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus {
216 unsafe { &*(entry as *const _ as *const _) }
217 }
218
219 /// Creates a new "zero" completion status.
220 ///
221 /// This function is useful when creating a stack buffer or vector of
222 /// completion statuses to be passed to the `get_many` function.
223 pub fn zero() -> CompletionStatus {
224 CompletionStatus::new(0, 0, 0 as *mut _)
225 }
226
227 /// Returns the number of bytes that were transferred for the I/O operation
228 /// associated with this completion status.
229 pub fn bytes_transferred(&self) -> u32 {
230 self.0.dwNumberOfBytesTransferred
231 }
232
233 /// Returns the completion key value associated with the file handle whose
234 /// I/O operation has completed.
235 ///
236 /// A completion key is a per-handle key that is specified when it is added
237 /// to an I/O completion port via `add_handle` or `add_socket`.
238 pub fn token(&self) -> usize {
239 self.0.lpCompletionKey as usize
240 }
241
242 /// Returns a pointer to the `Overlapped` structure that was specified when
243 /// the I/O operation was started.
244 pub fn overlapped(&self) -> *mut OVERLAPPED {
245 self.0.lpOverlapped
246 }
247
248 /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
249 pub fn entry(&self) -> &OVERLAPPED_ENTRY {
250 &self.0
251 }
252 }
253
#[cfg(test)]
mod tests {
    use std::mem;
    use std::time::Duration;

    use winapi::*;

    use iocp::{CompletionPort, CompletionStatus};

    /// Compile-time check that a port can be shared across threads.
    #[test]
    fn is_send_sync() {
        fn is_send_sync<T: Send + Sync>() {}
        is_send_sync::<CompletionPort>();
    }

    /// Tokens are cast to `ULONG_PTR`, so both must be the same width.
    #[test]
    fn token_right_size() {
        let token_size = mem::size_of::<usize>();
        let key_size = mem::size_of::<ULONG_PTR>();
        assert_eq!(token_size, key_size);
    }

    /// Waiting on an empty port with a short timeout reports `WAIT_TIMEOUT`.
    #[test]
    fn timeout() {
        let port = CompletionPort::new(1).unwrap();
        let wait = Some(Duration::from_millis(1));
        let err = port.get(wait).unwrap_err();
        assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32));
    }

    /// A posted status comes back unchanged from `get`.
    #[test]
    fn get() {
        let port = CompletionPort::new(1).unwrap();
        let posted = CompletionStatus::new(1, 2, 3 as *mut _);
        port.post(posted).unwrap();

        let received = port.get(None).unwrap();
        assert_eq!(received.bytes_transferred(), 1);
        assert_eq!(received.token(), 2);
        assert_eq!(received.overlapped(), 3 as *mut _);
    }

    /// Two posted statuses are returned together by `get_many`, and the
    /// unused tail of the buffer keeps its "zero" contents.
    #[test]
    fn get_many() {
        let port = CompletionPort::new(1).unwrap();

        port.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
        port.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();

        let mut buf = vec![CompletionStatus::zero(); 4];
        {
            let filled = port.get_many(&mut buf, None).unwrap();
            assert_eq!(filled.len(), 2);
            assert_eq!(filled[0].bytes_transferred(), 1);
            assert_eq!(filled[0].token(), 2);
            assert_eq!(filled[0].overlapped(), 3 as *mut _);
            assert_eq!(filled[1].bytes_transferred(), 4);
            assert_eq!(filled[1].token(), 5);
            assert_eq!(filled[1].overlapped(), 6 as *mut _);
        }
        assert_eq!(buf[2].bytes_transferred(), 0);
        assert_eq!(buf[2].token(), 0);
        assert_eq!(buf[2].overlapped(), 0 as *mut _);
    }
}