]> git.proxmox.com Git - proxmox-backup.git/commitdiff
daemon: simplify daemon creation
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 18 Mar 2019 13:13:44 +0000 (14:13 +0100)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 19 Mar 2019 11:12:54 +0000 (12:12 +0100)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-proxy.rs
src/tools/daemon.rs

index 67da761594720391a63d3c331e81b429d0a4482c..20e33c4fcb6a18dafd1afec568b66f9c2bd3c701 100644 (file)
@@ -4,7 +4,7 @@ extern crate proxmox_backup;
 use proxmox_backup::api_schema::router::*;
 use proxmox_backup::api_schema::config::*;
 use proxmox_backup::server::rest::*;
-use proxmox_backup::tools::daemon::Reloader;
+use proxmox_backup::tools::daemon;
 use proxmox_backup::auth_helpers::*;
 use proxmox_backup::config;
 
@@ -12,12 +12,9 @@ use failure::*;
 use lazy_static::lazy_static;
 
 use futures::future::Future;
-use tokio::prelude::*;
 
 use hyper;
 
-static mut QUIT_MAIN: bool = false;
-
 fn main() {
 
     if let Err(err) = run() {
@@ -27,9 +24,6 @@ fn main() {
 }
 
 fn run() -> Result<(), Error> {
-    // This manages data for reloads:
-    let mut reloader = Reloader::new();
-
     if let Err(err) = syslog::init(
         syslog::Facility::LOG_DAEMON,
         log::LevelFilter::Info,
@@ -59,75 +53,17 @@ fn run() -> Result<(), Error> {
     let rest_server = RestServer::new(config);
 
     // http server future:
-
-    let listener: tokio::net::TcpListener = reloader.restore(
-        "PROXMOX_BACKUP_LISTEN_FD",
-        || {
-            let addr = ([127,0,0,1], 82).into();
-            Ok(tokio::net::TcpListener::bind(&addr)?)
+    let server = daemon::create_daemon(
+        ([127,0,0,1], 82).into(),
+        |listener| {
+            Ok(hyper::Server::builder(listener.incoming())
+                .serve(rest_server)
+                .map_err(|e| eprintln!("server error: {}", e))
+            )
         },
     )?;
 
-    let mut http_server = hyper::Server::builder(listener.incoming())
-        .serve(rest_server)
-        .map_err(|e| eprintln!("server error: {}", e));
-
-    // signalfd future:
-
-    let signal_handler =
-        proxmox_backup::tools::daemon::default_signalfd_stream(
-            reloader,
-            || {
-                unsafe { QUIT_MAIN = true; }
-                Ok(())
-            },
-        )?
-        .map(|si| {
-            // debugging...
-            eprintln!("received signal: {}", si.ssi_signo);
-        })
-        .map_err(|e| {
-            eprintln!("error from signalfd: {}, shutting down...", e);
-            unsafe {
-                QUIT_MAIN = true;
-            }
-        });
-
-
-    // Combined future for signalfd & http server, we want to quit as soon as either of them ends.
-    // Neither of them is supposed to end unless some weird error happens, so just bail out if is
-    // the case...
-    let mut signal_handler = signal_handler.into_future();
-    let main = futures::future::poll_fn(move || {
-        // Helper for some diagnostic error messages:
-        fn poll_helper<S: Future>(stream: &mut S, name: &'static str) -> bool {
-            match stream.poll() {
-                Ok(Async::Ready(_)) => {
-                    eprintln!("{} ended, shutting down", name);
-                    true
-                }
-                Err(_) => {
-                    eprintln!("{} error, shutting down", name);
-                    true
-                },
-                _ => false,
-            }
-        }
-        if poll_helper(&mut http_server, "http server") ||
-           poll_helper(&mut signal_handler, "signalfd handler")
-        {
-            return Ok(Async::Ready(()));
-        }
-
-        if unsafe { QUIT_MAIN } {
-            eprintln!("shutdown requested");
-            Ok(Async::Ready(()))
-        } else {
-            Ok(Async::NotReady)
-        }
-    });
-
-    hyper::rt::run(main);
+    hyper::rt::run(server);
 
     Ok(())
 }
index cb906d8e0f936a4b59de400d38effe2ae6366b29..eded263f8eaa000f2b74ce4595b59cdd76383096 100644 (file)
@@ -1,6 +1,6 @@
 use proxmox_backup::configdir;
 use proxmox_backup::tools;
-use proxmox_backup::tools::daemon::Reloader;
+use proxmox_backup::tools::daemon;
 use proxmox_backup::api_schema::router::*;
 use proxmox_backup::api_schema::config::*;
 use proxmox_backup::server::rest::*;
@@ -14,8 +14,6 @@ use tokio::prelude::*;
 
 use hyper;
 
-static mut QUIT_MAIN: bool = false;
-
 fn main() {
 
     if let Err(err) = run() {
@@ -63,101 +61,41 @@ fn run() -> Result<(), Error> {
         Err(err) => bail!("unabled to decode pkcs12 identity {} - {}", cert_path, err),
     };
 
-    // This manages data for reloads:
-    let mut reloader = Reloader::new();
-
-    // http server future:
-
-    let listener: tokio::net::TcpListener = reloader.restore(
-        "PROXMOX_BACKUP_LISTEN_FD",
-        || {
-            let addr = ([0,0,0,0,0,0,0,0], 8007).into();
-            Ok(tokio::net::TcpListener::bind(&addr)?)
+    let server = daemon::create_daemon(
+        ([0,0,0,0,0,0,0,0], 8007).into(),
+        |listener| {
+            let acceptor = native_tls::TlsAcceptor::new(identity)?;
+            let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor));
+            let connections = listener
+                .incoming()
+                .map_err(Error::from)
+                .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into()))
+                .then(|r| match r {
+                    // accept()s can fail here with an Err() when eg. the client rejects
+                    // the cert and closes the connection, so we follow up with mapping
+                    // it to an option and then filtering None with filter_map
+                    Ok(c) => Ok::<_, Error>(Some(c)),
+                    Err(e) => {
+                        if let Some(_io) = e.downcast_ref::<std::io::Error>() {
+                            // "real" IO errors should not simply be ignored
+                            bail!("shutting down...");
+                        } else {
+                            // handshake errors just get filtered by filter_map() below:
+                            Ok(None)
+                        }
+                    }
+                })
+                .filter_map(|r| {
+                    // Filter out the Nones
+                    r
+                });
+            Ok(hyper::Server::builder(connections)
+                .serve(rest_server)
+                .map_err(|e| eprintln!("server error: {}", e))
+            )
         },
     )?;
-    let acceptor = native_tls::TlsAcceptor::new(identity)?;
-    let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor));
-    let connections = listener
-        .incoming()
-        .map_err(Error::from)
-        .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into()))
-        .then(|r| match r {
-            // accept()s can fail here with an Err() when eg. the client rejects
-            // the cert and closes the connection, so we follow up with mapping
-            // it to an option and then filtering None with filter_map
-            Ok(c) => Ok::<_, Error>(Some(c)),
-            Err(e) => {
-                if let Some(_io) = e.downcast_ref::<std::io::Error>() {
-                    // "real" IO errors should not simply be ignored
-                    bail!("shutting down...");
-                } else {
-                    // handshake errors just get filtered by filter_map() below:
-                    Ok(None)
-                }
-            }
-        })
-        .filter_map(|r| {
-            // Filter out the Nones
-            r
-        });
-
-    let mut http_server = hyper::Server::builder(connections)
-        .serve(rest_server)
-        .map_err(|e| eprintln!("server error: {}", e));
-
-    // signalfd future:
-    let signal_handler =
-        proxmox_backup::tools::daemon::default_signalfd_stream(
-            reloader,
-            || {
-                unsafe { QUIT_MAIN = true; }
-                Ok(())
-            },
-        )?
-        .map(|si| {
-            // debugging...
-            eprintln!("received signal: {}", si.ssi_signo);
-        })
-        .map_err(|e| {
-            eprintln!("error from signalfd: {}, shutting down...", e);
-            unsafe {
-                QUIT_MAIN = true;
-            }
-        });
-
-    // Combined future for signalfd & http server, we want to quit as soon as either of them ends.
-    // Neither of them is supposed to end unless some weird error happens, so just bail out if is
-    // the case...
-    let mut signal_handler = signal_handler.into_future();
-    let main = futures::future::poll_fn(move || {
-        // Helper for some diagnostic error messages:
-        fn poll_helper<S: Future>(stream: &mut S, name: &'static str) -> bool {
-            match stream.poll() {
-                Ok(Async::Ready(_)) => {
-                    eprintln!("{} ended, shutting down", name);
-                    true
-                }
-                Err(_) => {
-                    eprintln!("{} error, shutting down", name);
-                    true
-                },
-                _ => false,
-            }
-        }
-        if poll_helper(&mut http_server, "http server") ||
-           poll_helper(&mut signal_handler, "signalfd handler")
-        {
-            return Ok(Async::Ready(()));
-        }
-
-        if unsafe { QUIT_MAIN } {
-            eprintln!("shutdown requested");
-            Ok(Async::Ready(()))
-        } else {
-            Ok(Async::NotReady)
-        }
-    });
 
-    hyper::rt::run(main);
+    hyper::rt::run(server);
     Ok(())
 }
index f51dab1c6a9cf5f017b0fe28f29f2e9210668264..db66c316ce2919b93487f8315602707f6d32046b 100644 (file)
@@ -6,7 +6,8 @@ use std::os::unix::ffi::OsStrExt;
 use std::panic::UnwindSafe;
 
 use failure::*;
-use nix::sys::signalfd::siginfo;
+use futures::future::poll_fn;
+use futures::try_ready;
 use tokio::prelude::*;
 
 use crate::tools::fd_change_cloexec;
@@ -120,58 +121,6 @@ impl Reloader {
     }
 }
 
-/// Provide a default signal handler for daemons (daemon & proxy).
-/// When the first `SIGHUP` is received, the `reloader`'s `fork_restart` method will be
-/// triggered. Any further `SIGHUP` is "passed through".
-pub fn default_signalfd_stream<F>(
-    reloader: Reloader,
-    before_reload: F,
-) -> Result<impl Stream<Item = siginfo, Error = Error>, Error>
-where
-    F: FnOnce() -> Result<(), Error>,
-{
-    use nix::sys::signal::{SigmaskHow, Signal, sigprocmask};
-
-    // Block SIGHUP for *all* threads and use it for a signalfd handler:
-    let mut sigs = SigSet::empty();
-    sigs.add(Signal::SIGHUP);
-    sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigs), None)?;
-
-    let sigfdstream = SignalFd::new(&sigs)?;
-    let mut reloader = Some(reloader);
-    let mut before_reload = Some(before_reload);
-
-    Ok(sigfdstream
-        .filter_map(move |si| {
-            // FIXME: logging should be left to the user of this:
-            eprintln!("received signal: {}", si.ssi_signo);
-
-            if si.ssi_signo == Signal::SIGHUP as u32 {
-                // The firs time this happens we will try to start a new process which should take
-                // over.
-                if let Some(reloader) = reloader.take() {
-                    if let Err(e) = (before_reload.take().unwrap())() {
-                        return Some(Err(e));
-                    }
-
-                    match reloader.fork_restart() {
-                        Ok(_) => return None,
-                        Err(e) => return Some(Err(e)),
-                    }
-                }
-            }
-
-            // pass the rest through:
-            Some(Ok(si))
-        })
-        // filter_map cannot produce errors, so we create Result<> items instead, iow:
-        //   before: Stream<Item = siginfo, Error>
-        //   after:  Stream<Item = Result<siginfo, Error>, Error>.
-        // use and_then to lift out the wrapped result:
-        .and_then(|si_res| si_res)
-    )
-}
-
 // For now all we need to do is store and reuse a tcp listening socket:
 impl Reloadable for tokio::net::TcpListener {
     // NOTE: The socket must not be closed when the store-function is called:
@@ -196,3 +145,62 @@ impl Reloadable for tokio::net::TcpListener {
         )?)
     }
 }
+
+/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
+/// If this is started regularly, a listening socket is created. In this case, the file descriptor
+/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
+/// If the variable already exists, its contents will instead be used to restore the listening
+/// socket.  The finished listening socket is then passed to the `create_service` function which
+/// can be used to setup the TLS and the HTTP daemon.
+pub fn create_daemon<F, S>(
+    address: std::net::SocketAddr,
+    create_service: F,
+) -> Result<impl Future<Item = (), Error = ()>, Error>
+where
+    F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>,
+    S: Future<Item = (), Error = ()>,
+{
+    let mut reloader = Reloader::new();
+
+    let listener: tokio::net::TcpListener = reloader.restore(
+        "PROXMOX_BACKUP_LISTEN_FD",
+        move || Ok(tokio::net::TcpListener::bind(&address)?),
+    )?;
+
+    let service = create_service(listener)?;
+
+    // Block SIGHUP for *all* threads and use it for a signalfd handler:
+    use nix::sys::signal;
+    let mut sigs = SigSet::empty();
+    sigs.add(signal::Signal::SIGHUP);
+    signal::sigprocmask(signal::SigmaskHow::SIG_BLOCK, Some(&sigs), None)?;
+
+    let mut sigfdstream = SignalFd::new(&sigs)?
+        .map_err(|e| log::error!("error in signal handler: {}", e));
+
+    let mut reloader = Some(reloader);
+
+    // Use a Future instead of a Stream for ease-of-use: Poll until we receive a SIGHUP.
+    let signal_handler = poll_fn(move || {
+        match try_ready!(sigfdstream.poll()) {
+            Some(si) => {
+                log::info!("received signal {}", si.ssi_signo);
+                if si.ssi_signo == signal::Signal::SIGHUP as u32 {
+                    if let Err(e) = reloader.take().unwrap().fork_restart() {
+                        log::error!("error during reload: {}", e);
+                    }
+                    Ok(Async::Ready(()))
+                } else {
+                    Ok(Async::NotReady)
+                }
+            }
+            // or the stream ended (which it can't, really)
+            None => Ok(Async::Ready(()))
+        }
+    });
+
+    Ok(service.select(signal_handler)
+        .map(|_| log::info!("daemon shutting down..."))
+        .map_err(|_| ())
+    )
+}