]> git.proxmox.com Git - rustc.git/blob - src/libstd/old_io/net/tcp.rs
Imported Upstream version 1.0.0~beta
[rustc.git] / src / libstd / old_io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use old_io::IoResult;
22 use result::Result::Err;
23 use old_io::net::ip::{SocketAddr, ToSocketAddr};
24 use old_io::{Reader, Writer, Listener, Acceptor};
25 use old_io::{standard_error, TimedOut};
26 use option::Option;
27 use option::Option::{None, Some};
28 use time::Duration;
29
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
33
34 use sys_common;
35
36 /// A structure which represents a TCP stream between a local socket and a
37 /// remote socket.
38 ///
39 /// The socket will be closed when the value is dropped.
40 ///
41 /// # Examples
42 ///
43 /// ```no_run
44 /// # #![feature(old_io, io)]
45 /// use std::old_io::*;
46 ///
47 /// {
48 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
49 ///
50 /// // ignore the Result
51 /// let _ = stream.write(&[1]);
52 ///
53 /// let mut buf = [0];
54 /// let _ = stream.read(&mut buf); // ignore here too
55 /// } // the stream is closed here
56 /// ```
57 pub struct TcpStream {
58 inner: TcpStreamImp,
59 }
60
61 impl TcpStream {
62 fn new(s: TcpStreamImp) -> TcpStream {
63 TcpStream { inner: s }
64 }
65
66 /// Open a TCP connection to a remote host.
67 ///
68 /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
69 /// trait can be supplied for the address; see this trait documentation for
70 /// concrete examples.
71 pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
72 super::with_addresses(addr, |addr| {
73 TcpStreamImp::connect(addr, None).map(TcpStream::new)
74 })
75 }
76
77 /// Creates a TCP connection to a remote socket address, timing out after
78 /// the specified duration.
79 ///
80 /// This is the same as the `connect` method, except that if the timeout
81 /// specified elapses before a connection is made an error will be
82 /// returned. The error's kind will be `TimedOut`.
83 ///
84 /// Same as the `connect` method, `addr` argument type can be anything which
85 /// implements `ToSocketAddr` trait.
86 ///
87 /// If a `timeout` with zero or negative duration is specified then
88 /// the function returns `Err`, with the error kind set to `TimedOut`.
89 #[unstable(feature = "io",
90 reason = "the timeout argument may eventually change types")]
91 pub fn connect_timeout<A: ToSocketAddr>(addr: A,
92 timeout: Duration) -> IoResult<TcpStream> {
93 if timeout <= Duration::milliseconds(0) {
94 return Err(standard_error(TimedOut));
95 }
96
97 super::with_addresses(addr, |addr| {
98 TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
99 .map(TcpStream::new)
100 })
101 }
102
103 /// Returns the socket address of the remote peer of this TCP connection.
104 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
105 self.inner.peer_name()
106 }
107
108 /// Returns the socket address of the local half of this TCP connection.
109 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
110 self.inner.socket_name()
111 }
112
113 /// Sets the nodelay flag on this connection to the boolean specified
114 #[unstable(feature = "io")]
115 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
116 self.inner.set_nodelay(nodelay)
117 }
118
119 /// Sets the keepalive timeout to the timeout specified.
120 ///
121 /// If the value specified is `None`, then the keepalive flag is cleared on
122 /// this connection. Otherwise, the keepalive timeout will be set to the
123 /// specified time, in seconds.
124 #[unstable(feature = "io")]
125 pub fn set_keepalive(&mut self, delay_in_seconds: Option<usize>) -> IoResult<()> {
126 self.inner.set_keepalive(delay_in_seconds)
127 }
128
129 /// Closes the reading half of this connection.
130 ///
131 /// This method will close the reading portion of this connection, causing
132 /// all pending and future reads to immediately return with an error.
133 ///
134 /// # Examples
135 ///
136 /// ```no_run
137 /// # #![feature(old_io, std_misc)]
138 /// # #![allow(unused_must_use)]
139 /// use std::old_io::*;
140 /// use std::time::Duration;
141 /// use std::thread;
142 ///
143 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
144 /// let stream2 = stream.clone();
145 ///
146 /// let _t = thread::spawn(move|| {
147 /// // close this stream after one second
148 /// timer::sleep(Duration::seconds(1));
149 /// let mut stream = stream2;
150 /// stream.close_read();
151 /// });
152 ///
153 /// // wait for some data, will get canceled after one second
154 /// let mut buf = [0];
155 /// stream.read(&mut buf);
156 /// ```
157 ///
158 /// Note that this method affects all cloned handles associated with this
159 /// stream, not just this one handle.
160 pub fn close_read(&mut self) -> IoResult<()> {
161 self.inner.close_read()
162 }
163
164 /// Closes the writing half of this connection.
165 ///
166 /// This method will close the writing portion of this connection, causing
167 /// all future writes to immediately return with an error.
168 ///
169 /// Note that this method affects all cloned handles associated with this
170 /// stream, not just this one handle.
171 pub fn close_write(&mut self) -> IoResult<()> {
172 self.inner.close_write()
173 }
174
175 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
176 ///
177 /// This function will set a timeout for all blocking operations (including
178 /// reads and writes) on this stream. The timeout specified is a relative
179 /// time, in milliseconds, into the future after which point operations will
180 /// time out. This means that the timeout must be reset periodically to keep
181 /// it from expiring. Specifying a value of `None` will clear the timeout
182 /// for this stream.
183 ///
184 /// The timeout on this stream is local to this stream only. Setting a
185 /// timeout does not affect any other cloned instances of this stream, nor
186 /// does the timeout propagated to cloned handles of this stream. Setting
187 /// this timeout will override any specific read or write timeouts
188 /// previously set for this stream.
189 ///
190 /// For clarification on the semantics of interrupting a read and a write,
191 /// take a look at `set_read_timeout` and `set_write_timeout`.
192 #[unstable(feature = "io",
193 reason = "the timeout argument may change in type and value")]
194 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
195 self.inner.set_timeout(timeout_ms)
196 }
197
198 /// Sets the timeout for read operations on this stream.
199 ///
200 /// See documentation in `set_timeout` for the semantics of this read time.
201 /// This will overwrite any previous read timeout set through either this
202 /// function or `set_timeout`.
203 ///
204 /// # Errors
205 ///
206 /// When this timeout expires, if there is no pending read operation, no
207 /// action is taken. Otherwise, the read operation will be scheduled to
208 /// promptly return. If a timeout error is returned, then no data was read
209 /// during the timeout period.
210 #[unstable(feature = "io",
211 reason = "the timeout argument may change in type and value")]
212 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
213 self.inner.set_read_timeout(timeout_ms)
214 }
215
216 /// Sets the timeout for write operations on this stream.
217 ///
218 /// See documentation in `set_timeout` for the semantics of this write time.
219 /// This will overwrite any previous write timeout set through either this
220 /// function or `set_timeout`.
221 ///
222 /// # Errors
223 ///
224 /// When this timeout expires, if there is no pending write operation, no
225 /// action is taken. Otherwise, the pending write operation will be
226 /// scheduled to promptly return. The actual state of the underlying stream
227 /// is not specified.
228 ///
229 /// The write operation may return an error of type `ShortWrite` which
230 /// indicates that the object is known to have written an exact number of
231 /// bytes successfully during the timeout period, and the remaining bytes
232 /// were never written.
233 ///
234 /// If the write operation returns `TimedOut`, then it the timeout primitive
235 /// does not know how many bytes were written as part of the timeout
236 /// operation. It may be the case that bytes continue to be written in an
237 /// asynchronous fashion after the call to write returns.
238 #[unstable(feature = "io",
239 reason = "the timeout argument may change in type and value")]
240 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
241 self.inner.set_write_timeout(timeout_ms)
242 }
243 }
244
245 impl Clone for TcpStream {
246 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
247 /// and writes of this connection.
248 ///
249 /// The underlying TCP stream will not be closed until all handles to the
250 /// stream have been deallocated. All handles will also follow the same
251 /// stream, but two concurrent reads will not receive the same data.
252 /// Instead, the first read will receive the first packet received, and the
253 /// second read will receive the second packet.
254 fn clone(&self) -> TcpStream {
255 TcpStream { inner: self.inner.clone() }
256 }
257 }
258
259 impl Reader for TcpStream {
260 fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
261 self.inner.read(buf)
262 }
263 }
264
265 impl Writer for TcpStream {
266 fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
267 self.inner.write(buf)
268 }
269 }
270
271 impl sys_common::AsInner<TcpStreamImp> for TcpStream {
272 fn as_inner(&self) -> &TcpStreamImp {
273 &self.inner
274 }
275 }
276
277 /// A structure representing a socket server. This listener is used to create a
278 /// `TcpAcceptor` which can be used to accept sockets on a local port.
279 ///
280 /// # Examples
281 ///
282 /// ```
283 /// # #![feature(old_io)]
284 /// # fn foo() {
285 /// use std::old_io::*;
286 /// use std::thread;
287 ///
288 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
289 ///
290 /// // bind the listener to the specified address
291 /// let mut acceptor = listener.listen().unwrap();
292 ///
293 /// fn handle_client(mut stream: TcpStream) {
294 /// // ...
295 /// # &mut stream; // silence unused mutability/variable warning
296 /// }
297 /// // accept connections and process them, spawning a new tasks for each one
298 /// for stream in acceptor.incoming() {
299 /// match stream {
300 /// Err(e) => { /* connection failed */ }
301 /// Ok(stream) => {
302 /// thread::spawn(move|| {
303 /// // connection succeeded
304 /// handle_client(stream)
305 /// });
306 /// }
307 /// }
308 /// }
309 ///
310 /// // close the socket server
311 /// drop(acceptor);
312 /// # }
313 /// ```
314 pub struct TcpListener {
315 inner: TcpListenerImp,
316 }
317
318 impl TcpListener {
319 /// Creates a new `TcpListener` which will be bound to the specified address.
320 /// This listener is not ready for accepting connections, `listen` must be called
321 /// on it before that's possible.
322 ///
323 /// Binding with a port number of 0 will request that the OS assigns a port
324 /// to this listener. The port allocated can be queried via the
325 /// `socket_name` function.
326 ///
327 /// The address type can be any implementer of `ToSocketAddr` trait. See its
328 /// documentation for concrete examples.
329 pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
330 super::with_addresses(addr, |addr| {
331 TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
332 })
333 }
334
335 /// Returns the local socket address of this listener.
336 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
337 self.inner.socket_name()
338 }
339 }
340
341 impl Listener<TcpAcceptor> for TcpListener {
342 fn listen(self) -> IoResult<TcpAcceptor> {
343 self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
344 }
345 }
346
347 impl sys_common::AsInner<TcpListenerImp> for TcpListener {
348 fn as_inner(&self) -> &TcpListenerImp {
349 &self.inner
350 }
351 }
352
353 /// The accepting half of a TCP socket server. This structure is created through
354 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
355 /// `TcpStream` instances.
356 pub struct TcpAcceptor {
357 inner: TcpAcceptorImp,
358 }
359
360 impl TcpAcceptor {
361 /// Prevents blocking on all future accepts after `ms` milliseconds have
362 /// elapsed.
363 ///
364 /// This function is used to set a deadline after which this acceptor will
365 /// time out accepting any connections. The argument is the relative
366 /// distance, in milliseconds, to a point in the future after which all
367 /// accepts will fail.
368 ///
369 /// If the argument specified is `None`, then any previously registered
370 /// timeout is cleared.
371 ///
372 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
373 /// any pending connections. All pending connections will be accepted,
374 /// regardless of whether the timeout has expired or not (the accept will
375 /// not block in this case).
376 ///
377 /// # Examples
378 ///
379 /// ```no_run
380 /// # #![feature(old_io, io)]
381 /// use std::old_io::*;
382 ///
383 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
384 ///
385 /// // After 100ms have passed, all accepts will fail
386 /// a.set_timeout(Some(100));
387 ///
388 /// match a.accept() {
389 /// Ok(..) => println!("accepted a socket"),
390 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
391 /// Err(e) => println!("err: {}", e),
392 /// }
393 ///
394 /// // Reset the timeout and try again
395 /// a.set_timeout(Some(100));
396 /// let socket = a.accept();
397 ///
398 /// // Clear the timeout and block indefinitely waiting for a connection
399 /// a.set_timeout(None);
400 /// let socket = a.accept();
401 /// ```
402 #[unstable(feature = "io",
403 reason = "the type of the argument and name of this function are \
404 subject to change")]
405 pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
406
407 /// Closes the accepting capabilities of this acceptor.
408 ///
409 /// This function is similar to `TcpStream`'s `close_{read,write}` methods
410 /// in that it will affect *all* cloned handles of this acceptor's original
411 /// handle.
412 ///
413 /// Once this function succeeds, all future calls to `accept` will return
414 /// immediately with an error, preventing all future calls to accept. The
415 /// underlying socket will not be relinquished back to the OS until all
416 /// acceptors have been deallocated.
417 ///
418 /// This is useful for waking up a thread in an accept loop to indicate that
419 /// it should exit.
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// # #![feature(old_io, io)]
425 /// use std::old_io::*;
426 /// use std::thread;
427 ///
428 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
429 /// let a2 = a.clone();
430 ///
431 /// let _t = thread::spawn(move|| {
432 /// let mut a2 = a2;
433 /// for socket in a2.incoming() {
434 /// match socket {
435 /// Ok(s) => { /* handle s */ }
436 /// Err(ref e) if e.kind == EndOfFile => break, // closed
437 /// Err(e) => panic!("unexpected error: {}", e),
438 /// }
439 /// }
440 /// });
441 ///
442 /// # fn wait_for_sigint() {}
443 /// // Now that our accept loop is running, wait for the program to be
444 /// // requested to exit.
445 /// wait_for_sigint();
446 ///
447 /// // Signal our accept loop to exit
448 /// assert!(a.close_accept().is_ok());
449 /// ```
450 #[unstable(feature = "io")]
451 pub fn close_accept(&mut self) -> IoResult<()> {
452 self.inner.close_accept()
453 }
454 }
455
456 impl Acceptor for TcpAcceptor {
457 type Connection = TcpStream;
458 fn accept(&mut self) -> IoResult<TcpStream> {
459 self.inner.accept().map(TcpStream::new)
460 }
461 }
462
463 impl Clone for TcpAcceptor {
464 /// Creates a new handle to this TCP acceptor, allowing for simultaneous
465 /// accepts.
466 ///
467 /// The underlying TCP acceptor will not be closed until all handles to the
468 /// acceptor have been deallocated. Incoming connections will be received on
469 /// at most once acceptor, the same connection will not be accepted twice.
470 ///
471 /// The `close_accept` method will shut down *all* acceptors cloned from the
472 /// same original acceptor, whereas the `set_timeout` method only affects
473 /// the selector that it is called on.
474 ///
475 /// This function is useful for creating a handle to invoke `close_accept`
476 /// on to wake up any other task blocked in `accept`.
477 fn clone(&self) -> TcpAcceptor {
478 TcpAcceptor { inner: self.inner.clone() }
479 }
480 }
481
482 impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor {
483 fn as_inner(&self) -> &TcpAcceptorImp {
484 &self.inner
485 }
486 }
487
488 #[cfg(test)]
489 mod test {
490 use prelude::v1::*;
491
492 use sync::mpsc::channel;
493 use thread;
494 use old_io::net::tcp::*;
495 use old_io::net::ip::*;
496 use old_io::test::*;
497 use old_io::{EndOfFile, TimedOut, ShortWrite, IoError};
498 use old_io::{ConnectionRefused, BrokenPipe, ConnectionAborted};
499 use old_io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError};
500 use old_io::{InvalidInput};
501 use old_io::{Acceptor, Listener};
502 use old_io::{Reader, Writer};
503
504 // FIXME #11530 this fails on android because tests are run as root
505 #[cfg_attr(any(windows, target_os = "android"), ignore)]
506 #[test]
507 fn bind_error() {
508 match TcpListener::bind("0.0.0.0:1") {
509 Ok(..) => panic!(),
510 Err(e) => assert_eq!(e.kind, PermissionDenied),
511 }
512 }
513
514 #[test]
515 fn connect_error() {
516 match TcpStream::connect("0.0.0.0:1") {
517 Ok(..) => panic!(),
518 Err(e) => assert!((e.kind == ConnectionRefused)
519 || (e.kind == InvalidInput)),
520 }
521 }
522
523 #[test]
524 fn listen_ip4_localhost() {
525 let socket_addr = next_test_ip4();
526 let listener = TcpListener::bind(socket_addr);
527 let mut acceptor = listener.listen();
528
529 let _t = thread::spawn(move|| {
530 let mut stream = TcpStream::connect(("localhost", socket_addr.port));
531 stream.write(&[144]).unwrap();
532 });
533
534 let mut stream = acceptor.accept();
535 let mut buf = [0];
536 stream.read(&mut buf).unwrap();
537 assert!(buf[0] == 144);
538 }
539
540 #[test]
541 fn connect_localhost() {
542 let addr = next_test_ip4();
543 let mut acceptor = TcpListener::bind(addr).listen();
544
545 let _t = thread::spawn(move|| {
546 let mut stream = TcpStream::connect(("localhost", addr.port));
547 stream.write(&[64]).unwrap();
548 });
549
550 let mut stream = acceptor.accept();
551 let mut buf = [0];
552 stream.read(&mut buf).unwrap();
553 assert!(buf[0] == 64);
554 }
555
556 #[test]
557 fn connect_ip4_loopback() {
558 let addr = next_test_ip4();
559 let mut acceptor = TcpListener::bind(addr).listen();
560
561 let _t = thread::spawn(move|| {
562 let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
563 stream.write(&[44]).unwrap();
564 });
565
566 let mut stream = acceptor.accept();
567 let mut buf = [0];
568 stream.read(&mut buf).unwrap();
569 assert!(buf[0] == 44);
570 }
571
572 #[test]
573 fn connect_ip6_loopback() {
574 let addr = next_test_ip6();
575 let mut acceptor = TcpListener::bind(addr).listen();
576
577 let _t = thread::spawn(move|| {
578 let mut stream = TcpStream::connect(("::1", addr.port));
579 stream.write(&[66]).unwrap();
580 });
581
582 let mut stream = acceptor.accept();
583 let mut buf = [0];
584 stream.read(&mut buf).unwrap();
585 assert!(buf[0] == 66);
586 }
587
588 #[test]
589 fn smoke_test_ip4() {
590 let addr = next_test_ip4();
591 let mut acceptor = TcpListener::bind(addr).listen();
592
593 let _t = thread::spawn(move|| {
594 let mut stream = TcpStream::connect(addr);
595 stream.write(&[99]).unwrap();
596 });
597
598 let mut stream = acceptor.accept();
599 let mut buf = [0];
600 stream.read(&mut buf).unwrap();
601 assert!(buf[0] == 99);
602 }
603
604 #[test]
605 fn smoke_test_ip6() {
606 let addr = next_test_ip6();
607 let mut acceptor = TcpListener::bind(addr).listen();
608
609 let _t = thread::spawn(move|| {
610 let mut stream = TcpStream::connect(addr);
611 stream.write(&[99]).unwrap();
612 });
613
614 let mut stream = acceptor.accept();
615 let mut buf = [0];
616 stream.read(&mut buf).unwrap();
617 assert!(buf[0] == 99);
618 }
619
620 #[test]
621 fn read_eof_ip4() {
622 let addr = next_test_ip4();
623 let mut acceptor = TcpListener::bind(addr).listen();
624
625 let _t = thread::spawn(move|| {
626 let _stream = TcpStream::connect(addr);
627 // Close
628 });
629
630 let mut stream = acceptor.accept();
631 let mut buf = [0];
632 let nread = stream.read(&mut buf);
633 assert!(nread.is_err());
634 }
635
636 #[test]
637 fn read_eof_ip6() {
638 let addr = next_test_ip6();
639 let mut acceptor = TcpListener::bind(addr).listen();
640
641 let _t = thread::spawn(move|| {
642 let _stream = TcpStream::connect(addr);
643 // Close
644 });
645
646 let mut stream = acceptor.accept();
647 let mut buf = [0];
648 let nread = stream.read(&mut buf);
649 assert!(nread.is_err());
650 }
651
652 #[test]
653 fn read_eof_twice_ip4() {
654 let addr = next_test_ip4();
655 let mut acceptor = TcpListener::bind(addr).listen();
656
657 let _t = thread::spawn(move|| {
658 let _stream = TcpStream::connect(addr);
659 // Close
660 });
661
662 let mut stream = acceptor.accept();
663 let mut buf = [0];
664 let nread = stream.read(&mut buf);
665 assert!(nread.is_err());
666
667 match stream.read(&mut buf) {
668 Ok(..) => panic!(),
669 Err(ref e) => {
670 assert!(e.kind == NotConnected || e.kind == EndOfFile,
671 "unknown kind: {:?}", e.kind);
672 }
673 }
674 }
675
676 #[test]
677 fn read_eof_twice_ip6() {
678 let addr = next_test_ip6();
679 let mut acceptor = TcpListener::bind(addr).listen();
680
681 let _t = thread::spawn(move|| {
682 let _stream = TcpStream::connect(addr);
683 // Close
684 });
685
686 let mut stream = acceptor.accept();
687 let mut buf = [0];
688 let nread = stream.read(&mut buf);
689 assert!(nread.is_err());
690
691 match stream.read(&mut buf) {
692 Ok(..) => panic!(),
693 Err(ref e) => {
694 assert!(e.kind == NotConnected || e.kind == EndOfFile,
695 "unknown kind: {:?}", e.kind);
696 }
697 }
698 }
699
700 #[test]
701 fn write_close_ip4() {
702 let addr = next_test_ip4();
703 let mut acceptor = TcpListener::bind(addr).listen();
704
705 let (tx, rx) = channel();
706 let _t = thread::spawn(move|| {
707 drop(TcpStream::connect(addr));
708 tx.send(()).unwrap();
709 });
710
711 let mut stream = acceptor.accept();
712 rx.recv().unwrap();
713 let buf = [0];
714 match stream.write(&buf) {
715 Ok(..) => {}
716 Err(e) => {
717 assert!(e.kind == ConnectionReset ||
718 e.kind == BrokenPipe ||
719 e.kind == ConnectionAborted,
720 "unknown error: {}", e);
721 }
722 }
723 }
724
725 #[test]
726 fn write_close_ip6() {
727 let addr = next_test_ip6();
728 let mut acceptor = TcpListener::bind(addr).listen();
729
730 let (tx, rx) = channel();
731 let _t = thread::spawn(move|| {
732 drop(TcpStream::connect(addr));
733 tx.send(()).unwrap();
734 });
735
736 let mut stream = acceptor.accept();
737 rx.recv().unwrap();
738 let buf = [0];
739 match stream.write(&buf) {
740 Ok(..) => {}
741 Err(e) => {
742 assert!(e.kind == ConnectionReset ||
743 e.kind == BrokenPipe ||
744 e.kind == ConnectionAborted,
745 "unknown error: {}", e);
746 }
747 }
748 }
749
750 #[test]
751 fn multiple_connect_serial_ip4() {
752 let addr = next_test_ip4();
753 let max = 10;
754 let mut acceptor = TcpListener::bind(addr).listen();
755
756 let _t = thread::spawn(move|| {
757 for _ in 0..max {
758 let mut stream = TcpStream::connect(addr);
759 stream.write(&[99]).unwrap();
760 }
761 });
762
763 for ref mut stream in acceptor.incoming().take(max) {
764 let mut buf = [0];
765 stream.read(&mut buf).unwrap();
766 assert_eq!(buf[0], 99);
767 }
768 }
769
770 #[test]
771 fn multiple_connect_serial_ip6() {
772 let addr = next_test_ip6();
773 let max = 10;
774 let mut acceptor = TcpListener::bind(addr).listen();
775
776 let _t = thread::spawn(move|| {
777 for _ in 0..max {
778 let mut stream = TcpStream::connect(addr);
779 stream.write(&[99]).unwrap();
780 }
781 });
782
783 for ref mut stream in acceptor.incoming().take(max) {
784 let mut buf = [0];
785 stream.read(&mut buf).unwrap();
786 assert_eq!(buf[0], 99);
787 }
788 }
789
790 #[test]
791 fn multiple_connect_interleaved_greedy_schedule_ip4() {
792 let addr = next_test_ip4();
793 static MAX: isize = 10;
794 let acceptor = TcpListener::bind(addr).listen();
795
796 let _t = thread::spawn(move|| {
797 let mut acceptor = acceptor;
798 for (i, stream) in acceptor.incoming().enumerate().take(MAX as usize) {
799 // Start another task to handle the connection
800 let _t = thread::spawn(move|| {
801 let mut stream = stream;
802 let mut buf = [0];
803 stream.read(&mut buf).unwrap();
804 assert!(buf[0] == i as u8);
805 debug!("read");
806 });
807 }
808 });
809
810 connect(0, addr);
811
812 fn connect(i: isize, addr: SocketAddr) {
813 if i == MAX { return }
814
815 let _t = thread::spawn(move|| {
816 debug!("connecting");
817 let mut stream = TcpStream::connect(addr);
818 // Connect again before writing
819 connect(i + 1, addr);
820 debug!("writing");
821 stream.write(&[i as u8]).unwrap();
822 });
823 }
824 }
825
826 #[test]
827 fn multiple_connect_interleaved_greedy_schedule_ip6() {
828 let addr = next_test_ip6();
829 static MAX: isize = 10;
830 let acceptor = TcpListener::bind(addr).listen();
831
832 let _t = thread::spawn(move|| {
833 let mut acceptor = acceptor;
834 for (i, stream) in acceptor.incoming().enumerate().take(MAX as usize) {
835 // Start another task to handle the connection
836 let _t = thread::spawn(move|| {
837 let mut stream = stream;
838 let mut buf = [0];
839 stream.read(&mut buf).unwrap();
840 assert!(buf[0] == i as u8);
841 debug!("read");
842 });
843 }
844 });
845
846 connect(0, addr);
847
848 fn connect(i: isize, addr: SocketAddr) {
849 if i == MAX { return }
850
851 let _t = thread::spawn(move|| {
852 debug!("connecting");
853 let mut stream = TcpStream::connect(addr);
854 // Connect again before writing
855 connect(i + 1, addr);
856 debug!("writing");
857 stream.write(&[i as u8]).unwrap();
858 });
859 }
860 }
861
862 #[test]
863 fn multiple_connect_interleaved_lazy_schedule_ip4() {
864 static MAX: isize = 10;
865 let addr = next_test_ip4();
866 let acceptor = TcpListener::bind(addr).listen();
867
868 let _t = thread::spawn(move|| {
869 let mut acceptor = acceptor;
870 for stream in acceptor.incoming().take(MAX as usize) {
871 // Start another task to handle the connection
872 let _t = thread::spawn(move|| {
873 let mut stream = stream;
874 let mut buf = [0];
875 stream.read(&mut buf).unwrap();
876 assert!(buf[0] == 99);
877 debug!("read");
878 });
879 }
880 });
881
882 connect(0, addr);
883
884 fn connect(i: isize, addr: SocketAddr) {
885 if i == MAX { return }
886
887 let _t = thread::spawn(move|| {
888 debug!("connecting");
889 let mut stream = TcpStream::connect(addr);
890 // Connect again before writing
891 connect(i + 1, addr);
892 debug!("writing");
893 stream.write(&[99]).unwrap();
894 });
895 }
896 }
897
898 #[test]
899 fn multiple_connect_interleaved_lazy_schedule_ip6() {
900 static MAX: isize = 10;
901 let addr = next_test_ip6();
902 let acceptor = TcpListener::bind(addr).listen();
903
904 let _t = thread::spawn(move|| {
905 let mut acceptor = acceptor;
906 for stream in acceptor.incoming().take(MAX as usize) {
907 // Start another task to handle the connection
908 let _t = thread::spawn(move|| {
909 let mut stream = stream;
910 let mut buf = [0];
911 stream.read(&mut buf).unwrap();
912 assert!(buf[0] == 99);
913 debug!("read");
914 });
915 }
916 });
917
918 connect(0, addr);
919
920 fn connect(i: isize, addr: SocketAddr) {
921 if i == MAX { return }
922
923 let _t = thread::spawn(move|| {
924 debug!("connecting");
925 let mut stream = TcpStream::connect(addr);
926 // Connect again before writing
927 connect(i + 1, addr);
928 debug!("writing");
929 stream.write(&[99]).unwrap();
930 });
931 }
932 }
933
934 pub fn socket_name(addr: SocketAddr) {
935 let mut listener = TcpListener::bind(addr).unwrap();
936
937 // Make sure socket_name gives
938 // us the socket we binded to.
939 let so_name = listener.socket_name();
940 assert!(so_name.is_ok());
941 assert_eq!(addr, so_name.unwrap());
942 }
943
944 pub fn peer_name(addr: SocketAddr) {
945 let acceptor = TcpListener::bind(addr).listen();
946 let _t = thread::spawn(move|| {
947 let mut acceptor = acceptor;
948 acceptor.accept().unwrap();
949 });
950
951 let stream = TcpStream::connect(addr);
952
953 assert!(stream.is_ok());
954 let mut stream = stream.unwrap();
955
956 // Make sure peer_name gives us the
957 // address/port of the peer we've
958 // connected to.
959 let peer_name = stream.peer_name();
960 assert!(peer_name.is_ok());
961 assert_eq!(addr, peer_name.unwrap());
962 }
963
964 #[test]
965 fn socket_and_peer_name_ip4() {
966 peer_name(next_test_ip4());
967 socket_name(next_test_ip4());
968 }
969
970 #[test]
971 fn socket_and_peer_name_ip6() {
972 // FIXME: peer name is not consistent
973 //peer_name(next_test_ip6());
974 socket_name(next_test_ip6());
975 }
976
977 #[test]
978 fn partial_read() {
979 let addr = next_test_ip4();
980 let (tx, rx) = channel();
981 let _t = thread::spawn(move|| {
982 let mut srv = TcpListener::bind(addr).listen().unwrap();
983 tx.send(()).unwrap();
984 let mut cl = srv.accept().unwrap();
985 cl.write(&[10]).unwrap();
986 let mut b = [0];
987 cl.read(&mut b).unwrap();
988 tx.send(()).unwrap();
989 });
990
991 rx.recv().unwrap();
992 let mut c = TcpStream::connect(addr).unwrap();
993 let mut b = [0; 10];
994 assert_eq!(c.read(&mut b), Ok(1));
995 c.write(&[1]).unwrap();
996 rx.recv().unwrap();
997 }
998
999 #[test]
1000 fn double_bind() {
1001 let addr = next_test_ip4();
1002 let listener = TcpListener::bind(addr).unwrap().listen();
1003 assert!(listener.is_ok());
1004 match TcpListener::bind(addr).listen() {
1005 Ok(..) => panic!(),
1006 Err(e) => {
1007 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1008 "unknown error: {} {:?}", e, e.kind);
1009 }
1010 }
1011 }
1012
1013 #[test]
1014 fn fast_rebind() {
1015 let addr = next_test_ip4();
1016 let (tx, rx) = channel();
1017
1018 let _t = thread::spawn(move|| {
1019 rx.recv().unwrap();
1020 let _stream = TcpStream::connect(addr).unwrap();
1021 // Close
1022 rx.recv().unwrap();
1023 });
1024
1025 {
1026 let mut acceptor = TcpListener::bind(addr).listen();
1027 tx.send(()).unwrap();
1028 {
1029 let _stream = acceptor.accept().unwrap();
1030 // Close client
1031 tx.send(()).unwrap();
1032 }
1033 // Close listener
1034 }
1035 let _listener = TcpListener::bind(addr);
1036 }
1037
1038 #[test]
1039 fn tcp_clone_smoke() {
1040 let addr = next_test_ip4();
1041 let mut acceptor = TcpListener::bind(addr).listen();
1042
1043 let _t = thread::spawn(move|| {
1044 let mut s = TcpStream::connect(addr);
1045 let mut buf = [0, 0];
1046 assert_eq!(s.read(&mut buf), Ok(1));
1047 assert_eq!(buf[0], 1);
1048 s.write(&[2]).unwrap();
1049 });
1050
1051 let mut s1 = acceptor.accept().unwrap();
1052 let s2 = s1.clone();
1053
1054 let (tx1, rx1) = channel();
1055 let (tx2, rx2) = channel();
1056 let _t = thread::spawn(move|| {
1057 let mut s2 = s2;
1058 rx1.recv().unwrap();
1059 s2.write(&[1]).unwrap();
1060 tx2.send(()).unwrap();
1061 });
1062 tx1.send(()).unwrap();
1063 let mut buf = [0, 0];
1064 assert_eq!(s1.read(&mut buf), Ok(1));
1065 rx2.recv().unwrap();
1066 }
1067
1068 #[test]
1069 fn tcp_clone_two_read() {
1070 let addr = next_test_ip6();
1071 let mut acceptor = TcpListener::bind(addr).listen();
1072 let (tx1, rx) = channel();
1073 let tx2 = tx1.clone();
1074
1075 let _t = thread::spawn(move|| {
1076 let mut s = TcpStream::connect(addr);
1077 s.write(&[1]).unwrap();
1078 rx.recv().unwrap();
1079 s.write(&[2]).unwrap();
1080 rx.recv().unwrap();
1081 });
1082
1083 let mut s1 = acceptor.accept().unwrap();
1084 let s2 = s1.clone();
1085
1086 let (done, rx) = channel();
1087 let _t = thread::spawn(move|| {
1088 let mut s2 = s2;
1089 let mut buf = [0, 0];
1090 s2.read(&mut buf).unwrap();
1091 tx2.send(()).unwrap();
1092 done.send(()).unwrap();
1093 });
1094 let mut buf = [0, 0];
1095 s1.read(&mut buf).unwrap();
1096 tx1.send(()).unwrap();
1097
1098 rx.recv().unwrap();
1099 }
1100
1101 #[test]
1102 fn tcp_clone_two_write() {
1103 let addr = next_test_ip4();
1104 let mut acceptor = TcpListener::bind(addr).listen();
1105
1106 let _t = thread::spawn(move|| {
1107 let mut s = TcpStream::connect(addr);
1108 let mut buf = [0, 1];
1109 s.read(&mut buf).unwrap();
1110 s.read(&mut buf).unwrap();
1111 });
1112
1113 let mut s1 = acceptor.accept().unwrap();
1114 let s2 = s1.clone();
1115
1116 let (done, rx) = channel();
1117 let _t = thread::spawn(move|| {
1118 let mut s2 = s2;
1119 s2.write(&[1]).unwrap();
1120 done.send(()).unwrap();
1121 });
1122 s1.write(&[2]).unwrap();
1123
1124 rx.recv().unwrap();
1125 }
1126
1127 #[test]
1128 fn shutdown_smoke() {
1129 let addr = next_test_ip4();
1130 let a = TcpListener::bind(addr).unwrap().listen();
1131 let _t = thread::spawn(move|| {
1132 let mut a = a;
1133 let mut c = a.accept().unwrap();
1134 assert_eq!(c.read_to_end(), Ok(vec!()));
1135 c.write(&[1]).unwrap();
1136 });
1137
1138 let mut s = TcpStream::connect(addr).unwrap();
1139 assert!(s.inner.close_write().is_ok());
1140 assert!(s.write(&[1]).is_err());
1141 assert_eq!(s.read_to_end(), Ok(vec!(1)));
1142 }
1143
1144 #[test]
1145 fn accept_timeout() {
1146 let addr = next_test_ip4();
1147 let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1148
1149 a.set_timeout(Some(10));
1150
1151 // Make sure we time out once and future invocations also time out
1152 let err = a.accept().err().unwrap();
1153 assert_eq!(err.kind, TimedOut);
1154 let err = a.accept().err().unwrap();
1155 assert_eq!(err.kind, TimedOut);
1156
1157 // Also make sure that even though the timeout is expired that we will
1158 // continue to receive any pending connections.
1159 //
1160 // FIXME: freebsd apparently never sees the pending connection, but
1161 // testing manually always works. Need to investigate this
1162 // flakiness.
1163 if !cfg!(target_os = "freebsd") {
1164 let (tx, rx) = channel();
1165 let _t = thread::spawn(move|| {
1166 tx.send(TcpStream::connect(addr).unwrap()).unwrap();
1167 });
1168 let _l = rx.recv().unwrap();
1169 for i in 0..1001 {
1170 match a.accept() {
1171 Ok(..) => break,
1172 Err(ref e) if e.kind == TimedOut => {}
1173 Err(e) => panic!("error: {}", e),
1174 }
1175 ::thread::yield_now();
1176 if i == 1000 { panic!("should have a pending connection") }
1177 }
1178 }
1179
1180 // Unset the timeout and make sure that this always blocks.
1181 a.set_timeout(None);
1182 let _t = thread::spawn(move|| {
1183 drop(TcpStream::connect(addr).unwrap());
1184 });
1185 a.accept().unwrap();
1186 }
1187
1188 #[test]
1189 fn close_readwrite_smoke() {
1190 let addr = next_test_ip4();
1191 let a = TcpListener::bind(addr).listen().unwrap();
1192 let (_tx, rx) = channel::<()>();
1193 thread::spawn(move|| {
1194 let mut a = a;
1195 let _s = a.accept().unwrap();
1196 let _ = rx.recv().unwrap();
1197 });
1198
1199 let mut b = [0];
1200 let mut s = TcpStream::connect(addr).unwrap();
1201 let mut s2 = s.clone();
1202
1203 // closing should prevent reads/writes
1204 s.close_write().unwrap();
1205 assert!(s.write(&[0]).is_err());
1206 s.close_read().unwrap();
1207 assert!(s.read(&mut b).is_err());
1208
1209 // closing should affect previous handles
1210 assert!(s2.write(&[0]).is_err());
1211 assert!(s2.read(&mut b).is_err());
1212
1213 // closing should affect new handles
1214 let mut s3 = s.clone();
1215 assert!(s3.write(&[0]).is_err());
1216 assert!(s3.read(&mut b).is_err());
1217
1218 // make sure these don't die
1219 let _ = s2.close_read();
1220 let _ = s2.close_write();
1221 let _ = s3.close_read();
1222 let _ = s3.close_write();
1223 }
1224
1225 #[test]
1226 fn close_read_wakes_up() {
1227 let addr = next_test_ip4();
1228 let a = TcpListener::bind(addr).listen().unwrap();
1229 let (_tx, rx) = channel::<()>();
1230 thread::spawn(move|| {
1231 let mut a = a;
1232 let _s = a.accept().unwrap();
1233 let _ = rx.recv().unwrap();
1234 });
1235
1236 let mut s = TcpStream::connect(addr).unwrap();
1237 let s2 = s.clone();
1238 let (tx, rx) = channel();
1239 let _t = thread::spawn(move|| {
1240 let mut s2 = s2;
1241 assert!(s2.read(&mut [0]).is_err());
1242 tx.send(()).unwrap();
1243 });
1244 // this should wake up the child task
1245 s.close_read().unwrap();
1246
1247 // this test will never finish if the child doesn't wake up
1248 rx.recv().unwrap();
1249 }
1250
1251 #[test]
1252 fn readwrite_timeouts() {
1253 let addr = next_test_ip6();
1254 let mut a = TcpListener::bind(addr).listen().unwrap();
1255 let (tx, rx) = channel::<()>();
1256 thread::spawn(move|| {
1257 let mut s = TcpStream::connect(addr).unwrap();
1258 rx.recv().unwrap();
1259 assert!(s.write(&[0]).is_ok());
1260 let _ = rx.recv();
1261 });
1262
1263 let mut s = a.accept().unwrap();
1264 s.set_timeout(Some(20));
1265 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1266 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1267
1268 s.set_timeout(Some(20));
1269 for i in 0..1001 {
1270 match s.write(&[0; 128 * 1024]) {
1271 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1272 Err(IoError { kind: TimedOut, .. }) => break,
1273 Err(e) => panic!("{}", e),
1274 }
1275 if i == 1000 { panic!("should have filled up?!"); }
1276 }
1277 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1278
1279 tx.send(()).unwrap();
1280 s.set_timeout(None);
1281 assert_eq!(s.read(&mut [0, 0]), Ok(1));
1282 }
1283
1284 #[test]
1285 fn read_timeouts() {
1286 let addr = next_test_ip6();
1287 let mut a = TcpListener::bind(addr).listen().unwrap();
1288 let (tx, rx) = channel::<()>();
1289 thread::spawn(move|| {
1290 let mut s = TcpStream::connect(addr).unwrap();
1291 rx.recv().unwrap();
1292 let mut amt = 0;
1293 while amt < 100 * 128 * 1024 {
1294 match s.read(&mut [0;128 * 1024]) {
1295 Ok(n) => { amt += n; }
1296 Err(e) => panic!("{}", e),
1297 }
1298 }
1299 let _ = rx.recv();
1300 });
1301
1302 let mut s = a.accept().unwrap();
1303 s.set_read_timeout(Some(20));
1304 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1305 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1306
1307 tx.send(()).unwrap();
1308 for _ in 0..100 {
1309 assert!(s.write(&[0;128 * 1024]).is_ok());
1310 }
1311 }
1312
1313 #[test]
1314 fn write_timeouts() {
1315 let addr = next_test_ip6();
1316 let mut a = TcpListener::bind(addr).listen().unwrap();
1317 let (tx, rx) = channel::<()>();
1318 thread::spawn(move|| {
1319 let mut s = TcpStream::connect(addr).unwrap();
1320 rx.recv().unwrap();
1321 assert!(s.write(&[0]).is_ok());
1322 let _ = rx.recv();
1323 });
1324
1325 let mut s = a.accept().unwrap();
1326 s.set_write_timeout(Some(20));
1327 for i in 0..1001 {
1328 match s.write(&[0; 128 * 1024]) {
1329 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1330 Err(IoError { kind: TimedOut, .. }) => break,
1331 Err(e) => panic!("{}", e),
1332 }
1333 if i == 1000 { panic!("should have filled up?!"); }
1334 }
1335 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1336
1337 tx.send(()).unwrap();
1338 assert!(s.read(&mut [0]).is_ok());
1339 }
1340
1341 #[test]
1342 fn timeout_concurrent_read() {
1343 let addr = next_test_ip6();
1344 let mut a = TcpListener::bind(addr).listen().unwrap();
1345 let (tx, rx) = channel::<()>();
1346 thread::spawn(move|| {
1347 let mut s = TcpStream::connect(addr).unwrap();
1348 rx.recv().unwrap();
1349 assert_eq!(s.write(&[0]), Ok(()));
1350 let _ = rx.recv();
1351 });
1352
1353 let mut s = a.accept().unwrap();
1354 let s2 = s.clone();
1355 let (tx2, rx2) = channel();
1356 let _t = thread::spawn(move|| {
1357 let mut s2 = s2;
1358 assert_eq!(s2.read(&mut [0]), Ok(1));
1359 tx2.send(()).unwrap();
1360 });
1361
1362 s.set_read_timeout(Some(20));
1363 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1364 tx.send(()).unwrap();
1365
1366 rx2.recv().unwrap();
1367 }
1368
1369 #[test]
1370 fn clone_while_reading() {
1371 let addr = next_test_ip6();
1372 let listen = TcpListener::bind(addr);
1373 let mut accept = listen.listen().unwrap();
1374
1375 // Enqueue a task to write to a socket
1376 let (tx, rx) = channel();
1377 let (txdone, rxdone) = channel();
1378 let txdone2 = txdone.clone();
1379 let _t = thread::spawn(move|| {
1380 let mut tcp = TcpStream::connect(addr).unwrap();
1381 rx.recv().unwrap();
1382 tcp.write_u8(0).unwrap();
1383 txdone2.send(()).unwrap();
1384 });
1385
1386 // Spawn off a reading clone
1387 let tcp = accept.accept().unwrap();
1388 let tcp2 = tcp.clone();
1389 let txdone3 = txdone.clone();
1390 let _t = thread::spawn(move|| {
1391 let mut tcp2 = tcp2;
1392 tcp2.read_u8().unwrap();
1393 txdone3.send(()).unwrap();
1394 });
1395
1396 // Try to ensure that the reading clone is indeed reading
1397 for _ in 0..50 {
1398 ::thread::yield_now();
1399 }
1400
1401 // clone the handle again while it's reading, then let it finish the
1402 // read.
1403 let _ = tcp.clone();
1404 tx.send(()).unwrap();
1405 rxdone.recv().unwrap();
1406 rxdone.recv().unwrap();
1407 }
1408
1409 #[test]
1410 fn clone_accept_smoke() {
1411 let addr = next_test_ip4();
1412 let l = TcpListener::bind(addr);
1413 let mut a = l.listen().unwrap();
1414 let mut a2 = a.clone();
1415
1416 let _t = thread::spawn(move|| {
1417 let _ = TcpStream::connect(addr);
1418 });
1419 let _t = thread::spawn(move|| {
1420 let _ = TcpStream::connect(addr);
1421 });
1422
1423 assert!(a.accept().is_ok());
1424 assert!(a2.accept().is_ok());
1425 }
1426
1427 #[test]
1428 fn clone_accept_concurrent() {
1429 let addr = next_test_ip4();
1430 let l = TcpListener::bind(addr);
1431 let a = l.listen().unwrap();
1432 let a2 = a.clone();
1433
1434 let (tx, rx) = channel();
1435 let tx2 = tx.clone();
1436
1437 let _t = thread::spawn(move|| {
1438 let mut a = a;
1439 tx.send(a.accept()).unwrap();
1440 });
1441 let _t = thread::spawn(move|| {
1442 let mut a = a2;
1443 tx2.send(a.accept()).unwrap();
1444 });
1445
1446 let _t = thread::spawn(move|| {
1447 let _ = TcpStream::connect(addr);
1448 });
1449 let _t = thread::spawn(move|| {
1450 let _ = TcpStream::connect(addr);
1451 });
1452
1453 assert!(rx.recv().unwrap().is_ok());
1454 assert!(rx.recv().unwrap().is_ok());
1455 }
1456
1457 #[test]
1458 fn close_accept_smoke() {
1459 let addr = next_test_ip4();
1460 let l = TcpListener::bind(addr);
1461 let mut a = l.listen().unwrap();
1462
1463 a.close_accept().unwrap();
1464 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1465 }
1466
1467 #[test]
1468 fn close_accept_concurrent() {
1469 let addr = next_test_ip4();
1470 let l = TcpListener::bind(addr);
1471 let a = l.listen().unwrap();
1472 let mut a2 = a.clone();
1473
1474 let (tx, rx) = channel();
1475 let _t = thread::spawn(move|| {
1476 let mut a = a;
1477 tx.send(a.accept()).unwrap();
1478 });
1479 a2.close_accept().unwrap();
1480
1481 assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
1482 }
1483 }