// proxmox-backup: src/client/backup_writer.rs

use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use failure::*;
use chrono::{DateTime, Utc};
use futures::*;
use futures::stream::Stream;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;
use crate::tools::futures::Canceller;

use super::{HttpClient, H2Client};

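/// Writer side of a backup session.
///
/// Wraps the HTTP/2 connection obtained from `HttpClient::start_h2_connection`
/// together with its `Canceller`; dropping the writer aborts the connection.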
pub struct BackupWriter {
    h2: H2Client,
    canceller: Canceller,
}

impl Drop for BackupWriter {

    fn drop(&mut self) {
        self.canceller.cancel();
    }
}

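/// Upload statistics returned by the upload helpers: total encoded size and
/// SHA-256 digest of the uploaded data.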
pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

impl BackupWriter {

    fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
        Arc::new(Self { h2, canceller })
    }

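    /// Open a new backup session on the server.
    ///
    /// Issues `GET /api2/json/backup` and upgrades the connection to the
    /// proxmox backup protocol (HTTP/2).
    ///
    /// A minimal usage sketch (untested; `client`, `"store1"`, `"myhost"`
    /// and `data` are placeholders, error handling elided):
    ///
    /// ```ignore
    /// let writer = BackupWriter::start(
    ///     client, "store1", "host", "myhost", Utc::now(), false).await?;
    /// let stats = writer
    ///     .upload_blob_from_data(data, "test.blob", None, true, false)
    ///     .await?;
    /// writer.finish().await?;
    /// ```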
    pub async fn start(
        client: HttpClient,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: DateTime<Utc>,
        debug: bool,
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time.timestamp(),
            "store": datastore,
            "debug": debug
        });

        let req = HttpClient::request_builder(
            client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, canceller))
    }

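    // Thin wrappers forwarding plain GET/PUT/POST requests (with optional
    // JSON parameters) over the established HTTP/2 backup connection.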
    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

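    /// POST a raw data body (with the given content type) to `path`.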
    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }

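    /// Send an upload request without awaiting the response.
    ///
    /// Returns the `h2` response future so the caller can pipeline several
    /// uploads and collect the responses later.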
    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {

        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        // `data` is owned here, so it can be moved into the body without cloning
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data))).await?;
        Ok(response_future)
    }

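    /// PUT a raw data body (with the given content type) to `path`.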
    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

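    /// Finish the backup session: post to the `finish` endpoint, then shut
    /// down the HTTP/2 connection.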
    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.canceller.cancel();
            })
            .await
    }

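    /// Abort the session by cancelling the underlying connection.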
    pub fn cancel(&self) {
        self.canceller.cancel();
    }

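    /// Upload an already encoded blob read from `reader` under `file_name`.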
    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

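    /// Encode `data` as a `DataBlob` (optionally compressed, and encrypted
    /// or merely signed when a `CryptConfig` is given) and upload it.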
    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        sign_only: bool,
    ) -> Result<BackupStats, Error> {

        let blob = if let Some(ref crypt_config) = crypt_config {
            if sign_only {
                DataBlob::create_signed(&data, crypt_config, compress)?
            } else {
                DataBlob::encode(&data, Some(crypt_config), compress)?
            }
        } else {
            DataBlob::encode(&data, None, compress)?
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

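    /// Read a file from `src_path`, encode it as a `DataBlob` and upload it.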
    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;
        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({
            "encoded-size": size,
            "file-name": file_name,
        });
        self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

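    /// Upload a chunked archive (fixed or dynamic index, selected via
    /// `prefix`) from a stream of chunk data.
    ///
    /// The list of chunk digests the server already knows is downloaded
    /// first, so duplicate chunks are registered by digest instead of being
    /// re-uploaded.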
    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        fixed_size: Option<u64>,
        crypt_config: Option<Arc<CryptConfig>>,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        if let Some(size) = fixed_size {
            param["size"] = size.into();
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();

        let (chunk_count, size, _speed, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                prefix,
                known_chunks.clone(),
                crypt_config,
            )
            .await?;

        let param = json!({
            "wid": wid,
            "chunk-count": chunk_count,
            "size": size,
            "csum": digest_to_hex(&csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: size as u64,
            csum,
        })
    }

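    /// Spawn a task that drains pipelined HTTP/2 responses in order.
    ///
    /// Returns the sender used to queue response futures, plus a oneshot
    /// receiver that yields the overall result once the queue is closed.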
    fn response_queue() -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check whether this async block works as a replacement for
        // the stream-combinator chain that is currently spawned below:
        //
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .try_for_each(|response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(|result| println!("RESPONSE: {:?}", result))
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

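    /// Spawn a task that appends registered chunks to the archive index.
    ///
    /// Incoming `MergedChunkInfo` items (and, for freshly uploaded chunks,
    /// their pending upload responses) are merged into batches and sent as
    /// digest/offset lists to the given index `path` via PUT.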
    fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        let h2_2 = h2.clone();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            verify_queue_rx
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                //println!("append chunk {} (offset {})", proxmox::tools::digest_to_hex(&digest), offset);
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            println!("append chunks list len ({})", digest_list.len());
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2_2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

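    /// Fetch the digests of chunks the server already has for this archive
    /// and record them in `known_chunks`.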
    pub async fn download_chunk_list(
        &self,
        path: &str,
        archive_name: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<(), Error> {

        let param = json!({ "archive-name": archive_name });
        let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();

        let h2request = self.h2.send_request(request, None).await?;
        let resp = h2request.await?;

        let status = resp.status();

        if !status.is_success() {
            H2Client::h2api_response(resp).await?; // raise error
            unreachable!();
        }

        let mut body = resp.into_body();
        let mut flow_control = body.flow_control().clone();

        let mut stream = DigestListDecoder::new(body.map_err(Error::from));

        while let Some(chunk) = stream.try_next().await? {
            let _ = flow_control.release_capacity(chunk.len());
            println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
            known_chunks.lock().unwrap().insert(chunk);
        }

        Ok(())
    }

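    /// Core upload loop: builds chunks from the input stream, registers
    /// chunks whose digest is already known instead of re-uploading them,
    /// uploads new chunks, and queues everything for the index append task.
    ///
    /// Resolves to `(chunk_count, stream_len, speed, index_csum)`.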
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
    ) -> impl Future<Output = Result<(usize, usize, usize, [u8; 32]), Error>> {

        let repeat = Arc::new(AtomicUsize::new(0));
        let repeat2 = repeat.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned());

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                repeat.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(true);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                             chunk_info.chunk_len, offset);

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    let mut upload_queue = upload_queue.clone();
                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                        })
                    )
                } else {
                    let mut upload_queue = upload_queue.clone();
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            }.boxed())
            .and_then(move |_| {
                let repeat = repeat2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let speed = ((stream_len*1_000_000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
                println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
                if repeat > 0 {
                    println!("Average chunk size was {} bytes.", stream_len/repeat);
                    println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
                }

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((repeat, stream_len, speed, csum))
            })
    }

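    /// Upload the same 4 MiB test pattern repeatedly for about five seconds
    /// and return the measured throughput in MB/s.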
    pub async fn upload_speedtest(&self) -> Result<usize, Error> {

        let mut data = vec![];
        // fill a 4 MiB buffer with a deterministic byte pattern
        // (each u32 counter value serialized in little-endian order)
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue();

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            let mut upload_queue = upload_queue.clone();

            println!("send test data ({} bytes)", data.len());
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close the queue so the response task can finish

        let _ = upload_result.await?;

        println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*1_000_000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
        println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
}