]> git.proxmox.com Git - proxmox.git/blame - proxmox-http/src/client/connector.rs
http: move TLS helper to client feature
[proxmox.git] / proxmox-http / src / client / connector.rs
CommitLineData
94378967 1use anyhow::{bail, format_err, Error};
80b423f3
FG
2use std::os::unix::io::AsRawFd;
3use std::pin::Pin;
8734d0c2 4use std::sync::Arc;
80b423f3
FG
5use std::task::{Context, Poll};
6
7use futures::*;
8use http::Uri;
9use hyper::client::HttpConnector;
10use openssl::ssl::SslConnector;
00ca0b7f 11use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
80b423f3
FG
12use tokio::net::TcpStream;
13use tokio_openssl::SslStream;
14
82245339 15use proxmox_sys::linux::socket::set_tcp_keepalive;
80b423f3 16
ca9dd7ec
WB
17use crate::proxy_config::ProxyConfig;
18use crate::uri::build_authority;
80b423f3 19
94456ee4 20use super::tls::MaybeTlsStream;
8734d0c2
DM
21use super::{RateLimitedStream, ShareableRateLimit};
22
23type SharedRateLimit = Arc<dyn ShareableRateLimit>;
00ca0b7f 24
80b423f3
FG
25#[derive(Clone)]
26pub struct HttpsConnector {
27 connector: HttpConnector,
28 ssl_connector: Arc<SslConnector>,
29 proxy: Option<ProxyConfig>,
30 tcp_keepalive: u32,
8734d0c2
DM
31 read_limiter: Option<SharedRateLimit>,
32 write_limiter: Option<SharedRateLimit>,
80b423f3
FG
33}
34
35impl HttpsConnector {
94378967
FG
36 pub fn with_connector(
37 mut connector: HttpConnector,
38 ssl_connector: SslConnector,
39 tcp_keepalive: u32,
40 ) -> Self {
80b423f3
FG
41 connector.enforce_http(false);
42 Self {
43 connector,
44 ssl_connector: Arc::new(ssl_connector),
45 proxy: None,
46 tcp_keepalive,
00ca0b7f
DM
47 read_limiter: None,
48 write_limiter: None,
80b423f3
FG
49 }
50 }
51
52 pub fn set_proxy(&mut self, proxy: ProxyConfig) {
53 self.proxy = Some(proxy);
54 }
55
8734d0c2 56 pub fn set_read_limiter(&mut self, limiter: Option<SharedRateLimit>) {
00ca0b7f
DM
57 self.read_limiter = limiter;
58 }
59
8734d0c2 60 pub fn set_write_limiter(&mut self, limiter: Option<SharedRateLimit>) {
00ca0b7f
DM
61 self.write_limiter = limiter;
62 }
63
64 async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
65 tcp_stream: S,
80b423f3
FG
66 ssl_connector: &SslConnector,
67 host: &str,
00ca0b7f 68 ) -> Result<MaybeTlsStream<S>, Error> {
80b423f3 69 let config = ssl_connector.configure()?;
00ca0b7f 70 let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
80b423f3
FG
71 Pin::new(&mut conn).connect().await?;
72 Ok(MaybeTlsStream::Secured(conn))
73 }
74
75 fn parse_status_line(status_line: &str) -> Result<(), Error> {
76 if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) {
77 bail!("proxy connect failed - invalid status: {}", status_line)
78 }
79 Ok(())
80 }
81
94378967 82 async fn parse_connect_response<R: AsyncRead + Unpin>(stream: &mut R) -> Result<(), Error> {
80b423f3
FG
83 let mut data: Vec<u8> = Vec::new();
84 let mut buffer = [0u8; 256];
85 const END_MARK: &[u8; 4] = b"\r\n\r\n";
86
87 'outer: loop {
88 let n = stream.read(&mut buffer[..]).await?;
94378967
FG
89 if n == 0 {
90 break;
91 }
92 let search_start = if data.len() > END_MARK.len() {
93 data.len() - END_MARK.len() + 1
94 } else {
95 0
96 };
80b423f3
FG
97 data.extend(&buffer[..n]);
98 if data.len() >= END_MARK.len() {
94378967
FG
99 if let Some(pos) = data[search_start..]
100 .windows(END_MARK.len())
101 .position(|w| w == END_MARK)
102 {
80b423f3
FG
103 let response = String::from_utf8_lossy(&data);
104 let status_line = match response.split("\r\n").next() {
105 Some(status) => status,
106 None => bail!("missing newline"),
107 };
108 Self::parse_status_line(status_line)?;
109
110 if pos != data.len() - END_MARK.len() {
111 bail!("unexpected data after connect response");
112 }
113 break 'outer;
114 }
115 }
94378967
FG
116 if data.len() > 1024 * 32 {
117 // max 32K (random chosen limit)
80b423f3
FG
118 bail!("too many bytes");
119 }
120 }
121 Ok(())
122 }
123}
124
125impl hyper::service::Service<Uri> for HttpsConnector {
00ca0b7f 126 type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
80b423f3
FG
127 type Error = Error;
128 #[allow(clippy::type_complexity)]
94378967
FG
129 type Future =
130 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
80b423f3
FG
131
132 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94378967 133 self.connector.poll_ready(ctx).map_err(|err| err.into())
80b423f3
FG
134 }
135
136 fn call(&mut self, dst: Uri) -> Self::Future {
137 let mut connector = self.connector.clone();
138 let ssl_connector = Arc::clone(&self.ssl_connector);
139 let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS);
140 let host = match dst.host() {
141 Some(host) => host.to_owned(),
142 None => {
143 return futures::future::err(format_err!("missing URL scheme")).boxed();
144 }
145 };
146 let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
147 let keepalive = self.tcp_keepalive;
00ca0b7f
DM
148 let read_limiter = self.read_limiter.clone();
149 let write_limiter = self.write_limiter.clone();
150
80b423f3 151 if let Some(ref proxy) = self.proxy {
80b423f3
FG
152 let use_connect = is_https || proxy.force_connect;
153
ca9dd7ec 154 let proxy_authority = match build_authority(&proxy.host, proxy.port) {
80b423f3 155 Ok(authority) => authority,
c9b4a4f3 156 Err(err) => return futures::future::err(err.into()).boxed(),
80b423f3
FG
157 };
158
159 let proxy_uri = match Uri::builder()
160 .scheme("http")
161 .authority(proxy_authority.as_str())
162 .path_and_query("/")
163 .build()
164 {
165 Ok(uri) => uri,
166 Err(err) => return futures::future::err(err.into()).boxed(),
167 };
168
169 let authorization = proxy.authorization.clone();
170
171 if use_connect {
172 async move {
00ca0b7f 173 let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
94378967
FG
174 format_err!("error connecting to {} - {}", proxy_authority, err)
175 })?;
80b423f3
FG
176
177 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
178
0eeb0dd1
TL
179 let mut tcp_stream =
180 RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
00ca0b7f 181
80b423f3
FG
182 let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
183 if let Some(authorization) = authorization {
94378967
FG
184 connect_request
185 .push_str(&format!("Proxy-Authorization: {}\r\n", authorization));
80b423f3
FG
186 }
187 connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port));
188
189 tcp_stream.write_all(connect_request.as_bytes()).await?;
190 tcp_stream.flush().await?;
191
192 Self::parse_connect_response(&mut tcp_stream).await?;
193
194 if is_https {
195 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
196 } else {
197 Ok(MaybeTlsStream::Normal(tcp_stream))
198 }
94378967
FG
199 }
200 .boxed()
80b423f3 201 } else {
94378967
FG
202 async move {
203 let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
204 format_err!("error connecting to {} - {}", proxy_authority, err)
205 })?;
80b423f3 206
94378967 207 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
80b423f3 208
0eeb0dd1
TL
209 let tcp_stream =
210 RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
00ca0b7f 211
94378967
FG
212 Ok(MaybeTlsStream::Proxied(tcp_stream))
213 }
214 .boxed()
80b423f3
FG
215 }
216 } else {
217 async move {
218 let dst_str = dst.to_string(); // for error messages
219 let tcp_stream = connector
220 .call(dst)
221 .await
222 .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
223
224 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
225
0eeb0dd1
TL
226 let tcp_stream =
227 RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
00ca0b7f 228
80b423f3
FG
229 if is_https {
230 Self::secure_stream(tcp_stream, &ssl_connector, &host).await
231 } else {
232 Ok(MaybeTlsStream::Normal(tcp_stream))
233 }
94378967
FG
234 }
235 .boxed()
80b423f3
FG
236 }
237 }
238}