]>
Commit | Line | Data |
---|---|---|
5eb9dd0c | 1 | use anyhow::{Error, format_err, bail}; |
5eb9dd0c SR |
2 | use std::task::{Context, Poll}; |
3 | use std::os::unix::io::AsRawFd; | |
0f860f71 | 4 | use std::pin::Pin; |
02a58862 | 5 | use std::sync::Arc; |
5eb9dd0c | 6 | |
e5ef69ec DM |
7 | use hyper::client::HttpConnector; |
8 | use http::{Uri, uri::Authority}; | |
9 | use openssl::ssl::SslConnector; | |
5eb9dd0c | 10 | use futures::*; |
9104152a DM |
11 | use tokio::{ |
12 | io::{ | |
b6c06dce DM |
13 | AsyncRead, |
14 | AsyncReadExt, | |
9104152a | 15 | AsyncWriteExt, |
9104152a DM |
16 | }, |
17 | net::TcpStream, | |
18 | }; | |
a2072cc3 | 19 | use tokio_openssl::SslStream; |
5eb9dd0c | 20 | |
32413921 | 21 | use proxmox::sys::linux::socket::set_tcp_keepalive; |
5b43cc44 | 22 | use proxmox_http::http::MaybeTlsStream; |
32413921 | 23 | |
e0ba5553 | 24 | // Build a http::uri::Authority ("host:port"), use '[..]' around IPv6 addresses |
25d78b10 | 25 | pub(crate) fn build_authority(host: &str, port: u16) -> Result<Authority, Error> { |
e0ba5553 DM |
26 | let bytes = host.as_bytes(); |
27 | let len = bytes.len(); | |
28 | let authority = if len > 3 && bytes.contains(&b':') && bytes[0] != b'[' && bytes[len-1] != b']' { | |
29 | format!("[{}]:{}", host, port).parse()? | |
30 | } else { | |
31 | format!("{}:{}", host, port).parse()? | |
32 | }; | |
33 | Ok(authority) | |
34 | } | |
35 | ||
9104152a DM |
/// Settings describing the HTTP proxy that connections are routed through.
#[derive(Clone)]
pub struct ProxyConfig {
    /// Host name or address of the proxy.
    pub host: String,
    /// TCP port of the proxy.
    pub port: u16,
    /// Optional proxy credentials in `user:pass` form.
    pub authorization: Option<String>,
    /// When `true`, a `CONNECT` tunnel is used even for non-HTTPS destinations.
    pub force_connect: bool,
}
44 | ||
7a7fcb47 DM |
45 | impl ProxyConfig { |
46 | ||
47 | /// Parse proxy config from ALL_PROXY environment var | |
48 | pub fn from_proxy_env() -> Result<Option<ProxyConfig>, Error> { | |
49 | ||
50 | // We only support/use ALL_PROXY environment | |
51 | ||
52 | match std::env::var_os("ALL_PROXY") { | |
53 | None => return Ok(None), | |
54 | Some(all_proxy) => { | |
55 | let all_proxy = match all_proxy.to_str() { | |
56 | Some(s) => String::from(s), | |
57 | None => bail!("non UTF-8 content in env ALL_PROXY"), | |
58 | }; | |
59 | if all_proxy.is_empty() { | |
60 | return Ok(None); | |
61 | } | |
62 | let config = Self::parse_proxy_url(&all_proxy)?; | |
63 | Ok(Some(config)) | |
64 | } | |
65 | } | |
66 | } | |
67 | ||
68 | /// Parse proxy configuration string [http://]<host>[:port] | |
69 | /// | |
70 | /// Default port is 1080 (like curl) | |
71 | pub fn parse_proxy_url(http_proxy: &str) -> Result<ProxyConfig, Error> { | |
72 | proxmox::try_block!({ | |
73 | let proxy_uri: Uri = http_proxy.parse()?; | |
74 | let proxy_authority = match proxy_uri.authority() { | |
75 | Some(authority) => authority, | |
76 | None => bail!("missing proxy authority"), | |
77 | }; | |
78 | let host = proxy_authority.host().to_owned(); | |
79 | let port = match proxy_uri.port() { | |
80 | Some(port) => port.as_u16(), | |
81 | None => 1080, // CURL default port | |
82 | }; | |
83 | ||
84 | match proxy_uri.scheme_str() { | |
85 | Some("http") => { /* Ok */ } | |
86 | Some(scheme) => bail!("unsupported proxy scheme '{}'", scheme), | |
87 | None => { /* assume HTTP */ } | |
88 | } | |
89 | ||
90 | let authority_vec: Vec<&str> = proxy_authority.as_str().rsplitn(2, '@').collect(); | |
91 | let authorization = if authority_vec.len() == 2 { | |
440472cb | 92 | Some(authority_vec[1].to_string()) |
7a7fcb47 DM |
93 | } else { |
94 | None | |
95 | }; | |
96 | ||
97 | Ok(ProxyConfig { | |
98 | host, | |
99 | port, | |
100 | authorization, | |
101 | force_connect: false, | |
102 | }) | |
103 | }).map_err(|err| format_err!("parse_proxy_url failed: {}", err)) | |
104 | } | |
440472cb DM |
105 | |
106 | /// Assemble canonical proxy string (including scheme and port) | |
107 | pub fn to_proxy_string(&self) -> Result<String, Error> { | |
108 | let authority = build_authority(&self.host, self.port)?; | |
109 | Ok(match self.authorization { | |
110 | None => format!("http://{}", authority), | |
111 | Some(ref authorization) => format!("http://{}@{}", authorization, authority) | |
112 | }) | |
113 | } | |
7a7fcb47 DM |
114 | } |
115 | ||
5eb9dd0c SR |
116 | #[derive(Clone)] |
117 | pub struct HttpsConnector { | |
02a58862 DM |
118 | connector: HttpConnector, |
119 | ssl_connector: Arc<SslConnector>, | |
9104152a | 120 | proxy: Option<ProxyConfig>, |
3ed7e875 | 121 | tcp_keepalive: u32, |
5eb9dd0c SR |
122 | } |
123 | ||
124 | impl HttpsConnector { | |
3ed7e875 | 125 | pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector, tcp_keepalive: u32) -> Self { |
02a58862 | 126 | connector.enforce_http(false); |
5eb9dd0c | 127 | Self { |
02a58862 DM |
128 | connector, |
129 | ssl_connector: Arc::new(ssl_connector), | |
9104152a | 130 | proxy: None, |
3ed7e875 | 131 | tcp_keepalive, |
5eb9dd0c SR |
132 | } |
133 | } | |
9104152a DM |
134 | |
135 | pub fn set_proxy(&mut self, proxy: ProxyConfig) { | |
136 | self.proxy = Some(proxy); | |
137 | } | |
138 | ||
139 | async fn secure_stream( | |
140 | tcp_stream: TcpStream, | |
141 | ssl_connector: &SslConnector, | |
142 | host: &str, | |
143 | ) -> Result<MaybeTlsStream<TcpStream>, Error> { | |
144 | let config = ssl_connector.configure()?; | |
145 | let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?; | |
146 | Pin::new(&mut conn).connect().await?; | |
147 | Ok(MaybeTlsStream::Secured(conn)) | |
148 | } | |
149 | ||
1ed9069a DM |
150 | fn parse_status_line(status_line: &str) -> Result<(), Error> { |
151 | if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) { | |
152 | bail!("proxy connect failed - invalid status: {}", status_line) | |
153 | } | |
154 | Ok(()) | |
155 | } | |
156 | ||
157 | async fn parse_connect_response<R: AsyncRead + Unpin>( | |
b6c06dce | 158 | stream: &mut R, |
1ed9069a | 159 | ) -> Result<(), Error> { |
9104152a | 160 | |
b6c06dce DM |
161 | let mut data: Vec<u8> = Vec::new(); |
162 | let mut buffer = [0u8; 256]; | |
163 | const END_MARK: &[u8; 4] = b"\r\n\r\n"; | |
164 | ||
165 | 'outer: loop { | |
166 | let n = stream.read(&mut buffer[..]).await?; | |
167 | if n == 0 { break; } | |
168 | let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 }; | |
169 | data.extend(&buffer[..n]); | |
170 | if data.len() >= END_MARK.len() { | |
171 | if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) { | |
1ed9069a DM |
172 | let response = String::from_utf8_lossy(&data); |
173 | let status_line = match response.split("\r\n").next() { | |
174 | Some(status) => status, | |
175 | None => bail!("missing newline"), | |
176 | }; | |
177 | Self::parse_status_line(status_line)?; | |
178 | ||
b6c06dce DM |
179 | if pos != data.len() - END_MARK.len() { |
180 | bail!("unexpected data after connect response"); | |
181 | } | |
182 | break 'outer; | |
183 | } | |
184 | } | |
185 | if data.len() > 1024*32 { // max 32K (random chosen limit) | |
186 | bail!("too many bytes"); | |
187 | } | |
188 | } | |
9104152a DM |
189 | Ok(()) |
190 | } | |
5eb9dd0c SR |
191 | } |
192 | ||
5eb9dd0c | 193 | impl hyper::service::Service<Uri> for HttpsConnector { |
a2072cc3 | 194 | type Response = MaybeTlsStream<TcpStream>; |
5eb9dd0c | 195 | type Error = Error; |
12e874ce FG |
196 | #[allow(clippy::type_complexity)] |
197 | type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; | |
5eb9dd0c | 198 | |
9104152a DM |
199 | fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
200 | self.connector | |
201 | .poll_ready(ctx) | |
202 | .map_err(|err| err.into()) | |
5eb9dd0c SR |
203 | } |
204 | ||
205 | fn call(&mut self, dst: Uri) -> Self::Future { | |
02a58862 DM |
206 | let mut connector = self.connector.clone(); |
207 | let ssl_connector = Arc::clone(&self.ssl_connector); | |
208 | let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS); | |
209 | let host = match dst.host() { | |
210 | Some(host) => host.to_owned(), | |
211 | None => { | |
212 | return futures::future::err(format_err!("missing URL scheme")).boxed(); | |
213 | } | |
214 | }; | |
9104152a | 215 | let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 }); |
3ed7e875 | 216 | let keepalive = self.tcp_keepalive; |
9104152a DM |
217 | |
218 | if let Some(ref proxy) = self.proxy { | |
219 | ||
220 | let use_connect = is_https || proxy.force_connect; | |
221 | ||
e0ba5553 DM |
222 | let proxy_authority = match build_authority(&proxy.host, proxy.port) { |
223 | Ok(authority) => authority, | |
224 | Err(err) => return futures::future::err(err).boxed(), | |
225 | }; | |
226 | ||
9104152a DM |
227 | let proxy_uri = match Uri::builder() |
228 | .scheme("http") | |
e0ba5553 | 229 | .authority(proxy_authority.as_str()) |
9104152a DM |
230 | .path_and_query("/") |
231 | .build() | |
232 | { | |
233 | Ok(uri) => uri, | |
234 | Err(err) => return futures::future::err(err.into()).boxed(), | |
235 | }; | |
236 | ||
cf8e44bc DM |
237 | let authorization = proxy.authorization.clone(); |
238 | ||
9104152a DM |
239 | if use_connect { |
240 | async move { | |
241 | ||
b6c06dce | 242 | let mut tcp_stream = connector |
9104152a DM |
243 | .call(proxy_uri) |
244 | .await | |
e0ba5553 | 245 | .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?; |
fa016c16 | 246 | |
3ed7e875 | 247 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); |
5eb9dd0c | 248 | |
cf8e44bc DM |
249 | let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port); |
250 | if let Some(authorization) = authorization { | |
440472cb DM |
251 | connect_request.push_str(&format!( |
252 | "Proxy-Authorization: Basic {}\r\n", | |
253 | base64::encode(authorization), | |
254 | )); | |
cf8e44bc DM |
255 | } |
256 | connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port)); | |
9104152a | 257 | |
b6c06dce DM |
258 | tcp_stream.write_all(connect_request.as_bytes()).await?; |
259 | tcp_stream.flush().await?; | |
9104152a | 260 | |
1ed9069a | 261 | Self::parse_connect_response(&mut tcp_stream).await?; |
9104152a DM |
262 | |
263 | if is_https { | |
264 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
265 | } else { | |
266 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
267 | } | |
268 | }.boxed() | |
5eb9dd0c | 269 | } else { |
9104152a DM |
270 | async move { |
271 | let tcp_stream = connector | |
272 | .call(proxy_uri) | |
273 | .await | |
e0ba5553 | 274 | .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?; |
9104152a | 275 | |
3ed7e875 | 276 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); |
9104152a DM |
277 | |
278 | Ok(MaybeTlsStream::Proxied(tcp_stream)) | |
279 | }.boxed() | |
5eb9dd0c | 280 | } |
9104152a DM |
281 | } else { |
282 | async move { | |
283 | let dst_str = dst.to_string(); // for error messages | |
284 | let tcp_stream = connector | |
285 | .call(dst) | |
286 | .await | |
287 | .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?; | |
288 | ||
3ed7e875 | 289 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); |
9104152a DM |
290 | |
291 | if is_https { | |
292 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
293 | } else { | |
294 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
295 | } | |
296 | }.boxed() | |
297 | } | |
5eb9dd0c SR |
298 | } |
299 | } |