})
}
- pub fn upload_speedtest(&self) -> impl Future<Output = Result<usize, Error>> {
+ pub async fn upload_speedtest(&self) -> Result<usize, Error> {
let mut data = vec![];
// generate pseudo random byte sequence
let item_len = data.len();
- let repeat = Arc::new(AtomicUsize::new(0));
- let repeat2 = repeat.clone();
+ let mut repeat = 0;
let (upload_queue, upload_result) = Self::response_queue();
let start_time = std::time::Instant::now();
- let h2 = self.h2.clone();
+ loop {
+ repeat += 1;
+ if start_time.elapsed().as_secs() >= 5 {
+ break;
+ }
- futures::stream::repeat(data)
- .take_while(move |_| {
- let repeat = Arc::clone(&repeat);
- async move {
- repeat.fetch_add(1, Ordering::SeqCst);
- start_time.elapsed().as_secs() < 5
- }
- })
- .map(Ok)
- .try_for_each(move |data| {
- let h2 = h2.clone();
+ let mut upload_queue = upload_queue.clone();
- let mut upload_queue = upload_queue.clone();
+ println!("send test data ({} bytes)", data.len());
+ let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
+ let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
- println!("send test data ({} bytes)", data.len());
- let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
- h2.send_request(request, Some(bytes::Bytes::from(data)))
- .and_then(move |response| async move {
- upload_queue
- .send(response)
- .await
- .map_err(Error::from)
- })
- })
- .then(move |result| async move {
- println!("RESULT {:?}", result);
- upload_result.await?.and(result)
- })
- .and_then(move |_| {
- let repeat = repeat2.load(Ordering::SeqCst);
- println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
- let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
- if repeat > 0 {
- println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
- }
- futures::future::ok(speed)
- })
+ upload_queue.send(request_future).await?;
+ }
+
+ drop(upload_queue); // close queue
+
+ let _ = upload_result.await?;
+
+ println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
+ let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
+ println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
+
+ Ok(speed)
}
}