1 //! Bindings to IOCP, I/O Completion Ports
7 use std
::os
::windows
::io
::*;
8 use std
::time
::Duration
;
10 use crate::handle
::Handle
;
11 use crate::Overlapped
;
12 use winapi
::shared
::basetsd
::*;
13 use winapi
::shared
::ntdef
::*;
14 use winapi
::um
::handleapi
::*;
15 use winapi
::um
::ioapiset
::*;
16 use winapi
::um
::minwinbase
::*;
18 /// A handle to a Windows I/O Completion Port.
20 pub struct CompletionPort
{
24 /// A status message received from an I/O completion port.
26 /// These statuses can be created via the `new` or `empty` constructors and then
27 /// provided to a completion port, or they are read out of a completion port.
28 /// The fields of each status are read through its accessor methods.
29 #[derive(Clone, Copy)]
30 pub struct CompletionStatus(OVERLAPPED_ENTRY
);
// Manual `Debug` implementation: it writes the fixed text
// "CompletionStatus(OVERLAPPED_ENTRY)" and does not print any field values.
32 impl fmt
::Debug
for CompletionStatus
{
33 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
34 write
!(f
, "CompletionStatus(OVERLAPPED_ENTRY)")
// Manually asserts that `CompletionStatus` may be sent to and shared between
// threads.
// NOTE(review): presumably required because the wrapped `OVERLAPPED_ENTRY`
// stores a raw pointer (`lpOverlapped`), which is not `Send`/`Sync` on its own —
// confirm that nothing dereferences that pointer through this type.
38 unsafe impl Send
for CompletionStatus {}
39 unsafe impl Sync
for CompletionStatus {}
42 /// Creates a new I/O completion port with the specified concurrency value.
///
44 /// The number of threads given corresponds to the level of concurrency
45 /// allowed for threads associated with this port. Consult the Windows
46 /// documentation for more information about this value.
///
/// # Errors
///
/// The error result built in this function is `io::Error::last_os_error()`.
47 pub fn new(threads
: u32) -> io
::Result
<CompletionPort
> {
// No file handle is supplied here (`INVALID_HANDLE_VALUE`) and the existing-port
// argument is a null pointer; only `threads` carries caller input.
48 let ret
= unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _, 0, threads) }
;
// Error result: taken from the last OS error.
50 Err(io
::Error
::last_os_error())
// The raw handle returned by the call is wrapped in the crate's `Handle` type.
53 handle
: Handle
::new(ret
),
58 /// Associates a new `HANDLE` to this I/O completion port.
///
60 /// This function will associate the given handle to this port with the
61 /// given `token` to be returned in status messages whenever it receives a
/// notification.
///
64 /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
65 /// trait can be provided to this function, such as `std::fs::File` and
/// similar types.
67 pub fn add_handle
<T
: AsRawHandle
+ ?Sized
>(&self, token
: usize, t
: &T
) -> io
::Result
<()> {
// Delegates to `_add` with the raw handle extracted from `t`.
68 self._add(token
, t
.as_raw_handle())
71 /// Associates a new `SOCKET` to this I/O completion port.
///
73 /// This function will associate the given socket to this port with the
74 /// given `token` to be returned in status messages whenever it receives a
/// notification.
///
77 /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
78 /// trait can be provided to this function, such as `std::net::TcpStream`
/// and similar types.
80 pub fn add_socket
<T
: AsRawSocket
+ ?Sized
>(&self, token
: usize, t
: &T
) -> io
::Result
<()> {
// The raw socket value is cast to `HANDLE` so the same `_add` path used by
// `add_handle` can be reused.
81 self._add(token
, t
.as_raw_socket() as HANDLE
)
/// Shared implementation behind `add_handle` and `add_socket`: passes the raw
/// `handle`, this port's own handle and `token` to `CreateIoCompletionPort`.
84 fn _add(&self, token
: usize, handle
: HANDLE
) -> io
::Result
<()> {
// Guards the `token as ULONG_PTR` cast below: the two types must be the same
// size, otherwise this panics.
85 assert_eq
!(mem
::size_of_val(&token
), mem
::size_of
::<ULONG_PTR
>());
87 unsafe { CreateIoCompletionPort(handle, self.handle.raw(), token as ULONG_PTR, 0) }
;
// Error result: taken from the last OS error.
89 Err(io
::Error
::last_os_error())
// Debug-build check that the value handed back equals this port's own handle.
91 debug_assert_eq
!(ret
, self.handle
.raw());
96 /// Dequeues a completion status from this I/O completion port.
///
98 /// This function will associate the calling thread with this completion
99 /// port and then wait for a status message to become available. The precise
100 /// semantics on when this function returns depends on the concurrency value
101 /// specified when the port was created.
///
103 /// A timeout can optionally be specified to this function. If `None` is
104 /// provided this function will not time out, and otherwise it will time out
105 /// after the specified duration has passed.
///
107 /// On success this will return the status message which was dequeued from
108 /// this completion port.
109 pub fn get(&self, timeout
: Option
<Duration
>) -> io
::Result
<CompletionStatus
> {
// Starts out as a null pointer; its final value becomes `lpOverlapped` of the
// returned status.
112 let mut overlapped
= 0 as *mut _
;
// Converted by the crate's `dur2ms` helper (presumably to milliseconds, going by
// its name — verify against its definition).
113 let timeout
= crate::dur2ms(timeout
);
115 GetQueuedCompletionStatus(
// On success, the dequeued values are repackaged as a `CompletionStatus`.
123 crate::cvt(ret
).map(|_
| {
124 CompletionStatus(OVERLAPPED_ENTRY
{
125 dwNumberOfBytesTransferred
: bytes
,
126 lpCompletionKey
: token
,
127 lpOverlapped
: overlapped
,
133 /// Dequeues a number of completion statuses from this I/O completion port.
///
135 /// This function is the same as `get` except that it may return more than
136 /// one status. A buffer of "zero" statuses is provided (the contents are
137 /// not read) and then on success this function will return a sub-slice of
138 /// statuses which represent those which were dequeued from this port. This
139 /// function does not wait to fill up the entire list of statuses provided.
///
141 /// Like with `get`, a timeout may be specified for this operation.
144 list
: &'a
mut [CompletionStatus
],
145 timeout
: Option
<Duration
>,
146 ) -> io
::Result
<&'a
mut [CompletionStatus
]> {
// Size comparison between the wrapper and `OVERLAPPED_ENTRY`: `list` is handed
// to the OS call below through a pointer cast, so the two must not differ.
148 mem
::size_of
::<CompletionStatus
>(),
149 mem
::size_of
::<OVERLAPPED_ENTRY
>()
152 let timeout
= crate::dur2ms(timeout
);
// Clamp the buffer length to the largest `ULONG` value before the narrowing cast.
153 let len
= cmp
::min(list
.len(), <ULONG
>::max_value() as usize) as ULONG
;
155 GetQueuedCompletionStatusEx(
// NOTE(review): the pointer is taken with `as_ptr()` (shared access) and then
// cast to `*mut`; `as_mut_ptr()` would state the write access directly — confirm.
157 list
.as_ptr() as *mut _
,
164 match crate::cvt(ret
) {
// Only the leading `removed` entries of the caller's buffer are returned.
165 Ok(_
) => Ok(&mut list
[..removed
as usize]),
170 /// Posts a new completion status onto this I/O completion port.
///
172 /// This function will post the given status, with custom parameters, to the
173 /// port. Threads blocked in `get` or `get_many` will eventually receive
/// this status.
175 pub fn post(&self, status
: CompletionStatus
) -> io
::Result
<()> {
177 PostQueuedCompletionStatus(
// The three fields of the wrapped `OVERLAPPED_ENTRY` are forwarded unchanged.
179 status
.0.dwNumberOfBytesTransferred
,
180 status
.0.lpCompletionKey
,
181 status
.0.lpOverlapped
,
// The success value of `cvt` is discarded; callers only see `()` or the error.
184 crate::cvt(ret
).map(|_
| ())
// Lets a `CompletionPort` be used wherever an `AsRawHandle` value is expected;
// the method yields a raw `HANDLE` while only borrowing the port.
188 impl AsRawHandle
for CompletionPort
{
189 fn as_raw_handle(&self) -> HANDLE
{
// Builds a `CompletionPort` from a raw `HANDLE` by wrapping it with `Handle::new`.
// NOTE(review): no validation of `handle` is visible here; the function is
// `unsafe`, so the caller presumably must supply a valid completion-port handle.
194 impl FromRawHandle
for CompletionPort
{
195 unsafe fn from_raw_handle(handle
: HANDLE
) -> CompletionPort
{
197 handle
: Handle
::new(handle
),
// Consumes the port and returns its raw `HANDLE` by calling `into_raw` on the
// inner `Handle`.
202 impl IntoRawHandle
for CompletionPort
{
203 fn into_raw_handle(self) -> HANDLE
{
204 self.handle
.into_raw()
208 impl CompletionStatus
{
209 /// Creates a new completion status with the provided parameters.
///
211 /// This function is useful when creating a status to send to a port with
212 /// the `post` method. The parameters are opaquely passed through and not
213 /// interpreted by the system at all.
///
/// # Panics
///
/// Panics if `usize` and `ULONG_PTR` do not have the same size.
214 pub fn new(bytes
: u32, token
: usize, overlapped
: *mut Overlapped
) -> CompletionStatus
{
// Guards the `token as ULONG_PTR` cast below.
215 assert_eq
!(mem
::size_of_val(&token
), mem
::size_of
::<ULONG_PTR
>());
216 CompletionStatus(OVERLAPPED_ENTRY
{
217 dwNumberOfBytesTransferred
: bytes
,
218 lpCompletionKey
: token
as ULONG_PTR
,
// The crate's `Overlapped` pointer is stored through a raw pointer cast.
219 lpOverlapped
: overlapped
as *mut _
,
224 /// Creates a new borrowed completion status from the borrowed
225 /// `OVERLAPPED_ENTRY` argument provided.
///
227 /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
228 /// returning the wrapped structure.
229 pub fn from_entry(entry
: &OVERLAPPED_ENTRY
) -> &CompletionStatus
{
// The reference is reinterpreted through raw pointer casts; nothing is copied.
// NOTE(review): this relies on `CompletionStatus` having exactly the layout of
// `OVERLAPPED_ENTRY` — confirm the struct declaration guarantees that
// (e.g. via `#[repr(transparent)]`).
230 unsafe { &*(entry as *const _ as *const _) }
233 /// Creates a new "zero" completion status.
///
235 /// This function is useful when creating a stack buffer or vector of
236 /// completion statuses to be passed to the `get_many` function.
///
/// Equivalent to `CompletionStatus::new` with zero bytes, a zero token and a
/// null overlapped pointer.
237 pub fn zero() -> CompletionStatus
{
238 CompletionStatus
::new(0, 0, 0 as *mut _
)
241 /// Returns the number of bytes that were transferred for the I/O operation
242 /// associated with this completion status.
///
/// This is the `dwNumberOfBytesTransferred` field of the wrapped entry.
243 pub fn bytes_transferred(&self) -> u32 {
244 self.0.dwNumberOfBytesTransferred
247 /// Returns the completion key value associated with the file handle whose
248 /// I/O operation has completed.
///
250 /// A completion key is a per-handle key that is specified when it is added
251 /// to an I/O completion port via `add_handle` or `add_socket`.
///
/// This is the `lpCompletionKey` field of the wrapped entry, cast to `usize`.
252 pub fn token(&self) -> usize {
253 self.0.lpCompletionKey
as usize
256 /// Returns the raw `OVERLAPPED` pointer that was specified when
257 /// the I/O operation was started.
258 pub fn overlapped(&self) -> *mut OVERLAPPED
{
262 /// Returns a reference to the internal `OVERLAPPED_ENTRY` object.
263 pub fn entry(&self) -> &OVERLAPPED_ENTRY
{
271 use std
::time
::Duration
;
273 use winapi
::shared
::basetsd
::*;
274 use winapi
::shared
::winerror
::*;
276 use crate::iocp
::{CompletionPort, CompletionStatus}
;
// Compile-time check: the helper only accepts types that are `Send + Sync`, so
// instantiating it with `CompletionPort` fails to build if the port is not both.
280 fn is_send_sync
<T
: Send
+ Sync
>() {}
281 is_send_sync
::<CompletionPort
>();
// Checks that `usize` and `ULONG_PTR` have the same size, which the
// `token as ULONG_PTR` casts elsewhere in this file assert at run time.
285 fn token_right_size() {
286 assert_eq
!(mem
::size_of
::<usize>(), mem
::size_of
::<ULONG_PTR
>());
// Nothing is posted to the port, so a `get` with a 1 ms timeout is expected to
// fail with the OS error code `WAIT_TIMEOUT`.
291 let c
= CompletionPort
::new(1).unwrap();
292 let err
= c
.get(Some(Duration
::from_millis(1))).unwrap_err();
293 assert_eq
!(err
.raw_os_error(), Some(WAIT_TIMEOUT
as i32));
// Round trip: a status posted with (bytes = 1, token = 2, overlapped = 3) must
// come back from `get` with the same three values.
298 let c
= CompletionPort
::new(1).unwrap();
299 c
.post(CompletionStatus
::new(1, 2, 3 as *mut _
)).unwrap();
300 let s
= c
.get(None
).unwrap();
301 assert_eq
!(s
.bytes_transferred(), 1);
302 assert_eq
!(s
.token(), 2);
303 assert_eq
!(s
.overlapped(), 3 as *mut _
);
// Two statuses are posted, then `get_many` is called with a buffer of four
// "zero" statuses; exactly two entries are expected back, in posting order.
308 let c
= CompletionPort
::new(1).unwrap();
310 c
.post(CompletionStatus
::new(1, 2, 3 as *mut _
)).unwrap();
311 c
.post(CompletionStatus
::new(4, 5, 6 as *mut _
)).unwrap();
313 let mut s
= vec
![CompletionStatus
::zero(); 4];
315 let s
= c
.get_many(&mut s
, None
).unwrap();
316 assert_eq
!(s
.len(), 2);
317 assert_eq
!(s
[0].bytes_transferred(), 1);
318 assert_eq
!(s
[0].token(), 2);
319 assert_eq
!(s
[0].overlapped(), 3 as *mut _
);
320 assert_eq
!(s
[1].bytes_transferred(), 4);
321 assert_eq
!(s
[1].token(), 5);
322 assert_eq
!(s
[1].overlapped(), 6 as *mut _
);
// Index 2 is expected to still hold the zero values.
// NOTE(review): presumably `s` here is the original 4-element buffer again; the
// scoping braces around the `get_many` result are not visible in this view.
324 assert_eq
!(s
[2].bytes_transferred(), 0);
325 assert_eq
!(s
[2].token(), 0);
326 assert_eq
!(s
[2].overlapped(), 0 as *mut _
);