]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/client/http_client.rs: use async for upload_speedtest()
authorDietmar Maurer <dietmar@proxmox.com>
Fri, 6 Sep 2019 06:55:47 +0000 (08:55 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Fri, 6 Sep 2019 06:55:47 +0000 (08:55 +0200)
src/client/http_client.rs

index 6b2be5fccb51819569a40a2faab68c3935970db3..b3d5d7799e5e356a80f15a8b430ab2c1c469469e 100644 (file)
@@ -993,7 +993,7 @@ impl BackupClient {
             })
     }
 
-    pub fn upload_speedtest(&self) -> impl Future<Output = Result<usize, Error>> {
+    pub async fn upload_speedtest(&self) -> Result<usize, Error> {
 
         let mut data = vec![];
         // generate pseudo random byte sequence
@@ -1006,52 +1006,36 @@ impl BackupClient {
 
         let item_len = data.len();
 
-        let repeat = Arc::new(AtomicUsize::new(0));
-        let repeat2 = repeat.clone();
+        let mut repeat = 0;
 
         let (upload_queue, upload_result) = Self::response_queue();
 
         let start_time = std::time::Instant::now();
 
-        let h2 = self.h2.clone();
+        loop {
+            repeat += 1;
+            if start_time.elapsed().as_secs() >= 5 {
+                break;
+            }
 
-        futures::stream::repeat(data)
-            .take_while(move |_| {
-                let repeat = Arc::clone(&repeat);
-                async move {
-                    repeat.fetch_add(1, Ordering::SeqCst);
-                    start_time.elapsed().as_secs() < 5
-                }
-            })
-            .map(Ok)
-            .try_for_each(move |data| {
-                let h2 = h2.clone();
+            let mut upload_queue = upload_queue.clone();
 
-                let mut upload_queue = upload_queue.clone();
+            println!("send test data ({} bytes)", data.len());
+            let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
+            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
 
-                println!("send test data ({} bytes)", data.len());
-                let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
-                h2.send_request(request, Some(bytes::Bytes::from(data)))
-                    .and_then(move |response| async move {
-                        upload_queue
-                            .send(response)
-                            .await
-                            .map_err(Error::from)
-                    })
-            })
-            .then(move |result| async move {
-                println!("RESULT {:?}", result);
-                upload_result.await?.and(result)
-            })
-            .and_then(move |_| {
-                let repeat = repeat2.load(Ordering::SeqCst);
-                println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
-                let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
-                if repeat > 0 {
-                    println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
-                }
-                futures::future::ok(speed)
-            })
+            upload_queue.send(request_future).await?;
+        }
+
+        drop(upload_queue); // close queue
+
+        let _ = upload_result.await?;
+
+        println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
+        let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
+        println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
+
+        Ok(speed)
     }
 }