1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use net
::{ToSocketAddrs, SocketAddr, Shutdown}
;
17 use sys_common
::io
::read_to_end_uninitialized
;
18 use sys_common
::net
as net_imp
;
19 use sys_common
::{AsInner, FromInner, IntoInner}
;
22 /// A structure which represents a TCP stream between a local socket and a
25 /// The socket will be closed when the value is dropped.
30 /// use std::io::prelude::*;
31 /// use std::net::TcpStream;
34 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
36 /// // ignore the Result
37 /// let _ = stream.write(&[1]);
38 /// let _ = stream.read(&mut [0; 128]); // ignore here too
39 /// } // the stream is closed here
41 #[stable(feature = "rust1", since = "1.0.0")]
42 pub struct TcpStream(net_imp
::TcpStream
);
44 /// A structure representing a socket server.
49 /// use std::net::{TcpListener, TcpStream};
52 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
54 /// fn handle_client(stream: TcpStream) {
58 /// // accept connections and process them, spawning a new thread for each one
59 /// for stream in listener.incoming() {
62 /// thread::spawn(move|| {
63 /// // connection succeeded
64 /// handle_client(stream)
67 /// Err(e) => { /* connection failed */ }
71 /// // close the socket server
74 #[stable(feature = "rust1", since = "1.0.0")]
75 pub struct TcpListener(net_imp
::TcpListener
);
77 /// An infinite iterator over the connections from a `TcpListener`.
79 /// This iterator will infinitely yield `Some` of the accepted connections. It
80 /// is equivalent to calling `accept` in a loop.
81 #[stable(feature = "rust1", since = "1.0.0")]
82 pub struct Incoming
<'a
> { listener: &'a TcpListener }
85 /// Opens a TCP connection to a remote host.
87 /// `addr` is an address of the remote host. Anything which implements
88 /// `ToSocketAddrs` trait can be supplied for the address; see this trait
89 /// documentation for concrete examples.
90 #[stable(feature = "rust1", since = "1.0.0")]
91 pub fn connect
<A
: ToSocketAddrs
>(addr
: A
) -> io
::Result
<TcpStream
> {
92 super::each_addr(addr
, net_imp
::TcpStream
::connect
).map(TcpStream
)
95 /// Returns the socket address of the remote peer of this TCP connection.
96 #[stable(feature = "rust1", since = "1.0.0")]
97 pub fn peer_addr(&self) -> io
::Result
<SocketAddr
> {
101 /// Returns the socket address of the local half of this TCP connection.
102 #[stable(feature = "rust1", since = "1.0.0")]
103 pub fn local_addr(&self) -> io
::Result
<SocketAddr
> {
107 /// Shuts down the read, write, or both halves of this connection.
109 /// This function will cause all pending and future I/O on the specified
110 /// portions to return immediately with an appropriate value (see the
111 /// documentation of `Shutdown`).
112 #[stable(feature = "rust1", since = "1.0.0")]
113 pub fn shutdown(&self, how
: Shutdown
) -> io
::Result
<()> {
117 /// Creates a new independently owned handle to the underlying socket.
119 /// The returned `TcpStream` is a reference to the same stream that this
120 /// object references. Both handles will read and write the same stream of
121 /// data, and options set on one stream will be propagated to the other
123 #[stable(feature = "rust1", since = "1.0.0")]
124 pub fn try_clone(&self) -> io
::Result
<TcpStream
> {
125 self.0.duplicate().map(TcpStream
)
128 /// Sets the read timeout to the timeout specified.
130 /// If the value specified is `None`, then `read` calls will block
131 /// indefinitely. It is an error to pass the zero `Duration` to this
136 /// Platforms may return a different error code whenever a read times out as
137 /// a result of setting this option. For example Unix typically returns an
138 /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
139 #[stable(feature = "socket_timeout", since = "1.4.0")]
140 pub fn set_read_timeout(&self, dur
: Option
<Duration
>) -> io
::Result
<()> {
141 self.0.set_read_timeout(dur
)
144 /// Sets the write timeout to the timeout specified.
146 /// If the value specified is `None`, then `write` calls will block
147 /// indefinitely. It is an error to pass the zero `Duration` to this
152 /// Platforms may return a different error code whenever a write times out
153 /// as a result of setting this option. For example Unix typically returns
154 /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
155 #[stable(feature = "socket_timeout", since = "1.4.0")]
156 pub fn set_write_timeout(&self, dur
: Option
<Duration
>) -> io
::Result
<()> {
157 self.0.set_write_timeout(dur
)
160 /// Returns the read timeout of this socket.
162 /// If the timeout is `None`, then `read` calls will block indefinitely.
166 /// Some platforms do not provide access to the current timeout.
167 #[stable(feature = "socket_timeout", since = "1.4.0")]
168 pub fn read_timeout(&self) -> io
::Result
<Option
<Duration
>> {
169 self.0.read_timeout()
172 /// Returns the write timeout of this socket.
174 /// If the timeout is `None`, then `write` calls will block indefinitely.
178 /// Some platforms do not provide access to the current timeout.
179 #[stable(feature = "socket_timeout", since = "1.4.0")]
180 pub fn write_timeout(&self) -> io
::Result
<Option
<Duration
>> {
181 self.0.write_timeout()
185 #[stable(feature = "rust1", since = "1.0.0")]
186 impl Read
for TcpStream
{
187 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> { self.0.read(buf) }
188 fn read_to_end(&mut self, buf
: &mut Vec
<u8>) -> io
::Result
<usize> {
189 unsafe { read_to_end_uninitialized(self, buf) }
192 #[stable(feature = "rust1", since = "1.0.0")]
193 impl Write
for TcpStream
{
194 fn write(&mut self, buf
: &[u8]) -> io
::Result
<usize> { self.0.write(buf) }
195 fn flush(&mut self) -> io
::Result
<()> { Ok(()) }
197 #[stable(feature = "rust1", since = "1.0.0")]
198 impl<'a
> Read
for &'a TcpStream
{
199 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> { self.0.read(buf) }
200 fn read_to_end(&mut self, buf
: &mut Vec
<u8>) -> io
::Result
<usize> {
201 unsafe { read_to_end_uninitialized(self, buf) }
204 #[stable(feature = "rust1", since = "1.0.0")]
205 impl<'a
> Write
for &'a TcpStream
{
206 fn write(&mut self, buf
: &[u8]) -> io
::Result
<usize> { self.0.write(buf) }
207 fn flush(&mut self) -> io
::Result
<()> { Ok(()) }
210 impl AsInner
<net_imp
::TcpStream
> for TcpStream
{
211 fn as_inner(&self) -> &net_imp
::TcpStream { &self.0 }
214 impl FromInner
<net_imp
::TcpStream
> for TcpStream
{
215 fn from_inner(inner
: net_imp
::TcpStream
) -> TcpStream { TcpStream(inner) }
218 impl IntoInner
<net_imp
::TcpStream
> for TcpStream
{
219 fn into_inner(self) -> net_imp
::TcpStream { self.0 }
222 impl fmt
::Debug
for TcpStream
{
223 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
229 /// Creates a new `TcpListener` which will be bound to the specified
232 /// The returned listener is ready for accepting connections.
234 /// Binding with a port number of 0 will request that the OS assigns a port
235 /// to this listener. The port allocated can be queried via the
236 /// `socket_addr` function.
238 /// The address type can be any implementor of `ToSocketAddrs` trait. See
239 /// its documentation for concrete examples.
240 #[stable(feature = "rust1", since = "1.0.0")]
241 pub fn bind
<A
: ToSocketAddrs
>(addr
: A
) -> io
::Result
<TcpListener
> {
242 super::each_addr(addr
, net_imp
::TcpListener
::bind
).map(TcpListener
)
245 /// Returns the local socket address of this listener.
246 #[stable(feature = "rust1", since = "1.0.0")]
247 pub fn local_addr(&self) -> io
::Result
<SocketAddr
> {
251 /// Creates a new independently owned handle to the underlying socket.
253 /// The returned `TcpListener` is a reference to the same socket that this
254 /// object references. Both handles can be used to accept incoming
255 /// connections and options set on one listener will affect the other.
256 #[stable(feature = "rust1", since = "1.0.0")]
257 pub fn try_clone(&self) -> io
::Result
<TcpListener
> {
258 self.0.duplicate().map(TcpListener
)
261 /// Accept a new incoming connection from this listener.
263 /// This function will block the calling thread until a new TCP connection
264 /// is established. When established, the corresponding `TcpStream` and the
265 /// remote peer's address will be returned.
266 #[stable(feature = "rust1", since = "1.0.0")]
267 pub fn accept(&self) -> io
::Result
<(TcpStream
, SocketAddr
)> {
268 self.0.accept().map(|(a
, b
)| (TcpStream(a
), b
))
271 /// Returns an iterator over the connections being received on this
274 /// The returned iterator will never return `None` and will also not yield
275 /// the peer's `SocketAddr` structure.
276 #[stable(feature = "rust1", since = "1.0.0")]
277 pub fn incoming(&self) -> Incoming
{
278 Incoming { listener: self }
282 #[stable(feature = "rust1", since = "1.0.0")]
283 impl<'a
> Iterator
for Incoming
<'a
> {
284 type Item
= io
::Result
<TcpStream
>;
285 fn next(&mut self) -> Option
<io
::Result
<TcpStream
>> {
286 Some(self.listener
.accept().map(|p
| p
.0))
290 impl AsInner
<net_imp
::TcpListener
> for TcpListener
{
291 fn as_inner(&self) -> &net_imp
::TcpListener { &self.0 }
294 impl FromInner
<net_imp
::TcpListener
> for TcpListener
{
295 fn from_inner(inner
: net_imp
::TcpListener
) -> TcpListener
{
300 impl IntoInner
<net_imp
::TcpListener
> for TcpListener
{
301 fn into_inner(self) -> net_imp
::TcpListener { self.0 }
304 impl fmt
::Debug
for TcpListener
{
305 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
317 use net
::test
::{next_test_ip4, next_test_ip6}
;
318 use sync
::mpsc
::channel
;
319 use sys_common
::AsInner
;
323 fn each_ip(f
: &mut FnMut(SocketAddr
)) {
332 Err(e
) => panic
!("received error for `{}`: {}", stringify
!($e
), e
),
339 match TcpListener
::bind("1.1.1.1:9999") {
342 assert_eq
!(e
.kind(), ErrorKind
::AddrNotAvailable
),
348 match TcpStream
::connect("0.0.0.0:1") {
350 Err(e
) => assert
!(e
.kind() == ErrorKind
::ConnectionRefused
||
351 e
.kind() == ErrorKind
::InvalidInput
||
352 e
.kind() == ErrorKind
::AddrInUse
||
353 e
.kind() == ErrorKind
::AddrNotAvailable
,
354 "bad error: {} {:?}", e
, e
.kind()),
359 fn listen_localhost() {
360 let socket_addr
= next_test_ip4();
361 let listener
= t
!(TcpListener
::bind(&socket_addr
));
363 let _t
= thread
::spawn(move || {
364 let mut stream
= t
!(TcpStream
::connect(&("localhost",
365 socket_addr
.port())));
366 t
!(stream
.write(&[144]));
369 let mut stream
= t
!(listener
.accept()).0;
371 t
!(stream
.read(&mut buf
));
372 assert
!(buf
[0] == 144);
376 fn connect_loopback() {
377 each_ip(&mut |addr
| {
378 let acceptor
= t
!(TcpListener
::bind(&addr
));
380 let _t
= thread
::spawn(move|| {
381 let host
= match addr
{
382 SocketAddr
::V4(..) => "127.0.0.1",
383 SocketAddr
::V6(..) => "::1",
385 let mut stream
= t
!(TcpStream
::connect(&(host
, addr
.port())));
386 t
!(stream
.write(&[66]));
389 let mut stream
= t
!(acceptor
.accept()).0;
391 t
!(stream
.read(&mut buf
));
392 assert
!(buf
[0] == 66);
398 each_ip(&mut |addr
| {
399 let acceptor
= t
!(TcpListener
::bind(&addr
));
401 let (tx
, rx
) = channel();
402 let _t
= thread
::spawn(move|| {
403 let mut stream
= t
!(TcpStream
::connect(&addr
));
404 t
!(stream
.write(&[99]));
405 tx
.send(t
!(stream
.local_addr())).unwrap();
408 let (mut stream
, addr
) = t
!(acceptor
.accept());
410 t
!(stream
.read(&mut buf
));
411 assert
!(buf
[0] == 99);
412 assert_eq
!(addr
, t
!(rx
.recv()));
418 each_ip(&mut |addr
| {
419 let acceptor
= t
!(TcpListener
::bind(&addr
));
421 let _t
= thread
::spawn(move|| {
422 let _stream
= t
!(TcpStream
::connect(&addr
));
426 let mut stream
= t
!(acceptor
.accept()).0;
428 let nread
= t
!(stream
.read(&mut buf
));
429 assert_eq
!(nread
, 0);
430 let nread
= t
!(stream
.read(&mut buf
));
431 assert_eq
!(nread
, 0);
437 each_ip(&mut |addr
| {
438 let acceptor
= t
!(TcpListener
::bind(&addr
));
440 let (tx
, rx
) = channel();
441 let _t
= thread
::spawn(move|| {
442 drop(t
!(TcpStream
::connect(&addr
)));
443 tx
.send(()).unwrap();
446 let mut stream
= t
!(acceptor
.accept()).0;
449 match stream
.write(&buf
) {
452 assert
!(e
.kind() == ErrorKind
::ConnectionReset
||
453 e
.kind() == ErrorKind
::BrokenPipe
||
454 e
.kind() == ErrorKind
::ConnectionAborted
,
455 "unknown error: {}", e
);
462 fn multiple_connect_serial() {
463 each_ip(&mut |addr
| {
465 let acceptor
= t
!(TcpListener
::bind(&addr
));
467 let _t
= thread
::spawn(move|| {
469 let mut stream
= t
!(TcpStream
::connect(&addr
));
470 t
!(stream
.write(&[99]));
474 for stream
in acceptor
.incoming().take(max
) {
475 let mut stream
= t
!(stream
);
477 t
!(stream
.read(&mut buf
));
478 assert_eq
!(buf
[0], 99);
484 fn multiple_connect_interleaved_greedy_schedule() {
485 const MAX
: usize = 10;
486 each_ip(&mut |addr
| {
487 let acceptor
= t
!(TcpListener
::bind(&addr
));
489 let _t
= thread
::spawn(move|| {
490 let acceptor
= acceptor
;
491 for (i
, stream
) in acceptor
.incoming().enumerate().take(MAX
) {
492 // Start another thread to handle the connection
493 let _t
= thread
::spawn(move|| {
494 let mut stream
= t
!(stream
);
496 t
!(stream
.read(&mut buf
));
497 assert
!(buf
[0] == i
as u8);
505 fn connect(i
: usize, addr
: SocketAddr
) {
506 if i
== MAX { return }
508 let t
= thread
::spawn(move|| {
509 let mut stream
= t
!(TcpStream
::connect(&addr
));
510 // Connect again before writing
511 connect(i
+ 1, addr
);
512 t
!(stream
.write(&[i
as u8]));
514 t
.join().ok().unwrap();
519 fn multiple_connect_interleaved_lazy_schedule() {
520 const MAX
: usize = 10;
521 each_ip(&mut |addr
| {
522 let acceptor
= t
!(TcpListener
::bind(&addr
));
524 let _t
= thread
::spawn(move|| {
525 for stream
in acceptor
.incoming().take(MAX
) {
526 // Start another thread to handle the connection
527 let _t
= thread
::spawn(move|| {
528 let mut stream
= t
!(stream
);
530 t
!(stream
.read(&mut buf
));
531 assert
!(buf
[0] == 99);
539 fn connect(i
: usize, addr
: SocketAddr
) {
540 if i
== MAX { return }
542 let t
= thread
::spawn(move|| {
543 let mut stream
= t
!(TcpStream
::connect(&addr
));
544 connect(i
+ 1, addr
);
545 t
!(stream
.write(&[99]));
547 t
.join().ok().unwrap();
552 fn socket_and_peer_name() {
553 each_ip(&mut |addr
| {
554 let listener
= t
!(TcpListener
::bind(&addr
));
555 let so_name
= t
!(listener
.local_addr());
556 assert_eq
!(addr
, so_name
);
557 let _t
= thread
::spawn(move|| {
558 t
!(listener
.accept());
561 let stream
= t
!(TcpStream
::connect(&addr
));
562 assert_eq
!(addr
, t
!(stream
.peer_addr()));
568 each_ip(&mut |addr
| {
569 let (tx
, rx
) = channel();
570 let srv
= t
!(TcpListener
::bind(&addr
));
571 let _t
= thread
::spawn(move|| {
572 let mut cl
= t
!(srv
.accept()).0;
573 cl
.write(&[10]).unwrap();
576 tx
.send(()).unwrap();
579 let mut c
= t
!(TcpStream
::connect(&addr
));
581 assert_eq
!(c
.read(&mut b
).unwrap(), 1);
589 each_ip(&mut |addr
| {
590 let _listener
= t
!(TcpListener
::bind(&addr
));
591 match TcpListener
::bind(&addr
) {
594 assert
!(e
.kind() == ErrorKind
::ConnectionRefused
||
595 e
.kind() == ErrorKind
::Other
||
596 e
.kind() == ErrorKind
::AddrInUse
,
597 "unknown error: {} {:?}", e
, e
.kind());
605 each_ip(&mut |addr
| {
606 let acceptor
= t
!(TcpListener
::bind(&addr
));
608 let _t
= thread
::spawn(move|| {
609 t
!(TcpStream
::connect(&addr
));
612 t
!(acceptor
.accept());
614 t
!(TcpListener
::bind(&addr
));
619 fn tcp_clone_smoke() {
620 each_ip(&mut |addr
| {
621 let acceptor
= t
!(TcpListener
::bind(&addr
));
623 let _t
= thread
::spawn(move|| {
624 let mut s
= t
!(TcpStream
::connect(&addr
));
625 let mut buf
= [0, 0];
626 assert_eq
!(s
.read(&mut buf
).unwrap(), 1);
627 assert_eq
!(buf
[0], 1);
631 let mut s1
= t
!(acceptor
.accept()).0;
632 let s2
= t
!(s1
.try_clone());
634 let (tx1
, rx1
) = channel();
635 let (tx2
, rx2
) = channel();
636 let _t
= thread
::spawn(move|| {
640 tx2
.send(()).unwrap();
642 tx1
.send(()).unwrap();
643 let mut buf
= [0, 0];
644 assert_eq
!(s1
.read(&mut buf
).unwrap(), 1);
650 fn tcp_clone_two_read() {
651 each_ip(&mut |addr
| {
652 let acceptor
= t
!(TcpListener
::bind(&addr
));
653 let (tx1
, rx
) = channel();
654 let tx2
= tx1
.clone();
656 let _t
= thread
::spawn(move|| {
657 let mut s
= t
!(TcpStream
::connect(&addr
));
664 let mut s1
= t
!(acceptor
.accept()).0;
665 let s2
= t
!(s1
.try_clone());
667 let (done
, rx
) = channel();
668 let _t
= thread
::spawn(move|| {
670 let mut buf
= [0, 0];
671 t
!(s2
.read(&mut buf
));
672 tx2
.send(()).unwrap();
673 done
.send(()).unwrap();
675 let mut buf
= [0, 0];
676 t
!(s1
.read(&mut buf
));
677 tx1
.send(()).unwrap();
684 fn tcp_clone_two_write() {
685 each_ip(&mut |addr
| {
686 let acceptor
= t
!(TcpListener
::bind(&addr
));
688 let _t
= thread
::spawn(move|| {
689 let mut s
= t
!(TcpStream
::connect(&addr
));
690 let mut buf
= [0, 1];
691 t
!(s
.read(&mut buf
));
692 t
!(s
.read(&mut buf
));
695 let mut s1
= t
!(acceptor
.accept()).0;
696 let s2
= t
!(s1
.try_clone());
698 let (done
, rx
) = channel();
699 let _t
= thread
::spawn(move|| {
702 done
.send(()).unwrap();
711 fn shutdown_smoke() {
712 each_ip(&mut |addr
| {
713 let a
= t
!(TcpListener
::bind(&addr
));
714 let _t
= thread
::spawn(move|| {
715 let mut c
= t
!(a
.accept()).0;
717 assert_eq
!(c
.read(&mut b
).unwrap(), 0);
721 let mut s
= t
!(TcpStream
::connect(&addr
));
722 t
!(s
.shutdown(Shutdown
::Write
));
723 assert
!(s
.write(&[1]).is_err());
725 assert_eq
!(t
!(s
.read(&mut b
)), 1);
731 fn close_readwrite_smoke() {
732 each_ip(&mut |addr
| {
733 let a
= t
!(TcpListener
::bind(&addr
));
734 let (tx
, rx
) = channel
::<()>();
735 let _t
= thread
::spawn(move|| {
736 let _s
= t
!(a
.accept());
741 let mut s
= t
!(TcpStream
::connect(&addr
));
742 let mut s2
= t
!(s
.try_clone());
744 // closing should prevent reads/writes
745 t
!(s
.shutdown(Shutdown
::Write
));
746 assert
!(s
.write(&[0]).is_err());
747 t
!(s
.shutdown(Shutdown
::Read
));
748 assert_eq
!(s
.read(&mut b
).unwrap(), 0);
750 // closing should affect previous handles
751 assert
!(s2
.write(&[0]).is_err());
752 assert_eq
!(s2
.read(&mut b
).unwrap(), 0);
754 // closing should affect new handles
755 let mut s3
= t
!(s
.try_clone());
756 assert
!(s3
.write(&[0]).is_err());
757 assert_eq
!(s3
.read(&mut b
).unwrap(), 0);
759 // make sure these don't die
760 let _
= s2
.shutdown(Shutdown
::Read
);
761 let _
= s2
.shutdown(Shutdown
::Write
);
762 let _
= s3
.shutdown(Shutdown
::Read
);
763 let _
= s3
.shutdown(Shutdown
::Write
);
769 fn close_read_wakes_up() {
770 each_ip(&mut |addr
| {
771 let a
= t
!(TcpListener
::bind(&addr
));
772 let (tx1
, rx
) = channel
::<()>();
773 let _t
= thread
::spawn(move|| {
774 let _s
= t
!(a
.accept());
778 let s
= t
!(TcpStream
::connect(&addr
));
779 let s2
= t
!(s
.try_clone());
780 let (tx
, rx
) = channel();
781 let _t
= thread
::spawn(move|| {
783 assert_eq
!(t
!(s2
.read(&mut [0])), 0);
784 tx
.send(()).unwrap();
786 // this should wake up the child thread
787 t
!(s
.shutdown(Shutdown
::Read
));
789 // this test will never finish if the child doesn't wake up
796 fn clone_while_reading() {
797 each_ip(&mut |addr
| {
798 let accept
= t
!(TcpListener
::bind(&addr
));
800 // Enqueue a thread to write to a socket
801 let (tx
, rx
) = channel();
802 let (txdone
, rxdone
) = channel();
803 let txdone2
= txdone
.clone();
804 let _t
= thread
::spawn(move|| {
805 let mut tcp
= t
!(TcpStream
::connect(&addr
));
808 txdone2
.send(()).unwrap();
811 // Spawn off a reading clone
812 let tcp
= t
!(accept
.accept()).0;
813 let tcp2
= t
!(tcp
.try_clone());
814 let txdone3
= txdone
.clone();
815 let _t
= thread
::spawn(move|| {
817 t
!(tcp2
.read(&mut [0]));
818 txdone3
.send(()).unwrap();
821 // Try to ensure that the reading clone is indeed reading
826 // clone the handle again while it's reading, then let it finish the
828 let _
= t
!(tcp
.try_clone());
829 tx
.send(()).unwrap();
830 rxdone
.recv().unwrap();
831 rxdone
.recv().unwrap();
836 fn clone_accept_smoke() {
837 each_ip(&mut |addr
| {
838 let a
= t
!(TcpListener
::bind(&addr
));
839 let a2
= t
!(a
.try_clone());
841 let _t
= thread
::spawn(move|| {
842 let _
= TcpStream
::connect(&addr
);
844 let _t
= thread
::spawn(move|| {
845 let _
= TcpStream
::connect(&addr
);
854 fn clone_accept_concurrent() {
855 each_ip(&mut |addr
| {
856 let a
= t
!(TcpListener
::bind(&addr
));
857 let a2
= t
!(a
.try_clone());
859 let (tx
, rx
) = channel();
860 let tx2
= tx
.clone();
862 let _t
= thread
::spawn(move|| {
863 tx
.send(t
!(a
.accept())).unwrap();
865 let _t
= thread
::spawn(move|| {
866 tx2
.send(t
!(a2
.accept())).unwrap();
869 let _t
= thread
::spawn(move|| {
870 let _
= TcpStream
::connect(&addr
);
872 let _t
= thread
::spawn(move|| {
873 let _
= TcpStream
::connect(&addr
);
883 let name
= if cfg
!(windows
) {"socket"}
else {"fd"}
;
884 let socket_addr
= next_test_ip4();
886 let listener
= t
!(TcpListener
::bind(&socket_addr
));
887 let listener_inner
= listener
.0.socket().as_inner();
888 let compare
= format
!("TcpListener {{ addr: {:?}, {}: {:?} }}",
889 socket_addr
, name
, listener_inner
);
890 assert_eq
!(format
!("{:?}", listener
), compare
);
892 let stream
= t
!(TcpStream
::connect(&("localhost",
893 socket_addr
.port())));
894 let stream_inner
= stream
.0.socket().as_inner();
895 let compare
= format
!("TcpStream {{ addr: {:?}, \
896 peer: {:?}, {}: {:?} }}",
897 stream
.local_addr().unwrap(),
898 stream
.peer_addr().unwrap(),
901 assert_eq
!(format
!("{:?}", stream
), compare
);
904 // FIXME: re-enable bitrig/openbsd tests once their socket timeout code
905 // no longer has rounding errors.
906 #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
909 let addr
= next_test_ip4();
910 let listener
= t
!(TcpListener
::bind(&addr
));
912 let stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
913 let dur
= Duration
::new(15410, 0);
915 assert_eq
!(None
, t
!(stream
.read_timeout()));
917 t
!(stream
.set_read_timeout(Some(dur
)));
918 assert_eq
!(Some(dur
), t
!(stream
.read_timeout()));
920 assert_eq
!(None
, t
!(stream
.write_timeout()));
922 t
!(stream
.set_write_timeout(Some(dur
)));
923 assert_eq
!(Some(dur
), t
!(stream
.write_timeout()));
925 t
!(stream
.set_read_timeout(None
));
926 assert_eq
!(None
, t
!(stream
.read_timeout()));
928 t
!(stream
.set_write_timeout(None
));
929 assert_eq
!(None
, t
!(stream
.write_timeout()));
933 fn test_read_timeout() {
934 let addr
= next_test_ip4();
935 let listener
= t
!(TcpListener
::bind(&addr
));
937 let mut stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
938 t
!(stream
.set_read_timeout(Some(Duration
::from_millis(1000))));
940 let mut buf
= [0; 10];
941 let wait
= Duration
::span(|| {
942 let kind
= stream
.read(&mut buf
).err().expect("expected error").kind();
943 assert
!(kind
== ErrorKind
::WouldBlock
|| kind
== ErrorKind
::TimedOut
);
945 assert
!(wait
> Duration
::from_millis(400));
949 fn test_read_with_timeout() {
950 let addr
= next_test_ip4();
951 let listener
= t
!(TcpListener
::bind(&addr
));
953 let mut stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
954 t
!(stream
.set_read_timeout(Some(Duration
::from_millis(1000))));
956 let mut other_end
= t
!(listener
.accept()).0;
957 t
!(other_end
.write_all(b
"hello world"));
959 let mut buf
= [0; 11];
960 t
!(stream
.read(&mut buf
));
961 assert_eq
!(b
"hello world", &buf
[..]);
963 let wait
= Duration
::span(|| {
964 let kind
= stream
.read(&mut buf
).err().expect("expected error").kind();
965 assert
!(kind
== ErrorKind
::WouldBlock
|| kind
== ErrorKind
::TimedOut
);
967 assert
!(wait
> Duration
::from_millis(400));