1 use anyhow
::{Error, format_err, bail}
;
2 use std
::task
::{Context, Poll}
;
3 use std
::os
::unix
::io
::AsRawFd
;
4 use std
::collections
::HashMap
;
8 use hyper
::{Uri, Body}
;
9 use hyper
::client
::{Client, HttpConnector}
;
10 use http
::{Request, Response}
;
11 use openssl
::ssl
::{SslConnector, SslMethod}
;
21 use tokio_openssl
::SslStream
;
24 async_io
::MaybeTlsStream
,
27 PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
,
31 /// HTTP Proxy Configuration
33 pub struct ProxyConfig
{
36 pub force_connect
: bool
,
39 /// Asyncrounous HTTP client implementation
40 pub struct SimpleHttp
{
41 client
: Client
<HttpsConnector
, Body
>,
46 pub fn new(proxy_config
: Option
<ProxyConfig
>) -> Self {
47 let ssl_connector
= SslConnector
::builder(SslMethod
::tls()).unwrap().build();
48 Self::with_ssl_connector(ssl_connector
, proxy_config
)
51 pub fn with_ssl_connector(ssl_connector
: SslConnector
, proxy_config
: Option
<ProxyConfig
>) -> Self {
52 let connector
= HttpConnector
::new();
53 let mut https
= HttpsConnector
::with_connector(connector
, ssl_connector
);
54 if let Some(proxy_config
) = proxy_config
{
55 https
.set_proxy(proxy_config
);
57 let client
= Client
::builder().build(https
);
61 pub async
fn request(&self, request
: Request
<Body
>) -> Result
<Response
<Body
>, Error
> {
62 self.client
.request(request
)
71 content_type
: Option
<&str>,
72 ) -> Result
<Response
<Body
>, Error
> {
74 let body
= if let Some(body
) = body
{
79 let content_type
= content_type
.unwrap_or("application/json");
81 let request
= Request
::builder()
84 .header("User-Agent", "proxmox-backup-client/1.0")
85 .header(hyper
::header
::CONTENT_TYPE
, content_type
)
88 self.client
.request(request
)
93 pub async
fn get_string(
96 extra_headers
: Option
<&HashMap
<String
, String
>>,
97 ) -> Result
<String
, Error
> {
99 let mut request
= Request
::builder()
102 .header("User-Agent", "proxmox-backup-client/1.0");
104 if let Some(hs
) = extra_headers
{
105 for (h
, v
) in hs
.iter() {
106 request
= request
.header(h
, v
);
110 let request
= request
.body(Body
::empty())?
;
112 let res
= self.client
.request(request
).await?
;
114 let status
= res
.status();
115 if !status
.is_success() {
116 bail
!("Got bad status '{}' from server", status
)
119 Self::response_body_string(res
).await
122 pub async
fn response_body_string(res
: Response
<Body
>) -> Result
<String
, Error
> {
123 let buf
= hyper
::body
::to_bytes(res
).await?
;
124 String
::from_utf8(buf
.to_vec())
125 .map_err(|err
| format_err
!("Error converting HTTP result data: {}", err
))
130 pub struct HttpsConnector
{
131 connector
: HttpConnector
,
132 ssl_connector
: Arc
<SslConnector
>,
133 proxy
: Option
<ProxyConfig
>,
136 impl HttpsConnector
{
137 pub fn with_connector(mut connector
: HttpConnector
, ssl_connector
: SslConnector
) -> Self {
138 connector
.enforce_http(false);
141 ssl_connector
: Arc
::new(ssl_connector
),
146 pub fn set_proxy(&mut self, proxy
: ProxyConfig
) {
147 self.proxy
= Some(proxy
);
150 async
fn secure_stream(
151 tcp_stream
: TcpStream
,
152 ssl_connector
: &SslConnector
,
154 ) -> Result
<MaybeTlsStream
<TcpStream
>, Error
> {
155 let config
= ssl_connector
.configure()?
;
156 let mut conn
: SslStream
<TcpStream
> = SslStream
::new(config
.into_ssl(host
)?
, tcp_stream
)?
;
157 Pin
::new(&mut conn
).connect().await?
;
158 Ok(MaybeTlsStream
::Secured(conn
))
161 fn parse_status_line(status_line
: &str) -> Result
<(), Error
> {
162 if !(status_line
.starts_with("HTTP/1.1 200") || status_line
.starts_with("HTTP/1.0 200")) {
163 bail
!("proxy connect failed - invalid status: {}", status_line
)
168 async
fn parse_connect_response
<R
: AsyncRead
+ Unpin
>(
170 ) -> Result
<(), Error
> {
172 let mut data
: Vec
<u8> = Vec
::new();
173 let mut buffer
= [0u8; 256];
174 const END_MARK
: &[u8; 4] = b
"\r\n\r\n";
177 let n
= stream
.read(&mut buffer
[..]).await?
;
179 let search_start
= if data
.len() > END_MARK
.len() { data.len() - END_MARK.len() + 1 }
else { 0 }
;
180 data
.extend(&buffer
[..n
]);
181 if data
.len() >= END_MARK
.len() {
182 if let Some(pos
) = data
[search_start
..].windows(END_MARK
.len()).position(|w
| w
== END_MARK
) {
183 let response
= String
::from_utf8_lossy(&data
);
184 let status_line
= match response
.split("\r\n").next() {
185 Some(status
) => status
,
186 None
=> bail
!("missing newline"),
188 Self::parse_status_line(status_line
)?
;
190 if pos
!= data
.len() - END_MARK
.len() {
191 bail
!("unexpected data after connect response");
196 if data
.len() > 1024*32 { // max 32K (random chosen limit)
197 bail
!("too many bytes");
204 impl hyper
::service
::Service
<Uri
> for HttpsConnector
{
205 type Response
= MaybeTlsStream
<TcpStream
>;
207 #[allow(clippy::type_complexity)]
208 type Future
= Pin
<Box
<dyn Future
<Output
= Result
<Self::Response
, Self::Error
>> + Send
+ '
static>>;
210 fn poll_ready(&mut self, ctx
: &mut Context
<'_
>) -> Poll
<Result
<(), Self::Error
>> {
213 .map_err(|err
| err
.into())
216 fn call(&mut self, dst
: Uri
) -> Self::Future
{
217 let mut connector
= self.connector
.clone();
218 let ssl_connector
= Arc
::clone(&self.ssl_connector
);
219 let is_https
= dst
.scheme() == Some(&http
::uri
::Scheme
::HTTPS
);
220 let host
= match dst
.host() {
221 Some(host
) => host
.to_owned(),
223 return futures
::future
::err(format_err
!("missing URL scheme")).boxed();
226 let port
= dst
.port_u16().unwrap_or(if is_https { 443 }
else { 80 }
);
228 if let Some(ref proxy
) = self.proxy
{
230 let use_connect
= is_https
|| proxy
.force_connect
;
232 let proxy_url
= format
!("{}:{}", proxy
.host
, proxy
.port
);
233 let proxy_uri
= match Uri
::builder()
235 .authority(proxy_url
.as_str())
240 Err(err
) => return futures
::future
::err(err
.into()).boxed(),
246 let mut tcp_stream
= connector
249 .map_err(|err
| format_err
!("error connecting to {} - {}", proxy_url
, err
))?
;
251 let _
= set_tcp_keepalive(tcp_stream
.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
);
253 let connect_request
= format
!(
254 "CONNECT {0}:{1} HTTP/1.1\r\n\
255 Host: {0}:{1}\r\n\r\n",
259 tcp_stream
.write_all(connect_request
.as_bytes()).await?
;
260 tcp_stream
.flush().await?
;
262 Self::parse_connect_response(&mut tcp_stream
).await?
;
265 Self::secure_stream(tcp_stream
, &ssl_connector
, &host
).await
267 Ok(MaybeTlsStream
::Normal(tcp_stream
))
272 let tcp_stream
= connector
275 .map_err(|err
| format_err
!("error connecting to {} - {}", proxy_url
, err
))?
;
277 let _
= set_tcp_keepalive(tcp_stream
.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
);
279 Ok(MaybeTlsStream
::Proxied(tcp_stream
))
284 let dst_str
= dst
.to_string(); // for error messages
285 let tcp_stream
= connector
288 .map_err(|err
| format_err
!("error connecting to {} - {}", dst_str
, err
))?
;
290 let _
= set_tcp_keepalive(tcp_stream
.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
);
293 Self::secure_stream(tcp_stream
, &ssl_connector
, &host
).await
295 Ok(MaybeTlsStream
::Normal(tcp_stream
))