1 use std
::collections
::HashSet
;
2 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
3 use std
::sync
::{Arc, Mutex}
;
6 use chrono
::{DateTime, Utc}
;
8 use futures
::stream
::Stream
;
9 use futures
::future
::AbortHandle
;
10 use serde_json
::{json, Value}
;
11 use tokio
::io
::AsyncReadExt
;
12 use tokio
::sync
::{mpsc, oneshot}
;
14 use proxmox
::tools
::digest_to_hex
;
16 use super::merge_known_chunks
::{MergedChunkInfo, MergeKnownChunks}
;
19 use super::{HttpClient, H2Client}
;
// NOTE(review): extraction fragment — the struct bodies (original lines 22-26,
// 28-33, 35-40) are missing from this view, so the fields are not visible here.
// Writer handle for one HTTP/2 backup session; presumably holds the H2 client,
// an abort handle and a verbosity flag (inferred from `new` below — TODO confirm).
21 pub struct BackupWriter
{
// Drop impl — body not visible in this fragment; presumably aborts the
// underlying H2 connection when the writer is dropped. TODO confirm.
27 impl Drop
for BackupWriter
{
// Statistics for one uploaded entity; fields not visible in this fragment,
// but construction sites below use `BackupStats { size, csum }`.
34 pub struct BackupStats
{
// Wrap an established H2 connection (plus its abort handle and verbosity flag)
// in a reference-counted BackupWriter.
// NOTE(review): the closing brace (original line 43) is missing from this fragment.
41 fn new(h2
: H2Client
, abort
: AbortHandle
, verbose
: bool
) -> Arc
<Self> {
42 Arc
::new(Self { h2, abort, verbose }
)
// NOTE(review): fragment of a session-start constructor — the function header
// (original lines 44-49) and several body lines are missing; `backup_time` below
// is one of its parameters. Builds the backup parameters, opens a GET request
// against /api2/json/backup, upgrades it to an H2 connection and wraps the
// result in a BackupWriter.
50 backup_time
: DateTime
<Utc
>,
52 ) -> Result
<Arc
<BackupWriter
>, Error
> {
// Parameters identifying the backup snapshot (part of a json! literal whose
// surrounding lines are missing from this fragment).
55 "backup-type": backup_type
,
56 "backup-id": backup_id
,
57 "backup-time": backup_time
.timestamp(),
62 let req
= HttpClient
::request_builder(
63 client
.server(), "GET", "/api2/json/backup", Some(param
)).unwrap();
// Upgrade the HTTP request to a dedicated H2 backup-protocol connection.
65 let (h2
, abort
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_PROTOCOL_ID_V1
!())).await?
;
67 Ok(BackupWriter
::new(h2
, abort
, debug
))
// NOTE(review): fragment — the `fn get(...)` header (original lines ~70-73) is
// missing. Thin forwarding wrapper around the H2 client's GET.
74 ) -> Result
<Value
, Error
> {
75 self.h2
.get(path
, param
).await
// NOTE(review): fragment — the `fn put(...)` header is missing. Thin
// forwarding wrapper around the H2 client's PUT.
82 ) -> Result
<Value
, Error
> {
83 self.h2
.put(path
, param
).await
// NOTE(review): fragment — the `fn post(...)` header is missing. Thin
// forwarding wrapper around the H2 client's POST.
90 ) -> Result
<Value
, Error
> {
91 self.h2
.post(path
, param
).await
// Upload raw data via HTTP POST through the H2 client.
// NOTE(review): the parameter list (original lines 95-99) is missing from this
// fragment; `path`, `param`, `content_type`, `data` are its parameters.
94 pub async
fn upload_post(
100 ) -> Result
<Value
, Error
> {
101 self.h2
.upload("POST", path
, param
, content_type
, data
).await
// Build an upload request and send it on the H2 connection, returning the
// pending response future (the caller awaits/pipelines it).
// NOTE(review): fragment — parts of the parameter list (original lines 105-107,
// 109-110) and the trailing `Ok(response_future)` / closing brace are missing.
104 pub async
fn send_upload_request(
108 param
: Option
<Value
>,
111 ) -> Result
<h2
::client
::ResponseFuture
, Error
> {
113 let request
= H2Client
::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
// `data.clone()` — the payload is copied into a Bytes buffer for the request.
114 let response_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
// Upload raw data via HTTP PUT through the H2 client.
// NOTE(review): fragment — part of the parameter list (original lines 119-120,
// 122-123) is missing.
118 pub async
fn upload_put(
121 param
: Option
<Value
>,
124 ) -> Result
<Value
, Error
> {
125 self.h2
.upload("PUT", path
, param
, content_type
, data
).await
// Finalize the backup session: POST to "finish" on the server.
// NOTE(review): fragment — original lines 130 and 132-136 are missing, so the
// continuation after the `post` call (and the closing brace) is not visible.
128 pub async
fn finish(self: Arc
<Self>) -> Result
<(), Error
> {
// Clone the H2 handle so the request can outlive `self` if needed.
129 let h2
= self.h2
.clone();
131 h2
.post("finish", None
)
// Cancel the running backup session.
// NOTE(review): the body (original lines 139-141) is missing from this
// fragment; presumably it triggers `self.abort` — TODO confirm.
138 pub fn cancel(&self) {
// Read a blob from any std::io::Read source, hash it, and upload it under the
// given file name as a single "blob" POST.
// NOTE(review): fragment — the remaining parameters (original lines 143-145)
// and lines 150/156-157 are missing from this view.
142 pub async
fn upload_blob
<R
: std
::io
::Read
>(
146 ) -> Result
<BackupStats
, Error
> {
147 let mut raw_data
= Vec
::new();
148 // fixme: avoid loading into memory
149 reader
.read_to_end(&mut raw_data
)?
;
// SHA-256 over the encoded bytes — reported back to the caller in BackupStats.
151 let csum
= openssl
::sha
::sha256(&raw_data
);
152 let param
= json
!({"encoded-size": raw_data.len(), "file-name": file_name }
);
153 let size
= raw_data
.len() as u64;
154 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
155 Ok(BackupStats { size, csum }
)
// Encode an in-memory byte buffer as a DataBlob (signed, encrypted, or plain,
// depending on crypt_config and flags whose parameter lines are missing here),
// then upload it as a "blob" POST.
// NOTE(review): fragment — several parameter lines (original 159-161, 163-164)
// and branch-structure lines (166, 168, 170, 172-173, 175-176, 179) are
// missing, so the if/else arms below appear without their full scaffolding.
158 pub async
fn upload_blob_from_data(
162 crypt_config
: Option
<Arc
<CryptConfig
>>,
165 ) -> Result
<BackupStats
, Error
> {
167 let blob
= if let Some(ref crypt_config
) = crypt_config
{
// Branch: signed (unencrypted) blob — condition line is missing from fragment.
169 DataBlob
::create_signed(&data
, crypt_config
, compress
)?
// Branch: encrypted blob.
171 DataBlob
::encode(&data
, Some(crypt_config
), compress
)?
// Branch: no crypt config — plain (optionally compressed) blob.
174 DataBlob
::encode(&data
, None
, compress
)?
177 let raw_data
= blob
.into_inner();
178 let size
= raw_data
.len() as u64;
// Checksum is over the encoded blob bytes, not the plaintext input.
180 let csum
= openssl
::sha
::sha256(&raw_data
);
181 let param
= json
!({"encoded-size": size, "file-name": file_name }
);
182 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
183 Ok(BackupStats { size, csum }
)
// Read a file from disk (async via tokio::fs), encode it as a DataBlob with an
// optional crypt config, and upload it as a "blob" POST.
// NOTE(review): fragment — parameter lines (original 187-189, 191), the
// `.await` continuations (197, 203), and the json! literal's surrounding lines
// (210, 213) are missing from this view.
186 pub async
fn upload_blob_from_file
<P
: AsRef
<std
::path
::Path
>>(
190 crypt_config
: Option
<Arc
<CryptConfig
>>,
192 ) -> Result
<BackupStats
, Error
> {
// Shadow the generic parameter with a plain &Path.
194 let src_path
= src_path
.as_ref();
196 let mut file
= tokio
::fs
::File
::open(src_path
)
// Attach the path to the error for a useful message.
198 .map_err(|err
| format_err
!("unable to open file {:?} - {}", src_path
, err
))?
;
200 let mut contents
= Vec
::new();
202 file
.read_to_end(&mut contents
)
204 .map_err(|err
| format_err
!("unable to read file {:?} - {}", src_path
, err
))?
;
// Option<Arc<CryptConfig>> -> Option<&CryptConfig> without cloning.
206 let blob
= DataBlob
::encode(&contents
, crypt_config
.as_ref().map(AsRef
::as_ref
), compress
)?
;
207 let raw_data
= blob
.into_inner();
208 let size
= raw_data
.len() as u64;
209 let csum
= openssl
::sha
::sha256(&raw_data
);
// Part of a json! parameter literal; enclosing lines are missing here.
211 "encoded-size": size
,
212 "file-name": file_name
,
214 self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
215 Ok(BackupStats { size, csum }
)
// Upload a chunked archive stream: fetch the server's known-chunk list,
// register the index writer (wid), stream/deduplicate the chunks, print
// statistics, then close the index with count/size/csum parameters.
// NOTE(review): fragment — parameter lines (original 219-220, 222), the
// argument list of upload_chunk_info_stream (242-251), several statistics /
// json! scaffolding lines (256-259, 261, 263, 265-267) and the final Ok(...)
// are missing from this view.
218 pub async
fn upload_stream(
221 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
223 fixed_size
: Option
<u64>,
224 crypt_config
: Option
<Arc
<CryptConfig
>>,
225 ) -> Result
<BackupStats
, Error
> {
// Digests of chunks the server already has — shared with the upload pipeline.
226 let known_chunks
= Arc
::new(Mutex
::new(HashSet
::new()));
228 let mut param
= json
!({ "archive-name": archive_name }
);
// Fixed-size archives (block images) carry an explicit total size.
229 if let Some(size
) = fixed_size
{
230 param
["size"] = size
.into();
// API endpoints are "<prefix>_index" / "<prefix>_close" (prefix e.g. fixed/dynamic).
233 let index_path
= format
!("{}_index", prefix
);
234 let close_path
= format
!("{}_close", prefix
);
236 self.download_chunk_list(&index_path
, archive_name
, known_chunks
.clone()).await?
;
// Register the writer; the server returns the writer id (wid).
238 let wid
= self.h2
.post(&index_path
, Some(param
)).await?
.as_u64().unwrap();
240 let (chunk_count
, size
, duration
, speed
, csum
) =
241 Self::upload_chunk_info_stream(
246 known_chunks
.clone(),
252 println
!("{}: Uploaded {} bytes as {} chunks in {} seconds ({} MB/s).", archive_name
, size
, chunk_count
, duration
.as_secs(), speed
);
// NOTE(review): size/chunk_count and duration/chunk_count divisions would
// panic on zero chunks — can't tell from this fragment whether that's guarded.
254 println
!("{}: Average chunk size was {} bytes.", archive_name
, size
/chunk_count
);
255 println
!("{}: Time per request: {} microseconds.", archive_name
, (duration
.as_micros())/(chunk_count
as u128
));
// Part of the close-request json! literal; enclosing lines are missing here.
260 "chunk-count": chunk_count
,
262 "csum": proxmox
::tools
::digest_to_hex(&csum
),
264 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
// Build a pipeline queue for in-flight H2 response futures: a bounded mpsc
// sender to enqueue responses, and a oneshot receiver that yields the overall
// verification result once the queue is drained.
// NOTE(review): fragment — original lines 274, 277, 285-288, 290, 292-294,
// 296, 301-302, 304-306 are missing, so the spawn/combinator scaffolding
// around the try_for_each chain is not visible.
271 fn response_queue() -> (
272 mpsc
::Sender
<h2
::client
::ResponseFuture
>,
273 oneshot
::Receiver
<Result
<(), Error
>>
// Channel capacity 100 bounds the number of pipelined requests.
275 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(100);
276 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
278 // FIXME: check if this works as expected as replacement for the combinator below?
279 // tokio::spawn(async move {
280 // let result: Result<(), Error> = (async move {
281 // while let Some(response) = verify_queue_rx.recv().await {
282 // match H2Client::h2api_response(response.await?).await {
283 // Ok(result) => println!("RESPONSE: {:?}", result),
284 // Err(err) => bail!("pipelined request failed: {}", err),
289 // let _ignore_closed_channel = verify_result_tx.send(result);
291 // old code for reference?
// Await each queued response in order and surface the first failure.
295 .try_for_each(|response
: h2
::client
::ResponseFuture
| {
297 .map_err(Error
::from
)
298 .and_then(H2Client
::h2api_response
)
299 .map_ok(|result
| println
!("RESPONSE: {:?}", result
))
300 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
// Receiver may already be gone (caller dropped it) — deliberately ignored.
303 let _ignore_closed_channel
= verify_result_tx
.send(result
);
307 (verify_queue_tx
, verify_result_rx
)
// Build the append queue: chunk infos (optionally paired with the pending
// upload response) are awaited, merged, and their (digest, offset) pairs
// PUT to the index append endpoint in batches.
// NOTE(review): fragment — original lines 313, 316, 318, 320-322, 327,
// 332-334, 337-340, 358, 361-362, 364-367, 369, 371-373 are missing, so the
// stream-source, brace structure and several combinator links are not visible.
310 fn append_chunk_queue(h2
: H2Client
, wid
: u64, path
: String
, verbose
: bool
) -> (
311 mpsc
::Sender
<(MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)>,
312 oneshot
::Receiver
<Result
<(), Error
>>,
// Capacity 64 bounds the number of in-flight append operations.
314 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(64);
315 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
// Second handle for use inside the later closure.
317 let h2_2
= h2
.clone();
319 // FIXME: async-block-ify this code!
// Stage 1: if a chunk upload response is pending, await and check it first.
323 .and_then(move |(merged_chunk_info
, response
): (MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)| {
324 match (response
, merged_chunk_info
) {
325 (Some(response
), MergedChunkInfo
::Known(list
)) => {
326 future
::Either
::Left(
328 .map_err(Error
::from
)
329 .and_then(H2Client
::h2api_response
)
330 .and_then(move |_result
| {
331 future
::ok(MergedChunkInfo
::Known(list
))
// No pending response — pass the known-chunk list straight through.
335 (None
, MergedChunkInfo
::Known(list
)) => {
336 future
::Either
::Right(future
::ok(MergedChunkInfo
::Known(list
)))
// Stage 2: coalesce adjacent known chunks, then append each batch to the index.
341 .merge_known_chunks()
342 .and_then(move |merged_chunk_info
| {
343 match merged_chunk_info
{
344 MergedChunkInfo
::Known(chunk_list
) => {
345 let mut digest_list
= vec
![];
346 let mut offset_list
= vec
![];
347 for (offset
, digest
) in chunk_list
{
348 digest_list
.push(digest_to_hex(&digest
));
349 offset_list
.push(offset
);
351 if verbose { println!("append chunks list len ({}
)", digest_list.len()); }
352 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
353 let request = H2Client::request_builder("localhost
", "PUT
", &path, None, Some("application
/json
")).unwrap();
// The JSON parameters travel in the request body, not the URL.
354 let param_data = bytes::Bytes::from(param.to_string().into_bytes());
355 let upload_data = Some(param_data);
356 h2_2.send_request(request, upload_data)
357 .and_then(move |response| {
359 .map_err(Error::from)
360 .and_then(H2Client::h2api_response)
363 .map_err(|err| format_err!("pipelined request failed
: {}
", err))
// Drain the stream — only the first error matters.
368 .try_for_each(|_| future::ok(()))
370 let _ignore_closed_channel = verify_result_tx.send(result);
374 (verify_queue_tx, verify_result_rx)
// Download the server's digest list for an archive and insert every digest
// into the shared known_chunks set (used to skip re-uploading known chunks).
// NOTE(review): fragment — original lines 378-380 (remaining parameters),
// 383, 386, 389, 391, 394-396, 399, 401, 405-407 and 409+ (including the
// final Ok(())) are missing from this view.
377 pub async fn download_chunk_list(
381 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
382 ) -> Result<(), Error> {
384 let param = json!({ "archive-name": archive_name });
385 let request = H2Client::request_builder("localhost
", "GET
", path, Some(param), None).unwrap();
387 let h2request = self.h2.send_request(request, None).await?;
388 let resp = h2request.await?;
390 let status = resp.status();
// Non-success: funnel through h2api_response so the server error is raised.
392 if !status.is_success() {
393 H2Client::h2api_response(resp).await?; // raise error
397 let mut body = resp.into_body();
// Release H2 flow-control capacity as digests are consumed.
398 let mut flow_control = body.flow_control().clone();
400 let mut stream = DigestListDecoder::new(body.map_err(Error::from));
402 while let Some(chunk) = stream.try_next().await? {
403 let _ = flow_control.release_capacity(chunk.len());
404 known_chunks.lock().unwrap().insert(chunk);
408 println!("{}
: known chunks list length is {}
", archive_name, known_chunks.lock().unwrap().len());
// Core upload pipeline: split the input stream into chunks, hash each chunk,
// upload only unknown chunks (known ones are merely appended to the index),
// and resolve to (chunk_count, stream_len, elapsed, speed, index_csum).
// NOTE(review): fragment — original lines 415-416, 418, 421, 423, 426, 429,
// 433, 436, 438, 441-442, 444, 446, 449, 451-452, 455-456, 459, 462, 464,
// 466-467, 469, 471, 474, 476-477, 479-483, 486, 491-492, 495-496, 498-499,
// 503-504, 508, 510, 515, 517, 519-521, 524, 526, 528-530, 533-534, 538, 541
// and 543+ are missing, so closure boundaries and several expressions are cut.
414 fn upload_chunk_info_stream(
417 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
419 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
420 crypt_config: Option<Arc<CryptConfig>>,
422 ) -> impl Future<Output = Result<(usize, usize, std::time::Duration, usize, [u8; 32]), Error>> {
// Shared counters: chunk count ("repeat") and total streamed bytes.
424 let repeat = Arc::new(AtomicUsize::new(0));
425 let repeat2 = repeat.clone();
427 let stream_len = Arc::new(AtomicUsize::new(0));
428 let stream_len2 = stream_len.clone();
430 let append_chunk_path = format!("{}_index
", prefix);
431 let upload_chunk_path = format!("{}_chunk
", prefix);
432 let is_fixed_chunk_size = prefix == "fixed
";
434 let (upload_queue, upload_result) =
435 Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
437 let start_time = std::time::Instant::now();
// Running SHA-256 over the index entries; Option so it can be take()n at the end.
439 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
440 let index_csum_2 = index_csum.clone();
// Per-chunk stage: hash, dedupe against known_chunks, build upload for new chunks.
443 .and_then(move |data| {
445 let chunk_len = data.len();
447 repeat.fetch_add(1, Ordering::SeqCst);
448 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
450 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
453 if let Some(ref crypt_config) = crypt_config {
454 chunk_builder = chunk_builder.crypt_config(crypt_config);
457 let mut known_chunks = known_chunks.lock().unwrap();
458 let digest = chunk_builder.digest();
460 let mut guard = index_csum.lock().unwrap();
461 let csum = guard.as_mut().unwrap();
463 let chunk_end = offset + chunk_len as u64;
// Dynamic indexes hash the chunk-end offsets too; fixed indexes do not.
465 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
468 let chunk_is_known = known_chunks.contains(digest);
// Known chunk: no upload, only an index append.
470 future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
// New chunk: remember the digest, then build and ship the chunk body.
472 known_chunks.insert(*digest);
473 future::ready(chunk_builder
475 .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
478 chunk_len: chunk_len as u64,
484 .merge_known_chunks()
// Sink stage: POST each new chunk, queue the (offset, digest) append.
485 .try_for_each(move |merged_chunk_info| {
487 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
488 let offset = chunk_info.offset;
489 let digest = chunk_info.digest;
490 let digest_str = digest_to_hex(&digest);
493 println!("upload new chunk {}
({} bytes
, offset {}
)", digest_str,
494 chunk_info.chunk_len, offset);
497 let chunk_data = chunk_info.chunk.into_inner();
// Part of the upload json! parameter literal; enclosing lines are missing.
500 "digest
": digest_str,
501 "size
": chunk_info.chunk_len,
502 "encoded
-size
": chunk_data.len(),
505 let ct = "application
/octet
-stream
";
506 let request = H2Client::request_builder("localhost
", "POST
", &upload_chunk_path, Some(param), Some(ct)).unwrap();
507 let upload_data = Some(bytes::Bytes::from(chunk_data));
509 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
511 let mut upload_queue = upload_queue.clone();
// New chunk: send the upload, hand the pending response to the append queue.
512 future::Either::Left(h2
513 .send_request(request, upload_data)
514 .and_then(move |response| async move {
516 .send((new_info, Some(response)))
518 .map_err(|err| format_err!("failed to send to upload queue
: {}
", err))
522 let mut upload_queue = upload_queue.clone();
// Known chunk: enqueue the append with no pending response.
523 future::Either::Right(async move {
525 .send((merged_chunk_info, None))
527 .map_err(|err| format_err!("failed to send to upload queue
: {}
", err))
// Wait for the append queue to drain; first error from either side wins.
531 .then(move |result| async move {
532 upload_result.await?.and(result)
535 let repeat = repeat2.load(Ordering::SeqCst);
536 let stream_len = stream_len2.load(Ordering::SeqCst);
// NOTE(review): divides by elapsed micros — zero-duration would panic; and
// integer MB/s math loses precision. Can't tell from fragment if guarded.
537 let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
539 let mut guard = index_csum_2.lock().unwrap();
540 let csum = guard.take().unwrap().finish();
542 futures::future::ok((repeat, stream_len, start_time.elapsed(), speed, csum))
// Benchmark: repeatedly POST a ~1 MiB pseudo-random buffer to "speedtest" for
// ~5 seconds using the pipelined response queue, then print and return(?)
// the achieved throughput.
// NOTE(review): fragment — original lines 547, 551, 553-556, 558-560, 562,
// 564-566, 568-570, 572, 576, 578-579, 581, 583 and 587+ (including the loop
// structure, `repeat` counter and final return) are missing from this view;
// this block also runs past the end of the visible SOURCE.
546 pub async fn upload_speedtest(&self) -> Result<usize, Error> {
548 let mut data = vec![];
549 // generate pseudo random byte sequence
550 for i in 0..1024*1024 {
// Inner loop over `j` (byte positions) is missing from this fragment.
552 let byte = ((i >> (j<<3))&0xff) as u8;
557 let item_len = data.len();
561 let (upload_queue, upload_result) = Self::response_queue();
563 let start_time = std::time::Instant::now();
// Stop issuing requests after ~5 seconds of wall time.
567 if start_time.elapsed().as_secs() >= 5 {
571 let mut upload_queue = upload_queue.clone();
573 println!("send test
data ({} bytes
)", data.len());
574 let request = H2Client::request_builder("localhost
", "POST
", "speedtest
", None, None).unwrap();
575 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
577 upload_queue.send(request_future).await?;
580 drop(upload_queue); // close queue
582 let _ = upload_result.await?;
584 println!("Uploaded {} chunks
in {} seconds
.", repeat, start_time.elapsed().as_secs());
// NOTE(review): divides by elapsed micros and by repeat — assumes both are
// nonzero after the 5s loop; TODO confirm in the missing lines.
585 let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
586 println!("Time per request
: {} microseconds
.", (start_time.elapsed().as_micros())/(repeat as u128));