]> git.proxmox.com Git - proxmox-backup.git/blob - src/tools/http.rs
cleanup: split SimpleHttp client into extra file
[proxmox-backup.git] / src / tools / http.rs
1 use anyhow::{Error, format_err, bail};
2 use std::task::{Context, Poll};
3 use std::os::unix::io::AsRawFd;
4 use std::pin::Pin;
5 use std::sync::Arc;
6
7 use hyper::client::HttpConnector;
8 use http::{Uri, uri::Authority};
9 use openssl::ssl::SslConnector;
10 use futures::*;
11 use tokio::{
12 io::{
13 AsyncRead,
14 AsyncReadExt,
15 AsyncWriteExt,
16 },
17 net::TcpStream,
18 };
19 use tokio_openssl::SslStream;
20
21 use crate::tools::{
22 async_io::MaybeTlsStream,
23 socket::{
24 set_tcp_keepalive,
25 PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
26 },
27 };
28
29 // Build a http::uri::Authority ("host:port"), use '[..]' around IPv6 addresses
30 fn build_authority(host: &str, port: u16) -> Result<Authority, Error> {
31 let bytes = host.as_bytes();
32 let len = bytes.len();
33 let authority = if len > 3 && bytes.contains(&b':') && bytes[0] != b'[' && bytes[len-1] != b']' {
34 format!("[{}]:{}", host, port).parse()?
35 } else {
36 format!("{}:{}", host, port).parse()?
37 };
38 Ok(authority)
39 }
40
41 /// HTTP Proxy Configuration
42 #[derive(Clone)]
43 pub struct ProxyConfig {
44 pub host: String,
45 pub port: u16,
46 pub authorization: Option<String>, // Proxy-Authorization header value
47 pub force_connect: bool,
48 }
49
50 impl ProxyConfig {
51
52 /// Parse proxy config from ALL_PROXY environment var
53 pub fn from_proxy_env() -> Result<Option<ProxyConfig>, Error> {
54
55 // We only support/use ALL_PROXY environment
56
57 match std::env::var_os("ALL_PROXY") {
58 None => return Ok(None),
59 Some(all_proxy) => {
60 let all_proxy = match all_proxy.to_str() {
61 Some(s) => String::from(s),
62 None => bail!("non UTF-8 content in env ALL_PROXY"),
63 };
64 if all_proxy.is_empty() {
65 return Ok(None);
66 }
67 let config = Self::parse_proxy_url(&all_proxy)?;
68 Ok(Some(config))
69 }
70 }
71 }
72
73 /// Parse proxy configuration string [http://]<host>[:port]
74 ///
75 /// Default port is 1080 (like curl)
76 pub fn parse_proxy_url(http_proxy: &str) -> Result<ProxyConfig, Error> {
77 proxmox::try_block!({
78 let proxy_uri: Uri = http_proxy.parse()?;
79 let proxy_authority = match proxy_uri.authority() {
80 Some(authority) => authority,
81 None => bail!("missing proxy authority"),
82 };
83 let host = proxy_authority.host().to_owned();
84 let port = match proxy_uri.port() {
85 Some(port) => port.as_u16(),
86 None => 1080, // CURL default port
87 };
88
89 match proxy_uri.scheme_str() {
90 Some("http") => { /* Ok */ }
91 Some(scheme) => bail!("unsupported proxy scheme '{}'", scheme),
92 None => { /* assume HTTP */ }
93 }
94
95 let authority_vec: Vec<&str> = proxy_authority.as_str().rsplitn(2, '@').collect();
96 let authorization = if authority_vec.len() == 2 {
97 Some(format!("Basic {}", base64::encode(authority_vec[1])))
98 } else {
99 None
100 };
101
102 Ok(ProxyConfig {
103 host,
104 port,
105 authorization,
106 force_connect: false,
107 })
108 }).map_err(|err| format_err!("parse_proxy_url failed: {}", err))
109 }
110 }
111
112 #[derive(Clone)]
113 pub struct HttpsConnector {
114 connector: HttpConnector,
115 ssl_connector: Arc<SslConnector>,
116 proxy: Option<ProxyConfig>,
117 }
118
119 impl HttpsConnector {
120 pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector) -> Self {
121 connector.enforce_http(false);
122 Self {
123 connector,
124 ssl_connector: Arc::new(ssl_connector),
125 proxy: None,
126 }
127 }
128
129 pub fn set_proxy(&mut self, proxy: ProxyConfig) {
130 self.proxy = Some(proxy);
131 }
132
133 async fn secure_stream(
134 tcp_stream: TcpStream,
135 ssl_connector: &SslConnector,
136 host: &str,
137 ) -> Result<MaybeTlsStream<TcpStream>, Error> {
138 let config = ssl_connector.configure()?;
139 let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
140 Pin::new(&mut conn).connect().await?;
141 Ok(MaybeTlsStream::Secured(conn))
142 }
143
144 fn parse_status_line(status_line: &str) -> Result<(), Error> {
145 if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) {
146 bail!("proxy connect failed - invalid status: {}", status_line)
147 }
148 Ok(())
149 }
150
151 async fn parse_connect_response<R: AsyncRead + Unpin>(
152 stream: &mut R,
153 ) -> Result<(), Error> {
154
155 let mut data: Vec<u8> = Vec::new();
156 let mut buffer = [0u8; 256];
157 const END_MARK: &[u8; 4] = b"\r\n\r\n";
158
159 'outer: loop {
160 let n = stream.read(&mut buffer[..]).await?;
161 if n == 0 { break; }
162 let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 };
163 data.extend(&buffer[..n]);
164 if data.len() >= END_MARK.len() {
165 if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) {
166 let response = String::from_utf8_lossy(&data);
167 let status_line = match response.split("\r\n").next() {
168 Some(status) => status,
169 None => bail!("missing newline"),
170 };
171 Self::parse_status_line(status_line)?;
172
173 if pos != data.len() - END_MARK.len() {
174 bail!("unexpected data after connect response");
175 }
176 break 'outer;
177 }
178 }
179 if data.len() > 1024*32 { // max 32K (random chosen limit)
180 bail!("too many bytes");
181 }
182 }
183 Ok(())
184 }
185 }
186
187 impl hyper::service::Service<Uri> for HttpsConnector {
188 type Response = MaybeTlsStream<TcpStream>;
189 type Error = Error;
190 #[allow(clippy::type_complexity)]
191 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
192
193 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194 self.connector
195 .poll_ready(ctx)
196 .map_err(|err| err.into())
197 }
198
199 fn call(&mut self, dst: Uri) -> Self::Future {
200 let mut connector = self.connector.clone();
201 let ssl_connector = Arc::clone(&self.ssl_connector);
202 let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS);
203 let host = match dst.host() {
204 Some(host) => host.to_owned(),
205 None => {
206 return futures::future::err(format_err!("missing URL scheme")).boxed();
207 }
208 };
209 let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
210
211 if let Some(ref proxy) = self.proxy {
212
213 let use_connect = is_https || proxy.force_connect;
214
215 let proxy_authority = match build_authority(&proxy.host, proxy.port) {
216 Ok(authority) => authority,
217 Err(err) => return futures::future::err(err).boxed(),
218 };
219
220 let proxy_uri = match Uri::builder()
221 .scheme("http")
222 .authority(proxy_authority.as_str())
223 .path_and_query("/")
224 .build()
225 {
226 Ok(uri) => uri,
227 Err(err) => return futures::future::err(err.into()).boxed(),
228 };
229
230 let authorization = proxy.authorization.clone();
231
232 if use_connect {
233 async move {
234
235 let mut tcp_stream = connector
236 .call(proxy_uri)
237 .await
238 .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?;
239
240 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
241
242 let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
243 if let Some(authorization) = authorization {
244 connect_request.push_str(&format!("Proxy-Authorization: {}\r\n", authorization));
245 }
246 connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port));
247
248 tcp_stream.write_all(connect_request.as_bytes()).await?;
249 tcp_stream.flush().await?;
250
251 Self::parse_connect_response(&mut tcp_stream).await?;
252
253 if is_https {
254 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
255 } else {
256 Ok(MaybeTlsStream::Normal(tcp_stream))
257 }
258 }.boxed()
259 } else {
260 async move {
261 let tcp_stream = connector
262 .call(proxy_uri)
263 .await
264 .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?;
265
266 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
267
268 Ok(MaybeTlsStream::Proxied(tcp_stream))
269 }.boxed()
270 }
271 } else {
272 async move {
273 let dst_str = dst.to_string(); // for error messages
274 let tcp_stream = connector
275 .call(dst)
276 .await
277 .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
278
279 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
280
281 if is_https {
282 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
283 } else {
284 Ok(MaybeTlsStream::Normal(tcp_stream))
285 }
286 }.boxed()
287 }
288 }
289 }