]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/http_client.rs
2399e3ebf758cbf0bf78d30887424f9656039d95
[proxmox-backup.git] / src / client / http_client.rs
1 use std::io::Write;
2
3 use chrono::Utc;
4 use failure::*;
5 use futures::*;
6 use http::Uri;
7 use http::header::HeaderValue;
8 use http::{Request, Response};
9 use hyper::Body;
10 use hyper::client::{Client, HttpConnector};
11 use openssl::ssl::{SslConnector, SslMethod};
12 use serde_json::{json, Value};
13 use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
14 use xdg::BaseDirectories;
15
16 use proxmox::tools::{
17 fs::{file_get_json, file_set_contents},
18 };
19
20 use super::pipe_to_stream::PipeToSendStream;
21 use crate::tools::async_io::EitherStream;
22 use crate::tools::futures::{cancellable, Canceller};
23 use crate::tools::{self, tty, BroadcastFuture};
24
/// Authentication data produced by a successful login.
#[derive(Clone)]
pub struct AuthInfo {
    /// User name as returned in the server's login response.
    username: String,
    /// Ticket; sent (percent-encoded) as the `PBSAuthCookie` cookie.
    ticket: String,
    /// Token sent in the `CSRFPreventionToken` request header.
    token: String,
}
31
/// HTTP(S) API client
///
/// Holds the login future in `auth`; methods that need authentication
/// await it through [`HttpClient::login`].
pub struct HttpClient {
    /// Underlying hyper client, using the custom `HttpsConnector`.
    client: Client<HttpsConnector>,
    /// Server name/address; used as the host part of request URLs (port 8007).
    server: String,
    /// Login future wrapped in a `BroadcastFuture`; `listen()` yields the `AuthInfo`.
    auth: BroadcastFuture<AuthInfo>,
}
38
39 /// Delete stored ticket data (logout)
40 pub fn delete_ticket_info(server: &str, username: &str) -> Result<(), Error> {
41
42 let base = BaseDirectories::with_prefix("proxmox-backup")?;
43
44 // usually /run/user/<uid>/...
45 let path = base.place_runtime_file("tickets")?;
46
47 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
48
49 let mut data = file_get_json(&path, Some(json!({})))?;
50
51 if let Some(map) = data[server].as_object_mut() {
52 map.remove(username);
53 }
54
55 file_set_contents(path, data.to_string().as_bytes(), Some(mode))?;
56
57 Ok(())
58 }
59
60 fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
61
62 let base = BaseDirectories::with_prefix("proxmox-backup")?;
63
64 // usually /run/user/<uid>/...
65 let path = base.place_runtime_file("tickets")?;
66
67 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
68
69 let mut data = file_get_json(&path, Some(json!({})))?;
70
71 let now = Utc::now().timestamp();
72
73 data[server][username] = json!({ "timestamp": now, "ticket": ticket, "token": token});
74
75 let mut new_data = json!({});
76
77 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
78
79 let empty = serde_json::map::Map::new();
80 for (server, info) in data.as_object().unwrap_or(&empty) {
81 for (_user, uinfo) in info.as_object().unwrap_or(&empty) {
82 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
83 let age = now - timestamp;
84 if age < ticket_lifetime {
85 new_data[server][username] = uinfo.clone();
86 }
87 }
88 }
89 }
90
91 file_set_contents(path, new_data.to_string().as_bytes(), Some(mode))?;
92
93 Ok(())
94 }
95
96 fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> {
97 let base = BaseDirectories::with_prefix("proxmox-backup").ok()?;
98
99 // usually /run/user/<uid>/...
100 let path = base.place_runtime_file("tickets").ok()?;
101 let data = file_get_json(&path, None).ok()?;
102 let now = Utc::now().timestamp();
103 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
104 let uinfo = data[server][username].as_object()?;
105 let timestamp = uinfo["timestamp"].as_i64()?;
106 let age = now - timestamp;
107
108 if age < ticket_lifetime {
109 let ticket = uinfo["ticket"].as_str()?;
110 let token = uinfo["token"].as_str()?;
111 Some((ticket.to_owned(), token.to_owned()))
112 } else {
113 None
114 }
115 }
116
117 impl HttpClient {
118
119 pub fn new(server: &str, username: &str, password: Option<String>) -> Result<Self, Error> {
120 let client = Self::build_client();
121
122 let password = if let Some(password) = password {
123 password
124 } else if let Some((ticket, _token)) = load_ticket_info(server, username) {
125 ticket
126 } else {
127 Self::get_password(&username)?
128 };
129
130 let login_future = Self::credentials(client.clone(), server.to_owned(), username.to_owned(), password);
131
132 Ok(Self {
133 client,
134 server: String::from(server),
135 auth: BroadcastFuture::new(Box::new(login_future)),
136 })
137 }
138
139 /// Login
140 ///
141 /// Login is done on demand, so this is onyl required if you need
142 /// access to authentication data in 'AuthInfo'.
143 pub async fn login(&self) -> Result<AuthInfo, Error> {
144 self.auth.listen().await
145 }
146
147 fn get_password(_username: &str) -> Result<String, Error> {
148 use std::env::VarError::*;
149 match std::env::var("PBS_PASSWORD") {
150 Ok(p) => return Ok(p),
151 Err(NotUnicode(_)) => bail!("PBS_PASSWORD contains bad characters"),
152 Err(NotPresent) => {
153 // Try another method
154 }
155 }
156
157 // If we're on a TTY, query the user for a password
158 if tty::stdin_isatty() {
159 return Ok(String::from_utf8(tty::read_password("Password: ")?)?);
160 }
161
162 bail!("no password input mechanism available");
163 }
164
165 fn build_client() -> Client<HttpsConnector> {
166
167 let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
168
169 ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); // fixme!
170
171 let mut httpc = hyper::client::HttpConnector::new();
172 httpc.set_nodelay(true); // important for h2 download performance!
173 httpc.set_recv_buffer_size(Some(1024*1024)); //important for h2 download performance!
174 httpc.enforce_http(false); // we want https...
175
176 let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build());
177
178 Client::builder()
179 //.http2_initial_stream_window_size( (1 << 31) - 2)
180 //.http2_initial_connection_window_size( (1 << 31) - 2)
181 .build::<_, Body>(https)
182 }
183
184 pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> {
185
186 let client = self.client.clone();
187
188 let auth = self.login().await?;
189
190 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
191 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
192 req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
193
194 Self::api_request(client, req).await
195 }
196
197 pub async fn get(
198 &self,
199 path: &str,
200 data: Option<Value>,
201 ) -> Result<Value, Error> {
202 let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
203 self.request(req).await
204 }
205
206 pub async fn delete(
207 &mut self,
208 path: &str,
209 data: Option<Value>,
210 ) -> Result<Value, Error> {
211 let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
212 self.request(req).await
213 }
214
215 pub async fn post(
216 &mut self,
217 path: &str,
218 data: Option<Value>,
219 ) -> Result<Value, Error> {
220 let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
221 self.request(req).await
222 }
223
224 pub async fn download(
225 &mut self,
226 path: &str,
227 output: &mut (dyn Write + Send),
228 ) -> Result<(), Error> {
229 let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
230
231 let client = self.client.clone();
232
233 let auth = self.login().await?;
234
235 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
236 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
237
238 let resp = client.request(req).await?;
239 let status = resp.status();
240 if !status.is_success() {
241 HttpClient::api_response(resp)
242 .map(|_| Err(format_err!("unknown error")))
243 .await?
244 } else {
245 resp.into_body()
246 .map_err(Error::from)
247 .try_fold(output, move |acc, chunk| async move {
248 acc.write_all(&chunk)?;
249 Ok::<_, Error>(acc)
250 })
251 .await?;
252 }
253 Ok(())
254 }
255
256 pub async fn upload(
257 &mut self,
258 content_type: &str,
259 body: Body,
260 path: &str,
261 data: Option<Value>,
262 ) -> Result<Value, Error> {
263
264 let path = path.trim_matches('/');
265 let mut url = format!("https://{}:8007/{}", &self.server, path);
266
267 if let Some(data) = data {
268 let query = tools::json_object_to_query(data).unwrap();
269 url.push('?');
270 url.push_str(&query);
271 }
272
273 let url: Uri = url.parse().unwrap();
274
275 let req = Request::builder()
276 .method("POST")
277 .uri(url)
278 .header("User-Agent", "proxmox-backup-client/1.0")
279 .header("Content-Type", content_type)
280 .body(body).unwrap();
281
282 self.request(req).await
283 }
284
285 pub async fn start_h2_connection(
286 &self,
287 mut req: Request<Body>,
288 protocol_name: String,
289 ) -> Result<(H2Client, Canceller), Error> {
290
291 let auth = self.login().await?;
292 let client = self.client.clone();
293
294 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
295 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
296 req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
297
298 let resp = client.request(req).await?;
299 let status = resp.status();
300
301 if status != http::StatusCode::SWITCHING_PROTOCOLS {
302 Self::api_response(resp)
303 .map(|_| Err(format_err!("unknown error")))
304 .await?;
305 unreachable!();
306 }
307
308 let upgraded = resp
309 .into_body()
310 .on_upgrade()
311 .await?;
312
313 let max_window_size = (1 << 31) - 2;
314
315 let (h2, connection) = h2::client::Builder::new()
316 .initial_connection_window_size(max_window_size)
317 .initial_window_size(max_window_size)
318 .max_frame_size(4*1024*1024)
319 .handshake(upgraded)
320 .await?;
321
322 let connection = connection
323 .map_err(|_| panic!("HTTP/2.0 connection failed"));
324
325 let (connection, canceller) = cancellable(connection)?;
326 // A cancellable future returns an Option which is None when cancelled and
327 // Some when it finished instead, since we don't care about the return type we
328 // need to map it away:
329 let connection = connection.map(|_| ());
330
331 // Spawn a new task to drive the connection state
332 hyper::rt::spawn(connection);
333
334 // Wait until the `SendRequest` handle has available capacity.
335 let c = h2.ready().await?;
336 Ok((H2Client::new(c), canceller))
337 }
338
339 async fn credentials(
340 client: Client<HttpsConnector>,
341 server: String,
342 username: String,
343 password: String,
344 ) -> Result<AuthInfo, Error> {
345 let data = json!({ "username": username, "password": password });
346 let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
347 let cred = Self::api_request(client, req).await?;
348 let auth = AuthInfo {
349 username: cred["data"]["username"].as_str().unwrap().to_owned(),
350 ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
351 token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
352 };
353
354 let _ = store_ticket_info(&server, &auth.username, &auth.ticket, &auth.token);
355
356 Ok(auth)
357 }
358
359 async fn api_response(response: Response<Body>) -> Result<Value, Error> {
360 let status = response.status();
361 let data = response
362 .into_body()
363 .try_concat()
364 .await?;
365
366 let text = String::from_utf8(data.to_vec()).unwrap();
367 if status.is_success() {
368 if text.is_empty() {
369 Ok(Value::Null)
370 } else {
371 let value: Value = serde_json::from_str(&text)?;
372 Ok(value)
373 }
374 } else {
375 bail!("HTTP Error {}: {}", status, text);
376 }
377 }
378
379 async fn api_request(
380 client: Client<HttpsConnector>,
381 req: Request<Body>
382 ) -> Result<Value, Error> {
383
384 client.request(req)
385 .map_err(Error::from)
386 .and_then(Self::api_response)
387 .await
388 }
389
390 // Read-only access to server property
391 pub fn server(&self) -> &str {
392 &self.server
393 }
394
395 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
396 let path = path.trim_matches('/');
397 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
398
399 if let Some(data) = data {
400 if method == "POST" {
401 let request = Request::builder()
402 .method(method)
403 .uri(url)
404 .header("User-Agent", "proxmox-backup-client/1.0")
405 .header(hyper::header::CONTENT_TYPE, "application/json")
406 .body(Body::from(data.to_string()))?;
407 return Ok(request);
408 } else {
409 let query = tools::json_object_to_query(data)?;
410 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
411 let request = Request::builder()
412 .method(method)
413 .uri(url)
414 .header("User-Agent", "proxmox-backup-client/1.0")
415 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
416 .body(Body::empty())?;
417 return Ok(request);
418 }
419 }
420
421 let request = Request::builder()
422 .method(method)
423 .uri(url)
424 .header("User-Agent", "proxmox-backup-client/1.0")
425 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
426 .body(Body::empty())?;
427
428 Ok(request)
429 }
430 }
431
432
/// Client for sending API requests over an established HTTP/2 connection.
#[derive(Clone)]
pub struct H2Client {
    /// h2 request handle; cloned for every request that is sent.
    h2: h2::client::SendRequest<bytes::Bytes>,
}
437
438 impl H2Client {
439
440 pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
441 Self { h2 }
442 }
443
444 pub async fn get(
445 &self,
446 path: &str,
447 param: Option<Value>
448 ) -> Result<Value, Error> {
449 let req = Self::request_builder("localhost", "GET", path, param, None).unwrap();
450 self.request(req).await
451 }
452
453 pub async fn put(
454 &self,
455 path: &str,
456 param: Option<Value>
457 ) -> Result<Value, Error> {
458 let req = Self::request_builder("localhost", "PUT", path, param, None).unwrap();
459 self.request(req).await
460 }
461
462 pub async fn post(
463 &self,
464 path: &str,
465 param: Option<Value>
466 ) -> Result<Value, Error> {
467 let req = Self::request_builder("localhost", "POST", path, param, None).unwrap();
468 self.request(req).await
469 }
470
471 pub async fn download<W: Write + Send>(
472 &self,
473 path: &str,
474 param: Option<Value>,
475 mut output: W,
476 ) -> Result<W, Error> {
477 let request = Self::request_builder("localhost", "GET", path, param, None).unwrap();
478
479 let response_future = self.send_request(request, None).await?;
480
481 let resp = response_future.await?;
482
483 let status = resp.status();
484 if !status.is_success() {
485 H2Client::h2api_response(resp).await?; // raise error
486 unreachable!();
487 }
488
489 let mut body = resp.into_body();
490 let mut release_capacity = body.release_capacity().clone();
491
492 while let Some(chunk) = body.try_next().await? {
493 let _ = release_capacity.release_capacity(chunk.len());
494 output.write_all(&chunk)?;
495 }
496
497 Ok(output)
498 }
499
500 pub async fn upload(
501 &self,
502 method: &str, // POST or PUT
503 path: &str,
504 param: Option<Value>,
505 content_type: &str,
506 data: Vec<u8>,
507 ) -> Result<Value, Error> {
508 let request = Self::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
509
510 let mut send_request = self.h2.clone().ready().await?;
511
512 let (response, stream) = send_request.send_request(request, false).unwrap();
513
514 PipeToSendStream::new(bytes::Bytes::from(data), stream).await?;
515
516 response
517 .map_err(Error::from)
518 .and_then(Self::h2api_response)
519 .await
520 }
521
522 async fn request(
523 &self,
524 request: Request<()>,
525 ) -> Result<Value, Error> {
526
527 self.send_request(request, None)
528 .and_then(move |response| {
529 response
530 .map_err(Error::from)
531 .and_then(Self::h2api_response)
532 })
533 .await
534 }
535
536 pub fn send_request(
537 &self,
538 request: Request<()>,
539 data: Option<bytes::Bytes>,
540 ) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> {
541
542 self.h2.clone()
543 .ready()
544 .map_err(Error::from)
545 .and_then(move |mut send_request| async move {
546 if let Some(data) = data {
547 let (response, stream) = send_request.send_request(request, false).unwrap();
548 PipeToSendStream::new(data, stream).await?;
549 Ok(response)
550 } else {
551 let (response, _stream) = send_request.send_request(request, true).unwrap();
552 Ok(response)
553 }
554 })
555 }
556
557 pub async fn h2api_response(
558 response: Response<h2::RecvStream>,
559 ) -> Result<Value, Error> {
560 let status = response.status();
561
562 let (_head, mut body) = response.into_parts();
563
564 // The `release_capacity` handle allows the caller to manage
565 // flow control.
566 //
567 // Whenever data is received, the caller is responsible for
568 // releasing capacity back to the server once it has freed
569 // the data from memory.
570 let mut release_capacity = body.release_capacity().clone();
571
572 let mut data = Vec::new();
573 while let Some(chunk) = body.try_next().await? {
574 // Let the server send more data.
575 let _ = release_capacity.release_capacity(chunk.len());
576 data.extend(chunk);
577 }
578
579 let text = String::from_utf8(data.to_vec()).unwrap();
580 if status.is_success() {
581 if text.is_empty() {
582 Ok(Value::Null)
583 } else {
584 let mut value: Value = serde_json::from_str(&text)?;
585 if let Some(map) = value.as_object_mut() {
586 if let Some(data) = map.remove("data") {
587 return Ok(data);
588 }
589 }
590 bail!("got result without data property");
591 }
592 } else {
593 bail!("HTTP Error {}: {}", status, text);
594 }
595 }
596
597 // Note: We always encode parameters with the url
598 pub fn request_builder(
599 server: &str,
600 method: &str,
601 path: &str,
602 param: Option<Value>,
603 content_type: Option<&str>,
604 ) -> Result<Request<()>, Error> {
605 let path = path.trim_matches('/');
606
607 let content_type = content_type.unwrap_or("application/x-www-form-urlencoded");
608
609 if let Some(param) = param {
610 let query = tools::json_object_to_query(param)?;
611 // We detected problem with hyper around 6000 characters - seo we try to keep on the safe side
612 if query.len() > 4096 { bail!("h2 query data too large ({} bytes) - please encode data inside body", query.len()); }
613 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
614 let request = Request::builder()
615 .method(method)
616 .uri(url)
617 .header("User-Agent", "proxmox-backup-client/1.0")
618 .header(hyper::header::CONTENT_TYPE, content_type)
619 .body(())?;
620 Ok(request)
621 } else {
622 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
623 let request = Request::builder()
624 .method(method)
625 .uri(url)
626 .header("User-Agent", "proxmox-backup-client/1.0")
627 .header(hyper::header::CONTENT_TYPE, content_type)
628 .body(())?;
629
630 Ok(request)
631 }
632 }
633 }
634
/// hyper connector that opens TCP connections through an `HttpConnector`
/// and wraps them in TLS (OpenSSL) for `https` destinations.
pub struct HttpsConnector {
    /// Connector used to establish the underlying TCP connection.
    http: HttpConnector,
    /// OpenSSL connector providing the TLS configuration.
    ssl_connector: SslConnector,
}
639
640 impl HttpsConnector {
641 pub fn with_connector(mut http: HttpConnector, ssl_connector: SslConnector) -> Self {
642 http.enforce_http(false);
643
644 Self {
645 http,
646 ssl_connector,
647 }
648 }
649 }
650
/// Transport produced by `HttpsConnector`: `Left` is a plain TCP stream,
/// `Right` is a TLS stream on top of TCP.
type MaybeTlsStream = EitherStream<
    tokio::net::TcpStream,
    tokio_openssl::SslStream<tokio::net::TcpStream>,
>;
655
/// Lets hyper use `HttpsConnector` to open connections.
impl hyper::client::connect::Connect for HttpsConnector {
    type Transport = MaybeTlsStream;
    type Error = Error;
    type Future = Box<dyn Future<Output = Result<(
        Self::Transport,
        hyper::client::connect::Connected,
    ), Error>> + Send + Unpin + 'static>;

    /// Connect to `dst`.
    ///
    /// Opens the TCP connection through the wrapped `HttpConnector`; if the
    /// destination scheme is `https`, a TLS session is then established on
    /// top of it using the destination host name.
    fn connect(&self, dst: hyper::client::connect::Destination) -> Self::Future {
        // Extract what we need from `dst` before it is moved into `connect()`.
        let is_https = dst.scheme() == "https";
        let host = dst.host().to_string();

        // The result of `configure()` is only unwrapped (`config?`) on the https path.
        let config = self.ssl_connector.configure();
        let conn = self.http.connect(dst);

        // Pin the async block so that the boxed future is `Unpin`.
        Box::new(Box::pin(async move {
            let (conn, connected) = conn.await?;
            if is_https {
                let conn = tokio_openssl::connect(config?, &host, conn).await?;
                Ok((MaybeTlsStream::Right(conn), connected))
            } else {
                Ok((MaybeTlsStream::Left(conn), connected))
            }
        }))
    }
}