]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/backup_writer.rs
avoid chrono dependency, depend on proxmox 0.3.8
[proxmox-backup.git] / src / client / backup_writer.rs
CommitLineData
cf9271e2 1use std::collections::HashSet;
b957aa81 2use std::os::unix::fs::OpenOptionsExt;
cf9271e2
DM
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::{Arc, Mutex};
5
f28d9088 6use anyhow::{bail, format_err, Error};
cf9271e2
DM
7use futures::*;
8use futures::stream::Stream;
dc089345 9use futures::future::AbortHandle;
cf9271e2
DM
10use serde_json::{json, Value};
11use tokio::io::AsyncReadExt;
12use tokio::sync::{mpsc, oneshot};
13
14use proxmox::tools::digest_to_hex;
15
16use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
17use crate::backup::*;
6e1deb15 18use crate::tools::format::HumanByte;
cf9271e2
DM
19
20use super::{HttpClient, H2Client};
21
22pub struct BackupWriter {
23 h2: H2Client,
dc089345 24 abort: AbortHandle,
e02c3d46 25 verbose: bool,
b957aa81 26 crypt_config: Option<Arc<CryptConfig>>,
cf9271e2
DM
27}
28
29impl Drop for BackupWriter {
30
31 fn drop(&mut self) {
dc089345 32 self.abort.abort();
cf9271e2
DM
33 }
34}
35
/// Result of an upload: size and checksum of what was sent.
pub struct BackupStats {
    /// Number of bytes: the raw blob length for blob uploads, the stream
    /// length for chunked (index) uploads.
    pub size: u64,
    /// SHA-256 checksum: over the raw blob data for blob uploads, the index
    /// checksum for chunked uploads.
    pub csum: [u8; 32],
}
40
41impl BackupWriter {
42
b957aa81
DM
43 fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
44 Arc::new(Self { h2, abort, crypt_config, verbose })
cf9271e2
DM
45 }
46
47 pub async fn start(
48 client: HttpClient,
b957aa81 49 crypt_config: Option<Arc<CryptConfig>>,
cf9271e2
DM
50 datastore: &str,
51 backup_type: &str,
52 backup_id: &str,
6a7be83e 53 backup_time: i64,
cf9271e2 54 debug: bool,
61d7b501 55 benchmark: bool
cf9271e2
DM
56 ) -> Result<Arc<BackupWriter>, Error> {
57
58 let param = json!({
59 "backup-type": backup_type,
60 "backup-id": backup_id,
6a7be83e 61 "backup-time": backup_time,
cf9271e2 62 "store": datastore,
61d7b501
HL
63 "debug": debug,
64 "benchmark": benchmark
cf9271e2
DM
65 });
66
67 let req = HttpClient::request_builder(
68 client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
69
dc089345 70 let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
cf9271e2 71
b957aa81 72 Ok(BackupWriter::new(h2, abort, crypt_config, debug))
cf9271e2
DM
73 }
74
75 pub async fn get(
76 &self,
77 path: &str,
78 param: Option<Value>,
79 ) -> Result<Value, Error> {
80 self.h2.get(path, param).await
81 }
82
83 pub async fn put(
84 &self,
85 path: &str,
86 param: Option<Value>,
87 ) -> Result<Value, Error> {
88 self.h2.put(path, param).await
89 }
90
91 pub async fn post(
92 &self,
93 path: &str,
94 param: Option<Value>,
95 ) -> Result<Value, Error> {
96 self.h2.post(path, param).await
97 }
98
99 pub async fn upload_post(
100 &self,
101 path: &str,
102 param: Option<Value>,
103 content_type: &str,
104 data: Vec<u8>,
105 ) -> Result<Value, Error> {
106 self.h2.upload("POST", path, param, content_type, data).await
107 }
108
109 pub async fn send_upload_request(
110 &self,
111 method: &str,
112 path: &str,
113 param: Option<Value>,
114 content_type: &str,
115 data: Vec<u8>,
116 ) -> Result<h2::client::ResponseFuture, Error> {
117
118 let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
119 let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
120 Ok(response_future)
121 }
122
123 pub async fn upload_put(
124 &self,
125 path: &str,
126 param: Option<Value>,
127 content_type: &str,
128 data: Vec<u8>,
129 ) -> Result<Value, Error> {
130 self.h2.upload("PUT", path, param, content_type, data).await
131 }
132
133 pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
134 let h2 = self.h2.clone();
135
136 h2.post("finish", None)
137 .map_ok(move |_| {
dc089345 138 self.abort.abort();
cf9271e2
DM
139 })
140 .await
141 }
142
e016f9ff 143 pub fn cancel(&self) {
dc089345 144 self.abort.abort();
cf9271e2
DM
145 }
146
147 pub async fn upload_blob<R: std::io::Read>(
148 &self,
149 mut reader: R,
150 file_name: &str,
151 ) -> Result<BackupStats, Error> {
152 let mut raw_data = Vec::new();
153 // fixme: avoid loading into memory
154 reader.read_to_end(&mut raw_data)?;
155
156 let csum = openssl::sha::sha256(&raw_data);
157 let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
158 let size = raw_data.len() as u64;
159 let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
160 Ok(BackupStats { size, csum })
161 }
162
163 pub async fn upload_blob_from_data(
164 &self,
165 data: Vec<u8>,
166 file_name: &str,
cf9271e2 167 compress: bool,
3638341a 168 encrypt: bool,
f28d9088 169 ) -> Result<BackupStats, Error> {
3638341a
DM
170 let blob = match (encrypt, &self.crypt_config) {
171 (false, _) => DataBlob::encode(&data, None, compress)?,
172 (true, None) => bail!("requested encryption without a crypt config"),
173 (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?,
cf9271e2
DM
174 };
175
176 let raw_data = blob.into_inner();
177 let size = raw_data.len() as u64;
178
179 let csum = openssl::sha::sha256(&raw_data);
180 let param = json!({"encoded-size": size, "file-name": file_name });
181 let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
182 Ok(BackupStats { size, csum })
183 }
184
185 pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
186 &self,
187 src_path: P,
188 file_name: &str,
cf9271e2 189 compress: bool,
3638341a
DM
190 encrypt: bool,
191 ) -> Result<BackupStats, Error> {
cf9271e2
DM
192
193 let src_path = src_path.as_ref();
194
195 let mut file = tokio::fs::File::open(src_path)
196 .await
197 .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;
198
199 let mut contents = Vec::new();
200
201 file.read_to_end(&mut contents)
202 .await
203 .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
204
3638341a 205 self.upload_blob_from_data(contents, file_name, compress, encrypt).await
cf9271e2
DM
206 }
207
208 pub async fn upload_stream(
209 &self,
b957aa81 210 previous_manifest: Option<Arc<BackupManifest>>,
cf9271e2
DM
211 archive_name: &str,
212 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
213 prefix: &str,
214 fixed_size: Option<u64>,
3638341a
DM
215 compress: bool,
216 encrypt: bool,
cf9271e2
DM
217 ) -> Result<BackupStats, Error> {
218 let known_chunks = Arc::new(Mutex::new(HashSet::new()));
219
220 let mut param = json!({ "archive-name": archive_name });
221 if let Some(size) = fixed_size {
222 param["size"] = size.into();
223 }
224
3638341a
DM
225 if encrypt && self.crypt_config.is_none() {
226 bail!("requested encryption without a crypt config");
227 }
228
cf9271e2
DM
229 let index_path = format!("{}_index", prefix);
230 let close_path = format!("{}_close", prefix);
231
b957aa81
DM
232 if let Some(manifest) = previous_manifest {
233 // try, but ignore errors
234 match archive_type(archive_name) {
235 Ok(ArchiveType::FixedIndex) => {
236 let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
237 }
238 Ok(ArchiveType::DynamicIndex) => {
239 let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
240 }
241 _ => { /* do nothing */ }
242 }
243 }
cf9271e2
DM
244
245 let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
246
6e1deb15 247 let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
cf9271e2
DM
248 Self::upload_chunk_info_stream(
249 self.h2.clone(),
250 wid,
251 stream,
252 &prefix,
253 known_chunks.clone(),
3638341a
DM
254 if encrypt { self.crypt_config.clone() } else { None },
255 compress,
e02c3d46 256 self.verbose,
cf9271e2
DM
257 )
258 .await?;
259
6e1deb15
TL
260 let uploaded = size - size_reused;
261 let vsize_h: HumanByte = size.into();
262 let archive = if self.verbose {
263 archive_name.to_string()
264 } else {
265 crate::tools::format::strip_server_file_expenstion(archive_name.clone())
266 };
267 if archive_name != CATALOG_NAME {
0cf14984 268 let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
6e1deb15 269 let uploaded: HumanByte = uploaded.into();
505c5f0f 270 println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
6e1deb15
TL
271 } else {
272 println!("Uploaded backup catalog ({})", vsize_h);
273 }
274
275 if size_reused > 0 && size > 1024*1024 {
276 let reused_percent = size_reused as f64 * 100. / size as f64;
277 let reused: HumanByte = size_reused.into();
278 println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
279 }
280 if self.verbose && chunk_count > 0 {
281 println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
282 println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
283 println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
6da73c82
DM
284 }
285
cf9271e2
DM
286 let param = json!({
287 "wid": wid ,
288 "chunk-count": chunk_count,
289 "size": size,
290 "csum": proxmox::tools::digest_to_hex(&csum),
291 });
292 let _value = self.h2.post(&close_path, Some(param)).await?;
293 Ok(BackupStats {
294 size: size as u64,
295 csum,
296 })
297 }
298
323b2f3d 299 fn response_queue(verbose: bool) -> (
cf9271e2
DM
300 mpsc::Sender<h2::client::ResponseFuture>,
301 oneshot::Receiver<Result<(), Error>>
302 ) {
303 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
304 let (verify_result_tx, verify_result_rx) = oneshot::channel();
305
db0cb9ce
WB
306 // FIXME: check if this works as expected as replacement for the combinator below?
307 // tokio::spawn(async move {
308 // let result: Result<(), Error> = (async move {
309 // while let Some(response) = verify_queue_rx.recv().await {
310 // match H2Client::h2api_response(response.await?).await {
311 // Ok(result) => println!("RESPONSE: {:?}", result),
312 // Err(err) => bail!("pipelined request failed: {}", err),
313 // }
314 // }
315 // Ok(())
316 // }).await;
317 // let _ignore_closed_channel = verify_result_tx.send(result);
318 // });
319 // old code for reference?
320 tokio::spawn(
cf9271e2
DM
321 verify_queue_rx
322 .map(Ok::<_, Error>)
323b2f3d 323 .try_for_each(move |response: h2::client::ResponseFuture| {
cf9271e2
DM
324 response
325 .map_err(Error::from)
326 .and_then(H2Client::h2api_response)
323b2f3d 327 .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
cf9271e2
DM
328 .map_err(|err| format_err!("pipelined request failed: {}", err))
329 })
330 .map(|result| {
331 let _ignore_closed_channel = verify_result_tx.send(result);
332 })
333 );
334
335 (verify_queue_tx, verify_result_rx)
336 }
337
e02c3d46 338 fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
cf9271e2 339 mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
e02c3d46 340 oneshot::Receiver<Result<(), Error>>,
cf9271e2
DM
341 ) {
342 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
343 let (verify_result_tx, verify_result_rx) = oneshot::channel();
344
345 let h2_2 = h2.clone();
346
db0cb9ce
WB
347 // FIXME: async-block-ify this code!
348 tokio::spawn(
cf9271e2
DM
349 verify_queue_rx
350 .map(Ok::<_, Error>)
351 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
352 match (response, merged_chunk_info) {
353 (Some(response), MergedChunkInfo::Known(list)) => {
354 future::Either::Left(
355 response
356 .map_err(Error::from)
357 .and_then(H2Client::h2api_response)
358 .and_then(move |_result| {
359 future::ok(MergedChunkInfo::Known(list))
360 })
361 )
362 }
363 (None, MergedChunkInfo::Known(list)) => {
364 future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
365 }
366 _ => unreachable!(),
367 }
368 })
369 .merge_known_chunks()
370 .and_then(move |merged_chunk_info| {
371 match merged_chunk_info {
372 MergedChunkInfo::Known(chunk_list) => {
373 let mut digest_list = vec![];
374 let mut offset_list = vec![];
375 for (offset, digest) in chunk_list {
cf9271e2
DM
376 digest_list.push(digest_to_hex(&digest));
377 offset_list.push(offset);
378 }
e02c3d46 379 if verbose { println!("append chunks list len ({})", digest_list.len()); }
cf9271e2
DM
380 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
381 let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
db0cb9ce 382 let param_data = bytes::Bytes::from(param.to_string().into_bytes());
cf9271e2
DM
383 let upload_data = Some(param_data);
384 h2_2.send_request(request, upload_data)
385 .and_then(move |response| {
386 response
387 .map_err(Error::from)
388 .and_then(H2Client::h2api_response)
389 .map_ok(|_| ())
390 })
391 .map_err(|err| format_err!("pipelined request failed: {}", err))
392 }
393 _ => unreachable!(),
394 }
395 })
396 .try_for_each(|_| future::ok(()))
397 .map(|result| {
398 let _ignore_closed_channel = verify_result_tx.send(result);
399 })
400 );
401
402 (verify_queue_tx, verify_result_rx)
403 }
404
b957aa81 405 pub async fn download_previous_fixed_index(
cf9271e2 406 &self,
cf9271e2 407 archive_name: &str,
b957aa81 408 manifest: &BackupManifest,
cf9271e2 409 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
b957aa81 410 ) -> Result<FixedIndexReader, Error> {
cf9271e2 411
b957aa81
DM
412 let mut tmpfile = std::fs::OpenOptions::new()
413 .write(true)
414 .read(true)
415 .custom_flags(libc::O_TMPFILE)
416 .open("/tmp")?;
cf9271e2 417
b957aa81
DM
418 let param = json!({ "archive-name": archive_name });
419 self.h2.download("previous", Some(param), &mut tmpfile).await?;
420
421 let index = FixedIndexReader::new(tmpfile)
422 .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
423 // Note: do not use values stored in index (not trusted) - instead, computed them again
424 let (csum, size) = index.compute_csum();
425 manifest.verify_file(archive_name, &csum, size)?;
426
427 // add index chunks to known chunks
428 let mut known_chunks = known_chunks.lock().unwrap();
429 for i in 0..index.index_count() {
430 known_chunks.insert(*index.index_digest(i).unwrap());
431 }
cf9271e2 432
b957aa81
DM
433 if self.verbose {
434 println!("{}: known chunks list length is {}", archive_name, index.index_count());
cf9271e2
DM
435 }
436
b957aa81
DM
437 Ok(index)
438 }
439
440 pub async fn download_previous_dynamic_index(
441 &self,
442 archive_name: &str,
443 manifest: &BackupManifest,
444 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
445 ) -> Result<DynamicIndexReader, Error> {
cf9271e2 446
b957aa81
DM
447 let mut tmpfile = std::fs::OpenOptions::new()
448 .write(true)
449 .read(true)
450 .custom_flags(libc::O_TMPFILE)
451 .open("/tmp")?;
cf9271e2 452
b957aa81
DM
453 let param = json!({ "archive-name": archive_name });
454 self.h2.download("previous", Some(param), &mut tmpfile).await?;
455
456 let index = DynamicIndexReader::new(tmpfile)
40592856 457 .map_err(|err| format_err!("unable to read dynmamic index '{}' - {}", archive_name, err))?;
b957aa81
DM
458 // Note: do not use values stored in index (not trusted) - instead, computed them again
459 let (csum, size) = index.compute_csum();
460 manifest.verify_file(archive_name, &csum, size)?;
461
462 // add index chunks to known chunks
463 let mut known_chunks = known_chunks.lock().unwrap();
464 for i in 0..index.index_count() {
465 known_chunks.insert(*index.index_digest(i).unwrap());
cf9271e2
DM
466 }
467
e02c3d46 468 if self.verbose {
b957aa81 469 println!("{}: known chunks list length is {}", archive_name, index.index_count());
e02c3d46 470 }
ee5fe978 471
b957aa81
DM
472 Ok(index)
473 }
474
475 /// Download backup manifest (index.json) of last backup
476 pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
477
b957aa81
DM
478 let mut raw_data = Vec::with_capacity(64 * 1024);
479
480 let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
481 self.h2.download("previous", Some(param), &mut raw_data).await?;
482
39f18b30 483 let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
8819d1f2
FG
484 // no expected digest available
485 let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;
3dacedce
DM
486
487 let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
b957aa81
DM
488
489 Ok(manifest)
cf9271e2
DM
490 }
491
492 fn upload_chunk_info_stream(
493 h2: H2Client,
494 wid: u64,
495 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
496 prefix: &str,
497 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
498 crypt_config: Option<Arc<CryptConfig>>,
3638341a 499 compress: bool,
e02c3d46 500 verbose: bool,
6e1deb15 501 ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
cf9271e2 502
6e1deb15
TL
503 let total_chunks = Arc::new(AtomicUsize::new(0));
504 let total_chunks2 = total_chunks.clone();
505 let known_chunk_count = Arc::new(AtomicUsize::new(0));
506 let known_chunk_count2 = known_chunk_count.clone();
cf9271e2
DM
507
508 let stream_len = Arc::new(AtomicUsize::new(0));
509 let stream_len2 = stream_len.clone();
6e1deb15
TL
510 let reused_len = Arc::new(AtomicUsize::new(0));
511 let reused_len2 = reused_len.clone();
cf9271e2
DM
512
513 let append_chunk_path = format!("{}_index", prefix);
514 let upload_chunk_path = format!("{}_chunk", prefix);
515 let is_fixed_chunk_size = prefix == "fixed";
516
517 let (upload_queue, upload_result) =
e02c3d46 518 Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
cf9271e2
DM
519
520 let start_time = std::time::Instant::now();
521
522 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
523 let index_csum_2 = index_csum.clone();
524
525 stream
526 .and_then(move |data| {
527
528 let chunk_len = data.len();
529
6e1deb15 530 total_chunks.fetch_add(1, Ordering::SeqCst);
cf9271e2
DM
531 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
532
533 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
3638341a 534 .compress(compress);
cf9271e2
DM
535
536 if let Some(ref crypt_config) = crypt_config {
3638341a 537 chunk_builder = chunk_builder.crypt_config(crypt_config);
cf9271e2
DM
538 }
539
540 let mut known_chunks = known_chunks.lock().unwrap();
541 let digest = chunk_builder.digest();
542
543 let mut guard = index_csum.lock().unwrap();
544 let csum = guard.as_mut().unwrap();
545
546 let chunk_end = offset + chunk_len as u64;
547
548 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
549 csum.update(digest);
550
551 let chunk_is_known = known_chunks.contains(digest);
552 if chunk_is_known {
6e1deb15
TL
553 known_chunk_count.fetch_add(1, Ordering::SeqCst);
554 reused_len.fetch_add(chunk_len, Ordering::SeqCst);
cf9271e2
DM
555 future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
556 } else {
557 known_chunks.insert(*digest);
558 future::ready(chunk_builder
559 .build()
560 .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
561 chunk,
562 digest,
563 chunk_len: chunk_len as u64,
564 offset,
565 }))
566 )
567 }
568 })
569 .merge_known_chunks()
570 .try_for_each(move |merged_chunk_info| {
571
572 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
573 let offset = chunk_info.offset;
574 let digest = chunk_info.digest;
575 let digest_str = digest_to_hex(&digest);
576
f40b4fb0 577 if false && verbose { // TO verbose, needs finer verbosity setting granularity
e02c3d46
DM
578 println!("upload new chunk {} ({} bytes, offset {})", digest_str,
579 chunk_info.chunk_len, offset);
580 }
cf9271e2 581
db0cb9ce 582 let chunk_data = chunk_info.chunk.into_inner();
cf9271e2
DM
583 let param = json!({
584 "wid": wid,
585 "digest": digest_str,
586 "size": chunk_info.chunk_len,
587 "encoded-size": chunk_data.len(),
588 });
589
590 let ct = "application/octet-stream";
591 let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
592 let upload_data = Some(bytes::Bytes::from(chunk_data));
593
594 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
595
596 let mut upload_queue = upload_queue.clone();
597 future::Either::Left(h2
598 .send_request(request, upload_data)
599 .and_then(move |response| async move {
600 upload_queue
601 .send((new_info, Some(response)))
602 .await
db0cb9ce 603 .map_err(|err| format_err!("failed to send to upload queue: {}", err))
cf9271e2
DM
604 })
605 )
606 } else {
607 let mut upload_queue = upload_queue.clone();
608 future::Either::Right(async move {
609 upload_queue
610 .send((merged_chunk_info, None))
611 .await
db0cb9ce 612 .map_err(|err| format_err!("failed to send to upload queue: {}", err))
cf9271e2
DM
613 })
614 }
615 })
616 .then(move |result| async move {
617 upload_result.await?.and(result)
618 }.boxed())
619 .and_then(move |_| {
6e1deb15
TL
620 let duration = start_time.elapsed();
621 let total_chunks = total_chunks2.load(Ordering::SeqCst);
622 let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
cf9271e2 623 let stream_len = stream_len2.load(Ordering::SeqCst);
6e1deb15 624 let reused_len = reused_len2.load(Ordering::SeqCst);
cf9271e2
DM
625
626 let mut guard = index_csum_2.lock().unwrap();
627 let csum = guard.take().unwrap().finish();
628
6e1deb15 629 futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
cf9271e2
DM
630 })
631 }
632
1ffe0301 633 /// Upload speed test - prints result to stderr
dde18bbb 634 pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
cf9271e2
DM
635
636 let mut data = vec![];
637 // generate pseudo random byte sequence
638 for i in 0..1024*1024 {
639 for j in 0..4 {
640 let byte = ((i >> (j<<3))&0xff) as u8;
641 data.push(byte);
642 }
643 }
644
645 let item_len = data.len();
646
647 let mut repeat = 0;
648
323b2f3d 649 let (upload_queue, upload_result) = Self::response_queue(verbose);
cf9271e2
DM
650
651 let start_time = std::time::Instant::now();
652
653 loop {
654 repeat += 1;
655 if start_time.elapsed().as_secs() >= 5 {
656 break;
657 }
658
659 let mut upload_queue = upload_queue.clone();
660
dde18bbb 661 if verbose { eprintln!("send test data ({} bytes)", data.len()); }
cf9271e2
DM
662 let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
663 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
664
665 upload_queue.send(request_future).await?;
666 }
667
668 drop(upload_queue); // close queue
669
670 let _ = upload_result.await?;
671
dde18bbb
DM
672 eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
673 let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
674 eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
cf9271e2
DM
675
676 Ok(speed)
677 }
678}