1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use os
::windows
::prelude
::*;
18 use rand
::{self, Rng}
;
21 use sys
::fs
::{File, OpenOptions}
;
22 use sys
::handle
::Handle
;
24 ////////////////////////////////////////////////////////////////////////////////
26 ////////////////////////////////////////////////////////////////////////////////
32 pub fn anon_pipe() -> io
::Result
<(AnonPipe
, AnonPipe
)> {
33 // Note that we specifically do *not* use `CreatePipe` here because
34 // unfortunately the anonymous pipes returned do not support overlapped
37 // Instead, we create a "hopefully unique" name and create a named pipe
38 // which has overlapped operations enabled.
40 // Once we do this, we connect do it as usual via `CreateFileW`, and then we
41 // return those reader/writer halves.
48 let key
: u64 = rand
::thread_rng().gen();
49 name
= format
!(r
"\\.\pipe\__rust_anonymous_pipe1__.{}.{}",
50 c
::GetCurrentProcessId(),
52 let wide_name
= OsStr
::new(&name
)
57 let handle
= c
::CreateNamedPipeW(wide_name
.as_ptr(),
58 c
::PIPE_ACCESS_INBOUND
|
59 c
::FILE_FLAG_FIRST_PIPE_INSTANCE
|
60 c
::FILE_FLAG_OVERLAPPED
,
62 c
::PIPE_READMODE_BYTE
|
64 c
::PIPE_REJECT_REMOTE_CLIENTS
,
71 // We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're
72 // also just doing a best effort at selecting a unique name. If
73 // ERROR_ACCESS_DENIED is returned then it could mean that we
74 // accidentally conflicted with an already existing pipe, so we try
77 // Don't try again too much though as this could also perhaps be a
79 if handle
== c
::INVALID_HANDLE_VALUE
{
80 let err
= io
::Error
::last_os_error();
82 err
.raw_os_error() == Some(c
::ERROR_ACCESS_DENIED
as i32) {
87 reader
= Handle
::new(handle
);
91 // Connect to the named pipe we just created in write-only mode (also
92 // overlapped for async I/O below).
93 let mut opts
= OpenOptions
::new();
97 opts
.attributes(c
::FILE_FLAG_OVERLAPPED
);
98 let writer
= File
::open(Path
::new(&name
), &opts
)?
;
99 let writer
= AnonPipe { inner: writer.into_handle() }
;
101 Ok((AnonPipe { inner: reader }
, AnonPipe { inner: writer.into_handle() }
))
106 pub fn handle(&self) -> &Handle { &self.inner }
107 pub fn into_handle(self) -> Handle { self.inner }
109 pub fn read(&self, buf
: &mut [u8]) -> io
::Result
<usize> {
113 pub fn read_to_end(&self, buf
: &mut Vec
<u8>) -> io
::Result
<usize> {
114 self.inner
.read_to_end(buf
)
117 pub fn write(&self, buf
: &[u8]) -> io
::Result
<usize> {
118 self.inner
.write(buf
)
122 pub fn read2(p1
: AnonPipe
,
125 v2
: &mut Vec
<u8>) -> io
::Result
<()> {
126 let p1
= p1
.into_handle();
127 let p2
= p2
.into_handle();
129 let mut p1
= AsyncPipe
::new(p1
, v1
)?
;
130 let mut p2
= AsyncPipe
::new(p2
, v2
)?
;
131 let objs
= [p1
.event
.raw(), p2
.event
.raw()];
133 // In a loop we wait for either pipe's scheduled read operation to complete.
134 // If the operation completes with 0 bytes, that means EOF was reached, in
135 // which case we just finish out the other pipe entirely.
137 // Note that overlapped I/O is in general super unsafe because we have to
138 // be careful to ensure that all pointers in play are valid for the entire
139 // duration of the I/O operation (where tons of operations can also fail).
140 // The destructor for `AsyncPipe` ends up taking care of most of this.
143 c
::WaitForMultipleObjects(2, objs
.as_ptr(), c
::FALSE
, c
::INFINITE
)
145 if res
== c
::WAIT_OBJECT_0
{
146 if !p1
.result()?
|| !p1
.schedule_read()?
{
149 } else if res
== c
::WAIT_OBJECT_0
+ 1 {
150 if !p2
.result()?
|| !p2
.schedule_read()?
{
154 return Err(io
::Error
::last_os_error())
159 struct AsyncPipe
<'a
> {
162 overlapped
: Box
<c
::OVERLAPPED
>, // needs a stable address
163 dst
: &'a
mut Vec
<u8>,
167 #[derive(PartialEq, Debug)]
174 impl<'a
> AsyncPipe
<'a
> {
175 fn new(pipe
: Handle
, dst
: &'a
mut Vec
<u8>) -> io
::Result
<AsyncPipe
<'a
>> {
176 // Create an event which we'll use to coordinate our overlapped
177 // opreations, this event will be used in WaitForMultipleObjects
178 // and passed as part of the OVERLAPPED handle.
180 // Note that we do a somewhat clever thing here by flagging the
181 // event as being manually reset and setting it initially to the
182 // signaled state. This means that we'll naturally fall through the
183 // WaitForMultipleObjects call above for pipes created initially,
184 // and the only time an even will go back to "unset" will be once an
185 // I/O operation is successfully scheduled (what we want).
186 let event
= Handle
::new_event(true, true)?
;
187 let mut overlapped
: Box
<c
::OVERLAPPED
> = unsafe {
188 Box
::new(mem
::zeroed())
190 overlapped
.hEvent
= event
.raw();
193 overlapped
: overlapped
,
196 state
: State
::NotReading
,
200 /// Executes an overlapped read operation.
202 /// Must not currently be reading, and returns whether the pipe is currently
203 /// at EOF or not. If the pipe is not at EOF then `result()` must be called
204 /// to complete the read later on (may block), but if the pipe is at EOF
205 /// then `result()` should not be called as it will just block forever.
206 fn schedule_read(&mut self) -> io
::Result
<bool
> {
207 assert_eq
!(self.state
, State
::NotReading
);
209 let slice
= slice_to_end(self.dst
);
210 self.pipe
.read_overlapped(slice
, &mut *self.overlapped
)?
213 // If this read finished immediately then our overlapped event will
214 // remain signaled (it was signaled coming in here) and we'll progress
215 // down to the method below.
217 // Otherwise the I/O operation is scheduled and the system set our event
218 // to not signaled, so we flag ourselves into the reading state and move
220 self.state
= match amt
{
221 Some(0) => return Ok(false),
222 Some(amt
) => State
::Read(amt
),
223 None
=> State
::Reading
,
228 /// Wait for the result of the overlapped operation previously executed.
230 /// Takes a parameter `wait` which indicates if this pipe is currently being
231 /// read whether the function should block waiting for the read to complete.
235 /// * `true` - finished any pending read and the pipe is not at EOF (keep
237 /// * `false` - finished any pending read and pipe is at EOF (stop issuing
239 fn result(&mut self) -> io
::Result
<bool
> {
240 let amt
= match self.state
{
241 State
::NotReading
=> return Ok(true),
243 self.pipe
.overlapped_result(&mut *self.overlapped
, true)?
245 State
::Read(amt
) => amt
,
247 self.state
= State
::NotReading
;
249 let len
= self.dst
.len();
250 self.dst
.set_len(len
+ amt
);
255 /// Finishes out reading this pipe entirely.
257 /// Waits for any pending and schedule read, and then calls `read_to_end`
258 /// if necessary to read all the remaining information.
259 fn finish(&mut self) -> io
::Result
<()> {
260 while self.result()?
&& self.schedule_read()?
{
267 impl<'a
> Drop
for AsyncPipe
<'a
> {
274 // If we have a pending read operation, then we have to make sure that
275 // it's *done* before we actually drop this type. The kernel requires
276 // that the `OVERLAPPED` and buffer pointers are valid for the entire
279 // To do that, we call `CancelIo` to cancel any pending operation, and
280 // if that succeeds we wait for the overlapped result.
282 // If anything here fails, there's not really much we can do, so we leak
283 // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
284 if self.pipe
.cancel_io().is_err() || self.result().is_err() {
285 let buf
= mem
::replace(self.dst
, Vec
::new());
286 let overlapped
= Box
::new(unsafe { mem::zeroed() }
);
287 let overlapped
= mem
::replace(&mut self.overlapped
, overlapped
);
288 mem
::forget((buf
, overlapped
));
293 unsafe fn slice_to_end(v
: &mut Vec
<u8>) -> &mut [u8] {
294 if v
.capacity() == 0 {
297 if v
.capacity() == v
.len() {
300 slice
::from_raw_parts_mut(v
.as_mut_ptr().offset(v
.len() as isize),
301 v
.capacity() - v
.len())