]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/http_client.rs
src/client/http_client.rs: add flow control for h2 upload
[proxmox-backup.git] / src / client / http_client.rs
1 use failure::*;
2
3 use http::Uri;
4 use hyper::Body;
5 use hyper::client::Client;
6 use xdg::BaseDirectories;
7 use chrono::Utc;
8
9 use http::{Request, Response};
10 use http::header::HeaderValue;
11
12 use futures::Future;
13 use futures::stream::Stream;
14
15 use serde_json::{json, Value};
16 use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
17
18 use crate::tools::{self, BroadcastFuture, tty};
19 use super::pipe_to_stream::*;
20
21 #[derive(Clone)]
22 struct AuthInfo {
23 username: String,
24 ticket: String,
25 token: String,
26 }
27
28 /// HTTP(S) API client
29 pub struct HttpClient {
30 client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
31 server: String,
32 auth: BroadcastFuture<AuthInfo>,
33 }
34
35 fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
36
37 let base = BaseDirectories::with_prefix("proxmox-backup")?;
38
39 // usually /run/user/<uid>/...
40 let path = base.place_runtime_file("tickets")?;
41
42 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
43
44 let mut data = tools::file_get_json(&path, Some(json!({})))?;
45
46 let now = Utc::now().timestamp();
47
48 data[server][username] = json!({ "timestamp": now, "ticket": ticket, "token": token});
49
50 let mut new_data = json!({});
51
52 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
53
54 let empty = serde_json::map::Map::new();
55 for (server, info) in data.as_object().unwrap_or(&empty) {
56 for (_user, uinfo) in info.as_object().unwrap_or(&empty) {
57 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
58 let age = now - timestamp;
59 if age < ticket_lifetime {
60 new_data[server][username] = uinfo.clone();
61 }
62 }
63 }
64 }
65
66 tools::file_set_contents(path, new_data.to_string().as_bytes(), Some(mode))?;
67
68 Ok(())
69 }
70
71 fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> {
72 let base = match BaseDirectories::with_prefix("proxmox-backup") {
73 Ok(b) => b,
74 _ => return None,
75 };
76
77 // usually /run/user/<uid>/...
78 let path = match base.place_runtime_file("tickets") {
79 Ok(p) => p,
80 _ => return None,
81 };
82
83 let data = match tools::file_get_json(&path, None) {
84 Ok(v) => v,
85 _ => return None,
86 };
87
88 let now = Utc::now().timestamp();
89
90 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
91
92 if let Some(uinfo) = data[server][username].as_object() {
93 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
94 let age = now - timestamp;
95 if age < ticket_lifetime {
96 let ticket = match uinfo["ticket"].as_str() {
97 Some(t) => t,
98 None => return None,
99 };
100 let token = match uinfo["token"].as_str() {
101 Some(t) => t,
102 None => return None,
103 };
104 return Some((ticket.to_owned(), token.to_owned()));
105 }
106 }
107 }
108
109 None
110 }
111
112 impl HttpClient {
113
114 pub fn new(server: &str, username: &str) -> Result<Self, Error> {
115 let client = Self::build_client();
116
117 let password = if let Some((ticket, _token)) = load_ticket_info(server, username) {
118 ticket
119 } else {
120 Self::get_password(&username)?
121 };
122
123 let login = Self::credentials(client.clone(), server.to_owned(), username.to_owned(), password);
124
125 Ok(Self {
126 client,
127 server: String::from(server),
128 auth: BroadcastFuture::new(login),
129 })
130 }
131
132 fn get_password(_username: &str) -> Result<String, Error> {
133 use std::env::VarError::*;
134 match std::env::var("PBS_PASSWORD") {
135 Ok(p) => return Ok(p),
136 Err(NotUnicode(_)) => bail!("PBS_PASSWORD contains bad characters"),
137 Err(NotPresent) => {
138 // Try another method
139 }
140 }
141
142 // If we're on a TTY, query the user for a password
143 if tty::stdin_isatty() {
144 return Ok(String::from_utf8(tty::read_password("Password: ")?)?);
145 }
146
147 bail!("no password input mechanism available");
148 }
149
150 fn build_client() -> Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>> {
151 let mut builder = native_tls::TlsConnector::builder();
152 // FIXME: We need a CLI option for this!
153 builder.danger_accept_invalid_certs(true);
154 let tlsconnector = builder.build().unwrap();
155 let mut httpc = hyper::client::HttpConnector::new(1);
156 httpc.enforce_http(false); // we want https...
157 let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
158 https.https_only(true); // force it!
159 Client::builder().build::<_, Body>(https)
160 }
161
162 pub fn request(&self, mut req: Request<Body>) -> impl Future<Item=Value, Error=Error> {
163
164 let login = self.auth.listen();
165
166 let client = self.client.clone();
167
168 login.and_then(move |auth| {
169
170 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
171 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
172 req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
173
174 let request = Self::api_request(client, req);
175
176 request
177 })
178 }
179
180 pub fn get(&self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
181
182 let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
183 self.request(req)
184 }
185
186 pub fn delete(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
187
188 let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
189 self.request(req)
190 }
191
192 pub fn post(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
193
194 let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
195 self.request(req)
196 }
197
198 pub fn download(&mut self, path: &str, mut output: Box<dyn std::io::Write + Send>) -> impl Future<Item=(), Error=Error> {
199
200 let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
201
202 let login = self.auth.listen();
203
204 let client = self.client.clone();
205
206 login.and_then(move |auth| {
207
208 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
209 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
210
211 client.request(req)
212 .map_err(Error::from)
213 .and_then(|resp| {
214
215 let _status = resp.status(); // fixme: ??
216
217 resp.into_body()
218 .map_err(Error::from)
219 .for_each(move |chunk| {
220 output.write_all(&chunk)?;
221 Ok(())
222 })
223
224 })
225 })
226 }
227
228 pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> impl Future<Item=Value, Error=Error> {
229
230 let path = path.trim_matches('/');
231 let url: Uri = format!("https://{}:8007/{}", &self.server, path).parse().unwrap();
232
233 let req = Request::builder()
234 .method("POST")
235 .uri(url)
236 .header("User-Agent", "proxmox-backup-client/1.0")
237 .header("Content-Type", content_type)
238 .body(body).unwrap();
239
240 self.request(req)
241 }
242
243 pub fn h2upgrade(
244 &mut self, path:
245 &str, param: Option<Value>
246 ) -> impl Future<Item=H2Client, Error=Error> {
247
248 let mut req = Self::request_builder(&self.server, "GET", path, param).unwrap();
249
250 let login = self.auth.listen();
251
252 let client = self.client.clone();
253
254 login.and_then(move |auth| {
255
256 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
257 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
258 req.headers_mut().insert("UPGRADE", HeaderValue::from_str("proxmox-backup-protocol-h2").unwrap());
259
260 client.request(req)
261 .map_err(Error::from)
262 .and_then(|resp| {
263
264 let status = resp.status();
265 if status != http::StatusCode::SWITCHING_PROTOCOLS {
266 bail!("h2upgrade failed with status {:?}", status);
267 }
268
269 Ok(resp.into_body().on_upgrade().map_err(Error::from))
270 })
271 .flatten()
272 .and_then(|upgraded| {
273 h2::client::handshake(upgraded).map_err(Error::from)
274 })
275 .and_then(|(h2, connection)| {
276 let connection = connection
277 .map_err(|_| panic!("HTTP/2.0 connection failed"));
278
279 // Spawn a new task to drive the connection state
280 hyper::rt::spawn(connection);
281
282 // Wait until the `SendRequest` handle has available capacity.
283 h2.ready()
284 .map(H2Client::new)
285 .map_err(Error::from)
286 })
287 })
288 }
289
290 fn credentials(
291 client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
292 server: String,
293 username: String,
294 password: String,
295 ) -> Box<Future<Item=AuthInfo, Error=Error> + Send> {
296
297 let server2 = server.clone();
298
299 let create_request = futures::future::lazy(move || {
300 let data = json!({ "username": username, "password": password });
301 let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
302 Self::api_request(client, req)
303 });
304
305 let login_future = create_request
306 .and_then(move |cred| {
307 let auth = AuthInfo {
308 username: cred["data"]["username"].as_str().unwrap().to_owned(),
309 ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
310 token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
311 };
312
313 let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token);
314
315 Ok(auth)
316 });
317
318 Box::new(login_future)
319 }
320
321 fn api_request(
322 client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
323 req: Request<Body>
324 ) -> impl Future<Item=Value, Error=Error> {
325
326 client.request(req)
327 .map_err(Error::from)
328 .and_then(|resp| {
329
330 let status = resp.status();
331
332 resp
333 .into_body()
334 .concat2()
335 .map_err(Error::from)
336 .and_then(move |data| {
337
338 let text = String::from_utf8(data.to_vec()).unwrap();
339 if status.is_success() {
340 if text.len() > 0 {
341 let value: Value = serde_json::from_str(&text)?;
342 Ok(value)
343 } else {
344 Ok(Value::Null)
345 }
346 } else {
347 bail!("HTTP Error {}: {}", status, text);
348 }
349 })
350 })
351 }
352
353 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
354 let path = path.trim_matches('/');
355 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
356
357 if let Some(data) = data {
358 if method == "POST" {
359 let request = Request::builder()
360 .method(method)
361 .uri(url)
362 .header("User-Agent", "proxmox-backup-client/1.0")
363 .header(hyper::header::CONTENT_TYPE, "application/json")
364 .body(Body::from(data.to_string()))?;
365 return Ok(request);
366 } else {
367 let query = tools::json_object_to_query(data)?;
368 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
369 let request = Request::builder()
370 .method(method)
371 .uri(url)
372 .header("User-Agent", "proxmox-backup-client/1.0")
373 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
374 .body(Body::empty())?;
375 return Ok(request);
376 }
377 }
378
379 let request = Request::builder()
380 .method(method)
381 .uri(url)
382 .header("User-Agent", "proxmox-backup-client/1.0")
383 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
384 .body(Body::empty())?;
385
386 Ok(request)
387 }
388 }
389
390 #[derive(Clone)]
391 pub struct H2Client {
392 h2: h2::client::SendRequest<bytes::Bytes>,
393 }
394
395 impl H2Client {
396
397 pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
398 Self { h2 }
399 }
400
401 pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
402 let req = Self::request_builder("localhost", "GET", path, param).unwrap();
403 self.request(req)
404 }
405
406 pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
407 let req = Self::request_builder("localhost", "POST", path, param).unwrap();
408 self.request(req)
409 }
410
411 pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
412 let request = Self::request_builder("localhost", "POST", path, param).unwrap();
413
414 self.h2.clone()
415 .ready()
416 .map_err(Error::from)
417 .and_then(move |mut send_request| {
418 let (response, stream) = send_request.send_request(request, false).unwrap();
419 PipeToSendStream::new(bytes::Bytes::from(data), stream)
420 .and_then(|_| {
421 response
422 .map_err(Error::from)
423 .and_then(Self::h2api_response)
424 })
425 })
426 }
427
428 fn request(
429 &self,
430 request: Request<()>,
431 ) -> impl Future<Item=Value, Error=Error> {
432
433 self.h2.clone()
434 .ready()
435 .map_err(Error::from)
436 .and_then(move |mut send_request| {
437 let (response, _stream) = send_request.send_request(request, true).unwrap();
438 response
439 .map_err(Error::from)
440 .and_then(Self::h2api_response)
441 })
442 }
443
444 fn h2api_response(response: Response<h2::RecvStream>) -> impl Future<Item=Value, Error=Error> {
445
446 let status = response.status();
447
448 let (_head, mut body) = response.into_parts();
449
450 // The `release_capacity` handle allows the caller to manage
451 // flow control.
452 //
453 // Whenever data is received, the caller is responsible for
454 // releasing capacity back to the server once it has freed
455 // the data from memory.
456 let mut release_capacity = body.release_capacity().clone();
457
458 body
459 .map(move |chunk| {
460 // Let the server send more data.
461 let _ = release_capacity.release_capacity(chunk.len());
462 chunk
463 })
464 .concat2()
465 .map_err(Error::from)
466 .and_then(move |data| {
467 let text = String::from_utf8(data.to_vec()).unwrap();
468 if status.is_success() {
469 if text.len() > 0 {
470 let mut value: Value = serde_json::from_str(&text)?;
471 if let Some(map) = value.as_object_mut() {
472 if let Some(data) = map.remove("data") {
473 return Ok(data);
474 }
475 }
476 bail!("got result without data property");
477 } else {
478 Ok(Value::Null)
479 }
480 } else {
481 bail!("HTTP Error {}: {}", status, text);
482 }
483 })
484 }
485
486 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<()>, Error> {
487 let path = path.trim_matches('/');
488 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
489
490 if let Some(data) = data {
491 let query = tools::json_object_to_query(data)?;
492 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
493 let request = Request::builder()
494 .method(method)
495 .uri(url)
496 .header("User-Agent", "proxmox-backup-client/1.0")
497 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
498 .body(())?;
499 return Ok(request);
500 }
501
502 let request = Request::builder()
503 .method(method)
504 .uri(url)
505 .header("User-Agent", "proxmox-backup-client/1.0")
506 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
507 .body(())?;
508
509 Ok(request)
510 }
511 }