1 use std
::collections
::HashSet
;
2 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
3 use std
::sync
::{Arc, Mutex}
;
6 use chrono
::{DateTime, Utc}
;
8 use futures
::stream
::Stream
;
9 use futures
::future
::AbortHandle
;
10 use serde_json
::{json, Value}
;
11 use tokio
::io
::AsyncReadExt
;
12 use tokio
::sync
::{mpsc, oneshot}
;
14 use proxmox
::tools
::digest_to_hex
;
16 use super::merge_known_chunks
::{MergedChunkInfo, MergeKnownChunks}
;
19 use super::{HttpClient, H2Client}
;
// NOTE(review): corrupted line-wrapped listing fragment. The embedded leading
// integers (21, 27, 34) are line numbers from the original source file; gaps in
// that numbering (22-26, 28-33, 35-40) mean the struct field lists and the Drop
// body are MISSING from this capture — do not assume their contents.
21 pub struct BackupWriter
{
// Drop impl for BackupWriter — body not visible here; presumably aborts the
// running H2 connection via the `abort` handle seen in `new` — TODO confirm.
27 impl Drop
for BackupWriter
{
// BackupStats — fields not visible, but later code constructs it as
// `BackupStats { size, csum }` (size: u64 byte count, csum: sha256 digest).
34 pub struct BackupStats
{
41 fn new(h2
: H2Client
, abort
: AbortHandle
, verbose
: bool
) -> Arc
<Self> {
42 Arc
::new(Self { h2, abort, verbose }
)
// NOTE(review): fragment of the async constructor (presumably `start`) — the
// opening signature lines (orig 44-49) and several interior lines (51, 53-54,
// 58-61, 64, 66, 68) are missing from this capture. Visible behavior: builds a
// json param set with backup-type/id/time, issues a GET to /api2/json/backup,
// upgrades it to an H2 connection with the V1 backup protocol id, and wraps
// the result via `BackupWriter::new`. `debug` passed as verbose — confirm.
50 backup_time
: DateTime
<Utc
>,
52 ) -> Result
<Arc
<BackupWriter
>, Error
> {
55 "backup-type": backup_type
,
56 "backup-id": backup_id
,
57 "backup-time": backup_time
.timestamp(),
62 let req
= HttpClient
::request_builder(
63 client
.server(), "GET", "/api2/json/backup", Some(param
)).unwrap();
65 let (h2
, abort
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_PROTOCOL_ID_V1
!())).await?
;
67 Ok(BackupWriter
::new(h2
, abort
, debug
))
74 ) -> Result
<Value
, Error
> {
75 self.h2
.get(path
, param
).await
82 ) -> Result
<Value
, Error
> {
83 self.h2
.put(path
, param
).await
90 ) -> Result
<Value
, Error
> {
91 self.h2
.post(path
, param
).await
94 pub async
fn upload_post(
100 ) -> Result
<Value
, Error
> {
101 self.h2
.upload("POST", path
, param
, content_type
, data
).await
104 pub async
fn send_upload_request(
108 param
: Option
<Value
>,
111 ) -> Result
<h2
::client
::ResponseFuture
, Error
> {
113 let request
= H2Client
::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
114 let response_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
118 pub async
fn upload_put(
121 param
: Option
<Value
>,
124 ) -> Result
<Value
, Error
> {
125 self.h2
.upload("PUT", path
, param
, content_type
, data
).await
// NOTE(review): fragments of `finish` and `cancel`. `finish` POSTs "finish"
// over the H2 connection; its continuation (orig lines 130, 132-136 — likely a
// combinator chain and `.await`) is missing from this capture, so the full
// shutdown sequence cannot be confirmed here. `cancel`'s body (orig 139-140)
// is also missing — presumably it triggers `self.abort` — TODO confirm.
128 pub async
fn finish(self: Arc
<Self>) -> Result
<(), Error
> {
129 let h2
= self.h2
.clone();
131 h2
.post("finish", None
)
138 pub fn cancel(&self) {
142 pub async
fn upload_blob
<R
: std
::io
::Read
>(
146 ) -> Result
<BackupStats
, Error
> {
147 let mut raw_data
= Vec
::new();
148 // fixme: avoid loading into memory
149 reader
.read_to_end(&mut raw_data
)?
;
151 let csum
= openssl
::sha
::sha256(&raw_data
);
152 let param
= json
!({"encoded-size": raw_data.len(), "file-name": file_name }
);
153 let size
= raw_data
.len() as u64;
154 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
155 Ok(BackupStats { size, csum }
)
// NOTE(review): fragment of `upload_blob_from_data`. The early parameter lines
// (orig 159-161, 163-164) and — critically — the branch-selector lines
// (orig 168, 170, 172-173) are missing, so the condition choosing between
// `DataBlob::create_signed` and `DataBlob::encode` with a crypt_config is NOT
// visible here; do not guess it. Visible behavior: build a DataBlob (signed,
// encrypted, or plain), take its encoded bytes, sha256 them, and POST the blob
// with encoded-size/file-name params, returning BackupStats { size, csum }.
158 pub async
fn upload_blob_from_data(
162 crypt_config
: Option
<Arc
<CryptConfig
>>,
165 ) -> Result
<BackupStats
, Error
> {
167 let blob
= if let Some(ref crypt_config
) = crypt_config
{
169 DataBlob
::create_signed(&data
, crypt_config
, compress
)?
171 DataBlob
::encode(&data
, Some(crypt_config
), compress
)?
174 DataBlob
::encode(&data
, None
, compress
)?
177 let raw_data
= blob
.into_inner();
178 let size
= raw_data
.len() as u64;
180 let csum
= openssl
::sha
::sha256(&raw_data
);
181 let param
= json
!({"encoded-size": size, "file-name": file_name }
);
182 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
183 Ok(BackupStats { size, csum }
)
186 pub async
fn upload_blob_from_file
<P
: AsRef
<std
::path
::Path
>>(
190 crypt_config
: Option
<Arc
<CryptConfig
>>,
192 ) -> Result
<BackupStats
, Error
> {
194 let src_path
= src_path
.as_ref();
196 let mut file
= tokio
::fs
::File
::open(src_path
)
198 .map_err(|err
| format_err
!("unable to open file {:?} - {}", src_path
, err
))?
;
200 let mut contents
= Vec
::new();
202 file
.read_to_end(&mut contents
)
204 .map_err(|err
| format_err
!("unable to read file {:?} - {}", src_path
, err
))?
;
206 let blob
= DataBlob
::encode(&contents
, crypt_config
.as_ref().map(AsRef
::as_ref
), compress
)?
;
207 let raw_data
= blob
.into_inner();
208 let size
= raw_data
.len() as u64;
209 let csum
= openssl
::sha
::sha256(&raw_data
);
211 "encoded-size": size
,
212 "file-name": file_name
,
214 self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
215 Ok(BackupStats { size, csum }
)
// NOTE(review): fragment of `upload_stream` — the chunk-index upload driver.
// Missing lines include the early parameters (orig 219-220, 222), several
// interior braces/args (231-232, 235, 237, 239, 242-245, 247-253, 255, 257)
// and the tail (259-262, presumably `Ok(BackupStats { .. })`). Visible flow:
// create a shared known-chunks set; POST "{prefix}_index" (with optional fixed
// "size") to open a writer and get `wid`; seed known chunks via
// download_chunk_list; run Self::upload_chunk_info_stream; then POST
// "{prefix}_close" with chunk-count/size/csum (hex) to finalize.
218 pub async
fn upload_stream(
221 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
223 fixed_size
: Option
<u64>,
224 crypt_config
: Option
<Arc
<CryptConfig
>>,
225 ) -> Result
<BackupStats
, Error
> {
226 let known_chunks
= Arc
::new(Mutex
::new(HashSet
::new()));
228 let mut param
= json
!({ "archive-name": archive_name }
);
229 if let Some(size
) = fixed_size
{
230 param
["size"] = size
.into();
233 let index_path
= format
!("{}_index", prefix
);
234 let close_path
= format
!("{}_close", prefix
);
236 self.download_chunk_list(&index_path
, archive_name
, known_chunks
.clone()).await?
;
238 let wid
= self.h2
.post(&index_path
, Some(param
)).await?
.as_u64().unwrap();
240 let (chunk_count
, size
, _speed
, csum
) =
241 Self::upload_chunk_info_stream(
246 known_chunks
.clone(),
254 "chunk-count": chunk_count
,
256 "csum": proxmox
::tools
::digest_to_hex(&csum
),
258 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
// NOTE(review): fragment of `response_queue` — builds an mpsc queue of
// pipelined h2 response futures plus a oneshot carrying the overall result.
// Missing lines (orig 268, 279-282, 284, 286-288, 290, 295-296, 298-300, 302)
// include the stream-construction head of the combinator chain and the spawn
// of the draining task, so the exact driver is not visible here.
265 fn response_queue() -> (
266 mpsc
::Sender
<h2
::client
::ResponseFuture
>,
267 oneshot
::Receiver
<Result
<(), Error
>>
269 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(100);
270 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
272 // FIXME: check if this works as expected as replacement for the combinator below?
273 // tokio::spawn(async move {
274 // let result: Result<(), Error> = (async move {
275 // while let Some(response) = verify_queue_rx.recv().await {
276 // match H2Client::h2api_response(response.await?).await {
277 // Ok(result) => println!("RESPONSE: {:?}", result),
278 // Err(err) => bail!("pipelined request failed: {}", err),
283 // let _ignore_closed_channel = verify_result_tx.send(result);
285 // old code for reference?
289 .try_for_each(|response
: h2
::client
::ResponseFuture
| {
291 .map_err(Error
::from
)
292 .and_then(H2Client
::h2api_response
)
293 .map_ok(|result
| println
!("RESPONSE: {:?}", result
))
294 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
297 let _ignore_closed_channel
= verify_result_tx
.send(result
);
301 (verify_queue_tx
, verify_result_rx
)
// NOTE(review): fragment of `append_chunk_queue` — builds an mpsc queue of
// (MergedChunkInfo, optional pipelined response) pairs and a oneshot for the
// final result. For Known chunk lists it collects hex digests and offsets,
// PUTs them as json to `path` via a second h2 handle, and awaits the
// pipelined api response. Many structural lines are missing (orig 307, 310,
// 312, 314-316, 321, 326-328, 331-334, 344, 352, 355-356, 358-361, 363,
// 365-367, 369) — closure heads, braces, and the task spawn — so the exact
// combinator nesting cannot be reconstructed from this capture alone.
304 fn append_chunk_queue(h2
: H2Client
, wid
: u64, path
: String
, verbose
: bool
) -> (
305 mpsc
::Sender
<(MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)>,
306 oneshot
::Receiver
<Result
<(), Error
>>,
308 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(64);
309 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
311 let h2_2
= h2
.clone();
313 // FIXME: async-block-ify this code!
317 .and_then(move |(merged_chunk_info
, response
): (MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)| {
318 match (response
, merged_chunk_info
) {
// Case 1: a pipelined append response is pending — await it, then pass the
// Known list through unchanged.
319 (Some(response
), MergedChunkInfo
::Known(list
)) => {
320 future
::Either
::Left(
322 .map_err(Error
::from
)
323 .and_then(H2Client
::h2api_response
)
324 .and_then(move |_result
| {
325 future
::ok(MergedChunkInfo
::Known(list
))
// Case 2: nothing pending — pass the Known list through immediately.
329 (None
, MergedChunkInfo
::Known(list
)) => {
330 future
::Either
::Right(future
::ok(MergedChunkInfo
::Known(list
)))
335 .merge_known_chunks()
336 .and_then(move |merged_chunk_info
| {
337 match merged_chunk_info
{
338 MergedChunkInfo
::Known(chunk_list
) => {
339 let mut digest_list
= vec
![];
340 let mut offset_list
= vec
![];
341 for (offset
, digest
) in chunk_list
{
342 digest_list
.push(digest_to_hex(&digest
));
343 offset_list
.push(offset
);
345 if verbose { println!("append chunks list len ({}
)", digest_list.len()); }
346 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
347 let request = H2Client::request_builder("localhost
", "PUT
", &path, None, Some("application
/json
")).unwrap();
348 let param_data = bytes::Bytes::from(param.to_string().into_bytes());
349 let upload_data = Some(param_data);
350 h2_2.send_request(request, upload_data)
351 .and_then(move |response| {
353 .map_err(Error::from)
354 .and_then(H2Client::h2api_response)
357 .map_err(|err| format_err!("pipelined request failed
: {}
", err))
362 .try_for_each(|_| future::ok(()))
364 let _ignore_closed_channel = verify_result_tx.send(result);
368 (verify_queue_tx, verify_result_rx)
// NOTE(review): fragment of `download_chunk_list` — primes `known_chunks`
// with the digests the server already has for `archive-name`. Missing lines
// (orig 372-374, 377, 380, 383, 385, 388-390, 393, 395, 399-401, 403-406)
// include the `path`/`archive_name` parameters, the non-success early-return
// body, and whether the final println is gated on a verbose flag — confirm
// against upstream before relying on the exact control flow.
371 pub async fn download_chunk_list(
375 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
376 ) -> Result<(), Error> {
378 let param = json!({ "archive-name": archive_name });
379 let request = H2Client::request_builder("localhost
", "GET
", path, Some(param), None).unwrap();
381 let h2request = self.h2.send_request(request, None).await?;
382 let resp = h2request.await?;
384 let status = resp.status();
386 if !status.is_success() {
387 H2Client::h2api_response(resp).await?; // raise error
// Stream the body through DigestListDecoder, releasing h2 flow-control
// capacity per chunk, inserting each 32-byte digest into the shared set.
391 let mut body = resp.into_body();
392 let mut flow_control = body.flow_control().clone();
394 let mut stream = DigestListDecoder::new(body.map_err(Error::from));
396 while let Some(chunk) = stream.try_next().await? {
397 let _ = flow_control.release_capacity(chunk.len());
398 known_chunks.lock().unwrap().insert(chunk);
402 println!("known chunks list length
: {}
", known_chunks.lock().unwrap().len());
// NOTE(review): fragment of `upload_chunk_info_stream` — the core chunk
// dedup/upload pipeline. Returns (chunk_count, stream_len, speed, csum).
// Many structural lines are missing (orig 409-410, 412, 415, 417, 420, 423,
// 427, 430, 432, 435-436, 438, 440, 443, 445-446, 449-450, 453, 456, 458,
// 460-461, 463, 465, 468, 470-471, 473-477, 480, 485-486, 489-490, 492-493,
// 497-498, 502, 504, 509, 511, 513-515, 518, 520, 522-524, 527-528, 533,
// 536-537, 540, 542-543) — closure bodies, the json! wrapper, and braces —
// so the exact chaining cannot be reconstructed from this capture.
408 fn upload_chunk_info_stream(
411 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
413 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
414 crypt_config: Option<Arc<CryptConfig>>,
416 ) -> impl Future<Output = Result<(usize, usize, usize, [u8; 32]), Error>> {
// Shared counters: chunk count and total byte length, read back at the end.
418 let repeat = Arc::new(AtomicUsize::new(0));
419 let repeat2 = repeat.clone();
421 let stream_len = Arc::new(AtomicUsize::new(0));
422 let stream_len2 = stream_len.clone();
424 let append_chunk_path = format!("{}_index
", prefix);
425 let upload_chunk_path = format!("{}_chunk
", prefix);
426 let is_fixed_chunk_size = prefix == "fixed
";
428 let (upload_queue, upload_result) =
429 Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
431 let start_time = std::time::Instant::now();
433 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
434 let index_csum_2 = index_csum.clone();
// Per-chunk stage: hash/encode the chunk, classify it as Known (digest
// already on the server) or New (needs upload).
437 .and_then(move |data| {
439 let chunk_len = data.len();
441 repeat.fetch_add(1, Ordering::SeqCst);
442 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
444 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
447 if let Some(ref crypt_config) = crypt_config {
448 chunk_builder = chunk_builder.crypt_config(crypt_config);
451 let mut known_chunks = known_chunks.lock().unwrap();
452 let digest = chunk_builder.digest();
454 let mut guard = index_csum.lock().unwrap();
455 let csum = guard.as_mut().unwrap();
457 let chunk_end = offset + chunk_len as u64;
// Dynamic indexes mix the chunk end offset into the index checksum.
459 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
462 let chunk_is_known = known_chunks.contains(digest);
464 future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
466 known_chunks.insert(*digest);
467 future::ready(chunk_builder
469 .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
472 chunk_len: chunk_len as u64,
478 .merge_known_chunks()
// Upload stage: POST new chunks, then queue a Known record (or pass Known
// records straight through) to the append queue.
479 .try_for_each(move |merged_chunk_info| {
481 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
482 let offset = chunk_info.offset;
483 let digest = chunk_info.digest;
484 let digest_str = digest_to_hex(&digest);
487 println!("upload new chunk {}
({} bytes
, offset {}
)", digest_str,
488 chunk_info.chunk_len, offset);
491 let chunk_data = chunk_info.chunk.into_inner();
494 "digest
": digest_str,
495 "size
": chunk_info.chunk_len,
496 "encoded
-size
": chunk_data.len(),
499 let ct = "application
/octet
-stream
";
500 let request = H2Client::request_builder("localhost
", "POST
", &upload_chunk_path, Some(param), Some(ct)).unwrap();
501 let upload_data = Some(bytes::Bytes::from(chunk_data));
503 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
505 let mut upload_queue = upload_queue.clone();
506 future::Either::Left(h2
507 .send_request(request, upload_data)
508 .and_then(move |response| async move {
510 .send((new_info, Some(response)))
512 .map_err(|err| format_err!("failed to send to upload queue
: {}
", err))
516 let mut upload_queue = upload_queue.clone();
517 future::Either::Right(async move {
519 .send((merged_chunk_info, None))
521 .map_err(|err| format_err!("failed to send to upload queue
: {}
", err))
// Final stage: wait for the append queue to drain, then report stats.
525 .then(move |result| async move {
526 upload_result.await?.and(result)
529 let repeat = repeat2.load(Ordering::SeqCst);
530 let stream_len = stream_len2.load(Ordering::SeqCst);
531 let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
532 println!("Uploaded {} chunks
in {}
seconds ({} MB
/s
).", repeat, start_time.elapsed().as_secs(), speed);
534 println!("Average chunk size was {} bytes
.", stream_len/repeat);
535 println!("Time per request
: {} microseconds
.", (start_time.elapsed().as_micros())/(repeat as u128));
538 let mut guard = index_csum_2.lock().unwrap();
539 let csum = guard.take().unwrap().finish();
541 futures::future::ok((repeat, stream_len, speed, csum))
// NOTE(review): fragment of `upload_speedtest` — repeatedly POSTs a ~1 MiB
// pseudo-random buffer to "speedtest" for up to 5 seconds via the pipelined
// response queue, then reports throughput. Missing lines (orig 546, 550,
// 552-555, 557-559, 561, 563-565, 567-569, 571, 575, 577-578, 580, 582, 586)
// include the inner byte-generation loop (the `j` variable used below), the
// `repeat` counter declaration/increment, and the loop structure — so the
// exact iteration scheme cannot be confirmed from this capture.
545 pub async fn upload_speedtest(&self) -> Result<usize, Error> {
547 let mut data = vec![];
548 // generate pseudo random byte sequence
549 for i in 0..1024*1024 {
551 let byte = ((i >> (j<<3))&0xff) as u8;
556 let item_len = data.len();
560 let (upload_queue, upload_result) = Self::response_queue();
562 let start_time = std::time::Instant::now();
566 if start_time.elapsed().as_secs() >= 5 {
570 let mut upload_queue = upload_queue.clone();
572 println!("send test
data ({} bytes
)", data.len());
573 let request = H2Client::request_builder("localhost
", "POST
", "speedtest
", None, None).unwrap();
574 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
576 upload_queue.send(request_future).await?;
579 drop(upload_queue); // close queue
581 let _ = upload_result.await?;
583 println!("Uploaded {} chunks
in {} seconds
.", repeat, start_time.elapsed().as_secs());
584 let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
585 println!("Time per request
: {} microseconds
.", (start_time.elapsed().as_micros())/(repeat as u128));