1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use net
::{ToSocketAddrs, SocketAddr, Shutdown}
;
17 use sys_common
::net
as net_imp
;
18 use sys_common
::{AsInner, FromInner, IntoInner}
;
21 /// A structure which represents a TCP stream between a local socket and a
24 /// The socket will be closed when the value is dropped.
29 /// use std::io::prelude::*;
30 /// use std::net::TcpStream;
33 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
35 /// // ignore the Result
36 /// let _ = stream.write(&[1]);
37 /// let _ = stream.read(&mut [0; 128]); // ignore here too
38 /// } // the stream is closed here
40 #[stable(feature = "rust1", since = "1.0.0")]
41 pub struct TcpStream(net_imp
::TcpStream
);
43 /// A structure representing a socket server.
48 /// use std::net::{TcpListener, TcpStream};
51 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
53 /// fn handle_client(stream: TcpStream) {
57 /// // accept connections and process them, spawning a new thread for each one
58 /// for stream in listener.incoming() {
61 /// thread::spawn(move|| {
62 /// // connection succeeded
63 /// handle_client(stream)
66 /// Err(e) => { /* connection failed */ }
70 /// // close the socket server
73 #[stable(feature = "rust1", since = "1.0.0")]
74 pub struct TcpListener(net_imp
::TcpListener
);
76 /// An infinite iterator over the connections from a `TcpListener`.
78 /// This iterator will infinitely yield `Some` of the accepted connections. It
79 /// is equivalent to calling `accept` in a loop.
80 #[stable(feature = "rust1", since = "1.0.0")]
81 pub struct Incoming
<'a
> { listener: &'a TcpListener }
84 /// Opens a TCP connection to a remote host.
86 /// `addr` is an address of the remote host. Anything which implements
87 /// `ToSocketAddrs` trait can be supplied for the address; see this trait
88 /// documentation for concrete examples.
89 /// In case `ToSocketAddrs::to_socket_addrs()` returns more than one entry,
90 /// then the first valid and reachable address is used.
91 #[stable(feature = "rust1", since = "1.0.0")]
92 pub fn connect
<A
: ToSocketAddrs
>(addr
: A
) -> io
::Result
<TcpStream
> {
93 super::each_addr(addr
, net_imp
::TcpStream
::connect
).map(TcpStream
)
96 /// Returns the socket address of the remote peer of this TCP connection.
97 #[stable(feature = "rust1", since = "1.0.0")]
98 pub fn peer_addr(&self) -> io
::Result
<SocketAddr
> {
102 /// Returns the socket address of the local half of this TCP connection.
103 #[stable(feature = "rust1", since = "1.0.0")]
104 pub fn local_addr(&self) -> io
::Result
<SocketAddr
> {
108 /// Shuts down the read, write, or both halves of this connection.
110 /// This function will cause all pending and future I/O on the specified
111 /// portions to return immediately with an appropriate value (see the
112 /// documentation of `Shutdown`).
113 #[stable(feature = "rust1", since = "1.0.0")]
114 pub fn shutdown(&self, how
: Shutdown
) -> io
::Result
<()> {
118 /// Creates a new independently owned handle to the underlying socket.
120 /// The returned `TcpStream` is a reference to the same stream that this
121 /// object references. Both handles will read and write the same stream of
122 /// data, and options set on one stream will be propagated to the other
124 #[stable(feature = "rust1", since = "1.0.0")]
125 pub fn try_clone(&self) -> io
::Result
<TcpStream
> {
126 self.0.duplicate().map(TcpStream
)
129 /// Sets the read timeout to the timeout specified.
131 /// If the value specified is `None`, then `read` calls will block
132 /// indefinitely. It is an error to pass the zero `Duration` to this
137 /// Platforms may return a different error code whenever a read times out as
138 /// a result of setting this option. For example Unix typically returns an
139 /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
140 #[stable(feature = "socket_timeout", since = "1.4.0")]
141 pub fn set_read_timeout(&self, dur
: Option
<Duration
>) -> io
::Result
<()> {
142 self.0.set_read_timeout(dur
)
145 /// Sets the write timeout to the timeout specified.
147 /// If the value specified is `None`, then `write` calls will block
148 /// indefinitely. It is an error to pass the zero `Duration` to this
153 /// Platforms may return a different error code whenever a write times out
154 /// as a result of setting this option. For example Unix typically returns
155 /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
156 #[stable(feature = "socket_timeout", since = "1.4.0")]
157 pub fn set_write_timeout(&self, dur
: Option
<Duration
>) -> io
::Result
<()> {
158 self.0.set_write_timeout(dur
)
161 /// Returns the read timeout of this socket.
163 /// If the timeout is `None`, then `read` calls will block indefinitely.
167 /// Some platforms do not provide access to the current timeout.
168 #[stable(feature = "socket_timeout", since = "1.4.0")]
169 pub fn read_timeout(&self) -> io
::Result
<Option
<Duration
>> {
170 self.0.read_timeout()
173 /// Returns the write timeout of this socket.
175 /// If the timeout is `None`, then `write` calls will block indefinitely.
179 /// Some platforms do not provide access to the current timeout.
180 #[stable(feature = "socket_timeout", since = "1.4.0")]
181 pub fn write_timeout(&self) -> io
::Result
<Option
<Duration
>> {
182 self.0.write_timeout()
185 /// Sets the value of the `TCP_NODELAY` option on this socket.
187 /// If set, this option disables the Nagle algorithm. This means that
188 /// segments are always sent as soon as possible, even if there is only a
189 /// small amount of data. When not set, data is buffered until there is a
190 /// sufficient amount to send out, thereby avoiding the frequent sending of
192 #[stable(feature = "net2_mutators", since = "1.9.0")]
193 pub fn set_nodelay(&self, nodelay
: bool
) -> io
::Result
<()> {
194 self.0.set_nodelay(nodelay
)
197 /// Gets the value of the `TCP_NODELAY` option on this socket.
199 /// For more information about this option, see [`set_nodelay`][link].
201 /// [link]: #method.set_nodelay
202 #[stable(feature = "net2_mutators", since = "1.9.0")]
203 pub fn nodelay(&self) -> io
::Result
<bool
> {
207 /// Sets the value for the `IP_TTL` option on this socket.
209 /// This value sets the time-to-live field that is used in every packet sent
210 /// from this socket.
211 #[stable(feature = "net2_mutators", since = "1.9.0")]
212 pub fn set_ttl(&self, ttl
: u32) -> io
::Result
<()> {
216 /// Gets the value of the `IP_TTL` option for this socket.
218 /// For more information about this option, see [`set_ttl`][link].
220 /// [link]: #method.set_ttl
221 #[stable(feature = "net2_mutators", since = "1.9.0")]
222 pub fn ttl(&self) -> io
::Result
<u32> {
226 /// Get the value of the `SO_ERROR` option on this socket.
228 /// This will retrieve the stored error in the underlying socket, clearing
229 /// the field in the process. This can be useful for checking errors between
231 #[stable(feature = "net2_mutators", since = "1.9.0")]
232 pub fn take_error(&self) -> io
::Result
<Option
<io
::Error
>> {
236 /// Moves this TCP stream into or out of nonblocking mode.
238 /// On Unix this corresponds to calling fcntl, and on Windows this
239 /// corresponds to calling ioctlsocket.
240 #[stable(feature = "net2_mutators", since = "1.9.0")]
241 pub fn set_nonblocking(&self, nonblocking
: bool
) -> io
::Result
<()> {
242 self.0.set_nonblocking(nonblocking
)
246 #[stable(feature = "rust1", since = "1.0.0")]
247 impl Read
for TcpStream
{
248 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> { self.0.read(buf) }
249 fn read_to_end(&mut self, buf
: &mut Vec
<u8>) -> io
::Result
<usize> {
250 self.0.read_to_end(buf
)
253 #[stable(feature = "rust1", since = "1.0.0")]
254 impl Write
for TcpStream
{
255 fn write(&mut self, buf
: &[u8]) -> io
::Result
<usize> { self.0.write(buf) }
256 fn flush(&mut self) -> io
::Result
<()> { Ok(()) }
258 #[stable(feature = "rust1", since = "1.0.0")]
259 impl<'a
> Read
for &'a TcpStream
{
260 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> { self.0.read(buf) }
261 fn read_to_end(&mut self, buf
: &mut Vec
<u8>) -> io
::Result
<usize> {
262 self.0.read_to_end(buf
)
265 #[stable(feature = "rust1", since = "1.0.0")]
266 impl<'a
> Write
for &'a TcpStream
{
267 fn write(&mut self, buf
: &[u8]) -> io
::Result
<usize> { self.0.write(buf) }
268 fn flush(&mut self) -> io
::Result
<()> { Ok(()) }
271 impl AsInner
<net_imp
::TcpStream
> for TcpStream
{
272 fn as_inner(&self) -> &net_imp
::TcpStream { &self.0 }
275 impl FromInner
<net_imp
::TcpStream
> for TcpStream
{
276 fn from_inner(inner
: net_imp
::TcpStream
) -> TcpStream { TcpStream(inner) }
279 impl IntoInner
<net_imp
::TcpStream
> for TcpStream
{
280 fn into_inner(self) -> net_imp
::TcpStream { self.0 }
283 #[stable(feature = "rust1", since = "1.0.0")]
284 impl fmt
::Debug
for TcpStream
{
285 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
291 /// Creates a new `TcpListener` which will be bound to the specified
294 /// The returned listener is ready for accepting connections.
296 /// Binding with a port number of 0 will request that the OS assigns a port
297 /// to this listener. The port allocated can be queried via the
298 /// `local_addr` method.
300 /// The address type can be any implementor of `ToSocketAddrs` trait. See
301 /// its documentation for concrete examples.
302 #[stable(feature = "rust1", since = "1.0.0")]
303 pub fn bind
<A
: ToSocketAddrs
>(addr
: A
) -> io
::Result
<TcpListener
> {
304 super::each_addr(addr
, net_imp
::TcpListener
::bind
).map(TcpListener
)
307 /// Returns the local socket address of this listener.
308 #[stable(feature = "rust1", since = "1.0.0")]
309 pub fn local_addr(&self) -> io
::Result
<SocketAddr
> {
313 /// Creates a new independently owned handle to the underlying socket.
315 /// The returned `TcpListener` is a reference to the same socket that this
316 /// object references. Both handles can be used to accept incoming
317 /// connections and options set on one listener will affect the other.
318 #[stable(feature = "rust1", since = "1.0.0")]
319 pub fn try_clone(&self) -> io
::Result
<TcpListener
> {
320 self.0.duplicate().map(TcpListener
)
323 /// Accept a new incoming connection from this listener.
325 /// This function will block the calling thread until a new TCP connection
326 /// is established. When established, the corresponding `TcpStream` and the
327 /// remote peer's address will be returned.
328 #[stable(feature = "rust1", since = "1.0.0")]
329 pub fn accept(&self) -> io
::Result
<(TcpStream
, SocketAddr
)> {
330 self.0.accept().map(|(a
, b
)| (TcpStream(a
), b
))
333 /// Returns an iterator over the connections being received on this
336 /// The returned iterator will never return `None` and will also not yield
337 /// the peer's `SocketAddr` structure.
338 #[stable(feature = "rust1", since = "1.0.0")]
339 pub fn incoming(&self) -> Incoming
{
340 Incoming { listener: self }
343 /// Sets the value for the `IP_TTL` option on this socket.
345 /// This value sets the time-to-live field that is used in every packet sent
346 /// from this socket.
347 #[stable(feature = "net2_mutators", since = "1.9.0")]
348 pub fn set_ttl(&self, ttl
: u32) -> io
::Result
<()> {
352 /// Gets the value of the `IP_TTL` option for this socket.
354 /// For more information about this option, see [`set_ttl`][link].
356 /// [link]: #method.set_ttl
357 #[stable(feature = "net2_mutators", since = "1.9.0")]
358 pub fn ttl(&self) -> io
::Result
<u32> {
362 /// Sets the value for the `IPV6_V6ONLY` option on this socket.
364 /// If this is set to `true` then the socket is restricted to sending and
365 /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
366 /// can bind the same port at the same time.
368 /// If this is set to `false` then the socket can be used to send and
369 /// receive packets from an IPv4-mapped IPv6 address.
370 #[stable(feature = "net2_mutators", since = "1.9.0")]
371 pub fn set_only_v6(&self, only_v6
: bool
) -> io
::Result
<()> {
372 self.0.set_only_v6(only_v6
)
375 /// Gets the value of the `IPV6_V6ONLY` option for this socket.
377 /// For more information about this option, see [`set_only_v6`][link].
379 /// [link]: #method.set_only_v6
380 #[stable(feature = "net2_mutators", since = "1.9.0")]
381 pub fn only_v6(&self) -> io
::Result
<bool
> {
385 /// Get the value of the `SO_ERROR` option on this socket.
387 /// This will retrieve the stored error in the underlying socket, clearing
388 /// the field in the process. This can be useful for checking errors between
390 #[stable(feature = "net2_mutators", since = "1.9.0")]
391 pub fn take_error(&self) -> io
::Result
<Option
<io
::Error
>> {
395 /// Moves this TCP stream into or out of nonblocking mode.
397 /// On Unix this corresponds to calling fcntl, and on Windows this
398 /// corresponds to calling ioctlsocket.
399 #[stable(feature = "net2_mutators", since = "1.9.0")]
400 pub fn set_nonblocking(&self, nonblocking
: bool
) -> io
::Result
<()> {
401 self.0.set_nonblocking(nonblocking
)
405 #[stable(feature = "rust1", since = "1.0.0")]
406 impl<'a
> Iterator
for Incoming
<'a
> {
407 type Item
= io
::Result
<TcpStream
>;
408 fn next(&mut self) -> Option
<io
::Result
<TcpStream
>> {
409 Some(self.listener
.accept().map(|p
| p
.0))
413 impl AsInner
<net_imp
::TcpListener
> for TcpListener
{
414 fn as_inner(&self) -> &net_imp
::TcpListener { &self.0 }
417 impl FromInner
<net_imp
::TcpListener
> for TcpListener
{
418 fn from_inner(inner
: net_imp
::TcpListener
) -> TcpListener
{
423 impl IntoInner
<net_imp
::TcpListener
> for TcpListener
{
424 fn into_inner(self) -> net_imp
::TcpListener { self.0 }
427 #[stable(feature = "rust1", since = "1.0.0")]
428 impl fmt
::Debug
for TcpListener
{
429 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
441 use net
::test
::{next_test_ip4, next_test_ip6}
;
442 use sync
::mpsc
::channel
;
443 use sys_common
::AsInner
;
444 use time
::{Instant, Duration}
;
447 fn each_ip(f
: &mut FnMut(SocketAddr
)) {
456 Err(e
) => panic
!("received error for `{}`: {}", stringify
!($e
), e
),
463 match TcpListener
::bind("1.1.1.1:9999") {
466 assert_eq
!(e
.kind(), ErrorKind
::AddrNotAvailable
),
472 match TcpStream
::connect("0.0.0.0:1") {
474 Err(e
) => assert
!(e
.kind() == ErrorKind
::ConnectionRefused
||
475 e
.kind() == ErrorKind
::InvalidInput
||
476 e
.kind() == ErrorKind
::AddrInUse
||
477 e
.kind() == ErrorKind
::AddrNotAvailable
,
478 "bad error: {} {:?}", e
, e
.kind()),
483 fn listen_localhost() {
484 let socket_addr
= next_test_ip4();
485 let listener
= t
!(TcpListener
::bind(&socket_addr
));
487 let _t
= thread
::spawn(move || {
488 let mut stream
= t
!(TcpStream
::connect(&("localhost",
489 socket_addr
.port())));
490 t
!(stream
.write(&[144]));
493 let mut stream
= t
!(listener
.accept()).0;
495 t
!(stream
.read(&mut buf
));
496 assert
!(buf
[0] == 144);
500 fn connect_loopback() {
501 each_ip(&mut |addr
| {
502 let acceptor
= t
!(TcpListener
::bind(&addr
));
504 let _t
= thread
::spawn(move|| {
505 let host
= match addr
{
506 SocketAddr
::V4(..) => "127.0.0.1",
507 SocketAddr
::V6(..) => "::1",
509 let mut stream
= t
!(TcpStream
::connect(&(host
, addr
.port())));
510 t
!(stream
.write(&[66]));
513 let mut stream
= t
!(acceptor
.accept()).0;
515 t
!(stream
.read(&mut buf
));
516 assert
!(buf
[0] == 66);
522 each_ip(&mut |addr
| {
523 let acceptor
= t
!(TcpListener
::bind(&addr
));
525 let (tx
, rx
) = channel();
526 let _t
= thread
::spawn(move|| {
527 let mut stream
= t
!(TcpStream
::connect(&addr
));
528 t
!(stream
.write(&[99]));
529 tx
.send(t
!(stream
.local_addr())).unwrap();
532 let (mut stream
, addr
) = t
!(acceptor
.accept());
534 t
!(stream
.read(&mut buf
));
535 assert
!(buf
[0] == 99);
536 assert_eq
!(addr
, t
!(rx
.recv()));
542 each_ip(&mut |addr
| {
543 let acceptor
= t
!(TcpListener
::bind(&addr
));
545 let _t
= thread
::spawn(move|| {
546 let _stream
= t
!(TcpStream
::connect(&addr
));
550 let mut stream
= t
!(acceptor
.accept()).0;
552 let nread
= t
!(stream
.read(&mut buf
));
553 assert_eq
!(nread
, 0);
554 let nread
= t
!(stream
.read(&mut buf
));
555 assert_eq
!(nread
, 0);
561 each_ip(&mut |addr
| {
562 let acceptor
= t
!(TcpListener
::bind(&addr
));
564 let (tx
, rx
) = channel();
565 let _t
= thread
::spawn(move|| {
566 drop(t
!(TcpStream
::connect(&addr
)));
567 tx
.send(()).unwrap();
570 let mut stream
= t
!(acceptor
.accept()).0;
573 match stream
.write(&buf
) {
576 assert
!(e
.kind() == ErrorKind
::ConnectionReset
||
577 e
.kind() == ErrorKind
::BrokenPipe
||
578 e
.kind() == ErrorKind
::ConnectionAborted
,
579 "unknown error: {}", e
);
586 fn multiple_connect_serial() {
587 each_ip(&mut |addr
| {
589 let acceptor
= t
!(TcpListener
::bind(&addr
));
591 let _t
= thread
::spawn(move|| {
593 let mut stream
= t
!(TcpStream
::connect(&addr
));
594 t
!(stream
.write(&[99]));
598 for stream
in acceptor
.incoming().take(max
) {
599 let mut stream
= t
!(stream
);
601 t
!(stream
.read(&mut buf
));
602 assert_eq
!(buf
[0], 99);
608 fn multiple_connect_interleaved_greedy_schedule() {
609 const MAX
: usize = 10;
610 each_ip(&mut |addr
| {
611 let acceptor
= t
!(TcpListener
::bind(&addr
));
613 let _t
= thread
::spawn(move|| {
614 let acceptor
= acceptor
;
615 for (i
, stream
) in acceptor
.incoming().enumerate().take(MAX
) {
616 // Start another thread to handle the connection
617 let _t
= thread
::spawn(move|| {
618 let mut stream
= t
!(stream
);
620 t
!(stream
.read(&mut buf
));
621 assert
!(buf
[0] == i
as u8);
629 fn connect(i
: usize, addr
: SocketAddr
) {
630 if i
== MAX { return }
632 let t
= thread
::spawn(move|| {
633 let mut stream
= t
!(TcpStream
::connect(&addr
));
634 // Connect again before writing
635 connect(i
+ 1, addr
);
636 t
!(stream
.write(&[i
as u8]));
638 t
.join().ok().unwrap();
643 fn multiple_connect_interleaved_lazy_schedule() {
644 const MAX
: usize = 10;
645 each_ip(&mut |addr
| {
646 let acceptor
= t
!(TcpListener
::bind(&addr
));
648 let _t
= thread
::spawn(move|| {
649 for stream
in acceptor
.incoming().take(MAX
) {
650 // Start another thread to handle the connection
651 let _t
= thread
::spawn(move|| {
652 let mut stream
= t
!(stream
);
654 t
!(stream
.read(&mut buf
));
655 assert
!(buf
[0] == 99);
663 fn connect(i
: usize, addr
: SocketAddr
) {
664 if i
== MAX { return }
666 let t
= thread
::spawn(move|| {
667 let mut stream
= t
!(TcpStream
::connect(&addr
));
668 connect(i
+ 1, addr
);
669 t
!(stream
.write(&[99]));
671 t
.join().ok().unwrap();
676 fn socket_and_peer_name() {
677 each_ip(&mut |addr
| {
678 let listener
= t
!(TcpListener
::bind(&addr
));
679 let so_name
= t
!(listener
.local_addr());
680 assert_eq
!(addr
, so_name
);
681 let _t
= thread
::spawn(move|| {
682 t
!(listener
.accept());
685 let stream
= t
!(TcpStream
::connect(&addr
));
686 assert_eq
!(addr
, t
!(stream
.peer_addr()));
692 each_ip(&mut |addr
| {
693 let (tx
, rx
) = channel();
694 let srv
= t
!(TcpListener
::bind(&addr
));
695 let _t
= thread
::spawn(move|| {
696 let mut cl
= t
!(srv
.accept()).0;
697 cl
.write(&[10]).unwrap();
700 tx
.send(()).unwrap();
703 let mut c
= t
!(TcpStream
::connect(&addr
));
705 assert_eq
!(c
.read(&mut b
).unwrap(), 1);
713 each_ip(&mut |addr
| {
714 let _listener
= t
!(TcpListener
::bind(&addr
));
715 match TcpListener
::bind(&addr
) {
718 assert
!(e
.kind() == ErrorKind
::ConnectionRefused
||
719 e
.kind() == ErrorKind
::Other
||
720 e
.kind() == ErrorKind
::AddrInUse
,
721 "unknown error: {} {:?}", e
, e
.kind());
729 each_ip(&mut |addr
| {
730 let acceptor
= t
!(TcpListener
::bind(&addr
));
732 let _t
= thread
::spawn(move|| {
733 t
!(TcpStream
::connect(&addr
));
736 t
!(acceptor
.accept());
738 t
!(TcpListener
::bind(&addr
));
743 fn tcp_clone_smoke() {
744 each_ip(&mut |addr
| {
745 let acceptor
= t
!(TcpListener
::bind(&addr
));
747 let _t
= thread
::spawn(move|| {
748 let mut s
= t
!(TcpStream
::connect(&addr
));
749 let mut buf
= [0, 0];
750 assert_eq
!(s
.read(&mut buf
).unwrap(), 1);
751 assert_eq
!(buf
[0], 1);
755 let mut s1
= t
!(acceptor
.accept()).0;
756 let s2
= t
!(s1
.try_clone());
758 let (tx1
, rx1
) = channel();
759 let (tx2
, rx2
) = channel();
760 let _t
= thread
::spawn(move|| {
764 tx2
.send(()).unwrap();
766 tx1
.send(()).unwrap();
767 let mut buf
= [0, 0];
768 assert_eq
!(s1
.read(&mut buf
).unwrap(), 1);
774 fn tcp_clone_two_read() {
775 each_ip(&mut |addr
| {
776 let acceptor
= t
!(TcpListener
::bind(&addr
));
777 let (tx1
, rx
) = channel();
778 let tx2
= tx1
.clone();
780 let _t
= thread
::spawn(move|| {
781 let mut s
= t
!(TcpStream
::connect(&addr
));
788 let mut s1
= t
!(acceptor
.accept()).0;
789 let s2
= t
!(s1
.try_clone());
791 let (done
, rx
) = channel();
792 let _t
= thread
::spawn(move|| {
794 let mut buf
= [0, 0];
795 t
!(s2
.read(&mut buf
));
796 tx2
.send(()).unwrap();
797 done
.send(()).unwrap();
799 let mut buf
= [0, 0];
800 t
!(s1
.read(&mut buf
));
801 tx1
.send(()).unwrap();
808 fn tcp_clone_two_write() {
809 each_ip(&mut |addr
| {
810 let acceptor
= t
!(TcpListener
::bind(&addr
));
812 let _t
= thread
::spawn(move|| {
813 let mut s
= t
!(TcpStream
::connect(&addr
));
814 let mut buf
= [0, 1];
815 t
!(s
.read(&mut buf
));
816 t
!(s
.read(&mut buf
));
819 let mut s1
= t
!(acceptor
.accept()).0;
820 let s2
= t
!(s1
.try_clone());
822 let (done
, rx
) = channel();
823 let _t
= thread
::spawn(move|| {
826 done
.send(()).unwrap();
835 fn shutdown_smoke() {
836 each_ip(&mut |addr
| {
837 let a
= t
!(TcpListener
::bind(&addr
));
838 let _t
= thread
::spawn(move|| {
839 let mut c
= t
!(a
.accept()).0;
841 assert_eq
!(c
.read(&mut b
).unwrap(), 0);
845 let mut s
= t
!(TcpStream
::connect(&addr
));
846 t
!(s
.shutdown(Shutdown
::Write
));
847 assert
!(s
.write(&[1]).is_err());
849 assert_eq
!(t
!(s
.read(&mut b
)), 1);
855 fn close_readwrite_smoke() {
856 each_ip(&mut |addr
| {
857 let a
= t
!(TcpListener
::bind(&addr
));
858 let (tx
, rx
) = channel
::<()>();
859 let _t
= thread
::spawn(move|| {
860 let _s
= t
!(a
.accept());
865 let mut s
= t
!(TcpStream
::connect(&addr
));
866 let mut s2
= t
!(s
.try_clone());
868 // closing should prevent reads/writes
869 t
!(s
.shutdown(Shutdown
::Write
));
870 assert
!(s
.write(&[0]).is_err());
871 t
!(s
.shutdown(Shutdown
::Read
));
872 assert_eq
!(s
.read(&mut b
).unwrap(), 0);
874 // closing should affect previous handles
875 assert
!(s2
.write(&[0]).is_err());
876 assert_eq
!(s2
.read(&mut b
).unwrap(), 0);
878 // closing should affect new handles
879 let mut s3
= t
!(s
.try_clone());
880 assert
!(s3
.write(&[0]).is_err());
881 assert_eq
!(s3
.read(&mut b
).unwrap(), 0);
883 // make sure these don't die
884 let _
= s2
.shutdown(Shutdown
::Read
);
885 let _
= s2
.shutdown(Shutdown
::Write
);
886 let _
= s3
.shutdown(Shutdown
::Read
);
887 let _
= s3
.shutdown(Shutdown
::Write
);
893 fn close_read_wakes_up() {
894 each_ip(&mut |addr
| {
895 let a
= t
!(TcpListener
::bind(&addr
));
896 let (tx1
, rx
) = channel
::<()>();
897 let _t
= thread
::spawn(move|| {
898 let _s
= t
!(a
.accept());
902 let s
= t
!(TcpStream
::connect(&addr
));
903 let s2
= t
!(s
.try_clone());
904 let (tx
, rx
) = channel();
905 let _t
= thread
::spawn(move|| {
907 assert_eq
!(t
!(s2
.read(&mut [0])), 0);
908 tx
.send(()).unwrap();
910 // this should wake up the child thread
911 t
!(s
.shutdown(Shutdown
::Read
));
913 // this test will never finish if the child doesn't wake up
920 fn clone_while_reading() {
921 each_ip(&mut |addr
| {
922 let accept
= t
!(TcpListener
::bind(&addr
));
924 // Enqueue a thread to write to a socket
925 let (tx
, rx
) = channel();
926 let (txdone
, rxdone
) = channel();
927 let txdone2
= txdone
.clone();
928 let _t
= thread
::spawn(move|| {
929 let mut tcp
= t
!(TcpStream
::connect(&addr
));
932 txdone2
.send(()).unwrap();
935 // Spawn off a reading clone
936 let tcp
= t
!(accept
.accept()).0;
937 let tcp2
= t
!(tcp
.try_clone());
938 let txdone3
= txdone
.clone();
939 let _t
= thread
::spawn(move|| {
941 t
!(tcp2
.read(&mut [0]));
942 txdone3
.send(()).unwrap();
945 // Try to ensure that the reading clone is indeed reading
950 // clone the handle again while it's reading, then let it finish the
952 let _
= t
!(tcp
.try_clone());
953 tx
.send(()).unwrap();
954 rxdone
.recv().unwrap();
955 rxdone
.recv().unwrap();
960 fn clone_accept_smoke() {
961 each_ip(&mut |addr
| {
962 let a
= t
!(TcpListener
::bind(&addr
));
963 let a2
= t
!(a
.try_clone());
965 let _t
= thread
::spawn(move|| {
966 let _
= TcpStream
::connect(&addr
);
968 let _t
= thread
::spawn(move|| {
969 let _
= TcpStream
::connect(&addr
);
978 fn clone_accept_concurrent() {
979 each_ip(&mut |addr
| {
980 let a
= t
!(TcpListener
::bind(&addr
));
981 let a2
= t
!(a
.try_clone());
983 let (tx
, rx
) = channel();
984 let tx2
= tx
.clone();
986 let _t
= thread
::spawn(move|| {
987 tx
.send(t
!(a
.accept())).unwrap();
989 let _t
= thread
::spawn(move|| {
990 tx2
.send(t
!(a2
.accept())).unwrap();
993 let _t
= thread
::spawn(move|| {
994 let _
= TcpStream
::connect(&addr
);
996 let _t
= thread
::spawn(move|| {
997 let _
= TcpStream
::connect(&addr
);
1007 let name
= if cfg
!(windows
) {"socket"}
else {"fd"}
;
1008 let socket_addr
= next_test_ip4();
1010 let listener
= t
!(TcpListener
::bind(&socket_addr
));
1011 let listener_inner
= listener
.0.socket().as_inner();
1012 let compare
= format
!("TcpListener {{ addr: {:?}, {}: {:?} }}",
1013 socket_addr
, name
, listener_inner
);
1014 assert_eq
!(format
!("{:?}", listener
), compare
);
1016 let stream
= t
!(TcpStream
::connect(&("localhost",
1017 socket_addr
.port())));
1018 let stream_inner
= stream
.0.socket().as_inner();
1019 let compare
= format
!("TcpStream {{ addr: {:?}, \
1020 peer: {:?}, {}: {:?} }}",
1021 stream
.local_addr().unwrap(),
1022 stream
.peer_addr().unwrap(),
1025 assert_eq
!(format
!("{:?}", stream
), compare
);
1028 // FIXME: re-enable the bitrig/netbsd/openbsd tests once their socket
1029 // timeout code no longer has rounding errors.
1030 #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
1033 let addr
= next_test_ip4();
1034 let listener
= t
!(TcpListener
::bind(&addr
));
1036 let stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1037 let dur
= Duration
::new(15410, 0);
1039 assert_eq
!(None
, t
!(stream
.read_timeout()));
1041 t
!(stream
.set_read_timeout(Some(dur
)));
1042 assert_eq
!(Some(dur
), t
!(stream
.read_timeout()));
1044 assert_eq
!(None
, t
!(stream
.write_timeout()));
1046 t
!(stream
.set_write_timeout(Some(dur
)));
1047 assert_eq
!(Some(dur
), t
!(stream
.write_timeout()));
1049 t
!(stream
.set_read_timeout(None
));
1050 assert_eq
!(None
, t
!(stream
.read_timeout()));
1052 t
!(stream
.set_write_timeout(None
));
1053 assert_eq
!(None
, t
!(stream
.write_timeout()));
1058 fn test_read_timeout() {
1059 let addr
= next_test_ip4();
1060 let listener
= t
!(TcpListener
::bind(&addr
));
1062 let mut stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1063 t
!(stream
.set_read_timeout(Some(Duration
::from_millis(1000))));
1065 let mut buf
= [0; 10];
1066 let start
= Instant
::now();
1067 let kind
= stream
.read(&mut buf
).err().expect("expected error").kind();
1068 assert
!(kind
== ErrorKind
::WouldBlock
|| kind
== ErrorKind
::TimedOut
);
1069 assert
!(start
.elapsed() > Duration
::from_millis(400));
1074 fn test_read_with_timeout() {
1075 let addr
= next_test_ip4();
1076 let listener
= t
!(TcpListener
::bind(&addr
));
1078 let mut stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1079 t
!(stream
.set_read_timeout(Some(Duration
::from_millis(1000))));
1081 let mut other_end
= t
!(listener
.accept()).0;
1082 t
!(other_end
.write_all(b
"hello world"));
1084 let mut buf
= [0; 11];
1085 t
!(stream
.read(&mut buf
));
1086 assert_eq
!(b
"hello world", &buf
[..]);
1088 let start
= Instant
::now();
1089 let kind
= stream
.read(&mut buf
).err().expect("expected error").kind();
1090 assert
!(kind
== ErrorKind
::WouldBlock
|| kind
== ErrorKind
::TimedOut
);
1091 assert
!(start
.elapsed() > Duration
::from_millis(400));
1097 let addr
= next_test_ip4();
1098 let _listener
= t
!(TcpListener
::bind(&addr
));
1100 let stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1102 assert_eq
!(false, t
!(stream
.nodelay()));
1103 t
!(stream
.set_nodelay(true));
1104 assert_eq
!(true, t
!(stream
.nodelay()));
1105 t
!(stream
.set_nodelay(false));
1106 assert_eq
!(false, t
!(stream
.nodelay()));
1113 let addr
= next_test_ip4();
1114 let listener
= t
!(TcpListener
::bind(&addr
));
1116 t
!(listener
.set_ttl(ttl
));
1117 assert_eq
!(ttl
, t
!(listener
.ttl()));
1119 let stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1121 t
!(stream
.set_ttl(ttl
));
1122 assert_eq
!(ttl
, t
!(stream
.ttl()));
1126 fn set_nonblocking() {
1127 let addr
= next_test_ip4();
1128 let listener
= t
!(TcpListener
::bind(&addr
));
1130 t
!(listener
.set_nonblocking(true));
1131 t
!(listener
.set_nonblocking(false));
1133 let mut stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1135 t
!(stream
.set_nonblocking(false));
1136 t
!(stream
.set_nonblocking(true));
1139 match stream
.read(&mut buf
) {
1140 Ok(_
) => panic
!("expected error"),
1141 Err(ref e
) if e
.kind() == ErrorKind
::WouldBlock
=> {}
1142 Err(e
) => panic
!("unexpected error {}", e
),