1 use anyhow
::{Error, format_err, bail}
;
2 use std
::task
::{Context, Poll}
;
3 use std
::os
::unix
::io
::AsRawFd
;
7 use hyper
::client
::HttpConnector
;
8 use http
::{Uri, uri::Authority}
;
9 use openssl
::ssl
::SslConnector
;
19 use tokio_openssl
::SslStream
;
22 async_io
::MaybeTlsStream
,
25 PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
,
29 // Build a http::uri::Authority ("host:port"), use '[..]' around IPv6 addresses
30 fn build_authority(host
: &str, port
: u16) -> Result
<Authority
, Error
> {
31 let bytes
= host
.as_bytes();
32 let len
= bytes
.len();
33 let authority
= if len
> 3 && bytes
.contains(&b'
:'
) && bytes
[0] != b'
['
&& bytes
[len
-1] != b'
]'
{
34 format
!("[{}]:{}", host
, port
).parse()?
36 format
!("{}:{}", host
, port
).parse()?
41 /// Settings for the HTTP proxy that connections are routed through.
43 pub struct ProxyConfig
{
/// Sent as the value of the `Proxy-Authorization` header of the CONNECT
/// request; with `None` no such header is sent.
46 pub authorization
: Option
<String
>, // Proxy-Authorization header value
/// When `true`, a CONNECT tunnel is requested even for non-HTTPS destinations.
47 pub force_connect
: bool
,
52 /// Read the proxy configuration from the `ALL_PROXY` environment variable.
///
/// Returns `Ok(None)` when the variable is not set. A value that is not valid
/// UTF-8 is rejected with an error; otherwise the value is handed to
/// [`Self::parse_proxy_url`], whose errors are propagated.
53 pub fn from_proxy_env() -> Result
<Option
<ProxyConfig
>, Error
> {
55 // ALL_PROXY is the only proxy-related environment variable consulted here.
57 match std
::env
::var_os("ALL_PROXY") {
58 None
=> return Ok(None
),
// The raw OS string has to be valid UTF-8 before it can be parsed as a URL.
60 let all_proxy
= match all_proxy
.to_str() {
61 Some(s
) => String
::from(s
),
62 None
=> bail
!("non UTF-8 content in env ALL_PROXY"),
// NOTE(review): the body of this branch is not visible here; presumably an
// empty value is treated like an unset variable — confirm.
64 if all_proxy
.is_empty() {
67 let config
= Self::parse_proxy_url(&all_proxy
)?
;
73 /// Parse a proxy configuration string of the form `[http://]<host>[:port]`.
///
75 /// The port defaults to 1080 (like curl).
///
/// Only the `http` scheme, or no scheme at all, is accepted. If the authority
/// contains an `@`, the text before the last `@` is base64-encoded into a
/// `Basic` authorization value. `force_connect` is set to `false`.
///
/// # Errors
///
/// The trailing `map_err` rewraps errors as `parse_proxy_url failed: ...`.
76 pub fn parse_proxy_url(http_proxy
: &str) -> Result
<ProxyConfig
, Error
> {
78 let proxy_uri
: Uri
= http_proxy
.parse()?
;
// A URL without an authority part cannot name a proxy.
79 let proxy_authority
= match proxy_uri
.authority() {
80 Some(authority
) => authority
,
81 None
=> bail
!("missing proxy authority"),
83 let host
= proxy_authority
.host().to_owned();
84 let port
= match proxy_uri
.port() {
85 Some(port
) => port
.as_u16(),
86 None
=> 1080, // CURL default port
// Reject every explicit scheme other than plain "http".
89 match proxy_uri
.scheme_str() {
90 Some("http") => { /* Ok */ }
91 Some(scheme
) => bail
!("unsupported proxy scheme '{}'", scheme
),
92 None
=> { /* assume HTTP */ }
// rsplitn() walks from the right: element 0 is the host part, element 1
// (only present if there is an '@') is everything before the last '@'.
95 let authority_vec
: Vec
<&str> = proxy_authority
.as_str().rsplitn(2, '@'
).collect();
// NOTE(review): the user info is encoded exactly as written in the URL;
// percent-encoded characters are not decoded first — confirm this is intended.
96 let authorization
= if authority_vec
.len() == 2 {
97 Some(format
!("Basic {}", base64
::encode(authority_vec
[1])))
106 force_connect
: false,
108 }).map_err(|err
| format_err
!("parse_proxy_url failed: {}", err
))
/// Connector that opens TCP connections and, depending on the destination,
/// wraps them in TLS and/or routes them through an HTTP proxy.
113 pub struct HttpsConnector
{
// Cloned for every `call` to establish the underlying TCP connection.
114 connector
: HttpConnector
,
// TLS configuration handed to `secure_stream`; behind an `Arc` so each
// connection future can hold its own handle.
115 ssl_connector
: Arc
<SslConnector
>,
// Proxy to route connections through; `set_proxy` stores a value here.
116 proxy
: Option
<ProxyConfig
>,
119 impl HttpsConnector
{
/// Build a connector from an existing `HttpConnector` and `SslConnector`.
///
/// `enforce_http` is switched off on the HTTP connector so that it does not
/// reject `https` URIs.
120 pub fn with_connector(mut connector
: HttpConnector
, ssl_connector
: SslConnector
) -> Self {
121 connector
.enforce_http(false);
// The TLS configuration is stored behind an `Arc`.
124 ssl_connector
: Arc
::new(ssl_connector
),
129 pub fn set_proxy(&mut self, proxy
: ProxyConfig
) {
130 self.proxy
= Some(proxy
);
133 async
fn secure_stream(
134 tcp_stream
: TcpStream
,
135 ssl_connector
: &SslConnector
,
137 ) -> Result
<MaybeTlsStream
<TcpStream
>, Error
> {
138 let config
= ssl_connector
.configure()?
;
139 let mut conn
: SslStream
<TcpStream
> = SslStream
::new(config
.into_ssl(host
)?
, tcp_stream
)?
;
140 Pin
::new(&mut conn
).connect().await?
;
141 Ok(MaybeTlsStream
::Secured(conn
))
144 fn parse_status_line(status_line
: &str) -> Result
<(), Error
> {
145 if !(status_line
.starts_with("HTTP/1.1 200") || status_line
.starts_with("HTTP/1.0 200")) {
146 bail
!("proxy connect failed - invalid status: {}", status_line
)
151 async
fn parse_connect_response
<R
: AsyncRead
+ Unpin
>(
153 ) -> Result
<(), Error
> {
155 let mut data
: Vec
<u8> = Vec
::new();
156 let mut buffer
= [0u8; 256];
157 const END_MARK
: &[u8; 4] = b
"\r\n\r\n";
160 let n
= stream
.read(&mut buffer
[..]).await?
;
162 let search_start
= if data
.len() > END_MARK
.len() { data.len() - END_MARK.len() + 1 }
else { 0 }
;
163 data
.extend(&buffer
[..n
]);
164 if data
.len() >= END_MARK
.len() {
165 if let Some(pos
) = data
[search_start
..].windows(END_MARK
.len()).position(|w
| w
== END_MARK
) {
166 let response
= String
::from_utf8_lossy(&data
);
167 let status_line
= match response
.split("\r\n").next() {
168 Some(status
) => status
,
169 None
=> bail
!("missing newline"),
171 Self::parse_status_line(status_line
)?
;
173 if pos
!= data
.len() - END_MARK
.len() {
174 bail
!("unexpected data after connect response");
179 if data
.len() > 1024*32 { // max 32K (random chosen limit)
180 bail
!("too many bytes");
// Lets hyper use `HttpsConnector` to turn a destination `Uri` into a
// connected stream, either directly or through the configured proxy.
187 impl hyper
::service
::Service
<Uri
> for HttpsConnector
{
// The connected stream. The variants built below are `Normal`, `Proxied`
// and (via `secure_stream`) `Secured`.
188 type Response
= MaybeTlsStream
<TcpStream
>;
// The boxed, type-erased future is long to spell out, hence the lint exemption.
190 #[allow(clippy::type_complexity)]
191 type Future
= Pin
<Box
<dyn Future
<Output
= Result
<Self::Response
, Self::Error
>> + Send
+ '
static>>;
// NOTE(review): the expression producing the readiness value is not visible
// here; only the conversion of its error via `into()` is.
193 fn poll_ready(&mut self, ctx
: &mut Context
<'_
>) -> Poll
<Result
<(), Self::Error
>> {
196 .map_err(|err
| err
.into())
// Build the future that connects to `dst`.
// Values are cloned or copied out of `self` up front; `Self::Future` is
// required to be `'static`.
199 fn call(&mut self, dst
: Uri
) -> Self::Future
{
200 let mut connector
= self.connector
.clone();
201 let ssl_connector
= Arc
::clone(&self.ssl_connector
);
// Drives the default port and whether a CONNECT tunnel is needed.
202 let is_https
= dst
.scheme() == Some(&http
::uri
::Scheme
::HTTPS
);
// NOTE(review): the error below is returned from the match on `dst.host()`,
// yet its text says "missing URL scheme" — it looks like it should name the
// host instead; confirm.
203 let host
= match dst
.host() {
204 Some(host
) => host
.to_owned(),
206 return futures
::future
::err(format_err
!("missing URL scheme")).boxed();
// Without an explicit port, fall back to the scheme's default.
209 let port
= dst
.port_u16().unwrap_or(if is_https { 443 }
else { 80 }
);
// A proxy is configured: the TCP connection goes to the proxy.
211 if let Some(ref proxy
) = self.proxy
{
// HTTPS always uses a CONNECT tunnel; other schemes only when forced.
213 let use_connect
= is_https
|| proxy
.force_connect
;
215 let proxy_authority
= match build_authority(&proxy
.host
, proxy
.port
) {
216 Ok(authority
) => authority
,
217 Err(err
) => return futures
::future
::err(err
).boxed(),
// URI of the proxy itself; presumably the connect target for the inner
// connector — its use is not visible here.
220 let proxy_uri
= match Uri
::builder()
222 .authority(proxy_authority
.as_str())
227 Err(err
) => return futures
::future
::err(err
.into()).boxed(),
// Cloned so the value can be used independently of `self`.
230 let authorization
= proxy
.authorization
.clone();
// Open the TCP connection to the proxy.
235 let mut tcp_stream
= connector
238 .map_err(|err
| format_err
!("error connecting to {} - {}", proxy_authority
, err
))?
;
// Best effort: the result of enabling keepalive is deliberately ignored.
240 let _
= set_tcp_keepalive(tcp_stream
.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
);
// Ask the proxy to open a tunnel to `host:port`.
242 let mut connect_request
= format
!("CONNECT {0}:{1} HTTP/1.1\r\n", host
, port
);
243 if let Some(authorization
) = authorization
{
244 connect_request
.push_str(&format
!("Proxy-Authorization: {}\r\n", authorization
));
// Host header followed by the blank line that ends the request.
246 connect_request
.push_str(&format
!("Host: {0}:{1}\r\n\r\n", host
, port
));
248 tcp_stream
.write_all(connect_request
.as_bytes()).await?
;
249 tcp_stream
.flush().await?
;
// Fails unless the proxy answers with a 200 status line.
251 Self::parse_connect_response(&mut tcp_stream
).await?
;
// NOTE(review): the condition choosing between the TLS and the plain result
// is not visible here; presumably `is_https` — confirm.
254 Self::secure_stream(tcp_stream
, &ssl_connector
, &host
).await
256 Ok(MaybeTlsStream
::Normal(tcp_stream
))
// Proxy without CONNECT: connect to the proxy and hand the stream back
// marked as `Proxied`.
261 let tcp_stream
= connector
264 .map_err(|err
| format_err
!("error connecting to {} - {}", proxy_authority
, err
))?
;
266 let _
= set_tcp_keepalive(tcp_stream
.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
);
268 Ok(MaybeTlsStream
::Proxied(tcp_stream
))
// No proxy configured: connect to the destination directly.
273 let dst_str
= dst
.to_string(); // for error messages
274 let tcp_stream
= connector
277 .map_err(|err
| format_err
!("error connecting to {} - {}", dst_str
, err
))?
;
279 let _
= set_tcp_keepalive(tcp_stream
.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME
);
282 Self::secure_stream(tcp_stream
, &ssl_connector
, &host
).await
284 Ok(MaybeTlsStream
::Normal(tcp_stream
))