]> git.proxmox.com Git - rustc.git/blame - src/libstd/sys/windows/pipe.rs
Imported Upstream version 1.9.0+dfsg1
[rustc.git] / src / libstd / sys / windows / pipe.rs
CommitLineData
85aaf69f
SL
1// Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
54a0048b
SL
11use prelude::v1::*;
12use os::windows::prelude::*;
13
14use ffi::OsStr;
15use path::Path;
85aaf69f 16use io;
54a0048b
SL
17use mem;
18use rand::{self, Rng};
19use slice;
9346a6ac 20use sys::c;
54a0048b 21use sys::fs::{File, OpenOptions};
9346a6ac 22use sys::handle::Handle;
85aaf69f
SL
23
24////////////////////////////////////////////////////////////////////////////////
25// Anonymous pipes
26////////////////////////////////////////////////////////////////////////////////
27
28pub struct AnonPipe {
9346a6ac 29 inner: Handle,
85aaf69f
SL
30}
31
9346a6ac 32pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> {
54a0048b
SL
33 // Note that we specifically do *not* use `CreatePipe` here because
34 // unfortunately the anonymous pipes returned do not support overlapped
35 // operations.
36 //
37 // Instead, we create a "hopefully unique" name and create a named pipe
38 // which has overlapped operations enabled.
39 //
40 // Once we do this, we connect do it as usual via `CreateFileW`, and then we
41 // return those reader/writer halves.
42 unsafe {
43 let reader;
44 let mut name;
45 let mut tries = 0;
46 loop {
47 tries += 1;
48 let key: u64 = rand::thread_rng().gen();
49 name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}",
50 c::GetCurrentProcessId(),
51 key);
52 let wide_name = OsStr::new(&name)
53 .encode_wide()
54 .chain(Some(0))
55 .collect::<Vec<_>>();
56
57 let handle = c::CreateNamedPipeW(wide_name.as_ptr(),
58 c::PIPE_ACCESS_INBOUND |
59 c::FILE_FLAG_FIRST_PIPE_INSTANCE |
60 c::FILE_FLAG_OVERLAPPED,
61 c::PIPE_TYPE_BYTE |
62 c::PIPE_READMODE_BYTE |
63 c::PIPE_WAIT |
64 c::PIPE_REJECT_REMOTE_CLIENTS,
65 1,
66 4096,
67 4096,
68 0,
69 0 as *mut _);
70
71 // We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're
72 // also just doing a best effort at selecting a unique name. If
73 // ERROR_ACCESS_DENIED is returned then it could mean that we
74 // accidentally conflicted with an already existing pipe, so we try
75 // again.
76 //
77 // Don't try again too much though as this could also perhaps be a
78 // legit error.
79 if handle == c::INVALID_HANDLE_VALUE {
80 let err = io::Error::last_os_error();
81 if tries < 10 &&
82 err.raw_os_error() == Some(c::ERROR_ACCESS_DENIED as i32) {
83 continue
84 }
85 return Err(err)
86 }
87 reader = Handle::new(handle);
88 break
89 }
90
91 // Connect to the named pipe we just created in write-only mode (also
92 // overlapped for async I/O below).
93 let mut opts = OpenOptions::new();
94 opts.write(true);
95 opts.read(false);
96 opts.share_mode(0);
97 opts.attributes(c::FILE_FLAG_OVERLAPPED);
98 let writer = File::open(Path::new(&name), &opts)?;
99 let writer = AnonPipe { inner: writer.into_handle() };
100
101 Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer.into_handle() }))
102 }
85aaf69f
SL
103}
104
105impl AnonPipe {
9346a6ac 106 pub fn handle(&self) -> &Handle { &self.inner }
c1a9b12d 107 pub fn into_handle(self) -> Handle { self.inner }
85aaf69f
SL
108
109 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
9346a6ac 110 self.inner.read(buf)
85aaf69f
SL
111 }
112
54a0048b
SL
113 pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
114 self.inner.read_to_end(buf)
115 }
116
85aaf69f 117 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
9346a6ac 118 self.inner.write(buf)
85aaf69f
SL
119 }
120}
54a0048b
SL
121
122pub fn read2(p1: AnonPipe,
123 v1: &mut Vec<u8>,
124 p2: AnonPipe,
125 v2: &mut Vec<u8>) -> io::Result<()> {
126 let p1 = p1.into_handle();
127 let p2 = p2.into_handle();
128
129 let mut p1 = AsyncPipe::new(p1, v1)?;
130 let mut p2 = AsyncPipe::new(p2, v2)?;
131 let objs = [p1.event.raw(), p2.event.raw()];
132
133 // In a loop we wait for either pipe's scheduled read operation to complete.
134 // If the operation completes with 0 bytes, that means EOF was reached, in
135 // which case we just finish out the other pipe entirely.
136 //
137 // Note that overlapped I/O is in general super unsafe because we have to
138 // be careful to ensure that all pointers in play are valid for the entire
139 // duration of the I/O operation (where tons of operations can also fail).
140 // The destructor for `AsyncPipe` ends up taking care of most of this.
141 loop {
142 let res = unsafe {
143 c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE)
144 };
145 if res == c::WAIT_OBJECT_0 {
146 if !p1.result()? || !p1.schedule_read()? {
147 return p2.finish()
148 }
149 } else if res == c::WAIT_OBJECT_0 + 1 {
150 if !p2.result()? || !p2.schedule_read()? {
151 return p1.finish()
152 }
153 } else {
154 return Err(io::Error::last_os_error())
155 }
156 }
157}
158
159struct AsyncPipe<'a> {
160 pipe: Handle,
161 event: Handle,
162 overlapped: Box<c::OVERLAPPED>, // needs a stable address
163 dst: &'a mut Vec<u8>,
164 state: State,
165}
166
/// Read-cycle state of an `AsyncPipe`.
#[derive(Debug, PartialEq)]
enum State {
    /// No read has been issued, or the last one has been fully accounted for.
    NotReading,
    /// A read was scheduled and has not been collected yet.
    Reading,
    /// A read finished immediately with this many bytes, which have not yet
    /// been added to the destination buffer's length.
    Read(usize),
}
173
174impl<'a> AsyncPipe<'a> {
175 fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
176 // Create an event which we'll use to coordinate our overlapped
177 // opreations, this event will be used in WaitForMultipleObjects
178 // and passed as part of the OVERLAPPED handle.
179 //
180 // Note that we do a somewhat clever thing here by flagging the
181 // event as being manually reset and setting it initially to the
182 // signaled state. This means that we'll naturally fall through the
183 // WaitForMultipleObjects call above for pipes created initially,
184 // and the only time an even will go back to "unset" will be once an
185 // I/O operation is successfully scheduled (what we want).
186 let event = Handle::new_event(true, true)?;
187 let mut overlapped: Box<c::OVERLAPPED> = unsafe {
188 Box::new(mem::zeroed())
189 };
190 overlapped.hEvent = event.raw();
191 Ok(AsyncPipe {
192 pipe: pipe,
193 overlapped: overlapped,
194 event: event,
195 dst: dst,
196 state: State::NotReading,
197 })
198 }
199
200 /// Executes an overlapped read operation.
201 ///
202 /// Must not currently be reading, and returns whether the pipe is currently
203 /// at EOF or not. If the pipe is not at EOF then `result()` must be called
204 /// to complete the read later on (may block), but if the pipe is at EOF
205 /// then `result()` should not be called as it will just block forever.
206 fn schedule_read(&mut self) -> io::Result<bool> {
207 assert_eq!(self.state, State::NotReading);
208 let amt = unsafe {
209 let slice = slice_to_end(self.dst);
210 self.pipe.read_overlapped(slice, &mut *self.overlapped)?
211 };
212
213 // If this read finished immediately then our overlapped event will
214 // remain signaled (it was signaled coming in here) and we'll progress
215 // down to the method below.
216 //
217 // Otherwise the I/O operation is scheduled and the system set our event
218 // to not signaled, so we flag ourselves into the reading state and move
219 // on.
220 self.state = match amt {
221 Some(0) => return Ok(false),
222 Some(amt) => State::Read(amt),
223 None => State::Reading,
224 };
225 Ok(true)
226 }
227
228 /// Wait for the result of the overlapped operation previously executed.
229 ///
230 /// Takes a parameter `wait` which indicates if this pipe is currently being
231 /// read whether the function should block waiting for the read to complete.
232 ///
233 /// Return values:
234 ///
235 /// * `true` - finished any pending read and the pipe is not at EOF (keep
236 /// going)
237 /// * `false` - finished any pending read and pipe is at EOF (stop issuing
238 /// reads)
239 fn result(&mut self) -> io::Result<bool> {
240 let amt = match self.state {
241 State::NotReading => return Ok(true),
242 State::Reading => {
243 self.pipe.overlapped_result(&mut *self.overlapped, true)?
244 }
245 State::Read(amt) => amt,
246 };
247 self.state = State::NotReading;
248 unsafe {
249 let len = self.dst.len();
250 self.dst.set_len(len + amt);
251 }
252 Ok(amt != 0)
253 }
254
255 /// Finishes out reading this pipe entirely.
256 ///
257 /// Waits for any pending and schedule read, and then calls `read_to_end`
258 /// if necessary to read all the remaining information.
259 fn finish(&mut self) -> io::Result<()> {
260 while self.result()? && self.schedule_read()? {
261 // ...
262 }
263 Ok(())
264 }
265}
266
267impl<'a> Drop for AsyncPipe<'a> {
268 fn drop(&mut self) {
269 match self.state {
270 State::Reading => {}
271 _ => return,
272 }
273
274 // If we have a pending read operation, then we have to make sure that
275 // it's *done* before we actually drop this type. The kernel requires
276 // that the `OVERLAPPED` and buffer pointers are valid for the entire
277 // I/O operation.
278 //
279 // To do that, we call `CancelIo` to cancel any pending operation, and
280 // if that succeeds we wait for the overlapped result.
281 //
282 // If anything here fails, there's not really much we can do, so we leak
283 // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
284 if self.pipe.cancel_io().is_err() || self.result().is_err() {
285 let buf = mem::replace(self.dst, Vec::new());
286 let overlapped = Box::new(unsafe { mem::zeroed() });
287 let overlapped = mem::replace(&mut self.overlapped, overlapped);
288 mem::forget((buf, overlapped));
289 }
290 }
291}
292
/// Returns the unused capacity of `v` as a mutable byte slice, growing the
/// vector first when there is no room left.
///
/// A vector without any allocation gets room for at least 16 bytes; a full
/// vector gets room for at least one more byte. The length of `v` is never
/// changed here.
///
/// # Safety
///
/// The returned slice covers memory past `v.len()`, which has not been
/// initialized through the vector. The caller must only write to it, and must
/// call `set_len` itself to expose whatever was written.
unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
    // Decide how much extra room (if any) has to be requested.
    let extra = match (v.capacity(), v.len()) {
        (0, _) => 16,
        (cap, len) if cap == len => 1,
        _ => 0,
    };
    if extra != 0 {
        v.reserve(extra);
    }

    let used = v.len();
    let tail = v.as_mut_ptr().offset(used as isize);
    slice::from_raw_parts_mut(tail, v.capacity() - used)
}