// proxmox-backup.git: src/client/backup_writer.rs
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use failure::*;
use chrono::{DateTime, Utc};
use futures::*;
use futures::stream::Stream;
use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;

use super::{HttpClient, H2Client};

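/// Writer for a single backup session.
///
/// Wraps the HTTP/2 connection to the backup server; the connection
/// is aborted when the writer is dropped or the session is finished.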
pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
}

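// Dropping the writer aborts the underlying HTTP/2 connection.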
impl Drop for BackupWriter {

    fn drop(&mut self) {
        self.abort.abort();
    }
}

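/// Upload statistics: number of bytes written and the resulting checksum.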
pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

impl BackupWriter {

    fn new(h2: H2Client, abort: AbortHandle, verbose: bool) -> Arc<Self> {
        Arc::new(Self { h2, abort, verbose })
    }

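    /// Start a new backup session on the server.
    ///
    /// Calls the `/api2/json/backup` endpoint and upgrades the
    /// connection to the HTTP/2 backup protocol.
    ///
    /// Illustrative sketch of a session (not a compiling doctest; the
    /// datastore and backup names are made up):
    ///
    /// ```ignore
    /// let writer = BackupWriter::start(
    ///     client, "store1", "host", "myhost", Utc::now(), false).await?;
    /// let stats = writer
    ///     .upload_blob_from_data(b"hello".to_vec(), "test.blob", None, true, false)
    ///     .await?;
    /// println!("uploaded {} bytes", stats.size);
    /// writer.finish().await?;
    /// ```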
    pub async fn start(
        client: HttpClient,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: DateTime<Utc>,
        debug: bool,
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time.timestamp(),
            "store": datastore,
            "debug": debug
        });

        let req = HttpClient::request_builder(
            client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, abort, debug))
    }

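    /// Forward a GET request over the session's HTTP/2 connection.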
    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }

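    /// Send an upload request without awaiting the response body.
    ///
    /// Returns the raw `ResponseFuture`, which allows pipelining
    /// several requests on the HTTP/2 connection.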
    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {

        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data))).await?;
        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

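    /// Finish the backup session and shut down the HTTP/2 connection.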
    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }

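    /// Upload data from a reader as a blob, reading it fully into
    /// memory first.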
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

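    /// Encode in-memory data as a `DataBlob` and upload it.
    ///
    /// With a crypt config the blob is either signed only or fully
    /// encrypted, depending on `sign_only`.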
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        sign_only: bool,
    ) -> Result<BackupStats, Error> {

        let blob = if let Some(ref crypt_config) = crypt_config {
            if sign_only {
                DataBlob::create_signed(&data, crypt_config, compress)?
            } else {
                DataBlob::encode(&data, Some(crypt_config), compress)?
            }
        } else {
            DataBlob::encode(&data, None, compress)?
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

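    /// Read a file from disk, encode it as a `DataBlob` and upload it.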
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;
        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({
            "encoded-size": size,
            "file-name": file_name,
        });
        self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

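    /// Upload a chunk stream into a dynamic or fixed index archive.
    ///
    /// `prefix` selects the index type ("dynamic" or "fixed"). The
    /// digests already known to the server are fetched first so that
    /// duplicate chunks are registered by digest instead of re-uploaded.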
    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        fixed_size: Option<u64>,
        crypt_config: Option<Arc<CryptConfig>>,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        if let Some(size) = fixed_size {
            param["size"] = size.into();
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();

        let (chunk_count, size, duration, speed, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                &prefix,
                known_chunks.clone(),
                crypt_config,
                self.verbose,
            )
            .await?;

        println!("{}: Uploaded {} bytes as {} chunks in {} seconds ({} MB/s).",
                 archive_name, size, chunk_count, duration.as_secs(), speed);
        if chunk_count > 0 {
            println!("{}: Average chunk size was {} bytes.", archive_name, size/chunk_count);
            println!("{}: Time per request: {} microseconds.", archive_name, (duration.as_micros())/(chunk_count as u128));
        }

        let param = json!({
            "wid": wid,
            "chunk-count": chunk_count,
            "size": size,
            "csum": digest_to_hex(&csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: size as u64,
            csum,
        })
    }

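    /// Spawn a task that awaits pipelined HTTP/2 responses in order.
    ///
    /// Returns the sender for queueing response futures and a oneshot
    /// receiver that reports the overall result once the sender side
    /// is dropped.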
    fn response_queue() -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .try_for_each(|response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(|result| println!("RESPONSE: {:?}", result))
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

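    /// Spawn a task that appends uploaded chunks to the index at `path`.
    ///
    /// Pending upload responses are awaited first; known chunks are
    /// then merged and their digest/offset lists sent in batched PUT
    /// requests.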
    fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        let h2_2 = h2.clone();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2_2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

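    /// Fetch the digests of chunks the server already stores for
    /// `archive_name` and add them to `known_chunks`.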
    pub async fn download_chunk_list(
        &self,
        path: &str,
        archive_name: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<(), Error> {

        let param = json!({ "archive-name": archive_name });
        let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();

        let h2request = self.h2.send_request(request, None).await?;
        let resp = h2request.await?;

        let status = resp.status();

        if !status.is_success() {
            H2Client::h2api_response(resp).await?; // raise error
            unreachable!();
        }

        let mut body = resp.into_body();
        let mut flow_control = body.flow_control().clone();

        let mut stream = DigestListDecoder::new(body.map_err(Error::from));

        while let Some(chunk) = stream.try_next().await? {
            let _ = flow_control.release_capacity(chunk.len());
            known_chunks.lock().unwrap().insert(chunk);
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, known_chunks.lock().unwrap().len());
        }

        Ok(())
    }

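    /// Core upload loop: compress/encrypt each chunk, skip chunks the
    /// server already knows, and upload the rest.
    ///
    /// Resolves to `(chunk_count, stream_len, duration, speed, csum)`,
    /// where `csum` is the checksum of the resulting index.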
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        verbose: bool,
    ) -> impl Future<Output = Result<(usize, usize, std::time::Duration, usize, [u8; 32]), Error>> {

        let repeat = Arc::new(AtomicUsize::new(0));
        let repeat2 = repeat.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                repeat.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(true);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                                 chunk_info.chunk_len, offset);
                    }

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    let mut upload_queue = upload_queue.clone();
                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                        })
                    )
                } else {
                    let mut upload_queue = upload_queue.clone();
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            }.boxed())
            .and_then(move |_| {
                let repeat = repeat2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((repeat, stream_len, start_time.elapsed(), speed, csum))
            })
    }

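    /// Upload a 4 MiB test pattern in a loop for about 5 seconds and
    /// return the measured upload speed.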
    pub async fn upload_speedtest(&self) -> Result<usize, Error> {

        let mut data = vec![];
        // generate pseudo random byte sequence
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue();

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            let mut upload_queue = upload_queue.clone();

            println!("send test data ({} bytes)", data.len());
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
        println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
}