extern crate flate2;
extern crate futures;
extern crate rand;
-extern crate tokio_core;
extern crate tokio_io;
+extern crate tokio_tcp;
+extern crate tokio_threadpool;
use std::io::{Read, Write};
use std::iter;
use std::net::{Shutdown, TcpListener};
use std::thread;
-use flate2::Compression;
use flate2::read;
use flate2::write;
+use flate2::Compression;
use futures::Future;
use rand::{thread_rng, Rng};
-use tokio_core::net::TcpStream;
-use tokio_core::reactor::Core;
-use tokio_io::AsyncRead;
use tokio_io::io::{copy, shutdown};
+use tokio_io::AsyncRead;
+use tokio_tcp::TcpStream;
#[test]
fn tcp_stream_echo_pattern() {
const N: u8 = 16;
const M: usize = 16 * 1024;
- let mut core = Core::new().unwrap();
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let t = thread::spawn(move || {
t.join().unwrap();
});
- let handle = core.handle();
- let stream = TcpStream::connect(&addr, &handle);
+ let stream = TcpStream::connect(&addr);
let copy = stream
.and_then(|s| {
let (a, b) = s.split();
let (amt, _a, b) = result.unwrap();
assert_eq!(amt, (N as u64) * (M as u64));
shutdown(b).map(|_| ())
- });
+ })
+ .map_err(|err| panic!("{}", err));
- core.run(copy).unwrap();
+ let threadpool = tokio_threadpool::Builder::new().build();
+ threadpool.spawn(copy);
+ threadpool.shutdown().wait().unwrap();
t.join().unwrap();
}
.take(1024 * 1024)
.map(|()| thread_rng().gen::<u8>())
.collect::<Vec<_>>();
- let mut core = Core::new().unwrap();
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let v2 = v.clone();
t.join().unwrap();
});
- let handle = core.handle();
- let stream = TcpStream::connect(&addr, &handle);
+ let stream = TcpStream::connect(&addr);
let copy = stream
.and_then(|s| {
let (a, b) = s.split();
let b = write::DeflateEncoder::new(b, Compression::default());
copy(a, b)
})
- .then(|result| {
+ .then(move |result| {
let (amt, _a, b) = result.unwrap();
assert_eq!(amt, v.len() as u64);
shutdown(b).map(|_| ())
- });
+ })
+ .map_err(|err| panic!("{}", err));
- core.run(copy).unwrap();
+ let threadpool = tokio_threadpool::Builder::new().build();
+ threadpool.spawn(copy);
+ threadpool.shutdown().wait().unwrap();
t.join().unwrap();
}