use std::collections::HashSet;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use chrono::{DateTime, Utc};
use futures::*;
use futures::stream::Stream;
use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;
use crate::tools::format::HumanByte;

use super::{HttpClient, H2Client};

pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}

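// Dropping the writer aborts the HTTP/2 connection, so an unfinished
// backup session does not linger on the server.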
impl Drop for BackupWriter {
    fn drop(&mut self) {
        self.abort.abort();
    }
}

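/// Size and SHA-256 checksum returned after uploading a single archive or blob.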
pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

impl BackupWriter {

    fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
        Arc::new(Self { h2, abort, crypt_config, verbose })
    }

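    /// Open a new backup session ("GET /api2/json/backup", upgraded to the
    /// proxmox backup protocol) and wrap the resulting HTTP/2 connection.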
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: DateTime<Utc>,
        debug: bool,
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time.timestamp(),
            "store": datastore,
            "debug": debug
        });

        let req = HttpClient::request_builder(
            client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }

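    // Thin convenience wrappers around the underlying HTTP/2 client.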
    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }

    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {

        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

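    /// Cancel the backup session by aborting the connection instead of
    /// sending a `finish` request.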
    pub fn cancel(&self) {
        self.abort.abort();
    }

    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {
        let blob = match (encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?,
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

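    /// Read a local file into memory and upload it as a blob
    /// (see `upload_blob_from_data`).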
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, compress, encrypt).await
    }

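    /// Upload a chunked archive stream:
    /// 1. seed `known_chunks` from the previous backup's index (if any),
    /// 2. register an index writer (`{prefix}_index`) to obtain a writer id,
    /// 3. upload/deduplicate all chunks via `upload_chunk_info_stream`,
    /// 4. close the writer (`{prefix}_close`) with chunk count, size and csum.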
    pub async fn upload_stream(
        &self,
        previous_manifest: Option<Arc<BackupManifest>>,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        fixed_size: Option<u64>,
        compress: bool,
        encrypt: bool,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        if let Some(size) = fixed_size {
            param["size"] = size.into();
        }

        if encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = previous_manifest {
            // try, but ignore errors
            match archive_type(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                Ok(ArchiveType::DynamicIndex) => {
                    let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();

        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                prefix,
                known_chunks.clone(),
                if encrypt { self.crypt_config.clone() } else { None },
                compress,
                self.verbose,
            )
            .await?;

        let uploaded = size - size_reused;
        let vsize_h: HumanByte = size.into();
        let archive = if self.verbose {
            archive_name.to_string()
        } else {
            crate::tools::format::strip_server_file_expenstion(archive_name.clone())
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte = (uploaded / (duration.as_secs() as usize)).into();
            let uploaded: HumanByte = uploaded.into();
            println!("{}: had to upload {} of {} in {}s, average speed {}/s.", archive, uploaded, vsize_h, duration.as_secs(), speed);
        } else {
            println!("Uploaded backup catalog ({})", vsize_h);
        }

        if size_reused > 0 && size > 1024*1024 {
            let reused_percent = size_reused as f64 * 100. / size as f64;
            let reused: HumanByte = size_reused.into();
            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
        }
        if self.verbose && chunk_count > 0 {
            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
        }

287 "chunk-count": chunk_count
,
289 "csum": proxmox
::tools
::digest_to_hex(&csum
),
291 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
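    // Spawn a task that polls pipelined responses from a channel and reports
    // the overall result through a oneshot channel (used by the speed test).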
    fn response_queue(verbose: bool) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

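    // Queue of index-append requests: waits for chunk uploads to complete,
    // merges consecutive known chunks, and PUTs their digests and offsets to
    // the index writer in batches.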
    fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        let h2_2 = h2.clone();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2_2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

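    /// Download the fixed index of the previous backup run and register all
    /// of its chunks as known, so unchanged chunks need not be re-uploaded.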
    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<FixedIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = FixedIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }

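    /// Dynamic-index counterpart of `download_previous_fixed_index`.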
    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<DynamicIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = DynamicIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }

    /// Download backup manifest (index.json) of last backup
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {

        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2.download("previous", Some(param), &mut raw_data).await?;

        let blob = DataBlob::from_raw(raw_data)?;

        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;

        let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }

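    // Core deduplicating upload loop. Resolves to
    // (total_chunks, known_chunk_count, stream_len, reused_len, duration, csum)
    // once the stream and all pipelined append requests have completed.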
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {

        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    if false && verbose { // too verbose, needs finer verbosity setting granularity
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                                 chunk_info.chunk_len, offset);
                    }

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    let mut upload_queue = upload_queue.clone();
                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                        })
                    )
                } else {
                    let mut upload_queue = upload_queue.clone();
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            })
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let total_chunks = total_chunks2.load(Ordering::SeqCst);
                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let reused_len = reused_len2.load(Ordering::SeqCst);

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
            })
    }

    /// Upload speed test - prints result to stderr
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {

        let mut data = vec![];
        // generate pseudo random byte sequence
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            let mut upload_queue = upload_queue.clone();

            if verbose { eprintln!("send test data ({} bytes)", data.len()); }
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
        eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
}