]>
Commit | Line | Data |
---|---|---|
85aaf69f SL |
1 | // Copyright 2015 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
54a0048b SL |
11 | use prelude::v1::*; |
12 | use os::windows::prelude::*; | |
13 | ||
14 | use ffi::OsStr; | |
15 | use path::Path; | |
85aaf69f | 16 | use io; |
54a0048b SL |
17 | use mem; |
18 | use rand::{self, Rng}; | |
19 | use slice; | |
9346a6ac | 20 | use sys::c; |
54a0048b | 21 | use sys::fs::{File, OpenOptions}; |
9346a6ac | 22 | use sys::handle::Handle; |
85aaf69f SL |
23 | |
24 | //////////////////////////////////////////////////////////////////////////////// | |
25 | // Anonymous pipes | |
26 | //////////////////////////////////////////////////////////////////////////////// | |
27 | ||
28 | pub struct AnonPipe { | |
9346a6ac | 29 | inner: Handle, |
85aaf69f SL |
30 | } |
31 | ||
9346a6ac | 32 | pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> { |
54a0048b SL |
33 | // Note that we specifically do *not* use `CreatePipe` here because |
34 | // unfortunately the anonymous pipes returned do not support overlapped | |
35 | // operations. | |
36 | // | |
37 | // Instead, we create a "hopefully unique" name and create a named pipe | |
38 | // which has overlapped operations enabled. | |
39 | // | |
40 | // Once we do this, we connect do it as usual via `CreateFileW`, and then we | |
41 | // return those reader/writer halves. | |
42 | unsafe { | |
43 | let reader; | |
44 | let mut name; | |
45 | let mut tries = 0; | |
46 | loop { | |
47 | tries += 1; | |
48 | let key: u64 = rand::thread_rng().gen(); | |
49 | name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}", | |
50 | c::GetCurrentProcessId(), | |
51 | key); | |
52 | let wide_name = OsStr::new(&name) | |
53 | .encode_wide() | |
54 | .chain(Some(0)) | |
55 | .collect::<Vec<_>>(); | |
56 | ||
57 | let handle = c::CreateNamedPipeW(wide_name.as_ptr(), | |
58 | c::PIPE_ACCESS_INBOUND | | |
59 | c::FILE_FLAG_FIRST_PIPE_INSTANCE | | |
60 | c::FILE_FLAG_OVERLAPPED, | |
61 | c::PIPE_TYPE_BYTE | | |
62 | c::PIPE_READMODE_BYTE | | |
63 | c::PIPE_WAIT | | |
64 | c::PIPE_REJECT_REMOTE_CLIENTS, | |
65 | 1, | |
66 | 4096, | |
67 | 4096, | |
68 | 0, | |
69 | 0 as *mut _); | |
70 | ||
71 | // We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're | |
72 | // also just doing a best effort at selecting a unique name. If | |
73 | // ERROR_ACCESS_DENIED is returned then it could mean that we | |
74 | // accidentally conflicted with an already existing pipe, so we try | |
75 | // again. | |
76 | // | |
77 | // Don't try again too much though as this could also perhaps be a | |
78 | // legit error. | |
79 | if handle == c::INVALID_HANDLE_VALUE { | |
80 | let err = io::Error::last_os_error(); | |
81 | if tries < 10 && | |
82 | err.raw_os_error() == Some(c::ERROR_ACCESS_DENIED as i32) { | |
83 | continue | |
84 | } | |
85 | return Err(err) | |
86 | } | |
87 | reader = Handle::new(handle); | |
88 | break | |
89 | } | |
90 | ||
91 | // Connect to the named pipe we just created in write-only mode (also | |
92 | // overlapped for async I/O below). | |
93 | let mut opts = OpenOptions::new(); | |
94 | opts.write(true); | |
95 | opts.read(false); | |
96 | opts.share_mode(0); | |
97 | opts.attributes(c::FILE_FLAG_OVERLAPPED); | |
98 | let writer = File::open(Path::new(&name), &opts)?; | |
99 | let writer = AnonPipe { inner: writer.into_handle() }; | |
100 | ||
101 | Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer.into_handle() })) | |
102 | } | |
85aaf69f SL |
103 | } |
104 | ||
105 | impl AnonPipe { | |
9346a6ac | 106 | pub fn handle(&self) -> &Handle { &self.inner } |
c1a9b12d | 107 | pub fn into_handle(self) -> Handle { self.inner } |
85aaf69f SL |
108 | |
109 | pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> { | |
9346a6ac | 110 | self.inner.read(buf) |
85aaf69f SL |
111 | } |
112 | ||
54a0048b SL |
113 | pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> { |
114 | self.inner.read_to_end(buf) | |
115 | } | |
116 | ||
85aaf69f | 117 | pub fn write(&self, buf: &[u8]) -> io::Result<usize> { |
9346a6ac | 118 | self.inner.write(buf) |
85aaf69f SL |
119 | } |
120 | } | |
54a0048b SL |
121 | |
122 | pub fn read2(p1: AnonPipe, | |
123 | v1: &mut Vec<u8>, | |
124 | p2: AnonPipe, | |
125 | v2: &mut Vec<u8>) -> io::Result<()> { | |
126 | let p1 = p1.into_handle(); | |
127 | let p2 = p2.into_handle(); | |
128 | ||
129 | let mut p1 = AsyncPipe::new(p1, v1)?; | |
130 | let mut p2 = AsyncPipe::new(p2, v2)?; | |
131 | let objs = [p1.event.raw(), p2.event.raw()]; | |
132 | ||
133 | // In a loop we wait for either pipe's scheduled read operation to complete. | |
134 | // If the operation completes with 0 bytes, that means EOF was reached, in | |
135 | // which case we just finish out the other pipe entirely. | |
136 | // | |
137 | // Note that overlapped I/O is in general super unsafe because we have to | |
138 | // be careful to ensure that all pointers in play are valid for the entire | |
139 | // duration of the I/O operation (where tons of operations can also fail). | |
140 | // The destructor for `AsyncPipe` ends up taking care of most of this. | |
141 | loop { | |
142 | let res = unsafe { | |
143 | c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) | |
144 | }; | |
145 | if res == c::WAIT_OBJECT_0 { | |
146 | if !p1.result()? || !p1.schedule_read()? { | |
147 | return p2.finish() | |
148 | } | |
149 | } else if res == c::WAIT_OBJECT_0 + 1 { | |
150 | if !p2.result()? || !p2.schedule_read()? { | |
151 | return p1.finish() | |
152 | } | |
153 | } else { | |
154 | return Err(io::Error::last_os_error()) | |
155 | } | |
156 | } | |
157 | } | |
158 | ||
159 | struct AsyncPipe<'a> { | |
160 | pipe: Handle, | |
161 | event: Handle, | |
162 | overlapped: Box<c::OVERLAPPED>, // needs a stable address | |
163 | dst: &'a mut Vec<u8>, | |
164 | state: State, | |
165 | } | |
166 | ||
/// Read-cycle state of an `AsyncPipe`.
#[derive(PartialEq, Debug)]
enum State {
    /// No read has been issued, or the last one has been fully accounted for.
    NotReading,
    /// An overlapped read was issued and its result has not been collected.
    Reading,
    /// A read completed immediately with this many bytes, which have not yet
    /// been added to the destination buffer's length.
    Read(usize),
}
173 | ||
174 | impl<'a> AsyncPipe<'a> { | |
175 | fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> { | |
176 | // Create an event which we'll use to coordinate our overlapped | |
177 | // opreations, this event will be used in WaitForMultipleObjects | |
178 | // and passed as part of the OVERLAPPED handle. | |
179 | // | |
180 | // Note that we do a somewhat clever thing here by flagging the | |
181 | // event as being manually reset and setting it initially to the | |
182 | // signaled state. This means that we'll naturally fall through the | |
183 | // WaitForMultipleObjects call above for pipes created initially, | |
184 | // and the only time an even will go back to "unset" will be once an | |
185 | // I/O operation is successfully scheduled (what we want). | |
186 | let event = Handle::new_event(true, true)?; | |
187 | let mut overlapped: Box<c::OVERLAPPED> = unsafe { | |
188 | Box::new(mem::zeroed()) | |
189 | }; | |
190 | overlapped.hEvent = event.raw(); | |
191 | Ok(AsyncPipe { | |
192 | pipe: pipe, | |
193 | overlapped: overlapped, | |
194 | event: event, | |
195 | dst: dst, | |
196 | state: State::NotReading, | |
197 | }) | |
198 | } | |
199 | ||
200 | /// Executes an overlapped read operation. | |
201 | /// | |
202 | /// Must not currently be reading, and returns whether the pipe is currently | |
203 | /// at EOF or not. If the pipe is not at EOF then `result()` must be called | |
204 | /// to complete the read later on (may block), but if the pipe is at EOF | |
205 | /// then `result()` should not be called as it will just block forever. | |
206 | fn schedule_read(&mut self) -> io::Result<bool> { | |
207 | assert_eq!(self.state, State::NotReading); | |
208 | let amt = unsafe { | |
209 | let slice = slice_to_end(self.dst); | |
210 | self.pipe.read_overlapped(slice, &mut *self.overlapped)? | |
211 | }; | |
212 | ||
213 | // If this read finished immediately then our overlapped event will | |
214 | // remain signaled (it was signaled coming in here) and we'll progress | |
215 | // down to the method below. | |
216 | // | |
217 | // Otherwise the I/O operation is scheduled and the system set our event | |
218 | // to not signaled, so we flag ourselves into the reading state and move | |
219 | // on. | |
220 | self.state = match amt { | |
221 | Some(0) => return Ok(false), | |
222 | Some(amt) => State::Read(amt), | |
223 | None => State::Reading, | |
224 | }; | |
225 | Ok(true) | |
226 | } | |
227 | ||
228 | /// Wait for the result of the overlapped operation previously executed. | |
229 | /// | |
230 | /// Takes a parameter `wait` which indicates if this pipe is currently being | |
231 | /// read whether the function should block waiting for the read to complete. | |
232 | /// | |
233 | /// Return values: | |
234 | /// | |
235 | /// * `true` - finished any pending read and the pipe is not at EOF (keep | |
236 | /// going) | |
237 | /// * `false` - finished any pending read and pipe is at EOF (stop issuing | |
238 | /// reads) | |
239 | fn result(&mut self) -> io::Result<bool> { | |
240 | let amt = match self.state { | |
241 | State::NotReading => return Ok(true), | |
242 | State::Reading => { | |
243 | self.pipe.overlapped_result(&mut *self.overlapped, true)? | |
244 | } | |
245 | State::Read(amt) => amt, | |
246 | }; | |
247 | self.state = State::NotReading; | |
248 | unsafe { | |
249 | let len = self.dst.len(); | |
250 | self.dst.set_len(len + amt); | |
251 | } | |
252 | Ok(amt != 0) | |
253 | } | |
254 | ||
255 | /// Finishes out reading this pipe entirely. | |
256 | /// | |
257 | /// Waits for any pending and schedule read, and then calls `read_to_end` | |
258 | /// if necessary to read all the remaining information. | |
259 | fn finish(&mut self) -> io::Result<()> { | |
260 | while self.result()? && self.schedule_read()? { | |
261 | // ... | |
262 | } | |
263 | Ok(()) | |
264 | } | |
265 | } | |
266 | ||
267 | impl<'a> Drop for AsyncPipe<'a> { | |
268 | fn drop(&mut self) { | |
269 | match self.state { | |
270 | State::Reading => {} | |
271 | _ => return, | |
272 | } | |
273 | ||
274 | // If we have a pending read operation, then we have to make sure that | |
275 | // it's *done* before we actually drop this type. The kernel requires | |
276 | // that the `OVERLAPPED` and buffer pointers are valid for the entire | |
277 | // I/O operation. | |
278 | // | |
279 | // To do that, we call `CancelIo` to cancel any pending operation, and | |
280 | // if that succeeds we wait for the overlapped result. | |
281 | // | |
282 | // If anything here fails, there's not really much we can do, so we leak | |
283 | // the buffer/OVERLAPPED pointers to ensure we're at least memory safe. | |
284 | if self.pipe.cancel_io().is_err() || self.result().is_err() { | |
285 | let buf = mem::replace(self.dst, Vec::new()); | |
286 | let overlapped = Box::new(unsafe { mem::zeroed() }); | |
287 | let overlapped = mem::replace(&mut self.overlapped, overlapped); | |
288 | mem::forget((buf, overlapped)); | |
289 | } | |
290 | } | |
291 | } | |
292 | ||
/// Returns the spare capacity of `v` (everything past `v.len()`) as a mutable
/// byte slice, growing the vector first so that the slice is never empty.
///
/// The vector's length is left unchanged.
///
/// # Safety
///
/// The returned slice covers memory that the vector has allocated but not
/// initialized; callers must only write to it, and must `set_len` themselves
/// to account for whatever they wrote.
unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
    // A vector with no allocation gets room for 16 bytes; a full one gets room
    // for at least one more byte; anything else already has spare space.
    let wanted = match (v.capacity(), v.len()) {
        (0, _) => 16,
        (cap, len) if cap == len => 1,
        _ => 0,
    };
    if wanted != 0 {
        v.reserve(wanted);
    }

    let used = v.len();
    let spare = v.capacity() - used;
    let tail = v.as_mut_ptr().offset(used as isize);
    slice::from_raw_parts_mut(tail, spare)
}