// pbs-client/src/backup_writer.rs

use std::collections::HashSet;
use std::future::Future;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use pbs_api_types::{BackupDir, BackupNamespace};
use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
use pbs_datastore::{CATALOG_NAME, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::crypt_config::CryptConfig;

use proxmox_human_byte::HumanByte;

use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};

use super::{H2Client, HttpClient};

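/// Writer for a single backup session, sending all uploads over one
/// dedicated HTTP/2 connection that is aborted when the writer is dropped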
pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    crypt_config: Option<Arc<CryptConfig>>,
}

impl Drop for BackupWriter {
    fn drop(&mut self) {
        self.abort.abort();
    }
}

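/// Size and checksum of an uploaded blob or archive, as returned by the
/// upload helpers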
pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

/// Options for uploading blobs/streams to the server
#[derive(Default, Clone)]
pub struct UploadOptions {
    pub previous_manifest: Option<Arc<BackupManifest>>,
    pub compress: bool,
    pub encrypt: bool,
    pub fixed_size: Option<u64>,
}

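/// Counters collected while streaming chunks for a single archive upload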
struct UploadStats {
    chunk_count: usize,
    chunk_reused: usize,
    size: usize,
    size_reused: usize,
    size_compressed: usize,
    duration: std::time::Duration,
    csum: [u8; 32],
}

type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;

impl BackupWriter {
    fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
        Arc::new(Self {
            h2,
            abort,
            crypt_config,
        })
    }

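    /// Start a new backup session for a snapshot on the given datastore and
    /// namespace, upgrading the connection to the backup protocol.
    ///
    /// A minimal usage sketch (assuming an already authenticated
    /// `HttpClient` and the usual crate re-exports; error handling elided):
    ///
    /// ```no_run
    /// # use pbs_api_types::{BackupDir, BackupNamespace};
    /// # use pbs_client::{BackupWriter, HttpClient};
    /// # async fn example(client: &HttpClient, ns: &BackupNamespace, snapshot: &BackupDir) -> Result<(), anyhow::Error> {
    /// let writer = BackupWriter::start(client, None, "store1", ns, snapshot, false, false).await?;
    /// // ... upload blobs and chunk streams here ...
    /// writer.finish().await?;
    /// # Ok(())
    /// # }
    /// ```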
    // FIXME: extract into (flattened) parameter struct?
    #[allow(clippy::too_many_arguments)]
    pub async fn start(
        client: &HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        ns: &BackupNamespace,
        backup: &BackupDir,
        debug: bool,
        benchmark: bool,
    ) -> Result<Arc<BackupWriter>, Error> {
        let mut param = json!({
            "backup-type": backup.ty(),
            "backup-id": backup.id(),
            "backup-time": backup.time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        if !ns.is_root() {
            param["ns"] = serde_json::to_value(ns)?;
        }

        let req = HttpClient::request_builder(
            client.server(),
            client.port(),
            "GET",
            "/api2/json/backup",
            Some(param),
        )
        .unwrap();

        let (h2, abort) = client
            .start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .await?;

        Ok(BackupWriter::new(h2, abort, crypt_config))
    }

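    /// Forward a GET request to the backup protocol handler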
    pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

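    /// Forward a PUT request to the backup protocol handler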
    pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

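    /// Forward a POST request to the backup protocol handler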
    pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

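    /// POST a raw body to the given protocol endpoint and await the response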
    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2
            .upload("POST", path, param, content_type, data)
            .await
    }

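    /// Send an upload request without awaiting the response body; the
    /// returned future allows pipelining further requests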
    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {
        let request =
            H2Client::request_builder("localhost", method, path, param, Some(content_type))
                .unwrap();
        let response_future = self
            .h2
            .send_request(request, Some(bytes::Bytes::from(data.clone())))
            .await?;
        Ok(response_future)
    }

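    /// PUT a raw body to the given protocol endpoint and await the response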
    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

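    /// Mark the backup as finished on the server and shut down the connection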
    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

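    /// Abort the backup session by tearing down the HTTP/2 connection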
    pub fn cancel(&self) {
        self.abort.abort();
    }

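    /// Upload already-encoded blob data from `reader` under the given file name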
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }

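    /// Encode `data` as a blob (optionally compressed and/or encrypted
    /// according to `options`) and upload it under the given file name.
    ///
    /// A hypothetical sketch (the writer handle comes from `BackupWriter::start`):
    ///
    /// ```no_run
    /// # use pbs_client::{BackupWriter, UploadOptions};
    /// # async fn example(writer: &BackupWriter) -> Result<(), anyhow::Error> {
    /// let options = UploadOptions {
    ///     compress: true,
    ///     ..UploadOptions::default()
    /// };
    /// let stats = writer
    ///     .upload_blob_from_data(b"some payload".to_vec(), "notes.blob", options)
    ///     .await?;
    /// println!("uploaded {} bytes, csum {}", stats.size, hex::encode(stats.csum));
    /// # Ok(())
    /// # }
    /// ```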
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let blob = match (options.encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, options.compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => {
                DataBlob::encode(&data, Some(crypt_config), options.compress)?
            }
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }

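    /// Read a local file into memory and upload it as a blob via
    /// `upload_blob_from_data`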
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, options)
            .await
    }

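    /// Upload a chunk stream as a fixed or dynamic index archive (depending
    /// on `options.fixed_size`), reusing chunks known from the previous
    /// snapshot where possible.
    ///
    /// A hypothetical sketch with a single-chunk dynamic archive (a real
    /// caller would feed a chunker stream instead):
    ///
    /// ```no_run
    /// # use pbs_client::{BackupWriter, UploadOptions};
    /// # async fn example(writer: &BackupWriter) -> Result<(), anyhow::Error> {
    /// let chunks = futures::stream::iter(vec![Ok::<_, anyhow::Error>(
    ///     bytes::BytesMut::from(&b"chunk data"[..]),
    /// )]);
    /// let stats = writer
    ///     .upload_stream("data.didx", chunks, UploadOptions::default())
    ///     .await?;
    /// println!("wrote {} bytes", stats.size);
    /// # Ok(())
    /// # }
    /// ```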
    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        let prefix = if let Some(size) = options.fixed_size {
            param["size"] = size.into();
            "fixed"
        } else {
            "dynamic"
        };

        if options.encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = options.previous_manifest {
            if !manifest
                .files()
                .iter()
                .any(|file| file.filename == archive_name)
            {
                log::info!("Previous manifest does not contain an archive called '{archive_name}', skipping download..");
            } else {
                // try, but ignore errors
                match ArchiveType::from_path(archive_name) {
                    Ok(ArchiveType::FixedIndex) => {
                        if let Err(err) = self
                            .download_previous_fixed_index(
                                archive_name,
                                &manifest,
                                known_chunks.clone(),
                            )
                            .await
                        {
                            log::warn!("Error downloading .fidx from previous manifest: {}", err);
                        }
                    }
                    Ok(ArchiveType::DynamicIndex) => {
                        if let Err(err) = self
                            .download_previous_dynamic_index(
                                archive_name,
                                &manifest,
                                known_chunks.clone(),
                            )
                            .await
                        {
                            log::warn!("Error downloading .didx from previous manifest: {}", err);
                        }
                    }
                    _ => { /* do nothing */ }
                }
            }
        }

        let wid = self
            .h2
            .post(&index_path, Some(param))
            .await?
            .as_u64()
            .unwrap();

        let upload_stats = Self::upload_chunk_info_stream(
            self.h2.clone(),
            wid,
            stream,
            prefix,
            known_chunks.clone(),
            if options.encrypt {
                self.crypt_config.clone()
            } else {
                None
            },
            options.compress,
        )
        .await?;

        let size_dirty = upload_stats.size - upload_stats.size_reused;
        let size: HumanByte = upload_stats.size.into();
        let archive = if log::log_enabled!(log::Level::Debug) {
            archive_name
        } else {
            pbs_tools::format::strip_server_file_extension(archive_name)
        };

        if archive_name != CATALOG_NAME {
            let speed: HumanByte =
                ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
            let size_dirty: HumanByte = size_dirty.into();
            let size_compressed: HumanByte = upload_stats.size_compressed.into();
            log::info!(
                "{}: had to backup {} of {} (compressed {}) in {:.2}s",
                archive,
                size_dirty,
                size,
                size_compressed,
                upload_stats.duration.as_secs_f64()
            );
            log::info!("{}: average backup speed: {}/s", archive, speed);
        } else {
            log::info!("Uploaded backup catalog ({})", size);
        }

        if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
            let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
            let reused: HumanByte = upload_stats.size_reused.into();
            log::info!(
                "{}: backup was done incrementally, reused {} ({:.1}%)",
                archive,
                reused,
                reused_percent
            );
        }
        if log::log_enabled!(log::Level::Debug) && upload_stats.chunk_count > 0 {
            log::debug!(
                "{}: Reused {} from {} chunks.",
                archive,
                upload_stats.chunk_reused,
                upload_stats.chunk_count
            );
            log::debug!(
                "{}: Average chunk size was {}.",
                archive,
                HumanByte::from(upload_stats.size / upload_stats.chunk_count)
            );
            log::debug!(
                "{}: Average time per request: {} microseconds.",
                archive,
                (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
            );
        }

        let param = json!({
            "wid": wid,
            "chunk-count": upload_stats.chunk_count,
            "size": upload_stats.size,
            "csum": hex::encode(upload_stats.csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: upload_stats.size as u64,
            csum: upload_stats.csum,
        })
    }

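    // Spawn a task that awaits the pipelined responses in order and reports
    // the first error (if any) over the returned oneshot channel.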
    fn response_queue() -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| log::debug!("RESPONSE: {:?}", result))
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                }),
        );

        (verify_queue_tx, verify_result_rx)
    }

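    // Spawn a task driving the per-archive append queue: await upload
    // responses, merge consecutive known-chunk entries and PUT the resulting
    // digest/offset lists to the index append endpoint at `path`.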
    fn append_chunk_queue(
        h2: H2Client,
        wid: u64,
        path: String,
    ) -> (UploadQueueSender, UploadResultReceiver) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(hex::encode(digest));
                                offset_list.push(offset);
                            }
                            log::debug!("append chunks list len ({})", digest_list.len());
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

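    /// Download the fixed index of this archive from the previous snapshot,
    /// verify it against the manifest and seed `known_chunks` with its digests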
    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<FixedIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = FixedIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read fixed index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        log::debug!(
            "{}: known chunks list length is {}",
            archive_name,
            index.index_count()
        );

        Ok(index)
    }

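    /// Download the dynamic index of this archive from the previous snapshot,
    /// verify it against the manifest and seed `known_chunks` with its digests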
    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<DynamicIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read dynamic index '{archive_name}' - {err}")
        })?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        log::debug!(
            "{}: known chunks list length is {}",
            archive_name,
            index.index_count()
        );

        Ok(index)
    }

    /// Retrieve the backup time of the last backup
    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
        let data = self.h2.get("previous_backup_time", None).await?;
        serde_json::from_value(data).map_err(|err| {
            format_err!(
                "Failed to parse backup time value returned by server - {}",
                err
            )
        })
    }

    /// Download the backup manifest (index.json) of the last backup
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2
            .download("previous", Some(param), &mut raw_data)
            .await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest =
            BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }

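    // Core upload pipeline: hash each incoming chunk, skip chunks already in
    // `known_chunks` (deduplication), upload new ones and feed the resulting
    // digest/offset pairs to `append_chunk_queue`.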
    // We have no `self` here for `h2` and `verbose`, the only other arg "common" with 1 other
    // function in the same path is `wid`, so those 3 could be in a struct, but there's no real use
    // since this is a private method.
    #[allow(clippy::too_many_arguments)]
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
    ) -> impl Future<Output = Result<UploadStats, Error>> {
        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let compressed_stream_len = Arc::new(AtomicU64::new(0));
        let compressed_stream_len2 = compressed_stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {
                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size {
                    csum.update(&chunk_end.to_le_bytes());
                }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    let compressed_stream_len2 = compressed_stream_len.clone();
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
                        MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        })
                    }))
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {
                let upload_queue = upload_queue.clone();

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = hex::encode(digest);

                    log::trace!(
                        "upload new chunk {} ({} bytes, offset {})",
                        digest_str,
                        chunk_info.chunk_len,
                        offset
                    );

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder(
                        "localhost",
                        "POST",
                        &upload_chunk_path,
                        Some(param),
                        Some(ct),
                    )
                    .unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    Either::Left(h2.send_request(request, upload_data).and_then(
                        move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| {
                                    format_err!("failed to send to upload queue: {}", err)
                                })
                        },
                    ))
                } else {
                    Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move { upload_result.await?.and(result) }.boxed())
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let chunk_count = total_chunks2.load(Ordering::SeqCst);
                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
                let size = stream_len2.load(Ordering::SeqCst);
                let size_reused = reused_len2.load(Ordering::SeqCst);
                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok(UploadStats {
                    chunk_count,
                    chunk_reused,
                    size,
                    size_reused,
                    size_compressed,
                    duration,
                    csum,
                })
            })
    }

    /// Upload speed test - prints result to stderr
    pub async fn upload_speedtest(&self) -> Result<f64, Error> {
        let mut data = vec![];
        // generate pseudo random byte sequence
        for i in 0..1024 * 1024 {
            for j in 0..4 {
                let byte = ((i >> (j << 3)) & 0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue();

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            log::debug!("send test data ({} bytes)", data.len());
            let request =
                H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self
                .h2
                .send_request(request, Some(bytes::Bytes::from(data.clone())))
                .await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        log::info!(
            "Uploaded {} chunks in {} seconds.",
            repeat,
            start_time.elapsed().as_secs()
        );
        let speed = ((item_len * (repeat as usize)) as f64) / start_time.elapsed().as_secs_f64();
        log::info!(
            "Time per request: {} microseconds.",
            (start_time.elapsed().as_micros()) / (repeat as u128)
        );

        Ok(speed)
    }
}