]> git.proxmox.com Git - cargo.git/blob - vendor/flate2-0.2.20/tests/tokio.rs
New upstream version 0.23.0
[cargo.git] / vendor / flate2-0.2.20 / tests / tokio.rs
1 #![cfg(feature = "tokio")]
2
3 extern crate tokio_core;
4 extern crate flate2;
5 extern crate tokio_io;
6 extern crate futures;
7 extern crate rand;
8
9 use std::thread;
10 use std::net::{Shutdown, TcpListener};
11 use std::io::{Read, Write};
12
13 use flate2::Compression;
14 use flate2::read;
15 use flate2::write;
16 use futures::Future;
17 use rand::{Rng, thread_rng};
18 use tokio_core::net::TcpStream;
19 use tokio_core::reactor::Core;
20 use tokio_io::AsyncRead;
21 use tokio_io::io::{copy, shutdown};
22
23 #[test]
24 fn tcp_stream_echo_pattern() {
25 const N: u8 = 16;
26 const M: usize = 16 * 1024;
27
28 let mut core = Core::new().unwrap();
29 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
30 let addr = listener.local_addr().unwrap();
31 let t = thread::spawn(move || {
32 let a = listener.accept().unwrap().0;
33 let b = a.try_clone().unwrap();
34
35 let t = thread::spawn(move || {
36 let mut b = read::DeflateDecoder::new(b);
37 let mut buf = [0; M];
38 for i in 0..N {
39 b.read_exact(&mut buf).unwrap();
40 for byte in buf.iter() {
41 assert_eq!(*byte, i);
42 }
43 }
44
45 assert_eq!(b.read(&mut buf).unwrap(), 0);
46 });
47
48 let mut a = write::ZlibEncoder::new(a, Compression::Default);
49 for i in 0..N {
50 let buf = [i; M];
51 a.write_all(&buf).unwrap();
52 }
53 a.finish().unwrap()
54 .shutdown(Shutdown::Write).unwrap();
55
56 t.join().unwrap();
57 });
58
59 let handle = core.handle();
60 let stream = TcpStream::connect(&addr, &handle);
61 let copy = stream.and_then(|s| {
62 let (a, b) = s.split();
63 let a = read::ZlibDecoder::new(a);
64 let b = write::DeflateEncoder::new(b, Compression::Default);
65 copy(a, b)
66 }).then(|result| {
67 let (amt, _a, b) = result.unwrap();
68 assert_eq!(amt, (N as u64) * (M as u64));
69 shutdown(b).map(|_| ())
70 });
71
72 core.run(copy).unwrap();
73 t.join().unwrap();
74 }
75
76 #[test]
77 fn echo_random() {
78 let v = thread_rng().gen_iter::<u8>().take(1024 * 1024).collect::<Vec<_>>();
79 let mut core = Core::new().unwrap();
80 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
81 let addr = listener.local_addr().unwrap();
82 let v2 = v.clone();
83 let t = thread::spawn(move || {
84 let a = listener.accept().unwrap().0;
85 let b = a.try_clone().unwrap();
86
87 let mut v3 = v2.clone();
88 let t = thread::spawn(move || {
89 let mut b = read::DeflateDecoder::new(b);
90 let mut buf = [0; 1024];
91 while v3.len() > 0 {
92 let n = b.read(&mut buf).unwrap();
93 for (actual, expected) in buf[..n].iter().zip(&v3) {
94 assert_eq!(*actual, *expected);
95 }
96 v3.drain(..n);
97 }
98
99 assert_eq!(b.read(&mut buf).unwrap(), 0);
100 });
101
102 let mut a = write::ZlibEncoder::new(a, Compression::Default);
103 a.write_all(&v2).unwrap();
104 a.finish().unwrap()
105 .shutdown(Shutdown::Write).unwrap();
106
107 t.join().unwrap();
108 });
109
110 let handle = core.handle();
111 let stream = TcpStream::connect(&addr, &handle);
112 let copy = stream.and_then(|s| {
113 let (a, b) = s.split();
114 let a = read::ZlibDecoder::new(a);
115 let b = write::DeflateEncoder::new(b, Compression::Default);
116 copy(a, b)
117 }).then(|result| {
118 let (amt, _a, b) = result.unwrap();
119 assert_eq!(amt, v.len() as u64);
120 shutdown(b).map(|_| ())
121 });
122
123 core.run(copy).unwrap();
124 t.join().unwrap();
125 }