// proxmox-backup.git: src/client/backup_writer.rs
// (commit: move BackupClient code into extra file)
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use failure::*;
use chrono::{DateTime, Utc};
use futures::*;
use futures::stream::Stream;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;
use crate::tools::futures::Canceller;

use super::{HttpClient, H2Client};

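/// Backup writer instance, tied to one HTTP/2 backup protocol connection.
///
/// Dropping the writer cancels the underlying connection via the
/// associated `Canceller`.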
pub struct BackupWriter {
    h2: H2Client,
    canceller: Canceller,
}

impl Drop for BackupWriter {

    fn drop(&mut self) {
        self.canceller.cancel();
    }
}

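/// Statistics returned for each completed upload: the encoded size in
/// bytes and a SHA-256 checksum (of the blob data, or of the index for
/// chunk streams).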
pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

impl BackupWriter {

    fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
        Arc::new(Self { h2, canceller })
    }

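    /// Open a new backup session on the server.
    ///
    /// Sends a GET request to `/api2/json/backup` and upgrades the
    /// connection to the `PROXMOX_BACKUP_PROTOCOL_ID_V1` HTTP/2 protocol.
    ///
    /// Illustrative sketch (datastore, backup type/ID and `data` are
    /// placeholders; `client` is an already configured `HttpClient`):
    ///
    /// ```ignore
    /// let writer = BackupWriter::start(
    ///     client, "store1", "host", "myhost", Utc::now(), false).await?;
    /// let stats = writer.upload_blob_from_data(data, "test.blob", None, true, false).await?;
    /// writer.finish().await?;
    /// ```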
    pub async fn start(
        client: HttpClient,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: DateTime<Utc>,
        debug: bool,
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time.timestamp(),
            "store": datastore,
            "debug": debug
        });

        let req = HttpClient::request_builder(
            client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, canceller))
    }

    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }

    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {

        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data))).await?;
        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

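    /// Complete the backup session.
    ///
    /// Posts to the `finish` endpoint and, on success, cancels the
    /// underlying HTTP/2 connection.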
    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.canceller.cancel();
            })
            .await
    }

    pub fn force_close(self) {
        self.canceller.cancel();
    }

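    /// Upload data from a reader unchanged to the `blob` endpoint.
    ///
    /// The bytes are sent as-is, so the reader is expected to yield an
    /// already encoded `DataBlob`; `size` and `csum` refer to the
    /// uploaded (encoded) bytes.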
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

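    /// Encode raw data as a `DataBlob` and upload it.
    ///
    /// With a `crypt_config`, the blob is either encrypted or (with
    /// `sign_only`) merely signed; otherwise it is stored in plain,
    /// optionally compressed, form.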
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        sign_only: bool,
    ) -> Result<BackupStats, Error> {

        let blob = if let Some(ref crypt_config) = crypt_config {
            if sign_only {
                DataBlob::create_signed(&data, crypt_config, compress)?
            } else {
                DataBlob::encode(&data, Some(crypt_config), compress)?
            }
        } else {
            DataBlob::encode(&data, None, compress)?
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

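    /// Read a local file, encode it as a `DataBlob` and upload it.
    ///
    /// Note: like `upload_blob`, this currently reads the whole file
    /// into memory.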
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;
        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({
            "encoded-size": size,
            "file-name": file_name,
        });
        self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

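    /// Upload a chunked archive stream (`prefix` is either `"fixed"` or
    /// `"dynamic"`).
    ///
    /// The steps are:
    /// 1. fetch the list of chunks the server already knows for this
    ///    archive (GET on `{prefix}_index`),
    /// 2. open a new index writer (POST on `{prefix}_index`, returning
    ///    the writer id `wid`),
    /// 3. upload the chunk stream, skipping known chunks,
    /// 4. close the writer (`{prefix}_close`) with chunk count, size and
    ///    index checksum.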
    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        fixed_size: Option<u64>,
        crypt_config: Option<Arc<CryptConfig>>,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        if let Some(size) = fixed_size {
            param["size"] = size.into();
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();

        let (chunk_count, size, _speed, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                prefix,
                known_chunks.clone(),
                crypt_config,
            )
            .await?;

        let param = json!({
            "wid": wid,
            "chunk-count": chunk_count,
            "size": size,
            "csum": proxmox::tools::digest_to_hex(&csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: size as u64,
            csum,
        })
    }

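    /// Create a queue for pipelined HTTP/2 responses.
    ///
    /// A spawned task awaits and verifies queued response futures in
    /// order (up to 100 in flight); the oneshot receiver yields the
    /// overall result once the queue is closed.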
    fn response_queue() -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        hyper::rt::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .try_for_each(|response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(|result| println!("RESPONSE: {:?}", result))
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

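    /// Create a queue that appends uploaded chunks to the open index
    /// writer `wid`.
    ///
    /// A spawned task awaits the chunk upload responses in order, merges
    /// consecutive known chunks, and registers each merged batch with a
    /// single PUT to `path` carrying a `digest-list`/`offset-list` pair.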
    fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        let h2_2 = h2.clone();

        hyper::rt::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                //println!("append chunk {} (offset {})", proxmox::tools::digest_to_hex(&digest), offset);
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            println!("append chunks list len ({})", digest_list.len());
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string());
                            let upload_data = Some(param_data);
                            h2_2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

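    /// Download the list of chunk digests the server already has for
    /// `archive_name` and insert them into `known_chunks`, so that
    /// subsequent uploads can skip chunks already present on the server.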
    pub async fn download_chunk_list(
        &self,
        path: &str,
        archive_name: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<(), Error> {

        let param = json!({ "archive-name": archive_name });
        let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();

        let h2request = self.h2.send_request(request, None).await?;
        let resp = h2request.await?;

        let status = resp.status();

        if !status.is_success() {
            H2Client::h2api_response(resp).await?; // raise error
            unreachable!();
        }

        let mut body = resp.into_body();
        let mut release_capacity = body.release_capacity().clone();

        let mut stream = DigestListDecoder::new(body.map_err(Error::from));

        while let Some(chunk) = stream.try_next().await? {
            let _ = release_capacity.release_capacity(chunk.len());
            println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
            known_chunks.lock().unwrap().insert(chunk);
        }

        Ok(())
    }

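    /// Upload a stream of chunk data, deduplicating against `known_chunks`.
    ///
    /// Each chunk is compressed (and optionally encrypted), and its
    /// digest is fed into the index checksum (for dynamic indexes, the
    /// chunk end offset is hashed as well). Only chunks not yet known to
    /// the server are uploaded; known chunks are merely appended to the
    /// index via the append queue. Resolves to
    /// `(chunk_count, stream_len, speed, index_csum)`.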
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
    ) -> impl Future<Output = Result<(usize, usize, usize, [u8; 32]), Error>> {

        let repeat = Arc::new(AtomicUsize::new(0));
        let repeat2 = repeat.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                repeat.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(true);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                             chunk_info.chunk_len, offset);

                    let chunk_data = chunk_info.chunk.raw_data();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    let mut upload_queue = upload_queue.clone();
                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(Error::from)
                        })
                    )
                } else {
                    let mut upload_queue = upload_queue.clone();
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(Error::from)
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            }.boxed())
            .and_then(move |_| {
                let repeat = repeat2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
                println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
                if repeat > 0 {
                    println!("Average chunk size was {} bytes.", stream_len/repeat);
                    println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
                }

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((repeat, stream_len, speed, csum))
            })
    }

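    /// Measure upload throughput by repeatedly POSTing a 4 MiB test
    /// buffer to the `speedtest` endpoint for about five seconds,
    /// returning the approximate speed in MB/s.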
    pub async fn upload_speedtest(&self) -> Result<usize, Error> {

        let mut data = vec![];
        // fill the 4 MiB test buffer with a deterministic byte pattern
        // (the little-endian bytes of a counter)
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue();

        let start_time = std::time::Instant::now();

        loop {
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }
            // count only iterations that actually send data
            repeat += 1;

            let mut upload_queue = upload_queue.clone();

            println!("send test data ({} bytes)", data.len());
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        upload_result.await??; // propagate errors from the pipelined uploads

        println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
        println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
}