]>
Commit | Line | Data |
---|---|---|
94378967 | 1 | use anyhow::{bail, format_err, Error}; |
80b423f3 FG |
2 | use std::os::unix::io::AsRawFd; |
3 | use std::pin::Pin; | |
8734d0c2 | 4 | use std::sync::Arc; |
80b423f3 FG |
5 | use std::task::{Context, Poll}; |
6 | ||
7 | use futures::*; | |
8 | use http::Uri; | |
9 | use hyper::client::HttpConnector; | |
10 | use openssl::ssl::SslConnector; | |
00ca0b7f | 11 | use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; |
80b423f3 FG |
12 | use tokio::net::TcpStream; |
13 | use tokio_openssl::SslStream; | |
14 | ||
82245339 | 15 | use proxmox_sys::linux::socket::set_tcp_keepalive; |
80b423f3 | 16 | |
ca9dd7ec WB |
17 | use crate::proxy_config::ProxyConfig; |
18 | use crate::uri::build_authority; | |
80b423f3 | 19 | |
94456ee4 | 20 | use super::tls::MaybeTlsStream; |
8734d0c2 DM |
21 | use super::{RateLimitedStream, ShareableRateLimit}; |
22 | ||
23 | type SharedRateLimit = Arc<dyn ShareableRateLimit>; | |
00ca0b7f | 24 | |
80b423f3 FG |
25 | #[derive(Clone)] |
26 | pub struct HttpsConnector { | |
27 | connector: HttpConnector, | |
28 | ssl_connector: Arc<SslConnector>, | |
29 | proxy: Option<ProxyConfig>, | |
30 | tcp_keepalive: u32, | |
8734d0c2 DM |
31 | read_limiter: Option<SharedRateLimit>, |
32 | write_limiter: Option<SharedRateLimit>, | |
80b423f3 FG |
33 | } |
34 | ||
35 | impl HttpsConnector { | |
94378967 FG |
36 | pub fn with_connector( |
37 | mut connector: HttpConnector, | |
38 | ssl_connector: SslConnector, | |
39 | tcp_keepalive: u32, | |
40 | ) -> Self { | |
80b423f3 FG |
41 | connector.enforce_http(false); |
42 | Self { | |
43 | connector, | |
44 | ssl_connector: Arc::new(ssl_connector), | |
45 | proxy: None, | |
46 | tcp_keepalive, | |
00ca0b7f DM |
47 | read_limiter: None, |
48 | write_limiter: None, | |
80b423f3 FG |
49 | } |
50 | } | |
51 | ||
52 | pub fn set_proxy(&mut self, proxy: ProxyConfig) { | |
53 | self.proxy = Some(proxy); | |
54 | } | |
55 | ||
8734d0c2 | 56 | pub fn set_read_limiter(&mut self, limiter: Option<SharedRateLimit>) { |
00ca0b7f DM |
57 | self.read_limiter = limiter; |
58 | } | |
59 | ||
8734d0c2 | 60 | pub fn set_write_limiter(&mut self, limiter: Option<SharedRateLimit>) { |
00ca0b7f DM |
61 | self.write_limiter = limiter; |
62 | } | |
63 | ||
64 | async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>( | |
65 | tcp_stream: S, | |
80b423f3 FG |
66 | ssl_connector: &SslConnector, |
67 | host: &str, | |
00ca0b7f | 68 | ) -> Result<MaybeTlsStream<S>, Error> { |
80b423f3 | 69 | let config = ssl_connector.configure()?; |
00ca0b7f | 70 | let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?; |
80b423f3 FG |
71 | Pin::new(&mut conn).connect().await?; |
72 | Ok(MaybeTlsStream::Secured(conn)) | |
73 | } | |
74 | ||
75 | fn parse_status_line(status_line: &str) -> Result<(), Error> { | |
76 | if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) { | |
77 | bail!("proxy connect failed - invalid status: {}", status_line) | |
78 | } | |
79 | Ok(()) | |
80 | } | |
81 | ||
94378967 | 82 | async fn parse_connect_response<R: AsyncRead + Unpin>(stream: &mut R) -> Result<(), Error> { |
80b423f3 FG |
83 | let mut data: Vec<u8> = Vec::new(); |
84 | let mut buffer = [0u8; 256]; | |
85 | const END_MARK: &[u8; 4] = b"\r\n\r\n"; | |
86 | ||
87 | 'outer: loop { | |
88 | let n = stream.read(&mut buffer[..]).await?; | |
94378967 FG |
89 | if n == 0 { |
90 | break; | |
91 | } | |
92 | let search_start = if data.len() > END_MARK.len() { | |
93 | data.len() - END_MARK.len() + 1 | |
94 | } else { | |
95 | 0 | |
96 | }; | |
80b423f3 FG |
97 | data.extend(&buffer[..n]); |
98 | if data.len() >= END_MARK.len() { | |
94378967 FG |
99 | if let Some(pos) = data[search_start..] |
100 | .windows(END_MARK.len()) | |
101 | .position(|w| w == END_MARK) | |
102 | { | |
80b423f3 FG |
103 | let response = String::from_utf8_lossy(&data); |
104 | let status_line = match response.split("\r\n").next() { | |
105 | Some(status) => status, | |
106 | None => bail!("missing newline"), | |
107 | }; | |
108 | Self::parse_status_line(status_line)?; | |
109 | ||
110 | if pos != data.len() - END_MARK.len() { | |
111 | bail!("unexpected data after connect response"); | |
112 | } | |
113 | break 'outer; | |
114 | } | |
115 | } | |
94378967 FG |
116 | if data.len() > 1024 * 32 { |
117 | // max 32K (random chosen limit) | |
80b423f3 FG |
118 | bail!("too many bytes"); |
119 | } | |
120 | } | |
121 | Ok(()) | |
122 | } | |
123 | } | |
124 | ||
125 | impl hyper::service::Service<Uri> for HttpsConnector { | |
00ca0b7f | 126 | type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>; |
80b423f3 FG |
127 | type Error = Error; |
128 | #[allow(clippy::type_complexity)] | |
94378967 FG |
129 | type Future = |
130 | Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; | |
80b423f3 FG |
131 | |
132 | fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | |
94378967 | 133 | self.connector.poll_ready(ctx).map_err(|err| err.into()) |
80b423f3 FG |
134 | } |
135 | ||
136 | fn call(&mut self, dst: Uri) -> Self::Future { | |
137 | let mut connector = self.connector.clone(); | |
138 | let ssl_connector = Arc::clone(&self.ssl_connector); | |
139 | let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS); | |
140 | let host = match dst.host() { | |
141 | Some(host) => host.to_owned(), | |
142 | None => { | |
143 | return futures::future::err(format_err!("missing URL scheme")).boxed(); | |
144 | } | |
145 | }; | |
146 | let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 }); | |
147 | let keepalive = self.tcp_keepalive; | |
00ca0b7f DM |
148 | let read_limiter = self.read_limiter.clone(); |
149 | let write_limiter = self.write_limiter.clone(); | |
150 | ||
80b423f3 | 151 | if let Some(ref proxy) = self.proxy { |
80b423f3 FG |
152 | let use_connect = is_https || proxy.force_connect; |
153 | ||
ca9dd7ec | 154 | let proxy_authority = match build_authority(&proxy.host, proxy.port) { |
80b423f3 | 155 | Ok(authority) => authority, |
c9b4a4f3 | 156 | Err(err) => return futures::future::err(err.into()).boxed(), |
80b423f3 FG |
157 | }; |
158 | ||
159 | let proxy_uri = match Uri::builder() | |
160 | .scheme("http") | |
161 | .authority(proxy_authority.as_str()) | |
162 | .path_and_query("/") | |
163 | .build() | |
164 | { | |
165 | Ok(uri) => uri, | |
166 | Err(err) => return futures::future::err(err.into()).boxed(), | |
167 | }; | |
168 | ||
169 | let authorization = proxy.authorization.clone(); | |
170 | ||
171 | if use_connect { | |
172 | async move { | |
00ca0b7f | 173 | let tcp_stream = connector.call(proxy_uri).await.map_err(|err| { |
94378967 FG |
174 | format_err!("error connecting to {} - {}", proxy_authority, err) |
175 | })?; | |
80b423f3 FG |
176 | |
177 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); | |
178 | ||
0eeb0dd1 TL |
179 | let mut tcp_stream = |
180 | RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter); | |
00ca0b7f | 181 | |
80b423f3 FG |
182 | let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port); |
183 | if let Some(authorization) = authorization { | |
94378967 FG |
184 | connect_request |
185 | .push_str(&format!("Proxy-Authorization: {}\r\n", authorization)); | |
80b423f3 FG |
186 | } |
187 | connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port)); | |
188 | ||
189 | tcp_stream.write_all(connect_request.as_bytes()).await?; | |
190 | tcp_stream.flush().await?; | |
191 | ||
192 | Self::parse_connect_response(&mut tcp_stream).await?; | |
193 | ||
194 | if is_https { | |
195 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
196 | } else { | |
197 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
198 | } | |
94378967 FG |
199 | } |
200 | .boxed() | |
80b423f3 | 201 | } else { |
94378967 FG |
202 | async move { |
203 | let tcp_stream = connector.call(proxy_uri).await.map_err(|err| { | |
204 | format_err!("error connecting to {} - {}", proxy_authority, err) | |
205 | })?; | |
80b423f3 | 206 | |
94378967 | 207 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); |
80b423f3 | 208 | |
0eeb0dd1 TL |
209 | let tcp_stream = |
210 | RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter); | |
00ca0b7f | 211 | |
94378967 FG |
212 | Ok(MaybeTlsStream::Proxied(tcp_stream)) |
213 | } | |
214 | .boxed() | |
80b423f3 FG |
215 | } |
216 | } else { | |
217 | async move { | |
218 | let dst_str = dst.to_string(); // for error messages | |
219 | let tcp_stream = connector | |
220 | .call(dst) | |
221 | .await | |
222 | .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?; | |
223 | ||
224 | let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); | |
225 | ||
0eeb0dd1 TL |
226 | let tcp_stream = |
227 | RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter); | |
00ca0b7f | 228 | |
80b423f3 FG |
229 | if is_https { |
230 | Self::secure_stream(tcp_stream, &ssl_connector, &host).await | |
231 | } else { | |
232 | Ok(MaybeTlsStream::Normal(tcp_stream)) | |
233 | } | |
94378967 FG |
234 | } |
235 | .boxed() | |
80b423f3 FG |
236 | } |
237 | } | |
238 | } |