// src/client/backup_writer.rs
use std::collections::HashSet;
use std::future::Future;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use proxmox::tools::digest_to_hex;

use pbs_tools::format::HumanByte;

use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
use crate::backup::*;

use super::{H2Client, HttpClient};

pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}

impl Drop for BackupWriter {
    fn drop(&mut self) {
        self.abort.abort();
    }
}

pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

/// Options for uploading blobs/streams to the server
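///
/// Illustrative construction (a sketch; `previous_manifest` here stands for an
/// `Option<Arc<BackupManifest>>` obtained by the caller from the last snapshot):
///
/// ```ignore
/// let options = UploadOptions {
///     previous_manifest,
///     compress: true,
///     encrypt: false,
///     ..UploadOptions::default()
/// };
/// ```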
#[derive(Default, Clone)]
pub struct UploadOptions {
    pub previous_manifest: Option<Arc<BackupManifest>>,
    pub compress: bool,
    pub encrypt: bool,
    pub fixed_size: Option<u64>,
}

struct UploadStats {
    chunk_count: usize,
    chunk_reused: usize,
    size: usize,
    size_reused: usize,
    size_compressed: usize,
    duration: std::time::Duration,
    csum: [u8; 32],
}

type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;

impl BackupWriter {
    fn new(
        h2: H2Client,
        abort: AbortHandle,
        crypt_config: Option<Arc<CryptConfig>>,
        verbose: bool,
    ) -> Arc<Self> {
        Arc::new(Self {
            h2,
            abort,
            crypt_config,
            verbose,
        })
    }

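    /// Start a new backup writer session on the server.
    ///
    /// Illustrative call (a sketch; `client` is an authenticated [`HttpClient`] and
    /// `backup_time` is the snapshot's Unix epoch timestamp, both provided by the caller):
    ///
    /// ```ignore
    /// let writer = BackupWriter::start(
    ///     client,
    ///     crypt_config.clone(),
    ///     "datastore1",
    ///     "vm",
    ///     "100",
    ///     backup_time,
    ///     false, // debug
    ///     false, // benchmark
    /// )
    /// .await?;
    /// ```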
    // FIXME: extract into (flattened) parameter struct?
    #[allow(clippy::too_many_arguments)]
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: i64,
        debug: bool,
        benchmark: bool,
    ) -> Result<Arc<BackupWriter>, Error> {
        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        let req = HttpClient::request_builder(
            client.server(),
            client.port(),
            "GET",
            "/api2/json/backup",
            Some(param),
        )
        .unwrap();

        let (h2, abort) = client
            .start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }

    pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2
            .upload("POST", path, param, content_type, data)
            .await
    }

    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {
        let request =
            H2Client::request_builder("localhost", method, path, param, Some(content_type))
                .unwrap();
        let response_future = self
            .h2
            .send_request(request, Some(bytes::Bytes::from(data.clone())))
            .await?;
        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }

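    /// Upload data from a reader verbatim under the given blob file name.
    ///
    /// The bytes are sent as-is (the reader is expected to yield already-encoded blob data),
    /// and the whole reader is currently buffered in memory before the upload.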
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }

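    /// Encode `data` as a [`DataBlob`] (optionally compressed and/or encrypted) and upload it.
    ///
    /// Illustrative call (a sketch; `config_data` stands for whatever bytes the caller wants
    /// to store, and the file name is just an example):
    ///
    /// ```ignore
    /// let stats = writer
    ///     .upload_blob_from_data(
    ///         config_data,
    ///         "qemu-server.conf.blob",
    ///         UploadOptions { compress: true, ..UploadOptions::default() },
    ///     )
    ///     .await?;
    /// ```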
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let blob = match (options.encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, options.compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => {
                DataBlob::encode(&data, Some(crypt_config), options.compress)?
            }
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }

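    /// Read a local file into memory and upload it via [`Self::upload_blob_from_data`].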
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, options)
            .await
    }

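    /// Upload a chunk stream as a fixed or dynamic index archive.
    ///
    /// Setting `fixed_size` in the options selects a fixed index (e.g. for block device
    /// images); otherwise a dynamic index is created (e.g. for pxar archives).
    ///
    /// Illustrative call (a sketch; `image_stream` yields `Result<bytes::BytesMut, Error>`
    /// items and `image_size` is the total image size in bytes):
    ///
    /// ```ignore
    /// let stats = writer
    ///     .upload_stream(
    ///         "drive-scsi0.img.fidx",
    ///         image_stream,
    ///         UploadOptions {
    ///             previous_manifest,
    ///             compress: true,
    ///             encrypt: false,
    ///             fixed_size: Some(image_size),
    ///         },
    ///     )
    ///     .await?;
    /// ```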
    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        let prefix = if let Some(size) = options.fixed_size {
            param["size"] = size.into();
            "fixed"
        } else {
            "dynamic"
        };

        if options.encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = options.previous_manifest {
            // try, but ignore errors
            match archive_type(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    let _ = self
                        .download_previous_fixed_index(
                            archive_name,
                            &manifest,
                            known_chunks.clone(),
                        )
                        .await;
                }
                Ok(ArchiveType::DynamicIndex) => {
                    let _ = self
                        .download_previous_dynamic_index(
                            archive_name,
                            &manifest,
                            known_chunks.clone(),
                        )
                        .await;
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self
            .h2
            .post(&index_path, Some(param))
            .await?
            .as_u64()
            .unwrap();

        let upload_stats = Self::upload_chunk_info_stream(
            self.h2.clone(),
            wid,
            stream,
            &prefix,
            known_chunks.clone(),
            if options.encrypt {
                self.crypt_config.clone()
            } else {
                None
            },
            options.compress,
            self.verbose,
        )
        .await?;

        let size_dirty = upload_stats.size - upload_stats.size_reused;
        let size: HumanByte = upload_stats.size.into();
        let archive = if self.verbose {
            archive_name.to_string()
        } else {
            pbs_tools::format::strip_server_file_extension(archive_name)
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte =
                ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
            let size_dirty: HumanByte = size_dirty.into();
            let size_compressed: HumanByte = upload_stats.size_compressed.into();
            println!(
                "{}: had to backup {} of {} (compressed {}) in {:.2}s",
                archive,
                size_dirty,
                size,
                size_compressed,
                upload_stats.duration.as_secs_f64()
            );
            println!("{}: average backup speed: {}/s", archive, speed);
        } else {
            println!("Uploaded backup catalog ({})", size);
        }

        if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
            let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
            let reused: HumanByte = upload_stats.size_reused.into();
            println!(
                "{}: backup was done incrementally, reused {} ({:.1}%)",
                archive, reused, reused_percent
            );
        }
        if self.verbose && upload_stats.chunk_count > 0 {
            println!(
                "{}: Reused {} from {} chunks.",
                archive, upload_stats.chunk_reused, upload_stats.chunk_count
            );
            println!(
                "{}: Average chunk size was {}.",
                archive,
                HumanByte::from(upload_stats.size / upload_stats.chunk_count)
            );
            println!(
                "{}: Average time per request: {} microseconds.",
                archive,
                (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
            );
        }

        let param = json!({
            "wid": wid,
            "chunk-count": upload_stats.chunk_count,
            "size": upload_stats.size,
            "csum": proxmox::tools::digest_to_hex(&upload_stats.csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: upload_stats.size as u64,
            csum: upload_stats.csum,
        })
    }

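    // Spawn a task that awaits pipelined HTTP/2 responses in order; the returned oneshot
    // receiver delivers `Ok(())` once the sender is dropped, or the first error encountered.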
    fn response_queue(
        verbose: bool,
    ) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| {
                            if verbose {
                                println!("RESPONSE: {:?}", result)
                            }
                        })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                }),
        );

        (verify_queue_tx, verify_result_rx)
    }

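    // Spawn a task that waits for pending chunk uploads, merges consecutive known chunks and
    // appends them to the index via PUT requests to `path`; the returned oneshot receiver
    // reports the overall result once the queue sender is dropped.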
    fn append_chunk_queue(
        h2: H2Client,
        wid: u64,
        path: String,
        verbose: bool,
    ) -> (UploadQueueSender, UploadResultReceiver) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

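    /// Download the previous fixed index of `archive_name`, verify it against the manifest
    /// and add all of its chunk digests to `known_chunks`.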
    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<FixedIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = FixedIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read fixed index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!(
                "{}: known chunks list length is {}",
                archive_name,
                index.index_count()
            );
        }

        Ok(index)
    }

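    /// Download the previous dynamic index of `archive_name`, verify it against the manifest
    /// and add all of its chunk digests to `known_chunks`.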
    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<DynamicIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read dynamic index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!(
                "{}: known chunks list length is {}",
                archive_name,
                index.index_count()
            );
        }

        Ok(index)
    }

    /// Retrieve the backup time of the last backup
    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
        let data = self.h2.get("previous_backup_time", None).await?;
        serde_json::from_value(data).map_err(|err| {
            format_err!(
                "Failed to parse backup time value returned by server - {}",
                err
            )
        })
    }

    /// Download the backup manifest (index.json) of the last backup
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2
            .download("previous", Some(param), &mut raw_data)
            .await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest =
            BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }

    // We have no `self` here for `h2` and `verbose`; the only other argument shared with one
    // other function in the same call path is `wid`, so those three could be put in a struct,
    // but there's no real use since this is a private method.
    #[allow(clippy::too_many_arguments)]
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<UploadStats, Error>> {
        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let compressed_stream_len = Arc::new(AtomicU64::new(0));
        let compressed_stream_len2 = compressed_stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {
                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size {
                    csum.update(&chunk_end.to_le_bytes());
                }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    let compressed_stream_len2 = compressed_stream_len.clone();
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
                        MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        })
                    }))
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {
                let upload_queue = upload_queue.clone();

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    /* too verbose, needs finer verbosity setting granularity
                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                            chunk_info.chunk_len, offset);
                    }
                    */

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder(
                        "localhost",
                        "POST",
                        &upload_chunk_path,
                        Some(param),
                        Some(ct),
                    )
                    .unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    Either::Left(h2.send_request(request, upload_data).and_then(
                        move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| {
                                    format_err!("failed to send to upload queue: {}", err)
                                })
                        },
                    ))
                } else {
                    Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move { upload_result.await?.and(result) }.boxed())
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let chunk_count = total_chunks2.load(Ordering::SeqCst);
                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
                let size = stream_len2.load(Ordering::SeqCst);
                let size_reused = reused_len2.load(Ordering::SeqCst);
                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok(UploadStats {
                    chunk_count,
                    chunk_reused,
                    size,
                    size_reused,
                    size_compressed,
                    duration,
                    csum,
                })
            })
    }

    /// Upload speed test - prints result to stderr
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
        let mut data = vec![];
        // generate pseudo random byte sequence
        for i in 0..1024 * 1024 {
            for j in 0..4 {
                let byte = ((i >> (j << 3)) & 0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            if verbose {
                eprintln!("send test data ({} bytes)", data.len());
            }
            let request =
                H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self
                .h2
                .send_request(request, Some(bytes::Bytes::from(data.clone())))
                .await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!(
            "Uploaded {} chunks in {} seconds.",
            repeat,
            start_time.elapsed().as_secs()
        );
        let speed = ((item_len * (repeat as usize)) as f64) / start_time.elapsed().as_secs_f64();
        eprintln!(
            "Time per request: {} microseconds.",
            (start_time.elapsed().as_micros()) / (repeat as u128)
        );

        Ok(speed)
    }
}
837 }