2 use std
::task
::{Context, Poll}
;
8 use http
::header
::HeaderValue
;
9 use http
::{Request, Response}
;
11 use hyper
::client
::{Client, HttpConnector}
;
12 use openssl
::ssl
::{SslConnector, SslMethod}
;
13 use serde_json
::{json, Value}
;
14 use url
::percent_encoding
::{percent_encode, DEFAULT_ENCODE_SET}
;
15 use xdg
::BaseDirectories
;
18 fs
::{file_get_json, file_set_contents}
,
21 use super::pipe_to_stream
::PipeToSendStream
;
22 use crate::tools
::async_io
::EitherStream
;
23 use crate::tools
::futures
::{cancellable, Canceller}
;
24 use crate::tools
::{self, tty, BroadcastFuture}
;
33 /// HTTP(S) API client
34 pub struct HttpClient
{
35 client
: Client
<HttpsConnector
>,
37 auth
: BroadcastFuture
<AuthInfo
>,
40 /// Delete stored ticket data (logout)
41 pub fn delete_ticket_info(server
: &str, username
: &str) -> Result
<(), Error
> {
43 let base
= BaseDirectories
::with_prefix("proxmox-backup")?
;
45 // usually /run/user/<uid>/...
46 let path
= base
.place_runtime_file("tickets")?
;
48 let mode
= nix
::sys
::stat
::Mode
::from_bits_truncate(0o0600);
50 let mut data
= file_get_json(&path
, Some(json
!({}
)))?
;
52 if let Some(map
) = data
[server
].as_object_mut() {
56 file_set_contents(path
, data
.to_string().as_bytes(), Some(mode
))?
;
61 fn store_ticket_info(server
: &str, username
: &str, ticket
: &str, token
: &str) -> Result
<(), Error
> {
63 let base
= BaseDirectories
::with_prefix("proxmox-backup")?
;
65 // usually /run/user/<uid>/...
66 let path
= base
.place_runtime_file("tickets")?
;
68 let mode
= nix
::sys
::stat
::Mode
::from_bits_truncate(0o0600);
70 let mut data
= file_get_json(&path
, Some(json
!({}
)))?
;
72 let now
= Utc
::now().timestamp();
74 data
[server
][username
] = json
!({ "timestamp": now, "ticket": ticket, "token": token}
);
76 let mut new_data
= json
!({}
);
78 let ticket_lifetime
= tools
::ticket
::TICKET_LIFETIME
- 60;
80 let empty
= serde_json
::map
::Map
::new();
81 for (server
, info
) in data
.as_object().unwrap_or(&empty
) {
82 for (_user
, uinfo
) in info
.as_object().unwrap_or(&empty
) {
83 if let Some(timestamp
) = uinfo
["timestamp"].as_i64() {
84 let age
= now
- timestamp
;
85 if age
< ticket_lifetime
{
86 new_data
[server
][username
] = uinfo
.clone();
92 file_set_contents(path
, new_data
.to_string().as_bytes(), Some(mode
))?
;
97 fn load_ticket_info(server
: &str, username
: &str) -> Option
<(String
, String
)> {
98 let base
= BaseDirectories
::with_prefix("proxmox-backup").ok()?
;
100 // usually /run/user/<uid>/...
101 let path
= base
.place_runtime_file("tickets").ok()?
;
102 let data
= file_get_json(&path
, None
).ok()?
;
103 let now
= Utc
::now().timestamp();
104 let ticket_lifetime
= tools
::ticket
::TICKET_LIFETIME
- 60;
105 let uinfo
= data
[server
][username
].as_object()?
;
106 let timestamp
= uinfo
["timestamp"].as_i64()?
;
107 let age
= now
- timestamp
;
109 if age
< ticket_lifetime
{
110 let ticket
= uinfo
["ticket"].as_str()?
;
111 let token
= uinfo
["token"].as_str()?
;
112 Some((ticket
.to_owned(), token
.to_owned()))
120 pub fn new(server
: &str, username
: &str, password
: Option
<String
>) -> Result
<Self, Error
> {
121 let client
= Self::build_client();
123 let password
= if let Some(password
) = password
{
125 } else if let Some((ticket
, _token
)) = load_ticket_info(server
, username
) {
128 Self::get_password(&username
)?
131 let login_future
= Self::credentials(client
.clone(), server
.to_owned(), username
.to_owned(), password
);
135 server
: String
::from(server
),
136 auth
: BroadcastFuture
::new(Box
::new(login_future
)),
142 /// Login is done on demand, so this is only required if you need
143 /// access to authentication data in 'AuthInfo'.
144 pub async
fn login(&self) -> Result
<AuthInfo
, Error
> {
145 self.auth
.listen().await
148 fn get_password(_username
: &str) -> Result
<String
, Error
> {
149 use std
::env
::VarError
::*;
150 match std
::env
::var("PBS_PASSWORD") {
151 Ok(p
) => return Ok(p
),
152 Err(NotUnicode(_
)) => bail
!("PBS_PASSWORD contains bad characters"),
154 // Try another method
158 // If we're on a TTY, query the user for a password
159 if tty
::stdin_isatty() {
160 return Ok(String
::from_utf8(tty
::read_password("Password: ")?
)?
);
163 bail
!("no password input mechanism available");
166 fn build_client() -> Client
<HttpsConnector
> {
168 let mut ssl_connector_builder
= SslConnector
::builder(SslMethod
::tls()).unwrap();
170 ssl_connector_builder
.set_verify(openssl
::ssl
::SslVerifyMode
::NONE
); // fixme!
172 let mut httpc
= hyper
::client
::HttpConnector
::new();
173 httpc
.set_nodelay(true); // important for h2 download performance!
174 httpc
.set_recv_buffer_size(Some(1024*1024)); //important for h2 download performance!
175 httpc
.enforce_http(false); // we want https...
177 let https
= HttpsConnector
::with_connector(httpc
, ssl_connector_builder
.build());
180 //.http2_initial_stream_window_size( (1 << 31) - 2)
181 //.http2_initial_connection_window_size( (1 << 31) - 2)
182 .build
::<_
, Body
>(https
)
185 pub async
fn request(&self, mut req
: Request
<Body
>) -> Result
<Value
, Error
> {
187 let client
= self.client
.clone();
189 let auth
= self.login().await?
;
191 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
192 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
193 req
.headers_mut().insert("CSRFPreventionToken", HeaderValue
::from_str(&auth
.token
).unwrap());
195 Self::api_request(client
, req
).await
202 ) -> Result
<Value
, Error
> {
203 let req
= Self::request_builder(&self.server
, "GET", path
, data
).unwrap();
204 self.request(req
).await
211 ) -> Result
<Value
, Error
> {
212 let req
= Self::request_builder(&self.server
, "DELETE", path
, data
).unwrap();
213 self.request(req
).await
220 ) -> Result
<Value
, Error
> {
221 let req
= Self::request_builder(&self.server
, "POST", path
, data
).unwrap();
222 self.request(req
).await
225 pub async
fn download(
228 output
: &mut (dyn Write
+ Send
),
229 ) -> Result
<(), Error
> {
230 let mut req
= Self::request_builder(&self.server
, "GET", path
, None
).unwrap();
232 let client
= self.client
.clone();
234 let auth
= self.login().await?
;
236 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
237 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
239 let resp
= client
.request(req
).await?
;
240 let status
= resp
.status();
241 if !status
.is_success() {
242 HttpClient
::api_response(resp
)
243 .map(|_
| Err(format_err
!("unknown error")))
247 .map_err(Error
::from
)
248 .try_fold(output
, move |acc
, chunk
| async
move {
249 acc
.write_all(&chunk
)?
;
263 ) -> Result
<Value
, Error
> {
265 let path
= path
.trim_matches('
/'
);
266 let mut url
= format
!("https://{}:8007/{}", &self.server
, path
);
268 if let Some(data
) = data
{
269 let query
= tools
::json_object_to_query(data
).unwrap();
271 url
.push_str(&query
);
274 let url
: Uri
= url
.parse().unwrap();
276 let req
= Request
::builder()
279 .header("User-Agent", "proxmox-backup-client/1.0")
280 .header("Content-Type", content_type
)
281 .body(body
).unwrap();
283 self.request(req
).await
286 pub async
fn start_h2_connection(
288 mut req
: Request
<Body
>,
289 protocol_name
: String
,
290 ) -> Result
<(H2Client
, Canceller
), Error
> {
292 let auth
= self.login().await?
;
293 let client
= self.client
.clone();
295 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
296 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
297 req
.headers_mut().insert("UPGRADE", HeaderValue
::from_str(&protocol_name
).unwrap());
299 let resp
= client
.request(req
).await?
;
300 let status
= resp
.status();
302 if status
!= http
::StatusCode
::SWITCHING_PROTOCOLS
{
303 Self::api_response(resp
)
304 .map(|_
| Err(format_err
!("unknown error")))
314 let max_window_size
= (1 << 31) - 2;
316 let (h2
, connection
) = h2
::client
::Builder
::new()
317 .initial_connection_window_size(max_window_size
)
318 .initial_window_size(max_window_size
)
319 .max_frame_size(4*1024*1024)
323 let connection
= connection
324 .map_err(|_
| panic
!("HTTP/2.0 connection failed"));
326 let (connection
, canceller
) = cancellable(connection
)?
;
327 // A cancellable future returns an Option which is None when cancelled and
328 // Some when it finished instead, since we don't care about the return type we
329 // need to map it away:
330 let connection
= connection
.map(|_
| ());
332 // Spawn a new task to drive the connection state
333 tokio
::spawn(connection
);
335 // Wait until the `SendRequest` handle has available capacity.
336 let c
= h2
.ready().await?
;
337 Ok((H2Client
::new(c
), canceller
))
340 async
fn credentials(
341 client
: Client
<HttpsConnector
>,
345 ) -> Result
<AuthInfo
, Error
> {
346 let data
= json
!({ "username": username, "password": password }
);
347 let req
= Self::request_builder(&server
, "POST", "/api2/json/access/ticket", Some(data
)).unwrap();
348 let cred
= Self::api_request(client
, req
).await?
;
349 let auth
= AuthInfo
{
350 username
: cred
["data"]["username"].as_str().unwrap().to_owned(),
351 ticket
: cred
["data"]["ticket"].as_str().unwrap().to_owned(),
352 token
: cred
["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
355 let _
= store_ticket_info(&server
, &auth
.username
, &auth
.ticket
, &auth
.token
);
360 async
fn api_response(response
: Response
<Body
>) -> Result
<Value
, Error
> {
361 let status
= response
.status();
362 let data
= hyper
::body
::to_bytes(response
.into_body()).await?
;
364 let text
= String
::from_utf8(data
.to_vec()).unwrap();
365 if status
.is_success() {
369 let value
: Value
= serde_json
::from_str(&text
)?
;
373 bail
!("HTTP Error {}: {}", status
, text
);
377 async
fn api_request(
378 client
: Client
<HttpsConnector
>,
380 ) -> Result
<Value
, Error
> {
383 .map_err(Error
::from
)
384 .and_then(Self::api_response
)
388 // Read-only access to server property
389 pub fn server(&self) -> &str {
393 pub fn request_builder(server
: &str, method
: &str, path
: &str, data
: Option
<Value
>) -> Result
<Request
<Body
>, Error
> {
394 let path
= path
.trim_matches('
/'
);
395 let url
: Uri
= format
!("https://{}:8007/{}", server
, path
).parse()?
;
397 if let Some(data
) = data
{
398 if method
== "POST" {
399 let request
= Request
::builder()
402 .header("User-Agent", "proxmox-backup-client/1.0")
403 .header(hyper
::header
::CONTENT_TYPE
, "application/json")
404 .body(Body
::from(data
.to_string()))?
;
407 let query
= tools
::json_object_to_query(data
)?
;
408 let url
: Uri
= format
!("https://{}:8007/{}?{}", server
, path
, query
).parse()?
;
409 let request
= Request
::builder()
412 .header("User-Agent", "proxmox-backup-client/1.0")
413 .header(hyper
::header
::CONTENT_TYPE
, "application/x-www-form-urlencoded")
414 .body(Body
::empty())?
;
419 let request
= Request
::builder()
422 .header("User-Agent", "proxmox-backup-client/1.0")
423 .header(hyper
::header
::CONTENT_TYPE
, "application/x-www-form-urlencoded")
424 .body(Body
::empty())?
;
/// HTTP/2 API client; wraps an `h2` `SendRequest` handle for `bytes::Bytes` request bodies.
432 pub struct H2Client
{
433 h2
: h2
::client
::SendRequest
<bytes
::Bytes
>,
438 pub fn new(h2
: h2
::client
::SendRequest
<bytes
::Bytes
>) -> Self {
446 ) -> Result
<Value
, Error
> {
447 let req
= Self::request_builder("localhost", "GET", path
, param
, None
).unwrap();
448 self.request(req
).await
455 ) -> Result
<Value
, Error
> {
456 let req
= Self::request_builder("localhost", "PUT", path
, param
, None
).unwrap();
457 self.request(req
).await
464 ) -> Result
<Value
, Error
> {
465 let req
= Self::request_builder("localhost", "POST", path
, param
, None
).unwrap();
466 self.request(req
).await
469 pub async
fn download
<W
: Write
+ Send
>(
472 param
: Option
<Value
>,
474 ) -> Result
<W
, Error
> {
475 let request
= Self::request_builder("localhost", "GET", path
, param
, None
).unwrap();
477 let response_future
= self.send_request(request
, None
).await?
;
479 let resp
= response_future
.await?
;
481 let status
= resp
.status();
482 if !status
.is_success() {
483 H2Client
::h2api_response(resp
).await?
; // raise error
487 let mut body
= resp
.into_body();
488 while let Some(chunk
) = body
.data().await
{
490 body
.flow_control().release_capacity(chunk
.len())?
;
491 output
.write_all(&chunk
)?
;
499 method
: &str, // POST or PUT
501 param
: Option
<Value
>,
504 ) -> Result
<Value
, Error
> {
505 let request
= Self::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
507 let mut send_request
= self.h2
.clone().ready().await?
;
509 let (response
, stream
) = send_request
.send_request(request
, false).unwrap();
511 PipeToSendStream
::new(bytes
::Bytes
::from(data
), stream
).await?
;
514 .map_err(Error
::from
)
515 .and_then(Self::h2api_response
)
521 request
: Request
<()>,
522 ) -> Result
<Value
, Error
> {
524 self.send_request(request
, None
)
525 .and_then(move |response
| {
527 .map_err(Error
::from
)
528 .and_then(Self::h2api_response
)
535 request
: Request
<()>,
536 data
: Option
<bytes
::Bytes
>,
537 ) -> impl Future
<Output
= Result
<h2
::client
::ResponseFuture
, Error
>> {
541 .map_err(Error
::from
)
542 .and_then(move |mut send_request
| async
move {
543 if let Some(data
) = data
{
544 let (response
, stream
) = send_request
.send_request(request
, false).unwrap();
545 PipeToSendStream
::new(data
, stream
).await?
;
548 let (response
, _stream
) = send_request
.send_request(request
, true).unwrap();
554 pub async
fn h2api_response(
555 response
: Response
<h2
::RecvStream
>,
556 ) -> Result
<Value
, Error
> {
557 let status
= response
.status();
559 let (_head
, mut body
) = response
.into_parts();
561 let mut data
= Vec
::new();
562 while let Some(chunk
) = body
.data().await
{
564 // Whenever data is received, the caller is responsible for
565 // releasing capacity back to the server once it has freed
566 // the data from memory.
567 // Let the server send more data.
568 body
.flow_control().release_capacity(chunk
.len())?
;
572 let text
= String
::from_utf8(data
.to_vec()).unwrap();
573 if status
.is_success() {
577 let mut value
: Value
= serde_json
::from_str(&text
)?
;
578 if let Some(map
) = value
.as_object_mut() {
579 if let Some(data
) = map
.remove("data") {
583 bail
!("got result without data property");
586 bail
!("HTTP Error {}: {}", status
, text
);
590 // Note: We always encode parameters with the url
591 pub fn request_builder(
595 param
: Option
<Value
>,
596 content_type
: Option
<&str>,
597 ) -> Result
<Request
<()>, Error
> {
598 let path
= path
.trim_matches('
/'
);
600 let content_type
= content_type
.unwrap_or("application/x-www-form-urlencoded");
602 if let Some(param
) = param
{
603 let query
= tools
::json_object_to_query(param
)?
;
604 // We detected problems with hyper around 6000 characters - so we try to keep on the safe side
605 if query
.len() > 4096 { bail!("h2 query data too large ({} bytes
) - please encode data inside body
", query.len()); }
606 let url: Uri = format!("https
://{}:8007/{}?{}", server, path, query).parse()?;
607 let request
= Request
::builder()
610 .header("User-Agent", "proxmox-backup-client/1.0")
611 .header(hyper
::header
::CONTENT_TYPE
, content_type
)
615 let url
: Uri
= format
!("https://{}:8007/{}", server
, path
).parse()?
;
616 let request
= Request
::builder()
619 .header("User-Agent", "proxmox-backup-client/1.0")
620 .header(hyper
::header
::CONTENT_TYPE
, content_type
)
629 pub struct HttpsConnector
{
631 ssl_connector
: std
::sync
::Arc
<SslConnector
>,
634 impl HttpsConnector
{
635 pub fn with_connector(mut http
: HttpConnector
, ssl_connector
: SslConnector
) -> Self {
636 http
.enforce_http(false);
640 ssl_connector
: std
::sync
::Arc
::new(ssl_connector
),
/// Connection stream handed out by `HttpsConnector`: a plain `TcpStream` (`Left`)
/// or an OpenSSL `SslStream` over TCP (`Right`).
645 type MaybeTlsStream
= EitherStream
<
646 tokio
::net
::TcpStream
,
647 tokio_openssl
::SslStream
<tokio
::net
::TcpStream
>,
650 impl hyper
::service
::Service
<Uri
> for HttpsConnector
{
651 type Response
= MaybeTlsStream
;
653 type Future
= std
::pin
::Pin
<Box
<
654 dyn Future
<Output
= Result
<Self::Response
, Self::Error
>> + Send
+ '
static
657 fn poll_ready(&mut self, _
: &mut Context
<'_
>) -> Poll
<Result
<(), Self::Error
>> {
658 // This connector is always ready, but others might not be.
662 fn call(&mut self, dst
: Uri
) -> Self::Future
{
663 let mut this
= self.clone();
667 .ok_or_else(|| format_err
!("missing URL scheme"))?
671 .ok_or_else(|| format_err
!("missing hostname in destination url?"))?
674 let config
= this
.ssl_connector
.configure();
675 let conn
= this
.http
.call(dst
).await?
;
677 let conn
= tokio_openssl
::connect(config?
, &host
, conn
).await?
;
678 Ok(MaybeTlsStream
::Right(conn
))
680 Ok(MaybeTlsStream
::Left(conn
))