]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/http_client.rs
drop Cancellable future in favor of abortable
[proxmox-backup.git] / src / client / http_client.rs
CommitLineData
c2b94534 1use std::io::Write;
db0cb9ce 2use std::task::{Context, Poll};
597641fd 3
cf9271e2 4use chrono::Utc;
7a57cb77 5use failure::*;
82ab7230 6use futures::*;
7a57cb77
WB
7use http::Uri;
8use http::header::HeaderValue;
9use http::{Request, Response};
10use hyper::Body;
1434f4f8 11use hyper::client::{Client, HttpConnector};
6d1f61b2 12use openssl::ssl::{SslConnector, SslMethod};
ba3a60b2 13use serde_json::{json, Value};
8a1028e0 14use percent_encoding::percent_encode;
7a57cb77 15use xdg::BaseDirectories;
1fdb4c6f 16
e18a6c9e 17use proxmox::tools::{
e18a6c9e
DM
18 fs::{file_get_json, file_set_contents},
19};
20
7a57cb77 21use super::pipe_to_stream::PipeToSendStream;
1434f4f8 22use crate::tools::async_io::EitherStream;
8a1028e0 23use crate::tools::{self, tty, BroadcastFuture, DEFAULT_ENCODE_SET};
986bef16 24
/// Authentication data obtained from a successful login.
#[derive(Clone)]
pub struct AuthInfo {
    /// User name as returned in the server's ticket response.
    username: String,
    /// Authentication ticket; sent to the server as the `PBSAuthCookie` cookie.
    ticket: String,
    /// Token sent to the server in the `CSRFPreventionToken` header.
    token: String,
}
56458d97 31
151c6ce2 32/// HTTP(S) API client
597641fd 33pub struct HttpClient {
1434f4f8 34 client: Client<HttpsConnector>,
597641fd 35 server: String,
5a2df000 36 auth: BroadcastFuture<AuthInfo>,
597641fd
DM
37}
38
e240d8be
DM
39/// Delete stored ticket data (logout)
40pub fn delete_ticket_info(server: &str, username: &str) -> Result<(), Error> {
41
42 let base = BaseDirectories::with_prefix("proxmox-backup")?;
43
44 // usually /run/user/<uid>/...
45 let path = base.place_runtime_file("tickets")?;
46
47 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
48
49 let mut data = file_get_json(&path, Some(json!({})))?;
50
51 if let Some(map) = data[server].as_object_mut() {
52 map.remove(username);
53 }
54
55 file_set_contents(path, data.to_string().as_bytes(), Some(mode))?;
56
57 Ok(())
58}
59
ba3a60b2
DM
60fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
61
62 let base = BaseDirectories::with_prefix("proxmox-backup")?;
63
64 // usually /run/user/<uid>/...
65 let path = base.place_runtime_file("tickets")?;
66
67 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
68
e18a6c9e 69 let mut data = file_get_json(&path, Some(json!({})))?;
ba3a60b2
DM
70
71 let now = Utc::now().timestamp();
72
73 data[server][username] = json!({ "timestamp": now, "ticket": ticket, "token": token});
74
75 let mut new_data = json!({});
76
77 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
78
79 let empty = serde_json::map::Map::new();
80 for (server, info) in data.as_object().unwrap_or(&empty) {
81 for (_user, uinfo) in info.as_object().unwrap_or(&empty) {
82 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
83 let age = now - timestamp;
84 if age < ticket_lifetime {
85 new_data[server][username] = uinfo.clone();
86 }
87 }
88 }
89 }
90
e18a6c9e 91 file_set_contents(path, new_data.to_string().as_bytes(), Some(mode))?;
ba3a60b2
DM
92
93 Ok(())
94}
95
96fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> {
66c8eb93 97 let base = BaseDirectories::with_prefix("proxmox-backup").ok()?;
ba3a60b2
DM
98
99 // usually /run/user/<uid>/...
66c8eb93
CE
100 let path = base.place_runtime_file("tickets").ok()?;
101 let data = file_get_json(&path, None).ok()?;
ba3a60b2 102 let now = Utc::now().timestamp();
ba3a60b2 103 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
66c8eb93
CE
104 let uinfo = data[server][username].as_object()?;
105 let timestamp = uinfo["timestamp"].as_i64()?;
106 let age = now - timestamp;
107
108 if age < ticket_lifetime {
109 let ticket = uinfo["ticket"].as_str()?;
110 let token = uinfo["token"].as_str()?;
111 Some((ticket.to_owned(), token.to_owned()))
112 } else {
113 None
ba3a60b2 114 }
ba3a60b2
DM
115}
116
597641fd
DM
117impl HttpClient {
118
cc2ce4a9 119 pub fn new(server: &str, username: &str, password: Option<String>) -> Result<Self, Error> {
5a2df000 120 let client = Self::build_client();
5a2df000 121
cc2ce4a9
DM
122 let password = if let Some(password) = password {
123 password
124 } else if let Some((ticket, _token)) = load_ticket_info(server, username) {
45cdce06
DM
125 ticket
126 } else {
127 Self::get_password(&username)?
128 };
129
9d35dbbb 130 let login_future = Self::credentials(client.clone(), server.to_owned(), username.to_owned(), password);
45cdce06
DM
131
132 Ok(Self {
5a2df000 133 client,
597641fd 134 server: String::from(server),
96f5e80a 135 auth: BroadcastFuture::new(Box::new(login_future)),
45cdce06 136 })
597641fd
DM
137 }
138
1a7a0e74 139 /// Login
e240d8be
DM
140 ///
141 /// Login is done on demand, so this is onyl required if you need
142 /// access to authentication data in 'AuthInfo'.
96f5e80a
DM
143 pub async fn login(&self) -> Result<AuthInfo, Error> {
144 self.auth.listen().await
e240d8be
DM
145 }
146
5a2df000 147 fn get_password(_username: &str) -> Result<String, Error> {
56458d97
WB
148 use std::env::VarError::*;
149 match std::env::var("PBS_PASSWORD") {
150 Ok(p) => return Ok(p),
151 Err(NotUnicode(_)) => bail!("PBS_PASSWORD contains bad characters"),
152 Err(NotPresent) => {
153 // Try another method
154 }
155 }
156
157 // If we're on a TTY, query the user for a password
158 if tty::stdin_isatty() {
159 return Ok(String::from_utf8(tty::read_password("Password: ")?)?);
160 }
161
162 bail!("no password input mechanism available");
163 }
164
1434f4f8 165 fn build_client() -> Client<HttpsConnector> {
6d1f61b2
DM
166
167 let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
168
169 ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); // fixme!
170
1434f4f8 171 let mut httpc = hyper::client::HttpConnector::new();
fcf5dea5 172 httpc.set_nodelay(true); // important for h2 download performance!
99168f43 173 httpc.set_recv_buffer_size(Some(1024*1024)); //important for h2 download performance!
4a3f6517 174 httpc.enforce_http(false); // we want https...
6d1f61b2 175
1434f4f8 176 let https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build());
6d1f61b2 177
adec8ea2
DM
178 Client::builder()
179 //.http2_initial_stream_window_size( (1 << 31) - 2)
180 //.http2_initial_connection_window_size( (1 << 31) - 2)
181 .build::<_, Body>(https)
a6b75513
DM
182 }
183
1a7a0e74 184 pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> {
597641fd 185
5a2df000 186 let client = self.client.clone();
597641fd 187
1a7a0e74 188 let auth = self.login().await?;
597641fd 189
1a7a0e74
DM
190 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
191 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
192 req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
597641fd 193
1a7a0e74 194 Self::api_request(client, req).await
1fdb4c6f
DM
195 }
196
1a7a0e74 197 pub async fn get(
a6782ca1
WB
198 &self,
199 path: &str,
200 data: Option<Value>,
1a7a0e74 201 ) -> Result<Value, Error> {
9e391bb7 202 let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
1a7a0e74 203 self.request(req).await
a6b75513
DM
204 }
205
1a7a0e74 206 pub async fn delete(
a6782ca1
WB
207 &mut self,
208 path: &str,
209 data: Option<Value>,
1a7a0e74 210 ) -> Result<Value, Error> {
9e391bb7 211 let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
1a7a0e74 212 self.request(req).await
a6b75513
DM
213 }
214
1a7a0e74 215 pub async fn post(
a6782ca1
WB
216 &mut self,
217 path: &str,
218 data: Option<Value>,
1a7a0e74 219 ) -> Result<Value, Error> {
5a2df000 220 let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
1a7a0e74 221 self.request(req).await
024f11bb
DM
222 }
223
1a7a0e74 224 pub async fn download(
a6782ca1
WB
225 &mut self,
226 path: &str,
1a7a0e74
DM
227 output: &mut (dyn Write + Send),
228 ) -> Result<(), Error> {
5a2df000 229 let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
024f11bb 230
5a2df000 231 let client = self.client.clone();
1fdb4c6f 232
1a7a0e74 233 let auth = self.login().await?;
81da38c1 234
1a7a0e74
DM
235 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
236 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
6f62c924 237
1a7a0e74
DM
238 let resp = client.request(req).await?;
239 let status = resp.status();
240 if !status.is_success() {
241 HttpClient::api_response(resp)
242 .map(|_| Err(format_err!("unknown error")))
243 .await?
244 } else {
245 resp.into_body()
5a2df000 246 .map_err(Error::from)
1a7a0e74
DM
247 .try_fold(output, move |acc, chunk| async move {
248 acc.write_all(&chunk)?;
249 Ok::<_, Error>(acc)
5a2df000 250 })
1a7a0e74
DM
251 .await?;
252 }
253 Ok(())
6f62c924
DM
254 }
255
1a7a0e74 256 pub async fn upload(
04512d30
DM
257 &mut self,
258 content_type: &str,
259 body: Body,
260 path: &str,
261 data: Option<Value>,
1a7a0e74 262 ) -> Result<Value, Error> {
81da38c1
DM
263
264 let path = path.trim_matches('/');
04512d30
DM
265 let mut url = format!("https://{}:8007/{}", &self.server, path);
266
267 if let Some(data) = data {
268 let query = tools::json_object_to_query(data).unwrap();
269 url.push('?');
270 url.push_str(&query);
271 }
272
273 let url: Uri = url.parse().unwrap();
81da38c1 274
5a2df000 275 let req = Request::builder()
81da38c1
DM
276 .method("POST")
277 .uri(url)
278 .header("User-Agent", "proxmox-backup-client/1.0")
5a2df000
DM
279 .header("Content-Type", content_type)
280 .body(body).unwrap();
81da38c1 281
1a7a0e74 282 self.request(req).await
1fdb4c6f
DM
283 }
284
1a7a0e74 285 pub async fn start_h2_connection(
fb047083
DM
286 &self,
287 mut req: Request<Body>,
288 protocol_name: String,
dc089345 289 ) -> Result<(H2Client, futures::future::AbortHandle), Error> {
cf639a47 290
1a7a0e74 291 let auth = self.login().await?;
cf639a47
DM
292 let client = self.client.clone();
293
1a7a0e74
DM
294 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
295 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
296 req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
cf639a47 297
1a7a0e74
DM
298 let resp = client.request(req).await?;
299 let status = resp.status();
cf639a47 300
1a7a0e74
DM
301 if status != http::StatusCode::SWITCHING_PROTOCOLS {
302 Self::api_response(resp)
303 .map(|_| Err(format_err!("unknown error")))
304 .await?;
305 unreachable!();
306 }
307
308 let upgraded = resp
309 .into_body()
310 .on_upgrade()
311 .await?;
312
313 let max_window_size = (1 << 31) - 2;
314
315 let (h2, connection) = h2::client::Builder::new()
316 .initial_connection_window_size(max_window_size)
317 .initial_window_size(max_window_size)
318 .max_frame_size(4*1024*1024)
319 .handshake(upgraded)
320 .await?;
321
322 let connection = connection
323 .map_err(|_| panic!("HTTP/2.0 connection failed"));
324
dc089345 325 let (connection, abort) = futures::future::abortable(connection);
1a7a0e74
DM
326 // A cancellable future returns an Option which is None when cancelled and
327 // Some when it finished instead, since we don't care about the return type we
328 // need to map it away:
329 let connection = connection.map(|_| ());
330
331 // Spawn a new task to drive the connection state
db0cb9ce 332 tokio::spawn(connection);
1a7a0e74
DM
333
334 // Wait until the `SendRequest` handle has available capacity.
335 let c = h2.ready().await?;
dc089345 336 Ok((H2Client::new(c), abort))
cf639a47
DM
337 }
338
9d35dbbb 339 async fn credentials(
1434f4f8 340 client: Client<HttpsConnector>,
45cdce06
DM
341 server: String,
342 username: String,
343 password: String,
9d35dbbb
DM
344 ) -> Result<AuthInfo, Error> {
345 let data = json!({ "username": username, "password": password });
346 let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
347 let cred = Self::api_request(client, req).await?;
348 let auth = AuthInfo {
349 username: cred["data"]["username"].as_str().unwrap().to_owned(),
350 ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
351 token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
352 };
353
354 let _ = store_ticket_info(&server, &auth.username, &auth.ticket, &auth.token);
355
356 Ok(auth)
ba3a60b2
DM
357 }
358
a6782ca1 359 async fn api_response(response: Response<Body>) -> Result<Value, Error> {
d2c48afc 360 let status = response.status();
db0cb9ce 361 let data = hyper::body::to_bytes(response.into_body()).await?;
a6782ca1
WB
362
363 let text = String::from_utf8(data.to_vec()).unwrap();
364 if status.is_success() {
11377a47
DM
365 if text.is_empty() {
366 Ok(Value::Null)
367 } else {
a6782ca1
WB
368 let value: Value = serde_json::from_str(&text)?;
369 Ok(value)
a6782ca1
WB
370 }
371 } else {
372 bail!("HTTP Error {}: {}", status, text);
373 }
d2c48afc
DM
374 }
375
1a7a0e74 376 async fn api_request(
1434f4f8 377 client: Client<HttpsConnector>,
5a2df000 378 req: Request<Body>
1a7a0e74 379 ) -> Result<Value, Error> {
ba3a60b2 380
5a2df000
DM
381 client.request(req)
382 .map_err(Error::from)
d2c48afc 383 .and_then(Self::api_response)
1a7a0e74 384 .await
0dffe3f9
DM
385 }
386
9e490a74
DM
387 // Read-only access to server property
388 pub fn server(&self) -> &str {
389 &self.server
390 }
391
5a2df000 392 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
591f570b 393 let path = path.trim_matches('/');
5a2df000
DM
394 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
395
396 if let Some(data) = data {
397 if method == "POST" {
398 let request = Request::builder()
399 .method(method)
400 .uri(url)
401 .header("User-Agent", "proxmox-backup-client/1.0")
402 .header(hyper::header::CONTENT_TYPE, "application/json")
403 .body(Body::from(data.to_string()))?;
404 return Ok(request);
405 } else {
9e391bb7
DM
406 let query = tools::json_object_to_query(data)?;
407 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
408 let request = Request::builder()
409 .method(method)
410 .uri(url)
411 .header("User-Agent", "proxmox-backup-client/1.0")
412 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
413 .body(Body::empty())?;
414 return Ok(request);
5a2df000 415 }
5a2df000 416 }
0dffe3f9 417
1fdb4c6f 418 let request = Request::builder()
5a2df000 419 .method(method)
1fdb4c6f
DM
420 .uri(url)
421 .header("User-Agent", "proxmox-backup-client/1.0")
5a2df000
DM
422 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
423 .body(Body::empty())?;
1fdb4c6f 424
5a2df000 425 Ok(request)
597641fd
DM
426 }
427}
b57cb264 428
9af37c8f
DM
429
430#[derive(Clone)]
431pub struct H2Client {
432 h2: h2::client::SendRequest<bytes::Bytes>,
433}
434
435impl H2Client {
436
437 pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
438 Self { h2 }
439 }
440
2a1e6d7d
DM
441 pub async fn get(
442 &self,
443 path: &str,
444 param: Option<Value>
445 ) -> Result<Value, Error> {
792a70b9 446 let req = Self::request_builder("localhost", "GET", path, param, None).unwrap();
2a1e6d7d 447 self.request(req).await
9af37c8f
DM
448 }
449
2a1e6d7d
DM
450 pub async fn put(
451 &self,
452 path: &str,
453 param: Option<Value>
454 ) -> Result<Value, Error> {
792a70b9 455 let req = Self::request_builder("localhost", "PUT", path, param, None).unwrap();
2a1e6d7d 456 self.request(req).await
9af37c8f
DM
457 }
458
2a1e6d7d
DM
459 pub async fn post(
460 &self,
461 path: &str,
462 param: Option<Value>
463 ) -> Result<Value, Error> {
792a70b9 464 let req = Self::request_builder("localhost", "POST", path, param, None).unwrap();
2a1e6d7d 465 self.request(req).await
9af37c8f
DM
466 }
467
d4a085e5 468 pub async fn download<W: Write + Send>(
a6782ca1
WB
469 &self,
470 path: &str,
471 param: Option<Value>,
2a1e6d7d 472 mut output: W,
d4a085e5 473 ) -> Result<W, Error> {
792a70b9 474 let request = Self::request_builder("localhost", "GET", path, param, None).unwrap();
dd066d28 475
2a1e6d7d 476 let response_future = self.send_request(request, None).await?;
984a7c35 477
2a1e6d7d
DM
478 let resp = response_future.await?;
479
480 let status = resp.status();
481 if !status.is_success() {
44f59dc7
DM
482 H2Client::h2api_response(resp).await?; // raise error
483 unreachable!();
2a1e6d7d
DM
484 }
485
486 let mut body = resp.into_body();
db0cb9ce
WB
487 while let Some(chunk) = body.data().await {
488 let chunk = chunk?;
489 body.flow_control().release_capacity(chunk.len())?;
2a1e6d7d
DM
490 output.write_all(&chunk)?;
491 }
492
493 Ok(output)
dd066d28
DM
494 }
495
2a1e6d7d 496 pub async fn upload(
a6782ca1 497 &self,
f011dba0 498 method: &str, // POST or PUT
a6782ca1
WB
499 path: &str,
500 param: Option<Value>,
792a70b9 501 content_type: &str,
a6782ca1 502 data: Vec<u8>,
2a1e6d7d 503 ) -> Result<Value, Error> {
f011dba0 504 let request = Self::request_builder("localhost", method, path, param, Some(content_type)).unwrap();
9af37c8f 505
2a1e6d7d
DM
506 let mut send_request = self.h2.clone().ready().await?;
507
508 let (response, stream) = send_request.send_request(request, false).unwrap();
2a05048b
DM
509
510 PipeToSendStream::new(bytes::Bytes::from(data), stream).await?;
511
512 response
513 .map_err(Error::from)
514 .and_then(Self::h2api_response)
2a1e6d7d 515 .await
9af37c8f 516 }
adec8ea2 517
2a1e6d7d 518 async fn request(
9af37c8f 519 &self,
b57cb264 520 request: Request<()>,
2a1e6d7d 521 ) -> Result<Value, Error> {
b57cb264 522
9af37c8f 523 self.send_request(request, None)
82ab7230
DM
524 .and_then(move |response| {
525 response
526 .map_err(Error::from)
527 .and_then(Self::h2api_response)
528 })
2a1e6d7d 529 .await
82ab7230
DM
530 }
531
cf9271e2 532 pub fn send_request(
9af37c8f 533 &self,
82ab7230
DM
534 request: Request<()>,
535 data: Option<bytes::Bytes>,
a6782ca1 536 ) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> {
82ab7230 537
9af37c8f 538 self.h2.clone()
10130cf4
DM
539 .ready()
540 .map_err(Error::from)
2a05048b 541 .and_then(move |mut send_request| async move {
82ab7230
DM
542 if let Some(data) = data {
543 let (response, stream) = send_request.send_request(request, false).unwrap();
2a05048b
DM
544 PipeToSendStream::new(data, stream).await?;
545 Ok(response)
82ab7230
DM
546 } else {
547 let (response, _stream) = send_request.send_request(request, true).unwrap();
2a05048b 548 Ok(response)
82ab7230 549 }
b57cb264
DM
550 })
551 }
552
f16aea68 553 pub async fn h2api_response(
a6782ca1 554 response: Response<h2::RecvStream>,
9edd3bf1 555 ) -> Result<Value, Error> {
b57cb264
DM
556 let status = response.status();
557
558 let (_head, mut body) = response.into_parts();
559
9edd3bf1 560 let mut data = Vec::new();
db0cb9ce
WB
561 while let Some(chunk) = body.data().await {
562 let chunk = chunk?;
563 // Whenever data is received, the caller is responsible for
564 // releasing capacity back to the server once it has freed
565 // the data from memory.
9edd3bf1 566 // Let the server send more data.
db0cb9ce 567 body.flow_control().release_capacity(chunk.len())?;
9edd3bf1
DM
568 data.extend(chunk);
569 }
570
571 let text = String::from_utf8(data.to_vec()).unwrap();
572 if status.is_success() {
11377a47
DM
573 if text.is_empty() {
574 Ok(Value::Null)
575 } else {
9edd3bf1
DM
576 let mut value: Value = serde_json::from_str(&text)?;
577 if let Some(map) = value.as_object_mut() {
578 if let Some(data) = map.remove("data") {
579 return Ok(data);
b57cb264 580 }
b57cb264 581 }
9edd3bf1 582 bail!("got result without data property");
9edd3bf1
DM
583 }
584 } else {
585 bail!("HTTP Error {}: {}", status, text);
586 }
b57cb264
DM
587 }
588
eb2bdd1b 589 // Note: We always encode parameters with the url
792a70b9
DM
590 pub fn request_builder(
591 server: &str,
592 method: &str,
593 path: &str,
594 param: Option<Value>,
595 content_type: Option<&str>,
596 ) -> Result<Request<()>, Error> {
b57cb264 597 let path = path.trim_matches('/');
b57cb264 598
792a70b9
DM
599 let content_type = content_type.unwrap_or("application/x-www-form-urlencoded");
600
a55b2975
DM
601 if let Some(param) = param {
602 let query = tools::json_object_to_query(param)?;
eb2bdd1b
DM
603 // We detected problem with hyper around 6000 characters - seo we try to keep on the safe side
604 if query.len() > 4096 { bail!("h2 query data too large ({} bytes) - please encode data inside body", query.len()); }
b57cb264 605 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
eb2bdd1b 606 let request = Request::builder()
b57cb264
DM
607 .method(method)
608 .uri(url)
609 .header("User-Agent", "proxmox-backup-client/1.0")
792a70b9 610 .header(hyper::header::CONTENT_TYPE, content_type)
b57cb264 611 .body(())?;
62ee2eb4 612 Ok(request)
eb2bdd1b
DM
613 } else {
614 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
615 let request = Request::builder()
616 .method(method)
617 .uri(url)
618 .header("User-Agent", "proxmox-backup-client/1.0")
792a70b9 619 .header(hyper::header::CONTENT_TYPE, content_type)
eb2bdd1b 620 .body(())?;
b57cb264 621
eb2bdd1b
DM
622 Ok(request)
623 }
b57cb264
DM
624 }
625}
1434f4f8 626
db0cb9ce 627#[derive(Clone)]
1434f4f8
WB
628pub struct HttpsConnector {
629 http: HttpConnector,
db0cb9ce 630 ssl_connector: std::sync::Arc<SslConnector>,
1434f4f8
WB
631}
632
633impl HttpsConnector {
634 pub fn with_connector(mut http: HttpConnector, ssl_connector: SslConnector) -> Self {
635 http.enforce_http(false);
636
637 Self {
638 http,
db0cb9ce 639 ssl_connector: std::sync::Arc::new(ssl_connector),
1434f4f8
WB
640 }
641 }
642}
643
644type MaybeTlsStream = EitherStream<
645 tokio::net::TcpStream,
646 tokio_openssl::SslStream<tokio::net::TcpStream>,
647>;
648
db0cb9ce
WB
649impl hyper::service::Service<Uri> for HttpsConnector {
650 type Response = MaybeTlsStream;
1434f4f8 651 type Error = Error;
db0cb9ce
WB
652 type Future = std::pin::Pin<Box<
653 dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static
654 >>;
1434f4f8 655
db0cb9ce
WB
656 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
657 // This connector is always ready, but others might not be.
658 Poll::Ready(Ok(()))
659 }
1434f4f8 660
db0cb9ce
WB
661 fn call(&mut self, dst: Uri) -> Self::Future {
662 let mut this = self.clone();
663 async move {
664 let is_https = dst
665 .scheme()
666 .ok_or_else(|| format_err!("missing URL scheme"))?
667 == "https";
668 let host = dst
669 .host()
670 .ok_or_else(|| format_err!("missing hostname in destination url?"))?
671 .to_string();
672
673 let config = this.ssl_connector.configure();
674 let conn = this.http.call(dst).await?;
1434f4f8
WB
675 if is_https {
676 let conn = tokio_openssl::connect(config?, &host, conn).await?;
db0cb9ce 677 Ok(MaybeTlsStream::Right(conn))
1434f4f8 678 } else {
db0cb9ce 679 Ok(MaybeTlsStream::Left(conn))
1434f4f8 680 }
db0cb9ce 681 }.boxed()
1434f4f8
WB
682 }
683}