]> git.proxmox.com Git - rustc.git/blob - src/libstd/sys/windows/pipe.rs
Imported Upstream version 1.0.0~beta
[rustc.git] / src / libstd / sys / windows / pipe.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Named pipes implementation for windows
12 //!
13 //! If are unfortunate enough to be reading this code, I would like to first
14 //! apologize. This was my first encounter with windows named pipes, and it
15 //! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
16 //! read on as I'll try to explain some fun things that I ran into.
17 //!
18 //! # Unix pipes vs Named pipes
19 //!
20 //! As with everything else, named pipes on windows are pretty different from
21 //! unix pipes on unix. On unix, you use one "server pipe" to accept new client
22 //! pipes. So long as this server pipe is active, new children pipes can
23 //! connect. On windows, you instead have a number of "server pipes", and each
24 //! of these server pipes can throughout their lifetime be attached to a client
25 //! or not. Once attached to a client, a server pipe may then disconnect at a
26 //! later date.
27 //!
28 //! # Accepting clients
29 //!
30 //! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
31 //! are built around the unix flavors. This means that we have one "server
32 //! pipe" to which many clients can connect. In order to make this compatible
33 //! with the windows model, each connected client consumes ownership of a server
34 //! pipe, and then a new server pipe is created for the next client.
35 //!
36 //! Note that the server pipes attached to clients are never given back to the
37 //! listener for recycling. This could possibly be implemented with a channel so
38 //! the listener half can re-use server pipes, but for now I err'd on the simple
39 //! side of things. Each stream accepted by a listener will destroy the server
40 //! pipe after the stream is dropped.
41 //!
42 //! This model ends up having a small race or two, and you can find more details
43 //! on the `native_accept` method.
44 //!
45 //! # Simultaneous reads and writes
46 //!
47 //! In testing, I found that two simultaneous writes and two simultaneous reads
48 //! on a pipe ended up working out just fine, but problems were encountered when
49 //! a read was executed simultaneously with a write. After some googling around,
50 //! it sounded like named pipes just weren't built for this kind of interaction,
51 //! and the suggested solution was to use overlapped I/O.
52 //!
53 //! I don't really know what overlapped I/O is, but my basic understanding after
54 //! reading about it is that you have an external Event which is used to signal
55 //! I/O completion, passed around in some OVERLAPPED structures. As to what this
56 //! is, I'm not exactly sure.
57 //!
58 //! This problem implies that all named pipes are created with the
59 //! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
60 //! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
61 //! inside of this structure is a HANDLE from CreateEvent. After the I/O is
62 //! determined to be pending (may complete in the future), the
63 //! GetOverlappedResult function is used to block on the event, waiting for the
64 //! I/O to finish.
65 //!
66 //! This scheme ended up working well enough. There were two snags that I ran
67 //! into, however:
68 //!
69 //! * Each UnixStream instance needs its own read/write events to wait on. These
70 //! can't be shared among clones of the same stream because the documentation
71 //! states that it unsets the event when the I/O is started (would possibly
72 //! corrupt other events simultaneously waiting). For convenience's sake,
73 //! these events are lazily initialized.
74 //!
75 //! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
76 //! to all pipes created through `connect`. Notably this means that the
77 //! ConnectNamedPipe function is nonblocking, implying that the Listener needs
78 //! to have yet another event to do the actual blocking.
79 //!
80 //! # Conclusion
81 //!
82 //! The conclusion here is that I probably don't know the best way to work with
83 //! windows named pipes, but the solution here seems to work well enough to get
84 //! the test suite passing (the suite is in libstd), and that's good enough for
85 //! me!
86
87 #![allow(deprecated)]
88
89 use prelude::v1::*;
90
91 use libc;
92 use ffi::CString;
93 use old_io::{self, IoError, IoResult};
94 use mem;
95 use ptr;
96 use str;
97 use sync::atomic::{AtomicBool, Ordering};
98 use sync::{Arc, Mutex};
99
100 use sys_common::{self, eof};
101
102 use super::{c, os, timer, decode_error_detailed};
103
104 fn to_utf16(c: &CString) -> IoResult<Vec<u16>> {
105 super::to_utf16(str::from_utf8(c.as_bytes()).ok())
106 }
107
108 struct Event(libc::HANDLE);
109
110 impl Event {
111 fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
112 let event = unsafe {
113 libc::CreateEventW(ptr::null_mut(),
114 manual_reset as libc::BOOL,
115 initial_state as libc::BOOL,
116 ptr::null())
117 };
118 if event as usize == 0 {
119 Err(super::last_error())
120 } else {
121 Ok(Event(event))
122 }
123 }
124
125 fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
126 }
127
128 impl Drop for Event {
129 fn drop(&mut self) {
130 unsafe { let _ = libc::CloseHandle(self.handle()); }
131 }
132 }
133
134 unsafe impl Send for Event {}
135 unsafe impl Sync for Event {}
136
137 struct Inner {
138 handle: libc::HANDLE,
139 lock: Mutex<()>,
140 read_closed: AtomicBool,
141 write_closed: AtomicBool,
142 }
143
144 impl Inner {
145 fn new(handle: libc::HANDLE) -> Inner {
146 Inner {
147 handle: handle,
148 lock: Mutex::new(()),
149 read_closed: AtomicBool::new(false),
150 write_closed: AtomicBool::new(false),
151 }
152 }
153 }
154
155 impl Drop for Inner {
156 fn drop(&mut self) {
157 unsafe {
158 let _ = libc::FlushFileBuffers(self.handle);
159 let _ = libc::CloseHandle(self.handle);
160 }
161 }
162 }
163
164 unsafe impl Send for Inner {}
165 unsafe impl Sync for Inner {}
166
167 unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
168 libc::CreateNamedPipeW(
169 name,
170 libc::PIPE_ACCESS_DUPLEX |
171 if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} |
172 libc::FILE_FLAG_OVERLAPPED,
173 libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE |
174 libc::PIPE_WAIT,
175 libc::PIPE_UNLIMITED_INSTANCES,
176 65536,
177 65536,
178 0,
179 ptr::null_mut()
180 )
181 }
182
183 pub fn await(handle: libc::HANDLE, deadline: u64,
184 events: &[libc::HANDLE]) -> IoResult<usize> {
185 use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};
186
187 // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
188 // to figure out if we should indeed get the result.
189 let ms = if deadline == 0 {
190 libc::INFINITE as u64
191 } else {
192 let now = timer::now();
193 if deadline < now {0} else {deadline - now}
194 };
195 let ret = unsafe {
196 c::WaitForMultipleObjects(events.len() as libc::DWORD,
197 events.as_ptr(),
198 libc::FALSE,
199 ms as libc::DWORD)
200 };
201 match ret {
202 WAIT_FAILED => Err(super::last_error()),
203 WAIT_TIMEOUT => unsafe {
204 let _ = c::CancelIo(handle);
205 Err(sys_common::timeout("operation timed out"))
206 },
207 n => Ok((n - WAIT_OBJECT_0) as usize)
208 }
209 }
210
211 fn epipe() -> IoError {
212 IoError {
213 kind: old_io::EndOfFile,
214 desc: "the pipe has ended",
215 detail: None,
216 }
217 }
218
219 ////////////////////////////////////////////////////////////////////////////////
220 // Unix Streams
221 ////////////////////////////////////////////////////////////////////////////////
222
223 pub struct UnixStream {
224 inner: Arc<Inner>,
225 write: Option<Event>,
226 read: Option<Event>,
227 read_deadline: u64,
228 write_deadline: u64,
229 }
230
231 impl UnixStream {
232 fn try_connect(p: *const u16) -> Option<libc::HANDLE> {
233 // Note that most of this is lifted from the libuv implementation.
234 // The idea is that if we fail to open a pipe in read/write mode
235 // that we try afterwards in just read or just write
236 let mut result = unsafe {
237 libc::CreateFileW(p,
238 libc::GENERIC_READ | libc::GENERIC_WRITE,
239 0,
240 ptr::null_mut(),
241 libc::OPEN_EXISTING,
242 libc::FILE_FLAG_OVERLAPPED,
243 ptr::null_mut())
244 };
245 if result != libc::INVALID_HANDLE_VALUE {
246 return Some(result)
247 }
248
249 let err = unsafe { libc::GetLastError() };
250 if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
251 result = unsafe {
252 libc::CreateFileW(p,
253 libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES,
254 0,
255 ptr::null_mut(),
256 libc::OPEN_EXISTING,
257 libc::FILE_FLAG_OVERLAPPED,
258 ptr::null_mut())
259 };
260 if result != libc::INVALID_HANDLE_VALUE {
261 return Some(result)
262 }
263 }
264 let err = unsafe { libc::GetLastError() };
265 if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
266 result = unsafe {
267 libc::CreateFileW(p,
268 libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES,
269 0,
270 ptr::null_mut(),
271 libc::OPEN_EXISTING,
272 libc::FILE_FLAG_OVERLAPPED,
273 ptr::null_mut())
274 };
275 if result != libc::INVALID_HANDLE_VALUE {
276 return Some(result)
277 }
278 }
279 None
280 }
281
282 pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
283 let addr = try!(to_utf16(addr));
284 let start = timer::now();
285 loop {
286 match UnixStream::try_connect(addr.as_ptr()) {
287 Some(handle) => {
288 let inner = Inner::new(handle);
289 let mut mode = libc::PIPE_TYPE_BYTE |
290 libc::PIPE_READMODE_BYTE |
291 libc::PIPE_WAIT;
292 let ret = unsafe {
293 libc::SetNamedPipeHandleState(inner.handle,
294 &mut mode,
295 ptr::null_mut(),
296 ptr::null_mut())
297 };
298 return if ret == 0 {
299 Err(super::last_error())
300 } else {
301 Ok(UnixStream {
302 inner: Arc::new(inner),
303 read: None,
304 write: None,
305 read_deadline: 0,
306 write_deadline: 0,
307 })
308 }
309 }
310 None => {}
311 }
312
313 // On windows, if you fail to connect, you may need to call the
314 // `WaitNamedPipe` function, and this is indicated with an error
315 // code of ERROR_PIPE_BUSY.
316 let code = unsafe { libc::GetLastError() };
317 if code as isize != libc::ERROR_PIPE_BUSY as isize {
318 return Err(super::last_error())
319 }
320
321 match timeout {
322 Some(timeout) => {
323 let now = timer::now();
324 let timed_out = (now - start) >= timeout || unsafe {
325 let ms = (timeout - (now - start)) as libc::DWORD;
326 libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0
327 };
328 if timed_out {
329 return Err(sys_common::timeout("connect timed out"))
330 }
331 }
332
333 // An example I found on Microsoft's website used 20
334 // seconds, libuv uses 30 seconds, hence we make the
335 // obvious choice of waiting for 25 seconds.
336 None => {
337 if unsafe { libc::WaitNamedPipeW(addr.as_ptr(), 25000) } == 0 {
338 return Err(super::last_error())
339 }
340 }
341 }
342 }
343 }
344
345 pub fn handle(&self) -> libc::HANDLE { self.inner.handle }
346
347 fn read_closed(&self) -> bool {
348 self.inner.read_closed.load(Ordering::SeqCst)
349 }
350
351 fn write_closed(&self) -> bool {
352 self.inner.write_closed.load(Ordering::SeqCst)
353 }
354
355 fn cancel_io(&self) -> IoResult<()> {
356 match unsafe { c::CancelIoEx(self.handle(), ptr::null_mut()) } {
357 0 if os::errno() == libc::ERROR_NOT_FOUND as i32 => {
358 Ok(())
359 }
360 0 => Err(super::last_error()),
361 _ => Ok(())
362 }
363 }
364
365 pub fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
366 if self.read.is_none() {
367 self.read = Some(try!(Event::new(true, false)));
368 }
369
370 let mut bytes_read = 0;
371 let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
372 overlapped.hEvent = self.read.as_ref().unwrap().handle();
373
374 // Pre-flight check to see if the reading half has been closed. This
375 // must be done before issuing the ReadFile request, but after we
376 // acquire the lock.
377 //
378 // See comments in close_read() about why this lock is necessary.
379 let guard = self.inner.lock.lock();
380 if self.read_closed() {
381 return Err(eof())
382 }
383
384 // Issue a nonblocking requests, succeeding quickly if it happened to
385 // succeed.
386 let ret = unsafe {
387 libc::ReadFile(self.handle(),
388 buf.as_ptr() as libc::LPVOID,
389 buf.len() as libc::DWORD,
390 &mut bytes_read,
391 &mut overlapped)
392 };
393 if ret != 0 { return Ok(bytes_read as usize) }
394
395 // If our errno doesn't say that the I/O is pending, then we hit some
396 // legitimate error and return immediately.
397 if os::errno() != libc::ERROR_IO_PENDING as i32 {
398 return Err(super::last_error())
399 }
400
401 // Now that we've issued a successful nonblocking request, we need to
402 // wait for it to finish. This can all be done outside the lock because
403 // we'll see any invocation of CancelIoEx. We also call this in a loop
404 // because we're woken up if the writing half is closed, we just need to
405 // realize that the reading half wasn't closed and we go right back to
406 // sleep.
407 drop(guard);
408 loop {
409 // Process a timeout if one is pending
410 let wait_succeeded = await(self.handle(), self.read_deadline,
411 &[overlapped.hEvent]);
412
413 let ret = unsafe {
414 libc::GetOverlappedResult(self.handle(),
415 &mut overlapped,
416 &mut bytes_read,
417 libc::TRUE)
418 };
419 // If we succeeded, or we failed for some reason other than
420 // CancelIoEx, return immediately
421 if ret != 0 { return Ok(bytes_read as usize) }
422 if os::errno() != libc::ERROR_OPERATION_ABORTED as i32 {
423 return Err(super::last_error())
424 }
425
426 // If the reading half is now closed, then we're done. If we woke up
427 // because the writing half was closed, keep trying.
428 if wait_succeeded.is_err() {
429 return Err(sys_common::timeout("read timed out"))
430 }
431 if self.read_closed() {
432 return Err(eof())
433 }
434 }
435 }
436
437 pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
438 if self.write.is_none() {
439 self.write = Some(try!(Event::new(true, false)));
440 }
441
442 let mut offset = 0;
443 let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
444 overlapped.hEvent = self.write.as_ref().unwrap().handle();
445
446 while offset < buf.len() {
447 let mut bytes_written = 0;
448
449 // This sequence below is quite similar to the one found in read().
450 // Some careful looping is done to ensure that if close_write() is
451 // invoked we bail out early, and if close_read() is invoked we keep
452 // going after we woke up.
453 //
454 // See comments in close_read() about why this lock is necessary.
455 let guard = self.inner.lock.lock();
456 if self.write_closed() {
457 return Err(epipe())
458 }
459 let ret = unsafe {
460 libc::WriteFile(self.handle(),
461 buf[offset..].as_ptr() as libc::LPVOID,
462 (buf.len() - offset) as libc::DWORD,
463 &mut bytes_written,
464 &mut overlapped)
465 };
466 let err = os::errno();
467 drop(guard);
468
469 if ret == 0 {
470 if err != libc::ERROR_IO_PENDING as i32 {
471 return Err(decode_error_detailed(err as i32))
472 }
473 // Process a timeout if one is pending
474 let wait_succeeded = await(self.handle(), self.write_deadline,
475 &[overlapped.hEvent]);
476 let ret = unsafe {
477 libc::GetOverlappedResult(self.handle(),
478 &mut overlapped,
479 &mut bytes_written,
480 libc::TRUE)
481 };
482 // If we weren't aborted, this was a legit error, if we were
483 // aborted, then check to see if the write half was actually
484 // closed or whether we woke up from the read half closing.
485 if ret == 0 {
486 if os::errno() != libc::ERROR_OPERATION_ABORTED as i32 {
487 return Err(super::last_error())
488 }
489 if !wait_succeeded.is_ok() {
490 let amt = offset + bytes_written as usize;
491 return if amt > 0 {
492 Err(IoError {
493 kind: old_io::ShortWrite(amt),
494 desc: "short write during write",
495 detail: None,
496 })
497 } else {
498 Err(sys_common::timeout("write timed out"))
499 }
500 }
501 if self.write_closed() {
502 return Err(epipe())
503 }
504 continue // retry
505 }
506 }
507 offset += bytes_written as usize;
508 }
509 Ok(())
510 }
511
512 pub fn close_read(&mut self) -> IoResult<()> {
513 // On windows, there's no actual shutdown() method for pipes, so we're
514 // forced to emulate the behavior manually at the application level. To
515 // do this, we need to both cancel any pending requests, as well as
516 // prevent all future requests from succeeding. These two operations are
517 // not atomic with respect to one another, so we must use a lock to do
518 // so.
519 //
520 // The read() code looks like:
521 //
522 // 1. Make sure the pipe is still open
523 // 2. Submit a read request
524 // 3. Wait for the read request to finish
525 //
526 // The race this lock is preventing is if another thread invokes
527 // close_read() between steps 1 and 2. By atomically executing steps 1
528 // and 2 with a lock with respect to close_read(), we're guaranteed that
529 // no thread will erroneously sit in a read forever.
530 let _guard = self.inner.lock.lock();
531 self.inner.read_closed.store(true, Ordering::SeqCst);
532 self.cancel_io()
533 }
534
535 pub fn close_write(&mut self) -> IoResult<()> {
536 // see comments in close_read() for why this lock is necessary
537 let _guard = self.inner.lock.lock();
538 self.inner.write_closed.store(true, Ordering::SeqCst);
539 self.cancel_io()
540 }
541
542 pub fn set_timeout(&mut self, timeout: Option<u64>) {
543 let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
544 self.read_deadline = deadline;
545 self.write_deadline = deadline;
546 }
547 pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
548 self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
549 }
550 pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
551 self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
552 }
553 }
554
555 impl Clone for UnixStream {
556 fn clone(&self) -> UnixStream {
557 UnixStream {
558 inner: self.inner.clone(),
559 read: None,
560 write: None,
561 read_deadline: 0,
562 write_deadline: 0,
563 }
564 }
565 }
566
567 ////////////////////////////////////////////////////////////////////////////////
568 // Unix Listener
569 ////////////////////////////////////////////////////////////////////////////////
570
571 pub struct UnixListener {
572 handle: libc::HANDLE,
573 name: CString,
574 }
575
576 unsafe impl Send for UnixListener {}
577 unsafe impl Sync for UnixListener {}
578
579 impl UnixListener {
580 pub fn bind(addr: &CString) -> IoResult<UnixListener> {
581 // Although we technically don't need the pipe until much later, we
582 // create the initial handle up front to test the validity of the name
583 // and such.
584 let addr_v = try!(to_utf16(addr));
585 let ret = unsafe { pipe(addr_v.as_ptr(), true) };
586 if ret == libc::INVALID_HANDLE_VALUE {
587 Err(super::last_error())
588 } else {
589 Ok(UnixListener { handle: ret, name: addr.clone() })
590 }
591 }
592
593 pub fn listen(self) -> IoResult<UnixAcceptor> {
594 Ok(UnixAcceptor {
595 listener: self,
596 event: try!(Event::new(true, false)),
597 deadline: 0,
598 inner: Arc::new(AcceptorState {
599 abort: try!(Event::new(true, false)),
600 closed: AtomicBool::new(false),
601 }),
602 })
603 }
604
605 pub fn handle(&self) -> libc::HANDLE {
606 self.handle
607 }
608 }
609
610 impl Drop for UnixListener {
611 fn drop(&mut self) {
612 unsafe { let _ = libc::CloseHandle(self.handle); }
613 }
614 }
615
616 pub struct UnixAcceptor {
617 inner: Arc<AcceptorState>,
618 listener: UnixListener,
619 event: Event,
620 deadline: u64,
621 }
622
623 struct AcceptorState {
624 abort: Event,
625 closed: AtomicBool,
626 }
627
628 impl UnixAcceptor {
629 pub fn accept(&mut self) -> IoResult<UnixStream> {
630 // This function has some funky implementation details when working with
631 // unix pipes. On windows, each server named pipe handle can be
632 // connected to a one or zero clients. To the best of my knowledge, a
633 // named server is considered active and present if there exists at
634 // least one server named pipe for it.
635 //
636 // The model of this function is to take the current known server
637 // handle, connect a client to it, and then transfer ownership to the
638 // UnixStream instance. The next time accept() is invoked, it'll need a
639 // different server handle to connect a client to.
640 //
641 // Note that there is a possible race here. Once our server pipe is
642 // handed off to a `UnixStream` object, the stream could be closed,
643 // meaning that there would be no active server pipes, hence even though
644 // we have a valid `UnixAcceptor`, no one can connect to it. For this
645 // reason, we generate the next accept call's server pipe at the end of
646 // this function call.
647 //
648 // This provides us an invariant that we always have at least one server
649 // connection open at a time, meaning that all connects to this acceptor
650 // should succeed while this is active.
651 //
652 // The actual implementation of doing this is a little tricky. Once a
653 // server pipe is created, a client can connect to it at any time. I
654 // assume that which server a client connects to is nondeterministic, so
655 // we also need to guarantee that the only server able to be connected
656 // to is the one that we're calling ConnectNamedPipe on. This means that
657 // we have to create the second server pipe *after* we've already
658 // accepted a connection. In order to at least somewhat gracefully
659 // handle errors, this means that if the second server pipe creation
660 // fails that we disconnect the connected client and then just keep
661 // using the original server pipe.
662 let handle = self.listener.handle;
663
664 // If we've had an artificial call to close_accept, be sure to never
665 // proceed in accepting new clients in the future
666 if self.inner.closed.load(Ordering::SeqCst) { return Err(eof()) }
667
668 let name = try!(to_utf16(&self.listener.name));
669
670 // Once we've got a "server handle", we need to wait for a client to
671 // connect. The ConnectNamedPipe function will block this thread until
672 // someone on the other end connects. This function can "fail" if a
673 // client connects after we created the pipe but before we got down
674 // here. Thanks windows.
675 let mut overlapped: libc::OVERLAPPED = unsafe { mem::zeroed() };
676 overlapped.hEvent = self.event.handle();
677 if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
678 let mut err = unsafe { libc::GetLastError() };
679
680 if err == libc::ERROR_IO_PENDING as libc::DWORD {
681 // Process a timeout if one is pending
682 let wait_succeeded = await(handle, self.deadline,
683 &[self.inner.abort.handle(),
684 overlapped.hEvent]);
685
686 // This will block until the overlapped I/O is completed. The
687 // timeout was previously handled, so this will either block in
688 // the normal case or succeed very quickly in the timeout case.
689 let ret = unsafe {
690 let mut transfer = 0;
691 libc::GetOverlappedResult(handle,
692 &mut overlapped,
693 &mut transfer,
694 libc::TRUE)
695 };
696 if ret == 0 {
697 if wait_succeeded.is_ok() {
698 err = unsafe { libc::GetLastError() };
699 } else {
700 return Err(sys_common::timeout("accept timed out"))
701 }
702 } else {
703 // we succeeded, bypass the check below
704 err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
705 }
706 }
707 if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD {
708 return Err(super::last_error())
709 }
710 }
711
712 // Now that we've got a connected client to our handle, we need to
713 // create a second server pipe. If this fails, we disconnect the
714 // connected client and return an error (see comments above).
715 let new_handle = unsafe { pipe(name.as_ptr(), false) };
716 if new_handle == libc::INVALID_HANDLE_VALUE {
717 let ret = Err(super::last_error());
718 // If our disconnection fails, then there's not really a whole lot
719 // that we can do, so panic
720 let err = unsafe { libc::DisconnectNamedPipe(handle) };
721 assert!(err != 0);
722 return ret;
723 } else {
724 self.listener.handle = new_handle;
725 }
726
727 // Transfer ownership of our handle into this stream
728 Ok(UnixStream {
729 inner: Arc::new(Inner::new(handle)),
730 read: None,
731 write: None,
732 read_deadline: 0,
733 write_deadline: 0,
734 })
735 }
736
737 pub fn set_timeout(&mut self, timeout: Option<u64>) {
738 self.deadline = timeout.map(|i| i + timer::now()).unwrap_or(0);
739 }
740
741 pub fn close_accept(&mut self) -> IoResult<()> {
742 self.inner.closed.store(true, Ordering::SeqCst);
743 let ret = unsafe {
744 c::SetEvent(self.inner.abort.handle())
745 };
746 if ret == 0 {
747 Err(super::last_error())
748 } else {
749 Ok(())
750 }
751 }
752
753 pub fn handle(&self) -> libc::HANDLE {
754 self.listener.handle()
755 }
756 }
757
758 impl Clone for UnixAcceptor {
759 fn clone(&self) -> UnixAcceptor {
760 let name = to_utf16(&self.listener.name).unwrap();
761 UnixAcceptor {
762 inner: self.inner.clone(),
763 event: Event::new(true, false).unwrap(),
764 deadline: 0,
765 listener: UnixListener {
766 name: self.listener.name.clone(),
767 handle: unsafe {
768 let p = pipe(name.as_ptr(), false) ;
769 assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
770 p
771 },
772 },
773 }
774 }
775 }