]>
Commit | Line | Data |
---|---|---|
5eb9dd0c | 1 | use anyhow::{Error, format_err, bail}; |
5eb9dd0c SR |
2 | use std::task::{Context, Poll}; |
3 | use std::os::unix::io::AsRawFd; | |
137a6ebc | 4 | use std::collections::HashMap; |
0f860f71 | 5 | use std::pin::Pin; |
02a58862 | 6 | use std::sync::Arc; |
5eb9dd0c SR |
7 | |
8 | use hyper::{Uri, Body}; | |
9 | use hyper::client::{Client, HttpConnector}; | |
cf8e44bc | 10 | use http::{Request, Response, HeaderValue}; |
5eb9dd0c SR |
11 | use openssl::ssl::{SslConnector, SslMethod}; |
12 | use futures::*; | |
9104152a DM |
13 | use tokio::{ |
14 | io::{ | |
b6c06dce DM |
15 | AsyncRead, |
16 | AsyncReadExt, | |
9104152a | 17 | AsyncWriteExt, |
9104152a DM |
18 | }, |
19 | net::TcpStream, | |
20 | }; | |
a2072cc3 | 21 | use tokio_openssl::SslStream; |
5eb9dd0c SR |
22 | |
23 | use crate::tools::{ | |
a2072cc3 | 24 | async_io::MaybeTlsStream, |
5eb9dd0c SR |
25 | socket::{ |
26 | set_tcp_keepalive, | |
27 | PROXMOX_BACKUP_TCP_KEEPALIVE_TIME, | |
28 | }, | |
29 | }; | |
30 | ||
9104152a DM |
31 | /// HTTP Proxy Configuration |
32 | #[derive(Clone)] | |
33 | pub struct ProxyConfig { | |
34 | pub host: String, | |
35 | pub port: u16, | |
cf8e44bc | 36 | pub authorization: Option<String>, // Proxy-Authorization header value |
9104152a DM |
37 | pub force_connect: bool, |
38 | } | |
39 | ||
7a7fcb47 DM |
40 | impl ProxyConfig { |
41 | ||
42 | /// Parse proxy config from ALL_PROXY environment var | |
43 | pub fn from_proxy_env() -> Result<Option<ProxyConfig>, Error> { | |
44 | ||
45 | // We only support/use ALL_PROXY environment | |
46 | ||
47 | match std::env::var_os("ALL_PROXY") { | |
48 | None => return Ok(None), | |
49 | Some(all_proxy) => { | |
50 | let all_proxy = match all_proxy.to_str() { | |
51 | Some(s) => String::from(s), | |
52 | None => bail!("non UTF-8 content in env ALL_PROXY"), | |
53 | }; | |
54 | if all_proxy.is_empty() { | |
55 | return Ok(None); | |
56 | } | |
57 | let config = Self::parse_proxy_url(&all_proxy)?; | |
58 | Ok(Some(config)) | |
59 | } | |
60 | } | |
61 | } | |
62 | ||
63 | /// Parse proxy configuration string [http://]<host>[:port] | |
64 | /// | |
65 | /// Default port is 1080 (like curl) | |
66 | pub fn parse_proxy_url(http_proxy: &str) -> Result<ProxyConfig, Error> { | |
67 | proxmox::try_block!({ | |
68 | let proxy_uri: Uri = http_proxy.parse()?; | |
69 | let proxy_authority = match proxy_uri.authority() { | |
70 | Some(authority) => authority, | |
71 | None => bail!("missing proxy authority"), | |
72 | }; | |
73 | let host = proxy_authority.host().to_owned(); | |
74 | let port = match proxy_uri.port() { | |
75 | Some(port) => port.as_u16(), | |
76 | None => 1080, // CURL default port | |
77 | }; | |
78 | ||
79 | match proxy_uri.scheme_str() { | |
80 | Some("http") => { /* Ok */ } | |
81 | Some(scheme) => bail!("unsupported proxy scheme '{}'", scheme), | |
82 | None => { /* assume HTTP */ } | |
83 | } | |
84 | ||
85 | let authority_vec: Vec<&str> = proxy_authority.as_str().rsplitn(2, '@').collect(); | |
86 | let authorization = if authority_vec.len() == 2 { | |
87 | Some(format!("Basic {}", base64::encode(authority_vec[1]))) | |
88 | } else { | |
89 | None | |
90 | }; | |
91 | ||
92 | Ok(ProxyConfig { | |
93 | host, | |
94 | port, | |
95 | authorization, | |
96 | force_connect: false, | |
97 | }) | |
98 | }).map_err(|err| format_err!("parse_proxy_url failed: {}", err)) | |
99 | } | |
100 | } | |
101 | ||
26153589 DM |
102 | /// Asyncrounous HTTP client implementation |
103 | pub struct SimpleHttp { | |
104 | client: Client<HttpsConnector, Body>, | |
cf8e44bc | 105 | proxy_authorization: Option<String>, // Proxy-Authorization header value |
c0147e49 | 106 | user_agent: Option<String>, |
5eb9dd0c SR |
107 | } |
108 | ||
26153589 | 109 | impl SimpleHttp { |
137a6ebc | 110 | |
d52b1209 DM |
111 | pub const DEFAULT_USER_AGENT_STRING: &'static str = "proxmox-backup-client/1.0"; |
112 | ||
9104152a | 113 | pub fn new(proxy_config: Option<ProxyConfig>) -> Self { |
26153589 | 114 | let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build(); |
9104152a | 115 | Self::with_ssl_connector(ssl_connector, proxy_config) |
137a6ebc SR |
116 | } |
117 | ||
9104152a | 118 | pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self { |
cf8e44bc DM |
119 | |
120 | let mut proxy_authorization = None; | |
121 | if let Some(ref proxy_config) = proxy_config { | |
122 | if !proxy_config.force_connect { | |
123 | proxy_authorization = proxy_config.authorization.clone(); | |
124 | } | |
125 | } | |
126 | ||
26153589 | 127 | let connector = HttpConnector::new(); |
9104152a DM |
128 | let mut https = HttpsConnector::with_connector(connector, ssl_connector); |
129 | if let Some(proxy_config) = proxy_config { | |
130 | https.set_proxy(proxy_config); | |
131 | } | |
26153589 | 132 | let client = Client::builder().build(https); |
c0147e49 DM |
133 | Self { client, proxy_authorization, user_agent: None } |
134 | } | |
135 | ||
136 | pub fn set_user_agent(&mut self, user_agent: &str) -> Result<(), Error> { | |
137 | self.user_agent = Some(user_agent.to_owned()); | |
138 | Ok(()) | |
cf8e44bc DM |
139 | } |
140 | ||
141 | fn add_proxy_headers(&self, request: &mut Request<Body>) -> Result<(), Error> { | |
142 | if request.uri().scheme() != Some(&http::uri::Scheme::HTTPS) { | |
143 | if let Some(ref authorization) = self.proxy_authorization { | |
144 | request | |
145 | .headers_mut() | |
146 | .insert( | |
147 | http::header::PROXY_AUTHORIZATION, | |
148 | HeaderValue::from_str(authorization)?, | |
149 | ); | |
150 | } | |
151 | } | |
152 | Ok(()) | |
26153589 | 153 | } |
5eb9dd0c | 154 | |
cf8e44bc | 155 | pub async fn request(&self, mut request: Request<Body>) -> Result<Response<Body>, Error> { |
c0147e49 DM |
156 | let user_agent = if let Some(ref user_agent) = self.user_agent { |
157 | HeaderValue::from_str(&user_agent)? | |
158 | } else { | |
159 | HeaderValue::from_str(Self::DEFAULT_USER_AGENT_STRING)? | |
160 | }; | |
d52b1209 | 161 | |
c0147e49 | 162 | request.headers_mut().insert(hyper::header::USER_AGENT, user_agent); |
d52b1209 | 163 | |
cf8e44bc | 164 | self.add_proxy_headers(&mut request)?; |
d52b1209 | 165 | |
9104152a DM |
166 | self.client.request(request) |
167 | .map_err(Error::from) | |
168 | .await | |
169 | } | |
170 | ||
26153589 DM |
171 | pub async fn post( |
172 | &mut self, | |
173 | uri: &str, | |
174 | body: Option<String>, | |
175 | content_type: Option<&str>, | |
176 | ) -> Result<Response<Body>, Error> { | |
177 | ||
178 | let body = if let Some(body) = body { | |
179 | Body::from(body) | |
180 | } else { | |
181 | Body::empty() | |
182 | }; | |
183 | let content_type = content_type.unwrap_or("application/json"); | |
184 | ||
185 | let request = Request::builder() | |
186 | .method("POST") | |
187 | .uri(uri) | |
26153589 DM |
188 | .header(hyper::header::CONTENT_TYPE, content_type) |
189 | .body(body)?; | |
190 | ||
cf8e44bc | 191 | self.request(request).await |
5eb9dd0c SR |
192 | } |
193 | ||
26153589 DM |
194 | pub async fn get_string( |
195 | &mut self, | |
196 | uri: &str, | |
197 | extra_headers: Option<&HashMap<String, String>>, | |
198 | ) -> Result<String, Error> { | |
199 | ||
200 | let mut request = Request::builder() | |
201 | .method("GET") | |
d52b1209 | 202 | .uri(uri); |
26153589 DM |
203 | |
204 | if let Some(hs) = extra_headers { | |
205 | for (h, v) in hs.iter() { | |
206 | request = request.header(h, v); | |
207 | } | |
208 | } | |
2e201e7d | 209 | |
26153589 | 210 | let request = request.body(Body::empty())?; |
5eb9dd0c | 211 | |
cf8e44bc | 212 | let res = self.request(request).await?; |
26153589 DM |
213 | |
214 | let status = res.status(); | |
215 | if !status.is_success() { | |
216 | bail!("Got bad status '{}' from server", status) | |
217 | } | |
218 | ||
219 | Self::response_body_string(res).await | |
220 | } | |
221 | ||
222 | pub async fn response_body_string(res: Response<Body>) -> Result<String, Error> { | |
223 | let buf = hyper::body::to_bytes(res).await?; | |
224 | String::from_utf8(buf.to_vec()) | |
225 | .map_err(|err| format_err!("Error converting HTTP result data: {}", err)) | |
226 | } | |
2e201e7d TL |
227 | } |
228 | ||
5eb9dd0c SR |
229 | #[derive(Clone)] |
230 | pub struct HttpsConnector { | |
02a58862 DM |
231 | connector: HttpConnector, |
232 | ssl_connector: Arc<SslConnector>, | |
9104152a | 233 | proxy: Option<ProxyConfig>, |
5eb9dd0c SR |
234 | } |
235 | ||
236 | impl HttpsConnector { | |
02a58862 DM |
237 | pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector) -> Self { |
238 | connector.enforce_http(false); | |
5eb9dd0c | 239 | Self { |
02a58862 DM |
240 | connector, |
241 | ssl_connector: Arc::new(ssl_connector), | |
9104152a | 242 | proxy: None, |
5eb9dd0c SR |
243 | } |
244 | } | |
9104152a DM |
245 | |
246 | pub fn set_proxy(&mut self, proxy: ProxyConfig) { | |
247 | self.proxy = Some(proxy); | |
248 | } | |
249 | ||
250 | async fn secure_stream( | |
251 | tcp_stream: TcpStream, | |
252 | ssl_connector: &SslConnector, | |
253 | host: &str, | |
254 | ) -> Result<MaybeTlsStream<TcpStream>, Error> { | |
255 | let config = ssl_connector.configure()?; | |
256 | let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?; | |
257 | Pin::new(&mut conn).connect().await?; | |
258 | Ok(MaybeTlsStream::Secured(conn)) | |
259 | } | |
260 | ||
1ed9069a DM |
261 | fn parse_status_line(status_line: &str) -> Result<(), Error> { |
262 | if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) { | |
263 | bail!("proxy connect failed - invalid status: {}", status_line) | |
264 | } | |
265 | Ok(()) | |
266 | } | |
267 | ||
268 | async fn parse_connect_response<R: AsyncRead + Unpin>( | |
b6c06dce | 269 | stream: &mut R, |
1ed9069a | 270 | ) -> Result<(), Error> { |
9104152a | 271 | |
b6c06dce DM |
272 | let mut data: Vec<u8> = Vec::new(); |
273 | let mut buffer = [0u8; 256]; | |
274 | const END_MARK: &[u8; 4] = b"\r\n\r\n"; | |
275 | ||
276 | 'outer: loop { | |
277 | let n = stream.read(&mut buffer[..]).await?; | |
278 | if n == 0 { break; } | |
279 | let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 }; | |
280 | data.extend(&buffer[..n]); | |
281 | if data.len() >= END_MARK.len() { | |
282 | if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) { | |
1ed9069a DM |
283 | let response = String::from_utf8_lossy(&data); |
284 | let status_line = match response.split("\r\n").next() { | |
285 | Some(status) => status, | |
286 | None => bail!("missing newline"), | |
287 | }; | |
288 | Self::parse_status_line(status_line)?; | |
289 | ||
b6c06dce DM |
290 | if pos != data.len() - END_MARK.len() { |
291 | bail!("unexpected data after connect response"); | |
292 | } | |
293 | break 'outer; | |
294 | } | |
295 | } | |
296 | if data.len() > 1024*32 { // max 32K (random chosen limit) | |
297 | bail!("too many bytes"); | |
298 | } | |
299 | } | |
9104152a DM |
300 | Ok(()) |
301 | } | |
5eb9dd0c SR |
302 | } |
303 | ||
5eb9dd0c | 304 | impl hyper::service::Service<Uri> for HttpsConnector { |
a2072cc3 | 305 | type Response = MaybeTlsStream<TcpStream>; |
5eb9dd0c | 306 | type Error = Error; |
12e874ce FG |
307 | #[allow(clippy::type_complexity)] |
308 | type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; | |
5eb9dd0c | 309 | |
9104152a DM |
310 | fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
311 | self.connector | |
312 | .poll_ready(ctx) | |
313 | .map_err(|err| err.into()) | |
5eb9dd0c SR |
314 | } |
315 | ||
316 | fn call(&mut self, dst: Uri) -> Self::Future { | |
02a58862 DM |
317 | let mut connector = self.connector.clone(); |
318 | let ssl_connector = Arc::clone(&self.ssl_connector); | |
319 | let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS); | |
320 | let host = match dst.host() { | |
321 | Some(host) => host.to_owned(), | |
322 | None => { | |
323 | return futures::future::err(format_err!("missing URL scheme")).boxed(); | |
324 | } | |
325 | }; | |
9104152a DM |
326 | let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 }); |
327 | ||
328 | if let Some(ref proxy) = self.proxy { | |
329 | ||
330 | let use_connect = is_https || proxy.force_connect; | |
331 | ||
332 | let proxy_url = format!("{}:{}", proxy.host, proxy.port); | |
333 | let proxy_uri = match Uri::builder() | |
334 | .scheme("http") | |
335 | .authority(proxy_url.as_str()) | |
336 | .path_and_query("/") | |
337 | .build() | |
338 | { | |
339 | Ok(uri) => uri, | |
340 | Err(err) => return futures::future::err(err.into()).boxed(), | |
341 | }; | |
342 | ||
cf8e44bc DM |
343 | let authorization = proxy.authorization.clone(); |
344 | ||
9104152a DM |
345 | if use_connect { |
346 | async move { | |
347 | ||
b6c06dce | 348 | let mut tcp_stream = connector |
9104152a DM |
349 | .call(proxy_uri) |
350 | .await | |
351 | .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?; | |
fa016c16 | 352 | |
b6c06dce | 353 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); |
5eb9dd0c | 354 | |
cf8e44bc DM |
355 | let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port); |
356 | if let Some(authorization) = authorization { | |
357 | connect_request.push_str(&format!("Proxy-Authorization: {}\r\n", authorization)); | |
358 | } | |
359 | connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port)); | |
9104152a | 360 | |
b6c06dce DM |
361 | tcp_stream.write_all(connect_request.as_bytes()).await?; |
362 | tcp_stream.flush().await?; | |
9104152a | 363 | |
1ed9069a | 364 | Self::parse_connect_response(&mut tcp_stream).await?; |
9104152a DM |
365 | |
366 | if is_https { | |
367 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
368 | } else { | |
369 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
370 | } | |
371 | }.boxed() | |
5eb9dd0c | 372 | } else { |
9104152a DM |
373 | async move { |
374 | let tcp_stream = connector | |
375 | .call(proxy_uri) | |
376 | .await | |
377 | .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?; | |
378 | ||
379 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); | |
380 | ||
381 | Ok(MaybeTlsStream::Proxied(tcp_stream)) | |
382 | }.boxed() | |
5eb9dd0c | 383 | } |
9104152a DM |
384 | } else { |
385 | async move { | |
386 | let dst_str = dst.to_string(); // for error messages | |
387 | let tcp_stream = connector | |
388 | .call(dst) | |
389 | .await | |
390 | .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?; | |
391 | ||
392 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); | |
393 | ||
394 | if is_https { | |
395 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
396 | } else { | |
397 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
398 | } | |
399 | }.boxed() | |
400 | } | |
5eb9dd0c SR |
401 | } |
402 | } |