2 use std
::process
::exit
;
6 use futures
::future
::{ok, poll_fn, Future}
;
7 use futures
::try_ready
;
8 use futures
::{Async, Poll}
;
9 use http
::{Request, Response, StatusCode}
;
10 use hyper
::rt
::Stream
;
12 use tokio
::prelude
::*;
13 use tokio_fs
::file
::File
;
15 use proxmox_protocol
::Client
as PmxClient
;
16 use proxmox_protocol
::{BackupStream, ChunkEntry, ChunkStream, IndexType, StreamId}
;
18 use proxmox_backup
::client
::BackupRepository
;
20 // This is a temporary client using the backup protocol crate.
21 // Its functionality should be moved to the `proxmox-backup-client` binary instead.
22 // For now this is mostly here to keep in the history an alternative way of connecting to an https
23 // server without hyper-tls in the background.
24 // Note that hyper-tls just wraps native_tls, and so does tokio_tls. So the only way to get
25 // rid of the extra dependency would be to reimplement tokio_tls on top of the openssl crate.
27 type HyperConnection
<T
, B
> = hyper
::client
::conn
::Connection
<T
, B
>;
28 type HyperConnType
= HyperConnection
<tokio_tls
::TlsStream
<tokio
::net
::TcpStream
>, Body
>;
30 // Create a future which connects to a TLS-enabled http server.
31 // This would ordinarily be covered by the Connect trait in the higher level hyper interface.
32 // Connect to the server, initiate TLS, finally run hyper's handshake method.
36 no_cert_validation
: bool
,
38 // Typing out this function signature is almost more work than copying its code body...
39 Item
= (hyper
::client
::conn
::SendRequest
<Body
>, HyperConnType
),
42 // tokio::net::TcpStream::connect(addr) <- this takes only a single address!
43 // so we need to improvise...:
44 use tokio_threadpool
::blocking
;
46 let domain
= domain
.to_string();
47 let domain2
= domain
.clone();
51 std
::net
::TcpStream
::connect((domain
.as_str(), port
)).map_err(Error
::from
)?
;
52 tokio
::net
::TcpStream
::from_std(conn
, &Default
::default()).map_err(Error
::from
)
58 .and_then(move |tcp
| {
59 let mut builder
= native_tls
::TlsConnector
::builder();
60 if no_cert_validation
{
61 builder
.danger_accept_invalid_certs(true);
63 let connector
= tokio_tls
::TlsConnector
::from(builder
.build().unwrap());
64 connector
.connect(&domain2
, tcp
).map_err(Error
::from
)
66 .and_then(|tls
| hyper
::client
::conn
::handshake(tls
).map_err(Error
::from
))
69 // convenience helper for non-Deserialize data...
70 fn required_string_member(value
: &serde_json
::Value
, member
: &str) -> Result
<String
, Error
> {
73 .ok_or_else(|| format_err
!("missing '{}' in response", member
))?
75 .ok_or_else(|| format_err
!("invalid data type for '{}' in response", member
))?
84 // Create a future which logs in on a proxmox backup server and yields an Auth struct.
88 no_cert_validation
: bool
,
92 ) -> impl Future
<Item
= Auth
, Error
= Error
> {
93 let formdata
= Body
::from(
94 url
::form_urlencoded
::Serializer
::new(String
::new())
95 .append_pair("username", &{ user }
)
96 .append_pair("password", &{ pass }
)
100 let urlbase
= urlbase
.to_string();
101 connect(domain
, port
, no_cert_validation
)
102 .and_then(move |(mut client
, conn
)| {
103 let req
= Request
::builder()
105 .uri(format
!("{}/access/ticket", urlbase
))
106 .header("Content-type", "application/x-www-form-urlencoded")
108 Ok((client
.send_request(req
), conn
))
110 .and_then(|(res
, conn
)| {
111 let mut conn
= Some(conn
);
115 .map_err(Error
::from
)
117 let data
: serde_json
::Value
= serde_json
::from_slice(&data
)?
;
120 .ok_or_else(|| format_err
!("missing 'data' in response"))?
;
121 let ticket
= required_string_member(data
, "ticket")?
;
122 let token
= required_string_member(data
, "CSRFPreventionToken")?
;
124 Ok(Auth { ticket, token }
)
127 .join(poll_fn(move || {
128 try_ready
!(conn
.as_mut().unwrap().poll_without_shutdown());
129 Ok(Async
::Ready(conn
.take().unwrap()))
131 .map_err(Error
::from
)
133 .and_then(|(res
, _conn
)| res
)
136 // Factored out protocol switching future: Takes a Response future and a connection and verifies
137 // its returned headers and protocol values. Yields a Response and the connection.
139 res
: hyper
::client
::conn
::ResponseFuture
,
141 ) -> impl Future
<Item
= (Result
<Response
<Body
>, Error
>, HyperConnType
), Error
= Error
> {
142 let mut conn
= Some(conn
);
144 if res
.status() != StatusCode
::SWITCHING_PROTOCOLS
{
145 bail
!("unexpected status code - expected SwitchingProtocols");
147 let upgrade
= match res
.headers().get("Upgrade") {
148 None
=> bail
!("missing upgrade header in server response!"),
151 if upgrade
!= "proxmox-backup-protocol-1" {
152 match upgrade
.to_str() {
153 Ok(s
) => bail
!("unexpected upgrade protocol type received: {}", s
),
154 _
=> bail
!("unexpected upgrade protocol type received"),
159 .map_err(Error
::from
)
160 .join(poll_fn(move || {
161 try_ready
!(conn
.as_mut().unwrap().poll_without_shutdown());
162 Ok(Async
::Ready(conn
.take().unwrap()))
166 // Base for the two uploaders: DynamicIndexUploader and FixedIndexUploader:
167 struct UploaderBase
<S
: AsyncRead
+ AsyncWrite
> {
168 client
: Option
<PmxClient
<S
>>,
169 wait_id
: Option
<StreamId
>,
172 impl<S
: AsyncRead
+ AsyncWrite
> UploaderBase
<S
> {
173 pub fn new(client
: PmxClient
<S
>) -> Self {
175 client
: Some(client
),
180 pub fn create_backup(
182 index_type
: IndexType
,
185 backup_timestamp
: i64,
188 file_size
: Option
<u64>,
189 ) -> Result
<BackupStream
, Error
> {
190 if self.wait_id
.is_some() {
191 bail
!("create_backup cannot be called while awaiting a response");
194 let backup_stream
= self.client
.as_mut().unwrap().create_backup(
204 self.wait_id
= Some(backup_stream
.into());
208 pub fn poll_ack(&mut self) -> Poll
<(), Error
> {
209 if let Some(id
) = self.wait_id
{
210 if self.client
.as_mut().unwrap().wait_for_id(id
)?
{
213 return Ok(Async
::NotReady
);
216 return Ok(Async
::Ready(()));
219 pub fn poll_send(&mut self) -> Poll
<(), Error
> {
220 match self.client
.as_mut().unwrap().poll_send()?
{
221 Some(false) => Ok(Async
::NotReady
),
222 _
=> Ok(Async
::Ready(())),
230 ) -> Result
<Option
<StreamId
>, Error
> {
231 self.client
.as_mut().unwrap().upload_chunk(info
, chunk
)
234 pub fn continue_upload_chunk(&mut self, chunk
: &[u8]) -> Result
<Option
<StreamId
>, Error
> {
235 let res
= self.client
.as_mut().unwrap().continue_upload_chunk(chunk
)?
;
236 if let Some(id
) = res
{
237 self.wait_id
= Some(id
);
242 pub fn finish_backup(&mut self, stream
: BackupStream
) -> Result
<(), Error
> {
243 let id
= stream
.into();
244 let (name
, _done
) = self.client
.as_mut().unwrap().finish_backup(stream
)?
;
245 println
!("Server created file: {}", name
);
246 self.wait_id
= Some(id
);
250 pub fn take_client(&mut self) -> Option
<PmxClient
<S
>> {
255 // Future which creates a backup with a dynamic file:
256 struct DynamicIndexUploader
<C
: AsyncRead
, S
: AsyncRead
+ AsyncWrite
> {
257 base
: UploaderBase
<S
>,
258 chunks
: ChunkStream
<C
>,
259 current_chunk
: Option
<ChunkEntry
>,
260 backup_stream
: Option
<BackupStream
>,
263 impl<C
: AsyncRead
, S
: AsyncRead
+ AsyncWrite
> DynamicIndexUploader
<C
, S
> {
265 client
: PmxClient
<S
>,
266 chunks
: ChunkStream
<C
>,
269 backup_timestamp
: i64,
272 ) -> Result
<Self, Error
> {
273 let mut base
= UploaderBase
::new(client
);
274 let stream
= base
.create_backup(
287 backup_stream
: Some(stream
),
291 fn get_chunk
<'a
>(chunks
: &'a
mut ChunkStream
<C
>) -> Poll
<Option
<&'a
[u8]>, Error
> {
293 Ok(Some(None
)) => Ok(Async
::Ready(None
)),
294 Ok(Some(Some(chunk
))) => Ok(Async
::Ready(Some(chunk
))),
295 Ok(None
) => return Ok(Async
::NotReady
),
296 Err(e
) => return Err(e
),
300 fn finished_chunk(&mut self) -> Result
<(), Error
> {
301 self.base
.client
.as_mut().unwrap().dynamic_chunk(
302 self.backup_stream
.unwrap(),
303 self.current_chunk
.as_ref().unwrap(),
306 self.current_chunk
= None
;
312 impl<C
: AsyncRead
, S
: AsyncRead
+ AsyncWrite
> Future
for DynamicIndexUploader
<C
, S
> {
313 type Item
= PmxClient
<S
>;
316 fn poll(&mut self) -> Poll
<Self::Item
, Error
> {
318 // Process our upload queue if we have one:
319 try_ready
!(self.base
.poll_send());
321 // If we have a chunk in-flight, wait for acknowledgement:
322 try_ready
!(self.base
.poll_ack());
324 // Get our current chunk:
325 let chunk
= match try_ready
!(Self::get_chunk(&mut self.chunks
)) {
326 Some(chunk
) => chunk
,
327 None
=> match self.backup_stream
.take() {
329 self.base
.finish_backup(stream
)?
;
332 None
=> return Ok(Async
::Ready(self.base
.take_client().unwrap())),
336 // If the current chunk is in-flight just poll the upload:
337 if self.current_chunk
.is_some() {
338 if self.base
.continue_upload_chunk(chunk
)?
.is_some() {
339 self.finished_chunk()?
;
344 let client
= self.base
.client
.as_ref().unwrap();
346 // We got a new chunk, see if we need to upload it:
347 self.current_chunk
= Some(ChunkEntry
::from_data(chunk
));
348 let entry
= self.current_chunk
.as_ref().unwrap();
349 if client
.is_chunk_available(entry
) {
350 eprintln
!("Already available: {}", entry
.digest_to_hex());
351 self.finished_chunk()?
;
353 eprintln
!("New chunk: {}, size {}", entry
.digest_to_hex(), entry
.len());
354 match self.base
.upload_chunk(entry
, chunk
)?
{
356 eprintln
!("Finished right away!");
357 self.finished_chunk()?
;
360 // Send-buffer filled up, start polling the upload process.
369 struct FixedIndexUploader
<T
: AsyncRead
, S
: AsyncRead
+ AsyncWrite
> {
370 base
: UploaderBase
<S
>,
372 backup_stream
: Option
<BackupStream
>,
373 current_chunk
: Option
<ChunkEntry
>,
380 impl<T
: AsyncRead
, S
: AsyncRead
+ AsyncWrite
> FixedIndexUploader
<T
, S
> {
382 client
: PmxClient
<S
>,
386 backup_timestamp
: i64,
390 ) -> Result
<Self, Error
> {
391 let mut base
= UploaderBase
::new(client
);
392 let stream
= base
.create_backup(
404 backup_stream
: Some(stream
),
408 buffer
: Vec
::with_capacity(chunk_size
),
413 fn fill_chunk(&mut self) -> Poll
<bool
, io
::Error
> {
414 let mut pos
= self.buffer
.len();
416 // we hit eof and we want the next chunk, return false:
417 if self.eof
&& pos
== 0 {
418 return Ok(Async
::Ready(false));
421 // we still have a full chunk right now:
422 if pos
== self.chunk_size
{
423 return Ok(Async
::Ready(true));
428 self.buffer
.set_len(self.chunk_size
);
431 match self.input
.poll_read(&mut self.buffer
[pos
..]) {
432 Err(e
) => break Err(e
),
433 Ok(Async
::NotReady
) => break Ok(Async
::NotReady
),
434 Ok(Async
::Ready(got
)) => {
437 break Ok(Async
::Ready(true));
440 if pos
== self.chunk_size
{
441 break Ok(Async
::Ready(true));
448 self.buffer
.set_len(pos
);
453 fn finished_chunk(&mut self) -> Result
<(), Error
> {
454 self.base
.client
.as_mut().unwrap().fixed_data(
455 self.backup_stream
.unwrap(),
457 self.current_chunk
.as_ref().unwrap(),
460 self.current_chunk
= None
;
462 // This is how we tell fill_chunk() that it needs to read new data
463 self.buffer
.set_len(0);
469 impl<T
: AsyncRead
, S
: AsyncRead
+ AsyncWrite
> Future
for FixedIndexUploader
<T
, S
> {
470 type Item
= PmxClient
<S
>;
473 fn poll(&mut self) -> Poll
<Self::Item
, Error
> {
475 // Process our upload queue if we have one:
476 try_ready
!(self.base
.poll_send());
478 // If we have a chunk in-flight, wait for acknowledgement:
479 try_ready
!(self.base
.poll_ack());
481 // Get our current chunk:
482 if !try_ready
!(self.fill_chunk()) {
483 match self.backup_stream
.take() {
485 self.base
.finish_backup(stream
)?
;
489 return Ok(Async
::Ready(self.base
.take_client().unwrap()));
494 let chunk
= &self.buffer
[..];
496 // If the current chunk is in-flight just poll the upload:
497 if self.current_chunk
.is_some() {
498 if self.base
.continue_upload_chunk(chunk
)?
.is_some() {
499 self.finished_chunk()?
;
504 let client
= self.base
.client
.as_ref().unwrap();
506 // We got a new chunk, see if we need to upload it:
507 self.current_chunk
= Some(ChunkEntry
::from_data(chunk
));
508 let entry
= self.current_chunk
.as_ref().unwrap();
509 if client
.is_chunk_available(entry
) {
510 eprintln
!("Already available: {}", entry
.digest_to_hex());
511 self.finished_chunk()?
;
513 eprintln
!("New chunk: {}, size {}", entry
.digest_to_hex(), entry
.len());
514 match self.base
.upload_chunk(entry
, chunk
)?
{
516 eprintln
!("Finished right away!");
517 self.finished_chunk()?
;
520 // Send-buffer filled up, start polling the upload process.
529 // Helper-Future for waiting for a polling method on proxmox_protocol::Client to complete:
530 struct ClientWaitFuture
<S
: AsyncRead
+ AsyncWrite
>(
531 Option
<PmxClient
<S
>>,
532 fn(&mut PmxClient
<S
>) -> Result
<bool
, Error
>,
535 impl<S
: AsyncRead
+ AsyncWrite
> Future
for ClientWaitFuture
<S
> {
536 type Item
= PmxClient
<S
>;
539 fn poll(&mut self) -> Poll
<Self::Item
, Self::Error
> {
540 if (self.1)(self.0.as_mut().unwrap())?
{
541 Ok(Async
::Ready(self.0.take().unwrap()))
548 // Trait to provide Futures for some proxmox_protocol::Client methods:
549 trait ClientOps
<S
: AsyncRead
+ AsyncWrite
> {
550 fn poll_handshake(self) -> ClientWaitFuture
<S
>;
551 fn poll_hashes(self, file
: &str) -> Result
<ClientWaitFuture
<S
>, Error
>;
554 impl<S
: AsyncRead
+ AsyncWrite
> ClientOps
<S
> for PmxClient
<S
> {
555 fn poll_handshake(self) -> ClientWaitFuture
<S
> {
556 ClientWaitFuture(Some(self), PmxClient
::<S
>::wait_for_handshake
)
559 fn poll_hashes(mut self, name
: &str) -> Result
<ClientWaitFuture
<S
>, Error
> {
560 self.query_hashes(name
)?
;
561 Ok(ClientWaitFuture(Some(self), PmxClient
::<S
>::wait_for_hashes
))
566 fn require_arg(args
: &mut dyn Iterator
<Item
= String
>, name
: &str) -> String
{
570 eprintln
!("missing required argument: {}", name
);
578 // ./proxmox-protocol-testclient <type> <id> <filename> [<optional old-file>]
580 // This will query the remote server for a list of chunks in <old-file> if the argument was
581 // provided, otherwise assumes all chunks are new.
583 let mut args
= std
::env
::args().skip(1);
584 let mut repo
= require_arg(&mut args
, "repository");
585 let use_fixed_chunks
= if repo
== "--fixed" {
586 repo
= require_arg(&mut args
, "repository");
591 let backup_type
= require_arg(&mut args
, "backup-type");
592 let backup_id
= require_arg(&mut args
, "backup-id");
593 let filename
= require_arg(&mut args
, "backup-file-name");
594 // optional previous backup:
595 let previous
= args
.next().map(|s
| s
.to_string());
597 let repo
: BackupRepository
= match repo
.parse() {
600 eprintln
!("error parsing repository: {}", e
);
605 let backup_time
= Utc
::now().timestamp();
606 // Or fake the time to verify we cannot create an already existing backup:
607 //let backup_time = Utc::today().and_hms(3, 25, 55);
610 "Uploading file `{}`, type {}, id: {}",
611 filename
, backup_type
, backup_id
614 let no_cert_validation
= true; // FIXME
615 let domain
= repo
.host().to_owned();
617 let address
= format
!("{}:{}", domain
, port
);
618 let urlbase
= format
!("https://{}/api2/json", address
);
620 let user
= repo
.user().to_string();
621 let pass
= match proxmox_backup
::tools
::tty
::read_password("Password: ")
622 .and_then(|x
| String
::from_utf8(x
).map_err(Error
::from
))
626 eprintln
!("error getting password: {}", e
);
630 let store
= repo
.store().to_owned();
632 let stream
= File
::open(filename
.clone())
633 .map_err(Error
::from
)
642 .and_then(move |(file
, auth
)| {
643 ok((file
, auth
)).join(connect(&domain
, port
, no_cert_validation
))
645 .and_then(move |((file
, auth
), (mut client
, conn
))| {
646 let req
= Request
::builder()
648 .uri(format
!("{}/admin/datastore/{}/test-upload", urlbase
, store
))
649 .header("Cookie", format
!("PBSAuthCookie={}", auth
.ticket
))
650 .header("CSRFPreventionToken", auth
.token
)
651 .header("Connection", "Upgrade")
652 .header("Upgrade", "proxmox-backup-protocol-1")
653 .body(Body
::empty())?
;
654 Ok((file
, client
.send_request(req
), conn
))
656 .and_then(|(file
, res
, conn
)| ok(file
).join(switch_protocols(res
, conn
)))
657 .and_then(|(file
, (_
, conn
))| {
658 let client
= PmxClient
::new(conn
.into_parts().io
);
660 .map_err(Error
::from
)
661 .join(client
.poll_handshake())
663 .and_then(move |((file
, meta
), client
)| {
664 eprintln
!("Server said hello");
665 // 2 possible futures of distinct types need an explicit cast to Box<dyn Future>...
666 let fut
: Box
<dyn Future
<Item
= _
, Error
= _
> + Send
> =
667 if let Some(previous
) = previous
{
668 let query
= client
.poll_hashes(&previous
)?
;
669 Box
::new(ok((file
, meta
)).join(query
))
671 Box
::new(ok(((file
, meta
), client
)))
676 .and_then(move |((file
, meta
), client
)| {
677 eprintln
!("starting uploader...");
678 let uploader
: Box
<dyn Future
<Item
= _
, Error
= _
> + Send
> = if use_fixed_chunks
{
679 Box
::new(FixedIndexUploader
::new(
690 let chunker
= ChunkStream
::new(file
);
691 Box
::new(DynamicIndexUploader
::new(
706 .and_then(move |_client
| {
710 .map_err(|e
| eprintln
!("error: {}", e
));
711 hyper
::rt
::run(stream
);