use std::collections::HashSet;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::*;
use futures::stream::Stream;
use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;
use crate::tools::format::HumanByte;

use super::{HttpClient, H2Client};
pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}

impl Drop for BackupWriter {
    fn drop(&mut self) {
        self.abort.abort();
    }
}

pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
impl BackupWriter {

    fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
        Arc::new(Self { h2, abort, crypt_config, verbose })
    }
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: i64,
        debug: bool,
        benchmark: bool,
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        let req = HttpClient::request_builder(
            client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }
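
    // A minimal usage sketch, assuming an authenticated `HttpClient` and
    // hypothetical datastore/id values (not part of this file):
    //
    //     let writer = BackupWriter::start(
    //         client, None, "store1", "host", "myhost", backup_time,
    //         false, false,
    //     ).await?;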
    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }
    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }
    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {

        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

        Ok(response_future)
    }
    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }
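
    /// Upload in-memory data as a blob, optionally compressed and/or encrypted
    /// with the writer's `CryptConfig`.
    ///
    /// A minimal call-site sketch (names are illustrative):
    ///
    /// ```ignore
    /// let stats = writer
    ///     .upload_blob_from_data(data, "qemu-server.conf.blob", true, false)
    ///     .await?;
    /// println!("uploaded {} bytes, csum {}", stats.size, digest_to_hex(&stats.csum));
    /// ```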
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {
        let blob = match (encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?,
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, compress, encrypt).await
    }
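
    /// Upload a chunked archive as a stream, writing a dynamic or fixed index
    /// on the server side depending on `prefix` ("dynamic" or "fixed").
    ///
    /// A minimal call-site sketch (names are illustrative, not part of this file):
    ///
    /// ```ignore
    /// let stats = writer.upload_stream(
    ///     Some(manifest),       // previous manifest enables chunk reuse
    ///     "root.pxar.didx",
    ///     chunk_stream,
    ///     "dynamic",
    ///     None,                 // fixed_size is only set for fixed indexes
    ///     true,                 // compress
    ///     false,                // encrypt
    /// ).await?;
    /// ```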
    pub async fn upload_stream(
        &self,
        previous_manifest: Option<Arc<BackupManifest>>,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        fixed_size: Option<u64>,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        if let Some(size) = fixed_size {
            param["size"] = size.into();
        }

        if encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = previous_manifest {
            // try, but ignore errors
            match archive_type(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                Ok(ArchiveType::DynamicIndex) => {
                    let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                &prefix,
                known_chunks.clone(),
                if encrypt { self.crypt_config.clone() } else { None },
                compress,
                self.verbose,
            )
            .await?;
        let uploaded = size - size_reused;
        let vsize_h: HumanByte = size.into();
        let archive = if self.verbose {
            archive_name.to_string()
        } else {
            crate::tools::format::strip_server_file_extension(archive_name)
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
            let uploaded: HumanByte = uploaded.into();
            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s.", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
        } else {
            println!("Uploaded backup catalog ({})", vsize_h);
        }
        if size_reused > 0 && size > 1024*1024 {
            let reused_percent = size_reused as f64 * 100. / size as f64;
            let reused: HumanByte = size_reused.into();
            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
        }
        if self.verbose && chunk_count > 0 {
            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
        }

        let param = json!({
            "wid": wid,
            "chunk-count": chunk_count,
            "size": size,
            "csum": proxmox::tools::digest_to_hex(&csum),
        });

        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: size as u64,
            csum,
        })
    }
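
    // Helper for upload_speedtest(): responses to pipelined requests are pushed
    // into an mpsc queue and awaited in order by a spawned task; the oneshot
    // channel reports the overall result back to the caller.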
    fn response_queue(verbose: bool) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }
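
    // Spawns a task that serializes "append chunk" requests: each queue entry
    // optionally carries the response future of a pending chunk upload, which
    // is awaited before the digests are appended to the index, so appends keep
    // the stream's order even though uploads are pipelined.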
    fn append_chunk_queue(
        h2: H2Client,
        wid: u64,
        path: String,
        verbose: bool,
    ) -> (UploadQueueSender, UploadResultReceiver) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }
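
    /// Download the previous backup's fixed index and add its chunk digests to
    /// `known_chunks`, so unchanged chunks can be re-referenced instead of
    /// re-uploaded (the basis for incremental backups).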
    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<FixedIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = FixedIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }
    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<DynamicIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = DynamicIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }
    /// Retrieve backup time of last backup
    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
        let data = self.h2.get("previous_backup_time", None).await?;
        serde_json::from_value(data)
            .map_err(|err| format_err!("Failed to parse backup time value returned by server - {}", err))
    }
    /// Download backup manifest (index.json) of last backup
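    ///
    /// A typical call pattern, sketched (error handling elided):
    ///
    /// ```ignore
    /// let manifest = writer.download_previous_manifest().await?;
    /// let stats = writer
    ///     .upload_stream(Some(Arc::new(manifest)), /* ... */)
    ///     .await?;
    /// ```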
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {

        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2.download("previous", Some(param), &mut raw_data).await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }
    // We have no `self` here for `h2` and `verbose`, the only other arg "common" with 1 other
    // function in the same path is `wid`, so those 3 could be in a struct, but there's no real use
    // since this is a private method.
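    //
    // Returns a future resolving to (total_chunks, known_chunk_count, stream_len,
    // reused_len, duration, index_csum) once the stream and all pipelined
    // requests have completed.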
    #[allow(clippy::too_many_arguments)]
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {

        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();
        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;
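
                // a dynamic index checksums each chunk's end offset as well;
                // for fixed indexes the offsets are implied by the chunk size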
                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    /* too verbose, needs finer verbosity setting granularity
                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                                 chunk_info.chunk_len, offset);
                    }
                    */

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    let mut upload_queue = upload_queue.clone();
                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                        })
                    )
                } else {
                    let mut upload_queue = upload_queue.clone();
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            })
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let total_chunks = total_chunks2.load(Ordering::SeqCst);
                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let reused_len = reused_len2.load(Ordering::SeqCst);

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
            })
    }
    /// Upload speed test - prints result to stderr
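    ///
    /// Roughly: POSTs the same pseudo-random test buffer to the `speedtest`
    /// endpoint in a pipelined loop for about 5 seconds and returns the
    /// achieved upload rate in bytes per second.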
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {

        let mut data = vec![];
        // generate pseudo random byte sequence
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            let mut upload_queue = upload_queue.clone();

            if verbose { eprintln!("send test data ({} bytes)", data.len()); }
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
        eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
}