]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/bin/h2s-client.rs: avoid hyper, use h2 directly
authorDietmar Maurer <dietmar@proxmox.com>
Sat, 29 Jun 2019 13:58:18 +0000 (15:58 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 29 Jun 2019 13:58:18 +0000 (15:58 +0200)
But performance is still bad.

src/bin/h2s-client.rs

index 6de942903c1033c71fb44ef3de2f205085abfe50..78844238eed6fb2ef9184b4efa338d5ea2651b27 100644 (file)
@@ -3,54 +3,101 @@ use futures::*;
 
 // Simple H2 client to test H2 download speed using h2s-server.rs
 
-fn build_client() -> hyper::client::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>> {
-    let mut builder = native_tls::TlsConnector::builder();
-    builder.danger_accept_invalid_certs(true);
-    let tlsconnector = builder.build().unwrap();
-    let mut httpc = hyper::client::HttpConnector::new(1);
-    httpc.set_nodelay(true); // important for h2 download performance!
-    httpc.enforce_http(false); // we want https...
-    let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
-    https.https_only(true); // force it!
-    hyper::client::Client::builder()
-        .http2_only(true)
-        .http2_initial_stream_window_size( (1 << 31) - 2)
-        .http2_initial_connection_window_size( (1 << 31) - 2)
-         // howto?? .http2_max_frame_size(4*1024*1024) ??
-        .build::<_, hyper::Body>(https)
+use tokio::net::TcpStream;
+//use native_tls::TlsConnector;
+use std::io::{Read, Write};
+
+struct Process {
+    body: h2::RecvStream,
+    trailers: bool,
+    bytes: usize,
+}
+
+impl Future for Process {
+    type Item = usize;
+    type Error = Error;
+
+    fn poll(&mut self) -> Poll<usize, Error> {
+        loop {
+            if self.trailers {
+                let trailers = try_ready!(self.body.poll_trailers());
+                if let Some(trailers) = trailers {
+                    println!("trailers: {:?}", trailers);
+                }
+                println!("Received {} bytes", self.bytes);
+
+                return Ok(Async::Ready(self.bytes));
+            } else {
+                match try_ready!(self.body.poll()) {
+                    Some(chunk) => {
+                        self.body.release_capacity().release_capacity(chunk.len())?;
+                        self.bytes += chunk.len();
+                        // println!("GOT FRAME {}", chunk.len());
+                    },
+                    None => {
+                        self.trailers = true;
+                    },
+                }
+            }
+        }
+    }
+}
+
+fn send_request(mut client: h2::client::SendRequest<bytes::Bytes>) -> impl Future<Item=usize, Error=Error> {
+
+    println!("sending request");
+
+    let request = http::Request::builder()
+        .uri("http://localhost/")
+        .body(())
+        .unwrap();
+
+    let (response, _stream) = client.send_request(request, true).unwrap();
+
+    response
+        .map_err(Error::from)
+        .and_then(|response| {
+            Process { body: response.into_body(), trailers: false, bytes: 0 }
+        })
 }
 
 pub fn main() -> Result<(), Error> {
 
-    let client = build_client();
+    let tcp_stream = tokio::net::TcpStream::connect(&"127.0.0.1:8008".parse().unwrap());
 
     let start = std::time::SystemTime::now();
 
-    let task = futures::stream::repeat(())
-        .take(100)
-        .and_then(move |_| {
-            let request = http::Request::builder()
-                .method("GET")
-                .uri("https://localhost:8008/")
-                .body(hyper::Body::empty())
-                .unwrap();
-
-            client
-                .request(request)
+    let tcp = tcp_stream
+        .map_err(Error::from)
+        .and_then(|c| {
+            let mut builder = native_tls::TlsConnector::builder();
+            builder.danger_accept_invalid_certs(true);
+            let connector = builder.build().unwrap();
+            let connector = tokio_tls::TlsConnector::from(connector);
+            connector.connect("localhost", c)
                 .map_err(Error::from)
-                .and_then(|resp| {
-                    resp.into_body()
-                        .map_err(Error::from)
-                        .fold(0, move |mut acc, chunk| {
-                            println!("got frame {}", chunk.len());
-                            acc += chunk.len();
-                            Ok::<_, Error>(acc)
-                        })
-                })
         })
-        .fold(0, move |mut acc, size| {
-            acc += size;
-            Ok::<_, Error>(acc)
+        .map_err(Error::from)
+        .and_then(|c| {
+            h2::client::Builder::new()
+                .initial_connection_window_size(1024*1024*1024)
+                .initial_window_size(1024*1024*1024)
+                .max_frame_size(4*1024*1024)
+                .handshake(c)
+                .map_err(Error::from)
+        })
+        .and_then(|(client, h2)| {
+
+            // Spawn a task to run the conn...
+            tokio::spawn(h2.map_err(|e| println!("GOT ERR={:?}", e)));
+
+            futures::stream::repeat(())
+                .take(100)
+                .and_then(move |_| send_request(client.clone()))
+                .fold(0, move |mut acc, size| {
+                    acc += size;
+                    Ok::<_, Error>(acc)
+                })
         })
         .then(move |result| {
             match result {
@@ -68,7 +115,7 @@ pub fn main() -> Result<(), Error> {
             Ok(())
         });
 
-    tokio::run(task);
+    tokio::run(tcp);
 
     Ok(())
 }