2 use std
::sync
::{Arc, Mutex, RwLock}
;
3 use std
::time
::Duration
;
5 use anyhow
::{bail, format_err, Error}
;
8 use http
::header
::HeaderValue
;
9 use http
::{Request, Response}
;
11 use hyper
::client
::{Client, HttpConnector}
;
12 use openssl
::{ssl::{SslConnector, SslMethod}
, x509
::X509StoreContextRef
};
13 use serde_json
::{json, Value}
;
14 use percent_encoding
::percent_encode
;
15 use xdg
::BaseDirectories
;
17 use proxmox_sys
::linux
::tty
;
18 use proxmox_sys
::fs
::{file_get_json, replace_file, CreateOptions}
;
19 use proxmox_router
::HttpError
;
21 use proxmox_http
::client
::{HttpsConnector, RateLimiter}
;
22 use proxmox_http
::uri
::build_authority
;
23 use proxmox_async
::broadcast_future
::BroadcastFuture
;
25 use pbs_api_types
::{Authid, Userid, RateLimitConfig}
;
26 use pbs_api_types
::percent_encoding
::DEFAULT_ENCODE_SET
;
27 use pbs_tools
::json
::json_object_to_query
;
28 use pbs_tools
::ticket
;
30 use super::pipe_to_stream
::PipeToSendStream
;
31 use super::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
;
/// Timeout used for several HTTP operations that are expected to finish quickly but may block in
/// certain error conditions. Keep it generous, to avoid false positives under high load.
const HTTP_TIMEOUT: Duration = Duration::from_secs(2 * 60);
44 pub struct HttpClientOptions
{
45 prefix
: Option
<String
>,
46 password
: Option
<String
>,
47 fingerprint
: Option
<String
>,
50 fingerprint_cache
: bool
,
52 limit
: RateLimitConfig
,
55 impl HttpClientOptions
{
57 pub fn new_interactive(password
: Option
<String
>, fingerprint
: Option
<String
>) -> Self {
61 fingerprint_cache
: true,
64 prefix
: Some("proxmox-backup".to_string()),
69 pub fn new_non_interactive(password
: String
, fingerprint
: Option
<String
>) -> Self {
71 password
: Some(password
),
77 pub fn prefix(mut self, prefix
: Option
<String
>) -> Self {
82 pub fn password(mut self, password
: Option
<String
>) -> Self {
83 self.password
= password
;
87 pub fn fingerprint(mut self, fingerprint
: Option
<String
>) -> Self {
88 self.fingerprint
= fingerprint
;
92 pub fn interactive(mut self, interactive
: bool
) -> Self {
93 self.interactive
= interactive
;
97 pub fn ticket_cache(mut self, ticket_cache
: bool
) -> Self {
98 self.ticket_cache
= ticket_cache
;
102 pub fn fingerprint_cache(mut self, fingerprint_cache
: bool
) -> Self {
103 self.fingerprint_cache
= fingerprint_cache
;
107 pub fn verify_cert(mut self, verify_cert
: bool
) -> Self {
108 self.verify_cert
= verify_cert
;
112 pub fn rate_limit(mut self, rate_limit
: RateLimitConfig
) -> Self {
113 self.limit
= rate_limit
;
118 impl Default
for HttpClientOptions
{
119 fn default() -> Self {
126 fingerprint_cache
: false,
128 limit
: RateLimitConfig
::default(), // unlimited
133 /// HTTP(S) API client
134 pub struct HttpClient
{
135 client
: Client
<HttpsConnector
>,
138 fingerprint
: Arc
<Mutex
<Option
<String
>>>,
139 first_auth
: Option
<BroadcastFuture
<()>>,
140 auth
: Arc
<RwLock
<AuthInfo
>>,
141 ticket_abort
: futures
::future
::AbortHandle
,
142 _options
: HttpClientOptions
,
145 /// Delete stored ticket data (logout)
146 pub fn delete_ticket_info(prefix
: &str, server
: &str, username
: &Userid
) -> Result
<(), Error
> {
148 let base
= BaseDirectories
::with_prefix(prefix
)?
;
150 // usually /run/user/<uid>/...
151 let path
= base
.place_runtime_file("tickets")?
;
153 let mode
= nix
::sys
::stat
::Mode
::from_bits_truncate(0o0600);
155 let mut data
= file_get_json(&path
, Some(json
!({}
)))?
;
157 if let Some(map
) = data
[server
].as_object_mut() {
158 map
.remove(username
.as_str());
161 replace_file(path
, data
.to_string().as_bytes(), CreateOptions
::new().perm(mode
), false)?
;
166 fn store_fingerprint(prefix
: &str, server
: &str, fingerprint
: &str) -> Result
<(), Error
> {
168 let base
= BaseDirectories
::with_prefix(prefix
)?
;
170 // usually ~/.config/<prefix>/fingerprints
171 let path
= base
.place_config_file("fingerprints")?
;
173 let raw
= match std
::fs
::read_to_string(&path
) {
176 if err
.kind() == std
::io
::ErrorKind
::NotFound
{
179 bail
!("unable to read fingerprints from {:?} - {}", path
, err
);
184 let mut result
= String
::new();
186 raw
.split('
\n'
).for_each(|line
| {
187 let items
: Vec
<String
> = line
.split_whitespace().map(String
::from
).collect();
188 if items
.len() == 2 {
189 if items
[0] == server
{
190 // found, add later with new fingerprint
192 result
.push_str(line
);
198 result
.push_str(server
);
200 result
.push_str(fingerprint
);
203 replace_file(path
, result
.as_bytes(), CreateOptions
::new(), false)?
;
208 fn load_fingerprint(prefix
: &str, server
: &str) -> Option
<String
> {
210 let base
= BaseDirectories
::with_prefix(prefix
).ok()?
;
212 // usually ~/.config/<prefix>/fingerprints
213 let path
= base
.place_config_file("fingerprints").ok()?
;
215 let raw
= std
::fs
::read_to_string(&path
).ok()?
;
217 for line
in raw
.split('
\n'
) {
218 let items
: Vec
<String
> = line
.split_whitespace().map(String
::from
).collect();
219 if items
.len() == 2 && items
[0] == server
{
220 return Some(items
[1].clone());
227 fn store_ticket_info(prefix
: &str, server
: &str, username
: &str, ticket
: &str, token
: &str) -> Result
<(), Error
> {
229 let base
= BaseDirectories
::with_prefix(prefix
)?
;
231 // usually /run/user/<uid>/...
232 let path
= base
.place_runtime_file("tickets")?
;
234 let mode
= nix
::sys
::stat
::Mode
::from_bits_truncate(0o0600);
236 let mut data
= file_get_json(&path
, Some(json
!({}
)))?
;
238 let now
= proxmox_time
::epoch_i64();
240 data
[server
][username
] = json
!({ "timestamp": now, "ticket": ticket, "token": token}
);
242 let mut new_data
= json
!({}
);
244 let ticket_lifetime
= ticket
::TICKET_LIFETIME
- 60;
246 let empty
= serde_json
::map
::Map
::new();
247 for (server
, info
) in data
.as_object().unwrap_or(&empty
) {
248 for (user
, uinfo
) in info
.as_object().unwrap_or(&empty
) {
249 if let Some(timestamp
) = uinfo
["timestamp"].as_i64() {
250 let age
= now
- timestamp
;
251 if age
< ticket_lifetime
{
252 new_data
[server
][user
] = uinfo
.clone();
258 replace_file(path
, new_data
.to_string().as_bytes(), CreateOptions
::new().perm(mode
), false)?
;
263 fn load_ticket_info(prefix
: &str, server
: &str, userid
: &Userid
) -> Option
<(String
, String
)> {
264 let base
= BaseDirectories
::with_prefix(prefix
).ok()?
;
266 // usually /run/user/<uid>/...
267 let path
= base
.place_runtime_file("tickets").ok()?
;
268 let data
= file_get_json(&path
, None
).ok()?
;
269 let now
= proxmox_time
::epoch_i64();
270 let ticket_lifetime
= ticket
::TICKET_LIFETIME
- 60;
271 let uinfo
= data
[server
][userid
.as_str()].as_object()?
;
272 let timestamp
= uinfo
["timestamp"].as_i64()?
;
273 let age
= now
- timestamp
;
275 if age
< ticket_lifetime
{
276 let ticket
= uinfo
["ticket"].as_str()?
;
277 let token
= uinfo
["token"].as_str()?
;
278 Some((ticket
.to_owned(), token
.to_owned()))
284 fn build_uri(server
: &str, port
: u16, path
: &str, query
: Option
<String
>) -> Result
<Uri
, Error
> {
287 .authority(build_authority(server
, port
)?
)
288 .path_and_query(match query
{
289 Some(query
) => format
!("/{}?{}", path
, query
),
290 None
=> format
!("/{}", path
),
293 .map_err(|err
| format_err
!("error building uri - {}", err
))
301 mut options
: HttpClientOptions
,
302 ) -> Result
<Self, Error
> {
304 let verified_fingerprint
= Arc
::new(Mutex
::new(None
));
306 let mut expected_fingerprint
= options
.fingerprint
.take();
308 if expected_fingerprint
.is_some() {
309 // do not store fingerprints passed via options in cache
310 options
.fingerprint_cache
= false;
311 } else if options
.fingerprint_cache
&& options
.prefix
.is_some() {
312 expected_fingerprint
= load_fingerprint(options
.prefix
.as_ref().unwrap(), server
);
315 let mut ssl_connector_builder
= SslConnector
::builder(SslMethod
::tls()).unwrap();
317 if options
.verify_cert
{
318 let server
= server
.to_string();
319 let verified_fingerprint
= verified_fingerprint
.clone();
320 let interactive
= options
.interactive
;
321 let fingerprint_cache
= options
.fingerprint_cache
;
322 let prefix
= options
.prefix
.clone();
323 ssl_connector_builder
.set_verify_callback(openssl
::ssl
::SslVerifyMode
::PEER
, move |valid
, ctx
| {
324 match Self::verify_callback(valid
, ctx
, expected_fingerprint
.as_ref(), interactive
) {
326 Ok(Some(fingerprint
)) => {
327 if fingerprint_cache
&& prefix
.is_some() {
328 if let Err(err
) = store_fingerprint(
329 prefix
.as_ref().unwrap(), &server
, &fingerprint
) {
330 eprintln
!("{}", err
);
333 *verified_fingerprint
.lock().unwrap() = Some(fingerprint
);
337 eprintln
!("certificate validation failed - {}", err
);
343 ssl_connector_builder
.set_verify(openssl
::ssl
::SslVerifyMode
::NONE
);
346 let mut httpc
= HttpConnector
::new();
347 httpc
.set_nodelay(true); // important for h2 download performance!
348 httpc
.enforce_http(false); // we want https...
350 httpc
.set_connect_timeout(Some(std
::time
::Duration
::new(10, 0)));
351 let mut https
= HttpsConnector
::with_connector(httpc
, ssl_connector_builder
.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
);
353 if let Some(rate_in
) = options
.limit
.rate_in
{
354 let burst_in
= options
.limit
.burst_in
.unwrap_or_else(|| rate_in
).as_u64();
355 https
.set_read_limiter(Some(Arc
::new(Mutex
::new(
356 RateLimiter
::new(rate_in
.as_u64(), burst_in
)
360 if let Some(rate_out
) = options
.limit
.rate_out
{
361 let burst_out
= options
.limit
.burst_out
.unwrap_or_else(|| rate_out
).as_u64();
362 https
.set_write_limiter(Some(Arc
::new(Mutex
::new(
363 RateLimiter
::new(rate_out
.as_u64(), burst_out
)
367 let client
= Client
::builder()
368 //.http2_initial_stream_window_size( (1 << 31) - 2)
369 //.http2_initial_connection_window_size( (1 << 31) - 2)
370 .build
::<_
, Body
>(https
);
372 let password
= options
.password
.take();
373 let use_ticket_cache
= options
.ticket_cache
&& options
.prefix
.is_some();
375 let password
= if let Some(password
) = password
{
378 let userid
= if auth_id
.is_token() {
379 bail
!("API token secret must be provided!");
383 let mut ticket_info
= None
;
384 if use_ticket_cache
{
385 ticket_info
= load_ticket_info(options
.prefix
.as_ref().unwrap(), server
, userid
);
387 if let Some((ticket
, _token
)) = ticket_info
{
390 Self::get_password(userid
, options
.interactive
)?
394 let auth
= Arc
::new(RwLock
::new(AuthInfo
{
395 auth_id
: auth_id
.clone(),
396 ticket
: password
.clone(),
397 token
: "".to_string(),
400 let server2
= server
.to_string();
401 let client2
= client
.clone();
402 let auth2
= auth
.clone();
403 let prefix2
= options
.prefix
.clone();
405 let renewal_future
= async
move {
407 tokio
::time
::sleep(Duration
::new(60*15, 0)).await
; // 15 minutes
408 let (auth_id
, ticket
) = {
409 let authinfo
= auth2
.read().unwrap().clone();
410 (authinfo
.auth_id
, authinfo
.ticket
)
412 match Self::credentials(client2
.clone(), server2
.clone(), port
, auth_id
.user().clone(), ticket
).await
{
414 if use_ticket_cache
&& prefix2
.is_some() {
415 let _
= store_ticket_info(prefix2
.as_ref().unwrap(), &server2
, &auth
.auth_id
.to_string(), &auth
.ticket
, &auth
.token
);
417 *auth2
.write().unwrap() = auth
;
420 eprintln
!("re-authentication failed: {}", err
);
427 let (renewal_future
, ticket_abort
) = futures
::future
::abortable(renewal_future
);
429 let login_future
= Self::credentials(
433 auth_id
.user().clone(),
436 let server
= server
.to_string();
437 let prefix
= options
.prefix
.clone();
438 let authinfo
= auth
.clone();
441 if use_ticket_cache
&& prefix
.is_some() {
442 let _
= store_ticket_info(prefix
.as_ref().unwrap(), &server
, &auth
.auth_id
.to_string(), &auth
.ticket
, &auth
.token
);
444 *authinfo
.write().unwrap() = auth
;
445 tokio
::spawn(renewal_future
);
449 let first_auth
= if auth_id
.is_token() {
450 // TODO check access here?
453 Some(BroadcastFuture
::new(Box
::new(login_future
)))
458 server
: String
::from(server
),
460 fingerprint
: verified_fingerprint
,
470 /// Login is done on demand, so this is only required if you need
471 /// access to authentication data in 'AuthInfo'.
473 /// Note: tickets a periodially re-newed, so one can use this
474 /// to query changed ticket.
475 pub async
fn login(&self) -> Result
<AuthInfo
, Error
> {
476 if let Some(future
) = &self.first_auth
{
477 future
.listen().await?
;
480 let authinfo
= self.auth
.read().unwrap();
484 /// Returns the optional fingerprint passed to the new() constructor.
485 pub fn fingerprint(&self) -> Option
<String
> {
486 (*self.fingerprint
.lock().unwrap()).clone()
489 fn get_password(username
: &Userid
, interactive
: bool
) -> Result
<String
, Error
> {
490 // If we're on a TTY, query the user for a password
491 if interactive
&& tty
::stdin_isatty() {
492 let msg
= format
!("Password for \"{}\": ", username
);
493 return Ok(String
::from_utf8(tty
::read_password(&msg
)?
)?
);
496 bail
!("no password input mechanism available");
501 ctx
: &mut X509StoreContextRef
,
502 expected_fingerprint
: Option
<&String
>,
504 ) -> Result
<Option
<String
>, Error
> {
510 let cert
= match ctx
.current_cert() {
512 None
=> bail
!("context lacks current certificate."),
515 let depth
= ctx
.error_depth();
516 if depth
!= 0 { bail!("context depth != 0") }
518 let fp
= match cert
.digest(openssl
::hash
::MessageDigest
::sha256()) {
520 Err(err
) => bail
!("failed to calculate certificate FP - {}", err
), // should not happen
522 let fp_string
= hex
::encode(&fp
);
523 let fp_string
= fp_string
.as_bytes().chunks(2).map(|v
| std
::str::from_utf8(v
).unwrap())
524 .collect
::<Vec
<&str>>().join(":");
526 if let Some(expected_fingerprint
) = expected_fingerprint
{
527 let expected_fingerprint
= expected_fingerprint
.to_lowercase();
528 if expected_fingerprint
== fp_string
{
529 return Ok(Some(fp_string
));
531 eprintln
!("WARNING: certificate fingerprint does not match expected fingerprint!");
532 eprintln
!("expected: {}", expected_fingerprint
);
536 // If we're on a TTY, query the user
537 if interactive
&& tty
::stdin_isatty() {
538 eprintln
!("fingerprint: {}", fp_string
);
540 eprint
!("Are you sure you want to continue connecting? (y/n): ");
541 let _
= std
::io
::stdout().flush();
542 use std
::io
::{BufRead, BufReader}
;
543 let mut line
= String
::new();
544 match BufReader
::new(std
::io
::stdin()).read_line(&mut line
) {
546 let trimmed
= line
.trim();
547 if trimmed
== "y" || trimmed
== "Y" {
548 return Ok(Some(fp_string
));
549 } else if trimmed
== "n" || trimmed
== "N" {
550 bail
!("Certificate fingerprint was not confirmed.");
555 Err(err
) => bail
!("Certificate fingerprint was not confirmed - {}.", err
),
560 bail
!("Certificate fingerprint was not confirmed.");
563 pub async
fn request(&self, mut req
: Request
<Body
>) -> Result
<Value
, Error
> {
565 let client
= self.client
.clone();
567 let auth
= self.login().await?
;
568 if auth
.auth_id
.is_token() {
569 let enc_api_token
= format
!("PBSAPIToken {}:{}", auth
.auth_id
, percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
570 req
.headers_mut().insert("Authorization", HeaderValue
::from_str(&enc_api_token
).unwrap());
572 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
573 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
574 req
.headers_mut().insert("CSRFPreventionToken", HeaderValue
::from_str(&auth
.token
).unwrap());
577 Self::api_request(client
, req
).await
584 ) -> Result
<Value
, Error
> {
585 let req
= Self::request_builder(&self.server
, self.port
, "GET", path
, data
)?
;
586 self.request(req
).await
593 ) -> Result
<Value
, Error
> {
594 let req
= Self::request_builder(&self.server
, self.port
, "DELETE", path
, data
)?
;
595 self.request(req
).await
602 ) -> Result
<Value
, Error
> {
603 let req
= Self::request_builder(&self.server
, self.port
, "POST", path
, data
)?
;
604 self.request(req
).await
611 ) -> Result
<Value
, Error
> {
612 let req
= Self::request_builder(&self.server
, self.port
, "PUT", path
, data
)?
;
613 self.request(req
).await
616 pub async
fn download(
619 output
: &mut (dyn Write
+ Send
),
620 ) -> Result
<(), Error
> {
621 let mut req
= Self::request_builder(&self.server
, self.port
, "GET", path
, None
)?
;
623 let client
= self.client
.clone();
625 let auth
= self.login().await?
;
627 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
628 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
630 let resp
= tokio
::time
::timeout(
635 .map_err(|_
| format_err
!("http download request timed out"))??
;
636 let status
= resp
.status();
637 if !status
.is_success() {
638 HttpClient
::api_response(resp
)
639 .map(|_
| Err(format_err
!("unknown error")))
643 .map_err(Error
::from
)
644 .try_fold(output
, move |acc
, chunk
| async
move {
645 acc
.write_all(&chunk
)?
;
659 ) -> Result
<Value
, Error
> {
661 let query
= match data
{
662 Some(data
) => Some(json_object_to_query(data
)?
),
665 let url
= build_uri(&self.server
, self.port
, path
, query
)?
;
667 let req
= Request
::builder()
670 .header("User-Agent", "proxmox-backup-client/1.0")
671 .header("Content-Type", content_type
)
672 .body(body
).unwrap();
674 self.request(req
).await
677 pub async
fn start_h2_connection(
679 mut req
: Request
<Body
>,
680 protocol_name
: String
,
681 ) -> Result
<(H2Client
, futures
::future
::AbortHandle
), Error
> {
683 let client
= self.client
.clone();
684 let auth
= self.login().await?
;
686 if auth
.auth_id
.is_token() {
687 let enc_api_token
= format
!("PBSAPIToken {}:{}", auth
.auth_id
, percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
688 req
.headers_mut().insert("Authorization", HeaderValue
::from_str(&enc_api_token
).unwrap());
690 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
691 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
692 req
.headers_mut().insert("CSRFPreventionToken", HeaderValue
::from_str(&auth
.token
).unwrap());
695 req
.headers_mut().insert("UPGRADE", HeaderValue
::from_str(&protocol_name
).unwrap());
697 let resp
= tokio
::time
::timeout(
702 .map_err(|_
| format_err
!("http upgrade request timed out"))??
;
703 let status
= resp
.status();
705 if status
!= http
::StatusCode
::SWITCHING_PROTOCOLS
{
706 Self::api_response(resp
).await?
;
707 bail
!("unknown error");
710 let upgraded
= hyper
::upgrade
::on(resp
).await?
;
712 let max_window_size
= (1 << 31) - 2;
714 let (h2
, connection
) = h2
::client
::Builder
::new()
715 .initial_connection_window_size(max_window_size
)
716 .initial_window_size(max_window_size
)
717 .max_frame_size(4*1024*1024)
721 let connection
= connection
722 .map_err(|_
| eprintln
!("HTTP/2.0 connection failed"));
724 let (connection
, abort
) = futures
::future
::abortable(connection
);
725 // A cancellable future returns an Option which is None when cancelled and
726 // Some when it finished instead, since we don't care about the return type we
727 // need to map it away:
728 let connection
= connection
.map(|_
| ());
730 // Spawn a new task to drive the connection state
731 tokio
::spawn(connection
);
733 // Wait until the `SendRequest` handle has available capacity.
734 let c
= h2
.ready().await?
;
735 Ok((H2Client
::new(c
), abort
))
738 async
fn credentials(
739 client
: Client
<HttpsConnector
>,
744 ) -> Result
<AuthInfo
, Error
> {
745 let data
= json
!({ "username": username, "password": password }
);
746 let req
= Self::request_builder(&server
, port
, "POST", "/api2/json/access/ticket", Some(data
))?
;
747 let cred
= Self::api_request(client
, req
).await?
;
748 let auth
= AuthInfo
{
749 auth_id
: cred
["data"]["username"].as_str().unwrap().parse()?
,
750 ticket
: cred
["data"]["ticket"].as_str().unwrap().to_owned(),
751 token
: cred
["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
757 async
fn api_response(response
: Response
<Body
>) -> Result
<Value
, Error
> {
758 let status
= response
.status();
759 let data
= hyper
::body
::to_bytes(response
.into_body()).await?
;
761 let text
= String
::from_utf8(data
.to_vec()).unwrap();
762 if status
.is_success() {
766 let value
: Value
= serde_json
::from_str(&text
)?
;
770 Err(Error
::from(HttpError
::new(status
, text
)))
774 async
fn api_request(
775 client
: Client
<HttpsConnector
>,
777 ) -> Result
<Value
, Error
> {
780 tokio
::time
::timeout(
785 .map_err(|_
| format_err
!("http request timed out"))??
789 // Read-only access to server property
790 pub fn server(&self) -> &str {
794 pub fn port(&self) -> u16 {
798 pub fn request_builder(server
: &str, port
: u16, method
: &str, path
: &str, data
: Option
<Value
>) -> Result
<Request
<Body
>, Error
> {
799 if let Some(data
) = data
{
800 if method
== "POST" {
801 let url
= build_uri(server
, port
, path
, None
)?
;
802 let request
= Request
::builder()
805 .header("User-Agent", "proxmox-backup-client/1.0")
806 .header(hyper
::header
::CONTENT_TYPE
, "application/json")
807 .body(Body
::from(data
.to_string()))?
;
810 let query
= json_object_to_query(data
)?
;
811 let url
= build_uri(server
, port
, path
, Some(query
))?
;
812 let request
= Request
::builder()
815 .header("User-Agent", "proxmox-backup-client/1.0")
816 .header(hyper
::header
::CONTENT_TYPE
, "application/x-www-form-urlencoded")
817 .body(Body
::empty())?
;
821 let url
= build_uri(server
, port
, path
, None
)?
;
822 let request
= Request
::builder()
825 .header("User-Agent", "proxmox-backup-client/1.0")
826 .header(hyper
::header
::CONTENT_TYPE
, "application/x-www-form-urlencoded")
827 .body(Body
::empty())?
;
834 impl Drop
for HttpClient
{
836 self.ticket_abort
.abort();
842 pub struct H2Client
{
843 h2
: h2
::client
::SendRequest
<bytes
::Bytes
>,
848 pub fn new(h2
: h2
::client
::SendRequest
<bytes
::Bytes
>) -> Self {
856 ) -> Result
<Value
, Error
> {
857 let req
= Self::request_builder("localhost", "GET", path
, param
, None
).unwrap();
858 self.request(req
).await
865 ) -> Result
<Value
, Error
> {
866 let req
= Self::request_builder("localhost", "PUT", path
, param
, None
).unwrap();
867 self.request(req
).await
874 ) -> Result
<Value
, Error
> {
875 let req
= Self::request_builder("localhost", "POST", path
, param
, None
).unwrap();
876 self.request(req
).await
879 pub async
fn download
<W
: Write
+ Send
>(
882 param
: Option
<Value
>,
884 ) -> Result
<(), Error
> {
885 let request
= Self::request_builder("localhost", "GET", path
, param
, None
).unwrap();
887 let response_future
= self.send_request(request
, None
).await?
;
889 let resp
= response_future
.await?
;
891 let status
= resp
.status();
892 if !status
.is_success() {
893 H2Client
::h2api_response(resp
).await?
; // raise error
897 let mut body
= resp
.into_body();
898 while let Some(chunk
) = body
.data().await
{
900 body
.flow_control().release_capacity(chunk
.len())?
;
901 output
.write_all(&chunk
)?
;
909 method
: &str, // POST or PUT
911 param
: Option
<Value
>,
914 ) -> Result
<Value
, Error
> {
915 let request
= Self::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
917 let mut send_request
= self.h2
.clone().ready().await?
;
919 let (response
, stream
) = send_request
.send_request(request
, false).unwrap();
921 PipeToSendStream
::new(bytes
::Bytes
::from(data
), stream
).await?
;
924 .map_err(Error
::from
)
925 .and_then(Self::h2api_response
)
931 request
: Request
<()>,
932 ) -> Result
<Value
, Error
> {
934 self.send_request(request
, None
)
935 .and_then(move |response
| {
937 .map_err(Error
::from
)
938 .and_then(Self::h2api_response
)
945 request
: Request
<()>,
946 data
: Option
<bytes
::Bytes
>,
947 ) -> impl Future
<Output
= Result
<h2
::client
::ResponseFuture
, Error
>> {
951 .map_err(Error
::from
)
952 .and_then(move |mut send_request
| async
move {
953 if let Some(data
) = data
{
954 let (response
, stream
) = send_request
.send_request(request
, false).unwrap();
955 PipeToSendStream
::new(data
, stream
).await?
;
958 let (response
, _stream
) = send_request
.send_request(request
, true).unwrap();
964 pub async
fn h2api_response(
965 response
: Response
<h2
::RecvStream
>,
966 ) -> Result
<Value
, Error
> {
967 let status
= response
.status();
969 let (_head
, mut body
) = response
.into_parts();
971 let mut data
= Vec
::new();
972 while let Some(chunk
) = body
.data().await
{
974 // Whenever data is received, the caller is responsible for
975 // releasing capacity back to the server once it has freed
976 // the data from memory.
977 // Let the server send more data.
978 body
.flow_control().release_capacity(chunk
.len())?
;
982 let text
= String
::from_utf8(data
.to_vec()).unwrap();
983 if status
.is_success() {
987 let mut value
: Value
= serde_json
::from_str(&text
)?
;
988 if let Some(map
) = value
.as_object_mut() {
989 if let Some(data
) = map
.remove("data") {
993 bail
!("got result without data property");
996 Err(Error
::from(HttpError
::new(status
, text
)))
1000 // Note: We always encode parameters with the url
1001 pub fn request_builder(
1005 param
: Option
<Value
>,
1006 content_type
: Option
<&str>,
1007 ) -> Result
<Request
<()>, Error
> {
1008 let path
= path
.trim_matches('
/'
);
1010 let content_type
= content_type
.unwrap_or("application/x-www-form-urlencoded");
1011 let query
= match param
{
1013 let query
= json_object_to_query(param
)?
;
1014 // We detected a problem with hyper around 6000 characters, so we try to stay on the safe side
1015 if query
.len() > 4096 {
1016 bail
!("h2 query data too large ({} bytes) - please encode data inside body", query
.len());
1023 let url
= build_uri(server
, 8007, path
, query
)?
;
1024 let request
= Request
::builder()
1027 .header("User-Agent", "proxmox-backup-client/1.0")
1028 .header(hyper
::header
::CONTENT_TYPE
, content_type
)