// proxmox-backup: src/client/backup_writer.rs
use std::collections::HashSet;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::*;
use futures::stream::Stream;
use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;
use crate::tools::format::HumanByte;

use super::{HttpClient, H2Client};

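/// Writer half of the HTTP/2 backup protocol client.
///
/// Wraps an `H2Client` for one backup session; dropping the writer
/// aborts the underlying connection (see the `Drop` impl below).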
pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}

impl Drop for BackupWriter {

    fn drop(&mut self) {
        self.abort.abort();
    }
}

pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;

impl BackupWriter {

    fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
        Arc::new(Self { h2, abort, crypt_config, verbose })
    }

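    /// Open a new backup session on the server.
    ///
    /// This issues the protocol upgrade request against
    /// `/api2/json/backup` and switches the connection to HTTP/2.
    ///
    /// A minimal usage sketch, assuming an already-connected
    /// `HttpClient` (all argument values below are placeholders):
    ///
    /// ```ignore
    /// let writer = BackupWriter::start(
    ///     client,
    ///     None,        // no encryption
    ///     "datastore1",
    ///     "vm",
    ///     "100",
    ///     backup_time, // i64 epoch timestamp
    ///     false,       // debug
    ///     false,       // benchmark
    /// ).await?;
    /// ```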
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: i64,
        debug: bool,
        benchmark: bool
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        let req = HttpClient::request_builder(
            client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }

    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }

    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {

        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data))).await?;
        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

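    /// Finish the backup session.
    ///
    /// Posts to the `finish` endpoint and, once the server has
    /// acknowledged, aborts the HTTP/2 connection.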
    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }

    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

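    /// Encode `data` as a `DataBlob` (optionally compressed and/or
    /// encrypted) and upload it under `file_name`.
    ///
    /// A usage sketch (values are placeholders; `writer` is an
    /// `Arc<BackupWriter>` from [`BackupWriter::start`]):
    ///
    /// ```ignore
    /// let stats = writer
    ///     .upload_blob_from_data(manifest_json.into_bytes(), "index.json.blob", true, false)
    ///     .await?;
    /// println!("uploaded {} bytes", stats.size);
    /// ```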
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {
        let blob = match (encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?,
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, compress, encrypt).await
    }

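    /// Upload a chunked archive stream (fixed or dynamic index).
    ///
    /// If a previous manifest is given, the matching index from the
    /// last snapshot is downloaded first to seed the set of known
    /// chunks, enabling incremental (deduplicated) uploads. The
    /// `prefix` selects the index type ("fixed" or "dynamic") and
    /// thus the `{prefix}_index` / `{prefix}_close` endpoints.
    ///
    /// A minimal sketch for a fixed-size image (all values are
    /// placeholders; a real caller would feed a proper chunker
    /// stream instead of a single buffer):
    ///
    /// ```ignore
    /// let stream = futures::stream::once(
    ///     futures::future::ok::<_, Error>(bytes::BytesMut::from(&image_data[..]))
    /// );
    /// let stats = writer
    ///     .upload_stream(None, "disk.img.fidx", stream, "fixed", Some(image_len), true, false)
    ///     .await?;
    /// ```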
    pub async fn upload_stream(
        &self,
        previous_manifest: Option<Arc<BackupManifest>>,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        fixed_size: Option<u64>,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        if let Some(size) = fixed_size {
            param["size"] = size.into();
        }

        if encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = previous_manifest {
            // try, but ignore errors
            match archive_type(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                Ok(ArchiveType::DynamicIndex) => {
                    let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();

        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                &prefix,
                known_chunks.clone(),
                if encrypt { self.crypt_config.clone() } else { None },
                compress,
                self.verbose,
            )
            .await?;

        let uploaded = size - size_reused;
        let vsize_h: HumanByte = size.into();
        let archive = if self.verbose {
            archive_name.to_string()
        } else {
            crate::tools::format::strip_server_file_extension(archive_name)
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
            let uploaded: HumanByte = uploaded.into();
            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s.", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
        } else {
            println!("Uploaded backup catalog ({})", vsize_h);
        }

        if size_reused > 0 && size > 1024*1024 {
            let reused_percent = size_reused as f64 * 100. / size as f64;
            let reused: HumanByte = size_reused.into();
            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
        }
        if self.verbose && chunk_count > 0 {
            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
        }

        let param = json!({
            "wid": wid,
            "chunk-count": chunk_count,
            "size": size,
            "csum": proxmox::tools::digest_to_hex(&csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: size as u64,
            csum,
        })
    }

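    /// Spawn a task that drains a queue of pipelined HTTP/2 response
    /// futures, so the sender can keep issuing requests without
    /// awaiting each response in turn. The returned oneshot receiver
    /// resolves with the first error, or with `Ok(())` once the
    /// queue has been closed and fully drained.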
    fn response_queue(verbose: bool) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

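    /// Spawn the background task that appends uploaded chunks to the
    /// index writer `wid`. Queue entries pair a `MergedChunkInfo` with
    /// the (optional) upload response future to await first; consecutive
    /// known-chunk entries are then merged, and their digest/offset
    /// lists are PUT to the `{prefix}_index` endpoint in batches.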
    fn append_chunk_queue(
        h2: H2Client,
        wid: u64,
        path: String,
        verbose: bool,
    ) -> (UploadQueueSender, UploadResultReceiver) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

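    /// Download the fixed index of `archive_name` from the previous
    /// snapshot into an anonymous temp file (`O_TMPFILE`), verify its
    /// checksum against the manifest, and add all of its chunk digests
    /// to `known_chunks`.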
    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<FixedIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = FixedIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }

    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<DynamicIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = DynamicIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }

    /// Retrieve backup time of last backup
    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
        let data = self.h2.get("previous_backup_time", None).await?;
        serde_json::from_value(data)
            .map_err(|err| format_err!("Failed to parse backup time value returned by server - {}", err))
    }

    /// Download backup manifest (index.json) of last backup
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {

        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2.download("previous", Some(param), &mut raw_data).await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }

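    /// Core chunk upload loop; returns
    /// `(total_chunks, known_chunk_count, stream_len, reused_len, elapsed, index_csum)`.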
    // There is no `self` here for `h2` and `verbose`; the only other argument shared with the
    // one other function on this path is `wid`, so those three could be grouped into a struct,
    // but there is no real benefit since this is a private method.
    #[allow(clippy::too_many_arguments)]
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {

        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

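                // The index checksum covers each chunk digest; for dynamic
                // indexes the chunk end offset is hashed in as well, since
                // chunk boundaries are part of the index data there.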
                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    /* too verbose, needs finer verbosity setting granularity
                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                            chunk_info.chunk_len, offset);
                    }
                    */

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    let mut upload_queue = upload_queue.clone();
                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                        })
                    )
                } else {
                    let mut upload_queue = upload_queue.clone();
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            }.boxed())
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let total_chunks = total_chunks2.load(Ordering::SeqCst);
                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let reused_len = reused_len2.load(Ordering::SeqCst);

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
            })
    }

    /// Upload speed test - prints result to stderr
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {

        let mut data = vec![];
        // generate a deterministic 4 MiB test pattern (little-endian counter bytes)
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            let mut upload_queue = upload_queue.clone();

            if verbose { eprintln!("send test data ({} bytes)", data.len()); }
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
        eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
}