]> git.proxmox.com Git - proxmox-backup.git/blob - src/bin/h2s-client.rs
introduce new runtime tokio helpers
[proxmox-backup.git] / src / bin / h2s-client.rs
1 use std::future::Future;
2 use std::pin::Pin;
3 use std::task::{Context, Poll};
4
5 use failure::*;
6 use futures::future::TryFutureExt;
7 use futures::stream::Stream;
8
9 // Simple H2 client to test H2 download speed using h2s-server.rs
10
11 struct Process {
12 body: h2::RecvStream,
13 trailers: bool,
14 bytes: usize,
15 }
16
17 impl Future for Process {
18 type Output = Result<usize, Error>;
19
20 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
21 let this = self.get_mut();
22
23 loop {
24 if this.trailers {
25 match futures::ready!(this.body.poll_trailers(cx)) {
26 Ok(Some(trailers)) => println!("trailers: {:?}", trailers),
27 Ok(None) => (),
28 Err(err) => return Poll::Ready(Err(Error::from(err))),
29 }
30
31 println!("Received {} bytes", this.bytes);
32
33 return Poll::Ready(Ok(this.bytes));
34 } else {
35 match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) {
36 Some(Ok(chunk)) => {
37 this.body.flow_control().release_capacity(chunk.len())?;
38 this.bytes += chunk.len();
39 // println!("GOT FRAME {}", chunk.len());
40 },
41 Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
42 None => {
43 this.trailers = true;
44 },
45 }
46 }
47 }
48 }
49 }
50
51 fn send_request(
52 mut client: h2::client::SendRequest<bytes::Bytes>,
53 ) -> impl Future<Output = Result<usize, Error>> {
54 println!("sending request");
55
56 let request = http::Request::builder()
57 .uri("http://localhost/")
58 .body(())
59 .unwrap();
60
61 let (response, _stream) = client.send_request(request, true).unwrap();
62
63 response
64 .map_err(Error::from)
65 .and_then(|response| {
66 Process { body: response.into_body(), trailers: false, bytes: 0 }
67 })
68 }
69
70 fn main() -> Result<(), Error> {
71 proxmox_backup::tools::runtime::main(run())
72 }
73
74 async fn run() -> Result<(), Error> {
75 let start = std::time::SystemTime::now();
76
77 let conn =
78 tokio::net::TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
79
80 conn.set_nodelay(true).unwrap();
81 conn.set_recv_buffer_size(1024*1024).unwrap();
82
83 use openssl::ssl::{SslConnector, SslMethod};
84
85 let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
86 ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE);
87 let conn =
88 tokio_openssl::connect(
89 ssl_connector_builder.build().configure()?,
90 "localhost",
91 conn,
92 )
93 .await
94 .map_err(|err| format_err!("connect failed - {}", err))?;
95
96 let (client, h2) = h2::client::Builder::new()
97 .initial_connection_window_size(1024*1024*1024)
98 .initial_window_size(1024*1024*1024)
99 .max_frame_size(4*1024*1024)
100 .handshake(conn)
101 .await?;
102
103 // Spawn a task to run the conn...
104 tokio::spawn(async move {
105 if let Err(e) = h2.await {
106 println!("GOT ERR={:?}", e);
107 }
108 });
109
110 let mut bytes = 0;
111 for _ in 0..100 {
112 match send_request(client.clone()).await {
113 Ok(b) => {
114 bytes += b;
115 }
116 Err(e) => {
117 println!("ERROR {}", e);
118 return Ok(());
119 }
120 }
121 }
122
123 let elapsed = start.elapsed().unwrap();
124 let elapsed = (elapsed.as_secs() as f64) +
125 (elapsed.subsec_millis() as f64)/1000.0;
126
127 println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0));
128
129 Ok(())
130 }