]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/backup_writer.rs
typo fixes all over the place
[proxmox-backup.git] / src / client / backup_writer.rs
CommitLineData
cf9271e2 1use std::collections::HashSet;
b957aa81 2use std::os::unix::fs::OpenOptionsExt;
cf9271e2
DM
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::{Arc, Mutex};
5
f28d9088 6use anyhow::{bail, format_err, Error};
cf9271e2
DM
7use futures::*;
8use futures::stream::Stream;
dc089345 9use futures::future::AbortHandle;
cf9271e2
DM
10use serde_json::{json, Value};
11use tokio::io::AsyncReadExt;
12use tokio::sync::{mpsc, oneshot};
7c667013 13use tokio_stream::wrappers::ReceiverStream;
cf9271e2
DM
14
15use proxmox::tools::digest_to_hex;
16
17use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
18use crate::backup::*;
6e1deb15 19use crate::tools::format::HumanByte;
cf9271e2
DM
20
21use super::{HttpClient, H2Client};
22
23pub struct BackupWriter {
24 h2: H2Client,
dc089345 25 abort: AbortHandle,
e02c3d46 26 verbose: bool,
b957aa81 27 crypt_config: Option<Arc<CryptConfig>>,
cf9271e2
DM
28}
29
30impl Drop for BackupWriter {
31
32 fn drop(&mut self) {
dc089345 33 self.abort.abort();
cf9271e2
DM
34 }
35}
36
37pub struct BackupStats {
38 pub size: u64,
39 pub csum: [u8; 32],
40}
41
e43b9175
FG
42/// Options for uploading blobs/streams to the server
43#[derive(Default, Clone)]
44pub struct UploadOptions {
45 pub previous_manifest: Option<Arc<BackupManifest>>,
46 pub compress: bool,
47 pub encrypt: bool,
48 pub fixed_size: Option<u64>,
49}
50
8db14689
WB
51type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
52type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
53
cf9271e2
DM
54impl BackupWriter {
55
b957aa81
DM
56 fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
57 Arc::new(Self { h2, abort, crypt_config, verbose })
cf9271e2
DM
58 }
59
367c0ff7
FG
60 // FIXME: extract into (flattened) parameter struct?
61 #[allow(clippy::too_many_arguments)]
cf9271e2
DM
62 pub async fn start(
63 client: HttpClient,
b957aa81 64 crypt_config: Option<Arc<CryptConfig>>,
cf9271e2
DM
65 datastore: &str,
66 backup_type: &str,
67 backup_id: &str,
6a7be83e 68 backup_time: i64,
cf9271e2 69 debug: bool,
61d7b501 70 benchmark: bool
cf9271e2
DM
71 ) -> Result<Arc<BackupWriter>, Error> {
72
73 let param = json!({
74 "backup-type": backup_type,
75 "backup-id": backup_id,
6a7be83e 76 "backup-time": backup_time,
cf9271e2 77 "store": datastore,
61d7b501
HL
78 "debug": debug,
79 "benchmark": benchmark
cf9271e2
DM
80 });
81
82 let req = HttpClient::request_builder(
ba20987a 83 client.server(), client.port(), "GET", "/api2/json/backup", Some(param)).unwrap();
cf9271e2 84
dc089345 85 let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
cf9271e2 86
b957aa81 87 Ok(BackupWriter::new(h2, abort, crypt_config, debug))
cf9271e2
DM
88 }
89
90 pub async fn get(
91 &self,
92 path: &str,
93 param: Option<Value>,
94 ) -> Result<Value, Error> {
95 self.h2.get(path, param).await
96 }
97
98 pub async fn put(
99 &self,
100 path: &str,
101 param: Option<Value>,
102 ) -> Result<Value, Error> {
103 self.h2.put(path, param).await
104 }
105
106 pub async fn post(
107 &self,
108 path: &str,
109 param: Option<Value>,
110 ) -> Result<Value, Error> {
111 self.h2.post(path, param).await
112 }
113
114 pub async fn upload_post(
115 &self,
116 path: &str,
117 param: Option<Value>,
118 content_type: &str,
119 data: Vec<u8>,
120 ) -> Result<Value, Error> {
121 self.h2.upload("POST", path, param, content_type, data).await
122 }
123
124 pub async fn send_upload_request(
125 &self,
126 method: &str,
127 path: &str,
128 param: Option<Value>,
129 content_type: &str,
130 data: Vec<u8>,
131 ) -> Result<h2::client::ResponseFuture, Error> {
132
133 let request = H2Client::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
134 let response_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
135 Ok(response_future)
136 }
137
138 pub async fn upload_put(
139 &self,
140 path: &str,
141 param: Option<Value>,
142 content_type: &str,
143 data: Vec<u8>,
144 ) -> Result<Value, Error> {
145 self.h2.upload("PUT", path, param, content_type, data).await
146 }
147
148 pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
149 let h2 = self.h2.clone();
150
151 h2.post("finish", None)
152 .map_ok(move |_| {
dc089345 153 self.abort.abort();
cf9271e2
DM
154 })
155 .await
156 }
157
e016f9ff 158 pub fn cancel(&self) {
dc089345 159 self.abort.abort();
cf9271e2
DM
160 }
161
162 pub async fn upload_blob<R: std::io::Read>(
163 &self,
164 mut reader: R,
165 file_name: &str,
166 ) -> Result<BackupStats, Error> {
167 let mut raw_data = Vec::new();
168 // fixme: avoid loading into memory
169 reader.read_to_end(&mut raw_data)?;
170
171 let csum = openssl::sha::sha256(&raw_data);
172 let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
173 let size = raw_data.len() as u64;
174 let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
175 Ok(BackupStats { size, csum })
176 }
177
178 pub async fn upload_blob_from_data(
179 &self,
180 data: Vec<u8>,
181 file_name: &str,
e43b9175 182 options: UploadOptions,
f28d9088 183 ) -> Result<BackupStats, Error> {
e43b9175
FG
184 let blob = match (options.encrypt, &self.crypt_config) {
185 (false, _) => DataBlob::encode(&data, None, options.compress)?,
3638341a 186 (true, None) => bail!("requested encryption without a crypt config"),
e43b9175 187 (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), options.compress)?,
cf9271e2
DM
188 };
189
190 let raw_data = blob.into_inner();
191 let size = raw_data.len() as u64;
192
193 let csum = openssl::sha::sha256(&raw_data);
194 let param = json!({"encoded-size": size, "file-name": file_name });
195 let _value = self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
196 Ok(BackupStats { size, csum })
197 }
198
199 pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
200 &self,
201 src_path: P,
202 file_name: &str,
e43b9175 203 options: UploadOptions,
3638341a 204 ) -> Result<BackupStats, Error> {
cf9271e2
DM
205
206 let src_path = src_path.as_ref();
207
208 let mut file = tokio::fs::File::open(src_path)
209 .await
210 .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;
211
212 let mut contents = Vec::new();
213
214 file.read_to_end(&mut contents)
215 .await
216 .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
217
e43b9175 218 self.upload_blob_from_data(contents, file_name, options).await
cf9271e2
DM
219 }
220
221 pub async fn upload_stream(
222 &self,
223 archive_name: &str,
224 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
e43b9175 225 options: UploadOptions,
cf9271e2
DM
226 ) -> Result<BackupStats, Error> {
227 let known_chunks = Arc::new(Mutex::new(HashSet::new()));
228
229 let mut param = json!({ "archive-name": archive_name });
e43b9175 230 let prefix = if let Some(size) = options.fixed_size {
cf9271e2 231 param["size"] = size.into();
e43b9175
FG
232 "fixed"
233 } else {
234 "dynamic"
235 };
cf9271e2 236
e43b9175 237 if options.encrypt && self.crypt_config.is_none() {
3638341a
DM
238 bail!("requested encryption without a crypt config");
239 }
240
cf9271e2
DM
241 let index_path = format!("{}_index", prefix);
242 let close_path = format!("{}_close", prefix);
243
e43b9175 244 if let Some(manifest) = options.previous_manifest {
b957aa81
DM
245 // try, but ignore errors
246 match archive_type(archive_name) {
247 Ok(ArchiveType::FixedIndex) => {
248 let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
249 }
250 Ok(ArchiveType::DynamicIndex) => {
251 let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
252 }
253 _ => { /* do nothing */ }
254 }
255 }
cf9271e2
DM
256
257 let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
258
6e1deb15 259 let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
cf9271e2
DM
260 Self::upload_chunk_info_stream(
261 self.h2.clone(),
262 wid,
263 stream,
264 &prefix,
265 known_chunks.clone(),
e43b9175
FG
266 if options.encrypt { self.crypt_config.clone() } else { None },
267 options.compress,
e02c3d46 268 self.verbose,
cf9271e2
DM
269 )
270 .await?;
271
6e1deb15
TL
272 let uploaded = size - size_reused;
273 let vsize_h: HumanByte = size.into();
274 let archive = if self.verbose {
275 archive_name.to_string()
276 } else {
8db14689 277 crate::tools::format::strip_server_file_extension(archive_name)
6e1deb15
TL
278 };
279 if archive_name != CATALOG_NAME {
0cf14984 280 let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
6e1deb15 281 let uploaded: HumanByte = uploaded.into();
505c5f0f 282 println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
6e1deb15
TL
283 } else {
284 println!("Uploaded backup catalog ({})", vsize_h);
285 }
286
287 if size_reused > 0 && size > 1024*1024 {
288 let reused_percent = size_reused as f64 * 100. / size as f64;
289 let reused: HumanByte = size_reused.into();
290 println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
291 }
292 if self.verbose && chunk_count > 0 {
293 println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
294 println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
295 println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
6da73c82
DM
296 }
297
cf9271e2
DM
298 let param = json!({
299 "wid": wid ,
300 "chunk-count": chunk_count,
301 "size": size,
302 "csum": proxmox::tools::digest_to_hex(&csum),
303 });
304 let _value = self.h2.post(&close_path, Some(param)).await?;
305 Ok(BackupStats {
306 size: size as u64,
307 csum,
308 })
309 }
310
323b2f3d 311 fn response_queue(verbose: bool) -> (
cf9271e2
DM
312 mpsc::Sender<h2::client::ResponseFuture>,
313 oneshot::Receiver<Result<(), Error>>
314 ) {
315 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
316 let (verify_result_tx, verify_result_rx) = oneshot::channel();
317
db0cb9ce
WB
318 // FIXME: check if this works as expected as replacement for the combinator below?
319 // tokio::spawn(async move {
320 // let result: Result<(), Error> = (async move {
321 // while let Some(response) = verify_queue_rx.recv().await {
322 // match H2Client::h2api_response(response.await?).await {
323 // Ok(result) => println!("RESPONSE: {:?}", result),
324 // Err(err) => bail!("pipelined request failed: {}", err),
325 // }
326 // }
327 // Ok(())
328 // }).await;
329 // let _ignore_closed_channel = verify_result_tx.send(result);
330 // });
331 // old code for reference?
332 tokio::spawn(
7c667013 333 ReceiverStream::new(verify_queue_rx)
cf9271e2 334 .map(Ok::<_, Error>)
323b2f3d 335 .try_for_each(move |response: h2::client::ResponseFuture| {
cf9271e2
DM
336 response
337 .map_err(Error::from)
338 .and_then(H2Client::h2api_response)
323b2f3d 339 .map_ok(move |result| if verbose { println!("RESPONSE: {:?}", result) })
cf9271e2
DM
340 .map_err(|err| format_err!("pipelined request failed: {}", err))
341 })
342 .map(|result| {
343 let _ignore_closed_channel = verify_result_tx.send(result);
344 })
345 );
346
347 (verify_queue_tx, verify_result_rx)
348 }
349
8db14689
WB
350 fn append_chunk_queue(
351 h2: H2Client,
352 wid: u64,
353 path: String,
354 verbose: bool,
355 ) -> (UploadQueueSender, UploadResultReceiver) {
cf9271e2
DM
356 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
357 let (verify_result_tx, verify_result_rx) = oneshot::channel();
358
db0cb9ce
WB
359 // FIXME: async-block-ify this code!
360 tokio::spawn(
7c667013 361 ReceiverStream::new(verify_queue_rx)
cf9271e2
DM
362 .map(Ok::<_, Error>)
363 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
364 match (response, merged_chunk_info) {
365 (Some(response), MergedChunkInfo::Known(list)) => {
366 future::Either::Left(
367 response
368 .map_err(Error::from)
369 .and_then(H2Client::h2api_response)
370 .and_then(move |_result| {
371 future::ok(MergedChunkInfo::Known(list))
372 })
373 )
374 }
375 (None, MergedChunkInfo::Known(list)) => {
376 future::Either::Right(future::ok(MergedChunkInfo::Known(list)))
377 }
378 _ => unreachable!(),
379 }
380 })
381 .merge_known_chunks()
382 .and_then(move |merged_chunk_info| {
383 match merged_chunk_info {
384 MergedChunkInfo::Known(chunk_list) => {
385 let mut digest_list = vec![];
386 let mut offset_list = vec![];
387 for (offset, digest) in chunk_list {
cf9271e2
DM
388 digest_list.push(digest_to_hex(&digest));
389 offset_list.push(offset);
390 }
e02c3d46 391 if verbose { println!("append chunks list len ({})", digest_list.len()); }
cf9271e2
DM
392 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
393 let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
db0cb9ce 394 let param_data = bytes::Bytes::from(param.to_string().into_bytes());
cf9271e2 395 let upload_data = Some(param_data);
8db14689 396 h2.send_request(request, upload_data)
cf9271e2
DM
397 .and_then(move |response| {
398 response
399 .map_err(Error::from)
400 .and_then(H2Client::h2api_response)
401 .map_ok(|_| ())
402 })
403 .map_err(|err| format_err!("pipelined request failed: {}", err))
404 }
405 _ => unreachable!(),
406 }
407 })
408 .try_for_each(|_| future::ok(()))
409 .map(|result| {
410 let _ignore_closed_channel = verify_result_tx.send(result);
411 })
412 );
413
414 (verify_queue_tx, verify_result_rx)
415 }
416
b957aa81 417 pub async fn download_previous_fixed_index(
cf9271e2 418 &self,
cf9271e2 419 archive_name: &str,
b957aa81 420 manifest: &BackupManifest,
cf9271e2 421 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
b957aa81 422 ) -> Result<FixedIndexReader, Error> {
cf9271e2 423
b957aa81
DM
424 let mut tmpfile = std::fs::OpenOptions::new()
425 .write(true)
426 .read(true)
427 .custom_flags(libc::O_TMPFILE)
428 .open("/tmp")?;
cf9271e2 429
b957aa81
DM
430 let param = json!({ "archive-name": archive_name });
431 self.h2.download("previous", Some(param), &mut tmpfile).await?;
432
433 let index = FixedIndexReader::new(tmpfile)
434 .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
435 // Note: do not use values stored in index (not trusted) - instead, computed them again
436 let (csum, size) = index.compute_csum();
437 manifest.verify_file(archive_name, &csum, size)?;
438
439 // add index chunks to known chunks
440 let mut known_chunks = known_chunks.lock().unwrap();
441 for i in 0..index.index_count() {
442 known_chunks.insert(*index.index_digest(i).unwrap());
443 }
cf9271e2 444
b957aa81
DM
445 if self.verbose {
446 println!("{}: known chunks list length is {}", archive_name, index.index_count());
cf9271e2
DM
447 }
448
b957aa81
DM
449 Ok(index)
450 }
451
452 pub async fn download_previous_dynamic_index(
453 &self,
454 archive_name: &str,
455 manifest: &BackupManifest,
456 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
457 ) -> Result<DynamicIndexReader, Error> {
cf9271e2 458
b957aa81
DM
459 let mut tmpfile = std::fs::OpenOptions::new()
460 .write(true)
461 .read(true)
462 .custom_flags(libc::O_TMPFILE)
463 .open("/tmp")?;
cf9271e2 464
b957aa81
DM
465 let param = json!({ "archive-name": archive_name });
466 self.h2.download("previous", Some(param), &mut tmpfile).await?;
467
468 let index = DynamicIndexReader::new(tmpfile)
40592856 469 .map_err(|err| format_err!("unable to read dynmamic index '{}' - {}", archive_name, err))?;
b957aa81
DM
470 // Note: do not use values stored in index (not trusted) - instead, computed them again
471 let (csum, size) = index.compute_csum();
472 manifest.verify_file(archive_name, &csum, size)?;
473
474 // add index chunks to known chunks
475 let mut known_chunks = known_chunks.lock().unwrap();
476 for i in 0..index.index_count() {
477 known_chunks.insert(*index.index_digest(i).unwrap());
cf9271e2
DM
478 }
479
e02c3d46 480 if self.verbose {
b957aa81 481 println!("{}: known chunks list length is {}", archive_name, index.index_count());
e02c3d46 482 }
ee5fe978 483
b957aa81
DM
484 Ok(index)
485 }
486
8b7f8d3f
FG
487 /// Retrieve backup time of last backup
488 pub async fn previous_backup_time(&self) -> Result<Option<i64>, Error> {
489 let data = self.h2.get("previous_backup_time", None).await?;
490 serde_json::from_value(data)
491 .map_err(|err| format_err!("Failed to parse backup time value returned by server - {}", err))
492 }
493
b957aa81
DM
494 /// Download backup manifest (index.json) of last backup
495 pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
496
b957aa81
DM
497 let mut raw_data = Vec::with_capacity(64 * 1024);
498
499 let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
500 self.h2.download("previous", Some(param), &mut raw_data).await?;
501
39f18b30 502 let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
8819d1f2
FG
503 // no expected digest available
504 let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref), None)?;
3dacedce
DM
505
506 let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
b957aa81
DM
507
508 Ok(manifest)
cf9271e2
DM
509 }
510
8db14689 511 // We have no `self` here for `h2` and `verbose`, the only other arg "common" with 1 other
d1d74c43 512 // function in the same path is `wid`, so those 3 could be in a struct, but there's no real use
8db14689
WB
513 // since this is a private method.
514 #[allow(clippy::too_many_arguments)]
cf9271e2
DM
515 fn upload_chunk_info_stream(
516 h2: H2Client,
517 wid: u64,
518 stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
519 prefix: &str,
520 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
521 crypt_config: Option<Arc<CryptConfig>>,
3638341a 522 compress: bool,
e02c3d46 523 verbose: bool,
6e1deb15 524 ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
cf9271e2 525
6e1deb15
TL
526 let total_chunks = Arc::new(AtomicUsize::new(0));
527 let total_chunks2 = total_chunks.clone();
528 let known_chunk_count = Arc::new(AtomicUsize::new(0));
529 let known_chunk_count2 = known_chunk_count.clone();
cf9271e2
DM
530
531 let stream_len = Arc::new(AtomicUsize::new(0));
532 let stream_len2 = stream_len.clone();
6e1deb15
TL
533 let reused_len = Arc::new(AtomicUsize::new(0));
534 let reused_len2 = reused_len.clone();
cf9271e2
DM
535
536 let append_chunk_path = format!("{}_index", prefix);
537 let upload_chunk_path = format!("{}_chunk", prefix);
538 let is_fixed_chunk_size = prefix == "fixed";
539
540 let (upload_queue, upload_result) =
8db14689 541 Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);
cf9271e2
DM
542
543 let start_time = std::time::Instant::now();
544
545 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
546 let index_csum_2 = index_csum.clone();
547
548 stream
549 .and_then(move |data| {
550
551 let chunk_len = data.len();
552
6e1deb15 553 total_chunks.fetch_add(1, Ordering::SeqCst);
cf9271e2
DM
554 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
555
556 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
3638341a 557 .compress(compress);
cf9271e2
DM
558
559 if let Some(ref crypt_config) = crypt_config {
3638341a 560 chunk_builder = chunk_builder.crypt_config(crypt_config);
cf9271e2
DM
561 }
562
563 let mut known_chunks = known_chunks.lock().unwrap();
564 let digest = chunk_builder.digest();
565
566 let mut guard = index_csum.lock().unwrap();
567 let csum = guard.as_mut().unwrap();
568
569 let chunk_end = offset + chunk_len as u64;
570
571 if !is_fixed_chunk_size { csum.update(&chunk_end.to_le_bytes()); }
572 csum.update(digest);
573
574 let chunk_is_known = known_chunks.contains(digest);
575 if chunk_is_known {
6e1deb15
TL
576 known_chunk_count.fetch_add(1, Ordering::SeqCst);
577 reused_len.fetch_add(chunk_len, Ordering::SeqCst);
cf9271e2
DM
578 future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
579 } else {
580 known_chunks.insert(*digest);
581 future::ready(chunk_builder
582 .build()
583 .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
584 chunk,
585 digest,
586 chunk_len: chunk_len as u64,
587 offset,
588 }))
589 )
590 }
591 })
592 .merge_known_chunks()
593 .try_for_each(move |merged_chunk_info| {
0bfcea6a 594 let upload_queue = upload_queue.clone();
cf9271e2
DM
595
596 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
597 let offset = chunk_info.offset;
598 let digest = chunk_info.digest;
599 let digest_str = digest_to_hex(&digest);
600
8db14689
WB
601 /* too verbose, needs finer verbosity setting granularity
602 if verbose {
e02c3d46
DM
603 println!("upload new chunk {} ({} bytes, offset {})", digest_str,
604 chunk_info.chunk_len, offset);
605 }
8db14689 606 */
cf9271e2 607
db0cb9ce 608 let chunk_data = chunk_info.chunk.into_inner();
cf9271e2
DM
609 let param = json!({
610 "wid": wid,
611 "digest": digest_str,
612 "size": chunk_info.chunk_len,
613 "encoded-size": chunk_data.len(),
614 });
615
616 let ct = "application/octet-stream";
617 let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param), Some(ct)).unwrap();
618 let upload_data = Some(bytes::Bytes::from(chunk_data));
619
620 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
621
cf9271e2
DM
622 future::Either::Left(h2
623 .send_request(request, upload_data)
624 .and_then(move |response| async move {
625 upload_queue
626 .send((new_info, Some(response)))
627 .await
db0cb9ce 628 .map_err(|err| format_err!("failed to send to upload queue: {}", err))
cf9271e2
DM
629 })
630 )
631 } else {
cf9271e2
DM
632 future::Either::Right(async move {
633 upload_queue
634 .send((merged_chunk_info, None))
635 .await
db0cb9ce 636 .map_err(|err| format_err!("failed to send to upload queue: {}", err))
cf9271e2
DM
637 })
638 }
639 })
640 .then(move |result| async move {
641 upload_result.await?.and(result)
642 }.boxed())
643 .and_then(move |_| {
6e1deb15
TL
644 let duration = start_time.elapsed();
645 let total_chunks = total_chunks2.load(Ordering::SeqCst);
646 let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
cf9271e2 647 let stream_len = stream_len2.load(Ordering::SeqCst);
6e1deb15 648 let reused_len = reused_len2.load(Ordering::SeqCst);
cf9271e2
DM
649
650 let mut guard = index_csum_2.lock().unwrap();
651 let csum = guard.take().unwrap().finish();
652
6e1deb15 653 futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
cf9271e2
DM
654 })
655 }
656
1ffe0301 657 /// Upload speed test - prints result to stderr
dde18bbb 658 pub async fn upload_speedtest(&self, verbose: bool) -> Result<f64, Error> {
cf9271e2
DM
659
660 let mut data = vec![];
661 // generate pseudo random byte sequence
662 for i in 0..1024*1024 {
663 for j in 0..4 {
664 let byte = ((i >> (j<<3))&0xff) as u8;
665 data.push(byte);
666 }
667 }
668
669 let item_len = data.len();
670
671 let mut repeat = 0;
672
323b2f3d 673 let (upload_queue, upload_result) = Self::response_queue(verbose);
cf9271e2
DM
674
675 let start_time = std::time::Instant::now();
676
677 loop {
678 repeat += 1;
679 if start_time.elapsed().as_secs() >= 5 {
680 break;
681 }
682
dde18bbb 683 if verbose { eprintln!("send test data ({} bytes)", data.len()); }
cf9271e2
DM
684 let request = H2Client::request_builder("localhost", "POST", "speedtest", None, None).unwrap();
685 let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?;
686
687 upload_queue.send(request_future).await?;
688 }
689
690 drop(upload_queue); // close queue
691
692 let _ = upload_result.await?;
693
dde18bbb
DM
694 eprintln!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
695 let speed = ((item_len*(repeat as usize)) as f64)/start_time.elapsed().as_secs_f64();
696 eprintln!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
cf9271e2
DM
697
698 Ok(speed)
699 }
700}