]> git.proxmox.com Git - proxmox-backup.git/commitdiff
use hyper/tokio-openssl instead of hyper/tokio-tls
authorDietmar Maurer <dietmar@proxmox.com>
Tue, 2 Jul 2019 11:33:58 +0000 (13:33 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Tue, 2 Jul 2019 11:36:28 +0000 (13:36 +0200)
This exposes the complete SSL setup. And download is much faster
now (600MB/s instead of 130MB/s)!

Cargo.toml
src/bin/h2s-client.rs
src/bin/h2s-server.rs
src/bin/proxmox-backup-proxy.rs
src/bin/proxmox-protocol-testclient.rs [deleted file]
src/client/http_client.rs

index 54e526fe73ab793f03e05363999a5d072474834f..70098bb97f3b70940ade8bef4b2ccd7deb75a4ff 100644 (file)
@@ -22,13 +22,13 @@ bytes = "0.4"
 tokio-threadpool = "0.1"
 tokio = "0.1"
 tokio-fs = "0.1"
-tokio-tls = "0.2"
+tokio-openssl = "0.3"
 tokio-signal = "0.2"
 native-tls = "0.2"
 http = "0.1"
 h2 = "0.1"
 hyper = "0.12"
-hyper-tls = "0.3"
+hyper-openssl = "0.7"
 lazy_static = "1.1"
 regex = "1.0"
 libc = "0.2"
index 040d2747569f4f3ccd8598c3ab18cf3c2463d2a4..a26191e26a51d2c2c6f85f530eed93a9e1aaa6d6 100644 (file)
@@ -68,12 +68,17 @@ pub fn main() -> Result<(), Error> {
         .and_then(|c| {
             c.set_nodelay(true).unwrap();
             c.set_recv_buffer_size(1024*1024).unwrap();
-            let mut builder = native_tls::TlsConnector::builder();
-            builder.danger_accept_invalid_certs(true);
-            let connector = builder.build().unwrap();
-            let connector = tokio_tls::TlsConnector::from(connector);
-            connector.connect("localhost", c)
-                .map_err(Error::from)
+
+            use openssl::ssl::*;
+            use tokio_openssl::SslConnectorExt;
+
+            let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
+            ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE);
+
+            let connector = ssl_connector_builder.build();
+
+            connector.connect_async("localhost", c)
+                .map_err(|err| format_err!("connect failed - {}", err))
         })
         .map_err(Error::from)
         .and_then(|c| {
index a38001923561ab432570f816fc3ae3d69fbda65f..478aa98a84cf21def47e489f4a727111f2b5344d 100644 (file)
@@ -1,16 +1,17 @@
 use failure::*;
 use futures::*;
-use std::path::Path;
 
 // Simple H2 server to test H2 speed with h2s-client.rs
 
 use hyper::{Request, Response, Body};
 use tokio::net::TcpListener;
 
-use proxmox_backup::client::pipe_to_stream::*;
-use proxmox_backup::tools;
 use proxmox_backup::configdir;
 
+use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
+use std::sync::Arc;
+use tokio_openssl::SslAcceptorExt;
+
 pub fn main() -> Result<(), Error> {
 
     start_h2_server()?;
@@ -18,39 +19,19 @@ pub fn main() -> Result<(), Error> {
     Ok(())
 }
 
-fn load_certificate<T: AsRef<Path>, U: AsRef<Path>>(
-    key: T,
-    cert: U,
-) -> Result<openssl::pkcs12::Pkcs12, Error> {
-    let key = tools::file_get_contents(key)?;
-    let cert = tools::file_get_contents(cert)?;
-
-    let key = openssl::pkey::PKey::private_key_from_pem(&key)?;
-    let cert = openssl::x509::X509::from_pem(&cert)?;
-
-    Ok(openssl::pkcs12::Pkcs12::builder()
-        .build("", "", &key, &cert)?)
-}
-
 pub fn start_h2_server() -> Result<(), Error> {
 
-    let cert_path = configdir!("/proxy.pfx");
-    let raw_cert = match std::fs::read(cert_path) {
-        Ok(pfx) => pfx,
-        Err(ref err) if err.kind() == std::io::ErrorKind::NotFound => {
-            let pkcs12 = load_certificate(configdir!("/proxy.key"), configdir!("/proxy.pem"))?;
-            pkcs12.to_der()?
-        }
-        Err(err) => bail!("unable to read certificate file {} - {}", cert_path, err),
-    };
+    let key_path = configdir!("/proxy.key");
+    let cert_path = configdir!("/proxy.pem");
 
-    let identity = match native_tls::Identity::from_pkcs12(&raw_cert, "") {
-        Ok(data) => data,
-        Err(err) => bail!("unable to decode pkcs12 identity {} - {}", cert_path, err),
-    };
+    let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
+    acceptor.set_private_key_file(key_path, SslFiletype::PEM)
+        .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
+    acceptor.set_certificate_chain_file(cert_path)
+        .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
+    acceptor.check_private_key().unwrap();
 
-    let acceptor = native_tls::TlsAcceptor::new(identity)?;
-    let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor));
+    let acceptor = Arc::new(acceptor.build());
 
     let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap();
 
@@ -59,7 +40,12 @@ pub fn start_h2_server() -> Result<(), Error> {
     let server = listener
         .incoming()
         .map_err(Error::from)
-        .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into()))
+        .and_then(move |sock| {
+            sock.set_nodelay(true).unwrap();
+            sock.set_send_buffer_size(1024*1024).unwrap();
+            sock.set_recv_buffer_size(1024*1024).unwrap();
+            acceptor.accept_async(sock).map_err(|e| e.into())
+        })
         .then(|r| match r {
             // accept()s can fail here with an Err() when eg. the client rejects
             // the cert and closes the connection, so we follow up with mapping
@@ -88,7 +74,7 @@ pub fn start_h2_server() -> Result<(), Error> {
             http.http2_initial_stream_window_size(max_window_size);
             http.http2_initial_connection_window_size(max_window_size);
 
-            let service = hyper::service::service_fn(|req: Request<Body>| {
+            let service = hyper::service::service_fn(|_req: Request<Body>| {
                 println!("Got request");
                 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...]
                 let body = Body::from(buffer);
index a3255a72153c88cdd4f9e78e29c27eecb720ec1e..fe9e628af7d92e826c4c6002b0f605e63673e6b5 100644 (file)
@@ -1,9 +1,5 @@
-use std::io;
-use std::path::Path;
-
 use proxmox_backup::try_block;
 use proxmox_backup::configdir;
-use proxmox_backup::tools;
 use proxmox_backup::server;
 use proxmox_backup::tools::daemon;
 use proxmox_backup::api_schema::router::*;
@@ -17,6 +13,10 @@ use lazy_static::lazy_static;
 use futures::*;
 use futures::stream::Stream;
 
+use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
+use std::sync::Arc;
+use tokio_openssl::SslAcceptorExt;
+
 use hyper;
 
 fn main() {
@@ -27,20 +27,6 @@ fn main() {
     }
 }
 
-fn load_certificate<T: AsRef<Path>, U: AsRef<Path>>(
-    key: T,
-    cert: U,
-) -> Result<openssl::pkcs12::Pkcs12, Error> {
-    let key = tools::file_get_contents(key)?;
-    let cert = tools::file_get_contents(cert)?;
-
-    let key = openssl::pkey::PKey::private_key_from_pem(&key)?;
-    let cert = openssl::x509::X509::from_pem(&cert)?;
-
-    Ok(openssl::pkcs12::Pkcs12::builder()
-        .build("", "", &key, &cert)?)
-}
-
 fn run() -> Result<(), Error> {
     if let Err(err) = syslog::init(
         syslog::Facility::LOG_DAEMON,
@@ -72,26 +58,22 @@ fn run() -> Result<(), Error> {
 
     let rest_server = RestServer::new(config);
 
-    let cert_path = configdir!("/proxy.pfx");
-    let raw_cert = match std::fs::read(cert_path) {
-        Ok(pfx) => pfx,
-        Err(ref err) if err.kind() == io::ErrorKind::NotFound => {
-            let pkcs12 = load_certificate(configdir!("/proxy.key"), configdir!("/proxy.pem"))?;
-            pkcs12.to_der()?
-        }
-        Err(err) => bail!("unable to read certificate file {} - {}", cert_path, err),
-    };
+    //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
+    let key_path = configdir!("/proxy.key");
+    let cert_path = configdir!("/proxy.pem");
+
+    let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
+    acceptor.set_private_key_file(key_path, SslFiletype::PEM)
+        .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
+    acceptor.set_certificate_chain_file(cert_path)
+        .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
+    acceptor.check_private_key().unwrap();
 
-    let identity = match native_tls::Identity::from_pkcs12(&raw_cert, "") {
-        Ok(data) => data,
-        Err(err) => bail!("unable to decode pkcs12 identity {} - {}", cert_path, err),
-    };
+    let acceptor = Arc::new(acceptor.build());
 
     let server = daemon::create_daemon(
         ([0,0,0,0,0,0,0,0], 8007).into(),
         |listener| {
-            let acceptor = native_tls::TlsAcceptor::new(identity)?;
-            let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor));
             let connections = listener
                 .incoming()
                 .map_err(Error::from)
@@ -99,7 +81,7 @@ fn run() -> Result<(), Error> {
                     sock.set_nodelay(true).unwrap();
                     sock.set_send_buffer_size(1024*1024).unwrap();
                     sock.set_recv_buffer_size(1024*1024).unwrap();
-                    acceptor.accept(sock).map_err(|e| e.into())
+                    acceptor.accept_async(sock).map_err(|e| e.into())
                 })
                 .then(|r| match r {
                     // accept()s can fail here with an Err() when eg. the client rejects
diff --git a/src/bin/proxmox-protocol-testclient.rs b/src/bin/proxmox-protocol-testclient.rs
deleted file mode 100644 (file)
index e595f7e..0000000
+++ /dev/null
@@ -1,712 +0,0 @@
-use std::io;
-use std::process::exit;
-
-use chrono::Utc;
-use failure::*;
-use futures::future::{ok, poll_fn, Future};
-use futures::try_ready;
-use futures::{Async, Poll};
-use http::{Request, Response, StatusCode};
-use hyper::rt::Stream;
-use hyper::Body;
-use tokio::prelude::*;
-use tokio_fs::file::File;
-
-use proxmox_protocol::Client as PmxClient;
-use proxmox_protocol::{BackupStream, ChunkEntry, ChunkStream, IndexType, StreamId};
-
-use proxmox_backup::client::BackupRepository;
-
-// This is a temporary client using the backup protocol crate.
-// Its functionality should be moved to the `proxmox-backup-client` binary instead.
-// For now this is mostly here to keep in the history an alternative way of connecting to an https
-// server without hyper-tls in the background.
-// Note that hyper-tls just wraps native_tls, and so does tokio_tls. So the only way to get
-// rid of the extra dependency would be to reimplement tokio_tls on top of the openssl crate.
-
-type HyperConnection<T, B> = hyper::client::conn::Connection<T, B>;
-type HyperConnType = HyperConnection<tokio_tls::TlsStream<tokio::net::TcpStream>, Body>;
-
-// Create a future which connects to a TLS-enabled http server.
-// This would ordinarily be covered by the Connect trait in the higher level hyper interface.
-// Connect to the server, initiate TLS, finally run hyper's handshake method.
-fn connect(
-    domain: &str,
-    port: u16,
-    no_cert_validation: bool,
-) -> impl Future<
-    // Typing out this function signature is almost more work than copying its code body...
-    Item = (hyper::client::conn::SendRequest<Body>, HyperConnType),
-    Error = Error,
-> {
-    // tokio::net::TcpStream::connect(addr) <- this takes only a single address!
-    // so we need to improvise...:
-    use tokio_threadpool::blocking;
-
-    let domain = domain.to_string();
-    let domain2 = domain.clone();
-    poll_fn(move || {
-        blocking(|| {
-            let conn =
-                std::net::TcpStream::connect((domain.as_str(), port)).map_err(Error::from)?;
-            tokio::net::TcpStream::from_std(conn, &Default::default()).map_err(Error::from)
-        })
-        .map_err(Error::from)
-    })
-    .map_err(Error::from)
-    .flatten()
-    .and_then(move |tcp| {
-        let mut builder = native_tls::TlsConnector::builder();
-        if no_cert_validation {
-            builder.danger_accept_invalid_certs(true);
-        }
-        let connector = tokio_tls::TlsConnector::from(builder.build().unwrap());
-        connector.connect(&domain2, tcp).map_err(Error::from)
-    })
-    .and_then(|tls| hyper::client::conn::handshake(tls).map_err(Error::from))
-}
-
-// convenience helper for non-Deserialize data...
-fn required_string_member(value: &serde_json::Value, member: &str) -> Result<String, Error> {
-    Ok(value
-        .get(member)
-        .ok_or_else(|| format_err!("missing '{}' in response", member))?
-        .as_str()
-        .ok_or_else(|| format_err!("invalid data type for '{}' in response", member))?
-        .to_string())
-}
-
-struct Auth {
-    ticket: String,
-    token: String,
-}
-
-// Create a future which logs in on a proxmox backup server and yields an Auth struct.
-fn login(
-    domain: &str,
-    port: u16,
-    no_cert_validation: bool,
-    urlbase: &str,
-    user: String,
-    pass: String,
-) -> impl Future<Item = Auth, Error = Error> {
-    let formdata = Body::from(
-        url::form_urlencoded::Serializer::new(String::new())
-            .append_pair("username", &{ user })
-            .append_pair("password", &{ pass })
-            .finish(),
-    );
-
-    let urlbase = urlbase.to_string();
-    connect(domain, port, no_cert_validation)
-        .and_then(move |(mut client, conn)| {
-            let req = Request::builder()
-                .method("POST")
-                .uri(format!("{}/access/ticket", urlbase))
-                .header("Content-type", "application/x-www-form-urlencoded")
-                .body(formdata)?;
-            Ok((client.send_request(req), conn))
-        })
-        .and_then(|(res, conn)| {
-            let mut conn = Some(conn);
-            res.map(|res| {
-                res.into_body()
-                    .concat2()
-                    .map_err(Error::from)
-                    .and_then(|data| {
-                        let data: serde_json::Value = serde_json::from_slice(&data)?;
-                        let data = data
-                            .get("data")
-                            .ok_or_else(|| format_err!("missing 'data' in response"))?;
-                        let ticket = required_string_member(data, "ticket")?;
-                        let token = required_string_member(data, "CSRFPreventionToken")?;
-
-                        Ok(Auth { ticket, token })
-                    })
-            })
-            .join(poll_fn(move || {
-                try_ready!(conn.as_mut().unwrap().poll_without_shutdown());
-                Ok(Async::Ready(conn.take().unwrap()))
-            }))
-            .map_err(Error::from)
-        })
-        .and_then(|(res, _conn)| res)
-}
-
-// Factored out protocol switching future: Takes a Response future and a connection and verifies
-// its returned headers and protocol values. Yields a Response and the connection.
-fn switch_protocols(
-    res: hyper::client::conn::ResponseFuture,
-    conn: HyperConnType,
-) -> impl Future<Item = (Result<Response<Body>, Error>, HyperConnType), Error = Error> {
-    let mut conn = Some(conn);
-    res.map(|res| {
-        if res.status() != StatusCode::SWITCHING_PROTOCOLS {
-            bail!("unexpected status code - expected SwitchingProtocols");
-        }
-        let upgrade = match res.headers().get("Upgrade") {
-            None => bail!("missing upgrade header in server response!"),
-            Some(u) => u,
-        };
-        if upgrade != "proxmox-backup-protocol-1" {
-            match upgrade.to_str() {
-                Ok(s) => bail!("unexpected upgrade protocol type received: {}", s),
-                _ => bail!("unexpected upgrade protocol type received"),
-            }
-        }
-        Ok(res)
-    })
-    .map_err(Error::from)
-    .join(poll_fn(move || {
-        try_ready!(conn.as_mut().unwrap().poll_without_shutdown());
-        Ok(Async::Ready(conn.take().unwrap()))
-    }))
-}
-
-// Base for the two uploaders: DynamicIndexUploader and FixedIndexUploader:
-struct UploaderBase<S: AsyncRead + AsyncWrite> {
-    client: Option<PmxClient<S>>,
-    wait_id: Option<StreamId>,
-}
-
-impl<S: AsyncRead + AsyncWrite> UploaderBase<S> {
-    pub fn new(client: PmxClient<S>) -> Self {
-        Self {
-            client: Some(client),
-            wait_id: None,
-        }
-    }
-
-    pub fn create_backup(
-        &mut self,
-        index_type: IndexType,
-        backup_type: &str,
-        backup_id: &str,
-        backup_timestamp: i64,
-        filename: &str,
-        chunk_size: usize,
-        file_size: Option<u64>,
-    ) -> Result<BackupStream, Error> {
-        if self.wait_id.is_some() {
-            bail!("create_backup cannot be called while awaiting a response");
-        }
-
-        let backup_stream = self.client.as_mut().unwrap().create_backup(
-            index_type,
-            backup_type,
-            backup_id,
-            backup_timestamp,
-            filename,
-            chunk_size,
-            file_size,
-            true,
-        )?;
-        self.wait_id = Some(backup_stream.into());
-        Ok(backup_stream)
-    }
-
-    pub fn poll_ack(&mut self) -> Poll<(), Error> {
-        if let Some(id) = self.wait_id {
-            if self.client.as_mut().unwrap().wait_for_id(id)? {
-                self.wait_id = None;
-            } else {
-                return Ok(Async::NotReady);
-            }
-        }
-        return Ok(Async::Ready(()));
-    }
-
-    pub fn poll_send(&mut self) -> Poll<(), Error> {
-        match self.client.as_mut().unwrap().poll_send()? {
-            Some(false) => Ok(Async::NotReady),
-            _ => Ok(Async::Ready(())),
-        }
-    }
-
-    pub fn upload_chunk(
-        &mut self,
-        info: &ChunkEntry,
-        chunk: &[u8],
-    ) -> Result<Option<StreamId>, Error> {
-        self.client.as_mut().unwrap().upload_chunk(info, chunk)
-    }
-
-    pub fn continue_upload_chunk(&mut self, chunk: &[u8]) -> Result<Option<StreamId>, Error> {
-        let res = self.client.as_mut().unwrap().continue_upload_chunk(chunk)?;
-        if let Some(id) = res {
-            self.wait_id = Some(id);
-        }
-        Ok(res)
-    }
-
-    pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(), Error> {
-        let id = stream.into();
-        let (name, _done) = self.client.as_mut().unwrap().finish_backup(stream)?;
-        println!("Server created file: {}", name);
-        self.wait_id = Some(id);
-        Ok(())
-    }
-
-    pub fn take_client(&mut self) -> Option<PmxClient<S>> {
-        self.client.take()
-    }
-}
-
-// Future which creates a backup with a dynamic file:
-struct DynamicIndexUploader<C: AsyncRead, S: AsyncRead + AsyncWrite> {
-    base: UploaderBase<S>,
-    chunks: ChunkStream<C>,
-    current_chunk: Option<ChunkEntry>,
-    backup_stream: Option<BackupStream>,
-}
-
-impl<C: AsyncRead, S: AsyncRead + AsyncWrite> DynamicIndexUploader<C, S> {
-    pub fn new(
-        client: PmxClient<S>,
-        chunks: ChunkStream<C>,
-        backup_type: &str,
-        backup_id: &str,
-        backup_timestamp: i64,
-        filename: &str,
-        chunk_size: usize,
-    ) -> Result<Self, Error> {
-        let mut base = UploaderBase::new(client);
-        let stream = base.create_backup(
-            IndexType::Dynamic,
-            backup_type,
-            backup_id,
-            backup_timestamp,
-            filename,
-            chunk_size,
-            None,
-        )?;
-        Ok(Self {
-            base,
-            chunks,
-            current_chunk: None,
-            backup_stream: Some(stream),
-        })
-    }
-
-    fn get_chunk<'a>(chunks: &'a mut ChunkStream<C>) -> Poll<Option<&'a [u8]>, Error> {
-        match chunks.get() {
-            Ok(Some(None)) => Ok(Async::Ready(None)),
-            Ok(Some(Some(chunk))) => Ok(Async::Ready(Some(chunk))),
-            Ok(None) => return Ok(Async::NotReady),
-            Err(e) => return Err(e),
-        }
-    }
-
-    fn finished_chunk(&mut self) -> Result<(), Error> {
-        self.base.client.as_mut().unwrap().dynamic_chunk(
-            self.backup_stream.unwrap(),
-            self.current_chunk.as_ref().unwrap(),
-        )?;
-
-        self.current_chunk = None;
-        self.chunks.next();
-        Ok(())
-    }
-}
-
-impl<C: AsyncRead, S: AsyncRead + AsyncWrite> Future for DynamicIndexUploader<C, S> {
-    type Item = PmxClient<S>;
-    type Error = Error;
-
-    fn poll(&mut self) -> Poll<Self::Item, Error> {
-        loop {
-            // Process our upload queue if we have one:
-            try_ready!(self.base.poll_send());
-
-            // If we have a chunk in-flight, wait for acknowledgement:
-            try_ready!(self.base.poll_ack());
-
-            // Get our current chunk:
-            let chunk = match try_ready!(Self::get_chunk(&mut self.chunks)) {
-                Some(chunk) => chunk,
-                None => match self.backup_stream.take() {
-                    Some(stream) => {
-                        self.base.finish_backup(stream)?;
-                        continue;
-                    }
-                    None => return Ok(Async::Ready(self.base.take_client().unwrap())),
-                },
-            };
-
-            // If the current chunk is in-flight just poll the upload:
-            if self.current_chunk.is_some() {
-                if self.base.continue_upload_chunk(chunk)?.is_some() {
-                    self.finished_chunk()?;
-                }
-                continue;
-            }
-
-            let client = self.base.client.as_ref().unwrap();
-
-            // We got a new chunk, see if we need to upload it:
-            self.current_chunk = Some(ChunkEntry::from_data(chunk));
-            let entry = self.current_chunk.as_ref().unwrap();
-            if client.is_chunk_available(entry) {
-                eprintln!("Already available: {}", entry.digest_to_hex());
-                self.finished_chunk()?;
-            } else {
-                eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len());
-                match self.base.upload_chunk(entry, chunk)? {
-                    Some(_id) => {
-                        eprintln!("Finished right away!");
-                        self.finished_chunk()?;
-                    }
-                    None => {
-                        // Send-buffer filled up, start polling the upload process.
-                        continue;
-                    }
-                }
-            }
-        }
-    }
-}
-
-struct FixedIndexUploader<T: AsyncRead, S: AsyncRead + AsyncWrite> {
-    base: UploaderBase<S>,
-    input: T,
-    backup_stream: Option<BackupStream>,
-    current_chunk: Option<ChunkEntry>,
-    chunk_size: usize,
-    index: usize,
-    buffer: Vec<u8>,
-    eof: bool,
-}
-
-impl<T: AsyncRead, S: AsyncRead + AsyncWrite> FixedIndexUploader<T, S> {
-    pub fn new(
-        client: PmxClient<S>,
-        input: T,
-        backup_type: &str,
-        backup_id: &str,
-        backup_timestamp: i64,
-        filename: &str,
-        chunk_size: usize,
-        file_size: u64,
-    ) -> Result<Self, Error> {
-        let mut base = UploaderBase::new(client);
-        let stream = base.create_backup(
-            IndexType::Fixed,
-            backup_type,
-            backup_id,
-            backup_timestamp,
-            filename,
-            chunk_size,
-            Some(file_size),
-        )?;
-        Ok(Self {
-            base,
-            input,
-            backup_stream: Some(stream),
-            current_chunk: None,
-            chunk_size,
-            index: 0,
-            buffer: Vec::with_capacity(chunk_size),
-            eof: false,
-        })
-    }
-
-    fn fill_chunk(&mut self) -> Poll<bool, io::Error> {
-        let mut pos = self.buffer.len();
-
-        // we hit eof and we want the next chunk, return false:
-        if self.eof && pos == 0 {
-            return Ok(Async::Ready(false));
-        }
-
-        // we still have a full chunk right now:
-        if pos == self.chunk_size {
-            return Ok(Async::Ready(true));
-        }
-
-        // fill it up:
-        unsafe {
-            self.buffer.set_len(self.chunk_size);
-        }
-        let res = loop {
-            match self.input.poll_read(&mut self.buffer[pos..]) {
-                Err(e) => break Err(e),
-                Ok(Async::NotReady) => break Ok(Async::NotReady),
-                Ok(Async::Ready(got)) => {
-                    if got == 0 {
-                        self.eof = true;
-                        break Ok(Async::Ready(true));
-                    }
-                    pos += got;
-                    if pos == self.chunk_size {
-                        break Ok(Async::Ready(true));
-                    }
-                    // read more...
-                }
-            }
-        };
-        unsafe {
-            self.buffer.set_len(pos);
-        }
-        res
-    }
-
-    fn finished_chunk(&mut self) -> Result<(), Error> {
-        self.base.client.as_mut().unwrap().fixed_data(
-            self.backup_stream.unwrap(),
-            self.index,
-            self.current_chunk.as_ref().unwrap(),
-        )?;
-        self.index += 1;
-        self.current_chunk = None;
-        unsafe {
-            // This is how we tell fill_chunk() that it needs to read new data
-            self.buffer.set_len(0);
-        }
-        Ok(())
-    }
-}
-
-impl<T: AsyncRead, S: AsyncRead + AsyncWrite> Future for FixedIndexUploader<T, S> {
-    type Item = PmxClient<S>;
-    type Error = Error;
-
-    fn poll(&mut self) -> Poll<Self::Item, Error> {
-        loop {
-            // Process our upload queue if we have one:
-            try_ready!(self.base.poll_send());
-
-            // If we have a chunk in-flight, wait for acknowledgement:
-            try_ready!(self.base.poll_ack());
-
-            // Get our current chunk:
-            if !try_ready!(self.fill_chunk()) {
-                match self.backup_stream.take() {
-                    Some(stream) => {
-                        self.base.finish_backup(stream)?;
-                        continue;
-                    }
-                    None => {
-                        return Ok(Async::Ready(self.base.take_client().unwrap()));
-                    }
-                }
-            };
-
-            let chunk = &self.buffer[..];
-
-            // If the current chunk is in-flight just poll the upload:
-            if self.current_chunk.is_some() {
-                if self.base.continue_upload_chunk(chunk)?.is_some() {
-                    self.finished_chunk()?;
-                }
-                continue;
-            }
-
-            let client = self.base.client.as_ref().unwrap();
-
-            // We got a new chunk, see if we need to upload it:
-            self.current_chunk = Some(ChunkEntry::from_data(chunk));
-            let entry = self.current_chunk.as_ref().unwrap();
-            if client.is_chunk_available(entry) {
-                eprintln!("Already available: {}", entry.digest_to_hex());
-                self.finished_chunk()?;
-            } else {
-                eprintln!("New chunk: {}, size {}", entry.digest_to_hex(), entry.len());
-                match self.base.upload_chunk(entry, chunk)? {
-                    Some(_id) => {
-                        eprintln!("Finished right away!");
-                        self.finished_chunk()?;
-                    }
-                    None => {
-                        // Send-buffer filled up, start polling the upload process.
-                        continue;
-                    }
-                }
-            }
-        }
-    }
-}
-
-// Helper-Future for waiting for a polling method on proxmox_protocol::Client to complete:
-struct ClientWaitFuture<S: AsyncRead + AsyncWrite>(
-    Option<PmxClient<S>>,
-    fn(&mut PmxClient<S>) -> Result<bool, Error>,
-);
-
-impl<S: AsyncRead + AsyncWrite> Future for ClientWaitFuture<S> {
-    type Item = PmxClient<S>;
-    type Error = Error;
-
-    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-        if (self.1)(self.0.as_mut().unwrap())? {
-            Ok(Async::Ready(self.0.take().unwrap()))
-        } else {
-            Ok(Async::NotReady)
-        }
-    }
-}
-
-// Trait to provide Futures for some proxmox_protocol::Client methods:
-trait ClientOps<S: AsyncRead + AsyncWrite> {
-    fn poll_handshake(self) -> ClientWaitFuture<S>;
-    fn poll_hashes(self, file: &str) -> Result<ClientWaitFuture<S>, Error>;
-}
-
-impl<S: AsyncRead + AsyncWrite> ClientOps<S> for PmxClient<S> {
-    fn poll_handshake(self) -> ClientWaitFuture<S> {
-        ClientWaitFuture(Some(self), PmxClient::<S>::wait_for_handshake)
-    }
-
-    fn poll_hashes(mut self, name: &str) -> Result<ClientWaitFuture<S>, Error> {
-        self.query_hashes(name)?;
-        Ok(ClientWaitFuture(Some(self), PmxClient::<S>::wait_for_hashes))
-    }
-}
-
-// CLI helper.
-fn require_arg(args: &mut dyn Iterator<Item = String>, name: &str) -> String {
-    match args.next() {
-        Some(arg) => arg,
-        None => {
-            eprintln!("missing required argument: {}", name);
-            exit(1);
-        }
-    }
-}
-
-fn main() {
-    // Usage:
-    //   ./proxmox-protocol-testclient <type> <id> <filename> [<optional old-file>]
-    //
-    // This will query the remote server for a list of chunks in <old-file> if the argument was
-    // provided, otherwise assumes all chunks are new.
-
-    let mut args = std::env::args().skip(1);
-    let mut repo = require_arg(&mut args, "repository");
-    let use_fixed_chunks = if repo == "--fixed" {
-        repo = require_arg(&mut args, "repository");
-        true
-    } else {
-        false
-    };
-    let backup_type = require_arg(&mut args, "backup-type");
-    let backup_id = require_arg(&mut args, "backup-id");
-    let filename = require_arg(&mut args, "backup-file-name");
-    // optional previous backup:
-    let previous = args.next().map(|s| s.to_string());
-
-    let repo: BackupRepository = match repo.parse() {
-        Ok(repo) => repo,
-        Err(e) => {
-            eprintln!("error parsing repository: {}", e);
-            exit(1);
-        }
-    };
-
-    let backup_time = Utc::now().timestamp();
-    // Or fake the time to verify we cannot create an already existing backup:
-    //let backup_time = Utc::today().and_hms(3, 25, 55);
-
-    println!(
-        "Uploading file `{}`, type {}, id: {}",
-        filename, backup_type, backup_id
-    );
-
-    let no_cert_validation = true; // FIXME
-    let domain = repo.host().to_owned();
-    let port = 8007;
-    let address = format!("{}:{}", domain, port);
-    let urlbase = format!("https://{}/api2/json", address);
-
-    let user = repo.user().to_string();
-    let pass = match proxmox_backup::tools::tty::read_password("Password: ")
-        .and_then(|x| String::from_utf8(x).map_err(Error::from))
-    {
-        Ok(pass) => pass,
-        Err(e) => {
-            eprintln!("error getting password: {}", e);
-            exit(1);
-        }
-    };
-    let store = repo.store().to_owned();
-
-    let stream = File::open(filename.clone())
-        .map_err(Error::from)
-        .join(login(
-            &domain,
-            port,
-            no_cert_validation,
-            &urlbase,
-            user,
-            pass,
-        ))
-        .and_then(move |(file, auth)| {
-            ok((file, auth)).join(connect(&domain, port, no_cert_validation))
-        })
-        .and_then(move |((file, auth), (mut client, conn))| {
-            let req = Request::builder()
-                .method("GET")
-                .uri(format!("{}/admin/datastore/{}/test-upload", urlbase, store))
-                .header("Cookie", format!("PBSAuthCookie={}", auth.ticket))
-                .header("CSRFPreventionToken", auth.token)
-                .header("Connection", "Upgrade")
-                .header("Upgrade", "proxmox-backup-protocol-1")
-                .body(Body::empty())?;
-            Ok((file, client.send_request(req), conn))
-        })
-        .and_then(|(file, res, conn)| ok(file).join(switch_protocols(res, conn)))
-        .and_then(|(file, (_, conn))| {
-            let client = PmxClient::new(conn.into_parts().io);
-            file.metadata()
-                .map_err(Error::from)
-                .join(client.poll_handshake())
-        })
-        .and_then(move |((file, meta), client)| {
-            eprintln!("Server said hello");
-            // 2 possible futures of distinct types need an explicit cast to Box<dyn Future>...
-            let fut: Box<dyn Future<Item = _, Error = _> + Send> =
-                if let Some(previous) = previous {
-                    let query = client.poll_hashes(&previous)?;
-                    Box::new(ok((file, meta)).join(query))
-                } else {
-                    Box::new(ok(((file, meta), client)))
-                };
-            Ok(fut)
-        })
-        .flatten()
-        .and_then(move |((file, meta), client)| {
-            eprintln!("starting uploader...");
-            let uploader: Box<dyn Future<Item = _, Error = _> + Send> = if use_fixed_chunks {
-                Box::new(FixedIndexUploader::new(
-                    client,
-                    file,
-                    &backup_type,
-                    &backup_id,
-                    backup_time,
-                    &filename,
-                    4 * 1024 * 1024,
-                    meta.len(),
-                )?)
-            } else {
-                let chunker = ChunkStream::new(file);
-                Box::new(DynamicIndexUploader::new(
-                    client,
-                    chunker,
-                    &backup_type,
-                    &backup_id,
-                    backup_time,
-                    &filename,
-                    4 * 1024 * 1024,
-                )?)
-            };
-            Ok(uploader)
-        })
-        .flatten();
-
-    let stream = stream
-        .and_then(move |_client| {
-            println!("Done");
-            Ok(())
-        })
-        .map_err(|e| eprintln!("error: {}", e));
-    hyper::rt::run(stream);
-}
index d4903c0cd751531d52f5ec89f0f751150da09e3d..57bd8b194a1b95de18f2e50da98dd61f9ba6804d 100644 (file)
@@ -16,6 +16,7 @@ use futures::*;
 use futures::stream::Stream;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use tokio::sync::mpsc;
+use openssl::ssl::{SslConnector, SslMethod};
 
 use serde_json::{json, Value};
 use url::percent_encoding::{percent_encode,  DEFAULT_ENCODE_SET};
@@ -36,7 +37,7 @@ struct AuthInfo {
 
 /// HTTP(S) API client
 pub struct HttpClient {
-    client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
+    client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
     server: String,
     auth: BroadcastFuture<AuthInfo>,
 }
@@ -156,17 +157,19 @@ impl HttpClient {
         bail!("no password input mechanism available");
     }
 
-    fn build_client() -> Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>> {
-        let mut builder = native_tls::TlsConnector::builder();
-        // FIXME: We need a CLI option for this!
-        builder.danger_accept_invalid_certs(true);
-        let tlsconnector = builder.build().unwrap();
+    fn build_client() -> Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>> {
+
+        let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
+
+        ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); // fixme!
+
         let mut httpc = hyper::client::HttpConnector::new(1);
         httpc.set_nodelay(true); // important for h2 download performance!
         httpc.set_recv_buf_size(Some(1024*1024)); //important for h2 download performance!
         httpc.enforce_http(false); // we want https...
-        let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
-        https.https_only(true); // force it!
+
+        let https = hyper_openssl::HttpsConnector::with_connector(httpc,  ssl_connector_builder).unwrap();
+
         Client::builder()
         //.http2_initial_stream_window_size( (1 << 31) - 2)
         //.http2_initial_connection_window_size( (1 << 31) - 2)
@@ -356,7 +359,7 @@ impl HttpClient {
     }
 
     fn credentials(
-        client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
+        client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
         server: String,
         username: String,
         password: String,
@@ -411,7 +414,7 @@ impl HttpClient {
     }
 
     fn api_request(
-        client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>,
+        client: Client<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>,
         req: Request<Body>
     ) -> impl Future<Item=Value, Error=Error> {
 
@@ -1046,7 +1049,6 @@ impl H2Client {
     pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
         let request = Self::request_builder("localhost", "POST", path, param).unwrap();
 
-
         self.h2.clone()
             .ready()
             .map_err(Error::from)