]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/backup_writer.rs
7290dc12222e9d02f41a8218559d7f5486c743de
[proxmox-backup.git] / src / client / backup_writer.rs
1 use std::collections::HashSet;
2 use std::os::unix::fs::OpenOptionsExt;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::{Arc, Mutex};
5
6 use anyhow::{bail, format_err, Error};
7 use chrono::{DateTime, Utc};
8 use futures::*;
9 use futures::stream::Stream;
10 use futures::future::AbortHandle;
11 use serde_json::{json, Value};
12 use tokio::io::AsyncReadExt;
13 use tokio::sync::{mpsc, oneshot};
14
15 use proxmox::tools::digest_to_hex;
16
17 use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
18 use crate::backup::*;
19
20 use super::{HttpClient, H2Client};
21
22 pub struct BackupWriter {
23 h2: H2Client,
24 abort: AbortHandle,
25 verbose: bool,
26 crypt_config: Option<Arc<CryptConfig>>,
27 }
28
29 impl Drop for BackupWriter {
30
31 fn drop(&mut self) {
32 self.abort.abort();
33 }
34 }
35
/// Size and checksum reported for a completed upload.
#[derive(Debug, Clone)]
pub struct BackupStats {
    /// Number of bytes the upload accounted for.
    pub size: u64,
    /// SHA-256 checksum associated with the upload.
    pub csum: [u8; 32],
}
40
41 impl BackupWriter {
42
43 fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
44 Arc::new(Self { h2, abort, crypt_config, verbose })
45 }
46
47 pub async fn start(
48 client: HttpClient,
49 crypt_config: Option<Arc<CryptConfig>>,
50 datastore: &str,
51 backup_type: &str,
52 backup_id: &str,
53 backup_time: DateTime<Utc>,
54 debug: bool,
55 ) -> Result<Arc<BackupWriter>, Error> {
56
57 let param = json!({
58 "backup-type": backup_type,
59 "backup-id": backup_id,
60 "backup-time": backup_time.timestamp(),
61 "store": datastore,
62 "debug": debug
63 });
64
65 let req = HttpClient::request_builder(
66 client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
67
68 let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
69
70 Ok(BackupWriter::new(h2, abort, crypt_config, debug))
71 }
72
73 pub async fn get(
74 &self,
75 path: &str,
76 param: Option<Value>,
77 ) -> Result<Value, Error> {
78 self.h2.get(path, param).await
79 }
80
81 pub async fn put(
82 &self,
83 path: &str,
84 param: Option<Value>,
85 ) -> Result<Value, Error> {
86 self.h2.put(path, param).await
87 }
88
89 pub async fn post(
90 &self,
91 path: &str,
92 param: Option<Value>,
93 ) -> Result<Value, Error> {
94 self.h2.post(path, param).await
95 }
96
97 pub async fn upload_post(
98 &self,
99 path: &str,
100 param: Option<Value>,
101 content_type: &str,
102 data: Vec<u8>,
103 ) -> Result<Value, Error> {
104 self.h2.upload("POST", path, param, content_type, data).await
105 }
106
107 pub async fn send_upload_request(
108 &self,
109 method: &str,
110 path: &str,
111 param: Option<Value>,
112 content_type: &str,
113 data: Vec<u8>,
114 ) -> Result<h2::client::ResponseFuture, Error> {
115
116 let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
117 let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
118 Ok(response_future)
119 }
120
121 pub async fn upload_put(
122 &self,
123 path: &str,
124 param: Option<Value>,
125 content_type: &str,
126 data: Vec<u8>,
127 ) -> Result<Value, Error> {
128 self.h2.upload("PUT", path, param, content_type, data).await
129 }
130
131 pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
132 let h2 = self.h2.clone();
133
134 h2.post("finish", None)
135 .map_ok(move |_| {
136 self.abort.abort();
137 })
138 .await
139 }
140
141 pub fn cancel(&self) {
142 self.abort.abort();
143 }
144
145 pub async fn upload_blob<R: std::io::Read>(
146 &self,
147 mut reader: R,
148 file_name: &str,
149 ) -> Result<BackupStats, Error> {
150 let mut raw_data = Vec::new();
151 // fixme: avoid loading into memory
152 reader.read_to_end(&mut raw_data)?;
153
154 let csum = openssl::sha::sha256(&raw_data);
155 let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
156 let size = raw_data.len() as u64;
157 let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
158 Ok(BackupStats { size, csum })
159 }
160
161 pub async fn upload_blob_from_data(
162 &self,
163 data: Vec<u8>,
164 file_name: &str,
165 compress: bool,
166 encrypt: bool,
167 ) -> Result<BackupStats, Error> {
168 let blob = match (encrypt, &self.crypt_config) {
169 (false, _) => DataBlob::encode(&data, None, compress)?,
170 (true, None) => bail!("requested encryption without a crypt config"),
171 (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?,
172 };
173
174 let raw_data = blob.into_inner();
175 let size = raw_data.len() as u64;
176
177 let csum = openssl::sha::sha256(&raw_data);
178 let param = json!({"encoded-size": size, "file-name": file_name });
179 let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
180 Ok(BackupStats { size, csum })
181 }
182
183 pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
184 &self,
185 src_path: P,
186 file_name: &str,
187 compress: bool,
188 encrypt: bool,
189 ) -> Result<BackupStats, Error> {
190
191 let src_path = src_path.as_ref();
192
193 let mut file = tokio::fs::File::open(src_path)
194 .await
195 .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;
196
197 let mut contents = Vec::new();
198
199 file.read_to_end(&mut contents)
200 .await
201 .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
202
203 self.upload_blob_from_data(contents, file_name, compress, encrypt).await
204 }
205
206 pub async fn upload_stream(
207 &self,
208 previous_manifest: Option<Arc<BackupManifest>>,
209 archive_name: &str,
210 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
211 prefix: &str,
212 fixed_size: Option<u64>,
213 compress: bool,
214 encrypt: bool,
215 ) -> Result<BackupStats, Error> {
216 let known_chunks = Arc::new(Mutex::new(HashSet::new()));
217
218 let mut param = json!({ "archive-name": archive_name });
219 if let Some(size) = fixed_size {
220 param["size"] = size.into();
221 }
222
223 if encrypt && self.crypt_config.is_none() {
224 bail!("requested encryption without a crypt config");
225 }
226
227 let index_path = format!("{}_index", prefix);
228 let close_path = format!("{}_close", prefix);
229
230 if let Some(manifest) = previous_manifest {
231 // try, but ignore errors
232 match archive_type(archive_name) {
233 Ok(ArchiveType::FixedIndex) => {
234 let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
235 }
236 Ok(ArchiveType::DynamicIndex) => {
237 let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
238 }
239 _ => { /* do nothing */ }
240 }
241 }
242
243 let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
244
245 let (chunk_count, size, duration, speed, csum) =
246 Self::upload_chunk_info_stream(
247 self.h2.clone(),
248 wid,
249 stream,
250 &prefix,
251 known_chunks.clone(),
252 if encrypt { self.crypt_config.clone() } else { None },
253 compress,
254 self.verbose,
255 )
256 .await?;
257
258 println!("{}: Uploaded {} bytes as {} chunks in {} seconds ({} MB/s).", archive_name, size, chunk_count, duration.as_secs(), speed);
259 if chunk_count > 0 {
260 println!("{}: Average chunk size was {} bytes.", archive_name, size/chunk_count);
261 println!("{}: Time per request: {} microseconds.", archive_name, (duration.as_micros())/(chunk_count as u128));
262 }
263
264 let param = json!({
265 "wid": wid ,
266 "chunk-count": chunk_count,
267 "size": size,
268 "csum": proxmox::tools::digest_to_hex(&csum),
269 });
270 let _value = self.h2.post(&close_path, Some(param)).await?;
271 Ok(BackupStats {
272 size: size as u64,
273 csum,
274 })
275 }
276
277 fn response_queue(verbose: bool) -> (
278 mpsc::Sender<h2::client::ResponseFuture>,
279 oneshot::Receiver<Result<(), Error>>
280 ) {
281 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
282 let (verify_result_tx, verify_result_rx) = oneshot::channel();
283
284 // FIXME: check if this works as expected as replacement for the combinator below?
285 // tokio::spawn(async move {
286 // let result: Result<(), Error> = (async move {
287 // while let Some(response) = verify_queue_rx.recv().await {
288 // match H2Client::h2api_response(response.await?).await {
289 // Ok(result) => println!("RESPONSE: {:?}", result),
290 // Err(err) => bail!("pipelined request failed: {}", err),
291 // }
292 // }
293 // Ok(())
294 // }).await;
295 // let _ignore_closed_channel = verify_result_tx.send(result);
296 // });
297 // old code for reference?
298 tokio::spawn(
299 verify_queue_rx
300 .map(Ok::<_, Error>)
301 .try_for_each(move |response: h2::client::ResponseFuture| {
302 response
303 .map_err(Error::from)
304 .and_then(H2Client::h2api_response)
305 .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
306 .map_err(|err| format_err!("pipelined request failed: {}", err))
307 })
308 .map(|result| {
309 let _ignore_closed_channel = verify_result_tx.send(result);
310 })
311 );
312
313 (verify_queue_tx, verify_result_rx)
314 }
315
316 fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
317 mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
318 oneshot::Receiver<Result<(), Error>>,
319 ) {
320 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
321 let (verify_result_tx, verify_result_rx) = oneshot::channel();
322
323 let h2_2 = h2.clone();
324
325 // FIXME: async-block-ify this code!
326 tokio::spawn(
327 verify_queue_rx
328 .map(Ok::<_, Error>)
329 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
330 match (response, merged_chunk_info) {
331 (Some(response), MergedChunkInfo::Known(list)) => {
332 future::Either::Left(
333 response
334 .map_err(Error::from)
335 .and_then(H2Client::h2api_response)
336 .and_then(move |_result| {
337 future::ok(MergedChunkInfo::Known(list))
338 })
339 )
340 }
341 (None, MergedChunkInfo::Known(list)) => {
342 future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
343 }
344 _ => unreachable!(),
345 }
346 })
347 .merge_known_chunks()
348 .and_then(move |merged_chunk_info| {
349 match merged_chunk_info {
350 MergedChunkInfo::Known(chunk_list) => {
351 let mut digest_list = vec![];
352 let mut offset_list = vec![];
353 for (offset, digest) in chunk_list {
354 digest_list.push(digest_to_hex(&digest));
355 offset_list.push(offset);
356 }
357 if verbose { println!("append chunks list len ({})", digest_list.len()); }
358 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
359 let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
360 let param_data = bytes::Bytes::from(param.to_string().into_bytes());
361 let upload_data = Some(param_data);
362 h2_2.send_request(request, upload_data)
363 .and_then(move |response| {
364 response
365 .map_err(Error::from)
366 .and_then(H2Client::h2api_response)
367 .map_ok(|_| ())
368 })
369 .map_err(|err| format_err!("pipelined request failed: {}", err))
370 }
371 _ => unreachable!(),
372 }
373 })
374 .try_for_each(|_| future::ok(()))
375 .map(|result| {
376 let _ignore_closed_channel = verify_result_tx.send(result);
377 })
378 );
379
380 (verify_queue_tx, verify_result_rx)
381 }
382
383 pub async fn download_previous_fixed_index(
384 &self,
385 archive_name: &str,
386 manifest: &BackupManifest,
387 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
388 ) -> Result<FixedIndexReader, Error> {
389
390 let mut tmpfile = std::fs::OpenOptions::new()
391 .write(true)
392 .read(true)
393 .custom_flags(libc::O_TMPFILE)
394 .open("/tmp")?;
395
396 let param = json!({ "archive-name": archive_name });
397 self.h2.download("previous", Some(param), &mut tmpfile).await?;
398
399 let index = FixedIndexReader::new(tmpfile)
400 .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
401 // Note: do not use values stored in index (not trusted) - instead, computed them again
402 let (csum, size) = index.compute_csum();
403 manifest.verify_file(archive_name, &csum, size)?;
404
405 // add index chunks to known chunks
406 let mut known_chunks = known_chunks.lock().unwrap();
407 for i in 0..index.index_count() {
408 known_chunks.insert(*index.index_digest(i).unwrap());
409 }
410
411 if self.verbose {
412 println!("{}: known chunks list length is {}", archive_name, index.index_count());
413 }
414
415 Ok(index)
416 }
417
418 pub async fn download_previous_dynamic_index(
419 &self,
420 archive_name: &str,
421 manifest: &BackupManifest,
422 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
423 ) -> Result<DynamicIndexReader, Error> {
424
425 let mut tmpfile = std::fs::OpenOptions::new()
426 .write(true)
427 .read(true)
428 .custom_flags(libc::O_TMPFILE)
429 .open("/tmp")?;
430
431 let param = json!({ "archive-name": archive_name });
432 self.h2.download("previous", Some(param), &mut tmpfile).await?;
433
434 let index = DynamicIndexReader::new(tmpfile)
435 .map_err(|err| format_err!("unable to read dynmamic index '{}' - {}", archive_name, err))?;
436 // Note: do not use values stored in index (not trusted) - instead, computed them again
437 let (csum, size) = index.compute_csum();
438 manifest.verify_file(archive_name, &csum, size)?;
439
440 // add index chunks to known chunks
441 let mut known_chunks = known_chunks.lock().unwrap();
442 for i in 0..index.index_count() {
443 known_chunks.insert(*index.index_digest(i).unwrap());
444 }
445
446 if self.verbose {
447 println!("{}: known chunks list length is {}", archive_name, index.index_count());
448 }
449
450 Ok(index)
451 }
452
453 /// Download backup manifest (index.json) of last backup
454 pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
455
456 let mut raw_data = Vec::with_capacity(64 * 1024);
457
458 let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
459 self.h2.download("previous", Some(param), &mut raw_data).await?;
460
461 let blob = DataBlob::from_raw(raw_data)?;
462 blob.verify_crc()?;
463 let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
464
465 let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
466
467 Ok(manifest)
468 }
469
470 fn upload_chunk_info_stream(
471 h2: H2Client,
472 wid: u64,
473 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
474 prefix: &str,
475 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
476 crypt_config: Option<Arc<CryptConfig>>,
477 compress: bool,
478 verbose: bool,
479 ) -> impl Future<Output = Result<(usize, usize, std::time::Duration, usize, [u8; 32]), Error>> {
480
481 let repeat = Arc::new(AtomicUsize::new(0));
482 let repeat2 = repeat.clone();
483
484 let stream_len = Arc::new(AtomicUsize::new(0));
485 let stream_len2 = stream_len.clone();
486
487 let append_chunk_path = format!("{}_index", prefix);
488 let upload_chunk_path = format!("{}_chunk", prefix);
489 let is_fixed_chunk_size = prefix == "fixed";
490
491 let (upload_queue, upload_result) =
492 Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
493
494 let start_time = std::time::Instant::now();
495
496 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
497 let index_csum_2 = index_csum.clone();
498
499 stream
500 .and_then(move |data| {
501
502 let chunk_len = data.len();
503
504 repeat.fetch_add(1, Ordering::SeqCst);
505 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
506
507 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
508 .compress(compress);
509
510 if let Some(ref crypt_config) = crypt_config {
511 chunk_builder = chunk_builder.crypt_config(crypt_config);
512 }
513
514 let mut known_chunks = known_chunks.lock().unwrap();
515 let digest = chunk_builder.digest();
516
517 let mut guard = index_csum.lock().unwrap();
518 let csum = guard.as_mut().unwrap();
519
520 let chunk_end = offset + chunk_len as u64;
521
522 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
523 csum.update(digest);
524
525 let chunk_is_known = known_chunks.contains(digest);
526 if chunk_is_known {
527 future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
528 } else {
529 known_chunks.insert(*digest);
530 future::ready(chunk_builder
531 .build()
532 .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
533 chunk,
534 digest,
535 chunk_len: chunk_len as u64,
536 offset,
537 }))
538 )
539 }
540 })
541 .merge_known_chunks()
542 .try_for_each(move |merged_chunk_info| {
543
544 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
545 let offset = chunk_info.offset;
546 let digest = chunk_info.digest;
547 let digest_str = digest_to_hex(&digest);
548
549 if verbose {
550 println!("upload new chunk {} ({} bytes, offset {})", digest_str,
551 chunk_info.chunk_len, offset);
552 }
553
554 let chunk_data = chunk_info.chunk.into_inner();
555 let param = json!({
556 "wid": wid,
557 "digest": digest_str,
558 "size": chunk_info.chunk_len,
559 "encoded-size": chunk_data.len(),
560 });
561
562 let ct = "application/octet-stream";
563 let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
564 let upload_data = Some(bytes::Bytes::from(chunk_data));
565
566 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
567
568 let mut upload_queue = upload_queue.clone();
569 future::Either::Left(h2
570 .send_request(request, upload_data)
571 .and_then(move |response| async move {
572 upload_queue
573 .send((new_info, Some(response)))
574 .await
575 .map_err(|err| format_err!("failed to send to upload queue: {}", err))
576 })
577 )
578 } else {
579 let mut upload_queue = upload_queue.clone();
580 future::Either::Right(async move {
581 upload_queue
582 .send((merged_chunk_info, None))
583 .await
584 .map_err(|err| format_err!("failed to send to upload queue: {}", err))
585 })
586 }
587 })
588 .then(move |result| async move {
589 upload_result.await?.and(result)
590 }.boxed())
591 .and_then(move |_| {
592 let repeat = repeat2.load(Ordering::SeqCst);
593 let stream_len = stream_len2.load(Ordering::SeqCst);
594 let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
595
596 let mut guard = index_csum_2.lock().unwrap();
597 let csum = guard.take().unwrap().finish();
598
599 futures::future::ok((repeat, stream_len, start_time.elapsed(), speed, csum))
600 })
601 }
602
603 /// Upload speed test - prints result ot stdout
604 pub async fn upload_speedtest(&self, verbose: bool) -> Result<usize, Error> {
605
606 let mut data = vec![];
607 // generate pseudo random byte sequence
608 for i in 0..1024*1024 {
609 for j in 0..4 {
610 let byte = ((i >> (j<<3))&0xff) as u8;
611 data.push(byte);
612 }
613 }
614
615 let item_len = data.len();
616
617 let mut repeat = 0;
618
619 let (upload_queue, upload_result) = Self::response_queue(verbose);
620
621 let start_time = std::time::Instant::now();
622
623 loop {
624 repeat += 1;
625 if start_time.elapsed().as_secs() >= 5 {
626 break;
627 }
628
629 let mut upload_queue = upload_queue.clone();
630
631 if verbose { println!("send test data ({} bytes)", data.len()); }
632 let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
633 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
634
635 upload_queue.send(request_future).await?;
636 }
637
638 drop(upload_queue); // close queue
639
640 let _ = upload_result.await?;
641
642 println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
643 let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
644 println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
645
646 Ok(speed)
647 }
648 }