git.proxmox.com Git - rustc.git/blob - src/libstd/net/tcp.rs
Imported Upstream version 1.7.0+dfsg1
[rustc.git] / src / libstd / net / tcp.rs
1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12 use io::prelude::*;
13
14 use fmt;
15 use io;
16 use net::{ToSocketAddrs, SocketAddr, Shutdown};
17 use sys_common::io::read_to_end_uninitialized;
18 use sys_common::net as net_imp;
19 use sys_common::{AsInner, FromInner, IntoInner};
20 use time::Duration;
21
22 /// A structure which represents a TCP stream between a local socket and a
23 /// remote socket.
24 ///
25 /// The socket will be closed when the value is dropped.
26 ///
27 /// # Examples
28 ///
29 /// ```no_run
30 /// use std::io::prelude::*;
31 /// use std::net::TcpStream;
32 ///
33 /// {
34 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
35 ///
36 /// // ignore the Result
37 /// let _ = stream.write(&[1]);
38 /// let _ = stream.read(&mut [0; 128]); // ignore here too
39 /// } // the stream is closed here
40 /// ```
41 #[stable(feature = "rust1", since = "1.0.0")]
42 pub struct TcpStream(net_imp::TcpStream);
43
44 /// A structure representing a socket server.
45 ///
46 /// # Examples
47 ///
48 /// ```no_run
49 /// use std::net::{TcpListener, TcpStream};
50 /// use std::thread;
51 ///
52 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
53 ///
54 /// fn handle_client(stream: TcpStream) {
55 /// // ...
56 /// }
57 ///
58 /// // accept connections and process them, spawning a new thread for each one
59 /// for stream in listener.incoming() {
60 /// match stream {
61 /// Ok(stream) => {
62 /// thread::spawn(move|| {
63 /// // connection succeeded
64 /// handle_client(stream)
65 /// });
66 /// }
67 /// Err(e) => { /* connection failed */ }
68 /// }
69 /// }
70 ///
71 /// // close the socket server
72 /// drop(listener);
73 /// ```
74 #[stable(feature = "rust1", since = "1.0.0")]
75 pub struct TcpListener(net_imp::TcpListener);
76
77 /// An infinite iterator over the connections from a `TcpListener`.
78 ///
79 /// This iterator will infinitely yield `Some` of the accepted connections. It
80 /// is equivalent to calling `accept` in a loop.
81 #[stable(feature = "rust1", since = "1.0.0")]
82 pub struct Incoming<'a> { listener: &'a TcpListener }
83
84 impl TcpStream {
85 /// Opens a TCP connection to a remote host.
86 ///
87 /// `addr` is an address of the remote host. Anything which implements
88 /// `ToSocketAddrs` trait can be supplied for the address; see this trait
89 /// documentation for concrete examples.
90 #[stable(feature = "rust1", since = "1.0.0")]
91 pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
92 super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
93 }
94
95 /// Returns the socket address of the remote peer of this TCP connection.
96 #[stable(feature = "rust1", since = "1.0.0")]
97 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
98 self.0.peer_addr()
99 }
100
101 /// Returns the socket address of the local half of this TCP connection.
102 #[stable(feature = "rust1", since = "1.0.0")]
103 pub fn local_addr(&self) -> io::Result<SocketAddr> {
104 self.0.socket_addr()
105 }
106
107 /// Shuts down the read, write, or both halves of this connection.
108 ///
109 /// This function will cause all pending and future I/O on the specified
110 /// portions to return immediately with an appropriate value (see the
111 /// documentation of `Shutdown`).
112 #[stable(feature = "rust1", since = "1.0.0")]
113 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
114 self.0.shutdown(how)
115 }
116
117 /// Creates a new independently owned handle to the underlying socket.
118 ///
119 /// The returned `TcpStream` is a reference to the same stream that this
120 /// object references. Both handles will read and write the same stream of
121 /// data, and options set on one stream will be propagated to the other
122 /// stream.
123 #[stable(feature = "rust1", since = "1.0.0")]
124 pub fn try_clone(&self) -> io::Result<TcpStream> {
125 self.0.duplicate().map(TcpStream)
126 }
127
128 /// Sets the read timeout to the timeout specified.
129 ///
130 /// If the value specified is `None`, then `read` calls will block
131 /// indefinitely. It is an error to pass the zero `Duration` to this
132 /// method.
133 ///
134 /// # Note
135 ///
136 /// Platforms may return a different error code whenever a read times out as
137 /// a result of setting this option. For example Unix typically returns an
138 /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
139 #[stable(feature = "socket_timeout", since = "1.4.0")]
140 pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
141 self.0.set_read_timeout(dur)
142 }
143
144 /// Sets the write timeout to the timeout specified.
145 ///
146 /// If the value specified is `None`, then `write` calls will block
147 /// indefinitely. It is an error to pass the zero `Duration` to this
148 /// method.
149 ///
150 /// # Note
151 ///
152 /// Platforms may return a different error code whenever a write times out
153 /// as a result of setting this option. For example Unix typically returns
154 /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
155 #[stable(feature = "socket_timeout", since = "1.4.0")]
156 pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
157 self.0.set_write_timeout(dur)
158 }
159
160 /// Returns the read timeout of this socket.
161 ///
162 /// If the timeout is `None`, then `read` calls will block indefinitely.
163 ///
164 /// # Note
165 ///
166 /// Some platforms do not provide access to the current timeout.
167 #[stable(feature = "socket_timeout", since = "1.4.0")]
168 pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
169 self.0.read_timeout()
170 }
171
172 /// Returns the write timeout of this socket.
173 ///
174 /// If the timeout is `None`, then `write` calls will block indefinitely.
175 ///
176 /// # Note
177 ///
178 /// Some platforms do not provide access to the current timeout.
179 #[stable(feature = "socket_timeout", since = "1.4.0")]
180 pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
181 self.0.write_timeout()
182 }
183 }
184
185 #[stable(feature = "rust1", since = "1.0.0")]
186 impl Read for TcpStream {
187 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
188 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
189 unsafe { read_to_end_uninitialized(self, buf) }
190 }
191 }
192 #[stable(feature = "rust1", since = "1.0.0")]
193 impl Write for TcpStream {
194 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
195 fn flush(&mut self) -> io::Result<()> { Ok(()) }
196 }
197 #[stable(feature = "rust1", since = "1.0.0")]
198 impl<'a> Read for &'a TcpStream {
199 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
200 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
201 unsafe { read_to_end_uninitialized(self, buf) }
202 }
203 }
204 #[stable(feature = "rust1", since = "1.0.0")]
205 impl<'a> Write for &'a TcpStream {
206 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
207 fn flush(&mut self) -> io::Result<()> { Ok(()) }
208 }
209
210 impl AsInner<net_imp::TcpStream> for TcpStream {
211 fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
212 }
213
214 impl FromInner<net_imp::TcpStream> for TcpStream {
215 fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) }
216 }
217
218 impl IntoInner<net_imp::TcpStream> for TcpStream {
219 fn into_inner(self) -> net_imp::TcpStream { self.0 }
220 }
221
222 #[stable(feature = "rust1", since = "1.0.0")]
223 impl fmt::Debug for TcpStream {
224 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
225 self.0.fmt(f)
226 }
227 }
228
229 impl TcpListener {
230 /// Creates a new `TcpListener` which will be bound to the specified
231 /// address.
232 ///
233 /// The returned listener is ready for accepting connections.
234 ///
235 /// Binding with a port number of 0 will request that the OS assigns a port
236 /// to this listener. The port allocated can be queried via the
237 /// `local_addr` method.
238 ///
239 /// The address type can be any implementor of `ToSocketAddrs` trait. See
240 /// its documentation for concrete examples.
241 #[stable(feature = "rust1", since = "1.0.0")]
242 pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
243 super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
244 }
245
246 /// Returns the local socket address of this listener.
247 #[stable(feature = "rust1", since = "1.0.0")]
248 pub fn local_addr(&self) -> io::Result<SocketAddr> {
249 self.0.socket_addr()
250 }
251
252 /// Creates a new independently owned handle to the underlying socket.
253 ///
254 /// The returned `TcpListener` is a reference to the same socket that this
255 /// object references. Both handles can be used to accept incoming
256 /// connections and options set on one listener will affect the other.
257 #[stable(feature = "rust1", since = "1.0.0")]
258 pub fn try_clone(&self) -> io::Result<TcpListener> {
259 self.0.duplicate().map(TcpListener)
260 }
261
262 /// Accept a new incoming connection from this listener.
263 ///
264 /// This function will block the calling thread until a new TCP connection
265 /// is established. When established, the corresponding `TcpStream` and the
266 /// remote peer's address will be returned.
267 #[stable(feature = "rust1", since = "1.0.0")]
268 pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
269 self.0.accept().map(|(a, b)| (TcpStream(a), b))
270 }
271
272 /// Returns an iterator over the connections being received on this
273 /// listener.
274 ///
275 /// The returned iterator will never return `None` and will also not yield
276 /// the peer's `SocketAddr` structure.
277 #[stable(feature = "rust1", since = "1.0.0")]
278 pub fn incoming(&self) -> Incoming {
279 Incoming { listener: self }
280 }
281 }
282
283 #[stable(feature = "rust1", since = "1.0.0")]
284 impl<'a> Iterator for Incoming<'a> {
285 type Item = io::Result<TcpStream>;
286 fn next(&mut self) -> Option<io::Result<TcpStream>> {
287 Some(self.listener.accept().map(|p| p.0))
288 }
289 }
290
291 impl AsInner<net_imp::TcpListener> for TcpListener {
292 fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
293 }
294
295 impl FromInner<net_imp::TcpListener> for TcpListener {
296 fn from_inner(inner: net_imp::TcpListener) -> TcpListener {
297 TcpListener(inner)
298 }
299 }
300
301 impl IntoInner<net_imp::TcpListener> for TcpListener {
302 fn into_inner(self) -> net_imp::TcpListener { self.0 }
303 }
304
305 #[stable(feature = "rust1", since = "1.0.0")]
306 impl fmt::Debug for TcpListener {
307 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
308 self.0.fmt(f)
309 }
310 }
311
312 #[cfg(test)]
313 mod tests {
314 use prelude::v1::*;
315
316 use io::ErrorKind;
317 use io::prelude::*;
318 use net::*;
319 use net::test::{next_test_ip4, next_test_ip6};
320 use sync::mpsc::channel;
321 use sys_common::AsInner;
322 use time::{Instant, Duration};
323 use thread;
324
325 fn each_ip(f: &mut FnMut(SocketAddr)) {
326 f(next_test_ip4());
327 f(next_test_ip6());
328 }
329
// Unwraps a `Result`, panicking with the stringified expression and the error
// when it is `Err`.
macro_rules! t {
    ($e:expr) => {
        match $e {
            Err(err) => panic!("received error for `{}`: {}", stringify!($e), err),
            Ok(value) => value,
        }
    }
}
338
// Binding to an address that is not local must fail with `AddrNotAvailable`.
#[test]
fn bind_error() {
    let attempt = TcpListener::bind("1.1.1.1:9999");
    if let Err(err) = attempt {
        assert_eq!(err.kind(), ErrorKind::AddrNotAvailable);
    } else {
        panic!();
    }
}
347
// Connecting to `0.0.0.0:1` must fail with one of a handful of error kinds.
#[test]
fn connect_error() {
    let e = match TcpStream::connect("0.0.0.0:1") {
        Ok(..) => panic!(),
        Err(e) => e,
    };
    let kind = e.kind();
    let acceptable = kind == ErrorKind::ConnectionRefused ||
                     kind == ErrorKind::InvalidInput ||
                     kind == ErrorKind::AddrInUse ||
                     kind == ErrorKind::AddrNotAvailable;
    assert!(acceptable, "bad error: {} {:?}", e, kind);
}
359
// A listener bound to an IPv4 test address is reachable through the
// `localhost` host name on the same port.
#[test]
fn listen_localhost() {
    let socket_addr = next_test_ip4();
    let listener = t!(TcpListener::bind(&socket_addr));

    // client side: connect and send a single byte
    let _t = thread::spawn(move || {
        let mut stream = t!(TcpStream::connect(&("localhost",
                                                 socket_addr.port())));
        t!(stream.write(&[144]));
    });

    // server side: accept and check the byte
    let (mut stream, _) = t!(listener.accept());
    let mut buf = [0];
    t!(stream.read(&mut buf));
    assert!(buf[0] == 144);
}
376
// Connects through the textual loopback address of the matching IP family.
#[test]
fn connect_loopback() {
    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move || {
            let host = if let SocketAddr::V4(..) = addr {
                "127.0.0.1"
            } else {
                "::1"
            };
            let mut stream = t!(TcpStream::connect(&(host, addr.port())));
            t!(stream.write(&[66]));
        });

        let (mut stream, _) = t!(acceptor.accept());
        let mut buf = [0];
        t!(stream.read(&mut buf));
        assert!(buf[0] == 66);
    };
    each_ip(&mut per_addr)
}
397
// One byte travels from client to server, and the peer address reported by
// `accept` equals the client's own local address.
#[test]
fn smoke_test() {
    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        let (tx, rx) = channel();
        let _t = thread::spawn(move || {
            let mut stream = t!(TcpStream::connect(&addr));
            t!(stream.write(&[99]));
            tx.send(t!(stream.local_addr())).unwrap();
        });

        let (mut stream, peer) = t!(acceptor.accept());
        let mut buf = [0];
        t!(stream.read(&mut buf));
        assert!(buf[0] == 99);
        let client_local = t!(rx.recv());
        assert_eq!(peer, client_local);
    };
    each_ip(&mut per_addr)
}
417
// Once the peer has closed, every read reports zero bytes, repeatedly.
#[test]
fn read_eof() {
    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move || {
            // the connection closes as soon as `_stream` goes out of scope
            let _stream = t!(TcpStream::connect(&addr));
        });

        let (mut stream, _) = t!(acceptor.accept());
        let mut buf = [0];
        for _ in 0..2 {
            let nread = t!(stream.read(&mut buf));
            assert_eq!(nread, 0);
        }
    };
    each_ip(&mut per_addr)
}
436
// Writing after the peer has closed either succeeds or fails with one of the
// expected error kinds.
#[test]
fn write_close() {
    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        let (tx, rx) = channel();
        let _t = thread::spawn(move || {
            // connect, hang up straight away, then signal the main thread
            drop(t!(TcpStream::connect(&addr)));
            tx.send(()).unwrap();
        });

        let (mut stream, _) = t!(acceptor.accept());
        rx.recv().unwrap();
        let buf = [0];
        if let Err(e) = stream.write(&buf) {
            let kind = e.kind();
            let expected = kind == ErrorKind::ConnectionReset ||
                           kind == ErrorKind::BrokenPipe ||
                           kind == ErrorKind::ConnectionAborted;
            assert!(expected, "unknown error: {}", e);
        }
    };
    each_ip(&mut per_addr)
}
462
// Ten clients connect one after another; each sends a single byte.
#[test]
fn multiple_connect_serial() {
    let mut per_addr = |addr: SocketAddr| {
        let max = 10;
        let acceptor = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move || {
            for _ in 0..max {
                let mut stream = t!(TcpStream::connect(&addr));
                t!(stream.write(&[99]));
            }
        });

        let accepted = acceptor.incoming().take(max);
        for stream in accepted {
            let mut stream = t!(stream);
            let mut buf = [0];
            t!(stream.read(&mut buf));
            assert_eq!(buf[0], 99);
        }
    };
    each_ip(&mut per_addr)
}
484
// All connections are opened before any of them is written to; each
// connection carries its own index as payload.
#[test]
fn multiple_connect_interleaved_greedy_schedule() {
    const MAX: usize = 10;

    // Opens connection `i`, recursively opens the remaining ones, and only
    // then writes `i` on its own stream.
    fn connect(i: usize, addr: SocketAddr) {
        if i != MAX {
            let t = thread::spawn(move || {
                let mut stream = t!(TcpStream::connect(&addr));
                connect(i + 1, addr);
                t!(stream.write(&[i as u8]));
            });
            t.join().ok().unwrap();
        }
    }

    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move || {
            let numbered = acceptor.incoming().enumerate().take(MAX);
            for (i, stream) in numbered {
                // each accepted connection gets its own handler thread
                let _t = thread::spawn(move || {
                    let mut stream = t!(stream);
                    let mut buf = [0];
                    t!(stream.read(&mut buf));
                    assert!(buf[0] == i as u8);
                });
            }
        });

        connect(0, addr);
    };
    each_ip(&mut per_addr);
}
519
// All connections are opened before any of them is written to; every
// connection carries the same payload byte.
#[test]
fn multiple_connect_interleaved_lazy_schedule() {
    const MAX: usize = 10;

    // Opens one connection, recursively opens the remaining ones, and only
    // then writes on its own stream.
    fn connect(i: usize, addr: SocketAddr) {
        if i != MAX {
            let t = thread::spawn(move || {
                let mut stream = t!(TcpStream::connect(&addr));
                connect(i + 1, addr);
                t!(stream.write(&[99]));
            });
            t.join().ok().unwrap();
        }
    }

    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move || {
            let accepted = acceptor.incoming().take(MAX);
            for stream in accepted {
                // each accepted connection gets its own handler thread
                let _t = thread::spawn(move || {
                    let mut stream = t!(stream);
                    let mut buf = [0];
                    t!(stream.read(&mut buf));
                    assert!(buf[0] == 99);
                });
            }
        });

        connect(0, addr);
    };
    each_ip(&mut per_addr);
}
552
// The listener reports the address it was bound to, and a connected client
// reports that same address as its peer.
#[test]
fn socket_and_peer_name() {
    let mut per_addr = |addr: SocketAddr| {
        let listener = t!(TcpListener::bind(&addr));
        let so_name = t!(listener.local_addr());
        assert_eq!(addr, so_name);

        let _t = thread::spawn(move || {
            t!(listener.accept());
        });

        let stream = t!(TcpStream::connect(&addr));
        let peer = t!(stream.peer_addr());
        assert_eq!(addr, peer);
    };
    each_ip(&mut per_addr)
}
567
// Reading into a ten-byte buffer returns after the single byte that was sent.
#[test]
fn partial_read() {
    let mut per_addr = |addr: SocketAddr| {
        let (tx, rx) = channel();
        let srv = t!(TcpListener::bind(&addr));
        let _t = thread::spawn(move || {
            let (mut cl, _) = t!(srv.accept());
            cl.write(&[10]).unwrap();
            // wait for the client's reply before signalling completion
            let mut b = [0];
            t!(cl.read(&mut b));
            tx.send(()).unwrap();
        });

        let mut c = t!(TcpStream::connect(&addr));
        let mut b = [0; 10];
        let got = c.read(&mut b).unwrap();
        assert_eq!(got, 1);
        t!(c.write(&[1]));
        rx.recv().unwrap();
    };
    each_ip(&mut per_addr)
}
588
// Binding a second listener to an address already in use must fail.
#[test]
fn double_bind() {
    let mut per_addr = |addr: SocketAddr| {
        let _listener = t!(TcpListener::bind(&addr));
        let e = match TcpListener::bind(&addr) {
            Ok(..) => panic!(),
            Err(e) => e,
        };
        let kind = e.kind();
        let expected = kind == ErrorKind::ConnectionRefused ||
                       kind == ErrorKind::Other ||
                       kind == ErrorKind::AddrInUse;
        assert!(expected, "unknown error: {} {:?}", e, kind);
    };
    each_ip(&mut per_addr)
}
604
// After a listener has accepted a connection and been dropped, the same
// address can be bound again right away.
#[test]
fn fast_rebind() {
    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move || {
            t!(TcpStream::connect(&addr));
        });

        t!(acceptor.accept());
        drop(acceptor);
        t!(TcpListener::bind(&addr));
    };
    each_ip(&mut per_addr);
}
619
// A cloned handle writes while the original handle reads on the same socket.
#[test]
fn tcp_clone_smoke() {
    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        // remote side: expect one byte, then answer with another
        let _t = thread::spawn(move || {
            let mut s = t!(TcpStream::connect(&addr));
            let mut buf = [0, 0];
            let got = s.read(&mut buf).unwrap();
            assert_eq!(got, 1);
            assert_eq!(buf[0], 1);
            t!(s.write(&[2]));
        });

        let (mut s1, _) = t!(acceptor.accept());
        let s2 = t!(s1.try_clone());

        let (tx1, rx1) = channel();
        let (tx2, rx2) = channel();
        // the clone writes once it has been told to start
        let _t = thread::spawn(move || {
            let mut s2 = s2;
            rx1.recv().unwrap();
            t!(s2.write(&[1]));
            tx2.send(()).unwrap();
        });
        tx1.send(()).unwrap();

        let mut buf = [0, 0];
        let got = s1.read(&mut buf).unwrap();
        assert_eq!(got, 1);
        rx2.recv().unwrap();
    };
    each_ip(&mut per_addr)
}
650
// Two handles to the same socket read from it, one on each thread.
#[test]
fn tcp_clone_two_read() {
    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));
        let (tx1, rx) = channel();
        let tx2 = tx1.clone();

        // remote side: write a byte, wait for a signal, and do so twice
        let _t = thread::spawn(move || {
            let mut s = t!(TcpStream::connect(&addr));
            t!(s.write(&[1]));
            rx.recv().unwrap();
            t!(s.write(&[2]));
            rx.recv().unwrap();
        });

        let (mut s1, _) = t!(acceptor.accept());
        let s2 = t!(s1.try_clone());

        let (done, rx) = channel();
        // reader using the cloned handle
        let _t = thread::spawn(move || {
            let mut s2 = s2;
            let mut buf = [0, 0];
            t!(s2.read(&mut buf));
            tx2.send(()).unwrap();
            done.send(()).unwrap();
        });

        // reader using the original handle
        let mut buf = [0, 0];
        t!(s1.read(&mut buf));
        tx1.send(()).unwrap();

        rx.recv().unwrap();
    };
    each_ip(&mut per_addr)
}
684
// Two handles to the same socket write to it, one on each thread.
#[test]
fn tcp_clone_two_write() {
    let mut per_addr = |addr: SocketAddr| {
        let acceptor = t!(TcpListener::bind(&addr));

        // remote side: issue two reads
        let _t = thread::spawn(move || {
            let mut s = t!(TcpStream::connect(&addr));
            let mut buf = [0, 1];
            t!(s.read(&mut buf));
            t!(s.read(&mut buf));
        });

        let (mut s1, _) = t!(acceptor.accept());
        let s2 = t!(s1.try_clone());

        let (done, rx) = channel();
        // writer using the cloned handle
        let _t = thread::spawn(move || {
            let mut s2 = s2;
            t!(s2.write(&[1]));
            done.send(()).unwrap();
        });

        // writer using the original handle
        t!(s1.write(&[2]));

        rx.recv().unwrap();
    };
    each_ip(&mut per_addr)
}
711
// After `Shutdown::Write` the peer sees end-of-stream, local writes fail, and
// local reads still work.
#[test]
fn shutdown_smoke() {
    let mut per_addr = |addr: SocketAddr| {
        let a = t!(TcpListener::bind(&addr));
        let _t = thread::spawn(move || {
            let (mut c, _) = t!(a.accept());
            let mut b = [0];
            let got = c.read(&mut b).unwrap();
            assert_eq!(got, 0);
            t!(c.write(&[1]));
        });

        let mut s = t!(TcpStream::connect(&addr));
        t!(s.shutdown(Shutdown::Write));
        assert!(s.write(&[1]).is_err());

        let mut b = [0, 0];
        let got = t!(s.read(&mut b));
        assert_eq!(got, 1);
        assert_eq!(b[0], 1);
    };
    each_ip(&mut per_addr)
}
731
// Shutting down both halves is observed through the original handle, through
// a handle cloned earlier, and through a handle cloned afterwards.
#[test]
fn close_readwrite_smoke() {
    let mut per_addr = |addr: SocketAddr| {
        let a = t!(TcpListener::bind(&addr));
        let (tx, rx) = channel::<()>();
        // keep the accepted side alive until `tx` is dropped below
        let _t = thread::spawn(move || {
            let _s = t!(a.accept());
            let _ = rx.recv();
        });

        let mut b = [0];
        let mut s = t!(TcpStream::connect(&addr));
        let mut s2 = t!(s.try_clone());

        // closing should prevent reads/writes
        t!(s.shutdown(Shutdown::Write));
        assert!(s.write(&[0]).is_err());
        t!(s.shutdown(Shutdown::Read));
        let got = s.read(&mut b).unwrap();
        assert_eq!(got, 0);

        // closing should affect previous handles
        assert!(s2.write(&[0]).is_err());
        let got = s2.read(&mut b).unwrap();
        assert_eq!(got, 0);

        // closing should affect new handles
        let mut s3 = t!(s.try_clone());
        assert!(s3.write(&[0]).is_err());
        let got = s3.read(&mut b).unwrap();
        assert_eq!(got, 0);

        // repeated shutdowns must not blow up; their results are ignored
        let _ = s2.shutdown(Shutdown::Read);
        let _ = s2.shutdown(Shutdown::Write);
        let _ = s3.shutdown(Shutdown::Read);
        let _ = s3.shutdown(Shutdown::Write);
        drop(tx);
    };
    each_ip(&mut per_addr)
}
769
// `Shutdown::Read` on one handle wakes up a read blocked on a cloned handle.
#[test]
fn close_read_wakes_up() {
    let mut per_addr = |addr: SocketAddr| {
        let a = t!(TcpListener::bind(&addr));
        let (tx1, rx) = channel::<()>();
        // keep the accepted side alive until `tx1` is dropped below
        let _t = thread::spawn(move || {
            let _s = t!(a.accept());
            let _ = rx.recv();
        });

        let s = t!(TcpStream::connect(&addr));
        let s2 = t!(s.try_clone());
        let (tx, rx) = channel();
        let _t = thread::spawn(move || {
            let mut s2 = s2;
            let got = t!(s2.read(&mut [0]));
            assert_eq!(got, 0);
            tx.send(()).unwrap();
        });

        // this should wake up the child thread
        t!(s.shutdown(Shutdown::Read));

        // if the child never wakes up, this receive never returns
        rx.recv().unwrap();
        drop(tx1);
    };
    each_ip(&mut per_addr)
}
796
// Cloning a handle while another clone is (probably) blocked in a read must
// not disturb that read.
#[test]
fn clone_while_reading() {
    let mut per_addr = |addr: SocketAddr| {
        let accept = t!(TcpListener::bind(&addr));

        // Writer thread: connects, then waits for the go-ahead before writing
        let (tx, rx) = channel();
        let (txdone, rxdone) = channel();
        let txdone2 = txdone.clone();
        let _t = thread::spawn(move || {
            let mut tcp = t!(TcpStream::connect(&addr));
            rx.recv().unwrap();
            t!(tcp.write(&[0]));
            txdone2.send(()).unwrap();
        });

        // Reader thread working on a clone of the accepted stream
        let (tcp, _) = t!(accept.accept());
        let tcp2 = t!(tcp.try_clone());
        let txdone3 = txdone.clone();
        let _t = thread::spawn(move || {
            let mut tcp2 = tcp2;
            t!(tcp2.read(&mut [0]));
            txdone3.send(()).unwrap();
        });

        // Best effort to let the reading clone actually start its read
        let mut yields = 0;
        while yields < 50 {
            thread::yield_now();
            yields += 1;
        }

        // Clone the handle again while the read is in flight, then release
        // the writer and wait for both helper threads to report back.
        let _ = t!(tcp.try_clone());
        tx.send(()).unwrap();
        rxdone.recv().unwrap();
        rxdone.recv().unwrap();
    };
    each_ip(&mut per_addr)
}
836
// Both a listener and its clone can accept a connection.
#[test]
fn clone_accept_smoke() {
    let mut per_addr = |addr: SocketAddr| {
        let a = t!(TcpListener::bind(&addr));
        let a2 = t!(a.try_clone());

        // two clients, each making one connection attempt
        let _t = thread::spawn(move || {
            let _ = TcpStream::connect(&addr);
        });
        let _t = thread::spawn(move || {
            let _ = TcpStream::connect(&addr);
        });

        t!(a.accept());
        t!(a2.accept());
    };
    each_ip(&mut per_addr)
}
854
// A listener and its clone accept concurrently on separate threads.
#[test]
fn clone_accept_concurrent() {
    let mut per_addr = |addr: SocketAddr| {
        let a = t!(TcpListener::bind(&addr));
        let a2 = t!(a.try_clone());

        let (tx, rx) = channel();
        let tx2 = tx.clone();

        // one accepting thread per listener handle
        let _t = thread::spawn(move || {
            tx.send(t!(a.accept())).unwrap();
        });
        let _t = thread::spawn(move || {
            tx2.send(t!(a2.accept())).unwrap();
        });

        // two clients, each making one connection attempt
        let _t = thread::spawn(move || {
            let _ = TcpStream::connect(&addr);
        });
        let _t = thread::spawn(move || {
            let _ = TcpStream::connect(&addr);
        });

        // wait for both accepts to report back
        rx.recv().unwrap();
        rx.recv().unwrap();
    };
    each_ip(&mut per_addr)
}
882
// The `Debug` output of listeners and streams names the address(es) and the
// underlying handle, which is labelled differently on Windows.
#[test]
fn debug() {
    let name = if cfg!(windows) {"socket"} else {"fd"};
    let socket_addr = next_test_ip4();

    // listener
    let listener = t!(TcpListener::bind(&socket_addr));
    let listener_inner = listener.0.socket().as_inner();
    let expected_listener = format!("TcpListener {{ addr: {:?}, {}: {:?} }}",
                                    socket_addr, name, listener_inner);
    assert_eq!(format!("{:?}", listener), expected_listener);

    // stream
    let stream = t!(TcpStream::connect(&("localhost",
                                         socket_addr.port())));
    let stream_inner = stream.0.socket().as_inner();
    let local = stream.local_addr().unwrap();
    let peer = stream.peer_addr().unwrap();
    let expected_stream = format!("TcpStream {{ addr: {:?}, \
                                   peer: {:?}, {}: {:?} }}",
                                  local, peer, name, stream_inner);
    assert_eq!(format!("{:?}", stream), expected_stream);
}
905
// FIXME: re-enable this test on bitrig/openbsd once their socket timeout code
//        no longer has rounding errors. (netbsd is ignored below as well.)
#[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
#[test]
fn timeouts() {
    let addr = next_test_ip4();
    let listener = t!(TcpListener::bind(&addr));

    let stream = t!(TcpStream::connect(&("localhost", addr.port())));
    let dur = Duration::from_secs(15410);

    // no timeout is set initially; setting one makes it readable back
    let initial_read = t!(stream.read_timeout());
    assert_eq!(None, initial_read);

    t!(stream.set_read_timeout(Some(dur)));
    let current_read = t!(stream.read_timeout());
    assert_eq!(Some(dur), current_read);

    let initial_write = t!(stream.write_timeout());
    assert_eq!(None, initial_write);

    t!(stream.set_write_timeout(Some(dur)));
    let current_write = t!(stream.write_timeout());
    assert_eq!(Some(dur), current_write);

    // clearing the timeouts brings back `None`
    t!(stream.set_read_timeout(None));
    let cleared_read = t!(stream.read_timeout());
    assert_eq!(None, cleared_read);

    t!(stream.set_write_timeout(None));
    let cleared_write = t!(stream.write_timeout());
    assert_eq!(None, cleared_write);

    drop(listener);
}
934
// A read on a stream that never receives data fails with a timeout-style
// error once the configured read timeout has passed.
#[test]
fn test_read_timeout() {
    let addr = next_test_ip4();
    let listener = t!(TcpListener::bind(&addr));

    let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
    t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));

    let mut buf = [0; 10];
    let start = Instant::now();
    let failure = stream.read(&mut buf).err().expect("expected error");
    let kind = failure.kind();
    assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
    // the read must really have waited rather than failing immediately
    assert!(start.elapsed() > Duration::from_millis(400));
    drop(listener);
}
950
// With a read timeout configured, data that is already available is read
// normally, and a further read with nothing pending fails with a
// timeout-style error once the timeout has passed.
#[test]
fn test_read_with_timeout() {
    let addr = next_test_ip4();
    let listener = t!(TcpListener::bind(&addr));

    let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
    t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));

    let mut other_end = t!(listener.accept()).0;
    t!(other_end.write_all(b"hello world"));

    let mut buf = [0; 11];
    // A single `read` may legally return fewer bytes than the buffer holds,
    // which would make the comparison below fail spuriously; `read_exact`
    // keeps reading until all 11 bytes have arrived.
    t!(stream.read_exact(&mut buf));
    assert_eq!(b"hello world", &buf[..]);

    let start = Instant::now();
    let kind = stream.read(&mut buf).err().expect("expected error").kind();
    assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
    // the read must really have waited rather than failing immediately
    assert!(start.elapsed() > Duration::from_millis(400));
    drop(listener);
}
972 }