]> git.proxmox.com Git - rustc.git/blame - src/libstd/net/tcp.rs
New upstream version 1.13.0+dfsg1
[rustc.git] / src / libstd / net / tcp.rs
CommitLineData
85aaf69f
SL
1// Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
85aaf69f
SL
11use io::prelude::*;
12
d9579d0f 13use fmt;
85aaf69f
SL
14use io;
15use net::{ToSocketAddrs, SocketAddr, Shutdown};
d9579d0f 16use sys_common::net as net_imp;
c1a9b12d 17use sys_common::{AsInner, FromInner, IntoInner};
62682a34 18use time::Duration;
85aaf69f
SL
19
20/// A structure which represents a TCP stream between a local socket and a
21/// remote socket.
22///
23/// The socket will be closed when the value is dropped.
24///
c34b1796 25/// # Examples
85aaf69f
SL
26///
27/// ```no_run
28/// use std::io::prelude::*;
29/// use std::net::TcpStream;
30///
31/// {
32/// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
33///
34/// // ignore the Result
35/// let _ = stream.write(&[1]);
36/// let _ = stream.read(&mut [0; 128]); // ignore here too
37/// } // the stream is closed here
38/// ```
c34b1796 39#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
40pub struct TcpStream(net_imp::TcpStream);
41
42/// A structure representing a socket server.
43///
44/// # Examples
45///
46/// ```no_run
47/// use std::net::{TcpListener, TcpStream};
85aaf69f
SL
48///
49/// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
50///
51/// fn handle_client(stream: TcpStream) {
52/// // ...
53/// }
54///
55/// // accept connections and process them, spawning a new thread for each one
56/// for stream in listener.incoming() {
57/// match stream {
58/// Ok(stream) => {
9e0c209e 59/// handle_client(stream);
85aaf69f
SL
60/// }
61/// Err(e) => { /* connection failed */ }
62/// }
63/// }
85aaf69f 64/// ```
c34b1796 65#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
66pub struct TcpListener(net_imp::TcpListener);
67
68/// An infinite iterator over the connections from a `TcpListener`.
69///
70/// This iterator will infinitely yield `Some` of the accepted connections. It
71/// is equivalent to calling `accept` in a loop.
5bcae85e
SL
72///
73/// This `struct` is created by the [`incoming`] method on [`TcpListener`].
74///
75/// [`incoming`]: struct.TcpListener.html#method.incoming
76/// [`TcpListener`]: struct.TcpListener.html
c34b1796 77#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
78pub struct Incoming<'a> { listener: &'a TcpListener }
79
80impl TcpStream {
9346a6ac 81 /// Opens a TCP connection to a remote host.
85aaf69f
SL
82 ///
83 /// `addr` is an address of the remote host. Anything which implements
84 /// `ToSocketAddrs` trait can be supplied for the address; see this trait
85 /// documentation for concrete examples.
a7813a04
XL
86 /// In case `ToSocketAddrs::to_socket_addrs()` returns more than one entry,
87 /// then the first valid and reachable address is used.
c34b1796
AL
88 #[stable(feature = "rust1", since = "1.0.0")]
89 pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
85aaf69f
SL
90 super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
91 }
92
93 /// Returns the socket address of the remote peer of this TCP connection.
c34b1796 94 #[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
95 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
96 self.0.peer_addr()
97 }
98
99 /// Returns the socket address of the local half of this TCP connection.
c34b1796
AL
100 #[stable(feature = "rust1", since = "1.0.0")]
101 pub fn local_addr(&self) -> io::Result<SocketAddr> {
85aaf69f
SL
102 self.0.socket_addr()
103 }
104
9346a6ac 105 /// Shuts down the read, write, or both halves of this connection.
85aaf69f
SL
106 ///
107 /// This function will cause all pending and future I/O on the specified
108 /// portions to return immediately with an appropriate value (see the
109 /// documentation of `Shutdown`).
c34b1796 110 #[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
111 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
112 self.0.shutdown(how)
113 }
114
9346a6ac 115 /// Creates a new independently owned handle to the underlying socket.
85aaf69f
SL
116 ///
117 /// The returned `TcpStream` is a reference to the same stream that this
118 /// object references. Both handles will read and write the same stream of
119 /// data, and options set on one stream will be propagated to the other
120 /// stream.
c34b1796 121 #[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
122 pub fn try_clone(&self) -> io::Result<TcpStream> {
123 self.0.duplicate().map(TcpStream)
124 }
125
62682a34
SL
126 /// Sets the read timeout to the timeout specified.
127 ///
128 /// If the value specified is `None`, then `read` calls will block
129 /// indefinitely. It is an error to pass the zero `Duration` to this
130 /// method.
e9174d1e
SL
131 ///
132 /// # Note
133 ///
134 /// Platforms may return a different error code whenever a read times out as
135 /// a result of setting this option. For example Unix typically returns an
136 /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
137 #[stable(feature = "socket_timeout", since = "1.4.0")]
62682a34
SL
138 pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
139 self.0.set_read_timeout(dur)
140 }
141
142 /// Sets the write timeout to the timeout specified.
143 ///
144 /// If the value specified is `None`, then `write` calls will block
145 /// indefinitely. It is an error to pass the zero `Duration` to this
146 /// method.
e9174d1e
SL
147 ///
148 /// # Note
149 ///
150 /// Platforms may return a different error code whenever a write times out
151 /// as a result of setting this option. For example Unix typically returns
152 /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
153 #[stable(feature = "socket_timeout", since = "1.4.0")]
62682a34
SL
154 pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
155 self.0.set_write_timeout(dur)
156 }
157
158 /// Returns the read timeout of this socket.
159 ///
160 /// If the timeout is `None`, then `read` calls will block indefinitely.
161 ///
162 /// # Note
163 ///
164 /// Some platforms do not provide access to the current timeout.
e9174d1e 165 #[stable(feature = "socket_timeout", since = "1.4.0")]
62682a34
SL
166 pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
167 self.0.read_timeout()
168 }
169
170 /// Returns the write timeout of this socket.
171 ///
172 /// If the timeout is `None`, then `write` calls will block indefinitely.
173 ///
174 /// # Note
175 ///
176 /// Some platforms do not provide access to the current timeout.
e9174d1e 177 #[stable(feature = "socket_timeout", since = "1.4.0")]
62682a34
SL
178 pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
179 self.0.write_timeout()
180 }
54a0048b
SL
181
182 /// Sets the value of the `TCP_NODELAY` option on this socket.
183 ///
184 /// If set, this option disables the Nagle algorithm. This means that
185 /// segments are always sent as soon as possible, even if there is only a
186 /// small amount of data. When not set, data is buffered until there is a
187 /// sufficient amount to send out, thereby avoiding the frequent sending of
188 /// small packets.
189 #[stable(feature = "net2_mutators", since = "1.9.0")]
190 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
191 self.0.set_nodelay(nodelay)
192 }
193
194 /// Gets the value of the `TCP_NODELAY` option on this socket.
195 ///
196 /// For more information about this option, see [`set_nodelay`][link].
197 ///
198 /// [link]: #method.set_nodelay
199 #[stable(feature = "net2_mutators", since = "1.9.0")]
200 pub fn nodelay(&self) -> io::Result<bool> {
201 self.0.nodelay()
202 }
203
204 /// Sets the value for the `IP_TTL` option on this socket.
205 ///
206 /// This value sets the time-to-live field that is used in every packet sent
207 /// from this socket.
208 #[stable(feature = "net2_mutators", since = "1.9.0")]
209 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
210 self.0.set_ttl(ttl)
211 }
212
213 /// Gets the value of the `IP_TTL` option for this socket.
214 ///
215 /// For more information about this option, see [`set_ttl`][link].
216 ///
217 /// [link]: #method.set_ttl
218 #[stable(feature = "net2_mutators", since = "1.9.0")]
219 pub fn ttl(&self) -> io::Result<u32> {
220 self.0.ttl()
221 }
222
223 /// Get the value of the `SO_ERROR` option on this socket.
224 ///
225 /// This will retrieve the stored error in the underlying socket, clearing
226 /// the field in the process. This can be useful for checking errors between
227 /// calls.
228 #[stable(feature = "net2_mutators", since = "1.9.0")]
229 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
230 self.0.take_error()
231 }
232
233 /// Moves this TCP stream into or out of nonblocking mode.
234 ///
235 /// On Unix this corresponds to calling fcntl, and on Windows this
236 /// corresponds to calling ioctlsocket.
237 #[stable(feature = "net2_mutators", since = "1.9.0")]
238 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
239 self.0.set_nonblocking(nonblocking)
240 }
85aaf69f
SL
241}
242
c34b1796 243#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
244impl Read for TcpStream {
245 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
c1a9b12d 246 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
54a0048b 247 self.0.read_to_end(buf)
c1a9b12d 248 }
85aaf69f 249}
c34b1796 250#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
251impl Write for TcpStream {
252 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
253 fn flush(&mut self) -> io::Result<()> { Ok(()) }
254}
c34b1796 255#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
256impl<'a> Read for &'a TcpStream {
257 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
c1a9b12d 258 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
54a0048b 259 self.0.read_to_end(buf)
c1a9b12d 260 }
85aaf69f 261}
c34b1796 262#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
263impl<'a> Write for &'a TcpStream {
264 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
265 fn flush(&mut self) -> io::Result<()> { Ok(()) }
266}
267
268impl AsInner<net_imp::TcpStream> for TcpStream {
269 fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
270}
271
c34b1796
AL
272impl FromInner<net_imp::TcpStream> for TcpStream {
273 fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) }
274}
275
c1a9b12d
SL
276impl IntoInner<net_imp::TcpStream> for TcpStream {
277 fn into_inner(self) -> net_imp::TcpStream { self.0 }
278}
279
92a42be0 280#[stable(feature = "rust1", since = "1.0.0")]
d9579d0f
AL
281impl fmt::Debug for TcpStream {
282 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
283 self.0.fmt(f)
284 }
285}
286
85aaf69f
SL
287impl TcpListener {
288 /// Creates a new `TcpListener` which will be bound to the specified
289 /// address.
290 ///
291 /// The returned listener is ready for accepting connections.
292 ///
293 /// Binding with a port number of 0 will request that the OS assigns a port
294 /// to this listener. The port allocated can be queried via the
92a42be0 295 /// `local_addr` method.
85aaf69f 296 ///
b039eaaf 297 /// The address type can be any implementor of `ToSocketAddrs` trait. See
85aaf69f 298 /// its documentation for concrete examples.
c34b1796
AL
299 #[stable(feature = "rust1", since = "1.0.0")]
300 pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
85aaf69f
SL
301 super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
302 }
303
304 /// Returns the local socket address of this listener.
c34b1796
AL
305 #[stable(feature = "rust1", since = "1.0.0")]
306 pub fn local_addr(&self) -> io::Result<SocketAddr> {
85aaf69f
SL
307 self.0.socket_addr()
308 }
309
9346a6ac 310 /// Creates a new independently owned handle to the underlying socket.
85aaf69f
SL
311 ///
312 /// The returned `TcpListener` is a reference to the same socket that this
313 /// object references. Both handles can be used to accept incoming
314 /// connections and options set on one listener will affect the other.
c34b1796 315 #[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
316 pub fn try_clone(&self) -> io::Result<TcpListener> {
317 self.0.duplicate().map(TcpListener)
318 }
319
320 /// Accept a new incoming connection from this listener.
321 ///
322 /// This function will block the calling thread until a new TCP connection
323 /// is established. When established, the corresponding `TcpStream` and the
324 /// remote peer's address will be returned.
c34b1796 325 #[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
326 pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
327 self.0.accept().map(|(a, b)| (TcpStream(a), b))
328 }
329
330 /// Returns an iterator over the connections being received on this
331 /// listener.
332 ///
9346a6ac 333 /// The returned iterator will never return `None` and will also not yield
85aaf69f 334 /// the peer's `SocketAddr` structure.
c34b1796 335 #[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
336 pub fn incoming(&self) -> Incoming {
337 Incoming { listener: self }
338 }
54a0048b
SL
339
340 /// Sets the value for the `IP_TTL` option on this socket.
341 ///
342 /// This value sets the time-to-live field that is used in every packet sent
343 /// from this socket.
344 #[stable(feature = "net2_mutators", since = "1.9.0")]
345 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
346 self.0.set_ttl(ttl)
347 }
348
349 /// Gets the value of the `IP_TTL` option for this socket.
350 ///
351 /// For more information about this option, see [`set_ttl`][link].
352 ///
353 /// [link]: #method.set_ttl
354 #[stable(feature = "net2_mutators", since = "1.9.0")]
355 pub fn ttl(&self) -> io::Result<u32> {
356 self.0.ttl()
357 }
358
359 /// Sets the value for the `IPV6_V6ONLY` option on this socket.
360 ///
361 /// If this is set to `true` then the socket is restricted to sending and
362 /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
363 /// can bind the same port at the same time.
364 ///
365 /// If this is set to `false` then the socket can be used to send and
366 /// receive packets from an IPv4-mapped IPv6 address.
367 #[stable(feature = "net2_mutators", since = "1.9.0")]
368 pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
369 self.0.set_only_v6(only_v6)
370 }
371
372 /// Gets the value of the `IPV6_V6ONLY` option for this socket.
373 ///
374 /// For more information about this option, see [`set_only_v6`][link].
375 ///
376 /// [link]: #method.set_only_v6
377 #[stable(feature = "net2_mutators", since = "1.9.0")]
378 pub fn only_v6(&self) -> io::Result<bool> {
379 self.0.only_v6()
380 }
381
382 /// Get the value of the `SO_ERROR` option on this socket.
383 ///
384 /// This will retrieve the stored error in the underlying socket, clearing
385 /// the field in the process. This can be useful for checking errors between
386 /// calls.
387 #[stable(feature = "net2_mutators", since = "1.9.0")]
388 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
389 self.0.take_error()
390 }
391
392 /// Moves this TCP stream into or out of nonblocking mode.
393 ///
394 /// On Unix this corresponds to calling fcntl, and on Windows this
395 /// corresponds to calling ioctlsocket.
396 #[stable(feature = "net2_mutators", since = "1.9.0")]
397 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
398 self.0.set_nonblocking(nonblocking)
399 }
85aaf69f
SL
400}
401
c34b1796 402#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
403impl<'a> Iterator for Incoming<'a> {
404 type Item = io::Result<TcpStream>;
405 fn next(&mut self) -> Option<io::Result<TcpStream>> {
406 Some(self.listener.accept().map(|p| p.0))
407 }
408}
409
410impl AsInner<net_imp::TcpListener> for TcpListener {
411 fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
412}
413
c34b1796
AL
414impl FromInner<net_imp::TcpListener> for TcpListener {
415 fn from_inner(inner: net_imp::TcpListener) -> TcpListener {
416 TcpListener(inner)
417 }
418}
419
c1a9b12d
SL
420impl IntoInner<net_imp::TcpListener> for TcpListener {
421 fn into_inner(self) -> net_imp::TcpListener { self.0 }
422}
423
92a42be0 424#[stable(feature = "rust1", since = "1.0.0")]
d9579d0f
AL
425impl fmt::Debug for TcpListener {
426 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
427 self.0.fmt(f)
428 }
429}
430
85aaf69f
SL
431#[cfg(test)]
432mod tests {
85aaf69f
SL
433 use io::ErrorKind;
434 use io::prelude::*;
435 use net::*;
436 use net::test::{next_test_ip4, next_test_ip6};
437 use sync::mpsc::channel;
d9579d0f 438 use sys_common::AsInner;
9cc50fc6 439 use time::{Instant, Duration};
85aaf69f
SL
440 use thread;
441
442 fn each_ip(f: &mut FnMut(SocketAddr)) {
443 f(next_test_ip4());
444 f(next_test_ip6());
445 }
446
447 macro_rules! t {
448 ($e:expr) => {
449 match $e {
450 Ok(t) => t,
451 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
452 }
453 }
454 }
455
85aaf69f
SL
456 #[test]
457 fn bind_error() {
c34b1796 458 match TcpListener::bind("1.1.1.1:9999") {
85aaf69f 459 Ok(..) => panic!(),
c34b1796
AL
460 Err(e) =>
461 assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
85aaf69f
SL
462 }
463 }
464
465 #[test]
466 fn connect_error() {
467 match TcpStream::connect("0.0.0.0:1") {
468 Ok(..) => panic!(),
c34b1796
AL
469 Err(e) => assert!(e.kind() == ErrorKind::ConnectionRefused ||
470 e.kind() == ErrorKind::InvalidInput ||
471 e.kind() == ErrorKind::AddrInUse ||
472 e.kind() == ErrorKind::AddrNotAvailable,
473 "bad error: {} {:?}", e, e.kind()),
85aaf69f
SL
474 }
475 }
476
477 #[test]
478 fn listen_localhost() {
479 let socket_addr = next_test_ip4();
480 let listener = t!(TcpListener::bind(&socket_addr));
481
482 let _t = thread::spawn(move || {
483 let mut stream = t!(TcpStream::connect(&("localhost",
484 socket_addr.port())));
485 t!(stream.write(&[144]));
486 });
487
488 let mut stream = t!(listener.accept()).0;
489 let mut buf = [0];
490 t!(stream.read(&mut buf));
491 assert!(buf[0] == 144);
492 }
493
494 #[test]
b039eaaf
SL
495 fn connect_loopback() {
496 each_ip(&mut |addr| {
497 let acceptor = t!(TcpListener::bind(&addr));
85aaf69f 498
b039eaaf
SL
499 let _t = thread::spawn(move|| {
500 let host = match addr {
501 SocketAddr::V4(..) => "127.0.0.1",
502 SocketAddr::V6(..) => "::1",
503 };
504 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
505 t!(stream.write(&[66]));
506 });
85aaf69f 507
b039eaaf
SL
508 let mut stream = t!(acceptor.accept()).0;
509 let mut buf = [0];
510 t!(stream.read(&mut buf));
511 assert!(buf[0] == 66);
512 })
85aaf69f
SL
513 }
514
515 #[test]
b039eaaf 516 fn smoke_test() {
85aaf69f
SL
517 each_ip(&mut |addr| {
518 let acceptor = t!(TcpListener::bind(&addr));
519
520 let (tx, rx) = channel();
521 let _t = thread::spawn(move|| {
522 let mut stream = t!(TcpStream::connect(&addr));
523 t!(stream.write(&[99]));
c34b1796 524 tx.send(t!(stream.local_addr())).unwrap();
85aaf69f
SL
525 });
526
527 let (mut stream, addr) = t!(acceptor.accept());
528 let mut buf = [0];
529 t!(stream.read(&mut buf));
530 assert!(buf[0] == 99);
531 assert_eq!(addr, t!(rx.recv()));
532 })
533 }
534
535 #[test]
b039eaaf 536 fn read_eof() {
85aaf69f
SL
537 each_ip(&mut |addr| {
538 let acceptor = t!(TcpListener::bind(&addr));
539
540 let _t = thread::spawn(move|| {
541 let _stream = t!(TcpStream::connect(&addr));
542 // Close
543 });
544
545 let mut stream = t!(acceptor.accept()).0;
546 let mut buf = [0];
547 let nread = t!(stream.read(&mut buf));
548 assert_eq!(nread, 0);
549 let nread = t!(stream.read(&mut buf));
550 assert_eq!(nread, 0);
551 })
552 }
553
554 #[test]
555 fn write_close() {
556 each_ip(&mut |addr| {
557 let acceptor = t!(TcpListener::bind(&addr));
558
559 let (tx, rx) = channel();
560 let _t = thread::spawn(move|| {
561 drop(t!(TcpStream::connect(&addr)));
562 tx.send(()).unwrap();
563 });
564
565 let mut stream = t!(acceptor.accept()).0;
566 rx.recv().unwrap();
567 let buf = [0];
568 match stream.write(&buf) {
569 Ok(..) => {}
570 Err(e) => {
571 assert!(e.kind() == ErrorKind::ConnectionReset ||
572 e.kind() == ErrorKind::BrokenPipe ||
573 e.kind() == ErrorKind::ConnectionAborted,
574 "unknown error: {}", e);
575 }
576 }
577 })
578 }
579
580 #[test]
b039eaaf 581 fn multiple_connect_serial() {
85aaf69f
SL
582 each_ip(&mut |addr| {
583 let max = 10;
584 let acceptor = t!(TcpListener::bind(&addr));
585
586 let _t = thread::spawn(move|| {
587 for _ in 0..max {
588 let mut stream = t!(TcpStream::connect(&addr));
589 t!(stream.write(&[99]));
590 }
591 });
592
593 for stream in acceptor.incoming().take(max) {
594 let mut stream = t!(stream);
595 let mut buf = [0];
596 t!(stream.read(&mut buf));
597 assert_eq!(buf[0], 99);
598 }
599 })
600 }
601
602 #[test]
603 fn multiple_connect_interleaved_greedy_schedule() {
62682a34 604 const MAX: usize = 10;
85aaf69f
SL
605 each_ip(&mut |addr| {
606 let acceptor = t!(TcpListener::bind(&addr));
607
608 let _t = thread::spawn(move|| {
609 let acceptor = acceptor;
610 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
bd371182 611 // Start another thread to handle the connection
85aaf69f
SL
612 let _t = thread::spawn(move|| {
613 let mut stream = t!(stream);
614 let mut buf = [0];
615 t!(stream.read(&mut buf));
616 assert!(buf[0] == i as u8);
617 });
618 }
619 });
620
621 connect(0, addr);
622 });
623
624 fn connect(i: usize, addr: SocketAddr) {
625 if i == MAX { return }
626
627 let t = thread::spawn(move|| {
628 let mut stream = t!(TcpStream::connect(&addr));
629 // Connect again before writing
630 connect(i + 1, addr);
631 t!(stream.write(&[i as u8]));
632 });
633 t.join().ok().unwrap();
634 }
635 }
636
637 #[test]
b039eaaf 638 fn multiple_connect_interleaved_lazy_schedule() {
c34b1796 639 const MAX: usize = 10;
85aaf69f
SL
640 each_ip(&mut |addr| {
641 let acceptor = t!(TcpListener::bind(&addr));
642
643 let _t = thread::spawn(move|| {
644 for stream in acceptor.incoming().take(MAX) {
bd371182 645 // Start another thread to handle the connection
85aaf69f
SL
646 let _t = thread::spawn(move|| {
647 let mut stream = t!(stream);
648 let mut buf = [0];
649 t!(stream.read(&mut buf));
650 assert!(buf[0] == 99);
651 });
652 }
653 });
654
655 connect(0, addr);
656 });
657
658 fn connect(i: usize, addr: SocketAddr) {
659 if i == MAX { return }
660
661 let t = thread::spawn(move|| {
662 let mut stream = t!(TcpStream::connect(&addr));
663 connect(i + 1, addr);
664 t!(stream.write(&[99]));
665 });
666 t.join().ok().unwrap();
667 }
668 }
669
85aaf69f 670 #[test]
b039eaaf 671 fn socket_and_peer_name() {
85aaf69f
SL
672 each_ip(&mut |addr| {
673 let listener = t!(TcpListener::bind(&addr));
c34b1796 674 let so_name = t!(listener.local_addr());
85aaf69f
SL
675 assert_eq!(addr, so_name);
676 let _t = thread::spawn(move|| {
677 t!(listener.accept());
678 });
679
680 let stream = t!(TcpStream::connect(&addr));
681 assert_eq!(addr, t!(stream.peer_addr()));
682 })
683 }
684
685 #[test]
686 fn partial_read() {
687 each_ip(&mut |addr| {
688 let (tx, rx) = channel();
689 let srv = t!(TcpListener::bind(&addr));
690 let _t = thread::spawn(move|| {
691 let mut cl = t!(srv.accept()).0;
692 cl.write(&[10]).unwrap();
693 let mut b = [0];
694 t!(cl.read(&mut b));
695 tx.send(()).unwrap();
696 });
697
698 let mut c = t!(TcpStream::connect(&addr));
699 let mut b = [0; 10];
c34b1796 700 assert_eq!(c.read(&mut b).unwrap(), 1);
85aaf69f
SL
701 t!(c.write(&[1]));
702 rx.recv().unwrap();
703 })
704 }
705
706 #[test]
707 fn double_bind() {
708 each_ip(&mut |addr| {
709 let _listener = t!(TcpListener::bind(&addr));
710 match TcpListener::bind(&addr) {
711 Ok(..) => panic!(),
712 Err(e) => {
713 assert!(e.kind() == ErrorKind::ConnectionRefused ||
c34b1796
AL
714 e.kind() == ErrorKind::Other ||
715 e.kind() == ErrorKind::AddrInUse,
85aaf69f
SL
716 "unknown error: {} {:?}", e, e.kind());
717 }
718 }
719 })
720 }
721
722 #[test]
723 fn fast_rebind() {
724 each_ip(&mut |addr| {
725 let acceptor = t!(TcpListener::bind(&addr));
726
727 let _t = thread::spawn(move|| {
728 t!(TcpStream::connect(&addr));
729 });
730
731 t!(acceptor.accept());
732 drop(acceptor);
733 t!(TcpListener::bind(&addr));
734 });
735 }
736
737 #[test]
738 fn tcp_clone_smoke() {
739 each_ip(&mut |addr| {
740 let acceptor = t!(TcpListener::bind(&addr));
741
742 let _t = thread::spawn(move|| {
743 let mut s = t!(TcpStream::connect(&addr));
744 let mut buf = [0, 0];
c34b1796 745 assert_eq!(s.read(&mut buf).unwrap(), 1);
85aaf69f
SL
746 assert_eq!(buf[0], 1);
747 t!(s.write(&[2]));
748 });
749
750 let mut s1 = t!(acceptor.accept()).0;
751 let s2 = t!(s1.try_clone());
752
753 let (tx1, rx1) = channel();
754 let (tx2, rx2) = channel();
755 let _t = thread::spawn(move|| {
756 let mut s2 = s2;
757 rx1.recv().unwrap();
758 t!(s2.write(&[1]));
759 tx2.send(()).unwrap();
760 });
761 tx1.send(()).unwrap();
762 let mut buf = [0, 0];
c34b1796 763 assert_eq!(s1.read(&mut buf).unwrap(), 1);
85aaf69f
SL
764 rx2.recv().unwrap();
765 })
766 }
767
768 #[test]
769 fn tcp_clone_two_read() {
770 each_ip(&mut |addr| {
771 let acceptor = t!(TcpListener::bind(&addr));
772 let (tx1, rx) = channel();
773 let tx2 = tx1.clone();
774
775 let _t = thread::spawn(move|| {
776 let mut s = t!(TcpStream::connect(&addr));
777 t!(s.write(&[1]));
778 rx.recv().unwrap();
779 t!(s.write(&[2]));
780 rx.recv().unwrap();
781 });
782
783 let mut s1 = t!(acceptor.accept()).0;
784 let s2 = t!(s1.try_clone());
785
786 let (done, rx) = channel();
787 let _t = thread::spawn(move|| {
788 let mut s2 = s2;
789 let mut buf = [0, 0];
790 t!(s2.read(&mut buf));
791 tx2.send(()).unwrap();
792 done.send(()).unwrap();
793 });
794 let mut buf = [0, 0];
795 t!(s1.read(&mut buf));
796 tx1.send(()).unwrap();
797
798 rx.recv().unwrap();
799 })
800 }
801
802 #[test]
803 fn tcp_clone_two_write() {
804 each_ip(&mut |addr| {
805 let acceptor = t!(TcpListener::bind(&addr));
806
807 let _t = thread::spawn(move|| {
808 let mut s = t!(TcpStream::connect(&addr));
809 let mut buf = [0, 1];
810 t!(s.read(&mut buf));
811 t!(s.read(&mut buf));
812 });
813
814 let mut s1 = t!(acceptor.accept()).0;
815 let s2 = t!(s1.try_clone());
816
817 let (done, rx) = channel();
818 let _t = thread::spawn(move|| {
819 let mut s2 = s2;
820 t!(s2.write(&[1]));
821 done.send(()).unwrap();
822 });
823 t!(s1.write(&[2]));
824
825 rx.recv().unwrap();
826 })
827 }
828
829 #[test]
830 fn shutdown_smoke() {
831 each_ip(&mut |addr| {
832 let a = t!(TcpListener::bind(&addr));
833 let _t = thread::spawn(move|| {
834 let mut c = t!(a.accept()).0;
835 let mut b = [0];
c34b1796 836 assert_eq!(c.read(&mut b).unwrap(), 0);
85aaf69f
SL
837 t!(c.write(&[1]));
838 });
839
840 let mut s = t!(TcpStream::connect(&addr));
841 t!(s.shutdown(Shutdown::Write));
842 assert!(s.write(&[1]).is_err());
843 let mut b = [0, 0];
844 assert_eq!(t!(s.read(&mut b)), 1);
845 assert_eq!(b[0], 1);
846 })
847 }
848
849 #[test]
850 fn close_readwrite_smoke() {
851 each_ip(&mut |addr| {
852 let a = t!(TcpListener::bind(&addr));
853 let (tx, rx) = channel::<()>();
854 let _t = thread::spawn(move|| {
855 let _s = t!(a.accept());
856 let _ = rx.recv();
857 });
858
859 let mut b = [0];
860 let mut s = t!(TcpStream::connect(&addr));
861 let mut s2 = t!(s.try_clone());
862
863 // closing should prevent reads/writes
864 t!(s.shutdown(Shutdown::Write));
865 assert!(s.write(&[0]).is_err());
866 t!(s.shutdown(Shutdown::Read));
c34b1796 867 assert_eq!(s.read(&mut b).unwrap(), 0);
85aaf69f
SL
868
869 // closing should affect previous handles
870 assert!(s2.write(&[0]).is_err());
c34b1796 871 assert_eq!(s2.read(&mut b).unwrap(), 0);
85aaf69f
SL
872
873 // closing should affect new handles
874 let mut s3 = t!(s.try_clone());
875 assert!(s3.write(&[0]).is_err());
c34b1796 876 assert_eq!(s3.read(&mut b).unwrap(), 0);
85aaf69f
SL
877
878 // make sure these don't die
879 let _ = s2.shutdown(Shutdown::Read);
880 let _ = s2.shutdown(Shutdown::Write);
881 let _ = s3.shutdown(Shutdown::Read);
882 let _ = s3.shutdown(Shutdown::Write);
883 drop(tx);
884 })
885 }
886
887 #[test]
888 fn close_read_wakes_up() {
889 each_ip(&mut |addr| {
890 let a = t!(TcpListener::bind(&addr));
891 let (tx1, rx) = channel::<()>();
892 let _t = thread::spawn(move|| {
893 let _s = t!(a.accept());
894 let _ = rx.recv();
895 });
896
897 let s = t!(TcpStream::connect(&addr));
898 let s2 = t!(s.try_clone());
899 let (tx, rx) = channel();
900 let _t = thread::spawn(move|| {
901 let mut s2 = s2;
902 assert_eq!(t!(s2.read(&mut [0])), 0);
903 tx.send(()).unwrap();
904 });
bd371182 905 // this should wake up the child thread
85aaf69f
SL
906 t!(s.shutdown(Shutdown::Read));
907
908 // this test will never finish if the child doesn't wake up
909 rx.recv().unwrap();
910 drop(tx1);
911 })
912 }
913
914 #[test]
915 fn clone_while_reading() {
916 each_ip(&mut |addr| {
917 let accept = t!(TcpListener::bind(&addr));
918
bd371182 919 // Enqueue a thread to write to a socket
85aaf69f
SL
920 let (tx, rx) = channel();
921 let (txdone, rxdone) = channel();
922 let txdone2 = txdone.clone();
923 let _t = thread::spawn(move|| {
924 let mut tcp = t!(TcpStream::connect(&addr));
925 rx.recv().unwrap();
926 t!(tcp.write(&[0]));
927 txdone2.send(()).unwrap();
928 });
929
930 // Spawn off a reading clone
931 let tcp = t!(accept.accept()).0;
932 let tcp2 = t!(tcp.try_clone());
933 let txdone3 = txdone.clone();
934 let _t = thread::spawn(move|| {
935 let mut tcp2 = tcp2;
936 t!(tcp2.read(&mut [0]));
937 txdone3.send(()).unwrap();
938 });
939
940 // Try to ensure that the reading clone is indeed reading
941 for _ in 0..50 {
942 thread::yield_now();
943 }
944
945 // clone the handle again while it's reading, then let it finish the
946 // read.
947 let _ = t!(tcp.try_clone());
948 tx.send(()).unwrap();
949 rxdone.recv().unwrap();
950 rxdone.recv().unwrap();
951 })
952 }
953
954 #[test]
955 fn clone_accept_smoke() {
956 each_ip(&mut |addr| {
957 let a = t!(TcpListener::bind(&addr));
958 let a2 = t!(a.try_clone());
959
960 let _t = thread::spawn(move|| {
961 let _ = TcpStream::connect(&addr);
962 });
963 let _t = thread::spawn(move|| {
964 let _ = TcpStream::connect(&addr);
965 });
966
967 t!(a.accept());
968 t!(a2.accept());
969 })
970 }
971
972 #[test]
973 fn clone_accept_concurrent() {
974 each_ip(&mut |addr| {
975 let a = t!(TcpListener::bind(&addr));
976 let a2 = t!(a.try_clone());
977
978 let (tx, rx) = channel();
979 let tx2 = tx.clone();
980
981 let _t = thread::spawn(move|| {
982 tx.send(t!(a.accept())).unwrap();
983 });
984 let _t = thread::spawn(move|| {
985 tx2.send(t!(a2.accept())).unwrap();
986 });
987
988 let _t = thread::spawn(move|| {
989 let _ = TcpStream::connect(&addr);
990 });
991 let _t = thread::spawn(move|| {
992 let _ = TcpStream::connect(&addr);
993 });
994
995 rx.recv().unwrap();
996 rx.recv().unwrap();
997 })
998 }
d9579d0f
AL
999
1000 #[test]
1001 fn debug() {
1002 let name = if cfg!(windows) {"socket"} else {"fd"};
1003 let socket_addr = next_test_ip4();
1004
1005 let listener = t!(TcpListener::bind(&socket_addr));
1006 let listener_inner = listener.0.socket().as_inner();
1007 let compare = format!("TcpListener {{ addr: {:?}, {}: {:?} }}",
1008 socket_addr, name, listener_inner);
1009 assert_eq!(format!("{:?}", listener), compare);
1010
62682a34 1011 let stream = t!(TcpStream::connect(&("localhost",
d9579d0f
AL
1012 socket_addr.port())));
1013 let stream_inner = stream.0.socket().as_inner();
1014 let compare = format!("TcpStream {{ addr: {:?}, \
1015 peer: {:?}, {}: {:?} }}",
1016 stream.local_addr().unwrap(),
1017 stream.peer_addr().unwrap(),
1018 name,
1019 stream_inner);
1020 assert_eq!(format!("{:?}", stream), compare);
1021 }
62682a34
SL
1022
1023 // FIXME: re-enabled bitrig/openbsd tests once their socket timeout code
1024 // no longer has rounding errors.
c1a9b12d 1025 #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
62682a34
SL
1026 #[test]
1027 fn timeouts() {
1028 let addr = next_test_ip4();
1029 let listener = t!(TcpListener::bind(&addr));
1030
1031 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1032 let dur = Duration::new(15410, 0);
1033
1034 assert_eq!(None, t!(stream.read_timeout()));
1035
1036 t!(stream.set_read_timeout(Some(dur)));
1037 assert_eq!(Some(dur), t!(stream.read_timeout()));
1038
1039 assert_eq!(None, t!(stream.write_timeout()));
1040
1041 t!(stream.set_write_timeout(Some(dur)));
1042 assert_eq!(Some(dur), t!(stream.write_timeout()));
1043
1044 t!(stream.set_read_timeout(None));
1045 assert_eq!(None, t!(stream.read_timeout()));
1046
1047 t!(stream.set_write_timeout(None));
1048 assert_eq!(None, t!(stream.write_timeout()));
9cc50fc6 1049 drop(listener);
62682a34
SL
1050 }
1051
1052 #[test]
1053 fn test_read_timeout() {
1054 let addr = next_test_ip4();
1055 let listener = t!(TcpListener::bind(&addr));
1056
1057 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1058 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1059
1060 let mut buf = [0; 10];
9cc50fc6
SL
1061 let start = Instant::now();
1062 let kind = stream.read(&mut buf).err().expect("expected error").kind();
1063 assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1064 assert!(start.elapsed() > Duration::from_millis(400));
1065 drop(listener);
62682a34
SL
1066 }
1067
1068 #[test]
1069 fn test_read_with_timeout() {
1070 let addr = next_test_ip4();
1071 let listener = t!(TcpListener::bind(&addr));
1072
1073 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1074 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1075
1076 let mut other_end = t!(listener.accept()).0;
1077 t!(other_end.write_all(b"hello world"));
1078
1079 let mut buf = [0; 11];
1080 t!(stream.read(&mut buf));
1081 assert_eq!(b"hello world", &buf[..]);
1082
9cc50fc6
SL
1083 let start = Instant::now();
1084 let kind = stream.read(&mut buf).err().expect("expected error").kind();
1085 assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1086 assert!(start.elapsed() > Duration::from_millis(400));
1087 drop(listener);
62682a34 1088 }
54a0048b
SL
1089
1090 #[test]
1091 fn nodelay() {
1092 let addr = next_test_ip4();
1093 let _listener = t!(TcpListener::bind(&addr));
1094
1095 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1096
1097 assert_eq!(false, t!(stream.nodelay()));
1098 t!(stream.set_nodelay(true));
1099 assert_eq!(true, t!(stream.nodelay()));
1100 t!(stream.set_nodelay(false));
1101 assert_eq!(false, t!(stream.nodelay()));
1102 }
1103
1104 #[test]
1105 fn ttl() {
1106 let ttl = 100;
1107
1108 let addr = next_test_ip4();
1109 let listener = t!(TcpListener::bind(&addr));
1110
1111 t!(listener.set_ttl(ttl));
1112 assert_eq!(ttl, t!(listener.ttl()));
1113
1114 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1115
1116 t!(stream.set_ttl(ttl));
1117 assert_eq!(ttl, t!(stream.ttl()));
1118 }
1119
1120 #[test]
1121 fn set_nonblocking() {
1122 let addr = next_test_ip4();
1123 let listener = t!(TcpListener::bind(&addr));
1124
1125 t!(listener.set_nonblocking(true));
1126 t!(listener.set_nonblocking(false));
1127
1128 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1129
1130 t!(stream.set_nonblocking(false));
1131 t!(stream.set_nonblocking(true));
1132
1133 let mut buf = [0];
1134 match stream.read(&mut buf) {
1135 Ok(_) => panic!("expected error"),
1136 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
1137 Err(e) => panic!("unexpected error {}", e),
1138 }
1139 }
85aaf69f 1140}