1 //! Bindings to IOCP, I/O Completion Ports
7 use std
::os
::windows
::io
::*;
8 use std
::time
::Duration
;
11 use winapi
::shared
::basetsd
::*;
12 use winapi
::shared
::ntdef
::*;
13 use winapi
::um
::minwinbase
::*;
14 use winapi
::um
::handleapi
::*;
15 use winapi
::um
::ioapiset
::*;
18 /// A handle to a Windows I/O Completion Port.
20 pub struct CompletionPort
{
24 /// A status message received from an I/O completion port.
26 /// These statuses can be created via the `new` or `empty` constructors and then
27 /// provided to a completion port, or they are read out of a completion port.
28 /// The fields of each status are read through its accessor methods.
29 #[derive(Clone, Copy)]
30 pub struct CompletionStatus(OVERLAPPED_ENTRY
);
32 impl fmt
::Debug
for CompletionStatus
{
33 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
34 write
!(f
, "CompletionStatus(OVERLAPPED_ENTRY)")
38 unsafe impl Send
for CompletionStatus {}
39 unsafe impl Sync
for CompletionStatus {}
42 /// Creates a new I/O completion port with the specified concurrency value.
44 /// The number of threads given corresponds to the level of concurrency
45 /// allowed for threads associated with this port. Consult the Windows
46 /// documentation for more information about this value.
47 pub fn new(threads
: u32) -> io
::Result
<CompletionPort
> {
49 CreateIoCompletionPort(INVALID_HANDLE_VALUE
, 0 as *mut _
,
53 Err(io
::Error
::last_os_error())
55 Ok(CompletionPort { handle: Handle::new(ret) }
)
59 /// Associates a new `HANDLE` to this I/O completion port.
61 /// This function will associate the given handle to this port with the
62 /// given `token` to be returned in status messages whenever it receives a
65 /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
66 /// trait can be provided to this function, such as `std::fs::File` and
68 pub fn add_handle
<T
: AsRawHandle
+ ?Sized
>(&self, token
: usize,
69 t
: &T
) -> io
::Result
<()> {
70 self._add(token
, t
.as_raw_handle())
73 /// Associates a new `SOCKET` to this I/O completion port.
75 /// This function will associate the given socket to this port with the
76 /// given `token` to be returned in status messages whenever it receives a
79 /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
80 /// trait can be provided to this function, such as `std::net::TcpStream`
82 pub fn add_socket
<T
: AsRawSocket
+ ?Sized
>(&self, token
: usize,
83 t
: &T
) -> io
::Result
<()> {
84 self._add(token
, t
.as_raw_socket() as HANDLE
)
87 fn _add(&self, token
: usize, handle
: HANDLE
) -> io
::Result
<()> {
88 assert_eq
!(mem
::size_of_val(&token
), mem
::size_of
::<ULONG_PTR
>());
90 CreateIoCompletionPort(handle
, self.handle
.raw(),
91 token
as ULONG_PTR
, 0)
94 Err(io
::Error
::last_os_error())
96 debug_assert_eq
!(ret
, self.handle
.raw());
101 /// Dequeue a completion status from this I/O completion port.
103 /// This function will associate the calling thread with this completion
104 /// port and then wait for a status message to become available. The precise
105 /// semantics on when this function returns depends on the concurrency value
106 /// specified when the port was created.
108 /// A timeout can optionally be specified to this function. If `None` is
109 /// provided this function will not time out, and otherwise it will time out
110 /// after the specified duration has passed.
112 /// On success this will return the status message which was dequeued from
113 /// this completion port.
114 pub fn get(&self, timeout
: Option
<Duration
>) -> io
::Result
<CompletionStatus
> {
117 let mut overlapped
= 0 as *mut _
;
118 let timeout
= ::dur2ms(timeout
);
120 GetQueuedCompletionStatus(self.handle
.raw(),
127 CompletionStatus(OVERLAPPED_ENTRY
{
128 dwNumberOfBytesTransferred
: bytes
,
129 lpCompletionKey
: token
,
130 lpOverlapped
: overlapped
,
136 /// Dequeues a number of completion statuses from this I/O completion port.
138 /// This function is the same as `get` except that it may return more than
139 /// one status. A buffer of "zero" statuses is provided (the contents are
140 /// not read) and then on success this function will return a sub-slice of
141 /// statuses which represent those which were dequeued from this port. This
142 /// function does not wait to fill up the entire list of statuses provided.
144 /// Like with `get`, a timeout may be specified for this operation.
145 pub fn get_many
<'a
>(&self,
146 list
: &'a
mut [CompletionStatus
],
147 timeout
: Option
<Duration
>)
148 -> io
::Result
<&'a
mut [CompletionStatus
]>
150 debug_assert_eq
!(mem
::size_of
::<CompletionStatus
>(),
151 mem
::size_of
::<OVERLAPPED_ENTRY
>());
153 let timeout
= ::dur2ms(timeout
);
154 let len
= cmp
::min(list
.len(), <ULONG
>::max_value() as usize) as ULONG
;
156 GetQueuedCompletionStatusEx(self.handle
.raw(),
157 list
.as_ptr() as *mut _
,
164 Ok(_
) => Ok(&mut list
[..removed
as usize]),
169 /// Posts a new completion status onto this I/O completion port.
171 /// This function will post the given status, with custom parameters, to the
172 /// port. Threads blocked in `get` or `get_many` will eventually receive
174 pub fn post(&self, status
: CompletionStatus
) -> io
::Result
<()> {
176 PostQueuedCompletionStatus(self.handle
.raw(),
177 status
.0.dwNumberOfBytesTransferred
,
178 status
.0.lpCompletionKey
,
179 status
.0.lpOverlapped
)
181 ::cvt(ret
).map(|_
| ())
185 impl AsRawHandle
for CompletionPort
{
186 fn as_raw_handle(&self) -> HANDLE
{
191 impl FromRawHandle
for CompletionPort
{
192 unsafe fn from_raw_handle(handle
: HANDLE
) -> CompletionPort
{
193 CompletionPort { handle: Handle::new(handle) }
197 impl IntoRawHandle
for CompletionPort
{
198 fn into_raw_handle(self) -> HANDLE
{
199 self.handle
.into_raw()
203 impl CompletionStatus
{
204 /// Creates a new completion status with the provided parameters.
206 /// This function is useful when creating a status to send to a port with
207 /// the `post` method. The parameters are opaquely passed through and not
208 /// interpreted by the system at all.
209 pub fn new(bytes
: u32, token
: usize, overlapped
: *mut Overlapped
)
210 -> CompletionStatus
{
211 assert_eq
!(mem
::size_of_val(&token
), mem
::size_of
::<ULONG_PTR
>());
212 CompletionStatus(OVERLAPPED_ENTRY
{
213 dwNumberOfBytesTransferred
: bytes
,
214 lpCompletionKey
: token
as ULONG_PTR
,
215 lpOverlapped
: overlapped
as *mut _
,
220 /// Creates a new borrowed completion status from the borrowed
221 /// `OVERLAPPED_ENTRY` argument provided.
223 /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
224 /// returning the wrapped structure.
225 pub fn from_entry(entry
: &OVERLAPPED_ENTRY
) -> &CompletionStatus
{
226 unsafe { &*(entry as *const _ as *const _) }
229 /// Creates a new "zero" completion status.
231 /// This function is useful when creating a stack buffer or vector of
232 /// completion statuses to be passed to the `get_many` function.
233 pub fn zero() -> CompletionStatus
{
234 CompletionStatus
::new(0, 0, 0 as *mut _
)
237 /// Returns the number of bytes that were transferred for the I/O operation
238 /// associated with this completion status.
239 pub fn bytes_transferred(&self) -> u32 {
240 self.0.dwNumberOfBytesTransferred
243 /// Returns the completion key value associated with the file handle whose
244 /// I/O operation has completed.
246 /// A completion key is a per-handle key that is specified when it is added
247 /// to an I/O completion port via `add_handle` or `add_socket`.
248 pub fn token(&self) -> usize {
249 self.0.lpCompletionKey
as usize
252 /// Returns a pointer to the `Overlapped` structure that was specified when
253 /// the I/O operation was started.
254 pub fn overlapped(&self) -> *mut OVERLAPPED
{
258 /// Returns a reference to the internal `OVERLAPPED_ENTRY` object.
259 pub fn entry(&self) -> &OVERLAPPED_ENTRY
{
267 use std
::time
::Duration
;
269 use winapi
::shared
::basetsd
::*;
270 use winapi
::shared
::winerror
::*;
272 use iocp
::{CompletionPort, CompletionStatus}
;
276 fn is_send_sync
<T
: Send
+ Sync
>() {}
277 is_send_sync
::<CompletionPort
>();
281 fn token_right_size() {
282 assert_eq
!(mem
::size_of
::<usize>(), mem
::size_of
::<ULONG_PTR
>());
287 let c
= CompletionPort
::new(1).unwrap();
288 let err
= c
.get(Some(Duration
::from_millis(1))).unwrap_err();
289 assert_eq
!(err
.raw_os_error(), Some(WAIT_TIMEOUT
as i32));
294 let c
= CompletionPort
::new(1).unwrap();
295 c
.post(CompletionStatus
::new(1, 2, 3 as *mut _
)).unwrap();
296 let s
= c
.get(None
).unwrap();
297 assert_eq
!(s
.bytes_transferred(), 1);
298 assert_eq
!(s
.token(), 2);
299 assert_eq
!(s
.overlapped(), 3 as *mut _
);
304 let c
= CompletionPort
::new(1).unwrap();
306 c
.post(CompletionStatus
::new(1, 2, 3 as *mut _
)).unwrap();
307 c
.post(CompletionStatus
::new(4, 5, 6 as *mut _
)).unwrap();
309 let mut s
= vec
![CompletionStatus
::zero(); 4];
311 let s
= c
.get_many(&mut s
, None
).unwrap();
312 assert_eq
!(s
.len(), 2);
313 assert_eq
!(s
[0].bytes_transferred(), 1);
314 assert_eq
!(s
[0].token(), 2);
315 assert_eq
!(s
[0].overlapped(), 3 as *mut _
);
316 assert_eq
!(s
[1].bytes_transferred(), 4);
317 assert_eq
!(s
[1].token(), 5);
318 assert_eq
!(s
[1].overlapped(), 6 as *mut _
);
320 assert_eq
!(s
[2].bytes_transferred(), 0);
321 assert_eq
!(s
[2].token(), 0);
322 assert_eq
!(s
[2].overlapped(), 0 as *mut _
);