]>
Commit | Line | Data |
---|---|---|
5eb9dd0c | 1 | use anyhow::{Error, format_err, bail}; |
5eb9dd0c SR |
2 | use std::task::{Context, Poll}; |
3 | use std::os::unix::io::AsRawFd; | |
137a6ebc | 4 | use std::collections::HashMap; |
0f860f71 | 5 | use std::pin::Pin; |
02a58862 | 6 | use std::sync::Arc; |
5eb9dd0c SR |
7 | |
8 | use hyper::{Uri, Body}; | |
9 | use hyper::client::{Client, HttpConnector}; | |
cf8e44bc | 10 | use http::{Request, Response, HeaderValue}; |
5eb9dd0c SR |
11 | use openssl::ssl::{SslConnector, SslMethod}; |
12 | use futures::*; | |
9104152a DM |
13 | use tokio::{ |
14 | io::{ | |
b6c06dce DM |
15 | AsyncRead, |
16 | AsyncReadExt, | |
9104152a | 17 | AsyncWriteExt, |
9104152a DM |
18 | }, |
19 | net::TcpStream, | |
20 | }; | |
a2072cc3 | 21 | use tokio_openssl::SslStream; |
5eb9dd0c SR |
22 | |
23 | use crate::tools::{ | |
a2072cc3 | 24 | async_io::MaybeTlsStream, |
5eb9dd0c SR |
25 | socket::{ |
26 | set_tcp_keepalive, | |
27 | PROXMOX_BACKUP_TCP_KEEPALIVE_TIME, | |
28 | }, | |
29 | }; | |
30 | ||
9104152a DM |
/// HTTP proxy settings used when opening outgoing connections.
#[derive(Clone)]
pub struct ProxyConfig {
    /// Host part of the proxy address.
    pub host: String,
    /// Port part of the proxy address.
    pub port: u16,
    /// Value for the `Proxy-Authorization` header, if the proxy needs one.
    pub authorization: Option<String>,
    /// When set, a `CONNECT` tunnel is used even for non-HTTPS destinations.
    pub force_connect: bool,
}
39 | ||
26153589 DM |
40 | /// Asyncrounous HTTP client implementation |
41 | pub struct SimpleHttp { | |
42 | client: Client<HttpsConnector, Body>, | |
cf8e44bc | 43 | proxy_authorization: Option<String>, // Proxy-Authorization header value |
5eb9dd0c SR |
44 | } |
45 | ||
26153589 | 46 | impl SimpleHttp { |
137a6ebc | 47 | |
9104152a | 48 | pub fn new(proxy_config: Option<ProxyConfig>) -> Self { |
26153589 | 49 | let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build(); |
9104152a | 50 | Self::with_ssl_connector(ssl_connector, proxy_config) |
137a6ebc SR |
51 | } |
52 | ||
9104152a | 53 | pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self { |
cf8e44bc DM |
54 | |
55 | let mut proxy_authorization = None; | |
56 | if let Some(ref proxy_config) = proxy_config { | |
57 | if !proxy_config.force_connect { | |
58 | proxy_authorization = proxy_config.authorization.clone(); | |
59 | } | |
60 | } | |
61 | ||
26153589 | 62 | let connector = HttpConnector::new(); |
9104152a DM |
63 | let mut https = HttpsConnector::with_connector(connector, ssl_connector); |
64 | if let Some(proxy_config) = proxy_config { | |
65 | https.set_proxy(proxy_config); | |
66 | } | |
26153589 | 67 | let client = Client::builder().build(https); |
cf8e44bc DM |
68 | Self { client, proxy_authorization} |
69 | } | |
70 | ||
71 | fn add_proxy_headers(&self, request: &mut Request<Body>) -> Result<(), Error> { | |
72 | if request.uri().scheme() != Some(&http::uri::Scheme::HTTPS) { | |
73 | if let Some(ref authorization) = self.proxy_authorization { | |
74 | request | |
75 | .headers_mut() | |
76 | .insert( | |
77 | http::header::PROXY_AUTHORIZATION, | |
78 | HeaderValue::from_str(authorization)?, | |
79 | ); | |
80 | } | |
81 | } | |
82 | Ok(()) | |
26153589 | 83 | } |
5eb9dd0c | 84 | |
cf8e44bc DM |
85 | pub async fn request(&self, mut request: Request<Body>) -> Result<Response<Body>, Error> { |
86 | self.add_proxy_headers(&mut request)?; | |
9104152a DM |
87 | self.client.request(request) |
88 | .map_err(Error::from) | |
89 | .await | |
90 | } | |
91 | ||
26153589 DM |
92 | pub async fn post( |
93 | &mut self, | |
94 | uri: &str, | |
95 | body: Option<String>, | |
96 | content_type: Option<&str>, | |
97 | ) -> Result<Response<Body>, Error> { | |
98 | ||
99 | let body = if let Some(body) = body { | |
100 | Body::from(body) | |
101 | } else { | |
102 | Body::empty() | |
103 | }; | |
104 | let content_type = content_type.unwrap_or("application/json"); | |
105 | ||
106 | let request = Request::builder() | |
107 | .method("POST") | |
108 | .uri(uri) | |
109 | .header("User-Agent", "proxmox-backup-client/1.0") | |
110 | .header(hyper::header::CONTENT_TYPE, content_type) | |
111 | .body(body)?; | |
112 | ||
cf8e44bc | 113 | self.request(request).await |
5eb9dd0c SR |
114 | } |
115 | ||
26153589 DM |
116 | pub async fn get_string( |
117 | &mut self, | |
118 | uri: &str, | |
119 | extra_headers: Option<&HashMap<String, String>>, | |
120 | ) -> Result<String, Error> { | |
121 | ||
122 | let mut request = Request::builder() | |
123 | .method("GET") | |
124 | .uri(uri) | |
125 | .header("User-Agent", "proxmox-backup-client/1.0"); | |
126 | ||
127 | if let Some(hs) = extra_headers { | |
128 | for (h, v) in hs.iter() { | |
129 | request = request.header(h, v); | |
130 | } | |
131 | } | |
2e201e7d | 132 | |
26153589 | 133 | let request = request.body(Body::empty())?; |
5eb9dd0c | 134 | |
cf8e44bc | 135 | let res = self.request(request).await?; |
26153589 DM |
136 | |
137 | let status = res.status(); | |
138 | if !status.is_success() { | |
139 | bail!("Got bad status '{}' from server", status) | |
140 | } | |
141 | ||
142 | Self::response_body_string(res).await | |
143 | } | |
144 | ||
145 | pub async fn response_body_string(res: Response<Body>) -> Result<String, Error> { | |
146 | let buf = hyper::body::to_bytes(res).await?; | |
147 | String::from_utf8(buf.to_vec()) | |
148 | .map_err(|err| format_err!("Error converting HTTP result data: {}", err)) | |
149 | } | |
2e201e7d TL |
150 | } |
151 | ||
5eb9dd0c SR |
152 | #[derive(Clone)] |
153 | pub struct HttpsConnector { | |
02a58862 DM |
154 | connector: HttpConnector, |
155 | ssl_connector: Arc<SslConnector>, | |
9104152a | 156 | proxy: Option<ProxyConfig>, |
5eb9dd0c SR |
157 | } |
158 | ||
159 | impl HttpsConnector { | |
02a58862 DM |
160 | pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector) -> Self { |
161 | connector.enforce_http(false); | |
5eb9dd0c | 162 | Self { |
02a58862 DM |
163 | connector, |
164 | ssl_connector: Arc::new(ssl_connector), | |
9104152a | 165 | proxy: None, |
5eb9dd0c SR |
166 | } |
167 | } | |
9104152a DM |
168 | |
169 | pub fn set_proxy(&mut self, proxy: ProxyConfig) { | |
170 | self.proxy = Some(proxy); | |
171 | } | |
172 | ||
173 | async fn secure_stream( | |
174 | tcp_stream: TcpStream, | |
175 | ssl_connector: &SslConnector, | |
176 | host: &str, | |
177 | ) -> Result<MaybeTlsStream<TcpStream>, Error> { | |
178 | let config = ssl_connector.configure()?; | |
179 | let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?; | |
180 | Pin::new(&mut conn).connect().await?; | |
181 | Ok(MaybeTlsStream::Secured(conn)) | |
182 | } | |
183 | ||
1ed9069a DM |
184 | fn parse_status_line(status_line: &str) -> Result<(), Error> { |
185 | if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) { | |
186 | bail!("proxy connect failed - invalid status: {}", status_line) | |
187 | } | |
188 | Ok(()) | |
189 | } | |
190 | ||
191 | async fn parse_connect_response<R: AsyncRead + Unpin>( | |
b6c06dce | 192 | stream: &mut R, |
1ed9069a | 193 | ) -> Result<(), Error> { |
9104152a | 194 | |
b6c06dce DM |
195 | let mut data: Vec<u8> = Vec::new(); |
196 | let mut buffer = [0u8; 256]; | |
197 | const END_MARK: &[u8; 4] = b"\r\n\r\n"; | |
198 | ||
199 | 'outer: loop { | |
200 | let n = stream.read(&mut buffer[..]).await?; | |
201 | if n == 0 { break; } | |
202 | let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 }; | |
203 | data.extend(&buffer[..n]); | |
204 | if data.len() >= END_MARK.len() { | |
205 | if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) { | |
1ed9069a DM |
206 | let response = String::from_utf8_lossy(&data); |
207 | let status_line = match response.split("\r\n").next() { | |
208 | Some(status) => status, | |
209 | None => bail!("missing newline"), | |
210 | }; | |
211 | Self::parse_status_line(status_line)?; | |
212 | ||
b6c06dce DM |
213 | if pos != data.len() - END_MARK.len() { |
214 | bail!("unexpected data after connect response"); | |
215 | } | |
216 | break 'outer; | |
217 | } | |
218 | } | |
219 | if data.len() > 1024*32 { // max 32K (random chosen limit) | |
220 | bail!("too many bytes"); | |
221 | } | |
222 | } | |
9104152a DM |
223 | Ok(()) |
224 | } | |
5eb9dd0c SR |
225 | } |
226 | ||
5eb9dd0c | 227 | impl hyper::service::Service<Uri> for HttpsConnector { |
a2072cc3 | 228 | type Response = MaybeTlsStream<TcpStream>; |
5eb9dd0c | 229 | type Error = Error; |
12e874ce FG |
230 | #[allow(clippy::type_complexity)] |
231 | type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; | |
5eb9dd0c | 232 | |
9104152a DM |
233 | fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
234 | self.connector | |
235 | .poll_ready(ctx) | |
236 | .map_err(|err| err.into()) | |
5eb9dd0c SR |
237 | } |
238 | ||
239 | fn call(&mut self, dst: Uri) -> Self::Future { | |
02a58862 DM |
240 | let mut connector = self.connector.clone(); |
241 | let ssl_connector = Arc::clone(&self.ssl_connector); | |
242 | let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS); | |
243 | let host = match dst.host() { | |
244 | Some(host) => host.to_owned(), | |
245 | None => { | |
246 | return futures::future::err(format_err!("missing URL scheme")).boxed(); | |
247 | } | |
248 | }; | |
9104152a DM |
249 | let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 }); |
250 | ||
251 | if let Some(ref proxy) = self.proxy { | |
252 | ||
253 | let use_connect = is_https || proxy.force_connect; | |
254 | ||
255 | let proxy_url = format!("{}:{}", proxy.host, proxy.port); | |
256 | let proxy_uri = match Uri::builder() | |
257 | .scheme("http") | |
258 | .authority(proxy_url.as_str()) | |
259 | .path_and_query("/") | |
260 | .build() | |
261 | { | |
262 | Ok(uri) => uri, | |
263 | Err(err) => return futures::future::err(err.into()).boxed(), | |
264 | }; | |
265 | ||
cf8e44bc DM |
266 | let authorization = proxy.authorization.clone(); |
267 | ||
9104152a DM |
268 | if use_connect { |
269 | async move { | |
270 | ||
b6c06dce | 271 | let mut tcp_stream = connector |
9104152a DM |
272 | .call(proxy_uri) |
273 | .await | |
274 | .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?; | |
fa016c16 | 275 | |
b6c06dce | 276 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); |
5eb9dd0c | 277 | |
cf8e44bc DM |
278 | let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port); |
279 | if let Some(authorization) = authorization { | |
280 | connect_request.push_str(&format!("Proxy-Authorization: {}\r\n", authorization)); | |
281 | } | |
282 | connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port)); | |
9104152a | 283 | |
b6c06dce DM |
284 | tcp_stream.write_all(connect_request.as_bytes()).await?; |
285 | tcp_stream.flush().await?; | |
9104152a | 286 | |
1ed9069a | 287 | Self::parse_connect_response(&mut tcp_stream).await?; |
9104152a DM |
288 | |
289 | if is_https { | |
290 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
291 | } else { | |
292 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
293 | } | |
294 | }.boxed() | |
5eb9dd0c | 295 | } else { |
9104152a DM |
296 | async move { |
297 | let tcp_stream = connector | |
298 | .call(proxy_uri) | |
299 | .await | |
300 | .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?; | |
301 | ||
302 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); | |
303 | ||
304 | Ok(MaybeTlsStream::Proxied(tcp_stream)) | |
305 | }.boxed() | |
5eb9dd0c | 306 | } |
9104152a DM |
307 | } else { |
308 | async move { | |
309 | let dst_str = dst.to_string(); // for error messages | |
310 | let tcp_stream = connector | |
311 | .call(dst) | |
312 | .await | |
313 | .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?; | |
314 | ||
315 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); | |
316 | ||
317 | if is_https { | |
318 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
319 | } else { | |
320 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
321 | } | |
322 | }.boxed() | |
323 | } | |
5eb9dd0c SR |
324 | } |
325 | } |