]> git.proxmox.com Git - cargo.git/blob - vendor/miow/src/pipe.rs
New upstream version 0.47.0
[cargo.git] / vendor / miow / src / pipe.rs
1 //! Interprocess Communication pipes
2 //!
3 //! A pipe is a section of shared memory that processes use for communication.
4 //! The process that creates a pipe is the _pipe server_. A process that connects
5 //! to a pipe is a _pipe client_. One process writes information to the pipe, then
6 //! the other process reads the information from the pipe. This overview
7 //! describes how to create, manage, and use pipes.
8 //!
//! There are two types of pipes: [anonymous pipes](fn.anonymous.html) and
//! [named pipes](struct.NamedPipe.html). Anonymous pipes require less overhead than
//! named pipes, but offer limited services.
12 //!
13 //! # Anonymous pipes
14 //!
15 //! An anonymous pipe is an unnamed, one-way pipe that typically transfers data
16 //! between a parent process and a child process. Anonymous pipes are always
17 //! local; they cannot be used for communication over a network.
18 //!
19 //! # Named pipes
20 //!
21 //! A *named pipe* is a named, one-way or duplex pipe for communication between
22 //! the pipe server and one or more pipe clients. All instances of a named pipe
23 //! share the same pipe name, but each instance has its own buffers and handles,
24 //! and provides a separate conduit for client/server communication. The use of
25 //! instances enables multiple pipe clients to use the same named pipe
26 //! simultaneously.
27 //!
28 //! Any process can access named pipes, subject to security checks, making named
29 //! pipes an easy form of communication between related or unrelated processes.
30 //!
31 //! Any process can act as both a server and a client, making peer-to-peer
32 //! communication possible. As used here, the term pipe server refers to a
33 //! process that creates a named pipe, and the term pipe client refers to a
34 //! process that connects to an instance of a named pipe.
35 //!
36 //! Named pipes can be used to provide communication between processes on the
37 //! same computer or between processes on different computers across a network.
38 //! If the server service is running, all named pipes are accessible remotely. If
39 //! you intend to use a named pipe locally only, deny access to NT
40 //! AUTHORITY\\NETWORK or switch to local RPC.
41 //!
42 //! # References
43 //!
44 //! - [win32 pipe docs](https://github.com/MicrosoftDocs/win32/blob/docs/desktop-src/ipc/pipes.md)
45
46 use std::cell::RefCell;
47 use std::ffi::OsStr;
48 use std::fs::{File, OpenOptions};
49 use std::io;
50 use std::io::prelude::*;
51 use std::os::windows::ffi::*;
52 use std::os::windows::io::*;
53 use std::time::Duration;
54
55 use crate::handle::Handle;
56 use crate::overlapped::Overlapped;
57 use winapi::shared::minwindef::*;
58 use winapi::shared::ntdef::HANDLE;
59 use winapi::shared::winerror::*;
60 use winapi::um::fileapi::*;
61 use winapi::um::handleapi::*;
62 use winapi::um::ioapiset::*;
63 use winapi::um::minwinbase::*;
64 use winapi::um::namedpipeapi::*;
65 use winapi::um::winbase::*;
66
67 /// Readable half of an anonymous pipe.
68 #[derive(Debug)]
69 pub struct AnonRead(Handle);
70
71 /// Writable half of an anonymous pipe.
72 #[derive(Debug)]
73 pub struct AnonWrite(Handle);
74
75 /// A named pipe that can accept connections.
76 #[derive(Debug)]
77 pub struct NamedPipe(Handle);
78
79 /// A builder structure for creating a new named pipe.
80 #[derive(Debug)]
81 pub struct NamedPipeBuilder {
82 name: Vec<u16>,
83 dwOpenMode: DWORD,
84 dwPipeMode: DWORD,
85 nMaxInstances: DWORD,
86 nOutBufferSize: DWORD,
87 nInBufferSize: DWORD,
88 nDefaultTimeOut: DWORD,
89 }
90
91 /// Creates a new anonymous in-memory pipe, returning the read/write ends of the
92 /// pipe.
93 ///
94 /// The buffer size for this pipe may also be specified, but the system will
95 /// normally use this as a suggestion and it's not guaranteed that the buffer
96 /// will be precisely this size.
97 pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
98 let mut read = 0 as HANDLE;
99 let mut write = 0 as HANDLE;
100 crate::cvt(unsafe { CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size) })?;
101 Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
102 }
103
104 impl Read for AnonRead {
105 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
106 self.0.read(buf)
107 }
108 }
109 impl<'a> Read for &'a AnonRead {
110 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
111 self.0.read(buf)
112 }
113 }
114
115 impl AsRawHandle for AnonRead {
116 fn as_raw_handle(&self) -> HANDLE {
117 self.0.raw()
118 }
119 }
120 impl FromRawHandle for AnonRead {
121 unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
122 AnonRead(Handle::new(handle))
123 }
124 }
125 impl IntoRawHandle for AnonRead {
126 fn into_raw_handle(self) -> HANDLE {
127 self.0.into_raw()
128 }
129 }
130
131 impl Write for AnonWrite {
132 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
133 self.0.write(buf)
134 }
135 fn flush(&mut self) -> io::Result<()> {
136 Ok(())
137 }
138 }
139 impl<'a> Write for &'a AnonWrite {
140 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
141 self.0.write(buf)
142 }
143 fn flush(&mut self) -> io::Result<()> {
144 Ok(())
145 }
146 }
147
148 impl AsRawHandle for AnonWrite {
149 fn as_raw_handle(&self) -> HANDLE {
150 self.0.raw()
151 }
152 }
153 impl FromRawHandle for AnonWrite {
154 unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
155 AnonWrite(Handle::new(handle))
156 }
157 }
158 impl IntoRawHandle for AnonWrite {
159 fn into_raw_handle(self) -> HANDLE {
160 self.0.into_raw()
161 }
162 }
163
164 /// A convenience function to connect to a named pipe.
165 ///
166 /// This function will block the calling process until it can connect to the
167 /// pipe server specified by `addr`. This will use `NamedPipe::wait` internally
168 /// to block until it can connect.
169 pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
170 _connect(addr.as_ref())
171 }
172
173 fn _connect(addr: &OsStr) -> io::Result<File> {
174 let mut r = OpenOptions::new();
175 let mut w = OpenOptions::new();
176 let mut rw = OpenOptions::new();
177 r.read(true);
178 w.write(true);
179 rw.read(true).write(true);
180 loop {
181 let res = rw
182 .open(addr)
183 .or_else(|_| r.open(addr))
184 .or_else(|_| w.open(addr));
185 match res {
186 Ok(f) => return Ok(f),
187 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {}
188 Err(e) => return Err(e),
189 }
190
191 NamedPipe::wait(addr, Some(Duration::new(20, 0)))?;
192 }
193 }
194
195 impl NamedPipe {
196 /// Creates a new initial named pipe.
197 ///
198 /// This function is equivalent to:
199 ///
200 /// ```
201 /// use miow::pipe::NamedPipeBuilder;
202 ///
203 /// # let addr = "foo";
204 /// NamedPipeBuilder::new(addr)
205 /// .first(true)
206 /// .inbound(true)
207 /// .outbound(true)
208 /// .out_buffer_size(65536)
209 /// .in_buffer_size(65536)
210 /// .create();
211 /// ```
212 pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
213 NamedPipeBuilder::new(addr).create()
214 }
215
216 /// Waits until either a time-out interval elapses or an instance of the
217 /// specified named pipe is available for connection.
218 ///
219 /// If this function succeeds the process can create a `File` to connect to
220 /// the named pipe.
221 pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()> {
222 NamedPipe::_wait(addr.as_ref(), timeout)
223 }
224
225 fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
226 let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
227 let timeout = crate::dur2ms(timeout);
228 crate::cvt(unsafe { WaitNamedPipeW(addr.as_ptr(), timeout) }).map(|_| ())
229 }
230
231 /// Connects this named pipe to a client, blocking until one becomes
232 /// available.
233 ///
234 /// This function will call the `ConnectNamedPipe` function to await for a
235 /// client to connect. This can be called immediately after the pipe is
236 /// created, or after it has been disconnected from a previous client.
237 pub fn connect(&self) -> io::Result<()> {
238 match crate::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) {
239 Ok(_) => Ok(()),
240 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(()),
241 Err(e) => Err(e),
242 }
243 }
244
245 /// Issue a connection request with the specified overlapped operation.
246 ///
247 /// This function will issue a request to connect a client to this server,
248 /// returning immediately after starting the overlapped operation.
249 ///
250 /// If this function immediately succeeds then `Ok(true)` is returned. If
251 /// the overlapped operation is enqueued and pending, then `Ok(false)` is
252 /// returned. Otherwise an error is returned indicating what went wrong.
253 ///
254 /// # Unsafety
255 ///
256 /// This function is unsafe because the kernel requires that the
257 /// `overlapped` pointer is valid until the end of the I/O operation. The
258 /// kernel also requires that `overlapped` is unique for this I/O operation
259 /// and is not in use for any other I/O.
260 ///
261 /// To safely use this function callers must ensure that this pointer is
262 /// valid until the I/O operation is completed, typically via completion
263 /// ports and waiting to receive the completion notification on the port.
264 pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
265 match crate::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) {
266 Ok(_) => Ok(true),
267 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(true),
268 Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => Ok(false),
269 Err(ref e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => Ok(true),
270 Err(e) => Err(e),
271 }
272 }
273
274 /// Disconnects this named pipe from any connected client.
275 pub fn disconnect(&self) -> io::Result<()> {
276 crate::cvt(unsafe { DisconnectNamedPipe(self.0.raw()) }).map(|_| ())
277 }
278
279 /// Issues an overlapped read operation to occur on this pipe.
280 ///
281 /// This function will issue an asynchronous read to occur in an overlapped
282 /// fashion, returning immediately. The `buf` provided will be filled in
283 /// with data and the request is tracked by the `overlapped` function
284 /// provided.
285 ///
286 /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
287 /// `n` is the number of bytes read. If an asynchronous operation is
288 /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
289 /// it is returned.
290 ///
291 /// When this operation completes (or if it completes immediately), another
292 /// mechanism must be used to learn how many bytes were transferred (such as
293 /// looking at the filed in the IOCP status message).
294 ///
295 /// # Unsafety
296 ///
297 /// This function is unsafe because the kernel requires that the `buf` and
298 /// `overlapped` pointers to be valid until the end of the I/O operation.
299 /// The kernel also requires that `overlapped` is unique for this I/O
300 /// operation and is not in use for any other I/O.
301 ///
302 /// To safely use this function callers must ensure that the pointers are
303 /// valid until the I/O operation is completed, typically via completion
304 /// ports and waiting to receive the completion notification on the port.
305 pub unsafe fn read_overlapped(
306 &self,
307 buf: &mut [u8],
308 overlapped: *mut OVERLAPPED,
309 ) -> io::Result<Option<usize>> {
310 self.0.read_overlapped(buf, overlapped)
311 }
312
313 /// Issues an overlapped write operation to occur on this pipe.
314 ///
315 /// This function will issue an asynchronous write to occur in an overlapped
316 /// fashion, returning immediately. The `buf` provided will be filled in
317 /// with data and the request is tracked by the `overlapped` function
318 /// provided.
319 ///
320 /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
321 /// `n` is the number of bytes written. If an asynchronous operation is
322 /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
323 /// it is returned.
324 ///
325 /// When this operation completes (or if it completes immediately), another
326 /// mechanism must be used to learn how many bytes were transferred (such as
327 /// looking at the filed in the IOCP status message).
328 ///
329 /// # Unsafety
330 ///
331 /// This function is unsafe because the kernel requires that the `buf` and
332 /// `overlapped` pointers to be valid until the end of the I/O operation.
333 /// The kernel also requires that `overlapped` is unique for this I/O
334 /// operation and is not in use for any other I/O.
335 ///
336 /// To safely use this function callers must ensure that the pointers are
337 /// valid until the I/O operation is completed, typically via completion
338 /// ports and waiting to receive the completion notification on the port.
339 pub unsafe fn write_overlapped(
340 &self,
341 buf: &[u8],
342 overlapped: *mut OVERLAPPED,
343 ) -> io::Result<Option<usize>> {
344 self.0.write_overlapped(buf, overlapped)
345 }
346
347 /// Calls the `GetOverlappedResult` function to get the result of an
348 /// overlapped operation for this handle.
349 ///
350 /// This function takes the `OVERLAPPED` argument which must have been used
351 /// to initiate an overlapped I/O operation, and returns either the
352 /// successful number of bytes transferred during the operation or an error
353 /// if one occurred.
354 ///
355 /// # Unsafety
356 ///
357 /// This function is unsafe as `overlapped` must have previously been used
358 /// to execute an operation for this handle, and it must also be a valid
359 /// pointer to an `Overlapped` instance.
360 ///
361 /// # Panics
362 ///
363 /// This function will panic
364 pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
365 let mut transferred = 0;
366 let r = GetOverlappedResult(self.0.raw(), overlapped, &mut transferred, FALSE);
367 if r == 0 {
368 Err(io::Error::last_os_error())
369 } else {
370 Ok(transferred as usize)
371 }
372 }
373 }
374
375 thread_local! {
376 static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
377 }
378
379 /// Call a function with a threadlocal `Overlapped`. The function `f` should be
380 /// sure that the event is reset, either manually or by a thread being released.
381 fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
382 where
383 F: FnOnce(&Overlapped) -> io::Result<usize>,
384 {
385 NAMED_PIPE_OVERLAPPED.with(|overlapped| {
386 let mut mborrow = overlapped.borrow_mut();
387 if let None = *mborrow {
388 let op = Overlapped::initialize_with_autoreset_event()?;
389 *mborrow = Some(op);
390 }
391 f(mborrow.as_ref().unwrap())
392 })
393 }
394
395 impl Read for NamedPipe {
396 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
397 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
398 with_threadlocal_overlapped(|overlapped| unsafe {
399 self.0
400 .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
401 })
402 }
403 }
404 impl<'a> Read for &'a NamedPipe {
405 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
406 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
407 with_threadlocal_overlapped(|overlapped| unsafe {
408 self.0
409 .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
410 })
411 }
412 }
413
414 impl Write for NamedPipe {
415 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
416 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
417 with_threadlocal_overlapped(|overlapped| unsafe {
418 self.0
419 .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
420 })
421 }
422 fn flush(&mut self) -> io::Result<()> {
423 <&NamedPipe as Write>::flush(&mut &*self)
424 }
425 }
426 impl<'a> Write for &'a NamedPipe {
427 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
428 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
429 with_threadlocal_overlapped(|overlapped| unsafe {
430 self.0
431 .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
432 })
433 }
434 fn flush(&mut self) -> io::Result<()> {
435 crate::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
436 }
437 }
438
439 impl AsRawHandle for NamedPipe {
440 fn as_raw_handle(&self) -> HANDLE {
441 self.0.raw()
442 }
443 }
444 impl FromRawHandle for NamedPipe {
445 unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
446 NamedPipe(Handle::new(handle))
447 }
448 }
449 impl IntoRawHandle for NamedPipe {
450 fn into_raw_handle(self) -> HANDLE {
451 self.0.into_raw()
452 }
453 }
454
455 fn flag(slot: &mut DWORD, on: bool, val: DWORD) {
456 if on {
457 *slot |= val;
458 } else {
459 *slot &= !val;
460 }
461 }
462
463 impl NamedPipeBuilder {
464 /// Creates a new named pipe builder with the default settings.
465 pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
466 NamedPipeBuilder {
467 name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
468 dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
469 dwPipeMode: PIPE_TYPE_BYTE,
470 nMaxInstances: PIPE_UNLIMITED_INSTANCES,
471 nOutBufferSize: 65536,
472 nInBufferSize: 65536,
473 nDefaultTimeOut: 0,
474 }
475 }
476
477 /// Indicates whether data is allowed to flow from the client to the server.
478 pub fn inbound(&mut self, allowed: bool) -> &mut Self {
479 flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
480 self
481 }
482
483 /// Indicates whether data is allowed to flow from the server to the client.
484 pub fn outbound(&mut self, allowed: bool) -> &mut Self {
485 flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
486 self
487 }
488
489 /// Indicates that this pipe must be the first instance.
490 ///
491 /// If set to true, then creation will fail if there's already an instance
492 /// elsewhere.
493 pub fn first(&mut self, first: bool) -> &mut Self {
494 flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
495 self
496 }
497
498 /// Indicates whether this server can accept remote clients or not.
499 pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
500 flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
501 self
502 }
503
504 /// Specifies the maximum number of instances of the server pipe that are
505 /// allowed.
506 ///
507 /// The first instance of a pipe can specify this value. A value of 255
508 /// indicates that there is no limit to the number of instances.
509 pub fn max_instances(&mut self, instances: u8) -> &mut Self {
510 self.nMaxInstances = instances as DWORD;
511 self
512 }
513
514 /// Specifies the number of bytes to reserver for the output buffer
515 pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
516 self.nOutBufferSize = buffer as DWORD;
517 self
518 }
519
520 /// Specifies the number of bytes to reserver for the input buffer
521 pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
522 self.nInBufferSize = buffer as DWORD;
523 self
524 }
525
526 /// Using the options in this builder, attempt to create a new named pipe.
527 ///
528 /// This function will call the `CreateNamedPipe` function and return the
529 /// result.
530 pub fn create(&mut self) -> io::Result<NamedPipe> {
531 unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
532 }
533
534 /// Using the options in the builder and the provided security attributes, attempt to create a
535 /// new named pipe. This function has to be called with a valid pointer to a
536 /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a
537 /// null pointer.
538 ///
539 /// This function will call the `CreateNamedPipe` function and return the
540 /// result.
541 pub unsafe fn with_security_attributes(
542 &mut self,
543 attrs: *mut SECURITY_ATTRIBUTES,
544 ) -> io::Result<NamedPipe> {
545 let h = CreateNamedPipeW(
546 self.name.as_ptr(),
547 self.dwOpenMode,
548 self.dwPipeMode,
549 self.nMaxInstances,
550 self.nOutBufferSize,
551 self.nInBufferSize,
552 self.nDefaultTimeOut,
553 attrs,
554 );
555
556 if h == INVALID_HANDLE_VALUE {
557 Err(io::Error::last_os_error())
558 } else {
559 Ok(NamedPipe(Handle::new(h)))
560 }
561 }
562 }
563
#[cfg(test)]
mod tests {
    use std::fs::{File, OpenOptions};
    use std::io::prelude::*;
    use std::sync::mpsc::channel;
    use std::thread;
    use std::time::Duration;

    use rand::{thread_rng, Rng};

    use super::{anonymous, NamedPipe, NamedPipeBuilder};
    use crate::iocp::CompletionPort;
    use crate::Overlapped;

    /// Builds a pipe path under `\\.\pipe\` with a random 30-character suffix.
    fn name() -> String {
        let suffix: String = thread_rng().gen_ascii_chars().take(30).collect();
        format!(r"\\.\pipe\{}", suffix)
    }

    /// Bytes written to one end of an anonymous pipe come out of the other.
    #[test]
    fn anon() {
        let (mut reader, mut writer) = t!(anonymous(256));
        assert_eq!(t!(writer.write(&[1, 2, 3])), 3);
        let mut buf = [0; 10];
        assert_eq!(t!(reader.read(&mut buf)), 3);
        assert_eq!(&buf[..3], &[1, 2, 3]);
    }

    /// A second "first instance" fails, but a non-first instance succeeds.
    #[test]
    fn named_not_first() {
        let path = name();
        let _first = t!(NamedPipe::new(&path));
        assert!(NamedPipe::new(&path).is_err());

        t!(NamedPipeBuilder::new(&path).first(false).create());
    }

    /// A blocking `connect` completes once a client opens the pipe.
    #[test]
    fn named_connect() {
        let path = name();
        let server = t!(NamedPipe::new(&path));

        let client = thread::spawn(move || {
            t!(File::open(path));
        });

        t!(server.connect());
        t!(server.disconnect());
        t!(client.join());
    }

    /// `wait` succeeds while an instance is free and fails once it is taken.
    #[test]
    fn named_wait() {
        let path = name();
        let server = t!(NamedPipe::new(&path));

        let (done_tx, done_rx) = channel();
        let client = thread::spawn(move || {
            t!(NamedPipe::wait(&path, None));
            t!(File::open(&path));
            assert!(NamedPipe::wait(&path, Some(Duration::from_millis(1))).is_err());
            t!(done_tx.send(()));
        });

        t!(server.connect());
        t!(done_rx.recv());
        t!(server.disconnect());
        t!(client.join());
    }

    /// An overlapped connect is reported through the completion port.
    #[test]
    fn named_connect_overlapped() {
        let path = name();
        let server = t!(NamedPipe::new(&path));

        let client = thread::spawn(move || {
            t!(File::open(path));
        });

        let port = t!(CompletionPort::new(1));
        t!(port.add_handle(2, &server));

        let overlapped = Overlapped::zero();
        unsafe {
            t!(server.connect_overlapped(overlapped.raw()));
        }

        let status = t!(port.get(None));
        assert_eq!(status.bytes_transferred(), 0);
        assert_eq!(status.token(), 2);
        assert_eq!(status.overlapped(), overlapped.raw());
        t!(client.join());
    }

    /// Data round-trips between the server and a client in both directions.
    #[test]
    fn named_read_write() {
        let path = name();
        let mut server = t!(NamedPipe::new(&path));

        let client = thread::spawn(move || {
            let mut file = t!(OpenOptions::new().read(true).write(true).open(path));
            t!(file.write_all(&[1, 2, 3]));
            let mut buf = [0; 10];
            assert_eq!(t!(file.read(&mut buf)), 3);
            assert_eq!(&buf[..3], &[1, 2, 3]);
        });

        t!(server.connect());
        let mut buf = [0; 10];
        assert_eq!(t!(server.read(&mut buf)), 3);
        assert_eq!(&buf[..3], &[1, 2, 3]);
        t!(server.write_all(&[1, 2, 3]));
        t!(server.flush());
        t!(server.disconnect());
        t!(client.join());
    }

    /// Repeats the read/write round-trip several times on the same thread.
    #[test]
    fn named_read_write_multi() {
        (0..5).for_each(|_| named_read_write());
    }

    /// Two different pipes are served one after the other from one thread.
    #[test]
    fn named_read_write_multi_same_thread() {
        let first_path = name();
        let mut first = t!(NamedPipe::new(&first_path));
        let second_path = name();
        let mut second = t!(NamedPipe::new(&second_path));

        let client = thread::spawn(move || {
            let mut file = t!(OpenOptions::new().read(true).write(true).open(first_path));
            t!(file.write_all(&[1, 2, 3]));
            let mut buf = [0; 10];
            assert_eq!(t!(file.read(&mut buf)), 3);
            assert_eq!(&buf[..3], &[1, 2, 3]);

            let mut file = t!(OpenOptions::new().read(true).write(true).open(second_path));
            t!(file.write_all(&[1, 2, 3]));
            let mut buf = [0; 10];
            assert_eq!(t!(file.read(&mut buf)), 3);
            assert_eq!(&buf[..3], &[1, 2, 3]);
        });

        t!(first.connect());
        let mut buf = [0; 10];
        assert_eq!(t!(first.read(&mut buf)), 3);
        assert_eq!(&buf[..3], &[1, 2, 3]);
        t!(first.write_all(&[1, 2, 3]));
        t!(first.flush());
        t!(first.disconnect());

        t!(second.connect());
        let mut buf = [0; 10];
        assert_eq!(t!(second.read(&mut buf)), 3);
        assert_eq!(&buf[..3], &[1, 2, 3]);
        t!(second.write_all(&[1, 2, 3]));
        t!(second.flush());
        t!(second.disconnect());

        t!(client.join());
    }

    /// An overlapped read is reported through the completion port.
    #[test]
    fn named_read_overlapped() {
        let path = name();
        let server = t!(NamedPipe::new(&path));

        let client = thread::spawn(move || {
            let mut file = t!(File::create(path));
            t!(file.write_all(&[1, 2, 3]));
        });

        let port = t!(CompletionPort::new(1));
        t!(port.add_handle(3, &server));
        t!(server.connect());

        let mut buf = [0; 10];
        let overlapped = Overlapped::zero();
        unsafe {
            t!(server.read_overlapped(&mut buf, overlapped.raw()));
        }
        let status = t!(port.get(None));
        assert_eq!(status.bytes_transferred(), 3);
        assert_eq!(status.token(), 3);
        assert_eq!(status.overlapped(), overlapped.raw());
        assert_eq!(&buf[..3], &[1, 2, 3]);

        t!(client.join());
    }

    /// An overlapped write is reported through the completion port.
    #[test]
    fn named_write_overlapped() {
        let path = name();
        let server = t!(NamedPipe::new(&path));

        let client = thread::spawn(move || {
            let mut file = t!(super::connect(path));
            let mut buf = [0; 10];
            assert_eq!(t!(file.read(&mut buf)), 3);
            assert_eq!(&buf[..3], &[1, 2, 3])
        });

        let port = t!(CompletionPort::new(1));
        t!(port.add_handle(3, &server));
        t!(server.connect());

        let overlapped = Overlapped::zero();
        unsafe {
            t!(server.write_overlapped(&[1, 2, 3], overlapped.raw()));
        }

        let status = t!(port.get(None));
        assert_eq!(status.bytes_transferred(), 3);
        assert_eq!(status.token(), 3);
        assert_eq!(status.overlapped(), overlapped.raw());

        t!(client.join());
    }
}