]> git.proxmox.com Git - proxmox-backup.git/blob - src/tools/http.rs
http proxy: improve response parser
[proxmox-backup.git] / src / tools / http.rs
1 use anyhow::{Error, format_err, bail};
2 use std::task::{Context, Poll};
3 use std::os::unix::io::AsRawFd;
4 use std::collections::HashMap;
5 use std::pin::Pin;
6 use std::sync::Arc;
7
8 use hyper::{Uri, Body};
9 use hyper::client::{Client, HttpConnector};
10 use http::{Request, Response};
11 use openssl::ssl::{SslConnector, SslMethod};
12 use futures::*;
13 use tokio::{
14 io::{
15 AsyncRead,
16 AsyncReadExt,
17 AsyncWriteExt,
18 },
19 net::TcpStream,
20 };
21 use tokio_openssl::SslStream;
22
23 use crate::tools::{
24 async_io::MaybeTlsStream,
25 socket::{
26 set_tcp_keepalive,
27 PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
28 },
29 };
30
31 /// HTTP Proxy Configuration
32 #[derive(Clone)]
33 pub struct ProxyConfig {
34 pub host: String,
35 pub port: u16,
36 pub force_connect: bool,
37 }
38
39 /// Asyncrounous HTTP client implementation
40 pub struct SimpleHttp {
41 client: Client<HttpsConnector, Body>,
42 }
43
44 impl SimpleHttp {
45
46 pub fn new(proxy_config: Option<ProxyConfig>) -> Self {
47 let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build();
48 Self::with_ssl_connector(ssl_connector, proxy_config)
49 }
50
51 pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self {
52 let connector = HttpConnector::new();
53 let mut https = HttpsConnector::with_connector(connector, ssl_connector);
54 if let Some(proxy_config) = proxy_config {
55 https.set_proxy(proxy_config);
56 }
57 let client = Client::builder().build(https);
58 Self { client }
59 }
60
61 pub async fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
62 self.client.request(request)
63 .map_err(Error::from)
64 .await
65 }
66
67 pub async fn post(
68 &mut self,
69 uri: &str,
70 body: Option<String>,
71 content_type: Option<&str>,
72 ) -> Result<Response<Body>, Error> {
73
74 let body = if let Some(body) = body {
75 Body::from(body)
76 } else {
77 Body::empty()
78 };
79 let content_type = content_type.unwrap_or("application/json");
80
81 let request = Request::builder()
82 .method("POST")
83 .uri(uri)
84 .header("User-Agent", "proxmox-backup-client/1.0")
85 .header(hyper::header::CONTENT_TYPE, content_type)
86 .body(body)?;
87
88 self.client.request(request)
89 .map_err(Error::from)
90 .await
91 }
92
93 pub async fn get_string(
94 &mut self,
95 uri: &str,
96 extra_headers: Option<&HashMap<String, String>>,
97 ) -> Result<String, Error> {
98
99 let mut request = Request::builder()
100 .method("GET")
101 .uri(uri)
102 .header("User-Agent", "proxmox-backup-client/1.0");
103
104 if let Some(hs) = extra_headers {
105 for (h, v) in hs.iter() {
106 request = request.header(h, v);
107 }
108 }
109
110 let request = request.body(Body::empty())?;
111
112 let res = self.client.request(request).await?;
113
114 let status = res.status();
115 if !status.is_success() {
116 bail!("Got bad status '{}' from server", status)
117 }
118
119 Self::response_body_string(res).await
120 }
121
122 pub async fn response_body_string(res: Response<Body>) -> Result<String, Error> {
123 let buf = hyper::body::to_bytes(res).await?;
124 String::from_utf8(buf.to_vec())
125 .map_err(|err| format_err!("Error converting HTTP result data: {}", err))
126 }
127 }
128
129 #[derive(Clone)]
130 pub struct HttpsConnector {
131 connector: HttpConnector,
132 ssl_connector: Arc<SslConnector>,
133 proxy: Option<ProxyConfig>,
134 }
135
136 impl HttpsConnector {
137 pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector) -> Self {
138 connector.enforce_http(false);
139 Self {
140 connector,
141 ssl_connector: Arc::new(ssl_connector),
142 proxy: None,
143 }
144 }
145
146 pub fn set_proxy(&mut self, proxy: ProxyConfig) {
147 self.proxy = Some(proxy);
148 }
149
150 async fn secure_stream(
151 tcp_stream: TcpStream,
152 ssl_connector: &SslConnector,
153 host: &str,
154 ) -> Result<MaybeTlsStream<TcpStream>, Error> {
155 let config = ssl_connector.configure()?;
156 let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
157 Pin::new(&mut conn).connect().await?;
158 Ok(MaybeTlsStream::Secured(conn))
159 }
160
161 fn parse_status_line(status_line: &str) -> Result<(), Error> {
162 if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) {
163 bail!("proxy connect failed - invalid status: {}", status_line)
164 }
165 Ok(())
166 }
167
168 async fn parse_connect_response<R: AsyncRead + Unpin>(
169 stream: &mut R,
170 ) -> Result<(), Error> {
171
172 let mut data: Vec<u8> = Vec::new();
173 let mut buffer = [0u8; 256];
174 const END_MARK: &[u8; 4] = b"\r\n\r\n";
175
176 'outer: loop {
177 let n = stream.read(&mut buffer[..]).await?;
178 if n == 0 { break; }
179 let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 };
180 data.extend(&buffer[..n]);
181 if data.len() >= END_MARK.len() {
182 if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) {
183 let response = String::from_utf8_lossy(&data);
184 let status_line = match response.split("\r\n").next() {
185 Some(status) => status,
186 None => bail!("missing newline"),
187 };
188 Self::parse_status_line(status_line)?;
189
190 if pos != data.len() - END_MARK.len() {
191 bail!("unexpected data after connect response");
192 }
193 break 'outer;
194 }
195 }
196 if data.len() > 1024*32 { // max 32K (random chosen limit)
197 bail!("too many bytes");
198 }
199 }
200 Ok(())
201 }
202 }
203
204 impl hyper::service::Service<Uri> for HttpsConnector {
205 type Response = MaybeTlsStream<TcpStream>;
206 type Error = Error;
207 #[allow(clippy::type_complexity)]
208 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
209
210 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
211 self.connector
212 .poll_ready(ctx)
213 .map_err(|err| err.into())
214 }
215
216 fn call(&mut self, dst: Uri) -> Self::Future {
217 let mut connector = self.connector.clone();
218 let ssl_connector = Arc::clone(&self.ssl_connector);
219 let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS);
220 let host = match dst.host() {
221 Some(host) => host.to_owned(),
222 None => {
223 return futures::future::err(format_err!("missing URL scheme")).boxed();
224 }
225 };
226 let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
227
228 if let Some(ref proxy) = self.proxy {
229
230 let use_connect = is_https || proxy.force_connect;
231
232 let proxy_url = format!("{}:{}", proxy.host, proxy.port);
233 let proxy_uri = match Uri::builder()
234 .scheme("http")
235 .authority(proxy_url.as_str())
236 .path_and_query("/")
237 .build()
238 {
239 Ok(uri) => uri,
240 Err(err) => return futures::future::err(err.into()).boxed(),
241 };
242
243 if use_connect {
244 async move {
245
246 let mut tcp_stream = connector
247 .call(proxy_uri)
248 .await
249 .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
250
251 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
252
253 let connect_request = format!(
254 "CONNECT {0}:{1} HTTP/1.1\r\n\
255 Host: {0}:{1}\r\n\r\n",
256 host, port,
257 );
258
259 tcp_stream.write_all(connect_request.as_bytes()).await?;
260 tcp_stream.flush().await?;
261
262 Self::parse_connect_response(&mut tcp_stream).await?;
263
264 if is_https {
265 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
266 } else {
267 Ok(MaybeTlsStream::Normal(tcp_stream))
268 }
269 }.boxed()
270 } else {
271 async move {
272 let tcp_stream = connector
273 .call(proxy_uri)
274 .await
275 .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
276
277 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
278
279 Ok(MaybeTlsStream::Proxied(tcp_stream))
280 }.boxed()
281 }
282 } else {
283 async move {
284 let dst_str = dst.to_string(); // for error messages
285 let tcp_stream = connector
286 .call(dst)
287 .await
288 .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
289
290 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
291
292 if is_https {
293 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
294 } else {
295 Ok(MaybeTlsStream::Normal(tcp_stream))
296 }
297 }.boxed()
298 }
299 }
300 }