1 use std
::collections
::HashSet
;
2 use std
::os
::unix
::fs
::OpenOptionsExt
;
3 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
4 use std
::sync
::{Arc, Mutex}
;
6 use anyhow
::{format_err, Error}
;
7 use chrono
::{DateTime, Utc}
;
9 use futures
::stream
::Stream
;
10 use futures
::future
::AbortHandle
;
11 use serde_json
::{json, Value}
;
12 use tokio
::io
::AsyncReadExt
;
13 use tokio
::sync
::{mpsc, oneshot}
;
15 use proxmox
::tools
::digest_to_hex
;
17 use super::merge_known_chunks
::{MergedChunkInfo, MergeKnownChunks}
;
20 use super::{HttpClient, H2Client}
;
// Client-side writer for one backup session over an HTTP/2 connection.
// NOTE(review): this extract is a garbled listing; several struct fields
// (original lines 23-25, 27-28 — presumably h2/abort/verbose) are missing.
// Only `crypt_config` (optional encryption configuration) is visible.
// Code kept byte-identical.
22 pub struct BackupWriter
{
26 crypt_config
: Option
<Arc
<CryptConfig
>>,
// `Drop` impl header for `BackupWriter`; its body (original lines 30-34)
// is not visible in this extract — presumably it aborts the connection,
// but that cannot be confirmed from here.
29 impl Drop
for BackupWriter
{
// Upload statistics returned by the upload_* methods. The field list
// (original lines 37-41) is missing here; usage below shows at least
// `size` and `csum` fields (see `Ok(BackupStats { size, csum })`).
36 pub struct BackupStats
{
43 fn new(h2
: H2Client
, abort
: AbortHandle
, crypt_config
: Option
<Arc
<CryptConfig
>>, verbose
: bool
) -> Arc
<Self> {
44 Arc
::new(Self { h2, abort, crypt_config, verbose }
)
// Async constructor for a backup session — presumably `start(...)`; the
// fn header (original lines 47-48) is missing from this extract, so the
// full parameter list cannot be confirmed. Visible parameters:
// `crypt_config` and `backup_time`; body references `client`,
// `backup_type`, `backup_id` and `debug` from the missing lines.
49 crypt_config
: Option
<Arc
<CryptConfig
>>,
53 backup_time
: DateTime
<Utc
>,
55 ) -> Result
<Arc
<BackupWriter
>, Error
> {
// Build the JSON parameter object (surrounding json! lines elided).
58 "backup-type": backup_type
,
59 "backup-id": backup_id
,
60 "backup-time": backup_time
.timestamp(),
// Issue GET /api2/json/backup and upgrade it to an HTTP/2 connection
// speaking the proxmox backup protocol.
65 let req
= HttpClient
::request_builder(
66 client
.server(), "GET", "/api2/json/backup", Some(param
)).unwrap();
68 let (h2
, abort
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_PROTOCOL_ID_V1
!())).await?
;
70 Ok(BackupWriter
::new(h2
, abort
, crypt_config
, debug
))
// GET helper: forwards `path`/`param` to the underlying H2 client and
// returns the JSON response. The fn header (original lines ~74-76) is
// missing from this extract; code kept byte-identical.
77 ) -> Result
<Value
, Error
> {
78 self.h2
.get(path
, param
).await
// PUT helper: forwards `path`/`param` to the underlying H2 client.
// The fn header (original lines ~82-84) is missing from this extract.
85 ) -> Result
<Value
, Error
> {
86 self.h2
.put(path
, param
).await
// POST helper: forwards `path`/`param` to the underlying H2 client.
// The fn header (original lines ~90-92) is missing from this extract.
93 ) -> Result
<Value
, Error
> {
94 self.h2
.post(path
, param
).await
// Upload `data` via an HTTP/2 POST request and return the JSON response.
// NOTE(review): parameter lines for `path`, `content_type` and `data`
// (original 98-99, 101-102) are missing from this extract.
97 pub async
fn upload_post(
100 param
: Option
<Value
>,
103 ) -> Result
<Value
, Error
> {
104 self.h2
.upload("POST", path
, param
, content_type
, data
).await
// Send an upload request and return the raw `h2` response future, so the
// caller can pipeline requests and await responses later. Some parameter
// lines (original 108-110, 112-113) are missing from this extract.
107 pub async
fn send_upload_request(
111 param
: Option
<Value
>,
114 ) -> Result
<h2
::client
::ResponseFuture
, Error
> {
116 let request
= H2Client
::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
// NOTE(review): `data.clone()` copies the full payload into a Bytes
// buffer on every call — potentially expensive for large uploads.
117 let response_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
// Upload `data` via an HTTP/2 PUT request (mirror of `upload_post`).
// Parameter lines for `path`, `content_type` and `data` are missing
// from this extract.
121 pub async
fn upload_put(
124 param
: Option
<Value
>,
127 ) -> Result
<Value
, Error
> {
128 self.h2
.upload("PUT", path
, param
, content_type
, data
).await
// Finish the backup session by POSTing "finish" on the H2 connection.
// Consumes the shared writer (`self: Arc<Self>`). Lines between the
// clone and the post (original 133, 135-139) are missing here.
131 pub async
fn finish(self: Arc
<Self>) -> Result
<(), Error
> {
132 let h2
= self.h2
.clone();
134 h2
.post("finish", None
)
// Cancel the running backup session; the body (original line ~142,
// presumably aborting the connection) is not visible in this extract.
141 pub fn cancel(&self) {
// Read all bytes from `reader`, upload them as a blob named `file_name`,
// and return size/checksum stats. Parameter lines (original 146-148) are
// missing from this extract.
145 pub async
fn upload_blob
<R
: std
::io
::Read
>(
149 ) -> Result
<BackupStats
, Error
> {
150 let mut raw_data
= Vec
::new();
151 // fixme: avoid loading into memory
152 reader
.read_to_end(&mut raw_data
)?
;
// Checksum is computed over the raw uploaded bytes.
154 let csum
= openssl
::sha
::sha256(&raw_data
);
155 let param
= json
!({"encoded-size": raw_data.len(), "file-name": file_name }
);
156 let size
= raw_data
.len() as u64;
157 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
158 Ok(BackupStats { size, csum }
)
// Encode `data` as a DataBlob — encrypted, signed, or plain depending on
// `crypt_or_sign` and whether a crypt config is set — then upload it as
// `file_name`. Parameter lines for `data`/`file_name`/`compress`
// (original 162-165) are missing from this extract.
161 pub async
fn upload_blob_from_data(
166 crypt_or_sign
: Option
<bool
>,
167 ) -> Result
<BackupStats
, Error
> {
// With a crypt config: Some(true) appears to encrypt, Some(false) to
// sign only; branch keywords (else/match arms) are elided in this
// extract, so the exact branch mapping is unconfirmed.
169 let blob
= if let Some(ref crypt_config
) = self.crypt_config
{
170 if let Some(encrypt
) = crypt_or_sign
{
172 DataBlob
::encode(&data
, Some(crypt_config
), compress
)?
174 DataBlob
::create_signed(&data
, crypt_config
, compress
)?
177 DataBlob
::encode(&data
, None
, compress
)?
180 DataBlob
::encode(&data
, None
, compress
)?
183 let raw_data
= blob
.into_inner();
184 let size
= raw_data
.len() as u64;
// Checksum is over the encoded blob, not the plaintext input.
186 let csum
= openssl
::sha
::sha256(&raw_data
);
187 let param
= json
!({"encoded-size": size, "file-name": file_name }
);
188 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
189 Ok(BackupStats { size, csum }
)
// Read the file at `src_path` fully into memory and upload it via
// `upload_blob_from_data`. Parameter lines for `file_name`/`compress`
// (original 193-196) are missing from this extract.
192 pub async
fn upload_blob_from_file
<P
: AsRef
<std
::path
::Path
>>(
197 crypt_or_sign
: Option
<bool
>,
198 ) -> Result
<BackupStats
, Error
> {
200 let src_path
= src_path
.as_ref();
// Open asynchronously; errors are wrapped with the offending path.
202 let mut file
= tokio
::fs
::File
::open(src_path
)
204 .map_err(|err
| format_err
!("unable to open file {:?} - {}", src_path
, err
))?
;
206 let mut contents
= Vec
::new();
208 file
.read_to_end(&mut contents
)
210 .map_err(|err
| format_err
!("unable to read file {:?} - {}", src_path
, err
))?
;
212 self.upload_blob_from_data(contents
, file_name
, compress
, crypt_or_sign
).await
// Upload a chunked archive stream: register an index writer on the
// server, seed the known-chunks set from the previous backup's index
// when available, stream the chunks, print statistics, and close the
// index. Several parameter lines (archive_name, prefix, …) and
// statement fragments are missing from this extract; code kept
// byte-identical.
215 pub async
fn upload_stream(
217 previous_manifest
: Option
<Arc
<BackupManifest
>>,
219 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
221 fixed_size
: Option
<u64>,
222 ) -> Result
<BackupStats
, Error
> {
// Chunks already present on the server, shared with the upload pipeline.
223 let known_chunks
= Arc
::new(Mutex
::new(HashSet
::new()));
225 let mut param
= json
!({ "archive-name": archive_name }
);
226 if let Some(size
) = fixed_size
{
227 param
["size"] = size
.into();
230 let index_path
= format
!("{}_index", prefix
);
231 let close_path
= format
!("{}_close", prefix
);
// Best-effort download of the previous index to pre-fill known chunks.
233 if let Some(manifest
) = previous_manifest
{
234 // try, but ignore errors
235 match archive_type(archive_name
) {
236 Ok(ArchiveType
::FixedIndex
) => {
237 let _
= self.download_previous_fixed_index(archive_name
, &manifest
, known_chunks
.clone()).await
;
239 Ok(ArchiveType
::DynamicIndex
) => {
240 let _
= self.download_previous_dynamic_index(archive_name
, &manifest
, known_chunks
.clone()).await
;
242 _
=> { /* do nothing */ }
// Register the index writer; the returned writer id (wid) tags all
// subsequent chunk/append requests.
246 let wid
= self.h2
.post(&index_path
, Some(param
)).await?
.as_u64().unwrap();
248 let (chunk_count
, size
, duration
, speed
, csum
) =
249 Self::upload_chunk_info_stream(
254 known_chunks
.clone(),
255 self.crypt_config
.clone(),
// Progress/statistics output. NOTE(review): `size/chunk_count` and the
// microseconds division would panic if chunk_count were 0 — guard lines,
// if any existed, are not visible in this extract.
260 println
!("{}: Uploaded {} bytes as {} chunks in {} seconds ({} MB/s).", archive_name
, size
, chunk_count
, duration
.as_secs(), speed
);
262 println
!("{}: Average chunk size was {} bytes.", archive_name
, size
/chunk_count
);
263 println
!("{}: Time per request: {} microseconds.", archive_name
, (duration
.as_micros())/(chunk_count
as u128
));
// Close the index, reporting chunk count and hex-encoded csum.
268 "chunk-count": chunk_count
,
270 "csum": proxmox
::tools
::digest_to_hex(&csum
),
272 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
// Build a pipeline that consumes queued h2 response futures: returns a
// sender for response futures plus a oneshot receiver that yields the
// overall Result when the queue is drained. Several combinator lines
// (original 282, 285, 293-296, 298, 300-302, 304, 309-310, 312-314)
// are missing from this extract.
279 fn response_queue() -> (
280 mpsc
::Sender
<h2
::client
::ResponseFuture
>,
281 oneshot
::Receiver
<Result
<(), Error
>>
283 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(100);
284 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
286 // FIXME: check if this works as expected as replacement for the combinator below?
287 // tokio::spawn(async move {
288 // let result: Result<(), Error> = (async move {
289 // while let Some(response) = verify_queue_rx.recv().await {
290 // match H2Client::h2api_response(response.await?).await {
291 // Ok(result) => println!("RESPONSE: {:?}", result),
292 // Err(err) => bail!("pipelined request failed: {}", err),
297 // let _ignore_closed_channel = verify_result_tx.send(result);
299 // old code for reference?
// For each queued response future: await it, map it through the h2 API
// response decoder, log the result, and wrap errors with context.
303 .try_for_each(|response
: h2
::client
::ResponseFuture
| {
305 .map_err(Error
::from
)
306 .and_then(H2Client
::h2api_response
)
307 .map_ok(|result
| println
!("RESPONSE: {:?}", result
))
308 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
// Deliver the final result; a closed receiver is deliberately ignored.
311 let _ignore_closed_channel
= verify_result_tx
.send(result
);
315 (verify_queue_tx
, verify_result_rx
)
// Build the append pipeline: receives (MergedChunkInfo, optional upload
// response future) pairs, waits for pending chunk uploads, merges runs
// of known chunks, and PUTs digest/offset lists to `path` to append them
// to index writer `wid`. Returns the queue sender plus a oneshot
// receiver for the final Result. Several combinator lines are missing
// from this extract; code kept byte-identical.
318 fn append_chunk_queue(h2
: H2Client
, wid
: u64, path
: String
, verbose
: bool
) -> (
319 mpsc
::Sender
<(MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)>,
320 oneshot
::Receiver
<Result
<(), Error
>>,
322 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(64);
323 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
// Second handle for the inner closure below.
325 let h2_2
= h2
.clone();
327 // FIXME: async-block-ify this code!
// Stage 1: if a chunk-upload response future accompanies the info,
// await and decode it before passing the known-chunk list onward.
331 .and_then(move |(merged_chunk_info
, response
): (MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)| {
332 match (response
, merged_chunk_info
) {
333 (Some(response
), MergedChunkInfo
::Known(list
)) => {
334 future
::Either
::Left(
336 .map_err(Error
::from
)
337 .and_then(H2Client
::h2api_response
)
338 .and_then(move |_result
| {
339 future
::ok(MergedChunkInfo
::Known(list
))
343 (None
, MergedChunkInfo
::Known(list
)) => {
344 future
::Either
::Right(future
::ok(MergedChunkInfo
::Known(list
)))
// Stage 2: merge consecutive known chunks and append them via PUT.
349 .merge_known_chunks()
350 .and_then(move |merged_chunk_info
| {
351 match merged_chunk_info
{
352 MergedChunkInfo
::Known(chunk_list
) => {
353 let mut digest_list
= vec
![];
354 let mut offset_list
= vec
![];
355 for (offset
, digest
) in chunk_list
{
356 digest_list
.push(digest_to_hex(&digest
));
357 offset_list
.push(offset
);
359 if verbose { println!("append chunks list len ({}
)", digest_list.len()); }
360 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
361 let request = H2Client::request_builder("localhost
", "PUT
", &path, None, Some("application
/json
")).unwrap();
362 let param_data = bytes::Bytes::from(param.to_string().into_bytes());
363 let upload_data = Some(param_data);
364 h2_2.send_request(request, upload_data)
365 .and_then(move |response| {
367 .map_err(Error::from)
368 .and_then(H2Client::h2api_response)
371 .map_err(|err| format_err!("pipelined request failed
: {}
", err))
376 .try_for_each(|_| future::ok(()))
// Deliver the final result; a closed receiver is deliberately ignored.
378 let _ignore_closed_channel = verify_result_tx.send(result);
382 (verify_queue_tx, verify_result_rx)
// Download the previous backup's fixed index for `archive_name` into an
// unnamed temp file (O_TMPFILE), verify its csum/size against the
// manifest, and register all its chunk digests in `known_chunks` so they
// can be deduplicated. Several lines (original 386-387, 391, 393-394,
// 396-397, 400, 406, 411-413, 415-418) are missing from this extract.
385 pub async fn download_previous_fixed_index(
388 manifest: &BackupManifest,
389 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
390 ) -> Result<FixedIndexReader, Error> {
392 let mut tmpfile = std::fs::OpenOptions::new()
395 .custom_flags(libc::O_TMPFILE)
398 let param = json!({ "archive-name": archive_name });
399 self.h2.download("previous
", Some(param), &mut tmpfile).await?;
401 let index = FixedIndexReader::new(tmpfile)
402 .map_err(|err| format_err!("unable to read fixed index '{}'
- {}
", archive_name, err))?;
403 // Note: do not use values stored in index (not trusted) - instead, computed them again
404 let (csum, size) = index.compute_csum();
405 manifest.verify_file(archive_name, &csum, size)?;
407 // add index chunks to known chunks
408 let mut known_chunks = known_chunks.lock().unwrap();
409 for i in 0..index.index_count() {
410 known_chunks.insert(*index.index_digest(i).unwrap());
414 println!("{}
: known chunks list length is {}
", archive_name, index.index_count());
// Dynamic-index counterpart of `download_previous_fixed_index`: fetch
// the previous dynamic index, verify it against the manifest, and seed
// `known_chunks` with its digests. Several lines are missing from this
// extract.
420 pub async fn download_previous_dynamic_index(
423 manifest: &BackupManifest,
424 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
425 ) -> Result<DynamicIndexReader, Error> {
427 let mut tmpfile = std::fs::OpenOptions::new()
430 .custom_flags(libc::O_TMPFILE)
433 let param = json!({ "archive-name": archive_name });
434 self.h2.download("previous
", Some(param), &mut tmpfile).await?;
436 let index = DynamicIndexReader::new(tmpfile)
// NOTE(review): copy-paste defect — this error message says "fixed
// index" although a *dynamic* index is being read; should be fixed once
// the full source is available (string left untouched here).
437 .map_err(|err| format_err!("unable to read fixed index '{}'
- {}
", archive_name, err))?;
438 // Note: do not use values stored in index (not trusted) - instead, computed them again
439 let (csum, size) = index.compute_csum();
440 manifest.verify_file(archive_name, &csum, size)?;
442 // add index chunks to known chunks
443 let mut known_chunks = known_chunks.lock().unwrap();
444 for i in 0..index.index_count() {
445 known_chunks.insert(*index.index_digest(i).unwrap());
449 println!("{}
: known chunks list length is {}
", archive_name, index.index_count());
455 /// Download backup manifest (index.json) of last backup
// Downloads the manifest blob via the "previous" endpoint, decodes it
// (decrypting with the configured crypt config when present), parses it
// as JSON, and converts it into a BackupManifest. Trailing lines
// (original ~470+, presumably `Ok(manifest)`) are missing here.
456 pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
458 use std::convert::TryFrom;
460 let mut raw_data = Vec::with_capacity(64 * 1024);
462 let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
463 self.h2.download("previous
", Some(param), &mut raw_data).await?;
465 let blob = DataBlob::from_raw(raw_data)?;
467 let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
468 let json: Value = serde_json::from_slice(&data[..])?;
469 let manifest = BackupManifest::try_from(json)?;
// Core chunk-upload pipeline: for each chunk from `stream`, compute its
// digest, update the running index checksum, skip chunks already in
// `known_chunks`, POST new chunks, and feed everything into the append
// queue. Resolves to (chunk_count, stream_len, elapsed, speed, csum).
// Numerous lines are missing from this extract; code kept byte-identical.
474 fn upload_chunk_info_stream(
477 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
479 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
480 crypt_config: Option<Arc<CryptConfig>>,
482 ) -> impl Future<Output = Result<(usize, usize, std::time::Duration, usize, [u8; 32]), Error>> {
// Shared counters: chunk count ("repeat") and total streamed bytes.
484 let repeat = Arc::new(AtomicUsize::new(0));
485 let repeat2 = repeat.clone();
487 let stream_len = Arc::new(AtomicUsize::new(0));
488 let stream_len2 = stream_len.clone();
490 let append_chunk_path = format!("{}_index
", prefix);
491 let upload_chunk_path = format!("{}_chunk
", prefix);
492 let is_fixed_chunk_size = prefix == "fixed
";
494 let (upload_queue, upload_result) =
495 Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
497 let start_time = std::time::Instant::now();
// Running SHA-256 over the index entries; taken (Option) at the end.
499 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
500 let index_csum_2 = index_csum.clone();
503 .and_then(move |data| {
505 let chunk_len = data.len();
507 repeat.fetch_add(1, Ordering::SeqCst);
508 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
510 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
513 if let Some(ref crypt_config) = crypt_config {
514 chunk_builder = chunk_builder.crypt_config(crypt_config);
517 let mut known_chunks = known_chunks.lock().unwrap();
518 let digest = chunk_builder.digest();
520 let mut guard = index_csum.lock().unwrap();
521 let csum = guard.as_mut().unwrap();
523 let chunk_end = offset + chunk_len as u64;
// Dynamic indexes hash the chunk end offset as well; fixed do not.
525 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
528 let chunk_is_known = known_chunks.contains(digest);
530 future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
532 known_chunks.insert(*digest);
533 future::ready(chunk_builder
535 .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
538 chunk_len: chunk_len as u64,
544 .merge_known_chunks()
545 .try_for_each(move |merged_chunk_info| {
// New chunks: POST the encoded chunk data, then queue the response
// together with the (offset, digest) entry for the append stage.
547 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
548 let offset = chunk_info.offset;
549 let digest = chunk_info.digest;
550 let digest_str = digest_to_hex(&digest);
553 println!("upload new chunk {}
({} bytes
, offset {}
)", digest_str,
554 chunk_info.chunk_len, offset);
557 let chunk_data = chunk_info.chunk.into_inner();
560 "digest
": digest_str,
561 "size
": chunk_info.chunk_len,
562 "encoded
-size
": chunk_data.len(),
565 let ct = "application
/octet
-stream
";
566 let request = H2Client::request_builder("localhost
", "POST
", &upload_chunk_path, Some(param), Some(ct)).unwrap();
567 let upload_data = Some(bytes::Bytes::from(chunk_data));
569 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
571 let mut upload_queue = upload_queue.clone();
572 future::Either::Left(h2
573 .send_request(request, upload_data)
574 .and_then(move |response| async move {
576 .send((new_info, Some(response)))
578 .map_err(|err| format_err!("failed to send to upload queue
: {}
", err))
// Known chunks: enqueue directly without an upload request.
582 let mut upload_queue = upload_queue.clone();
583 future::Either::Right(async move {
585 .send((merged_chunk_info, None))
587 .map_err(|err| format_err!("failed to send to upload queue
: {}
", err))
591 .then(move |result| async move {
592 upload_result.await?.and(result)
595 let repeat = repeat2.load(Ordering::SeqCst);
596 let stream_len = stream_len2.load(Ordering::SeqCst);
// NOTE(review): integer division — elapsed micros of 0 would panic;
// presumably unreachable in practice, but unconfirmed from here.
597 let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
599 let mut guard = index_csum_2.lock().unwrap();
600 let csum = guard.take().unwrap().finish();
602 futures::future::ok((repeat, stream_len, start_time.elapsed(), speed, csum))
// Benchmark helper: repeatedly POST a ~1 MiB pseudo-random buffer to the
// "speedtest" endpoint for ~5 seconds, then print throughput figures.
// Several lines (inner loop body, repeat counter updates, the final
// return, and anything past original line 646) are missing from this
// extract — the definition runs past the end of the visible chunk.
606 pub async fn upload_speedtest(&self) -> Result<usize, Error> {
608 let mut data = vec![];
609 // generate pseudo random byte sequence
610 for i in 0..1024*1024 {
612 let byte = ((i >> (j<<3))&0xff) as u8;
617 let item_len = data.len();
621 let (upload_queue, upload_result) = Self::response_queue();
623 let start_time = std::time::Instant::now();
// Stop issuing requests once 5 seconds have elapsed.
627 if start_time.elapsed().as_secs() >= 5 {
631 let mut upload_queue = upload_queue.clone();
633 println!("send test
data ({} bytes
)", data.len());
634 let request = H2Client::request_builder("localhost
", "POST
", "speedtest
", None, None).unwrap();
635 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
637 upload_queue.send(request_future).await?;
640 drop(upload_queue); // close queue
642 let _ = upload_result.await?;
644 println!("Uploaded {} chunks
in {} seconds
.", repeat, start_time.elapsed().as_secs());
645 let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
646 println!("Time per request
: {} microseconds
.", (start_time.elapsed().as_micros())/(repeat as u128));