1 use std
::collections
::HashSet
;
2 use std
::os
::unix
::fs
::OpenOptionsExt
;
3 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
4 use std
::sync
::{Arc, Mutex}
;
6 use anyhow
::{bail, format_err, Error}
;
7 use chrono
::{DateTime, Utc}
;
9 use futures
::stream
::Stream
;
10 use futures
::future
::AbortHandle
;
11 use serde_json
::{json, Value}
;
12 use tokio
::io
::AsyncReadExt
;
13 use tokio
::sync
::{mpsc, oneshot}
;
15 use proxmox
::tools
::digest_to_hex
;
17 use super::merge_known_chunks
::{MergedChunkInfo, MergeKnownChunks}
;
20 use super::{HttpClient, H2Client}
;
22 pub struct BackupWriter
{
26 crypt_config
: Option
<Arc
<CryptConfig
>>,
29 impl Drop
for BackupWriter
{
36 pub struct BackupStats
{
43 fn new(h2
: H2Client
, abort
: AbortHandle
, crypt_config
: Option
<Arc
<CryptConfig
>>, verbose
: bool
) -> Arc
<Self> {
44 Arc
::new(Self { h2, abort, crypt_config, verbose }
)
49 crypt_config
: Option
<Arc
<CryptConfig
>>,
53 backup_time
: DateTime
<Utc
>,
55 ) -> Result
<Arc
<BackupWriter
>, Error
> {
58 "backup-type": backup_type
,
59 "backup-id": backup_id
,
60 "backup-time": backup_time
.timestamp(),
65 let req
= HttpClient
::request_builder(
66 client
.server(), "GET", "/api2/json/backup", Some(param
)).unwrap();
68 let (h2
, abort
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_PROTOCOL_ID_V1
!())).await?
;
70 Ok(BackupWriter
::new(h2
, abort
, crypt_config
, debug
))
77 ) -> Result
<Value
, Error
> {
78 self.h2
.get(path
, param
).await
85 ) -> Result
<Value
, Error
> {
86 self.h2
.put(path
, param
).await
93 ) -> Result
<Value
, Error
> {
94 self.h2
.post(path
, param
).await
97 pub async
fn upload_post(
100 param
: Option
<Value
>,
103 ) -> Result
<Value
, Error
> {
104 self.h2
.upload("POST", path
, param
, content_type
, data
).await
107 pub async
fn send_upload_request(
111 param
: Option
<Value
>,
114 ) -> Result
<h2
::client
::ResponseFuture
, Error
> {
116 let request
= H2Client
::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
117 let response_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
121 pub async
fn upload_put(
124 param
: Option
<Value
>,
127 ) -> Result
<Value
, Error
> {
128 self.h2
.upload("PUT", path
, param
, content_type
, data
).await
131 pub async
fn finish(self: Arc
<Self>) -> Result
<(), Error
> {
132 let h2
= self.h2
.clone();
134 h2
.post("finish", None
)
141 pub fn cancel(&self) {
145 pub async
fn upload_blob
<R
: std
::io
::Read
>(
149 ) -> Result
<BackupStats
, Error
> {
150 let mut raw_data
= Vec
::new();
151 // fixme: avoid loading into memory
152 reader
.read_to_end(&mut raw_data
)?
;
154 let csum
= openssl
::sha
::sha256(&raw_data
);
155 let param
= json
!({"encoded-size": raw_data.len(), "file-name": file_name }
);
156 let size
= raw_data
.len() as u64;
157 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
158 Ok(BackupStats { size, csum }
)
161 pub async
fn upload_blob_from_data(
166 crypt_mode
: CryptMode
,
167 ) -> Result
<BackupStats
, Error
> {
168 let blob
= match (crypt_mode
, &self.crypt_config
) {
169 (CryptMode
::None
, _
) => DataBlob
::encode(&data
, None
, compress
)?
,
170 (_
, None
) => bail
!("requested encryption/signing without a crypt config"),
171 (CryptMode
::Encrypt
, Some(crypt_config
)) => {
172 DataBlob
::encode(&data
, Some(crypt_config
), compress
)?
174 (CryptMode
::SignOnly
, Some(crypt_config
)) => {
175 DataBlob
::create_signed(&data
, crypt_config
, compress
)?
179 let raw_data
= blob
.into_inner();
180 let size
= raw_data
.len() as u64;
182 let csum
= openssl
::sha
::sha256(&raw_data
);
183 let param
= json
!({"encoded-size": size, "file-name": file_name }
);
184 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
185 Ok(BackupStats { size, csum }
)
188 pub async
fn upload_blob_from_file
<P
: AsRef
<std
::path
::Path
>>(
193 crypt_mode
: CryptMode
,
194 ) -> Result
<BackupStats
, Error
> {
196 let src_path
= src_path
.as_ref();
198 let mut file
= tokio
::fs
::File
::open(src_path
)
200 .map_err(|err
| format_err
!("unable to open file {:?} - {}", src_path
, err
))?
;
202 let mut contents
= Vec
::new();
204 file
.read_to_end(&mut contents
)
206 .map_err(|err
| format_err
!("unable to read file {:?} - {}", src_path
, err
))?
;
208 self.upload_blob_from_data(contents
, file_name
, compress
, crypt_mode
).await
211 pub async
fn upload_stream(
213 crypt_mode
: CryptMode
,
214 previous_manifest
: Option
<Arc
<BackupManifest
>>,
216 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
218 fixed_size
: Option
<u64>,
219 ) -> Result
<BackupStats
, Error
> {
220 let known_chunks
= Arc
::new(Mutex
::new(HashSet
::new()));
222 let mut param
= json
!({ "archive-name": archive_name }
);
223 if let Some(size
) = fixed_size
{
224 param
["size"] = size
.into();
227 let index_path
= format
!("{}_index", prefix
);
228 let close_path
= format
!("{}_close", prefix
);
230 if let Some(manifest
) = previous_manifest
{
231 // try, but ignore errors
232 match archive_type(archive_name
) {
233 Ok(ArchiveType
::FixedIndex
) => {
234 let _
= self.download_previous_fixed_index(archive_name
, &manifest
, known_chunks
.clone()).await
;
236 Ok(ArchiveType
::DynamicIndex
) => {
237 let _
= self.download_previous_dynamic_index(archive_name
, &manifest
, known_chunks
.clone()).await
;
239 _
=> { /* do nothing */ }
243 let wid
= self.h2
.post(&index_path
, Some(param
)).await?
.as_u64().unwrap();
245 let (chunk_count
, size
, duration
, speed
, csum
) =
246 Self::upload_chunk_info_stream(
251 known_chunks
.clone(),
252 self.crypt_config
.clone(),
258 println
!("{}: Uploaded {} bytes as {} chunks in {} seconds ({} MB/s).", archive_name
, size
, chunk_count
, duration
.as_secs(), speed
);
260 println
!("{}: Average chunk size was {} bytes.", archive_name
, size
/chunk_count
);
261 println
!("{}: Time per request: {} microseconds.", archive_name
, (duration
.as_micros())/(chunk_count
as u128
));
266 "chunk-count": chunk_count
,
268 "csum": proxmox
::tools
::digest_to_hex(&csum
),
270 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
277 fn response_queue() -> (
278 mpsc
::Sender
<h2
::client
::ResponseFuture
>,
279 oneshot
::Receiver
<Result
<(), Error
>>
281 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(100);
282 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
284 // FIXME: check if this works as expected as replacement for the combinator below?
285 // tokio::spawn(async move {
286 // let result: Result<(), Error> = (async move {
287 // while let Some(response) = verify_queue_rx.recv().await {
288 // match H2Client::h2api_response(response.await?).await {
289 // Ok(result) => println!("RESPONSE: {:?}", result),
290 // Err(err) => bail!("pipelined request failed: {}", err),
295 // let _ignore_closed_channel = verify_result_tx.send(result);
297 // old code for reference?
301 .try_for_each(|response
: h2
::client
::ResponseFuture
| {
303 .map_err(Error
::from
)
304 .and_then(H2Client
::h2api_response
)
305 .map_ok(|result
| println
!("RESPONSE: {:?}", result
))
306 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
309 let _ignore_closed_channel
= verify_result_tx
.send(result
);
313 (verify_queue_tx
, verify_result_rx
)
316 fn append_chunk_queue(h2
: H2Client
, wid
: u64, path
: String
, verbose
: bool
) -> (
317 mpsc
::Sender
<(MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)>,
318 oneshot
::Receiver
<Result
<(), Error
>>,
320 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(64);
321 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
323 let h2_2
= h2
.clone();
325 // FIXME: async-block-ify this code!
329 .and_then(move |(merged_chunk_info
, response
): (MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)| {
330 match (response
, merged_chunk_info
) {
331 (Some(response
), MergedChunkInfo
::Known(list
)) => {
332 future
::Either
::Left(
334 .map_err(Error
::from
)
335 .and_then(H2Client
::h2api_response
)
336 .and_then(move |_result
| {
337 future
::ok(MergedChunkInfo
::Known(list
))
341 (None
, MergedChunkInfo
::Known(list
)) => {
342 future
::Either
::Right(future
::ok(MergedChunkInfo
::Known(list
)))
347 .merge_known_chunks()
348 .and_then(move |merged_chunk_info
| {
349 match merged_chunk_info
{
350 MergedChunkInfo
::Known(chunk_list
) => {
351 let mut digest_list
= vec
![];
352 let mut offset_list
= vec
![];
353 for (offset
, digest
) in chunk_list
{
354 digest_list
.push(digest_to_hex(&digest
));
355 offset_list
.push(offset
);
357 if verbose { println!("append chunks list len ({}
)", digest_list.len()); }
358 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
359 let request = H2Client::request_builder("localhost
", "PUT
", &path, None, Some("application
/json
")).unwrap();
360 let param_data = bytes::Bytes::from(param.to_string().into_bytes());
361 let upload_data = Some(param_data);
362 h2_2.send_request(request, upload_data)
363 .and_then(move |response| {
365 .map_err(Error::from)
366 .and_then(H2Client::h2api_response)
369 .map_err(|err| format_err!("pipelined request failed
: {}
", err))
374 .try_for_each(|_| future::ok(()))
376 let _ignore_closed_channel = verify_result_tx.send(result);
380 (verify_queue_tx, verify_result_rx)
383 pub async fn download_previous_fixed_index(
386 manifest: &BackupManifest,
387 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
388 ) -> Result<FixedIndexReader, Error> {
390 let mut tmpfile = std::fs::OpenOptions::new()
393 .custom_flags(libc::O_TMPFILE)
396 let param = json!({ "archive-name": archive_name });
397 self.h2.download("previous
", Some(param), &mut tmpfile).await?;
399 let index = FixedIndexReader::new(tmpfile)
400 .map_err(|err| format_err!("unable to read fixed index '{}'
- {}
", archive_name, err))?;
401 // Note: do not use values stored in index (not trusted) - instead, computed them again
402 let (csum, size) = index.compute_csum();
403 manifest.verify_file(archive_name, &csum, size)?;
405 // add index chunks to known chunks
406 let mut known_chunks = known_chunks.lock().unwrap();
407 for i in 0..index.index_count() {
408 known_chunks.insert(*index.index_digest(i).unwrap());
412 println!("{}
: known chunks list length is {}
", archive_name, index.index_count());
418 pub async fn download_previous_dynamic_index(
421 manifest: &BackupManifest,
422 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
423 ) -> Result<DynamicIndexReader, Error> {
425 let mut tmpfile = std::fs::OpenOptions::new()
428 .custom_flags(libc::O_TMPFILE)
431 let param = json!({ "archive-name": archive_name });
432 self.h2.download("previous
", Some(param), &mut tmpfile).await?;
434 let index = DynamicIndexReader::new(tmpfile)
435 .map_err(|err| format_err!("unable to read dynmamic index '{}'
- {}
", archive_name, err))?;
436 // Note: do not use values stored in index (not trusted) - instead, computed them again
437 let (csum, size) = index.compute_csum();
438 manifest.verify_file(archive_name, &csum, size)?;
440 // add index chunks to known chunks
441 let mut known_chunks = known_chunks.lock().unwrap();
442 for i in 0..index.index_count() {
443 known_chunks.insert(*index.index_digest(i).unwrap());
447 println!("{}
: known chunks list length is {}
", archive_name, index.index_count());
453 /// Download backup manifest (index.json) of last backup
454 pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
456 use std::convert::TryFrom;
458 let mut raw_data = Vec::with_capacity(64 * 1024);
460 let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
461 self.h2.download("previous
", Some(param), &mut raw_data).await?;
463 let blob = DataBlob::from_raw(raw_data)?;
465 let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
466 let json: Value = serde_json::from_slice(&data[..])?;
467 let manifest = BackupManifest::try_from(json)?;
472 fn upload_chunk_info_stream(
475 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
477 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
478 crypt_config: Option<Arc<CryptConfig>>,
479 crypt_mode: CryptMode,
481 ) -> impl Future<Output = Result<(usize, usize, std::time::Duration, usize, [u8; 32]), Error>> {
483 let repeat = Arc::new(AtomicUsize::new(0));
484 let repeat2 = repeat.clone();
486 let stream_len = Arc::new(AtomicUsize::new(0));
487 let stream_len2 = stream_len.clone();
489 let append_chunk_path = format!("{}_index
", prefix);
490 let upload_chunk_path = format!("{}_chunk
", prefix);
491 let is_fixed_chunk_size = prefix == "fixed
";
493 let (upload_queue, upload_result) =
494 Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
496 let start_time = std::time::Instant::now();
498 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
499 let index_csum_2 = index_csum.clone();
502 .and_then(move |data| {
504 let chunk_len = data.len();
506 repeat.fetch_add(1, Ordering::SeqCst);
507 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
509 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
512 if let Some(ref crypt_config) = crypt_config {
513 chunk_builder = chunk_builder.crypt_config(crypt_config, crypt_mode);
516 let mut known_chunks = known_chunks.lock().unwrap();
517 let digest = chunk_builder.digest();
519 let mut guard = index_csum.lock().unwrap();
520 let csum = guard.as_mut().unwrap();
522 let chunk_end = offset + chunk_len as u64;
524 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
527 let chunk_is_known = known_chunks.contains(digest);
529 future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
531 known_chunks.insert(*digest);
532 future::ready(chunk_builder
534 .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
537 chunk_len: chunk_len as u64,
543 .merge_known_chunks()
544 .try_for_each(move |merged_chunk_info| {
546 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
547 let offset = chunk_info.offset;
548 let digest = chunk_info.digest;
549 let digest_str = digest_to_hex(&digest);
552 println!("upload new chunk {}
({} bytes
, offset {}
)", digest_str,
553 chunk_info.chunk_len, offset);
556 let chunk_data = chunk_info.chunk.into_inner();
559 "digest
": digest_str,
560 "size
": chunk_info.chunk_len,
561 "encoded
-size
": chunk_data.len(),
564 let ct = "application
/octet
-stream
";
565 let request = H2Client::request_builder("localhost
", "POST
", &upload_chunk_path, Some(param), Some(ct)).unwrap();
566 let upload_data = Some(bytes::Bytes::from(chunk_data));
568 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
570 let mut upload_queue = upload_queue.clone();
571 future::Either::Left(h2
572 .send_request(request, upload_data)
573 .and_then(move |response| async move {
575 .send((new_info, Some(response)))
577 .map_err(|err| format_err!("failed to send to upload queue
: {}
", err))
581 let mut upload_queue = upload_queue.clone();
582 future::Either::Right(async move {
584 .send((merged_chunk_info, None))
586 .map_err(|err| format_err!("failed to send to upload queue
: {}
", err))
590 .then(move |result| async move {
591 upload_result.await?.and(result)
594 let repeat = repeat2.load(Ordering::SeqCst);
595 let stream_len = stream_len2.load(Ordering::SeqCst);
596 let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
598 let mut guard = index_csum_2.lock().unwrap();
599 let csum = guard.take().unwrap().finish();
601 futures::future::ok((repeat, stream_len, start_time.elapsed(), speed, csum))
605 pub async fn upload_speedtest(&self) -> Result<usize, Error> {
607 let mut data = vec![];
608 // generate pseudo random byte sequence
609 for i in 0..1024*1024 {
611 let byte = ((i >> (j<<3))&0xff) as u8;
616 let item_len = data.len();
620 let (upload_queue, upload_result) = Self::response_queue();
622 let start_time = std::time::Instant::now();
626 if start_time.elapsed().as_secs() >= 5 {
630 let mut upload_queue = upload_queue.clone();
632 println!("send test
data ({} bytes
)", data.len());
633 let request = H2Client::request_builder("localhost
", "POST
", "speedtest
", None, None).unwrap();
634 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
636 upload_queue.send(request_future).await?;
639 drop(upload_queue); // close queue
641 let _ = upload_result.await?;
643 println!("Uploaded {} chunks
in {} seconds
.", repeat, start_time.elapsed().as_secs());
644 let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
645 println!("Time per request
: {} microseconds
.", (start_time.elapsed().as_micros())/(repeat as u128));