builder.danger_accept_invalid_certs(true);
let tlsconnector = builder.build().unwrap();
let mut httpc = hyper::client::HttpConnector::new(1);
- httpc.set_nodelay(true); // important!
+ //httpc.set_nodelay(true); // not sure if this help?
httpc.enforce_http(false); // we want https...
let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
https.https_only(true); // force it!
}
let item_len = data.len();
- let repeat = 100;
- let start = std::time::SystemTime::now();
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ let repeat = std::sync::Arc::new(AtomicUsize::new(0));
+ let repeat2 = repeat.clone();
+
+ use tokio::sync::mpsc;
+ use futures::*;
+
+ let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
+ let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
+
+ hyper::rt::spawn(
+ verify_queue_rx
+ .map_err(Error::from)
+ .and_then(|response: h2::client::ResponseFuture| {
+ response
+ .map_err(Error::from)
+ .and_then(Self::h2api_response)
+ .map_err(|err| format_err!("speedtest chunk upload failed: {}", err))
+ })
+ .for_each(|result| {
+ //println!("response: {:?}", result);
+ Ok(())
+ })
+ .then(|result| verify_result_tx.send(result))
+ .map_err(|_| { /* ignore closed channel */ })
+ );
+
+ let start_time = std::time::Instant::now();
futures::stream::repeat(data)
- .take(repeat)
+ .take_while(move |_| {
+ repeat.fetch_add(1, Ordering::SeqCst);
+ Ok(start_time.elapsed().as_secs() < 5)
+ })
.for_each(move |data| {
let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
let (response, stream) = send_request.send_request(request, false).unwrap();
- println!("send test data ({} bytes)", data.len());
+ //println!("send test data ({} bytes)", data.len());
+ let verify_queue_tx = verify_queue_tx.clone();
PipeToSendStream::new(bytes::Bytes::from(data), stream)
- .and_then(|_| {
- response
- .map_err(Error::from)
- .and_then(Self::h2api_response)
- .and_then(|_| Ok(()))
+ .and_then(move |_| {
+ verify_queue_tx.send(response).map_err(Error::from).map(|_| ())
})
})
+ .then(move |result| {
+ verify_result_rx.map_err(Error::from).and_then(|verify_result| {
+ Ok(verify_result.and(result))
+ })
+ })
+ .flatten()
.and_then(move |_| {
- let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start.elapsed()?.as_micros() as usize);
- println!("time per request: {} microseconds", (start.elapsed()?.as_micros())/(repeat as u128));
+ let repeat = repeat2.load(Ordering::SeqCst);
+ println!("Uploaded {} chunks in {} seconds", repeat, start_time.elapsed().as_secs());
+ let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
+ println!("time per request: {} microseconds", (start_time.elapsed().as_micros())/(repeat as u128));
Ok(speed)
})
})