src/client/backup_writer.rs
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use failure::*;
use chrono::{DateTime, Utc};
use futures::*;
use futures::stream::Stream;
use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;

use super::{HttpClient, H2Client};

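/// Writer half of the backup protocol: wraps an HTTP/2 connection to the
/// backup API of a Proxmox Backup Server datastore and aborts that
/// connection when dropped.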
pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
}

impl Drop for BackupWriter {

    fn drop(&mut self) {
        self.abort.abort();
    }
}

pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

impl BackupWriter {

    fn new(h2: H2Client, abort: AbortHandle, verbose: bool) -> Arc<Self> {
        Arc::new(Self { h2, abort, verbose })
    }

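    /// Open a new backup session on `datastore` and upgrade the connection
    /// to the HTTP/2 backup protocol.
    ///
    /// A minimal usage sketch (hypothetical values; assumes an already
    /// authenticated `client`):
    ///
    /// ```ignore
    /// let writer = BackupWriter::start(
    ///     client, "store1", "host", "myhost", Utc::now(), false).await?;
    /// // ... upload blobs and/or chunk streams ...
    /// writer.finish().await?;
    /// ```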
    pub async fn start(
        client: HttpClient,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: DateTime<Utc>,
        debug: bool,
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time.timestamp(),
            "store": datastore,
            "debug": debug
        });

        let req = HttpClient::request_builder(
            client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, abort, debug))
    }

    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }

    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {

        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data))).await?;
        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }

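    /// Read all of `reader` into memory and upload the bytes unmodified as
    /// a blob named `file_name`; the returned checksum covers the uploaded
    /// bytes.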
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

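    /// Encode `data` as a `DataBlob` (optionally compressed; with a
    /// `CryptConfig` the blob is either signed or encrypted) and upload it.
    /// Size and checksum in the returned stats refer to the encoded blob,
    /// not to the input data.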
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        sign_only: bool,
    ) -> Result<BackupStats, Error> {

        let blob = if let Some(ref crypt_config) = crypt_config {
            if sign_only {
                DataBlob::create_signed(&data, crypt_config, compress)?
            } else {
                DataBlob::encode(&data, Some(crypt_config), compress)?
            }
        } else {
            DataBlob::encode(&data, None, compress)?
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

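    /// Like `upload_blob_from_data`, but reads the input from `src_path`
    /// first. The whole file is buffered in memory before it is encoded.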
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;
        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({
            "encoded-size": size,
            "file-name": file_name,
        });
        self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

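    /// Upload a chunked archive as a dynamic or fixed index (`prefix` is
    /// "dynamic" or "fixed"). The list of chunks the server already knows
    /// is downloaded first, so known chunks are registered by digest
    /// instead of being re-uploaded; the index is then closed with the
    /// accumulated chunk count, size and checksum.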
    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        fixed_size: Option<u64>,
        crypt_config: Option<Arc<CryptConfig>>,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        if let Some(size) = fixed_size {
            param["size"] = size.into();
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();

        let (chunk_count, size, _speed, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                &prefix,
                known_chunks.clone(),
                crypt_config,
                self.verbose,
            )
            .await?;

        let param = json!({
            "wid": wid,
            "chunk-count": chunk_count,
            "size": size,
            "csum": proxmox::tools::digest_to_hex(&csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: size as u64,
            csum,
        })
    }

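    /// Spawns a task that drains pipelined HTTP/2 responses in order, so
    /// the sender side can keep issuing requests; the oneshot channel
    /// reports the first error, or success once the queue has been closed
    /// and drained.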
    fn response_queue() -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // Combinator-based code, kept until the async block above is verified:
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .try_for_each(|response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(|result| println!("RESPONSE: {:?}", result))
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

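    /// Queue for the `{dynamic,fixed}_index` PUT requests: consecutive
    /// known-chunk infos are merged into batches, and each batch is sent
    /// as one "append" request carrying parallel digest and offset lists.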
    fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
        oneshot::Receiver<Result<(), Error>>,
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        let h2_2 = h2.clone();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunk list, len {}", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2_2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

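    /// Download the digests of chunks the server already stores for
    /// `archive_name` (typically from the previous snapshot) and insert
    /// them into `known_chunks`; this set drives client-side deduplication
    /// in `upload_chunk_info_stream`.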
    pub async fn download_chunk_list(
        &self,
        path: &str,
        archive_name: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<(), Error> {

        let param = json!({ "archive-name": archive_name });
        let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();

        let h2request = self.h2.send_request(request, None).await?;
        let resp = h2request.await?;

        let status = resp.status();

        if !status.is_success() {
            H2Client::h2api_response(resp).await?; // raise error
            unreachable!();
        }

        let mut body = resp.into_body();
        let mut flow_control = body.flow_control().clone();

        let mut stream = DigestListDecoder::new(body.map_err(Error::from));

        while let Some(chunk) = stream.try_next().await? {
            let _ = flow_control.release_capacity(chunk.len());
            known_chunks.lock().unwrap().insert(chunk);
        }

        if self.verbose {
            println!("known chunks list length: {}", known_chunks.lock().unwrap().len());
        }

        Ok(())
    }

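    /// Core upload pipeline: computes each chunk's digest, feeds the digest
    /// (and, for dynamic indexes, the chunk end offset) into the index
    /// checksum, uploads only chunks that are not yet known, and queues an
    /// "append" registration for every chunk. Resolves to
    /// (chunk count, total bytes, speed in MB/s, index checksum).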
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        verbose: bool,
    ) -> impl Future<Output = Result<(usize, usize, usize, [u8; 32]), Error>> {

        let repeat = Arc::new(AtomicUsize::new(0));
        let repeat2 = repeat.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                repeat.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(true);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                            chunk_info.chunk_len, offset);
                    }

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    let mut upload_queue = upload_queue.clone();
                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                        })
                    )
                } else {
                    let mut upload_queue = upload_queue.clone();
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            }.boxed())
            .and_then(move |_| {
                let repeat = repeat2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
                println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
                if repeat > 0 {
                    println!("Average chunk size was {} bytes.", stream_len/repeat);
                    println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
                }

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((repeat, stream_len, speed, csum))
            })
    }

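    /// Rough throughput test: posts the same 4 MiB pattern buffer to the
    /// `speedtest` endpoint for about five seconds and returns the
    /// measured rate in MB/s.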
    pub async fn upload_speedtest(&self) -> Result<usize, Error> {

        let mut data = vec![];
        // generate a deterministic test pattern (little-endian bytes of consecutive integers)
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue();

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            let mut upload_queue = upload_queue.clone();

            println!("send test data ({} bytes)", data.len());
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
        println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
}