(verify_queue_tx, verify_result_rx)
}
- fn download_chunk_list(
+ async fn download_chunk_list(
h2: H2Client,
path: &str,
archive_name: &str,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- ) -> impl Future<Output = Result<(), Error>> {
+ ) -> Result<(), Error> {
let param = json!({ "archive-name": archive_name });
let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap();
- h2.send_request(request, None)
- .and_then(move |response| {
- response
- .map_err(Error::from)
- .and_then(move |resp| {
- let status = resp.status();
+ let h2request = h2.send_request(request, None).await?;
+ let resp = h2request.await?;
- if !status.is_success() {
- future::Either::Left(
- H2Client::h2api_response(resp)
- .map(|_| Err(format_err!("unknown error")))
- )
- } else {
- future::Either::Right(future::ok(resp.into_body()))
- }
- })
- .and_then(move |mut body| {
-
- let mut release_capacity = body.release_capacity().clone();
-
- DigestListDecoder::new(body.map_err(Error::from))
- .try_for_each(move |chunk| {
- let _ = release_capacity.release_capacity(chunk.len());
- println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
- known_chunks.lock().unwrap().insert(chunk);
- futures::future::ok(())
- })
- })
+ let status = resp.status();
+
+ if !status.is_success() {
+ H2Client::h2api_response(resp).await?; // raise error
+ unreachable!();
+ }
+
+ let mut body = resp.into_body();
+ let mut release_capacity = body.release_capacity().clone();
+
+ DigestListDecoder::new(body.map_err(Error::from))
+ .try_for_each(move |chunk| {
+ let _ = release_capacity.release_capacity(chunk.len());
+ println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
+ known_chunks.lock().unwrap().insert(chunk);
+ futures::future::ok(())
})
+ .await
}
fn upload_chunk_info_stream(