]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/http_client.rs
update a chunk of stuff to the hyper release
[proxmox-backup.git] / src / client / http_client.rs
1 use std::io::Write;
2 use std::task::{Context, Poll};
3
4 use chrono::Utc;
5 use failure::*;
6 use futures::*;
7 use http::Uri;
8 use http::header::HeaderValue;
9 use http::{Request, Response};
10 use hyper::Body;
11 use hyper::client::{Client, HttpConnector};
12 use openssl::ssl::{SslConnector, SslMethod};
13 use serde_json::{json, Value};
14 use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
15 use xdg::BaseDirectories;
16
17 use proxmox::tools::{
18 fs::{file_get_json, file_set_contents},
19 };
20
21 use super::pipe_to_stream::PipeToSendStream;
22 use crate::tools::async_io::EitherStream;
23 use crate::tools::futures::{cancellable, Canceller};
24 use crate::tools::{self, tty, BroadcastFuture};
25
26 #[derive(Clone)]
27 pub struct AuthInfo {
28 username: String,
29 ticket: String,
30 token: String,
31 }
32
33 /// HTTP(S) API client
34 pub struct HttpClient {
35 client: Client<HttpsConnector>,
36 server: String,
37 auth: BroadcastFuture<AuthInfo>,
38 }
39
40 /// Delete stored ticket data (logout)
41 pub fn delete_ticket_info(server: &str, username: &str) -> Result<(), Error> {
42
43 let base = BaseDirectories::with_prefix("proxmox-backup")?;
44
45 // usually /run/user/<uid>/...
46 let path = base.place_runtime_file("tickets")?;
47
48 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
49
50 let mut data = file_get_json(&path, Some(json!({})))?;
51
52 if let Some(map) = data[server].as_object_mut() {
53 map.remove(username);
54 }
55
56 file_set_contents(path, data.to_string().as_bytes(), Some(mode))?;
57
58 Ok(())
59 }
60
61 fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
62
63 let base = BaseDirectories::with_prefix("proxmox-backup")?;
64
65 // usually /run/user/<uid>/...
66 let path = base.place_runtime_file("tickets")?;
67
68 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
69
70 let mut data = file_get_json(&path, Some(json!({})))?;
71
72 let now = Utc::now().timestamp();
73
74 data[server][username] = json!({ "timestamp": now, "ticket": ticket, "token": token});
75
76 let mut new_data = json!({});
77
78 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
79
80 let empty = serde_json::map::Map::new();
81 for (server, info) in data.as_object().unwrap_or(&empty) {
82 for (_user, uinfo) in info.as_object().unwrap_or(&empty) {
83 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
84 let age = now - timestamp;
85 if age < ticket_lifetime {
86 new_data[server][username] = uinfo.clone();
87 }
88 }
89 }
90 }
91
92 file_set_contents(path, new_data.to_string().as_bytes(), Some(mode))?;
93
94 Ok(())
95 }
96
97 fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> {
98 let base = BaseDirectories::with_prefix("proxmox-backup").ok()?;
99
100 // usually /run/user/<uid>/...
101 let path = base.place_runtime_file("tickets").ok()?;
102 let data = file_get_json(&path, None).ok()?;
103 let now = Utc::now().timestamp();
104 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
105 let uinfo = data[server][username].as_object()?;
106 let timestamp = uinfo["timestamp"].as_i64()?;
107 let age = now - timestamp;
108
109 if age < ticket_lifetime {
110 let ticket = uinfo["ticket"].as_str()?;
111 let token = uinfo["token"].as_str()?;
112 Some((ticket.to_owned(), token.to_owned()))
113 } else {
114 None
115 }
116 }
117
118 impl HttpClient {
119
120 pub fn new(server: &str, username: &str, password: Option<String>) -> Result<Self, Error> {
121 let client = Self::build_client();
122
123 let password = if let Some(password) = password {
124 password
125 } else if let Some((ticket, _token)) = load_ticket_info(server, username) {
126 ticket
127 } else {
128 Self::get_password(&username)?
129 };
130
131 let login_future = Self::credentials(client.clone(), server.to_owned(), username.to_owned(), password);
132
133 Ok(Self {
134 client,
135 server: String::from(server),
136 auth: BroadcastFuture::new(Box::new(login_future)),
137 })
138 }
139
140 /// Login
141 ///
142 /// Login is done on demand, so this is onyl required if you need
143 /// access to authentication data in 'AuthInfo'.
144 pub async fn login(&self) -> Result<AuthInfo, Error> {
145 self.auth.listen().await
146 }
147
148 fn get_password(_username: &str) -> Result<String, Error> {
149 use std::env::VarError::*;
150 match std::env::var("PBS_PASSWORD") {
151 Ok(p) => return Ok(p),
152 Err(NotUnicode(_)) => bail!("PBS_PASSWORD contains bad characters"),
153 Err(NotPresent) => {
154 // Try another method
155 }
156 }
157
158 // If we're on a TTY, query the user for a password
159 if tty::stdin_isatty() {
160 return Ok(String::from_utf8(tty::read_password("Password: ")?)?);
161 }
162
163 bail!("no password input mechanism available");
164 }
165
166 fn build_client() -> Client<HttpsConnector> {
167
168 let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
169
170 ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); // fixme!
171
172 let mut httpc = hyper::client::HttpConnector::new();
173 httpc.set_nodelay(true); // important for h2 download performance!
174 httpc.set_recv_buffer_size(Some(1024*1024)); //important for h2 download performance!
175 httpc.enforce_http(false); // we want https...
176
177 let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build());
178
179 Client::builder()
180 //.http2_initial_stream_window_size( (1 << 31) - 2)
181 //.http2_initial_connection_window_size( (1 << 31) - 2)
182 .build::<_, Body>(https)
183 }
184
185 pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> {
186
187 let client = self.client.clone();
188
189 let auth = self.login().await?;
190
191 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
192 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
193 req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
194
195 Self::api_request(client, req).await
196 }
197
198 pub async fn get(
199 &self,
200 path: &str,
201 data: Option<Value>,
202 ) -> Result<Value, Error> {
203 let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
204 self.request(req).await
205 }
206
207 pub async fn delete(
208 &mut self,
209 path: &str,
210 data: Option<Value>,
211 ) -> Result<Value, Error> {
212 let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
213 self.request(req).await
214 }
215
216 pub async fn post(
217 &mut self,
218 path: &str,
219 data: Option<Value>,
220 ) -> Result<Value, Error> {
221 let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
222 self.request(req).await
223 }
224
225 pub async fn download(
226 &mut self,
227 path: &str,
228 output: &mut (dyn Write + Send),
229 ) -> Result<(), Error> {
230 let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
231
232 let client = self.client.clone();
233
234 let auth = self.login().await?;
235
236 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
237 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
238
239 let resp = client.request(req).await?;
240 let status = resp.status();
241 if !status.is_success() {
242 HttpClient::api_response(resp)
243 .map(|_| Err(format_err!("unknown error")))
244 .await?
245 } else {
246 resp.into_body()
247 .map_err(Error::from)
248 .try_fold(output, move |acc, chunk| async move {
249 acc.write_all(&chunk)?;
250 Ok::<_, Error>(acc)
251 })
252 .await?;
253 }
254 Ok(())
255 }
256
257 pub async fn upload(
258 &mut self,
259 content_type: &str,
260 body: Body,
261 path: &str,
262 data: Option<Value>,
263 ) -> Result<Value, Error> {
264
265 let path = path.trim_matches('/');
266 let mut url = format!("https://{}:8007/{}", &self.server, path);
267
268 if let Some(data) = data {
269 let query = tools::json_object_to_query(data).unwrap();
270 url.push('?');
271 url.push_str(&query);
272 }
273
274 let url: Uri = url.parse().unwrap();
275
276 let req = Request::builder()
277 .method("POST")
278 .uri(url)
279 .header("User-Agent", "proxmox-backup-client/1.0")
280 .header("Content-Type", content_type)
281 .body(body).unwrap();
282
283 self.request(req).await
284 }
285
286 pub async fn start_h2_connection(
287 &self,
288 mut req: Request<Body>,
289 protocol_name: String,
290 ) -> Result<(H2Client, Canceller), Error> {
291
292 let auth = self.login().await?;
293 let client = self.client.clone();
294
295 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
296 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
297 req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
298
299 let resp = client.request(req).await?;
300 let status = resp.status();
301
302 if status != http::StatusCode::SWITCHING_PROTOCOLS {
303 Self::api_response(resp)
304 .map(|_| Err(format_err!("unknown error")))
305 .await?;
306 unreachable!();
307 }
308
309 let upgraded = resp
310 .into_body()
311 .on_upgrade()
312 .await?;
313
314 let max_window_size = (1 << 31) - 2;
315
316 let (h2, connection) = h2::client::Builder::new()
317 .initial_connection_window_size(max_window_size)
318 .initial_window_size(max_window_size)
319 .max_frame_size(4*1024*1024)
320 .handshake(upgraded)
321 .await?;
322
323 let connection = connection
324 .map_err(|_| panic!("HTTP/2.0 connection failed"));
325
326 let (connection, canceller) = cancellable(connection)?;
327 // A cancellable future returns an Option which is None when cancelled and
328 // Some when it finished instead, since we don't care about the return type we
329 // need to map it away:
330 let connection = connection.map(|_| ());
331
332 // Spawn a new task to drive the connection state
333 tokio::spawn(connection);
334
335 // Wait until the `SendRequest` handle has available capacity.
336 let c = h2.ready().await?;
337 Ok((H2Client::new(c), canceller))
338 }
339
340 async fn credentials(
341 client: Client<HttpsConnector>,
342 server: String,
343 username: String,
344 password: String,
345 ) -> Result<AuthInfo, Error> {
346 let data = json!({ "username": username, "password": password });
347 let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
348 let cred = Self::api_request(client, req).await?;
349 let auth = AuthInfo {
350 username: cred["data"]["username"].as_str().unwrap().to_owned(),
351 ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
352 token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
353 };
354
355 let _ = store_ticket_info(&server, &auth.username, &auth.ticket, &auth.token);
356
357 Ok(auth)
358 }
359
360 async fn api_response(response: Response<Body>) -> Result<Value, Error> {
361 let status = response.status();
362 let data = hyper::body::to_bytes(response.into_body()).await?;
363
364 let text = String::from_utf8(data.to_vec()).unwrap();
365 if status.is_success() {
366 if text.is_empty() {
367 Ok(Value::Null)
368 } else {
369 let value: Value = serde_json::from_str(&text)?;
370 Ok(value)
371 }
372 } else {
373 bail!("HTTP Error {}: {}", status, text);
374 }
375 }
376
377 async fn api_request(
378 client: Client<HttpsConnector>,
379 req: Request<Body>
380 ) -> Result<Value, Error> {
381
382 client.request(req)
383 .map_err(Error::from)
384 .and_then(Self::api_response)
385 .await
386 }
387
388 // Read-only access to server property
389 pub fn server(&self) -> &str {
390 &self.server
391 }
392
393 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
394 let path = path.trim_matches('/');
395 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
396
397 if let Some(data) = data {
398 if method == "POST" {
399 let request = Request::builder()
400 .method(method)
401 .uri(url)
402 .header("User-Agent", "proxmox-backup-client/1.0")
403 .header(hyper::header::CONTENT_TYPE, "application/json")
404 .body(Body::from(data.to_string()))?;
405 return Ok(request);
406 } else {
407 let query = tools::json_object_to_query(data)?;
408 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
409 let request = Request::builder()
410 .method(method)
411 .uri(url)
412 .header("User-Agent", "proxmox-backup-client/1.0")
413 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
414 .body(Body::empty())?;
415 return Ok(request);
416 }
417 }
418
419 let request = Request::builder()
420 .method(method)
421 .uri(url)
422 .header("User-Agent", "proxmox-backup-client/1.0")
423 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
424 .body(Body::empty())?;
425
426 Ok(request)
427 }
428 }
429
430
431 #[derive(Clone)]
432 pub struct H2Client {
433 h2: h2::client::SendRequest<bytes::Bytes>,
434 }
435
436 impl H2Client {
437
438 pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
439 Self { h2 }
440 }
441
442 pub async fn get(
443 &self,
444 path: &str,
445 param: Option<Value>
446 ) -> Result<Value, Error> {
447 let req = Self::request_builder("localhost", "GET", path, param, None).unwrap();
448 self.request(req).await
449 }
450
451 pub async fn put(
452 &self,
453 path: &str,
454 param: Option<Value>
455 ) -> Result<Value, Error> {
456 let req = Self::request_builder("localhost", "PUT", path, param, None).unwrap();
457 self.request(req).await
458 }
459
460 pub async fn post(
461 &self,
462 path: &str,
463 param: Option<Value>
464 ) -> Result<Value, Error> {
465 let req = Self::request_builder("localhost", "POST", path, param, None).unwrap();
466 self.request(req).await
467 }
468
469 pub async fn download<W: Write + Send>(
470 &self,
471 path: &str,
472 param: Option<Value>,
473 mut output: W,
474 ) -> Result<W, Error> {
475 let request = Self::request_builder("localhost", "GET", path, param, None).unwrap();
476
477 let response_future = self.send_request(request, None).await?;
478
479 let resp = response_future.await?;
480
481 let status = resp.status();
482 if !status.is_success() {
483 H2Client::h2api_response(resp).await?; // raise error
484 unreachable!();
485 }
486
487 let mut body = resp.into_body();
488 while let Some(chunk) = body.data().await {
489 let chunk = chunk?;
490 body.flow_control().release_capacity(chunk.len())?;
491 output.write_all(&chunk)?;
492 }
493
494 Ok(output)
495 }
496
497 pub async fn upload(
498 &self,
499 method: &str, // POST or PUT
500 path: &str,
501 param: Option<Value>,
502 content_type: &str,
503 data: Vec<u8>,
504 ) -> Result<Value, Error> {
505 let request = Self::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
506
507 let mut send_request = self.h2.clone().ready().await?;
508
509 let (response, stream) = send_request.send_request(request, false).unwrap();
510
511 PipeToSendStream::new(bytes::Bytes::from(data), stream).await?;
512
513 response
514 .map_err(Error::from)
515 .and_then(Self::h2api_response)
516 .await
517 }
518
519 async fn request(
520 &self,
521 request: Request<()>,
522 ) -> Result<Value, Error> {
523
524 self.send_request(request, None)
525 .and_then(move |response| {
526 response
527 .map_err(Error::from)
528 .and_then(Self::h2api_response)
529 })
530 .await
531 }
532
533 pub fn send_request(
534 &self,
535 request: Request<()>,
536 data: Option<bytes::Bytes>,
537 ) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> {
538
539 self.h2.clone()
540 .ready()
541 .map_err(Error::from)
542 .and_then(move |mut send_request| async move {
543 if let Some(data) = data {
544 let (response, stream) = send_request.send_request(request, false).unwrap();
545 PipeToSendStream::new(data, stream).await?;
546 Ok(response)
547 } else {
548 let (response, _stream) = send_request.send_request(request, true).unwrap();
549 Ok(response)
550 }
551 })
552 }
553
554 pub async fn h2api_response(
555 response: Response<h2::RecvStream>,
556 ) -> Result<Value, Error> {
557 let status = response.status();
558
559 let (_head, mut body) = response.into_parts();
560
561 let mut data = Vec::new();
562 while let Some(chunk) = body.data().await {
563 let chunk = chunk?;
564 // Whenever data is received, the caller is responsible for
565 // releasing capacity back to the server once it has freed
566 // the data from memory.
567 // Let the server send more data.
568 body.flow_control().release_capacity(chunk.len())?;
569 data.extend(chunk);
570 }
571
572 let text = String::from_utf8(data.to_vec()).unwrap();
573 if status.is_success() {
574 if text.is_empty() {
575 Ok(Value::Null)
576 } else {
577 let mut value: Value = serde_json::from_str(&text)?;
578 if let Some(map) = value.as_object_mut() {
579 if let Some(data) = map.remove("data") {
580 return Ok(data);
581 }
582 }
583 bail!("got result without data property");
584 }
585 } else {
586 bail!("HTTP Error {}: {}", status, text);
587 }
588 }
589
590 // Note: We always encode parameters with the url
591 pub fn request_builder(
592 server: &str,
593 method: &str,
594 path: &str,
595 param: Option<Value>,
596 content_type: Option<&str>,
597 ) -> Result<Request<()>, Error> {
598 let path = path.trim_matches('/');
599
600 let content_type = content_type.unwrap_or("application/x-www-form-urlencoded");
601
602 if let Some(param) = param {
603 let query = tools::json_object_to_query(param)?;
604 // We detected problem with hyper around 6000 characters - seo we try to keep on the safe side
605 if query.len() > 4096 { bail!("h2 query data too large ({} bytes) - please encode data inside body", query.len()); }
606 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
607 let request = Request::builder()
608 .method(method)
609 .uri(url)
610 .header("User-Agent", "proxmox-backup-client/1.0")
611 .header(hyper::header::CONTENT_TYPE, content_type)
612 .body(())?;
613 Ok(request)
614 } else {
615 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
616 let request = Request::builder()
617 .method(method)
618 .uri(url)
619 .header("User-Agent", "proxmox-backup-client/1.0")
620 .header(hyper::header::CONTENT_TYPE, content_type)
621 .body(())?;
622
623 Ok(request)
624 }
625 }
626 }
627
628 #[derive(Clone)]
629 pub struct HttpsConnector {
630 http: HttpConnector,
631 ssl_connector: std::sync::Arc<SslConnector>,
632 }
633
634 impl HttpsConnector {
635 pub fn with_connector(mut http: HttpConnector, ssl_connector: SslConnector) -> Self {
636 http.enforce_http(false);
637
638 Self {
639 http,
640 ssl_connector: std::sync::Arc::new(ssl_connector),
641 }
642 }
643 }
644
645 type MaybeTlsStream = EitherStream<
646 tokio::net::TcpStream,
647 tokio_openssl::SslStream<tokio::net::TcpStream>,
648 >;
649
650 impl hyper::service::Service<Uri> for HttpsConnector {
651 type Response = MaybeTlsStream;
652 type Error = Error;
653 type Future = std::pin::Pin<Box<
654 dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static
655 >>;
656
657 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
658 // This connector is always ready, but others might not be.
659 Poll::Ready(Ok(()))
660 }
661
662 fn call(&mut self, dst: Uri) -> Self::Future {
663 let mut this = self.clone();
664 async move {
665 let is_https = dst
666 .scheme()
667 .ok_or_else(|| format_err!("missing URL scheme"))?
668 == "https";
669 let host = dst
670 .host()
671 .ok_or_else(|| format_err!("missing hostname in destination url?"))?
672 .to_string();
673
674 let config = this.ssl_connector.configure();
675 let conn = this.http.call(dst).await?;
676 if is_https {
677 let conn = tokio_openssl::connect(config?, &host, conn).await?;
678 Ok(MaybeTlsStream::Right(conn))
679 } else {
680 Ok(MaybeTlsStream::Left(conn))
681 }
682 }.boxed()
683 }
684 }