]> git.proxmox.com Git - rustc.git/blob - src/libstd/net/tcp.rs
Imported Upstream version 1.5.0+dfsg1
[rustc.git] / src / libstd / net / tcp.rs
1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12 use io::prelude::*;
13
14 use fmt;
15 use io;
16 use net::{ToSocketAddrs, SocketAddr, Shutdown};
17 use sys_common::io::read_to_end_uninitialized;
18 use sys_common::net as net_imp;
19 use sys_common::{AsInner, FromInner, IntoInner};
20 use time::Duration;
21
22 /// A structure which represents a TCP stream between a local socket and a
23 /// remote socket.
24 ///
25 /// The socket will be closed when the value is dropped.
26 ///
27 /// # Examples
28 ///
29 /// ```no_run
30 /// use std::io::prelude::*;
31 /// use std::net::TcpStream;
32 ///
33 /// {
34 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
35 ///
36 /// // ignore the Result
37 /// let _ = stream.write(&[1]);
38 /// let _ = stream.read(&mut [0; 128]); // ignore here too
39 /// } // the stream is closed here
40 /// ```
41 #[stable(feature = "rust1", since = "1.0.0")]
42 pub struct TcpStream(net_imp::TcpStream);
43
44 /// A structure representing a socket server.
45 ///
46 /// # Examples
47 ///
48 /// ```no_run
49 /// use std::net::{TcpListener, TcpStream};
50 /// use std::thread;
51 ///
52 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
53 ///
54 /// fn handle_client(stream: TcpStream) {
55 /// // ...
56 /// }
57 ///
58 /// // accept connections and process them, spawning a new thread for each one
59 /// for stream in listener.incoming() {
60 /// match stream {
61 /// Ok(stream) => {
62 /// thread::spawn(move|| {
63 /// // connection succeeded
64 /// handle_client(stream)
65 /// });
66 /// }
67 /// Err(e) => { /* connection failed */ }
68 /// }
69 /// }
70 ///
71 /// // close the socket server
72 /// drop(listener);
73 /// ```
74 #[stable(feature = "rust1", since = "1.0.0")]
75 pub struct TcpListener(net_imp::TcpListener);
76
77 /// An infinite iterator over the connections from a `TcpListener`.
78 ///
79 /// This iterator will infinitely yield `Some` of the accepted connections. It
80 /// is equivalent to calling `accept` in a loop.
81 #[stable(feature = "rust1", since = "1.0.0")]
82 pub struct Incoming<'a> { listener: &'a TcpListener }
83
84 impl TcpStream {
85 /// Opens a TCP connection to a remote host.
86 ///
87 /// `addr` is an address of the remote host. Anything which implements
88 /// `ToSocketAddrs` trait can be supplied for the address; see this trait
89 /// documentation for concrete examples.
90 #[stable(feature = "rust1", since = "1.0.0")]
91 pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
92 super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
93 }
94
95 /// Returns the socket address of the remote peer of this TCP connection.
96 #[stable(feature = "rust1", since = "1.0.0")]
97 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
98 self.0.peer_addr()
99 }
100
101 /// Returns the socket address of the local half of this TCP connection.
102 #[stable(feature = "rust1", since = "1.0.0")]
103 pub fn local_addr(&self) -> io::Result<SocketAddr> {
104 self.0.socket_addr()
105 }
106
107 /// Shuts down the read, write, or both halves of this connection.
108 ///
109 /// This function will cause all pending and future I/O on the specified
110 /// portions to return immediately with an appropriate value (see the
111 /// documentation of `Shutdown`).
112 #[stable(feature = "rust1", since = "1.0.0")]
113 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
114 self.0.shutdown(how)
115 }
116
117 /// Creates a new independently owned handle to the underlying socket.
118 ///
119 /// The returned `TcpStream` is a reference to the same stream that this
120 /// object references. Both handles will read and write the same stream of
121 /// data, and options set on one stream will be propagated to the other
122 /// stream.
123 #[stable(feature = "rust1", since = "1.0.0")]
124 pub fn try_clone(&self) -> io::Result<TcpStream> {
125 self.0.duplicate().map(TcpStream)
126 }
127
128 /// Sets the read timeout to the timeout specified.
129 ///
130 /// If the value specified is `None`, then `read` calls will block
131 /// indefinitely. It is an error to pass the zero `Duration` to this
132 /// method.
133 ///
134 /// # Note
135 ///
136 /// Platforms may return a different error code whenever a read times out as
137 /// a result of setting this option. For example Unix typically returns an
138 /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
139 #[stable(feature = "socket_timeout", since = "1.4.0")]
140 pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
141 self.0.set_read_timeout(dur)
142 }
143
144 /// Sets the write timeout to the timeout specified.
145 ///
146 /// If the value specified is `None`, then `write` calls will block
147 /// indefinitely. It is an error to pass the zero `Duration` to this
148 /// method.
149 ///
150 /// # Note
151 ///
152 /// Platforms may return a different error code whenever a write times out
153 /// as a result of setting this option. For example Unix typically returns
154 /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
155 #[stable(feature = "socket_timeout", since = "1.4.0")]
156 pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
157 self.0.set_write_timeout(dur)
158 }
159
160 /// Returns the read timeout of this socket.
161 ///
162 /// If the timeout is `None`, then `read` calls will block indefinitely.
163 ///
164 /// # Note
165 ///
166 /// Some platforms do not provide access to the current timeout.
167 #[stable(feature = "socket_timeout", since = "1.4.0")]
168 pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
169 self.0.read_timeout()
170 }
171
172 /// Returns the write timeout of this socket.
173 ///
174 /// If the timeout is `None`, then `write` calls will block indefinitely.
175 ///
176 /// # Note
177 ///
178 /// Some platforms do not provide access to the current timeout.
179 #[stable(feature = "socket_timeout", since = "1.4.0")]
180 pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
181 self.0.write_timeout()
182 }
183 }
184
185 #[stable(feature = "rust1", since = "1.0.0")]
186 impl Read for TcpStream {
187 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
188 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
189 unsafe { read_to_end_uninitialized(self, buf) }
190 }
191 }
192 #[stable(feature = "rust1", since = "1.0.0")]
193 impl Write for TcpStream {
194 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
195 fn flush(&mut self) -> io::Result<()> { Ok(()) }
196 }
197 #[stable(feature = "rust1", since = "1.0.0")]
198 impl<'a> Read for &'a TcpStream {
199 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
200 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
201 unsafe { read_to_end_uninitialized(self, buf) }
202 }
203 }
204 #[stable(feature = "rust1", since = "1.0.0")]
205 impl<'a> Write for &'a TcpStream {
206 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
207 fn flush(&mut self) -> io::Result<()> { Ok(()) }
208 }
209
210 impl AsInner<net_imp::TcpStream> for TcpStream {
211 fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
212 }
213
214 impl FromInner<net_imp::TcpStream> for TcpStream {
215 fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) }
216 }
217
218 impl IntoInner<net_imp::TcpStream> for TcpStream {
219 fn into_inner(self) -> net_imp::TcpStream { self.0 }
220 }
221
222 impl fmt::Debug for TcpStream {
223 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
224 self.0.fmt(f)
225 }
226 }
227
228 impl TcpListener {
229 /// Creates a new `TcpListener` which will be bound to the specified
230 /// address.
231 ///
232 /// The returned listener is ready for accepting connections.
233 ///
234 /// Binding with a port number of 0 will request that the OS assigns a port
235 /// to this listener. The port allocated can be queried via the
236 /// `socket_addr` function.
237 ///
238 /// The address type can be any implementor of `ToSocketAddrs` trait. See
239 /// its documentation for concrete examples.
240 #[stable(feature = "rust1", since = "1.0.0")]
241 pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
242 super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
243 }
244
245 /// Returns the local socket address of this listener.
246 #[stable(feature = "rust1", since = "1.0.0")]
247 pub fn local_addr(&self) -> io::Result<SocketAddr> {
248 self.0.socket_addr()
249 }
250
251 /// Creates a new independently owned handle to the underlying socket.
252 ///
253 /// The returned `TcpListener` is a reference to the same socket that this
254 /// object references. Both handles can be used to accept incoming
255 /// connections and options set on one listener will affect the other.
256 #[stable(feature = "rust1", since = "1.0.0")]
257 pub fn try_clone(&self) -> io::Result<TcpListener> {
258 self.0.duplicate().map(TcpListener)
259 }
260
261 /// Accept a new incoming connection from this listener.
262 ///
263 /// This function will block the calling thread until a new TCP connection
264 /// is established. When established, the corresponding `TcpStream` and the
265 /// remote peer's address will be returned.
266 #[stable(feature = "rust1", since = "1.0.0")]
267 pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
268 self.0.accept().map(|(a, b)| (TcpStream(a), b))
269 }
270
271 /// Returns an iterator over the connections being received on this
272 /// listener.
273 ///
274 /// The returned iterator will never return `None` and will also not yield
275 /// the peer's `SocketAddr` structure.
276 #[stable(feature = "rust1", since = "1.0.0")]
277 pub fn incoming(&self) -> Incoming {
278 Incoming { listener: self }
279 }
280 }
281
282 #[stable(feature = "rust1", since = "1.0.0")]
283 impl<'a> Iterator for Incoming<'a> {
284 type Item = io::Result<TcpStream>;
285 fn next(&mut self) -> Option<io::Result<TcpStream>> {
286 Some(self.listener.accept().map(|p| p.0))
287 }
288 }
289
290 impl AsInner<net_imp::TcpListener> for TcpListener {
291 fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
292 }
293
294 impl FromInner<net_imp::TcpListener> for TcpListener {
295 fn from_inner(inner: net_imp::TcpListener) -> TcpListener {
296 TcpListener(inner)
297 }
298 }
299
300 impl IntoInner<net_imp::TcpListener> for TcpListener {
301 fn into_inner(self) -> net_imp::TcpListener { self.0 }
302 }
303
304 impl fmt::Debug for TcpListener {
305 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
306 self.0.fmt(f)
307 }
308 }
309
310 #[cfg(test)]
311 mod tests {
312 use prelude::v1::*;
313
314 use io::ErrorKind;
315 use io::prelude::*;
316 use net::*;
317 use net::test::{next_test_ip4, next_test_ip6};
318 use sync::mpsc::channel;
319 use sys_common::AsInner;
320 use time::Duration;
321 use thread;
322
323 fn each_ip(f: &mut FnMut(SocketAddr)) {
324 f(next_test_ip4());
325 f(next_test_ip6());
326 }
327
// Unwraps a `Result`, panicking with the source text of the expression and
// the error if it is an `Err`.
macro_rules! t {
    ($e:expr) => {
        match $e {
            Ok(value) => value,
            Err(err) => {
                panic!("received error for `{}`: {}", stringify!($e), err)
            }
        }
    }
}
336
// Binding to an address this host does not own must fail with
// `AddrNotAvailable`.
#[test]
fn bind_error() {
    let err = match TcpListener::bind("1.1.1.1:9999") {
        Ok(..) => panic!(),
        Err(e) => e,
    };
    assert_eq!(err.kind(), ErrorKind::AddrNotAvailable);
}
345
// Connecting to `0.0.0.0:1` must fail with one of a small set of
// platform-dependent error kinds.
#[test]
fn connect_error() {
    let e = match TcpStream::connect("0.0.0.0:1") {
        Ok(..) => panic!(),
        Err(e) => e,
    };
    let kind = e.kind();
    let acceptable = kind == ErrorKind::ConnectionRefused ||
                     kind == ErrorKind::InvalidInput ||
                     kind == ErrorKind::AddrInUse ||
                     kind == ErrorKind::AddrNotAvailable;
    assert!(acceptable, "bad error: {} {:?}", e, e.kind());
}
357
// A client connecting via the name "localhost" can deliver one byte to a
// listener bound on an IPv4 test address.
#[test]
fn listen_localhost() {
    let bound = next_test_ip4();
    let listener = t!(TcpListener::bind(&bound));

    let _t = thread::spawn(move || {
        let target = ("localhost", bound.port());
        let mut client = t!(TcpStream::connect(&target));
        t!(client.write(&[144]));
    });

    let (mut server, _) = t!(listener.accept());
    let mut byte = [0];
    t!(server.read(&mut byte));
    assert!(byte[0] == 144);
}
374
// For both address families, a client connecting to the literal loopback
// address can deliver one byte to the listener.
#[test]
fn connect_loopback() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move|| {
            // Pick the loopback literal matching the address family.
            let loopback = match addr {
                SocketAddr::V4(..) => "127.0.0.1",
                SocketAddr::V6(..) => "::1",
            };
            let target = (loopback, addr.port());
            let mut client = t!(TcpStream::connect(&target));
            t!(client.write(&[66]));
        });

        let (mut server, _) = t!(listener.accept());
        let mut byte = [0];
        t!(server.read(&mut byte));
        assert!(byte[0] == 66);
    })
}
395
// One byte travels from client to server, and the peer address seen by the
// server equals the local address reported by the client.
#[test]
fn smoke_test() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        let (tx, rx) = channel();
        let _t = thread::spawn(move|| {
            let mut client = t!(TcpStream::connect(&addr));
            t!(client.write(&[99]));
            let client_local = t!(client.local_addr());
            tx.send(client_local).unwrap();
        });

        let (mut server, peer) = t!(listener.accept());
        let mut byte = [0];
        t!(server.read(&mut byte));
        assert!(byte[0] == 99);
        assert_eq!(peer, t!(rx.recv()));
    })
}
415
// After the client closes without writing, two consecutive reads on the
// server side both report zero bytes.
#[test]
fn read_eof() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move|| {
            // Connect and close immediately by dropping the stream.
            let _client = t!(TcpStream::connect(&addr));
        });

        let (mut server, _) = t!(listener.accept());
        let mut byte = [0];
        for _ in 0..2 {
            let nread = t!(server.read(&mut byte));
            assert_eq!(nread, 0);
        }
    })
}
434
// Writing to a connection whose peer has already closed either succeeds or
// fails with one of the expected disconnect error kinds.
#[test]
fn write_close() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        let (closed_tx, closed_rx) = channel();
        let _t = thread::spawn(move|| {
            drop(t!(TcpStream::connect(&addr)));
            closed_tx.send(()).unwrap();
        });

        let (mut server, _) = t!(listener.accept());
        // Wait until the client has dropped its end.
        closed_rx.recv().unwrap();

        let payload = [0];
        if let Err(e) = server.write(&payload) {
            let kind = e.kind();
            assert!(kind == ErrorKind::ConnectionReset ||
                    kind == ErrorKind::BrokenPipe ||
                    kind == ErrorKind::ConnectionAborted,
                    "unknown error: {}", e);
        }
    })
}
460
// Ten clients connect one after another; each delivers a single byte that
// the server reads from the corresponding accepted connection.
#[test]
fn multiple_connect_serial() {
    each_ip(&mut |addr| {
        let max = 10;
        let listener = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move|| {
            for _ in 0..max {
                let mut client = t!(TcpStream::connect(&addr));
                t!(client.write(&[99]));
            }
        });

        for conn in listener.incoming().take(max) {
            let mut server = t!(conn);
            let mut byte = [0];
            t!(server.read(&mut byte));
            assert_eq!(byte[0], 99);
        }
    })
}
482
// Clients connect recursively (each opens the next connection before it
// writes); every accepted connection is handled on its own thread and must
// carry the index of its position in the accept order.
#[test]
fn multiple_connect_interleaved_greedy_schedule() {
    const MAX: usize = 10;

    // Opens connection `i`, recurses to open the remaining ones, then writes
    // `i` on its own connection. Joins the spawned thread before returning.
    fn connect(i: usize, addr: SocketAddr) {
        if i != MAX {
            let handle = thread::spawn(move|| {
                let mut client = t!(TcpStream::connect(&addr));
                // Connect again before writing
                connect(i + 1, addr);
                t!(client.write(&[i as u8]));
            });
            handle.join().ok().unwrap();
        }
    }

    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move|| {
            for (i, conn) in listener.incoming().enumerate().take(MAX) {
                // Start another thread to handle the connection
                let _t = thread::spawn(move|| {
                    let mut server = t!(conn);
                    let mut byte = [0];
                    t!(server.read(&mut byte));
                    assert!(byte[0] == i as u8);
                });
            }
        });

        connect(0, addr);
    });
}
517
// Same recursive connection pattern as the greedy variant, but every client
// writes the same byte, so the handlers only check for that constant.
#[test]
fn multiple_connect_interleaved_lazy_schedule() {
    const MAX: usize = 10;

    // Opens connection `i`, recurses to open the remaining ones, then writes
    // one byte. Joins the spawned thread before returning.
    fn connect(i: usize, addr: SocketAddr) {
        if i != MAX {
            let handle = thread::spawn(move|| {
                let mut client = t!(TcpStream::connect(&addr));
                connect(i + 1, addr);
                t!(client.write(&[99]));
            });
            handle.join().ok().unwrap();
        }
    }

    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move|| {
            for conn in listener.incoming().take(MAX) {
                // Start another thread to handle the connection
                let _t = thread::spawn(move|| {
                    let mut server = t!(conn);
                    let mut byte = [0];
                    t!(server.read(&mut byte));
                    assert!(byte[0] == 99);
                });
            }
        });

        connect(0, addr);
    });
}
550
// The listener reports the address it was bound to, and a connected client
// reports that same address as its peer.
#[test]
fn socket_and_peer_name() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));
        let bound = t!(listener.local_addr());
        assert_eq!(addr, bound);

        let _t = thread::spawn(move|| {
            t!(listener.accept());
        });

        let client = t!(TcpStream::connect(&addr));
        let peer = t!(client.peer_addr());
        assert_eq!(addr, peer);
    })
}
565
// A read into a ten-byte buffer returns as soon as the single byte sent by
// the server is available, reporting a length of one.
#[test]
fn partial_read() {
    each_ip(&mut |addr| {
        let (done_tx, done_rx) = channel();
        let listener = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move|| {
            let (mut server, _) = t!(listener.accept());
            server.write(&[10]).unwrap();
            let mut reply = [0];
            t!(server.read(&mut reply));
            done_tx.send(()).unwrap();
        });

        let mut client = t!(TcpStream::connect(&addr));
        let mut big = [0; 10];
        assert_eq!(client.read(&mut big).unwrap(), 1);
        t!(client.write(&[1]));
        done_rx.recv().unwrap();
    })
}
586
// Binding a second listener to an address that is already bound must fail
// with one of the expected error kinds.
#[test]
fn double_bind() {
    each_ip(&mut |addr| {
        let _first = t!(TcpListener::bind(&addr));

        let e = match TcpListener::bind(&addr) {
            Ok(..) => panic!(),
            Err(e) => e,
        };
        let kind = e.kind();
        assert!(kind == ErrorKind::ConnectionRefused ||
                kind == ErrorKind::Other ||
                kind == ErrorKind::AddrInUse,
                "unknown error: {} {:?}", e, e.kind());
    })
}
602
// An address can be bound again immediately after a listener that accepted
// a connection on it has been closed.
#[test]
fn fast_rebind() {
    each_ip(&mut |addr| {
        {
            let first = t!(TcpListener::bind(&addr));

            let _t = thread::spawn(move|| {
                t!(TcpStream::connect(&addr));
            });

            t!(first.accept());
        } // the first listener is closed here

        t!(TcpListener::bind(&addr));
    });
}
617
// A cloned handle can write on one thread while the original handle reads
// on another.
#[test]
fn tcp_clone_smoke() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move|| {
            let mut client = t!(TcpStream::connect(&addr));
            let mut pair = [0, 0];
            assert_eq!(client.read(&mut pair).unwrap(), 1);
            assert_eq!(pair[0], 1);
            t!(client.write(&[2]));
        });

        let (mut original, _) = t!(listener.accept());
        let mut clone = t!(original.try_clone());

        let (go_tx, go_rx) = channel();
        let (wrote_tx, wrote_rx) = channel();
        let _t = thread::spawn(move|| {
            // Write through the clone only once the main thread says so.
            go_rx.recv().unwrap();
            t!(clone.write(&[1]));
            wrote_tx.send(()).unwrap();
        });
        go_tx.send(()).unwrap();

        let mut pair = [0, 0];
        assert_eq!(original.read(&mut pair).unwrap(), 1);
        wrote_rx.recv().unwrap();
    })
}
648
// The original handle and its clone can each complete a read, on different
// threads, while the client sends one byte per read.
#[test]
fn tcp_clone_two_read() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));
        let (ack_main, ack_rx) = channel();
        let ack_clone = ack_main.clone();

        let _t = thread::spawn(move|| {
            let mut client = t!(TcpStream::connect(&addr));
            // Send a byte, then wait for one reader to acknowledge it before
            // sending the next.
            t!(client.write(&[1]));
            ack_rx.recv().unwrap();
            t!(client.write(&[2]));
            ack_rx.recv().unwrap();
        });

        let (mut original, _) = t!(listener.accept());
        let mut clone = t!(original.try_clone());

        let (done_tx, done_rx) = channel();
        let _t = thread::spawn(move|| {
            let mut pair = [0, 0];
            t!(clone.read(&mut pair));
            ack_clone.send(()).unwrap();
            done_tx.send(()).unwrap();
        });

        let mut pair = [0, 0];
        t!(original.read(&mut pair));
        ack_main.send(()).unwrap();

        done_rx.recv().unwrap();
    })
}
682
// The original handle and its clone can both write, from different threads.
#[test]
fn tcp_clone_two_write() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        let _t = thread::spawn(move|| {
            let mut client = t!(TcpStream::connect(&addr));
            let mut pair = [0, 1];
            for _ in 0..2 {
                t!(client.read(&mut pair));
            }
        });

        let (mut original, _) = t!(listener.accept());
        let mut clone = t!(original.try_clone());

        let (done_tx, done_rx) = channel();
        let _t = thread::spawn(move|| {
            t!(clone.write(&[1]));
            done_tx.send(()).unwrap();
        });
        t!(original.write(&[2]));

        done_rx.recv().unwrap();
    })
}
709
// Shutting down the write half makes the peer see end-of-stream and makes
// local writes fail, while reading from the peer still works.
#[test]
fn shutdown_smoke() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));
        let _t = thread::spawn(move|| {
            let (mut server, _) = t!(listener.accept());
            let mut byte = [0];
            assert_eq!(server.read(&mut byte).unwrap(), 0);
            t!(server.write(&[1]));
        });

        let mut client = t!(TcpStream::connect(&addr));
        t!(client.shutdown(Shutdown::Write));
        assert!(client.write(&[1]).is_err());

        let mut pair = [0, 0];
        assert_eq!(t!(client.read(&mut pair)), 1);
        assert_eq!(pair[0], 1);
    })
}
729
// Shutting down both halves through one handle affects that handle, a clone
// made earlier, and a clone made afterwards.
#[test]
fn close_readwrite_smoke() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));
        let (hold_tx, hold_rx) = channel::<()>();
        let _t = thread::spawn(move|| {
            // Keep the accepted connection open until the sender is dropped.
            let _conn = t!(listener.accept());
            let _ = hold_rx.recv();
        });

        let mut byte = [0];
        let mut first = t!(TcpStream::connect(&addr));
        let mut earlier = t!(first.try_clone());

        // closing should prevent reads/writes
        t!(first.shutdown(Shutdown::Write));
        assert!(first.write(&[0]).is_err());
        t!(first.shutdown(Shutdown::Read));
        assert_eq!(first.read(&mut byte).unwrap(), 0);

        // closing should affect previous handles
        assert!(earlier.write(&[0]).is_err());
        assert_eq!(earlier.read(&mut byte).unwrap(), 0);

        // closing should affect new handles
        let mut later = t!(first.try_clone());
        assert!(later.write(&[0]).is_err());
        assert_eq!(later.read(&mut byte).unwrap(), 0);

        // make sure these don't die
        let _ = earlier.shutdown(Shutdown::Read);
        let _ = earlier.shutdown(Shutdown::Write);
        let _ = later.shutdown(Shutdown::Read);
        let _ = later.shutdown(Shutdown::Write);
        drop(hold_tx);
    })
}
767
// Shutting down the read half from one handle wakes up a read that is
// blocked on a clone of that handle.
#[test]
fn close_read_wakes_up() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));
        let (hold_tx, hold_rx) = channel::<()>();
        let _t = thread::spawn(move|| {
            // Keep the accepted connection open until the sender is dropped.
            let _conn = t!(listener.accept());
            let _ = hold_rx.recv();
        });

        let main_handle = t!(TcpStream::connect(&addr));
        let mut reader = t!(main_handle.try_clone());
        let (woke_tx, woke_rx) = channel();
        let _t = thread::spawn(move|| {
            assert_eq!(t!(reader.read(&mut [0])), 0);
            woke_tx.send(()).unwrap();
        });
        // this should wake up the child thread
        t!(main_handle.shutdown(Shutdown::Read));

        // this test will never finish if the child doesn't wake up
        woke_rx.recv().unwrap();
        drop(hold_tx);
    })
}
794
// Cloning a stream handle while another clone is (probably) blocked in a
// read must not disturb that read.
#[test]
fn clone_while_reading() {
    each_ip(&mut |addr| {
        let listener = t!(TcpListener::bind(&addr));

        // Enqueue a thread to write to a socket
        let (go_tx, go_rx) = channel();
        let (done_tx, done_rx) = channel();
        let writer_done = done_tx.clone();
        let _t = thread::spawn(move|| {
            let mut client = t!(TcpStream::connect(&addr));
            go_rx.recv().unwrap();
            t!(client.write(&[0]));
            writer_done.send(()).unwrap();
        });

        // Spawn off a reading clone
        let (server, _) = t!(listener.accept());
        let mut reader = t!(server.try_clone());
        let reader_done = done_tx.clone();
        let _t = thread::spawn(move|| {
            t!(reader.read(&mut [0]));
            reader_done.send(()).unwrap();
        });

        // Try to ensure that the reading clone is indeed reading
        for _ in 0..50 {
            thread::yield_now();
        }

        // clone the handle again while it's reading, then let it finish the
        // read.
        let _ = t!(server.try_clone());
        go_tx.send(()).unwrap();
        // One completion signal from the writer, one from the reader.
        for _ in 0..2 {
            done_rx.recv().unwrap();
        }
    })
}
834
// Both a listener and its clone can accept a connection.
#[test]
fn clone_accept_smoke() {
    each_ip(&mut |addr| {
        let original = t!(TcpListener::bind(&addr));
        let clone = t!(original.try_clone());

        // Two clients, one for each accept below.
        let _first = thread::spawn(move|| {
            let _ = TcpStream::connect(&addr);
        });
        let _second = thread::spawn(move|| {
            let _ = TcpStream::connect(&addr);
        });

        t!(original.accept());
        t!(clone.accept());
    })
}
852
// A listener and its clone can accept concurrently on separate threads.
#[test]
fn clone_accept_concurrent() {
    each_ip(&mut |addr| {
        let original = t!(TcpListener::bind(&addr));
        let clone = t!(original.try_clone());

        let (first_tx, accepted_rx) = channel();
        let second_tx = first_tx.clone();

        // Two accepting threads, each reporting its accepted connection.
        let _t = thread::spawn(move|| {
            let conn = t!(original.accept());
            first_tx.send(conn).unwrap();
        });
        let _t = thread::spawn(move|| {
            let conn = t!(clone.accept());
            second_tx.send(conn).unwrap();
        });

        // Two clients, one for each accepting thread.
        let _t = thread::spawn(move|| {
            let _ = TcpStream::connect(&addr);
        });
        let _t = thread::spawn(move|| {
            let _ = TcpStream::connect(&addr);
        });

        accepted_rx.recv().unwrap();
        accepted_rx.recv().unwrap();
    })
}
880
// The `Debug` output of a listener and of a stream has the expected shape:
// the address(es) followed by the platform handle, which is labelled
// "socket" on Windows and "fd" elsewhere.
#[test]
fn debug() {
    let handle_label = if cfg!(windows) {"socket"} else {"fd"};
    let bound = next_test_ip4();

    let listener = t!(TcpListener::bind(&bound));
    let listener_handle = listener.0.socket().as_inner();
    let expected = format!("TcpListener {{ addr: {:?}, {}: {:?} }}",
                           bound, handle_label, listener_handle);
    let actual = format!("{:?}", listener);
    assert_eq!(actual, expected);

    let target = ("localhost", bound.port());
    let stream = t!(TcpStream::connect(&target));
    let stream_handle = stream.0.socket().as_inner();
    let expected = format!("TcpStream {{ addr: {:?}, \
                            peer: {:?}, {}: {:?} }}",
                           stream.local_addr().unwrap(),
                           stream.peer_addr().unwrap(),
                           handle_label,
                           stream_handle);
    let actual = format!("{:?}", stream);
    assert_eq!(actual, expected);
}
903
// FIXME: re-enable the bitrig/netbsd/openbsd tests once their socket timeout
//        code no longer has rounding errors.
#[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
#[test]
fn timeouts() {
    let addr = next_test_ip4();
    // Never read, but must stay alive so the connection below can be
    // established; the leading underscore silences the unused-variable lint.
    let _listener = t!(TcpListener::bind(&addr));

    let stream = t!(TcpStream::connect(&("localhost", addr.port())));
    let dur = Duration::new(15410, 0);

    // A fresh stream has no read timeout; setting one is observable.
    assert_eq!(None, t!(stream.read_timeout()));

    t!(stream.set_read_timeout(Some(dur)));
    assert_eq!(Some(dur), t!(stream.read_timeout()));

    // Likewise for the write timeout.
    assert_eq!(None, t!(stream.write_timeout()));

    t!(stream.set_write_timeout(Some(dur)));
    assert_eq!(Some(dur), t!(stream.write_timeout()));

    // Passing `None` clears each timeout again.
    t!(stream.set_read_timeout(None));
    assert_eq!(None, t!(stream.read_timeout()));

    t!(stream.set_write_timeout(None));
    assert_eq!(None, t!(stream.write_timeout()));
}
931
// A read on a connection that never receives data must fail with a timeout
// error once the configured read timeout expires.
#[test]
fn test_read_timeout() {
    let addr = next_test_ip4();
    // Never read, but must stay alive so the connection below can be
    // established; the leading underscore silences the unused-variable lint.
    let _listener = t!(TcpListener::bind(&addr));

    let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
    t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));

    let mut buf = [0; 10];
    let wait = Duration::span(|| {
        // The error kind reported for a timed-out read differs by platform.
        let kind = stream.read(&mut buf).err().expect("expected error").kind();
        assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
    });
    // The read must have blocked for a substantial part of the timeout.
    assert!(wait > Duration::from_millis(400));
}
947
// With a read timeout set, data that is already available is still read
// normally; only the following read, with nothing left to receive, times out.
#[test]
fn test_read_with_timeout() {
    let addr = next_test_ip4();
    let listener = t!(TcpListener::bind(&addr));

    let target = ("localhost", addr.port());
    let mut stream = t!(TcpStream::connect(&target));
    let limit = Duration::from_millis(1000);
    t!(stream.set_read_timeout(Some(limit)));

    let (mut other_end, _) = t!(listener.accept());
    t!(other_end.write_all(b"hello world"));

    let mut buf = [0; 11];
    t!(stream.read(&mut buf));
    assert_eq!(b"hello world", &buf[..]);

    let wait = Duration::span(|| {
        // The error kind reported for a timed-out read differs by platform.
        let failure = stream.read(&mut buf).err().expect("expected error");
        let kind = failure.kind();
        assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
    });
    assert!(wait > Duration::from_millis(400));
}
969 }