1 use std
::collections
::HashSet
;
2 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
3 use std
::sync
::{Arc, Mutex}
;
6 use chrono
::{DateTime, Utc}
;
8 use futures
::stream
::Stream
;
9 use serde_json
::{json, Value}
;
10 use tokio
::io
::AsyncReadExt
;
11 use tokio
::sync
::{mpsc, oneshot}
;
13 use proxmox
::tools
::digest_to_hex
;
15 use super::merge_known_chunks
::{MergedChunkInfo, MergeKnownChunks}
;
17 use crate::tools
::futures
::Canceller
;
19 use super::{HttpClient, H2Client}
;
// NOTE(review): this listing is a mangled extract — original line numbers are
// embedded in the text and many lines (struct fields, fn headers, closing
// braces) are missing. Comments annotate intent only; code is left untouched.

// Handle for one running backup session over an HTTP/2 connection.
// Fields are not visible here (original lines 22-25 missing), but `new()`
// below constructs `Self { h2, canceller }`, so it holds at least an
// `h2: H2Client` and a `canceller: Canceller`.
21 pub struct BackupWriter
{
// Dropping the writer cancels the underlying connection/task, so an
// abandoned session does not keep the HTTP/2 stream alive.
26 impl Drop
for BackupWriter
{
// Body of `fn drop` (its header line is missing from this extract):
29 self.canceller
.cancel();
// Per-upload statistics returned by the `upload_*` helpers. Field lines are
// missing from this extract, but construction sites in this file use
// `BackupStats { size, csum }` with `size: u64` (encoded byte count) and
// `csum` from `openssl::sha::sha256` (a [u8; 32] digest).
33 pub struct BackupStats
{
// Private constructor: wraps the established H2 connection and its canceller
// in an `Arc` so the writer can be shared across the async upload pipeline.
// (Presumably inside `impl BackupWriter` — the impl header is not visible
// in this extract.)
40 fn new(h2
: H2Client
, canceller
: Canceller
) -> Arc
<Self> {
41 Arc
::new(Self { h2, canceller }
)
// Fragment of the async session-opening constructor (its `pub async fn`
// header and several parameter lines are missing from this extract).
// Flow: build the backup query parameters, issue a GET against
// /api2/json/backup, upgrade that request to an HTTP/2 connection using the
// PROXMOX_BACKUP_PROTOCOL_ID_V1 protocol name, and wrap the result in a
// `BackupWriter`.
49 backup_time
: DateTime
<Utc
>,
51 ) -> Result
<Arc
<BackupWriter
>, Error
> {
// Query parameters identifying the backup snapshot. The opening of the
// `json!` macro and any further keys (original lines 52-53, 57-60) are
// missing from this extract.
54 "backup-type": backup_type
,
55 "backup-id": backup_id
,
// Snapshot time is transmitted as a unix timestamp.
56 "backup-time": backup_time
.timestamp(),
61 let req
= HttpClient
::request_builder(
62 client
.server(), "GET", "/api2/json/backup", Some(param
)).unwrap();
// Upgrade to HTTP/2; yields the h2 client plus a canceller that `Drop`
// and `cancel()` use to tear the connection down.
64 let (h2
, canceller
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_PROTOCOL_ID_V1
!())).await?
;
66 Ok(BackupWriter
::new(h2
, canceller
))
// Thin pass-through: forward a GET on the backup H2 connection.
// The fn header with the `path`/`param` parameters (original lines ~70-72)
// is missing from this extract.
73 ) -> Result
<Value
, Error
> {
74 self.h2
.get(path
, param
).await
// Thin pass-through: forward a PUT on the backup H2 connection.
// The fn header (original lines ~78-80) is missing from this extract.
81 ) -> Result
<Value
, Error
> {
82 self.h2
.put(path
, param
).await
// Thin pass-through: forward a POST on the backup H2 connection.
// The fn header (original lines ~86-88) is missing from this extract.
89 ) -> Result
<Value
, Error
> {
90 self.h2
.post(path
, param
).await
// Upload raw `data` via POST with an explicit content type; delegates to
// `H2Client::upload`. Parameter lines (original 94-98) are missing from
// this extract.
93 pub async
fn upload_post(
99 ) -> Result
<Value
, Error
> {
100 self.h2
.upload("POST", path
, param
, content_type
, data
).await
// Build a request and send it on the H2 connection WITHOUT waiting for the
// response body: returns the pending `h2::client::ResponseFuture` so callers
// can pipeline many uploads and collect responses later (see
// `response_queue`). Some parameter lines and the final `Ok(...)` return
// line are missing from this extract.
103 pub async
fn send_upload_request(
107 param
: Option
<Value
>,
110 ) -> Result
<h2
::client
::ResponseFuture
, Error
> {
// "localhost" is only a placeholder authority for the already-established
// H2 connection; no new TCP connection is made here.
112 let request
= H2Client
::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
// NOTE(review): `data.clone()` copies the whole payload into the Bytes
// buffer — presumably `data: Vec<u8>` taken by reference; confirm whether
// the clone is avoidable at the caller.
113 let response_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
// Upload raw `data` via PUT with an explicit content type; delegates to
// `H2Client::upload`. Mirror image of `upload_post` above. Parameter lines
// (original 118-122) are missing from this extract.
117 pub async
fn upload_put(
120 param
: Option
<Value
>,
123 ) -> Result
<Value
, Error
> {
124 self.h2
.upload("PUT", path
, param
, content_type
, data
).await
// Finalize the backup session: POST "finish" to the server, then cancel the
// H2 connection. Takes `self: Arc<Self>` since the writer is shared across
// the upload pipeline. Lines between the post and the cancel (original
// 129, 131 — likely the await/`?` plumbing) are missing from this extract.
127 pub async
fn finish(self: Arc
<Self>) -> Result
<(), Error
> {
128 let h2
= self.h2
.clone();
130 h2
.post("finish", None
)
// Tear down the connection once the server acknowledged the finish.
132 self.canceller
.cancel();
// Abort the backup session immediately (same mechanism `Drop` uses).
// The closing brace (original line 139) is missing from this extract.
137 pub fn cancel(&self) {
138 self.canceller
.cancel();
// Upload an already-encoded blob read from any `std::io::Read` source.
// Reads the whole reader into memory (the original author already flagged
// this with a `fixme`), computes a SHA-256 over the encoded bytes, POSTs it
// to the "blob" endpoint, and returns size + checksum as `BackupStats`.
// Parameter lines (original 142-144, e.g. `reader`, `file_name`) are
// missing from this extract.
141 pub async
fn upload_blob
<R
: std
::io
::Read
>(
145 ) -> Result
<BackupStats
, Error
> {
146 let mut raw_data
= Vec
::new();
147 // fixme: avoid loading into memory
148 reader
.read_to_end(&mut raw_data
)?
;
// Checksum is over the ENCODED data, matching what the server stores.
150 let csum
= openssl
::sha
::sha256(&raw_data
);
151 let param
= json
!({"encoded-size": raw_data.len(), "file-name": file_name }
);
152 let size
= raw_data
.len() as u64;
153 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
154 Ok(BackupStats { size, csum }
)
// Encode in-memory `data` as a DataBlob (signed, encrypted, or plain) and
// upload it to the "blob" endpoint. The branch conditions selecting between
// `create_signed` and the encrypted `encode` (original lines 165, 167, 169,
// 171-172) are missing from this extract — presumably a `sign_only`-style
// flag; verify against the full source.
157 pub async
fn upload_blob_from_data(
161 crypt_config
: Option
<Arc
<CryptConfig
>>,
164 ) -> Result
<BackupStats
, Error
> {
166 let blob
= if let Some(ref crypt_config
) = crypt_config
{
// Signed but unencrypted variant.
168 DataBlob
::create_signed(&data
, crypt_config
, compress
)?
// Encrypted variant.
170 DataBlob
::encode(&data
, Some(crypt_config
), compress
)?
// No crypt config: plain (optionally compressed) blob.
173 DataBlob
::encode(&data
, None
, compress
)?
176 let raw_data
= blob
.into_inner();
177 let size
= raw_data
.len() as u64;
// Checksum is over the ENCODED blob bytes, not the input data.
179 let csum
= openssl
::sha
::sha256(&raw_data
);
180 let param
= json
!({"encoded-size": size, "file-name": file_name }
);
181 let _value
= self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
182 Ok(BackupStats { size, csum }
)
// Read a file asynchronously (tokio::fs), encode it as a DataBlob with the
// optional crypt config, and upload it to the "blob" endpoint. File open and
// read errors are wrapped with the offending path for better diagnostics.
// Some parameter lines (original 186-188, 190) and the `json!` opener
// (original 209) are missing from this extract.
185 pub async
fn upload_blob_from_file
<P
: AsRef
<std
::path
::Path
>>(
189 crypt_config
: Option
<Arc
<CryptConfig
>>,
191 ) -> Result
<BackupStats
, Error
> {
// Shadowing: normalize the generic parameter to a `&Path` once.
193 let src_path
= src_path
.as_ref();
195 let mut file
= tokio
::fs
::File
::open(src_path
)
// Attach the path to the error; plain io::Error would not mention it.
197 .map_err(|err
| format_err
!("unable to open file {:?} - {}", src_path
, err
))?
;
199 let mut contents
= Vec
::new();
201 file
.read_to_end(&mut contents
)
203 .map_err(|err
| format_err
!("unable to read file {:?} - {}", src_path
, err
))?
;
// `Option<Arc<CryptConfig>>` -> `Option<&CryptConfig>` without cloning.
205 let blob
= DataBlob
::encode(&contents
, crypt_config
.as_ref().map(AsRef
::as_ref
), compress
)?
;
206 let raw_data
= blob
.into_inner();
207 let size
= raw_data
.len() as u64;
208 let csum
= openssl
::sha
::sha256(&raw_data
);
210 "encoded-size": size
,
211 "file-name": file_name
,
213 self.h2
.upload("POST", "blob", Some(param
), "application/octet-stream", raw_data
).await?
;
214 Ok(BackupStats { size, csum }
)
// Drive a full chunked-archive upload:
//   1. fetch the server's known-chunk digest list (dedup baseline),
//   2. open a "{prefix}_index" writer on the server (returns `wid`),
//   3. stream chunks through `upload_chunk_info_stream`,
//   4. close the index via "{prefix}_close" with count/size/csum.
// Several lines are missing from this extract (fn header parameters,
// the upload call's remaining arguments at original 241-250, the close
// `json!` opener, and the final `Ok(BackupStats ...)`).
217 pub async
fn upload_stream(
220 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
222 fixed_size
: Option
<u64>,
223 crypt_config
: Option
<Arc
<CryptConfig
>>,
224 ) -> Result
<BackupStats
, Error
> {
// Shared digest set; filled from the server below, then consulted and
// extended by the chunk upload pipeline.
225 let known_chunks
= Arc
::new(Mutex
::new(HashSet
::new()));
227 let mut param
= json
!({ "archive-name": archive_name }
);
// Fixed-size archives (block images) carry their total size up front.
228 if let Some(size
) = fixed_size
{
229 param
["size"] = size
.into();
232 let index_path
= format
!("{}_index", prefix
);
233 let close_path
= format
!("{}_close", prefix
);
235 self.download_chunk_list(&index_path
, archive_name
, known_chunks
.clone()).await?
;
// The server returns a writer id used by the append queue.
// NOTE(review): `.as_u64().unwrap()` panics if the response is not a
// number — presumably guaranteed by the API; confirm.
237 let wid
= self.h2
.post(&index_path
, Some(param
)).await?
.as_u64().unwrap();
239 let (chunk_count
, size
, _speed
, csum
) =
240 Self::upload_chunk_info_stream(
245 known_chunks
.clone(),
// Close parameters echo the totals so the server can cross-check.
252 "chunk-count": chunk_count
,
254 "csum": proxmox
::tools
::digest_to_hex(&csum
),
256 let _value
= self.h2
.post(&close_path
, Some(param
)).await?
;
// Build a pipelining helper: callers push pending `ResponseFuture`s into the
// returned mpsc sender; a background task (spawn line missing from this
// extract) awaits each response in order, logs it, and reports overall
// success/failure through the oneshot receiver. Channel capacity 100 bounds
// the number of in-flight requests.
263 fn response_queue() -> (
264 mpsc
::Sender
<h2
::client
::ResponseFuture
>,
265 oneshot
::Receiver
<Result
<(), Error
>>
267 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(100);
268 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
270 // FIXME: check if this works as expected as replacement for the combinator below?
271 // tokio::spawn(async move {
272 // let result: Result<(), Error> = (async move {
273 // while let Some(response) = verify_queue_rx.recv().await {
274 // match H2Client::h2api_response(response.await?).await {
275 // Ok(result) => println!("RESPONSE: {:?}", result),
276 // Err(err) => bail!("pipelined request failed: {}", err),
281 // let _ignore_closed_channel = verify_result_tx.send(result);
283 // old code for reference?
// Combinator-style consumer: await each pipelined response, convert it via
// `h2api_response`, and fail the whole queue on the first error.
287 .try_for_each(|response
: h2
::client
::ResponseFuture
| {
289 .map_err(Error
::from
)
290 .and_then(H2Client
::h2api_response
)
291 .map_ok(|result
| println
!("RESPONSE: {:?}", result
))
292 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
// A closed receiver just means nobody is waiting for the verdict;
// deliberately ignored.
295 let _ignore_closed_channel
= verify_result_tx
.send(result
);
299 (verify_queue_tx
, verify_result_rx
)
// Build the index-append pipeline for writer `wid`: the returned sender
// accepts `(MergedChunkInfo, Option<ResponseFuture>)` pairs; a background
// task first awaits any pending chunk-upload response, then batches known
// (offset, digest) pairs via `merge_known_chunks()` and PUTs them to `path`
// as one JSON request per batch. The oneshot receiver yields the overall
// result. The spawn line and several closing-delimiter lines are missing
// from this extract.
302 fn append_chunk_queue(h2
: H2Client
, wid
: u64, path
: String
) -> (
303 mpsc
::Sender
<(MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)>,
304 oneshot
::Receiver
<Result
<(), Error
>>
306 let (verify_queue_tx
, verify_queue_rx
) = mpsc
::channel(64);
307 let (verify_result_tx
, verify_result_rx
) = oneshot
::channel();
// Second handle for use inside the nested `move` closure below.
309 let h2_2
= h2
.clone();
311 // FIXME: async-block-ify this code!
// Stage 1: if an upload response is attached, wait for it (and check it)
// before the chunk may be appended to the index.
315 .and_then(move |(merged_chunk_info
, response
): (MergedChunkInfo
, Option
<h2
::client
::ResponseFuture
>)| {
316 match (response
, merged_chunk_info
) {
317 (Some(response
), MergedChunkInfo
::Known(list
)) => {
318 future
::Either
::Left(
320 .map_err(Error
::from
)
321 .and_then(H2Client
::h2api_response
)
// Upload confirmed; pass the chunk list on unchanged.
322 .and_then(move |_result
| {
323 future
::ok(MergedChunkInfo
::Known(list
))
// Already-known chunk: nothing to await, forward directly.
327 (None
, MergedChunkInfo
::Known(list
)) => {
328 future
::Either
::Right(future
::ok(MergedChunkInfo
::Known(list
)))
// Stage 2: coalesce consecutive known chunks into one batch.
333 .merge_known_chunks()
334 .and_then(move |merged_chunk_info
| {
335 match merged_chunk_info
{
336 MergedChunkInfo
::Known(chunk_list
) => {
337 let mut digest_list
= vec
![];
338 let mut offset_list
= vec
![];
339 for (offset
, digest
) in chunk_list
{
340 //println!("append chunk {} (offset {})", proxmox::tools::digest_to_hex(&digest), offset);
341 digest_list
.push(digest_to_hex(&digest
));
342 offset_list
.push(offset
);
344 println
!("append chunks list len ({})", digest_list
.len());
345 let param
= json
!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list }
);
// One PUT per merged batch; body carries the JSON parameters.
346 let request
= H2Client
::request_builder("localhost", "PUT", &path
, None
, Some("application/json")).unwrap();
347 let param_data
= bytes
::Bytes
::from(param
.to_string().into_bytes());
348 let upload_data
= Some(param_data
);
349 h2_2
.send_request(request
, upload_data
)
350 .and_then(move |response
| {
352 .map_err(Error
::from
)
353 .and_then(H2Client
::h2api_response
)
356 .map_err(|err
| format_err
!("pipelined request failed: {}", err
))
// Drain the stream; only errors matter from here on.
361 .try_for_each(|_
| future
::ok(()))
// A closed receiver just means nobody awaits the verdict; ignored.
363 let _ignore_closed_channel
= verify_result_tx
.send(result
);
367 (verify_queue_tx
, verify_result_rx
)
// Fetch the digest list of chunks the server already has for this archive
// and insert them into `known_chunks`, so the uploader can skip re-sending
// them. Streams the body through a `DigestListDecoder`, manually releasing
// h2 flow-control capacity as data is consumed. Some parameter lines and
// the final `Ok(())` are missing from this extract.
370 pub async
fn download_chunk_list(
374 known_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
375 ) -> Result
<(), Error
> {
377 let param
= json
!({ "archive-name": archive_name }
);
378 let request
= H2Client
::request_builder("localhost", "GET", path
, Some(param
), None
).unwrap();
380 let h2request
= self.h2
.send_request(request
, None
).await?
;
381 let resp
= h2request
.await?
;
383 let status
= resp
.status();
385 if !status
.is_success() {
// Reuse h2api_response purely for its error decoding; on a non-success
// status it returns Err, which `?` propagates.
386 H2Client
::h2api_response(resp
).await?
; // raise error
390 let mut body
= resp
.into_body();
// Clone of the flow-control handle lets us release window capacity while
// the body is moved into the decoder below.
391 let mut flow_control
= body
.flow_control().clone();
393 let mut stream
= DigestListDecoder
::new(body
.map_err(Error
::from
));
395 while let Some(chunk
) = stream
.try_next().await?
{
// Best-effort window release; an error here only means the stream is
// already closed, so the result is deliberately ignored.
396 let _
= flow_control
.release_capacity(chunk
.len());
397 println
!("GOT DOWNLOAD {}", digest_to_hex(&chunk
));
398 known_chunks
.lock().unwrap().insert(chunk
);
// Core chunk-upload pipeline. For each chunk from `stream`:
//   - count it and accumulate the running offset (atomics, SeqCst),
//   - build a DataChunk (optionally encrypted) and take its digest,
//   - feed offset/end into the dynamic-index checksum (fixed indexes hash
//     digests only — see the `is_fixed_chunk_size` guard),
//   - if the digest is already known, just queue an index append; otherwise
//     POST the chunk to "{prefix}_chunk" and queue the append behind that
//     upload's response via `append_chunk_queue`.
// Resolves to (chunk_count, stream_len, speed MB/s, index csum).
// Many lines are missing from this extract (fn header params, several
// closing delimiters, the branch keywords around the known/new decision).
404 fn upload_chunk_info_stream(
407 stream
: impl Stream
<Item
= Result
<bytes
::BytesMut
, Error
>>,
409 known_chunks
: Arc
<Mutex
<HashSet
<[u8;32]>>>,
410 crypt_config
: Option
<Arc
<CryptConfig
>>,
411 ) -> impl Future
<Output
= Result
<(usize, usize, usize, [u8; 32]), Error
>> {
// Shared counters: `repeat` = number of chunks, `stream_len` = total bytes.
// Second handles (`*2`) are read at the end, after the closures consumed
// the originals.
413 let repeat
= Arc
::new(AtomicUsize
::new(0));
414 let repeat2
= repeat
.clone();
416 let stream_len
= Arc
::new(AtomicUsize
::new(0));
417 let stream_len2
= stream_len
.clone();
419 let append_chunk_path
= format
!("{}_index", prefix
);
420 let upload_chunk_path
= format
!("{}_chunk", prefix
);
421 let is_fixed_chunk_size
= prefix
== "fixed";
423 let (upload_queue
, upload_result
) =
424 Self::append_chunk_queue(h2
.clone(), wid
, append_chunk_path
.to_owned());
426 let start_time
= std
::time
::Instant
::now();
// Index checksum lives in Option so it can be `take()`n out of the Mutex
// and finished at the end.
428 let index_csum
= Arc
::new(Mutex
::new(Some(openssl
::sha
::Sha256
::new())));
429 let index_csum_2
= index_csum
.clone();
432 .and_then(move |data
| {
434 let chunk_len
= data
.len();
436 repeat
.fetch_add(1, Ordering
::SeqCst
);
// fetch_add returns the PREVIOUS total, i.e. this chunk's start offset.
437 let offset
= stream_len
.fetch_add(chunk_len
, Ordering
::SeqCst
) as u64;
439 let mut chunk_builder
= DataChunkBuilder
::new(data
.as_ref())
442 if let Some(ref crypt_config
) = crypt_config
{
443 chunk_builder
= chunk_builder
.crypt_config(crypt_config
);
446 let mut known_chunks
= known_chunks
.lock().unwrap();
447 let digest
= chunk_builder
.digest();
449 let mut guard
= index_csum
.lock().unwrap();
450 let csum
= guard
.as_mut().unwrap();
452 let chunk_end
= offset
+ chunk_len
as u64;
// Dynamic indexes include each chunk's end offset in the checksum;
// fixed indexes checksum digests only.
454 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
457 let chunk_is_known
= known_chunks
.contains(digest
);
// Known chunk: no upload needed, emit the (offset, digest) pair.
459 future
::ok(MergedChunkInfo
::Known(vec
![(offset
, *digest
)]))
// New chunk: remember the digest, finish building, emit as New.
461 known_chunks
.insert(*digest
);
462 future
::ready(chunk_builder
464 .map(move |(chunk
, digest
)| MergedChunkInfo
::New(ChunkInfo
{
467 chunk_len
: chunk_len
as u64,
473 .merge_known_chunks()
474 .try_for_each(move |merged_chunk_info
| {
476 if let MergedChunkInfo
::New(chunk_info
) = merged_chunk_info
{
477 let offset
= chunk_info
.offset
;
478 let digest
= chunk_info
.digest
;
479 let digest_str
= digest_to_hex(&digest
);
481 println
!("upload new chunk {} ({} bytes, offset {})", digest_str
,
482 chunk_info
.chunk_len
, offset
);
484 let chunk_data
= chunk_info
.chunk
.into_inner();
// Upload parameters (the `json!` opener is missing from this extract).
487 "digest": digest_str
,
488 "size": chunk_info
.chunk_len
,
489 "encoded-size": chunk_data
.len(),
492 let ct
= "application/octet-stream";
493 let request
= H2Client
::request_builder("localhost", "POST", &upload_chunk_path
, Some(param
), Some(ct
)).unwrap();
494 let upload_data
= Some(bytes
::Bytes
::from(chunk_data
));
496 let new_info
= MergedChunkInfo
::Known(vec
![(offset
, digest
)]);
498 let mut upload_queue
= upload_queue
.clone();
// New chunk: start the upload, then queue the index append behind the
// upload's pending response so ordering is preserved.
499 future
::Either
::Left(h2
500 .send_request(request
, upload_data
)
501 .and_then(move |response
| async
move {
503 .send((new_info
, Some(response
)))
505 .map_err(|err
| format_err
!("failed to send to upload queue: {}", err
))
509 let mut upload_queue
= upload_queue
.clone();
// Known chunk: queue the append directly, nothing to await.
510 future
::Either
::Right(async
move {
512 .send((merged_chunk_info
, None
))
514 .map_err(|err
| format_err
!("failed to send to upload queue: {}", err
))
// Wait for the append queue to drain, then combine both results.
518 .then(move |result
| async
move {
519 upload_result
.await?
.and(result
)
522 let repeat
= repeat2
.load(Ordering
::SeqCst
);
523 let stream_len
= stream_len2
.load(Ordering
::SeqCst
);
// NOTE(review): divides by elapsed micros and (below) by `repeat` — both
// are zero for an instant/empty stream, which would panic. Confirm callers
// guarantee at least one chunk, or guard these in a real fix.
524 let speed
= ((stream_len
*1_000_000)/(1024*1024))/(start_time
.elapsed().as_micros() as usize);
525 println
!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat
, start_time
.elapsed().as_secs(), speed
);
527 println
!("Average chunk size was {} bytes.", stream_len
/repeat
);
528 println
!("Time per request: {} microseconds.", (start_time
.elapsed().as_micros())/(repeat
as u128
));
// Take the finished SHA-256 state out of the shared Option.
531 let mut guard
= index_csum_2
.lock().unwrap();
532 let csum
= guard
.take().unwrap().finish();
534 futures
::future
::ok((repeat
, stream_len
, speed
, csum
))
// Benchmark helper: repeatedly POST a ~1 MiB deterministic byte buffer to
// the "speedtest" endpoint for ~5 seconds, pipelining responses through
// `response_queue`, then print and return throughput figures. Several lines
// are missing from this extract (inner byte loop, `repeat` counter setup,
// loop header, final `Ok(speed)`).
538 pub async
fn upload_speedtest(&self) -> Result
<usize, Error
> {
540 let mut data
= vec
![];
541 // generate pseudo random byte sequence
542 for i
in 0..1024*1024 {
// Derives each byte from the loop counters — deterministic, not random;
// the inner loop over `j` (original line 543) is missing here.
544 let byte
= ((i
>> (j
<<3))&0xff) as u8;
549 let item_len
= data
.len();
553 let (upload_queue
, upload_result
) = Self::response_queue();
555 let start_time
= std
::time
::Instant
::now();
// Time-boxed loop: stop issuing requests after ~5 s.
559 if start_time
.elapsed().as_secs() >= 5 {
563 let mut upload_queue
= upload_queue
.clone();
565 println
!("send test data ({} bytes)", data
.len());
566 let request
= H2Client
::request_builder("localhost", "POST", "speedtest", None
, None
).unwrap();
// `data.clone()` per iteration: acceptable for a benchmark payload.
567 let request_future
= self.h2
.send_request(request
, Some(bytes
::Bytes
::from(data
.clone()))).await?
;
569 upload_queue
.send(request_future
).await?
;
572 drop(upload_queue
); // close queue
574 let _
= upload_result
.await?
;
576 println
!("Uploaded {} chunks in {} seconds.", repeat
, start_time
.elapsed().as_secs());
// NOTE(review): divides by elapsed micros and by `repeat` — if no request
// completed (repeat == 0) this would panic; confirm against full source.
577 let speed
= ((item_len
*1_000_000*(repeat
as usize))/(1024*1024))/(start_time
.elapsed().as_micros() as usize);
578 println
!("Time per request: {} microseconds.", (start_time
.elapsed().as_micros())/(repeat
as u128
));