]>
Commit | Line | Data |
---|---|---|
5eb9dd0c | 1 | use anyhow::{Error, format_err, bail}; |
5eb9dd0c SR |
2 | use std::task::{Context, Poll}; |
3 | use std::os::unix::io::AsRawFd; | |
137a6ebc | 4 | use std::collections::HashMap; |
0f860f71 | 5 | use std::pin::Pin; |
02a58862 | 6 | use std::sync::Arc; |
5eb9dd0c SR |
7 | |
8 | use hyper::{Uri, Body}; | |
9 | use hyper::client::{Client, HttpConnector}; | |
2e201e7d | 10 | use http::{Request, Response}; |
5eb9dd0c SR |
11 | use openssl::ssl::{SslConnector, SslMethod}; |
12 | use futures::*; | |
9104152a DM |
13 | use tokio::{ |
14 | io::{ | |
15 | AsyncBufReadExt, | |
16 | AsyncWriteExt, | |
17 | BufStream, | |
18 | }, | |
19 | net::TcpStream, | |
20 | }; | |
a2072cc3 | 21 | use tokio_openssl::SslStream; |
5eb9dd0c SR |
22 | |
23 | use crate::tools::{ | |
a2072cc3 | 24 | async_io::MaybeTlsStream, |
5eb9dd0c SR |
25 | socket::{ |
26 | set_tcp_keepalive, | |
27 | PROXMOX_BACKUP_TCP_KEEPALIVE_TIME, | |
28 | }, | |
29 | }; | |
30 | ||
9104152a DM |
31 | /// HTTP Proxy Configuration |
32 | #[derive(Clone)] | |
33 | pub struct ProxyConfig { | |
34 | pub host: String, | |
35 | pub port: u16, | |
36 | pub force_connect: bool, | |
37 | } | |
38 | ||
26153589 DM |
39 | /// Asyncrounous HTTP client implementation |
40 | pub struct SimpleHttp { | |
41 | client: Client<HttpsConnector, Body>, | |
5eb9dd0c SR |
42 | } |
43 | ||
26153589 | 44 | impl SimpleHttp { |
137a6ebc | 45 | |
9104152a | 46 | pub fn new(proxy_config: Option<ProxyConfig>) -> Self { |
26153589 | 47 | let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build(); |
9104152a | 48 | Self::with_ssl_connector(ssl_connector, proxy_config) |
137a6ebc SR |
49 | } |
50 | ||
9104152a | 51 | pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self { |
26153589 | 52 | let connector = HttpConnector::new(); |
9104152a DM |
53 | let mut https = HttpsConnector::with_connector(connector, ssl_connector); |
54 | if let Some(proxy_config) = proxy_config { | |
55 | https.set_proxy(proxy_config); | |
56 | } | |
26153589 DM |
57 | let client = Client::builder().build(https); |
58 | Self { client } | |
59 | } | |
5eb9dd0c | 60 | |
9104152a DM |
61 | pub async fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> { |
62 | self.client.request(request) | |
63 | .map_err(Error::from) | |
64 | .await | |
65 | } | |
66 | ||
26153589 DM |
67 | pub async fn post( |
68 | &mut self, | |
69 | uri: &str, | |
70 | body: Option<String>, | |
71 | content_type: Option<&str>, | |
72 | ) -> Result<Response<Body>, Error> { | |
73 | ||
74 | let body = if let Some(body) = body { | |
75 | Body::from(body) | |
76 | } else { | |
77 | Body::empty() | |
78 | }; | |
79 | let content_type = content_type.unwrap_or("application/json"); | |
80 | ||
81 | let request = Request::builder() | |
82 | .method("POST") | |
83 | .uri(uri) | |
84 | .header("User-Agent", "proxmox-backup-client/1.0") | |
85 | .header(hyper::header::CONTENT_TYPE, content_type) | |
86 | .body(body)?; | |
87 | ||
88 | self.client.request(request) | |
89 | .map_err(Error::from) | |
90 | .await | |
5eb9dd0c SR |
91 | } |
92 | ||
26153589 DM |
93 | pub async fn get_string( |
94 | &mut self, | |
95 | uri: &str, | |
96 | extra_headers: Option<&HashMap<String, String>>, | |
97 | ) -> Result<String, Error> { | |
98 | ||
99 | let mut request = Request::builder() | |
100 | .method("GET") | |
101 | .uri(uri) | |
102 | .header("User-Agent", "proxmox-backup-client/1.0"); | |
103 | ||
104 | if let Some(hs) = extra_headers { | |
105 | for (h, v) in hs.iter() { | |
106 | request = request.header(h, v); | |
107 | } | |
108 | } | |
2e201e7d | 109 | |
26153589 | 110 | let request = request.body(Body::empty())?; |
5eb9dd0c | 111 | |
26153589 DM |
112 | let res = self.client.request(request).await?; |
113 | ||
114 | let status = res.status(); | |
115 | if !status.is_success() { | |
116 | bail!("Got bad status '{}' from server", status) | |
117 | } | |
118 | ||
119 | Self::response_body_string(res).await | |
120 | } | |
121 | ||
122 | pub async fn response_body_string(res: Response<Body>) -> Result<String, Error> { | |
123 | let buf = hyper::body::to_bytes(res).await?; | |
124 | String::from_utf8(buf.to_vec()) | |
125 | .map_err(|err| format_err!("Error converting HTTP result data: {}", err)) | |
126 | } | |
2e201e7d TL |
127 | } |
128 | ||
5eb9dd0c SR |
129 | #[derive(Clone)] |
130 | pub struct HttpsConnector { | |
02a58862 DM |
131 | connector: HttpConnector, |
132 | ssl_connector: Arc<SslConnector>, | |
9104152a | 133 | proxy: Option<ProxyConfig>, |
5eb9dd0c SR |
134 | } |
135 | ||
136 | impl HttpsConnector { | |
02a58862 DM |
137 | pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector) -> Self { |
138 | connector.enforce_http(false); | |
5eb9dd0c | 139 | Self { |
02a58862 DM |
140 | connector, |
141 | ssl_connector: Arc::new(ssl_connector), | |
9104152a | 142 | proxy: None, |
5eb9dd0c SR |
143 | } |
144 | } | |
9104152a DM |
145 | |
146 | pub fn set_proxy(&mut self, proxy: ProxyConfig) { | |
147 | self.proxy = Some(proxy); | |
148 | } | |
149 | ||
150 | async fn secure_stream( | |
151 | tcp_stream: TcpStream, | |
152 | ssl_connector: &SslConnector, | |
153 | host: &str, | |
154 | ) -> Result<MaybeTlsStream<TcpStream>, Error> { | |
155 | let config = ssl_connector.configure()?; | |
156 | let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?; | |
157 | Pin::new(&mut conn).connect().await?; | |
158 | Ok(MaybeTlsStream::Secured(conn)) | |
159 | } | |
160 | ||
161 | async fn parse_connect_status( | |
162 | stream: &mut BufStream<TcpStream>, | |
163 | ) -> Result<(), Error> { | |
164 | ||
165 | let mut status_str = String::new(); | |
166 | ||
167 | // TODO: limit read-length | |
168 | ||
169 | if stream.read_line(&mut status_str).await? == 0 { | |
170 | bail!("proxy connect failed - unexpected EOF") | |
171 | } | |
172 | ||
173 | if !(status_str.starts_with("HTTP/1.1 200") || status_str.starts_with("HTTP/1.0 200")) { | |
174 | bail!("proxy connect failed - invalid status: {}", status_str) | |
175 | } | |
176 | ||
177 | loop { | |
178 | // skip rest until \r\n | |
179 | let mut response = String::new(); | |
180 | if stream.read_line(&mut response).await? == 0 { | |
181 | bail!("proxy connect failed - unexpected EOF") | |
182 | } | |
183 | if response.len() > 8192 { | |
184 | bail!("proxy connect failed - long lines in connect rtesponse") | |
185 | } | |
186 | if response == "\r\n" { | |
187 | break; | |
188 | } | |
189 | } | |
190 | Ok(()) | |
191 | } | |
5eb9dd0c SR |
192 | } |
193 | ||
5eb9dd0c | 194 | impl hyper::service::Service<Uri> for HttpsConnector { |
a2072cc3 | 195 | type Response = MaybeTlsStream<TcpStream>; |
5eb9dd0c | 196 | type Error = Error; |
12e874ce FG |
197 | #[allow(clippy::type_complexity)] |
198 | type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; | |
5eb9dd0c | 199 | |
9104152a DM |
200 | fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
201 | self.connector | |
202 | .poll_ready(ctx) | |
203 | .map_err(|err| err.into()) | |
5eb9dd0c SR |
204 | } |
205 | ||
206 | fn call(&mut self, dst: Uri) -> Self::Future { | |
02a58862 DM |
207 | let mut connector = self.connector.clone(); |
208 | let ssl_connector = Arc::clone(&self.ssl_connector); | |
209 | let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS); | |
210 | let host = match dst.host() { | |
211 | Some(host) => host.to_owned(), | |
212 | None => { | |
213 | return futures::future::err(format_err!("missing URL scheme")).boxed(); | |
214 | } | |
215 | }; | |
9104152a DM |
216 | let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 }); |
217 | ||
218 | if let Some(ref proxy) = self.proxy { | |
219 | ||
220 | let use_connect = is_https || proxy.force_connect; | |
221 | ||
222 | let proxy_url = format!("{}:{}", proxy.host, proxy.port); | |
223 | let proxy_uri = match Uri::builder() | |
224 | .scheme("http") | |
225 | .authority(proxy_url.as_str()) | |
226 | .path_and_query("/") | |
227 | .build() | |
228 | { | |
229 | Ok(uri) => uri, | |
230 | Err(err) => return futures::future::err(err.into()).boxed(), | |
231 | }; | |
232 | ||
233 | if use_connect { | |
234 | async move { | |
235 | ||
236 | let proxy_stream = connector | |
237 | .call(proxy_uri) | |
238 | .await | |
239 | .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?; | |
fa016c16 | 240 | |
9104152a | 241 | let _ = set_tcp_keepalive(proxy_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); |
5eb9dd0c | 242 | |
9104152a | 243 | let mut stream = BufStream::new(proxy_stream); |
5eb9dd0c | 244 | |
9104152a DM |
245 | let connect_request = format!( |
246 | "CONNECT {0}:{1} HTTP/1.1\r\n\ | |
247 | Host: {0}:{1}\r\n\r\n", | |
248 | host, port, | |
249 | ); | |
250 | ||
251 | stream.write_all(connect_request.as_bytes()).await?; | |
252 | stream.flush().await?; | |
253 | ||
254 | Self::parse_connect_status(&mut stream).await?; | |
255 | ||
256 | let tcp_stream = stream.into_inner(); | |
257 | ||
258 | if is_https { | |
259 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
260 | } else { | |
261 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
262 | } | |
263 | }.boxed() | |
5eb9dd0c | 264 | } else { |
9104152a DM |
265 | async move { |
266 | let tcp_stream = connector | |
267 | .call(proxy_uri) | |
268 | .await | |
269 | .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?; | |
270 | ||
271 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); | |
272 | ||
273 | Ok(MaybeTlsStream::Proxied(tcp_stream)) | |
274 | }.boxed() | |
5eb9dd0c | 275 | } |
9104152a DM |
276 | } else { |
277 | async move { | |
278 | let dst_str = dst.to_string(); // for error messages | |
279 | let tcp_stream = connector | |
280 | .call(dst) | |
281 | .await | |
282 | .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?; | |
283 | ||
284 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); | |
285 | ||
286 | if is_https { | |
287 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
288 | } else { | |
289 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
290 | } | |
291 | }.boxed() | |
292 | } | |
5eb9dd0c SR |
293 | } |
294 | } |