]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/http_client.rs
set reasonable TCP keepalive timeout
[proxmox-backup.git] / src / client / http_client.rs
1 use std::io::Write;
2 use std::task::{Context, Poll};
3 use std::sync::{Arc, Mutex, RwLock};
4 use std::time::Duration;
5 use std::os::unix::io::AsRawFd;
6
7 use anyhow::{bail, format_err, Error};
8 use futures::*;
9 use http::Uri;
10 use http::header::HeaderValue;
11 use http::{Request, Response};
12 use hyper::Body;
13 use hyper::client::{Client, HttpConnector};
14 use openssl::{ssl::{SslConnector, SslMethod}, x509::X509StoreContextRef};
15 use serde_json::{json, Value};
16 use percent_encoding::percent_encode;
17 use xdg::BaseDirectories;
18
19 use proxmox::{
20 api::error::HttpError,
21 sys::linux::tty,
22 tools::{
23 fs::{file_get_json, replace_file, CreateOptions},
24 }
25 };
26
27 use super::pipe_to_stream::PipeToSendStream;
28 use crate::api2::types::Userid;
29 use crate::tools::async_io::EitherStream;
30 use crate::tools::{
31 self,
32 BroadcastFuture,
33 DEFAULT_ENCODE_SET,
34 socket::{
35 set_tcp_keepalive,
36 PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
37 },
38 };
39
/// Authentication data the client keeps for one user.
#[derive(Clone)]
pub struct AuthInfo {
    /// User the credentials belong to.
    pub userid: Userid,
    /// Value sent to the server in the `PBSAuthCookie` cookie.
    pub ticket: String,
    /// Value sent to the server in the `CSRFPreventionToken` header.
    pub token: String,
}
46
/// Options controlling how an `HttpClient` authenticates and verifies the server.
///
/// Built in builder style: start from [`HttpClientOptions::new`] (or `Default`)
/// and chain the setter methods, each of which consumes and returns `self`.
pub struct HttpClientOptions {
    /// Prefix for the XDG base directories used by the ticket/fingerprint caches.
    prefix: Option<String>,
    /// Password to log in with; if unset, a cached ticket or a prompt is used.
    password: Option<String>,
    /// Expected certificate fingerprint of the server.
    fingerprint: Option<String>,
    /// Allow asking the user questions on the terminal.
    interactive: bool,
    /// Cache tickets on disk (only effective together with `prefix`).
    ticket_cache: bool,
    /// Cache accepted fingerprints on disk (only effective together with `prefix`).
    fingerprint_cache: bool,
    /// Verify the server certificate.
    verify_cert: bool,
}

impl Default for HttpClientOptions {
    /// No prefix/password/fingerprint, non-interactive, no caches, and
    /// certificate verification enabled.
    fn default() -> Self {
        Self {
            prefix: None,
            password: None,
            fingerprint: None,
            interactive: false,
            ticket_cache: false,
            fingerprint_cache: false,
            verify_cert: true,
        }
    }
}

impl HttpClientOptions {
    /// Create options with the default settings (see the `Default` impl).
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the XDG directory prefix used by the caches.
    pub fn prefix(mut self, prefix: Option<String>) -> Self {
        self.prefix = prefix;
        self
    }

    /// Set the password used for the login.
    pub fn password(mut self, password: Option<String>) -> Self {
        self.password = password;
        self
    }

    /// Set the expected server certificate fingerprint.
    pub fn fingerprint(mut self, fingerprint: Option<String>) -> Self {
        self.fingerprint = fingerprint;
        self
    }

    /// Enable or disable interactive prompts.
    pub fn interactive(mut self, interactive: bool) -> Self {
        self.interactive = interactive;
        self
    }

    /// Enable or disable the on-disk ticket cache.
    pub fn ticket_cache(mut self, ticket_cache: bool) -> Self {
        self.ticket_cache = ticket_cache;
        self
    }

    /// Enable or disable the on-disk fingerprint cache.
    pub fn fingerprint_cache(mut self, fingerprint_cache: bool) -> Self {
        self.fingerprint_cache = fingerprint_cache;
        self
    }

    /// Enable or disable server certificate verification.
    pub fn verify_cert(mut self, verify_cert: bool) -> Self {
        self.verify_cert = verify_cert;
        self
    }
}
106
/// HTTP(S) API client
pub struct HttpClient {
    /// Underlying hyper client using the TLS-capable connector.
    client: Client<HttpsConnector>,
    /// Host name or address of the server.
    server: String,
    /// TCP port of the server.
    port: u16,
    /// Fingerprint recorded by the certificate verify callback, if any.
    fingerprint: Arc<Mutex<Option<String>>>,
    /// Shared future of the initial login; awaited before authenticated requests.
    first_auth: BroadcastFuture<()>,
    /// Current credentials, replaced on login and on ticket renewal.
    auth: Arc<RwLock<AuthInfo>>,
    /// Handle used to abort the ticket renewal future when the client is dropped.
    ticket_abort: futures::future::AbortHandle,
    /// Options the client was created with (password and fingerprint are taken out).
    _options: HttpClientOptions,
}
118
119 /// Delete stored ticket data (logout)
120 pub fn delete_ticket_info(prefix: &str, server: &str, username: &Userid) -> Result<(), Error> {
121
122 let base = BaseDirectories::with_prefix(prefix)?;
123
124 // usually /run/user/<uid>/...
125 let path = base.place_runtime_file("tickets")?;
126
127 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
128
129 let mut data = file_get_json(&path, Some(json!({})))?;
130
131 if let Some(map) = data[server].as_object_mut() {
132 map.remove(username.as_str());
133 }
134
135 replace_file(path, data.to_string().as_bytes(), CreateOptions::new().perm(mode))?;
136
137 Ok(())
138 }
139
140 fn store_fingerprint(prefix: &str, server: &str, fingerprint: &str) -> Result<(), Error> {
141
142 let base = BaseDirectories::with_prefix(prefix)?;
143
144 // usually ~/.config/<prefix>/fingerprints
145 let path = base.place_config_file("fingerprints")?;
146
147 let raw = match std::fs::read_to_string(&path) {
148 Ok(v) => v,
149 Err(err) => {
150 if err.kind() == std::io::ErrorKind::NotFound {
151 String::new()
152 } else {
153 bail!("unable to read fingerprints from {:?} - {}", path, err);
154 }
155 }
156 };
157
158 let mut result = String::new();
159
160 raw.split('\n').for_each(|line| {
161 let items: Vec<String> = line.split_whitespace().map(String::from).collect();
162 if items.len() == 2 {
163 if &items[0] == server {
164 // found, add later with new fingerprint
165 } else {
166 result.push_str(line);
167 result.push('\n');
168 }
169 }
170 });
171
172 result.push_str(server);
173 result.push(' ');
174 result.push_str(fingerprint);
175 result.push('\n');
176
177 replace_file(path, result.as_bytes(), CreateOptions::new())?;
178
179 Ok(())
180 }
181
182 fn load_fingerprint(prefix: &str, server: &str) -> Option<String> {
183
184 let base = BaseDirectories::with_prefix(prefix).ok()?;
185
186 // usually ~/.config/<prefix>/fingerprints
187 let path = base.place_config_file("fingerprints").ok()?;
188
189 let raw = std::fs::read_to_string(&path).ok()?;
190
191 for line in raw.split('\n') {
192 let items: Vec<String> = line.split_whitespace().map(String::from).collect();
193 if items.len() == 2 && &items[0] == server {
194 return Some(items[1].clone());
195 }
196 }
197
198 None
199 }
200
201 fn store_ticket_info(prefix: &str, server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
202
203 let base = BaseDirectories::with_prefix(prefix)?;
204
205 // usually /run/user/<uid>/...
206 let path = base.place_runtime_file("tickets")?;
207
208 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
209
210 let mut data = file_get_json(&path, Some(json!({})))?;
211
212 let now = proxmox::tools::time::epoch_i64();
213
214 data[server][username] = json!({ "timestamp": now, "ticket": ticket, "token": token});
215
216 let mut new_data = json!({});
217
218 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
219
220 let empty = serde_json::map::Map::new();
221 for (server, info) in data.as_object().unwrap_or(&empty) {
222 for (_user, uinfo) in info.as_object().unwrap_or(&empty) {
223 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
224 let age = now - timestamp;
225 if age < ticket_lifetime {
226 new_data[server][username] = uinfo.clone();
227 }
228 }
229 }
230 }
231
232 replace_file(path, new_data.to_string().as_bytes(), CreateOptions::new().perm(mode))?;
233
234 Ok(())
235 }
236
237 fn load_ticket_info(prefix: &str, server: &str, userid: &Userid) -> Option<(String, String)> {
238 let base = BaseDirectories::with_prefix(prefix).ok()?;
239
240 // usually /run/user/<uid>/...
241 let path = base.place_runtime_file("tickets").ok()?;
242 let data = file_get_json(&path, None).ok()?;
243 let now = proxmox::tools::time::epoch_i64();
244 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
245 let uinfo = data[server][userid.as_str()].as_object()?;
246 let timestamp = uinfo["timestamp"].as_i64()?;
247 let age = now - timestamp;
248
249 if age < ticket_lifetime {
250 let ticket = uinfo["ticket"].as_str()?;
251 let token = uinfo["token"].as_str()?;
252 Some((ticket.to_owned(), token.to_owned()))
253 } else {
254 None
255 }
256 }
257
258 impl HttpClient {
259 pub fn new(
260 server: &str,
261 port: u16,
262 userid: &Userid,
263 mut options: HttpClientOptions,
264 ) -> Result<Self, Error> {
265
266 let verified_fingerprint = Arc::new(Mutex::new(None));
267
268 let mut fingerprint = options.fingerprint.take();
269
270 if fingerprint.is_some() {
271 // do not store fingerprints passed via options in cache
272 options.fingerprint_cache = false;
273 } else if options.fingerprint_cache && options.prefix.is_some() {
274 fingerprint = load_fingerprint(options.prefix.as_ref().unwrap(), server);
275 }
276
277 let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
278
279 if options.verify_cert {
280 let server = server.to_string();
281 let verified_fingerprint = verified_fingerprint.clone();
282 let interactive = options.interactive;
283 let fingerprint_cache = options.fingerprint_cache;
284 let prefix = options.prefix.clone();
285 ssl_connector_builder.set_verify_callback(openssl::ssl::SslVerifyMode::PEER, move |valid, ctx| {
286 let (valid, fingerprint) = Self::verify_callback(valid, ctx, fingerprint.clone(), interactive);
287 if valid {
288 if let Some(fingerprint) = fingerprint {
289 if fingerprint_cache && prefix.is_some() {
290 if let Err(err) = store_fingerprint(
291 prefix.as_ref().unwrap(), &server, &fingerprint) {
292 eprintln!("{}", err);
293 }
294 }
295 *verified_fingerprint.lock().unwrap() = Some(fingerprint);
296 }
297 }
298 valid
299 });
300 } else {
301 ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE);
302 }
303
304 let mut httpc = hyper::client::HttpConnector::new();
305 httpc.set_nodelay(true); // important for h2 download performance!
306 httpc.enforce_http(false); // we want https...
307
308 httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
309 let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build());
310
311 let client = Client::builder()
312 //.http2_initial_stream_window_size( (1 << 31) - 2)
313 //.http2_initial_connection_window_size( (1 << 31) - 2)
314 .build::<_, Body>(https);
315
316 let password = options.password.take();
317 let use_ticket_cache = options.ticket_cache && options.prefix.is_some();
318
319 let password = if let Some(password) = password {
320 password
321 } else {
322 let mut ticket_info = None;
323 if use_ticket_cache {
324 ticket_info = load_ticket_info(options.prefix.as_ref().unwrap(), server, userid);
325 }
326 if let Some((ticket, _token)) = ticket_info {
327 ticket
328 } else {
329 Self::get_password(userid, options.interactive)?
330 }
331 };
332
333 let auth = Arc::new(RwLock::new(AuthInfo {
334 userid: userid.clone(),
335 ticket: password.clone(),
336 token: "".to_string(),
337 }));
338
339 let server2 = server.to_string();
340 let client2 = client.clone();
341 let auth2 = auth.clone();
342 let prefix2 = options.prefix.clone();
343
344 let renewal_future = async move {
345 loop {
346 tokio::time::delay_for(Duration::new(60*15, 0)).await; // 15 minutes
347 let (userid, ticket) = {
348 let authinfo = auth2.read().unwrap().clone();
349 (authinfo.userid, authinfo.ticket)
350 };
351 match Self::credentials(client2.clone(), server2.clone(), port, userid, ticket).await {
352 Ok(auth) => {
353 if use_ticket_cache & &prefix2.is_some() {
354 let _ = store_ticket_info(prefix2.as_ref().unwrap(), &server2, &auth.userid.to_string(), &auth.ticket, &auth.token);
355 }
356 *auth2.write().unwrap() = auth;
357 },
358 Err(err) => {
359 eprintln!("re-authentication failed: {}", err);
360 return;
361 }
362 }
363 }
364 };
365
366 let (renewal_future, ticket_abort) = futures::future::abortable(renewal_future);
367
368 let login_future = Self::credentials(
369 client.clone(),
370 server.to_owned(),
371 port,
372 userid.to_owned(),
373 password.to_owned(),
374 ).map_ok({
375 let server = server.to_string();
376 let prefix = options.prefix.clone();
377 let authinfo = auth.clone();
378
379 move |auth| {
380 if use_ticket_cache & &prefix.is_some() {
381 let _ = store_ticket_info(prefix.as_ref().unwrap(), &server, &auth.userid.to_string(), &auth.ticket, &auth.token);
382 }
383 *authinfo.write().unwrap() = auth;
384 tokio::spawn(renewal_future);
385 }
386 });
387
388 Ok(Self {
389 client,
390 server: String::from(server),
391 port,
392 fingerprint: verified_fingerprint,
393 auth,
394 ticket_abort,
395 first_auth: BroadcastFuture::new(Box::new(login_future)),
396 _options: options,
397 })
398 }
399
400 /// Login
401 ///
402 /// Login is done on demand, so this is only required if you need
403 /// access to authentication data in 'AuthInfo'.
404 pub async fn login(&self) -> Result<AuthInfo, Error> {
405 self.first_auth.listen().await?;
406 let authinfo = self.auth.read().unwrap();
407 Ok(authinfo.clone())
408 }
409
410 /// Returns the optional fingerprint passed to the new() constructor.
411 pub fn fingerprint(&self) -> Option<String> {
412 (*self.fingerprint.lock().unwrap()).clone()
413 }
414
415 fn get_password(username: &Userid, interactive: bool) -> Result<String, Error> {
416 // If we're on a TTY, query the user for a password
417 if interactive && tty::stdin_isatty() {
418 let msg = format!("Password for \"{}\": ", username);
419 return Ok(String::from_utf8(tty::read_password(&msg)?)?);
420 }
421
422 bail!("no password input mechanism available");
423 }
424
425 fn verify_callback(
426 valid: bool, ctx:
427 &mut X509StoreContextRef,
428 expected_fingerprint: Option<String>,
429 interactive: bool,
430 ) -> (bool, Option<String>) {
431 if valid { return (true, None); }
432
433 let cert = match ctx.current_cert() {
434 Some(cert) => cert,
435 None => return (false, None),
436 };
437
438 let depth = ctx.error_depth();
439 if depth != 0 { return (false, None); }
440
441 let fp = match cert.digest(openssl::hash::MessageDigest::sha256()) {
442 Ok(fp) => fp,
443 Err(_) => return (false, None), // should not happen
444 };
445 let fp_string = proxmox::tools::digest_to_hex(&fp);
446 let fp_string = fp_string.as_bytes().chunks(2).map(|v| std::str::from_utf8(v).unwrap())
447 .collect::<Vec<&str>>().join(":");
448
449 if let Some(expected_fingerprint) = expected_fingerprint {
450 if expected_fingerprint.to_lowercase() == fp_string {
451 return (true, Some(fp_string));
452 } else {
453 return (false, None);
454 }
455 }
456
457 // If we're on a TTY, query the user
458 if interactive && tty::stdin_isatty() {
459 println!("fingerprint: {}", fp_string);
460 loop {
461 print!("Are you sure you want to continue connecting? (y/n): ");
462 let _ = std::io::stdout().flush();
463 use std::io::{BufRead, BufReader};
464 let mut line = String::new();
465 match BufReader::new(std::io::stdin()).read_line(&mut line) {
466 Ok(_) => {
467 let trimmed = line.trim();
468 if trimmed == "y" || trimmed == "Y" {
469 return (true, Some(fp_string));
470 } else if trimmed == "n" || trimmed == "N" {
471 return (false, None);
472 } else {
473 continue;
474 }
475 }
476 Err(_) => return (false, None),
477 }
478 }
479 }
480 (false, None)
481 }
482
483 pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> {
484
485 let client = self.client.clone();
486
487 let auth = self.login().await?;
488
489 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
490 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
491 req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
492
493 Self::api_request(client, req).await
494 }
495
496 pub async fn get(
497 &self,
498 path: &str,
499 data: Option<Value>,
500 ) -> Result<Value, Error> {
501 let req = Self::request_builder(&self.server, self.port, "GET", path, data)?;
502 self.request(req).await
503 }
504
505 pub async fn delete(
506 &mut self,
507 path: &str,
508 data: Option<Value>,
509 ) -> Result<Value, Error> {
510 let req = Self::request_builder(&self.server, self.port, "DELETE", path, data)?;
511 self.request(req).await
512 }
513
514 pub async fn post(
515 &mut self,
516 path: &str,
517 data: Option<Value>,
518 ) -> Result<Value, Error> {
519 let req = Self::request_builder(&self.server, self.port, "POST", path, data)?;
520 self.request(req).await
521 }
522
523 pub async fn download(
524 &mut self,
525 path: &str,
526 output: &mut (dyn Write + Send),
527 ) -> Result<(), Error> {
528 let mut req = Self::request_builder(&self.server, self.port, "GET", path, None)?;
529
530 let client = self.client.clone();
531
532 let auth = self.login().await?;
533
534 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
535 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
536
537 let resp = client.request(req).await?;
538 let status = resp.status();
539 if !status.is_success() {
540 HttpClient::api_response(resp)
541 .map(|_| Err(format_err!("unknown error")))
542 .await?
543 } else {
544 resp.into_body()
545 .map_err(Error::from)
546 .try_fold(output, move |acc, chunk| async move {
547 acc.write_all(&chunk)?;
548 Ok::<_, Error>(acc)
549 })
550 .await?;
551 }
552 Ok(())
553 }
554
555 pub async fn upload(
556 &mut self,
557 content_type: &str,
558 body: Body,
559 path: &str,
560 data: Option<Value>,
561 ) -> Result<Value, Error> {
562
563 let path = path.trim_matches('/');
564 let mut url = format!("https://{}:{}/{}", &self.server, self.port, path);
565
566 if let Some(data) = data {
567 let query = tools::json_object_to_query(data).unwrap();
568 url.push('?');
569 url.push_str(&query);
570 }
571
572 let url: Uri = url.parse().unwrap();
573
574 let req = Request::builder()
575 .method("POST")
576 .uri(url)
577 .header("User-Agent", "proxmox-backup-client/1.0")
578 .header("Content-Type", content_type)
579 .body(body).unwrap();
580
581 self.request(req).await
582 }
583
584 pub async fn start_h2_connection(
585 &self,
586 mut req: Request<Body>,
587 protocol_name: String,
588 ) -> Result<(H2Client, futures::future::AbortHandle), Error> {
589
590 let auth = self.login().await?;
591 let client = self.client.clone();
592
593 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
594 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
595 req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
596
597 let resp = client.request(req).await?;
598 let status = resp.status();
599
600 if status != http::StatusCode::SWITCHING_PROTOCOLS {
601 Self::api_response(resp).await?;
602 bail!("unknown error");
603 }
604
605 let upgraded = resp
606 .into_body()
607 .on_upgrade()
608 .await?;
609
610 let max_window_size = (1 << 31) - 2;
611
612 let (h2, connection) = h2::client::Builder::new()
613 .initial_connection_window_size(max_window_size)
614 .initial_window_size(max_window_size)
615 .max_frame_size(4*1024*1024)
616 .handshake(upgraded)
617 .await?;
618
619 let connection = connection
620 .map_err(|_| eprintln!("HTTP/2.0 connection failed"));
621
622 let (connection, abort) = futures::future::abortable(connection);
623 // A cancellable future returns an Option which is None when cancelled and
624 // Some when it finished instead, since we don't care about the return type we
625 // need to map it away:
626 let connection = connection.map(|_| ());
627
628 // Spawn a new task to drive the connection state
629 tokio::spawn(connection);
630
631 // Wait until the `SendRequest` handle has available capacity.
632 let c = h2.ready().await?;
633 Ok((H2Client::new(c), abort))
634 }
635
636 async fn credentials(
637 client: Client<HttpsConnector>,
638 server: String,
639 port: u16,
640 username: Userid,
641 password: String,
642 ) -> Result<AuthInfo, Error> {
643 let data = json!({ "username": username, "password": password });
644 let req = Self::request_builder(&server, port, "POST", "/api2/json/access/ticket", Some(data))?;
645 let cred = Self::api_request(client, req).await?;
646 let auth = AuthInfo {
647 userid: cred["data"]["username"].as_str().unwrap().parse()?,
648 ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
649 token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
650 };
651
652 Ok(auth)
653 }
654
655 async fn api_response(response: Response<Body>) -> Result<Value, Error> {
656 let status = response.status();
657 let data = hyper::body::to_bytes(response.into_body()).await?;
658
659 let text = String::from_utf8(data.to_vec()).unwrap();
660 if status.is_success() {
661 if text.is_empty() {
662 Ok(Value::Null)
663 } else {
664 let value: Value = serde_json::from_str(&text)?;
665 Ok(value)
666 }
667 } else {
668 Err(Error::from(HttpError::new(status, text)))
669 }
670 }
671
672 async fn api_request(
673 client: Client<HttpsConnector>,
674 req: Request<Body>
675 ) -> Result<Value, Error> {
676
677 client.request(req)
678 .map_err(Error::from)
679 .and_then(Self::api_response)
680 .await
681 }
682
683 // Read-only access to server property
684 pub fn server(&self) -> &str {
685 &self.server
686 }
687
/// Port this client connects to.
pub fn port(&self) -> u16 {
    self.port
}
691
692 pub fn request_builder(server: &str, port: u16, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
693 let path = path.trim_matches('/');
694 let url: Uri = format!("https://{}:{}/{}", server, port, path).parse()?;
695
696 if let Some(data) = data {
697 if method == "POST" {
698 let request = Request::builder()
699 .method(method)
700 .uri(url)
701 .header("User-Agent", "proxmox-backup-client/1.0")
702 .header(hyper::header::CONTENT_TYPE, "application/json")
703 .body(Body::from(data.to_string()))?;
704 return Ok(request);
705 } else {
706 let query = tools::json_object_to_query(data)?;
707 let url: Uri = format!("https://{}:{}/{}?{}", server, port, path, query).parse()?;
708 let request = Request::builder()
709 .method(method)
710 .uri(url)
711 .header("User-Agent", "proxmox-backup-client/1.0")
712 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
713 .body(Body::empty())?;
714 return Ok(request);
715 }
716 }
717
718 let request = Request::builder()
719 .method(method)
720 .uri(url)
721 .header("User-Agent", "proxmox-backup-client/1.0")
722 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
723 .body(Body::empty())?;
724
725 Ok(request)
726 }
727 }
728
/// Dropping the client aborts the future registered under `ticket_abort`
/// (the periodic ticket renewal set up in `new`).
impl Drop for HttpClient {
    fn drop(&mut self) {
        self.ticket_abort.abort();
    }
}
734
735
/// Client for sending API requests over an established HTTP/2 connection.
#[derive(Clone)]
pub struct H2Client {
    /// Request handle of the underlying `h2` connection; cloned for each request.
    h2: h2::client::SendRequest<bytes::Bytes>,
}
740
741 impl H2Client {
742
743 pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
744 Self { h2 }
745 }
746
747 pub async fn get(
748 &self,
749 path: &str,
750 param: Option<Value>
751 ) -> Result<Value, Error> {
752 let req = Self::request_builder("localhost", "GET", path, param, None).unwrap();
753 self.request(req).await
754 }
755
756 pub async fn put(
757 &self,
758 path: &str,
759 param: Option<Value>
760 ) -> Result<Value, Error> {
761 let req = Self::request_builder("localhost", "PUT", path, param, None).unwrap();
762 self.request(req).await
763 }
764
765 pub async fn post(
766 &self,
767 path: &str,
768 param: Option<Value>
769 ) -> Result<Value, Error> {
770 let req = Self::request_builder("localhost", "POST", path, param, None).unwrap();
771 self.request(req).await
772 }
773
774 pub async fn download<W: Write + Send>(
775 &self,
776 path: &str,
777 param: Option<Value>,
778 mut output: W,
779 ) -> Result<(), Error> {
780 let request = Self::request_builder("localhost", "GET", path, param, None).unwrap();
781
782 let response_future = self.send_request(request, None).await?;
783
784 let resp = response_future.await?;
785
786 let status = resp.status();
787 if !status.is_success() {
788 H2Client::h2api_response(resp).await?; // raise error
789 unreachable!();
790 }
791
792 let mut body = resp.into_body();
793 while let Some(chunk) = body.data().await {
794 let chunk = chunk?;
795 body.flow_control().release_capacity(chunk.len())?;
796 output.write_all(&chunk)?;
797 }
798
799 Ok(())
800 }
801
802 pub async fn upload(
803 &self,
804 method: &str, // POST or PUT
805 path: &str,
806 param: Option<Value>,
807 content_type: &str,
808 data: Vec<u8>,
809 ) -> Result<Value, Error> {
810 let request = Self::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
811
812 let mut send_request = self.h2.clone().ready().await?;
813
814 let (response, stream) = send_request.send_request(request, false).unwrap();
815
816 PipeToSendStream::new(bytes::Bytes::from(data), stream).await?;
817
818 response
819 .map_err(Error::from)
820 .and_then(Self::h2api_response)
821 .await
822 }
823
824 async fn request(
825 &self,
826 request: Request<()>,
827 ) -> Result<Value, Error> {
828
829 self.send_request(request, None)
830 .and_then(move |response| {
831 response
832 .map_err(Error::from)
833 .and_then(Self::h2api_response)
834 })
835 .await
836 }
837
838 pub fn send_request(
839 &self,
840 request: Request<()>,
841 data: Option<bytes::Bytes>,
842 ) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> {
843
844 self.h2.clone()
845 .ready()
846 .map_err(Error::from)
847 .and_then(move |mut send_request| async move {
848 if let Some(data) = data {
849 let (response, stream) = send_request.send_request(request, false).unwrap();
850 PipeToSendStream::new(data, stream).await?;
851 Ok(response)
852 } else {
853 let (response, _stream) = send_request.send_request(request, true).unwrap();
854 Ok(response)
855 }
856 })
857 }
858
859 pub async fn h2api_response(
860 response: Response<h2::RecvStream>,
861 ) -> Result<Value, Error> {
862 let status = response.status();
863
864 let (_head, mut body) = response.into_parts();
865
866 let mut data = Vec::new();
867 while let Some(chunk) = body.data().await {
868 let chunk = chunk?;
869 // Whenever data is received, the caller is responsible for
870 // releasing capacity back to the server once it has freed
871 // the data from memory.
872 // Let the server send more data.
873 body.flow_control().release_capacity(chunk.len())?;
874 data.extend(chunk);
875 }
876
877 let text = String::from_utf8(data.to_vec()).unwrap();
878 if status.is_success() {
879 if text.is_empty() {
880 Ok(Value::Null)
881 } else {
882 let mut value: Value = serde_json::from_str(&text)?;
883 if let Some(map) = value.as_object_mut() {
884 if let Some(data) = map.remove("data") {
885 return Ok(data);
886 }
887 }
888 bail!("got result without data property");
889 }
890 } else {
891 Err(Error::from(HttpError::new(status, text)))
892 }
893 }
894
895 // Note: We always encode parameters with the url
896 pub fn request_builder(
897 server: &str,
898 method: &str,
899 path: &str,
900 param: Option<Value>,
901 content_type: Option<&str>,
902 ) -> Result<Request<()>, Error> {
903 let path = path.trim_matches('/');
904
905 let content_type = content_type.unwrap_or("application/x-www-form-urlencoded");
906
907 if let Some(param) = param {
908 let query = tools::json_object_to_query(param)?;
909 // We detected problem with hyper around 6000 characters - seo we try to keep on the safe side
910 if query.len() > 4096 { bail!("h2 query data too large ({} bytes) - please encode data inside body", query.len()); }
911 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
912 let request = Request::builder()
913 .method(method)
914 .uri(url)
915 .header("User-Agent", "proxmox-backup-client/1.0")
916 .header(hyper::header::CONTENT_TYPE, content_type)
917 .body(())?;
918 Ok(request)
919 } else {
920 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
921 let request = Request::builder()
922 .method(method)
923 .uri(url)
924 .header("User-Agent", "proxmox-backup-client/1.0")
925 .header(hyper::header::CONTENT_TYPE, content_type)
926 .body(())?;
927
928 Ok(request)
929 }
930 }
931 }
932
933 #[derive(Clone)]
934 pub struct HttpsConnector {
935 http: HttpConnector,
936 ssl_connector: std::sync::Arc<SslConnector>,
937 }
938
939 impl HttpsConnector {
940 pub fn with_connector(mut http: HttpConnector, ssl_connector: SslConnector) -> Self {
941 http.enforce_http(false);
942
943 Self {
944 http,
945 ssl_connector: std::sync::Arc::new(ssl_connector),
946 }
947 }
948 }
949
/// Stream returned by `HttpsConnector`: the plain TCP stream (`Left`) or the
/// TLS stream on top of it (`Right`).
type MaybeTlsStream = EitherStream<
    tokio::net::TcpStream,
    tokio_openssl::SslStream<tokio::net::TcpStream>,
>;
954
955 impl hyper::service::Service<Uri> for HttpsConnector {
956 type Response = MaybeTlsStream;
957 type Error = Error;
958 type Future = std::pin::Pin<Box<
959 dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static
960 >>;
961
962 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
963 // This connector is always ready, but others might not be.
964 Poll::Ready(Ok(()))
965 }
966
967 fn call(&mut self, dst: Uri) -> Self::Future {
968 let mut this = self.clone();
969 async move {
970 let is_https = dst
971 .scheme()
972 .ok_or_else(|| format_err!("missing URL scheme"))?
973 == "https";
974 let host = dst
975 .host()
976 .ok_or_else(|| format_err!("missing hostname in destination url?"))?
977 .to_string();
978
979 let config = this.ssl_connector.configure();
980 let conn = this.http.call(dst).await?;
981
982 let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
983
984 if is_https {
985 let conn = tokio_openssl::connect(config?, &host, conn).await?;
986 Ok(MaybeTlsStream::Right(conn))
987 } else {
988 Ok(MaybeTlsStream::Left(conn))
989 }
990 }.boxed()
991 }
992 }