]> git.proxmox.com Git - proxmox-backup.git/blame - src/tools/http.rs
proxmox_restore_daemon: mount ntfs with 'utf8' option
[proxmox-backup.git] / src / tools / http.rs
CommitLineData
5eb9dd0c 1use anyhow::{Error, format_err, bail};
5eb9dd0c
SR
2use std::task::{Context, Poll};
3use std::os::unix::io::AsRawFd;
137a6ebc 4use std::collections::HashMap;
0f860f71 5use std::pin::Pin;
02a58862 6use std::sync::Arc;
5eb9dd0c
SR
7
8use hyper::{Uri, Body};
9use hyper::client::{Client, HttpConnector};
cf8e44bc 10use http::{Request, Response, HeaderValue};
5eb9dd0c
SR
11use openssl::ssl::{SslConnector, SslMethod};
12use futures::*;
9104152a
DM
13use tokio::{
14 io::{
b6c06dce
DM
15 AsyncRead,
16 AsyncReadExt,
9104152a 17 AsyncWriteExt,
9104152a
DM
18 },
19 net::TcpStream,
20};
a2072cc3 21use tokio_openssl::SslStream;
5eb9dd0c
SR
22
23use crate::tools::{
a2072cc3 24 async_io::MaybeTlsStream,
5eb9dd0c
SR
25 socket::{
26 set_tcp_keepalive,
27 PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
28 },
29};
30
9104152a
DM
/// HTTP Proxy Configuration
#[derive(Clone)]
pub struct ProxyConfig {
    /// Host name (or address) of the proxy server.
    pub host: String,
    /// TCP port of the proxy server.
    pub port: u16,
    /// Proxy-Authorization header value
    pub authorization: Option<String>,
    /// Tunnel through the proxy with `CONNECT` even for non-HTTPS targets.
    pub force_connect: bool,
}
39
7a7fcb47
DM
40impl ProxyConfig {
41
42 /// Parse proxy config from ALL_PROXY environment var
43 pub fn from_proxy_env() -> Result<Option<ProxyConfig>, Error> {
44
45 // We only support/use ALL_PROXY environment
46
47 match std::env::var_os("ALL_PROXY") {
48 None => return Ok(None),
49 Some(all_proxy) => {
50 let all_proxy = match all_proxy.to_str() {
51 Some(s) => String::from(s),
52 None => bail!("non UTF-8 content in env ALL_PROXY"),
53 };
54 if all_proxy.is_empty() {
55 return Ok(None);
56 }
57 let config = Self::parse_proxy_url(&all_proxy)?;
58 Ok(Some(config))
59 }
60 }
61 }
62
63 /// Parse proxy configuration string [http://]<host>[:port]
64 ///
65 /// Default port is 1080 (like curl)
66 pub fn parse_proxy_url(http_proxy: &str) -> Result<ProxyConfig, Error> {
67 proxmox::try_block!({
68 let proxy_uri: Uri = http_proxy.parse()?;
69 let proxy_authority = match proxy_uri.authority() {
70 Some(authority) => authority,
71 None => bail!("missing proxy authority"),
72 };
73 let host = proxy_authority.host().to_owned();
74 let port = match proxy_uri.port() {
75 Some(port) => port.as_u16(),
76 None => 1080, // CURL default port
77 };
78
79 match proxy_uri.scheme_str() {
80 Some("http") => { /* Ok */ }
81 Some(scheme) => bail!("unsupported proxy scheme '{}'", scheme),
82 None => { /* assume HTTP */ }
83 }
84
85 let authority_vec: Vec<&str> = proxy_authority.as_str().rsplitn(2, '@').collect();
86 let authorization = if authority_vec.len() == 2 {
87 Some(format!("Basic {}", base64::encode(authority_vec[1])))
88 } else {
89 None
90 };
91
92 Ok(ProxyConfig {
93 host,
94 port,
95 authorization,
96 force_connect: false,
97 })
98 }).map_err(|err| format_err!("parse_proxy_url failed: {}", err))
99 }
100}
101
26153589
DM
102/// Asyncrounous HTTP client implementation
103pub struct SimpleHttp {
104 client: Client<HttpsConnector, Body>,
cf8e44bc 105 proxy_authorization: Option<String>, // Proxy-Authorization header value
c0147e49 106 user_agent: Option<String>,
5eb9dd0c
SR
107}
108
26153589 109impl SimpleHttp {
137a6ebc 110
d52b1209
DM
111 pub const DEFAULT_USER_AGENT_STRING: &'static str = "proxmox-backup-client/1.0";
112
9104152a 113 pub fn new(proxy_config: Option<ProxyConfig>) -> Self {
26153589 114 let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build();
9104152a 115 Self::with_ssl_connector(ssl_connector, proxy_config)
137a6ebc
SR
116 }
117
9104152a 118 pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self {
cf8e44bc
DM
119
120 let mut proxy_authorization = None;
121 if let Some(ref proxy_config) = proxy_config {
122 if !proxy_config.force_connect {
123 proxy_authorization = proxy_config.authorization.clone();
124 }
125 }
126
26153589 127 let connector = HttpConnector::new();
9104152a
DM
128 let mut https = HttpsConnector::with_connector(connector, ssl_connector);
129 if let Some(proxy_config) = proxy_config {
130 https.set_proxy(proxy_config);
131 }
26153589 132 let client = Client::builder().build(https);
c0147e49
DM
133 Self { client, proxy_authorization, user_agent: None }
134 }
135
136 pub fn set_user_agent(&mut self, user_agent: &str) -> Result<(), Error> {
137 self.user_agent = Some(user_agent.to_owned());
138 Ok(())
cf8e44bc
DM
139 }
140
141 fn add_proxy_headers(&self, request: &mut Request<Body>) -> Result<(), Error> {
142 if request.uri().scheme() != Some(&http::uri::Scheme::HTTPS) {
143 if let Some(ref authorization) = self.proxy_authorization {
144 request
145 .headers_mut()
146 .insert(
147 http::header::PROXY_AUTHORIZATION,
148 HeaderValue::from_str(authorization)?,
149 );
150 }
151 }
152 Ok(())
26153589 153 }
5eb9dd0c 154
cf8e44bc 155 pub async fn request(&self, mut request: Request<Body>) -> Result<Response<Body>, Error> {
c0147e49
DM
156 let user_agent = if let Some(ref user_agent) = self.user_agent {
157 HeaderValue::from_str(&user_agent)?
158 } else {
159 HeaderValue::from_str(Self::DEFAULT_USER_AGENT_STRING)?
160 };
d52b1209 161
c0147e49 162 request.headers_mut().insert(hyper::header::USER_AGENT, user_agent);
d52b1209 163
cf8e44bc 164 self.add_proxy_headers(&mut request)?;
d52b1209 165
9104152a
DM
166 self.client.request(request)
167 .map_err(Error::from)
168 .await
169 }
170
26153589
DM
171 pub async fn post(
172 &mut self,
173 uri: &str,
174 body: Option<String>,
175 content_type: Option<&str>,
176 ) -> Result<Response<Body>, Error> {
177
178 let body = if let Some(body) = body {
179 Body::from(body)
180 } else {
181 Body::empty()
182 };
183 let content_type = content_type.unwrap_or("application/json");
184
185 let request = Request::builder()
186 .method("POST")
187 .uri(uri)
26153589
DM
188 .header(hyper::header::CONTENT_TYPE, content_type)
189 .body(body)?;
190
cf8e44bc 191 self.request(request).await
5eb9dd0c
SR
192 }
193
26153589
DM
194 pub async fn get_string(
195 &mut self,
196 uri: &str,
197 extra_headers: Option<&HashMap<String, String>>,
198 ) -> Result<String, Error> {
199
200 let mut request = Request::builder()
201 .method("GET")
d52b1209 202 .uri(uri);
26153589
DM
203
204 if let Some(hs) = extra_headers {
205 for (h, v) in hs.iter() {
206 request = request.header(h, v);
207 }
208 }
2e201e7d 209
26153589 210 let request = request.body(Body::empty())?;
5eb9dd0c 211
cf8e44bc 212 let res = self.request(request).await?;
26153589
DM
213
214 let status = res.status();
215 if !status.is_success() {
216 bail!("Got bad status '{}' from server", status)
217 }
218
219 Self::response_body_string(res).await
220 }
221
222 pub async fn response_body_string(res: Response<Body>) -> Result<String, Error> {
223 let buf = hyper::body::to_bytes(res).await?;
224 String::from_utf8(buf.to_vec())
225 .map_err(|err| format_err!("Error converting HTTP result data: {}", err))
226 }
2e201e7d
TL
227}
228
5eb9dd0c
SR
229#[derive(Clone)]
230pub struct HttpsConnector {
02a58862
DM
231 connector: HttpConnector,
232 ssl_connector: Arc<SslConnector>,
9104152a 233 proxy: Option<ProxyConfig>,
5eb9dd0c
SR
234}
235
236impl HttpsConnector {
02a58862
DM
237 pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector) -> Self {
238 connector.enforce_http(false);
5eb9dd0c 239 Self {
02a58862
DM
240 connector,
241 ssl_connector: Arc::new(ssl_connector),
9104152a 242 proxy: None,
5eb9dd0c
SR
243 }
244 }
9104152a
DM
245
246 pub fn set_proxy(&mut self, proxy: ProxyConfig) {
247 self.proxy = Some(proxy);
248 }
249
250 async fn secure_stream(
251 tcp_stream: TcpStream,
252 ssl_connector: &SslConnector,
253 host: &str,
254 ) -> Result<MaybeTlsStream<TcpStream>, Error> {
255 let config = ssl_connector.configure()?;
256 let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
257 Pin::new(&mut conn).connect().await?;
258 Ok(MaybeTlsStream::Secured(conn))
259 }
260
1ed9069a
DM
261 fn parse_status_line(status_line: &str) -> Result<(), Error> {
262 if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) {
263 bail!("proxy connect failed - invalid status: {}", status_line)
264 }
265 Ok(())
266 }
267
268 async fn parse_connect_response<R: AsyncRead + Unpin>(
b6c06dce 269 stream: &mut R,
1ed9069a 270 ) -> Result<(), Error> {
9104152a 271
b6c06dce
DM
272 let mut data: Vec<u8> = Vec::new();
273 let mut buffer = [0u8; 256];
274 const END_MARK: &[u8; 4] = b"\r\n\r\n";
275
276 'outer: loop {
277 let n = stream.read(&mut buffer[..]).await?;
278 if n == 0 { break; }
279 let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 };
280 data.extend(&buffer[..n]);
281 if data.len() >= END_MARK.len() {
282 if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) {
1ed9069a
DM
283 let response = String::from_utf8_lossy(&data);
284 let status_line = match response.split("\r\n").next() {
285 Some(status) => status,
286 None => bail!("missing newline"),
287 };
288 Self::parse_status_line(status_line)?;
289
b6c06dce
DM
290 if pos != data.len() - END_MARK.len() {
291 bail!("unexpected data after connect response");
292 }
293 break 'outer;
294 }
295 }
296 if data.len() > 1024*32 { // max 32K (random chosen limit)
297 bail!("too many bytes");
298 }
299 }
9104152a
DM
300 Ok(())
301 }
5eb9dd0c
SR
302}
303
5eb9dd0c 304impl hyper::service::Service<Uri> for HttpsConnector {
a2072cc3 305 type Response = MaybeTlsStream<TcpStream>;
5eb9dd0c 306 type Error = Error;
12e874ce
FG
307 #[allow(clippy::type_complexity)]
308 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
5eb9dd0c 309
9104152a
DM
310 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
311 self.connector
312 .poll_ready(ctx)
313 .map_err(|err| err.into())
5eb9dd0c
SR
314 }
315
316 fn call(&mut self, dst: Uri) -> Self::Future {
02a58862
DM
317 let mut connector = self.connector.clone();
318 let ssl_connector = Arc::clone(&self.ssl_connector);
319 let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS);
320 let host = match dst.host() {
321 Some(host) => host.to_owned(),
322 None => {
323 return futures::future::err(format_err!("missing URL scheme")).boxed();
324 }
325 };
9104152a
DM
326 let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
327
328 if let Some(ref proxy) = self.proxy {
329
330 let use_connect = is_https || proxy.force_connect;
331
332 let proxy_url = format!("{}:{}", proxy.host, proxy.port);
333 let proxy_uri = match Uri::builder()
334 .scheme("http")
335 .authority(proxy_url.as_str())
336 .path_and_query("/")
337 .build()
338 {
339 Ok(uri) => uri,
340 Err(err) => return futures::future::err(err.into()).boxed(),
341 };
342
cf8e44bc
DM
343 let authorization = proxy.authorization.clone();
344
9104152a
DM
345 if use_connect {
346 async move {
347
b6c06dce 348 let mut tcp_stream = connector
9104152a
DM
349 .call(proxy_uri)
350 .await
351 .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
fa016c16 352
b6c06dce 353 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
5eb9dd0c 354
cf8e44bc
DM
355 let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
356 if let Some(authorization) = authorization {
357 connect_request.push_str(&format!("Proxy-Authorization: {}\r\n", authorization));
358 }
359 connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port));
9104152a 360
b6c06dce
DM
361 tcp_stream.write_all(connect_request.as_bytes()).await?;
362 tcp_stream.flush().await?;
9104152a 363
1ed9069a 364 Self::parse_connect_response(&mut tcp_stream).await?;
9104152a
DM
365
366 if is_https {
367 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
368 } else {
369 Ok(MaybeTlsStream::Normal(tcp_stream))
370 }
371 }.boxed()
5eb9dd0c 372 } else {
9104152a
DM
373 async move {
374 let tcp_stream = connector
375 .call(proxy_uri)
376 .await
377 .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
378
379 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
380
381 Ok(MaybeTlsStream::Proxied(tcp_stream))
382 }.boxed()
5eb9dd0c 383 }
9104152a
DM
384 } else {
385 async move {
386 let dst_str = dst.to_string(); // for error messages
387 let tcp_stream = connector
388 .call(dst)
389 .await
390 .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
391
392 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
393
394 if is_https {
395 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
396 } else {
397 Ok(MaybeTlsStream::Normal(tcp_stream))
398 }
399 }.boxed()
400 }
5eb9dd0c
SR
401 }
402}