use std::collections::HashSet;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::*;
use futures::stream::Stream;
use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;
use crate::tools::format::HumanByte;

use super::{HttpClient, H2Client};
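
/// Client-side writer for one backup session.
///
/// Wraps the HTTP/2 backup protocol connection plus an optional crypt
/// config used when blobs and chunks are encrypted before upload.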
pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}
impl Drop for BackupWriter {
    fn drop(&mut self) {
        // abort the running H2 connection when the writer is dropped
        self.abort.abort();
    }
}
pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}
impl BackupWriter {

    fn new(
        h2: H2Client,
        abort: AbortHandle,
        crypt_config: Option<Arc<CryptConfig>>,
        verbose: bool,
    ) -> Arc<Self> {
        Arc::new(Self { h2, abort, crypt_config, verbose })
    }
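
    /// Start a new backup session on the server.
    ///
    /// The usage below is an illustrative sketch, not a doctest: `client`
    /// construction is elided and all argument values are invented for the
    /// example.
    ///
    /// ```ignore
    /// let writer = BackupWriter::start(
    ///     client,       // connected HttpClient
    ///     None,         // crypt config (None = unencrypted)
    ///     "store1",     // datastore
    ///     "host",       // backup type
    ///     "myhost",     // backup id
    ///     backup_time,  // backup timestamp
    ///     false,        // debug
    ///     false,        // benchmark
    /// ).await?;
    /// ```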
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: i64,
        debug: bool,
        benchmark: bool,
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        let req = HttpClient::request_builder(
            client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }
    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }
    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }
    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {
        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
        Ok(response_future)
    }
    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }
    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }
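
    /// Read all data from `reader` and upload it as a blob named
    /// `file_name`. Note the fixme below: the whole blob is buffered in
    /// memory before the upload starts.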
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }
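
    /// Encode `data` as a `DataBlob` (optionally compressed and/or
    /// encrypted) and upload it under `file_name`. Requesting encryption
    /// without a crypt config fails.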
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {
        let blob = match (encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?,
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, compress, encrypt).await
    }
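
    /// Upload a chunk stream as a fixed or dynamic index archive.
    ///
    /// When a previous manifest is given, the matching index of the last
    /// backup is downloaded first and its digests are registered as known
    /// chunks, so unchanged chunks are referenced instead of re-uploaded
    /// (incremental backup).
    ///
    /// Illustrative call (a sketch; chunk stream construction is elided
    /// and the argument values are invented):
    ///
    /// ```ignore
    /// let stats = writer.upload_stream(
    ///     Some(manifest), "root.pxar.didx", chunk_stream,
    ///     "dynamic", None, true, false,
    /// ).await?;
    /// ```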
    pub async fn upload_stream(
        &self,
        previous_manifest: Option<Arc<BackupManifest>>,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        fixed_size: Option<u64>,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        if let Some(size) = fixed_size {
            param["size"] = size.into();
        }

        if encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = previous_manifest {
            // try, but ignore errors
            match archive_type(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                Ok(ArchiveType::DynamicIndex) => {
                    let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();

        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                prefix,
                known_chunks.clone(),
                if encrypt { self.crypt_config.clone() } else { None },
                compress,
                self.verbose,
            )
            .await?;

        let uploaded = size - size_reused;
        let vsize_h: HumanByte = size.into();
        let archive = if self.verbose {
            archive_name.to_string()
        } else {
            crate::tools::format::strip_server_file_extension(archive_name)
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
            let uploaded: HumanByte = uploaded.into();
            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s.", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
        } else {
            println!("Uploaded backup catalog ({})", vsize_h);
        }

        if size_reused > 0 && size > 1024*1024 {
            let reused_percent = size_reused as f64 * 100. / size as f64;
            let reused: HumanByte = size_reused.into();
            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
        }
        if self.verbose && chunk_count > 0 {
            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
        }

        let param = json!({
            "wid": wid,
            "chunk-count": chunk_count,
            "size": size,
            "csum": proxmox::tools::digest_to_hex(&csum),
        });

        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: size as u64,
            csum,
        })
    }
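
    /// Spawn a background task that polls pipelined HTTP/2 response
    /// futures in order. Returns the sender used to enqueue responses and
    /// a oneshot receiver yielding the overall result once the queue is
    /// closed.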
    fn response_queue(verbose: bool) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });

        // old code for reference?
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }
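
    /// Spawn a background task that appends chunk references to the index.
    /// Upload responses are awaited first, then digest/offset pairs are
    /// merged and flushed to the server as batched `PUT` requests against
    /// `path`.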
    fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        let h2_2 = h2.clone();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2_2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }
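
    /// Download the fixed index of the previous backup for `archive_name`,
    /// verify it against the manifest, and register its chunk digests as
    /// known chunks.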
    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<FixedIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = FixedIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }
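
    /// Download the dynamic index of the previous backup for
    /// `archive_name`, verify it against the manifest, and register its
    /// chunk digests as known chunks.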
    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<DynamicIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = DynamicIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }
    /// Download the backup manifest (index.json) of the last backup.
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {

        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2.download("previous", Some(param), &mut raw_data).await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {

        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    if false && verbose { // too verbose, needs finer verbosity setting granularity
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                                 chunk_info.chunk_len, offset);
                    }

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    let mut upload_queue = upload_queue.clone();
                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                        })
                    )
                } else {
                    let mut upload_queue = upload_queue.clone();
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            })
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let total_chunks = total_chunks2.load(Ordering::SeqCst);
                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let reused_len = reused_len2.load(Ordering::SeqCst);

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
            })
    }
    /// Upload speed test - prints the result to stderr.
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {

        let mut data = vec![];
        // generate a pseudo-random byte sequence
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            let mut upload_queue = upload_queue.clone();

            if verbose { eprintln!("send test data ({} bytes)", data.len()); }
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
        eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
}