]>
Commit | Line | Data |
---|---|---|
6716f30b WB |
1 | use std::io; |
2 | use std::process::exit; | |
3 | ||
4 | use chrono::Utc; | |
5 | use failure::*; | |
6 | use futures::future::{ok, poll_fn, Future}; | |
7 | use futures::try_ready; | |
8 | use futures::{Async, Poll}; | |
9 | use http::{Request, Response, StatusCode}; | |
10 | use hyper::rt::Stream; | |
11 | use hyper::Body; | |
12 | use tokio::prelude::*; | |
13 | use tokio_fs::file::File; | |
14 | ||
15 | use proxmox_protocol::Client as PmxClient; | |
16 | use proxmox_protocol::{BackupStream, ChunkEntry, ChunkStream, IndexType, StreamId}; | |
17 | ||
18 | use proxmox_backup::client::BackupRepository; | |
19 | ||
20 | // This is a temporary client using the backup protocol crate. | |
21 | // Its functionality should be moved to the `proxmox-backup-client` binary instead. | |
22 | // For now this is mostly here to keep in the history an alternative way of connecting to an https | |
23 | // server without hyper-tls in the background. | |
24 | // Note that hyper-tls just wraps native_tls, and so does tokio_tls. So the only way to get | |
25 | // rid of the extra dependency would be to reimplement tokio_tls on top of the openssl crate. | |
26 | ||
27 | type HyperConnection<T, B> = hyper::client::conn::Connection<T, B>; | |
28 | type HyperConnType = HyperConnection<tokio_tls::TlsStream<tokio::net::TcpStream>, Body>; | |
29 | ||
30 | // Create a future which connects to a TLS-enabled http server. | |
31 | // This would ordinarily be covered by the Connect trait in the higher level hyper interface. | |
32 | // Connect to the server, initiate TLS, finally run hyper's handshake method. | |
33 | fn connect( | |
34 | domain: &str, | |
35 | port: u16, | |
36 | no_cert_validation: bool, | |
37 | ) -> impl Future< | |
38 | // Typing out this function signature is almost more work than copying its code body... | |
39 | Item = (hyper::client::conn::SendRequest<Body>, HyperConnType), | |
40 | Error = Error, | |
41 | > { | |
42 | // tokio::net::TcpStream::connect(addr) <- this takes only a single address! | |
43 | // so we need to improvise...: | |
44 | use tokio_threadpool::blocking; | |
45 | ||
46 | let domain = domain.to_string(); | |
47 | let domain2 = domain.clone(); | |
48 | poll_fn(move || { | |
49 | blocking(|| { | |
50 | let conn = | |
51 | std::net::TcpStream::connect((domain.as_str(), port)).map_err(Error::from)?; | |
52 | tokio::net::TcpStream::from_std(conn, &Default::default()).map_err(Error::from) | |
53 | }) | |
54 | .map_err(Error::from) | |
55 | }) | |
56 | .map_err(Error::from) | |
57 | .flatten() | |
58 | .and_then(move |tcp| { | |
59 | let mut builder = native_tls::TlsConnector::builder(); | |
60 | if no_cert_validation { | |
61 | builder.danger_accept_invalid_certs(true); | |
62 | } | |
63 | let connector = tokio_tls::TlsConnector::from(builder.build().unwrap()); | |
64 | connector.connect(&domain2, tcp).map_err(Error::from) | |
65 | }) | |
66 | .and_then(|tls| hyper::client::conn::handshake(tls).map_err(Error::from)) | |
67 | } | |
68 | ||
69 | // convenience helper for non-Deserialize data... | |
70 | fn required_string_member(value: &serde_json::Value, member: &str) -> Result<String, Error> { | |
71 | Ok(value | |
72 | .get(member) | |
73 | .ok_or_else(|| format_err!("missing '{}' in response", member))? | |
74 | .as_str() | |
75 | .ok_or_else(|| format_err!("invalid data type for '{}' in response", member))? | |
76 | .to_string()) | |
77 | } | |
78 | ||
79 | struct Auth { | |
80 | ticket: String, | |
81 | token: String, | |
82 | } | |
83 | ||
84 | // Create a future which logs in on a proxmox backup server and yields an Auth struct. | |
85 | fn login( | |
86 | domain: &str, | |
87 | port: u16, | |
88 | no_cert_validation: bool, | |
89 | urlbase: &str, | |
90 | user: String, | |
91 | pass: String, | |
92 | ) -> impl Future<Item = Auth, Error = Error> { | |
93 | let formdata = Body::from( | |
94 | url::form_urlencoded::Serializer::new(String::new()) | |
95 | .append_pair("username", &{ user }) | |
96 | .append_pair("password", &{ pass }) | |
97 | .finish(), | |
98 | ); | |
99 | ||
100 | let urlbase = urlbase.to_string(); | |
101 | connect(domain, port, no_cert_validation) | |
102 | .and_then(move |(mut client, conn)| { | |
103 | let req = Request::builder() | |
104 | .method("POST") | |
105 | .uri(format!("{}/access/ticket", urlbase)) | |
106 | .header("Content-type", "application/x-www-form-urlencoded") | |
107 | .body(formdata)?; | |
108 | Ok((client.send_request(req), conn)) | |
109 | }) | |
110 | .and_then(|(res, conn)| { | |
111 | let mut conn = Some(conn); | |
112 | res.map(|res| { | |
113 | res.into_body() | |
114 | .concat2() | |
115 | .map_err(Error::from) | |
116 | .and_then(|data| { | |
117 | let data: serde_json::Value = serde_json::from_slice(&data)?; | |
118 | let data = data | |
119 | .get("data") | |
120 | .ok_or_else(|| format_err!("missing 'data' in response"))?; | |
121 | let ticket = required_string_member(data, "ticket")?; | |
122 | let token = required_string_member(data, "CSRFPreventionToken")?; | |
123 | ||
124 | Ok(Auth { ticket, token }) | |
125 | }) | |
126 | }) | |
127 | .join(poll_fn(move || { | |
128 | try_ready!(conn.as_mut().unwrap().poll_without_shutdown()); | |
129 | Ok(Async::Ready(conn.take().unwrap())) | |
130 | })) | |
131 | .map_err(Error::from) | |
132 | }) | |
133 | .and_then(|(res, _conn)| res) | |
134 | } | |
135 | ||
136 | // Factored out protocol switching future: Takes a Response future and a connection and verifies | |
137 | // its returned headers and protocol values. Yields a Response and the connection. | |
138 | fn switch_protocols( | |
139 | res: hyper::client::conn::ResponseFuture, | |
140 | conn: HyperConnType, | |
141 | ) -> impl Future<Item = (Result<Response<Body>, Error>, HyperConnType), Error = Error> { | |
142 | let mut conn = Some(conn); | |
143 | res.map(|res| { | |
144 | if res.status() != StatusCode::SWITCHING_PROTOCOLS { | |
145 | bail!("unexpected status code - expected SwitchingProtocols"); | |
146 | } | |
147 | let upgrade = match res.headers().get("Upgrade") { | |
148 | None => bail!("missing upgrade header in server response!"), | |
149 | Some(u) => u, | |
150 | }; | |
151 | if upgrade != "proxmox-backup-protocol-1" { | |
152 | match upgrade.to_str() { | |
153 | Ok(s) => bail!("unexpected upgrade protocol type received: {}", s), | |
154 | _ => bail!("unexpected upgrade protocol type received"), | |
155 | } | |
156 | } | |
157 | Ok(res) | |
158 | }) | |
159 | .map_err(Error::from) | |
160 | .join(poll_fn(move || { | |
161 | try_ready!(conn.as_mut().unwrap().poll_without_shutdown()); | |
162 | Ok(Async::Ready(conn.take().unwrap())) | |
163 | })) | |
164 | } | |
165 | ||
166 | // Base for the two uploaders: DynamicIndexUploader and FixedIndexUploader: | |
167 | struct UploaderBase<S: AsyncRead + AsyncWrite> { | |
168 | client: Option<PmxClient<S>>, | |
169 | wait_id: Option<StreamId>, | |
170 | } | |
171 | ||
172 | impl<S: AsyncRead + AsyncWrite> UploaderBase<S> { | |
173 | pub fn new(client: PmxClient<S>) -> Self { | |
174 | Self { | |
175 | client: Some(client), | |
176 | wait_id: None, | |
177 | } | |
178 | } | |
179 | ||
180 | pub fn create_backup( | |
181 | &mut self, | |
182 | index_type: IndexType, | |
183 | backup_type: &str, | |
184 | backup_id: &str, | |
185 | backup_timestamp: i64, | |
186 | filename: &str, | |
187 | chunk_size: usize, | |
188 | file_size: Option<u64>, | |
189 | ) -> Result<BackupStream, Error> { | |
190 | if self.wait_id.is_some() { | |
191 | bail!("create_backup cannot be called while awaiting a response"); | |
192 | } | |
193 | ||
194 | let backup_stream = self.client.as_mut().unwrap().create_backup( | |
195 | index_type, | |
196 | backup_type, | |
197 | backup_id, | |
198 | backup_timestamp, | |
199 | filename, | |
200 | chunk_size, | |
201 | file_size, | |
202 | true, | |
203 | )?; | |
204 | self.wait_id = Some(backup_stream.into()); | |
205 | Ok(backup_stream) | |
206 | } | |
207 | ||
208 | pub fn poll_ack(&mut self) -> Poll<(), Error> { | |
209 | if let Some(id) = self.wait_id { | |
210 | if self.client.as_mut().unwrap().wait_for_id(id)? { | |
211 | self.wait_id = None; | |
212 | } else { | |
213 | return Ok(Async::NotReady); | |
214 | } | |
215 | } | |
216 | return Ok(Async::Ready(())); | |
217 | } | |
218 | ||
219 | pub fn poll_send(&mut self) -> Poll<(), Error> { | |
220 | match self.client.as_mut().unwrap().poll_send()? { | |
221 | Some(false) => Ok(Async::NotReady), | |
222 | _ => Ok(Async::Ready(())), | |
223 | } | |
224 | } | |
225 | ||
226 | pub fn upload_chunk( | |
227 | &mut self, | |
228 | info: &ChunkEntry, | |
229 | chunk: &[u8], | |
230 | ) -> Result<Option<StreamId>, Error> { | |
231 | self.client.as_mut().unwrap().upload_chunk(info, chunk) | |
232 | } | |
233 | ||
234 | pub fn continue_upload_chunk(&mut self, chunk: &[u8]) -> Result<Option<StreamId>, Error> { | |
235 | let res = self.client.as_mut().unwrap().continue_upload_chunk(chunk)?; | |
236 | if let Some(id) = res { | |
237 | self.wait_id = Some(id); | |
238 | } | |
239 | Ok(res) | |
240 | } | |
241 | ||
242 | pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(), Error> { | |
6f90a6a7 WB |
243 | let id = stream.into(); |
244 | let (name, _done) = self.client.as_mut().unwrap().finish_backup(stream)?; | |
6716f30b | 245 | println!("Server created file: {}", name); |
6f90a6a7 | 246 | self.wait_id = Some(id); |
6716f30b WB |
247 | Ok(()) |
248 | } | |
249 | ||
250 | pub fn take_client(&mut self) -> Option<PmxClient<S>> { | |
251 | self.client.take() | |
252 | } | |
253 | } | |
254 | ||
255 | // Future which creates a backup with a dynamic file: | |
256 | struct DynamicIndexUploader<C: AsyncRead, S: AsyncRead + AsyncWrite> { | |
257 | base: UploaderBase<S>, | |
258 | chunks: ChunkStream<C>, | |
259 | current_chunk: Option<ChunkEntry>, | |
260 | backup_stream: Option<BackupStream>, | |
261 | } | |
262 | ||
263 | impl<C: AsyncRead, S: AsyncRead + AsyncWrite> DynamicIndexUploader<C, S> { | |
264 | pub fn new( | |
265 | client: PmxClient<S>, | |
266 | chunks: ChunkStream<C>, | |
267 | backup_type: &str, | |
268 | backup_id: &str, | |
269 | backup_timestamp: i64, | |
270 | filename: &str, | |
271 | chunk_size: usize, | |
272 | ) -> Result<Self, Error> { | |
273 | let mut base = UploaderBase::new(client); | |
274 | let stream = base.create_backup( | |
275 | IndexType::Dynamic, | |
276 | backup_type, | |
277 | backup_id, | |
278 | backup_timestamp, | |
279 | filename, | |
280 | chunk_size, | |
281 | None, | |
282 | )?; | |
283 | Ok(Self { | |
284 | base, | |
285 | chunks, | |
286 | current_chunk: None, | |
287 | backup_stream: Some(stream), | |
288 | }) | |
289 | } | |
290 | ||
291 | fn get_chunk<'a>(chunks: &'a mut ChunkStream<C>) -> Poll<Option<&'a [u8]>, Error> { | |
292 | match chunks.get() { | |
293 | Ok(Some(None)) => Ok(Async::Ready(None)), | |
294 | Ok(Some(Some(chunk))) => Ok(Async::Ready(Some(chunk))), | |
295 | Ok(None) => return Ok(Async::NotReady), | |
296 | Err(e) => return Err(e), | |
297 | } | |
298 | } | |
299 | ||
300 | fn finished_chunk(&mut self) -> Result<(), Error> { | |
301 | self.base.client.as_mut().unwrap().dynamic_chunk( | |
302 | self.backup_stream.unwrap(), | |
303 | self.current_chunk.as_ref().unwrap(), | |
304 | )?; | |
305 | ||
306 | self.current_chunk = None; | |
307 | self.chunks.next(); | |
308 | Ok(()) | |
309 | } | |
310 | } | |
311 | ||
312 | impl<C: AsyncRead, S: AsyncRead + AsyncWrite> Future for DynamicIndexUploader<C, S> { | |
313 | type Item = PmxClient<S>; | |
314 | type Error = Error; | |
315 | ||
316 | fn poll(&mut self) -> Poll<Self::Item, Error> { | |
317 | loop { | |
318 | // Process our upload queue if we have one: | |
319 | try_ready!(self.base.poll_send()); | |
320 | ||
321 | // If we have a chunk in-flight, wait for acknowledgement: | |
322 | try_ready!(self.base.poll_ack()); | |
323 | ||
324 | // Get our current chunk: | |
325 | let chunk = match try_ready!(Self::get_chunk(&mut self.chunks)) { | |
326 | Some(chunk) => chunk, | |
327 | None => match self.backup_stream.take() { | |
328 | Some(stream) => { | |
329 | self.base.finish_backup(stream)?; | |
330 | continue; | |
331 | } | |
332 | None => return Ok(Async::Ready(self.base.take_client().unwrap())), | |
333 | }, | |
334 | }; | |
335 | ||
336 | // If the current chunk is in-flight just poll the upload: | |
337 | if self.current_chunk.is_some() { | |
338 | if self.base.continue_upload_chunk(chunk)?.is_some() { | |
339 | self.finished_chunk()?; | |
340 | } | |
341 | continue; | |
342 | } | |
343 | ||
344 | let client = self.base.client.as_ref().unwrap(); | |
345 | ||
346 | // We got a new chunk, see if we need to upload it: | |
347 | self.current_chunk = Some(ChunkEntry::from_data(chunk)); | |
348 | let entry = self.current_chunk.as_ref().unwrap(); | |
349 | if client.is_chunk_available(entry) { | |
350 | eprintln!("Already available: {}", entry.digest_to_hex()); | |
351 | self.finished_chunk()?; | |
352 | } else { | |
353 | eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len()); | |
354 | match self.base.upload_chunk(entry, chunk)? { | |
355 | Some(_id) => { | |
356 | eprintln!("Finished right away!"); | |
357 | self.finished_chunk()?; | |
358 | } | |
359 | None => { | |
360 | // Send-buffer filled up, start polling the upload process. | |
361 | continue; | |
362 | } | |
363 | } | |
364 | } | |
365 | } | |
366 | } | |
367 | } | |
368 | ||
369 | struct FixedIndexUploader<T: AsyncRead, S: AsyncRead + AsyncWrite> { | |
370 | base: UploaderBase<S>, | |
371 | input: T, | |
372 | backup_stream: Option<BackupStream>, | |
373 | current_chunk: Option<ChunkEntry>, | |
374 | chunk_size: usize, | |
375 | index: usize, | |
376 | buffer: Vec<u8>, | |
377 | eof: bool, | |
378 | } | |
379 | ||
380 | impl<T: AsyncRead, S: AsyncRead + AsyncWrite> FixedIndexUploader<T, S> { | |
381 | pub fn new( | |
382 | client: PmxClient<S>, | |
383 | input: T, | |
384 | backup_type: &str, | |
385 | backup_id: &str, | |
386 | backup_timestamp: i64, | |
387 | filename: &str, | |
388 | chunk_size: usize, | |
389 | file_size: u64, | |
390 | ) -> Result<Self, Error> { | |
391 | let mut base = UploaderBase::new(client); | |
392 | let stream = base.create_backup( | |
393 | IndexType::Fixed, | |
394 | backup_type, | |
395 | backup_id, | |
396 | backup_timestamp, | |
397 | filename, | |
398 | chunk_size, | |
399 | Some(file_size), | |
400 | )?; | |
401 | Ok(Self { | |
402 | base, | |
403 | input, | |
404 | backup_stream: Some(stream), | |
405 | current_chunk: None, | |
406 | chunk_size, | |
407 | index: 0, | |
408 | buffer: Vec::with_capacity(chunk_size), | |
409 | eof: false, | |
410 | }) | |
411 | } | |
412 | ||
413 | fn fill_chunk(&mut self) -> Poll<bool, io::Error> { | |
414 | let mut pos = self.buffer.len(); | |
415 | ||
416 | // we hit eof and we want the next chunk, return false: | |
417 | if self.eof && pos == 0 { | |
418 | return Ok(Async::Ready(false)); | |
419 | } | |
420 | ||
421 | // we still have a full chunk right now: | |
422 | if pos == self.chunk_size { | |
423 | return Ok(Async::Ready(true)); | |
424 | } | |
425 | ||
426 | // fill it up: | |
427 | unsafe { | |
428 | self.buffer.set_len(self.chunk_size); | |
429 | } | |
430 | let res = loop { | |
431 | match self.input.poll_read(&mut self.buffer[pos..]) { | |
432 | Err(e) => break Err(e), | |
433 | Ok(Async::NotReady) => break Ok(Async::NotReady), | |
434 | Ok(Async::Ready(got)) => { | |
435 | if got == 0 { | |
436 | self.eof = true; | |
437 | break Ok(Async::Ready(true)); | |
438 | } | |
439 | pos += got; | |
440 | if pos == self.chunk_size { | |
441 | break Ok(Async::Ready(true)); | |
442 | } | |
443 | // read more... | |
444 | } | |
445 | } | |
446 | }; | |
447 | unsafe { | |
448 | self.buffer.set_len(pos); | |
449 | } | |
450 | res | |
451 | } | |
452 | ||
453 | fn finished_chunk(&mut self) -> Result<(), Error> { | |
454 | self.base.client.as_mut().unwrap().fixed_data( | |
455 | self.backup_stream.unwrap(), | |
456 | self.index, | |
457 | self.current_chunk.as_ref().unwrap(), | |
458 | )?; | |
459 | self.index += 1; | |
460 | self.current_chunk = None; | |
461 | unsafe { | |
462 | // This is how we tell fill_chunk() that it needs to read new data | |
463 | self.buffer.set_len(0); | |
464 | } | |
465 | Ok(()) | |
466 | } | |
467 | } | |
468 | ||
469 | impl<T: AsyncRead, S: AsyncRead + AsyncWrite> Future for FixedIndexUploader<T, S> { | |
470 | type Item = PmxClient<S>; | |
471 | type Error = Error; | |
472 | ||
473 | fn poll(&mut self) -> Poll<Self::Item, Error> { | |
474 | loop { | |
475 | // Process our upload queue if we have one: | |
476 | try_ready!(self.base.poll_send()); | |
477 | ||
478 | // If we have a chunk in-flight, wait for acknowledgement: | |
479 | try_ready!(self.base.poll_ack()); | |
480 | ||
481 | // Get our current chunk: | |
482 | if !try_ready!(self.fill_chunk()) { | |
483 | match self.backup_stream.take() { | |
484 | Some(stream) => { | |
485 | self.base.finish_backup(stream)?; | |
486 | continue; | |
487 | } | |
488 | None => { | |
489 | return Ok(Async::Ready(self.base.take_client().unwrap())); | |
490 | } | |
491 | } | |
492 | }; | |
493 | ||
494 | let chunk = &self.buffer[..]; | |
495 | ||
496 | // If the current chunk is in-flight just poll the upload: | |
497 | if self.current_chunk.is_some() { | |
498 | if self.base.continue_upload_chunk(chunk)?.is_some() { | |
499 | self.finished_chunk()?; | |
500 | } | |
501 | continue; | |
502 | } | |
503 | ||
504 | let client = self.base.client.as_ref().unwrap(); | |
505 | ||
506 | // We got a new chunk, see if we need to upload it: | |
507 | self.current_chunk = Some(ChunkEntry::from_data(chunk)); | |
508 | let entry = self.current_chunk.as_ref().unwrap(); | |
509 | if client.is_chunk_available(entry) { | |
510 | eprintln!("Already available: {}", entry.digest_to_hex()); | |
511 | self.finished_chunk()?; | |
512 | } else { | |
513 | eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len()); | |
514 | match self.base.upload_chunk(entry, chunk)? { | |
515 | Some(_id) => { | |
516 | eprintln!("Finished right away!"); | |
517 | self.finished_chunk()?; | |
518 | } | |
519 | None => { | |
520 | // Send-buffer filled up, start polling the upload process. | |
521 | continue; | |
522 | } | |
523 | } | |
524 | } | |
525 | } | |
526 | } | |
527 | } | |
528 | ||
529 | // Helper-Future for waiting for a polling method on proxmox_protocol::Client to complete: | |
530 | struct ClientWaitFuture<S: AsyncRead + AsyncWrite>( | |
531 | Option<PmxClient<S>>, | |
532 | fn(&mut PmxClient<S>) -> Result<bool, Error>, | |
533 | ); | |
534 | ||
535 | impl<S: AsyncRead + AsyncWrite> Future for ClientWaitFuture<S> { | |
536 | type Item = PmxClient<S>; | |
537 | type Error = Error; | |
538 | ||
539 | fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | |
540 | if (self.1)(self.0.as_mut().unwrap())? { | |
541 | Ok(Async::Ready(self.0.take().unwrap())) | |
542 | } else { | |
543 | Ok(Async::NotReady) | |
544 | } | |
545 | } | |
546 | } | |
547 | ||
548 | // Trait to provide Futures for some proxmox_protocol::Client methods: | |
549 | trait ClientOps<S: AsyncRead + AsyncWrite> { | |
550 | fn poll_handshake(self) -> ClientWaitFuture<S>; | |
551 | fn poll_hashes(self, file: &str) -> Result<ClientWaitFuture<S>, Error>; | |
552 | } | |
553 | ||
554 | impl<S: AsyncRead + AsyncWrite> ClientOps<S> for PmxClient<S> { | |
555 | fn poll_handshake(self) -> ClientWaitFuture<S> { | |
556 | ClientWaitFuture(Some(self), PmxClient::<S>::wait_for_handshake) | |
557 | } | |
558 | ||
559 | fn poll_hashes(mut self, name: &str) -> Result<ClientWaitFuture<S>, Error> { | |
560 | self.query_hashes(name)?; | |
561 | Ok(ClientWaitFuture(Some(self), PmxClient::<S>::wait_for_hashes)) | |
562 | } | |
563 | } | |
564 | ||
565 | // CLI helper. | |
566 | fn require_arg(args: &mut dyn Iterator<Item = String>, name: &str) -> String { | |
567 | match args.next() { | |
568 | Some(arg) => arg, | |
569 | None => { | |
570 | eprintln!("missing required argument: {}", name); | |
571 | exit(1); | |
572 | } | |
573 | } | |
574 | } | |
575 | ||
576 | fn main() { | |
577 | // Usage: | |
578 | // ./proxmox-protocol-testclient <type> <id> <filename> [<optional old-file>] | |
579 | // | |
580 | // This will query the remote server for a list of chunks in <old-file> if the argument was | |
581 | // provided, otherwise assumes all chunks are new. | |
582 | ||
583 | let mut args = std::env::args().skip(1); | |
584 | let mut repo = require_arg(&mut args, "repository"); | |
585 | let use_fixed_chunks = if repo == "--fixed" { | |
586 | repo = require_arg(&mut args, "repository"); | |
587 | true | |
588 | } else { | |
589 | false | |
590 | }; | |
591 | let backup_type = require_arg(&mut args, "backup-type"); | |
592 | let backup_id = require_arg(&mut args, "backup-id"); | |
593 | let filename = require_arg(&mut args, "backup-file-name"); | |
594 | // optional previous backup: | |
595 | let previous = args.next().map(|s| s.to_string()); | |
596 | ||
edd3c8c6 | 597 | let repo: BackupRepository = match repo.parse() { |
6716f30b WB |
598 | Ok(repo) => repo, |
599 | Err(e) => { | |
600 | eprintln!("error parsing repository: {}", e); | |
601 | exit(1); | |
602 | } | |
603 | }; | |
604 | ||
605 | let backup_time = Utc::now().timestamp(); | |
606 | // Or fake the time to verify we cannot create an already existing backup: | |
607 | //let backup_time = Utc::today().and_hms(3, 25, 55); | |
608 | ||
609 | println!( | |
610 | "Uploading file `{}`, type {}, id: {}", | |
611 | filename, backup_type, backup_id | |
612 | ); | |
613 | ||
614 | let no_cert_validation = true; // FIXME | |
d0a03d40 | 615 | let domain = repo.host().to_owned(); |
6716f30b WB |
616 | let port = 8007; |
617 | let address = format!("{}:{}", domain, port); | |
618 | let urlbase = format!("https://{}/api2/json", address); | |
619 | ||
d0a03d40 | 620 | let user = repo.user().to_string(); |
6716f30b WB |
621 | let pass = match proxmox_backup::tools::tty::read_password("Password: ") |
622 | .and_then(|x| String::from_utf8(x).map_err(Error::from)) | |
623 | { | |
624 | Ok(pass) => pass, | |
625 | Err(e) => { | |
626 | eprintln!("error getting password: {}", e); | |
627 | exit(1); | |
628 | } | |
629 | }; | |
d0a03d40 | 630 | let store = repo.store().to_owned(); |
6716f30b WB |
631 | |
632 | let stream = File::open(filename.clone()) | |
633 | .map_err(Error::from) | |
634 | .join(login( | |
635 | &domain, | |
636 | port, | |
637 | no_cert_validation, | |
638 | &urlbase, | |
639 | user, | |
640 | pass, | |
641 | )) | |
642 | .and_then(move |(file, auth)| { | |
643 | ok((file, auth)).join(connect(&domain, port, no_cert_validation)) | |
644 | }) | |
645 | .and_then(move |((file, auth), (mut client, conn))| { | |
646 | let req = Request::builder() | |
647 | .method("GET") | |
648 | .uri(format!("{}/admin/datastore/{}/test-upload", urlbase, store)) | |
649 | .header("Cookie", format!("PBSAuthCookie={}", auth.ticket)) | |
650 | .header("CSRFPreventionToken", auth.token) | |
651 | .header("Connection", "Upgrade") | |
652 | .header("Upgrade", "proxmox-backup-protocol-1") | |
653 | .body(Body::empty())?; | |
654 | Ok((file, client.send_request(req), conn)) | |
655 | }) | |
656 | .and_then(|(file, res, conn)| ok(file).join(switch_protocols(res, conn))) | |
657 | .and_then(|(file, (_, conn))| { | |
658 | let client = PmxClient::new(conn.into_parts().io); | |
659 | file.metadata() | |
660 | .map_err(Error::from) | |
661 | .join(client.poll_handshake()) | |
662 | }) | |
663 | .and_then(move |((file, meta), client)| { | |
664 | eprintln!("Server said hello"); | |
665 | // 2 possible futures of distinct types need an explicit cast to Box<dyn Future>... | |
666 | let fut: Box<dyn Future<Item = _, Error = _> + Send> = | |
667 | if let Some(previous) = previous { | |
668 | let query = client.poll_hashes(&previous)?; | |
669 | Box::new(ok((file, meta)).join(query)) | |
670 | } else { | |
671 | Box::new(ok(((file, meta), client))) | |
672 | }; | |
673 | Ok(fut) | |
674 | }) | |
675 | .flatten() | |
676 | .and_then(move |((file, meta), client)| { | |
677 | eprintln!("starting uploader..."); | |
678 | let uploader: Box<dyn Future<Item = _, Error = _> + Send> = if use_fixed_chunks { | |
679 | Box::new(FixedIndexUploader::new( | |
680 | client, | |
681 | file, | |
682 | &backup_type, | |
683 | &backup_id, | |
684 | backup_time, | |
685 | &filename, | |
686 | 4 * 1024 * 1024, | |
687 | meta.len(), | |
688 | )?) | |
689 | } else { | |
690 | let chunker = ChunkStream::new(file); | |
691 | Box::new(DynamicIndexUploader::new( | |
692 | client, | |
693 | chunker, | |
694 | &backup_type, | |
695 | &backup_id, | |
696 | backup_time, | |
697 | &filename, | |
698 | 4 * 1024 * 1024, | |
699 | )?) | |
700 | }; | |
701 | Ok(uploader) | |
702 | }) | |
703 | .flatten(); | |
704 | ||
705 | let stream = stream | |
706 | .and_then(move |_client| { | |
707 | println!("Done"); | |
708 | Ok(()) | |
709 | }) | |
710 | .map_err(|e| eprintln!("error: {}", e)); | |
711 | hyper::rt::run(stream); | |
712 | } |