]> git.proxmox.com Git - proxmox-backup.git/blame - src/bin/proxmox-protocol-testclient.rs
implement server state/signal handling, depend on tokio-signal
[proxmox-backup.git] / src / bin / proxmox-protocol-testclient.rs
CommitLineData
6716f30b
WB
1use std::io;
2use std::process::exit;
3
4use chrono::Utc;
5use failure::*;
6use futures::future::{ok, poll_fn, Future};
7use futures::try_ready;
8use futures::{Async, Poll};
9use http::{Request, Response, StatusCode};
10use hyper::rt::Stream;
11use hyper::Body;
12use tokio::prelude::*;
13use tokio_fs::file::File;
14
15use proxmox_protocol::Client as PmxClient;
16use proxmox_protocol::{BackupStream, ChunkEntry, ChunkStream, IndexType, StreamId};
17
18use proxmox_backup::client::BackupRepository;
19
// This is a temporary client using the backup protocol crate.
// Its functionality should be moved to the `proxmox-backup-client` binary instead.
// For now it mostly exists to keep, in the history, an alternative way of connecting to an
// https server without hyper-tls in the background.
// Note that hyper-tls just wraps native_tls, and so does tokio_tls. So the only way to get
// rid of the extra dependency would be to reimplement tokio_tls on top of the openssl crate.
26
27type HyperConnection<T, B> = hyper::client::conn::Connection<T, B>;
28type HyperConnType = HyperConnection<tokio_tls::TlsStream<tokio::net::TcpStream>, Body>;
29
30// Create a future which connects to a TLS-enabled http server.
31// This would ordinarily be covered by the Connect trait in the higher level hyper interface.
32// Connect to the server, initiate TLS, finally run hyper's handshake method.
33fn connect(
34 domain: &str,
35 port: u16,
36 no_cert_validation: bool,
37) -> impl Future<
38 // Typing out this function signature is almost more work than copying its code body...
39 Item = (hyper::client::conn::SendRequest<Body>, HyperConnType),
40 Error = Error,
41> {
42 // tokio::net::TcpStream::connect(addr) <- this takes only a single address!
43 // so we need to improvise...:
44 use tokio_threadpool::blocking;
45
46 let domain = domain.to_string();
47 let domain2 = domain.clone();
48 poll_fn(move || {
49 blocking(|| {
50 let conn =
51 std::net::TcpStream::connect((domain.as_str(), port)).map_err(Error::from)?;
52 tokio::net::TcpStream::from_std(conn, &Default::default()).map_err(Error::from)
53 })
54 .map_err(Error::from)
55 })
56 .map_err(Error::from)
57 .flatten()
58 .and_then(move |tcp| {
59 let mut builder = native_tls::TlsConnector::builder();
60 if no_cert_validation {
61 builder.danger_accept_invalid_certs(true);
62 }
63 let connector = tokio_tls::TlsConnector::from(builder.build().unwrap());
64 connector.connect(&domain2, tcp).map_err(Error::from)
65 })
66 .and_then(|tls| hyper::client::conn::handshake(tls).map_err(Error::from))
67}
68
69// convenience helper for non-Deserialize data...
70fn required_string_member(value: &serde_json::Value, member: &str) -> Result<String, Error> {
71 Ok(value
72 .get(member)
73 .ok_or_else(|| format_err!("missing '{}' in response", member))?
74 .as_str()
75 .ok_or_else(|| format_err!("invalid data type for '{}' in response", member))?
76 .to_string())
77}
78
// Authentication data extracted from the server's login response.
struct Auth {
    // The `ticket` member of the response; main() sends it as the `PBSAuthCookie` cookie.
    ticket: String,
    // The `CSRFPreventionToken` member; main() sends it as the header of the same name.
    token: String,
}
83
84// Create a future which logs in on a proxmox backup server and yields an Auth struct.
85fn login(
86 domain: &str,
87 port: u16,
88 no_cert_validation: bool,
89 urlbase: &str,
90 user: String,
91 pass: String,
92) -> impl Future<Item = Auth, Error = Error> {
93 let formdata = Body::from(
94 url::form_urlencoded::Serializer::new(String::new())
95 .append_pair("username", &{ user })
96 .append_pair("password", &{ pass })
97 .finish(),
98 );
99
100 let urlbase = urlbase.to_string();
101 connect(domain, port, no_cert_validation)
102 .and_then(move |(mut client, conn)| {
103 let req = Request::builder()
104 .method("POST")
105 .uri(format!("{}/access/ticket", urlbase))
106 .header("Content-type", "application/x-www-form-urlencoded")
107 .body(formdata)?;
108 Ok((client.send_request(req), conn))
109 })
110 .and_then(|(res, conn)| {
111 let mut conn = Some(conn);
112 res.map(|res| {
113 res.into_body()
114 .concat2()
115 .map_err(Error::from)
116 .and_then(|data| {
117 let data: serde_json::Value = serde_json::from_slice(&data)?;
118 let data = data
119 .get("data")
120 .ok_or_else(|| format_err!("missing 'data' in response"))?;
121 let ticket = required_string_member(data, "ticket")?;
122 let token = required_string_member(data, "CSRFPreventionToken")?;
123
124 Ok(Auth { ticket, token })
125 })
126 })
127 .join(poll_fn(move || {
128 try_ready!(conn.as_mut().unwrap().poll_without_shutdown());
129 Ok(Async::Ready(conn.take().unwrap()))
130 }))
131 .map_err(Error::from)
132 })
133 .and_then(|(res, _conn)| res)
134}
135
136// Factored out protocol switching future: Takes a Response future and a connection and verifies
137// its returned headers and protocol values. Yields a Response and the connection.
138fn switch_protocols(
139 res: hyper::client::conn::ResponseFuture,
140 conn: HyperConnType,
141) -> impl Future<Item = (Result<Response<Body>, Error>, HyperConnType), Error = Error> {
142 let mut conn = Some(conn);
143 res.map(|res| {
144 if res.status() != StatusCode::SWITCHING_PROTOCOLS {
145 bail!("unexpected status code - expected SwitchingProtocols");
146 }
147 let upgrade = match res.headers().get("Upgrade") {
148 None => bail!("missing upgrade header in server response!"),
149 Some(u) => u,
150 };
151 if upgrade != "proxmox-backup-protocol-1" {
152 match upgrade.to_str() {
153 Ok(s) => bail!("unexpected upgrade protocol type received: {}", s),
154 _ => bail!("unexpected upgrade protocol type received"),
155 }
156 }
157 Ok(res)
158 })
159 .map_err(Error::from)
160 .join(poll_fn(move || {
161 try_ready!(conn.as_mut().unwrap().poll_without_shutdown());
162 Ok(Async::Ready(conn.take().unwrap()))
163 }))
164}
165
166// Base for the two uploaders: DynamicIndexUploader and FixedIndexUploader:
167struct UploaderBase<S: AsyncRead + AsyncWrite> {
168 client: Option<PmxClient<S>>,
169 wait_id: Option<StreamId>,
170}
171
172impl<S: AsyncRead + AsyncWrite> UploaderBase<S> {
173 pub fn new(client: PmxClient<S>) -> Self {
174 Self {
175 client: Some(client),
176 wait_id: None,
177 }
178 }
179
180 pub fn create_backup(
181 &mut self,
182 index_type: IndexType,
183 backup_type: &str,
184 backup_id: &str,
185 backup_timestamp: i64,
186 filename: &str,
187 chunk_size: usize,
188 file_size: Option<u64>,
189 ) -> Result<BackupStream, Error> {
190 if self.wait_id.is_some() {
191 bail!("create_backup cannot be called while awaiting a response");
192 }
193
194 let backup_stream = self.client.as_mut().unwrap().create_backup(
195 index_type,
196 backup_type,
197 backup_id,
198 backup_timestamp,
199 filename,
200 chunk_size,
201 file_size,
202 true,
203 )?;
204 self.wait_id = Some(backup_stream.into());
205 Ok(backup_stream)
206 }
207
208 pub fn poll_ack(&mut self) -> Poll<(), Error> {
209 if let Some(id) = self.wait_id {
210 if self.client.as_mut().unwrap().wait_for_id(id)? {
211 self.wait_id = None;
212 } else {
213 return Ok(Async::NotReady);
214 }
215 }
216 return Ok(Async::Ready(()));
217 }
218
219 pub fn poll_send(&mut self) -> Poll<(), Error> {
220 match self.client.as_mut().unwrap().poll_send()? {
221 Some(false) => Ok(Async::NotReady),
222 _ => Ok(Async::Ready(())),
223 }
224 }
225
226 pub fn upload_chunk(
227 &mut self,
228 info: &ChunkEntry,
229 chunk: &[u8],
230 ) -> Result<Option<StreamId>, Error> {
231 self.client.as_mut().unwrap().upload_chunk(info, chunk)
232 }
233
234 pub fn continue_upload_chunk(&mut self, chunk: &[u8]) -> Result<Option<StreamId>, Error> {
235 let res = self.client.as_mut().unwrap().continue_upload_chunk(chunk)?;
236 if let Some(id) = res {
237 self.wait_id = Some(id);
238 }
239 Ok(res)
240 }
241
242 pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(), Error> {
6f90a6a7
WB
243 let id = stream.into();
244 let (name, _done) = self.client.as_mut().unwrap().finish_backup(stream)?;
6716f30b 245 println!("Server created file: {}", name);
6f90a6a7 246 self.wait_id = Some(id);
6716f30b
WB
247 Ok(())
248 }
249
250 pub fn take_client(&mut self) -> Option<PmxClient<S>> {
251 self.client.take()
252 }
253}
254
255// Future which creates a backup with a dynamic file:
256struct DynamicIndexUploader<C: AsyncRead, S: AsyncRead + AsyncWrite> {
257 base: UploaderBase<S>,
258 chunks: ChunkStream<C>,
259 current_chunk: Option<ChunkEntry>,
260 backup_stream: Option<BackupStream>,
261}
262
263impl<C: AsyncRead, S: AsyncRead + AsyncWrite> DynamicIndexUploader<C, S> {
264 pub fn new(
265 client: PmxClient<S>,
266 chunks: ChunkStream<C>,
267 backup_type: &str,
268 backup_id: &str,
269 backup_timestamp: i64,
270 filename: &str,
271 chunk_size: usize,
272 ) -> Result<Self, Error> {
273 let mut base = UploaderBase::new(client);
274 let stream = base.create_backup(
275 IndexType::Dynamic,
276 backup_type,
277 backup_id,
278 backup_timestamp,
279 filename,
280 chunk_size,
281 None,
282 )?;
283 Ok(Self {
284 base,
285 chunks,
286 current_chunk: None,
287 backup_stream: Some(stream),
288 })
289 }
290
291 fn get_chunk<'a>(chunks: &'a mut ChunkStream<C>) -> Poll<Option<&'a [u8]>, Error> {
292 match chunks.get() {
293 Ok(Some(None)) => Ok(Async::Ready(None)),
294 Ok(Some(Some(chunk))) => Ok(Async::Ready(Some(chunk))),
295 Ok(None) => return Ok(Async::NotReady),
296 Err(e) => return Err(e),
297 }
298 }
299
300 fn finished_chunk(&mut self) -> Result<(), Error> {
301 self.base.client.as_mut().unwrap().dynamic_chunk(
302 self.backup_stream.unwrap(),
303 self.current_chunk.as_ref().unwrap(),
304 )?;
305
306 self.current_chunk = None;
307 self.chunks.next();
308 Ok(())
309 }
310}
311
312impl<C: AsyncRead, S: AsyncRead + AsyncWrite> Future for DynamicIndexUploader<C, S> {
313 type Item = PmxClient<S>;
314 type Error = Error;
315
316 fn poll(&mut self) -> Poll<Self::Item, Error> {
317 loop {
318 // Process our upload queue if we have one:
319 try_ready!(self.base.poll_send());
320
321 // If we have a chunk in-flight, wait for acknowledgement:
322 try_ready!(self.base.poll_ack());
323
324 // Get our current chunk:
325 let chunk = match try_ready!(Self::get_chunk(&mut self.chunks)) {
326 Some(chunk) => chunk,
327 None => match self.backup_stream.take() {
328 Some(stream) => {
329 self.base.finish_backup(stream)?;
330 continue;
331 }
332 None => return Ok(Async::Ready(self.base.take_client().unwrap())),
333 },
334 };
335
336 // If the current chunk is in-flight just poll the upload:
337 if self.current_chunk.is_some() {
338 if self.base.continue_upload_chunk(chunk)?.is_some() {
339 self.finished_chunk()?;
340 }
341 continue;
342 }
343
344 let client = self.base.client.as_ref().unwrap();
345
346 // We got a new chunk, see if we need to upload it:
347 self.current_chunk = Some(ChunkEntry::from_data(chunk));
348 let entry = self.current_chunk.as_ref().unwrap();
349 if client.is_chunk_available(entry) {
350 eprintln!("Already available: {}", entry.digest_to_hex());
351 self.finished_chunk()?;
352 } else {
353 eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len());
354 match self.base.upload_chunk(entry, chunk)? {
355 Some(_id) => {
356 eprintln!("Finished right away!");
357 self.finished_chunk()?;
358 }
359 None => {
360 // Send-buffer filled up, start polling the upload process.
361 continue;
362 }
363 }
364 }
365 }
366 }
367}
368
369struct FixedIndexUploader<T: AsyncRead, S: AsyncRead + AsyncWrite> {
370 base: UploaderBase<S>,
371 input: T,
372 backup_stream: Option<BackupStream>,
373 current_chunk: Option<ChunkEntry>,
374 chunk_size: usize,
375 index: usize,
376 buffer: Vec<u8>,
377 eof: bool,
378}
379
380impl<T: AsyncRead, S: AsyncRead + AsyncWrite> FixedIndexUploader<T, S> {
381 pub fn new(
382 client: PmxClient<S>,
383 input: T,
384 backup_type: &str,
385 backup_id: &str,
386 backup_timestamp: i64,
387 filename: &str,
388 chunk_size: usize,
389 file_size: u64,
390 ) -> Result<Self, Error> {
391 let mut base = UploaderBase::new(client);
392 let stream = base.create_backup(
393 IndexType::Fixed,
394 backup_type,
395 backup_id,
396 backup_timestamp,
397 filename,
398 chunk_size,
399 Some(file_size),
400 )?;
401 Ok(Self {
402 base,
403 input,
404 backup_stream: Some(stream),
405 current_chunk: None,
406 chunk_size,
407 index: 0,
408 buffer: Vec::with_capacity(chunk_size),
409 eof: false,
410 })
411 }
412
413 fn fill_chunk(&mut self) -> Poll<bool, io::Error> {
414 let mut pos = self.buffer.len();
415
416 // we hit eof and we want the next chunk, return false:
417 if self.eof && pos == 0 {
418 return Ok(Async::Ready(false));
419 }
420
421 // we still have a full chunk right now:
422 if pos == self.chunk_size {
423 return Ok(Async::Ready(true));
424 }
425
426 // fill it up:
427 unsafe {
428 self.buffer.set_len(self.chunk_size);
429 }
430 let res = loop {
431 match self.input.poll_read(&mut self.buffer[pos..]) {
432 Err(e) => break Err(e),
433 Ok(Async::NotReady) => break Ok(Async::NotReady),
434 Ok(Async::Ready(got)) => {
435 if got == 0 {
436 self.eof = true;
437 break Ok(Async::Ready(true));
438 }
439 pos += got;
440 if pos == self.chunk_size {
441 break Ok(Async::Ready(true));
442 }
443 // read more...
444 }
445 }
446 };
447 unsafe {
448 self.buffer.set_len(pos);
449 }
450 res
451 }
452
453 fn finished_chunk(&mut self) -> Result<(), Error> {
454 self.base.client.as_mut().unwrap().fixed_data(
455 self.backup_stream.unwrap(),
456 self.index,
457 self.current_chunk.as_ref().unwrap(),
458 )?;
459 self.index += 1;
460 self.current_chunk = None;
461 unsafe {
462 // This is how we tell fill_chunk() that it needs to read new data
463 self.buffer.set_len(0);
464 }
465 Ok(())
466 }
467}
468
469impl<T: AsyncRead, S: AsyncRead + AsyncWrite> Future for FixedIndexUploader<T, S> {
470 type Item = PmxClient<S>;
471 type Error = Error;
472
473 fn poll(&mut self) -> Poll<Self::Item, Error> {
474 loop {
475 // Process our upload queue if we have one:
476 try_ready!(self.base.poll_send());
477
478 // If we have a chunk in-flight, wait for acknowledgement:
479 try_ready!(self.base.poll_ack());
480
481 // Get our current chunk:
482 if !try_ready!(self.fill_chunk()) {
483 match self.backup_stream.take() {
484 Some(stream) => {
485 self.base.finish_backup(stream)?;
486 continue;
487 }
488 None => {
489 return Ok(Async::Ready(self.base.take_client().unwrap()));
490 }
491 }
492 };
493
494 let chunk = &self.buffer[..];
495
496 // If the current chunk is in-flight just poll the upload:
497 if self.current_chunk.is_some() {
498 if self.base.continue_upload_chunk(chunk)?.is_some() {
499 self.finished_chunk()?;
500 }
501 continue;
502 }
503
504 let client = self.base.client.as_ref().unwrap();
505
506 // We got a new chunk, see if we need to upload it:
507 self.current_chunk = Some(ChunkEntry::from_data(chunk));
508 let entry = self.current_chunk.as_ref().unwrap();
509 if client.is_chunk_available(entry) {
510 eprintln!("Already available: {}", entry.digest_to_hex());
511 self.finished_chunk()?;
512 } else {
513 eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len());
514 match self.base.upload_chunk(entry, chunk)? {
515 Some(_id) => {
516 eprintln!("Finished right away!");
517 self.finished_chunk()?;
518 }
519 None => {
520 // Send-buffer filled up, start polling the upload process.
521 continue;
522 }
523 }
524 }
525 }
526 }
527}
528
529// Helper-Future for waiting for a polling method on proxmox_protocol::Client to complete:
530struct ClientWaitFuture<S: AsyncRead + AsyncWrite>(
531 Option<PmxClient<S>>,
532 fn(&mut PmxClient<S>) -> Result<bool, Error>,
533);
534
535impl<S: AsyncRead + AsyncWrite> Future for ClientWaitFuture<S> {
536 type Item = PmxClient<S>;
537 type Error = Error;
538
539 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
540 if (self.1)(self.0.as_mut().unwrap())? {
541 Ok(Async::Ready(self.0.take().unwrap()))
542 } else {
543 Ok(Async::NotReady)
544 }
545 }
546}
547
548// Trait to provide Futures for some proxmox_protocol::Client methods:
549trait ClientOps<S: AsyncRead + AsyncWrite> {
550 fn poll_handshake(self) -> ClientWaitFuture<S>;
551 fn poll_hashes(self, file: &str) -> Result<ClientWaitFuture<S>, Error>;
552}
553
554impl<S: AsyncRead + AsyncWrite> ClientOps<S> for PmxClient<S> {
555 fn poll_handshake(self) -> ClientWaitFuture<S> {
556 ClientWaitFuture(Some(self), PmxClient::<S>::wait_for_handshake)
557 }
558
559 fn poll_hashes(mut self, name: &str) -> Result<ClientWaitFuture<S>, Error> {
560 self.query_hashes(name)?;
561 Ok(ClientWaitFuture(Some(self), PmxClient::<S>::wait_for_hashes))
562 }
563}
564
// CLI helper: return the next argument, or print an error naming the missing argument and
// terminate the process with exit code 1.
fn require_arg(args: &mut dyn Iterator<Item = String>, name: &str) -> String {
    args.next().unwrap_or_else(|| {
        eprintln!("missing required argument: {}", name);
        exit(1)
    })
}
575
576fn main() {
577 // Usage:
578 // ./proxmox-protocol-testclient <type> <id> <filename> [<optional old-file>]
579 //
580 // This will query the remote server for a list of chunks in <old-file> if the argument was
581 // provided, otherwise assumes all chunks are new.
582
583 let mut args = std::env::args().skip(1);
584 let mut repo = require_arg(&mut args, "repository");
585 let use_fixed_chunks = if repo == "--fixed" {
586 repo = require_arg(&mut args, "repository");
587 true
588 } else {
589 false
590 };
591 let backup_type = require_arg(&mut args, "backup-type");
592 let backup_id = require_arg(&mut args, "backup-id");
593 let filename = require_arg(&mut args, "backup-file-name");
594 // optional previous backup:
595 let previous = args.next().map(|s| s.to_string());
596
edd3c8c6 597 let repo: BackupRepository = match repo.parse() {
6716f30b
WB
598 Ok(repo) => repo,
599 Err(e) => {
600 eprintln!("error parsing repository: {}", e);
601 exit(1);
602 }
603 };
604
605 let backup_time = Utc::now().timestamp();
606 // Or fake the time to verify we cannot create an already existing backup:
607 //let backup_time = Utc::today().and_hms(3, 25, 55);
608
609 println!(
610 "Uploading file `{}`, type {}, id: {}",
611 filename, backup_type, backup_id
612 );
613
614 let no_cert_validation = true; // FIXME
d0a03d40 615 let domain = repo.host().to_owned();
6716f30b
WB
616 let port = 8007;
617 let address = format!("{}:{}", domain, port);
618 let urlbase = format!("https://{}/api2/json", address);
619
d0a03d40 620 let user = repo.user().to_string();
6716f30b
WB
621 let pass = match proxmox_backup::tools::tty::read_password("Password: ")
622 .and_then(|x| String::from_utf8(x).map_err(Error::from))
623 {
624 Ok(pass) => pass,
625 Err(e) => {
626 eprintln!("error getting password: {}", e);
627 exit(1);
628 }
629 };
d0a03d40 630 let store = repo.store().to_owned();
6716f30b
WB
631
632 let stream = File::open(filename.clone())
633 .map_err(Error::from)
634 .join(login(
635 &domain,
636 port,
637 no_cert_validation,
638 &urlbase,
639 user,
640 pass,
641 ))
642 .and_then(move |(file, auth)| {
643 ok((file, auth)).join(connect(&domain, port, no_cert_validation))
644 })
645 .and_then(move |((file, auth), (mut client, conn))| {
646 let req = Request::builder()
647 .method("GET")
648 .uri(format!("{}/admin/datastore/{}/test-upload", urlbase, store))
649 .header("Cookie", format!("PBSAuthCookie={}", auth.ticket))
650 .header("CSRFPreventionToken", auth.token)
651 .header("Connection", "Upgrade")
652 .header("Upgrade", "proxmox-backup-protocol-1")
653 .body(Body::empty())?;
654 Ok((file, client.send_request(req), conn))
655 })
656 .and_then(|(file, res, conn)| ok(file).join(switch_protocols(res, conn)))
657 .and_then(|(file, (_, conn))| {
658 let client = PmxClient::new(conn.into_parts().io);
659 file.metadata()
660 .map_err(Error::from)
661 .join(client.poll_handshake())
662 })
663 .and_then(move |((file, meta), client)| {
664 eprintln!("Server said hello");
665 // 2 possible futures of distinct types need an explicit cast to Box<dyn Future>...
666 let fut: Box<dyn Future<Item = _, Error = _> + Send> =
667 if let Some(previous) = previous {
668 let query = client.poll_hashes(&previous)?;
669 Box::new(ok((file, meta)).join(query))
670 } else {
671 Box::new(ok(((file, meta), client)))
672 };
673 Ok(fut)
674 })
675 .flatten()
676 .and_then(move |((file, meta), client)| {
677 eprintln!("starting uploader...");
678 let uploader: Box<dyn Future<Item = _, Error = _> + Send> = if use_fixed_chunks {
679 Box::new(FixedIndexUploader::new(
680 client,
681 file,
682 &backup_type,
683 &backup_id,
684 backup_time,
685 &filename,
686 4 * 1024 * 1024,
687 meta.len(),
688 )?)
689 } else {
690 let chunker = ChunkStream::new(file);
691 Box::new(DynamicIndexUploader::new(
692 client,
693 chunker,
694 &backup_type,
695 &backup_id,
696 backup_time,
697 &filename,
698 4 * 1024 * 1024,
699 )?)
700 };
701 Ok(uploader)
702 })
703 .flatten();
704
705 let stream = stream
706 .and_then(move |_client| {
707 println!("Done");
708 Ok(())
709 })
710 .map_err(|e| eprintln!("error: {}", e));
711 hyper::rt::run(stream);
712}