use std::collections::HashSet;
use std::future::Future;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use pbs_api_types::{BackupDir, BackupNamespace, HumanByte};
use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
use pbs_datastore::{CATALOG_NAME, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::crypt_config::CryptConfig;

use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};

use super::{H2Client, HttpClient};
pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}

impl Drop for BackupWriter {
    fn drop(&mut self) {
        self.abort.abort();
    }
}

pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}
/// Options for uploading blobs/streams to the server
#[derive(Default, Clone)]
pub struct UploadOptions {
    pub previous_manifest: Option<Arc<BackupManifest>>,
    pub compress: bool,
    pub encrypt: bool,
    pub fixed_size: Option<u64>,
}
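// Usage sketch (illustrative, not part of the original file): since
// `UploadOptions` derives `Default`, callers typically set only the fields
// they need via struct-update syntax:
//
//     let options = UploadOptions {
//         compress: true,
//         ..UploadOptions::default()
//     };
//
// `fixed_size` selects a fixed-chunk index when set; `previous_manifest`
// enables chunk reuse for incremental backups.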
struct UploadStats {
    chunk_count: usize,
    chunk_reused: usize,
    size: usize,
    size_reused: usize,
    size_compressed: usize,
    duration: std::time::Duration,
    csum: [u8; 32],
}
type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
impl BackupWriter {
    fn new(
        h2: H2Client,
        abort: AbortHandle,
        crypt_config: Option<Arc<CryptConfig>>,
        verbose: bool,
    ) -> Arc<Self> {
        Arc::new(Self {
            h2,
            abort,
            crypt_config,
            verbose,
        })
    }

    // FIXME: extract into (flattened) parameter struct?
    #[allow(clippy::too_many_arguments)]
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        ns: &BackupNamespace,
        backup: &BackupDir,
        debug: bool,
        benchmark: bool,
    ) -> Result<Arc<BackupWriter>, Error> {
        let mut param = json!({
            "backup-type": backup.ty(),
            "backup-id": backup.id(),
            "backup-time": backup.time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        if !ns.is_root() {
            param["ns"] = serde_json::to_value(ns)?;
        }

        let req = HttpClient::request_builder(
            client.server(),
            client.port(),
            "GET",
            "/api2/json/backup",
            Some(param),
        )
        .unwrap();

        let (h2, abort) = client
            .start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }
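    // Usage sketch (illustrative, not part of the original file): assuming an
    // already authenticated `HttpClient` named `client` and a parsed
    // `BackupDir` named `backup_dir`, a writer is started along these lines:
    //
    //     let writer = BackupWriter::start(
    //         client,
    //         crypt_config.clone(),
    //         "my-datastore",
    //         &BackupNamespace::root(),
    //         &backup_dir,
    //         false, // debug
    //         false, // benchmark
    //     )
    //     .await?;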
    pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }
    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2
            .upload("POST", path, param, content_type, data)
            .await
    }
    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {
        let request =
            H2Client::request_builder("localhost", method, path, param, Some(content_type))
                .unwrap();
        let response_future = self
            .h2
            .send_request(request, Some(bytes::Bytes::from(data.clone())))
            .await?;
        Ok(response_future)
    }
    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }
    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let blob = match (options.encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, options.compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => {
                DataBlob::encode(&data, Some(crypt_config), options.compress)?
            }
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self
            .h2
            .upload(
                "POST",
                "blob",
                Some(param),
                "application/octet-stream",
                raw_data,
            )
            .await?;
        Ok(BackupStats { size, csum })
    }
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, options)
            .await
    }
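    // Usage sketch (illustrative, not part of the original file): upload a
    // small configuration blob, compressed but unencrypted. `writer` is an
    // `Arc<BackupWriter>` and `config_bytes` a placeholder for any `Vec<u8>`:
    //
    //     let options = UploadOptions {
    //         compress: true,
    //         ..UploadOptions::default()
    //     };
    //     let stats = writer
    //         .upload_blob_from_data(config_bytes, "qemu-server.conf.blob", options)
    //         .await?;
    //     println!("uploaded {} bytes, csum {}", stats.size, hex::encode(&stats.csum));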
    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        let prefix = if let Some(size) = options.fixed_size {
            param["size"] = size.into();
            "fixed"
        } else {
            "dynamic"
        };

        if options.encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);
        if let Some(manifest) = options.previous_manifest {
            // try, but ignore errors
            match ArchiveType::from_path(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    if let Err(err) = self
                        .download_previous_fixed_index(
                            archive_name,
                            &manifest,
                            known_chunks.clone(),
                        )
                        .await
                    {
                        eprintln!("Error downloading .fidx from previous manifest: {}", err);
                    }
                }
                Ok(ArchiveType::DynamicIndex) => {
                    if let Err(err) = self
                        .download_previous_dynamic_index(
                            archive_name,
                            &manifest,
                            known_chunks.clone(),
                        )
                        .await
                    {
                        eprintln!("Error downloading .didx from previous manifest: {}", err);
                    }
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self
            .h2
            .post(&index_path, Some(param))
            .await?
            .as_u64()
            .unwrap();
        let upload_stats = Self::upload_chunk_info_stream(
            self.h2.clone(),
            wid,
            stream,
            prefix,
            known_chunks.clone(),
            if options.encrypt {
                self.crypt_config.clone()
            } else {
                None
            },
            options.compress,
            self.verbose,
        )
        .await?;

        let size_dirty = upload_stats.size - upload_stats.size_reused;
        let size: HumanByte = upload_stats.size.into();
        let archive = if self.verbose {
            archive_name.to_string()
        } else {
            pbs_tools::format::strip_server_file_extension(archive_name)
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte =
                ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
            let size_dirty: HumanByte = size_dirty.into();
            let size_compressed: HumanByte = upload_stats.size_compressed.into();
            println!(
                "{}: had to backup {} of {} (compressed {}) in {:.2}s",
                archive,
                size_dirty,
                size,
                size_compressed,
                upload_stats.duration.as_secs_f64()
            );
            println!("{}: average backup speed: {}/s", archive, speed);
        } else {
            println!("Uploaded backup catalog ({})", size);
        }
        if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
            let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
            let reused: HumanByte = upload_stats.size_reused.into();
            println!(
                "{}: backup was done incrementally, reused {} ({:.1}%)",
                archive, reused, reused_percent
            );
        }
        if self.verbose && upload_stats.chunk_count > 0 {
            println!(
                "{}: Reused {} from {} chunks.",
                archive, upload_stats.chunk_reused, upload_stats.chunk_count
            );
            println!(
                "{}: Average chunk size was {}.",
                archive,
                HumanByte::from(upload_stats.size / upload_stats.chunk_count)
            );
            println!(
                "{}: Average time per request: {} microseconds.",
                archive,
                (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
            );
        }
397 "chunk-count": upload_stats
.chunk_count
,
398 "size": upload_stats
.size
,
399 "csum": hex
::encode(&upload_stats
.csum
),
401 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
403 size
: upload_stats
.size
as u64,
404 csum
: upload_stats
.csum
,
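    // Usage sketch (illustrative, not part of the original file): stream a
    // fixed-size disk image through the chunker. `chunk_stream` stands in for
    // any `Stream<Item = Result<bytes::BytesMut, Error>>` produced by the
    // caller, and `image_size` for the image length in bytes:
    //
    //     let options = UploadOptions {
    //         previous_manifest: previous_manifest.clone(), // enables chunk reuse
    //         compress: true,
    //         fixed_size: Some(image_size), // selects the "fixed" index type
    //         ..UploadOptions::default()
    //     };
    //     let stats = writer
    //         .upload_stream("disk-0.img.fidx", chunk_stream, options)
    //         .await?;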
    fn response_queue(
        verbose: bool,
    ) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     })
        //     .await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| {
                            if verbose {
                                println!("RESPONSE: {:?}", result)
                            }
                        })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                }),
        );

        (verify_queue_tx, verify_result_rx)
    }
    fn append_chunk_queue(
        h2: H2Client,
        wid: u64,
        path: String,
        verbose: bool,
    ) -> (UploadQueueSender, UploadResultReceiver) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    }),
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(hex::encode(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                }),
        );

        (verify_queue_tx, verify_result_rx)
    }
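    // For reference (added comment, not in the original file): the PUT body
    // built above serializes to JSON along these lines, with chunk digests
    // hex-encoded and offsets in bytes:
    //
    //     {
    //         "wid": 1,
    //         "digest-list": ["bb9f8df61474d25e71fa00722318cd387396ca1736605e1248821cc0de3d3af8"],
    //         "offset-list": [4194304]
    //     }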
    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<FixedIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = FixedIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read fixed index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!(
                "{}: known chunks list length is {}",
                archive_name,
                index.index_count()
            );
        }

        Ok(index)
    }
    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
    ) -> Result<DynamicIndexReader, Error> {
        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2
            .download("previous", Some(param), &mut tmpfile)
            .await?;

        let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
            format_err!("unable to read dynamic index '{}' - {}", archive_name, err)
        })?;
        // Note: do not use values stored in index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!(
                "{}: known chunks list length is {}",
                archive_name,
                index.index_count()
            );
        }

        Ok(index)
    }
    /// Retrieve backup time of last backup
    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
        let data = self.h2.get("previous_backup_time", None).await?;
        serde_json::from_value(data).map_err(|err| {
            format_err!(
                "Failed to parse backup time value returned by server - {}",
                err
            )
        })
    }
    /// Download backup manifest (index.json) of last backup
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2
            .download("previous", Some(param), &mut raw_data)
            .await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest =
            BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }
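    // Usage sketch (illustrative, not part of the original file): a typical
    // incremental-backup flow wires the previous manifest into the upload
    // options so `upload_stream` can reuse known chunks:
    //
    //     let previous_manifest = match writer.download_previous_manifest().await {
    //         Ok(manifest) => Some(Arc::new(manifest)),
    //         Err(_) => None, // first backup in this group, nothing to reuse
    //     };
    //     let options = UploadOptions {
    //         previous_manifest,
    //         compress: true,
    //         ..UploadOptions::default()
    //     };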
    // We have no `self` here for `h2` and `verbose`, the only other arg "common" with 1 other
    // function in the same path is `wid`, so those 3 could be in a struct, but there's no real use
    // since this is a private method.
    #[allow(clippy::too_many_arguments)]
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<UploadStats, Error>> {
        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let compressed_stream_len = Arc::new(AtomicU64::new(0));
        let compressed_stream_len2 = compressed_stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();
        stream
            .and_then(move |data| {
                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                // dynamic indexes also hash the end offset of every chunk
                if !is_fixed_chunk_size {
                    csum.update(&chunk_end.to_le_bytes());
                }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    let compressed_stream_len2 = compressed_stream_len.clone();
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
                        MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        })
                    }))
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {
                let upload_queue = upload_queue.clone();

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = hex::encode(&digest);

                    /* too verbose, needs finer verbosity setting granularity
                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                            chunk_info.chunk_len, offset);
                    }
                    */

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder(
                        "localhost",
                        "POST",
                        &upload_chunk_path,
                        Some(param),
                        Some(ct),
                    )
                    .unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    Either::Left(h2.send_request(request, upload_data).and_then(
                        move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| {
                                    format_err!("failed to send to upload queue: {}", err)
                                })
                        },
                    ))
                } else {
                    Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move { upload_result.await?.and(result) }.boxed())
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let chunk_count = total_chunks2.load(Ordering::SeqCst);
                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
                let size = stream_len2.load(Ordering::SeqCst);
                let size_reused = reused_len2.load(Ordering::SeqCst);
                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok(UploadStats {
                    chunk_count,
                    chunk_reused,
                    size,
                    size_reused,
                    size_compressed,
                    duration,
                    csum,
                })
            })
    }
    /// Upload speed test - prints result to stderr
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
        let mut data = vec![];
        // generate pseudo random byte sequence
        for i in 0..1024 * 1024 {
            for j in 0..4 {
                let byte = ((i >> (j << 3)) & 0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            if verbose {
                eprintln!("send test data ({} bytes)", data.len());
            }
            let request =
                H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self
                .h2
                .send_request(request, Some(bytes::Bytes::from(data.clone())))
                .await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!(
            "Uploaded {} chunks in {} seconds.",
            repeat,
            start_time.elapsed().as_secs()
        );
        let speed = ((item_len * (repeat as usize)) as f64) / start_time.elapsed().as_secs_f64();
        eprintln!(
            "Time per request: {} microseconds.",
            (start_time.elapsed().as_micros()) / (repeat as u128)
        );

        Ok(speed)
    }
}
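// Worked example for the returned speed value (added comment, not in the
// original file): `item_len` is 4 MiB (1024 * 1024 loop iterations pushing
// 4 bytes each). If 60 requests complete in 5.0 seconds, the result is
// (4 MiB * 60) / 5.0 s = 48 MiB/s, expressed in bytes per second.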