1 use std
::collections
::HashSet
;
2 use std
::os
::unix
::fs
::OpenOptionsExt
;
3 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
4 use std
::sync
::{Arc, Mutex}
;
6 use anyhow
::{bail, format_err, Error}
;
7 use chrono
::{DateTime, Utc}
;
9 use futures
::stream
::Stream
;
10 use futures
::future
::AbortHandle
;
11 use serde_json
::{json, Value}
;
12 use tokio
::io
::AsyncReadExt
;
13 use tokio
::sync
::{mpsc, oneshot}
;
15 use proxmox
::tools
::digest_to_hex
;
17 use super::merge_known_chunks
::{MergedChunkInfo, MergeKnownChunks}
;
20 use super::{HttpClient, H2Client}
;
// Client-side handle for one backup session spoken over an HTTP/2 connection.
// NOTE(review): this extraction is fragmentary — fields `h2`, `abort` and
// `verbose` (constructed in `new` below) are not visible here; only the
// optional encryption configuration survives the extraction.
22 pub struct BackupWriter
{
26 crypt_config
: Option
<Arc
<CryptConfig
>>,
// Drop implementation — body elided from this extraction; presumably aborts
// the running h2 connection via the stored `abort` handle — TODO confirm.
29 impl Drop
for BackupWriter
{
// Statistics returned by the upload helpers; fields elided here, but the
// construction sites below show `size` and `csum` members.
36 pub struct BackupStats
{
// Wrap an established h2 connection (plus its abort handle, optional crypt
// config and verbosity flag) into a ref-counted BackupWriter.
43 fn new(h2
: H2Client
, abort
: AbortHandle
, crypt_config
: Option
<Arc
<CryptConfig
>>, verbose
: bool
) -> Arc
<Self> {
44 Arc
::new(Self { h2, abort, crypt_config, verbose }
)
// NOTE(review): the signature line of this function is elided from the
// extraction (original lines 45-48 missing) — from the body it starts a new
// backup session: builds the query parameters, issues GET /api2/json/backup,
// upgrades to an h2 connection using the backup protocol id, and wraps the
// result in a BackupWriter. Presumably `pub async fn start(...)` — TODO confirm.
49 crypt_config
: Option
<Arc
<CryptConfig
>>,
53 backup_time
: DateTime
<Utc
>,
55 ) -> Result
<Arc
<BackupWriter
>, Error
> {
58 "backup-type": backup_type
,
59 "backup-id": backup_id
,
// backup-time is transmitted as a unix timestamp
60 "backup-time": backup_time
.timestamp(),
65 let req
= HttpClient
::request_builder(
66 client
.server(), "GET", "/api2/json/backup", Some(param
)).unwrap();
// upgrade the plain HTTP request to a long-lived h2 backup connection
68 let (h2
, abort
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_PROTOCOL_ID_V1
!())).await?
;
70 Ok(BackupWriter
::new(h2
, abort
, crypt_config
, debug
))
// Thin GET wrapper over the h2 session — signature lines elided from this
// extraction; forwards `path` and `param` verbatim.
77 ) -> Result
<Value
, Error
> {
78 self.h2
.get(path
, param
).await
// Thin PUT wrapper over the h2 session — signature lines elided from this
// extraction; forwards `path` and `param` verbatim.
85 ) -> Result
<Value
, Error
> {
86 self.h2
.put(path
, param
).await
// Thin POST wrapper over the h2 session — signature lines elided from this
// extraction; forwards `path` and `param` verbatim.
93 ) -> Result
<Value
, Error
> {
94 self.h2
.post(path
, param
).await
// POST an upload (body `data` with the given content type) to `path` and
// await the JSON response. Some parameter lines are elided from this view.
97 pub async
fn upload_post(
100 param
: Option
<Value
>,
103 ) -> Result
<Value
, Error
> {
104 self.h2
.upload("POST", path
, param
, content_type
, data
).await
// Build an upload request and send it on the h2 session WITHOUT awaiting the
// response body — returns the pending ResponseFuture so callers can pipeline.
// NOTE(review): `data.clone()` copies the whole payload into a Bytes buffer;
// some parameter lines are elided from this extraction.
107 pub async
fn send_upload_request(
111 param
: Option
<Value
>,
114 ) -> Result
<h2
::client
::ResponseFuture
, Error
> {
116 let request
= H2Client
::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
117 let response_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
// PUT counterpart of upload_post: upload body `data` to `path` and await the
// JSON response. Some parameter lines are elided from this view.
121 pub async
fn upload_put(
124 param
: Option
<Value
>,
127 ) -> Result
<Value
, Error
> {
128 self.h2
.upload("PUT", path
, param
, content_type
, data
).await
// Close the backup session: POST "finish" to the server. Takes `self: Arc<Self>`
// so the writer can be consumed; the h2 handle is cloned first. Trailing lines
// (awaiting the post / dropping the connection) are elided from this extraction.
131 pub async
fn finish(self: Arc
<Self>) -> Result
<(), Error
> {
132 let h2
= self.h2
.clone();
134 h2
.post("finish", None
)
// Abort the backup session — body elided from this extraction; presumably
// triggers the stored AbortHandle — TODO confirm.
141 pub fn cancel(&self) {
// Upload an already-encoded blob from any std::io::Read source as a named
// backup file. Reads the whole source into memory (see fixme), hashes it,
// and POSTs it to the "blob" endpoint. Returns size + sha256 of the raw data.
145 pub async
fn upload_blob
<R
: std
::io
::Read
>(
149 ) -> Result
<BackupStats
, Error
> {
150 let mut raw_data
= Vec
::new();
151 // fixme: avoid loading into memory
152 reader
.read_to_end(&mut raw_data
)?
;
// checksum is computed over the encoded (on-the-wire) bytes
154 let csum
= openssl
::sha
::sha256(&raw_data
);
155 let param
= json
!({"encoded-size": raw_data.len(), "file-name": file_name }
);
156 let size
= raw_data
.len() as u64;
157 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
158 Ok(BackupStats { size, csum }
)
// Encode an in-memory byte vector as a DataBlob (optionally compressed and/or
// encrypted) and upload it as a named backup file. Requesting encryption
// without a configured crypt config is a hard error. Returns size + sha256 of
// the encoded blob. Some parameter lines are elided from this extraction.
161 pub async
fn upload_blob_from_data(
167 ) -> Result
<BackupStats
, Error
> {
// select encoding mode from the (encrypt, crypt_config) pair
168 let blob
= match (encrypt
, &self.crypt_config
) {
169 (false, _
) => DataBlob
::encode(&data
, None
, compress
)?
,
170 (true, None
) => bail
!("requested encryption without a crypt config"),
171 (true, Some(crypt_config
)) => DataBlob
::encode(&data
, Some(crypt_config
), compress
)?
,
174 let raw_data
= blob
.into_inner();
175 let size
= raw_data
.len() as u64;
// checksum is over the encoded blob, not the plaintext input
177 let csum
= openssl
::sha
::sha256(&raw_data
);
178 let param
= json
!({"encoded-size": size, "file-name": file_name }
);
179 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
180 Ok(BackupStats { size, csum }
)
// Read a local file fully into memory (async via tokio::fs) and delegate to
// upload_blob_from_data. Open/read failures are wrapped with the source path
// for context. Some parameter lines are elided from this extraction.
183 pub async
fn upload_blob_from_file
<P
: AsRef
<std
::path
::Path
>>(
189 ) -> Result
<BackupStats
, Error
> {
191 let src_path
= src_path
.as_ref();
193 let mut file
= tokio
::fs
::File
::open(src_path
)
195 .map_err(|err
| format_err
!("unable to open file {:?} - {}", src_path
, err
))?
;
197 let mut contents
= Vec
::new();
199 file
.read_to_end(&mut contents
)
201 .map_err(|err
| format_err
!("unable to read file {:?} - {}", src_path
, err
))?
;
203 self.upload_blob_from_data(contents
, file_name
, compress
, encrypt
).await
// Upload a chunked archive stream (fixed or dynamic index). Seeds the
// known-chunk set from the previous backup's index when a manifest is given
// (best effort — errors ignored), registers a writer id via "{prefix}_index",
// pumps the stream through upload_chunk_info_stream, prints statistics, and
// closes the writer via "{prefix}_close". Several lines (parameters, json
// close-params) are elided from this extraction.
206 pub async
fn upload_stream(
208 previous_manifest
: Option
<Arc
<BackupManifest
>>,
210 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
212 fixed_size
: Option
<u64>,
215 ) -> Result
<BackupStats
, Error
> {
// shared set of chunk digests the server already has
216 let known_chunks
= Arc
::new(Mutex
::new(HashSet
::new()));
218 let mut param
= json
!({ "archive-name": archive_name }
);
219 if let Some(size
) = fixed_size
{
220 param
["size"] = size
.into();
223 if encrypt
&& self.crypt_config
.is_none() {
224 bail
!("requested encryption without a crypt config");
227 let index_path
= format
!("{}_index", prefix
);
228 let close_path
= format
!("{}_close", prefix
);
230 if let Some(manifest
) = previous_manifest
{
231 // try, but ignore errors
232 match archive_type(archive_name
) {
233 Ok(ArchiveType
::FixedIndex
) => {
234 let _
= self.download_previous_fixed_index(archive_name
, &manifest
, known_chunks
.clone()).await
;
236 Ok(ArchiveType
::DynamicIndex
) => {
237 let _
= self.download_previous_dynamic_index(archive_name
, &manifest
, known_chunks
.clone()).await
;
239 _
=> { /* do nothing */ }
// register the index writer; server returns the writer id (wid)
243 let wid
= self.h2
.post(&index_path
, Some(param
)).await?
.as_u64().unwrap();
245 let (chunk_count
, size
, duration
, speed
, csum
) =
246 Self::upload_chunk_info_stream(
251 known_chunks
.clone(),
// only pass the crypt config through when encryption was requested
252 if encrypt { self.crypt_config.clone() }
else { None }
,
258 println
!("{}: Uploaded {} bytes as {} chunks in {} seconds ({} MB/s).", archive_name
, size
, chunk_count
, duration
.as_secs(), speed
);
260 println
!("{}: Average chunk size was {} bytes.", archive_name
, size
/chunk_count
);
261 println
!("{}: Time per request: {} microseconds.", archive_name
, (duration
.as_micros())/(chunk_count
as u128
));
266 "chunk-count": chunk_count
,
268 "csum": proxmox
::tools
::digest_to_hex(&csum
),
270 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
// Build a pipeline that drains queued h2 ResponseFutures: returns the sender
// for enqueueing responses plus a oneshot receiver that yields Ok(()) once
// every queued response resolved (or the first error). Lines spawning the
// driver task are elided from this extraction.
277 fn response_queue(verbose
: bool
) -> (
278 mpsc
::Sender
<h2
::client
::ResponseFuture
>,
279 oneshot
::Receiver
<Result
<(), Error
>>
281 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(100);
282 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
284 // FIXME: check if this works as expected as replacement for the combinator below?
285 // tokio::spawn(async move {
286 // let result: Result<(), Error> = (async move {
287 // while let Some(response) = verify_queue_rx.recv().await {
288 // match H2Client::h2api_response(response.await?).await {
289 // Ok(result) => println!("RESPONSE: {:?}", result),
290 // Err(err) => bail!("pipelined request failed: {}", err),
295 // let _ignore_closed_channel = verify_result_tx.send(result);
297 // old code for reference?
301 .try_for_each(move |response
: h2
::client
::ResponseFuture
| {
303 .map_err(Error
::from
)
304 .and_then(H2Client
::h2api_response
)
305 .map_ok(move |result
| if verbose { println!("RESPONSE: {:?}
", result) })
306 .map_err(|err| format_err!("pipelined request failed
: {}
", err))
// result delivery is best-effort: the receiver may already be gone
309 let _ignore_closed_channel = verify_result_tx.send(result);
313 (verify_queue_tx, verify_result_rx)
// Build the queue that appends known chunks to the server-side index: callers
// push (MergedChunkInfo, optional pending upload response); the pipeline waits
// for uploads to finish, merges consecutive known chunks, and PUTs batched
// (wid, digest-list, offset-list) requests to `path`. Returns the sender plus
// a oneshot receiver for the final result. Several lines (task spawn, stream
// setup) are elided from this extraction.
316 fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
317 mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
318 oneshot::Receiver<Result<(), Error>>,
320 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
321 let (verify_result_tx, verify_result_rx) = oneshot::channel();
323 let h2_2 = h2.clone();
325 // FIXME: async-block-ify this code!
329 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
330 match (response, merged_chunk_info) {
331 (Some(response), MergedChunkInfo::Known(list)) => {
// wait for the pending chunk upload before appending it
332 future::Either::Left(
334 .map_err(Error::from)
335 .and_then(H2Client::h2api_response)
336 .and_then(move |_result| {
337 future::ok(MergedChunkInfo::Known(list))
341 (None, MergedChunkInfo::Known(list)) => {
342 future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
347 .merge_known_chunks()
348 .and_then(move |merged_chunk_info| {
349 match merged_chunk_info {
350 MergedChunkInfo::Known(chunk_list) => {
351 let mut digest_list = vec![];
352 let mut offset_list = vec![];
353 for (offset, digest) in chunk_list {
354 digest_list.push(digest_to_hex(&digest));
355 offset_list.push(offset);
357 if verbose { println!("append chunks list len ({})", digest_list
.len()); }
358 let param
= json
!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list }
);
359 let request
= H2Client
::request_builder("localhost", "PUT", &path
, None
, Some("application/json")).unwrap();
360 let param_data
= bytes
::Bytes
::from(param
.to_string().into_bytes());
361 let upload_data
= Some(param_data
);
362 h2_2
.send_request(request
, upload_data
)
363 .and_then(move |response
| {
365 .map_err(Error
::from
)
366 .and_then(H2Client
::h2api_response
)
369 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
374 .try_for_each(|_
| future
::ok(()))
// result delivery is best-effort: the receiver may already be gone
376 let _ignore_closed_channel
= verify_result_tx
.send(result
);
380 (verify_queue_tx
, verify_result_rx
)
// Download the previous backup's fixed index into an anonymous temp file
// (O_TMPFILE), verify its recomputed checksum against the manifest, and add
// every chunk digest it references to `known_chunks` so those chunks are not
// re-uploaded. Some lines (open-options flags, return) are elided from this
// extraction.
383 pub async
fn download_previous_fixed_index(
386 manifest
: &BackupManifest
,
387 known_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
388 ) -> Result
<FixedIndexReader
, Error
> {
390 let mut tmpfile
= std
::fs
::OpenOptions
::new()
// O_TMPFILE: unlinked temp file, vanishes when the handle closes
393 .custom_flags(libc
::O_TMPFILE
)
396 let param
= json
!({ "archive-name": archive_name }
);
397 self.h2
.download("previous", Some(param
), &mut tmpfile
).await?
;
399 let index
= FixedIndexReader
::new(tmpfile
)
400 .map_err(|err
| format_err
!("unable to read fixed index '{}' - {}", archive_name
, err
))?
;
401 // Note: do not use values stored in index (not trusted) - instead, compute them again
402 let (csum
, size
) = index
.compute_csum();
403 manifest
.verify_file(archive_name
, &csum
, size
)?
;
405 // add index chunks to known chunks
406 let mut known_chunks
= known_chunks
.lock().unwrap();
407 for i
in 0..index
.index_count() {
408 known_chunks
.insert(*index
.index_digest(i
).unwrap());
412 println
!("{}: known chunks list length is {}", archive_name
, index
.index_count());
// Download the previous backup's dynamic index into an anonymous temp file
// (O_TMPFILE), verify its recomputed checksum against the manifest, and add
// every chunk digest it references to `known_chunks` so those chunks are not
// re-uploaded. Some lines (open-options flags, return) are elided from this
// extraction. Fix: error message typo "dynmamic" -> "dynamic".
418 pub async
fn download_previous_dynamic_index(
421 manifest
: &BackupManifest
,
422 known_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
423 ) -> Result
<DynamicIndexReader
, Error
> {
425 let mut tmpfile
= std
::fs
::OpenOptions
::new()
// O_TMPFILE: unlinked temp file, vanishes when the handle closes
428 .custom_flags(libc
::O_TMPFILE
)
431 let param
= json
!({ "archive-name": archive_name }
);
432 self.h2
.download("previous", Some(param
), &mut tmpfile
).await?
;
434 let index
= DynamicIndexReader
::new(tmpfile
)
435 .map_err(|err
| format_err
!("unable to read dynamic index '{}' - {}", archive_name
, err
))?
;
436 // Note: do not use values stored in index (not trusted) - instead, compute them again
437 let (csum
, size
) = index
.compute_csum();
438 manifest
.verify_file(archive_name
, &csum
, size
)?
;
440 // add index chunks to known chunks
441 let mut known_chunks
= known_chunks
.lock().unwrap();
442 for i
in 0..index
.index_count() {
443 known_chunks
.insert(*index
.index_digest(i
).unwrap());
447 println
!("{}: known chunks list length is {}", archive_name
, index
.index_count());
453 /// Download backup manifest (index.json) of last backup
// Fetches the previous manifest blob into memory, decodes it (decrypting with
// the session crypt config when present), and parses it into a BackupManifest.
// The trailing `Ok(manifest)` return is elided from this extraction.
454 pub async
fn download_previous_manifest(&self) -> Result
<BackupManifest
, Error
> {
456 let mut raw_data
= Vec
::with_capacity(64 * 1024);
458 let param
= json
!({ "archive-name": MANIFEST_BLOB_NAME }
);
459 self.h2
.download("previous", Some(param
), &mut raw_data
).await?
;
461 let blob
= DataBlob
::from_raw(raw_data
)?
;
// decode() verifies/decrypts using the optional crypt config
463 let data
= blob
.decode(self.crypt_config
.as_ref().map(Arc
::as_ref
))?
;
465 let manifest
= BackupManifest
::from_data(&data
[..], self.crypt_config
.as_ref().map(Arc
::as_ref
))?
;
// Core upload pipeline: consume the chunk byte-stream, hash/encode each chunk
// (optionally encrypted), skip chunks the server already knows, upload new
// ones, and feed everything into the append queue that registers chunks with
// the writer id. Resolves to (chunk_count, total_bytes, elapsed, speed MB/s,
// index sha256). Many lines (parameters, stream setup, ChunkInfo fields,
// queue sends, closing braces) are elided from this extraction — the logic
// below is too order-dependent and incomplete for a safe rewrite.
470 fn upload_chunk_info_stream(
473 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
475 known_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
476 crypt_config
: Option
<Arc
<CryptConfig
>>,
479 ) -> impl Future
<Output
= Result
<(usize, usize, std
::time
::Duration
, usize, [u8; 32]), Error
>> {
// shared counters: number of chunks and total stream bytes
481 let repeat
= Arc
::new(AtomicUsize
::new(0));
482 let repeat2
= repeat
.clone();
484 let stream_len
= Arc
::new(AtomicUsize
::new(0));
485 let stream_len2
= stream_len
.clone();
487 let append_chunk_path
= format
!("{}_index", prefix
);
488 let upload_chunk_path
= format
!("{}_chunk", prefix
);
489 let is_fixed_chunk_size
= prefix
== "fixed";
491 let (upload_queue
, upload_result
) =
492 Self::append_chunk_queue(h2
.clone(), wid
, append_chunk_path
.to_owned(), verbose
);
494 let start_time
= std
::time
::Instant
::now();
// running sha256 over the index entries; Option so it can be taken at the end
496 let index_csum
= Arc
::new(Mutex
::new(Some(openssl
::sha
::Sha256
::new())));
497 let index_csum_2
= index_csum
.clone();
500 .and_then(move |data
| {
502 let chunk_len
= data
.len();
504 repeat
.fetch_add(1, Ordering
::SeqCst
);
// offset of this chunk = bytes streamed so far (fetch_add returns old value)
505 let offset
= stream_len
.fetch_add(chunk_len
, Ordering
::SeqCst
) as u64;
507 let mut chunk_builder
= DataChunkBuilder
::new(data
.as_ref())
510 if let Some(ref crypt_config
) = crypt_config
{
511 chunk_builder
= chunk_builder
.crypt_config(crypt_config
);
514 let mut known_chunks
= known_chunks
.lock().unwrap();
515 let digest
= chunk_builder
.digest();
517 let mut guard
= index_csum
.lock().unwrap();
518 let csum
= guard
.as_mut().unwrap();
520 let chunk_end
= offset
+ chunk_len
as u64;
// dynamic indexes also hash the chunk end offset into the index checksum
522 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
525 let chunk_is_known
= known_chunks
.contains(digest
);
// known chunk: no upload needed, just register (offset, digest)
527 future
::ok(MergedChunkInfo
::Known(vec
![(offset
, *digest
)]))
529 known_chunks
.insert(*digest
);
530 future
::ready(chunk_builder
532 .map(move |(chunk
, digest
)| MergedChunkInfo
::New(ChunkInfo
{
535 chunk_len
: chunk_len
as u64,
541 .merge_known_chunks()
542 .try_for_each(move |merged_chunk_info
| {
544 if let MergedChunkInfo
::New(chunk_info
) = merged_chunk_info
{
545 let offset
= chunk_info
.offset
;
546 let digest
= chunk_info
.digest
;
547 let digest_str
= digest_to_hex(&digest
);
550 println
!("upload new chunk {} ({} bytes, offset {})", digest_str
,
551 chunk_info
.chunk_len
, offset
);
554 let chunk_data
= chunk_info
.chunk
.into_inner();
557 "digest": digest_str
,
558 "size": chunk_info
.chunk_len
,
559 "encoded-size": chunk_data
.len(),
562 let ct
= "application/octet-stream";
563 let request
= H2Client
::request_builder("localhost", "POST", &upload_chunk_path
, Some(param
), Some(ct
)).unwrap();
564 let upload_data
= Some(bytes
::Bytes
::from(chunk_data
));
566 let new_info
= MergedChunkInfo
::Known(vec
![(offset
, digest
)]);
568 let mut upload_queue
= upload_queue
.clone();
// new chunk: start upload, hand the pending response to the append queue
569 future
::Either
::Left(h2
570 .send_request(request
, upload_data
)
571 .and_then(move |response
| async
move {
573 .send((new_info
, Some(response
)))
575 .map_err(|err
| format_err
!("failed to send to upload queue: {}", err
))
579 let mut upload_queue
= upload_queue
.clone();
// known chunk: enqueue directly, no upload response to wait for
580 future
::Either
::Right(async
move {
582 .send((merged_chunk_info
, None
))
584 .map_err(|err
| format_err
!("failed to send to upload queue: {}", err
))
588 .then(move |result
| async
move {
589 upload_result
.await?
.and(result
)
592 let repeat
= repeat2
.load(Ordering
::SeqCst
);
593 let stream_len
= stream_len2
.load(Ordering
::SeqCst
);
594 let speed
= ((stream_len
*1_000_000)/(1024*1024))/(start_time
.elapsed().as_micros() as usize);
596 let mut guard
= index_csum_2
.lock().unwrap();
597 let csum
= guard
.take().unwrap().finish();
599 futures
::future
::ok((repeat
, stream_len
, start_time
.elapsed(), speed
, csum
))
603 /// Upload speed test - prints result to stdout
// Generates ~1 MiB of deterministic pseudo-random data, repeatedly POSTs it
// to "speedtest" for ~5 seconds through a response queue, then prints chunk
// count, throughput and per-request latency. NOTE(review): this definition
// runs past the end of the visible extraction (loop setup and the final
// return are not shown); several interior lines are elided as well.
604 pub async
fn upload_speedtest(&self, verbose
: bool
) -> Result
<usize, Error
> {
606 let mut data
= vec
![];
607 // generate pseudo random byte sequence
608 for i
in 0..1024*1024 {
610 let byte
= ((i
>> (j
<<3))&0xff) as u8;
615 let item_len
= data
.len();
619 let (upload_queue
, upload_result
) = Self::response_queue(verbose
);
621 let start_time
= std
::time
::Instant
::now();
// stop queueing new requests after ~5 seconds of wall time
625 if start_time
.elapsed().as_secs() >= 5 {
629 let mut upload_queue
= upload_queue
.clone();
631 if verbose { println!("send test data ({} bytes
)", data.len()); }
632 let request = H2Client::request_builder("localhost
", "POST
", "speedtest
", None, None).unwrap();
633 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
635 upload_queue.send(request_future).await?;
638 drop(upload_queue); // close queue
640 let _ = upload_result.await?;
642 println!("Uploaded {} chunks
in {} seconds
.", repeat, start_time.elapsed().as_secs());
643 let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
644 println!("Time per request
: {} microseconds
.", (start_time.elapsed().as_micros())/(repeat as u128));