]> git.proxmox.com Git - rustc.git/blob - src/libstd/old_io/net/pipe.rs
Imported Upstream version 1.0.0~beta
[rustc.git] / src / libstd / old_io / net / pipe.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Named pipes
12 //!
13 //! This module contains the ability to communicate over named pipes with
14 //! synchronous I/O. On Windows, this corresponds to talking over a Named Pipe,
15 //! while on Unix it corresponds to UNIX domain sockets.
16 //!
17 //! These pipes are similar to TCP in the sense that you can have both a stream to a
18 //! server and a server itself. The server provided accepts other `UnixStream`
19 //! instances as clients.
20
21 #![allow(missing_docs)]
22 #![deprecated(since = "1.0.0",
23 reason = "will be removed to be reintroduced at a later date; \
24 in the meantime consider using the `unix_socket` crate \
25 for unix sockets; there is currently no replacement \
26 for named pipes")]
27 #![unstable(feature = "old_io")]
28
29 use prelude::v1::*;
30
31 use ffi::CString;
32 use old_path::BytesContainer;
33 use old_io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
34 use old_io::{Reader, Writer};
35 use sys::pipe::UnixAcceptor as UnixAcceptorImp;
36 use sys::pipe::UnixListener as UnixListenerImp;
37 use sys::pipe::UnixStream as UnixStreamImp;
38 use time::Duration;
39
40 use sys_common;
41
42 /// A stream which communicates over a named pipe.
43 pub struct UnixStream {
44 inner: UnixStreamImp,
45 }
46
47 impl UnixStream {
48
49 /// Connect to a pipe named by `path`. This will attempt to open a
50 /// connection to the underlying socket.
51 ///
52 /// The returned stream will be closed when the object falls out of scope.
53 ///
54 /// # Examples
55 ///
56 /// ```
57 /// # #![feature(old_io, old_path, io)]
58 /// # #![allow(unused_must_use)]
59 /// use std::old_io::net::pipe::UnixStream;
60 /// use std::old_io::*;
61 /// use std::old_path::Path;
62 ///
63 /// let server = Path::new("path/to/my/socket");
64 /// let mut stream = UnixStream::connect(&server);
65 /// stream.write(&[1, 2, 3]);
66 /// ```
67 pub fn connect<P: BytesContainer>(path: P) -> IoResult<UnixStream> {
68 let path = try!(CString::new(path.container_as_bytes()));
69 UnixStreamImp::connect(&path, None)
70 .map(|inner| UnixStream { inner: inner })
71 }
72
73 /// Connect to a pipe named by `path`, timing out if the specified number of
74 /// milliseconds.
75 ///
76 /// This function is similar to `connect`, except that if `timeout`
77 /// elapses the function will return an error of kind `TimedOut`.
78 ///
79 /// If a `timeout` with zero or negative duration is specified then
80 /// the function returns `Err`, with the error kind set to `TimedOut`.
81 #[unstable(feature = "io",
82 reason = "the timeout argument is likely to change types")]
83 pub fn connect_timeout<P>(path: P, timeout: Duration)
84 -> IoResult<UnixStream>
85 where P: BytesContainer {
86 if timeout <= Duration::milliseconds(0) {
87 return Err(standard_error(TimedOut));
88 }
89
90 let path = try!(CString::new(path.container_as_bytes()));
91 UnixStreamImp::connect(&path, Some(timeout.num_milliseconds() as u64))
92 .map(|inner| UnixStream { inner: inner })
93 }
94
95
96 /// Closes the reading half of this connection.
97 ///
98 /// This method will close the reading portion of this connection, causing
99 /// all pending and future reads to immediately return with an error.
100 ///
101 /// Note that this method affects all cloned handles associated with this
102 /// stream, not just this one handle.
103 pub fn close_read(&mut self) -> IoResult<()> {
104 self.inner.close_read()
105 }
106
107 /// Closes the writing half of this connection.
108 ///
109 /// This method will close the writing portion of this connection, causing
110 /// all pending and future writes to immediately return with an error.
111 ///
112 /// Note that this method affects all cloned handles associated with this
113 /// stream, not just this one handle.
114 pub fn close_write(&mut self) -> IoResult<()> {
115 self.inner.close_write()
116 }
117
118 /// Sets the read/write timeout for this socket.
119 ///
120 /// For more information, see `TcpStream::set_timeout`
121 #[unstable(feature = "io",
122 reason = "the timeout argument may change in type and value")]
123 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
124 self.inner.set_timeout(timeout_ms)
125 }
126
127 /// Sets the read timeout for this socket.
128 ///
129 /// For more information, see `TcpStream::set_timeout`
130 #[unstable(feature = "io",
131 reason = "the timeout argument may change in type and value")]
132 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
133 self.inner.set_read_timeout(timeout_ms)
134 }
135
136 /// Sets the write timeout for this socket.
137 ///
138 /// For more information, see `TcpStream::set_timeout`
139 #[unstable(feature = "io",
140 reason = "the timeout argument may change in type and value")]
141 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
142 self.inner.set_write_timeout(timeout_ms)
143 }
144 }
145
146 impl Clone for UnixStream {
147 fn clone(&self) -> UnixStream {
148 UnixStream { inner: self.inner.clone() }
149 }
150 }
151
152 impl Reader for UnixStream {
153 fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
154 self.inner.read(buf)
155 }
156 }
157
158 impl Writer for UnixStream {
159 fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
160 self.inner.write(buf)
161 }
162 }
163
164 impl sys_common::AsInner<UnixStreamImp> for UnixStream {
165 fn as_inner(&self) -> &UnixStreamImp {
166 &self.inner
167 }
168 }
169
170 /// A value that can listen for incoming named pipe connection requests.
171 pub struct UnixListener {
172 /// The internal, opaque runtime Unix listener.
173 inner: UnixListenerImp,
174 }
175
176 impl UnixListener {
177 /// Creates a new listener, ready to receive incoming connections on the
178 /// specified socket. The server will be named by `path`.
179 ///
180 /// This listener will be closed when it falls out of scope.
181 ///
182 /// # Examples
183 ///
184 /// ```
185 /// # #![feature(old_io, io, old_path)]
186 /// # fn foo() {
187 /// use std::old_io::net::pipe::UnixListener;
188 /// use std::old_io::*;
189 /// use std::old_path::Path;
190 ///
191 /// let server = Path::new("/path/to/my/socket");
192 /// let stream = UnixListener::bind(&server);
193 /// for mut client in stream.listen().incoming() {
194 /// client.write(&[1, 2, 3, 4]);
195 /// }
196 /// # }
197 /// ```
198 pub fn bind<P: BytesContainer>(path: P) -> IoResult<UnixListener> {
199 let path = try!(CString::new(path.container_as_bytes()));
200 UnixListenerImp::bind(&path)
201 .map(|inner| UnixListener { inner: inner })
202 }
203 }
204
205 impl Listener<UnixAcceptor> for UnixListener {
206 fn listen(self) -> IoResult<UnixAcceptor> {
207 self.inner.listen()
208 .map(|inner| UnixAcceptor { inner: inner })
209 }
210 }
211
212 impl sys_common::AsInner<UnixListenerImp> for UnixListener {
213 fn as_inner(&self) -> &UnixListenerImp {
214 &self.inner
215 }
216 }
217
218 /// A value that can accept named pipe connections, returned from `listen()`.
219 pub struct UnixAcceptor {
220 /// The internal, opaque runtime Unix acceptor.
221 inner: UnixAcceptorImp
222 }
223
224 impl UnixAcceptor {
225 /// Sets a timeout for this acceptor, after which accept() will no longer
226 /// block indefinitely.
227 ///
228 /// The argument specified is the amount of time, in milliseconds, into the
229 /// future after which all invocations of accept() will not block (and any
230 /// pending invocation will return). A value of `None` will clear any
231 /// existing timeout.
232 ///
233 /// When using this method, it is likely necessary to reset the timeout as
234 /// appropriate, the timeout specified is specific to this object, not
235 /// specific to the next request.
236 #[unstable(feature = "io",
237 reason = "the name and arguments to this function are likely \
238 to change")]
239 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
240 self.inner.set_timeout(timeout_ms)
241 }
242
243 /// Closes the accepting capabilities of this acceptor.
244 ///
245 /// This function has the same semantics as `TcpAcceptor::close_accept`, and
246 /// more information can be found in that documentation.
247 #[unstable(feature = "io")]
248 pub fn close_accept(&mut self) -> IoResult<()> {
249 self.inner.close_accept()
250 }
251 }
252
253 impl Acceptor for UnixAcceptor {
254 type Connection = UnixStream;
255 fn accept(&mut self) -> IoResult<UnixStream> {
256 self.inner.accept().map(|s| {
257 UnixStream { inner: s }
258 })
259 }
260 }
261
262 impl Clone for UnixAcceptor {
263 /// Creates a new handle to this unix acceptor, allowing for simultaneous
264 /// accepts.
265 ///
266 /// The underlying unix acceptor will not be closed until all handles to the
267 /// acceptor have been deallocated. Incoming connections will be received on
268 /// at most once acceptor, the same connection will not be accepted twice.
269 ///
270 /// The `close_accept` method will shut down *all* acceptors cloned from the
271 /// same original acceptor, whereas the `set_timeout` method only affects
272 /// the selector that it is called on.
273 ///
274 /// This function is useful for creating a handle to invoke `close_accept`
275 /// on to wake up any other task blocked in `accept`.
276 fn clone(&self) -> UnixAcceptor {
277 UnixAcceptor { inner: self.inner.clone() }
278 }
279 }
280
281 impl sys_common::AsInner<UnixAcceptorImp> for UnixAcceptor {
282 fn as_inner(&self) -> &UnixAcceptorImp {
283 &self.inner
284 }
285 }
286
287 #[cfg(test)]
288 mod tests {
289 use prelude::v1::*;
290
291 use old_io::fs::PathExtensions;
292 use old_io::{EndOfFile, TimedOut, ShortWrite, IoError, ConnectionReset};
293 use old_io::{NotConnected, BrokenPipe, FileNotFound, InvalidInput, OtherIoError};
294 use old_io::{PermissionDenied, Acceptor, Listener};
295 use old_io::{Reader, Writer};
296 use old_io::test::*;
297 use super::*;
298 use sync::mpsc::channel;
299 use thread;
300 use time::Duration;
301
302 pub fn smalltest<F,G>(server: F, client: G)
303 where F : FnOnce(UnixStream), F : Send,
304 G : FnOnce(UnixStream), G : Send + 'static
305 {
306 let path1 = next_test_unix();
307 let path2 = path1.clone();
308
309 let mut acceptor = UnixListener::bind(&path1).listen();
310
311 let _t = thread::spawn(move|| {
312 match UnixStream::connect(&path2) {
313 Ok(c) => client(c),
314 Err(e) => panic!("failed connect: {}", e),
315 }
316 });
317
318 match acceptor.accept() {
319 Ok(c) => server(c),
320 Err(e) => panic!("failed accept: {}", e),
321 }
322 }
323
// Binding to a path that cannot be created must fail with one of the
// accepted error kinds.
#[test]
fn bind_error() {
    let path = "path/to/nowhere";
    let e = match UnixListener::bind(&path) {
        Ok(..) => panic!(),
        Err(e) => e,
    };
    assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
            e.kind == InvalidInput);
}
335
// Connecting to a path with no listener must fail with one of the accepted
// error kinds.
#[test]
fn connect_error() {
    // The path is chosen per platform.
    let path = match cfg!(windows) {
        true => r"\\.\pipe\this_should_not_exist_ever",
        false => "path/to/nowhere",
    };
    let e = match UnixStream::connect(&path) {
        Ok(..) => panic!(),
        Err(e) => e,
    };
    assert!(e.kind == FileNotFound || e.kind == OtherIoError);
}
350
// A single byte written by the client must arrive at the server.
#[test]
fn smoke() {
    smalltest(
        move |mut server| {
            let mut buf = [0];
            server.read(&mut buf).unwrap();
            assert!(buf[0] == 99);
        },
        move |mut conn| {
            conn.write(&[99]).unwrap();
        },
    )
}
361
// Reads must keep failing once the client side has been dropped.
#[cfg_attr(windows, ignore)] // FIXME(#12516)
#[test]
fn read_eof() {
    smalltest(move|mut server| {
        let mut buf = [0];
        // The error must be reported on repeated attempts, not just the first.
        for _ in 0..2 {
            assert!(server.read(&mut buf).is_err());
        }
    }, move|_client| {
        // drop the client
    })
}
373
// Writes must eventually fail with a disconnect-style error once the client
// side has been dropped.
#[test]
fn write_begone() {
    smalltest(move|mut server| {
        let buf = [0];
        // Keep writing until the first error shows up, then check its kind.
        loop {
            if let Err(e) = server.write(&buf) {
                assert!(e.kind == BrokenPipe ||
                        e.kind == NotConnected ||
                        e.kind == ConnectionReset,
                        "unknown error {}", e);
                break;
            }
        }
    }, move|_client| {
        // drop the client
    })
}
394
// Several sequential connections must each be accepted and deliver one byte.
#[test]
fn accept_lots() {
    let times = 10;
    let listen_path = next_test_unix();
    let connect_path = listen_path.clone();

    let mut acceptor = match UnixListener::bind(&listen_path).listen() {
        Ok(a) => a,
        Err(e) => panic!("failed listen: {}", e),
    };

    // Client side: connect `times` times, writing one byte per connection.
    let _t = thread::spawn(move|| {
        for _ in 0..times {
            let mut stream = UnixStream::connect(&connect_path);
            if let Err(e) = stream.write(&[100]) {
                panic!("failed write: {}", e)
            }
        }
    });

    // Server side: accept each connection and read the byte back.
    for _ in 0..times {
        let mut client = acceptor.accept();
        let mut received = [0];
        if let Err(e) = client.read(&mut received) {
            panic!("failed read/accept: {}", e)
        }
        assert_eq!(received[0], 100);
    }
}
426
// On unix, the socket path must exist on disk while the listener is alive.
#[cfg(unix)]
#[test]
fn path_exists() {
    let path = next_test_unix();
    // Held in a named binding so the listener stays alive for the check.
    let _listening = UnixListener::bind(&path).listen();
    assert!(path.exists());
}
434
// A cloned stream handle must be usable for writing while the original
// handle is used for reading.
#[test]
fn unix_clone_smoke() {
    let sock_path = next_test_unix();
    let mut acceptor = UnixListener::bind(&sock_path).listen();

    // Client: read one byte from the server, then reply with one byte.
    let _t = thread::spawn(move|| {
        let mut client = UnixStream::connect(&sock_path);
        let mut incoming = [0, 0];
        debug!("client reading");
        assert_eq!(client.read(&mut incoming), Ok(1));
        assert_eq!(incoming[0], 1);
        debug!("client writing");
        client.write(&[2]).unwrap();
        debug!("client dropping");
    });

    let mut reader = acceptor.accept().unwrap();
    let cloned = reader.clone();

    let (start_tx, start_rx) = channel();
    let (done_tx, done_rx) = channel();
    // Writer thread: waits for the go signal, writes through the clone.
    let _t = thread::spawn(move|| {
        let mut writer = cloned;
        start_rx.recv().unwrap();
        debug!("writer writing");
        writer.write(&[1]).unwrap();
        debug!("writer done");
        done_tx.send(()).unwrap();
    });
    start_tx.send(()).unwrap();
    let mut reply = [0, 0];
    debug!("reader reading");
    assert_eq!(reader.read(&mut reply), Ok(1));
    debug!("reader done");
    done_rx.recv().unwrap();
}
471
// Two handles to the same stream must each be able to complete a read.
#[test]
fn unix_clone_two_read() {
    let sock_path = next_test_unix();
    let mut acceptor = UnixListener::bind(&sock_path).listen();
    let (ack_tx1, ack_rx) = channel();
    let ack_tx2 = ack_tx1.clone();

    // Client: send a byte, wait for an acknowledgement, and repeat once.
    let _t = thread::spawn(move|| {
        let mut client = UnixStream::connect(&sock_path);
        client.write(&[1]).unwrap();
        ack_rx.recv().unwrap();
        client.write(&[2]).unwrap();
        ack_rx.recv().unwrap();
    });

    let mut first = acceptor.accept().unwrap();
    let second = first.clone();

    let (done_tx, done_rx) = channel();
    let _t = thread::spawn(move|| {
        let mut second = second;
        let mut scratch = [0, 0];
        second.read(&mut scratch).unwrap();
        ack_tx2.send(()).unwrap();
        done_tx.send(()).unwrap();
    });
    let mut scratch = [0, 0];
    first.read(&mut scratch).unwrap();
    ack_tx1.send(()).unwrap();

    done_rx.recv().unwrap();
}
504
// Two handles to the same stream must each be able to complete a write.
#[test]
fn unix_clone_two_write() {
    let sock_path = next_test_unix();
    let mut acceptor = UnixListener::bind(&sock_path).listen();

    // Client: perform two reads to drain what the two writers send.
    let _t = thread::spawn(move|| {
        let mut client = UnixStream::connect(&sock_path);
        let scratch = &mut [0, 1];
        client.read(scratch).unwrap();
        client.read(scratch).unwrap();
    });

    let mut first = acceptor.accept().unwrap();
    let second = first.clone();

    let (done_tx, done_rx) = channel();
    let _t = thread::spawn(move|| {
        let mut second = second;
        second.write(&[1]).unwrap();
        done_tx.send(()).unwrap();
    });
    first.write(&[2]).unwrap();

    done_rx.recv().unwrap();
}
530
// Dropping a bound listener must remove its socket path.
#[cfg(not(windows))]
#[test]
fn drop_removes_listener_path() {
    let path = next_test_unix();
    let listener = UnixListener::bind(&path).unwrap();
    assert!(path.exists());
    drop(listener);
    assert!(!path.exists());
}
540
// Dropping the acceptor obtained from a listener must remove the socket path.
#[cfg(not(windows))]
#[test]
fn drop_removes_acceptor_path() {
    let path = next_test_unix();
    let listener = UnixListener::bind(&path).unwrap();
    assert!(path.exists());
    let acceptor = listener.listen().unwrap();
    drop(acceptor);
    assert!(!path.exists());
}
550
// Exercises `UnixAcceptor::set_timeout`: expiry, pending connections after
// expiry, and clearing the timeout.
#[test]
fn accept_timeout() {
    let addr = next_test_unix();
    let mut acceptor = UnixListener::bind(&addr).unwrap().listen().unwrap();

    acceptor.set_timeout(Some(10));

    // The first accept times out, and so does every later one.
    let first = acceptor.accept().err().unwrap();
    assert_eq!(first.kind, TimedOut);
    let second = acceptor.accept().err().unwrap();
    assert_eq!(second.kind, TimedOut);

    // An expired timeout must not prevent pending connections from being
    // received.
    let (tx, rx) = channel();
    let connect_addr = addr.clone();
    let _t = thread::spawn(move|| {
        tx.send(UnixStream::connect(&connect_addr).unwrap()).unwrap();
    });
    let pending = rx.recv().unwrap();
    for i in 0..1001 {
        match acceptor.accept() {
            Ok(..) => break,
            Err(ref e) if e.kind == TimedOut => {}
            Err(e) => panic!("error: {}", e),
        }
        thread::yield_now();
        if i == 1000 { panic!("should have a pending connection") }
    }
    drop(pending);

    // With the timeout cleared, accept blocks until a connection arrives.
    acceptor.set_timeout(None);
    let connect_addr = addr.clone();
    let _t = thread::spawn(move|| {
        drop(UnixStream::connect(&connect_addr).unwrap());
    });
    acceptor.accept().unwrap();
}
591
// With nothing bound at the path, a timed connect must report an error.
#[test]
fn connect_timeout_error() {
    let addr = next_test_unix();
    assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
}
597
// With an acceptor listening, a timed connect must succeed.
#[test]
fn connect_timeout_success() {
    let addr = next_test_unix();
    // Held in a named binding so the acceptor outlives the connect attempt.
    let _acceptor = UnixListener::bind(&addr).unwrap().listen().unwrap();
    assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
}
604
// A zero timeout must be rejected even though an acceptor is listening.
#[test]
fn connect_timeout_zero() {
    let addr = next_test_unix();
    let _acceptor = UnixListener::bind(&addr).unwrap().listen().unwrap();
    assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
}
611
// A negative timeout must be rejected even though an acceptor is listening.
#[test]
fn connect_timeout_negative() {
    let addr = next_test_unix();
    let _acceptor = UnixListener::bind(&addr).unwrap().listen().unwrap();
    assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
}
618
// Closing either half of a stream must make the matching operation fail on
// every handle to that stream.
#[test]
fn close_readwrite_smoke() {
    let addr = next_test_unix();
    let acceptor = UnixListener::bind(&addr).listen().unwrap();
    // `_tx` is never sent on; keeping it alive keeps the peer thread parked
    // in `recv` with its accepted connection open.
    let (_tx, rx) = channel::<()>();
    thread::spawn(move|| {
        let mut acceptor = acceptor;
        let _conn = acceptor.accept().unwrap();
        let _ = rx.recv();
    });

    let mut b = [0];
    let mut s = UnixStream::connect(&addr).unwrap();
    let mut s2 = s.clone();

    // Once a half is closed, that operation fails on the closing handle.
    s.close_write().unwrap();
    assert!(s.write(&[0]).is_err());
    s.close_read().unwrap();
    assert!(s.read(&mut b).is_err());

    // A handle cloned before the close is affected as well.
    assert!(s2.write(&[0]).is_err());
    assert!(s2.read(&mut b).is_err());

    // So is a handle cloned after the close.
    let mut s3 = s.clone();
    assert!(s3.write(&[0]).is_err());
    assert!(s3.read(&mut b).is_err());

    // Closing again on the other handles must not panic; results are ignored.
    let _ = s2.close_read();
    let _ = s2.close_write();
    let _ = s3.close_read();
    let _ = s3.close_write();
}
655
// `close_read` on one handle must wake up a read blocked on a cloned handle.
#[test]
fn close_read_wakes_up() {
    let addr = next_test_unix();
    let acceptor = UnixListener::bind(&addr).listen().unwrap();
    // `_tx` is never sent on; it keeps the accepting thread parked in `recv`.
    let (_tx, rx) = channel::<()>();
    thread::spawn(move|| {
        let mut acceptor = acceptor;
        let _conn = acceptor.accept().unwrap();
        let _ = rx.recv();
    });

    let mut s = UnixStream::connect(&addr).unwrap();
    let s2 = s.clone();
    let (woke_tx, woke_rx) = channel();
    let _t = thread::spawn(move|| {
        let mut s2 = s2;
        assert!(s2.read(&mut [0]).is_err());
        woke_tx.send(()).unwrap();
    });
    // this should wake up the child task
    s.close_read().unwrap();

    // this test will never finish if the child doesn't wake up
    woke_rx.recv().unwrap();
}
681
// `set_timeout` must apply to both reads and writes, and clearing it must
// restore blocking behaviour.
#[test]
fn readwrite_timeouts() {
    let addr = next_test_unix();
    let mut acceptor = UnixListener::bind(&addr).listen().unwrap();
    let (tx, rx) = channel::<()>();
    // Peer: stays silent until signalled, then writes a single byte.
    thread::spawn(move|| {
        let mut s = UnixStream::connect(&addr).unwrap();
        rx.recv().unwrap();
        assert!(s.write(&[0]).is_ok());
        let _ = rx.recv();
    });

    let mut stream = acceptor.accept().unwrap();
    stream.set_timeout(Some(20));
    // Reads time out, and keep timing out, while the peer sends nothing.
    assert_eq!(stream.read(&mut [0]).err().unwrap().kind, TimedOut);
    assert_eq!(stream.read(&mut [0]).err().unwrap().kind, TimedOut);

    stream.set_timeout(Some(20));
    // Write large chunks until a write reports a timeout.
    for i in 0..1001 {
        match stream.write(&[0; 128 * 1024]) {
            Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
            Err(IoError { kind: TimedOut, .. }) => break,
            Err(e) => panic!("{}", e),
        }
        if i == 1000 { panic!("should have filled up?!"); }
    }

    // I'm not sure as to why, but apparently the write on windows always
    // succeeds after the previous timeout. Who knows?
    if !cfg!(windows) {
        assert_eq!(stream.write(&[0]).err().unwrap().kind, TimedOut);
    }

    tx.send(()).unwrap();
    stream.set_timeout(None);
    assert_eq!(stream.read(&mut [0, 0]), Ok(1));
}
719
// `set_read_timeout` must time out reads without affecting writes.
#[test]
fn read_timeouts() {
    // Size of each chunk the peer reads at a time.
    const CHUNK: usize = 128 * 1024;

    let addr = next_test_unix();
    let mut acceptor = UnixListener::bind(&addr).listen().unwrap();
    let (tx, rx) = channel::<()>();
    // Peer: once signalled, drains 100 chunks' worth of data.
    thread::spawn(move|| {
        let mut peer = UnixStream::connect(&addr).unwrap();
        rx.recv().unwrap();
        let mut amt = 0;
        while amt < 100 * CHUNK {
            match peer.read(&mut [0; CHUNK]) {
                Ok(n) => { amt += n; }
                Err(e) => panic!("{}", e),
            }
        }
        let _ = rx.recv();
    });

    let mut s = acceptor.accept().unwrap();
    s.set_read_timeout(Some(20));
    // Reads time out, and keep timing out, while the peer sends nothing.
    assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
    assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);

    tx.send(()).unwrap();
    // Writes must still succeed with only a read timeout set.
    for _ in 0..100 {
        assert!(s.write(&[0;128 * 1024]).is_ok());
    }
}
748
// `set_write_timeout` must time out writes without affecting reads.
#[test]
fn write_timeouts() {
    let addr = next_test_unix();
    let mut acceptor = UnixListener::bind(&addr).listen().unwrap();
    let (tx, rx) = channel::<()>();
    // Peer: stays silent until signalled, then writes a single byte.
    thread::spawn(move|| {
        let mut s = UnixStream::connect(&addr).unwrap();
        rx.recv().unwrap();
        assert!(s.write(&[0]).is_ok());
        let _ = rx.recv();
    });

    let mut s = acceptor.accept().unwrap();
    s.set_write_timeout(Some(20));
    // Write large chunks until a write reports a timeout.
    for i in 0..1001 {
        match s.write(&[0; 128 * 1024]) {
            Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
            Err(IoError { kind: TimedOut, .. }) => break,
            Err(e) => panic!("{}", e),
        }
        if i == 1000 { panic!("should have filled up?!"); }
    }

    tx.send(()).unwrap();
    // The read must still complete with only a write timeout set.
    assert!(s.read(&mut [0]).is_ok());
}
775
// A read timeout set on one handle must not make a read on a cloned handle
// fail.
#[test]
fn timeout_concurrent_read() {
    let addr = next_test_unix();
    let mut acceptor = UnixListener::bind(&addr).listen().unwrap();
    let (tx, rx) = channel::<()>();
    // Peer: stays silent until signalled, then writes a single byte.
    thread::spawn(move|| {
        let mut s = UnixStream::connect(&addr).unwrap();
        rx.recv().unwrap();
        assert!(s.write(&[0]).is_ok());
        let _ = rx.recv();
    });

    let mut s = acceptor.accept().unwrap();
    let s2 = s.clone();
    let (done_tx, done_rx) = channel();
    // The clone reads on its own thread; this read is expected to succeed.
    let _t = thread::spawn(move|| {
        let mut s2 = s2;
        assert!(s2.read(&mut [0]).is_ok());
        done_tx.send(()).unwrap();
    });

    s.set_read_timeout(Some(20));
    assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
    tx.send(()).unwrap();

    done_rx.recv().unwrap();
}
803
// A cloned acceptor must keep accepting after the original is dropped.
#[cfg(not(windows))]
#[test]
fn clone_accept_smoke() {
    let addr = next_test_unix();
    let listener = UnixListener::bind(&addr);
    let mut a = listener.listen().unwrap();
    let mut a2 = a.clone();

    // Two clients connect, one for each accept below.
    let other_addr = addr.clone();
    let _t = thread::spawn(move|| {
        let _ = UnixStream::connect(&other_addr);
    });
    let _t = thread::spawn(move|| {
        let _ = UnixStream::connect(&addr);
    });

    assert!(a.accept().is_ok());
    drop(a);
    assert!(a2.accept().is_ok());
}
824
// Two cloned acceptors blocked in `accept` at once must each receive a
// connection.
#[cfg(not(windows))] // FIXME #17553
#[test]
fn clone_accept_concurrent() {
    let addr = next_test_unix();
    let listener = UnixListener::bind(&addr);
    let first = listener.listen().unwrap();
    let second = first.clone();

    let (tx, rx) = channel();
    let tx2 = tx.clone();

    // Each acceptor handle reports the outcome of its accept.
    let _t = thread::spawn(move|| {
        let mut acceptor = first;
        tx.send(acceptor.accept()).unwrap()
    });
    let _t = thread::spawn(move|| {
        let mut acceptor = second;
        tx2.send(acceptor.accept()).unwrap()
    });

    // Two clients connect, one for each pending accept.
    let other_addr = addr.clone();
    let _t = thread::spawn(move|| {
        let _ = UnixStream::connect(&other_addr);
    });
    let _t = thread::spawn(move|| {
        let _ = UnixStream::connect(&addr);
    });

    for _ in 0..2 {
        assert!(rx.recv().unwrap().is_ok());
    }
}
856
// After `close_accept`, `accept` must fail with `EndOfFile`.
#[test]
fn close_accept_smoke() {
    let addr = next_test_unix();
    let listener = UnixListener::bind(&addr);
    let mut acceptor = listener.listen().unwrap();

    acceptor.close_accept().unwrap();
    let err = acceptor.accept().err().unwrap();
    assert_eq!(err.kind, EndOfFile);
}
866
// `close_accept` on a cloned handle must make the other handle's `accept`
// fail with `EndOfFile`.
#[test]
fn close_accept_concurrent() {
    let addr = next_test_unix();
    let listener = UnixListener::bind(&addr);
    let accepting = listener.listen().unwrap();
    let mut closer = accepting.clone();

    let (tx, rx) = channel();
    let _t = thread::spawn(move|| {
        let mut accepting = accepting;
        tx.send(accepting.accept()).unwrap();
    });
    closer.close_accept().unwrap();

    let outcome = rx.recv().unwrap();
    assert_eq!(outcome.err().unwrap().kind, EndOfFile);
}
883 }