]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/http_client.rs
tree-wide: use 'dyn' for all trait objects
[proxmox-backup.git] / src / client / http_client.rs
1 use failure::*;
2
3 use http::Uri;
4 use hyper::Body;
5 use hyper::client::Client;
6 use xdg::BaseDirectories;
7 use chrono::Utc;
8 use std::collections::HashSet;
9 use std::sync::{Arc, Mutex};
10
11 use http::{Request, Response};
12 use http::header::HeaderValue;
13
14 use futures::*;
15 use futures::stream::Stream;
16 use std::sync::atomic::{AtomicUsize, Ordering};
17 use tokio::sync::mpsc;
18
19 use serde_json::{json, Value};
20 use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
21
22 use crate::tools::{self, BroadcastFuture, tty};
23 use crate::tools::futures::{cancellable, Canceller};
24 use super::pipe_to_stream::*;
25 use super::merge_known_chunks::*;
26
27 use crate::backup::*;
28
29
30 #[derive(Clone)]
31 struct AuthInfo {
32 username: String,
33 ticket: String,
34 token: String,
35 }
36
37 /// HTTP(S) API client
38 pub struct HttpClient {
39 client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
40 server: String,
41 auth: BroadcastFuture<AuthInfo>,
42 }
43
44 fn store_ticket_info(server: &str, username: &str, ticket: &str, token: &str) -> Result<(), Error> {
45
46 let base = BaseDirectories::with_prefix("proxmox-backup")?;
47
48 // usually /run/user/<uid>/...
49 let path = base.place_runtime_file("tickets")?;
50
51 let mode = nix::sys::stat::Mode::from_bits_truncate(0o0600);
52
53 let mut data = tools::file_get_json(&path, Some(json!({})))?;
54
55 let now = Utc::now().timestamp();
56
57 data[server][username] = json!({ "timestamp": now, "ticket": ticket, "token": token});
58
59 let mut new_data = json!({});
60
61 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
62
63 let empty = serde_json::map::Map::new();
64 for (server, info) in data.as_object().unwrap_or(&empty) {
65 for (_user, uinfo) in info.as_object().unwrap_or(&empty) {
66 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
67 let age = now - timestamp;
68 if age < ticket_lifetime {
69 new_data[server][username] = uinfo.clone();
70 }
71 }
72 }
73 }
74
75 tools::file_set_contents(path, new_data.to_string().as_bytes(), Some(mode))?;
76
77 Ok(())
78 }
79
80 fn load_ticket_info(server: &str, username: &str) -> Option<(String, String)> {
81 let base = match BaseDirectories::with_prefix("proxmox-backup") {
82 Ok(b) => b,
83 _ => return None,
84 };
85
86 // usually /run/user/<uid>/...
87 let path = match base.place_runtime_file("tickets") {
88 Ok(p) => p,
89 _ => return None,
90 };
91
92 let data = match tools::file_get_json(&path, None) {
93 Ok(v) => v,
94 _ => return None,
95 };
96
97 let now = Utc::now().timestamp();
98
99 let ticket_lifetime = tools::ticket::TICKET_LIFETIME - 60;
100
101 if let Some(uinfo) = data[server][username].as_object() {
102 if let Some(timestamp) = uinfo["timestamp"].as_i64() {
103 let age = now - timestamp;
104 if age < ticket_lifetime {
105 let ticket = match uinfo["ticket"].as_str() {
106 Some(t) => t,
107 None => return None,
108 };
109 let token = match uinfo["token"].as_str() {
110 Some(t) => t,
111 None => return None,
112 };
113 return Some((ticket.to_owned(), token.to_owned()));
114 }
115 }
116 }
117
118 None
119 }
120
121 impl HttpClient {
122
123 pub fn new(server: &str, username: &str) -> Result<Self, Error> {
124 let client = Self::build_client();
125
126 let password = if let Some((ticket, _token)) = load_ticket_info(server, username) {
127 ticket
128 } else {
129 Self::get_password(&username)?
130 };
131
132 let login = Self::credentials(client.clone(), server.to_owned(), username.to_owned(), password);
133
134 Ok(Self {
135 client,
136 server: String::from(server),
137 auth: BroadcastFuture::new(login),
138 })
139 }
140
141 fn get_password(_username: &str) -> Result<String, Error> {
142 use std::env::VarError::*;
143 match std::env::var("PBS_PASSWORD") {
144 Ok(p) => return Ok(p),
145 Err(NotUnicode(_)) => bail!("PBS_PASSWORD contains bad characters"),
146 Err(NotPresent) => {
147 // Try another method
148 }
149 }
150
151 // If we're on a TTY, query the user for a password
152 if tty::stdin_isatty() {
153 return Ok(String::from_utf8(tty::read_password("Password: ")?)?);
154 }
155
156 bail!("no password input mechanism available");
157 }
158
159 fn build_client() -> Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>> {
160 let mut builder = native_tls::TlsConnector::builder();
161 // FIXME: We need a CLI option for this!
162 builder.danger_accept_invalid_certs(true);
163 let tlsconnector = builder.build().unwrap();
164 let mut httpc = hyper::client::HttpConnector::new(1);
165 //httpc.set_nodelay(true); // not sure if this help?
166 httpc.enforce_http(false); // we want https...
167 let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
168 https.https_only(true); // force it!
169 Client::builder()
170 //.http2_initial_stream_window_size( (1 << 31) - 2)
171 //.http2_initial_connection_window_size( (1 << 31) - 2)
172 .build::<_, Body>(https)
173 }
174
175 pub fn request(&self, mut req: Request<Body>) -> impl Future<Item=Value, Error=Error> {
176
177 let login = self.auth.listen();
178
179 let client = self.client.clone();
180
181 login.and_then(move |auth| {
182
183 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
184 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
185 req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
186
187 let request = Self::api_request(client, req);
188
189 request
190 })
191 }
192
193 pub fn get(&self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
194
195 let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
196 self.request(req)
197 }
198
199 pub fn delete(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
200
201 let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
202 self.request(req)
203 }
204
205 pub fn post(&mut self, path: &str, data: Option<Value>) -> impl Future<Item=Value, Error=Error> {
206
207 let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
208 self.request(req)
209 }
210
211 pub fn download(&mut self, path: &str, mut output: Box<dyn std::io::Write + Send>) -> impl Future<Item=(), Error=Error> {
212
213 let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
214
215 let login = self.auth.listen();
216
217 let client = self.client.clone();
218
219 login.and_then(move |auth| {
220
221 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
222 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
223
224 client.request(req)
225 .map_err(Error::from)
226 .and_then(|resp| {
227
228 let _status = resp.status(); // fixme: ??
229
230 resp.into_body()
231 .map_err(Error::from)
232 .for_each(move |chunk| {
233 output.write_all(&chunk)?;
234 Ok(())
235 })
236
237 })
238 })
239 }
240
241 pub fn upload(&mut self, content_type: &str, body: Body, path: &str) -> impl Future<Item=Value, Error=Error> {
242
243 let path = path.trim_matches('/');
244 let url: Uri = format!("https://{}:8007/{}", &self.server, path).parse().unwrap();
245
246 let req = Request::builder()
247 .method("POST")
248 .uri(url)
249 .header("User-Agent", "proxmox-backup-client/1.0")
250 .header("Content-Type", content_type)
251 .body(body).unwrap();
252
253 self.request(req)
254 }
255
256 pub fn start_backup(
257 &self,
258 datastore: &str,
259 backup_type: &str,
260 backup_id: &str,
261 debug: bool,
262 ) -> impl Future<Item=BackupClient, Error=Error> {
263
264 let param = json!({"backup-type": backup_type, "backup-id": backup_id, "store": datastore, "debug": debug});
265 let mut req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap();
266
267 let login = self.auth.listen();
268
269 let client = self.client.clone();
270
271 login.and_then(move |auth| {
272
273 let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
274 req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
275 req.headers_mut().insert("UPGRADE", HeaderValue::from_str(PROXMOX_BACKUP_PROTOCOL_ID_V1!()).unwrap());
276
277 client.request(req)
278 .map_err(Error::from)
279 .and_then(|resp| {
280
281 let status = resp.status();
282 if status != http::StatusCode::SWITCHING_PROTOCOLS {
283 future::Either::A(Self::api_response(resp).and_then(|_| { bail!("unknown error"); }))
284 } else {
285 future::Either::B(resp.into_body().on_upgrade().map_err(Error::from))
286 }
287 })
288 .and_then(|upgraded| {
289 h2::client::handshake(upgraded).map_err(Error::from)
290 })
291 .and_then(|(h2, connection)| {
292 let connection = connection
293 .map_err(|_| panic!("HTTP/2.0 connection failed"));
294
295 let (connection, canceller) = cancellable(connection)?;
296 // A cancellable future returns an Option which is None when cancelled and
297 // Some when it finished instead, since we don't care about the return type we
298 // need to map it away:
299 let connection = connection.map(|_| ());
300
301 // Spawn a new task to drive the connection state
302 hyper::rt::spawn(connection);
303
304 // Wait until the `SendRequest` handle has available capacity.
305 Ok(h2.ready()
306 .map(move |c| BackupClient::new(c, canceller))
307 .map_err(Error::from))
308 })
309 .flatten()
310 })
311 }
312
313 fn credentials(
314 client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
315 server: String,
316 username: String,
317 password: String,
318 ) -> Box<dyn Future<Item=AuthInfo, Error=Error> + Send> {
319
320 let server2 = server.clone();
321
322 let create_request = futures::future::lazy(move || {
323 let data = json!({ "username": username, "password": password });
324 let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap();
325 Self::api_request(client, req)
326 });
327
328 let login_future = create_request
329 .and_then(move |cred| {
330 let auth = AuthInfo {
331 username: cred["data"]["username"].as_str().unwrap().to_owned(),
332 ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(),
333 token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(),
334 };
335
336 let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token);
337
338 Ok(auth)
339 });
340
341 Box::new(login_future)
342 }
343
344 fn api_response(response: Response<Body>) -> impl Future<Item=Value, Error=Error> {
345
346 let status = response.status();
347
348 response
349 .into_body()
350 .concat2()
351 .map_err(Error::from)
352 .and_then(move |data| {
353
354 let text = String::from_utf8(data.to_vec()).unwrap();
355 if status.is_success() {
356 if text.len() > 0 {
357 let value: Value = serde_json::from_str(&text)?;
358 Ok(value)
359 } else {
360 Ok(Value::Null)
361 }
362 } else {
363 bail!("HTTP Error {}: {}", status, text);
364 }
365 })
366 }
367
368 fn api_request(
369 client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
370 req: Request<Body>
371 ) -> impl Future<Item=Value, Error=Error> {
372
373 client.request(req)
374 .map_err(Error::from)
375 .and_then(Self::api_response)
376 }
377
378 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
379 let path = path.trim_matches('/');
380 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
381
382 if let Some(data) = data {
383 if method == "POST" {
384 let request = Request::builder()
385 .method(method)
386 .uri(url)
387 .header("User-Agent", "proxmox-backup-client/1.0")
388 .header(hyper::header::CONTENT_TYPE, "application/json")
389 .body(Body::from(data.to_string()))?;
390 return Ok(request);
391 } else {
392 let query = tools::json_object_to_query(data)?;
393 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
394 let request = Request::builder()
395 .method(method)
396 .uri(url)
397 .header("User-Agent", "proxmox-backup-client/1.0")
398 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
399 .body(Body::empty())?;
400 return Ok(request);
401 }
402 }
403
404 let request = Request::builder()
405 .method(method)
406 .uri(url)
407 .header("User-Agent", "proxmox-backup-client/1.0")
408 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
409 .body(Body::empty())?;
410
411 Ok(request)
412 }
413 }
414
415 //#[derive(Clone)]
416 pub struct BackupClient {
417 h2: H2Client,
418 canceller: Option<Canceller>,
419 }
420
421
422 impl BackupClient {
423
424 pub fn new(h2: h2::client::SendRequest<bytes::Bytes>, canceller: Canceller) -> Self {
425 Self {
426 h2: H2Client::new(h2),
427 canceller: Some(canceller),
428 }
429 }
430
431 pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
432 self.h2.get(path, param)
433 }
434
435 pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
436 self.h2.put(path, param)
437 }
438
439 pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
440 self.h2.post(path, param)
441 }
442
443 pub fn finish(mut self) -> impl Future<Item=(), Error=Error> {
444 let canceler = self.canceller.take().unwrap();
445 self.h2.clone().post("finish", None).map(move |_| {
446 canceler.cancel();
447 ()
448 })
449 }
450
451 pub fn force_close(mut self) {
452 self.canceller.take().unwrap().cancel();
453 }
454
455 pub fn upload_config<P: AsRef<std::path::Path>>(
456 &self,
457 src_path: P,
458 file_name: &str,
459 ) -> impl Future<Item=(), Error=Error> {
460
461 let h2 = self.h2.clone();
462 let file_name = file_name.to_owned();
463 let src_path = src_path.as_ref().to_owned();
464
465 let task = tokio::fs::File::open(src_path.clone())
466 .map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err))
467 .and_then(|file| {
468 let contents = vec![];
469 tokio::io::read_to_end(file, contents)
470 .map_err(Error::from)
471 .and_then(move |(_, contents)| {
472 let param = json!({"size": contents.len(), "file-name": file_name });
473 h2.upload("config", Some(param), contents)
474 .map(|_| {})
475 })
476 });
477
478 task
479 }
480
481 pub fn upload_stream(
482 &self,
483 archive_name: &str,
484 stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
485 prefix: &str,
486 fixed_size: Option<u64>,
487 ) -> impl Future<Item=(), Error=Error> {
488
489 let known_chunks = Arc::new(Mutex::new(HashSet::new()));
490
491 let mut stream_len = 0u64;
492
493 let stream = stream.
494 map(move |data| {
495 let digest = openssl::sha::sha256(&data);
496 let offset = stream_len;
497 stream_len += data.len() as u64;
498 ChunkInfo { data, digest, offset }
499 });
500
501 let h2 = self.h2.clone();
502 let h2_2 = self.h2.clone();
503 let h2_3 = self.h2.clone();
504 let h2_4 = self.h2.clone();
505
506 let mut param = json!({ "archive-name": archive_name });
507 if let Some(size) = fixed_size {
508 param["size"] = size.into();
509 }
510
511 let index_path = format!("{}_index", prefix);
512 let close_path = format!("{}_close", prefix);
513
514 let prefix = prefix.to_owned();
515
516 Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone())
517 .and_then(move |_| {
518 h2_2.post(&index_path, Some(param))
519 })
520 .and_then(move |res| {
521 let wid = res.as_u64().unwrap();
522 Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone())
523 .and_then(move |(chunk_count, size, _speed)| {
524 let param = json!({
525 "wid": wid ,
526 "chunk-count": chunk_count,
527 "size": size,
528 });
529 h2_4.post(&close_path, Some(param))
530 })
531 .map(|_| ())
532 })
533 }
534
535 fn response_queue() -> (
536 mpsc::Sender<h2::client::ResponseFuture>,
537 sync::oneshot::Receiver<Result<(), Error>>
538 ) {
539 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
540 let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
541
542 hyper::rt::spawn(
543 verify_queue_rx
544 .map_err(Error::from)
545 .for_each(|response: h2::client::ResponseFuture| {
546 response
547 .map_err(Error::from)
548 .and_then(H2Client::h2api_response)
549 .and_then(|result| {
550 println!("RESPONSE: {:?}", result);
551 Ok(())
552 })
553 .map_err(|err| format_err!("pipelined request failed: {}", err))
554 })
555 .then(|result|
556 verify_result_tx.send(result)
557 )
558 .map_err(|_| { /* ignore closed channel */ })
559 );
560
561 (verify_queue_tx, verify_result_rx)
562 }
563
564 fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
565 mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
566 sync::oneshot::Receiver<Result<(), Error>>
567 ) {
568 let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
569 let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
570
571 let h2_2 = h2.clone();
572
573 hyper::rt::spawn(
574 verify_queue_rx
575 .map_err(Error::from)
576 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
577 match (response, merged_chunk_info) {
578 (Some(response), MergedChunkInfo::Known(list)) => {
579 future::Either::A(
580 response
581 .map_err(Error::from)
582 .and_then(H2Client::h2api_response)
583 .and_then(move |_result| {
584 Ok(MergedChunkInfo::Known(list))
585 })
586 )
587 }
588 (None, MergedChunkInfo::Known(list)) => {
589 future::Either::B(future::ok(MergedChunkInfo::Known(list)))
590 }
591 _ => unreachable!(),
592 }
593 })
594 .merge_known_chunks()
595 .and_then(move |merged_chunk_info| {
596 match merged_chunk_info {
597 MergedChunkInfo::Known(chunk_list) => {
598 let mut digest_list = vec![];
599 let mut offset_list = vec![];
600 for (offset, digest) in chunk_list {
601 //println!("append chunk {} (offset {})", tools::digest_to_hex(&digest), offset);
602 digest_list.push(tools::digest_to_hex(&digest));
603 offset_list.push(offset);
604 }
605 println!("append chunks list len ({})", digest_list.len());
606 let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
607 let mut request = H2Client::request_builder("localhost", "PUT", &path, None).unwrap();
608 request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json"));
609 let param_data = bytes::Bytes::from(param.to_string().as_bytes());
610 let upload_data = Some(param_data);
611 h2_2.send_request(request, upload_data)
612 .and_then(move |response| {
613 response
614 .map_err(Error::from)
615 .and_then(H2Client::h2api_response)
616 .and_then(|_| Ok(()))
617 })
618 .map_err(|err| format_err!("pipelined request failed: {}", err))
619 }
620 _ => unreachable!(),
621 }
622 })
623 .for_each(|_| Ok(()))
624 .then(|result|
625 verify_result_tx.send(result)
626 )
627 .map_err(|_| { /* ignore closed channel */ })
628 );
629
630 (verify_queue_tx, verify_result_rx)
631 }
632
633 fn download_chunk_list(
634 h2: H2Client,
635 path: &str,
636 archive_name: &str,
637 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
638 ) -> impl Future<Item=(), Error=Error> {
639
640 let param = json!({ "archive-name": archive_name });
641 let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap();
642
643 h2.send_request(request, None)
644 .and_then(move |response| {
645 response
646 .map_err(Error::from)
647 .and_then(move |resp| {
648 let status = resp.status();
649
650 if !status.is_success() {
651 future::Either::A(H2Client::h2api_response(resp).and_then(|_| { bail!("unknown error"); }))
652 } else {
653 future::Either::B(future::ok(resp.into_body()))
654 }
655 })
656 .and_then(move |mut body| {
657
658 let mut release_capacity = body.release_capacity().clone();
659
660 DigestListDecoder::new(body.map_err(Error::from))
661 .for_each(move |chunk| {
662 let _ = release_capacity.release_capacity(chunk.len());
663 println!("GOT DOWNLOAD {}", tools::digest_to_hex(&chunk));
664 known_chunks.lock().unwrap().insert(chunk);
665 Ok(())
666 })
667 })
668 })
669 }
670
671 fn upload_chunk_info_stream(
672 h2: H2Client,
673 wid: u64,
674 stream: impl Stream<Item=ChunkInfo, Error=Error>,
675 prefix: &str,
676 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
677 ) -> impl Future<Item=(usize, usize, usize), Error=Error> {
678
679 let repeat = std::sync::Arc::new(AtomicUsize::new(0));
680 let repeat2 = repeat.clone();
681
682 let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
683 let stream_len2 = stream_len.clone();
684
685 let append_chunk_path = format!("{}_index", prefix);
686 let upload_chunk_path = format!("{}_chunk", prefix);
687
688 let (upload_queue, upload_result) = Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned());
689
690 let start_time = std::time::Instant::now();
691
692 stream
693 .map(move |chunk_info| {
694 repeat.fetch_add(1, Ordering::SeqCst);
695 stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst);
696
697 let mut known_chunks = known_chunks.lock().unwrap();
698 let chunk_is_known = known_chunks.contains(&chunk_info.digest);
699 if chunk_is_known {
700 MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)])
701 } else {
702 known_chunks.insert(chunk_info.digest);
703 MergedChunkInfo::New(chunk_info)
704 }
705 })
706 .merge_known_chunks()
707 .for_each(move |merged_chunk_info| {
708
709 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
710 let offset = chunk_info.offset;
711 let digest = chunk_info.digest;
712 let upload_queue = upload_queue.clone();
713
714 println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest),
715 chunk_info.data.len(), offset);
716
717 let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
718 let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param)).unwrap();
719 let upload_data = Some(chunk_info.data.freeze());
720
721 let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
722
723 future::Either::A(
724 h2.send_request(request, upload_data)
725 .and_then(move |response| {
726 upload_queue.clone().send((new_info, Some(response)))
727 .map(|_| ()).map_err(Error::from)
728 })
729 )
730 } else {
731
732 future::Either::B(
733 upload_queue.clone().send((merged_chunk_info, None))
734 .map(|_| ()).map_err(Error::from)
735 )
736 }
737 })
738 .then(move |result| {
739 println!("RESULT {:?}", result);
740 upload_result.map_err(Error::from).and_then(|upload1_result| {
741 Ok(upload1_result.and(result))
742 })
743 })
744 .flatten()
745 .and_then(move |_| {
746 let repeat = repeat2.load(Ordering::SeqCst);
747 let stream_len = stream_len2.load(Ordering::SeqCst);
748 let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
749 println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
750 if repeat > 0 {
751 println!("Average chunk size was {} bytes.", stream_len/repeat);
752 println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
753 }
754 Ok((repeat, stream_len, speed))
755 })
756 }
757
758 pub fn upload_speedtest(&self) -> impl Future<Item=usize, Error=Error> {
759
760 let mut data = vec![];
761 // generate pseudo random byte sequence
762 for i in 0..1024*1024 {
763 for j in 0..4 {
764 let byte = ((i >> (j<<3))&0xff) as u8;
765 data.push(byte);
766 }
767 }
768
769 let item_len = data.len();
770
771 let repeat = std::sync::Arc::new(AtomicUsize::new(0));
772 let repeat2 = repeat.clone();
773
774 let (upload_queue, upload_result) = Self::response_queue();
775
776 let start_time = std::time::Instant::now();
777
778 let h2 = self.h2.clone();
779
780 futures::stream::repeat(data)
781 .take_while(move |_| {
782 repeat.fetch_add(1, Ordering::SeqCst);
783 Ok(start_time.elapsed().as_secs() < 5)
784 })
785 .for_each(move |data| {
786 let h2 = h2.clone();
787
788 let upload_queue = upload_queue.clone();
789
790 println!("send test data ({} bytes)", data.len());
791 let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
792 h2.send_request(request, Some(bytes::Bytes::from(data)))
793 .and_then(move |response| {
794 upload_queue.send(response)
795 .map(|_| ()).map_err(Error::from)
796 })
797 })
798 .then(move |result| {
799 println!("RESULT {:?}", result);
800 upload_result.map_err(Error::from).and_then(|upload1_result| {
801 Ok(upload1_result.and(result))
802 })
803 })
804 .flatten()
805 .and_then(move |_| {
806 let repeat = repeat2.load(Ordering::SeqCst);
807 println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
808 let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
809 if repeat > 0 {
810 println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
811 }
812 Ok(speed)
813 })
814 }
815 }
816
817 #[derive(Clone)]
818 pub struct H2Client {
819 h2: h2::client::SendRequest<bytes::Bytes>,
820 }
821
822 impl H2Client {
823
824 pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
825 Self { h2 }
826 }
827
828 pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
829 let req = Self::request_builder("localhost", "GET", path, param).unwrap();
830 self.request(req)
831 }
832
833 pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
834 let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
835 self.request(req)
836 }
837
838 pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
839 let req = Self::request_builder("localhost", "POST", path, param).unwrap();
840 self.request(req)
841 }
842
843 pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
844 let request = Self::request_builder("localhost", "POST", path, param).unwrap();
845
846
847 self.h2.clone()
848 .ready()
849 .map_err(Error::from)
850 .and_then(move |mut send_request| {
851 let (response, stream) = send_request.send_request(request, false).unwrap();
852 PipeToSendStream::new(bytes::Bytes::from(data), stream)
853 .and_then(|_| {
854 response
855 .map_err(Error::from)
856 .and_then(Self::h2api_response)
857 })
858 })
859 }
860
861 fn request(
862 &self,
863 request: Request<()>,
864 ) -> impl Future<Item=Value, Error=Error> {
865
866 self.send_request(request, None)
867 .and_then(move |response| {
868 response
869 .map_err(Error::from)
870 .and_then(Self::h2api_response)
871 })
872 }
873
874 fn send_request(
875 &self,
876 request: Request<()>,
877 data: Option<bytes::Bytes>,
878 ) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
879
880 self.h2.clone()
881 .ready()
882 .map_err(Error::from)
883 .and_then(move |mut send_request| {
884 if let Some(data) = data {
885 let (response, stream) = send_request.send_request(request, false).unwrap();
886 future::Either::A(PipeToSendStream::new(bytes::Bytes::from(data), stream)
887 .and_then(move |_| {
888 future::ok(response)
889 }))
890 } else {
891 let (response, _stream) = send_request.send_request(request, true).unwrap();
892 future::Either::B(future::ok(response))
893 }
894 })
895 }
896
897 fn h2api_response(response: Response<h2::RecvStream>) -> impl Future<Item=Value, Error=Error> {
898
899 let status = response.status();
900
901 let (_head, mut body) = response.into_parts();
902
903 // The `release_capacity` handle allows the caller to manage
904 // flow control.
905 //
906 // Whenever data is received, the caller is responsible for
907 // releasing capacity back to the server once it has freed
908 // the data from memory.
909 let mut release_capacity = body.release_capacity().clone();
910
911 body
912 .map(move |chunk| {
913 // Let the server send more data.
914 let _ = release_capacity.release_capacity(chunk.len());
915 chunk
916 })
917 .concat2()
918 .map_err(Error::from)
919 .and_then(move |data| {
920 let text = String::from_utf8(data.to_vec()).unwrap();
921 if status.is_success() {
922 if text.len() > 0 {
923 let mut value: Value = serde_json::from_str(&text)?;
924 if let Some(map) = value.as_object_mut() {
925 if let Some(data) = map.remove("data") {
926 return Ok(data);
927 }
928 }
929 bail!("got result without data property");
930 } else {
931 Ok(Value::Null)
932 }
933 } else {
934 bail!("HTTP Error {}: {}", status, text);
935 }
936 })
937 }
938
939 // Note: We always encode parameters with the url
940 pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<()>, Error> {
941 let path = path.trim_matches('/');
942
943 if let Some(data) = data {
944 let query = tools::json_object_to_query(data)?;
945 // We detected problem with hyper around 6000 characters - seo we try to keep on the safe side
946 if query.len() > 4096 { bail!("h2 query data too large ({} bytes) - please encode data inside body", query.len()); }
947 let url: Uri = format!("https://{}:8007/{}?{}", server, path, query).parse()?;
948 let request = Request::builder()
949 .method(method)
950 .uri(url)
951 .header("User-Agent", "proxmox-backup-client/1.0")
952 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
953 .body(())?;
954 return Ok(request);
955 } else {
956 let url: Uri = format!("https://{}:8007/{}", server, path).parse()?;
957 let request = Request::builder()
958 .method(method)
959 .uri(url)
960 .header("User-Agent", "proxmox-backup-client/1.0")
961 .header(hyper::header::CONTENT_TYPE, "application/x-www-form-urlencoded")
962 .body(())?;
963
964 Ok(request)
965 }
966 }
967 }