use std::collections::HashSet;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::*;
use futures::stream::Stream;
use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;

use proxmox::tools::digest_to_hex;

use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;
use crate::tools::format::HumanByte;

use super::{HttpClient, H2Client};

pub struct BackupWriter {
    h2: H2Client,
    abort: AbortHandle,
    verbose: bool,
    crypt_config: Option<Arc<CryptConfig>>,
}

impl Drop for BackupWriter {

    fn drop(&mut self) {
        self.abort.abort();
    }
}

pub struct BackupStats {
    pub size: u64,
    pub csum: [u8; 32],
}

/// Options for uploading blobs/streams to the server
#[derive(Default, Clone)]
pub struct UploadOptions {
    pub previous_manifest: Option<Arc<BackupManifest>>,
    pub compress: bool,
    pub encrypt: bool,
    pub fixed_size: Option<u64>,
}
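
// Example usage (a minimal sketch; assumes an already-started `BackupWriter`
// named `writer` and some in-memory `data: Vec<u8>` - the file name is
// illustrative only):
//
//     let options = UploadOptions { compress: true, ..UploadOptions::default() };
//     let stats = writer.upload_blob_from_data(data, "notes.blob", options).await?;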

type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;

impl BackupWriter {

    fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
        Arc::new(Self { h2, abort, crypt_config, verbose })
    }

    // FIXME: extract into (flattened) parameter struct?
    #[allow(clippy::too_many_arguments)]
    pub async fn start(
        client: HttpClient,
        crypt_config: Option<Arc<CryptConfig>>,
        datastore: &str,
        backup_type: &str,
        backup_id: &str,
        backup_time: i64,
        debug: bool,
        benchmark: bool
    ) -> Result<Arc<BackupWriter>, Error> {

        let param = json!({
            "backup-type": backup_type,
            "backup-id": backup_id,
            "backup-time": backup_time,
            "store": datastore,
            "debug": debug,
            "benchmark": benchmark
        });

        let req = HttpClient::request_builder(
            client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();

        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;

        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
    }
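
    // Example (a minimal sketch; assumes a connected `HttpClient` named
    // `client`, no encryption, and a unix epoch value in `backup_time`):
    //
    //     let writer = BackupWriter::start(
    //         client, None, "store1", "host", "myhost", backup_time, false, false,
    //     ).await?;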

    pub async fn get(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.get(path, param).await
    }

    pub async fn put(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.put(path, param).await
    }

    pub async fn post(
        &self,
        path: &str,
        param: Option<Value>,
    ) -> Result<Value, Error> {
        self.h2.post(path, param).await
    }

    pub async fn upload_post(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("POST", path, param, content_type, data).await
    }

    pub async fn send_upload_request(
        &self,
        method: &str,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<h2::client::ResponseFuture, Error> {

        let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
        let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
        Ok(response_future)
    }

    pub async fn upload_put(
        &self,
        path: &str,
        param: Option<Value>,
        content_type: &str,
        data: Vec<u8>,
    ) -> Result<Value, Error> {
        self.h2.upload("PUT", path, param, content_type, data).await
    }

    pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
        let h2 = self.h2.clone();

        h2.post("finish", None)
            .map_ok(move |_| {
                self.abort.abort();
            })
            .await
    }

    pub fn cancel(&self) {
        self.abort.abort();
    }

    pub async fn upload_blob<R: std::io::Read>(
        &self,
        mut reader: R,
        file_name: &str,
    ) -> Result<BackupStats, Error> {
        let mut raw_data = Vec::new();
        // fixme: avoid loading into memory
        reader.read_to_end(&mut raw_data)?;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
        let size = raw_data.len() as u64;
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

    pub async fn upload_blob_from_data(
        &self,
        data: Vec<u8>,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let blob = match (options.encrypt, &self.crypt_config) {
            (false, _) => DataBlob::encode(&data, None, options.compress)?,
            (true, None) => bail!("requested encryption without a crypt config"),
            (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), options.compress)?,
        };

        let raw_data = blob.into_inner();
        let size = raw_data.len() as u64;

        let csum = openssl::sha::sha256(&raw_data);
        let param = json!({"encoded-size": size, "file-name": file_name });
        let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
        Ok(BackupStats { size, csum })
    }

    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
        &self,
        src_path: P,
        file_name: &str,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {

        let src_path = src_path.as_ref();

        let mut file = tokio::fs::File::open(src_path)
            .await
            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;

        let mut contents = Vec::new();

        file.read_to_end(&mut contents)
            .await
            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;

        self.upload_blob_from_data(contents, file_name, options).await
    }

    pub async fn upload_stream(
        &self,
        archive_name: &str,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        options: UploadOptions,
    ) -> Result<BackupStats, Error> {
        let known_chunks = Arc::new(Mutex::new(HashSet::new()));

        let mut param = json!({ "archive-name": archive_name });
        let prefix = if let Some(size) = options.fixed_size {
            param["size"] = size.into();
            "fixed"
        } else {
            "dynamic"
        };

        if options.encrypt && self.crypt_config.is_none() {
            bail!("requested encryption without a crypt config");
        }

        let index_path = format!("{}_index", prefix);
        let close_path = format!("{}_close", prefix);

        if let Some(manifest) = options.previous_manifest {
            // try, but ignore errors
            match archive_type(archive_name) {
                Ok(ArchiveType::FixedIndex) => {
                    let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                Ok(ArchiveType::DynamicIndex) => {
                    let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
                }
                _ => { /* do nothing */ }
            }
        }

        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();

        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
            Self::upload_chunk_info_stream(
                self.h2.clone(),
                wid,
                stream,
                &prefix,
                known_chunks.clone(),
                if options.encrypt { self.crypt_config.clone() } else { None },
                options.compress,
                self.verbose,
            )
            .await?;

        let uploaded = size - size_reused;
        let vsize_h: HumanByte = size.into();
        let archive = if self.verbose {
            archive_name.to_string()
        } else {
            crate::tools::format::strip_server_file_extension(archive_name)
        };
        if archive_name != CATALOG_NAME {
            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
            let uploaded: HumanByte = uploaded.into();
            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s.", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
        } else {
            println!("Uploaded backup catalog ({})", vsize_h);
        }

        if size_reused > 0 && size > 1024*1024 {
            let reused_percent = size_reused as f64 * 100. / size as f64;
            let reused: HumanByte = size_reused.into();
            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
        }
        if self.verbose && chunk_count > 0 {
            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
        }

        let param = json!({
            "wid": wid,
            "chunk-count": chunk_count,
            "size": size,
            "csum": proxmox::tools::digest_to_hex(&csum),
        });
        let _value = self.h2.post(&close_path, Some(param)).await?;
        Ok(BackupStats {
            size: size as u64,
            csum,
        })
    }
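
    // Example (a minimal sketch; assumes `writer`, a fixed-size image, and a
    // chunk stream over its content, e.g. crate::backup::FixedChunkStream -
    // names are illustrative):
    //
    //     let options = UploadOptions {
    //         compress: true,
    //         fixed_size: Some(image_size),
    //         ..UploadOptions::default()
    //     };
    //     let stats = writer.upload_stream("disk.img.fidx", chunk_stream, options).await?;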

    fn response_queue(verbose: bool) -> (
        mpsc::Sender<h2::client::ResponseFuture>,
        oneshot::Receiver<Result<(), Error>>
    ) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: check if this works as expected as replacement for the combinator below?
        // tokio::spawn(async move {
        //     let result: Result<(), Error> = (async move {
        //         while let Some(response) = verify_queue_rx.recv().await {
        //             match H2Client::h2api_response(response.await?).await {
        //                 Ok(result) => println!("RESPONSE: {:?}", result),
        //                 Err(err) => bail!("pipelined request failed: {}", err),
        //             }
        //         }
        //         Ok(())
        //     }).await;
        //     let _ignore_closed_channel = verify_result_tx.send(result);
        // });
        // old code for reference?
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .try_for_each(move |response: h2::client::ResponseFuture| {
                    response
                        .map_err(Error::from)
                        .and_then(H2Client::h2api_response)
                        .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
                        .map_err(|err| format_err!("pipelined request failed: {}", err))
                })
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

    fn append_chunk_queue(
        h2: H2Client,
        wid: u64,
        path: String,
        verbose: bool,
    ) -> (UploadQueueSender, UploadResultReceiver) {
        let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
        let (verify_result_tx, verify_result_rx) = oneshot::channel();

        // FIXME: async-block-ify this code!
        tokio::spawn(
            ReceiverStream::new(verify_queue_rx)
                .map(Ok::<_, Error>)
                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                    match (response, merged_chunk_info) {
                        (Some(response), MergedChunkInfo::Known(list)) => {
                            future::Either::Left(
                                response
                                    .map_err(Error::from)
                                    .and_then(H2Client::h2api_response)
                                    .and_then(move |_result| {
                                        future::ok(MergedChunkInfo::Known(list))
                                    })
                            )
                        }
                        (None, MergedChunkInfo::Known(list)) => {
                            future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
                        }
                        _ => unreachable!(),
                    }
                })
                .merge_known_chunks()
                .and_then(move |merged_chunk_info| {
                    match merged_chunk_info {
                        MergedChunkInfo::Known(chunk_list) => {
                            let mut digest_list = vec![];
                            let mut offset_list = vec![];
                            for (offset, digest) in chunk_list {
                                digest_list.push(digest_to_hex(&digest));
                                offset_list.push(offset);
                            }
                            if verbose { println!("append chunks list len ({})", digest_list.len()); }
                            let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                            let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                            let upload_data = Some(param_data);
                            h2.send_request(request, upload_data)
                                .and_then(move |response| {
                                    response
                                        .map_err(Error::from)
                                        .and_then(H2Client::h2api_response)
                                        .map_ok(|_| ())
                                })
                                .map_err(|err| format_err!("pipelined request failed: {}", err))
                        }
                        _ => unreachable!(),
                    }
                })
                .try_for_each(|_| future::ok(()))
                .map(|result| {
                    let _ignore_closed_channel = verify_result_tx.send(result);
                })
        );

        (verify_queue_tx, verify_result_rx)
    }

    pub async fn download_previous_fixed_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<FixedIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = FixedIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }

    pub async fn download_previous_dynamic_index(
        &self,
        archive_name: &str,
        manifest: &BackupManifest,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
    ) -> Result<DynamicIndexReader, Error> {

        let mut tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .custom_flags(libc::O_TMPFILE)
            .open("/tmp")?;

        let param = json!({ "archive-name": archive_name });
        self.h2.download("previous", Some(param), &mut tmpfile).await?;

        let index = DynamicIndexReader::new(tmpfile)
            .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
        // Note: do not use values stored in the index (not trusted) - instead, compute them again
        let (csum, size) = index.compute_csum();
        manifest.verify_file(archive_name, &csum, size)?;

        // add index chunks to known chunks
        let mut known_chunks = known_chunks.lock().unwrap();
        for i in 0..index.index_count() {
            known_chunks.insert(*index.index_digest(i).unwrap());
        }

        if self.verbose {
            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }

        Ok(index)
    }

    /// Retrieve the backup time of the last backup
    pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
        let data = self.h2.get("previous_backup_time", None).await?;
        serde_json::from_value(data)
            .map_err(|err| format_err!("Failed to parse backup time value returned by server - {}", err))
    }

    /// Download the backup manifest (index.json) of the last backup
    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {

        let mut raw_data = Vec::with_capacity(64 * 1024);

        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
        self.h2.download("previous", Some(param), &mut raw_data).await?;

        let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
        // no expected digest available
        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;

        let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;

        Ok(manifest)
    }
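
    // Example of an incremental run (a minimal sketch; assumes `writer` from a
    // previous `start()` call - if no previous manifest exists, the download
    // simply fails and the backup proceeds without reusing chunks):
    //
    //     let previous_manifest = writer.download_previous_manifest().await.ok().map(Arc::new);
    //     let options = UploadOptions { previous_manifest, compress: true, ..UploadOptions::default() };
    //     let stats = writer.upload_stream("root.pxar.didx", stream, options).await?;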

    // We have no `self` here for `h2` and `verbose`, the only other arg "common" with 1 other
    // function in the same path is `wid`, so those 3 could be in a struct, but there's no real use
    // since this is a private method.
    #[allow(clippy::too_many_arguments)]
    fn upload_chunk_info_stream(
        h2: H2Client,
        wid: u64,
        stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
        verbose: bool,
    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {

        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
        let known_chunk_count = Arc::new(AtomicUsize::new(0));
        let known_chunk_count2 = known_chunk_count.clone();

        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
        let reused_len2 = reused_len.clone();

        let append_chunk_path = format!("{}_index", prefix);
        let upload_chunk_path = format!("{}_chunk", prefix);
        let is_fixed_chunk_size = prefix == "fixed";

        let (upload_queue, upload_result) =
            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);

        let start_time = std::time::Instant::now();

        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
        let index_csum_2 = index_csum.clone();

        stream
            .and_then(move |data| {

                let chunk_len = data.len();

                total_chunks.fetch_add(1, Ordering::SeqCst);
                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
                    .compress(compress);

                if let Some(ref crypt_config) = crypt_config {
                    chunk_builder = chunk_builder.crypt_config(crypt_config);
                }

                let mut known_chunks = known_chunks.lock().unwrap();
                let digest = chunk_builder.digest();

                let mut guard = index_csum.lock().unwrap();
                let csum = guard.as_mut().unwrap();

                let chunk_end = offset + chunk_len as u64;

                if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
                csum.update(digest);

                let chunk_is_known = known_chunks.contains(digest);
                if chunk_is_known {
                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                } else {
                    known_chunks.insert(*digest);
                    future::ready(chunk_builder
                        .build()
                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
                            chunk,
                            digest,
                            chunk_len: chunk_len as u64,
                            offset,
                        }))
                    )
                }
            })
            .merge_known_chunks()
            .try_for_each(move |merged_chunk_info| {
                let upload_queue = upload_queue.clone();

                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                    let offset = chunk_info.offset;
                    let digest = chunk_info.digest;
                    let digest_str = digest_to_hex(&digest);

                    /* too verbose, needs finer verbosity setting granularity
                    if verbose {
                        println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                            chunk_info.chunk_len, offset);
                    }
                    */

                    let chunk_data = chunk_info.chunk.into_inner();
                    let param = json!({
                        "wid": wid,
                        "digest": digest_str,
                        "size": chunk_info.chunk_len,
                        "encoded-size": chunk_data.len(),
                    });

                    let ct = "application/octet-stream";
                    let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
                    let upload_data = Some(bytes::Bytes::from(chunk_data));

                    let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);

                    future::Either::Left(h2
                        .send_request(request, upload_data)
                        .and_then(move |response| async move {
                            upload_queue
                                .send((new_info, Some(response)))
                                .await
                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                        })
                    )
                } else {
                    future::Either::Right(async move {
                        upload_queue
                            .send((merged_chunk_info, None))
                            .await
                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                    })
                }
            })
            .then(move |result| async move {
                upload_result.await?.and(result)
            }.boxed())
            .and_then(move |_| {
                let duration = start_time.elapsed();
                let total_chunks = total_chunks2.load(Ordering::SeqCst);
                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
                let stream_len = stream_len2.load(Ordering::SeqCst);
                let reused_len = reused_len2.load(Ordering::SeqCst);

                let mut guard = index_csum_2.lock().unwrap();
                let csum = guard.take().unwrap().finish();

                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
            })
    }

    /// Upload speed test - prints result to stderr
    pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {

        let mut data = vec![];
        // generate pseudo random byte sequence
        for i in 0..1024*1024 {
            for j in 0..4 {
                let byte = ((i >> (j<<3))&0xff) as u8;
                data.push(byte);
            }
        }

        let item_len = data.len();

        let mut repeat = 0;

        let (upload_queue, upload_result) = Self::response_queue(verbose);

        let start_time = std::time::Instant::now();

        loop {
            repeat += 1;
            if start_time.elapsed().as_secs() >= 5 {
                break;
            }

            if verbose { eprintln!("send test data ({} bytes)", data.len()); }
            let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
            let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;

            upload_queue.send(request_future).await?;
        }

        drop(upload_queue); // close queue

        let _ = upload_result.await?;

        eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
        let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
        eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));

        Ok(speed)
    }
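
    // Example (a minimal sketch): measure the raw upload throughput of the
    // established HTTP/2 backup connection; the returned value is bytes per second.
    //
    //     let bytes_per_second = writer.upload_speedtest(false).await?;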
}