// pbs-client/src/backup_writer.rs
use std::collections::HashSet;
use std::future::Future;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use pbs_api_types::{BackupDir, HumanByte};
use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
use pbs_datastore::{CATALOG_NAME, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::crypt_config::CryptConfig;

use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};

use super::{H2Client, HttpClient};

pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}

impl Drop for BackupWriter {
    fn drop(&mut self) {
        self.abort.abort();
    }
}

pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

/// Options for uploading blobs/streams to the server
#[derive(Default, Clone)]
pub struct UploadOptions {
    pub previous_manifest: Option<Arc<BackupManifest>>,
    pub compress: bool,
    pub encrypt: bool,
    pub fixed_size: Option<u64>,
}

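// Usage sketch: filling in options for a compressed, encrypted dynamic archive.
// `manifest` stands for a hypothetical, previously downloaded `BackupManifest`
// used to enable incremental uploads (see `download_previous_manifest`).
//
//     let options = UploadOptions {
//         previous_manifest: Some(Arc::new(manifest)),
//         compress: true,
//         encrypt: true,
//         fixed_size: None, // `None` selects a dynamic index, `Some(len)` a fixed one
//     };
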
struct UploadStats {
    chunk_count: usize,
    chunk_reused: usize,
    size: usize,
    size_reused: usize,
    size_compressed: usize,
    duration: std::time::Duration,
    csum: [u8; 32],
}

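// Sender/receiver pair used by `append_chunk_queue`: the sender carries merged
// chunk infos (together with the pending h2 response for freshly uploaded chunks)
// into the background task that appends them to the open index, and the oneshot
// receiver reports that task's final result exactly once.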
type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;

impl BackupWriter {
    fn new(
        h2: H2Client,
        abort: AbortHandle,
        crypt_config: Option<Arc<CryptConfig>>,
        verbose: bool,
    ) -> Arc<Self> {
        Arc::new(Self {
            h2,
            abort,
            crypt_config,
            verbose,
        })
    }

    // FIXME: extract into (flattened) parameter struct?
    #[allow(clippy::too_many_arguments)]
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        backup: &BackupDir,
        debug: bool,
        benchmark: bool,
    ) -> Result<Arc<BackupWriter>, Error> {
        let mut param = json!({
            "backup-type": backup.ty(),
            "backup-id": backup.id(),
            "backup-time": backup.time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        let ns = backup.ns();
        if !ns.is_root() {
            param["backup-ns"] = serde_json::to_value(ns)?;
        }

        let req = HttpClient::request_builder(
            client.server(),
            client.port(),
            "GET",
            "/api2/json/backup",
            Some(param),
        )
        .unwrap();

        let (h2, abort) = client
            .start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }

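    // Usage sketch: opening a writer session. `client` and `snapshot` are assumed
    // to already exist (an authenticated `HttpClient` and a `BackupDir` naming the
    // snapshot's type, ID and time); `None` skips encryption, and the two trailing
    // flags are `debug` and `benchmark`.
    //
    //     let writer =
    //         BackupWriter::start(client, None, "store1", &snapshot, false, false).await?;
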
    pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2
            .upload("POST", path, param, content_type, data)
            .await
    }

    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {
        let request =
            H2Client::request_builder("localhost", method, path, param, Some(content_type))
                .unwrap();
        let response_future = self
            .h2
            .send_request(request, Some(bytes::Bytes::from(data.clone())))
            .await?;
        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

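    // Ending a session: `finish()` posts "finish" to the server and then aborts the
    // HTTP/2 connection, while `cancel()` (or simply dropping the `BackupWriter`)
    // aborts the connection without marking the backup as finished.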
    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }

    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }

    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let blob = match (options.encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, options.compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => {
                DataBlob::encode(&data, Some(crypt_config), options.compress)?
            }
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }

    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, options)
            .await
    }

    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        let prefix = if let Some(size) = options.fixed_size {
            param["size"] = size.into();
            "fixed"
        } else {
            "dynamic"
        };

        if options.encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

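        // The prefix chosen above selects the server-side call family used for this
        // archive: "{prefix}_index" (POST to create, PUT to append chunk digests),
        // "{prefix}_chunk" (upload new chunk data) and "{prefix}_close" (finalize),
        // i.e. either the fixed or the dynamic index endpoints.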
        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = options.previous_manifest {
            // try, but ignore errors
            match ArchiveType::from_path(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    if let Err(err) = self
                        .download_previous_fixed_index(
                            archive_name,
                            &manifest,
                            known_chunks.clone(),
                        )
                        .await
                    {
                        eprintln!("Error downloading .fidx from previous manifest: {}", err);
                    }
                }
                Ok(ArchiveType::DynamicIndex) => {
                    if let Err(err) = self
                        .download_previous_dynamic_index(
                            archive_name,
                            &manifest,
                            known_chunks.clone(),
                        )
                        .await
                    {
                        eprintln!("Error downloading .didx from previous manifest: {}", err);
                    }
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self
            .h2
            .post(&index_path, Some(param))
            .await?
            .as_u64()
            .unwrap();

        let upload_stats = Self::upload_chunk_info_stream(
            self.h2.clone(),
            wid,
            stream,
            prefix,
            known_chunks.clone(),
            if options.encrypt {
                self.crypt_config.clone()
            } else {
                None
            },
            options.compress,
            self.verbose,
        )
        .await?;

        let size_dirty = upload_stats.size - upload_stats.size_reused;
        let size: HumanByte = upload_stats.size.into();
        let archive = if self.verbose {
            archive_name
        } else {
            pbs_tools::format::strip_server_file_extension(archive_name)
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte =
                ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
            let size_dirty: HumanByte = size_dirty.into();
            let size_compressed: HumanByte = upload_stats.size_compressed.into();
            println!(
                "{}: had to backup {} of {} (compressed {}) in {:.2}s",
                archive,
                size_dirty,
                size,
                size_compressed,
                upload_stats.duration.as_secs_f64()
            );
            println!("{}: average backup speed: {}/s", archive, speed);
        } else {
            println!("Uploaded backup catalog ({})", size);
        }

        if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
            let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
            let reused: HumanByte = upload_stats.size_reused.into();
            println!(
                "{}: backup was done incrementally, reused {} ({:.1}%)",
                archive, reused, reused_percent
            );
        }
        if self.verbose && upload_stats.chunk_count > 0 {
            println!(
                "{}: Reused {} from {} chunks.",
                archive, upload_stats.chunk_reused, upload_stats.chunk_count
            );
            println!(
                "{}: Average chunk size was {}.",
                archive,
                HumanByte::from(upload_stats.size / upload_stats.chunk_count)
            );
            println!(
                "{}: Average time per request: {} microseconds.",
                archive,
                (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
            );
        }

        let param = json!({
            "wid": wid,
            "chunk-count": upload_stats.chunk_count,
            "size": upload_stats.size,
            "csum": hex::encode(&upload_stats.csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: upload_stats.size as u64,
            csum: upload_stats.csum,
        })
    }

    fn response_queue(
        verbose: bool,
    ) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| {
                            if verbose {
                                println!("RESPONSE: {:?}", result)
                            }
                        })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                }),
        );

        (verify_queue_tx, verify_result_rx)
    }

    fn append_chunk_queue(
        h2: H2Client,
        wid: u64,
        path: String,
        verbose: bool,
    ) -> (UploadQueueSender, UploadResultReceiver) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(hex::encode(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<FixedIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = FixedIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read fixed index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!(
                "{}: known chunks list length is {}",
                archive_name,
                index.index_count()
            );
        }

        Ok(index)
    }

    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<DynamicIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read dynamic index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!(
                "{}: known chunks list length is {}",
                archive_name,
                index.index_count()
            );
        }

        Ok(index)
    }

    /// Retrieve backup time of last backup
    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
        let data = self.h2.get("previous_backup_time", None).await?;
        serde_json::from_value(data).map_err(|err| {
            format_err!(
                "Failed to parse backup time value returned by server - {}",
                err
            )
        })
    }

    /// Download backup manifest (index.json) of last backup
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2
            .download("previous", Some(param), &mut raw_data)
            .await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest =
            BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }

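    // Usage sketch: driving an incremental upload. Chunks listed in the previous
    // index are added to the known-chunk set, so unchanged data is only registered
    // on the server instead of being re-uploaded. `writer` and `chunk_stream` are
    // assumed to exist (an `Arc<BackupWriter>` from `start()` and a stream of
    // `bytes::BytesMut` chunk data).
    //
    //     let previous_manifest = writer.download_previous_manifest().await.ok();
    //     let options = UploadOptions {
    //         previous_manifest: previous_manifest.map(Arc::new),
    //         compress: true,
    //         ..UploadOptions::default()
    //     };
    //     let stats = writer
    //         .upload_stream("root.pxar.didx", chunk_stream, options)
    //         .await?;
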
    // We have no `self` here for `h2` and `verbose`; the only other argument "common"
    // with one other function in the same path is `wid`, so those three could go into
    // a struct, but there's no real use since this is a private method.
    #[allow(clippy::too_many_arguments)]
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<UploadStats, Error>> {
        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let compressed_stream_len = Arc::new(AtomicU64::new(0));
        let compressed_stream_len2 = compressed_stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {
                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

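                // The index checksum always covers each chunk digest; for a dynamic
                // index it additionally covers the chunk end offset, so it pins both
                // the content and the chunk layout.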
                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size {
                    csum.update(&chunk_end.to_le_bytes());
                }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    let compressed_stream_len2 = compressed_stream_len.clone();
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
                        MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        })
                    }))
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {
                let upload_queue = upload_queue.clone();

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = hex::encode(&digest);

                    /* too verbose, needs finer verbosity setting granularity
                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                            chunk_info.chunk_len, offset);
                    }
                    */

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder(
                        "localhost",
                        "POST",
                        &upload_chunk_path,
                        Some(param),
                        Some(ct),
                    )
                    .unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    Either::Left(h2.send_request(request, upload_data).and_then(
                        move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| {
                                    format_err!("failed to send to upload queue: {}", err)
                                })
                        },
                    ))
                } else {
                    Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move { upload_result.await?.and(result) }.boxed())
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let chunk_count = total_chunks2.load(Ordering::SeqCst);
                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
                let size = stream_len2.load(Ordering::SeqCst);
                let size_reused = reused_len2.load(Ordering::SeqCst);
                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok(UploadStats {
                    chunk_count,
                    chunk_reused,
                    size,
                    size_reused,
                    size_compressed,
                    duration,
                    csum,
                })
            })
    }

    /// Upload speed test - prints result to stderr
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
        let mut data = vec![];
        // generate pseudo random byte sequence
        for i in 0..1024 * 1024 {
            for j in 0..4 {
                let byte = ((i >> (j << 3)) & 0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            if verbose {
                eprintln!("send test data ({} bytes)", data.len());
            }
            let request =
                H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self
                .h2
                .send_request(request, Some(bytes::Bytes::from(data.clone())))
                .await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!(
            "Uploaded {} chunks in {} seconds.",
            repeat,
            start_time.elapsed().as_secs()
        );
        let speed = ((item_len * (repeat as usize)) as f64) / start_time.elapsed().as_secs_f64();
        eprintln!(
            "Time per request: {} microseconds.",
            (start_time.elapsed().as_micros()) / (repeat as u128)
        );

        Ok(speed)
    }
}