]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/client/http_client.rs: improve speed test
authorDietmar Maurer <dietmar@proxmox.com>
Fri, 17 May 2019 09:51:14 +0000 (11:51 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 18 May 2019 08:55:46 +0000 (10:55 +0200)
pipeline responses to a separate async channel ...

src/client/http_client.rs

index 3abae83c02653571dd19c0bb81a8ec9e1ab715e1..927b3ac1c7c1e779552a3ae159d93526090a5ab4 100644 (file)
@@ -153,7 +153,7 @@ impl HttpClient {
         builder.danger_accept_invalid_certs(true);
         let tlsconnector = builder.build().unwrap();
         let mut httpc = hyper::client::HttpConnector::new(1);
-        httpc.set_nodelay(true); // important!
+        //httpc.set_nodelay(true); // not sure if this help?
         httpc.enforce_http(false); // we want https...
         let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
         https.https_only(true); // force it!
@@ -446,27 +446,63 @@ impl H2Client {
                 }
 
                 let item_len = data.len();
-                let repeat = 100;
 
-                let start = std::time::SystemTime::now();
+                use std::sync::atomic::{AtomicUsize, Ordering};
+
+                let repeat = std::sync::Arc::new(AtomicUsize::new(0));
+                let repeat2 = repeat.clone();
+
+                use tokio::sync::mpsc;
+                use futures::*;
+
+                let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
+                let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
+
+                hyper::rt::spawn(
+                    verify_queue_rx
+                        .map_err(Error::from)
+                        .and_then(|response: h2::client::ResponseFuture| {
+                            response
+                                .map_err(Error::from)
+                                .and_then(Self::h2api_response)
+                                .map_err(|err| format_err!("speedtest chunk upload failed: {}", err))
+                       })
+                        .for_each(|result| {
+                            //println!("response: {:?}", result);
+                            Ok(())
+                        })
+                        .then(|result| verify_result_tx.send(result))
+                        .map_err(|_| { /* ignore closed channel */ })
+                );
+
+                let start_time = std::time::Instant::now();
 
                 futures::stream::repeat(data)
-                    .take(repeat)
+                    .take_while(move |_| {
+                        repeat.fetch_add(1, Ordering::SeqCst);
+                        Ok(start_time.elapsed().as_secs() < 5)
+                    })
                     .for_each(move |data| {
                         let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
                         let (response, stream) = send_request.send_request(request, false).unwrap();
-                        println!("send test data ({} bytes)", data.len());
+                        //println!("send test data ({} bytes)", data.len());
+                        let verify_queue_tx = verify_queue_tx.clone();
                         PipeToSendStream::new(bytes::Bytes::from(data), stream)
-                            .and_then(|_| {
-                                response
-                                    .map_err(Error::from)
-                                    .and_then(Self::h2api_response)
-                                    .and_then(|_| Ok(()))
+                            .and_then(move |_| {
+                                verify_queue_tx.send(response).map_err(Error::from).map(|_| ())
                             })
                     })
+                    .then(move |result| {
+                        verify_result_rx.map_err(Error::from).and_then(|verify_result| {
+                            Ok(verify_result.and(result))
+                        })
+                    })
+                    .flatten()
                     .and_then(move |_| {
-                        let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start.elapsed()?.as_micros() as usize);
-                        println!("time per request: {} microseconds", (start.elapsed()?.as_micros())/(repeat as u128));
+                        let repeat = repeat2.load(Ordering::SeqCst);
+                        println!("Uploaded {} chunks in {} seconds", repeat, start_time.elapsed().as_secs());
+                        let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
+                        println!("time per request: {} microseconds", (start_time.elapsed().as_micros())/(repeat as u128));
                         Ok(speed)
                     })
             })