]> git.proxmox.com Git - proxmox-backup.git/blame - src/tools/http.rs
HttpsConnector: make keepalive configurable
[proxmox-backup.git] / src / tools / http.rs
CommitLineData
5eb9dd0c 1use anyhow::{Error, format_err, bail};
5eb9dd0c
SR
2use std::task::{Context, Poll};
3use std::os::unix::io::AsRawFd;
0f860f71 4use std::pin::Pin;
02a58862 5use std::sync::Arc;
5eb9dd0c 6
e5ef69ec
DM
7use hyper::client::HttpConnector;
8use http::{Uri, uri::Authority};
9use openssl::ssl::SslConnector;
5eb9dd0c 10use futures::*;
9104152a
DM
11use tokio::{
12 io::{
b6c06dce
DM
13 AsyncRead,
14 AsyncReadExt,
9104152a 15 AsyncWriteExt,
9104152a
DM
16 },
17 net::TcpStream,
18};
a2072cc3 19use tokio_openssl::SslStream;
5eb9dd0c 20
32413921 21use proxmox::sys::linux::socket::set_tcp_keepalive;
5b43cc44 22use proxmox_http::http::MaybeTlsStream;
32413921 23
e0ba5553 24// Build a http::uri::Authority ("host:port"), use '[..]' around IPv6 addresses
25d78b10 25pub(crate) fn build_authority(host: &str, port: u16) -> Result<Authority, Error> {
e0ba5553
DM
26 let bytes = host.as_bytes();
27 let len = bytes.len();
28 let authority = if len > 3 && bytes.contains(&b':') && bytes[0] != b'[' && bytes[len-1] != b']' {
29 format!("[{}]:{}", host, port).parse()?
30 } else {
31 format!("{}:{}", host, port).parse()?
32 };
33 Ok(authority)
34}
35
9104152a
DM
36/// HTTP Proxy Configuration
37#[derive(Clone)]
38pub struct ProxyConfig {
39 pub host: String,
40 pub port: u16,
440472cb 41 pub authorization: Option<String>, // user:pass
9104152a
DM
42 pub force_connect: bool,
43}
44
7a7fcb47
DM
45impl ProxyConfig {
46
47 /// Parse proxy config from ALL_PROXY environment var
48 pub fn from_proxy_env() -> Result<Option<ProxyConfig>, Error> {
49
50 // We only support/use ALL_PROXY environment
51
52 match std::env::var_os("ALL_PROXY") {
53 None => return Ok(None),
54 Some(all_proxy) => {
55 let all_proxy = match all_proxy.to_str() {
56 Some(s) => String::from(s),
57 None => bail!("non UTF-8 content in env ALL_PROXY"),
58 };
59 if all_proxy.is_empty() {
60 return Ok(None);
61 }
62 let config = Self::parse_proxy_url(&all_proxy)?;
63 Ok(Some(config))
64 }
65 }
66 }
67
68 /// Parse proxy configuration string [http://]<host>[:port]
69 ///
70 /// Default port is 1080 (like curl)
71 pub fn parse_proxy_url(http_proxy: &str) -> Result<ProxyConfig, Error> {
72 proxmox::try_block!({
73 let proxy_uri: Uri = http_proxy.parse()?;
74 let proxy_authority = match proxy_uri.authority() {
75 Some(authority) => authority,
76 None => bail!("missing proxy authority"),
77 };
78 let host = proxy_authority.host().to_owned();
79 let port = match proxy_uri.port() {
80 Some(port) => port.as_u16(),
81 None => 1080, // CURL default port
82 };
83
84 match proxy_uri.scheme_str() {
85 Some("http") => { /* Ok */ }
86 Some(scheme) => bail!("unsupported proxy scheme '{}'", scheme),
87 None => { /* assume HTTP */ }
88 }
89
90 let authority_vec: Vec<&str> = proxy_authority.as_str().rsplitn(2, '@').collect();
91 let authorization = if authority_vec.len() == 2 {
440472cb 92 Some(authority_vec[1].to_string())
7a7fcb47
DM
93 } else {
94 None
95 };
96
97 Ok(ProxyConfig {
98 host,
99 port,
100 authorization,
101 force_connect: false,
102 })
103 }).map_err(|err| format_err!("parse_proxy_url failed: {}", err))
104 }
440472cb
DM
105
106 /// Assemble canonical proxy string (including scheme and port)
107 pub fn to_proxy_string(&self) -> Result<String, Error> {
108 let authority = build_authority(&self.host, self.port)?;
109 Ok(match self.authorization {
110 None => format!("http://{}", authority),
111 Some(ref authorization) => format!("http://{}@{}", authorization, authority)
112 })
113 }
7a7fcb47
DM
114}
115
5eb9dd0c
SR
116#[derive(Clone)]
117pub struct HttpsConnector {
02a58862
DM
118 connector: HttpConnector,
119 ssl_connector: Arc<SslConnector>,
9104152a 120 proxy: Option<ProxyConfig>,
3ed7e875 121 tcp_keepalive: u32,
5eb9dd0c
SR
122}
123
124impl HttpsConnector {
3ed7e875 125 pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector, tcp_keepalive: u32) -> Self {
02a58862 126 connector.enforce_http(false);
5eb9dd0c 127 Self {
02a58862
DM
128 connector,
129 ssl_connector: Arc::new(ssl_connector),
9104152a 130 proxy: None,
3ed7e875 131 tcp_keepalive,
5eb9dd0c
SR
132 }
133 }
9104152a
DM
134
135 pub fn set_proxy(&mut self, proxy: ProxyConfig) {
136 self.proxy = Some(proxy);
137 }
138
139 async fn secure_stream(
140 tcp_stream: TcpStream,
141 ssl_connector: &SslConnector,
142 host: &str,
143 ) -> Result<MaybeTlsStream<TcpStream>, Error> {
144 let config = ssl_connector.configure()?;
145 let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
146 Pin::new(&mut conn).connect().await?;
147 Ok(MaybeTlsStream::Secured(conn))
148 }
149
1ed9069a
DM
150 fn parse_status_line(status_line: &str) -> Result<(), Error> {
151 if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) {
152 bail!("proxy connect failed - invalid status: {}", status_line)
153 }
154 Ok(())
155 }
156
157 async fn parse_connect_response<R: AsyncRead + Unpin>(
b6c06dce 158 stream: &mut R,
1ed9069a 159 ) -> Result<(), Error> {
9104152a 160
b6c06dce
DM
161 let mut data: Vec<u8> = Vec::new();
162 let mut buffer = [0u8; 256];
163 const END_MARK: &[u8; 4] = b"\r\n\r\n";
164
165 'outer: loop {
166 let n = stream.read(&mut buffer[..]).await?;
167 if n == 0 { break; }
168 let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 };
169 data.extend(&buffer[..n]);
170 if data.len() >= END_MARK.len() {
171 if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) {
1ed9069a
DM
172 let response = String::from_utf8_lossy(&data);
173 let status_line = match response.split("\r\n").next() {
174 Some(status) => status,
175 None => bail!("missing newline"),
176 };
177 Self::parse_status_line(status_line)?;
178
b6c06dce
DM
179 if pos != data.len() - END_MARK.len() {
180 bail!("unexpected data after connect response");
181 }
182 break 'outer;
183 }
184 }
185 if data.len() > 1024*32 { // max 32K (random chosen limit)
186 bail!("too many bytes");
187 }
188 }
9104152a
DM
189 Ok(())
190 }
5eb9dd0c
SR
191}
192
5eb9dd0c 193impl hyper::service::Service<Uri> for HttpsConnector {
a2072cc3 194 type Response = MaybeTlsStream<TcpStream>;
5eb9dd0c 195 type Error = Error;
12e874ce
FG
196 #[allow(clippy::type_complexity)]
197 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
5eb9dd0c 198
9104152a
DM
199 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
200 self.connector
201 .poll_ready(ctx)
202 .map_err(|err| err.into())
5eb9dd0c
SR
203 }
204
205 fn call(&mut self, dst: Uri) -> Self::Future {
02a58862
DM
206 let mut connector = self.connector.clone();
207 let ssl_connector = Arc::clone(&self.ssl_connector);
208 let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS);
209 let host = match dst.host() {
210 Some(host) => host.to_owned(),
211 None => {
212 return futures::future::err(format_err!("missing URL scheme")).boxed();
213 }
214 };
9104152a 215 let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
3ed7e875 216 let keepalive = self.tcp_keepalive;
9104152a
DM
217
218 if let Some(ref proxy) = self.proxy {
219
220 let use_connect = is_https || proxy.force_connect;
221
e0ba5553
DM
222 let proxy_authority = match build_authority(&proxy.host, proxy.port) {
223 Ok(authority) => authority,
224 Err(err) => return futures::future::err(err).boxed(),
225 };
226
9104152a
DM
227 let proxy_uri = match Uri::builder()
228 .scheme("http")
e0ba5553 229 .authority(proxy_authority.as_str())
9104152a
DM
230 .path_and_query("/")
231 .build()
232 {
233 Ok(uri) => uri,
234 Err(err) => return futures::future::err(err.into()).boxed(),
235 };
236
cf8e44bc
DM
237 let authorization = proxy.authorization.clone();
238
9104152a
DM
239 if use_connect {
240 async move {
241
b6c06dce 242 let mut tcp_stream = connector
9104152a
DM
243 .call(proxy_uri)
244 .await
e0ba5553 245 .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?;
fa016c16 246
3ed7e875 247 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
5eb9dd0c 248
cf8e44bc
DM
249 let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
250 if let Some(authorization) = authorization {
440472cb
DM
251 connect_request.push_str(&format!(
252 "Proxy-Authorization: Basic {}\r\n",
253 base64::encode(authorization),
254 ));
cf8e44bc
DM
255 }
256 connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port));
9104152a 257
b6c06dce
DM
258 tcp_stream.write_all(connect_request.as_bytes()).await?;
259 tcp_stream.flush().await?;
9104152a 260
1ed9069a 261 Self::parse_connect_response(&mut tcp_stream).await?;
9104152a
DM
262
263 if is_https {
264 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
265 } else {
266 Ok(MaybeTlsStream::Normal(tcp_stream))
267 }
268 }.boxed()
5eb9dd0c 269 } else {
9104152a
DM
270 async move {
271 let tcp_stream = connector
272 .call(proxy_uri)
273 .await
e0ba5553 274 .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?;
9104152a 275
3ed7e875 276 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
9104152a
DM
277
278 Ok(MaybeTlsStream::Proxied(tcp_stream))
279 }.boxed()
5eb9dd0c 280 }
9104152a
DM
281 } else {
282 async move {
283 let dst_str = dst.to_string(); // for error messages
284 let tcp_stream = connector
285 .call(dst)
286 .await
287 .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
288
3ed7e875 289 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
9104152a
DM
290
291 if is_https {
292 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
293 } else {
294 Ok(MaybeTlsStream::Normal(tcp_stream))
295 }
296 }.boxed()
297 }
5eb9dd0c
SR
298 }
299}