]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/backup_writer.rs
src/client/backup_writer.rs - upload_stream: add crypt_mode
[proxmox-backup.git] / src / client / backup_writer.rs
1 use std::collections::HashSet;
2 use std::os::unix::fs::OpenOptionsExt;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::{Arc, Mutex};
5
6 use anyhow::{bail, format_err, Error};
7 use chrono::{DateTime, Utc};
8 use futures::*;
9 use futures::stream::Stream;
10 use futures::future::AbortHandle;
11 use serde_json::{json, Value};
12 use tokio::io::AsyncReadExt;
13 use tokio::sync::{mpsc, oneshot};
14
15 use proxmox::tools::digest_to_hex;
16
17 use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
18 use crate::backup::*;
19
20 use super::{HttpClient, H2Client};
21
22 pub struct BackupWriter {
23 h2: H2Client,
24 abort: AbortHandle,
25 verbose: bool,
26 crypt_config: Option<Arc<CryptConfig>>,
27 }
28
29 impl Drop for BackupWriter {
30
31 fn drop(&mut self) {
32 self.abort.abort();
33 }
34 }
35
/// Result summary returned by the upload methods of the backup writer.
pub struct BackupStats {
    /// Number of bytes reported for the upload.
    pub size: u64,
    /// 32 byte checksum reported for the upload.
    pub csum: [u8; 32],
}
40
41 impl BackupWriter {
42
43 fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
44 Arc::new(Self { h2, abort, crypt_config, verbose })
45 }
46
47 pub async fn start(
48 client: HttpClient,
49 crypt_config: Option<Arc<CryptConfig>>,
50 datastore: &str,
51 backup_type: &str,
52 backup_id: &str,
53 backup_time: DateTime<Utc>,
54 debug: bool,
55 ) -> Result<Arc<BackupWriter>, Error> {
56
57 let param = json!({
58 "backup-type": backup_type,
59 "backup-id": backup_id,
60 "backup-time": backup_time.timestamp(),
61 "store": datastore,
62 "debug": debug
63 });
64
65 let req = HttpClient::request_builder(
66 client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
67
68 let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
69
70 Ok(BackupWriter::new(h2, abort, crypt_config, debug))
71 }
72
73 pub async fn get(
74 &self,
75 path: &str,
76 param: Option<Value>,
77 ) -> Result<Value, Error> {
78 self.h2.get(path, param).await
79 }
80
81 pub async fn put(
82 &self,
83 path: &str,
84 param: Option<Value>,
85 ) -> Result<Value, Error> {
86 self.h2.put(path, param).await
87 }
88
89 pub async fn post(
90 &self,
91 path: &str,
92 param: Option<Value>,
93 ) -> Result<Value, Error> {
94 self.h2.post(path, param).await
95 }
96
97 pub async fn upload_post(
98 &self,
99 path: &str,
100 param: Option<Value>,
101 content_type: &str,
102 data: Vec<u8>,
103 ) -> Result<Value, Error> {
104 self.h2.upload("POST", path, param, content_type, data).await
105 }
106
107 pub async fn send_upload_request(
108 &self,
109 method: &str,
110 path: &str,
111 param: Option<Value>,
112 content_type: &str,
113 data: Vec<u8>,
114 ) -> Result<h2::client::ResponseFuture, Error> {
115
116 let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
117 let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
118 Ok(response_future)
119 }
120
121 pub async fn upload_put(
122 &self,
123 path: &str,
124 param: Option<Value>,
125 content_type: &str,
126 data: Vec<u8>,
127 ) -> Result<Value, Error> {
128 self.h2.upload("PUT", path, param, content_type, data).await
129 }
130
131 pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
132 let h2 = self.h2.clone();
133
134 h2.post("finish", None)
135 .map_ok(move |_| {
136 self.abort.abort();
137 })
138 .await
139 }
140
141 pub fn cancel(&self) {
142 self.abort.abort();
143 }
144
145 pub async fn upload_blob<R: std::io::Read>(
146 &self,
147 mut reader: R,
148 file_name: &str,
149 ) -> Result<BackupStats, Error> {
150 let mut raw_data = Vec::new();
151 // fixme: avoid loading into memory
152 reader.read_to_end(&mut raw_data)?;
153
154 let csum = openssl::sha::sha256(&raw_data);
155 let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
156 let size = raw_data.len() as u64;
157 let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
158 Ok(BackupStats { size, csum })
159 }
160
161 pub async fn upload_blob_from_data(
162 &self,
163 data: Vec<u8>,
164 file_name: &str,
165 compress: bool,
166 crypt_mode: CryptMode,
167 ) -> Result<BackupStats, Error> {
168 let blob = match (crypt_mode, &self.crypt_config) {
169 (CryptMode::None, _) => DataBlob::encode(&data, None, compress)?,
170 (_, None) => bail!("requested encryption/signing without a crypt config"),
171 (CryptMode::Encrypt, Some(crypt_config)) => {
172 DataBlob::encode(&data, Some(crypt_config), compress)?
173 }
174 (CryptMode::SignOnly, Some(crypt_config)) => {
175 DataBlob::create_signed(&data, crypt_config, compress)?
176 }
177 };
178
179 let raw_data = blob.into_inner();
180 let size = raw_data.len() as u64;
181
182 let csum = openssl::sha::sha256(&raw_data);
183 let param = json!({"encoded-size": size, "file-name": file_name });
184 let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
185 Ok(BackupStats { size, csum })
186 }
187
188 pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
189 &self,
190 src_path: P,
191 file_name: &str,
192 compress: bool,
193 crypt_mode: CryptMode,
194 ) -> Result<BackupStats, Error> {
195
196 let src_path = src_path.as_ref();
197
198 let mut file = tokio::fs::File::open(src_path)
199 .await
200 .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;
201
202 let mut contents = Vec::new();
203
204 file.read_to_end(&mut contents)
205 .await
206 .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
207
208 self.upload_blob_from_data(contents, file_name, compress, crypt_mode).await
209 }
210
211 pub async fn upload_stream(
212 &self,
213 crypt_mode: CryptMode,
214 previous_manifest: Option<Arc<BackupManifest>>,
215 archive_name: &str,
216 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
217 prefix: &str,
218 fixed_size: Option<u64>,
219 ) -> Result<BackupStats, Error> {
220 let known_chunks = Arc::new(Mutex::new(HashSet::new()));
221
222 let mut param = json!({ "archive-name": archive_name });
223 if let Some(size) = fixed_size {
224 param["size"] = size.into();
225 }
226
227 let index_path = format!("{}_index", prefix);
228 let close_path = format!("{}_close", prefix);
229
230 if let Some(manifest) = previous_manifest {
231 // try, but ignore errors
232 match archive_type(archive_name) {
233 Ok(ArchiveType::FixedIndex) => {
234 let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
235 }
236 Ok(ArchiveType::DynamicIndex) => {
237 let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
238 }
239 _ => { /* do nothing */ }
240 }
241 }
242
243 let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
244
245 let (chunk_count, size, duration, speed, csum) =
246 Self::upload_chunk_info_stream(
247 self.h2.clone(),
248 wid,
249 stream,
250 &prefix,
251 known_chunks.clone(),
252 self.crypt_config.clone(),
253 crypt_mode,
254 self.verbose,
255 )
256 .await?;
257
258 println!("{}: Uploaded {} bytes as {} chunks in {} seconds ({} MB/s).", archive_name, size, chunk_count, duration.as_secs(), speed);
259 if chunk_count > 0 {
260 println!("{}: Average chunk size was {} bytes.", archive_name, size/chunk_count);
261 println!("{}: Time per request: {} microseconds.", archive_name, (duration.as_micros())/(chunk_count as u128));
262 }
263
264 let param = json!({
265 "wid": wid ,
266 "chunk-count": chunk_count,
267 "size": size,
268 "csum": proxmox::tools::digest_to_hex(&csum),
269 });
270 let _value = self.h2.post(&close_path, Some(param)).await?;
271 Ok(BackupStats {
272 size: size as u64,
273 csum,
274 })
275 }
276
277 fn response_queue() -> (
278 mpsc::Sender<h2::client::ResponseFuture>,
279 oneshot::Receiver<Result<(), Error>>
280 ) {
281 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
282 let (verify_result_tx, verify_result_rx) = oneshot::channel();
283
284 // FIXME: check if this works as expected as replacement for the combinator below?
285 // tokio::spawn(async move {
286 // let result: Result<(), Error> = (async move {
287 // while let Some(response) = verify_queue_rx.recv().await {
288 // match H2Client::h2api_response(response.await?).await {
289 // Ok(result) => println!("RESPONSE: {:?}", result),
290 // Err(err) => bail!("pipelined request failed: {}", err),
291 // }
292 // }
293 // Ok(())
294 // }).await;
295 // let _ignore_closed_channel = verify_result_tx.send(result);
296 // });
297 // old code for reference?
298 tokio::spawn(
299 verify_queue_rx
300 .map(Ok::<_, Error>)
301 .try_for_each(|response: h2::client::ResponseFuture| {
302 response
303 .map_err(Error::from)
304 .and_then(H2Client::h2api_response)
305 .map_ok(|result| println!("RESPONSE: {:?}", result))
306 .map_err(|err| format_err!("pipelined request failed: {}", err))
307 })
308 .map(|result| {
309 let _ignore_closed_channel = verify_result_tx.send(result);
310 })
311 );
312
313 (verify_queue_tx, verify_result_rx)
314 }
315
316 fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
317 mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
318 oneshot::Receiver<Result<(), Error>>,
319 ) {
320 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
321 let (verify_result_tx, verify_result_rx) = oneshot::channel();
322
323 let h2_2 = h2.clone();
324
325 // FIXME: async-block-ify this code!
326 tokio::spawn(
327 verify_queue_rx
328 .map(Ok::<_, Error>)
329 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
330 match (response, merged_chunk_info) {
331 (Some(response), MergedChunkInfo::Known(list)) => {
332 future::Either::Left(
333 response
334 .map_err(Error::from)
335 .and_then(H2Client::h2api_response)
336 .and_then(move |_result| {
337 future::ok(MergedChunkInfo::Known(list))
338 })
339 )
340 }
341 (None, MergedChunkInfo::Known(list)) => {
342 future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
343 }
344 _ => unreachable!(),
345 }
346 })
347 .merge_known_chunks()
348 .and_then(move |merged_chunk_info| {
349 match merged_chunk_info {
350 MergedChunkInfo::Known(chunk_list) => {
351 let mut digest_list = vec![];
352 let mut offset_list = vec![];
353 for (offset, digest) in chunk_list {
354 digest_list.push(digest_to_hex(&digest));
355 offset_list.push(offset);
356 }
357 if verbose { println!("append chunks list len ({})", digest_list.len()); }
358 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
359 let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
360 let param_data = bytes::Bytes::from(param.to_string().into_bytes());
361 let upload_data = Some(param_data);
362 h2_2.send_request(request, upload_data)
363 .and_then(move |response| {
364 response
365 .map_err(Error::from)
366 .and_then(H2Client::h2api_response)
367 .map_ok(|_| ())
368 })
369 .map_err(|err| format_err!("pipelined request failed: {}", err))
370 }
371 _ => unreachable!(),
372 }
373 })
374 .try_for_each(|_| future::ok(()))
375 .map(|result| {
376 let _ignore_closed_channel = verify_result_tx.send(result);
377 })
378 );
379
380 (verify_queue_tx, verify_result_rx)
381 }
382
383 pub async fn download_previous_fixed_index(
384 &self,
385 archive_name: &str,
386 manifest: &BackupManifest,
387 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
388 ) -> Result<FixedIndexReader, Error> {
389
390 let mut tmpfile = std::fs::OpenOptions::new()
391 .write(true)
392 .read(true)
393 .custom_flags(libc::O_TMPFILE)
394 .open("/tmp")?;
395
396 let param = json!({ "archive-name": archive_name });
397 self.h2.download("previous", Some(param), &mut tmpfile).await?;
398
399 let index = FixedIndexReader::new(tmpfile)
400 .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
401 // Note: do not use values stored in index (not trusted) - instead, computed them again
402 let (csum, size) = index.compute_csum();
403 manifest.verify_file(archive_name, &csum, size)?;
404
405 // add index chunks to known chunks
406 let mut known_chunks = known_chunks.lock().unwrap();
407 for i in 0..index.index_count() {
408 known_chunks.insert(*index.index_digest(i).unwrap());
409 }
410
411 if self.verbose {
412 println!("{}: known chunks list length is {}", archive_name, index.index_count());
413 }
414
415 Ok(index)
416 }
417
418 pub async fn download_previous_dynamic_index(
419 &self,
420 archive_name: &str,
421 manifest: &BackupManifest,
422 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
423 ) -> Result<DynamicIndexReader, Error> {
424
425 let mut tmpfile = std::fs::OpenOptions::new()
426 .write(true)
427 .read(true)
428 .custom_flags(libc::O_TMPFILE)
429 .open("/tmp")?;
430
431 let param = json!({ "archive-name": archive_name });
432 self.h2.download("previous", Some(param), &mut tmpfile).await?;
433
434 let index = DynamicIndexReader::new(tmpfile)
435 .map_err(|err| format_err!("unable to read dynmamic index '{}' - {}", archive_name, err))?;
436 // Note: do not use values stored in index (not trusted) - instead, computed them again
437 let (csum, size) = index.compute_csum();
438 manifest.verify_file(archive_name, &csum, size)?;
439
440 // add index chunks to known chunks
441 let mut known_chunks = known_chunks.lock().unwrap();
442 for i in 0..index.index_count() {
443 known_chunks.insert(*index.index_digest(i).unwrap());
444 }
445
446 if self.verbose {
447 println!("{}: known chunks list length is {}", archive_name, index.index_count());
448 }
449
450 Ok(index)
451 }
452
453 /// Download backup manifest (index.json) of last backup
454 pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
455
456 use std::convert::TryFrom;
457
458 let mut raw_data = Vec::with_capacity(64 * 1024);
459
460 let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
461 self.h2.download("previous", Some(param), &mut raw_data).await?;
462
463 let blob = DataBlob::from_raw(raw_data)?;
464 blob.verify_crc()?;
465 let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
466 let json: Value = serde_json::from_slice(&data[..])?;
467 let manifest = BackupManifest::try_from(json)?;
468
469 Ok(manifest)
470 }
471
472 fn upload_chunk_info_stream(
473 h2: H2Client,
474 wid: u64,
475 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
476 prefix: &str,
477 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
478 crypt_config: Option<Arc<CryptConfig>>,
479 crypt_mode: CryptMode,
480 verbose: bool,
481 ) -> impl Future<Output = Result<(usize, usize, std::time::Duration, usize, [u8; 32]), Error>> {
482
483 let repeat = Arc::new(AtomicUsize::new(0));
484 let repeat2 = repeat.clone();
485
486 let stream_len = Arc::new(AtomicUsize::new(0));
487 let stream_len2 = stream_len.clone();
488
489 let append_chunk_path = format!("{}_index", prefix);
490 let upload_chunk_path = format!("{}_chunk", prefix);
491 let is_fixed_chunk_size = prefix == "fixed";
492
493 let (upload_queue, upload_result) =
494 Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
495
496 let start_time = std::time::Instant::now();
497
498 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
499 let index_csum_2 = index_csum.clone();
500
501 stream
502 .and_then(move |data| {
503
504 let chunk_len = data.len();
505
506 repeat.fetch_add(1, Ordering::SeqCst);
507 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
508
509 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
510 .compress(true);
511
512 if let Some(ref crypt_config) = crypt_config {
513 chunk_builder = chunk_builder.crypt_config(crypt_config, crypt_mode);
514 }
515
516 let mut known_chunks = known_chunks.lock().unwrap();
517 let digest = chunk_builder.digest();
518
519 let mut guard = index_csum.lock().unwrap();
520 let csum = guard.as_mut().unwrap();
521
522 let chunk_end = offset + chunk_len as u64;
523
524 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
525 csum.update(digest);
526
527 let chunk_is_known = known_chunks.contains(digest);
528 if chunk_is_known {
529 future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
530 } else {
531 known_chunks.insert(*digest);
532 future::ready(chunk_builder
533 .build()
534 .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
535 chunk,
536 digest,
537 chunk_len: chunk_len as u64,
538 offset,
539 }))
540 )
541 }
542 })
543 .merge_known_chunks()
544 .try_for_each(move |merged_chunk_info| {
545
546 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
547 let offset = chunk_info.offset;
548 let digest = chunk_info.digest;
549 let digest_str = digest_to_hex(&digest);
550
551 if verbose {
552 println!("upload new chunk {} ({} bytes, offset {})", digest_str,
553 chunk_info.chunk_len, offset);
554 }
555
556 let chunk_data = chunk_info.chunk.into_inner();
557 let param = json!({
558 "wid": wid,
559 "digest": digest_str,
560 "size": chunk_info.chunk_len,
561 "encoded-size": chunk_data.len(),
562 });
563
564 let ct = "application/octet-stream";
565 let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
566 let upload_data = Some(bytes::Bytes::from(chunk_data));
567
568 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
569
570 let mut upload_queue = upload_queue.clone();
571 future::Either::Left(h2
572 .send_request(request, upload_data)
573 .and_then(move |response| async move {
574 upload_queue
575 .send((new_info, Some(response)))
576 .await
577 .map_err(|err| format_err!("failed to send to upload queue: {}", err))
578 })
579 )
580 } else {
581 let mut upload_queue = upload_queue.clone();
582 future::Either::Right(async move {
583 upload_queue
584 .send((merged_chunk_info, None))
585 .await
586 .map_err(|err| format_err!("failed to send to upload queue: {}", err))
587 })
588 }
589 })
590 .then(move |result| async move {
591 upload_result.await?.and(result)
592 }.boxed())
593 .and_then(move |_| {
594 let repeat = repeat2.load(Ordering::SeqCst);
595 let stream_len = stream_len2.load(Ordering::SeqCst);
596 let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
597
598 let mut guard = index_csum_2.lock().unwrap();
599 let csum = guard.take().unwrap().finish();
600
601 futures::future::ok((repeat, stream_len, start_time.elapsed(), speed, csum))
602 })
603 }
604
605 pub async fn upload_speedtest(&self) -> Result<usize, Error> {
606
607 let mut data = vec![];
608 // generate pseudo random byte sequence
609 for i in 0..1024*1024 {
610 for j in 0..4 {
611 let byte = ((i >> (j<<3))&0xff) as u8;
612 data.push(byte);
613 }
614 }
615
616 let item_len = data.len();
617
618 let mut repeat = 0;
619
620 let (upload_queue, upload_result) = Self::response_queue();
621
622 let start_time = std::time::Instant::now();
623
624 loop {
625 repeat += 1;
626 if start_time.elapsed().as_secs() >= 5 {
627 break;
628 }
629
630 let mut upload_queue = upload_queue.clone();
631
632 println!("send test data ({} bytes)", data.len());
633 let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
634 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
635
636 upload_queue.send(request_future).await?;
637 }
638
639 drop(upload_queue); // close queue
640
641 let _ = upload_result.await?;
642
643 println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
644 let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
645 println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
646
647 Ok(speed)
648 }
649 }