2 use std
::sync
::{Arc, Mutex, RwLock}
;
3 use std
::time
::Duration
;
5 use anyhow
::{bail, format_err, Error}
;
8 use http
::header
::HeaderValue
;
9 use http
::{Request, Response}
;
11 use hyper
::client
::{Client, HttpConnector}
;
12 use openssl
::{ssl::{SslConnector, SslMethod}
, x509
::X509StoreContextRef
};
13 use serde_json
::{json, Value}
;
14 use percent_encoding
::percent_encode
;
15 use xdg
::BaseDirectories
;
18 api
::error
::HttpError
,
20 tools
::fs
::{file_get_json, replace_file, CreateOptions}
,
23 use super::pipe_to_stream
::PipeToSendStream
;
24 use crate::api2
::types
::{Authid, Userid}
;
32 /// Timeout used for several HTTP operations that are expected to finish quickly but may block in
33 /// certain error conditions.
34 const HTTP_TIMEOUT
: Duration
= Duration
::from_secs(20);
43 pub struct HttpClientOptions
{
44 prefix
: Option
<String
>,
45 password
: Option
<String
>,
46 fingerprint
: Option
<String
>,
49 fingerprint_cache
: bool
,
53 impl HttpClientOptions
{
55 pub fn new() -> Self {
62 fingerprint_cache
: false,
67 pub fn prefix(mut self, prefix
: Option
<String
>) -> Self {
72 pub fn password(mut self, password
: Option
<String
>) -> Self {
73 self.password
= password
;
77 pub fn fingerprint(mut self, fingerprint
: Option
<String
>) -> Self {
78 self.fingerprint
= fingerprint
;
82 pub fn interactive(mut self, interactive
: bool
) -> Self {
83 self.interactive
= interactive
;
87 pub fn ticket_cache(mut self, ticket_cache
: bool
) -> Self {
88 self.ticket_cache
= ticket_cache
;
92 pub fn fingerprint_cache(mut self, fingerprint_cache
: bool
) -> Self {
93 self.fingerprint_cache
= fingerprint_cache
;
97 pub fn verify_cert(mut self, verify_cert
: bool
) -> Self {
98 self.verify_cert
= verify_cert
;
103 /// HTTP(S) API client
104 pub struct HttpClient
{
105 client
: Client
<HttpsConnector
>,
108 fingerprint
: Arc
<Mutex
<Option
<String
>>>,
109 first_auth
: Option
<BroadcastFuture
<()>>,
110 auth
: Arc
<RwLock
<AuthInfo
>>,
111 ticket_abort
: futures
::future
::AbortHandle
,
112 _options
: HttpClientOptions
,
115 /// Delete stored ticket data (logout)
116 pub fn delete_ticket_info(prefix
: &str, server
: &str, username
: &Userid
) -> Result
<(), Error
> {
118 let base
= BaseDirectories
::with_prefix(prefix
)?
;
120 // usually /run/user/<uid>/...
121 let path
= base
.place_runtime_file("tickets")?
;
123 let mode
= nix
::sys
::stat
::Mode
::from_bits_truncate(0o0600);
125 let mut data
= file_get_json(&path
, Some(json
!({}
)))?
;
127 if let Some(map
) = data
[server
].as_object_mut() {
128 map
.remove(username
.as_str());
131 replace_file(path
, data
.to_string().as_bytes(), CreateOptions
::new().perm(mode
))?
;
136 fn store_fingerprint(prefix
: &str, server
: &str, fingerprint
: &str) -> Result
<(), Error
> {
138 let base
= BaseDirectories
::with_prefix(prefix
)?
;
140 // usually ~/.config/<prefix>/fingerprints
141 let path
= base
.place_config_file("fingerprints")?
;
143 let raw
= match std
::fs
::read_to_string(&path
) {
146 if err
.kind() == std
::io
::ErrorKind
::NotFound
{
149 bail
!("unable to read fingerprints from {:?} - {}", path
, err
);
154 let mut result
= String
::new();
156 raw
.split('
\n'
).for_each(|line
| {
157 let items
: Vec
<String
> = line
.split_whitespace().map(String
::from
).collect();
158 if items
.len() == 2 {
159 if &items
[0] == server
{
160 // found, add later with new fingerprint
162 result
.push_str(line
);
168 result
.push_str(server
);
170 result
.push_str(fingerprint
);
173 replace_file(path
, result
.as_bytes(), CreateOptions
::new())?
;
178 fn load_fingerprint(prefix
: &str, server
: &str) -> Option
<String
> {
180 let base
= BaseDirectories
::with_prefix(prefix
).ok()?
;
182 // usually ~/.config/<prefix>/fingerprints
183 let path
= base
.place_config_file("fingerprints").ok()?
;
185 let raw
= std
::fs
::read_to_string(&path
).ok()?
;
187 for line
in raw
.split('
\n'
) {
188 let items
: Vec
<String
> = line
.split_whitespace().map(String
::from
).collect();
189 if items
.len() == 2 && &items
[0] == server
{
190 return Some(items
[1].clone());
197 fn store_ticket_info(prefix
: &str, server
: &str, username
: &str, ticket
: &str, token
: &str) -> Result
<(), Error
> {
199 let base
= BaseDirectories
::with_prefix(prefix
)?
;
201 // usually /run/user/<uid>/...
202 let path
= base
.place_runtime_file("tickets")?
;
204 let mode
= nix
::sys
::stat
::Mode
::from_bits_truncate(0o0600);
206 let mut data
= file_get_json(&path
, Some(json
!({}
)))?
;
208 let now
= proxmox
::tools
::time
::epoch_i64();
210 data
[server
][username
] = json
!({ "timestamp": now, "ticket": ticket, "token": token}
);
212 let mut new_data
= json
!({}
);
214 let ticket_lifetime
= tools
::ticket
::TICKET_LIFETIME
- 60;
216 let empty
= serde_json
::map
::Map
::new();
217 for (server
, info
) in data
.as_object().unwrap_or(&empty
) {
218 for (user
, uinfo
) in info
.as_object().unwrap_or(&empty
) {
219 if let Some(timestamp
) = uinfo
["timestamp"].as_i64() {
220 let age
= now
- timestamp
;
221 if age
< ticket_lifetime
{
222 new_data
[server
][user
] = uinfo
.clone();
228 replace_file(path
, new_data
.to_string().as_bytes(), CreateOptions
::new().perm(mode
))?
;
233 fn load_ticket_info(prefix
: &str, server
: &str, userid
: &Userid
) -> Option
<(String
, String
)> {
234 let base
= BaseDirectories
::with_prefix(prefix
).ok()?
;
236 // usually /run/user/<uid>/...
237 let path
= base
.place_runtime_file("tickets").ok()?
;
238 let data
= file_get_json(&path
, None
).ok()?
;
239 let now
= proxmox
::tools
::time
::epoch_i64();
240 let ticket_lifetime
= tools
::ticket
::TICKET_LIFETIME
- 60;
241 let uinfo
= data
[server
][userid
.as_str()].as_object()?
;
242 let timestamp
= uinfo
["timestamp"].as_i64()?
;
243 let age
= now
- timestamp
;
245 if age
< ticket_lifetime
{
246 let ticket
= uinfo
["ticket"].as_str()?
;
247 let token
= uinfo
["token"].as_str()?
;
248 Some((ticket
.to_owned(), token
.to_owned()))
259 mut options
: HttpClientOptions
,
260 ) -> Result
<Self, Error
> {
262 let verified_fingerprint
= Arc
::new(Mutex
::new(None
));
264 let mut fingerprint
= options
.fingerprint
.take();
266 if fingerprint
.is_some() {
267 // do not store fingerprints passed via options in cache
268 options
.fingerprint_cache
= false;
269 } else if options
.fingerprint_cache
&& options
.prefix
.is_some() {
270 fingerprint
= load_fingerprint(options
.prefix
.as_ref().unwrap(), server
);
273 let mut ssl_connector_builder
= SslConnector
::builder(SslMethod
::tls()).unwrap();
275 if options
.verify_cert
{
276 let server
= server
.to_string();
277 let verified_fingerprint
= verified_fingerprint
.clone();
278 let interactive
= options
.interactive
;
279 let fingerprint_cache
= options
.fingerprint_cache
;
280 let prefix
= options
.prefix
.clone();
281 ssl_connector_builder
.set_verify_callback(openssl
::ssl
::SslVerifyMode
::PEER
, move |valid
, ctx
| {
282 let (valid
, fingerprint
) = Self::verify_callback(valid
, ctx
, fingerprint
.clone(), interactive
);
284 if let Some(fingerprint
) = fingerprint
{
285 if fingerprint_cache
&& prefix
.is_some() {
286 if let Err(err
) = store_fingerprint(
287 prefix
.as_ref().unwrap(), &server
, &fingerprint
) {
288 eprintln
!("{}", err
);
291 *verified_fingerprint
.lock().unwrap() = Some(fingerprint
);
297 ssl_connector_builder
.set_verify(openssl
::ssl
::SslVerifyMode
::NONE
);
300 let mut httpc
= HttpConnector
::new();
301 httpc
.set_nodelay(true); // important for h2 download performance!
302 httpc
.enforce_http(false); // we want https...
304 httpc
.set_connect_timeout(Some(std
::time
::Duration
::new(10, 0)));
305 let https
= HttpsConnector
::with_connector(httpc
, ssl_connector_builder
.build());
307 let client
= Client
::builder()
308 //.http2_initial_stream_window_size( (1 << 31) - 2)
309 //.http2_initial_connection_window_size( (1 << 31) - 2)
310 .build
::<_
, Body
>(https
);
312 let password
= options
.password
.take();
313 let use_ticket_cache
= options
.ticket_cache
&& options
.prefix
.is_some();
315 let password
= if let Some(password
) = password
{
318 let userid
= if auth_id
.is_token() {
319 bail
!("API token secret must be provided!");
323 let mut ticket_info
= None
;
324 if use_ticket_cache
{
325 ticket_info
= load_ticket_info(options
.prefix
.as_ref().unwrap(), server
, userid
);
327 if let Some((ticket
, _token
)) = ticket_info
{
330 Self::get_password(userid
, options
.interactive
)?
334 let auth
= Arc
::new(RwLock
::new(AuthInfo
{
335 auth_id
: auth_id
.clone(),
336 ticket
: password
.clone(),
337 token
: "".to_string(),
340 let server2
= server
.to_string();
341 let client2
= client
.clone();
342 let auth2
= auth
.clone();
343 let prefix2
= options
.prefix
.clone();
345 let renewal_future
= async
move {
347 tokio
::time
::sleep(Duration
::new(60*15, 0)).await
; // 15 minutes
348 let (auth_id
, ticket
) = {
349 let authinfo
= auth2
.read().unwrap().clone();
350 (authinfo
.auth_id
, authinfo
.ticket
)
352 match Self::credentials(client2
.clone(), server2
.clone(), port
, auth_id
.user().clone(), ticket
).await
{
354 if use_ticket_cache
& &prefix2
.is_some() {
355 let _
= store_ticket_info(prefix2
.as_ref().unwrap(), &server2
, &auth
.auth_id
.to_string(), &auth
.ticket
, &auth
.token
);
357 *auth2
.write().unwrap() = auth
;
360 eprintln
!("re-authentication failed: {}", err
);
367 let (renewal_future
, ticket_abort
) = futures
::future
::abortable(renewal_future
);
369 let login_future
= Self::credentials(
373 auth_id
.user().clone(),
376 let server
= server
.to_string();
377 let prefix
= options
.prefix
.clone();
378 let authinfo
= auth
.clone();
381 if use_ticket_cache
& &prefix
.is_some() {
382 let _
= store_ticket_info(prefix
.as_ref().unwrap(), &server
, &auth
.auth_id
.to_string(), &auth
.ticket
, &auth
.token
);
384 *authinfo
.write().unwrap() = auth
;
385 tokio
::spawn(renewal_future
);
389 let first_auth
= if auth_id
.is_token() {
390 // TODO check access here?
393 Some(BroadcastFuture
::new(Box
::new(login_future
)))
398 server
: String
::from(server
),
400 fingerprint
: verified_fingerprint
,
410 /// Login is done on demand, so this is only required if you need
411 /// access to authentication data in 'AuthInfo'.
413 /// Note: tickets a periodially re-newed, so one can use this
414 /// to query changed ticket.
415 pub async
fn login(&self) -> Result
<AuthInfo
, Error
> {
416 if let Some(future
) = &self.first_auth
{
417 future
.listen().await?
;
420 let authinfo
= self.auth
.read().unwrap();
424 /// Returns the optional fingerprint passed to the new() constructor.
425 pub fn fingerprint(&self) -> Option
<String
> {
426 (*self.fingerprint
.lock().unwrap()).clone()
429 fn get_password(username
: &Userid
, interactive
: bool
) -> Result
<String
, Error
> {
430 // If we're on a TTY, query the user for a password
431 if interactive
&& tty
::stdin_isatty() {
432 let msg
= format
!("Password for \"{}\": ", username
);
433 return Ok(String
::from_utf8(tty
::read_password(&msg
)?
)?
);
436 bail
!("no password input mechanism available");
441 &mut X509StoreContextRef
,
442 expected_fingerprint
: Option
<String
>,
444 ) -> (bool
, Option
<String
>) {
445 if valid { return (true, None); }
447 let cert
= match ctx
.current_cert() {
449 None
=> return (false, None
),
452 let depth
= ctx
.error_depth();
453 if depth
!= 0 { return (false, None); }
455 let fp
= match cert
.digest(openssl
::hash
::MessageDigest
::sha256()) {
457 Err(_
) => return (false, None
), // should not happen
459 let fp_string
= proxmox
::tools
::digest_to_hex(&fp
);
460 let fp_string
= fp_string
.as_bytes().chunks(2).map(|v
| std
::str::from_utf8(v
).unwrap())
461 .collect
::<Vec
<&str>>().join(":");
463 if let Some(expected_fingerprint
) = expected_fingerprint
{
464 if expected_fingerprint
.to_lowercase() == fp_string
{
465 return (true, Some(fp_string
));
467 return (false, None
);
471 // If we're on a TTY, query the user
472 if interactive
&& tty
::stdin_isatty() {
473 println
!("fingerprint: {}", fp_string
);
475 print
!("Are you sure you want to continue connecting? (y/n): ");
476 let _
= std
::io
::stdout().flush();
477 use std
::io
::{BufRead, BufReader}
;
478 let mut line
= String
::new();
479 match BufReader
::new(std
::io
::stdin()).read_line(&mut line
) {
481 let trimmed
= line
.trim();
482 if trimmed
== "y" || trimmed
== "Y" {
483 return (true, Some(fp_string
));
484 } else if trimmed
== "n" || trimmed
== "N" {
485 return (false, None
);
490 Err(_
) => return (false, None
),
497 pub async
fn request(&self, mut req
: Request
<Body
>) -> Result
<Value
, Error
> {
499 let client
= self.client
.clone();
501 let auth
= self.login().await?
;
502 if auth
.auth_id
.is_token() {
503 let enc_api_token
= format
!("PBSAPIToken {}:{}", auth
.auth_id
, percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
504 req
.headers_mut().insert("Authorization", HeaderValue
::from_str(&enc_api_token
).unwrap());
506 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
507 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
508 req
.headers_mut().insert("CSRFPreventionToken", HeaderValue
::from_str(&auth
.token
).unwrap());
511 Self::api_request(client
, req
).await
518 ) -> Result
<Value
, Error
> {
519 let req
= Self::request_builder(&self.server
, self.port
, "GET", path
, data
)?
;
520 self.request(req
).await
527 ) -> Result
<Value
, Error
> {
528 let req
= Self::request_builder(&self.server
, self.port
, "DELETE", path
, data
)?
;
529 self.request(req
).await
536 ) -> Result
<Value
, Error
> {
537 let req
= Self::request_builder(&self.server
, self.port
, "POST", path
, data
)?
;
538 self.request(req
).await
545 ) -> Result
<Value
, Error
> {
546 let req
= Self::request_builder(&self.server
, self.port
, "PUT", path
, data
)?
;
547 self.request(req
).await
550 pub async
fn download(
553 output
: &mut (dyn Write
+ Send
),
554 ) -> Result
<(), Error
> {
555 let mut req
= Self::request_builder(&self.server
, self.port
, "GET", path
, None
)?
;
557 let client
= self.client
.clone();
559 let auth
= self.login().await?
;
561 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
562 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
564 let resp
= tokio
::time
::timeout(
569 .map_err(|_
| format_err
!("http download request timed out"))??
;
570 let status
= resp
.status();
571 if !status
.is_success() {
572 HttpClient
::api_response(resp
)
573 .map(|_
| Err(format_err
!("unknown error")))
577 .map_err(Error
::from
)
578 .try_fold(output
, move |acc
, chunk
| async
move {
579 acc
.write_all(&chunk
)?
;
593 ) -> Result
<Value
, Error
> {
595 let path
= path
.trim_matches('
/'
);
596 let mut url
= format
!("https://{}:{}/{}", &self.server
, self.port
, path
);
598 if let Some(data
) = data
{
599 let query
= tools
::json_object_to_query(data
).unwrap();
601 url
.push_str(&query
);
604 let url
: Uri
= url
.parse().unwrap();
606 let req
= Request
::builder()
609 .header("User-Agent", "proxmox-backup-client/1.0")
610 .header("Content-Type", content_type
)
611 .body(body
).unwrap();
613 self.request(req
).await
616 pub async
fn start_h2_connection(
618 mut req
: Request
<Body
>,
619 protocol_name
: String
,
620 ) -> Result
<(H2Client
, futures
::future
::AbortHandle
), Error
> {
622 let client
= self.client
.clone();
623 let auth
= self.login().await?
;
625 if auth
.auth_id
.is_token() {
626 let enc_api_token
= format
!("PBSAPIToken {}:{}", auth
.auth_id
, percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
627 req
.headers_mut().insert("Authorization", HeaderValue
::from_str(&enc_api_token
).unwrap());
629 let enc_ticket
= format
!("PBSAuthCookie={}", percent_encode(auth
.ticket
.as_bytes(), DEFAULT_ENCODE_SET
));
630 req
.headers_mut().insert("Cookie", HeaderValue
::from_str(&enc_ticket
).unwrap());
631 req
.headers_mut().insert("CSRFPreventionToken", HeaderValue
::from_str(&auth
.token
).unwrap());
634 req
.headers_mut().insert("UPGRADE", HeaderValue
::from_str(&protocol_name
).unwrap());
636 let resp
= tokio
::time
::timeout(
641 .map_err(|_
| format_err
!("http upgrade request timed out"))??
;
642 let status
= resp
.status();
644 if status
!= http
::StatusCode
::SWITCHING_PROTOCOLS
{
645 Self::api_response(resp
).await?
;
646 bail
!("unknown error");
649 let upgraded
= hyper
::upgrade
::on(resp
).await?
;
651 let max_window_size
= (1 << 31) - 2;
653 let (h2
, connection
) = h2
::client
::Builder
::new()
654 .initial_connection_window_size(max_window_size
)
655 .initial_window_size(max_window_size
)
656 .max_frame_size(4*1024*1024)
660 let connection
= connection
661 .map_err(|_
| eprintln
!("HTTP/2.0 connection failed"));
663 let (connection
, abort
) = futures
::future
::abortable(connection
);
664 // A cancellable future returns an Option which is None when cancelled and
665 // Some when it finished instead, since we don't care about the return type we
666 // need to map it away:
667 let connection
= connection
.map(|_
| ());
669 // Spawn a new task to drive the connection state
670 tokio
::spawn(connection
);
672 // Wait until the `SendRequest` handle has available capacity.
673 let c
= h2
.ready().await?
;
674 Ok((H2Client
::new(c
), abort
))
677 async
fn credentials(
678 client
: Client
<HttpsConnector
>,
683 ) -> Result
<AuthInfo
, Error
> {
684 let data
= json
!({ "username": username, "password": password }
);
685 let req
= Self::request_builder(&server
, port
, "POST", "/api2/json/access/ticket", Some(data
))?
;
686 let cred
= Self::api_request(client
, req
).await?
;
687 let auth
= AuthInfo
{
688 auth_id
: cred
["data"]["username"].as_str().unwrap().parse()?
,
689 ticket
: cred
["data"]["ticket"].as_str().unwrap().to_owned(),
690 token
: cred
["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
696 async
fn api_response(response
: Response
<Body
>) -> Result
<Value
, Error
> {
697 let status
= response
.status();
698 let data
= hyper
::body
::to_bytes(response
.into_body()).await?
;
700 let text
= String
::from_utf8(data
.to_vec()).unwrap();
701 if status
.is_success() {
705 let value
: Value
= serde_json
::from_str(&text
)?
;
709 Err(Error
::from(HttpError
::new(status
, text
)))
713 async
fn api_request(
714 client
: Client
<HttpsConnector
>,
716 ) -> Result
<Value
, Error
> {
719 tokio
::time
::timeout(
724 .map_err(|_
| format_err
!("http request timed out"))??
728 // Read-only access to server property
729 pub fn server(&self) -> &str {
733 pub fn port(&self) -> u16 {
737 pub fn request_builder(server
: &str, port
: u16, method
: &str, path
: &str, data
: Option
<Value
>) -> Result
<Request
<Body
>, Error
> {
738 let path
= path
.trim_matches('
/'
);
739 let url
: Uri
= format
!("https://{}:{}/{}", server
, port
, path
).parse()?
;
741 if let Some(data
) = data
{
742 if method
== "POST" {
743 let request
= Request
::builder()
746 .header("User-Agent", "proxmox-backup-client/1.0")
747 .header(hyper
::header
::CONTENT_TYPE
, "application/json")
748 .body(Body
::from(data
.to_string()))?
;
751 let query
= tools
::json_object_to_query(data
)?
;
752 let url
: Uri
= format
!("https://{}:{}/{}?{}", server
, port
, path
, query
).parse()?
;
753 let request
= Request
::builder()
756 .header("User-Agent", "proxmox-backup-client/1.0")
757 .header(hyper
::header
::CONTENT_TYPE
, "application/x-www-form-urlencoded")
758 .body(Body
::empty())?
;
763 let request
= Request
::builder()
766 .header("User-Agent", "proxmox-backup-client/1.0")
767 .header(hyper
::header
::CONTENT_TYPE
, "application/x-www-form-urlencoded")
768 .body(Body
::empty())?
;
774 impl Drop
for HttpClient
{
776 self.ticket_abort
.abort();
782 pub struct H2Client
{
783 h2
: h2
::client
::SendRequest
<bytes
::Bytes
>,
788 pub fn new(h2
: h2
::client
::SendRequest
<bytes
::Bytes
>) -> Self {
796 ) -> Result
<Value
, Error
> {
797 let req
= Self::request_builder("localhost", "GET", path
, param
, None
).unwrap();
798 self.request(req
).await
805 ) -> Result
<Value
, Error
> {
806 let req
= Self::request_builder("localhost", "PUT", path
, param
, None
).unwrap();
807 self.request(req
).await
814 ) -> Result
<Value
, Error
> {
815 let req
= Self::request_builder("localhost", "POST", path
, param
, None
).unwrap();
816 self.request(req
).await
819 pub async
fn download
<W
: Write
+ Send
>(
822 param
: Option
<Value
>,
824 ) -> Result
<(), Error
> {
825 let request
= Self::request_builder("localhost", "GET", path
, param
, None
).unwrap();
827 let response_future
= self.send_request(request
, None
).await?
;
829 let resp
= response_future
.await?
;
831 let status
= resp
.status();
832 if !status
.is_success() {
833 H2Client
::h2api_response(resp
).await?
; // raise error
837 let mut body
= resp
.into_body();
838 while let Some(chunk
) = body
.data().await
{
840 body
.flow_control().release_capacity(chunk
.len())?
;
841 output
.write_all(&chunk
)?
;
849 method
: &str, // POST or PUT
851 param
: Option
<Value
>,
854 ) -> Result
<Value
, Error
> {
855 let request
= Self::request_builder("localhost", method
, path
, param
, Some(content_type
)).unwrap();
857 let mut send_request
= self.h2
.clone().ready().await?
;
859 let (response
, stream
) = send_request
.send_request(request
, false).unwrap();
861 PipeToSendStream
::new(bytes
::Bytes
::from(data
), stream
).await?
;
864 .map_err(Error
::from
)
865 .and_then(Self::h2api_response
)
871 request
: Request
<()>,
872 ) -> Result
<Value
, Error
> {
874 self.send_request(request
, None
)
875 .and_then(move |response
| {
877 .map_err(Error
::from
)
878 .and_then(Self::h2api_response
)
885 request
: Request
<()>,
886 data
: Option
<bytes
::Bytes
>,
887 ) -> impl Future
<Output
= Result
<h2
::client
::ResponseFuture
, Error
>> {
891 .map_err(Error
::from
)
892 .and_then(move |mut send_request
| async
move {
893 if let Some(data
) = data
{
894 let (response
, stream
) = send_request
.send_request(request
, false).unwrap();
895 PipeToSendStream
::new(data
, stream
).await?
;
898 let (response
, _stream
) = send_request
.send_request(request
, true).unwrap();
904 pub async
fn h2api_response(
905 response
: Response
<h2
::RecvStream
>,
906 ) -> Result
<Value
, Error
> {
907 let status
= response
.status();
909 let (_head
, mut body
) = response
.into_parts();
911 let mut data
= Vec
::new();
912 while let Some(chunk
) = body
.data().await
{
914 // Whenever data is received, the caller is responsible for
915 // releasing capacity back to the server once it has freed
916 // the data from memory.
917 // Let the server send more data.
918 body
.flow_control().release_capacity(chunk
.len())?
;
922 let text
= String
::from_utf8(data
.to_vec()).unwrap();
923 if status
.is_success() {
927 let mut value
: Value
= serde_json
::from_str(&text
)?
;
928 if let Some(map
) = value
.as_object_mut() {
929 if let Some(data
) = map
.remove("data") {
933 bail
!("got result without data property");
936 Err(Error
::from(HttpError
::new(status
, text
)))
940 // Note: We always encode parameters with the url
941 pub fn request_builder(
945 param
: Option
<Value
>,
946 content_type
: Option
<&str>,
947 ) -> Result
<Request
<()>, Error
> {
948 let path
= path
.trim_matches('
/'
);
950 let content_type
= content_type
.unwrap_or("application/x-www-form-urlencoded");
952 if let Some(param
) = param
{
953 let query
= tools
::json_object_to_query(param
)?
;
954 // We detected problem with hyper around 6000 characters - seo we try to keep on the safe side
955 if query
.len() > 4096 { bail!("h2 query data too large ({} bytes
) - please encode data inside body
", query.len()); }
956 let url: Uri = format!("https
://{}:8007/{}?{}", server, path, query).parse()?;
957 let request
= Request
::builder()
960 .header("User-Agent", "proxmox-backup-client/1.0")
961 .header(hyper
::header
::CONTENT_TYPE
, content_type
)
965 let url
: Uri
= format
!("https://{}:8007/{}", server
, path
).parse()?
;
966 let request
= Request
::builder()
969 .header("User-Agent", "proxmox-backup-client/1.0")
970 .header(hyper
::header
::CONTENT_TYPE
, content_type
)