use std::collections::HashSet;
use std::future::Future;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use pbs_api_types::HumanByte;
use pbs_tools::crypt_config::CryptConfig;
use pbs_datastore::{CATALOG_NAME, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};

use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};

use super::{H2Client, HttpClient};

pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}

impl Drop for BackupWriter {
    fn drop(&mut self) {
        self.abort.abort();
    }
}

pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

/// Options for uploading blobs/streams to the server
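///
/// # Example
///
/// A minimal sketch; `manifest` is assumed to come from an earlier call to
/// [`BackupWriter::download_previous_manifest`]:
///
/// ```ignore
/// let options = UploadOptions {
///     previous_manifest: Some(Arc::new(manifest)),
///     compress: true,
///     ..UploadOptions::default() // encrypt: false, fixed_size: None
/// };
/// ```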
#[derive(Default, Clone)]
pub struct UploadOptions {
    pub previous_manifest: Option<Arc<BackupManifest>>,
    pub compress: bool,
    pub encrypt: bool,
    pub fixed_size: Option<u64>,
}

struct UploadStats {
    chunk_count: usize,
    chunk_reused: usize,
    size: usize,
    size_reused: usize,
    size_compressed: usize,
    duration: std::time::Duration,
    csum: [u8; 32],
}

type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;

impl BackupWriter {
    fn new(
        h2: H2Client,
        abort: AbortHandle,
        crypt_config: Option<Arc<CryptConfig>>,
        verbose: bool,
    ) -> Arc<Self> {
        Arc::new(Self {
            h2,
            abort,
            crypt_config,
            verbose,
        })
    }

    // FIXME: extract into (flattened) parameter struct?
    #[allow(clippy::too_many_arguments)]
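    /// Start a new backup session on the server.
    ///
    /// This issues `GET /api2/json/backup` and upgrades the connection to the
    /// HTTP/2 backup protocol.
    ///
    /// A sketch of typical use; `client` is assumed to be an already connected
    /// [`HttpClient`] and `backup_time` an epoch timestamp (`i64`):
    ///
    /// ```ignore
    /// let writer = BackupWriter::start(
    ///     client,
    ///     None,        // crypt_config: unencrypted backup
    ///     "store1",    // datastore
    ///     "host",      // backup type
    ///     "myhost",    // backup id
    ///     backup_time,
    ///     false,       // debug
    ///     false,       // benchmark
    /// )
    /// .await?;
    /// ```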
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: i64,
        debug: bool,
        benchmark: bool,
    ) -> Result<Arc<BackupWriter>, Error> {
        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        let req = HttpClient::request_builder(
            client.server(),
            client.port(),
            "GET",
            "/api2/json/backup",
            Some(param),
        )
        .unwrap();

        let (h2, abort) = client
            .start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }

    pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2
            .upload("POST", path, param, content_type, data)
            .await
    }

    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {
        let request =
            H2Client::request_builder("localhost", method, path, param, Some(content_type))
                .unwrap();
        let response_future = self
            .h2
            .send_request(request, Some(bytes::Bytes::from(data)))
            .await?;
        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }

    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }

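    /// Encode `data` as a blob (optionally compressed and/or encrypted,
    /// depending on `options`) and upload it under `file_name`.
    ///
    /// A minimal sketch; `writer` is assumed to be an `Arc<BackupWriter>` and
    /// the file name is illustrative:
    ///
    /// ```ignore
    /// let stats = writer
    ///     .upload_blob_from_data(data, "notes.blob", UploadOptions::default())
    ///     .await?;
    /// println!("uploaded {} bytes", stats.size);
    /// ```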
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let blob = match (options.encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, options.compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => {
                DataBlob::encode(&data, Some(crypt_config), options.compress)?
            }
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }

    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, options)
            .await
    }

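    /// Upload a chunk stream as a dynamic or fixed index archive.
    ///
    /// A fixed index is used when `options.fixed_size` is set (e.g. for block
    /// device images); otherwise a dynamic index is written. A sketch of
    /// typical use, assuming `stream` yields `BytesMut` chunks and `manifest`
    /// is the previous backup's manifest:
    ///
    /// ```ignore
    /// let options = UploadOptions {
    ///     previous_manifest: Some(Arc::new(manifest)),
    ///     compress: true,
    ///     ..UploadOptions::default()
    /// };
    /// let stats = writer.upload_stream("root.pxar.didx", stream, options).await?;
    /// ```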
    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        let prefix = if let Some(size) = options.fixed_size {
            param["size"] = size.into();
            "fixed"
        } else {
            "dynamic"
        };

        if options.encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = options.previous_manifest {
            // try, but ignore errors
            match ArchiveType::from_path(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    let _ = self
                        .download_previous_fixed_index(
                            archive_name,
                            &manifest,
                            known_chunks.clone(),
                        )
                        .await;
                }
                Ok(ArchiveType::DynamicIndex) => {
                    let _ = self
                        .download_previous_dynamic_index(
                            archive_name,
                            &manifest,
                            known_chunks.clone(),
                        )
                        .await;
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self
            .h2
            .post(&index_path, Some(param))
            .await?
            .as_u64()
            .unwrap();

        let upload_stats = Self::upload_chunk_info_stream(
            self.h2.clone(),
            wid,
            stream,
            prefix,
            known_chunks.clone(),
            if options.encrypt {
                self.crypt_config.clone()
            } else {
                None
            },
            options.compress,
            self.verbose,
        )
        .await?;

        let size_dirty = upload_stats.size - upload_stats.size_reused;
        let size: HumanByte = upload_stats.size.into();
        let archive = if self.verbose {
            archive_name
        } else {
            pbs_tools::format::strip_server_file_extension(archive_name)
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte =
                ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
            let size_dirty: HumanByte = size_dirty.into();
            let size_compressed: HumanByte = upload_stats.size_compressed.into();
            println!(
                "{}: had to backup {} of {} (compressed {}) in {:.2}s",
                archive,
                size_dirty,
                size,
                size_compressed,
                upload_stats.duration.as_secs_f64()
            );
            println!("{}: average backup speed: {}/s", archive, speed);
        } else {
            println!("Uploaded backup catalog ({})", size);
        }

        if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
            let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
            let reused: HumanByte = upload_stats.size_reused.into();
            println!(
                "{}: backup was done incrementally, reused {} ({:.1}%)",
                archive, reused, reused_percent
            );
        }
        if self.verbose && upload_stats.chunk_count > 0 {
            println!(
                "{}: Reused {} from {} chunks.",
                archive, upload_stats.chunk_reused, upload_stats.chunk_count
            );
            println!(
                "{}: Average chunk size was {}.",
                archive,
                HumanByte::from(upload_stats.size / upload_stats.chunk_count)
            );
            println!(
                "{}: Average time per request: {} microseconds.",
                archive,
                (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
            );
        }

        let param = json!({
            "wid": wid,
            "chunk-count": upload_stats.chunk_count,
            "size": upload_stats.size,
            "csum": hex::encode(&upload_stats.csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: upload_stats.size as u64,
            csum: upload_stats.csum,
        })
    }

    fn response_queue(
        verbose: bool,
    ) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| {
                            if verbose {
                                println!("RESPONSE: {:?}", result)
                            }
                        })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                }),
        );

        (verify_queue_tx, verify_result_rx)
    }

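    // Spawn a background task that appends chunk digests to the index on the
    // server. Queue items follow the `UploadQueueSender` protocol:
    //   - `(MergedChunkInfo::Known(list), Some(response))`: wait for the chunk
    //     upload response before scheduling the digests for appending
    //   - `(MergedChunkInfo::Known(list), None)`: the chunk was already known,
    //     so the digests can be scheduled directly
    // Digests are batched via `merge_known_chunks` and appended with a single
    // `PUT {prefix}_index` request carrying `digest-list`/`offset-list`.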
    fn append_chunk_queue(
        h2: H2Client,
        wid: u64,
        path: String,
        verbose: bool,
    ) -> (UploadQueueSender, UploadResultReceiver) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(hex::encode(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<FixedIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = FixedIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read fixed index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!(
                "{}: known chunks list length is {}",
                archive_name,
                index.index_count()
            );
        }

        Ok(index)
    }

    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<DynamicIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read dynamic index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!(
                "{}: known chunks list length is {}",
                archive_name,
                index.index_count()
            );
        }

        Ok(index)
    }

    /// Retrieve the backup time of the last backup
    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
        let data = self.h2.get("previous_backup_time", None).await?;
        serde_json::from_value(data).map_err(|err| {
            format_err!(
                "Failed to parse backup time value returned by server - {}",
                err
            )
        })
    }

    /// Download the backup manifest (index.json) of the last backup
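    ///
    /// The returned manifest is typically used to verify previously written
    /// indexes and to seed the known-chunk set for incremental uploads. A
    /// sketch:
    ///
    /// ```ignore
    /// let manifest = writer.download_previous_manifest().await?;
    /// let options = UploadOptions {
    ///     previous_manifest: Some(Arc::new(manifest)),
    ///     ..UploadOptions::default()
    /// };
    /// ```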
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2
            .download("previous", Some(param), &mut raw_data)
            .await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest =
            BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }

    // There is no `self` here to carry `h2` and `verbose`, and the only other argument shared
    // with another function on the same path is `wid`; those three could be grouped in a struct,
    // but there is no real benefit since this is a private method.
    #[allow(clippy::too_many_arguments)]
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<UploadStats, Error>> {
        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let compressed_stream_len = Arc::new(AtomicU64::new(0));
        let compressed_stream_len2 = compressed_stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {
                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

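                // Index checksum: every chunk digest is hashed; for dynamic
                // indexes the chunk end offset (little endian) is hashed in as
                // well. The final checksum is sent to the server with the
                // `{prefix}_close` request.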
                if !is_fixed_chunk_size {
                    csum.update(&chunk_end.to_le_bytes());
                }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    let compressed_stream_len2 = compressed_stream_len.clone();
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
                        MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        })
                    }))
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {
                let upload_queue = upload_queue.clone();

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = hex::encode(&digest);

                    /* too verbose, needs finer verbosity setting granularity
                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                            chunk_info.chunk_len, offset);
                    }
                    */

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder(
                        "localhost",
                        "POST",
                        &upload_chunk_path,
                        Some(param),
                        Some(ct),
                    )
                    .unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    Either::Left(h2.send_request(request, upload_data).and_then(
                        move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| {
                                    format_err!("failed to send to upload queue: {}", err)
                                })
                        },
                    ))
                } else {
                    Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move { upload_result.await?.and(result) }.boxed())
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let chunk_count = total_chunks2.load(Ordering::SeqCst);
                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
                let size = stream_len2.load(Ordering::SeqCst);
                let size_reused = reused_len2.load(Ordering::SeqCst);
                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok(UploadStats {
                    chunk_count,
                    chunk_reused,
                    size,
                    size_reused,
                    size_compressed,
                    duration,
                    csum,
                })
            })
    }

    /// Upload speed test - prints the result to stderr
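    ///
    /// Returns the measured upload rate in bytes per second. A sketch:
    /// `let bytes_per_sec = writer.upload_speedtest(false).await?;`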
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
        let mut data = vec![];
        // generate a pseudo-random byte sequence
        for i in 0..1024 * 1024 {
            for j in 0..4 {
                let byte = ((i >> (j << 3)) & 0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            if verbose {
                eprintln!("send test data ({} bytes)", data.len());
            }
            let request =
                H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self
                .h2
                .send_request(request, Some(bytes::Bytes::from(data.clone())))
                .await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!(
            "Uploaded {} chunks in {} seconds.",
            repeat,
            start_time.elapsed().as_secs()
        );
        let speed = ((item_len * (repeat as usize)) as f64) / start_time.elapsed().as_secs_f64();
        eprintln!(
            "Time per request: {} microseconds.",
            (start_time.elapsed().as_micros()) / (repeat as u128)
        );

        Ok(speed)
    }
}