1 use std
::collections
::HashSet
;
2 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
3 use std
::sync
::{Arc, Mutex}
;
6 use chrono
::{DateTime, Utc}
;
8 use futures
::stream
::Stream
;
9 use serde_json
::{json, Value}
;
10 use tokio
::io
::AsyncReadExt
;
11 use tokio
::sync
::{mpsc, oneshot}
;
13 use proxmox
::tools
::digest_to_hex
;
15 use super::merge_known_chunks
::{MergedChunkInfo, MergeKnownChunks}
;
17 use crate::tools
::futures
::Canceller
;
19 use super::{HttpClient, H2Client}
;
// Client-side handle for one backup session carried over an HTTP/2 connection.
// NOTE(review): the struct fields are missing from this extract; usage below
// (`self.h2`, `self.canceller`) suggests fields `h2: H2Client` and
// `canceller: Canceller` — confirm against the full source.
21 pub struct BackupWriter
{
// Cancel the underlying HTTP/2 connection when the writer is dropped, so an
// abandoned session does not leave the connection open.
// NOTE(review): the `fn drop(&mut self)` wrapper lines are missing from this
// extract; only the body statement is visible.
26 impl Drop
for BackupWriter
{
29 self.canceller
.cancel();
// Per-upload statistics returned by the upload helpers below.
// NOTE(review): fields are missing from this extract; construction sites use
// `BackupStats { size, csum }` where `size` is a `u64` byte count and `csum`
// is a SHA-256 digest — confirm exact field types against the full source.
33 pub struct BackupStats
{
40 fn new(h2
: H2Client
, canceller
: Canceller
) -> Arc
<Self> {
41 Arc
::new(Self { h2, canceller }
)
// Open a new backup session against the server and upgrade to HTTP/2.
// NOTE(review): the function head (name and leading parameters) is missing
// from this extract; the visible tail suggests a constructor like
// `start(client, ..., backup_type, backup_id, backup_time, ...)` — confirm.
49 backup_time
: DateTime
<Utc
>,
51 ) -> Result
<Arc
<BackupWriter
>, Error
> {
// Query parameters identifying the backup group and snapshot time
// (epoch seconds via `timestamp()`); presumably part of a `json!` map whose
// opening line is missing from this extract.
54 "backup-type": backup_type
,
55 "backup-id": backup_id
,
56 "backup-time": backup_time
.timestamp(),
// Build the initial GET request that asks the server to start a backup.
61 let req
= HttpClient
::request_builder(
62 client
.server(), "GET", "/api2/json/backup", Some(param
)).unwrap();
// Upgrade the connection to HTTP/2 using the backup protocol identifier;
// yields the h2 client plus a canceller used to abort the session.
64 let (h2
, canceller
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_PROTOCOL_ID_V1
!())).await?
;
66 Ok(BackupWriter
::new(h2
, canceller
))
// Thin wrapper forwarding a GET request over the session's HTTP/2 client.
// NOTE(review): the signature head (presumably
// `pub async fn get(&self, path: &str, param: Option<Value>`) is missing
// from this extract — confirm against the full source.
73 ) -> Result
<Value
, Error
> {
74 self.h2
.get(path
, param
).await
// Thin wrapper forwarding a PUT request over the session's HTTP/2 client.
// NOTE(review): the signature head is missing from this extract — it
// presumably takes `&self, path: &str, param: Option<Value>`; confirm.
81 ) -> Result
<Value
, Error
> {
82 self.h2
.put(path
, param
).await
// Thin wrapper forwarding a POST request over the session's HTTP/2 client.
// NOTE(review): the signature head is missing from this extract — it
// presumably takes `&self, path: &str, param: Option<Value>`; confirm.
89 ) -> Result
<Value
, Error
> {
90 self.h2
.post(path
, param
).await
// Upload a request body via POST and await the parsed API response.
// NOTE(review): the parameter list (lines between the head and the return
// type) is missing from this extract; the body references `path`, `param`,
// `content_type` and `data` — confirm exact types against the full source.
93 pub async
fn upload_post(
99 ) -> Result
<Value
, Error
> {
100 self.h2
.upload("POST", path
, param
, content_type
, data
).await
// Fire off an upload request without awaiting the response body, returning
// the `h2` response future so callers can pipeline many requests.
// NOTE(review): part of the parameter list and the trailing
// `Ok(response_future)` return are missing from this extract — confirm.
103 pub async
fn send_upload_request(
107 param
: Option
<Value
>,
110 ) -> Result
<h2
::client
::ResponseFuture
, Error
> {
112 let request
= H2Client
::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
// `data.clone()` copies the payload into a `Bytes` buffer for the request
// body; the returned future resolves to the server's response.
113 let response_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
// Upload a request body via PUT and await the parsed API response.
// NOTE(review): part of the parameter list is missing from this extract;
// the body references `path`, `param`, `content_type` and `data` — confirm.
117 pub async
fn upload_put(
120 param
: Option
<Value
>,
123 ) -> Result
<Value
, Error
> {
124 self.h2
.upload("PUT", path
, param
, content_type
, data
).await
// Cleanly terminate the backup session: POST "finish" to the server, then
// cancel the HTTP/2 connection.
// NOTE(review): the combinator lines between the `post` call and the
// `cancel()` (and the final `.await`) are missing from this extract — the
// cancel presumably runs after a successful finish response; confirm.
127 pub async
fn finish(self: Arc
<Self>) -> Result
<(), Error
> {
128 let h2
= self.h2
.clone();
130 h2
.post("finish", None
)
132 self.canceller
.cancel();
137 pub fn force_close(self) {
138 self.canceller
.cancel();
// Read a blob from a `std::io::Read` source and upload it via the "blob"
// endpoint; returns size and SHA-256 checksum of the uploaded (encoded) data.
// NOTE(review): the parameter list (presumably `&self, mut reader: R,
// file_name: &str`) is missing from this extract — confirm.
141 pub async
fn upload_blob
<R
: std
::io
::Read
>(
145 ) -> Result
<BackupStats
, Error
> {
146 let mut raw_data
= Vec
::new();
147 // fixme: avoid loading into memory
148 reader
.read_to_end(&mut raw_data
)?
;
// Checksum is computed over the raw bytes exactly as uploaded.
150 let csum
= openssl
::sha
::sha256(&raw_data
);
151 let param
= json
!({"encoded-size": raw_data.len(), "file-name": file_name }
);
152 let size
= raw_data
.len() as u64;
// Response value is intentionally discarded; success is signalled by `?`.
153 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
154 Ok(BackupStats { size, csum }
)
// Encode an in-memory buffer as a `DataBlob` (optionally signed or
// encrypted) and upload it via the "blob" endpoint.
// NOTE(review): several lines are missing from this extract, including part
// of the parameter list (the body references `data`, `file_name`,
// `compress`, and apparently a sign-vs-encrypt selector) and the branch
// structure choosing between the three `DataBlob` constructors — confirm.
157 pub async
fn upload_blob_from_data(
161 crypt_config
: Option
<Arc
<CryptConfig
>>,
164 ) -> Result
<BackupStats
, Error
> {
166 let blob
= if let Some(ref crypt_config
) = crypt_config
{
// With a crypt config: either sign only ...
168 DataBlob
::create_signed(&data
, crypt_config
, compress
)?
// ... or encrypt the payload.
170 DataBlob
::encode(&data
, Some(crypt_config
), compress
)?
// Without a crypt config: plain (possibly compressed) encoding.
173 DataBlob
::encode(&data
, None
, compress
)?
176 let raw_data
= blob
.into_inner();
177 let size
= raw_data
.len() as u64;
// Checksum covers the encoded blob bytes, not the plaintext input.
179 let csum
= openssl
::sha
::sha256(&raw_data
);
180 let param
= json
!({"encoded-size": size, "file-name": file_name }
);
181 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
182 Ok(BackupStats { size, csum }
)
// Read a file from disk, encode it as a `DataBlob` (optionally encrypted),
// and upload it via the "blob" endpoint.
// NOTE(review): part of the parameter list is missing from this extract;
// the body references `src_path`, `file_name` and `compress` — confirm.
185 pub async
fn upload_blob_from_file
<P
: AsRef
<std
::path
::Path
>>(
189 crypt_config
: Option
<Arc
<CryptConfig
>>,
191 ) -> Result
<BackupStats
, Error
> {
// Shadow the generic parameter with the borrowed `&Path` form.
193 let src_path
= src_path
.as_ref();
// Async file open; error is wrapped with the offending path for context.
195 let mut file
= tokio
::fs
::File
::open(src_path
)
197 .map_err(|err
| format_err
!("unable to open file {:?} - {}", src_path
, err
))?
;
199 let mut contents
= Vec
::new();
// Whole file is read into memory before encoding.
201 file
.read_to_end(&mut contents
)
203 .map_err(|err
| format_err
!("unable to read file {:?} - {}", src_path
, err
))?
;
// `as_ref().map(AsRef::as_ref)` converts Option<Arc<CryptConfig>> into
// Option<&CryptConfig> without cloning the Arc.
205 let blob
= DataBlob
::encode(&contents
, crypt_config
.as_ref().map(AsRef
::as_ref
), compress
)?
;
206 let raw_data
= blob
.into_inner();
207 let size
= raw_data
.len() as u64;
// Checksum covers the encoded blob bytes as uploaded.
208 let csum
= openssl
::sha
::sha256(&raw_data
);
// Fragment of the `json!` parameter map; its surrounding lines are missing
// from this extract.
210 "encoded-size": size
,
211 "file-name": file_name
,
213 self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
214 Ok(BackupStats { size, csum }
)
// Upload a chunked archive stream: register an index writer on the server,
// pre-load the set of chunks the server already knows, stream all chunks,
// then close the index with count/size/checksum metadata.
// NOTE(review): several lines are missing from this extract, including part
// of the parameter list (the body references `archive_name`, `prefix`,
// `stream`, `fixed_size`, `crypt_config`), some arguments of the
// `upload_chunk_info_stream` call, the closing `json!` map around the
// "chunk-count"/"csum" entries, and the final `Ok(BackupStats ...)` —
// confirm against the full source.
217 pub async
fn upload_stream(
220 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
222 fixed_size
: Option
<u64>,
223 crypt_config
: Option
<Arc
<CryptConfig
>>,
224 ) -> Result
<BackupStats
, Error
> {
// Shared set of chunk digests already present on the server.
225 let known_chunks
= Arc
::new(Mutex
::new(HashSet
::new()));
227 let mut param
= json
!({ "archive-name": archive_name }
);
// Fixed-size archives (block images) pass their total size up front.
228 if let Some(size
) = fixed_size
{
229 param
["size"] = size
.into();
// `prefix` selects the index flavor (e.g. "fixed"/"dynamic") and derives
// the index and close endpoint paths from it.
232 let index_path
= format
!("{}_index", prefix
);
233 let close_path
= format
!("{}_close", prefix
);
// Seed `known_chunks` with digests the server already stores, so those
// chunks are registered by reference instead of re-uploaded.
235 self.download_chunk_list(&index_path
, archive_name
, known_chunks
.clone()).await?
;
// The server returns a writer id (wid) identifying this index session.
237 let wid
= self.h2
.post(&index_path
, Some(param
)).await?
.as_u64().unwrap();
239 let (chunk_count
, size
, _speed
, csum
) =
240 Self::upload_chunk_info_stream(
245 known_chunks
.clone(),
// Fragment of the close-request `json!` map; surrounding lines are missing
// from this extract.
252 "chunk-count": chunk_count
,
254 "csum": proxmox
::tools
::digest_to_hex(&csum
),
256 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
// Build a pipeline for response verification: returns a sender for h2
// response futures plus a oneshot receiver that resolves once every queued
// response has been received and checked.
// NOTE(review): several lines are missing from this extract (the opening of
// the return-tuple body, the task spawn wrapping the `try_for_each`, and
// the closing of the verification future) — confirm structure.
263 fn response_queue() -> (
264 mpsc
::Sender
<h2
::client
::ResponseFuture
>,
265 oneshot
::Receiver
<Result
<(), Error
>>
// Bounded channel: at most 100 in-flight pipelined responses.
267 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(100);
268 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
// Drain the queue, awaiting each response and converting it through the
// API-response handler; any failure aborts the whole verification future.
273 .try_for_each(|response
: h2
::client
::ResponseFuture
| {
275 .map_err(Error
::from
)
276 .and_then(H2Client
::h2api_response
)
277 .map_ok(|result
| println
!("RESPONSE: {:?}", result
))
278 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
// Deliver the aggregate result; a closed receiver is deliberately ignored
// (the caller may have given up waiting).
281 let _ignore_closed_channel
= verify_result_tx
.send(result
);
285 (verify_queue_tx
, verify_result_rx
)
// Build the append pipeline for one index writer (`wid`): callers enqueue
// (MergedChunkInfo, optional upload-response) pairs; the pipeline first
// awaits any pending chunk-upload response, then PUTs the digest/offset
// lists to the index append endpoint at `path`. Returns the queue sender
// and a oneshot receiver resolving when all appends completed.
// NOTE(review): several lines are missing from this extract (the stream
// setup ahead of `.and_then`, some closing braces, the task spawn, and the
// `MergedChunkInfo::New` match arm) — confirm structure against the full
// source.
288 fn append_chunk_queue(h2
: H2Client
, wid
: u64, path
: String
) -> (
289 mpsc
::Sender
<(MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)>,
290 oneshot
::Receiver
<Result
<(), Error
>>
// Bounded to 64 in-flight append jobs.
292 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(64);
293 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
// Second handle to the h2 client for use inside the later closure.
295 let h2_2
= h2
.clone();
// Stage 1: if the chunk had to be uploaded, await that upload's response
// before appending it to the index.
300 .and_then(move |(merged_chunk_info
, response
): (MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)| {
301 match (response
, merged_chunk_info
) {
302 (Some(response
), MergedChunkInfo
::Known(list
)) => {
303 future
::Either
::Left(
305 .map_err(Error
::from
)
306 .and_then(H2Client
::h2api_response
)
307 .and_then(move |_result
| {
308 future
::ok(MergedChunkInfo
::Known(list
))
312 (None
, MergedChunkInfo
::Known(list
)) => {
313 future
::Either
::Right(future
::ok(MergedChunkInfo
::Known(list
)))
// Stage 2: coalesce adjacent known chunks, then append each merged batch.
318 .merge_known_chunks()
319 .and_then(move |merged_chunk_info
| {
320 match merged_chunk_info
{
321 MergedChunkInfo
::Known(chunk_list
) => {
322 let mut digest_list
= vec
![];
323 let mut offset_list
= vec
![];
324 for (offset
, digest
) in chunk_list
{
325 //println!("append chunk {} (offset {})", proxmox::tools::digest_to_hex(&digest), offset);
326 digest_list
.push(digest_to_hex(&digest
));
327 offset_list
.push(offset
);
329 println
!("append chunks list len ({})", digest_list
.len());
330 let param
= json
!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list }
);
331 let request
= H2Client
::request_builder("localhost", "PUT", &path
, None
, Some("application/json")).unwrap();
// NOTE(review): `Bytes::from(param.to_string().as_bytes())` copies from a
// temporary String's byte slice; `into_bytes()` would avoid the borrow of a
// temporary — confirm against the full source whether this even compiles
// as written here.
332 let param_data
= bytes
::Bytes
::from(param
.to_string().as_bytes());
333 let upload_data
= Some(param_data
);
// Send the append request and run its response through the API handler.
334 h2_2
.send_request(request
, upload_data
)
335 .and_then(move |response
| {
337 .map_err(Error
::from
)
338 .and_then(H2Client
::h2api_response
)
341 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
// Drive the whole pipeline to completion, discarding per-item results.
346 .try_for_each(|_
| future
::ok(()))
// Deliver the aggregate result; ignore a dropped receiver.
348 let _ignore_closed_channel
= verify_result_tx
.send(result
);
352 (verify_queue_tx
, verify_result_rx
)
// Download the server's digest list for an archive and insert every digest
// into `known_chunks`, so already-stored chunks can be skipped on upload.
// NOTE(review): part of the parameter list (presumably `&self, path: &str,
// archive_name: &str`) and some closing braces are missing from this
// extract — confirm.
355 pub async
fn download_chunk_list(
359 known_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
360 ) -> Result
<(), Error
> {
362 let param
= json
!({ "archive-name": archive_name }
);
363 let request
= H2Client
::request_builder("localhost", "GET", path
, Some(param
), None
).unwrap();
// Two-step await: first for the request to be accepted, then for the
// response head.
365 let h2request
= self.h2
.send_request(request
, None
).await?
;
366 let resp
= h2request
.await?
;
368 let status
= resp
.status();
// On HTTP failure, route the response through the API handler purely to
// surface its error.
370 if !status
.is_success() {
371 H2Client
::h2api_response(resp
).await?
; // raise error
375 let mut body
= resp
.into_body();
// Flow-control handle: capacity must be released as body data is consumed.
376 let mut release_capacity
= body
.release_capacity().clone();
// Decode the body into a stream of 32-byte digests.
378 let mut stream
= DigestListDecoder
::new(body
.map_err(Error
::from
));
380 while let Some(chunk
) = stream
.try_next().await?
{
381 let _
= release_capacity
.release_capacity(chunk
.len());
382 println
!("GOT DOWNLOAD {}", digest_to_hex(&chunk
));
383 known_chunks
.lock().unwrap().insert(chunk
);
// Core chunk pipeline: for each incoming data chunk, compute its digest,
// fold offsets/digests into a running index checksum, upload the chunk only
// if the server does not already know it, and feed everything into the
// append queue. Resolves to (chunk_count, stream_len, speed, index_csum).
// NOTE(review): many lines are missing from this extract — parts of the
// parameter list (the body references `h2`, `wid`, `prefix`), the stream
// adapter ahead of `.and_then`, the compress flag on `DataChunkBuilder`,
// the digest feed into `csum`, the `if chunk_is_known` branch structure,
// the remaining `ChunkInfo` fields, and assorted braces — confirm all
// structure against the full source.
389 fn upload_chunk_info_stream(
392 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
394 known_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
395 crypt_config
: Option
<Arc
<CryptConfig
>>,
396 ) -> impl Future
<Output
= Result
<(usize, usize, usize, [u8; 32]), Error
>> {
// Shared counters: chunk count and cumulative byte length, updated from
// the pipeline closure, read again after completion via the *2 clones.
398 let repeat
= Arc
::new(AtomicUsize
::new(0));
399 let repeat2
= repeat
.clone();
401 let stream_len
= Arc
::new(AtomicUsize
::new(0));
402 let stream_len2
= stream_len
.clone();
404 let append_chunk_path
= format
!("{}_index", prefix
);
405 let upload_chunk_path
= format
!("{}_chunk", prefix
);
406 let is_fixed_chunk_size
= prefix
== "fixed";
// Append pipeline for this writer id; uploads flow into it.
408 let (upload_queue
, upload_result
) =
409 Self::append_chunk_queue(h2
.clone(), wid
, append_chunk_path
.to_owned());
411 let start_time
= std
::time
::Instant
::now();
// Running SHA-256 over the index entries; wrapped in Option so it can be
// `take()`n and finalized exactly once at the end.
413 let index_csum
= Arc
::new(Mutex
::new(Some(openssl
::sha
::Sha256
::new())));
414 let index_csum_2
= index_csum
.clone();
// Per-chunk stage: digest, dedup, and build upload info.
417 .and_then(move |data
| {
419 let chunk_len
= data
.len();
421 repeat
.fetch_add(1, Ordering
::SeqCst
);
// The chunk's offset is the stream length before adding this chunk.
422 let offset
= stream_len
.fetch_add(chunk_len
, Ordering
::SeqCst
) as u64;
424 let mut chunk_builder
= DataChunkBuilder
::new(data
.as_ref())
427 if let Some(ref crypt_config
) = crypt_config
{
428 chunk_builder
= chunk_builder
.crypt_config(crypt_config
);
431 let mut known_chunks
= known_chunks
.lock().unwrap();
432 let digest
= chunk_builder
.digest();
434 let mut guard
= index_csum
.lock().unwrap();
435 let csum
= guard
.as_mut().unwrap();
437 let chunk_end
= offset
+ chunk_len
as u64;
// Dynamic indexes hash the chunk-end offsets into the index checksum;
// fixed indexes do not.
439 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
442 let chunk_is_known
= known_chunks
.contains(digest
);
// Known chunk: no upload needed, just register (offset, digest).
444 future
::ok(MergedChunkInfo
::Known(vec
![(offset
, *digest
)]))
// New chunk: remember it locally and build the encoded chunk for upload.
446 known_chunks
.insert(*digest
);
447 future
::ready(chunk_builder
449 .map(move |(chunk
, digest
)| MergedChunkInfo
::New(ChunkInfo
{
452 chunk_len
: chunk_len
as u64,
// Merge runs of known chunks, then dispatch: new chunks are POSTed and
// their response future handed to the append queue; known chunks go to
// the append queue directly.
458 .merge_known_chunks()
459 .try_for_each(move |merged_chunk_info
| {
461 if let MergedChunkInfo
::New(chunk_info
) = merged_chunk_info
{
462 let offset
= chunk_info
.offset
;
463 let digest
= chunk_info
.digest
;
464 let digest_str
= digest_to_hex(&digest
);
466 println
!("upload new chunk {} ({} bytes, offset {})", digest_str
,
467 chunk_info
.chunk_len
, offset
);
469 let chunk_data
= chunk_info
.chunk
.raw_data();
// Fragment of the upload-parameter `json!` map; surrounding lines are
// missing from this extract.
472 "digest": digest_str
,
473 "size": chunk_info
.chunk_len
,
474 "encoded-size": chunk_data
.len(),
477 let ct
= "application/octet-stream";
478 let request
= H2Client
::request_builder("localhost", "POST", &upload_chunk_path
, Some(param
), Some(ct
)).unwrap();
479 let upload_data
= Some(bytes
::Bytes
::from(chunk_data
));
// After upload, the chunk is registered in the index as known.
481 let new_info
= MergedChunkInfo
::Known(vec
![(offset
, digest
)]);
483 let mut upload_queue
= upload_queue
.clone();
484 future
::Either
::Left(h2
485 .send_request(request
, upload_data
)
486 .and_then(move |response
| async
move {
488 .send((new_info
, Some(response
)))
490 .map_err(Error
::from
)
494 let mut upload_queue
= upload_queue
.clone();
495 future
::Either
::Right(async
move {
497 .send((merged_chunk_info
, None
))
499 .map_err(Error
::from
)
// Completion stage: wait for the append pipeline, combine results, and
// compute the summary tuple.
503 .then(move |result
| async
move {
504 upload_result
.await?
.and(result
)
507 let repeat
= repeat2
.load(Ordering
::SeqCst
);
508 let stream_len
= stream_len2
.load(Ordering
::SeqCst
);
// NOTE(review): these statistics divide by elapsed micros and by `repeat`;
// an empty stream (repeat == 0) would panic with a division by zero —
// confirm whether callers guarantee at least one chunk.
509 let speed
= ((stream_len
*1000000)/(1024*1024))/(start_time
.elapsed().as_micros() as usize);
510 println
!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat
, start_time
.elapsed().as_secs(), speed
);
512 println
!("Average chunk size was {} bytes.", stream_len
/repeat
);
513 println
!("Time per request: {} microseconds.", (start_time
.elapsed().as_micros())/(repeat
as u128
));
// Finalize the index checksum; `take()` ensures it is finished only once.
516 let mut guard
= index_csum_2
.lock().unwrap();
517 let csum
= guard
.take().unwrap().finish();
519 futures
::future
::ok((repeat
, stream_len
, speed
, csum
))
// Benchmark upload throughput: repeatedly POST a ~1 MiB deterministic byte
// buffer to the "speedtest" endpoint for ~5 seconds, pipelining responses
// through `response_queue`, and report the measured rate.
// NOTE(review): this function is cut off at the end of the extract and
// several interior lines are missing (the inner byte loop over `j`, the
// `data.push(byte)`, the loop head incrementing `repeat`, and the final
// return of the speed value) — confirm against the full source.
523 pub async
fn upload_speedtest(&self) -> Result
<usize, Error
> {
525 let mut data
= vec
![];
526 // generate pseudo random byte sequence
527 for i
in 0..1024*1024 {
// Extracts byte `j` of the counter `i` (little-endian style) so the
// payload is deterministic but non-uniform.
529 let byte
= ((i
>> (j
<<3))&0xff) as u8;
534 let item_len
= data
.len();
// Pipeline for awaiting the POSTed responses without blocking sends.
538 let (upload_queue
, upload_result
) = Self::response_queue();
540 let start_time
= std
::time
::Instant
::now();
// Stop issuing requests once ~5 seconds have elapsed.
544 if start_time
.elapsed().as_secs() >= 5 {
548 let mut upload_queue
= upload_queue
.clone();
550 println
!("send test data ({} bytes)", data
.len());
551 let request
= H2Client
::request_builder("localhost", "POST", "speedtest", None
, None
).unwrap();
// Each iteration clones the payload into a fresh Bytes body.
552 let request_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
554 upload_queue
.send(request_future
).await?
;
557 drop(upload_queue
); // close queue
// Wait until every pipelined response has been verified.
559 let _
= upload_result
.await?
;
561 println
!("Uploaded {} chunks in {} seconds.", repeat
, start_time
.elapsed().as_secs());
562 let speed
= ((item_len
*1000000*(repeat
as usize))/(1024*1024))/(start_time
.elapsed().as_micros() as usize);
563 println
!("Time per request: {} microseconds.", (start_time
.elapsed().as_micros())/(repeat
as u128
));