use proxmox_backup::api_schema::router::*;
use proxmox_backup::api_schema::config::*;
use proxmox_backup::server::rest::*;
-use proxmox_backup::tools::daemon::Reloader;
+use proxmox_backup::tools::daemon;
use proxmox_backup::auth_helpers::*;
use proxmox_backup::config;
use lazy_static::lazy_static;
use futures::future::Future;
-use tokio::prelude::*;
use hyper;
-static mut QUIT_MAIN: bool = false;
-
fn main() {
if let Err(err) = run() {
}
fn run() -> Result<(), Error> {
- // This manages data for reloads:
- let mut reloader = Reloader::new();
-
if let Err(err) = syslog::init(
syslog::Facility::LOG_DAEMON,
log::LevelFilter::Info,
let rest_server = RestServer::new(config);
// http server future:
-
- let listener: tokio::net::TcpListener = reloader.restore(
- "PROXMOX_BACKUP_LISTEN_FD",
- || {
- let addr = ([127,0,0,1], 82).into();
- Ok(tokio::net::TcpListener::bind(&addr)?)
+ let server = daemon::create_daemon(
+ ([127,0,0,1], 82).into(),
+ |listener| {
+ Ok(hyper::Server::builder(listener.incoming())
+ .serve(rest_server)
+ .map_err(|e| eprintln!("server error: {}", e))
+ )
},
)?;
- let mut http_server = hyper::Server::builder(listener.incoming())
- .serve(rest_server)
- .map_err(|e| eprintln!("server error: {}", e));
-
- // signalfd future:
-
- let signal_handler =
- proxmox_backup::tools::daemon::default_signalfd_stream(
- reloader,
- || {
- unsafe { QUIT_MAIN = true; }
- Ok(())
- },
- )?
- .map(|si| {
- // debugging...
- eprintln!("received signal: {}", si.ssi_signo);
- })
- .map_err(|e| {
- eprintln!("error from signalfd: {}, shutting down...", e);
- unsafe {
- QUIT_MAIN = true;
- }
- });
-
-
- // Combined future for signalfd & http server, we want to quit as soon as either of them ends.
- // Neither of them is supposed to end unless some weird error happens, so just bail out if is
- // the case...
- let mut signal_handler = signal_handler.into_future();
- let main = futures::future::poll_fn(move || {
- // Helper for some diagnostic error messages:
- fn poll_helper<S: Future>(stream: &mut S, name: &'static str) -> bool {
- match stream.poll() {
- Ok(Async::Ready(_)) => {
- eprintln!("{} ended, shutting down", name);
- true
- }
- Err(_) => {
- eprintln!("{} error, shutting down", name);
- true
- },
- _ => false,
- }
- }
- if poll_helper(&mut http_server, "http server") ||
- poll_helper(&mut signal_handler, "signalfd handler")
- {
- return Ok(Async::Ready(()));
- }
-
- if unsafe { QUIT_MAIN } {
- eprintln!("shutdown requested");
- Ok(Async::Ready(()))
- } else {
- Ok(Async::NotReady)
- }
- });
-
- hyper::rt::run(main);
+ hyper::rt::run(server);
Ok(())
}
use proxmox_backup::configdir;
use proxmox_backup::tools;
-use proxmox_backup::tools::daemon::Reloader;
+use proxmox_backup::tools::daemon;
use proxmox_backup::api_schema::router::*;
use proxmox_backup::api_schema::config::*;
use proxmox_backup::server::rest::*;
use hyper;
-static mut QUIT_MAIN: bool = false;
-
fn main() {
if let Err(err) = run() {
Err(err) => bail!("unabled to decode pkcs12 identity {} - {}", cert_path, err),
};
- // This manages data for reloads:
- let mut reloader = Reloader::new();
-
- // http server future:
-
- let listener: tokio::net::TcpListener = reloader.restore(
- "PROXMOX_BACKUP_LISTEN_FD",
- || {
- let addr = ([0,0,0,0,0,0,0,0], 8007).into();
- Ok(tokio::net::TcpListener::bind(&addr)?)
+ let server = daemon::create_daemon(
+ ([0,0,0,0,0,0,0,0], 8007).into(),
+ |listener| {
+ let acceptor = native_tls::TlsAcceptor::new(identity)?;
+ let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor));
+ let connections = listener
+ .incoming()
+ .map_err(Error::from)
+ .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into()))
+ .then(|r| match r {
+ // accept()s can fail here with an Err() when eg. the client rejects
+ // the cert and closes the connection, so we follow up with mapping
+ // it to an option and then filtering None with filter_map
+ Ok(c) => Ok::<_, Error>(Some(c)),
+ Err(e) => {
+ if let Some(_io) = e.downcast_ref::<std::io::Error>() {
+ // "real" IO errors should not simply be ignored
+ bail!("shutting down...");
+ } else {
+ // handshake errors just get filtered by filter_map() below:
+ Ok(None)
+ }
+ }
+ })
+ .filter_map(|r| {
+ // Filter out the Nones
+ r
+ });
+ Ok(hyper::Server::builder(connections)
+ .serve(rest_server)
+ .map_err(|e| eprintln!("server error: {}", e))
+ )
},
)?;
- let acceptor = native_tls::TlsAcceptor::new(identity)?;
- let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor));
- let connections = listener
- .incoming()
- .map_err(Error::from)
- .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into()))
- .then(|r| match r {
- // accept()s can fail here with an Err() when eg. the client rejects
- // the cert and closes the connection, so we follow up with mapping
- // it to an option and then filtering None with filter_map
- Ok(c) => Ok::<_, Error>(Some(c)),
- Err(e) => {
- if let Some(_io) = e.downcast_ref::<std::io::Error>() {
- // "real" IO errors should not simply be ignored
- bail!("shutting down...");
- } else {
- // handshake errors just get filtered by filter_map() below:
- Ok(None)
- }
- }
- })
- .filter_map(|r| {
- // Filter out the Nones
- r
- });
-
- let mut http_server = hyper::Server::builder(connections)
- .serve(rest_server)
- .map_err(|e| eprintln!("server error: {}", e));
-
- // signalfd future:
- let signal_handler =
- proxmox_backup::tools::daemon::default_signalfd_stream(
- reloader,
- || {
- unsafe { QUIT_MAIN = true; }
- Ok(())
- },
- )?
- .map(|si| {
- // debugging...
- eprintln!("received signal: {}", si.ssi_signo);
- })
- .map_err(|e| {
- eprintln!("error from signalfd: {}, shutting down...", e);
- unsafe {
- QUIT_MAIN = true;
- }
- });
-
- // Combined future for signalfd & http server, we want to quit as soon as either of them ends.
- // Neither of them is supposed to end unless some weird error happens, so just bail out if is
- // the case...
- let mut signal_handler = signal_handler.into_future();
- let main = futures::future::poll_fn(move || {
- // Helper for some diagnostic error messages:
- fn poll_helper<S: Future>(stream: &mut S, name: &'static str) -> bool {
- match stream.poll() {
- Ok(Async::Ready(_)) => {
- eprintln!("{} ended, shutting down", name);
- true
- }
- Err(_) => {
- eprintln!("{} error, shutting down", name);
- true
- },
- _ => false,
- }
- }
- if poll_helper(&mut http_server, "http server") ||
- poll_helper(&mut signal_handler, "signalfd handler")
- {
- return Ok(Async::Ready(()));
- }
-
- if unsafe { QUIT_MAIN } {
- eprintln!("shutdown requested");
- Ok(Async::Ready(()))
- } else {
- Ok(Async::NotReady)
- }
- });
- hyper::rt::run(main);
+ hyper::rt::run(server);
Ok(())
}
use std::panic::UnwindSafe;
use failure::*;
-use nix::sys::signalfd::siginfo;
+use futures::future::poll_fn;
+use futures::try_ready;
use tokio::prelude::*;
use crate::tools::fd_change_cloexec;
}
}
-/// Provide a default signal handler for daemons (daemon & proxy).
-/// When the first `SIGHUP` is received, the `reloader`'s `fork_restart` method will be
-/// triggered. Any further `SIGHUP` is "passed through".
-pub fn default_signalfd_stream<F>(
- reloader: Reloader,
- before_reload: F,
-) -> Result<impl Stream<Item = siginfo, Error = Error>, Error>
-where
- F: FnOnce() -> Result<(), Error>,
-{
- use nix::sys::signal::{SigmaskHow, Signal, sigprocmask};
-
- // Block SIGHUP for *all* threads and use it for a signalfd handler:
- let mut sigs = SigSet::empty();
- sigs.add(Signal::SIGHUP);
- sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigs), None)?;
-
- let sigfdstream = SignalFd::new(&sigs)?;
- let mut reloader = Some(reloader);
- let mut before_reload = Some(before_reload);
-
- Ok(sigfdstream
- .filter_map(move |si| {
- // FIXME: logging should be left to the user of this:
- eprintln!("received signal: {}", si.ssi_signo);
-
- if si.ssi_signo == Signal::SIGHUP as u32 {
- // The firs time this happens we will try to start a new process which should take
- // over.
- if let Some(reloader) = reloader.take() {
- if let Err(e) = (before_reload.take().unwrap())() {
- return Some(Err(e));
- }
-
- match reloader.fork_restart() {
- Ok(_) => return None,
- Err(e) => return Some(Err(e)),
- }
- }
- }
-
- // pass the rest through:
- Some(Ok(si))
- })
- // filter_map cannot produce errors, so we create Result<> items instead, iow:
- // before: Stream<Item = siginfo, Error>
- // after: Stream<Item = Result<siginfo, Error>, Error>.
- // use and_then to lift out the wrapped result:
- .and_then(|si_res| si_res)
- )
-}
-
// For now all we need to do is store and reuse a tcp listening socket:
impl Reloadable for tokio::net::TcpListener {
// NOTE: The socket must not be closed when the store-function is called:
)?)
}
}
+
+/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
+/// If this is started regularly, a listening socket is created. In this case, the file descriptor
+/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
+/// If the variable already exists, its contents will instead be used to restore the listening
+/// socket. The finished listening socket is then passed to the `create_service` function which
+/// can be used to setup the TLS and the HTTP daemon.
+pub fn create_daemon<F, S>(
+ address: std::net::SocketAddr,
+ create_service: F,
+) -> Result<impl Future<Item = (), Error = ()>, Error>
+where
+ F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>,
+ S: Future<Item = (), Error = ()>,
+{
+ let mut reloader = Reloader::new();
+
+ let listener: tokio::net::TcpListener = reloader.restore(
+ "PROXMOX_BACKUP_LISTEN_FD",
+ move || Ok(tokio::net::TcpListener::bind(&address)?),
+ )?;
+
+ let service = create_service(listener)?;
+
+ // Block SIGHUP for *all* threads and use it for a signalfd handler:
+ use nix::sys::signal;
+ let mut sigs = SigSet::empty();
+ sigs.add(signal::Signal::SIGHUP);
+ signal::sigprocmask(signal::SigmaskHow::SIG_BLOCK, Some(&sigs), None)?;
+
+ let mut sigfdstream = SignalFd::new(&sigs)?
+ .map_err(|e| log::error!("error in signal handler: {}", e));
+
+ let mut reloader = Some(reloader);
+
+ // Use a Future instead of a Stream for ease-of-use: Poll until we receive a SIGHUP.
+ let signal_handler = poll_fn(move || {
+ match try_ready!(sigfdstream.poll()) {
+ Some(si) => {
+ log::info!("received signal {}", si.ssi_signo);
+ if si.ssi_signo == signal::Signal::SIGHUP as u32 {
+ if let Err(e) = reloader.take().unwrap().fork_restart() {
+ log::error!("error during reload: {}", e);
+ }
+ Ok(Async::Ready(()))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+ // or the stream ended (which it can't, really)
+ None => Ok(Async::Ready(()))
+ }
+ });
+
+ Ok(service.select(signal_handler)
+ .map(|_| log::info!("daemon shutting down..."))
+ .map_err(|_| ())
+ )
+}