1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
22 use result
::Result
::Err
;
23 use old_io
::net
::ip
::{SocketAddr, ToSocketAddr}
;
24 use old_io
::{Reader, Writer, Listener, Acceptor}
;
25 use old_io
::{standard_error, TimedOut}
;
27 use option
::Option
::{None, Some}
;
30 use sys
::tcp
::TcpStream
as TcpStreamImp
;
31 use sys
::tcp
::TcpListener
as TcpListenerImp
;
32 use sys
::tcp
::TcpAcceptor
as TcpAcceptorImp
;
36 /// A structure which represents a TCP stream between a local socket and a
39 /// The socket will be closed when the value is dropped.
44 /// # #![feature(old_io, io)]
45 /// use std::old_io::*;
48 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
50 /// // ignore the Result
51 /// let _ = stream.write(&[1]);
53 /// let mut buf = [0];
54 /// let _ = stream.read(&mut buf); // ignore here too
55 /// } // the stream is closed here
57 pub struct TcpStream
{
62 fn new(s
: TcpStreamImp
) -> TcpStream
{
63 TcpStream { inner: s }
66 /// Open a TCP connection to a remote host.
68 /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
69 /// trait can be supplied for the address; see this trait documentation for
70 /// concrete examples.
71 pub fn connect
<A
: ToSocketAddr
>(addr
: A
) -> IoResult
<TcpStream
> {
72 super::with_addresses(addr
, |addr
| {
73 TcpStreamImp
::connect(addr
, None
).map(TcpStream
::new
)
77 /// Creates a TCP connection to a remote socket address, timing out after
78 /// the specified duration.
80 /// This is the same as the `connect` method, except that if the timeout
81 /// specified elapses before a connection is made an error will be
82 /// returned. The error's kind will be `TimedOut`.
84 /// As with the `connect` method, the `addr` argument can be anything which
85 /// implements the `ToSocketAddr` trait.
87 /// If a `timeout` with zero or negative duration is specified then
88 /// the function returns `Err`, with the error kind set to `TimedOut`.
89 #[unstable(feature = "io",
90 reason
= "the timeout argument may eventually change types")]
91 pub fn connect_timeout
<A
: ToSocketAddr
>(addr
: A
,
92 timeout
: Duration
) -> IoResult
<TcpStream
> {
93 if timeout
<= Duration
::milliseconds(0) {
94 return Err(standard_error(TimedOut
));
97 super::with_addresses(addr
, |addr
| {
98 TcpStreamImp
::connect(addr
, Some(timeout
.num_milliseconds() as u64))
103 /// Returns the socket address of the remote peer of this TCP connection.
104 pub fn peer_name(&mut self) -> IoResult
<SocketAddr
> {
105 self.inner
.peer_name()
108 /// Returns the socket address of the local half of this TCP connection.
109 pub fn socket_name(&mut self) -> IoResult
<SocketAddr
> {
110 self.inner
.socket_name()
113 /// Sets the nodelay flag on this connection to the boolean specified
114 #[unstable(feature = "io")]
115 pub fn set_nodelay(&mut self, nodelay
: bool
) -> IoResult
<()> {
116 self.inner
.set_nodelay(nodelay
)
119 /// Sets the keepalive timeout to the timeout specified.
121 /// If the value specified is `None`, then the keepalive flag is cleared on
122 /// this connection. Otherwise, the keepalive timeout will be set to the
123 /// specified time, in seconds.
124 #[unstable(feature = "io")]
125 pub fn set_keepalive(&mut self, delay_in_seconds
: Option
<usize>) -> IoResult
<()> {
126 self.inner
.set_keepalive(delay_in_seconds
)
129 /// Closes the reading half of this connection.
131 /// This method will close the reading portion of this connection, causing
132 /// all pending and future reads to immediately return with an error.
137 /// # #![feature(old_io, std_misc)]
138 /// # #![allow(unused_must_use)]
139 /// use std::old_io::*;
140 /// use std::time::Duration;
143 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
144 /// let stream2 = stream.clone();
146 /// let _t = thread::spawn(move|| {
147 /// // close this stream after one second
148 /// timer::sleep(Duration::seconds(1));
149 /// let mut stream = stream2;
150 /// stream.close_read();
153 /// // wait for some data, will get canceled after one second
154 /// let mut buf = [0];
155 /// stream.read(&mut buf);
158 /// Note that this method affects all cloned handles associated with this
159 /// stream, not just this one handle.
160 pub fn close_read(&mut self) -> IoResult
<()> {
161 self.inner
.close_read()
164 /// Closes the writing half of this connection.
166 /// This method will close the writing portion of this connection, causing
167 /// all future writes to immediately return with an error.
169 /// Note that this method affects all cloned handles associated with this
170 /// stream, not just this one handle.
171 pub fn close_write(&mut self) -> IoResult
<()> {
172 self.inner
.close_write()
175 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
177 /// This function will set a timeout for all blocking operations (including
178 /// reads and writes) on this stream. The timeout specified is a relative
179 /// time, in milliseconds, into the future after which point operations will
180 /// time out. This means that the timeout must be reset periodically to keep
181 /// it from expiring. Specifying a value of `None` will clear the timeout
184 /// The timeout on this stream is local to this stream only. Setting a
185 /// timeout does not affect any other cloned instances of this stream, nor
186 /// is the timeout propagated to cloned handles of this stream. Setting
187 /// this timeout will override any specific read or write timeouts
188 /// previously set for this stream.
190 /// For clarification on the semantics of interrupting a read and a write,
191 /// take a look at `set_read_timeout` and `set_write_timeout`.
192 #[unstable(feature = "io",
193 reason
= "the timeout argument may change in type and value")]
194 pub fn set_timeout(&mut self, timeout_ms
: Option
<u64>) {
195 self.inner
.set_timeout(timeout_ms
)
198 /// Sets the timeout for read operations on this stream.
200 /// See documentation in `set_timeout` for the semantics of this read time.
201 /// This will overwrite any previous read timeout set through either this
202 /// function or `set_timeout`.
206 /// When this timeout expires, if there is no pending read operation, no
207 /// action is taken. Otherwise, the read operation will be scheduled to
208 /// promptly return. If a timeout error is returned, then no data was read
209 /// during the timeout period.
210 #[unstable(feature = "io",
211 reason
= "the timeout argument may change in type and value")]
212 pub fn set_read_timeout(&mut self, timeout_ms
: Option
<u64>) {
213 self.inner
.set_read_timeout(timeout_ms
)
216 /// Sets the timeout for write operations on this stream.
218 /// See documentation in `set_timeout` for the semantics of this write time.
219 /// This will overwrite any previous write timeout set through either this
220 /// function or `set_timeout`.
224 /// When this timeout expires, if there is no pending write operation, no
225 /// action is taken. Otherwise, the pending write operation will be
226 /// scheduled to promptly return. The actual state of the underlying stream
227 /// is not specified.
229 /// The write operation may return an error of type `ShortWrite` which
230 /// indicates that the object is known to have written an exact number of
231 /// bytes successfully during the timeout period, and the remaining bytes
232 /// were never written.
234 /// If the write operation returns `TimedOut`, then the timeout primitive
235 /// does not know how many bytes were written as part of the timeout
236 /// operation. It may be the case that bytes continue to be written in an
237 /// asynchronous fashion after the call to write returns.
238 #[unstable(feature = "io",
239 reason
= "the timeout argument may change in type and value")]
240 pub fn set_write_timeout(&mut self, timeout_ms
: Option
<u64>) {
241 self.inner
.set_write_timeout(timeout_ms
)
245 impl Clone
for TcpStream
{
246 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
247 /// and writes of this connection.
249 /// The underlying TCP stream will not be closed until all handles to the
250 /// stream have been deallocated. All handles will also follow the same
251 /// stream, but two concurrent reads will not receive the same data.
252 /// Instead, the first read will receive the first packet received, and the
253 /// second read will receive the second packet.
254 fn clone(&self) -> TcpStream
{
255 TcpStream { inner: self.inner.clone() }
259 impl Reader
for TcpStream
{
260 fn read(&mut self, buf
: &mut [u8]) -> IoResult
<usize> {
265 impl Writer
for TcpStream
{
266 fn write_all(&mut self, buf
: &[u8]) -> IoResult
<()> {
267 self.inner
.write(buf
)
271 impl sys_common
::AsInner
<TcpStreamImp
> for TcpStream
{
272 fn as_inner(&self) -> &TcpStreamImp
{
277 /// A structure representing a socket server. This listener is used to create a
278 /// `TcpAcceptor` which can be used to accept sockets on a local port.
283 /// # #![feature(old_io)]
285 /// use std::old_io::*;
288 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
290 /// // start listening for connections on the bound address
291 /// let mut acceptor = listener.listen().unwrap();
293 /// fn handle_client(mut stream: TcpStream) {
295 /// # &mut stream; // silence unused mutability/variable warning
297 /// // accept connections and process them, spawning a new task for each one
298 /// for stream in acceptor.incoming() {
300 /// Err(e) => { /* connection failed */ }
302 /// thread::spawn(move|| {
303 /// // connection succeeded
304 /// handle_client(stream)
310 /// // close the socket server
314 pub struct TcpListener
{
315 inner
: TcpListenerImp
,
319 /// Creates a new `TcpListener` which will be bound to the specified address.
320 /// This listener is not ready for accepting connections, `listen` must be called
321 /// on it before that's possible.
323 /// Binding with a port number of 0 will request that the OS assigns a port
324 /// to this listener. The port allocated can be queried via the
325 /// `socket_name` function.
327 /// The address type can be any implementer of `ToSocketAddr` trait. See its
328 /// documentation for concrete examples.
329 pub fn bind
<A
: ToSocketAddr
>(addr
: A
) -> IoResult
<TcpListener
> {
330 super::with_addresses(addr
, |addr
| {
331 TcpListenerImp
::bind(addr
).map(|inner
| TcpListener { inner: inner }
)
335 /// Returns the local socket address of this listener.
336 pub fn socket_name(&mut self) -> IoResult
<SocketAddr
> {
337 self.inner
.socket_name()
341 impl Listener
<TcpAcceptor
> for TcpListener
{
342 fn listen(self) -> IoResult
<TcpAcceptor
> {
343 self.inner
.listen(128).map(|a
| TcpAcceptor { inner: a }
)
347 impl sys_common
::AsInner
<TcpListenerImp
> for TcpListener
{
348 fn as_inner(&self) -> &TcpListenerImp
{
353 /// The accepting half of a TCP socket server. This structure is created through
354 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
355 /// `TcpStream` instances.
356 pub struct TcpAcceptor
{
357 inner
: TcpAcceptorImp
,
361 /// Prevents blocking on all future accepts after `ms` milliseconds have
364 /// This function is used to set a deadline after which this acceptor will
365 /// time out accepting any connections. The argument is the relative
366 /// distance, in milliseconds, to a point in the future after which all
367 /// accepts will fail.
369 /// If the argument specified is `None`, then any previously registered
370 /// timeout is cleared.
372 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
373 /// any pending connections. All pending connections will be accepted,
374 /// regardless of whether the timeout has expired or not (the accept will
375 /// not block in this case).
380 /// # #![feature(old_io, io)]
381 /// use std::old_io::*;
383 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
385 /// // After 100ms have passed, all accepts will fail
386 /// a.set_timeout(Some(100));
388 /// match a.accept() {
389 /// Ok(..) => println!("accepted a socket"),
390 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
391 /// Err(e) => println!("err: {}", e),
394 /// // Reset the timeout and try again
395 /// a.set_timeout(Some(100));
396 /// let socket = a.accept();
398 /// // Clear the timeout and block indefinitely waiting for a connection
399 /// a.set_timeout(None);
400 /// let socket = a.accept();
402 #[unstable(feature = "io",
403 reason
= "the type of the argument and name of this function are \
405 pub fn set_timeout(&mut self, ms
: Option
<u64>) { self.inner.set_timeout(ms); }
407 /// Closes the accepting capabilities of this acceptor.
409 /// This function is similar to `TcpStream`'s `close_{read,write}` methods
410 /// in that it will affect *all* cloned handles of this acceptor's original
413 /// Once this function succeeds, all future calls to `accept` will return
414 /// immediately with an error, preventing all future calls to accept. The
415 /// underlying socket will not be relinquished back to the OS until all
416 /// acceptors have been deallocated.
418 /// This is useful for waking up a thread in an accept loop to indicate that
424 /// # #![feature(old_io, io)]
425 /// use std::old_io::*;
428 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
429 /// let a2 = a.clone();
431 /// let _t = thread::spawn(move|| {
433 /// for socket in a2.incoming() {
435 /// Ok(s) => { /* handle s */ }
436 /// Err(ref e) if e.kind == EndOfFile => break, // closed
437 /// Err(e) => panic!("unexpected error: {}", e),
442 /// # fn wait_for_sigint() {}
443 /// // Now that our accept loop is running, wait for the program to be
444 /// // requested to exit.
445 /// wait_for_sigint();
447 /// // Signal our accept loop to exit
448 /// assert!(a.close_accept().is_ok());
450 #[unstable(feature = "io")]
451 pub fn close_accept(&mut self) -> IoResult
<()> {
452 self.inner
.close_accept()
456 impl Acceptor
for TcpAcceptor
{
457 type Connection
= TcpStream
;
458 fn accept(&mut self) -> IoResult
<TcpStream
> {
459 self.inner
.accept().map(TcpStream
::new
)
463 impl Clone
for TcpAcceptor
{
464 /// Creates a new handle to this TCP acceptor, allowing for simultaneous
467 /// The underlying TCP acceptor will not be closed until all handles to the
468 /// acceptor have been deallocated. Incoming connections will be received on
469 /// at most once acceptor, the same connection will not be accepted twice.
471 /// The `close_accept` method will shut down *all* acceptors cloned from the
472 /// same original acceptor, whereas the `set_timeout` method only affects
473 /// the selector that it is called on.
475 /// This function is useful for creating a handle to invoke `close_accept`
476 /// on to wake up any other task blocked in `accept`.
477 fn clone(&self) -> TcpAcceptor
{
478 TcpAcceptor { inner: self.inner.clone() }
482 impl sys_common
::AsInner
<TcpAcceptorImp
> for TcpAcceptor
{
483 fn as_inner(&self) -> &TcpAcceptorImp
{
492 use sync
::mpsc
::channel
;
494 use old_io
::net
::tcp
::*;
495 use old_io
::net
::ip
::*;
497 use old_io
::{EndOfFile, TimedOut, ShortWrite, IoError}
;
498 use old_io
::{ConnectionRefused, BrokenPipe, ConnectionAborted}
;
499 use old_io
::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError}
;
500 use old_io
::{InvalidInput}
;
501 use old_io
::{Acceptor, Listener}
;
502 use old_io
::{Reader, Writer}
;
504 // FIXME #11530 this fails on android because tests are run as root
505 #[cfg_attr(any(windows, target_os = "android"), ignore)]
508 match TcpListener
::bind("0.0.0.0:1") {
510 Err(e
) => assert_eq
!(e
.kind
, PermissionDenied
),
516 match TcpStream
::connect("0.0.0.0:1") {
518 Err(e
) => assert
!((e
.kind
== ConnectionRefused
)
519 || (e
.kind
== InvalidInput
)),
524 fn listen_ip4_localhost() {
525 let socket_addr
= next_test_ip4();
526 let listener
= TcpListener
::bind(socket_addr
);
527 let mut acceptor
= listener
.listen();
529 let _t
= thread
::spawn(move|| {
530 let mut stream
= TcpStream
::connect(("localhost", socket_addr
.port
));
531 stream
.write(&[144]).unwrap();
534 let mut stream
= acceptor
.accept();
536 stream
.read(&mut buf
).unwrap();
537 assert
!(buf
[0] == 144);
541 fn connect_localhost() {
542 let addr
= next_test_ip4();
543 let mut acceptor
= TcpListener
::bind(addr
).listen();
545 let _t
= thread
::spawn(move|| {
546 let mut stream
= TcpStream
::connect(("localhost", addr
.port
));
547 stream
.write(&[64]).unwrap();
550 let mut stream
= acceptor
.accept();
552 stream
.read(&mut buf
).unwrap();
553 assert
!(buf
[0] == 64);
557 fn connect_ip4_loopback() {
558 let addr
= next_test_ip4();
559 let mut acceptor
= TcpListener
::bind(addr
).listen();
561 let _t
= thread
::spawn(move|| {
562 let mut stream
= TcpStream
::connect(("127.0.0.1", addr
.port
));
563 stream
.write(&[44]).unwrap();
566 let mut stream
= acceptor
.accept();
568 stream
.read(&mut buf
).unwrap();
569 assert
!(buf
[0] == 44);
573 fn connect_ip6_loopback() {
574 let addr
= next_test_ip6();
575 let mut acceptor
= TcpListener
::bind(addr
).listen();
577 let _t
= thread
::spawn(move|| {
578 let mut stream
= TcpStream
::connect(("::1", addr
.port
));
579 stream
.write(&[66]).unwrap();
582 let mut stream
= acceptor
.accept();
584 stream
.read(&mut buf
).unwrap();
585 assert
!(buf
[0] == 66);
589 fn smoke_test_ip4() {
590 let addr
= next_test_ip4();
591 let mut acceptor
= TcpListener
::bind(addr
).listen();
593 let _t
= thread
::spawn(move|| {
594 let mut stream
= TcpStream
::connect(addr
);
595 stream
.write(&[99]).unwrap();
598 let mut stream
= acceptor
.accept();
600 stream
.read(&mut buf
).unwrap();
601 assert
!(buf
[0] == 99);
605 fn smoke_test_ip6() {
606 let addr
= next_test_ip6();
607 let mut acceptor
= TcpListener
::bind(addr
).listen();
609 let _t
= thread
::spawn(move|| {
610 let mut stream
= TcpStream
::connect(addr
);
611 stream
.write(&[99]).unwrap();
614 let mut stream
= acceptor
.accept();
616 stream
.read(&mut buf
).unwrap();
617 assert
!(buf
[0] == 99);
622 let addr
= next_test_ip4();
623 let mut acceptor
= TcpListener
::bind(addr
).listen();
625 let _t
= thread
::spawn(move|| {
626 let _stream
= TcpStream
::connect(addr
);
630 let mut stream
= acceptor
.accept();
632 let nread
= stream
.read(&mut buf
);
633 assert
!(nread
.is_err());
638 let addr
= next_test_ip6();
639 let mut acceptor
= TcpListener
::bind(addr
).listen();
641 let _t
= thread
::spawn(move|| {
642 let _stream
= TcpStream
::connect(addr
);
646 let mut stream
= acceptor
.accept();
648 let nread
= stream
.read(&mut buf
);
649 assert
!(nread
.is_err());
653 fn read_eof_twice_ip4() {
654 let addr
= next_test_ip4();
655 let mut acceptor
= TcpListener
::bind(addr
).listen();
657 let _t
= thread
::spawn(move|| {
658 let _stream
= TcpStream
::connect(addr
);
662 let mut stream
= acceptor
.accept();
664 let nread
= stream
.read(&mut buf
);
665 assert
!(nread
.is_err());
667 match stream
.read(&mut buf
) {
670 assert
!(e
.kind
== NotConnected
|| e
.kind
== EndOfFile
,
671 "unknown kind: {:?}", e
.kind
);
677 fn read_eof_twice_ip6() {
678 let addr
= next_test_ip6();
679 let mut acceptor
= TcpListener
::bind(addr
).listen();
681 let _t
= thread
::spawn(move|| {
682 let _stream
= TcpStream
::connect(addr
);
686 let mut stream
= acceptor
.accept();
688 let nread
= stream
.read(&mut buf
);
689 assert
!(nread
.is_err());
691 match stream
.read(&mut buf
) {
694 assert
!(e
.kind
== NotConnected
|| e
.kind
== EndOfFile
,
695 "unknown kind: {:?}", e
.kind
);
701 fn write_close_ip4() {
702 let addr
= next_test_ip4();
703 let mut acceptor
= TcpListener
::bind(addr
).listen();
705 let (tx
, rx
) = channel();
706 let _t
= thread
::spawn(move|| {
707 drop(TcpStream
::connect(addr
));
708 tx
.send(()).unwrap();
711 let mut stream
= acceptor
.accept();
714 match stream
.write(&buf
) {
717 assert
!(e
.kind
== ConnectionReset
||
718 e
.kind
== BrokenPipe
||
719 e
.kind
== ConnectionAborted
,
720 "unknown error: {}", e
);
726 fn write_close_ip6() {
727 let addr
= next_test_ip6();
728 let mut acceptor
= TcpListener
::bind(addr
).listen();
730 let (tx
, rx
) = channel();
731 let _t
= thread
::spawn(move|| {
732 drop(TcpStream
::connect(addr
));
733 tx
.send(()).unwrap();
736 let mut stream
= acceptor
.accept();
739 match stream
.write(&buf
) {
742 assert
!(e
.kind
== ConnectionReset
||
743 e
.kind
== BrokenPipe
||
744 e
.kind
== ConnectionAborted
,
745 "unknown error: {}", e
);
751 fn multiple_connect_serial_ip4() {
752 let addr
= next_test_ip4();
754 let mut acceptor
= TcpListener
::bind(addr
).listen();
756 let _t
= thread
::spawn(move|| {
758 let mut stream
= TcpStream
::connect(addr
);
759 stream
.write(&[99]).unwrap();
763 for ref mut stream
in acceptor
.incoming().take(max
) {
765 stream
.read(&mut buf
).unwrap();
766 assert_eq
!(buf
[0], 99);
771 fn multiple_connect_serial_ip6() {
772 let addr
= next_test_ip6();
774 let mut acceptor
= TcpListener
::bind(addr
).listen();
776 let _t
= thread
::spawn(move|| {
778 let mut stream
= TcpStream
::connect(addr
);
779 stream
.write(&[99]).unwrap();
783 for ref mut stream
in acceptor
.incoming().take(max
) {
785 stream
.read(&mut buf
).unwrap();
786 assert_eq
!(buf
[0], 99);
791 fn multiple_connect_interleaved_greedy_schedule_ip4() {
792 let addr
= next_test_ip4();
793 static MAX
: isize = 10;
794 let acceptor
= TcpListener
::bind(addr
).listen();
796 let _t
= thread
::spawn(move|| {
797 let mut acceptor
= acceptor
;
798 for (i
, stream
) in acceptor
.incoming().enumerate().take(MAX
as usize) {
799 // Start another task to handle the connection
800 let _t
= thread
::spawn(move|| {
801 let mut stream
= stream
;
803 stream
.read(&mut buf
).unwrap();
804 assert
!(buf
[0] == i
as u8);
812 fn connect(i
: isize, addr
: SocketAddr
) {
813 if i
== MAX { return }
815 let _t
= thread
::spawn(move|| {
816 debug
!("connecting");
817 let mut stream
= TcpStream
::connect(addr
);
818 // Connect again before writing
819 connect(i
+ 1, addr
);
821 stream
.write(&[i
as u8]).unwrap();
827 fn multiple_connect_interleaved_greedy_schedule_ip6() {
828 let addr
= next_test_ip6();
829 static MAX
: isize = 10;
830 let acceptor
= TcpListener
::bind(addr
).listen();
832 let _t
= thread
::spawn(move|| {
833 let mut acceptor
= acceptor
;
834 for (i
, stream
) in acceptor
.incoming().enumerate().take(MAX
as usize) {
835 // Start another task to handle the connection
836 let _t
= thread
::spawn(move|| {
837 let mut stream
= stream
;
839 stream
.read(&mut buf
).unwrap();
840 assert
!(buf
[0] == i
as u8);
848 fn connect(i
: isize, addr
: SocketAddr
) {
849 if i
== MAX { return }
851 let _t
= thread
::spawn(move|| {
852 debug
!("connecting");
853 let mut stream
= TcpStream
::connect(addr
);
854 // Connect again before writing
855 connect(i
+ 1, addr
);
857 stream
.write(&[i
as u8]).unwrap();
863 fn multiple_connect_interleaved_lazy_schedule_ip4() {
864 static MAX
: isize = 10;
865 let addr
= next_test_ip4();
866 let acceptor
= TcpListener
::bind(addr
).listen();
868 let _t
= thread
::spawn(move|| {
869 let mut acceptor
= acceptor
;
870 for stream
in acceptor
.incoming().take(MAX
as usize) {
871 // Start another task to handle the connection
872 let _t
= thread
::spawn(move|| {
873 let mut stream
= stream
;
875 stream
.read(&mut buf
).unwrap();
876 assert
!(buf
[0] == 99);
884 fn connect(i
: isize, addr
: SocketAddr
) {
885 if i
== MAX { return }
887 let _t
= thread
::spawn(move|| {
888 debug
!("connecting");
889 let mut stream
= TcpStream
::connect(addr
);
890 // Connect again before writing
891 connect(i
+ 1, addr
);
893 stream
.write(&[99]).unwrap();
899 fn multiple_connect_interleaved_lazy_schedule_ip6() {
900 static MAX
: isize = 10;
901 let addr
= next_test_ip6();
902 let acceptor
= TcpListener
::bind(addr
).listen();
904 let _t
= thread
::spawn(move|| {
905 let mut acceptor
= acceptor
;
906 for stream
in acceptor
.incoming().take(MAX
as usize) {
907 // Start another task to handle the connection
908 let _t
= thread
::spawn(move|| {
909 let mut stream
= stream
;
911 stream
.read(&mut buf
).unwrap();
912 assert
!(buf
[0] == 99);
920 fn connect(i
: isize, addr
: SocketAddr
) {
921 if i
== MAX { return }
923 let _t
= thread
::spawn(move|| {
924 debug
!("connecting");
925 let mut stream
= TcpStream
::connect(addr
);
926 // Connect again before writing
927 connect(i
+ 1, addr
);
929 stream
.write(&[99]).unwrap();
934 pub fn socket_name(addr
: SocketAddr
) {
935 let mut listener
= TcpListener
::bind(addr
).unwrap();
937 // Make sure socket_name gives
938 // us the socket we binded to.
939 let so_name
= listener
.socket_name();
940 assert
!(so_name
.is_ok());
941 assert_eq
!(addr
, so_name
.unwrap());
944 pub fn peer_name(addr
: SocketAddr
) {
945 let acceptor
= TcpListener
::bind(addr
).listen();
946 let _t
= thread
::spawn(move|| {
947 let mut acceptor
= acceptor
;
948 acceptor
.accept().unwrap();
951 let stream
= TcpStream
::connect(addr
);
953 assert
!(stream
.is_ok());
954 let mut stream
= stream
.unwrap();
956 // Make sure peer_name gives us the
957 // address/port of the peer we've
959 let peer_name
= stream
.peer_name();
960 assert
!(peer_name
.is_ok());
961 assert_eq
!(addr
, peer_name
.unwrap());
965 fn socket_and_peer_name_ip4() {
966 peer_name(next_test_ip4());
967 socket_name(next_test_ip4());
971 fn socket_and_peer_name_ip6() {
972 // FIXME: peer name is not consistent
973 //peer_name(next_test_ip6());
974 socket_name(next_test_ip6());
979 let addr
= next_test_ip4();
980 let (tx
, rx
) = channel();
981 let _t
= thread
::spawn(move|| {
982 let mut srv
= TcpListener
::bind(addr
).listen().unwrap();
983 tx
.send(()).unwrap();
984 let mut cl
= srv
.accept().unwrap();
985 cl
.write(&[10]).unwrap();
987 cl
.read(&mut b
).unwrap();
988 tx
.send(()).unwrap();
992 let mut c
= TcpStream
::connect(addr
).unwrap();
994 assert_eq
!(c
.read(&mut b
), Ok(1));
995 c
.write(&[1]).unwrap();
1001 let addr
= next_test_ip4();
1002 let listener
= TcpListener
::bind(addr
).unwrap().listen();
1003 assert
!(listener
.is_ok());
1004 match TcpListener
::bind(addr
).listen() {
1007 assert
!(e
.kind
== ConnectionRefused
|| e
.kind
== OtherIoError
,
1008 "unknown error: {} {:?}", e
, e
.kind
);
1015 let addr
= next_test_ip4();
1016 let (tx
, rx
) = channel();
1018 let _t
= thread
::spawn(move|| {
1020 let _stream
= TcpStream
::connect(addr
).unwrap();
1026 let mut acceptor
= TcpListener
::bind(addr
).listen();
1027 tx
.send(()).unwrap();
1029 let _stream
= acceptor
.accept().unwrap();
1031 tx
.send(()).unwrap();
1035 let _listener
= TcpListener
::bind(addr
);
1039 fn tcp_clone_smoke() {
1040 let addr
= next_test_ip4();
1041 let mut acceptor
= TcpListener
::bind(addr
).listen();
1043 let _t
= thread
::spawn(move|| {
1044 let mut s
= TcpStream
::connect(addr
);
1045 let mut buf
= [0, 0];
1046 assert_eq
!(s
.read(&mut buf
), Ok(1));
1047 assert_eq
!(buf
[0], 1);
1048 s
.write(&[2]).unwrap();
1051 let mut s1
= acceptor
.accept().unwrap();
1052 let s2
= s1
.clone();
1054 let (tx1
, rx1
) = channel();
1055 let (tx2
, rx2
) = channel();
1056 let _t
= thread
::spawn(move|| {
1058 rx1
.recv().unwrap();
1059 s2
.write(&[1]).unwrap();
1060 tx2
.send(()).unwrap();
1062 tx1
.send(()).unwrap();
1063 let mut buf
= [0, 0];
1064 assert_eq
!(s1
.read(&mut buf
), Ok(1));
1065 rx2
.recv().unwrap();
1069 fn tcp_clone_two_read() {
1070 let addr
= next_test_ip6();
1071 let mut acceptor
= TcpListener
::bind(addr
).listen();
1072 let (tx1
, rx
) = channel();
1073 let tx2
= tx1
.clone();
1075 let _t
= thread
::spawn(move|| {
1076 let mut s
= TcpStream
::connect(addr
);
1077 s
.write(&[1]).unwrap();
1079 s
.write(&[2]).unwrap();
1083 let mut s1
= acceptor
.accept().unwrap();
1084 let s2
= s1
.clone();
1086 let (done
, rx
) = channel();
1087 let _t
= thread
::spawn(move|| {
1089 let mut buf
= [0, 0];
1090 s2
.read(&mut buf
).unwrap();
1091 tx2
.send(()).unwrap();
1092 done
.send(()).unwrap();
1094 let mut buf
= [0, 0];
1095 s1
.read(&mut buf
).unwrap();
1096 tx1
.send(()).unwrap();
1102 fn tcp_clone_two_write() {
1103 let addr
= next_test_ip4();
1104 let mut acceptor
= TcpListener
::bind(addr
).listen();
1106 let _t
= thread
::spawn(move|| {
1107 let mut s
= TcpStream
::connect(addr
);
1108 let mut buf
= [0, 1];
1109 s
.read(&mut buf
).unwrap();
1110 s
.read(&mut buf
).unwrap();
1113 let mut s1
= acceptor
.accept().unwrap();
1114 let s2
= s1
.clone();
1116 let (done
, rx
) = channel();
1117 let _t
= thread
::spawn(move|| {
1119 s2
.write(&[1]).unwrap();
1120 done
.send(()).unwrap();
1122 s1
.write(&[2]).unwrap();
1128 fn shutdown_smoke() {
1129 let addr
= next_test_ip4();
1130 let a
= TcpListener
::bind(addr
).unwrap().listen();
1131 let _t
= thread
::spawn(move|| {
1133 let mut c
= a
.accept().unwrap();
1134 assert_eq
!(c
.read_to_end(), Ok(vec
!()));
1135 c
.write(&[1]).unwrap();
1138 let mut s
= TcpStream
::connect(addr
).unwrap();
1139 assert
!(s
.inner
.close_write().is_ok());
1140 assert
!(s
.write(&[1]).is_err());
1141 assert_eq
!(s
.read_to_end(), Ok(vec
!(1)));
1145 fn accept_timeout() {
1146 let addr
= next_test_ip4();
1147 let mut a
= TcpListener
::bind(addr
).unwrap().listen().unwrap();
1149 a
.set_timeout(Some(10));
1151 // Make sure we time out once and future invocations also time out
1152 let err
= a
.accept().err().unwrap();
1153 assert_eq
!(err
.kind
, TimedOut
);
1154 let err
= a
.accept().err().unwrap();
1155 assert_eq
!(err
.kind
, TimedOut
);
1157 // Also make sure that even though the timeout is expired that we will
1158 // continue to receive any pending connections.
1160 // FIXME: freebsd apparently never sees the pending connection, but
1161 // testing manually always works. Need to investigate this
1163 if !cfg
!(target_os
= "freebsd") {
1164 let (tx
, rx
) = channel();
1165 let _t
= thread
::spawn(move|| {
1166 tx
.send(TcpStream
::connect(addr
).unwrap()).unwrap();
1168 let _l
= rx
.recv().unwrap();
1172 Err(ref e
) if e
.kind
== TimedOut
=> {}
1173 Err(e
) => panic
!("error: {}", e
),
1175 ::thread
::yield_now();
1176 if i
== 1000 { panic!("should have a pending connection") }
1180 // Unset the timeout and make sure that this always blocks.
1181 a
.set_timeout(None
);
1182 let _t
= thread
::spawn(move|| {
1183 drop(TcpStream
::connect(addr
).unwrap());
1185 a
.accept().unwrap();
// Smoke test for `close_write`/`close_read`: once one handle of a stream has
// closed a direction, writes/reads in that direction must fail on that handle,
// on a clone taken before the close (`s2`) and on a clone taken afterwards (`s3`).
1189 fn close_readwrite_smoke() {
1190 let addr
= next_test_ip4();
1191 let a
= TcpListener
::bind(addr
).listen().unwrap();
// `_tx` is never sent on in the visible lines; it only exists so the server
// thread below has something to block on while it holds the accepted stream.
1192 let (_tx
, rx
) = channel
::<()>();
// Server side: accept a single connection and keep it (`_s`) alive while
// waiting on `rx`.
1193 thread
::spawn(move|| {
1195 let _s
= a
.accept().unwrap();
1196 let _
= rx
.recv().unwrap();
// NOTE(review): the read buffer `b` used below is not declared in the visible
// lines -- presumably a small byte array declared just before this point; confirm.
1200 let mut s
= TcpStream
::connect(addr
).unwrap();
// Clone taken *before* anything is closed.
1201 let mut s2
= s
.clone();
1203 // closing should prevent reads/writes
1204 s
.close_write().unwrap();
1205 assert
!(s
.write(&[0]).is_err());
1206 s
.close_read().unwrap();
1207 assert
!(s
.read(&mut b
).is_err());
1209 // closing should affect previous handles
1210 assert
!(s2
.write(&[0]).is_err());
1211 assert
!(s2
.read(&mut b
).is_err());
1213 // closing should affect new handles
1214 let mut s3
= s
.clone();
1215 assert
!(s3
.write(&[0]).is_err());
1216 assert
!(s3
.read(&mut b
).is_err());
1218 // make sure these don't die
// The results are deliberately discarded: closing again only has to not panic.
1219 let _
= s2
.close_read();
1220 let _
= s2
.close_write();
1221 let _
= s3
.close_read();
1222 let _
= s3
.close_write();
// Checks that `close_read` on one handle interrupts a read that is in progress
// on another handle of the same stream: the child thread asserts that its read
// fails and then reports back over a channel.
1226 fn close_read_wakes_up() {
1227 let addr
= next_test_ip4();
1228 let a
= TcpListener
::bind(addr
).listen().unwrap();
1229 let (_tx
, rx
) = channel
::<()>();
// Server side: accept one connection and hold it open while waiting on `rx`
// (nothing is sent on `_tx` in the visible lines).
1230 thread
::spawn(move|| {
1232 let _s
= a
.accept().unwrap();
1233 let _
= rx
.recv().unwrap();
1236 let mut s
= TcpStream
::connect(addr
).unwrap();
// NOTE(review): `s2`, read from in the child below, is not bound in the visible
// lines -- presumably a clone of `s`; confirm.
1238 let (tx
, rx
) = channel();
// Child: the read must return an error (not data), after which it signals `tx`.
1239 let _t
= thread
::spawn(move|| {
1241 assert
!(s2
.read(&mut [0]).is_err());
1242 tx
.send(()).unwrap();
1244 // this should wake up the child task
1245 s
.close_read().unwrap();
1247 // this test will never finish if the child doesn't wake up
// NOTE(review): the wait this comment refers to (presumably a receive on `rx`)
// is not in the visible lines; confirm.
// Exercises the combined timeout set with `set_timeout` on an accepted stream:
// reads time out (repeatedly), large writes eventually time out, and once the
// timeout is cleared a read returns the single byte written by the peer.
1252 fn readwrite_timeouts() {
1253 let addr
= next_test_ip6();
1254 let mut a
= TcpListener
::bind(addr
).listen().unwrap();
1255 let (tx
, rx
) = channel
::<()>();
// Peer: connects and writes exactly one byte.
// NOTE(review): `rx` is not used in the visible lines -- presumably the peer
// waits on it before writing, so the byte only arrives after `tx.send` below; confirm.
1256 thread
::spawn(move|| {
1257 let mut s
= TcpStream
::connect(addr
).unwrap();
1259 assert
!(s
.write(&[0]).is_ok());
1263 let mut s
= a
.accept().unwrap();
1264 s
.set_timeout(Some(20));
// Asserted twice: the timeout must keep firing, not just the first time.
1265 assert_eq
!(s
.read(&mut [0]).err().unwrap().kind
, TimedOut
);
1266 assert_eq
!(s
.read(&mut [0]).err().unwrap().kind
, TimedOut
);
1268 s
.set_timeout(Some(20));
// Keep writing 128 KiB chunks until a write times out; full and short writes
// are both tolerated, any other error fails the test.
// NOTE(review): the `break` and the counter `i` imply an enclosing loop whose
// header is not in the visible lines; confirm its bounds.
1270 match s
.write(&[0; 128 * 1024]) {
1271 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }
) => {}
,
1272 Err(IoError { kind: TimedOut, .. }
) => break,
1273 Err(e
) => panic
!("{}", e
),
1275 if i
== 1000 { panic!("should have filled up?!"); }
// After the first timed-out write, a further one-byte write must time out too.
1277 assert_eq
!(s
.write(&[0]).err().unwrap().kind
, TimedOut
);
// Signal the peer, clear the timeout, then expect exactly one byte even though
// the buffer has room for two.
1279 tx
.send(()).unwrap();
1280 s
.set_timeout(None
);
1281 assert_eq
!(s
.read(&mut [0, 0]), Ok(1));
// Checks that `set_read_timeout` makes reads time out while writes on the same
// stream still succeed.
1285 fn read_timeouts() {
1286 let addr
= next_test_ip6();
1287 let mut a
= TcpListener
::bind(addr
).listen().unwrap();
1288 let (tx
, rx
) = channel
::<()>();
// Peer: connects and reads until it has consumed 100 * 128 KiB in total,
// panicking on any read error.
// NOTE(review): the declaration of `amt` and any use of `rx` are not in the
// visible lines -- presumably `amt` starts at 0 and the peer first waits on `rx`; confirm.
1289 thread
::spawn(move|| {
1290 let mut s
= TcpStream
::connect(addr
).unwrap();
1293 while amt
< 100 * 128 * 1024 {
1294 match s
.read(&mut [0;128 * 1024]) {
1295 Ok(n
) => { amt += n; }
1296 Err(e
) => panic
!("{}", e
),
1302 let mut s
= a
.accept().unwrap();
1303 s
.set_read_timeout(Some(20));
// Asserted twice: the read timeout must keep firing on later reads.
1304 assert_eq
!(s
.read(&mut [0]).err().unwrap().kind
, TimedOut
);
1305 assert_eq
!(s
.read(&mut [0]).err().unwrap().kind
, TimedOut
);
1307 tx
.send(()).unwrap();
// Writes must not be affected by the read timeout.
// NOTE(review): the peer expects 100 chunks of this size, so this write is
// presumably inside a loop whose header is not in the visible lines; confirm.
1309 assert
!(s
.write(&[0;128 * 1024]).is_ok());
// Checks that `set_write_timeout` makes writes time out once the peer stops
// draining, while a read on the same stream still succeeds.
1314 fn write_timeouts() {
1315 let addr
= next_test_ip6();
1316 let mut a
= TcpListener
::bind(addr
).listen().unwrap();
1317 let (tx
, rx
) = channel
::<()>();
// Peer: connects and writes a single byte.
// NOTE(review): `rx` is not used in the visible lines -- presumably the peer
// waits on it before writing; confirm.
1318 thread
::spawn(move|| {
1319 let mut s
= TcpStream
::connect(addr
).unwrap();
1321 assert
!(s
.write(&[0]).is_ok());
1325 let mut s
= a
.accept().unwrap();
1326 s
.set_write_timeout(Some(20));
// Keep writing 128 KiB chunks until a write times out; full and short writes
// are both tolerated, any other error fails the test.
// NOTE(review): the `break` and the counter `i` imply an enclosing loop whose
// header is not in the visible lines; confirm its bounds.
1328 match s
.write(&[0; 128 * 1024]) {
1329 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }
) => {}
,
1330 Err(IoError { kind: TimedOut, .. }
) => break,
1331 Err(e
) => panic
!("{}", e
),
1333 if i
== 1000 { panic!("should have filled up?!"); }
// After the first timed-out write, a further one-byte write must time out too.
1335 assert_eq
!(s
.write(&[0]).err().unwrap().kind
, TimedOut
);
// Signal the peer, then verify that reading is unaffected by the write timeout.
1337 tx
.send(()).unwrap();
1338 assert
!(s
.read(&mut [0]).is_ok());
// Checks that a read timing out on one handle (`s`) does not disturb a read
// running concurrently on another handle (`s2`): the latter must still
// complete with one byte.
1342 fn timeout_concurrent_read() {
1343 let addr
= next_test_ip6();
1344 let mut a
= TcpListener
::bind(addr
).listen().unwrap();
1345 let (tx
, rx
) = channel
::<()>();
// Peer: connects and writes a single byte.
// NOTE(review): `rx` is not used in the visible lines -- presumably the peer
// waits on it so the byte is only written after `tx.send` below; confirm.
1346 thread
::spawn(move|| {
1347 let mut s
= TcpStream
::connect(addr
).unwrap();
1349 assert_eq
!(s
.write(&[0]), Ok(()));
1353 let mut s
= a
.accept().unwrap();
// NOTE(review): `s2` is not bound in the visible lines -- presumably a clone
// of `s` moved into the thread below; confirm.
1355 let (tx2
, rx2
) = channel();
// Concurrent reader: must receive exactly one byte, then report on `tx2`.
1356 let _t
= thread
::spawn(move|| {
1358 assert_eq
!(s2
.read(&mut [0]), Ok(1));
1359 tx2
.send(()).unwrap();
1362 s
.set_read_timeout(Some(20));
1363 assert_eq
!(s
.read(&mut [0]).err().unwrap().kind
, TimedOut
);
1364 tx
.send(()).unwrap();
// Wait for the concurrent reader to finish successfully.
1366 rx2
.recv().unwrap();
// Clones an accepted stream while another clone of it is meant to be in the
// middle of a read on a second thread, then waits for both helper threads to
// report completion on `rxdone`.
1370 fn clone_while_reading() {
1371 let addr
= next_test_ip6();
1372 let listen
= TcpListener
::bind(addr
);
1373 let mut accept
= listen
.listen().unwrap();
1375 // Enqueue a task to write to a socket
// NOTE(review): `rx` is not used in the visible lines -- presumably the writer
// waits on it before `write_u8`, so the byte is only sent after `tx.send`
// near the end of the test; confirm.
1376 let (tx
, rx
) = channel();
// `txdone` is cloned once per helper thread; each sends `()` when it is done.
1377 let (txdone
, rxdone
) = channel();
1378 let txdone2
= txdone
.clone();
1379 let _t
= thread
::spawn(move|| {
1380 let mut tcp
= TcpStream
::connect(addr
).unwrap();
1382 tcp
.write_u8(0).unwrap();
1383 txdone2
.send(()).unwrap();
1386 // Spawn off a reading clone
1387 let tcp
= accept
.accept().unwrap();
1388 let tcp2
= tcp
.clone();
1389 let txdone3
= txdone
.clone();
1390 let _t
= thread
::spawn(move|| {
// Rebind as mutable inside the closure so `read_u8` can be called on it.
1391 let mut tcp2
= tcp2
;
1392 tcp2
.read_u8().unwrap();
1393 txdone3
.send(()).unwrap();
1396 // Try to ensure that the reading clone is indeed reading
// NOTE(review): a single yield is visible here; it is presumably repeated in a
// loop whose header is not in the visible lines. Either way this is best-effort.
1398 ::thread
::yield_now();
1401 // clone the handle again while it's reading, then let it finish the
// (The sentence above is cut off in the visible lines.) The code below clones
// the handle, signals on `tx`, and waits for both helper threads.
1403 let _
= tcp
.clone();
1404 tx
.send(()).unwrap();
// Two receives: one completion message from the writer, one from the reader.
1405 rxdone
.recv().unwrap();
1406 rxdone
.recv().unwrap();
1410 fn clone_accept_smoke() {
1411 let addr
= next_test_ip4();
1412 let l
= TcpListener
::bind(addr
);
1413 let mut a
= l
.listen().unwrap();
1414 let mut a2
= a
.clone();
1416 let _t
= thread
::spawn(move|| {
1417 let _
= TcpStream
::connect(addr
);
1419 let _t
= thread
::spawn(move|| {
1420 let _
= TcpStream
::connect(addr
);
1423 assert
!(a
.accept().is_ok());
1424 assert
!(a2
.accept().is_ok());
// Two threads call `accept` concurrently and send their results back over a
// channel while two client threads connect; both accept results must be `Ok`.
1428 fn clone_accept_concurrent() {
1429 let addr
= next_test_ip4();
1430 let l
= TcpListener
::bind(addr
);
1431 let a
= l
.listen().unwrap();
// NOTE(review): both accepting closures below refer to `a`, which cannot be
// moved twice -- presumably a clone of the acceptor is created here and rebound
// to `a` inside the second closure on lines that are not visible; confirm.
1434 let (tx
, rx
) = channel();
1435 let tx2
= tx
.clone();
// First accepting thread: sends the result of `accept` on `tx`.
1437 let _t
= thread
::spawn(move|| {
1439 tx
.send(a
.accept()).unwrap();
// Second accepting thread: same, reporting on the cloned sender `tx2`.
1441 let _t
= thread
::spawn(move|| {
1443 tx2
.send(a
.accept()).unwrap();
// Two clients, one per pending accept; the connect results are discarded.
1446 let _t
= thread
::spawn(move|| {
1447 let _
= TcpStream
::connect(addr
);
1449 let _t
= thread
::spawn(move|| {
1450 let _
= TcpStream
::connect(addr
);
// Each accepting thread must have delivered a successful accept.
1453 assert
!(rx
.recv().unwrap().is_ok());
1454 assert
!(rx
.recv().unwrap().is_ok());
1458 fn close_accept_smoke() {
1459 let addr
= next_test_ip4();
1460 let l
= TcpListener
::bind(addr
);
1461 let mut a
= l
.listen().unwrap();
1463 a
.close_accept().unwrap();
1464 assert_eq
!(a
.accept().err().unwrap().kind
, EndOfFile
);
1468 fn close_accept_concurrent() {
1469 let addr
= next_test_ip4();
1470 let l
= TcpListener
::bind(addr
);
1471 let a
= l
.listen().unwrap();
1472 let mut a2
= a
.clone();
1474 let (tx
, rx
) = channel();
1475 let _t
= thread
::spawn(move|| {
1477 tx
.send(a
.accept()).unwrap();
1479 a2
.close_accept().unwrap();
1481 assert_eq
!(rx
.recv().unwrap().err().unwrap().kind
, EndOfFile
);