1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Named pipes implementation for windows
13 //! If you are unfortunate enough to be reading this code, I would like to first
14 //! apologize. This was my first encounter with windows named pipes, and it
15 //! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
16 //! read on as I'll try to explain some fun things that I ran into.
18 //! # Unix pipes vs Named pipes
20 //! As with everything else, named pipes on windows are pretty different from
21 //! unix pipes on unix. On unix, you use one "server pipe" to accept new client
22 //! pipes. So long as this server pipe is active, new children pipes can
23 //! connect. On windows, you instead have a number of "server pipes", and each
24 //! of these server pipes can throughout their lifetime be attached to a client
25 //! or not. Once attached to a client, a server pipe may then disconnect at a
28 //! # Accepting clients
30 //! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
31 //! are built around the unix flavors. This means that we have one "server
32 //! pipe" to which many clients can connect. In order to make this compatible
33 //! with the windows model, each connected client consumes ownership of a server
34 //! pipe, and then a new server pipe is created for the next client.
36 //! Note that the server pipes attached to clients are never given back to the
37 //! listener for recycling. This could possibly be implemented with a channel so
38 //! the listener half can re-use server pipes, but for now I erred on the simple
39 //! side of things. Each stream accepted by a listener will destroy the server
40 //! pipe after the stream is dropped.
42 //! This model ends up having a small race or two, and you can find more details
43 //! on the `native_accept` method.
45 //! # Simultaneous reads and writes
47 //! In testing, I found that two simultaneous writes and two simultaneous reads
48 //! on a pipe ended up working out just fine, but problems were encountered when
49 //! a read was executed simultaneously with a write. After some googling around,
50 //! it sounded like named pipes just weren't built for this kind of interaction,
51 //! and the suggested solution was to use overlapped I/O.
53 //! I don't really know what overlapped I/O is, but my basic understanding after
54 //! reading about it is that you have an external Event which is used to signal
55 //! I/O completion, passed around in some OVERLAPPED structures. As to what this
56 //! is, I'm not exactly sure.
58 //! This problem implies that all named pipes are created with the
59 //! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
60 //! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
61 //! inside of this structure is a HANDLE from CreateEvent. After the I/O is
62 //! determined to be pending (may complete in the future), the
63 //! GetOverlappedResult function is used to block on the event, waiting for the
66 //! This scheme ended up working well enough. There were two snags that I ran
69 //! * Each UnixStream instance needs its own read/write events to wait on. These
70 //! can't be shared among clones of the same stream because the documentation
71 //! states that it unsets the event when the I/O is started (would possibly
72 //! corrupt other events simultaneously waiting). For convenience's sake,
73 //! these events are lazily initialized.
75 //! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
76 //! to all pipes created through `connect`. Notably this means that the
77 //! ConnectNamedPipe function is nonblocking, implying that the Listener needs
78 //! to have yet another event to do the actual blocking.
82 //! The conclusion here is that I probably don't know the best way to work with
83 //! windows named pipes, but the solution here seems to work well enough to get
84 //! the test suite passing (the suite is in libstd), and that's good enough for
93 use old_io
::{self, IoError, IoResult}
;
97 use sync
::atomic
::{AtomicBool, Ordering}
;
98 use sync
::{Arc, Mutex}
;
100 use sys_common
::{self, eof}
;
102 use super::{c, os, timer, decode_error_detailed}
;
104 fn to_utf16(c
: &CString
) -> IoResult
<Vec
<u16>> {
105 super::to_utf16(str::from_utf8(c
.as_bytes()).ok())
/// Newtype around a raw event `HANDLE`. Instances are created through
/// `CreateEventW` in `Event::new`, and the handle is passed to `CloseHandle`
/// when the value is dropped.
108 struct Event(libc
::HANDLE
);
111 fn new(manual_reset
: bool
, initial_state
: bool
) -> IoResult
<Event
> {
113 libc
::CreateEventW(ptr
::null_mut(),
114 manual_reset
as libc
::BOOL
,
115 initial_state
as libc
::BOOL
,
118 if event
as usize == 0 {
119 Err(super::last_error())
125 fn handle(&self) -> libc
::HANDLE { let Event(handle) = *self; handle }
128 impl Drop
for Event
{
130 unsafe { let _ = libc::CloseHandle(self.handle()); }
// `Event` only wraps a raw `HANDLE`, so these marker impls are asserted by
// hand rather than derived.
// NOTE(review): soundness presumably relies on event handles being usable
// from any thread — confirm against the Win32 documentation.
134 unsafe impl Send
for Event {}
135 unsafe impl Sync
for Event {}
138 handle
: libc
::HANDLE
,
140 read_closed
: AtomicBool
,
141 write_closed
: AtomicBool
,
145 fn new(handle
: libc
::HANDLE
) -> Inner
{
148 lock
: Mutex
::new(()),
149 read_closed
: AtomicBool
::new(false),
150 write_closed
: AtomicBool
::new(false),
155 impl Drop
for Inner
{
158 let _
= libc
::FlushFileBuffers(self.handle
);
159 let _
= libc
::CloseHandle(self.handle
);
// `Inner` holds a raw pipe `HANDLE` next to a `Mutex` and two `AtomicBool`
// flags; the marker impls are asserted by hand because of the raw handle.
// NOTE(review): presumably sound because the handle is only passed to Win32
// calls that tolerate concurrent use — confirm.
164 unsafe impl Send
for Inner {}
165 unsafe impl Sync
for Inner {}
167 unsafe fn pipe(name
: *const u16, init
: bool
) -> libc
::HANDLE
{
168 libc
::CreateNamedPipeW(
170 libc
::PIPE_ACCESS_DUPLEX
|
171 if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE}
else {0}
|
172 libc
::FILE_FLAG_OVERLAPPED
,
173 libc
::PIPE_TYPE_BYTE
| libc
::PIPE_READMODE_BYTE
|
175 libc
::PIPE_UNLIMITED_INSTANCES
,
183 pub fn await(handle
: libc
::HANDLE
, deadline
: u64,
184 events
: &[libc
::HANDLE
]) -> IoResult
<usize> {
185 use libc
::consts
::os
::extra
::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0}
;
187 // If we've got a timeout, use WaitForMultipleObjects in tandem with CancelIo
188 // to figure out if we should indeed get the result.
189 let ms
= if deadline
== 0 {
190 libc
::INFINITE
as u64
192 let now
= timer
::now();
193 if deadline
< now {0}
else {deadline - now}
196 c
::WaitForMultipleObjects(events
.len() as libc
::DWORD
,
202 WAIT_FAILED
=> Err(super::last_error()),
203 WAIT_TIMEOUT
=> unsafe {
204 let _
= c
::CancelIo(handle
);
205 Err(sys_common
::timeout("operation timed out"))
207 n
=> Ok((n
- WAIT_OBJECT_0
) as usize)
211 fn epipe() -> IoError
{
213 kind
: old_io
::EndOfFile
,
214 desc
: "the pipe has ended",
219 ////////////////////////////////////////////////////////////////////////////////
221 ////////////////////////////////////////////////////////////////////////////////
223 pub struct UnixStream
{
225 write
: Option
<Event
>,
232 fn try_connect(p
: *const u16) -> Option
<libc
::HANDLE
> {
233 // Note that most of this is lifted from the libuv implementation.
234 // The idea is that if we fail to open a pipe in read/write mode
235 // that we try afterwards in just read or just write
236 let mut result
= unsafe {
238 libc
::GENERIC_READ
| libc
::GENERIC_WRITE
,
242 libc
::FILE_FLAG_OVERLAPPED
,
245 if result
!= libc
::INVALID_HANDLE_VALUE
{
249 let err
= unsafe { libc::GetLastError() }
;
250 if err
== libc
::ERROR_ACCESS_DENIED
as libc
::DWORD
{
253 libc
::GENERIC_READ
| libc
::FILE_WRITE_ATTRIBUTES
,
257 libc
::FILE_FLAG_OVERLAPPED
,
260 if result
!= libc
::INVALID_HANDLE_VALUE
{
264 let err
= unsafe { libc::GetLastError() }
;
265 if err
== libc
::ERROR_ACCESS_DENIED
as libc
::DWORD
{
268 libc
::GENERIC_WRITE
| libc
::FILE_READ_ATTRIBUTES
,
272 libc
::FILE_FLAG_OVERLAPPED
,
275 if result
!= libc
::INVALID_HANDLE_VALUE
{
282 pub fn connect(addr
: &CString
, timeout
: Option
<u64>) -> IoResult
<UnixStream
> {
283 let addr
= try
!(to_utf16(addr
));
284 let start
= timer
::now();
286 match UnixStream
::try_connect(addr
.as_ptr()) {
288 let inner
= Inner
::new(handle
);
289 let mut mode
= libc
::PIPE_TYPE_BYTE
|
290 libc
::PIPE_READMODE_BYTE
|
293 libc
::SetNamedPipeHandleState(inner
.handle
,
299 Err(super::last_error())
302 inner
: Arc
::new(inner
),
313 // On windows, if you fail to connect, you may need to call the
314 // `WaitNamedPipe` function, and this is indicated with an error
315 // code of ERROR_PIPE_BUSY.
316 let code
= unsafe { libc::GetLastError() }
;
317 if code
as isize != libc
::ERROR_PIPE_BUSY
as isize {
318 return Err(super::last_error())
323 let now
= timer
::now();
324 let timed_out
= (now
- start
) >= timeout
|| unsafe {
325 let ms
= (timeout
- (now
- start
)) as libc
::DWORD
;
326 libc
::WaitNamedPipeW(addr
.as_ptr(), ms
) == 0
329 return Err(sys_common
::timeout("connect timed out"))
333 // An example I found on Microsoft's website used 20
334 // seconds, libuv uses 30 seconds, hence we make the
335 // obvious choice of waiting for 25 seconds.
337 if unsafe { libc::WaitNamedPipeW(addr.as_ptr(), 25000) }
== 0 {
338 return Err(super::last_error())
345 pub fn handle(&self) -> libc
::HANDLE { self.inner.handle }
347 fn read_closed(&self) -> bool
{
348 self.inner
.read_closed
.load(Ordering
::SeqCst
)
351 fn write_closed(&self) -> bool
{
352 self.inner
.write_closed
.load(Ordering
::SeqCst
)
355 fn cancel_io(&self) -> IoResult
<()> {
356 match unsafe { c::CancelIoEx(self.handle(), ptr::null_mut()) }
{
357 0 if os
::errno() == libc
::ERROR_NOT_FOUND
as i32 => {
360 0 => Err(super::last_error()),
365 pub fn read(&mut self, buf
: &mut [u8]) -> IoResult
<usize> {
366 if self.read
.is_none() {
367 self.read
= Some(try
!(Event
::new(true, false)));
370 let mut bytes_read
= 0;
371 let mut overlapped
: libc
::OVERLAPPED
= unsafe { mem::zeroed() }
;
372 overlapped
.hEvent
= self.read
.as_ref().unwrap().handle();
374 // Pre-flight check to see if the reading half has been closed. This
375 // must be done before issuing the ReadFile request, but after we
378 // See comments in close_read() about why this lock is necessary.
379 let guard
= self.inner
.lock
.lock();
380 if self.read_closed() {
384 // Issue a nonblocking request, succeeding quickly if it happened to
387 libc
::ReadFile(self.handle(),
388 buf
.as_ptr() as libc
::LPVOID
,
389 buf
.len() as libc
::DWORD
,
393 if ret
!= 0 { return Ok(bytes_read as usize) }
395 // If our errno doesn't say that the I/O is pending, then we hit some
396 // legitimate error and return immediately.
397 if os
::errno() != libc
::ERROR_IO_PENDING
as i32 {
398 return Err(super::last_error())
401 // Now that we've issued a successful nonblocking request, we need to
402 // wait for it to finish. This can all be done outside the lock because
403 // we'll see any invocation of CancelIoEx. We also call this in a loop
404 // because we're woken up if the writing half is closed, we just need to
405 // realize that the reading half wasn't closed and we go right back to
409 // Process a timeout if one is pending
410 let wait_succeeded
= await(self.handle(), self.read_deadline
,
411 &[overlapped
.hEvent
]);
414 libc
::GetOverlappedResult(self.handle(),
419 // If we succeeded, or we failed for some reason other than
420 // CancelIoEx, return immediately
421 if ret
!= 0 { return Ok(bytes_read as usize) }
422 if os
::errno() != libc
::ERROR_OPERATION_ABORTED
as i32 {
423 return Err(super::last_error())
426 // If the reading half is now closed, then we're done. If we woke up
427 // because the writing half was closed, keep trying.
428 if wait_succeeded
.is_err() {
429 return Err(sys_common
::timeout("read timed out"))
431 if self.read_closed() {
437 pub fn write(&mut self, buf
: &[u8]) -> IoResult
<()> {
438 if self.write
.is_none() {
439 self.write
= Some(try
!(Event
::new(true, false)));
443 let mut overlapped
: libc
::OVERLAPPED
= unsafe { mem::zeroed() }
;
444 overlapped
.hEvent
= self.write
.as_ref().unwrap().handle();
446 while offset
< buf
.len() {
447 let mut bytes_written
= 0;
449 // This sequence below is quite similar to the one found in read().
450 // Some careful looping is done to ensure that if close_write() is
451 // invoked we bail out early, and if close_read() is invoked we keep
452 // going after we woke up.
454 // See comments in close_read() about why this lock is necessary.
455 let guard
= self.inner
.lock
.lock();
456 if self.write_closed() {
460 libc
::WriteFile(self.handle(),
461 buf
[offset
..].as_ptr() as libc
::LPVOID
,
462 (buf
.len() - offset
) as libc
::DWORD
,
466 let err
= os
::errno();
470 if err
!= libc
::ERROR_IO_PENDING
as i32 {
471 return Err(decode_error_detailed(err
as i32))
473 // Process a timeout if one is pending
474 let wait_succeeded
= await(self.handle(), self.write_deadline
,
475 &[overlapped
.hEvent
]);
477 libc
::GetOverlappedResult(self.handle(),
482 // If we weren't aborted, this was a legit error, if we were
483 // aborted, then check to see if the write half was actually
484 // closed or whether we woke up from the read half closing.
486 if os
::errno() != libc
::ERROR_OPERATION_ABORTED
as i32 {
487 return Err(super::last_error())
489 if !wait_succeeded
.is_ok() {
490 let amt
= offset
+ bytes_written
as usize;
493 kind
: old_io
::ShortWrite(amt
),
494 desc
: "short write during write",
498 Err(sys_common
::timeout("write timed out"))
501 if self.write_closed() {
507 offset
+= bytes_written
as usize;
512 pub fn close_read(&mut self) -> IoResult
<()> {
513 // On windows, there's no actual shutdown() method for pipes, so we're
514 // forced to emulate the behavior manually at the application level. To
515 // do this, we need to both cancel any pending requests, as well as
516 // prevent all future requests from succeeding. These two operations are
517 // not atomic with respect to one another, so we must use a lock to do
520 // The read() code looks like:
522 // 1. Make sure the pipe is still open
523 // 2. Submit a read request
524 // 3. Wait for the read request to finish
526 // The race this lock is preventing is if another thread invokes
527 // close_read() between steps 1 and 2. By atomically executing steps 1
528 // and 2 with a lock with respect to close_read(), we're guaranteed that
529 // no thread will erroneously sit in a read forever.
530 let _guard
= self.inner
.lock
.lock();
531 self.inner
.read_closed
.store(true, Ordering
::SeqCst
);
535 pub fn close_write(&mut self) -> IoResult
<()> {
536 // see comments in close_read() for why this lock is necessary
537 let _guard
= self.inner
.lock
.lock();
538 self.inner
.write_closed
.store(true, Ordering
::SeqCst
);
542 pub fn set_timeout(&mut self, timeout
: Option
<u64>) {
543 let deadline
= timeout
.map(|a
| timer
::now() + a
).unwrap_or(0);
544 self.read_deadline
= deadline
;
545 self.write_deadline
= deadline
;
547 pub fn set_read_timeout(&mut self, timeout
: Option
<u64>) {
548 self.read_deadline
= timeout
.map(|a
| timer
::now() + a
).unwrap_or(0);
550 pub fn set_write_timeout(&mut self, timeout
: Option
<u64>) {
551 self.write_deadline
= timeout
.map(|a
| timer
::now() + a
).unwrap_or(0);
555 impl Clone
for UnixStream
{
556 fn clone(&self) -> UnixStream
{
558 inner
: self.inner
.clone(),
567 ////////////////////////////////////////////////////////////////////////////////
569 ////////////////////////////////////////////////////////////////////////////////
571 pub struct UnixListener
{
572 handle
: libc
::HANDLE
,
// `UnixListener` stores a raw server-pipe `HANDLE`, so these marker impls are
// asserted by hand.
// NOTE(review): presumably sound because the handle is only used through
// Win32 calls that may be issued from any thread — confirm.
576 unsafe impl Send
for UnixListener {}
577 unsafe impl Sync
for UnixListener {}
580 pub fn bind(addr
: &CString
) -> IoResult
<UnixListener
> {
581 // Although we technically don't need the pipe until much later, we
582 // create the initial handle up front to test the validity of the name
584 let addr_v
= try
!(to_utf16(addr
));
585 let ret
= unsafe { pipe(addr_v.as_ptr(), true) }
;
586 if ret
== libc
::INVALID_HANDLE_VALUE
{
587 Err(super::last_error())
589 Ok(UnixListener { handle: ret, name: addr.clone() }
)
593 pub fn listen(self) -> IoResult
<UnixAcceptor
> {
596 event
: try
!(Event
::new(true, false)),
598 inner
: Arc
::new(AcceptorState
{
599 abort
: try
!(Event
::new(true, false)),
600 closed
: AtomicBool
::new(false),
605 pub fn handle(&self) -> libc
::HANDLE
{
610 impl Drop
for UnixListener
{
612 unsafe { let _ = libc::CloseHandle(self.handle); }
616 pub struct UnixAcceptor
{
617 inner
: Arc
<AcceptorState
>,
618 listener
: UnixListener
,
623 struct AcceptorState
{
629 pub fn accept(&mut self) -> IoResult
<UnixStream
> {
630 // This function has some funky implementation details when working with
631 // unix pipes. On windows, each server named pipe handle can be
632 // connected to a one or zero clients. To the best of my knowledge, a
633 // named server is considered active and present if there exists at
634 // least one server named pipe for it.
636 // The model of this function is to take the current known server
637 // handle, connect a client to it, and then transfer ownership to the
638 // UnixStream instance. The next time accept() is invoked, it'll need a
639 // different server handle to connect a client to.
641 // Note that there is a possible race here. Once our server pipe is
642 // handed off to a `UnixStream` object, the stream could be closed,
643 // meaning that there would be no active server pipes, hence even though
644 // we have a valid `UnixAcceptor`, no one can connect to it. For this
645 // reason, we generate the next accept call's server pipe at the end of
646 // this function call.
648 // This provides us an invariant that we always have at least one server
649 // connection open at a time, meaning that all connects to this acceptor
650 // should succeed while this is active.
652 // The actual implementation of doing this is a little tricky. Once a
653 // server pipe is created, a client can connect to it at any time. I
654 // assume that which server a client connects to is nondeterministic, so
655 // we also need to guarantee that the only server able to be connected
656 // to is the one that we're calling ConnectNamedPipe on. This means that
657 // we have to create the second server pipe *after* we've already
658 // accepted a connection. In order to at least somewhat gracefully
659 // handle errors, this means that if the second server pipe creation
660 // fails that we disconnect the connected client and then just keep
661 // using the original server pipe.
662 let handle
= self.listener
.handle
;
664 // If we've had an artificial call to close_accept, be sure to never
665 // proceed in accepting new clients in the future
666 if self.inner
.closed
.load(Ordering
::SeqCst
) { return Err(eof()) }
668 let name
= try
!(to_utf16(&self.listener
.name
));
670 // Once we've got a "server handle", we need to wait for a client to
671 // connect. The ConnectNamedPipe function will block this thread until
672 // someone on the other end connects. This function can "fail" if a
673 // client connects after we created the pipe but before we got down
674 // here. Thanks windows.
675 let mut overlapped
: libc
::OVERLAPPED
= unsafe { mem::zeroed() }
;
676 overlapped
.hEvent
= self.event
.handle();
677 if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 }
{
678 let mut err
= unsafe { libc::GetLastError() }
;
680 if err
== libc
::ERROR_IO_PENDING
as libc
::DWORD
{
681 // Process a timeout if one is pending
682 let wait_succeeded
= await(handle
, self.deadline
,
683 &[self.inner
.abort
.handle(),
686 // This will block until the overlapped I/O is completed. The
687 // timeout was previously handled, so this will either block in
688 // the normal case or succeed very quickly in the timeout case.
690 let mut transfer
= 0;
691 libc
::GetOverlappedResult(handle
,
697 if wait_succeeded
.is_ok() {
698 err
= unsafe { libc::GetLastError() }
;
700 return Err(sys_common
::timeout("accept timed out"))
703 // we succeeded, bypass the check below
704 err
= libc
::ERROR_PIPE_CONNECTED
as libc
::DWORD
;
707 if err
!= libc
::ERROR_PIPE_CONNECTED
as libc
::DWORD
{
708 return Err(super::last_error())
712 // Now that we've got a connected client to our handle, we need to
713 // create a second server pipe. If this fails, we disconnect the
714 // connected client and return an error (see comments above).
715 let new_handle
= unsafe { pipe(name.as_ptr(), false) }
;
716 if new_handle
== libc
::INVALID_HANDLE_VALUE
{
717 let ret
= Err(super::last_error());
718 // If our disconnection fails, then there's not really a whole lot
719 // that we can do, so panic
720 let err
= unsafe { libc::DisconnectNamedPipe(handle) }
;
724 self.listener
.handle
= new_handle
;
727 // Transfer ownership of our handle into this stream
729 inner
: Arc
::new(Inner
::new(handle
)),
737 pub fn set_timeout(&mut self, timeout
: Option
<u64>) {
738 self.deadline
= timeout
.map(|i
| i
+ timer
::now()).unwrap_or(0);
741 pub fn close_accept(&mut self) -> IoResult
<()> {
742 self.inner
.closed
.store(true, Ordering
::SeqCst
);
744 c
::SetEvent(self.inner
.abort
.handle())
747 Err(super::last_error())
753 pub fn handle(&self) -> libc
::HANDLE
{
754 self.listener
.handle()
758 impl Clone
for UnixAcceptor
{
759 fn clone(&self) -> UnixAcceptor
{
760 let name
= to_utf16(&self.listener
.name
).unwrap();
762 inner
: self.inner
.clone(),
763 event
: Event
::new(true, false).unwrap(),
765 listener
: UnixListener
{
766 name
: self.listener
.name
.clone(),
768 let p
= pipe(name
.as_ptr(), false) ;
769 assert
!(p
!= libc
::INVALID_HANDLE_VALUE
as libc
::HANDLE
);