]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/http_client.rs
src/client/http_client.rs: Refactor handling Option and Result types
[proxmox-backup.git] / src / client / http_client.rs
1 use std::collections::HashSet;
2 use std::io::Write;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::{Arc, Mutex};
5
6 use chrono::{DateTime, Utc};
7 use failure::*;
8 use futures::*;
9 use futures::stream::Stream;
10 use http::Uri;
11 use http::header::HeaderValue;
12 use http::{Request, Response};
13 use hyper::Body;
14 use hyper::client::Client;
15 use openssl::ssl::{SslConnector, SslMethod};
16 use serde_json::{json, Value};
17 use tokio::sync::mpsc;
18 use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
19 use xdg::BaseDirectories;
20
21 use proxmox::tools::{
22 digest_to_hex,
23 fs::{file_get_json, file_set_contents},
24 };
25
26 use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
27 use super::pipe_to_stream::PipeToSendStream;
28 use crate::backup::*;
29 use crate::tools::futures::{cancellable, Canceller};
30 use crate::tools::{self, BroadcastFuture, tty};
31
32 #[derive(Clone)]
33 pub struct AuthInfo {
34 username: String,
35 ticket: String,
36 token: String,
37 }
38
39 /// HTTP(S) API client
40 pub struct HttpClient {
41 client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
42 server: String,
43 auth: BroadcastFuture<AuthInfo>,
44 }
45
46 /// Delete stored ticket data (logout)
47 pub fn delete_ticket_info(server: &str, username: &str) -> Result<(), Error> {
48
49 let base = BaseDirectories::with_prefix("proxmox-backup")?;
50
51 // usually /run/user/<uid>/...
52 let path = base.place_runtime_file("tickets")?;
53
54 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
55
56 let mut data = file_get_json(&path, Some(json!({})))?;
57
58 if let Some(map) = data[server].as_object_mut() {
59 map.remove(username);
60 }
61
62 file_set_contents(path, data.to_string().as_bytes(), Some(mode))?;
63
64 Ok(())
65 }
66
67 fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
68
69 let base = BaseDirectories::with_prefix("proxmox-backup")?;
70
71 // usually /run/user/<uid>/...
72 let path = base.place_runtime_file("tickets")?;
73
74 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
75
76 let mut data = file_get_json(&path, Some(json!({})))?;
77
78 let now = Utc::now().timestamp();
79
80 data[server][username] = json!({ "timestamp": now, "ticket": ticket, "token": token});
81
82 let mut new_data = json!({});
83
84 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
85
86 let empty = serde_json::map::Map::new();
87 for (server, info) in data.as_object().unwrap_or(&empty) {
88 for (_user, uinfo) in info.as_object().unwrap_or(&empty) {
89 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
90 let age = now - timestamp;
91 if age < ticket_lifetime {
92 new_data[server][username] = uinfo.clone();
93 }
94 }
95 }
96 }
97
98 file_set_contents(path, new_data.to_string().as_bytes(), Some(mode))?;
99
100 Ok(())
101 }
102
103 fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> {
104 let base = BaseDirectories::with_prefix("proxmox-backup").ok()?;
105
106 // usually /run/user/<uid>/...
107 let path = base.place_runtime_file("tickets").ok()?;
108 let data = file_get_json(&path, None).ok()?;
109 let now = Utc::now().timestamp();
110 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
111 let uinfo = data[server][username].as_object()?;
112 let timestamp = uinfo["timestamp"].as_i64()?;
113 let age = now - timestamp;
114
115 if age < ticket_lifetime {
116 let ticket = uinfo["ticket"].as_str()?;
117 let token = uinfo["token"].as_str()?;
118 Some((ticket.to_owned(), token.to_owned()))
119 } else {
120 None
121 }
122 }
123
124 impl HttpClient {
125
126 pub fn new(server: &str, username: &str) -> Result<Self, Error> {
127 let client = Self::build_client();
128
129 let password = if let Some((ticket, _token)) = load_ticket_info(server, username) {
130 ticket
131 } else {
132 Self::get_password(&username)?
133 };
134
135 let login = Self::credentials(client.clone(), server.to_owned(), username.to_owned(), password);
136
137 Ok(Self {
138 client,
139 server: String::from(server),
140 auth: BroadcastFuture::new(login),
141 })
142 }
143
144 /// Login future
145 ///
146 /// Login is done on demand, so this is onyl required if you need
147 /// access to authentication data in 'AuthInfo'.
148 pub fn login(&self) -> impl Future<Item=AuthInfo, Error=Error> {
149 self.auth.listen()
150 }
151
152 fn get_password(_username: &str) -> Result<String, Error> {
153 use std::env::VarError::*;
154 match std::env::var("PBS_PASSWORD") {
155 Ok(p) => return Ok(p),
156 Err(NotUnicode(_)) => bail!("PBS_PASSWORD contains bad characters"),
157 Err(NotPresent) => {
158 // Try another method
159 }
160 }
161
162 // If we're on a TTY, query the user for a password
163 if tty::stdin_isatty() {
164 return Ok(String::from_utf8(tty::read_password("Password: ")?)?);
165 }
166
167 bail!("no password input mechanism available");
168 }
169
170 fn build_client() -> Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>> {
171
172 let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
173
174 ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); // fixme!
175
176 let mut httpc = hyper::client::HttpConnector::new(1);
177 httpc.set_nodelay(true); // important for h2 download performance!
178 httpc.set_recv_buffer_size(Some(1024*1024)); //important for h2 download performance!
179 httpc.enforce_http(false); // we want https...
180
181 let https = hyper_openssl::HttpsConnector::with_connector(httpc, ssl_connector_builder).unwrap();
182
183 Client::builder()
184 //.http2_initial_stream_window_size( (1 << 31) - 2)
185 //.http2_initial_connection_window_size( (1 << 31) - 2)
186 .build::<_, Body>(https)
187 }
188
189 pub fn request(&self, mut req: Request<Body>) -> impl Future<Item=Value, Error=Error> {
190
191 let login = self.auth.listen();
192
193 let client = self.client.clone();
194
195 login.and_then(move |auth| {
196
197 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
198 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
199 req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
200
201 let request = Self::api_request(client, req);
202
203 request
204 })
205 }
206
207 pub fn get(&self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
208
209 let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
210 self.request(req)
211 }
212
213 pub fn delete(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
214
215 let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
216 self.request(req)
217 }
218
219 pub fn post(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
220
221 let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
222 self.request(req)
223 }
224
225 pub fn download<W: Write>(&mut self, path: &str, output: W) -> impl Future<Item=W, Error=Error> {
226
227 let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
228
229 let login = self.auth.listen();
230
231 let client = self.client.clone();
232
233 login.and_then(move |auth| {
234
235 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
236 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
237
238 client.request(req)
239 .map_err(Error::from)
240 .and_then(|resp| {
241 let status = resp.status();
242 if !status.is_success() {
243 future::Either::A(
244 HttpClient::api_response(resp)
245 .and_then(|_| { bail!("unknown error"); })
246 )
247 } else {
248 future::Either::B(
249 resp.into_body()
250 .map_err(Error::from)
251 .fold(output, move |mut acc, chunk| {
252 acc.write_all(&chunk)?;
253 Ok::<_, Error>(acc)
254 })
255 )
256 }
257 })
258 })
259 }
260
261 pub fn upload(
262 &mut self,
263 content_type: &str,
264 body: Body,
265 path: &str,
266 data: Option<Value>,
267 ) -> impl Future<Item=Value, Error=Error> {
268
269 let path = path.trim_matches('/');
270 let mut url = format!("https://{}:8007/{}", &self.server, path);
271
272 if let Some(data) = data {
273 let query = tools::json_object_to_query(data).unwrap();
274 url.push('?');
275 url.push_str(&query);
276 }
277
278 let url: Uri = url.parse().unwrap();
279
280 let req = Request::builder()
281 .method("POST")
282 .uri(url)
283 .header("User-Agent", "proxmox-backup-client/1.0")
284 .header("Content-Type", content_type)
285 .body(body).unwrap();
286
287 self.request(req)
288 }
289
290 pub fn start_backup(
291 &self,
292 datastore: &str,
293 backup_type: &str,
294 backup_id: &str,
295 backup_time: DateTime<Utc>,
296 debug: bool,
297 ) -> impl Future<Item=Arc<BackupClient>, Error=Error> {
298
299 let param = json!({
300 "backup-type": backup_type,
301 "backup-id": backup_id,
302 "backup-time": backup_time.timestamp(),
303 "store": datastore,
304 "debug": debug
305 });
306
307 let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap();
308
309 self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
310 .map(|(h2, canceller)| BackupClient::new(h2, canceller))
311 }
312
313 pub fn start_backup_reader(
314 &self,
315 datastore: &str,
316 backup_type: &str,
317 backup_id: &str,
318 backup_time: DateTime<Utc>,
319 debug: bool,
320 ) -> impl Future<Item=Arc<BackupReader>, Error=Error> {
321
322 let param = json!({
323 "backup-type": backup_type,
324 "backup-id": backup_id,
325 "backup-time": backup_time.timestamp(),
326 "store": datastore,
327 "debug": debug,
328 });
329 let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap();
330
331 self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
332 .map(|(h2, canceller)| BackupReader::new(h2, canceller))
333 }
334
335 pub fn start_h2_connection(
336 &self,
337 mut req: Request<Body>,
338 protocol_name: String,
339 ) -> impl Future<Item=(H2Client, Canceller), Error=Error> {
340
341 let login = self.auth.listen();
342 let client = self.client.clone();
343
344 login.and_then(move |auth| {
345
346 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
347 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
348 req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
349
350 client.request(req)
351 .map_err(Error::from)
352 .and_then(|resp| {
353
354 let status = resp.status();
355 if status != http::StatusCode::SWITCHING_PROTOCOLS {
356 future::Either::A(Self::api_response(resp).and_then(|_| { bail!("unknown error"); }))
357 } else {
358 future::Either::B(resp.into_body().on_upgrade().map_err(Error::from))
359 }
360 })
361 .and_then(|upgraded| {
362 let max_window_size = (1 << 31) - 2;
363
364 h2::client::Builder::new()
365 .initial_connection_window_size(max_window_size)
366 .initial_window_size(max_window_size)
367 .max_frame_size(4*1024*1024)
368 .handshake(upgraded)
369 .map_err(Error::from)
370 })
371 .and_then(|(h2, connection)| {
372 let connection = connection
373 .map_err(|_| panic!("HTTP/2.0 connection failed"));
374
375 let (connection, canceller) = cancellable(connection)?;
376 // A cancellable future returns an Option which is None when cancelled and
377 // Some when it finished instead, since we don't care about the return type we
378 // need to map it away:
379 let connection = connection.map(|_| ());
380
381 // Spawn a new task to drive the connection state
382 hyper::rt::spawn(connection);
383
384 // Wait until the `SendRequest` handle has available capacity.
385 Ok(h2.ready()
386 .map(move |c| (H2Client::new(c), canceller))
387 .map_err(Error::from))
388 })
389 .flatten()
390 })
391 }
392
393 fn credentials(
394 client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
395 server: String,
396 username: String,
397 password: String,
398 ) -> Box<dyn Future<Item=AuthInfo, Error=Error> + Send> {
399
400 let server2 = server.clone();
401
402 let create_request = futures::future::lazy(move || {
403 let data = json!({ "username": username, "password": password });
404 let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
405 Self::api_request(client, req)
406 });
407
408 let login_future = create_request
409 .and_then(move |cred| {
410 let auth = AuthInfo {
411 username: cred["data"]["username"].as_str().unwrap().to_owned(),
412 ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
413 token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
414 };
415
416 let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token);
417
418 Ok(auth)
419 });
420
421 Box::new(login_future)
422 }
423
424 fn api_response(response: Response<Body>) -> impl Future<Item=Value, Error=Error> {
425
426 let status = response.status();
427
428 response
429 .into_body()
430 .concat2()
431 .map_err(Error::from)
432 .and_then(move |data| {
433
434 let text = String::from_utf8(data.to_vec()).unwrap();
435 if status.is_success() {
436 if text.len() > 0 {
437 let value: Value = serde_json::from_str(&text)?;
438 Ok(value)
439 } else {
440 Ok(Value::Null)
441 }
442 } else {
443 bail!("HTTP Error {}: {}", status, text);
444 }
445 })
446 }
447
448 fn api_request(
449 client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
450 req: Request<Body>
451 ) -> impl Future<Item=Value, Error=Error> {
452
453 client.request(req)
454 .map_err(Error::from)
455 .and_then(Self::api_response)
456 }
457
458 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
459 let path = path.trim_matches('/');
460 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
461
462 if let Some(data) = data {
463 if method == "POST" {
464 let request = Request::builder()
465 .method(method)
466 .uri(url)
467 .header("User-Agent", "proxmox-backup-client/1.0")
468 .header(hyper::header::CONTENT_TYPE, "application/json")
469 .body(Body::from(data.to_string()))?;
470 return Ok(request);
471 } else {
472 let query = tools::json_object_to_query(data)?;
473 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
474 let request = Request::builder()
475 .method(method)
476 .uri(url)
477 .header("User-Agent", "proxmox-backup-client/1.0")
478 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
479 .body(Body::empty())?;
480 return Ok(request);
481 }
482 }
483
484 let request = Request::builder()
485 .method(method)
486 .uri(url)
487 .header("User-Agent", "proxmox-backup-client/1.0")
488 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
489 .body(Body::empty())?;
490
491 Ok(request)
492 }
493 }
494
495
496 pub struct BackupReader {
497 h2: H2Client,
498 canceller: Canceller,
499 }
500
501 impl Drop for BackupReader {
502
503 fn drop(&mut self) {
504 self.canceller.cancel();
505 }
506 }
507
508 impl BackupReader {
509
510 pub fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
511 Arc::new(Self { h2, canceller: canceller })
512 }
513
514 pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
515 self.h2.get(path, param)
516 }
517
518 pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
519 self.h2.put(path, param)
520 }
521
522 pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
523 self.h2.post(path, param)
524 }
525
526 pub fn download<W: Write>(
527 &self,
528 file_name: &str,
529 output: W,
530 ) -> impl Future<Item=W, Error=Error> {
531 let path = "download";
532 let param = json!({ "file-name": file_name });
533 self.h2.download(path, Some(param), output)
534 }
535
536 pub fn speedtest<W: Write>(
537 &self,
538 output: W,
539 ) -> impl Future<Item=W, Error=Error> {
540 self.h2.download("speedtest", None, output)
541 }
542
543 pub fn download_chunk<W: Write>(
544 &self,
545 digest: &[u8; 32],
546 output: W,
547 ) -> impl Future<Item=W, Error=Error> {
548 let path = "chunk";
549 let param = json!({ "digest": digest_to_hex(digest) });
550 self.h2.download(path, Some(param), output)
551 }
552
553 pub fn force_close(self) {
554 self.canceller.cancel();
555 }
556 }
557
558 pub struct BackupClient {
559 h2: H2Client,
560 canceller: Canceller,
561 }
562
563 impl Drop for BackupClient {
564
565 fn drop(&mut self) {
566 self.canceller.cancel();
567 }
568 }
569
570 pub struct BackupStats {
571 pub size: u64,
572 pub csum: [u8; 32],
573 }
574
575 impl BackupClient {
576
577 pub fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
578 Arc::new(Self { h2, canceller })
579 }
580
581 pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
582 self.h2.get(path, param)
583 }
584
585 pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
586 self.h2.put(path, param)
587 }
588
589 pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
590 self.h2.post(path, param)
591 }
592
593 pub fn finish(self: Arc<Self>) -> impl Future<Item=(), Error=Error> {
594 self.h2.clone()
595 .post("finish", None)
596 .map(move |_| {
597 self.canceller.cancel();
598 })
599 }
600
601 pub fn force_close(self) {
602 self.canceller.cancel();
603 }
604
605 pub fn upload_blob<R: std::io::Read>(
606 &self,
607 mut reader: R,
608 file_name: &str,
609 ) -> impl Future<Item=BackupStats, Error=Error> {
610
611 let h2 = self.h2.clone();
612 let file_name = file_name.to_owned();
613
614 futures::future::ok(())
615 .and_then(move |_| {
616 let mut raw_data = Vec::new();
617 // fixme: avoid loading into memory
618 reader.read_to_end(&mut raw_data)?;
619 Ok(raw_data)
620 })
621 .and_then(move |raw_data| {
622 let csum = openssl::sha::sha256(&raw_data);
623 let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
624 let size = raw_data.len() as u64; // fixme: should be decoded size instead??
625 h2.upload("blob", Some(param), raw_data)
626 .map(move |_| {
627 BackupStats { size, csum }
628 })
629 })
630 }
631
632 pub fn upload_blob_from_data(
633 &self,
634 data: Vec<u8>,
635 file_name: &str,
636 crypt_config: Option<Arc<CryptConfig>>,
637 compress: bool,
638 sign_only: bool,
639 ) -> impl Future<Item=BackupStats, Error=Error> {
640
641 let h2 = self.h2.clone();
642 let file_name = file_name.to_owned();
643 let size = data.len() as u64;
644
645 futures::future::ok(())
646 .and_then(move |_| {
647 let blob = if let Some(crypt_config) = crypt_config {
648 if sign_only {
649 DataBlob::create_signed(&data, crypt_config, compress)?
650 } else {
651 DataBlob::encode(&data, Some(crypt_config.clone()), compress)?
652 }
653 } else {
654 DataBlob::encode(&data, None, compress)?
655 };
656
657 let raw_data = blob.into_inner();
658 Ok(raw_data)
659 })
660 .and_then(move |raw_data| {
661 let csum = openssl::sha::sha256(&raw_data);
662 let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
663 h2.upload("blob", Some(param), raw_data)
664 .map(move |_| {
665 BackupStats { size, csum }
666 })
667 })
668 }
669
670 pub fn upload_blob_from_file<P: AsRef<std::path::Path>>(
671 &self,
672 src_path: P,
673 file_name: &str,
674 crypt_config: Option<Arc<CryptConfig>>,
675 compress: bool,
676 ) -> impl Future<Item=BackupStats, Error=Error> {
677
678 let h2 = self.h2.clone();
679 let file_name = file_name.to_owned();
680 let src_path = src_path.as_ref().to_owned();
681
682 let task = tokio::fs::File::open(src_path.clone())
683 .map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err))
684 .and_then(move |file| {
685 let contents = vec![];
686 tokio::io::read_to_end(file, contents)
687 .map_err(Error::from)
688 .and_then(move |(_, contents)| {
689 let blob = DataBlob::encode(&contents, crypt_config, compress)?;
690 let raw_data = blob.into_inner();
691 Ok((raw_data, contents.len() as u64))
692 })
693 .and_then(move |(raw_data, size)| {
694 let csum = openssl::sha::sha256(&raw_data);
695 let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
696 h2.upload("blob", Some(param), raw_data)
697 .map(move |_| {
698 BackupStats { size, csum }
699 })
700 })
701 });
702
703 task
704 }
705
706 pub fn upload_stream(
707 &self,
708 archive_name: &str,
709 stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
710 prefix: &str,
711 fixed_size: Option<u64>,
712 crypt_config: Option<Arc<CryptConfig>>,
713 ) -> impl Future<Item=BackupStats, Error=Error> {
714
715 let known_chunks = Arc::new(Mutex::new(HashSet::new()));
716
717 let h2 = self.h2.clone();
718 let h2_2 = self.h2.clone();
719 let h2_3 = self.h2.clone();
720 let h2_4 = self.h2.clone();
721
722 let mut param = json!({ "archive-name": archive_name });
723 if let Some(size) = fixed_size {
724 param["size"] = size.into();
725 }
726
727 let index_path = format!("{}_index", prefix);
728 let close_path = format!("{}_close", prefix);
729
730 let prefix = prefix.to_owned();
731
732 Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone())
733 .and_then(move |_| {
734 h2_2.post(&index_path, Some(param))
735 })
736 .and_then(move |res| {
737 let wid = res.as_u64().unwrap();
738 Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config)
739 .and_then(move |(chunk_count, size, _speed, csum)| {
740 let param = json!({
741 "wid": wid ,
742 "chunk-count": chunk_count,
743 "size": size,
744 });
745 h2_4.post(&close_path, Some(param))
746 .map(move |_| {
747 BackupStats { size: size as u64, csum }
748 })
749 })
750 })
751 }
752
753 fn response_queue() -> (
754 mpsc::Sender<h2::client::ResponseFuture>,
755 sync::oneshot::Receiver<Result<(), Error>>
756 ) {
757 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
758 let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
759
760 hyper::rt::spawn(
761 verify_queue_rx
762 .map_err(Error::from)
763 .for_each(|response: h2::client::ResponseFuture| {
764 response
765 .map_err(Error::from)
766 .and_then(H2Client::h2api_response)
767 .and_then(|result| {
768 println!("RESPONSE: {:?}", result);
769 Ok(())
770 })
771 .map_err(|err| format_err!("pipelined request failed: {}", err))
772 })
773 .then(|result|
774 verify_result_tx.send(result)
775 )
776 .map_err(|_| { /* ignore closed channel */ })
777 );
778
779 (verify_queue_tx, verify_result_rx)
780 }
781
782 fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
783 mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
784 sync::oneshot::Receiver<Result<(), Error>>
785 ) {
786 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
787 let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
788
789 let h2_2 = h2.clone();
790
791 hyper::rt::spawn(
792 verify_queue_rx
793 .map_err(Error::from)
794 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
795 match (response, merged_chunk_info) {
796 (Some(response), MergedChunkInfo::Known(list)) => {
797 future::Either::A(
798 response
799 .map_err(Error::from)
800 .and_then(H2Client::h2api_response)
801 .and_then(move |_result| {
802 Ok(MergedChunkInfo::Known(list))
803 })
804 )
805 }
806 (None, MergedChunkInfo::Known(list)) => {
807 future::Either::B(future::ok(MergedChunkInfo::Known(list)))
808 }
809 _ => unreachable!(),
810 }
811 })
812 .merge_known_chunks()
813 .and_then(move |merged_chunk_info| {
814 match merged_chunk_info {
815 MergedChunkInfo::Known(chunk_list) => {
816 let mut digest_list = vec![];
817 let mut offset_list = vec![];
818 for (offset, digest) in chunk_list {
819 //println!("append chunk {} (offset {})", proxmox::tools::digest_to_hex(&digest), offset);
820 digest_list.push(digest_to_hex(&digest));
821 offset_list.push(offset);
822 }
823 println!("append chunks list len ({})", digest_list.len());
824 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
825 let mut request = H2Client::request_builder("localhost", "PUT", &path, None).unwrap();
826 request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json"));
827 let param_data = bytes::Bytes::from(param.to_string().as_bytes());
828 let upload_data = Some(param_data);
829 h2_2.send_request(request, upload_data)
830 .and_then(move |response| {
831 response
832 .map_err(Error::from)
833 .and_then(H2Client::h2api_response)
834 .and_then(|_| Ok(()))
835 })
836 .map_err(|err| format_err!("pipelined request failed: {}", err))
837 }
838 _ => unreachable!(),
839 }
840 })
841 .for_each(|_| Ok(()))
842 .then(|result|
843 verify_result_tx.send(result)
844 )
845 .map_err(|_| { /* ignore closed channel */ })
846 );
847
848 (verify_queue_tx, verify_result_rx)
849 }
850
851 fn download_chunk_list(
852 h2: H2Client,
853 path: &str,
854 archive_name: &str,
855 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
856 ) -> impl Future<Item=(), Error=Error> {
857
858 let param = json!({ "archive-name": archive_name });
859 let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap();
860
861 h2.send_request(request, None)
862 .and_then(move |response| {
863 response
864 .map_err(Error::from)
865 .and_then(move |resp| {
866 let status = resp.status();
867
868 if !status.is_success() {
869 future::Either::A(H2Client::h2api_response(resp).and_then(|_| { bail!("unknown error"); }))
870 } else {
871 future::Either::B(future::ok(resp.into_body()))
872 }
873 })
874 .and_then(move |mut body| {
875
876 let mut release_capacity = body.release_capacity().clone();
877
878 DigestListDecoder::new(body.map_err(Error::from))
879 .for_each(move |chunk| {
880 let _ = release_capacity.release_capacity(chunk.len());
881 println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
882 known_chunks.lock().unwrap().insert(chunk);
883 Ok(())
884 })
885 })
886 })
887 }
888
889 fn upload_chunk_info_stream(
890 h2: H2Client,
891 wid: u64,
892 stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
893 prefix: &str,
894 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
895 crypt_config: Option<Arc<CryptConfig>>,
896 ) -> impl Future<Item=(usize, usize, usize, [u8; 32]), Error=Error> {
897
898 let repeat = std::sync::Arc::new(AtomicUsize::new(0));
899 let repeat2 = repeat.clone();
900
901 let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
902 let stream_len2 = stream_len.clone();
903
904 let append_chunk_path = format!("{}_index", prefix);
905 let upload_chunk_path = format!("{}_chunk", prefix);
906
907 let (upload_queue, upload_result) = Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned());
908
909 let start_time = std::time::Instant::now();
910
911 let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
912 let index_csum_2 = index_csum.clone();
913
914 stream
915 .and_then(move |data| {
916
917 let chunk_len = data.len();
918
919 repeat.fetch_add(1, Ordering::SeqCst);
920 let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
921
922 let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
923 .compress(true);
924
925 if let Some(ref crypt_config) = crypt_config {
926 chunk_builder = chunk_builder.crypt_config(crypt_config);
927 }
928
929 let mut known_chunks = known_chunks.lock().unwrap();
930 let digest = chunk_builder.digest();
931
932 let mut guard = index_csum.lock().unwrap();
933 let csum = guard.as_mut().unwrap();
934 csum.update(&offset.to_le_bytes());
935 csum.update(digest);
936
937 let chunk_is_known = known_chunks.contains(digest);
938 if chunk_is_known {
939 Ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
940 } else {
941 known_chunks.insert(*digest);
942 let chunk = chunk_builder.build()?;
943 Ok(MergedChunkInfo::New(ChunkInfo { chunk, chunk_len: chunk_len as u64, offset }))
944 }
945 })
946 .merge_known_chunks()
947 .for_each(move |merged_chunk_info| {
948
949 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
950 let offset = chunk_info.offset;
951 let digest = *chunk_info.chunk.digest();
952 let digest_str = digest_to_hex(&digest);
953 let upload_queue = upload_queue.clone();
954
955 println!("upload new chunk {} ({} bytes, offset {})", digest_str,
956 chunk_info.chunk_len, offset);
957
958 let chunk_data = chunk_info.chunk.raw_data();
959 let param = json!({
960 "wid": wid,
961 "digest": digest_str,
962 "size": chunk_info.chunk_len,
963 "encoded-size": chunk_data.len(),
964 });
965
966 let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param)).unwrap();
967 let upload_data = Some(bytes::Bytes::from(chunk_data));
968
969 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
970
971 future::Either::A(
972 h2.send_request(request, upload_data)
973 .and_then(move |response| {
974 upload_queue.clone().send((new_info, Some(response)))
975 .map(|_| ()).map_err(Error::from)
976 })
977 )
978 } else {
979
980 future::Either::B(
981 upload_queue.clone().send((merged_chunk_info, None))
982 .map(|_| ()).map_err(Error::from)
983 )
984 }
985 })
986 .then(move |result| {
987 //println!("RESULT {:?}", result);
988 upload_result.map_err(Error::from).and_then(|upload1_result| {
989 Ok(upload1_result.and(result))
990 })
991 })
992 .flatten()
993 .and_then(move |_| {
994 let repeat = repeat2.load(Ordering::SeqCst);
995 let stream_len = stream_len2.load(Ordering::SeqCst);
996 let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
997 println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
998 if repeat > 0 {
999 println!("Average chunk size was {} bytes.", stream_len/repeat);
1000 println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
1001 }
1002
1003 let mut guard = index_csum_2.lock().unwrap();
1004 let csum = guard.take().unwrap().finish();
1005
1006 Ok((repeat, stream_len, speed, csum))
1007 })
1008 }
1009
1010 pub fn upload_speedtest(&self) -> impl Future<Item=usize, Error=Error> {
1011
1012 let mut data = vec![];
1013 // generate pseudo random byte sequence
1014 for i in 0..1024*1024 {
1015 for j in 0..4 {
1016 let byte = ((i >> (j<<3))&0xff) as u8;
1017 data.push(byte);
1018 }
1019 }
1020
1021 let item_len = data.len();
1022
1023 let repeat = std::sync::Arc::new(AtomicUsize::new(0));
1024 let repeat2 = repeat.clone();
1025
1026 let (upload_queue, upload_result) = Self::response_queue();
1027
1028 let start_time = std::time::Instant::now();
1029
1030 let h2 = self.h2.clone();
1031
1032 futures::stream::repeat(data)
1033 .take_while(move |_| {
1034 repeat.fetch_add(1, Ordering::SeqCst);
1035 Ok(start_time.elapsed().as_secs() < 5)
1036 })
1037 .for_each(move |data| {
1038 let h2 = h2.clone();
1039
1040 let upload_queue = upload_queue.clone();
1041
1042 println!("send test data ({} bytes)", data.len());
1043 let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
1044 h2.send_request(request, Some(bytes::Bytes::from(data)))
1045 .and_then(move |response| {
1046 upload_queue.send(response)
1047 .map(|_| ()).map_err(Error::from)
1048 })
1049 })
1050 .then(move |result| {
1051 println!("RESULT {:?}", result);
1052 upload_result.map_err(Error::from).and_then(|upload1_result| {
1053 Ok(upload1_result.and(result))
1054 })
1055 })
1056 .flatten()
1057 .and_then(move |_| {
1058 let repeat = repeat2.load(Ordering::SeqCst);
1059 println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
1060 let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
1061 if repeat > 0 {
1062 println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
1063 }
1064 Ok(speed)
1065 })
1066 }
1067 }
1068
1069 #[derive(Clone)]
1070 pub struct H2Client {
1071 h2: h2::client::SendRequest<bytes::Bytes>,
1072 }
1073
1074 impl H2Client {
1075
1076 pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
1077 Self { h2 }
1078 }
1079
1080 pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
1081 let req = Self::request_builder("localhost", "GET", path, param).unwrap();
1082 self.request(req)
1083 }
1084
1085 pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
1086 let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
1087 self.request(req)
1088 }
1089
1090 pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
1091 let req = Self::request_builder("localhost", "POST", path, param).unwrap();
1092 self.request(req)
1093 }
1094
1095 pub fn download<W: Write>(&self, path: &str, param: Option<Value>, output: W) -> impl Future<Item=W, Error=Error> {
1096 let request = Self::request_builder("localhost", "GET", path, param).unwrap();
1097
1098 self.send_request(request, None)
1099 .and_then(move |response| {
1100 response
1101 .map_err(Error::from)
1102 .and_then(move |resp| {
1103 let status = resp.status();
1104 if !status.is_success() {
1105 future::Either::A(
1106 H2Client::h2api_response(resp)
1107 .and_then(|_| { bail!("unknown error"); })
1108 )
1109 } else {
1110 let mut body = resp.into_body();
1111 let mut release_capacity = body.release_capacity().clone();
1112
1113 future::Either::B(
1114 body
1115 .map_err(Error::from)
1116 .fold(output, move |mut acc, chunk| {
1117 let _ = release_capacity.release_capacity(chunk.len());
1118 acc.write_all(&chunk)?;
1119 Ok::<_, Error>(acc)
1120 })
1121 )
1122 }
1123 })
1124 })
1125 }
1126
1127 pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
1128 let request = Self::request_builder("localhost", "POST", path, param).unwrap();
1129
1130 self.h2.clone()
1131 .ready()
1132 .map_err(Error::from)
1133 .and_then(move |mut send_request| {
1134 let (response, stream) = send_request.send_request(request, false).unwrap();
1135 PipeToSendStream::new(bytes::Bytes::from(data), stream)
1136 .and_then(|_| {
1137 response
1138 .map_err(Error::from)
1139 .and_then(Self::h2api_response)
1140 })
1141 })
1142 }
1143
1144 fn request(
1145 &self,
1146 request: Request<()>,
1147 ) -> impl Future<Item=Value, Error=Error> {
1148
1149 self.send_request(request, None)
1150 .and_then(move |response| {
1151 response
1152 .map_err(Error::from)
1153 .and_then(Self::h2api_response)
1154 })
1155 }
1156
1157 fn send_request(
1158 &self,
1159 request: Request<()>,
1160 data: Option<bytes::Bytes>,
1161 ) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
1162
1163 self.h2.clone()
1164 .ready()
1165 .map_err(Error::from)
1166 .and_then(move |mut send_request| {
1167 if let Some(data) = data {
1168 let (response, stream) = send_request.send_request(request, false).unwrap();
1169 future::Either::A(PipeToSendStream::new(data, stream)
1170 .and_then(move |_| {
1171 future::ok(response)
1172 }))
1173 } else {
1174 let (response, _stream) = send_request.send_request(request, true).unwrap();
1175 future::Either::B(future::ok(response))
1176 }
1177 })
1178 }
1179
1180 fn h2api_response(response: Response<h2::RecvStream>) -> impl Future<Item=Value, Error=Error> {
1181
1182 let status = response.status();
1183
1184 let (_head, mut body) = response.into_parts();
1185
1186 // The `release_capacity` handle allows the caller to manage
1187 // flow control.
1188 //
1189 // Whenever data is received, the caller is responsible for
1190 // releasing capacity back to the server once it has freed
1191 // the data from memory.
1192 let mut release_capacity = body.release_capacity().clone();
1193
1194 body
1195 .map(move |chunk| {
1196 // Let the server send more data.
1197 let _ = release_capacity.release_capacity(chunk.len());
1198 chunk
1199 })
1200 .concat2()
1201 .map_err(Error::from)
1202 .and_then(move |data| {
1203 let text = String::from_utf8(data.to_vec()).unwrap();
1204 if status.is_success() {
1205 if text.len() > 0 {
1206 let mut value: Value = serde_json::from_str(&text)?;
1207 if let Some(map) = value.as_object_mut() {
1208 if let Some(data) = map.remove("data") {
1209 return Ok(data);
1210 }
1211 }
1212 bail!("got result without data property");
1213 } else {
1214 Ok(Value::Null)
1215 }
1216 } else {
1217 bail!("HTTP Error {}: {}", status, text);
1218 }
1219 })
1220 }
1221
1222 // Note: We always encode parameters with the url
1223 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<()>, Error> {
1224 let path = path.trim_matches('/');
1225
1226 if let Some(data) = data {
1227 let query = tools::json_object_to_query(data)?;
1228 // We detected problem with hyper around 6000 characters - seo we try to keep on the safe side
1229 if query.len() > 4096 { bail!("h2 query data too large ({} bytes) - please encode data inside body", query.len()); }
1230 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
1231 let request = Request::builder()
1232 .method(method)
1233 .uri(url)
1234 .header("User-Agent", "proxmox-backup-client/1.0")
1235 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
1236 .body(())?;
1237 return Ok(request);
1238 } else {
1239 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
1240 let request = Request::builder()
1241 .method(method)
1242 .uri(url)
1243 .header("User-Agent", "proxmox-backup-client/1.0")
1244 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
1245 .body(())?;
1246
1247 Ok(request)
1248 }
1249 }
1250 }