1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 use net
::{ToSocketAddrs, SocketAddr, Shutdown}
;
16 use sys_common
::net
as net_imp
;
17 use sys_common
::{AsInner, FromInner, IntoInner}
;
20 /// A structure which represents a TCP stream between a local socket and a
23 /// The socket will be closed when the value is dropped.
28 /// use std::io::prelude::*;
29 /// use std::net::TcpStream;
32 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
34 /// // ignore the Result
35 /// let _ = stream.write(&[1]);
36 /// let _ = stream.read(&mut [0; 128]); // ignore here too
37 /// } // the stream is closed here
39 #[stable(feature = "rust1", since = "1.0.0")]
40 pub struct TcpStream(net_imp
::TcpStream
);
42 /// A structure representing a socket server.
47 /// use std::net::{TcpListener, TcpStream};
49 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
51 /// fn handle_client(stream: TcpStream) {
55 /// // accept connections and process them, spawning a new thread for each one
56 /// for stream in listener.incoming() {
59 /// handle_client(stream);
61 /// Err(e) => { /* connection failed */ }
65 #[stable(feature = "rust1", since = "1.0.0")]
66 pub struct TcpListener(net_imp
::TcpListener
);
68 /// An infinite iterator over the connections from a `TcpListener`.
70 /// This iterator will infinitely yield `Some` of the accepted connections. It
71 /// is equivalent to calling `accept` in a loop.
73 /// This `struct` is created by the [`incoming`] method on [`TcpListener`].
75 /// [`incoming`]: struct.TcpListener.html#method.incoming
76 /// [`TcpListener`]: struct.TcpListener.html
77 #[stable(feature = "rust1", since = "1.0.0")]
78 pub struct Incoming
<'a
> { listener: &'a TcpListener }
81 /// Opens a TCP connection to a remote host.
83 /// `addr` is an address of the remote host. Anything which implements
84 /// `ToSocketAddrs` trait can be supplied for the address; see this trait
85 /// documentation for concrete examples.
86 /// In case `ToSocketAddrs::to_socket_addrs()` returns more than one entry,
87 /// then the first valid and reachable address is used.
88 #[stable(feature = "rust1", since = "1.0.0")]
89 pub fn connect
<A
: ToSocketAddrs
>(addr
: A
) -> io
::Result
<TcpStream
> {
90 super::each_addr(addr
, net_imp
::TcpStream
::connect
).map(TcpStream
)
93 /// Returns the socket address of the remote peer of this TCP connection.
94 #[stable(feature = "rust1", since = "1.0.0")]
95 pub fn peer_addr(&self) -> io
::Result
<SocketAddr
> {
99 /// Returns the socket address of the local half of this TCP connection.
100 #[stable(feature = "rust1", since = "1.0.0")]
101 pub fn local_addr(&self) -> io
::Result
<SocketAddr
> {
105 /// Shuts down the read, write, or both halves of this connection.
107 /// This function will cause all pending and future I/O on the specified
108 /// portions to return immediately with an appropriate value (see the
109 /// documentation of `Shutdown`).
110 #[stable(feature = "rust1", since = "1.0.0")]
111 pub fn shutdown(&self, how
: Shutdown
) -> io
::Result
<()> {
115 /// Creates a new independently owned handle to the underlying socket.
117 /// The returned `TcpStream` is a reference to the same stream that this
118 /// object references. Both handles will read and write the same stream of
119 /// data, and options set on one stream will be propagated to the other
121 #[stable(feature = "rust1", since = "1.0.0")]
122 pub fn try_clone(&self) -> io
::Result
<TcpStream
> {
123 self.0.duplicate().map(TcpStream
)
126 /// Sets the read timeout to the timeout specified.
128 /// If the value specified is `None`, then `read` calls will block
129 /// indefinitely. It is an error to pass the zero `Duration` to this
134 /// Platforms may return a different error code whenever a read times out as
135 /// a result of setting this option. For example Unix typically returns an
136 /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
137 #[stable(feature = "socket_timeout", since = "1.4.0")]
138 pub fn set_read_timeout(&self, dur
: Option
<Duration
>) -> io
::Result
<()> {
139 self.0.set_read_timeout(dur
)
142 /// Sets the write timeout to the timeout specified.
144 /// If the value specified is `None`, then `write` calls will block
145 /// indefinitely. It is an error to pass the zero `Duration` to this
150 /// Platforms may return a different error code whenever a write times out
151 /// as a result of setting this option. For example Unix typically returns
152 /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
153 #[stable(feature = "socket_timeout", since = "1.4.0")]
154 pub fn set_write_timeout(&self, dur
: Option
<Duration
>) -> io
::Result
<()> {
155 self.0.set_write_timeout(dur
)
158 /// Returns the read timeout of this socket.
160 /// If the timeout is `None`, then `read` calls will block indefinitely.
164 /// Some platforms do not provide access to the current timeout.
165 #[stable(feature = "socket_timeout", since = "1.4.0")]
166 pub fn read_timeout(&self) -> io
::Result
<Option
<Duration
>> {
167 self.0.read_timeout()
170 /// Returns the write timeout of this socket.
172 /// If the timeout is `None`, then `write` calls will block indefinitely.
176 /// Some platforms do not provide access to the current timeout.
177 #[stable(feature = "socket_timeout", since = "1.4.0")]
178 pub fn write_timeout(&self) -> io
::Result
<Option
<Duration
>> {
179 self.0.write_timeout()
182 /// Sets the value of the `TCP_NODELAY` option on this socket.
184 /// If set, this option disables the Nagle algorithm. This means that
185 /// segments are always sent as soon as possible, even if there is only a
186 /// small amount of data. When not set, data is buffered until there is a
187 /// sufficient amount to send out, thereby avoiding the frequent sending of
189 #[stable(feature = "net2_mutators", since = "1.9.0")]
190 pub fn set_nodelay(&self, nodelay
: bool
) -> io
::Result
<()> {
191 self.0.set_nodelay(nodelay
)
194 /// Gets the value of the `TCP_NODELAY` option on this socket.
196 /// For more information about this option, see [`set_nodelay`][link].
198 /// [link]: #method.set_nodelay
199 #[stable(feature = "net2_mutators", since = "1.9.0")]
200 pub fn nodelay(&self) -> io
::Result
<bool
> {
204 /// Sets the value for the `IP_TTL` option on this socket.
206 /// This value sets the time-to-live field that is used in every packet sent
207 /// from this socket.
208 #[stable(feature = "net2_mutators", since = "1.9.0")]
209 pub fn set_ttl(&self, ttl
: u32) -> io
::Result
<()> {
213 /// Gets the value of the `IP_TTL` option for this socket.
215 /// For more information about this option, see [`set_ttl`][link].
217 /// [link]: #method.set_ttl
218 #[stable(feature = "net2_mutators", since = "1.9.0")]
219 pub fn ttl(&self) -> io
::Result
<u32> {
223 /// Gets the value of the `SO_ERROR` option on this socket.
225 /// This will retrieve the stored error in the underlying socket, clearing
226 /// the field in the process. This can be useful for checking errors between
228 #[stable(feature = "net2_mutators", since = "1.9.0")]
229 pub fn take_error(&self) -> io
::Result
<Option
<io
::Error
>> {
233 /// Moves this TCP stream into or out of nonblocking mode.
235 /// On Unix this corresponds to calling fcntl, and on Windows this
236 /// corresponds to calling ioctlsocket.
237 #[stable(feature = "net2_mutators", since = "1.9.0")]
238 pub fn set_nonblocking(&self, nonblocking
: bool
) -> io
::Result
<()> {
239 self.0.set_nonblocking(nonblocking
)
243 #[stable(feature = "rust1", since = "1.0.0")]
244 impl Read
for TcpStream
{
245 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> { self.0.read(buf) }
246 fn read_to_end(&mut self, buf
: &mut Vec
<u8>) -> io
::Result
<usize> {
247 self.0.read_to_end(buf
)
250 #[stable(feature = "rust1", since = "1.0.0")]
251 impl Write
for TcpStream
{
252 fn write(&mut self, buf
: &[u8]) -> io
::Result
<usize> { self.0.write(buf) }
253 fn flush(&mut self) -> io
::Result
<()> { Ok(()) }
255 #[stable(feature = "rust1", since = "1.0.0")]
256 impl<'a
> Read
for &'a TcpStream
{
257 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> { self.0.read(buf) }
258 fn read_to_end(&mut self, buf
: &mut Vec
<u8>) -> io
::Result
<usize> {
259 self.0.read_to_end(buf
)
262 #[stable(feature = "rust1", since = "1.0.0")]
263 impl<'a
> Write
for &'a TcpStream
{
264 fn write(&mut self, buf
: &[u8]) -> io
::Result
<usize> { self.0.write(buf) }
265 fn flush(&mut self) -> io
::Result
<()> { Ok(()) }
268 impl AsInner
<net_imp
::TcpStream
> for TcpStream
{
269 fn as_inner(&self) -> &net_imp
::TcpStream { &self.0 }
272 impl FromInner
<net_imp
::TcpStream
> for TcpStream
{
273 fn from_inner(inner
: net_imp
::TcpStream
) -> TcpStream { TcpStream(inner) }
276 impl IntoInner
<net_imp
::TcpStream
> for TcpStream
{
277 fn into_inner(self) -> net_imp
::TcpStream { self.0 }
280 #[stable(feature = "rust1", since = "1.0.0")]
281 impl fmt
::Debug
for TcpStream
{
282 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
288 /// Creates a new `TcpListener` which will be bound to the specified
291 /// The returned listener is ready for accepting connections.
293 /// Binding with a port number of 0 will request that the OS assigns a port
294 /// to this listener. The port allocated can be queried via the
295 /// `local_addr` method.
297 /// The address type can be any implementor of `ToSocketAddrs` trait. See
298 /// its documentation for concrete examples.
299 #[stable(feature = "rust1", since = "1.0.0")]
300 pub fn bind
<A
: ToSocketAddrs
>(addr
: A
) -> io
::Result
<TcpListener
> {
301 super::each_addr(addr
, net_imp
::TcpListener
::bind
).map(TcpListener
)
304 /// Returns the local socket address of this listener.
305 #[stable(feature = "rust1", since = "1.0.0")]
306 pub fn local_addr(&self) -> io
::Result
<SocketAddr
> {
310 /// Creates a new independently owned handle to the underlying socket.
312 /// The returned `TcpListener` is a reference to the same socket that this
313 /// object references. Both handles can be used to accept incoming
314 /// connections and options set on one listener will affect the other.
315 #[stable(feature = "rust1", since = "1.0.0")]
316 pub fn try_clone(&self) -> io
::Result
<TcpListener
> {
317 self.0.duplicate().map(TcpListener
)
320 /// Accept a new incoming connection from this listener.
322 /// This function will block the calling thread until a new TCP connection
323 /// is established. When established, the corresponding `TcpStream` and the
324 /// remote peer's address will be returned.
325 #[stable(feature = "rust1", since = "1.0.0")]
326 pub fn accept(&self) -> io
::Result
<(TcpStream
, SocketAddr
)> {
327 self.0.accept().map(|(a
, b
)| (TcpStream(a
), b
))
330 /// Returns an iterator over the connections being received on this
333 /// The returned iterator will never return `None` and will also not yield
334 /// the peer's `SocketAddr` structure.
335 #[stable(feature = "rust1", since = "1.0.0")]
336 pub fn incoming(&self) -> Incoming
{
337 Incoming { listener: self }
340 /// Sets the value for the `IP_TTL` option on this socket.
342 /// This value sets the time-to-live field that is used in every packet sent
343 /// from this socket.
344 #[stable(feature = "net2_mutators", since = "1.9.0")]
345 pub fn set_ttl(&self, ttl
: u32) -> io
::Result
<()> {
349 /// Gets the value of the `IP_TTL` option for this socket.
351 /// For more information about this option, see [`set_ttl`][link].
353 /// [link]: #method.set_ttl
354 #[stable(feature = "net2_mutators", since = "1.9.0")]
355 pub fn ttl(&self) -> io
::Result
<u32> {
359 /// Sets the value for the `IPV6_V6ONLY` option on this socket.
361 /// If this is set to `true` then the socket is restricted to sending and
362 /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
363 /// can bind the same port at the same time.
365 /// If this is set to `false` then the socket can be used to send and
366 /// receive packets from an IPv4-mapped IPv6 address.
367 #[stable(feature = "net2_mutators", since = "1.9.0")]
368 pub fn set_only_v6(&self, only_v6
: bool
) -> io
::Result
<()> {
369 self.0.set_only_v6(only_v6
)
372 /// Gets the value of the `IPV6_V6ONLY` option for this socket.
374 /// For more information about this option, see [`set_only_v6`][link].
376 /// [link]: #method.set_only_v6
377 #[stable(feature = "net2_mutators", since = "1.9.0")]
378 pub fn only_v6(&self) -> io
::Result
<bool
> {
382 /// Gets the value of the `SO_ERROR` option on this socket.
384 /// This will retrieve the stored error in the underlying socket, clearing
385 /// the field in the process. This can be useful for checking errors between
387 #[stable(feature = "net2_mutators", since = "1.9.0")]
388 pub fn take_error(&self) -> io
::Result
<Option
<io
::Error
>> {
392 /// Moves this TCP stream into or out of nonblocking mode.
394 /// On Unix this corresponds to calling fcntl, and on Windows this
395 /// corresponds to calling ioctlsocket.
396 #[stable(feature = "net2_mutators", since = "1.9.0")]
397 pub fn set_nonblocking(&self, nonblocking
: bool
) -> io
::Result
<()> {
398 self.0.set_nonblocking(nonblocking
)
402 #[stable(feature = "rust1", since = "1.0.0")]
403 impl<'a
> Iterator
for Incoming
<'a
> {
404 type Item
= io
::Result
<TcpStream
>;
405 fn next(&mut self) -> Option
<io
::Result
<TcpStream
>> {
406 Some(self.listener
.accept().map(|p
| p
.0))
410 impl AsInner
<net_imp
::TcpListener
> for TcpListener
{
411 fn as_inner(&self) -> &net_imp
::TcpListener { &self.0 }
414 impl FromInner
<net_imp
::TcpListener
> for TcpListener
{
415 fn from_inner(inner
: net_imp
::TcpListener
) -> TcpListener
{
420 impl IntoInner
<net_imp
::TcpListener
> for TcpListener
{
421 fn into_inner(self) -> net_imp
::TcpListener { self.0 }
424 #[stable(feature = "rust1", since = "1.0.0")]
425 impl fmt
::Debug
for TcpListener
{
426 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
436 use net
::test
::{next_test_ip4, next_test_ip6}
;
437 use sync
::mpsc
::channel
;
438 use sys_common
::AsInner
;
439 use time
::{Instant, Duration}
;
442 fn each_ip(f
: &mut FnMut(SocketAddr
)) {
451 Err(e
) => panic
!("received error for `{}`: {}", stringify
!($e
), e
),
458 match TcpListener
::bind("1.1.1.1:9999") {
461 assert_eq
!(e
.kind(), ErrorKind
::AddrNotAvailable
),
467 match TcpStream
::connect("0.0.0.0:1") {
469 Err(e
) => assert
!(e
.kind() == ErrorKind
::ConnectionRefused
||
470 e
.kind() == ErrorKind
::InvalidInput
||
471 e
.kind() == ErrorKind
::AddrInUse
||
472 e
.kind() == ErrorKind
::AddrNotAvailable
,
473 "bad error: {} {:?}", e
, e
.kind()),
478 fn listen_localhost() {
479 let socket_addr
= next_test_ip4();
480 let listener
= t
!(TcpListener
::bind(&socket_addr
));
482 let _t
= thread
::spawn(move || {
483 let mut stream
= t
!(TcpStream
::connect(&("localhost",
484 socket_addr
.port())));
485 t
!(stream
.write(&[144]));
488 let mut stream
= t
!(listener
.accept()).0;
490 t
!(stream
.read(&mut buf
));
491 assert
!(buf
[0] == 144);
495 fn connect_loopback() {
496 each_ip(&mut |addr
| {
497 let acceptor
= t
!(TcpListener
::bind(&addr
));
499 let _t
= thread
::spawn(move|| {
500 let host
= match addr
{
501 SocketAddr
::V4(..) => "127.0.0.1",
502 SocketAddr
::V6(..) => "::1",
504 let mut stream
= t
!(TcpStream
::connect(&(host
, addr
.port())));
505 t
!(stream
.write(&[66]));
508 let mut stream
= t
!(acceptor
.accept()).0;
510 t
!(stream
.read(&mut buf
));
511 assert
!(buf
[0] == 66);
517 each_ip(&mut |addr
| {
518 let acceptor
= t
!(TcpListener
::bind(&addr
));
520 let (tx
, rx
) = channel();
521 let _t
= thread
::spawn(move|| {
522 let mut stream
= t
!(TcpStream
::connect(&addr
));
523 t
!(stream
.write(&[99]));
524 tx
.send(t
!(stream
.local_addr())).unwrap();
527 let (mut stream
, addr
) = t
!(acceptor
.accept());
529 t
!(stream
.read(&mut buf
));
530 assert
!(buf
[0] == 99);
531 assert_eq
!(addr
, t
!(rx
.recv()));
537 each_ip(&mut |addr
| {
538 let acceptor
= t
!(TcpListener
::bind(&addr
));
540 let _t
= thread
::spawn(move|| {
541 let _stream
= t
!(TcpStream
::connect(&addr
));
545 let mut stream
= t
!(acceptor
.accept()).0;
547 let nread
= t
!(stream
.read(&mut buf
));
548 assert_eq
!(nread
, 0);
549 let nread
= t
!(stream
.read(&mut buf
));
550 assert_eq
!(nread
, 0);
556 each_ip(&mut |addr
| {
557 let acceptor
= t
!(TcpListener
::bind(&addr
));
559 let (tx
, rx
) = channel();
560 let _t
= thread
::spawn(move|| {
561 drop(t
!(TcpStream
::connect(&addr
)));
562 tx
.send(()).unwrap();
565 let mut stream
= t
!(acceptor
.accept()).0;
568 match stream
.write(&buf
) {
571 assert
!(e
.kind() == ErrorKind
::ConnectionReset
||
572 e
.kind() == ErrorKind
::BrokenPipe
||
573 e
.kind() == ErrorKind
::ConnectionAborted
,
574 "unknown error: {}", e
);
581 fn multiple_connect_serial() {
582 each_ip(&mut |addr
| {
584 let acceptor
= t
!(TcpListener
::bind(&addr
));
586 let _t
= thread
::spawn(move|| {
588 let mut stream
= t
!(TcpStream
::connect(&addr
));
589 t
!(stream
.write(&[99]));
593 for stream
in acceptor
.incoming().take(max
) {
594 let mut stream
= t
!(stream
);
596 t
!(stream
.read(&mut buf
));
597 assert_eq
!(buf
[0], 99);
603 fn multiple_connect_interleaved_greedy_schedule() {
604 const MAX
: usize = 10;
605 each_ip(&mut |addr
| {
606 let acceptor
= t
!(TcpListener
::bind(&addr
));
608 let _t
= thread
::spawn(move|| {
609 let acceptor
= acceptor
;
610 for (i
, stream
) in acceptor
.incoming().enumerate().take(MAX
) {
611 // Start another thread to handle the connection
612 let _t
= thread
::spawn(move|| {
613 let mut stream
= t
!(stream
);
615 t
!(stream
.read(&mut buf
));
616 assert
!(buf
[0] == i
as u8);
624 fn connect(i
: usize, addr
: SocketAddr
) {
625 if i
== MAX { return }
627 let t
= thread
::spawn(move|| {
628 let mut stream
= t
!(TcpStream
::connect(&addr
));
629 // Connect again before writing
630 connect(i
+ 1, addr
);
631 t
!(stream
.write(&[i
as u8]));
633 t
.join().ok().unwrap();
638 fn multiple_connect_interleaved_lazy_schedule() {
639 const MAX
: usize = 10;
640 each_ip(&mut |addr
| {
641 let acceptor
= t
!(TcpListener
::bind(&addr
));
643 let _t
= thread
::spawn(move|| {
644 for stream
in acceptor
.incoming().take(MAX
) {
645 // Start another thread to handle the connection
646 let _t
= thread
::spawn(move|| {
647 let mut stream
= t
!(stream
);
649 t
!(stream
.read(&mut buf
));
650 assert
!(buf
[0] == 99);
658 fn connect(i
: usize, addr
: SocketAddr
) {
659 if i
== MAX { return }
661 let t
= thread
::spawn(move|| {
662 let mut stream
= t
!(TcpStream
::connect(&addr
));
663 connect(i
+ 1, addr
);
664 t
!(stream
.write(&[99]));
666 t
.join().ok().unwrap();
671 fn socket_and_peer_name() {
672 each_ip(&mut |addr
| {
673 let listener
= t
!(TcpListener
::bind(&addr
));
674 let so_name
= t
!(listener
.local_addr());
675 assert_eq
!(addr
, so_name
);
676 let _t
= thread
::spawn(move|| {
677 t
!(listener
.accept());
680 let stream
= t
!(TcpStream
::connect(&addr
));
681 assert_eq
!(addr
, t
!(stream
.peer_addr()));
687 each_ip(&mut |addr
| {
688 let (tx
, rx
) = channel();
689 let srv
= t
!(TcpListener
::bind(&addr
));
690 let _t
= thread
::spawn(move|| {
691 let mut cl
= t
!(srv
.accept()).0;
692 cl
.write(&[10]).unwrap();
695 tx
.send(()).unwrap();
698 let mut c
= t
!(TcpStream
::connect(&addr
));
700 assert_eq
!(c
.read(&mut b
).unwrap(), 1);
708 each_ip(&mut |addr
| {
709 let _listener
= t
!(TcpListener
::bind(&addr
));
710 match TcpListener
::bind(&addr
) {
713 assert
!(e
.kind() == ErrorKind
::ConnectionRefused
||
714 e
.kind() == ErrorKind
::Other
||
715 e
.kind() == ErrorKind
::AddrInUse
,
716 "unknown error: {} {:?}", e
, e
.kind());
724 each_ip(&mut |addr
| {
725 let acceptor
= t
!(TcpListener
::bind(&addr
));
727 let _t
= thread
::spawn(move|| {
728 t
!(TcpStream
::connect(&addr
));
731 t
!(acceptor
.accept());
733 t
!(TcpListener
::bind(&addr
));
738 fn tcp_clone_smoke() {
739 each_ip(&mut |addr
| {
740 let acceptor
= t
!(TcpListener
::bind(&addr
));
742 let _t
= thread
::spawn(move|| {
743 let mut s
= t
!(TcpStream
::connect(&addr
));
744 let mut buf
= [0, 0];
745 assert_eq
!(s
.read(&mut buf
).unwrap(), 1);
746 assert_eq
!(buf
[0], 1);
750 let mut s1
= t
!(acceptor
.accept()).0;
751 let s2
= t
!(s1
.try_clone());
753 let (tx1
, rx1
) = channel();
754 let (tx2
, rx2
) = channel();
755 let _t
= thread
::spawn(move|| {
759 tx2
.send(()).unwrap();
761 tx1
.send(()).unwrap();
762 let mut buf
= [0, 0];
763 assert_eq
!(s1
.read(&mut buf
).unwrap(), 1);
769 fn tcp_clone_two_read() {
770 each_ip(&mut |addr
| {
771 let acceptor
= t
!(TcpListener
::bind(&addr
));
772 let (tx1
, rx
) = channel();
773 let tx2
= tx1
.clone();
775 let _t
= thread
::spawn(move|| {
776 let mut s
= t
!(TcpStream
::connect(&addr
));
783 let mut s1
= t
!(acceptor
.accept()).0;
784 let s2
= t
!(s1
.try_clone());
786 let (done
, rx
) = channel();
787 let _t
= thread
::spawn(move|| {
789 let mut buf
= [0, 0];
790 t
!(s2
.read(&mut buf
));
791 tx2
.send(()).unwrap();
792 done
.send(()).unwrap();
794 let mut buf
= [0, 0];
795 t
!(s1
.read(&mut buf
));
796 tx1
.send(()).unwrap();
803 fn tcp_clone_two_write() {
804 each_ip(&mut |addr
| {
805 let acceptor
= t
!(TcpListener
::bind(&addr
));
807 let _t
= thread
::spawn(move|| {
808 let mut s
= t
!(TcpStream
::connect(&addr
));
809 let mut buf
= [0, 1];
810 t
!(s
.read(&mut buf
));
811 t
!(s
.read(&mut buf
));
814 let mut s1
= t
!(acceptor
.accept()).0;
815 let s2
= t
!(s1
.try_clone());
817 let (done
, rx
) = channel();
818 let _t
= thread
::spawn(move|| {
821 done
.send(()).unwrap();
830 fn shutdown_smoke() {
831 each_ip(&mut |addr
| {
832 let a
= t
!(TcpListener
::bind(&addr
));
833 let _t
= thread
::spawn(move|| {
834 let mut c
= t
!(a
.accept()).0;
836 assert_eq
!(c
.read(&mut b
).unwrap(), 0);
840 let mut s
= t
!(TcpStream
::connect(&addr
));
841 t
!(s
.shutdown(Shutdown
::Write
));
842 assert
!(s
.write(&[1]).is_err());
844 assert_eq
!(t
!(s
.read(&mut b
)), 1);
850 fn close_readwrite_smoke() {
851 each_ip(&mut |addr
| {
852 let a
= t
!(TcpListener
::bind(&addr
));
853 let (tx
, rx
) = channel
::<()>();
854 let _t
= thread
::spawn(move|| {
855 let _s
= t
!(a
.accept());
860 let mut s
= t
!(TcpStream
::connect(&addr
));
861 let mut s2
= t
!(s
.try_clone());
863 // closing should prevent reads/writes
864 t
!(s
.shutdown(Shutdown
::Write
));
865 assert
!(s
.write(&[0]).is_err());
866 t
!(s
.shutdown(Shutdown
::Read
));
867 assert_eq
!(s
.read(&mut b
).unwrap(), 0);
869 // closing should affect previous handles
870 assert
!(s2
.write(&[0]).is_err());
871 assert_eq
!(s2
.read(&mut b
).unwrap(), 0);
873 // closing should affect new handles
874 let mut s3
= t
!(s
.try_clone());
875 assert
!(s3
.write(&[0]).is_err());
876 assert_eq
!(s3
.read(&mut b
).unwrap(), 0);
878 // make sure these don't die
879 let _
= s2
.shutdown(Shutdown
::Read
);
880 let _
= s2
.shutdown(Shutdown
::Write
);
881 let _
= s3
.shutdown(Shutdown
::Read
);
882 let _
= s3
.shutdown(Shutdown
::Write
);
888 fn close_read_wakes_up() {
889 each_ip(&mut |addr
| {
890 let a
= t
!(TcpListener
::bind(&addr
));
891 let (tx1
, rx
) = channel
::<()>();
892 let _t
= thread
::spawn(move|| {
893 let _s
= t
!(a
.accept());
897 let s
= t
!(TcpStream
::connect(&addr
));
898 let s2
= t
!(s
.try_clone());
899 let (tx
, rx
) = channel();
900 let _t
= thread
::spawn(move|| {
902 assert_eq
!(t
!(s2
.read(&mut [0])), 0);
903 tx
.send(()).unwrap();
905 // this should wake up the child thread
906 t
!(s
.shutdown(Shutdown
::Read
));
908 // this test will never finish if the child doesn't wake up
915 fn clone_while_reading() {
916 each_ip(&mut |addr
| {
917 let accept
= t
!(TcpListener
::bind(&addr
));
919 // Enqueue a thread to write to a socket
920 let (tx
, rx
) = channel();
921 let (txdone
, rxdone
) = channel();
922 let txdone2
= txdone
.clone();
923 let _t
= thread
::spawn(move|| {
924 let mut tcp
= t
!(TcpStream
::connect(&addr
));
927 txdone2
.send(()).unwrap();
930 // Spawn off a reading clone
931 let tcp
= t
!(accept
.accept()).0;
932 let tcp2
= t
!(tcp
.try_clone());
933 let txdone3
= txdone
.clone();
934 let _t
= thread
::spawn(move|| {
936 t
!(tcp2
.read(&mut [0]));
937 txdone3
.send(()).unwrap();
940 // Try to ensure that the reading clone is indeed reading
945 // clone the handle again while it's reading, then let it finish the
947 let _
= t
!(tcp
.try_clone());
948 tx
.send(()).unwrap();
949 rxdone
.recv().unwrap();
950 rxdone
.recv().unwrap();
955 fn clone_accept_smoke() {
956 each_ip(&mut |addr
| {
957 let a
= t
!(TcpListener
::bind(&addr
));
958 let a2
= t
!(a
.try_clone());
960 let _t
= thread
::spawn(move|| {
961 let _
= TcpStream
::connect(&addr
);
963 let _t
= thread
::spawn(move|| {
964 let _
= TcpStream
::connect(&addr
);
973 fn clone_accept_concurrent() {
974 each_ip(&mut |addr
| {
975 let a
= t
!(TcpListener
::bind(&addr
));
976 let a2
= t
!(a
.try_clone());
978 let (tx
, rx
) = channel();
979 let tx2
= tx
.clone();
981 let _t
= thread
::spawn(move|| {
982 tx
.send(t
!(a
.accept())).unwrap();
984 let _t
= thread
::spawn(move|| {
985 tx2
.send(t
!(a2
.accept())).unwrap();
988 let _t
= thread
::spawn(move|| {
989 let _
= TcpStream
::connect(&addr
);
991 let _t
= thread
::spawn(move|| {
992 let _
= TcpStream
::connect(&addr
);
1002 let name
= if cfg
!(windows
) {"socket"}
else {"fd"}
;
1003 let socket_addr
= next_test_ip4();
1005 let listener
= t
!(TcpListener
::bind(&socket_addr
));
1006 let listener_inner
= listener
.0.socket().as_inner();
1007 let compare
= format
!("TcpListener {{ addr: {:?}, {}: {:?} }}",
1008 socket_addr
, name
, listener_inner
);
1009 assert_eq
!(format
!("{:?}", listener
), compare
);
1011 let stream
= t
!(TcpStream
::connect(&("localhost",
1012 socket_addr
.port())));
1013 let stream_inner
= stream
.0.socket().as_inner();
1014 let compare
= format
!("TcpStream {{ addr: {:?}, \
1015 peer: {:?}, {}: {:?} }}",
1016 stream
.local_addr().unwrap(),
1017 stream
.peer_addr().unwrap(),
1020 assert_eq
!(format
!("{:?}", stream
), compare
);
1023 // FIXME: re-enable the bitrig/netbsd/openbsd tests once their socket timeout
1024 // code no longer has rounding errors.
1025 #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
1028 let addr
= next_test_ip4();
1029 let listener
= t
!(TcpListener
::bind(&addr
));
1031 let stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1032 let dur
= Duration
::new(15410, 0);
1034 assert_eq
!(None
, t
!(stream
.read_timeout()));
1036 t
!(stream
.set_read_timeout(Some(dur
)));
1037 assert_eq
!(Some(dur
), t
!(stream
.read_timeout()));
1039 assert_eq
!(None
, t
!(stream
.write_timeout()));
1041 t
!(stream
.set_write_timeout(Some(dur
)));
1042 assert_eq
!(Some(dur
), t
!(stream
.write_timeout()));
1044 t
!(stream
.set_read_timeout(None
));
1045 assert_eq
!(None
, t
!(stream
.read_timeout()));
1047 t
!(stream
.set_write_timeout(None
));
1048 assert_eq
!(None
, t
!(stream
.write_timeout()));
1053 fn test_read_timeout() {
1054 let addr
= next_test_ip4();
1055 let listener
= t
!(TcpListener
::bind(&addr
));
1057 let mut stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1058 t
!(stream
.set_read_timeout(Some(Duration
::from_millis(1000))));
1060 let mut buf
= [0; 10];
1061 let start
= Instant
::now();
1062 let kind
= stream
.read(&mut buf
).err().expect("expected error").kind();
1063 assert
!(kind
== ErrorKind
::WouldBlock
|| kind
== ErrorKind
::TimedOut
);
1064 assert
!(start
.elapsed() > Duration
::from_millis(400));
1069 fn test_read_with_timeout() {
1070 let addr
= next_test_ip4();
1071 let listener
= t
!(TcpListener
::bind(&addr
));
1073 let mut stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1074 t
!(stream
.set_read_timeout(Some(Duration
::from_millis(1000))));
1076 let mut other_end
= t
!(listener
.accept()).0;
1077 t
!(other_end
.write_all(b
"hello world"));
1079 let mut buf
= [0; 11];
1080 t
!(stream
.read(&mut buf
));
1081 assert_eq
!(b
"hello world", &buf
[..]);
1083 let start
= Instant
::now();
1084 let kind
= stream
.read(&mut buf
).err().expect("expected error").kind();
1085 assert
!(kind
== ErrorKind
::WouldBlock
|| kind
== ErrorKind
::TimedOut
);
1086 assert
!(start
.elapsed() > Duration
::from_millis(400));
1092 let addr
= next_test_ip4();
1093 let _listener
= t
!(TcpListener
::bind(&addr
));
1095 let stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1097 assert_eq
!(false, t
!(stream
.nodelay()));
1098 t
!(stream
.set_nodelay(true));
1099 assert_eq
!(true, t
!(stream
.nodelay()));
1100 t
!(stream
.set_nodelay(false));
1101 assert_eq
!(false, t
!(stream
.nodelay()));
1108 let addr
= next_test_ip4();
1109 let listener
= t
!(TcpListener
::bind(&addr
));
1111 t
!(listener
.set_ttl(ttl
));
1112 assert_eq
!(ttl
, t
!(listener
.ttl()));
1114 let stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1116 t
!(stream
.set_ttl(ttl
));
1117 assert_eq
!(ttl
, t
!(stream
.ttl()));
1121 fn set_nonblocking() {
1122 let addr
= next_test_ip4();
1123 let listener
= t
!(TcpListener
::bind(&addr
));
1125 t
!(listener
.set_nonblocking(true));
1126 t
!(listener
.set_nonblocking(false));
1128 let mut stream
= t
!(TcpStream
::connect(&("localhost", addr
.port())));
1130 t
!(stream
.set_nonblocking(false));
1131 t
!(stream
.set_nonblocking(true));
1134 match stream
.read(&mut buf
) {
1135 Ok(_
) => panic
!("expected error"),
1136 Err(ref e
) if e
.kind() == ErrorKind
::WouldBlock
=> {}
1137 Err(e
) => panic
!("unexpected error {}", e
),