1 //! Bindings to IOCP, I/O Completion Ports
6 use std
::os
::windows
::io
::*;
7 use std
::time
::Duration
;
/// A handle to a Windows I/O Completion Port.
16 pub struct CompletionPort
{
20 /// A status message received from an I/O completion port.
22 /// These statuses can be created via the `new` or `empty` constructors and then
23 /// provided to a completion port, or they are read out of a completion port.
24 /// The fields of each status are read through its accessor methods.
25 #[derive(Clone, Copy, Debug)]
26 pub struct CompletionStatus(OVERLAPPED_ENTRY
);
28 unsafe impl Send
for CompletionStatus {}
29 unsafe impl Sync
for CompletionStatus {}
32 /// Creates a new I/O completion port with the specified concurrency value.
34 /// The number of threads given corresponds to the level of concurrency
35 /// allowed for threads associated with this port. Consult the Windows
36 /// documentation for more information about this value.
37 pub fn new(threads
: u32) -> io
::Result
<CompletionPort
> {
39 CreateIoCompletionPort(INVALID_HANDLE_VALUE
, 0 as *mut _
,
43 Err(io
::Error
::last_os_error())
45 Ok(CompletionPort { handle: Handle::new(ret) }
)
49 /// Associates a new `HANDLE` to this I/O completion port.
51 /// This function will associate the given handle to this port with the
52 /// given `token` to be returned in status messages whenever it receives a
55 /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
56 /// trait can be provided to this function, such as `std::fs::File` and
58 pub fn add_handle
<T
: AsRawHandle
+ ?Sized
>(&self, token
: usize,
59 t
: &T
) -> io
::Result
<()> {
60 self._add(token
, t
.as_raw_handle())
63 /// Associates a new `SOCKET` to this I/O completion port.
65 /// This function will associate the given socket to this port with the
66 /// given `token` to be returned in status messages whenever it receives a
69 /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
70 /// trait can be provided to this function, such as `std::net::TcpStream`
72 pub fn add_socket
<T
: AsRawSocket
+ ?Sized
>(&self, token
: usize,
73 t
: &T
) -> io
::Result
<()> {
74 self._add(token
, t
.as_raw_socket() as HANDLE
)
77 fn _add(&self, token
: usize, handle
: HANDLE
) -> io
::Result
<()> {
78 assert_eq
!(mem
::size_of_val(&token
), mem
::size_of
::<ULONG_PTR
>());
80 CreateIoCompletionPort(handle
, self.handle
.raw(),
81 token
as ULONG_PTR
, 0)
84 Err(io
::Error
::last_os_error())
86 debug_assert_eq
!(ret
, self.handle
.raw());
91 /// Dequeue a completion status from this I/O completion port.
93 /// This function will associate the calling thread with this completion
/// port and then wait for a status message to become available. The precise
/// semantics of when this function returns depend on the concurrency value
96 /// specified when the port was created.
98 /// A timeout can optionally be specified to this function. If `None` is
99 /// provided this function will not time out, and otherwise it will time out
100 /// after the specified duration has passed.
102 /// On success this will return the status message which was dequeued from
103 /// this completion port.
104 pub fn get(&self, timeout
: Option
<Duration
>) -> io
::Result
<CompletionStatus
> {
107 let mut overlapped
= 0 as *mut _
;
108 let timeout
= ::dur2ms(timeout
);
110 GetQueuedCompletionStatus(self.handle
.raw(),
117 CompletionStatus(OVERLAPPED_ENTRY
{
118 dwNumberOfBytesTransferred
: bytes
,
119 lpCompletionKey
: token
,
120 lpOverlapped
: overlapped
,
126 /// Dequeues a number of completion statuses from this I/O completion port.
128 /// This function is the same as `get` except that it may return more than
129 /// one status. A buffer of "zero" statuses is provided (the contents are
130 /// not read) and then on success this function will return a sub-slice of
131 /// statuses which represent those which were dequeued from this port. This
132 /// function does not wait to fill up the entire list of statuses provided.
134 /// Like with `get`, a timeout may be specified for this operation.
135 pub fn get_many
<'a
>(&self,
136 list
: &'a
mut [CompletionStatus
],
137 timeout
: Option
<Duration
>)
138 -> io
::Result
<&'a
mut [CompletionStatus
]>
140 debug_assert_eq
!(mem
::size_of
::<CompletionStatus
>(),
141 mem
::size_of
::<OVERLAPPED_ENTRY
>());
143 let timeout
= ::dur2ms(timeout
);
144 let len
= cmp
::min(list
.len(), <ULONG
>::max_value() as usize) as ULONG
;
146 GetQueuedCompletionStatusEx(self.handle
.raw(),
147 list
.as_ptr() as *mut _
,
154 Ok(_
) => Ok(&mut list
[..removed
as usize]),
159 /// Posts a new completion status onto this I/O completion port.
161 /// This function will post the given status, with custom parameters, to the
162 /// port. Threads blocked in `get` or `get_many` will eventually receive
164 pub fn post(&self, status
: CompletionStatus
) -> io
::Result
<()> {
166 PostQueuedCompletionStatus(self.handle
.raw(),
167 status
.0.dwNumberOfBytesTransferred
,
168 status
.0.lpCompletionKey
,
169 status
.0.lpOverlapped
)
171 ::cvt(ret
).map(|_
| ())
175 impl AsRawHandle
for CompletionPort
{
176 fn as_raw_handle(&self) -> HANDLE
{
181 impl FromRawHandle
for CompletionPort
{
182 unsafe fn from_raw_handle(handle
: HANDLE
) -> CompletionPort
{
183 CompletionPort { handle: Handle::new(handle) }
187 impl IntoRawHandle
for CompletionPort
{
188 fn into_raw_handle(self) -> HANDLE
{
189 self.handle
.into_raw()
193 impl CompletionStatus
{
194 /// Creates a new completion status with the provided parameters.
196 /// This function is useful when creating a status to send to a port with
197 /// the `post` method. The parameters are opaquely passed through and not
198 /// interpreted by the system at all.
199 pub fn new(bytes
: u32, token
: usize, overlapped
: *mut Overlapped
)
200 -> CompletionStatus
{
201 assert_eq
!(mem
::size_of_val(&token
), mem
::size_of
::<ULONG_PTR
>());
202 CompletionStatus(OVERLAPPED_ENTRY
{
203 dwNumberOfBytesTransferred
: bytes
,
204 lpCompletionKey
: token
as ULONG_PTR
,
205 lpOverlapped
: overlapped
as *mut _
,
210 /// Creates a new borrowed completion status from the borrowed
211 /// `OVERLAPPED_ENTRY` argument provided.
213 /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
214 /// returning the wrapped structure.
215 pub fn from_entry(entry
: &OVERLAPPED_ENTRY
) -> &CompletionStatus
{
216 unsafe { &*(entry as *const _ as *const _) }
219 /// Creates a new "zero" completion status.
221 /// This function is useful when creating a stack buffer or vector of
222 /// completion statuses to be passed to the `get_many` function.
223 pub fn zero() -> CompletionStatus
{
224 CompletionStatus
::new(0, 0, 0 as *mut _
)
227 /// Returns the number of bytes that were transferred for the I/O operation
228 /// associated with this completion status.
229 pub fn bytes_transferred(&self) -> u32 {
230 self.0.dwNumberOfBytesTransferred
233 /// Returns the completion key value associated with the file handle whose
234 /// I/O operation has completed.
236 /// A completion key is a per-handle key that is specified when it is added
237 /// to an I/O completion port via `add_handle` or `add_socket`.
238 pub fn token(&self) -> usize {
239 self.0.lpCompletionKey
as usize
242 /// Returns a pointer to the `Overlapped` structure that was specified when
243 /// the I/O operation was started.
244 pub fn overlapped(&self) -> *mut OVERLAPPED
{
248 /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
249 pub fn entry(&self) -> &OVERLAPPED_ENTRY
{
257 use std
::time
::Duration
;
261 use iocp
::{CompletionPort, CompletionStatus}
;
265 fn is_send_sync
<T
: Send
+ Sync
>() {}
266 is_send_sync
::<CompletionPort
>();
270 fn token_right_size() {
271 assert_eq
!(mem
::size_of
::<usize>(), mem
::size_of
::<ULONG_PTR
>());
276 let c
= CompletionPort
::new(1).unwrap();
277 let err
= c
.get(Some(Duration
::from_millis(1))).unwrap_err();
278 assert_eq
!(err
.raw_os_error(), Some(WAIT_TIMEOUT
as i32));
283 let c
= CompletionPort
::new(1).unwrap();
284 c
.post(CompletionStatus
::new(1, 2, 3 as *mut _
)).unwrap();
285 let s
= c
.get(None
).unwrap();
286 assert_eq
!(s
.bytes_transferred(), 1);
287 assert_eq
!(s
.token(), 2);
288 assert_eq
!(s
.overlapped(), 3 as *mut _
);
293 let c
= CompletionPort
::new(1).unwrap();
295 c
.post(CompletionStatus
::new(1, 2, 3 as *mut _
)).unwrap();
296 c
.post(CompletionStatus
::new(4, 5, 6 as *mut _
)).unwrap();
298 let mut s
= vec
![CompletionStatus
::zero(); 4];
300 let s
= c
.get_many(&mut s
, None
).unwrap();
301 assert_eq
!(s
.len(), 2);
302 assert_eq
!(s
[0].bytes_transferred(), 1);
303 assert_eq
!(s
[0].token(), 2);
304 assert_eq
!(s
[0].overlapped(), 3 as *mut _
);
305 assert_eq
!(s
[1].bytes_transferred(), 4);
306 assert_eq
!(s
[1].token(), 5);
307 assert_eq
!(s
[1].overlapped(), 6 as *mut _
);
309 assert_eq
!(s
[2].bytes_transferred(), 0);
310 assert_eq
!(s
[2].token(), 0);
311 assert_eq
!(s
[2].overlapped(), 0 as *mut _
);