]> git.proxmox.com Git - proxmox-backup.git/commitdiff
update to tokio 0.2.0-alpha.4
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 2 Sep 2019 13:16:21 +0000 (15:16 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 2 Sep 2019 13:21:26 +0000 (15:21 +0200)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
Cargo.toml
src/bin/h2client.rs
src/bin/h2s-client.rs
src/bin/h2s-server.rs
src/bin/h2server.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-proxy.rs
src/tools/daemon.rs

index 5f305f00f508d88218f144cfb17f7652b84f90eb..bec61bb63d05b86dd88db471448211d383f5f60b 100644 (file)
@@ -19,7 +19,6 @@ futures-preview = "0.3.0-alpha"
 h2 = { version = "0.2.0-alpha", git = "https://github.com/hyperium/h2", features = ["stream"] }
 http = "0.1"
 hyper = { version = "0.13.0-a.0", git = "https://github.com/hyperium/hyper" }
-hyper-openssl = { version = "0.7", path = "hyper-openssl" }
 lazy_static = "1.3"
 libc = "0.2"
 log = "0.4"
@@ -39,9 +38,9 @@ shellwords = "1.0"
 siphasher = "0.3"
 syslog = "4.0"
 textwrap = "0.11"
-tokio = { version = "0.2.0-alpha.2" }
-tokio-executor = { version = "0.2.0-alpha.2" }
-tokio-net = { version = "0.2.0-alpha.2", features = ["signal"] }
+tokio = { version = "0.2.0-alpha.4" }
+tokio-executor = { version = "0.2.0-alpha.4" }
+tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] }
 tokio-openssl = "0.4.0-alpha.2"
 tower-service = "0.3.0-alpha.1"
 url = "1.7"
index dde90589b28c1c3943991e4df5796f5cc0e94dba..6abb014bbca13b5b888fa6740d7028e7a012ff5c 100644 (file)
@@ -74,7 +74,7 @@ async fn main() -> Result<(), Error> {
 
     let start = std::time::SystemTime::now();
 
-    let conn = TcpStream::connect(&"127.0.0.1:8008".parse().unwrap())
+    let conn = TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008)))
         .await?;
 
     let (client, h2) = h2::client::Builder::new()
index 3eec67fef407afe25cf48b3a9aefb20277083371..70bb088e812bbd35e336a3681352dcd82e738450 100644 (file)
@@ -71,7 +71,8 @@ fn send_request(
 async fn main() -> Result<(), Error> {
     let start = std::time::SystemTime::now();
 
-    let conn = tokio::net::TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()).await?;
+    let conn =
+        tokio::net::TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
 
     conn.set_nodelay(true).unwrap();
     conn.set_recv_buffer_size(1024*1024).unwrap();
index 42c9cc196bdd276981071c3a4eef9a06412481b0..b8c7926ac2414cd7ca6fcb11884cab9da2acbf54 100644 (file)
@@ -24,7 +24,7 @@ async fn main() -> Result<(), Error> {
 
     let acceptor = Arc::new(acceptor.build());
 
-    let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap();
+    let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
 
     println!("listening on {:?}", listener.local_addr());
 
index b275c0ca723c13806c2e883b0f9c735a39bf2042..8477ec719a109345dfaefd3dfc733cca9f2b9e76 100644 (file)
@@ -10,7 +10,7 @@ use proxmox_backup::client::pipe_to_stream::PipeToSendStream;
 
 #[tokio::main]
 async fn main() -> Result<(), Error> {
-    let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap();
+    let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
 
     println!("listening on {:?}", listener.local_addr());
 
index d09955a448b932a98fa3f447326f6bdc1cf94fcc..1537503f782da3e103044634c1e9a3a280918985 100644 (file)
@@ -55,18 +55,21 @@ async fn run() -> Result<(), Error> {
     // http server future:
     let server = daemon::create_daemon(
         ([127,0,0,1], 82).into(),
-        move |listener| {
-            Ok(hyper::Server::builder(listener.incoming())
-               .serve(rest_server)
-               .with_graceful_shutdown(server::shutdown_future())
-               .map(|e| {
-                   if let Err(e) = e {
-                       eprintln!("server error: {}", e);
-                   }
-               })
+        move |listener, ready| {
+            Ok(ready
+                .and_then(|_| hyper::Server::builder(listener.incoming())
+                    .serve(rest_server)
+                    .with_graceful_shutdown(server::shutdown_future())
+                    .map_err(Error::from)
+                )
+                .map(|e| {
+                    if let Err(e) = e {
+                        eprintln!("server error: {}", e);
+                    }
+                })
             )
         },
-    )?;
+    );
 
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
@@ -80,7 +83,7 @@ async fn run() -> Result<(), Error> {
         bail!("unable to start daemon - {}", err);
     }
 
-    server.await;
+    server.await?;
 
     log::info!("done - exit server");
 
index 03ae821eaa02f06fe9eecd6a0f4f7da3130f52c0..9c052f096b18c1492e77a10f2fbdff688aac8dbc 100644 (file)
@@ -71,7 +71,7 @@ async fn run() -> Result<(), Error> {
 
     let server = daemon::create_daemon(
         ([0,0,0,0,0,0,0,0], 8007).into(),
-        |listener| {
+        |listener, ready| {
             let connections = listener
                 .incoming()
                 .map_err(Error::from)
@@ -87,14 +87,18 @@ async fn run() -> Result<(), Error> {
                         )
                     }
                 });
-            Ok(hyper::Server::builder(connections)
-               .serve(rest_server)
-               .with_graceful_shutdown(server::shutdown_future())
-               .map_err(|err| eprintln!("server error: {}", err))
-               .map(|_| ())
+
+            Ok(ready
+                .and_then(|_| hyper::Server::builder(connections)
+                    .serve(rest_server)
+                    .with_graceful_shutdown(server::shutdown_future())
+                    .map_err(Error::from)
+                )
+                .map_err(|err| eprintln!("server error: {}", err))
+                .map(|_| ())
             )
         },
-    )?;
+    );
 
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
@@ -108,7 +112,7 @@ async fn run() -> Result<(), Error> {
         bail!("unable to start daemon - {}", err);
     }
 
-    server.await;
+    server.await?;
     log::info!("done - exit server");
 
     Ok(())
index 08fcee9027360851b386fb0623fdd5bd052fcd5e..97215d54e90c243b57fe4d4c423d5ea023228bb1 100644 (file)
@@ -1,13 +1,15 @@
 //! Helpers for daemons/services.
 
 use std::ffi::CString;
+use std::future::Future;
 use std::os::raw::{c_char, c_int};
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
 use std::os::unix::ffi::OsStrExt;
 use std::panic::UnwindSafe;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
 use failure::*;
-use tokio::prelude::*;
 
 use proxmox::tools::io::{ReadExt, WriteExt};
 
@@ -48,14 +50,15 @@ impl Reloader {
     /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
     ///
     /// Values created via this method will be remembered for later re-execution.
-    pub fn restore<T, F>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
+    pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
     where
         T: Reloadable,
-        F: FnOnce() -> Result<T, Error>,
+        F: FnOnce() -> U,
+        U: Future<Output = Result<T, Error>>,
     {
         let res = match std::env::var(name) {
             Ok(varstr) => T::restore(&varstr)?,
-            Err(std::env::VarError::NotPresent) => or_create()?,
+            Err(std::env::VarError::NotPresent) => or_create().await?,
             Err(_) => bail!("variable {} has invalid value", name),
         };
 
@@ -194,48 +197,56 @@ impl Reloadable for tokio::net::TcpListener {
     }
 }
 
+pub struct NotifyReady;
+
+impl Future for NotifyReady {
+    type Output = Result<(), Error>;
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
+        systemd_notify(SystemdNotify::Ready)?;
+        Poll::Ready(Ok(()))
+    }
+}
+
 /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
 /// If this is started regularly, a listening socket is created. In this case, the file descriptor
 /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
 /// If the variable already exists, its contents will instead be used to restore the listening
 /// socket.  The finished listening socket is then passed to the `create_service` function which
 /// can be used to setup the TLS and the HTTP daemon.
-pub fn create_daemon<F, S>(
+pub async fn create_daemon<F, S>(
     address: std::net::SocketAddr,
     create_service: F,
-) -> Result<impl Future<Output = ()>, Error>
+) -> Result<(), Error>
 where
-    F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>,
+    F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result<S, Error>,
     S: Future<Output = ()>,
 {
     let mut reloader = Reloader::new();
 
     let listener: tokio::net::TcpListener = reloader.restore(
         "PROXMOX_BACKUP_LISTEN_FD",
-        move || Ok(tokio::net::TcpListener::bind(&address)?),
-    )?;
+        move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
+    ).await?;
 
-    let service = create_service(listener)?;
+    create_service(listener, NotifyReady)?.await;
 
     let mut reloader = Some(reloader);
 
-    Ok(service
-       .map(move |_| {
-           crate::tools::request_shutdown(); // make sure we are in shutdown mode
-           if server::is_reload_request() {
-               log::info!("daemon reload...");
-               if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
-                   log::error!("failed to notify systemd about the state change: {}", e);
-               }
-               if let Err(e) = reloader.take().unwrap().fork_restart() {
-                   log::error!("error during reload: {}", e);
-                   let _ = systemd_notify(SystemdNotify::Status(format!("error during reload")));
-               }
-           } else {
-               log::info!("daemon shutting down...");
-           }
-       })
-    )
+    crate::tools::request_shutdown(); // make sure we are in shutdown mode
+    if server::is_reload_request() {
+        log::info!("daemon reload...");
+        if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
+            log::error!("failed to notify systemd about the state change: {}", e);
+        }
+        if let Err(e) = reloader.take().unwrap().fork_restart() {
+            log::error!("error during reload: {}", e);
+            let _ = systemd_notify(SystemdNotify::Status(format!("error during reload")));
+        }
+    } else {
+        log::info!("daemon shutting down...");
+    }
+    Ok(())
 }
 
 #[link(name = "systemd")]